Commit aad76ef3 authored by Massimo Costantini's avatar Massimo Costantini
Browse files

Updated comments and style

parent 23408404
Loading
Loading
Loading
Loading
+46 −43
Original line number Diff line number Diff line
@@ -14,56 +14,57 @@ import {
} from "./Lib/PostgreSQL.js";

import { connRedis, setRedis, quitRedis } from "./Lib/Redis.js";

import { startRESTServer, readRESTPort } from "./Lib/REST.js";

import { buildVOTablesFromRows } from "./Lib/VOTable.js";

import xmlbuilder from "xmlbuilder";

import { v4 as uuidv4 } from "uuid";

/**
 * Serves the static /tap HTML landing page (functional signature: Request -> IO -> IO String).
 * Serves the static /tap HTML landing page.
 *
 * Functional signature: Request -> IO -> IO String
 *
 * @param {Request} req - Incoming HTTP request
 * @param {Symbol} IO - IO token
 * @returns {Promise<string>} - HTML content
 * @returns {function(Symbol): Promise<string>} - HTML content
 */
async function IVOA_tapEndpoint(req, IO) {
const IVOA_tapEndpoint = (req) => async (IO) => {
  return await readFile("Stat/tap.html")(IO);
}
};

/**
 * Serves the static /availability XML response (functional signature: Request -> IO -> IO String).
 * Serves the static /availability XML response.
 *
 * Functional signature: Request -> IO -> IO String
 *
 * @param {Request} req - Incoming HTTP request
 * @param {Symbol} IO - IO token
 * @returns {Promise<string>} - XML content
 * @returns {function(Symbol): Promise<string>} - XML content in VOTable format or error message
 */
async function IVOA_availabilityEndpoint(req, IO) {
const IVOA_availabilityEndpoint = (req) => async (IO) => {
  return await readFile("Stat/availability.xml")(IO);
}
};

/**
 * Serves the static /capabilities XML response (functional signature: Request -> IO -> IO String).
 * Serves the static /capabilities XML response.
 *
 * Functional signature: Request -> IO -> IO String
 *
 * @param {Request} req - Incoming HTTP request
 * @param {Symbol} IO - IO token
 * @returns {Promise<string>} - XML content
 * @returns {function(Symbol): Promise<string>} - XML content in VOTable format or error message
 */
async function IVOA_capabilitiesEndpoint(req, IO) {
const IVOA_capabilitiesEndpoint = (req) => async (IO) => {
  return await readFile("Stat/capabilities.xml")(IO);
}
};

/**
 * Executes an SQL query from file and returns a VOTable representation (functional signature: Request -> IO -> IO String).
 * Executes an SQL query from file and returns a VOTable representation.
 *
 * Functional signature: Request -> IO -> IO String
 *
 * @param {Request} req - Incoming HTTP request
 * @param {Symbol} IO - IO token
 * @returns {Promise<string>} - XML content in VOTable format or error message
 * @returns {function(Symbol): Promise<string>} - XML content in VOTable format or error message
 */
async function IVOA_tablesEndpoint(req, IO) {
const IVOA_tablesEndpoint = (req) => async (IO) => {
  const query = await readFile("Query/tables.sql")(IO);
  const conn = await connPostgreSQL()(IO);
  const result = await fetchQueryResult(conn, query, [])(IO);
@@ -74,17 +75,18 @@ async function IVOA_tablesEndpoint(req, IO) {
  }

  return buildVOTablesFromRows(result.value);
}
};

/**
 * Handles synchronous TAP queries (functional signature: Request -> IO -> IO String).
 * Handles synchronous TAP queries.
 * Executes a SQL query and returns results in VOTable format.
 *
 * Functional signature: Request -> IO -> IO String
 *
 * @param {Request} req - Incoming HTTP request
 * @param {Symbol} IO - IO token
 * @returns {Promise<string>} - XML content in VOTable format or error message
 * @returns {function(Symbol): Promise<string>} - XML content in VOTable format or error message
 */
async function IVOA_syncEndpoint(req, IO) {
const IVOA_syncEndpoint = (req) => async (IO) => {
  const query = req.method === "GET" ? req.query?.QUERY : req.body?.QUERY;
  const requestType =
    req.method === "GET" ? req.query?.request : req.body?.request || "doQuery";
@@ -107,14 +109,11 @@ async function IVOA_syncEndpoint(req, IO) {
  await quitPostgreSQL(conn)(IO);

  if (result.tag === "nothing") {
    return makeSyncError("No results.");
    return makeSyncError("No results");
  }

  const { fields, rows } = result.value;

  if (!Array.isArray(rows)) {
    throw new InternalError("BUG: rows is not iterable");
  }
  const redis = await connRedis()(IO);

  const votable = xmlbuilder
@@ -161,13 +160,15 @@ async function IVOA_syncEndpoint(req, IO) {

  await quitRedis(redis)(IO);
  return votable.end({ pretty: true });
}
};

/**
 * Generates a VOTable error document for /tap/sync (functional signature: String -> String).
 * Generates a VOTable error document for /tap/sync.
 *
 * Functional signature: String -> IO -> IO String
 *
 * @param {string} message - Error message to embed in VOTable
 * @returns {string} - XML error in VOTable format
 * @returns {(IO) => Promise<string>} - A CPS IO action returning an XML error document
 */
function makeSyncError(message) {
  return xmlbuilder
@@ -185,12 +186,14 @@ function makeSyncError(message) {
}

/**
 * Main entry point for the IVOA TAP and DataLink server (functional signature: IO -> IO ()).
 * Main entry point for the IVOA TAP and DataLink server.
 *
 * Functional signature: IO -> IO ()
 *
 * @param {Symbol} IO - IO token
 * @returns {Promise<void>} - Starts the server and binds all routes
 */
async function IVOA_main(IO) {
const IVOA_main = () => async (IO) => {
  const port = await readRESTPort(IO);
  startRESTServer(port, [
    { path: "/tap", endpoint: IVOA_tapEndpoint },
@@ -200,19 +203,19 @@ async function IVOA_main(IO) {
    { path: "/tap/sync", endpoint: IVOA_syncEndpoint },
  ])(IO);
  await putStr("IVOA server started")(IO);
}
};

/**
 * Wraps the program execution in a safe IO environment (functional signature: () -> IO ()).
 * Wraps the program execution in a safe IO environment.
 *
 * Functional signature: () -> IO ()
 *
 * @returns {IO} - The IO token after the execution
 * @returns {(IO) => Promise<void>} - A CPS IO action  that launches the main TAP and DataLink server logic
 */
function mainExpression() {
  return performIO((IO) => IVOA_main(IO));
}
const mainExpression = () => IVOA_main();

try {
  await mainExpression();
  await performIO(mainExpression());
} catch (e) {
  if (e instanceof InternalError) {
    await putStr("ERROR: " + e.message)(IO);
+172 −139
Original line number Diff line number Diff line
@@ -5,97 +5,120 @@ from Lib.IO import InternalError, IO, perform_io
from Lib.IO import put_str, read_file

from Lib.PostgreSQL import conn_postgresql, fetch_query_result, quit_postgresql

from Lib.Redis import conn_redis, set_redis, quit_redis

from Lib.REST import start_rest_server, read_rest_port

from Lib.VOTable import build_votables_from_rows

import asyncio
from typing import Awaitable, Callable
from xml.dom import minidom

import asyncio
import uuid

import xml.etree.ElementTree as ET

from xml.dom import minidom


async def ivoa_tap_endpoint(req, io):
def ivoa_tap_endpoint(req) -> Callable[[object], Awaitable[str]]:
    """
    Serves the static /tap HTML landing page (functional signature: Request -> IO -> IO String).
    Serves the static /tap HTML landing page.

    Functional signature: Request -> IO -> IO String

    Args:
        req: Incoming HTTP request.
        io: IO token.

    Returns:
        Awaitable[str]: HTML content.
        A function that takes an IO token and returns an awaitable HTML string.
    """
    return await read_file("Stat/tap.html")(io)

    def inner(io):
        return read_file("Stat/tap.html")(io)

async def ivoa_availability_endpoint(req, io):
    return inner


def ivoa_availability_endpoint(req) -> Callable[[object], Awaitable[str]]:
    """
    Serves the static /availability XML response (functional signature: Request -> IO -> IO String).
    Serves the static /availability XML response.

    Functional signature: Request -> IO -> IO String

    Args:
        req: Incoming HTTP request.
        io: IO token.

    Returns:
        Awaitable[str]: XML content.
        A function that takes an IO token and returns an awaitable XML string.
    """
    return await read_file("Stat/availability.xml")(io)

    def inner(io):
        return read_file("Stat/availability.xml")(io)

    return inner


async def ivoa_capabilities_endpoint(req, io):
def ivoa_capabilities_endpoint(req) -> Callable[[object], Awaitable[str]]:
    """
    Serves the static /capabilities XML response (functional signature: Request -> IO -> IO String).
    Serves the static /capabilities XML response.

    Functional signature: Request -> IO -> IO String

    Args:
        req: Incoming HTTP request.
        io: IO token.

    Returns:
        Awaitable[str]: XML content.
        A function that takes an IO token and returns an awaitable XML string.
    """
    return await read_file("Stat/capabilities.xml")(io)

    def inner(io):
        return read_file("Stat/capabilities.xml")(io)

    return inner


async def ivoa_tables_endpoint(req, io):
def ivoa_tables_endpoint(req) -> Callable[[object], Awaitable[str]]:
    """
    Executes an SQL query from file and returns a VOTable representation (functional signature: Request -> IO -> IO String).
    Executes an SQL query from file and returns a VOTable representation.

    Functional signature: Request -> IO -> IO String

    Args:
        req: Incoming HTTP request.
        io: IO token.

    Returns:
        Awaitable[str]: XML content in VOTable format or error message.
        A function that takes an IO token and returns an awaitable XML VOTable string.
    """

    async def inner(io):
        query = await read_file("Query/tables.sql")(io)
        conn = await conn_postgresql()(io)
        result = await fetch_query_result(conn, query, [])(io)

        await quit_postgresql(conn)(io)

        if result["tag"] == "nothing":
        return '<?xml version="1.0" encoding="UTF-8"?><error>No tables found</error>'
            return (
                '<?xml version="1.0" encoding="UTF-8"?><error>No tables found</error>'
            )

    return build_votables_from_rows(result["value"]["fields"], result["value"]["rows"])
        return build_votables_from_rows(
            result["value"]["fields"], result["value"]["rows"]
        )

    return inner

async def make_sync_error(message: str) -> str:

def make_sync_error(message: str) -> Callable[[object], Awaitable[str]]:
    """
    Generates a VOTable error document for /tap/sync.

    Functional signature: String -> IO -> IO String

    Args:
        message (str): Error message to embed in the VOTable.
        message: Error message to embed in the VOTable.

    Returns:
        str: XML error in VOTable format.
        A CPS IO action that returns the XML error document.
    """

    async def inner(io):
        votable = ET.Element(
            "VOTABLE",
            {
@@ -106,26 +129,30 @@ async def make_sync_error(message: str) -> str:
            },
        )
        resource = ET.SubElement(votable, "RESOURCE", {"type": "results"})
    ET.SubElement(resource, "INFO", {"name": "QUERY_STATUS", "value": "ERROR"}).text = (
        message
    )
        ET.SubElement(
            resource, "INFO", {"name": "QUERY_STATUS", "value": "ERROR"}
        ).text = message

        xml_str = ET.tostring(votable, encoding="utf-8")
        return minidom.parseString(xml_str).toprettyxml(indent="  ")

    return inner

async def ivoa_sync_endpoint(req, io):

def ivoa_sync_endpoint(req) -> Callable[[object], Awaitable[str]]:
    """
    Handles synchronous TAP queries (/tap/sync).
    Executes a SQL query and returns results in VOTable format, including Redis storage for access URLs.
    Handles synchronous TAP queries.

    Functional signature: Request -> IO -> IO String

    Args:
        req: Incoming HTTP request.
        io: IO token.

    Returns:
        str: XML VOTable result or error response.
        A function that takes an IO token and returns an awaitable VOTable XML string.
    """

    async def inner(io):
        method = req.method
        query = req.query.get("QUERY") if method == "GET" else req.post.get("QUERY")
        request_type = (
@@ -140,29 +167,28 @@ async def ivoa_sync_endpoint(req, io):
        )

        if not query:
        return await make_sync_error("Missing QUERY parameter")
            return await make_sync_error("Missing QUERY parameter")(io)

        if request_type != "doQuery":
            return await make_sync_error(
                'The parameter "request" must be "doQuery" or omitted'
        )
            )(io)

        conn = await conn_postgresql()(io)
        result = await fetch_query_result(conn, query, [])(io)
        await quit_postgresql(conn)(io)

        if result["tag"] == "nothing":
        return await make_sync_error("No results.")
            return await make_sync_error("No results")(io)

        fields = result["value"]["fields"]
        rows = result["value"]["rows"]
    if not isinstance(rows, list):
        raise InternalError("BUG: rows is not iterable")

        redis = await conn_redis()(io)

        votable = ET.Element(
        "VOTABLE", {"version": "1.3", "xmlns": "http://www.ivoa.net/xml/VOTable/v1.3"}
            "VOTABLE",
            {"version": "1.3", "xmlns": "http://www.ivoa.net/xml/VOTable/v1.3"},
        )
        resource = ET.SubElement(votable, "RESOURCE", {"type": "results"})
        table = ET.SubElement(resource, "TABLE")
@@ -202,17 +228,20 @@ async def ivoa_sync_endpoint(req, io):
        xml_str = ET.tostring(votable, encoding="utf-8")
        return minidom.parseString(xml_str).toprettyxml(indent="  ")

    return inner


async def ivoa_main(io):
def ivoa_main() -> Callable[[object], Awaitable[None]]:
    """
    Main entry point for the IVOA TAP and DataLink server (functional signature: IO -> IO ()).
    Main entry point for the IVOA TAP and DataLink server.

    Args:
        io: IO token.
    Functional signature: IO -> IO ()

    Returns:
        Awaitable[None]: Starts the server and binds all routes.
        A CPS IO action that takes the IO token and starts the IVOA server.
    """

    async def inner(io):
        port = await read_rest_port()(io)
        await start_rest_server(
            port,
@@ -227,20 +256,24 @@ async def ivoa_main(io):
        await put_str("IVOA server started")(io)
        await asyncio.Event().wait()

    return inner

def main_expression():

def main_expression() -> Callable[[object], Awaitable[None]]:
    """
    Wraps the program execution in a safe IO environment (functional signature: () -> IO ()).
    Wraps the program execution in a safe IO environment.

    Functional signature: () -> IO ()

    Returns:
        IO: The IO token after the execution.
        A CPS IO action that launches the main TAP and DataLink server logic.
    """
    return perform_io(lambda io: ivoa_main(io))
    return ivoa_main()


if __name__ == "__main__":
    try:
        asyncio.run(main_expression())
        asyncio.run(perform_io(main_expression()))
    except InternalError as e:
        asyncio.run(put_str("ERROR: " + str(e))(IO))
    except Exception as e:
+24 −8
Original line number Diff line number Diff line
@@ -4,14 +4,18 @@
import fs from "fs/promises";

/**
 * Custom error type to represent internal application exceptions (functional signature: String -> InternalError).
 * Custom error type to represent internal application exceptions.
 *
 * Functional signature: String -> InternalError
 *
 * @param {string} message - The error message
 */
export class InternalError extends Error {}

/**
 * Token representing the "IO world" in an implicit continuation-passing style (functional signature: Symbol).
 * Token representing the "IO world" in an implicit continuation-passing style.
 *
 * Functional signature: Symbol
 *
 * @const
 * @type {symbol}
@@ -19,7 +23,9 @@ export class InternalError extends Error {}
export const IO = Symbol("IO");

/**
 * Applies an implicit CPS IO continuation to the IO token (functional signature: ((Symbol) -> a) -> a).
 * Applies an implicit CPS IO continuation to the IO token.
 *
 * Functional signature: ((Symbol) -> a) -> a)
 *
 * @param {function(Symbol): any} f - A CPS function expecting an IO token
 * @returns {*} - The result of applying the function to the "IO world"
@@ -29,7 +35,9 @@ export function createIO(f) {
}

/**
 * Alias for createIO which semantically marks a point where an effect is executed (functional signature: ((Symbol) -> a) -> a).
 * Alias for createIO which semantically marks a point where an effect is executed.
 *
 * Functional signature: ((Symbol) -> a) -> a)
 *
 * @param {function(Symbol): any} f - A CPS function
 * @returns {*} - The result of the function performing IO operations
@@ -39,7 +47,9 @@ export function performIO(f) {
}

/**
 * Prints a string to the console (functional signature: String -> IO -> IO).
 * Prints a string to the console.
 *
 * Functional signature: String -> IO -> IO)
 *
 * @param {string} x - Text to print
 * @returns {function(Symbol): Promise<Symbol>} - A CPS IO action that prints a string to the console
@@ -50,7 +60,9 @@ export const putStr = (x) => async (IO) => {
};

/**
 * Reads the contents of a file as UTF-8 (functional signature: String -> IO -> IO String).
 * Reads the contents of a file as UTF-8.
 *
 * Functional signature: String -> IO -> IO String)
 *
 * @param {string} path - Path to the file
 * @returns {function(Symbol): Promise<string>} - A CPS IO action that reads the contents of a file as a UTF-8 string
@@ -60,7 +72,9 @@ export const readFile = (path) => async (IO) => {
};

/**
 * Represents the absence of a value (functional signature: Maybe a).
 * Represents the absence of a value.
 *
 * Functional signature: Maybe a)
 *
 * @const
 * @type {{tag: "nothing"}}
@@ -68,7 +82,9 @@ export const readFile = (path) => async (IO) => {
export const nothing = { tag: "nothing" };

/**
 * Wraps a value into a 'just' tagged union (functional signature: a -> Maybe a).
 * Wraps a value into a 'just' tagged union
 *
 * Functional signature: a -> Maybe a)
 *
 * @param {*} value - The value to wrap
 * @returns {{tag: "just", value: *}} - A tagged value
+35 −22
Original line number Diff line number Diff line
@@ -5,12 +5,15 @@
# - pip install aiofiles

import aiofiles
from typing import Callable, Awaitable

from typing import Callable, Awaitable, Any


class InternalError(Exception):
    """
    Custom error type to represent internal application exceptions (functional signature: String -> InternalError).
    Custom error type to represent internal application exceptions.

    Functional signature: String -> InternalError

    Args:
        message (str): The error message.
@@ -20,45 +23,52 @@ class InternalError(Exception):
        super().__init__(message)


# Token representing the 'IO world' in an implicit continuation-passing style (functional signature: Symbol).
# Token representing the 'IO world' in an implicit continuation-passing style.
# Functional signature: Symbol
IO = object()


def create_io(f: Callable[[object], any]) -> any:
def create_io(f: Callable[[object], Any]) -> Any:
    """
    Applies an implicit CPS IO continuation to the IO token (functional signature: ((Symbol) -> a) -> a).
    Applies an implicit CPS IO continuation to the IO token.

    Functional signature: ((IO) -> a) -> a

    Args:
        f (Callable[[Symbol], Any]): A CPS function expecting an IO token.
        f: A CPS function expecting the IO token.

    Returns:
        Any: The result of applying the function to the IO world.
        The result of applying the function to the IO world.
    """
    return f(IO)


def perform_io(f: Callable[[object], Awaitable[any]]) -> Awaitable[any]:
def perform_io(f: Callable[[object], Awaitable[Any]]) -> Awaitable[Any]:
    """
    Alias for create_io which semantically marks a point where an effect is executed (functional signature: ((Symbol) -> a) -> a).
    Semantically marks a point where an effectful IO computation is performed.

    Functional signature: ((IO) -> IO a) -> IO a

    Args:
        f (Callable[[Symbol], Awaitable[Any]]): A CPS function.
        f: A CPS function.

    Returns:
        Awaitable[Any]: The result of the function performing IO operations.
        The result of the function performing IO operations.
    """
    return create_io(lambda w: f(w))


def put_str(x: str) -> Callable[[object], Awaitable[object]]:
    """
    Prints a string to the console (functional signature: String -> IO -> IO).
    Prints a string to the console.

    Functional signature: String -> IO -> IO ()

    Args:
        x (str): Text to print.
        x: Text to print.

    Returns:
        Callable[[Symbol], Awaitable[Symbol]]: A CPS IO action that prints the string.
        A CPS IO action that prints the string.
    """

    async def inner(io: object) -> object:
@@ -70,13 +80,15 @@ def put_str(x: str) -> Callable[[object], Awaitable[object]]:

def read_file(path: str) -> Callable[[object], Awaitable[str]]:
    """
    Reads the contents of a file as UTF-8 (functional signature: String -> IO -> IO String).
    Reads the contents of a file as UTF-8.

    Functional signature: String -> IO -> IO String

    Args:
        path (str): Path to the file.
        path: Path to the file.

    Returns:
        Callable[[Symbol], Awaitable[str]]: A CPS IO action that reads the contents.
        A CPS IO action that reads the contents.
    """

    async def inner(io: object) -> str:
@@ -86,18 +98,19 @@ def read_file(path: str) -> Callable[[object], Awaitable[str]]:
    return inner


# Represents the absence of a value (functional signature: Maybe a).
nothing = {"tag": "nothing"}


def just(value: any) -> dict:
def just(value: Any) -> dict:
    """
    Wraps a value into a 'just' tagged union (functional signature: a -> Maybe a).
    Wraps a value into a 'just' tagged union.

    Functional signature: a -> Maybe a

    Args:
        value (Any): The value to wrap.
        value: The value to wrap.

    Returns:
        dict: A tagged value {'tag': 'just', 'value': *}.
        A tagged value {'tag': 'just', 'value': *}.
    """
    return {"tag": "just", "value": value}
+12 −4
Original line number Diff line number Diff line
@@ -19,7 +19,9 @@ const { Client } = pkg;
 */

/**
 * Connects to the PostgreSQL server using parameters from the config file (functional signature: () -> IO PGConn).
 * Connects to the PostgreSQL server using parameters from the config file.
 *
 * Functional signature: () -> IO PGConn)
 *
 * @returns {function(Symbol): Promise<PGConn>} - A CPS IO action that returns a connected PostgreSQL connection
 * @throws {InternalError} - If the connection fails
@@ -39,7 +41,9 @@ export function connPostgreSQL() {
}

/**
 * Executes an SQL query with parameters, returning no result (functional signature: PGConn -> String -> List String -> IO ()).
 * Executes an SQL query with parameters, returning no result.
 *
 * Functional signature: PGConn -> String -> List String -> IO ())
 *
 * @param {PGConn} conn - A connected PostgreSQL client
 * @param {string} sql - The SQL statement
@@ -60,7 +64,9 @@ export function execQuery(conn, sql, params) {
}

/**
 * Executes an SQL query and returns both field metadata and row data (functional signature: PGConn -> String -> List String -> IO (Maybe { fields, rows})).
 * Executes an SQL query and returns both field metadata and row data.
 *
 * Functional signature: PGConn -> String -> List String -> IO (Maybe { fields, rows}))
 *
 * @param {PGConn} conn - A connected PostgreSQL client
 * @param {string} sql - The SQL query
@@ -91,7 +97,9 @@ export function fetchQueryResult(conn, sql, params) {
}

/**
 * Closes the PostgreSQL connection (functional signature: PGConn -> IO ()).
 * Closes the PostgreSQL connection.
 *
 * Functional signature: PGConn -> IO ())
 *
 * @param {PGConn} conn - A connected PostgreSQL client
 * @returns {function(Symbol): Promise<Symbol>} - A CPS IO action that closes the PostgreSQL connection
Loading