Skip to content
PostgresDBBroker.java 19.4 KiB
Newer Older
/*
 * _____________________________________________________________________________
 * 
 * INAF - OATS National Institute for Astrophysics - Astronomical Observatory of
 * Trieste INAF - IA2 Italian Center for Astronomical Archives
 * _____________________________________________________________________________
 * 
 * Copyright (C) 2017 Istituto Nazionale di Astrofisica
 * 
 * This program is free software; you can redistribute it and/or modify it under
 * the terms of the GNU General Public License Version 3 as published by the
 * Free Software Foundation.
 * 
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 * details.
 * 
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc., 51
 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */
package it.inaf.ia2.tsm.datalayer.pgsql;

import it.inaf.ia2.tsm.Column;
import it.inaf.ia2.tsm.Key;
import it.inaf.ia2.tsm.TapSchema;
import it.inaf.ia2.tsm.datalayer.ADQL;
import it.inaf.ia2.tsm.datalayer.DBBrokerTemplate;
import it.inaf.ia2.tsm.datalayer.DataTypeMode;
import it.inaf.ia2.tsm.model.ColumnModel;
import it.inaf.ia2.tsm.model.TableModel;
import it.inaf.ia2.tsm.model.TypesMapping;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * PostgreSQL-specific {@link DBBrokerTemplate}: provides the PostgreSQL SQL
 * statements and catalog queries defined by the methods of this class.
 *
 * @author Sonia Zorba {@literal <zorba at oats.inaf.it>}
 */
public class PostgresDBBroker extends DBBrokerTemplate {
    private final static Logger LOG = LoggerFactory.getLogger(PostgresDBBroker.class);
    // Database name; getKeys uses it to filter information_schema.columns on table_catalog
    private final String pgDatabaseName;
    /**
     * Creates a broker bound to the given data source.
     *
     * @param dataSource forwarded to the superclass constructor
     * @param pgDatabaseName name of the PostgreSQL database, stored for the
     * key-column queries built by getKeys
     * @param mode forwarded to the superclass constructor
     */
    public PostgresDBBroker(DataSource dataSource, String pgDatabaseName, DataTypeMode mode) {
        // The double quote is presumably the identifier-quoting character used by escape() - TODO confirm in DBBrokerTemplate
        super(dataSource, '"', mode);
        this.pgDatabaseName = pgDatabaseName;
    // NOTE(review): the constructor's closing brace is not present in this text
    /**
     * Translates an ADQL type name into the corresponding PostgreSQL type
     * name by delegating to
     * {@link TypesMapping#getPostgresSQLTypeFromADQLType}.
     *
     * @param adqlType the ADQL type name
     * @return whatever the types mapping returns for {@code adqlType}
     */
    @Override
    protected String getDataTypeFromADQLType(String adqlType) {
        final String postgresType = TypesMapping.getPostgresSQLTypeFromADQLType(adqlType);
        return postgresType;
    }

    /**
     * Tells whether {@code pg_type} contains a type with the given name.
     *
     * @param enumName the type name compared against {@code pg_type.typname}
     * @return {@code true} if the lookup returns at least one row,
     * {@code false} otherwise
     * @throws SQLException if obtaining the connection or running the query
     * fails
     */
    private boolean enumTypeExists(String enumName) throws SQLException {

        final String lookupSql = "SELECT 1 FROM pg_type WHERE typname = ?";

        try (Connection connection = dataSource.getConnection();
                PreparedStatement lookup = connection.prepareStatement(lookupSql)) {
            lookup.setString(1, enumName);

            LOG.debug("Executing query {}", lookupSql);
            try (ResultSet rows = lookup.executeQuery()) {
                // One row is enough: the type exists as soon as the cursor can advance
                return rows.next();
            }
        }
    }

    /**
     * Issues a {@code CREATE TYPE ... AS ENUM (...)} statement on a connection
     * taken from the data source.
     *
     * @param enumName name of the type; inserted into the statement as is
     * @param values the enum values, rendered through
     * {@code getEnumDefinition}
     * @throws SQLException if obtaining the connection or executing the
     * statement fails
     */
    private void createEnumType(String enumName, List<String> values) throws SQLException {

        final String createTypeSql = String.format(
                "CREATE TYPE %s AS ENUM (%s)",
                enumName,
                getEnumDefinition(values));

        try (Connection connection = dataSource.getConnection();
                Statement ddlStatement = connection.createStatement()) {

            LOG.debug("Executing query {}", createTypeSql);
            ddlStatement.executeUpdate(createTypeSql);
        }
    }

    /**
     * Builds the name of the enum type backing a column:
     * {@code tasman_<schema>_<table>_<column>}, entirely in lower case.
     *
     * <p>
     * The name must be lower case because {@code createEnumType} inserts it
     * unquoted into {@code CREATE TYPE}, and PostgreSQL folds unquoted
     * identifiers to lower case, while {@code enumTypeExists} compares it
     * verbatim against {@code pg_type.typname}.
     *
     * @param schemaName schema of the table owning the column
     * @param tableName name of the table owning the column
     * @param columnName name of the column
     * @return the lower-cased enum type name
     */
    private String getEnumName(String schemaName, String tableName, String columnName) {
        String enumName = String.format("TASMAN_%s_%s_%s", schemaName, tableName, columnName);
        // Locale.ROOT keeps the folding independent of the JVM default locale
        // (e.g. under a Turkish locale "I" would otherwise become a dotless "ı")
        return enumName.toLowerCase(java.util.Locale.ROOT);
    }

    /**
     * Creates the table described by {@code tableModel} inside
     * {@code schemaName}, using {@code CREATE TABLE IF NOT EXISTS}. Enum types
     * needed by the columns are created first if they do not exist yet.
     *
     * NOTE(review): this text is incomplete - the declaration of
     * {@code first} and the closing braces of the column loop and of the
     * method are not present here.
     *
     * @param schemaName schema that will contain the table
     * @param tableModel model providing the table name and its columns
     * @param conn connection used to execute the CREATE TABLE statement
     * @throws SQLException if an enum lookup/creation or the CREATE TABLE
     * statement fails
     */
    protected void createTable(String schemaName, TableModel tableModel, Connection conn) throws SQLException {

        // Checking for enum: every column that declares enum values gets a
        // dedicated enum type, recorded in enumNames by column name.
        // enumTypeExists/createEnumType open their own connections from the
        // data source; they do not run on conn.
        Map<String, String> enumNames = new HashMap<>();
        for (ColumnModel column : tableModel.getColumns()) {
            if (column.getEnumValues() != null) {
                String enumName = getEnumName(schemaName, tableModel.getName(), column.getName());
                if (!enumTypeExists(enumName)) {
                    createEnumType(enumName, column.getEnumValues());
                }
                enumNames.put(column.getName(), enumName);
            }
        }

        StringBuilder querySb = new StringBuilder();
        querySb.append("CREATE TABLE IF NOT EXISTS ");
        querySb.append(escape(schemaName));
        querySb.append(".");
        querySb.append(escape(tableModel.getName()));
        querySb.append(" (\n");
        for (ColumnModel cm : tableModel.getColumns()) {
            // Only columns flagged as mandatory are emitted in the definition
            if (cm.isMandatory()) {
                // Separator goes before every emitted column except the first one
                if (!first) {
                    querySb.append(",\n");
                }
                first = false;

                querySb.append(escape(cm.getName()));
                querySb.append(" ");

                if (cm.getEnumValues() == null) {
                    // Plain column: mapped PostgreSQL type, plus the size when one is set
                    String pgsqlType = TypesMapping.getPostgresSQLTypeFromADQLType(cm.getType()).toLowerCase();
                    querySb.append(pgsqlType);
                    if (cm.getSize() != null) {
                        appendSize(querySb, cm.getSize());
                    }
                } else {
                    // Enum column: use the enum type registered above (name is not escaped)
                    querySb.append(enumNames.get(cm.getName()));
                }

                if (cm.isNullable()) {
                    querySb.append(" NULL");
                } else {
                    querySb.append(" NOT NULL");
                }
        // appendCheckConstraints is defined outside this class text; presumably it
        // also terminates the column list - TODO confirm
        appendCheckConstraints(querySb, tableModel);
        String query = querySb.toString();
        try (Statement stat = conn.createStatement()) {
            LOG.debug("Executing query: {}", query);
            stat.executeUpdate(query);
        }
    /**
     * Builds the {@code ALTER TABLE ONLY ... ADD CONSTRAINT ... PRIMARY KEY}
     * statement for a table. The constraint is named
     * {@code <tableName>_pkey}; the table name is used unescaped inside the
     * constraint name.
     *
     * NOTE(review): the method's closing brace is not present in this text.
     *
     * @param tapSchemaName schema containing the table (escaped)
     * @param tableName table receiving the primary key (escaped in the table
     * reference)
     * @param keyColumns primary key columns, rendered by
     * {@code buildColumnsList}
     * @return the SQL statement text
     */
    protected String getAddPrimaryKeyQuery(String tapSchemaName, String tableName, String[] keyColumns) {
        return String.format("ALTER TABLE ONLY %s.%s ADD CONSTRAINT %s_pkey PRIMARY KEY (%s)",
                escape(tapSchemaName), escape(tableName), tableName, buildColumnsList(keyColumns));
    /**
     * Builds the {@code ALTER TABLE ONLY ... ADD CONSTRAINT ... FOREIGN KEY}
     * statement linking {@code tableName} to {@code targetTableName}; both
     * tables are referenced inside {@code tapSchemaName}.
     *
     * NOTE(review): the method's closing brace is not present in this text.
     *
     * @param tapSchemaName schema containing both tables (escaped)
     * @param tableName table owning the foreign key
     * @param fromKeyColumns referencing columns, also used to build the
     * constraint name
     * @param targetTableName referenced table
     * @param toKeyColumns referenced columns
     * @return the SQL statement text
     */
    protected String getAddForeignKeyQuery(String tapSchemaName, String tableName, String[] fromKeyColumns, String targetTableName, String[] toKeyColumns) {
        // Building univocal constraint name: fk_<table>_<col1>_<col2>...
        // (table and column names are used unescaped in the constraint name)
        StringBuilder constraintNameSb = new StringBuilder(tableName);
        for (String fromKeyColumn : fromKeyColumns) {
            constraintNameSb.append("_");
            constraintNameSb.append(fromKeyColumn);
        }
        return String.format("ALTER TABLE ONLY %s.%s ADD CONSTRAINT fk_%s FOREIGN KEY (%s) REFERENCES %s.%s(%s)",
                escape(tapSchemaName), escape(tableName), constraintNameSb.toString(), buildColumnsList(fromKeyColumns),
                escape(tapSchemaName), escape(targetTableName), buildColumnsList(toKeyColumns));
    /**
     * Returns the statement creating the container named
     * {@code databaseName}. In this PostgreSQL broker it is a
     * {@code CREATE SCHEMA IF NOT EXISTS} statement, not a CREATE DATABASE.
     *
     * NOTE(review): the method's closing brace is not present in this text.
     *
     * @param databaseName name of the schema to create (escaped)
     * @return the SQL statement text
     */
    protected String getCreateDatabaseQuery(String databaseName) {
        return "CREATE SCHEMA IF NOT EXISTS " + escape(databaseName);
    /**
     * Returns the query listing the table names of a schema, read from
     * {@code pg_catalog.pg_tables}.
     *
     * NOTE(review): the method's closing brace is not present in this text.
     *
     * @param schemaName schema name; inserted as is inside a single-quoted
     * SQL literal, without any escaping
     * @return the SQL query text
     */
    protected String getSchemaTablesQuery(String schemaName) {
        return String.format("SELECT tablename FROM pg_catalog.pg_tables where schemaname = '%s'", schemaName);
    /**
     * Renders the arraysize of a multidimensional array using one "*" per
     * dimension, joined by "x". Example: a 5x5x4 array is approximated to
     * *x*x*, because there seems to be no easy way to retrieve the actual
     * lengths of the dimensions.
     *
     * @param arrayDimension number of dimensions of the array
     * @return the arraysize string; empty when {@code arrayDimension} is not
     * positive
     */
    private String formatArraySize(int arrayDimension) {
        StringBuilder arraySizeSb = new StringBuilder();
        int remaining = arrayDimension;
        while (remaining > 0) {
            // The very first wildcard has no separator in front of it
            arraySizeSb.append(arraySizeSb.length() == 0 ? "*" : "x*");
            remaining--;
        }
        return arraySizeSb.toString();
    }
    /**
     * Reads from the PostgreSQL catalogs the metadata of all the columns of a
     * table and returns them as a map keyed by column name. Each value is a
     * map holding the column name, primary key flag, indexed flag, datatype,
     * size and arraysize (see the {@code Column.*} keys used below).
     *
     * NOTE(review): this text is incomplete - the declaration of
     * {@code isArray}, the {@code if} that opens the array branch, several
     * closing braces and the statement inside the final variable-length check
     * are not present here.
     *
     * @param schemaName schema of the table; concatenated as is inside a
     * single-quoted SQL literal
     * @param tableSimpleName table name without schema; concatenated as is
     * inside a single-quoted SQL literal
     * @return map from column name to that column's metadata map
     * @throws SQLException if obtaining the connection or running the query
     * fails
     */
    public Map<String, Map<String, Object>> getAllColumnsOriginalMetadata(String schemaName, String tableSimpleName) throws SQLException {
        Map<String, Map<String, Object>> allColumnsMetadata = new HashMap<>();
        // One row per (column, constraint touching that column): pg_constraint is
        // LEFT JOINed on the column's ordinal position, pg_attribute on its name.
        StringBuilder querySb = new StringBuilder();
        querySb.append("SELECT c.column_name, c.data_type, pg_catalog.format_type(a.atttypid, a.atttypmod), r.contype AS column_type, c.character_maximum_length, c.numeric_precision, a.attndims AS arraydim\n");
        querySb.append("FROM information_schema.columns c\n");
        querySb.append("JOIN pg_catalog.pg_tables t ON c.table_schema = t.schemaname AND c.table_name = t.tablename\n");
        querySb.append("LEFT JOIN pg_catalog.pg_constraint r ON c.ordinal_position = ANY(r.conkey) AND r.conrelid = (t.schemaname || '.' || t.tablename)::regclass::oid\n");
        querySb.append("LEFT JOIN pg_catalog.pg_attribute a ON a.attrelid = (t.schemaname || '.' || t.tablename)::regclass::oid and a.attname = c.column_name\n");
        querySb.append("WHERE t.schemaname = '");
        querySb.append(schemaName);
        querySb.append("' AND t.tablename = '");
        querySb.append(tableSimpleName);
        querySb.append("'");
        String query = querySb.toString();
        LOG.debug("Executing query {}", query);
        try (Connection conn = dataSource.getConnection();
                Statement statement = conn.createStatement();
                ResultSet resultSet = statement.executeQuery(query)) {
            while (resultSet.next()) {
                Map<String, Object> cm = new HashMap<>();

                // Column name
                String columnName = resultSet.getString("column_name");
                cm.put(Column.COLUMN_NAME_KEY, columnName);

                // Key info: contype 'p' marks a primary key; 'p', 'f' and 'u'
                // all mark the column as indexed
                String columnType = resultSet.getString("column_type");
                boolean primaryKey = false;
                boolean indexed = false;
                if (columnType != null) {
                    primaryKey = "p".equals(columnType);
                    if ("p".equals(columnType)
                            || "f".equals(columnType)
                            || "u".equals(columnType)) {
                        indexed = true;
                    }
                }
                cm.put(Column.PRIMARY_KEY, primaryKey);
                cm.put(Column.INDEXED_KEY, indexed);
                Integer size = null;
                int arraydimension = 0;
                String dbType = resultSet.getString("data_type").toUpperCase();
                    isArray = true;
                    // example: integer array has data_type ARRAY and format_type integer[]
                    dbType = resultSet.getString("format_type").toUpperCase();
                    // example: an array defined as integer[5][5] has arraydim = 2
                    // unfortunately it seems there is no easy way to get also the
                    // numbers inside brackets, so this case will be approximated to *x*
                    arraydimension = resultSet.getInt("arraydim");
                }
                ADQL adqlType = TypesMapping.getADQLFromDBType(dbType);
                String datatype = TypesMapping.getDataTypeFromPostgresType(dbType, getDataTypeMode());

                // Size is read only for non-array VARCHAR/CHAR columns.
                // NOTE(review): ResultSet.getInt yields 0 for SQL NULL, so a
                // character column without a declared length gets size 0 - confirm intended
                if (!isArray && (ADQL.VARCHAR.equals(adqlType) || ADQL.CHAR.equals(adqlType))) {
                    size = resultSet.getInt("character_maximum_length");
                cm.put(Column.DATATYPE_KEY, datatype);
                cm.put(Column.SIZE_KEY, size);
                // arraysize defaults to "*"; arrays use one "*" per dimension,
                // sized columns use their size
                String arraySize = "*";
                if (isArray) {
                    arraySize = formatArraySize(arraydimension);
                } else if (size != null) {
                    arraySize = String.valueOf(size);
                    // variable length columns must have a "*" symbol on arraysize
                    if (adqlType != null && ADQL.isVariable(adqlType)) {
                cm.put(Column.ARRAYSIZE_KEY, arraySize);
                // A column matched by several constraints yields several rows;
                // the last row processed for a column name replaces the earlier ones
                allColumnsMetadata.put(columnName, cm);
            }
        }
        return allColumnsMetadata;
    /**
     * Collects the foreign keys whose source or target table belongs to
     * {@code realSchemaName}, reading them from
     * {@code pg_catalog.pg_constraint}, and builds a {@link Key} for each of
     * them together with its column pairs.
     *
     * NOTE(review): this text is incomplete - the closing braces of the outer
     * loop, of the outer try block and of the method, as well as the return
     * statement, are not present here.
     *
     * @param tapSchema passed to the {@link Key} constructor
     * @param schemaName passed to {@code getExposedSchemaName} together with
     * {@code realSchemaName}; presumably the name under which the schema is
     * exposed - TODO confirm
     * @param realSchemaName schema name as stored in the database;
     * concatenated as is inside single-quoted LIKE patterns
     * @throws SQLException if obtaining the connection or running any of the
     * queries fails
     */
    public List<Key> getKeys(TapSchema tapSchema, String schemaName, String realSchemaName) throws SQLException {
        // Foreign key constraints ('f') where either side lives in realSchemaName;
        // the regclass casts are matched as text against '<schema>.%'
        StringBuilder queryKeysSb = new StringBuilder();
        queryKeysSb.append("SELECT\n");
        queryKeysSb.append("conname AS constraint_name,\n");
        queryKeysSb.append("conrelid::regclass AS from_table, \n");
        queryKeysSb.append("confrelid::regclass AS target_table\n");
        queryKeysSb.append("FROM pg_catalog.pg_constraint\n");
        queryKeysSb.append("WHERE contype = 'f'\n");
        queryKeysSb.append("AND ((conrelid::regclass || '' LIKE '");
        queryKeysSb.append(realSchemaName);
        queryKeysSb.append(".%')\n");
        queryKeysSb.append("OR (confrelid::regclass || '' LIKE '");
        queryKeysSb.append(realSchemaName);
        queryKeysSb.append(".%'))");
        String queryKeys = queryKeysSb.toString();
        try (Connection connection = dataSource.getConnection();
                Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(queryKeys)) {
            // The query has already been executed by the resource declaration above
            LOG.debug("Executing query {}", queryKeys);
            List<Key> keys = new ArrayList<>();
            while (resultSet.next()) {
                String constraintName = resultSet.getString("constraint_name");
                // Table names are expected in the form <schema>.<table> and are
                // split on the dot; index 1 is read without checking the length
                String fromTableCompleteName = resultSet.getString("from_table");
                String[] splitFrom = fromTableCompleteName.split(Pattern.quote("."));
                String targetTableCompleteName = resultSet.getString("target_table");
                String[] splitTarget = targetTableCompleteName.split(Pattern.quote("."));

                String fromSchema = splitFrom[0];
                String fromTable = splitFrom[1];
                String targetSchema = splitTarget[0];
                String targetTable = splitTarget[1];
                String exposedFromSchemaName = getExposedSchemaName(schemaName, realSchemaName, fromSchema);
                String exposedTargetSchemaName = getExposedSchemaName(schemaName, realSchemaName, targetSchema);
                Map<String, Object> keyMetadata = new HashMap<>();
                keyMetadata.put(Key.FROM_TABLE_KEY, String.format("%s.%s", exposedFromSchemaName, fromTable));
                keyMetadata.put(Key.TARGET_TABLE_KEY, String.format("%s.%s", exposedTargetSchemaName, targetTable));
                Key key = new Key(tapSchema, keyMetadata);
                keys.add(key);
                // Columns on the referencing side of the constraint
                StringBuilder queryFromKCSb = new StringBuilder();
                queryFromKCSb.append("SELECT\n");
                queryFromKCSb.append("c.column_name AS key_column\n");
                queryFromKCSb.append("FROM information_schema.columns c\n");
                queryFromKCSb.append("JOIN pg_catalog.pg_constraint r ON c.ordinal_position = ANY(r.conkey)\n");
                queryFromKCSb.append("AND (c.table_schema || '.' || c.table_name) = (r.conrelid::regclass || '')\n");
                queryFromKCSb.append("WHERE r.conname = '");
                queryFromKCSb.append(constraintName);
                queryFromKCSb.append("' AND r.contype = 'f'\n");
                queryFromKCSb.append("AND c.table_schema = '");
                queryFromKCSb.append(fromSchema);
                queryFromKCSb.append("'\nAND table_catalog = '");
                queryFromKCSb.append(pgDatabaseName);
                queryFromKCSb.append("'");

                // conkey conrelid
                String queryFromKC = queryFromKCSb.toString();
                // Columns on the referenced side of the constraint
                StringBuilder queryTargetKCSb = new StringBuilder();
                queryTargetKCSb.append("SELECT\n");
                queryTargetKCSb.append("c.column_name AS key_column\n");
                queryTargetKCSb.append("FROM information_schema.columns c\n");
                queryTargetKCSb.append("JOIN pg_catalog.pg_constraint r ON c.ordinal_position = ANY(r.confkey)\n");
                queryTargetKCSb.append("AND (c.table_schema || '.' || c.table_name) = (r.confrelid::regclass || '')\n");
                queryTargetKCSb.append("WHERE r.conname = '");
                queryTargetKCSb.append(constraintName);
                queryTargetKCSb.append("' AND r.contype = 'f'\n");
                queryTargetKCSb.append("AND c.table_schema = '");
                queryTargetKCSb.append(targetSchema);
                queryTargetKCSb.append("'\nAND table_catalog = '");
                queryTargetKCSb.append(pgDatabaseName);
                queryTargetKCSb.append("'");

                // as above, but with confkey and confrelid and different c.table_schema where condition
                String queryTargetKC = queryTargetKCSb.toString();
                try (Statement statFromKC = connection.createStatement();
                        Statement statTargetKC = connection.createStatement()) {
                    try (ResultSet rsFromKC = statFromKC.executeQuery(queryFromKC);
                            ResultSet rsTargetKC = statTargetKC.executeQuery(queryTargetKC)) {
                        LOG.debug("Executing query {}", queryFromKC);
                        LOG.debug("Executing query {}", queryTargetKC);
                        // Source and target columns are paired by row position.
                        // NOTE(review): neither query has an ORDER BY, so the pairing
                        // relies on both result sets coming back in matching order - confirm
                        while (rsFromKC.next()) {
                            if (rsTargetKC.next()) {
                                key.addKeyColumn(
                                        rsFromKC.getString("key_column"),
                                        rsTargetKC.getString("key_column")
                                );
                            }
                        }
                    }
                }
            }
    /**
     * Returns the query listing every table and view of a schema together
     * with its kind ({@code 'table'} or {@code 'view'}). Tables come from
     * {@code pg_catalog.pg_tables}, views from
     * {@code INFORMATION_SCHEMA.views}.
     *
     * @param schemaName schema name; inserted as is inside single-quoted SQL
     * literals
     * @return the SQL query text
     */
    @Override
    protected String getTableTypesQuery(String schemaName) {
        final String tablesSelect = "SELECT tablename AS table_name, 'table' AS table_type\n"
                + "FROM pg_catalog.pg_tables WHERE schemaname = '" + schemaName + "'\n";
        final String viewsSelect = "SELECT table_name AS table_name, 'view' AS table_type\n"
                + "FROM INFORMATION_SCHEMA.views\n"
                + "WHERE table_schema = '" + schemaName + "'";
        return tablesSelect + "UNION\n" + viewsSelect;
    }
    /**
     * Returns the query listing the column names of a table, read from
     * {@code information_schema.columns}.
     *
     * @param tapSchemaName schema of the table; inserted as is inside a
     * single-quoted SQL literal
     * @param tableName table name; inserted as is inside a single-quoted SQL
     * literal
     * @return the SQL query text
     */
    @Override
    protected String getColumnNamesQuery(String tapSchemaName, String tableName) {
        final String template = "SELECT column_name FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'";
        final String columnNamesQuery = String.format(template, tapSchemaName, tableName);
        return columnNamesQuery;
    }
    /**
     * Returns the query listing all the schema names, read from
     * {@code information_schema.schemata}.
     *
     * @return the SQL query text
     */
    @Override
    protected String getAllSchemaNamesQuery() {
        final String allSchemasQuery = "SELECT schema_name FROM information_schema.schemata";
        return allSchemasQuery;
    }
    /**
     * Returns the query listing the names of all the tables of a schema, read
     * from {@code pg_catalog.pg_tables}.
     *
     * @param schemaName schema name; inserted as is inside a single-quoted
     * SQL literal
     * @return the SQL query text
     */
    @Override
    protected String getAllTablesNamesQuery(String schemaName) {
        final String template = "SELECT tablename FROM pg_catalog.pg_tables where schemaname = '%s'";
        final String tablesQuery = String.format(template, schemaName);
        return tablesQuery;
    }

    /**
     * Changes the type of a column through
     * {@code ALTER TABLE ... ALTER COLUMN ... TYPE ...}, executed on a
     * connection taken from the data source.
     *
     * @param schemaName schema of the table (escaped)
     * @param tableName table owning the column (escaped)
     * @param columnName column to alter (escaped)
     * @param adqlDataType ADQL type name, translated to the PostgreSQL type
     * through {@link TypesMapping#getPostgresSQLTypeFromADQLType}
     * @param size optional size appended to the type as {@code type(size)};
     * ignored when {@code null}
     * @throws SQLException if obtaining the connection or executing the
     * statement fails
     */
    @Override
    public void alterDataType(String schemaName, String tableName, String columnName, String adqlDataType, Integer size) throws SQLException {

        final String baseType = TypesMapping.getPostgresSQLTypeFromADQLType(adqlDataType);
        // The size suffix is added only when a size has been requested
        final String targetType = (size == null)
                ? baseType
                : String.format("%s(%s)", baseType, size);

        final String alterSql = String.format("ALTER TABLE %s.%s ALTER COLUMN %s TYPE %s",
                escape(schemaName), escape(tableName), escape(columnName), targetType);

        try (Connection conn = dataSource.getConnection();
                Statement alterStatement = conn.createStatement()) {
            LOG.debug("Executing query: {}", alterSql);
            alterStatement.executeUpdate(alterSql);
        }
    }