Newer
Older
import time
from astropy.time import Time
from flask import Flask, render_template, Response
from flask_cors import CORS
from flask_socketio import SocketIO
import numpy as np
import json
import base64
app = Flask(__name__)
socketio = SocketIO(app, async_mode='threading') # async_mode=None
def create_circle_mask(width, height, radius):
x = np.arange(width)
y = np.arange(height)
xx, yy = np.meshgrid(x, y, indexing='xy')
circle = (xx - width/2)**2 + (yy - height/2)**2 <= radius**2
return circle.astype(int)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/dm')
def dm():
return render_template('dm.html')
@app.route('/tri')
def tri():
return render_template('tri.html')
@app.route('/smooth')
def smooth():
return render_template('smooth.html')
@app.route('/smooth2')
def smooth2():
return render_template('smooth2.html')
@app.route('/smooth3')
def smooth3():
return render_template('smooth3.html')
@socketio.on('connect')
def handle_connect():
print('Client connected')
@socketio.on('disconnect')
def handle_disconnect():
print('Client disconnected')
def send_timestamp():
while True:
now = Time.now()
socketio.emit('timestamp', now.unix)
time.sleep(1)
def send_mask():
while True:
now = Time.now()
dtype = "uint16"
width = 500
height = 400
radius = 100
matrix = create_circle_mask(width, height, radius)
binary_data = matrix.tobytes()
data_bundle = {
"shape": matrix.shape,
"dtype": dtype,
"data": binary_data,
"timestamp": now.unix,
}
socketio.emit('mask', data_bundle)
print((Time.now()-now).sec, "emitted")
print("-------")
print(f"mask {len(binary_data)}")
def send_dm():
while True:
now = Time.now()
#es = requests.get("http://sharknirws.shark-nir.lbto.org:5001/api/rtc/plots/")
es = {"dm":"ciccio"}
# shape = np.array(es.json()["dm"]["shape"])
# flat = np.array(es.json()["dm"]["flat"])
# matrix = shape-flat
# print(matrix)
# print((Time.now()-now).sec, "fetched")
binary_data = matrix.astype(dtype).tobytes()#.tobytes()
data_bundle = {
"shape": matrix.shape,
"dtype": dtype,
"data": binary_data,
"timestamp": now.unix,
}
print(matrix)
socketio.emit('dm_bundle', data_bundle)
print((Time.now()-now).sec, "emitted")
time.sleep(2)
if __name__ == '__main__':
socketio.start_background_task(send_timestamp)
# socketio.start_background_task(send_mask)
socketio.start_background_task(send_dm)