Commit 97a3b4e8 authored by Massimo Costantini's avatar Massimo Costantini
Browse files

Added comments

parent 2a852c68
Loading
Loading
Loading
Loading
+21 −4
Original line number Diff line number Diff line
@@ -283,9 +283,10 @@ const IVOA_datalinkEndpoint = (req) => async (IO) => {
 * Functional signature: Request -> IO -> IO String
 *
 * @param {Request} req - The HTTP request object with id and clientId
 * @returns {function(Symbol): Promise<string>} - A CPS IO action returning a VOTable XML
 * @returns {function(Symbol): Promise<string>} - A CPS IO action that returs a VOTable XML
 */
export const datalinkTransitsEndpoint = (req) => async (IO) => {
  // Validate required query parameters
  const id = req.query?.id;
  const clientId = req.query?.clientId;

@@ -295,6 +296,7 @@ export const datalinkTransitsEndpoint = (req) => async (IO) => {

  const redis = await connRedis()(IO);

  // Try multiple Redis keys to retrieve the source row
  const keyQuery = `client:${clientId}:query:${id}`;
  let redisResult = await redis.get(keyQuery);

@@ -309,6 +311,8 @@ export const datalinkTransitsEndpoint = (req) => async (IO) => {

  const row = JSON.parse(redisResult);
  const sourceId = row.source_id;

  // SQL query to fetch transits for a given source_id
  const sql = `
    SELECT a.file_name, c.source_id, a.transit_id, a.ac_win_coord,
           to_char(a.transit_time AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS.US') as transit_time,
@@ -328,6 +332,7 @@ export const datalinkTransitsEndpoint = (req) => async (IO) => {

  const rows = result.value.rows;

  // Build a VOTable with standard fields plus access_url for each transit
  const votable = xmlbuilder
    .create("VOTABLE", { encoding: "UTF-8" })
    .att("version", "1.3")
@@ -353,6 +358,7 @@ export const datalinkTransitsEndpoint = (req) => async (IO) => {

  const data = table.ele("DATA").ele("TABLEDATA");

  // Serialize each transit row, generate access_url, and store it in Redis
  for (const row of rows) {
    const rowUUID = uuidv4();
    const accessUrl = `http://localhost:3000/datalink?id=${rowUUID}&clientId=${clientId}`;
@@ -384,18 +390,21 @@ export const datalinkTransitsEndpoint = (req) => async (IO) => {
 *
 * Functional signature: Request -> IO -> IO String
 *
 * @param {Request} req - HTTP request with 'id' and 'clientId'
 * @param {Request} req - HTTP request with id and clientId
 * @returns {Promise<string>} - A CPS IO action that takes an IO token and returns a redirect or error string
 */
export const datalinkGaiamergerEndpoint = (req) => async (IO) => {
  const id = req.query?.id;
  const clientId = req.query?.clientId;

  // Validate query parameters
  if (!id || !clientId) {
    return makeDatalinkError("Missing id or clientId");
  }

  const redis = await connRedis()(IO);

  // Collect merge data from Redis
  const mergeSources = await redis.keys(`client:${clientId}:merge:sources:*`);
  const mergeTransits = await redis.keys(`client:${clientId}:merge:transits:*`);

@@ -407,6 +416,7 @@ export const datalinkGaiamergerEndpoint = (req) => async (IO) => {
  const sources = await getAll(mergeSources);
  const transits = await getAll(mergeTransits);

  // Normalize keys and prepare payload
  const renameKeys = (obj) => ({
    ...obj,
    didName: obj.did_name,
@@ -422,10 +432,9 @@ export const datalinkGaiamergerEndpoint = (req) => async (IO) => {

  const axios = await import("axios").then((m) => m.default);
  const https = await import("https");

  try {
  const agent = new https.Agent({ rejectUnauthorized: false });

  try {
    const mergerResp = await axios.post(URLs.gaiaMergerUrl, payload, {
      httpsAgent: agent,
    });
@@ -458,10 +467,12 @@ export const datalinkProgenitorEndpoint = (req) => async (IO) => {
  const id = req.query?.id;
  const clientId = req.query?.clientId;

  // Validate parameters
  if (!id || !clientId) {
    return makeDatalinkError("Missing id or clientId");
  }

  // Retrieve data from Redis
  const redis = await connRedis()(IO);
  const key = `client:${clientId}:query:${id}`;
  const value = await redis.get(key);
@@ -473,6 +484,7 @@ export const datalinkProgenitorEndpoint = (req) => async (IO) => {
  const row = JSON.parse(value);
  const didName = row.did_name;

  // Request download URL from Rucio
  const https = await import("https");
  const agent = new https.Agent({ rejectUnauthorized: false });
  const axios = await import("axios").then((m) => m.default);
@@ -485,6 +497,7 @@ export const datalinkProgenitorEndpoint = (req) => async (IO) => {

    const fileUrl = rucioResp.data[0];

    // Build and return VOTable with progenitor link
    const votable = xmlbuilder
      .create("VOTABLE", { encoding: "UTF-8" })
      .att("version", "1.3")
@@ -553,6 +566,7 @@ const IVOA_main = () => async (IO) => {
 *
 * Functional signature: () -> IO ()
 *
 * @param {void} - No input parameters
 * @returns {Promise<void>} - A CPS IO action that takes an IO token and launches the server logic
 */
const mainExpression = () => IVOA_main();
@@ -561,6 +575,9 @@ const mainExpression = () => IVOA_main();
 * Handles top-level execution and exception reporting.
 *
 * Functional signature: () -> IO ()
 *
 * @param {void} - No input parameters
 * @returns {Promise<void>} - A CPS IO action that executes the main expression and handles errors
 */
const mainWrapper = async () => {
  try {
+40 −2
Original line number Diff line number Diff line
@@ -300,7 +300,20 @@ def ivoa_datalink_endpoint(req) -> Callable[[object], Awaitable[str]]:


def ivoa_transits_endpoint(req) -> Callable[[object], Awaitable[str]]:
    """
    Handles the /datalink/transits endpoint.

    Functional signature: Request -> IO -> IO String

    Args:
        req: The HTTP request with id and clientId.

    Returns:
        Callable[[object], Awaitable[str]]: A CPS IO action that returns a VOTable XML.
    """

    async def inner(io):
        # Validate required query parameters
        id_ = req.query.get("id")
        client_id = req.query.get("clientId")

@@ -308,6 +321,8 @@ def ivoa_transits_endpoint(req) -> Callable[[object], Awaitable[str]]:
            return make_datalink_error("Missing id or clientId")

        redis = await conn_redis()(io)

        # Try multiple Redis keys to retrieve the source row
        possible_keys = [
            f"client:{client_id}:query:{id_}",
            f"client:{client_id}:merge:sources:{id_}",
@@ -326,6 +341,7 @@ def ivoa_transits_endpoint(req) -> Callable[[object], Awaitable[str]]:
        row_data = json.loads(result["value"])
        source_id = row_data.get("source_id")

        # SQL query to fetch transits for a given source_id
        query = """
            SELECT a.file_name, c.source_id, a.transit_id, a.ac_win_coord,
                   to_char(a.transit_time AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS.US') as transit_time,
@@ -344,6 +360,7 @@ def ivoa_transits_endpoint(req) -> Callable[[object], Awaitable[str]]:

        rows = result_query["value"]["rows"]

        # Build a VOTable with standard fields plus access_url for each transit
        votable = ET.Element(
            "VOTABLE",
            {
@@ -369,6 +386,7 @@ def ivoa_transits_endpoint(req) -> Callable[[object], Awaitable[str]]:

        data = ET.SubElement(ET.SubElement(table, "DATA"), "TABLEDATA")

        # Serialize each transit row, generate access_url, and store it in Redis
        for row in rows:
            row_uuid = str(uuid.uuid4())

@@ -411,7 +429,7 @@ def ivoa_gaiamerger_endpoint(req) -> Callable[[object], Awaitable[str]]:
    Functional signature: Request -> IO -> IO String

    Args:
        req: The HTTP request with 'id' and 'clientId'.
        req: The HTTP request with id and clientId.

    Returns:
        Callable[[object], Awaitable[str]]: A CPS IO action that returns a redirect or error.
@@ -421,11 +439,13 @@ def ivoa_gaiamerger_endpoint(req) -> Callable[[object], Awaitable[str]]:
        id_ = req.query.get("id")
        client_id = req.query.get("clientId")

        # Validate query parameters
        if not id_ or not client_id:
            return make_datalink_error("Missing id or clientId")

        redis = await conn_redis()(io)

        # Collect merge data from Redis
        async def get_keys(prefix):
            keys = await redis.keys(f"client:{client_id}:merge:{prefix}:*")
            values = await asyncio.gather(*[redis.get(k) for k in keys])
@@ -434,11 +454,13 @@ def ivoa_gaiamerger_endpoint(req) -> Callable[[object], Awaitable[str]]:
        sources = await get_keys("sources")
        transits = await get_keys("transits")

        # Prepare payload
        payload = {
            "type": "both" if transits else "sources",
            "data": sources + transits,
        }

        # Call GaiaMerger and redirect to Rucio
        try:
            async with aiohttp.ClientSession(
                connector=aiohttp.TCPConnector(ssl=False)
@@ -465,7 +487,7 @@ def ivoa_progenitor_endpoint(req) -> Callable[[object], Awaitable[str]]:
    Functional signature: Request -> IO -> IO String

    Args:
        req: The HTTP request with 'id' and 'clientId'.
        req: The HTTP request with id and clientId.

    Returns:
        Callable[[object], Awaitable[str]]: A CPS IO action that returns a VOTable XML.
@@ -475,9 +497,11 @@ def ivoa_progenitor_endpoint(req) -> Callable[[object], Awaitable[str]]:
        id_ = req.query.get("id")
        client_id = req.query.get("clientId")

        # Validate parameters
        if not id_ or not client_id:
            return make_datalink_error("Missing id or clientId")

        # Retrieve data from Redis
        redis = await conn_redis()(io)
        possible_keys = [
            f"client:{client_id}:query:{id_}",
@@ -500,6 +524,7 @@ def ivoa_progenitor_endpoint(req) -> Callable[[object], Awaitable[str]]:

        import aiohttp

        # Request download URL from Rucio
        try:
            async with aiohttp.ClientSession(
                connector=aiohttp.TCPConnector(ssl=False)
@@ -511,6 +536,7 @@ def ivoa_progenitor_endpoint(req) -> Callable[[object], Awaitable[str]]:
                    urls = await resp.json()
                    file_url = urls[0]

            # Build and return VOTable with progenitor link
            votable = ET.Element(
                "VOTABLE",
                {"version": "1.3", "xmlns": "http://www.ivoa.net/xml/VOTable/v1.3"},
@@ -576,6 +602,12 @@ def main_expression() -> Callable[[object], Awaitable[None]]:
    Wraps the program execution in a safe IO environment.

    Functional signature: () -> IO ()

    Args:
        None

    Returns:
        Callable[[object], Awaitable[None]]: A CPS IO action that launches the server logic.
    """
    return ivoa_main()

@@ -585,6 +617,12 @@ async def main_wrapper():
    Handles top-level execution and exception reporting.

    Functional signature: () -> IO ()

    Args:
        None

    Returns:
        Awaitable[None]: A CPS IO action that executes the main expression and handles errors.
    """
    try:
        await perform_io(main_expression())