Skip to content
Commits on Source (4)
Subproject commit a55e5a75be55d58ced833d39a0b0f4e31c6bf150
Subproject commit db7ecbf313ed4e79ee77063cb47dd1d9ea40ddd0
Subproject commit b9c7127f512229371df5c98cda9a7bd728eaae83
Subproject commit a4c32676a99747fa8210886b20bdc39620635039
Subproject commit 8a0ea2d0e699863df5fe1c91caf2d7b0855957be
Subproject commit a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1
Subproject commit 040a6736589b17033f28c9ad2f71879c1fcc7453
Subproject commit 4ab1353f28eccfa75f7216306ed9c893bced1083
Subproject commit 98876147870a418cab1451fe23d17a99a7337b45
Subproject commit 1f1094c8ab8558b8810b55573c4f08d1d4699800
Subproject commit e06b5132e2da1e49d4338be58d913b6304e5df8a
Subproject commit a5881a457f74ea6f5b9b1320d02cc4fda2046821
Subproject commit b8331fbae46c2b4f9670234caa2a9a23b389436e
Subproject commit 3afcad80087b5d67cf3ebecf3df575727ba08493
Subproject commit 3bc7181896b908f1bb89350f362394f9eae942c2
Subproject commit 5b7dc0ff506da6ff384a9d03f961675d4c5d68e1
Subproject commit ebbb0f36843dd8f58e3b0442bfe5078732b3e022
Subproject commit 3cf7397511847e3ffbf42a736aac74a97b131210
Subproject commit deb7edafd164eb518eaa7ad37141344a4dbe8eb9
Subproject commit a5b2a882f9bbc8d092bcd332769f10a1e86e3303
Subproject commit b955553fff662a97a49bc1f2169de4aea75a2e4a
Subproject commit 5e5f967aa57f9eb47c4084c9f02043c2031d4903
Subproject commit ac1a833c70b943c4afaf3984a9575892e196ef17
Subproject commit 1c075ab31001a7b1d9b269ec35e095b2169e9367
Subproject commit 8176ab6718c7b75c564b706cbf82e4133fcb6988
Subproject commit 619b119743e1f4de6313910e037447f5fe2876d4
#pragma once
#include <Base_DAQ.h>
......@@ -6,70 +7,234 @@
namespace inaf::oasbo::DAQ {
/**
* @brief The AstriDAQ class represents the ASTRI Data Acquisition (DAQ) system.
*
* This class inherits from the BaseDAQ class and provides additional functionality specific to the ASTRI DAQ system.
* It handles the connection to the provider, packet reception and processing, and state management.
*/
class AstriDAQ: public BaseDAQ {
protected:
/**
* @brief The scheduling_block struct represents a scheduling block for data acquisition.
*
* It contains information such as the date, target ID, observation mode, telescope ID, and run ID.
*/
struct scheduling_block {
std::string date, target_id, obs_mode;
int tel_id, run_id;
};
bool firstIdle;
int daq_id = 0;
void updateDaqIDinConf();
void updateDaqIDinConf(); // update daq_id in the configuration in order to keep track of the last daq_id used.
int connectProvider();
std::thread checkProvConn;
bool checkProvThreadFlag = false;
std::thread checkProvConn; // a thread to check, in loop, if the connection to the provider is still up.
int connectProvider(); // the function used in checkProvConn thread.
bool checkProvThreadFlag = false; // the flag to stop the checkProvConn thread.
std::thread receiveAndProcessThread;
bool runningFlag = false;
int receiveAndProcessPacket();
std::thread receiveAndDeliverThread; // a thread to receive and deliver packets.
bool runningFlag = false; // a flag that tells if the DAQ should be run.
int receiveAndDeliverPacket(); // receive packer from the receiver, process, and deliver to the archiver, provider etc.
/**
* @brief Prints the given message to the given output stream.
*
* @param os The output stream to print to.
* @param message The message to print.
*/
void printLog(std::ostream &os, std::string message);
public:
/**
* @brief Constructs an AstriDAQ object.
*/
AstriDAQ();
/**
* @brief Destroys the AstriDAQ object.
*/
~AstriDAQ();
std::string getStateStr(Status);
std::string getStateStr(Status) override;
void start() override;
void stop() override;
void switchState(const Status) override;
int deliverPacket() override;
/**
* @brief Updates the observers of the data acquisition system.
*/
void updateObservers();
/**
* @brief Updates the archiver observers of the data acquisition system.
*/
void updateArchiverObservers();
void setDaqID(int);
/**
* @brief Sets the daqID
* @param id The daqID to set.
*/
void setDaqID(int id);
/**
* @brief Gets the daqID.
*
* @return The daqID.
*/
int getDaqID();
/**
* @brief Increments the daqID.
*/
void incrementDaqID();
std::string computeNewDestination(Status);
/**
* @brief Computes the new destination based on the given status. // the file name based on the Astri Ma file naming convention.
*
* @param status The status to compute the new destination for.
* @return The computed new destination.
*/
std::string computeNewDestination(Status status);
scheduling_block scheduling_block;
friend class AstriDAQBuilder;
};
/**
* @brief The AstriDAQBuilder class is responsible for building an AstriDAQ object.
*/
class AstriDAQBuilder {
protected:
AstriDAQ* daq;
public:
/**
* @brief Constructs an AstriDAQBuilder object.
*/
AstriDAQBuilder();
/**
* @brief Destroys the AstriDAQBuilder object.
*/
~AstriDAQBuilder();
/**
* @brief Resets the builder to its initial state.
*/
void reset();
/**
* @brief Configures the builder from the given BaseConfigurator object.
*
* @param conf The BaseConfigurator object to configure from.
* @return The configured AstriDAQBuilder object.
*/
AstriDAQBuilder* configFrom(Configurators::BaseConfigurator &conf);
AstriDAQBuilder* setProtocol(Receivers::BaseReceiver*);
AstriDAQBuilder* setArchiver(Archivers::BaseArchiver*);
AstriDAQBuilder* setProvider(Providers::BaseProvider*);
AstriDAQBuilder* setMonitor(PacketMonitors::BasePacketMonitor*);
AstriDAQBuilder* setPacket(PacketLib::BasePacket*);
AstriDAQBuilder* setDateId(std::string);
AstriDAQBuilder* setTargetId(std::string);
AstriDAQBuilder* setObsMode(std::string);
AstriDAQBuilder* setTelId(int);
AstriDAQBuilder* setRunId(int);
AstriDAQBuilder* setDaqId(int);
/**
* @brief Sets the protocol for the data acquisition system.
*
* @param receiver The BaseReceiver object representing the protocol.
* @return The configured AstriDAQBuilder object.
*/
AstriDAQBuilder* setProtocol(Receivers::BaseReceiver* receiver);
/**
* @brief Sets the archiver for the data acquisition system.
*
* @param archiver The BaseArchiver object representing the archiver.
* @return The configured AstriDAQBuilder object.
*/
AstriDAQBuilder* setArchiver(Archivers::BaseArchiver* archiver);
/**
* @brief Sets the provider for the data acquisition system.
*
* @param provider The BaseProvider object representing the provider.
* @return The configured AstriDAQBuilder object.
*/
AstriDAQBuilder* setProvider(Providers::BaseProvider* provider);
/**
* @brief Sets the packet monitor for the data acquisition system.
*
* @param monitor The BasePacketMonitor object representing the packet monitor.
* @return The configured AstriDAQBuilder object.
*/
AstriDAQBuilder* setMonitor(PacketMonitors::BasePacketMonitor* monitor);
/**
* @brief Sets the packet for the data acquisition system.
*
* @param packet The BasePacket object representing the packet.
* @return The configured AstriDAQBuilder object.
*/
AstriDAQBuilder* setPacket(Packets::BasePacket* packet);
/**
* @brief Sets the date ID for the scheduling block.
*
* @param dateId The date ID to set.
* @return The configured AstriDAQBuilder object.
*/
AstriDAQBuilder* setDateId(std::string dateId);
/**
* @brief Sets the target ID for the scheduling block.
*
* @param targetId The target ID to set.
* @return The configured AstriDAQBuilder object.
*/
AstriDAQBuilder* setTargetId(std::string targetId);
/**
* @brief Sets the observation mode for the scheduling block.
*
* @param obsMode The observation mode to set.
* @return The configured AstriDAQBuilder object.
*/
AstriDAQBuilder* setObsMode(std::string obsMode);
/**
* @brief Sets the telescope ID for the scheduling block.
*
* @param telId The telescope ID to set.
* @return The configured AstriDAQBuilder object.
*/
AstriDAQBuilder* setTelId(int telId);
/**
* @brief Sets the run ID for the scheduling block.
*
* @param runId The run ID to set.
* @return The configured AstriDAQBuilder object.
*/
AstriDAQBuilder* setRunId(int runId);
/**
* @brief Sets the ID for the data acquisition system.
*
* @param daqId The ID to set.
* @return The configured AstriDAQBuilder object.
*/
AstriDAQBuilder* setDaqId(int daqId);
/**
* @brief Gets the built AstriDAQ object.
*
* @return The built AstriDAQ object.
*/
AstriDAQ* getDAQ();
static const std::string config_target;
......@@ -81,7 +246,6 @@ class AstriDAQBuilder {
static const std::string monitor_key;
static const std::string daq_id_key;
static const std::string tcp_prot_key;
static const std::string udp_prot_key;
static const std::string file_rcv_key;
......
/**
* @brief The AstriMySQLDAQObserver class is a derived class of BaseDAQ_Observer.
* It provides functionality to observe and interact with a MySQL database for the ASTRI DAQ system.
* Check Base_DAQ_Observer.h for more information.
*
* In particular, this class is used to update the MySQL database with only the Archiver statistics of the ASTRI DAQ system (acquired files table).
*/
#pragma once
#include <Base_DAQ_Observer.h>
......@@ -13,14 +21,15 @@
namespace inaf::oasbo::DAQ_observers {
class AstriMySQLDAQObserver: public BaseDAQ_Observer {
protected:
sql::mysql::MySQL_Driver *driver = nullptr;
sql::Connection *con = nullptr;
std::string ip { }, username { }, password { }, dbname { };
int port;
inaf::oasbo::DAQ::AstriDAQ *astri_daq;
sql::PreparedStatement *stmt = nullptr;
bool file_is_open;
sql::mysql::MySQL_Driver *driver = nullptr; // Pointer to the MySQL driver
sql::Connection *con = nullptr; // Pointer to the MySQL connection
std::string ip { }, username { }, password { }, dbname { }; // IP address, username, password, and database name of the MySQL server
int port; // Port number of the MySQL server
inaf::oasbo::DAQ::AstriDAQ *astri_daq; // Pointer to the ASTRI DAQ object
sql::PreparedStatement *stmt = nullptr; // Pointer to the MySQL prepared statement
bool file_is_open; // Flag to indicate if the file being archived is still open.
// names of the tables and fields in the database
static const std::string FILES_TABLE;
static const std::string CLOSED_FIELD;
static const std::string CREATED_FIELD;
......@@ -33,15 +42,55 @@ protected:
static const std::string ACTIVE_FIELD;
public:
AstriMySQLDAQObserver(inaf::oasbo::DAQ::BaseDAQ &dataAcquisition,std::string ip, int port, std::string username, std::string password, std::string dbname);
/**
* @brief Constructs an AstriMySQLDAQObserver object.
* @param dataAcquisition The reference to the BaseDAQ object for data acquisition.
* @param ip The IP address of the MySQL database server.
* @param port The port number of the MySQL database server.
* @param username The username for connecting to the MySQL database server.
* @param password The password for connecting to the MySQL database server.
* @param dbname The name of the MySQL database.
*/
AstriMySQLDAQObserver(inaf::oasbo::DAQ::BaseDAQ &dataAcquisition, std::string ip, int port, std::string username, std::string password, std::string dbname);
/**
* @brief Destroys the AstriMySQLDAQObserver object.
*/
~AstriMySQLDAQObserver();
/**
* @brief Updates the packet statistics in the MySQL database.
*/
void updatePacketStats() override;
/**
* @brief Updates the archiver statistics in the MySQL database.
*/
void updateArchiverStats() override;
/**
* @brief Updates the provider statistics in the MySQL database.
*/
void updateProviderStats() override;
/**
* @brief Updates the receiver statistics in the MySQL database.
*/
void updateReceiverStats() override;
/**
* @brief Updates all statistics in the MySQL database.
*/
void updateAll() override;
/**
* @brief Starts observing the MySQL database.
*/
void start() override;
/**
* @brief Stops observing the MySQL database.
*/
void stop() override;
};
}
/**
* @brief The AstriRedisDAQObserver class is a derived class of BaseDAQ_Observer that provides functionality for observing the ASTRI DAQ system using Redis.
*
* This class establishes a connection with a Redis server and periodically updates the statistics of the ASTRI DAQ system.
* It inherits the update methods from the BaseDAQ_Observer class and implements them to update the specific statistics related to packet, archiver, provider, and receiver.
* The class also provides methods to start and stop the observation process.
*/
#pragma once
#include <Astri_DAQ.h>
......@@ -13,42 +21,53 @@
namespace inaf::oasbo::DAQ_observers {
class AstriRedisDAQObserver: public BaseDAQ_Observer {
protected:
inaf::oasbo::DAQ::AstriDAQ *astri_daq;
redisContext *context = nullptr;
std::string ip{};
int port;
bool checkConnThreadFlag = false;
std::thread checkConn;
bool isConnected();
int connectLoop();
std::set<std::string> keys;
bool stopFlag = false;
std::mutex writeMutex;
std::thread observerThread;
std::condition_variable sleep_cv;
int updateInterval;
void updateThread();
void updateDAQStats();
static std::string lastopenfile_key;
static std::string lastclosedfile_key;
static std::string mode_key;
static std::string receiverSource_key;
static std::string receiverConn_key;
static std::string provider_destination_key;
inaf::oasbo::DAQ::AstriDAQ *astri_daq; // Pointer to the ASTRI DAQ object
redisContext *context = nullptr; // Pointer to the Redis context
std::string ip{}; // IP address of the Redis server
int port; // Port number of the Redis server
bool checkConnThreadFlag = false; // Flag to indicate if the connection check thread is running
std::thread checkConn; // Thread for checking the connection status
bool isConnected(); // Method to check if the connection to the Redis server is established
int connectLoop(); // Method to continuously attempt to establish a connection to the Redis server
std::set<std::string> keys; // Set of keys used for updating the statistics
bool stopFlag = false; // Flag to indicate if the observation process should stop
std::mutex writeMutex; // Mutex for thread-safe writing
std::thread observerThread; // Thread for updating the statistics
std::condition_variable sleep_cv; // Condition variable for wake-up in observerThread.
int updateInterval; // Interval seconds between updates
void updateThread(); // Method for updating the statistics in observerThread.
void updateDAQStats(); // Method for updating the ASTRI DAQ statistics
static std::string lastopenfile_key; // Key for the last open file statistic
static std::string lastclosedfile_key; // Key for the last closed file statistic
static std::string mode_key; // Key for the mode statistic
static std::string receiverSource_key; // Key for the receiver source statistic
static std::string receiverConn_key; // Key for the receiver connection statistic
static std::string provider_destination_key; // Key for the provider destination statistic
public:
/**
* @brief Constructs an AstriRedisDAQObserver object.
*
* @param dataAcquisition The reference to the BaseDAQ object representing the ASTRI DAQ system.
* @param ip The IP address of the Redis server.
* @param port The port number of the Redis server.
*/
AstriRedisDAQObserver(inaf::oasbo::DAQ::BaseDAQ &dataAcquisition,std::string ip, int port);
/**
* @brief Destroys the AstriRedisDAQObserver object.
*/
~AstriRedisDAQObserver();
void updatePacketStats() override;
void updateArchiverStats() override;
void updateProviderStats() override;
void updateReceiverStats() override;
void updateAll() override;
void start() override;
void stop() override;
void updatePacketStats() override; // Overrides the updatePacketStats method from the BaseDAQ_Observer class
void updateArchiverStats() override; // Overrides the updateArchiverStats method from the BaseDAQ_Observer class
void updateProviderStats() override; // Overrides the updateProviderStats method from the BaseDAQ_Observer class
void updateReceiverStats() override; // Overrides the updateReceiverStats method from the BaseDAQ_Observer class
void updateAll() override; // Overrides the updateAll method from the BaseDAQ_Observer class
void start() override; // Overrides the start method from the BaseDAQ_Observer class
void stop() override; // Overrides the stop method from the BaseDAQ_Observer class
};
}
......@@ -94,6 +94,15 @@ void DAQ::AstriDAQ::updateDaqIDinConf() {
}
}
/**
* @brief Switches the state of the AstriDAQ object.
*
* This function is responsible for changing the state of the AstriDAQ object based on the provided newState parameter.
* If the newState is STOP, it performs the necessary cleanup operations and stops the DAQ.
* If the newState is READY, IDLE, or RUN, it performs the required actions to transition to the new state.
*
* @param newState The new state to switch to.
*/
void DAQ::AstriDAQ::switchState(Status newState) {
if (newState == STOP) {
printLog(std::cout, "Stopping");
......@@ -102,8 +111,8 @@ void DAQ::AstriDAQ::switchState(Status newState) {
// stop receiving
this->receiver->closeConnectionToClient();
if (this->receiveAndProcessThread.joinable()) {
this->receiveAndProcessThread.join();
if (this->receiveAndDeliverThread.joinable()) {
this->receiveAndDeliverThread.join();
}
if (archiver->is_open()) {
this->archiver->close();
......@@ -128,7 +137,7 @@ void DAQ::AstriDAQ::switchState(Status newState) {
switch (this->currentState) {
case Status::READY: {
switch (newState) {
case Status::IDLE:
case Status::IDLE: // from READY to IDLE
dest = computeNewDestination(newState);
archiver->setDest(dest);
archiver->open();
......@@ -143,7 +152,7 @@ void DAQ::AstriDAQ::switchState(Status newState) {
case Status::IDLE: {
firstIdle = false;
switch (newState) {
case Status::IDLE:
case Status::IDLE: // from Idle to Idle
archiver->close();
updateArchiverObservers();
this->incrementDaqID();
......@@ -153,7 +162,7 @@ void DAQ::AstriDAQ::switchState(Status newState) {
archiver->open();
updateArchiverObservers();
break;
case Status::RUN:
case Status::RUN: // from idle to run
archiver->close();
updateArchiverObservers();
this->setCurrentState(newState);
......@@ -162,7 +171,7 @@ void DAQ::AstriDAQ::switchState(Status newState) {
archiver->open();
updateArchiverObservers();
break;
case Status::READY:
case Status::READY: // from idle to ready, it means that the client has disconnected.
this->setCurrentState(Status::READY);
archiver->close();
updateArchiverObservers();
......@@ -185,7 +194,7 @@ void DAQ::AstriDAQ::switchState(Status newState) {
}
case Status::RUN: {
switch (newState) {
case Status::IDLE:
case Status::IDLE: // from run to idle.
archiver->close();
updateArchiverObservers();
this->incrementDaqID();
......@@ -195,7 +204,7 @@ void DAQ::AstriDAQ::switchState(Status newState) {
archiver->open();
updateArchiverObservers();
break;
case Status::RUN:
case Status::RUN: // from run ro run.
archiver->close();
updateArchiverObservers();
this->incrementDaqID();
......@@ -204,14 +213,14 @@ void DAQ::AstriDAQ::switchState(Status newState) {
archiver->open();
updateArchiverObservers();
break;
case Status::READY:
case Status::READY: // from run to ready, it means that the client has disconnected.
this->setCurrentState(Status::READY);
archiver->close();
updateArchiverObservers();
receiver->connectToClient();
if (!receiver->isConnectedToClient()) {
printLog(std::cerr, "Unable to connect to receiver");
if (this->currentState != Status::STOP) { // not terminate by command
if (this->currentState != Status::STOP) { // not terminate by stop function.
switchState(Status::STOP);
}
break;
......@@ -227,7 +236,7 @@ void DAQ::AstriDAQ::switchState(Status newState) {
}
case Status::INIT:
switch (newState) {
case Status::READY:
case Status::READY: // from init to ready
std::for_each(this->observers.begin(), this->observers.end(),
[](inaf::oasbo::DAQ_observers::BaseDAQ_Observer *ob) {
ob->start();
......@@ -247,7 +256,7 @@ void DAQ::AstriDAQ::switchState(Status newState) {
} else {
printLog(std::cout, "Receiver connected");
this->switchState(IDLE);
this->receiveAndProcessThread = std::thread(
this->receiveAndDeliverThread = std::thread(
[this]() {
this->runningFlag = true;
while (this->runningFlag) {
......@@ -255,7 +264,7 @@ void DAQ::AstriDAQ::switchState(Status newState) {
std::this_thread::sleep_for(
std::chrono::seconds(1));
} else {
receiveAndProcessPacket();
receiveAndDeliverPacket();
}
}
});
......@@ -274,7 +283,7 @@ void DAQ::AstriDAQ::switchState(Status newState) {
}
}
int DAQ::AstriDAQ::receiveAndProcessPacket() {
int DAQ::AstriDAQ::receiveAndDeliverPacket() {
int rec = receiver->receiveFromClient(*this->packet);
if (rec == 0) { // connection closed
if (this->currentState != Status::STOP) { // closed by client
......@@ -300,14 +309,14 @@ int DAQ::AstriDAQ::receiveAndProcessPacket() {
}
this->deliverPacket();
Packets::AstriMaGeneric *packet =
static_cast<Packets::AstriMaGeneric*>(this->packet); // downcast
static_cast<Packets::AstriMaGeneric*>(this->packet); // downcast to use get type
if (packet->getType() == 15) { //NOTIF PACKET
printLog(std::cout, "Monitor:");
monitor->printStats();
std::cout << "-------------------------------" << std::endl;
unsigned short cmd = ((this->packet->getPointerToMemory()[8]) << 8)
+ (this->packet->getPointerToMemory()[9]);
Status newState = cmd == 43690 ? Status::RUN : Status::IDLE;
Status newState = cmd == 43690 ? Status::RUN : Status::IDLE; // values accoring to the DAQ-Camera BEE interface control document
std::stringstream ss;
ss << "Switching from "
......
......@@ -168,7 +168,7 @@ AstriDAQBuilder* AstriDAQBuilder::setMonitor(PacketMonitors::BasePacketMonitor *
this->daq->monitor = value;
return this;
}
AstriDAQBuilder* AstriDAQBuilder::setPacket(PacketLib::BasePacket *value) {
AstriDAQBuilder* AstriDAQBuilder::setPacket(Packets::BasePacket *value) {
this->daq->packet = value;
return this;
}
......
......@@ -70,7 +70,7 @@ void AstriMySQLDAQObserver::updateArchiverStats() {
name = dest;
}
if (this->dataAcquisition->getArchiverPtr()->is_open() && !file_is_open) {
if (this->dataAcquisition->getArchiverPtr()->is_open() && !file_is_open) { // new file just opened, insert into the database with current open timestamp.
std::stringstream query;
try {
std::string sched_block_id = "NULL";
......@@ -103,7 +103,7 @@ void AstriMySQLDAQObserver::updateArchiverStats() {
<< "]\t[MYSQL Observer]\t" << e.what() << " during query: "
<< query.str() << std::endl;
}
} else { // set closed timestamp.
} else { // the file has been closed, update the database with the current close timestamp.
std::stringstream query;
try {
query << "UPDATE " << FILES_TABLE;
......
......@@ -36,6 +36,18 @@ void my_handler(int s) {
daq->stop();
}
/**
* @brief The main function of the program.
*
* This function is the entry point of the program. It initializes the necessary configurators,
* creates an instance of AstriDAQ, registers observers, starts the data acquisition process,
* and waits until the acquisition is stopped. Finally, it terminates the program and cleans up
* the resources.
*
* @param argc The number of command-line arguments.
* @param argv An array of command-line arguments.
* @return The exit status of the program.
*/
int main(int argc, char **argv) {
Configurators::YamlConfigurator daqyamlconf(
std::string(ASTRIDAQ_CONFIG_PATH).append("/config.yaml"));
......