Commit 64e6d619 authored by vertighel's avatar vertighel
Browse files

API: passed to quart alla apis. Still issues with blocks.py (I have to tell...

API: passed to quart alla apis. Still issues with blocks.py (I have to tell the default ob/path as noctua-api is now a global executable) and templates.py (not sure that pause/resume/delet work)
parent c0bf5f80
Loading
Loading
Loading
Loading
Loading
+88 −54
Original line number Diff line number Diff line

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

'''Automatic REST APIs'''

# System modules
import configparser
import importlib
import sys

# Third-party modules
from flask import Blueprint
from flask_restx import Api, Namespace, Resource
from pathlib import Path
from quart import Blueprint

# Custom modules
from noctua import devices
from noctua.utils.url_stuff import build_url

# from .blocks import api as blocks_api
# from .environment import api as environment_api
# from .templates import api as templates_api
from .blocks import blocks_api
from .templates import templates_api

api_blueprint = Blueprint('api', __name__)

api = Api(api_blueprint,
          title='Generic APIs for observatory control',
          version='1.0',
          description=['A description']
          )

base = "api." # this module
# Register Manual Blueprints (Namespaces)
api_blueprint.register_blueprint(blocks_api, url_prefix='/blocks')
api_blueprint.register_blueprint(templates_api, url_prefix='/sequencer')

# Load api.ini
ends = configparser.ConfigParser()
ends.read('./config/api.ini')

def dynamic_import(url):
    """Dynamically import into this module api.ini files.
    """

    parts = url.split('/') # /dome/shutter/movement
    namespace = parts[1]  #  dome
    end = '/' + '/'.join(parts[2:]) # /shutter/movement

    dev_api = Namespace(namespace) # adding namespace /dome. Endpoint will be added

    dev = getattr(devices, ends.get(url,"device"))           # devices.light instance
    cls = dev.__class__.__name__                             # string "Switch"
    mod_name = cls.lower()                                   # string "switch"
    module = importlib.import_module(base + mod_name )       # module api.switch
    module_class = getattr(module, ends.get(url,"resource")) # class api.switch.State

    # Assigning endpoint to api.ShutterMovement
    dev_api.add_resource(module_class, end , resource_class_kwargs={"dev": dev})
    api.add_namespace(dev_api) # Adding to /dome: /shutter/movement

for end in ends.sections():
    dynamic_import(end)

# Apart, what is not in the api.ini.

# api.add_namespace(environment_api)

# api.add_namespace(blocks_api)
# api.add_namespace(templates_api)

# api.add_namespace(testfulvio_api)
config_path = Path(__file__).parent.parent / "config" / "api.ini"
ends.read(config_path)

def dynamic_import(url_path):
    """Dynamically import into this module api.ini files."""
    try:
        # Avoid overriding routes already handled by manual Blueprints
        if url_path.startswith(('/blocks', '/sequencer')):
            return

        res_class_name = ends.get(url_path, "resource")
        dev_name = ends.get(url_path, "device")
        
        # Hardware devices logic
        dev_instance = getattr(devices, dev_name)
        mod_name = dev_instance.__class__.__name__.lower()

        module = importlib.import_module(f"noctua.api.{mod_name}")
        resource_class = getattr(module, res_class_name)

        # Register dynamic route as MethodView
        # This automatically maps GET/POST/PUT/DELETE to class methods
        view = resource_class.as_view(url_path, dev=dev_instance)
        api_blueprint.add_url_rule(url_path, view_func=view)
        
        print(f"Route registered: /api{url_path:<40} {module.__name__:>5}.{resource_class.__name__}")            
    except Exception as e:
        print(f"Route skipped:    /api{url_path:<40} -- {str(e):>5}")

for section in ends.sections():
    dynamic_import(section)

# #!/usr/bin/env python3
# # -*- coding: utf-8 -*-

# '''Automatic REST APIs using Quart Blueprints'''
# import configparser
# import importlib
# from pathlib import Path
# from quart import Blueprint
# from noctua import devices
# from .baseresource import BaseResource

# # 1. Importiamo i blueprint "complessi" definiti nei file
# from .blocks import blocks_api
# from .templates import templates_api

# api_blueprint = Blueprint('api', __name__)

# # 2. Caricamento dinamico per api.ini (Camera, Telescope, Switch, etc.)
# ends = configparser.ConfigParser()
# ends.read(Path(__file__).parent.parent / "config" / "api.ini")

# def dynamic_import(url_path):
#     try:
#         # Se la rotta è già gestita dai blueprint manuali, saltala
#         if url_path.startswith(('/blocks', '/sequencer')): return

#         res_class_name = ends.get(url_path, "resource")
#         dev_name = ends.get(url_path, "device")
#         dev_instance = getattr(devices, dev_name)
#         mod_name = dev_instance.__class__.__name__.lower()

#         module = importlib.import_module(f"noctua.api.{mod_name}")
#         resource_class = getattr(module, res_class_name)

#         # Registrazione dinamica come MethodView
#         view = resource_class.as_view(url_path, dev=dev_instance)
#         api_blueprint.add_url_rule(url_path, view_func=view)
#         print(f"Route registered: /api{url_path:<40} {module.__name__:>5}.{resource_class.__name__}")
            
#     except Exception as e:
#         print(f"Route skipped:    /api{url_path:<40} -- {str(e):>5}")

# for section in ends.sections():
#     dynamic_import(section)

# # 3. Registrazione dei Blueprint manuali
# api_blueprint.register_blueprint(blocks_api, url_prefix='/blocks')
# api_blueprint.register_blueprint(templates_api, url_prefix='/sequencer')
+14 −27
Original line number Diff line number Diff line
@@ -8,9 +8,14 @@ import asyncio
from datetime import datetime

# Third-party modules
from quart import request

class BaseResource:
from quart import request, jsonify
from quart.views import MethodView

class BaseResource(MethodView):
    """
    Equivalent to flask_restx Resource.
    Methods get(), post(), put(), delete() will be called automatically.
    """
    def __init__(self, dev):
        self.dev = dev

@@ -18,16 +23,6 @@ class BaseResource:
    def timestamp(self):
        return datetime.utcnow().isoformat()

    @staticmethod
    def error_response(message, status_code):
        """Standardized error format for global app errors."""
        return {
            "raw": None,
            "response": None,
            "error": [message],
            "timestamp": datetime.utcnow().isoformat()
        }, status_code

    async def get_payload(self):
        return await request.get_json(force=True)

@@ -36,8 +31,6 @@ class BaseResource:

    def make_response(self, response_data, raw_data=None, status_code=200):
        errors = getattr(self.dev, 'error', [])
        
        # If hardware reported errors, switch 200 to 500
        if errors and status_code == 200:
            status_code = 500 
        
@@ -47,25 +40,19 @@ class BaseResource:
            "error": errors if errors else None,
            "timestamp": self.timestamp,
        }
        return res, status_code

# --- Helper to register handlers in app.py ---
        return jsonify(res), status_code

def register_error_handlers(app):
    """Binds global HTTP errors to BaseResource JSON format."""
    
    @app.errorhandler(400)
    async def bad_request(e):
        return BaseResource.error_response("Bad Request - Malformed JSON or invalid parameters", 400)

        return jsonify({"error": ["Bad Request"], "timestamp": datetime.utcnow().isoformat()}), 400
    @app.errorhandler(404)
    async def not_found(e):
        return BaseResource.error_response("URL not found", 404)

        return jsonify({"error": ["URL not found"], "timestamp": datetime.utcnow().isoformat()}), 404
    @app.errorhandler(405)
    async def method_not_allowed(e):
        return BaseResource.error_response("Method not allowed", 405)

        return jsonify({"error": ["Method not allowed"], "timestamp": datetime.utcnow().isoformat()}), 405
    @app.errorhandler(500)
    async def internal_error(e):
        return BaseResource.error_response(f"Internal Server Error: {str(e)}", 500)
        return jsonify({"error": [str(e)], "timestamp": datetime.utcnow().isoformat()}), 500
    

noctua/api/blocks.py

0 → 100644
+90 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

'''REST API to edit Observation Blocks'''

# Third-party modules
from quart import Blueprint

# Custom modules
from noctua.utils.data_access_object import ObservationBlockObject as DAO
from noctua.utils.data_access_object import guess
from .baseresource import BaseResource

blocks_api = Blueprint('blocks', __name__)

dao = DAO("ob")
tpl = DAO("defaults")

class BlockList(BaseResource):
    """Show and create OBs"""

    async def get(self):
        """Show all observation OB files"""
        res = await self.run_blocking(lambda: dao.todos)
        return self.make_response(res)

    async def post(self):
        """Create a new OB file based on name"""
        name = await self.get_payload()
        res = await self.run_blocking(dao.create, name)
        return self.make_response(res, status_code=204)

    async def delete(self):
        """Delete an OB file based on name"""
        name = await self.get_payload()
        res = await self.run_blocking(dao.delete, name)
        return self.make_response(res, status_code=204)


class Block(BaseResource):
    """Show a selected OB, update it, or add a new
    template in it.
    """

    async def get(self, name):
        """Show a specific OB"""
        res = await self.run_blocking(dao.show, name)
        return self.make_response(res)

    async def put(self, name):
        """Update the OB"""
        data = await self.get_payload()
        
        def logic():
            for datum in data:
                for key, val in datum["params"].items():
                    datum["params"][key] = guess(val)
            return dao.update(name, data)
            
        res = await self.run_blocking(logic)
        return self.make_response(res, status_code=204)

    async def post(self, name):
        """Add a template to the selected OB"""
        payload = await self.get_payload()
        
        def logic():
            content = dao.content(name)
            new_data = tpl.content(payload)
            data = content + new_data
            return dao.update(name, data)
            
        res = await self.run_blocking(logic)
        return self.make_response(res, status_code=204)

    async def delete(self, name):
        """Delete a template instance in the selected OB"""
        instance = await self.get_payload()
        
        def logic():
            data = dao.content(name)
            data.pop(instance - 1) # jinjia loop starts from 1 in html
            return dao.update(name, data)
            
        res = await self.run_blocking(logic)
        return self.make_response(res, status_code=204)

# Association of classes to endpoints (internal routing)
blocks_api.add_url_rule('/', view_func=BlockList.as_view('block_list', dev=dao))
blocks_api.add_url_rule('/<name>', view_func=Block.as_view('block_detail', dev=dao))
+0 −423
Original line number Diff line number Diff line
@@ -3,9 +3,6 @@

'''REST API for Camera related operations'''

# Third-party modules
from quart import request

# This module imports
from .baseresource import BaseResource
from noctua.config import constants
@@ -251,423 +248,3 @@ class Settings(BaseResource):
        '''Retrieve all-in-one the settings of the camera.'''
        res = await self.run_blocking(lambda: self.dev.all)
        return self.make_response(res)


# # System modules

# # Third-party modules
# from quart import request
# import asyncio

# # Custom modules
# from config import constants
# from .baseresource import ResourceDev
# from .sequencer_instance import seq

# # @api.route("/frame/binning")
# class FrameBinning(ResourceDev):
#     """Binning of the camera."""

#     # def get(self):
#     #     """Retrieve the binning of the camera."""
#     #     res = {
#     #         "response": self.dev.binning,
#     #         "error": self.dev.error,
#     #         "timestamp": self.timestamp,
#     #     }
#     #     return res

#     def put(self):
#         """Set a new binning for the camera."""
#         binning = request.json
#         self.dev.binning = binning
#         res = {
#             "response": self.dev.binning,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/cooler")
# class Cooler(ResourceDev):
#     """Manage the CCD cooler status"""

#     def get(self):
#         """Check wether the CCD cooler is on or off."""
#         res = {
#             "raw": self.dev.cooler,
#             "response": constants.on_off[self.dev.cooler],
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

#     def put(self):
#         """Set on or off the CCD cooler."""
#         state = request.json
#         self.dev.cooler = state
#         res = {
#             "response": self.dev.cooler,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/cooler/temperature/setpoint")
# class CoolerTemperatureSetpoint(ResourceDev):
#     """Manage the CCD temperature"""

#     def put(self):
#         """Set a new temperature of the CCD."""
#         state = request.json
#         self.dev.temperature = state
#         res = {
#             "response": self.dev.temperature,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/filters")
# class Filters(ResourceDev):
#     """Camera filters names."""

#     def get(self):
#         """Retrieve the filter names."""
#         res = {
#             "response": constants.filter_number,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/filter")
# class Filter(ResourceDev):
#     """Camera filter information."""

#     def get(self):
#         """Retrieve the current filter."""
#         res = {
#             "raw": self.dev.filter,
#             "response": constants.filter_name[self.dev.filter],
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/filter/movement")
# class FilterMovement(ResourceDev):
#     """Manage the camera filter wheel."""

#     def get(self):
#         """Check if the filter wheel is moving."""
#         res = {
#             "raw": self.dev.is_moving,
#             "response": constants.filter_state[self.dev.is_moving],
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

#     def post(self):
#         """Set a new filter."""
#         target = request.json
#         self.dev.filter = target
#         res = {
#             "response": self.dev.filter,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/frame/custom")
# class FrameCustom(ResourceDev):
#     """Camera custom frame."""

#     def put(self):
#         """Set a custom windowing."""
#         new_frame = request.json
#         self.dev.binning = new_frame["binning"]
#         frame = {
#             "xrange": new_frame["xrange"],
#             "yrange": new_frame["yrange"],
#             "binning": new_frame["binning"]
#         }
#         res = {
#             "response": frame,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/frame/full")
# class FrameFull(ResourceDev):
#     """Camera full frame."""

#     def put(self):
#         """Set the ccd to full frame in current binning."""
#         res = {
#             "response": self.dev.full_frame(),
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/frame/half")
# class FrameHalf(ResourceDev):
#     """Camera frame of half size the full frame."""

#     def put(self):
#         """Set a center ccd window spanning half the
#         size of the full frame in the current binning."""
#         res = {
#             "response": self.dev.half_frame(),
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/frame/small")
# class FrameSmall(ResourceDev):
#     """Camera frame of 2 arcmin."""

#     def put(self):
#         """Set a center ccd window spanning 2 arcminutes
#         on the sky."""
#         res = {
#             "response": self.dev.small_frame(),
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/snapshot/")
# class Snapshot(ResourceDev):
#     """The acquired image."""

#     def post(self):
#         """Start a simple acquisition."""
#         new = request.json  # exptime, type
#         res = {
#             "response": self.dev.start(new["exptime"],
#                                           new["type"],
#                                           self.timestamp),
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

#     def delete(self):
#         """Stop the acquisition process."""
#         res = {
#             "response": self.dev.abort(),
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

# # @api.route("/snapshot/state")
# class SnapshotState(ResourceDev):
#     """Manage the acquisition of an image."""

#     def get(self):
#         """Retrieve the status of the acquisition process."""
#         res = {            
#             "raw": self.dev.state,
#             "response": constants.camera_state[self.dev.state],
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

# # @api.route("/snapshot/acquisition")
# class SnapshotAcquisition(ResourceDev):
#     """Manage the acquisition of an image using th sequencer."""

#     def get(self, seq=seq):
#         """Retrieve the status of the acquisition process."""

#         res = {
#             "response": {
#                 "paused": seq.tpl.paused if seq.tpl else None,
#                 "quitting": seq.tpl.aborted if seq.tpl else None,
#             },
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

#     def post(self, seq=seq):
#         """Start a new acquisition."""
#         data = request.json  # exptime, type, repeat, recenter

#         # Run the template given its parameters in the json.
#         seq.tpl.run(data)

#         res = {
#             "response": seq.tpl.filenames,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }

#         return res

#     def delete(self, seq=seq):
#         """Stop the acquisition process."""
#         res = {
#             "response": seq.tpl.abort(),
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

# # @api.route("/snapshot/recenter")
# class SnapshotRecenter(ResourceDev):
#     """Manage the recentering via the the apply_offset function"""

#     def get(self, seq=seq):
#         """Retrieve the recenter status and parameters of the box."""

#         res = {
#             "response": {
#                 "recenter": getattr(seq.tpl, 'recenter', None),
#                 "box": getattr(seq.tpl, 'box', None),
#             },
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

#     def put(self, seq=seq):
#         """Set a given box to the recentering function"""
#         data = request.json  # exptime, type, repeat, recenter

#         seq.tpl.box = data

#         res = {
#             "response": getattr(seq.tpl, 'box', None),
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

#     def post(self, seq=seq):
#         """Enable the recentering function."""

#         seq.tpl.recenter = True

#         res = {
#             "response": getattr(seq.tpl, 'recenter', None),
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

#     def delete(self, seq=seq):
#         """Disable the recentering function."""

#         seq.tpl.recenter = False

#         res = {
#             "response": getattr(seq.tpl, 'recenter', None),
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

# # @api.route("/snapshot/domeslewing")
# class SnapshotDomeslewing(ResourceDev):
#     """Get dome slewing status"""

#     def get(self, seq=seq):
#         """Retrieve the domeslewing status"""

#         res = {
#             "response": {
#                 "domeslewing": getattr(seq.tpl, 'domeslewing', None),
#             },
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

#     def post(self, seq=seq):
#         """Enable the domeslewing function."""

#         seq.tpl.domeslewing = True

#         res = {
#             "response":  getattr(seq.tpl, 'domeslewing', None),
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

#     def delete(self, seq=seq):
#         """Disable the domeslewing function."""

#         seq.tpl.domeslewing = False

#         res = {
#             "response":  getattr(seq.tpl, 'domeslewing', None),
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/cooler/warmup")
# class CoolerWarmup(ResourceDev):
#     """Manage the warmup of the CCD."""

#     def post(self):
#         """Start the warm up the CCD."""

#     def delete(self):
#         """Stop the warm up of the CCD."""


# # @api.route("/connection")
# class Connection(ResourceDev):
#     '''Manage the connection to ASCOM.'''

#     def get(self):
#         '''Check if the telescope is connected to ASCOM.'''

#         res = {
#             "response": self.dev.connection,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

# # @api.route("/settings")
# class Settings(ResourceDev):
#     '''General telescope settings.'''

#     def get(self):
#         '''Retrieve all-in-one the settings of the camera.'''

#         res = {
#             "response": self.dev.all,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

noctua/api/common.py

0 → 100644
+14 −0
Original line number Diff line number Diff line
import asyncio
from datetime import datetime
from dataclasses import dataclass
from typing import Optional, Any

@dataclass
class StandardResponse:
    response: Any
    error: Optional[list] = None
    timestamp: str = datetime.utcnow().isoformat()

async def wrap_blocking(func, *args, **kwargs):
    """Esegue una funzione sincrona (L1) in un thread separato"""
    return await asyncio.to_thread(func, *args, **kwargs)
Loading