Commit c0bf5f80 authored by vertighel's avatar vertighel
Browse files

Started with api layer: passed to quart+uvicorn, tested offline api/camera.py....

Started with api layer: passed to quart+uvicorn, tested offline api/camera.py. Added executable noctua-api to start the api server
parent 88c0abd1
Loading
Loading
Loading
Loading
Loading

noctua/api/__init__.py

0 → 100644
+67 −0
Original line number Diff line number Diff line

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

'''Automatic REST APIs'''

# System modules
import configparser
import importlib
import sys

# Third-party modules
from flask import Blueprint
from flask_restx import Api, Namespace, Resource

# Custom modules
from noctua import devices
from noctua.utils.url_stuff import build_url

# from .blocks import api as blocks_api
# from .environment import api as environment_api
# from .templates import api as templates_api

api_blueprint = Blueprint('api', __name__)

api = Api(api_blueprint,
          title='Generic APIs for observatory control',
          version='1.0',
          description=['A description']
          )

base = "api." # this module

ends = configparser.ConfigParser()
ends.read('./config/api.ini')

def dynamic_import(url):
    """Dynamically import into this module api.ini files.
    """

    parts = url.split('/') # /dome/shutter/movement
    namespace = parts[1]  #  dome
    end = '/' + '/'.join(parts[2:]) # /shutter/movement

    dev_api = Namespace(namespace) # adding namespace /dome. Endpoint will be added

    dev = getattr(devices, ends.get(url,"device"))           # devices.light instance
    cls = dev.__class__.__name__                             # string "Switch"
    mod_name = cls.lower()                                   # string "switch"
    module = importlib.import_module(base + mod_name )       # module api.switch
    module_class = getattr(module, ends.get(url,"resource")) # class api.switch.State

    # Assigning endpoint to api.ShutterMovement
    dev_api.add_resource(module_class, end , resource_class_kwargs={"dev": dev})
    api.add_namespace(dev_api) # Adding to /dome: /shutter/movement

for end in ends.sections():
    dynamic_import(end)

# Apart, what is not in the api.ini.

# api.add_namespace(environment_api)

# api.add_namespace(blocks_api)
# api.add_namespace(templates_api)

# api.add_namespace(testfulvio_api)
+71 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

'''Base class for REST API'''
# System modules

import asyncio
from datetime import datetime

# Third-party modules
from quart import request

class BaseResource:
    def __init__(self, dev):
        self.dev = dev

    @property
    def timestamp(self):
        return datetime.utcnow().isoformat()

    @staticmethod
    def error_response(message, status_code):
        """Standardized error format for global app errors."""
        return {
            "raw": None,
            "response": None,
            "error": [message],
            "timestamp": datetime.utcnow().isoformat()
        }, status_code

    async def get_payload(self):
        return await request.get_json(force=True)

    async def run_blocking(self, func, *args, **kwargs):
        return await asyncio.to_thread(func, *args, **kwargs)

    def make_response(self, response_data, raw_data=None, status_code=200):
        errors = getattr(self.dev, 'error', [])
        
        # If hardware reported errors, switch 200 to 500
        if errors and status_code == 200:
            status_code = 500 
        
        res = {
            "raw": raw_data if raw_data is not None else response_data,
            "response": response_data,
            "error": errors if errors else None,
            "timestamp": self.timestamp,
        }
        return res, status_code

# --- Helper to register handlers in app.py ---

def register_error_handlers(app):
    """Binds global HTTP errors to BaseResource JSON format."""
    
    @app.errorhandler(400)
    async def bad_request(e):
        return BaseResource.error_response("Bad Request - Malformed JSON or invalid parameters", 400)

    @app.errorhandler(404)
    async def not_found(e):
        return BaseResource.error_response("URL not found", 404)

    @app.errorhandler(405)
    async def method_not_allowed(e):
        return BaseResource.error_response("Method not allowed", 405)

    @app.errorhandler(500)
    async def internal_error(e):
        return BaseResource.error_response(f"Internal Server Error: {str(e)}", 500)

noctua/api/camera.py

0 → 100644
+673 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

'''REST API for Camera related operations'''

# Third-party modules
from quart import request

# This module imports
from .baseresource import BaseResource
from noctua.config import constants
from noctua.api.sequencer_instance import seq

class FrameBinning(BaseResource):
    """Binning of the camera."""

    async def put(self):
        """Set a new binning for the camera."""
        binning = await self.get_payload()
        def action():
            self.dev.binning = binning
            return self.dev.binning
        res = await self.run_blocking(action)
        return self.make_response(res)

class Cooler(BaseResource):
    """Manage the CCD cooler status"""

    async def get(self):
        """Check wether the CCD cooler is on or off."""
        raw = await self.run_blocking(lambda: self.dev.cooler)
        res = constants.on_off.get(raw, "N/A")
        return self.make_response(res, raw_data=raw)

    async def put(self):
        """Set on or off the CCD cooler."""
        state = await self.get_payload()
        def action():
            self.dev.cooler = state
            return self.dev.cooler
        res = await self.run_blocking(action)
        return self.make_response(res)

class CoolerTemperatureSetpoint(BaseResource):
    """Manage the CCD temperature"""

    async def put(self):
        """Set a new temperature of the CCD."""
        state = await self.get_payload()
        def action():
            self.dev.temperature = state
            return self.dev.temperature
        res = await self.run_blocking(action)
        return self.make_response(res)

class Filters(BaseResource):
    """Camera filters names."""

    async def get(self):
        """Retrieve the filter names."""
        # Constants are not blocking
        res = constants.filter_number
        return self.make_response(res)

class Filter(BaseResource):
    """Camera filter information."""

    async def get(self):
        """Retrieve the current filter."""
        raw = await self.run_blocking(lambda: self.dev.filter)
        res = constants.filter_name.get(raw, "Undef.")
        return self.make_response(res, raw_data=raw)

class FilterMovement(BaseResource):
    """Manage the camera filter wheel."""

    async def get(self):
        """Check if the filter wheel is moving."""
        raw = await self.run_blocking(lambda: self.dev.is_moving)
        res = constants.filter_state.get(raw, "Off")
        return self.make_response(res, raw_data=raw)

    async def post(self):
        """Set a new filter."""
        target = await self.get_payload()
        def action():
            self.dev.filter = target
            return self.dev.filter
        res = await self.run_blocking(action)
        return self.make_response(res)
class FrameCustom(BaseResource):
    """Camera custom frame."""

    async def put(self):
        """Set a custom windowing."""
        new_frame = await self.get_payload()
        def action():
            self.dev.binning = new_frame["binning"]
            # Logic depends on driver implementation of set_window
            return new_frame
        res = await self.run_blocking(action)
        return self.make_response(res)

class FrameFull(BaseResource):
    """Camera full frame."""

    async def put(self):
        """Set the ccd to full frame in current binning."""
        res = await self.run_blocking(self.dev.full_frame)
        return self.make_response(res)

class FrameHalf(BaseResource):
    """Camera frame of half size the full frame."""

    async def put(self):
        """Set a center ccd window spanning half the
        size of the full frame in the current binning."""
        res = await self.run_blocking(self.dev.half_frame)
        return self.make_response(res)

class FrameSmall(BaseResource):
    """Camera frame of 2 arcmin."""

    async def put(self):
        """Set a center ccd window spanning 2 arcminutes
        on the sky."""
        res = await self.run_blocking(self.dev.small_frame)
        return self.make_response(res)

class Snapshot(BaseResource):
    """The acquired image."""

    async def post(self):
        """Start a simple acquisition."""
        new = await self.get_payload()  # exptime, type
        res = await self.run_blocking(
            self.dev.start, new["exptime"], new["type"], self.timestamp
        )
        return self.make_response(res)

    async def delete(self):
        """Stop the acquisition process."""
        res = await self.run_blocking(self.dev.abort)
        return self.make_response(res)

class SnapshotState(BaseResource):
    """Manage the acquisition of an image."""

    async def get(self):
        """Retrieve the status of the acquisition process."""
        raw = await self.run_blocking(lambda: self.dev.state)
        res = constants.camera_state.get(raw, "Off")
        return self.make_response(res, raw_data=raw)

class SnapshotAcquisition(BaseResource):
    """Manage the acquisition of an image using the sequencer."""

    async def get(self):
        """Retrieve the status of the acquisition process."""
        data = {
            "paused": seq.tpl.paused if seq.tpl else None,
            "quitting": seq.tpl.aborted if seq.tpl else None,
        }
        return self.make_response(data)

    async def post(self):
        """Start a new acquisition."""
        data = await self.get_payload()
        # Templates are Layer 2
        await self.run_blocking(seq.tpl.run, data)
        return self.make_response(getattr(seq.tpl, 'filenames', []))

    async def delete(self):
        """Stop the acquisition process."""
        res = await self.run_blocking(seq.tpl.abort)
        return self.make_response(res)

class SnapshotRecenter(BaseResource):
    """Manage the recentering via the the apply_offset function"""

    async def get(self):
        """Retrieve the recenter status and parameters of the box."""
        res = {
            "recenter": getattr(seq.tpl, 'recenter', None),
            "box": getattr(seq.tpl, 'box', None),
        }
        return self.make_response(res)

    async def put(self):
        """Set a given box to the recentering function"""
        data = await self.get_payload()
        if seq.tpl:
            seq.tpl.box = data
        return self.make_response(getattr(seq.tpl, 'box', None))

    async def post(self):
        """Enable the recentering function."""
        if seq.tpl:
            seq.tpl.recenter = True
        return self.make_response(True)

    async def delete(self):
        """Disable the recentering function."""
        if seq.tpl:
            seq.tpl.recenter = False
        return self.make_response(False)

class SnapshotDomeslewing(BaseResource):
    """Get dome slewing status"""

    async def get(self):
        """Retrieve the domeslewing status"""
        res = {"domeslewing": getattr(seq.tpl, 'domeslewing', None)}
        return self.make_response(res)

    async def post(self):
        """Enable the domeslewing function."""
        if seq.tpl:
            seq.tpl.domeslewing = True
        return self.make_response(True)

    async def delete(self):
        """Disable the domeslewing function."""
        if seq.tpl:
            seq.tpl.domeslewing = False
        return self.make_response(False)

class CoolerWarmup(BaseResource):
    """Manage the warmup of the CCD."""

    async def post(self):
        """Start the warm up the CCD."""
        return self.make_response("Warmup sequence initiated")

    async def delete(self):
        """Stop the warm up of the CCD."""
        return self.make_response("Warmup sequence aborted")

class Connection(BaseResource):
    '''Manage the connection to ASCOM.'''

    async def get(self):
        '''Check if the telescope is connected to ASCOM.'''
        res = await self.run_blocking(lambda: self.dev.connection)
        return self.make_response(res)

class Settings(BaseResource):
    '''General camera settings.'''

    async def get(self):
        '''Retrieve all-in-one the settings of the camera.'''
        res = await self.run_blocking(lambda: self.dev.all)
        return self.make_response(res)


# # System modules

# # Third-party modules
# from quart import request
# import asyncio

# # Custom modules
# from config import constants
# from .baseresource import ResourceDev
# from .sequencer_instance import seq

# # @api.route("/frame/binning")
# class FrameBinning(ResourceDev):
#     """Binning of the camera."""

#     # def get(self):
#     #     """Retrieve the binning of the camera."""
#     #     res = {
#     #         "response": self.dev.binning,
#     #         "error": self.dev.error,
#     #         "timestamp": self.timestamp,
#     #     }
#     #     return res

#     def put(self):
#         """Set a new binning for the camera."""
#         binning = request.json
#         self.dev.binning = binning
#         res = {
#             "response": self.dev.binning,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/cooler")
# class Cooler(ResourceDev):
#     """Manage the CCD cooler status"""

#     def get(self):
#         """Check wether the CCD cooler is on or off."""
#         res = {
#             "raw": self.dev.cooler,
#             "response": constants.on_off[self.dev.cooler],
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

#     def put(self):
#         """Set on or off the CCD cooler."""
#         state = request.json
#         self.dev.cooler = state
#         res = {
#             "response": self.dev.cooler,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/cooler/temperature/setpoint")
# class CoolerTemperatureSetpoint(ResourceDev):
#     """Manage the CCD temperature"""

#     def put(self):
#         """Set a new temperature of the CCD."""
#         state = request.json
#         self.dev.temperature = state
#         res = {
#             "response": self.dev.temperature,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/filters")
# class Filters(ResourceDev):
#     """Camera filters names."""

#     def get(self):
#         """Retrieve the filter names."""
#         res = {
#             "response": constants.filter_number,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/filter")
# class Filter(ResourceDev):
#     """Camera filter information."""

#     def get(self):
#         """Retrieve the current filter."""
#         res = {
#             "raw": self.dev.filter,
#             "response": constants.filter_name[self.dev.filter],
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/filter/movement")
# class FilterMovement(ResourceDev):
#     """Manage the camera filter wheel."""

#     def get(self):
#         """Check if the filter wheel is moving."""
#         res = {
#             "raw": self.dev.is_moving,
#             "response": constants.filter_state[self.dev.is_moving],
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

#     def post(self):
#         """Set a new filter."""
#         target = request.json
#         self.dev.filter = target
#         res = {
#             "response": self.dev.filter,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/frame/custom")
# class FrameCustom(ResourceDev):
#     """Camera custom frame."""

#     def put(self):
#         """Set a custom windowing."""
#         new_frame = request.json
#         self.dev.binning = new_frame["binning"]
#         frame = {
#             "xrange": new_frame["xrange"],
#             "yrange": new_frame["yrange"],
#             "binning": new_frame["binning"]
#         }
#         res = {
#             "response": frame,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/frame/full")
# class FrameFull(ResourceDev):
#     """Camera full frame."""

#     def put(self):
#         """Set the ccd to full frame in current binning."""
#         res = {
#             "response": self.dev.full_frame(),
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/frame/half")
# class FrameHalf(ResourceDev):
#     """Camera frame of half size the full frame."""

#     def put(self):
#         """Set a center ccd window spanning half the
#         size of the full frame in the current binning."""
#         res = {
#             "response": self.dev.half_frame(),
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/frame/small")
# class FrameSmall(ResourceDev):
#     """Camera frame of 2 arcmin."""

#     def put(self):
#         """Set a center ccd window spanning 2 arcminutes
#         on the sky."""
#         res = {
#             "response": self.dev.small_frame(),
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/snapshot/")
# class Snapshot(ResourceDev):
#     """The acquired image."""

#     def post(self):
#         """Start a simple acquisition."""
#         new = request.json  # exptime, type
#         res = {
#             "response": self.dev.start(new["exptime"],
#                                           new["type"],
#                                           self.timestamp),
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

#     def delete(self):
#         """Stop the acquisition process."""
#         res = {
#             "response": self.dev.abort(),
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

# # @api.route("/snapshot/state")
# class SnapshotState(ResourceDev):
#     """Manage the acquisition of an image."""

#     def get(self):
#         """Retrieve the status of the acquisition process."""
#         res = {            
#             "raw": self.dev.state,
#             "response": constants.camera_state[self.dev.state],
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

# # @api.route("/snapshot/acquisition")
# class SnapshotAcquisition(ResourceDev):
#     """Manage the acquisition of an image using th sequencer."""

#     def get(self, seq=seq):
#         """Retrieve the status of the acquisition process."""

#         res = {
#             "response": {
#                 "paused": seq.tpl.paused if seq.tpl else None,
#                 "quitting": seq.tpl.aborted if seq.tpl else None,
#             },
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

#     def post(self, seq=seq):
#         """Start a new acquisition."""
#         data = request.json  # exptime, type, repeat, recenter

#         # Run the template given its parameters in the json.
#         seq.tpl.run(data)

#         res = {
#             "response": seq.tpl.filenames,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }

#         return res

#     def delete(self, seq=seq):
#         """Stop the acquisition process."""
#         res = {
#             "response": seq.tpl.abort(),
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

# # @api.route("/snapshot/recenter")
# class SnapshotRecenter(ResourceDev):
#     """Manage the recentering via the the apply_offset function"""

#     def get(self, seq=seq):
#         """Retrieve the recenter status and parameters of the box."""

#         res = {
#             "response": {
#                 "recenter": getattr(seq.tpl, 'recenter', None),
#                 "box": getattr(seq.tpl, 'box', None),
#             },
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

#     def put(self, seq=seq):
#         """Set a given box to the recentering function"""
#         data = request.json  # exptime, type, repeat, recenter

#         seq.tpl.box = data

#         res = {
#             "response": getattr(seq.tpl, 'box', None),
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

#     def post(self, seq=seq):
#         """Enable the recentering function."""

#         seq.tpl.recenter = True

#         res = {
#             "response": getattr(seq.tpl, 'recenter', None),
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

#     def delete(self, seq=seq):
#         """Disable the recentering function."""

#         seq.tpl.recenter = False

#         res = {
#             "response": getattr(seq.tpl, 'recenter', None),
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

# # @api.route("/snapshot/domeslewing")
# class SnapshotDomeslewing(ResourceDev):
#     """Get dome slewing status"""

#     def get(self, seq=seq):
#         """Retrieve the domeslewing status"""

#         res = {
#             "response": {
#                 "domeslewing": getattr(seq.tpl, 'domeslewing', None),
#             },
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

#     def post(self, seq=seq):
#         """Enable the domeslewing function."""

#         seq.tpl.domeslewing = True

#         res = {
#             "response":  getattr(seq.tpl, 'domeslewing', None),
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }

#         return res

#     def delete(self, seq=seq):
#         """Disable the domeslewing function."""

#         seq.tpl.domeslewing = False

#         res = {
#             "response":  getattr(seq.tpl, 'domeslewing', None),
#             "error": "ciccio",
#             "timestamp": self.timestamp,
#         }
#         return res


# # @api.route("/cooler/warmup")
# class CoolerWarmup(ResourceDev):
#     """Manage the warmup of the CCD."""

#     def post(self):
#         """Start the warm up the CCD."""

#     def delete(self):
#         """Stop the warm up of the CCD."""


# # @api.route("/connection")
# class Connection(ResourceDev):
#     '''Manage the connection to ASCOM.'''

#     def get(self):
#         '''Check if the telescope is connected to ASCOM.'''

#         res = {
#             "response": self.dev.connection,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

# # @api.route("/settings")
# class Settings(ResourceDev):
#     '''General telescope settings.'''

#     def get(self):
#         '''Retrieve all-in-one the settings of the camera.'''

#         res = {
#             "response": self.dev.all,
#             "error": self.dev.error,
#             "timestamp": self.timestamp,
#         }
#         return res

noctua/api/test.py

0 → 100644
+37 −0
Original line number Diff line number Diff line
from quart import request
import asyncio

class TestResource:
    def __init__(self, dev):
        self.dev = dev

    async def get(self):
        """Esempio GET: Legge lo stato del device"""
        return {
            "response": self.dev.state,
            "device": self.dev.device_name,
            "method": "GET"
        }

    async def post(self):
        """Esempio POST: Azione impulsiva (es. reset)"""
        return {
            "response": "Action executed",
            "method": "POST"
        }

    async def put(self):
        """Esempio PUT: Aggiorna un valore (es. cambia stato)"""
        data = await request.get_json()
        self.dev.state = data
        return {
            "response": self.dev.state,
            "method": "PUT"
        }

    async def delete(self):
        """Esempio DELETE: Abort / Stop"""
        return {
            "response": "Operation aborted",
            "method": "DELETE"
        }

noctua/app.py

0 → 100755
+70 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading