Commit 738c4ea8 authored by Massimo Costantini's avatar Massimo Costantini
Browse files

Added DataLink

parent 94568036
Loading
Loading
Loading
Loading
+365 −20
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@
// IVOA implementation

// IVOA requirements:
// - npm install axios
// - npm install uuid

import { InternalError, IO, performIO } from "./Lib/IO.js";
@@ -13,12 +14,16 @@ import {
  quitPostgreSQL,
} from "./Lib/PostgreSQL.js";

import { connRedis, setRedis, quitRedis } from "./Lib/Redis.js";
import { connRedis, getRedis, setRedis, quitRedis } from "./Lib/Redis.js";
import { startRESTServer, readRESTPort } from "./Lib/REST.js";
import { buildVOTablesFromRows } from "./Lib/VOTable.js";
import { buildVOTablesFromRows, makeVOTableError } from "./Lib/VOTable.js";

import xmlbuilder from "xmlbuilder";
import { v4 as uuidv4 } from "uuid";
import URLs from "./Conf/URL.json" with { type: "json" };

// Both names are aliases of makeVOTableError (imported from ./Lib/VOTable.js),
// so the sync and DataLink handlers build their error documents the same way.
const makeSyncError = makeVOTableError;
const makeDatalinkError = makeVOTableError;

/**
 * Factory for serving static file endpoints.
@@ -67,7 +72,8 @@ const IVOA_tablesEndpoint = (req) => async (IO) => {
 * @returns {function(Symbol): Promise<string>} - XML content in VOTable format or error message
 */
const IVOA_syncEndpoint = (req) => async (IO) => {
  const query = req.method === "GET" ? req.query?.QUERY : req.body?.QUERY;
  const query = req.query?.QUERY || req.body?.QUERY || req.body?.query || null;

  const requestType =
    req.method === "GET" ? req.query?.request : req.body?.request || "doQuery";
  const clientId =
@@ -118,7 +124,7 @@ const IVOA_syncEndpoint = (req) => async (IO) => {

  for (const row of rows) {
    const rowUUID = uuidv4();
    const accessUrl = `https://.../datalink?id=${rowUUID}&clientId=${clientId}`;
    const accessUrl = `http://localhost:3000/datalink?id=${rowUUID}&clientId=${clientId}`;

    await setRedis(redis, {
      key: `client:${clientId}:query:${rowUUID}`,
@@ -143,27 +149,362 @@ const IVOA_syncEndpoint = (req) => async (IO) => {
};

/**
 * Handles the DataLink service.
 *
 * Functional signature: Request -> IO -> IO String
 *
 * Looks the requested row up in Redis under the client's `query`,
 * `merge:sources` and `merge:transits` keys (the first hit wins), stores a
 * copy of it under a fresh `merge:<type>:<uuid>` key and returns a VOTable
 * listing the links available for that row.
 *
 * @param {Request} req - The incoming HTTP request; `id` and `clientId` are read from the query string
 * @returns {function(Symbol): Promise<string>} - A CPS IO action that returns a VOTable XML response.
 */
const IVOA_datalinkEndpoint = (req) => async (IO) => {
  const id = req.query?.id;
  const clientId = req.query?.clientId;

  if (!id || !clientId) {
    return makeDatalinkError("Missing id or clientId");
  }

  const redis = await connRedis()(IO);
  const possibleKeys = [
    `client:${clientId}:query:${id}`,
    `client:${clientId}:merge:sources:${id}`,
    `client:${clientId}:merge:transits:${id}`,
  ];

  // Probe the candidate keys in order and stop at the first hit.
  let hit = null;
  for (const key of possibleKeys) {
    const attempt = await getRedis(redis, key)(IO);
    if (attempt.tag === "just") {
      hit = attempt;
      break;
    }
  }

  if (!hit) {
    return makeDatalinkError("Row not found or expired");
  }

  // The stored value is expected to be a JSON object; anything else would
  // otherwise surface as an uncaught exception further down.
  let rowData;
  try {
    rowData = JSON.parse(hit.value);
  } catch {
    return makeDatalinkError("Stored row is not valid JSON");
  }
  if (rowData === null || typeof rowData !== "object") {
    return makeDatalinkError("Stored row is not valid JSON");
  }

  // Rows carrying a transit_id are treated as transits, everything else as sources.
  const type = rowData.transit_id ? "transits" : "sources";

  // Register a copy of the row under a fresh merge key; the gaiamerger link
  // below points at this new uuid.
  const uuid = uuidv4();
  await setRedis(redis, {
    key: `client:${clientId}:merge:${type}:${uuid}`,
    value: JSON.stringify(rowData),
  })(IO);

  // Query-string values come from the request, so encode them before they
  // are embedded in the generated links.
  const baseUrl = URLs.serverHost;
  const clientParam = encodeURIComponent(clientId);
  const gaiamergerUrl = `${baseUrl}/datalink/gaiamerger?id=${encodeURIComponent(uuid)}&clientId=${clientParam}`;
  const transitsUrl = `${baseUrl}/datalink/transits?id=${encodeURIComponent(id)}&clientId=${clientParam}`;

  const votable = xmlbuilder
    .create("VOTABLE", { encoding: "UTF-8" })
    .att("version", "1.3")
    .att("xmlns", "http://www.ivoa.net/xml/VOTable/v1.3");

  const table = votable.ele("RESOURCE", { type: "results" }).ele("TABLE");

  for (const name of ["access_url", "description", "content_type", "semantics"]) {
    table.ele("FIELD", { name, datatype: "char", arraysize: "*" });
  }

  const data = table.ele("DATA").ele("TABLEDATA");

  // Appends one <TR> whose cells follow the FIELD order declared above.
  const addLink = (accessUrl, description, contentType, semantics) => {
    const tr = data.ele("TR");
    tr.ele("TD", accessUrl);
    tr.ele("TD", description);
    tr.ele("TD", contentType);
    tr.ele("TD", semantics);
  };

  addLink(
    gaiamergerUrl,
    "Cut & Merge (from progenitor)",
    "application/x-hdf5",
    "#this",
  );

  // Only advertise the progenitor when the row names one; otherwise the link
  // would read "name=undefined".
  const didName = rowData.did_name;
  if (didName) {
    addLink(
      `${baseUrl}/api/files?name=${encodeURIComponent(didName)}`,
      "Original progenitor dataset",
      "application/x-hdf5",
      "#progenitor",
    );
  }

  if (type === "sources") {
    addLink(
      transitsUrl,
      "Link to transits",
      "application/x-votable+xml",
      "#counterpart",
    );
  }

  return votable.end({ pretty: true });
};

/**
 * Handles the /datalink/transits endpoint.
 *
 * Functional signature: Request -> IO -> IO String
 *
 * Resolves the stored row for `id` (first under the client's `query` key,
 * then under `merge:sources`), selects every transit joined to that row's
 * `source_id`, registers each transit in Redis under a fresh
 * `merge:transits:<uuid>` key and returns the transits as a VOTable whose
 * `access_url` column points back at the /datalink endpoint.
 *
 * @param {Request} req - The HTTP request object with id and clientId
 * @returns {function(Symbol): Promise<string>} - A CPS IO action returning a VOTable XML
 */
export const datalinkTransitsEndpoint = (req) => async (IO) => {
  const id = req.query?.id;
  const clientId = req.query?.clientId;

  if (!id || !clientId) {
    return makeDatalinkError("Missing id or clientId");
  }

  const redis = await connRedis()(IO);

  // Prefer the original query row, fall back to a row already queued for merging.
  const stored =
    (await redis.get(`client:${clientId}:query:${id}`)) ||
    (await redis.get(`client:${clientId}:merge:sources:${id}`));

  if (!stored) {
    return makeDatalinkError("Row not found or expired");
  }

  let sourceRow;
  try {
    sourceRow = JSON.parse(stored);
  } catch {
    return makeDatalinkError("Stored row is not valid JSON");
  }

  const sourceId = sourceRow?.source_id;
  if (sourceId === undefined || sourceId === null) {
    return makeDatalinkError("Row has no source_id");
  }

  const sql = `
    SELECT a.file_name, c.source_id, a.transit_id, a.ac_win_coord,
           to_char(a.transit_time AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS.US') as transit_time,
           c.did_name, c.key_name
    FROM gaia.astroelementary a
    JOIN gaia.crossmatch c ON a.transit_id = c.transit_id
    WHERE c.source_id = $1;
  `;

  // Always release the PostgreSQL connection, even when the query throws.
  const conn = await connPostgreSQL()(IO);
  let result;
  try {
    result = await fetchQueryResult(conn, sql, [sourceId])(IO);
  } finally {
    await quitPostgreSQL(conn)(IO);
  }

  if (result.tag === "nothing") {
    return makeDatalinkError(`No transits found for source_id ${sourceId}`);
  }

  const votable = xmlbuilder
    .create("VOTABLE", { encoding: "UTF-8" })
    .att("version", "1.3")
    .att("xmlns", "http://www.ivoa.net/xml/VOTable/v1.3");

  const table = votable.ele("RESOURCE", { type: "results" }).ele("TABLE");

  const fields = [
    ["file_name", "char"],
    ["source_id", "char"],
    ["transit_id", "long"],
    ["ac_win_coord", "char"],
    ["transit_time", "char"],
    ["access_url", "char"],
  ];

  for (const [name, datatype] of fields) {
    // Only character columns are variable-length.
    const attrs =
      datatype === "char" ? { name, datatype, arraysize: "*" } : { name, datatype };
    table.ele("FIELD", attrs);
  }

  const data = table.ele("DATA").ele("TABLEDATA");

  // Build links from the configured public host (as the /datalink endpoint
  // does) rather than a hard-coded localhost address, and encode the
  // caller-supplied clientId.
  const baseUrl = URLs.serverHost;
  const clientParam = encodeURIComponent(clientId);

  for (const transit of result.value.rows) {
    const rowUUID = uuidv4();
    const accessUrl = `${baseUrl}/datalink?id=${rowUUID}&clientId=${clientParam}`;

    // Remember the transit so a later /datalink call can resolve rowUUID.
    await setRedis(redis, {
      key: `client:${clientId}:merge:transits:${rowUUID}`,
      value: JSON.stringify({
        did_name: transit.did_name,
        source_id: transit.source_id,
        transit_id: transit.transit_id,
        key_name: transit.key_name,
      }),
    })(IO);

    const tr = data.ele("TR");
    tr.ele("TD", transit.file_name);
    tr.ele("TD", transit.source_id);
    tr.ele("TD", String(transit.transit_id));
    tr.ele("TD", String(transit.ac_win_coord));
    tr.ele("TD", transit.transit_time);
    tr.ele("TD", accessUrl);
  }

  return votable.end({ pretty: true });
};

/**
 * Handles the /datalink/gaiamerger endpoint.
 *
 * Functional signature: Request -> IO -> IO String
 *
 * Collects every row the client has stored under `merge:sources:*` and
 * `merge:transits:*`, posts them to the GaiaMerger service, asks the Rucio
 * API for the location of the file name the merger answers with, and returns
 * a `302 REDIRECT <url>` string pointing at it.
 *
 * @param {Request} req - HTTP request with 'id' and 'clientId'
 * @returns {(IO) => Promise<string>} - A CPS IO action returning a redirect or error
 */
export const datalinkGaiamergerEndpoint = (req) => async (IO) => {
  const id = req.query?.id;
  const clientId = req.query?.clientId;

  // NOTE(review): `id` is required but not used below; the merge always
  // covers all rows queued for the client. Confirm this is intended.
  if (!id || !clientId) {
    return makeDatalinkError("Missing id or clientId");
  }

  const redis = await connRedis()(IO);

  // NOTE(review): KEYS scans the whole keyspace; consider SCAN if the Redis
  // instance holds many keys.
  const loadRows = async (pattern) => {
    const keys = await redis.keys(pattern);
    const values = await Promise.all(keys.map((k) => redis.get(k)));
    return values.filter(Boolean).map((v) => JSON.parse(v));
  };

  let sources;
  let transits;
  try {
    sources = await loadRows(`client:${clientId}:merge:sources:*`);
    transits = await loadRows(`client:${clientId}:merge:transits:*`);
  } catch (err) {
    return makeDatalinkError(
      "Error reading rows queued for merging: " + (err?.message ?? String(err)),
    );
  }

  if (sources.length === 0 && transits.length === 0) {
    return makeDatalinkError("Row not found or expired");
  }

  // Add camelCase aliases next to the stored snake_case fields.
  const renameKeys = (obj) => ({
    ...obj,
    didName: obj.did_name,
    transitId: obj.transit_id,
    keyName: obj.key_name,
    sourceId: obj.source_id,
  });

  const payload = {
    type: transits.length > 0 ? "both" : "sources",
    data: [...sources, ...transits].map(renameKeys),
  };

  // Turn an axios failure into readable text; a JSON error body would
  // otherwise be concatenated as "[object Object]".
  const describeError = (err) => {
    const detail = err?.response?.data;
    if (!detail) return err?.message ?? String(err);
    if (typeof detail === "string") return detail;
    try {
      return JSON.stringify(detail);
    } catch {
      return String(detail);
    }
  };

  const { default: axios } = await import("axios");
  const https = await import("https");

  try {
    // SECURITY NOTE(review): certificate verification is disabled for the
    // merger and Rucio calls. Kept as-is to avoid breaking deployments that
    // rely on it, but it should be replaced by a trusted CA configuration.
    const agent = new https.Agent({ rejectUnauthorized: false });

    const mergerResp = await axios.post(URLs.gaiaMergerUrl, payload, {
      httpsAgent: agent,
    });

    const fileName = mergerResp.data;
    if (!fileName) {
      return makeDatalinkError("GaiaMerger did not return a file name");
    }

    const rucioResp = await axios.get(
      `${URLs.rucioApiUrl}?name=${encodeURIComponent(fileName)}`,
      { httpsAgent: agent },
    );

    const target = rucioResp.data?.[0];
    if (!target) {
      return makeDatalinkError(`No file URL found for ${fileName}`);
    }

    return `302 REDIRECT ${target}`;
  } catch (err) {
    return makeDatalinkError(
      "Error calling GaiaMerger or retrieving result: " + describeError(err),
    );
  }
};

/**
 * Handles the /datalink/progenitor endpoint.
 *
 * Functional signature: Request -> IO -> IO String
 *
 * Reads the row stored under the client's `query` key, asks the Rucio API for
 * the location of the row's `did_name`, and returns a one-row VOTable linking
 * to that file.
 *
 * @param {Request} req - HTTP request with 'id' and 'clientId'
 * @returns {(IO) => Promise<string>} - A CPS IO action returning a VOTable with progenitor link
 */
export const datalinkProgenitorEndpoint = (req) => async (IO) => {
  const id = req.query?.id;
  const clientId = req.query?.clientId;

  if (!id || !clientId) {
    return makeDatalinkError("Missing id or clientId");
  }

  const redis = await connRedis()(IO);
  const value = await redis.get(`client:${clientId}:query:${id}`);

  if (!value) {
    return makeDatalinkError("Row not found or expired");
  }

  let row;
  try {
    row = JSON.parse(value);
  } catch {
    return makeDatalinkError("Stored row is not valid JSON");
  }

  const didName = row?.did_name;
  if (!didName) {
    return makeDatalinkError("Row has no did_name");
  }

  const https = await import("https");
  // SECURITY NOTE(review): certificate verification is disabled for the Rucio
  // call. Kept as-is, but it should be replaced by a trusted CA configuration.
  const agent = new https.Agent({ rejectUnauthorized: false });
  const { default: axios } = await import("axios");

  try {
    const rucioResp = await axios.get(
      `${URLs.rucioApiUrl}?name=${encodeURIComponent(didName)}`,
      { httpsAgent: agent },
    );

    // The first element of the reply is used as the file location.
    const fileUrl = rucioResp.data?.[0];
    if (!fileUrl) {
      return makeDatalinkError(`No file URL found for ${didName}`);
    }

    const votable = xmlbuilder
      .create("VOTABLE", { encoding: "UTF-8" })
      .att("version", "1.3")
      .att("xmlns", "http://www.ivoa.net/xml/VOTable/v1.3");

    const table = votable.ele("RESOURCE", { type: "results" }).ele("TABLE");

    for (const name of ["access_url", "description", "content_type", "semantics"]) {
      table.ele("FIELD", { name, datatype: "char", arraysize: "*" });
    }

    const tr = table.ele("DATA").ele("TABLEDATA").ele("TR");
    tr.ele("TD", fileUrl);
    tr.ele("TD", "Original progenitor dataset");
    tr.ele("TD", "application/x-hdf5");
    tr.ele("TD", "#progenitor");

    return votable.end({ pretty: true });
  } catch {
    return makeDatalinkError("Error retrieving progenitor file URL");
  }
};

/**
 * Main entry point for the IVOA TAP and DataLink server.
@@ -181,6 +522,10 @@ const IVOA_main = () => async (IO) => {
    { path: "/tap/capabilities", endpoint: IVOA_capabilitiesEndpoint },
    { path: "/tap/tables", endpoint: IVOA_tablesEndpoint },
    { path: "/tap/sync", endpoint: IVOA_syncEndpoint },
    { path: "/datalink", endpoint: IVOA_datalinkEndpoint },
    { path: "/datalink/transits", endpoint: datalinkTransitsEndpoint },
    { path: "/datalink/gaiamerger", endpoint: datalinkGaiamergerEndpoint },
    { path: "/datalink/progenitor", endpoint: datalinkProgenitorEndpoint },
  ])(IO);
  await putStr("IVOA server started")(IO);
};
+379 −44

File changed.

Preview size limit exceeded, changes collapsed.

+7 −2
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@ import config from "../Conf/REST.json" with { type: "json" };
 */

const app = express();
app.use(bodyParser.urlencoded({ extended: true }));

app.use(bodyParser.json());

let server = null;
@@ -39,12 +41,15 @@ const makeWrapper = (handler) => {
      const reqPath = req.path;
      let contentType = "text/plain";

      if (
      if (typeof result === "string" && result.trim().startsWith("<?xml")) {
        contentType = "application/xml";
      } else if (
        reqPath.endsWith(".xml") ||
        reqPath.includes("availability") ||
        reqPath.includes("capabilities") ||
        reqPath.includes("tables") ||
        reqPath.includes("sync")
        reqPath.includes("sync") ||
        reqPath.includes("datalink")
      ) {
        contentType = "application/xml";
      } else if (reqPath.endsWith(".html") || reqPath === "/tap") {
+10 −1
Original line number Diff line number Diff line
@@ -45,9 +45,18 @@ def start_rest_server(
                try:
                    result = await handler(request)(io)
                    path = request.path
                    if result.strip().startswith("<?xml"):
                        content_type = "application/xml"

                    if any(
                        x in path
                        for x in ["availability", "capabilities", "tables", "sync"]
                        for x in [
                            "availability",
                            "capabilities",
                            "tables",
                            "sync",
                            "datalink",
                        ]
                    ):
                        content_type = "application/xml"
                    elif path.endswith(".html") or path == "/tap":
+30 −0
Original line number Diff line number Diff line
@@ -62,6 +62,36 @@ def set_redis(
    return inner_cmd


def get_redis(
    redis: Redis,
) -> Callable[[str], Callable[[object], Awaitable[Dict[str, str]]]]:
    """
    Retrieves a value from Redis by key.

    Functional signature: RedisConn -> String -> IO -> IO { tag: "just", value: String } | { tag: "nothing" }

    The outcome (HIT or MISS) is reported through put_str before the result is
    returned.

    Args:
        redis: A connected Redis client.

    Returns:
        Callable[[str], Callable[[IO], Awaitable[Dict]]]: A CPS IO action that retrieves a key from Redis.
    """

    def inner_cmd(key: str) -> Callable[[object], Awaitable[Dict[str, str]]]:
        async def inner(io: object) -> Dict[str, str]:
            value = await redis.get(key)
            if value is None:
                await put_str(f"Redis: get {key} -> MISS")(io)
                return {"tag": "nothing"}

            await put_str(f"Redis: get {key} -> HIT")(io)
            # A client may hand back raw bytes or an already-decoded str
            # (e.g. when created with decode_responses=True); accept both
            # instead of failing on str.decode.
            if isinstance(value, (bytes, bytearray)):
                text = value.decode("utf-8")
            else:
                text = str(value)
            return {"tag": "just", "value": text}

        return inner

    return inner_cmd


def quit_redis(redis: Redis) -> Callable[[object], Awaitable[object]]:
    """
    Closes the Redis connection.
Loading