Commit 08624eaf authored by Grégory Mantelet's avatar Grégory Mantelet
Browse files

[TAP] Add possibility to set the execution duration of synchronous queries.

Fixes #78
Fixes #60
parent ccd563d2
Loading
Loading
Loading
Loading
+273 −162

File changed.

Preview size limit exceeded, changes collapsed.

+207 −92
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@ package tap;
 * You should have received a copy of the GNU Lesser General Public License
 * along with TAPLibrary.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Copyright 2012-2017 - UDS/Centre de Données astronomiques de Strasbourg (CDS),
 * Copyright 2012-2020 - UDS/Centre de Données astronomiques de Strasbourg (CDS),
 *                       Astronomisches Rechen Institut (ARI)
 */

@@ -26,34 +26,48 @@ import java.util.Iterator;

import javax.servlet.http.HttpServletResponse;

import tap.parameters.TAPExecutionDurationController;
import tap.parameters.TAPParameters;
import uws.UWSException;
import uws.job.JobThread;
import uws.job.UWSJob;
import uws.service.log.UWSLog.LogLevel;
import uws.service.request.UploadFile;

/**
 * <p>This class represent a TAP synchronous job.
 * A such job must execute an ADQL query and return immediately its result.</p>
 * This class represent a TAP synchronous job. A such job must execute an ADQL
 * query and return immediately its result.
 *
 * <h3>Timeout</h3>
 *
 * <p>
 * 	The execution of a such job is limited to a short time. Once this time elapsed, the job is stopped.
 * 	For a longer job, an asynchronous job should be used.
 * 	The execution of a such job is limited to a short time. Once this time is
 * 	elapsed, the job is stopped. For a longer job, an asynchronous job should be
 * 	used.
 * </p>
 *
 * <p>
 * 	The maximum execution duration of a synchronous job is determined by
 * 	{@link #determineMaxExecutionDuration()}.
 * </p>
 *
 * <h3>Error management</h3>
 *
 * <p>
 * 	If an error occurs it must be propagated ; it will be written later in the HTTP response on a top level.
 * 	If an error occurs it must be propagated ; it will be written later in the
 * 	HTTP response on a top level.
 * </p>
 *
 * @author Gr&eacute;gory Mantelet (CDS;ARI)
 * @version 2.1 (03/2017)
 * @version 2.4 (08/2020)
 */
public class TAPSyncJob {

	/** Ultimate execution duration (in milliseconds) to use if not a single
	 * alternative for this duration can be found.
	 * @since 2.4 */
	protected final long MAX_DURATION_FALLBACK = 10000;

	/** The time (in ms) to wait the end of the thread after an interruption. */
	protected long waitForStop = 1000;

@@ -66,7 +80,8 @@ public class TAPSyncJob {
	/** ID of this job. This ID is also used to identify the thread. */
	protected final String ID;

	/** Parameters of the execution. It mainly contains the ADQL query to execute. */
	/** Parameters of the execution. It mainly contains the ADQL query to
	 * execute. */
	protected final TAPParameters tapParams;

	/** The thread in which the query execution will be done. */
@@ -75,17 +90,23 @@ public class TAPSyncJob {
	/** Report of the query execution. It stays NULL until the execution ends. */
	protected TAPExecutionReport execReport = null;

	/** Date at which this synchronous job has really started. It is NULL when the job has never been started.
	/** Date at which this synchronous job has really started. It is NULL when
	 * the job has never been started.
	 *
	 * <p><i>Note: A synchronous job can be run just once ; so if an attempt of executing it again, the start date will be tested:
	 * if NULL, the second starting is not considered and an exception is thrown.</i></p> */
	 * <p><i><b>Note:</b>
	 * 	A synchronous job can be run just once ; so if an attempt of executing
	 * 	it again, the start date will be tested: if NULL, the second starting is
	 * 	not considered and an exception is thrown.
	 * </i></p> */
	private Date startedAt = null;

	/**
	 * Create a synchronous TAP job.
	 *
	 * @param service	Description of the TAP service which is in charge of this synchronous job.
	 * @param params	Parameters of the query to execute. It must mainly contain the ADQL query to execute.
	 * @param service	Description of the TAP service which is in charge of
	 *               	this synchronous job.
	 * @param params	Parameters of the query to execute. It must mainly
	 *              	contain the ADQL query to execute.
	 *
	 * @throws NullPointerException	If one of the parameters is NULL.
	 */
@@ -104,12 +125,17 @@ public class TAPSyncJob {

	/**
	 * Create a synchronous TAP job.
	 * The given HTTP request ID will be used as Job ID if not already used by another job.
	 *
	 * @param service	Description of the TAP service which is in charge of this synchronous job.
	 * @param params	Parameters of the query to execute. It must mainly contain the ADQL query to execute.
	 * @param requestID	ID of the HTTP request which has initiated the creation of this job.
	 *                 	<i>Note: if NULL, empty or already used, a job ID will be generated thanks to {@link #generateId()}.</i>
	 * The given HTTP request ID will be used as Job ID if not already used by
	 * another job.
	 *
	 * @param service	Description of the TAP service which is in charge of
	 *               	this synchronous job.
	 * @param params	Parameters of the query to execute. It must mainly
	 *              	contain the ADQL query to execute.
	 * @param requestID	ID of the HTTP request which has initiated the creation
	 *                 	of this job.
	 *                 	<i>Note: if NULL, empty or already used, a job ID will
	 *                 	be generated thanks to {@link #generateId()}.</i>
	 *
	 * @throws NullPointerException	If one of the 2 first parameters is NULL.
	 *
@@ -136,13 +162,21 @@ public class TAPSyncJob {
	}

	/**
	 * <p>This function lets generating a unique ID.</p>
	 *
	 * <p><i><b>By default:</b> "S"+System.currentTimeMillis()+UpperCharacter (UpperCharacter: one upper-case character: A, B, C, ....)</i></p>
	 *
	 * <p><i><u>note: </u> DO NOT USE in this function any of the following functions: {@link ServiceConnection#getLogger()},
	 * {@link ServiceConnection#getFileManager()} and {@link ServiceConnection#getFactory()}. All of them will return NULL, because this job does not
	 * yet know its jobs list (which is needed to know the UWS and so, all of the objects returned by these functions).</i></p>
	 * This function lets generating a unique ID.
	 *
	 * <p><i><b>By default:</b>
	 * 	"S"+System.currentTimeMillis()+UpperCharacter (UpperCharacter:
	 * 	one upper-case character: A, B, C, ....)
	 * </i></p>
	 *
	 * <p><i><b>Note: </b>
	 * 	DO NOT USE in this function any of the following functions:
	 * 	{@link ServiceConnection#getLogger()},
	 * 	{@link ServiceConnection#getFileManager()} and
	 * 	{@link ServiceConnection#getFactory()}. All of them will return NULL,
	 * 	because this job does not yet know its jobs list (which is needed to
	 * 	know the UWS and so, all of the objects returned by these functions).
	 * </i></p>
	 *
	 * @return	A unique job identifier.
	 */
@@ -168,7 +202,8 @@ public class TAPSyncJob {
	}

	/**
	 * Get the TAP parameters provided by the user and which will be used for the execution of this job.
	 * Get the TAP parameters provided by the user and which will be used for
	 * the execution of this job.
	 *
	 * @return	Job parameters.
	 */
@@ -187,21 +222,30 @@ public class TAPSyncJob {
	}

	/**
	 * <p>Start the execution of this job in order to execute the given ADQL query.</p>
	 * Start the execution of this job in order to execute the given ADQL query.
	 *
	 * <p>The execution itself will be processed by an {@link ADQLExecutor} inside a thread ({@link SyncThread}).</p>
	 *
	 * <p><b>Important:</b>
	 * 	No error should be written in this function. If any error occurs it should be thrown, in order to be manager on a top level.
	 * <p>
	 * 	The execution itself will be processed by an {@link ADQLExecutor} inside
	 * 	a thread ({@link SyncThread}).
	 * </p>
	 *
	 * <p><i><b>Important:</b>
	 * 	No error should be written in this function. If any error occurs it
	 * 	should be thrown, in order to be manager on a top level.
	 * </i></p>
	 *
	 * @param response	Response in which the result must be written.
	 *
	 * @return	<i>true</i> if the execution was successful, <i>false</i> otherwise.
	 * @return	<code>true</code> if the execution was successful,
	 *        	<code>false</code> otherwise.
	 *
	 * @throws IllegalStateException	If this synchronous job has already been started before.
	 * @throws IOException				If any error occurs while writing the query result in the given {@link HttpServletResponse}.
	 * @throws TAPException				If any error occurs while executing the ADQL query.
	 * @throws IllegalStateException	If this synchronous job has already been
	 *                              	started before.
	 * @throws IOException				If any error occurs while writing the
	 *                    				query result in the given
	 *                    				{@link HttpServletResponse}.
	 * @throws TAPException				If any error occurs while executing the
	 *                     				ADQL query.
	 *
	 * @see SyncThread
	 */
@@ -222,6 +266,9 @@ public class TAPSyncJob {
			throw new TAPException("TAP service too busy! No connection available for the moment. You should try later or create an asynchronous query (which will be executed when enough resources will be available again).", UWSException.SERVICE_UNAVAILABLE);
		}

		// Determine the maximum execution duration (in milliseconds):
		final long timeToStop = determineMaxExecutionDuration();

		// Give to a thread which will execute the query:
		thread = new SyncThread(executor, ID, tapParams, response);
		thread.start();
@@ -230,12 +277,17 @@ public class TAPSyncJob {
		boolean timeout = false;
		try {
			// wait the end:
			thread.join(tapParams.getExecutionDuration() * 1000);
			thread.join(timeToStop);
			// if still alive after this duration, interrupt it:
			if (thread.isAlive()) {
				timeout = true;
				thread.interrupt();
				thread.join(waitForStop);
				// Log the timeout:
				if (thread.isAlive())
					service.getLogger().logTAP(LogLevel.WARNING, this, "TIME_OUT", "Time out (after " + (timeToStop / 1000) + " seconds) for the synchonous job " + ID + ", but the thread can not be interrupted!", null);
				else
					service.getLogger().logTAP(LogLevel.INFO, this, "TIME_OUT", "Time out (after " + (timeToStop / 1000) + " seconds) for the synchonous job " + ID + ".", null);
			}
		} catch(InterruptedException ie) {
			/* Having a such exception here, is not surprising, because we may have interrupted the thread! */
@@ -251,12 +303,6 @@ public class TAPSyncJob {
		Throwable error = thread.getError();
		// CASE: TIMEOUT
		if (timeout && error != null && error instanceof InterruptedException) {
			// Log the timeout:
			if (thread.isAlive())
				service.getLogger().logTAP(LogLevel.WARNING, this, "TIME_OUT", "Time out (after " + tapParams.getExecutionDuration() + " seconds) for the synchonous job " + ID + ", but the thread can not be interrupted!", null);
			else
				service.getLogger().logTAP(LogLevel.INFO, this, "TIME_OUT", "Time out (after " + tapParams.getExecutionDuration() + " seconds) for the synchonous job " + ID + ".", null);

			// Report the timeout to the user:
			throw new TAPException("Time out! The execution of this synchronous TAP query was limited to " + tapParams.getExecutionDuration() + " seconds. You should try again but in asynchronous mode.", UWSException.ACCEPTED_BUT_NOT_COMPLETE);
		}
@@ -301,6 +347,60 @@ public class TAPSyncJob {
		return thread.isSuccess();
	}

	/**
	 * Determine the maximum execution duration of this synchronous query.
	 *
	 * <p>By default, this function use the following strategy:</p>
	 * <ul>
	 * 	<li>if set, use the synchronous duration specified in the TAP configuration
	 * 	    (i.e. {@link ServiceConnection#getExecutionDuration()}[2])</li>
	 * 	<li>if none is specified, then use the default execution duration
	 * 	    (i.e. {@link ServiceConnection#getExecutionDuration()}[0])</li>
	 * 	<li>if none is specified either, use the maximum execution duration
	 * 	    (i.e. {@link ServiceConnection#getExecutionDuration()}[1])</li>
	 * 	<li>if still none is specified, try to see if an execution duration is
	 * 	    provided in the HTTP request (using the corresponding UWS' parameter)
	 * 	    and use it</li>
	 * 	<li>in last chance, the execution is set to 60 seconds.</li>
	 * </ul>
	 * <p><i>
	 * 	This default strategy aims to avoid an unlimited execution duration in
	 * 	synchronous mode.
	 * </i></p>
	 *
	 * @return	The maximum execution duration of this synchronous query
	 *        	(in milliseconds) or {@link UWSJob#UNLIMITED_DURATION} for no
	 *        	limit at all.
	 *
	 * @since 2.4
	 */
	protected long determineMaxExecutionDuration() {

		long timeToStop = TAPJob.UNLIMITED_DURATION;

		// Try to use the durations set in the TAP configuration:
		if (service.getExecutionDuration() != null) {
			// use the synchronous execution duration (if any specified):
			if (service.getExecutionDuration().length >= 3 && service.getExecutionDuration()[2] > 0)
				timeToStop = service.getExecutionDuration()[2];
			// otherwise, just use the default value:
			else
				timeToStop = ((Long)(new TAPExecutionDurationController(service)).getDefault()) * 1000;
		}

		/* If the duration is still unlimited, try to see if a duration is
		 * given in the HTTP request (in the UWS way) and use it: */
		if (timeToStop <= 0)
			timeToStop = tapParams.getExecutionDuration() * 1000;

		/* In order to prevent an unlimited execution duration in synchronous
		 * mode (which should not happen), set a hard coded limit (60 seconds): */
		if (timeToStop <= 0)
			timeToStop = MAX_DURATION_FALLBACK;

		return timeToStop;
	}

	/**
	 * Delete all uploaded files.
	 *
@@ -321,11 +421,13 @@ public class TAPSyncJob {
	}

	/**
	 * <p>Thread which will process the job execution.</p>
	 * Thread which will process the job execution.
	 *
	 * <p>
	 * 	Actually, it will basically just call {@link ADQLExecutor#start(Thread, String, TAPParameters, HttpServletResponse)}
	 * 	with the given {@link ADQLExecutor} and TAP parameters (containing the ADQL query to execute).
	 * 	Actually, it will basically just call
	 * 	{@link ADQLExecutor#start(Thread, String, TAPParameters, HttpServletResponse)}
	 * 	with the given {@link ADQLExecutor} and TAP parameters (containing the
	 * 	ADQL query to execute).
	 * </p>
	 *
	 * @author Gr&eacute;gory Mantelet (CDS;ARI)
@@ -333,28 +435,38 @@ public class TAPSyncJob {
	 */
	protected class SyncThread extends Thread {

		/** Object knowing how to execute an ADQL query and which will execute it by calling {@link ADQLExecutor#start(Thread, String, TAPParameters, HttpServletResponse)}. */
		/** Object knowing how to execute an ADQL query and which will execute
		 * it by calling {@link ADQLExecutor#start(Thread, String, TAPParameters, HttpServletResponse)}. */
		protected final ADQLExecutor executor;
		/** Response in which the query result must be written. No error should be written in it directly at this level ;
		 * the error must be propagated and it will be written in this HTTP response later on a top level. */
		/** Response in which the query result must be written. No error should
		 * be written in it directly at this level ; the error must be
		 * propagated and it will be written in this HTTP response later on a
		 * top level. */
		protected final HttpServletResponse response;
		/** ID of this thread. It is also the ID of the synchronous job owning this thread. */
		/** ID of this thread. It is also the ID of the synchronous job owning
		 * this thread. */
		protected final String ID;
		/** Parameters containing the ADQL query to execute and other execution parameters/options. */
		/** Parameters containing the ADQL query to execute and other execution
		 * parameters/options. */
		protected final TAPParameters tapParams;

		/** Exception that occurs while executing this thread. NULL if the execution was a success. */
		/** Exception that occurs while executing this thread. NULL if the
		 * execution was a success. */
		protected Throwable exception = null;
		/** Query execution report. NULL if the execution has not yet started. */
		protected TAPExecutionReport report = null;

		/**
		 * Create a thread that will run the given executor with the given parameters.
		 * Create a thread that will run the given executor with the given
		 * parameters.
		 *
		 * @param executor	Object to execute and which knows how to execute an ADQL query.
		 * @param executor	Object to execute and which knows how to execute an
		 *                	ADQL query.
		 * @param ID		ID of the synchronous job owning this thread.
		 * @param tapParams	TAP parameters to use to get the query to execute and the execution parameters.
		 * @param response	HTTP response in which the ADQL query result must be written.
		 * @param tapParams	TAP parameters to use to get the query to execute
		 *                	and the execution parameters.
		 * @param response	HTTP response in which the ADQL query result must be
		 *                	written.
		 */
		public SyncThread(final ADQLExecutor executor, final String ID, final TAPParameters tapParams, final HttpServletResponse response) {
			super(JobThread.tg, ID);
@@ -367,8 +479,10 @@ public class TAPSyncJob {
		/**
		 * Tell whether the execution has ended with success.
		 *
		 * @return	<i>true</i> if the query has been successfully executed,
		 *        	<i>false</i> otherwise (or if this thread is still executed).
		 * @return	<code>true</code> if the query has been successfully
		 *        	executed,
		 *        	<code>false</code> otherwise (or if this thread is still
		 *        	executed).
		 */
		public final boolean isSuccess() {
			return !isAlive() && report != null && exception == null;
@@ -376,7 +490,8 @@ public class TAPSyncJob {

		/**
		 * Get the error that has interrupted/stopped this thread.
		 * This function returns NULL if the query has been successfully executed.
		 * This function returns NULL if the query has been successfully
		 * executed.
		 *
		 * @return	Error that occurs while executing the query
		 *        	or NULL if the execution was a success.
+25 −6
Original line number Diff line number Diff line
@@ -56,6 +56,7 @@ import static tap.config.TAPConfiguration.KEY_MIN_LOG_LEVEL;
import static tap.config.TAPConfiguration.KEY_OUTPUT_FORMATS;
import static tap.config.TAPConfiguration.KEY_PROVIDER_NAME;
import static tap.config.TAPConfiguration.KEY_SERVICE_DESCRIPTION;
import static tap.config.TAPConfiguration.KEY_SYNC_EXECUTION_DURATION;
import static tap.config.TAPConfiguration.KEY_SYNC_FETCH_SIZE;
import static tap.config.TAPConfiguration.KEY_TAP_FACTORY;
import static tap.config.TAPConfiguration.KEY_UDFS;
@@ -170,9 +171,9 @@ public final class ConfigurableServiceConnection implements ServiceConnection {
	/** Maximum number of asynchronous jobs that can run simultaneously. */
	private int maxAsyncJobs = DEFAULT_MAX_ASYNC_JOBS;

	/** Array of 2 integers: resp. default and maximum execution duration.
	 * <em>Both duration are expressed in milliseconds.</em> */
	private int[] executionDuration = new int[2];
	/** Array of 3 integers: resp. default, maximum and sync. execution
	 * durations. <em>All durations are expressed in milliseconds.</em> */
	private int[] executionDuration = new int[3];
	/** Array of 2 integers: resp. default and maximum retention period.
	 * <em>Both period are expressed in seconds.</em> */
	private int[] retentionPeriod = new int[2];
@@ -682,7 +683,7 @@ public final class ConfigurableServiceConnection implements ServiceConnection {
	 * @throws TAPException	If the corresponding TAP configuration properties are wrong.
	 */
	private void initExecutionDuration(final Properties tapConfig) throws TAPException {
		executionDuration = new int[2];
		executionDuration = new int[3];

		// Set the default duration:
		String propValue = getProperty(tapConfig, KEY_DEFAULT_EXECUTION_DURATION);
@@ -700,10 +701,28 @@ public final class ConfigurableServiceConnection implements ServiceConnection {
			throw new TAPException("Integer expected for the property \"" + KEY_MAX_EXECUTION_DURATION + "\", instead of: \"" + propValue + "\"!");
		}

		// The maximum duration MUST be greater or equals than the default duration.
		// If not, the default duration is set (so decreased) to the maximum duration.
		// Set the synchronous duration:
		propValue = getProperty(tapConfig, KEY_SYNC_EXECUTION_DURATION);
		try {
			executionDuration[2] = (propValue == null) ? DEFAULT_EXECUTION_DURATION : Integer.parseInt(propValue);
		} catch(NumberFormatException nfe) {
			throw new TAPException("Integer expected for the property \"" + KEY_SYNC_EXECUTION_DURATION + "\", instead of: \"" + propValue + "\"!");
		}

		/* The maximum duration MUST be greater or equals than the default
		 * duration. If not, the default duration is set (so decreased) to the
		 * maximum duration: */
		if (executionDuration[1] > 0 && executionDuration[1] < executionDuration[0])
			executionDuration[0] = executionDuration[1];

		/* The synchronous duration MUST be less or equals than the default
		 * duration (or the max if no default is set). If not, the sync.
		 * duration is set (so decreased) to the default duration (or max if no
		 * default): */
		if (executionDuration[0] > 0 && executionDuration[0] < executionDuration[2])
			executionDuration[2] = executionDuration[0];
		else if (executionDuration[0] <= 0 && executionDuration[1] > 0 && executionDuration[1] < executionDuration[2])
			executionDuration[2] = executionDuration[1];
	}

	/**
+300 −180

File changed.

Preview size limit exceeded, changes collapsed.

+34 −0
Original line number Diff line number Diff line
@@ -637,6 +637,40 @@
				</td>
				<td>3600000 <em>(1 hour)</em></td>
			</tr>
			<tr class="optional">
				<td class="done">sync_execution_duration</td>
				<td></td>
				<td>integer</td>
				<td>
					<p>Execution duration (in milliseconds) for SYNCHRONOUS queries.</p>
					<p>
						If this property is set, it will be used as default and maximum execution
						duration for synchronous queries. For asynchronous queries,
						<code>default_execution_duration</code> and <code>max_execution_duration</code>
						will still be used as expected.
					</p>
					<p>
						<code>sync_execution_duration</code> MUST be less or equals to default_execution_duration.
						Why? Because synchronous queries are supposed to be quicker than asynchronous
						ones. If this rule is not respected, this execution duration is immediately
						set to <code>default_execution_duration</code>.
					</p>
					<p>
						A negative or null value means that the default execution duration will be
						used instead. Float values are not allowed.
					</p>
					<p><em>Default: same as <code>default_execution_duration</code>.
						But if <code>default_execution_duration</code> is not set, <code>max_execution_duration</code>
						will be used.
						If <code>max_execution_duration</code> is not set, the duration specified in the
						HTTP request will be used.
						And if no such duration is given either, an hard coded duration of
						10 seconds will be applied in order to avoid unlimited synchronous
						query.
					</em></p>
				</td>
				<td>5000 <em>(5 seconds)</em></td>
			</tr>
			
			<tr><td colspan="5">Output</td></tr>
			<tr class="optional">
Loading