Newer
Older
'''REST API to launch observation blocks'''
Davide Ricci
committed
# System modules
# Third-party modules
from flask_restx import Namespace, Resource, fields
Davide Ricci
committed
# Custom modules
from api.sequencer_instance import seq
from utils.data_access_object import ObservationBlockObject as DAO
dao = DAO("ob")
############################
# REST API
############################
api = Namespace('sequencer', description='Sequencer')
list_of_ob = api.model('list', {
'id': fields.Integer(readonly=True, description='The OB unique identifier'),
'name': fields.String(required=True, description='The OB name')
})
class BobList(Resource):
"""Show and modify a list of OBs to be sequenced"""
@api.doc('list_ob')
@api.marshal_list_with(list_of_ob)
"""Show all OB in the sequence"""
@api.doc('add_ob')
@api.expect(list_of_ob)
@api.marshal_with(list_of_ob, code=201)
def post(self):
"""Add a new OB to the sequence"""
data = api.payload
@api.route('/<int:id>')
@api.response(404, 'OB not found')
@api.param('id', 'The OB identifier')
class Bob(Resource):
'''Let delete a specific OB'''
@api.doc('delete_ob')
@api.response(204, 'OB deleted')
def delete(self, id):
'''Delete an OB given its identifier'''
dao.delete(id)
return '', 204
@api.route("/run")
class BobRun(Resource):
"""Manage the sequencer"""
def __init__(self, *args, **kwargs):
'''Constructor.'''
super().__init__(self)
self.seq = seq
def get(self):
"""Show the sequencer status"""
Davide Ricci
committed
if self.seq.tpl:
Davide Ricci
committed
res = {
"response": {
"name": self.seq.tpl.name,
"paused": self.seq.tpl.paused,
"quitting": self.seq.tpl.aborted,
"output": self.seq.tpl.output,
"error": self.seq.tpl.error,
"filename": self.seq.tpl.filename,
},
"error": self.seq.error,
}
Davide Ricci
committed
else:
Davide Ricci
committed
res = {
"response": {
"name": None,
"paused": None,
"quitting": None,
"output": None,
"error": None,
"filename": None,
},
"error": "No tpl object",
Davide Ricci
committed
}
Davide Ricci
committed
return res
def put(self):
Davide Ricci
committed
"""Pause or resume the template execution. Inverting the logic with
respect to sequencer class:
PUT /sequencer/run true: resume,
PUT /sequencer/run false: pause
"""
Davide Ricci
committed
in_execution = api.payload
self.seq.resume() if in_execution else self.seq.pause()
Davide Ricci
committed
res = {
"response": self.seq.tpl.paused,
"error": self.seq.error,
Davide Ricci
committed
}
return res
def post(self):
"""Start the sequencer"""
Davide Ricci
committed
payload = api.payload
if not payload:
res = {
"response": None,
Davide Ricci
committed
"error": ["No payload. either ob filename or json with template+params"],
Davide Ricci
committed
else:
try:
ob = dao.show(payload)["filename"]
self.seq.load_file(ob)
self.seq.execute()
Davide Ricci
committed
res = {
"response": self.seq.ob,
"error": self.seq.error,
Davide Ricci
committed
}
except TypeError as e:
#print(e)
try:
ob = payload
print(ob)
self.seq.ob = ob
self.seq.execute()
Davide Ricci
committed
res = {
"response": self.seq.ob,
"error": self.seq.error,
Davide Ricci
committed
}
except Exception as f:
print(f)
res = {
"response": None,
"error": ["Maybe not a json?"],
}
def delete(self):
"""Stop the sequencer"""
self.seq.quit()
"response": self.seq.ob if self.seq.ob else None,
"error": self.seq.error,