Skip to content
JDBCConnection.java 155 KiB
Newer Older
				// add TAP_SCHEMA.coosys to the schema TAP_SCHEMA:
				tap_schema.addTable(coosysTable);
				// load all declared coordinate systems from it:
				mapCoosys = loadCoosys(coosysTable, metadata, stmt);
			// load all columns from TAP_SCHEMA.columns:
			if (logger != null)
				logger.logDB(LogLevel.INFO, this, "LOAD_TAP_SCHEMA", "Loading TAP_SCHEMA.columns.", null);
			loadColumns(tap_schema.getTable(STDTable.COLUMNS.label), lstTables, mapCoosys, stmt);

			// load all foreign keys from TAP_SCHEMA.keys and TAP_SCHEMA.key_columns:
			if (logger != null)
				logger.logDB(LogLevel.INFO, this, "LOAD_TAP_SCHEMA", "Loading TAP_SCHEMA.keys and TAP_SCHEMA.key_columns.", null);
			loadKeys(tap_schema.getTable(STDTable.KEYS.label), tap_schema.getTable(STDTable.KEY_COLUMNS.label), lstTables, stmt);

			if (!isCancelled() && logger != null)
				logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to create a Statement!", se);
			throw new DBException("Can not create a Statement!", se);
			cancel(stmt, true); // note: this function is called instead of cancel(true) in order to avoid a log message about the cancellation operation result.
	/**
	 * <p>Load into the given metadata all schemas listed in TAP_SCHEMA.schemas.</p>
	 * 	If schemas are not supported by this DBMS connection, the DB name of the loaded schemas is set to NULL.
	 * </i></p>
	 * <p><i>Note 2:
	 * 	Schema entries are retrieved ordered by ascending schema_name.
	 * </i></p>
	 * @param tableDef		Definition of the table TAP_SCHEMA.schemas.
	 * @param metadata		Metadata to fill with all found schemas.
	 * @param stmt			Statement to use in order to interact with the database.
	 * @throws DBException	If any error occurs while interacting with the database.
	protected void loadSchemas(final TAPTable tableDef, final TAPMetadata metadata, final Statement stmt) throws DBException {
			// Determine whether the dbName column exists:
			/* note: if the schema notion is not supported by this DBMS, the column "dbname" is ignored. */
			boolean hasDBName = supportsSchema && isColumnExisting(tableDef.getDBSchemaName(), tableDef.getDBName(), DB_NAME_COLUMN, connection.getMetaData());

			// Determine whether the schemaIndex column exists:
			boolean hasSchemaIndex = isColumnExisting(tableDef.getDBSchemaName(), tableDef.getDBName(), "schema_index", connection.getMetaData());

			// Build the SQL query:
			StringBuffer sqlBuf = new StringBuffer("SELECT ");
			sqlBuf.append(translator.getColumnName(tableDef.getColumn("schema_name")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("description")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("utype")));
			if (hasSchemaIndex)
				sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("schema_index")));
				translator.appendIdentifier(sqlBuf, DB_NAME_COLUMN, IdentifierField.COLUMN);
			sqlBuf.append(" FROM ").append(translator.getTableName(tableDef, supportsSchema));
			else
				sqlBuf.append(" ORDER BY 1");

			// Execute the query:
			rs = stmt.executeQuery(sqlBuf.toString());

			// Create all schemas:
				String schemaName = rs.getString(1),
						description = rs.getString(2), utype = rs.getString(3),
						dbName = (hasDBName ? (hasSchemaIndex ? rs.getString(5) : rs.getString(4)) : null);
				int schemaIndex = (hasSchemaIndex ? (rs.getObject(4) == null ? -1 : rs.getInt(4)) : -1);

				// create the new schema:
				TAPSchema newSchema = new TAPSchema(schemaName, nullifyIfNeeded(description), nullifyIfNeeded(utype));
				if (DBIdentifier.normalize(dbName) != null)
				newSchema.setIndex(schemaIndex);
				// force the dbName of TAP_SCHEMA to be the same as the used one:
				if (STDSchema.TAPSCHEMA.label.equalsIgnoreCase(schemaName))
					newSchema.setDBName(tableDef.getDBSchemaName());

				// add the new schema inside the given metadata:
				metadata.addSchema(newSchema);
			}
			if (!isCancelled() && logger != null)
				logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to load schemas from TAP_SCHEMA.schemas!", se);
			throw new DBException("Impossible to load schemas from TAP_SCHEMA.schemas!", se);
	/**
	 * <p>Load into the corresponding metadata all tables listed in TAP_SCHEMA.tables.</p>
	 * 	Schemas are searched in the given metadata by their ADQL name and case sensitively.
	 * 	If they can not be found a {@link DBException} is thrown.
	 * </i></p>
	 * 	If schemas are not supported by this DBMS connection, the DB name of the loaded
	 * 	{@link TAPTable}s is prefixed by the ADQL name of their respective schema.
	 * <p><i>Note 3:
	 * 	If the column table_index exists, table entries are retrieved ordered by ascending schema_name, then table_index, and finally table_name.
	 * 	If this column does not exist, table entries are retrieved ordered by ascending schema_name and then table_name.
	 * </i></p>
	 * @param tableDef		Definition of the table TAP_SCHEMA.tables.
	 * @param metadata		Metadata (containing already all schemas listed in TAP_SCHEMA.schemas).
	 * @param stmt			Statement to use in order to interact with the database.
	 * @return	The complete list of all loaded tables. <i>note: this list is required by {@link #loadColumns(TAPTable, List, Statement)}.</i>
	 * @throws DBException	If a schema can not be found, or if any other error occurs while interacting with the database.
	 */
	protected List<TAPTable> loadTables(final TAPTable tableDef, final TAPMetadata metadata, final Statement stmt) throws DBException {
			// Determine whether the dbName column exists:
			boolean hasDBName = isColumnExisting(tableDef.getDBSchemaName(), tableDef.getDBName(), DB_NAME_COLUMN, connection.getMetaData());

			// Determine whether the tableIndex column exists:
			boolean hasTableIndex = isColumnExisting(tableDef.getDBSchemaName(), tableDef.getDBName(), "table_index", connection.getMetaData());

			// Build the SQL query:
			StringBuffer sqlBuf = new StringBuffer("SELECT ");
			sqlBuf.append(translator.getColumnName(tableDef.getColumn("schema_name")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("table_name")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("table_type")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("description")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("utype")));
			if (hasTableIndex)
				sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("table_index")));
				translator.appendIdentifier(sqlBuf, DB_NAME_COLUMN, IdentifierField.COLUMN);
			sqlBuf.append(" FROM ").append(translator.getTableName(tableDef, supportsSchema));
			if (hasTableIndex)
				sqlBuf.append(" ORDER BY 1,6,2");

			// Execute the query:
			rs = stmt.executeQuery(sqlBuf.toString());

			// Create all tables:
			ArrayList<TAPTable> lstTables = new ArrayList<TAPTable>();
				String schemaName = rs.getString(1),
						tableName = rs.getString(2), typeStr = rs.getString(3),
						description = rs.getString(4), utype = rs.getString(5),
						dbName = (hasDBName ? (hasTableIndex ? rs.getString(7) : rs.getString(6)) : null);
				int tableIndex = (hasTableIndex ? (rs.getObject(6) == null ? -1 : rs.getInt(6)) : -1);

				// get the schema:
				TAPSchema schema = metadata.getSchema(schemaName);
					if (logger != null)
						logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to find the schema of the table \"" + tableName + "\": \"" + schemaName + "\"!", null);
					throw new DBException("Impossible to find the schema of the table \"" + tableName + "\": \"" + schemaName + "\"!");
				}

				/*// If the table name is qualified, check its prefix (it must match to the schema name):
				int endPrefix = tableName.indexOf('.');
					if (endPrefix == 0)
						throw new DBException("Incorrect table name syntax: \"" + tableName + "\"! Missing schema name (before '.').");
					else if (endPrefix == tableName.length() - 1)
						throw new DBException("Incorrect table name syntax: \"" + tableName + "\"! Missing table name (after '.').");
					else if (schemaName == null)
						throw new DBException("Incorrect schema prefix for the table \"" + tableName.substring(endPrefix + 1) + "\": this table is not in a schema, according to the column \"schema_name\" of TAP_SCHEMA.tables!");
					else if (!tableName.substring(0, endPrefix).trim().equalsIgnoreCase(schemaName))
						throw new DBException("Incorrect schema prefix for the table \"" + schemaName + "." + tableName.substring(tableName.indexOf('.') + 1) + "\": " + tableName + "! Mismatch between the schema specified in prefix of the column \"table_name\" and in the column \"schema_name\".");
				// resolve the table type (if any) ; by default, it will be "table":
				TableType type = TableType.table;
						type = TableType.valueOf(typeStr.toLowerCase());
				}

				// create the new table:
				TAPTable newTable = new TAPTable(tableName, type, nullifyIfNeeded(description), nullifyIfNeeded(utype));
				// force the dbName of TAP_SCHEMA table to be the same as the used one:
				if (STDSchema.TAPSCHEMA.label.equalsIgnoreCase(schemaName)) {
					if (tableDef.getSchema() != null && tableDef.getSchema().getTable(newTable.getADQLName()) != null)
						newTable.setDBName(tableDef.getSchema().getTable(newTable.getADQLName()).getDBName());
				// add the new table inside its corresponding schema:
				schema.addTable(newTable);
				lstTables.add(newTable);
			}

			return lstTables;
			if (!isCancelled() && logger != null)
				logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to load tables from TAP_SCHEMA.tables!", se);
			throw new DBException("Impossible to load tables from TAP_SCHEMA.tables!", se);
	/**
	 * Load all coordinate systems declared in the TAP_SCHEMA.
	 * @param tableDef		Definition of the table TAP_SCHEMA.coosys.
	 * @param metadata		Metadata in which the found coordinate systems will be inserted (see {@link TAPMetadata#addCoosys(TAPCoosys)}).
	 * @param stmt			Statement to use in order to interact with the database.
	 * @return	A map containing all declared coordinate systems (key=coosys ID, value={@link TAPCoosys}).
	 *        	<i>note: this map is required by {@link #loadColumns(TAPTable, List, Map, Statement)}.</i>
	 * @throws DBException	If any error occurs while interacting with the database.
	protected Map<String, TAPCoosys> loadCoosys(final TAPTable tableDef, final TAPMetadata metadata, final Statement stmt) throws DBException {
			// Build the SQL query:
			StringBuffer sqlBuf = new StringBuffer("SELECT ");
			sqlBuf.append(translator.getColumnName(tableDef.getColumn("id")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("system")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("equinox")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("epoch")));
			sqlBuf.append(" FROM ").append(translator.getTableName(tableDef, supportsSchema));
			sqlBuf.append(" ORDER BY 1,2,3,4");

			// Execute the query:
			rs = stmt.executeQuery(sqlBuf.toString());

			// Create all coosys:
			HashMap<String, TAPCoosys> mapCoosys = new HashMap<String, TAPCoosys>();
				String coosysId = rs.getString(1), system = rs.getString(2),
						equinox = rs.getString(3), epoch = rs.getString(4);

				// create the new coosys:
				TAPCoosys newCoosys = new TAPCoosys(coosysId, system, nullifyIfNeeded(equinox), nullifyIfNeeded(epoch));
				// create and add the new coosys:
				metadata.addCoosys(newCoosys);
				mapCoosys.put(coosysId, newCoosys);
			}

			return mapCoosys;
			if (!isCancelled() && logger != null)
				logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to load coordinate systems from TAP_SCHEMA.coosys!", se);
			throw new DBException("Impossible to load coordinate systems from TAP_SCHEMA.coosys!", se);
	/**
	 * <p>Load into the corresponding tables all columns listed in TAP_SCHEMA.columns.</p>
	 * <p><i>Note:
	 * 	Tables are searched in the given list by their ADQL name and case sensitively.
	 * 	If they can not be found a {@link DBException} is thrown.
	 * </i></p>
	 * <p><i>Note 2:
	 * 	If the column column_index exists, column entries are retrieved ordered by ascending table_name, then column_index, and finally column_name.
	 * 	If this column does not exist, column entries are retrieved ordered by ascending table_name and then column_name.
	 * </i></p>
	 * @param tableDef		Definition of the table TAP_SCHEMA.columns.
	 * @param lstTables		List of all published tables (= all tables listed in TAP_SCHEMA.tables).
	 * @param stmt			Statement to use in order to interact with the database.
	 * @throws DBException	If a table can not be found, or if any other error occurs while interacting with the database.
	 * @deprecated	This method is now replaced by {@link #loadColumns(TAPTable, List, Map, Statement)} which has an additional parameter:
	 *            	the list of declared coordinate systems.
	protected void loadColumns(final TAPTable tableDef, final List<TAPTable> lstTables, final Statement stmt) throws DBException {
		loadColumns(tableDef, lstTables, null, stmt);
	}

	/**
	 * <p>Load into the corresponding tables all columns listed in TAP_SCHEMA.columns.</p>
	 * <p><i>Note:
	 * 	Tables are searched in the given list by their ADQL name and case sensitively.
	 * 	If they can not be found a {@link DBException} is thrown.
	 * </i></p>
	 * <p><i>Note 2:
	 * 	If the column column_index exists, column entries are retrieved ordered by ascending table_name, then column_index, and finally column_name.
	 * 	If this column does not exist, column entries are retrieved ordered by ascending table_name and then column_name.
	 * </i></p>
	 * @param tableDef		Definition of the table TAP_SCHEMA.columns.
	 * @param lstTables		List of all published tables (= all tables listed in TAP_SCHEMA.tables).
	 * @param mapCoosys		List of all published coordinate systems (= all coordinates systems listed in TAP_SCHEMA.coosys).
	 * @param stmt			Statement to use in order to interact with the database.
	 * @throws DBException	If a table can not be found, or if any other error occurs while interacting with the database.
	protected void loadColumns(final TAPTable tableDef, final List<TAPTable> lstTables, final Map<String, TAPCoosys> mapCoosys, final Statement stmt) throws DBException {
			// Determine whether the dbName column exists:
			boolean hasArraysize = isColumnExisting(tableDef.getDBSchemaName(), tableDef.getDBName(), "arraysize", connection.getMetaData());

			// Determine whether the dbName column exists:
			boolean hasDBName = isColumnExisting(tableDef.getDBSchemaName(), tableDef.getDBName(), DB_NAME_COLUMN, connection.getMetaData());

			// Determine whether the columnIndex column exists:
			boolean hasColumnIndex = isColumnExisting(tableDef.getDBSchemaName(), tableDef.getDBName(), "column_index", connection.getMetaData());

			// Determine whether the coosys_id column exists:
			boolean hasCoosys = (mapCoosys != null) && isColumnExisting(tableDef.getDBSchemaName(), tableDef.getDBName(), "coosys_id", connection.getMetaData());

			// Build the SQL query:
			StringBuffer sqlBuf = new StringBuffer("SELECT ");
			sqlBuf.append(translator.getColumnName(tableDef.getColumn("table_name")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("column_name")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("description")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("unit")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("ucd")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("utype")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("datatype")));
			if (hasArraysize)
				sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("arraysize")));
			else
				sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("size")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("principal")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("indexed")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("std")));
			if (hasColumnIndex)
				sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("column_index")));
				translator.appendIdentifier(sqlBuf, DB_NAME_COLUMN, IdentifierField.COLUMN);
				sqlBuf.append(", ");
				translator.appendIdentifier(sqlBuf, COOSYS_ID_COLUMN, IdentifierField.COLUMN);
			}
			sqlBuf.append(" FROM ").append(translator.getTableName(tableDef, supportsSchema));
			if (hasColumnIndex)
				sqlBuf.append(" ORDER BY 1,12,2");

			// Execute the query:
			rs = stmt.executeQuery(sqlBuf.toString());

			// Create all tables:
				String tableName = rs.getString(1),
						columnName = rs.getString(2),
						description = rs.getString(3), unit = rs.getString(4),
						ucd = rs.getString(5), utype = rs.getString(6),
						datatype = rs.getString(7),
						dbName = (hasDBName ? (hasColumnIndex ? rs.getString(13) : rs.getString(12)) : null);
				int size = 0;
                                if(hasArraysize) {
                                    String arraySize = rs.getString(8);
                                    if(!rs.wasNull()) {
                                        arraySize = arraySize.replace("*", "");
                                        if(arraySize.isEmpty()) {
                                            size = Integer.MAX_VALUE;
                                        } else {
                                            size = Integer.parseInt(arraySize);
                                        }
                                    }
                                } else {
                                    size = rs.getInt(8);
                                }
				int colIndex = (hasColumnIndex ? (rs.getObject(12) == null ? -1 : rs.getInt(12)) : -1);
				boolean principal = toBoolean(rs.getObject(9)),
						indexed = toBoolean(rs.getObject(10)),
						std = toBoolean(rs.getObject(11));

				// get the table:
				TAPTable table = searchTable(tableName, lstTables.iterator());
					if (logger != null)
						logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to find the table of the column \"" + columnName + "\": \"" + tableName + "\"!", null);
					throw new DBException("Impossible to find the table of the column \"" + columnName + "\": \"" + tableName + "\"!");
				}

				// resolve the column type (if any) ; by default, it will be "VARCHAR" if unknown or missing:
				// ...try to resolve the datatype in function of all datatypes declared by the TAP standard.
						tapDatatype = DBDatatype.valueOf(datatype.toUpperCase());
					type = new DBType(DBDatatype.UNKNOWN);
					type = new DBType(tapDatatype, size);

				// create the new column:
				TAPColumn newColumn = new TAPColumn(columnName, type, nullifyIfNeeded(description), nullifyIfNeeded(unit), nullifyIfNeeded(ucd), nullifyIfNeeded(utype));
				newColumn.setPrincipal(principal);
				newColumn.setIndexed(indexed);
				newColumn.setStd(std);
				// set the coordinate system if any is specified:
					if (hasColumnIndex)
						indCoosys++;
					if (hasDBName)
						indCoosys++;
					String coosysId = rs.getString(indCoosys);
						newColumn.setCoosys(mapCoosys.get(coosysId));
						if (logger != null && newColumn.getCoosys() == null)
							logger.logDB(LogLevel.WARNING, this, "LOAD_TAP_SCHEMA", "No coordinate system for the column \"" + columnName + "\"! Cause: unknown coordinate system: \"" + coosysId + "\".", null);
				// force the dbName of TAP_SCHEMA column to be the same as the used one:
				if (STDSchema.TAPSCHEMA.label.equalsIgnoreCase(table.getADQLSchemaName()) && tableDef.getSchema() != null && tableDef.getSchema().getTable(table.getADQLName()) != null && tableDef.getSchema().getTable(table.getADQLName()).getColumn(columnName) != null)
					newColumn.setDBName(tableDef.getSchema().getTable(table.getADQLName()).getColumn(columnName).getDBName());

				// add the new column inside its corresponding table:
				table.addColumn(newColumn);
			}
			if (!isCancelled() && logger != null)
				logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to load columns from TAP_SCHEMA.columns!", se);
			throw new DBException("Impossible to load columns from TAP_SCHEMA.columns!", se);
	/**
	 * <p>Load into the corresponding tables all keys listed in TAP_SCHEMA.keys and detailed in TAP_SCHEMA.key_columns.</p>
	 * 	Tables and columns are searched in the given list by their ADQL name and case sensitively.
	 * 	If they can not be found a {@link DBException} is thrown.
	 * </i></p>
	 * <p><i>Note 2:
	 * 	Key entries are retrieved ordered by ascending key_id, then from_table and finally target_table.
	 * 	Key_Column entries are retrieved ordered by ascending from_column and then target_column.
	 * </i></p>
	 * @param keysDef		Definition of the table TAP_SCHEMA.keys.
	 * @param keyColumnsDef	Definition of the table TAP_SCHEMA.key_columns.
	 * @param lstTables		List of all published tables (= all tables listed in TAP_SCHEMA.tables).
	 * @param stmt			Statement to use in order to interact with the database.
	 * @throws DBException	If a table or a column can not be found, or if any other error occurs while interacting with the database.
	protected void loadKeys(final TAPTable keysDef, final TAPTable keyColumnsDef, final List<TAPTable> lstTables, final Statement stmt) throws DBException {
		ResultSet rs = null;
		PreparedStatement keyColumnsStmt = null;
			// Prepare the query to get the columns of each key:
			StringBuffer sqlBuf = new StringBuffer("SELECT ");
			sqlBuf.append(translator.getColumnName(keyColumnsDef.getColumn("from_column")));
			sqlBuf.append(", ").append(translator.getColumnName(keyColumnsDef.getColumn("target_column")));
			sqlBuf.append(" FROM ").append(translator.getTableName(keyColumnsDef, supportsSchema));
			sqlBuf.append(" WHERE ").append(translator.getColumnName(keyColumnsDef.getColumn("key_id"))).append(" = ?");
			sqlBuf.append(" ORDER BY 1,2");
			keyColumnsStmt = connection.prepareStatement(sqlBuf.toString());

			// Build the SQL query to get the keys:
			sqlBuf.delete(0, sqlBuf.length());
			sqlBuf.append("SELECT ").append(translator.getColumnName(keysDef.getColumn("key_id")));
			sqlBuf.append(", ").append(translator.getColumnName(keysDef.getColumn("from_table")));
			sqlBuf.append(", ").append(translator.getColumnName(keysDef.getColumn("target_table")));
			sqlBuf.append(", ").append(translator.getColumnName(keysDef.getColumn("description")));
			sqlBuf.append(", ").append(translator.getColumnName(keysDef.getColumn("utype")));
			sqlBuf.append(" FROM ").append(translator.getTableName(keysDef, supportsSchema));
			sqlBuf.append(" ORDER BY 1,2,3");

			// Execute the query:
			rs = stmt.executeQuery(sqlBuf.toString());

			// Create all foreign keys:
				String key_id = rs.getString(1), from_table = rs.getString(2),
						target_table = rs.getString(3),
						description = rs.getString(4), utype = rs.getString(5);

				// get the two tables (source and target):
				TAPTable sourceTable = searchTable(from_table, lstTables.iterator());
					if (logger != null)
						logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to find the source table of the foreign key \"" + key_id + "\": \"" + from_table + "\"!", null);
					throw new DBException("Impossible to find the source table of the foreign key \"" + key_id + "\": \"" + from_table + "\"!");
				}
				TAPTable targetTable = searchTable(target_table, lstTables.iterator());
					if (logger != null)
						logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to find the target table of the foreign key \"" + key_id + "\": \"" + target_table + "\"!", null);
					throw new DBException("Impossible to find the target table of the foreign key \"" + key_id + "\": \"" + target_table + "\"!");
				}

				// get the list of columns joining the two tables of the foreign key:
				HashMap<String, String> columns = new HashMap<String, String>();
					keyColumnsStmt.setString(1, key_id);
					rsKeyCols = keyColumnsStmt.executeQuery();
					while(rsKeyCols.next())
						columns.put(rsKeyCols.getString(1), rsKeyCols.getString(2));
					if (!isCancelled() && logger != null)
						logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to load key columns from TAP_SCHEMA.key_columns for the foreign key: \"" + key_id + "\"!", se);
					throw new DBException("Impossible to load key columns from TAP_SCHEMA.key_columns for the foreign key: \"" + key_id + "\"!", se);
					close(rsKeyCols);
				}

				// create and add the new foreign key inside the source table:
					sourceTable.addForeignKey(key_id, targetTable, columns, nullifyIfNeeded(description), nullifyIfNeeded(utype));
					if ((ex instanceof SQLException && !isCancelled()) && logger != null)
						logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to create the foreign key \"" + key_id + "\" because: " + ex.getMessage(), ex);
					throw new DBException("Impossible to create the foreign key \"" + key_id + "\" because: " + ex.getMessage(), ex);
				}
			}
			if (!isCancelled() && logger != null)
				logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to load columns from TAP_SCHEMA.columns!", se);
			throw new DBException("Impossible to load columns from TAP_SCHEMA.columns!", se);
	/* ********************************** */
	/* SETTING TAP_SCHEMA IN THE DATABASE */
	/* ********************************** */

	/**
	 * <p>This function is just calling the following functions:</p>
	 * <ol>
	 * 	<li>{@link #mergeTAPSchemaDefs(TAPMetadata)}</li>
	 * 	<li>{@link #startTransaction()}</li>
	 * 	<li>{@link #resetTAPSchema(Statement, TAPTable[])}</li>
	 * 	<li>{@link #createTAPSchemaTable(TAPTable, Statement)} for each standard TAP_SCHEMA table</li>
	 * 	<li>{@link #fillTAPSchema(TAPMetadata)}</li>
	 * 	<li>{@link #createTAPTableIndexes(TAPTable, Statement)} for each standard TA_SCHEMA table</li>
	 * 	<li>{@link #commit()} or {@link #rollback()}</li>
	 * 	<li>{@link #endTransaction()}</li>
	 * </ol>
	 * <p><i><b>Important note:
	 * 	If the connection does not support transactions, then there will be merely no transaction.
	 * 	Consequently, any failure (exception/error) will not clean the partial modifications done by this function.
	 * </i></p>
	 * @see tap.db.DBConnection#setTAPSchema(tap.metadata.TAPMetadata)
	 */
	public synchronized void setTAPSchema(final TAPMetadata metadata) throws DBCancelledException, DBException {
		// Starting of new query execution => disable the cancel flag:
			// A. GET THE DEFINITION OF ALL STANDARD TAP TABLES:
			TAPTable[] stdTables = mergeTAPSchemaDefs(metadata);

			startTransaction();

			// B. RE-CREATE THE STANDARD TAP_SCHEMA TABLES:

			// 1. Ensure TAP_SCHEMA exists and drop all its standard TAP tables:
			if (logger != null)
				logger.logDB(LogLevel.INFO, this, "CLEAN_TAP_SCHEMA", "Cleaning TAP_SCHEMA.", null);
			// If the query has been aborted, return immediately:
			if (isCancelled())
				throw new DBCancelledException();

			// 2. Create all standard TAP tables:
			if (logger != null)
				logger.logDB(LogLevel.INFO, this, "CREATE_TAP_SCHEMA", "Creating TAP_SCHEMA tables.", null);
				createTAPSchemaTable(table, stmt);
				if (isCancelled())
					throw new DBCancelledException();
			}

			// C. FILL THE NEW TABLE USING THE GIVEN DATA ITERATOR:
			if (logger != null)
				logger.logDB(LogLevel.INFO, this, "CREATE_TAP_SCHEMA", "Filling TAP_SCHEMA tables.", null);
			fillTAPSchema(metadata);

			// D. CREATE THE INDEXES OF ALL STANDARD TAP TABLES:
			if (logger != null)
				logger.logDB(LogLevel.INFO, this, "CREATE_TAP_SCHEMA", "Creating TAP_SCHEMA tables' indexes.", null);
			for(TAPTable table : stdTables)
				createTAPTableIndexes(table, stmt);

			commit();
			if (!isCancelled() && logger != null)
				logger.logDB(LogLevel.ERROR, this, "CREATE_TAP_SCHEMA", "Impossible to SET TAP_SCHEMA in DB!", se);
			if (isCancelled())
				throw new DBCancelledException();
			else
				throw new DBException("Impossible to SET TAP_SCHEMA in DB!", se);
	 * <p>Merge the definition of TAP_SCHEMA tables given in parameter with the definition provided in the TAP standard.</p>
	 * <p>
	 * 	The goal is to get in output the list of all standard TAP_SCHEMA tables. But it must take into account the customized
	 * 	definition given in parameter if there is one. Indeed, if a part of TAP_SCHEMA is not provided, it will be completed here by the
	 * 	definition provided in the TAP standard. And so, if the whole TAP_SCHEMA is not provided at all, the returned tables will be those
	 * 	of the IVOA standard.
	 * </p>
	 * <p><i><b>Important note:</b>
	 * 	If the TAP_SCHEMA definition is missing or incomplete in the given metadata, it will be added or completed automatically
	 * 	by this function with the definition provided in the IVOA TAP standard.
	 * </i></p>
	 * <p><i>Note:
	 * 	Only the standard tables of TAP_SCHEMA are considered. The others are skipped (that's to say: never returned by this function ;
	 *  however, they will stay in the given metadata).
	 * </i></p>
	 * <p><i>Note:
	 * 	If schemas are not supported by this DBMS connection, the DB name of schemas is set to NULL and
	 * 	the DB name of tables is prefixed by the schema name.
	 * @param metadata	Metadata (with or without TAP_SCHEMA schema or some of its table). <i>Must not be NULL</i>
	 * @return	The list of all standard TAP_SCHEMA tables, ordered by creation order (see {@link #getCreationOrder(tap.metadata.TAPMetadata.STDTable)}).
	 * @see TAPMetadata#resolveStdTable(String)
	 * @see TAPMetadata#getStdSchema(boolean)
	 * @see TAPMetadata#getStdTable(STDTable)
	protected TAPTable[] mergeTAPSchemaDefs(final TAPMetadata metadata) {
		// 1. Get the TAP_SCHEMA schema from the given metadata:
		TAPSchema tapSchema = null;
		Iterator<TAPSchema> itSchema = metadata.iterator();
		while(tapSchema == null && itSchema.hasNext()) {
			TAPSchema schema = itSchema.next();
			if (schema.getADQLName().equalsIgnoreCase(STDSchema.TAPSCHEMA.label))
				tapSchema = schema;
		}

		// 2. Get the provided definition of the standard TAP tables:
		TAPTable[] customStdTables = new TAPTable[5];

			/* if the schemas are not supported with this DBMS,
			 * remove its DB name: */
			if (!supportsSchema)
				tapSchema.setDBName(null);
			// retrieve only the standard TAP tables:
			Iterator<TAPTable> itTable = tapSchema.iterator();
				TAPTable table = itTable.next();
				int indStdTable = getCreationOrder(TAPMetadata.resolveStdTable(table.getADQLName()));
				if (indStdTable > -1)
					customStdTables[indStdTable] = table;
			}
		// 3. Build a common TAPSchema, if needed:

			// build a new TAP_SCHEMA definition based on the standard definition:
			tapSchema = TAPMetadata.getStdSchema(supportsSchema);

			// add the new TAP_SCHEMA definition in the given metadata object:
			metadata.addSchema(tapSchema);
		}

		// 4. Finally, build the join between the standard tables and the custom ones:
		TAPTable[] stdTables = new TAPTable[]{ TAPMetadata.getStdTable(STDTable.SCHEMAS), TAPMetadata.getStdTable(STDTable.TABLES), TAPMetadata.getStdTable(STDTable.COLUMNS), TAPMetadata.getStdTable(STDTable.KEYS), TAPMetadata.getStdTable(STDTable.KEY_COLUMNS) };
		for(int i = 0; i < stdTables.length; i++) {
				// add the table to the fetched or built-in schema:
				tapSchema.addTable(stdTables[i]);
			}
			// CASE: custom definition
			else
				stdTables[i] = customStdTables[i];
	/**
	 * <p>Ensure the TAP_SCHEMA schema exists in the database AND it must especially drop all of its standard tables
	 * (schemas, tables, columns, keys and key_columns), if they exist.</p>
	 * <p><i><b>Important note</b>:
	 * 	If TAP_SCHEMA already exists and contains other tables than the standard ones, they will not be dropped and they will stay in place.
	 * </i></p>
	 * @param stmt			The statement to use in order to interact with the database.
	 * @param stdTables		List of all standard tables that must be (re-)created.
	 *                      They will be used just to know the name of the standard tables that should be dropped here.
	 * @throws SQLException	If any error occurs while querying or updating the database.
	 */
	protected void resetTAPSchema(final Statement stmt, final TAPTable[] stdTables) throws SQLException {
		DatabaseMetaData dbMeta = connection.getMetaData();

		// 1. Get the qualified DB schema name:
		String dbSchemaName = (supportsSchema ? stdTables[0].getDBSchemaName() : null);

		/* 2. Test whether the schema TAP_SCHEMA exists
		 *    and if it does not, create it: */
			// test whether the schema TAP_SCHEMA exists:
			boolean hasTAPSchema = isSchemaExisting(dbSchemaName, dbMeta);

			// create TAP_SCHEMA if it does not exist:
			if (!hasTAPSchema)
				stmt.executeUpdate("CREATE SCHEMA " + translator.getQualifiedSchemaName(stdTables[0]));
		}

		// 2-bis. Drop all its standard tables:
		dropTAPSchemaTables(stdTables, stmt, dbMeta);
	}

	/**
	 * <p>Remove/Drop all standard TAP_SCHEMA tables given in parameter.</p>
	 * <p><i>Note:
	 * 	To test the existence of tables to drop, {@link DatabaseMetaData#getTables(String, String, String, String[])} is called.
	 * 	Then the schema and table names are compared with the case sensitivity defined by the translator.
	 * 	Only tables matching with these comparisons will be dropped.
	 * </i></p>
	 * @param stdTables	Tables to drop. (they should be provided ordered by their creation order (see {@link #getCreationOrder(STDTable)})).
	 * @param stmt		Statement to use in order to interact with the database.
	 * @param dbMeta	Database metadata. Used to list all existing tables.
	 * @throws SQLException	If any error occurs while querying or updating the database.
	 * @see JDBCTranslator#isCaseSensitive(IdentifierField)
	private void dropTAPSchemaTables(final TAPTable[] stdTables, final Statement stmt, final DatabaseMetaData dbMeta) throws SQLException {
		String[] stdTablesToDrop = new String[]{ null, null, null, null, null };
			// Retrieve only the schema name and determine whether the search should be case sensitive:
			String tapSchemaName = stdTables[0].getDBSchemaName();
			boolean schemaCaseSensitive = translator.isCaseSensitive(IdentifierField.SCHEMA);
			boolean tableCaseSensitive = translator.isCaseSensitive(IdentifierField.TABLE);

			// Identify which standard TAP tables must be dropped:
			rs = dbMeta.getTables(null, null, null, null);
				String rsSchema = nullifyIfNeeded(rs.getString(2)),
						rsTable = rs.getString(3);
				if (!supportsSchema || (tapSchemaName == null && rsSchema == null) || equals(rsSchema, tapSchemaName, schemaCaseSensitive)) {
					int indStdTable;
					indStdTable = getCreationOrder(isStdTable(rsTable, stdTables, tableCaseSensitive));
						stdTablesToDrop[indStdTable] = (rsSchema != null ? "\"" + rsSchema + "\"." : "") + "\"" + rsTable + "\"";
					}
				}
			}
			close(rs);
		}

		// Drop the existing tables (in the reverse order of creation):
		for(int i = stdTablesToDrop.length - 1; i >= 0; i--) {
				stmt.executeUpdate("DROP TABLE " + stdTablesToDrop[i]);
	/**
	 * <p>Create the specified standard TAP_SCHEMA tables into the database.</p>
	 * <p><i><b>Important note:</b>
	 * 	Only standard TAP_SCHEMA tables (schemas, tables, columns, keys and key_columns) can be created here.
	 * 	If the given table is not part of the schema TAP_SCHEMA (comparison done on the ADQL name case-sensitively)
	 * 	and is not a standard TAP_SCHEMA table (comparison done on the ADQL name case-sensitively),
	 * 	this function will do nothing and will throw an exception.
	 * </i></p>
	 * <p><i>Note:
	 * 	An extra column is added in TAP_SCHEMA.schemas, TAP_SCHEMA.tables and TAP_SCHEMA.columns: {@value #DB_NAME_COLUMN}.
	 * 	This column is particularly used when getting the TAP metadata from the database to alias some schema, table and/or column names in ADQL.
	 * @param table	Table to create.
	 * @param stmt	Statement to use in order to interact with the database.
	 * @throws DBException	If the given table is not a standard TAP_SCHEMA table.
	 * @throws SQLException	If any error occurs while querying or updating the database.
	 */
	protected void createTAPSchemaTable(final TAPTable table, final Statement stmt) throws DBException, SQLException {
		// 1. ENSURE THE GIVEN TABLE IS REALLY A TAP_SCHEMA TABLE (according to the ADQL names):
		if (!table.getADQLSchemaName().equalsIgnoreCase(STDSchema.TAPSCHEMA.label) || TAPMetadata.resolveStdTable(table.getADQLName()) == null)
			throw new DBException("Forbidden table creation: " + table + " is not a standard table of TAP_SCHEMA!");

		// 2. BUILD THE SQL QUERY TO CREATE THE TABLE:
		StringBuffer sql = new StringBuffer("CREATE TABLE ");
		// a. Write the fully qualified table name:
		sql.append(translator.getTableName(table, supportsSchema));

		// b. List all the columns:
		sql.append('(');
		Iterator<TAPColumn> it = table.getColumns();
			TAPColumn col = it.next();

			// column name:
			sql.append(translator.getColumnName(col));

			// column type:
			sql.append(' ').append(convertTypeToDB(col.getDatatype()));
		// b bis. Add the extra dbName column (giving the database name of a schema, table or column):
		if ((supportsSchema && table.getADQLName().equalsIgnoreCase(STDTable.SCHEMAS.label)) || table.getADQLName().equalsIgnoreCase(STDTable.TABLES.label) || table.getADQLName().equalsIgnoreCase(STDTable.COLUMNS.label))
			sql.append(',').append(DB_NAME_COLUMN).append(" VARCHAR");

		// c. Append the primary key definition, if needed:
		String primaryKey = getPrimaryKeyDef(table.getADQLName());
		if (primaryKey != null)
			sql.append(',').append(primaryKey);

		// d. End the query:

		// 3. FINALLY CREATE THE TABLE:
		stmt.executeUpdate(sql.toString());
	}

	/**
	 * <p>Get the primary key corresponding to the specified table.</p>
	 * <p>If the specified table is not a standard TAP_SCHEMA table, NULL will be returned.</p>
	 * @param tableName	ADQL table name.
	 * @return	The primary key definition (prefixed by a space) corresponding to the specified table (ex: " PRIMARY KEY(schema_name)"),
	 *          or NULL if the specified table is not a standard TAP_SCHEMA table.
	 */
	private String getPrimaryKeyDef(final String tableName) {
		STDTable stdTable = TAPMetadata.resolveStdTable(tableName);
		if (stdTable == null)
			return null;

		boolean caseSensitive = translator.isCaseSensitive(IdentifierField.COLUMN);
			case SCHEMAS:
				return " PRIMARY KEY(" + (caseSensitive ? "\"schema_name\"" : "schema_name") + ")";
			case TABLES:
				return " PRIMARY KEY(" + (caseSensitive ? "\"table_name\"" : "table_name") + ")";
			case COLUMNS:
				return " PRIMARY KEY(" + (caseSensitive ? "\"table_name\"" : "table_name") + ", " + (caseSensitive ? "\"column_name\"" : "column_name") + ")";
			case KEYS:
			case KEY_COLUMNS:
				return " PRIMARY KEY(" + (caseSensitive ? "\"key_id\"" : "key_id") + ")";
			default:
				return null;
		}
	}

	/**
	 * <p>Create the DB indexes corresponding to the given TAP_SCHEMA table.</p>
	 * <p><i><b>Important note:</b>
	 * 	Only standard TAP_SCHEMA tables (schemas, tables, columns, keys and key_columns) can be created here.
	 * 	If the given table is not part of the schema TAP_SCHEMA (comparison done on the ADQL name case-sensitively)
	 * 	and is not a standard TAP_SCHEMA table (comparison done on the ADQL name case-sensitively),
	 * 	this function will do nothing and will throw an exception.
	 * </i></p>
	 * @param table	Table whose indexes must be created here.
	 * @param stmt	Statement to use in order to interact with the database.
	 * @throws DBCancelledException	If {@link #cancel(boolean)} has been called during the processing,
	 * @throws DBException			If the given table is not a standard TAP_SCHEMA table.
	 * @throws SQLException			If any error occurs while querying or updating the database.
	protected void createTAPTableIndexes(final TAPTable table, final Statement stmt) throws DBCancelledException, DBException, SQLException {
		// 1. Ensure the given table is really a TAP_SCHEMA table (according to the ADQL names):
		if (!table.getADQLSchemaName().equalsIgnoreCase(STDSchema.TAPSCHEMA.label) || TAPMetadata.resolveStdTable(table.getADQLName()) == null)
			throw new DBException("Forbidden index creation: " + table + " is not a standard table of TAP_SCHEMA!");

		// Build the fully qualified DB name of the table:
		final String dbTableName = translator.getTableName(table, supportsSchema);
		// Build the name prefix of all the indexes to create:
		final String indexNamePrefix = "INDEX_" + ((table.getADQLSchemaName() != null) ? (table.getADQLSchemaName() + "_") : "") + table.getADQLName() + "_";

		Iterator<TAPColumn> it = table.getColumns();
			TAPColumn col = it.next();
			// Create an index only for columns that have the 'indexed' flag:
			if (col.isIndexed() && !isPartOfPrimaryKey(col.getADQLName()))
				stmt.executeUpdate("CREATE INDEX " + indexNamePrefix + col.getADQLName() + " ON " + dbTableName + "(" + translator.getColumnName(col) + ")");
			// If the query has been aborted, return immediately:
			if (isCancelled())
				throw new DBCancelledException();
		}
	}

	/**
	 * Tell whether the specified column is part of the primary key of its table.
	 * @param adqlName	ADQL name of a column.
	 * @return	<i>true</i> if the specified column is part of the primary key,
	 *          <i>false</i> otherwise.
	 */
	private boolean isPartOfPrimaryKey(final String adqlName) {
		if (adqlName == null)
			return false;
		else
			return (adqlName.equalsIgnoreCase("schema_name") || adqlName.equalsIgnoreCase("table_name") || adqlName.equalsIgnoreCase("column_name") || adqlName.equalsIgnoreCase("key_id"));
	}

	/**
	 * <p>Fill all the standard tables of TAP_SCHEMA (schemas, tables, columns, keys and key_columns).</p>
	 * <p>This function just call the following functions:</p>
	 * <ol>
	 * 	<li>{@link #fillSchemas(TAPTable, Iterator)}</li>
	 * 	<li>{@link #fillTables(TAPTable, Iterator)}</li>
	 * 	<li>{@link #fillColumns(TAPTable, Iterator)}</li>
	 * 	<li>{@link #fillKeys(TAPTable, TAPTable, Iterator)}</li>
	 * </ol>
	 * @param meta	All schemas and tables to list inside the TAP_SCHEMA tables.
	 * @throws DBCancelledException	If {@link #cancel(boolean)} has been called during the processing,
	 * @throws DBException			If rows can not be inserted because the SQL update query has failed.
	 * @throws SQLException			If any other SQL exception occurs.
	protected void fillTAPSchema(final TAPMetadata meta) throws SQLException, DBCancelledException, DBException {