Commit 4e559f93 authored by Massimo Costantini's avatar Massimo Costantini
Browse files

added /sync endpoint

parent a9331794
Loading
Loading
Loading
Loading
+124 −3
Original line number Diff line number Diff line
// IVOA
// IVOA implementation

// IVOA requirements:
// - npm install uuid

import { InternalError, IO, performIO } from "./Lib/IO.js";
import { putStr, readFile } from "./Lib/IO.js";

import { connPostgreSQL, fetchRows, quitPostgreSQL } from "./Lib/PostgreSQL.js";
import {
  connPostgreSQL,
  fetchQueryResult,
  quitPostgreSQL,
} from "./Lib/PostgreSQL.js";

import { connRedis, setRedis, quitRedis } from "./Lib/Redis.js";

import { startRESTServer, readRESTPort } from "./Lib/REST.js";

import { buildVOTablesFromRows } from "./Lib/VOTable.js";

import xmlbuilder from "xmlbuilder";

import { v4 as uuidv4 } from "uuid";

/**
 * Serves the static /tap HTML landing page (functional signature: Request -> IO -> IO String).
 *
@@ -44,7 +57,6 @@ async function IVOA_capabilitiesEndpoint(req, IO) {
}

/**
 * IVOA_tablesEndpoint : Request -> IO -> IO String
 * Executes an SQL query from file and returns a VOTable representation (functional signature: Request -> IO -> IO String).
 *
 * @param {Request} req - Incoming HTTP request
@@ -54,7 +66,7 @@ async function IVOA_capabilitiesEndpoint(req, IO) {
async function IVOA_tablesEndpoint(req, IO) {
  const query = await readFile("Query/tables.sql")(IO);
  const conn = await connPostgreSQL()(IO);
  const result = await fetchRows(conn, query, [])(IO);
  const result = await fetchQueryResult(conn, query, [])(IO);
  await quitPostgreSQL(conn)(IO);

  if (result.tag === "nothing") {
@@ -64,6 +76,114 @@ async function IVOA_tablesEndpoint(req, IO) {
  return buildVOTablesFromRows(result.value);
}

/**
 * Handles synchronous TAP queries (functional signature: Request -> IO -> IO String).
 * Executes a SQL query and returns results in VOTable format.
 *
 * @param {Request} req - Incoming HTTP request
 * @param {Symbol} IO - IO token
 * @returns {Promise<string>} - XML content in VOTable format or error message
 */
async function IVOA_syncEndpoint(req, IO) {
  const query = req.method === "GET" ? req.query?.QUERY : req.body?.QUERY;
  const requestType =
    req.method === "GET" ? req.query?.request : req.body?.request || "doQuery";
  const clientId =
    (req.method === "GET" ? req.query?.clientId : req.body?.clientId) ||
    uuidv4();

  if (!query) {
    return makeSyncError("Missing QUERY parameter");
  }

  if (requestType !== "doQuery") {
    return makeSyncError(
      'The parameter "request" must be "doQuery" or omitted',
    );
  }

  const conn = await connPostgreSQL()(IO);
  const result = await fetchQueryResult(conn, query, [])(IO);
  await quitPostgreSQL(conn)(IO);

  if (result.tag === "nothing") {
    return makeSyncError("No results.");
  }

  const { fields, rows } = result.value;

  if (!Array.isArray(rows)) {
    throw new InternalError("BUG: rows is not iterable");
  }
  const redis = await connRedis()(IO);

  const votable = xmlbuilder
    .create("VOTABLE", { encoding: "UTF-8" })
    .att("version", "1.3")
    .att("xmlns", "http://www.ivoa.net/xml/VOTable/v1.3");

  const resource = votable.ele("RESOURCE", { type: "results" });
  const table = resource.ele("TABLE");

  fields.forEach((f) =>
    table.ele("FIELD", {
      name: f.name,
      datatype: "char",
      arraysize: "*",
    }),
  );

  table.ele("FIELD", { name: "access_url", datatype: "char", arraysize: "*" });

  const data = table.ele("DATA").ele("TABLEDATA");

  for (const row of rows) {
    const rowUUID = uuidv4();
    const accessUrl = `https://.../datalink?id=${rowUUID}&clientId=${clientId}`;

    await setRedis(redis, {
      key: `client:${clientId}:query:${rowUUID}`,
      value: JSON.stringify({ ...row, access_url: accessUrl }),
    })(IO);

    const tr = data.ele("TR");

    try {
      fields.forEach((f) => {
        tr.ele("TD", row[f.name] ?? "");
      });
      tr.ele("TD", accessUrl);
    } catch (e) {
      console.error("ERROR inside row mapping:", e);
      throw new InternalError("BUG: row is not iterable or not valid object");
    }
  }

  await quitRedis(redis)(IO);
  return votable.end({ pretty: true });
}

/**
 * Generates a VOTable error document for /tap/sync (functional signature: String -> String).
 *
 * @param {string} message - Error message to embed in VOTable
 * @returns {string} - XML error in VOTable format
 */
function makeSyncError(message) {
  return xmlbuilder
    .create("VOTABLE", { encoding: "UTF-8" })
    .att("version", "1.3")
    .att("xmlns", "http://www.ivoa.net/xml/VOTable/v1.3")
    .att(
      "xsi:schemaLocation",
      "http://www.ivoa.net/xml/VOTable/v1.3 http://www.ivoa.net/xml/VOTable/v1.3",
    )
    .att("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
    .ele("RESOURCE", { type: "results" })
    .ele("INFO", { name: "QUERY_STATUS", value: "ERROR" }, message)
    .end({ pretty: true });
}

/**
 * Main entry point for the IVOA TAP and DataLink server (functional signature: IO -> IO ()).
 *
@@ -77,6 +197,7 @@ async function IVOA_main(IO) {
    { path: "/tap/availability", endpoint: IVOA_availabilityEndpoint },
    { path: "/tap/capabilities", endpoint: IVOA_capabilitiesEndpoint },
    { path: "/tap/tables", endpoint: IVOA_tablesEndpoint },
    { path: "/tap/sync", endpoint: IVOA_syncEndpoint },
  ])(IO);
  await putStr("IVOA server started")(IO);
}
+133 −6
Original line number Diff line number Diff line
# IVOA
# IVOA implementation

import asyncio

from Lib.IO import InternalError, IO, perform_io
from Lib.IO import put_str, read_file

from Lib.PostgreSQL import conn_postgresql, fetch_rows, quit_postgresql
from Lib.PostgreSQL import conn_postgresql, fetch_query_result, quit_postgresql

from Lib.Redis import conn_redis, set_redis, quit_redis

from Lib.REST import start_rest_server, read_rest_port

from Lib.VOTable import build_votables_from_rows

import asyncio

import uuid

import xml.etree.ElementTree as ET

from xml.dom import minidom


async def ivoa_tap_endpoint(req, io):
    """
@@ -68,13 +76,131 @@ async def ivoa_tables_endpoint(req, io):
    """
    query = await read_file("Query/tables.sql")(io)
    conn = await conn_postgresql()(io)
    result = await fetch_rows(conn, query, [])(io)
    result = await fetch_query_result(conn, query, [])(io)

    await quit_postgresql(conn)(io)

    if result["tag"] == "nothing":
        return '<?xml version="1.0" encoding="UTF-8"?><error>No tables found</error>'

    return build_votables_from_rows(result["value"])
    return build_votables_from_rows(result["value"]["fields"], result["value"]["rows"])


async def make_sync_error(message: str) -> str:
    """
    Generates a VOTable error document for /tap/sync.

    Args:
        message (str): Error message to embed in the VOTable.

    Returns:
        str: XML error in VOTable format.
    """
    votable = ET.Element(
        "VOTABLE",
        {
            "version": "1.3",
            "xmlns": "http://www.ivoa.net/xml/VOTable/v1.3",
            "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
            "xsi:schemaLocation": "http://www.ivoa.net/xml/VOTable/v1.3 http://www.ivoa.net/xml/VOTable/v1.3",
        },
    )
    resource = ET.SubElement(votable, "RESOURCE", {"type": "results"})
    ET.SubElement(resource, "INFO", {"name": "QUERY_STATUS", "value": "ERROR"}).text = (
        message
    )

    xml_str = ET.tostring(votable, encoding="utf-8")
    return minidom.parseString(xml_str).toprettyxml(indent="  ")


async def ivoa_sync_endpoint(req, io):
    """
    Handles synchronous TAP queries (/tap/sync).
    Executes a SQL query and returns results in VOTable format, including Redis storage for access URLs.

    Args:
        req: Incoming HTTP request.
        io: IO token.

    Returns:
        str: XML VOTable result or error response.
    """
    method = req.method
    query = req.query.get("QUERY") if method == "GET" else req.post.get("QUERY")
    request_type = (
        req.query.get("request")
        if method == "GET"
        else req.post.get("request", "doQuery")
    )
    client_id = (
        req.query.get("clientId")
        if method == "GET"
        else req.post.get("clientId", str(uuid.uuid4()))
    )

    if not query:
        return await make_sync_error("Missing QUERY parameter")

    if request_type != "doQuery":
        return await make_sync_error(
            'The parameter "request" must be "doQuery" or omitted'
        )

    conn = await conn_postgresql()(io)
    result = await fetch_query_result(conn, query, [])(io)
    await quit_postgresql(conn)(io)

    if result["tag"] == "nothing":
        return await make_sync_error("No results.")

    fields = result["value"]["fields"]
    rows = result["value"]["rows"]
    if not isinstance(rows, list):
        raise InternalError("BUG: rows is not iterable")

    redis = await conn_redis()(io)

    votable = ET.Element(
        "VOTABLE", {"version": "1.3", "xmlns": "http://www.ivoa.net/xml/VOTable/v1.3"}
    )
    resource = ET.SubElement(votable, "RESOURCE", {"type": "results"})
    table = ET.SubElement(resource, "TABLE")

    for field in fields:
        ET.SubElement(
            table,
            "FIELD",
            {"name": field["name"], "datatype": "char", "arraysize": "*"},
        )

    ET.SubElement(
        table, "FIELD", {"name": "access_url", "datatype": "char", "arraysize": "*"}
    )

    data = ET.SubElement(ET.SubElement(table, "DATA"), "TABLEDATA")

    for row in rows:
        tr = ET.SubElement(data, "TR")
        row_uuid = str(uuid.uuid4())
        access_url = f"https://.../datalink?id={row_uuid}&clientId={client_id}"

        await set_redis(redis)(
            {
                "key": f"client:{client_id}:query:{row_uuid}",
                "value": str({**row, "access_url": access_url}),
            }
        )(io)

        for field in fields:
            val = str(row.get(field["name"], ""))
            ET.SubElement(tr, "TD").text = val
        ET.SubElement(tr, "TD").text = access_url

    await quit_redis(redis)(io)

    xml_str = ET.tostring(votable, encoding="utf-8")
    return minidom.parseString(xml_str).toprettyxml(indent="  ")


async def ivoa_main(io):
@@ -87,7 +213,7 @@ async def ivoa_main(io):
    Returns:
        Awaitable[None]: Starts the server and binds all routes.
    """
    port = read_rest_port(io)
    port = await read_rest_port()(io)
    await start_rest_server(
        port,
        [
@@ -95,6 +221,7 @@ async def ivoa_main(io):
            {"path": "/tap/availability", "endpoint": ivoa_availability_endpoint},
            {"path": "/tap/capabilities", "endpoint": ivoa_capabilities_endpoint},
            {"path": "/tap/tables", "endpoint": ivoa_tables_endpoint},
            {"path": "/tap/sync", "endpoint": ivoa_sync_endpoint},
        ],
    )(io)
    await put_str("IVOA server started")(io)
+8 −13
Original line number Diff line number Diff line
@@ -2,7 +2,6 @@
// PostgreSQL implementation

// PostgreSQL requirements:
// - sudo apt install postgresql
// - npm install pg

import { InternalError, IO } from "./IO.js";
@@ -61,15 +60,15 @@ export function execQuery(conn, sql, params) {
}

/**
 * Executes a SELECT query and returns the resulting rows (functional signature: PGConn -> String -> List String -> IO (Maybe (List (List String)))).
 * Executes an SQL query and returns both field metadata and row data (functional signature: PGConn -> String -> List String -> IO (Maybe { fields, rows})).
 *
 * @param {PGConn} conn - A connected PostgreSQL client
 * @param {string} sql - The SQL query
 * @param {Array<string>} params - A list of parameters to bind
 * @returns {function(Symbol): Promise<{ tag: "just", value: any[] }>} - A CPS IO action that returns query result rows wrapped in a Maybe
 * @returns {function(Symbol): Promise<{ tag: "just", value: { fields, rows } } | { tag: "nothing" }>} - A CPS IO action that returns query result wrapped in a Maybe
 * @throws {InternalError} - If the query fails
 */
export function fetchRows(conn, sql, params) {
export function fetchQueryResult(conn, sql, params) {
  return async (IO) => {
    try {
      const res = await conn.query(sql, params);
@@ -79,16 +78,12 @@ export function fetchRows(conn, sql, params) {
        return nothing;
      }

      const rowsAsArrays = res.rows.map((row) => [
        row.schema_name,
        row.table_name,
        row.table_type,
        row.column_name,
        row.datatype,
        row.description,
      ]);
      const fields = res.fields.map((f) => ({
        name: f.name,
        dataTypeID: f.dataTypeID,
      }));

      return just(rowsAsArrays);
      return just({ fields, rows: res.rows });
    } catch (err) {
      throw new InternalError("Select failed: " + err.message);
    }
+16 −65
Original line number Diff line number Diff line
@@ -5,13 +5,14 @@
# - sudo apt install postgresql
# - pip install asyncpg

import json
import asyncpg
from typing import Callable, Awaitable, List, Dict

from Lib.IO import InternalError, IO
from Lib.IO import put_str, just, nothing

import asyncpg

import json

from typing import Callable, Awaitable, List, Dict

with open("Conf/PostgreSQL.json", "r") as f:
    config = json.load(f)
@@ -39,72 +40,22 @@ def conn_postgresql() -> Callable[[object], Awaitable[object]]:
    return inner


def exec_query(
    conn, sql: str, params: List[str]
) -> Callable[[object], Awaitable[object]]:
    """
    Executes an SQL query with parameters, returning no result (functional signature: PGConn -> String -> List String -> IO ()).

    Args:
        conn: A connected PostgreSQL client.
        sql (str): The SQL statement.
        params (List[str]): A list of parameters to bind.

    Returns:
        Callable[[IO], Awaitable[IO]]: A CPS IO action.

    Raises:
        InternalError: If the query execution fails.
    """

    async def inner(io: object) -> object:
        try:
            await conn.execute(sql, *params)
            await put_str("Query executed")(io)
            return io
        except Exception as e:
            raise InternalError(f"Execution failed: {e}") from e

    return inner


def fetch_rows(
def fetch_query_result(
    conn, sql: str, params: List[str]
) -> Callable[[object], Awaitable[dict]]:
    """
    Executes a SELECT query and returns the resulting rows (functional signature: PGConn -> String -> List String -> IO (Maybe (List (List String)))).

    Args:
        conn: A connected PostgreSQL client.
        sql (str): The SQL query.
        params (List[str]): A list of parameters to bind.

    Returns:
        Callable[[IO], Awaitable[dict]]: A CPS IO action that returns a Maybe-wrapped list of rows.

    Raises:
        InternalError: If the query execution fails.
    """

    async def inner(io: object) -> dict:
        try:
            rows = await conn.fetch(sql, *params)
            await put_str(f"{len(rows)} rows retrieved")(io)
            if rows:
                result = [
                    [
                        str(row["schema_name"]),
                        str(row["table_name"]),
                        str(row["table_type"]),
                        str(row["column_name"]),
                        str(row["datatype"]),
                        str(row.get("description", "")),  # Safe fallback
                    ]
                    for row in rows
                ]
                return just(result)
            else:
            stmt = await conn.prepare(sql)
            records = await stmt.fetch(*params)
            await put_str(f"{len(records)} rows retrieved")(io)
            if not records:
                return nothing
            fields = [
                {"name": attr.name, "dataTypeID": 0} for attr in stmt.get_attributes()
            ]

            rows = [dict(r) for r in records]
            return just({"fields": fields, "rows": rows})
        except Exception as e:
            raise InternalError(f"Select failed: {e}") from e

+2 −1
Original line number Diff line number Diff line
@@ -50,7 +50,8 @@ export function startRESTServer(port, routes) {
              reqPath.endsWith(".xml") ||
              reqPath.includes("availability") ||
              reqPath.includes("capabilities") ||
              reqPath.includes("tables")
              reqPath.includes("tables") ||
              reqPath.includes("sync")
            ) {
              contentType = "application/xml";
            } else if (reqPath.endsWith(".html") || reqPath === "/tap") {
Loading