Commit 7ff003d9 authored by Sonia Zorba's avatar Sonia Zorba
Browse files

Passed JobOwner to ADQLExecutor and added QueryExecutor interface

parent 082f04b5
......@@ -45,6 +45,7 @@ import uws.UWSException;
import uws.UWSToolBox;
import uws.job.JobThread;
import uws.job.Result;
import uws.job.user.JobOwner;
import uws.service.log.UWSLog.LogLevel;
/**
......@@ -223,7 +224,7 @@ public class ADQLExecutor {
this.response = null;
try {
return start();
return start(tapJob.getOwner());
} catch(IOException ioe) {
if (thread.isInterrupted())
return report;
......@@ -281,6 +282,7 @@ public class ADQLExecutor {
* @param jobId ID of the corresponding job.
* @param params All execution parameters (including the query to process).
* @param response Object in which the result or the error must be written.
* @param jobOwner The user executing the query
*
* @return The resulting execution report.
*
......@@ -290,7 +292,7 @@ public class ADQLExecutor {
*
* @see #start()
*/
public final TAPExecutionReport start(final Thread thread, final String jobId, final TAPParameters params, final HttpServletResponse response) throws TAPException, IOException, InterruptedException {
public final TAPExecutionReport start(final Thread thread, final String jobId, final TAPParameters params, final HttpServletResponse response, final JobOwner jobOwner) throws TAPException, IOException, InterruptedException {
if (this.thread != null || this.report != null)
throw new TAPException("This ADQLExecutor has already been executed!");
......@@ -300,7 +302,7 @@ public class ADQLExecutor {
this.response = response;
try {
return start();
return start(jobOwner);
} catch(UWSException ue) {
throw new TAPException(ue, ue.getHttpErrorCode());
}
......@@ -336,7 +338,7 @@ public class ADQLExecutor {
* In asynchronous, the error is stored as job error report and is never propagated.</i>
* @throws InterruptedException If the job has been interrupted (by the user or a time-out).
*/
protected final TAPExecutionReport start() throws TAPException, UWSException, IOException, InterruptedException {
protected final TAPExecutionReport start(JobOwner jobOwner) throws TAPException, UWSException, IOException, InterruptedException {
logger.logTAP(LogLevel.INFO, report, "START_EXEC", (report.synchronous ? "Synchronous" : "Asynchronous") + " execution of an ADQL query STARTED.", null);
// Save the start time (for reporting usage):
......@@ -378,8 +380,8 @@ public class ADQLExecutor {
throw new InterruptedException();
// 3. EXECUTE THE ADQL QUERY:
startStep(ExecutionProgression.EXECUTING_ADQL);
queryResult = executeADQL(adqlQuery);
startStep(ExecutionProgression.EXECUTING_ADQL);
queryResult = executeADQL(adqlQuery, jobOwner);
endStep();
if (queryResult == null || thread.isInterrupted())
......@@ -598,6 +600,7 @@ public class ADQLExecutor {
* </i></p>
*
* @param adql The object representation of the ADQL query to execute.
* @param jobOwner The user executing the query
*
* @return The result of the query.
*
......@@ -607,7 +610,7 @@ public class ADQLExecutor {
*
* @see DBConnection#executeQuery(ADQLQuery)
*/
protected TableIterator executeADQL(final ADQLQuery adql) throws InterruptedException, DBCancelledException, TAPException {
protected TableIterator executeADQL(final ADQLQuery adql, JobOwner jobOwner) throws InterruptedException, DBCancelledException, TAPException {
// Log the start of execution:
logger.logTAP(LogLevel.INFO, report, "START_DB_EXECUTION", "ADQL query: " + adql.toADQL().replaceAll("(\t|\r?\n)+", " "), null);
......@@ -621,8 +624,8 @@ public class ADQLExecutor {
try {
// Execute the ADQL query:
TableIterator result = dbConn.executeQuery(adql);
TableIterator result = dbConn.executeQuery(adql, jobOwner);
// If NULL, in a former version of the library, it means the query execution has been aborted:
if (result == null)
throw new DBCancelledException();
......
......@@ -25,6 +25,7 @@ import java.util.Iterator;
import adql.db.FunctionDef;
import tap.db.DBConnection;
import tap.db.QueryExecutor;
import tap.formatter.OutputFormat;
import tap.log.DefaultTAPLog;
import tap.log.TAPLog;
......@@ -455,6 +456,8 @@ public interface ServiceConnection {
*/
public UserIdentifier getUserIdentifier();
public QueryExecutor getQueryExecutor();
/**
* <i><b>[MANDATORY]</b></i>
* <p>This function lets enable or disable the upload capability of this TAP
......
......@@ -23,14 +23,17 @@ package tap;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import tap.parameters.TAPExecutionDurationController;
import tap.parameters.TAPParameters;
import uws.UWSException;
import uws.UWSToolBox;
import uws.job.JobThread;
import uws.job.UWSJob;
import uws.job.user.JobOwner;
import uws.service.log.UWSLog.LogLevel;
import uws.service.request.UploadFile;
......@@ -249,7 +252,7 @@ public class TAPSyncJob {
*
* @see SyncThread
*/
public synchronized boolean start(final HttpServletResponse response) throws IllegalStateException, IOException, TAPException {
public synchronized boolean start(final HttpServletRequest request, final HttpServletResponse response) throws IllegalStateException, IOException, TAPException {
if (startedAt != null)
throw new IllegalStateException("Impossible to restart a synchronous TAP query!");
......@@ -270,7 +273,13 @@ public class TAPSyncJob {
final long timeToStop = determineMaxExecutionDuration();
// Give to a thread which will execute the query:
thread = new SyncThread(executor, ID, tapParams, response);
JobOwner jobOwner = null;
try {
jobOwner = UWSToolBox.getUser(request, service.getUserIdentifier());
} catch(UWSException ex) {
service.getLogger().log(LogLevel.WARNING, "UWS", "Unable to retrieve user information", ex);
}
thread = new SyncThread(executor, ID, tapParams, response, jobOwner);
thread.start();
// Wait the end of the thread until the maximum execution duration is reached:
......@@ -438,6 +447,7 @@ public class TAPSyncJob {
/** Object knowing how to execute an ADQL query and which will execute
* it by calling {@link ADQLExecutor#start(Thread, String, TAPParameters, HttpServletResponse)}. */
protected final ADQLExecutor executor;
protected final JobOwner jobOwner;
/** Response in which the query result must be written. No error should
* be written in it directly at this level ; the error must be
* propagated and it will be written in this HTTP response later on a
......@@ -467,12 +477,14 @@ public class TAPSyncJob {
* and the execution parameters.
* @param response HTTP response in which the ADQL query result must be
* written.
* @param jobOwner The user executing the ADQL query
*/
public SyncThread(final ADQLExecutor executor, final String ID, final TAPParameters tapParams, final HttpServletResponse response) {
public SyncThread(final ADQLExecutor executor, final String ID, final TAPParameters tapParams, final HttpServletResponse response, final JobOwner jobOwner) {
super(JobThread.tg, ID);
this.executor = executor;
this.ID = ID;
this.tapParams = tapParams;
this.jobOwner = jobOwner;
this.response = response;
}
......@@ -522,7 +534,7 @@ public class TAPSyncJob {
try {
// Execute the ADQL query:
report = executor.start(this, ID, tapParams, response);
report = executor.start(this, ID, tapParams, response, jobOwner);
// Log the successful end of this thread:
executor.getLogger().logThread(LogLevel.INFO, thread, "END", "Synchronous thread \"" + ID + "\" successfully ended.", null);
......
......@@ -108,6 +108,7 @@ import tap.TAPException;
import tap.TAPFactory;
import tap.db.DBConnection;
import tap.db.JDBCConnection;
import tap.db.QueryExecutor;
import tap.formatter.FITSFormat;
import tap.formatter.HTMLFormat;
import tap.formatter.JSONFormat;
......@@ -127,6 +128,7 @@ import uws.service.UserIdentifier;
import uws.service.file.LocalUWSFileManager;
import uws.service.file.UWSFileManager;
import uws.service.log.UWSLog.LogLevel;
import static tap.config.TAPConfiguration.KEY_QUERY_EXECUTOR;
/**
* Concrete implementation of {@link ServiceConnection}, fully parameterized
......@@ -208,6 +210,8 @@ public final class ConfigurableServiceConnection implements ServiceConnection {
/** The method to use in order to identify a TAP user. */
private UserIdentifier userIdentifier = null;
private QueryExecutor queryExecutor = null;
/** List of all allowed coordinate systems.
* <em>
......@@ -303,7 +307,10 @@ public final class ConfigurableServiceConnection implements ServiceConnection {
// 9. SET A USER IDENTIFIER:
initUserIdentifier(tapConfig);
// 10. CONFIGURE ADQL:
// 10. SET A QUERY EXECUTOR:
initQueryExecutor(tapConfig);
// 11. CONFIGURE ADQL:
initCoordSys(tapConfig);
initADQLGeometries(tapConfig);
initUDFs(tapConfig);
......@@ -1101,6 +1108,15 @@ public final class ConfigurableServiceConnection implements ServiceConnection {
if (propValue != null)
userIdentifier = newInstance(propValue, KEY_USER_IDENTIFIER, UserIdentifier.class);
}
private void initQueryExecutor(final Properties tapConfig) throws TAPException {
// Get the property value:
String propValue = getProperty(tapConfig, KEY_QUERY_EXECUTOR);
if (propValue != null)
queryExecutor = newInstance(propValue, KEY_QUERY_EXECUTOR, QueryExecutor.class);
else
queryExecutor = new QueryExecutor();
}
/**
* Initialize the list of all allowed coordinate systems.
......@@ -1768,6 +1784,11 @@ public final class ConfigurableServiceConnection implements ServiceConnection {
public UserIdentifier getUserIdentifier() {
return userIdentifier;
}
@Override
public QueryExecutor getQueryExecutor() {
return queryExecutor;
}
@Override
public TAPMetadata getTAPMetadata() {
......
......@@ -282,12 +282,12 @@ public class ConfigurableTAPFactory extends AbstractTAPFactory {
public DBConnection getConnection(String jobID) throws TAPException{
if (datasource != null){
try{
return new JDBCConnection(datasource.getConnection(), createADQLTranslator(), jobID, this.service.getLogger());
return new JDBCConnection(datasource.getConnection(), createADQLTranslator(), jobID, this.service.getQueryExecutor(), this.service.getLogger());
}catch(SQLException se){
throw new TAPException("Impossible to establish a connection to the database using the set up datasource!", se);
}
}else
return new JDBCConnection(driverPath, dbUrl, dbUser, dbPassword, createADQLTranslator(), jobID, this.service.getLogger());
return new JDBCConnection(driverPath, dbUrl, dbUser, dbPassword, createADQLTranslator(), jobID, this.service.getQueryExecutor(), this.service.getLogger());
}
@Override
......
......@@ -386,6 +386,8 @@ public final class TAPConfiguration {
* value of this property. */
public final static String KEY_USER_IDENTIFIER = "user_identifier";
public final static String KEY_QUERY_EXECUTOR = "query_executor";
/* ADQL RESTRICTIONS */
/** Name/Key of the property specifying the list of all allowed coordinate
* systems that can be used in ADQL queries. By default, all are allowed,
......
......@@ -30,6 +30,7 @@ import tap.data.TableIterator;
import tap.metadata.TAPColumn;
import tap.metadata.TAPMetadata;
import tap.metadata.TAPTable;
import uws.job.user.JobOwner;
/**
* <p>Connection to the "database" (whatever is the type or whether it is linked to a true DBMS connection).</p>
......@@ -238,6 +239,7 @@ public interface DBConnection {
* </p>
*
* @param adqlQuery ADQL query to execute.
* @param jobOwner The user executing the query
*
* @return The table result.
*
......@@ -249,7 +251,7 @@ public interface DBConnection {
* @see #endQuery()
* @see TableIterator#close()
*/
public TableIterator executeQuery(final ADQLQuery adqlQuery) throws DBCancelledException, DBException;
public TableIterator executeQuery(final ADQLQuery adqlQuery, JobOwner jobOwner) throws DBCancelledException, DBException;
/**
* <p>Set the number of rows to fetch before searching/getting the following.
......
......@@ -62,6 +62,7 @@ import tap.metadata.TAPSchema;
import tap.metadata.TAPTable;
import tap.metadata.TAPTable.TableType;
import uws.ISO8601Format;
import uws.job.user.JobOwner;
import uws.service.log.UWSLog.LogLevel;
/**
......@@ -237,9 +238,11 @@ public class JDBCConnection implements DBConnection {
/** The translator this connection must use to translate ADQL into SQL. It is also used to get information about the case sensitivity of all types of identifier (schema, table, column). */
protected final JDBCTranslator translator;
protected final QueryExecutor queryExecutor;
/** Object to use if any message needs to be logged. <i>note: this logger may be NULL. If NULL, messages will never be printed.</i> */
protected final TAPLog logger;
/* JDBC URL MANAGEMENT */
/** JDBC prefix of any database URL (for instance: jdbc:postgresql://127.0.0.1/myDB or jdbc:postgresql:myDB). */
......@@ -341,8 +344,8 @@ public class JDBCConnection implements DBConnection {
*
* @throws DBException If the driver can not be found or if the connection can not merely be created (usually because DB parameters are wrong).
*/
public JDBCConnection(final String driverPath, final String dbUrl, final String dbUser, final String dbPassword, final JDBCTranslator translator, final String connID, final TAPLog logger) throws DBException{
this(createConnection(driverPath, dbUrl, dbUser, dbPassword), translator, connID, logger);
public JDBCConnection(final String driverPath, final String dbUrl, final String dbUser, final String dbPassword, final JDBCTranslator translator, final String connID, final QueryExecutor queryExecutor, final TAPLog logger) throws DBException{
this(createConnection(driverPath, dbUrl, dbUser, dbPassword), translator, connID, queryExecutor, logger);
}
/**
......@@ -353,7 +356,7 @@ public class JDBCConnection implements DBConnection {
* @param connID ID of this connection. <i>note: may be NULL ; but in this case, logs concerning this connection will be more difficult to localize.</i>
* @param logger Logger to use in case of need. <i>note: may be NULL ; in this case, error will never be logged, but sometimes DBException may be raised.</i>
*/
public JDBCConnection(final Connection conn, final JDBCTranslator translator, final String connID, final TAPLog logger) throws DBException{
public JDBCConnection(final Connection conn, final JDBCTranslator translator, final String connID, final QueryExecutor queryExecutor, final TAPLog logger) throws DBException{
if (conn == null)
throw new NullPointerException("Missing SQL connection! => can not create a JDBCConnection object.");
if (translator == null)
......@@ -362,6 +365,7 @@ public class JDBCConnection implements DBConnection {
this.connection = conn;
this.translator = translator;
this.ID = connID;
this.queryExecutor = queryExecutor;
this.logger = logger;
// Set the supporting features' flags + DBMS type:
......@@ -683,10 +687,10 @@ public class JDBCConnection implements DBConnection {
/* INTERROGATION METHODS */
/* ********************* */
@Override
public synchronized TableIterator executeQuery(final ADQLQuery adqlQuery) throws DBException{
public synchronized TableIterator executeQuery(final ADQLQuery adqlQuery, JobOwner jobOwner) throws DBException{
// Starting of new query execution => disable the cancel flag:
resetCancel();
String sql = null;
ResultSet result = null;
try{
......@@ -731,7 +735,10 @@ public class JDBCConnection implements DBConnection {
// 3. Execute the SQL query:
if (logger != null)
logger.logDB(LogLevel.INFO, this, "EXECUTE", "SQL query: " + sql.replaceAll("(\t|\r?\n)+", " "), null);
result = stmt.executeQuery(sql);
queryExecutor.executeQuery(stmt, sql, jobOwner);
result = stmt.executeQuery(sql);
// If the query has been aborted, return immediately:
if (isCancelled())
......
package tap.db;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import uws.job.user.JobOwner;
public class QueryExecutor {
public ResultSet executeQuery(Statement statement, String sql, JobOwner jobOwner) throws SQLException {
return statement.executeQuery(sql);
}
}
......@@ -113,7 +113,7 @@ public class Sync implements TAPResource {
// Execute synchronously the given job:
TAPSyncJob syncJob = new TAPSyncJob(service, params, requestID);
syncJob.start(response);
syncJob.start(request, response);
return true;
......
......@@ -68,6 +68,7 @@ import tap.TAPException;
import tap.db.DBConnection;
import tap.db.DBException;
import tap.db.JDBCConnection;
import tap.db.QueryExecutor;
import tap.db_testtools.DBTools;
import tap.formatter.OutputFormat;
import tap.formatter.VOTableFormat;
......@@ -1392,7 +1393,7 @@ public class TestConfigurableServiceConnection {
public CustomTAPFactory(final ServiceConnection conn) throws DBException {
super(conn);
dbConn = new JDBCConnection(DBTools.DB_TEST_JDBC_DRIVER, DBTools.DB_TEST_URL, DBTools.DB_TEST_USER, DBTools.DB_TEST_PWD, new AstroH2Translator(), "TheOnlyConnection", conn.getLogger());
dbConn = new JDBCConnection(DBTools.DB_TEST_JDBC_DRIVER, DBTools.DB_TEST_URL, DBTools.DB_TEST_USER, DBTools.DB_TEST_PWD, new AstroH2Translator(), "TheOnlyConnection", new QueryExecutor(), conn.getLogger());
}
@Override
......@@ -1427,7 +1428,7 @@ public class TestConfigurableServiceConnection {
public CustomConfigurableTAPFactory(final ServiceConnection conn, final Properties prop) throws DBException {
super(conn);
dbConn = new JDBCConnection(DBTools.DB_TEST_JDBC_DRIVER, DBTools.DB_TEST_URL, DBTools.DB_TEST_USER, DBTools.DB_TEST_PWD, new AstroH2Translator(), "TheOnlyConnection", conn.getLogger());
dbConn = new JDBCConnection(DBTools.DB_TEST_JDBC_DRIVER, DBTools.DB_TEST_URL, DBTools.DB_TEST_USER, DBTools.DB_TEST_PWD, new AstroH2Translator(), "TheOnlyConnection", new QueryExecutor(), conn.getLogger());
}
@Override
......
......@@ -45,6 +45,7 @@ import tap.backup.DefaultTAPBackupManager;
import tap.db.DBConnection;
import tap.db.DBException;
import tap.db.JDBCConnection;
import tap.db.QueryExecutor;
import tap.db_testtools.DBTools;
import tap.formatter.OutputFormat;
import tap.log.DefaultTAPLog;
......@@ -440,6 +441,11 @@ public class TestConfigurableTAPFactory {
public UserIdentifier getUserIdentifier(){
return null;
}
@Override
public QueryExecutor getQueryExecutor(){
return null;
}
@Override
public boolean uploadEnabled(){
......
......@@ -66,12 +66,12 @@ public class TestJDBCConnection {
DBTools.createTestDB();
h2Connection = DBTools.createConnection("h2", null, null, DBTools.DB_TEST_PATH, DBTools.DB_TEST_USER, DBTools.DB_TEST_PWD);
h2JDBCConnection = new JDBCConnection(h2Connection, new AstroH2Translator(false), "H2", null);
sensH2JDBCConnection = new JDBCConnection(h2Connection, new AstroH2Translator(true, true, true, true), "SensitiveH2", null);
h2JDBCConnection = new JDBCConnection(h2Connection, new AstroH2Translator(false), "H2", new QueryExecutor(), null);
sensH2JDBCConnection = new JDBCConnection(h2Connection, new AstroH2Translator(true, true, true, true), "SensitiveH2", new QueryExecutor(), null);
sqliteConnection = DBTools.createConnection("sqlite", null, null, sqliteDbFile, null, null);
sqliteJDBCConnection = new JDBCConnection(sqliteConnection, new PostgreSQLTranslator(false), "SQLITE", null);
sensSqliteJDBCConnection = new JDBCConnection(sqliteConnection, new PostgreSQLTranslator(true), "SensitiveSQLite", null);
sqliteJDBCConnection = new JDBCConnection(sqliteConnection, new PostgreSQLTranslator(false), "SQLITE", new QueryExecutor(), null);
sensSqliteJDBCConnection = new JDBCConnection(sqliteConnection, new PostgreSQLTranslator(true), "SensitiveSQLite", new QueryExecutor(), null);
}
@AfterClass
......@@ -541,7 +541,7 @@ public class TestJDBCConnection {
// Build the ADQLQuery object:
ADQLQuery query = parser.parseQuery("SELECT table_name FROM TAP_SCHEMA.tables;");
// Execute the query:
result = conn.executeQuery(query);
result = conn.executeQuery(query, null);
fail("{" + conn.ID + "} This test should have failed because TAP_SCHEMA was supposed to not exist!");
}catch(DBException de){
assertTrue(de.getMessage().startsWith("Unexpected error while executing a SQL query: "));
......@@ -564,7 +564,7 @@ public class TestJDBCConnection {
// Build the ADQLQuery object:
ADQLQuery query = parser.parseQuery("SELECT table_name FROM TAP_SCHEMA.tables;");
// Execute the query:
result = conn.executeQuery(query);
result = conn.executeQuery(query, null);
assertEquals(1, result.getMetadata().length);
int cntRow = 0;
while(result.nextRow()){
......@@ -601,7 +601,7 @@ public class TestJDBCConnection {
/* ************** */
public final static void main(final String[] args) throws Throwable{
JDBCConnection conn = new JDBCConnection(DBTools.createConnection("h2", null, null, DBTools.DB_TEST_PATH, DBTools.DB_TEST_USER, DBTools.DB_TEST_PWD), new AstroH2Translator(), "TEST_H2", null);
JDBCConnection conn = new JDBCConnection(DBTools.createConnection("h2", null, null, DBTools.DB_TEST_PATH, DBTools.DB_TEST_USER, DBTools.DB_TEST_PWD), new AstroH2Translator(), "TEST_H2", new QueryExecutor(), null);
TestJDBCConnection.createTAPSchema(conn);
TestJDBCConnection.dropSchema(STDSchema.TAPSCHEMA.label, conn);
}
......
......@@ -6,6 +6,7 @@ import java.util.Iterator;
import adql.db.FunctionDef;
import tap.ServiceConnection;
import tap.TAPFactory;
import tap.db.QueryExecutor;
import tap.log.TAPLog;
import tap.metadata.TAPMetadata;
import uws.service.UserIdentifier;
......@@ -70,6 +71,11 @@ public class ServiceConnection4Test implements ServiceConnection {
return null;
}
@Override
public QueryExecutor getQueryExecutor() {
return null;
}
@Override
public boolean uploadEnabled(){
return false;
......
......@@ -9,6 +9,7 @@ import adql.db.FunctionDef;
import tap.ServiceConnection;
import tap.TAPFactory;
import tap.TAPJob;
import tap.db.QueryExecutor;
import tap.formatter.FITSFormat;
import tap.formatter.OutputFormat;
import tap.formatter.SVFormat;
......@@ -96,6 +97,11 @@ public class ServiceConnectionOfTest implements ServiceConnection {
return null;
}
@Override
public QueryExecutor getQueryExecutor() {
return null;
}
@Override
public boolean uploadEnabled(){
return false;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment