diff --git a/deps/Base-DAQ b/deps/Base-DAQ index 8a0ea2d0e699863df5fe1c91caf2d7b0855957be..a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1 160000 --- a/deps/Base-DAQ +++ b/deps/Base-DAQ @@ -1 +1 @@ -Subproject commit 8a0ea2d0e699863df5fe1c91caf2d7b0855957be +Subproject commit a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1 diff --git a/include/Delivery_Report.h b/include/Delivery_Report.h index 7c9b77849908bb3aeabb0f39022e9ca20e836824..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 --- a/include/Delivery_Report.h +++ b/include/Delivery_Report.h @@ -1,14 +0,0 @@ - -#pragma once -#include - -class DeliveryReportCb: public RdKafka::DeliveryReportCb { -public: - void dr_cb(RdKafka::Message &message) { - /* If message.err() is non-zero the message delivery failed permanently - * for the message. */ - if (message.err()) - std::cerr << "Kafka Provider error: Message delivery failed: " << message.errstr() - << std::endl; - } -}; diff --git a/include/Kafka_Provider.h b/include/Kafka_Provider.h index b303e36c03d4ed6ba93b84254df43bbe3d377118..f828474fb2faa24df77920d0b612f1d1eb123e4e 100755 --- a/include/Kafka_Provider.h +++ b/include/Kafka_Provider.h @@ -5,71 +5,127 @@ #include #include #include -#include #include +/** + * @brief The namespace inaf::oasbo::Providers contains classes related to providers. + */ namespace inaf::oasbo::Providers { + +/** + * @brief The KafkaProvider class is a derived class of BaseProvider and provides functionality for sendig packets to Kafka. + * BaseProvider.h for more information. + */ class KafkaProvider: public BaseProvider { protected: - RdKafka::Producer *producer = nullptr; - std::atomic stopFlag = false; - std::thread pollThread; - DeliveryReportCb *dr_cb = nullptr; + RdKafka::Producer *producer = nullptr; /**< Pointer to the Kafka producer object. */ + std::atomic stopFlag = false; /**< Atomic flag to control the polling thread. */ + std::thread pollThread; /**< Thread for polling Kafka events. */ + class DeliveryReportCb; + DeliveryReportCb *dr_cb = nullptr; /**< Pointer to the delivery report callback object. */ - void pollingThreadFunction(); + void pollingThreadFunction(); /**< Function executed by the polling thread. */ - KafkaProvider(); - KafkaProvider(std::string ip, int port, std::string topic); + KafkaProvider(); /**< Default constructor. */ + KafkaProvider(std::string ip, int port, std::string topic); /**< Constructor with parameters. */ public: - std::string brokerIp; - int brokerPort; + std::string brokerIp; /**< IP address of the Kafka broker. */ + int brokerPort; /**< Port number of the Kafka broker. */ + + void setDest(std::string dest) override; + + std::string getDest() override; - void setDest(std::string dest) override { - this->dest = dest; - } - std::string getDest() override { - return dest; - } + int write(Packets::BasePacket&) override; - int write(PacketLib::BasePacket&) override; - int write(PacketLib::BasePacket&, std::string dest) override; + int write(Packets::BasePacket&, std::string dest) override; int close() override; + int open() override; + bool isOpen() override; + + /** + * @brief Destructor. + */ ~KafkaProvider(); friend class KafkaProviderBuilder; }; +/** + * @brief The KafkaProviderBuilder class is responsible for building KafkaProvider objects. + */ class KafkaProviderBuilder { protected: - KafkaProvider *provider; + KafkaProvider *provider; /**< Pointer to the KafkaProvider object being built. */ public: - std::string config_target { "kafkaprovider" }; - std::string ip_key { "ip" }; - std::string port_key { "port" }; - std::string topic_key { "topic" }; - + std::string config_target { "kafkaprovider" }; /**< Configuration target for the Kafka provider. */ + std::string ip_key { "ip" }; /**< Configuration key for the IP address. */ + std::string port_key { "port" }; /**< Configuration key for the port number. */ + std::string topic_key { "topic" }; /**< Configuration key for the topic. */ + + /** + * @brief Default constructor. + */ KafkaProviderBuilder(); + + /** + * @brief Constructor with parameters. + * @param ip The IP address of the Kafka broker. + * @param port The port number of the Kafka broker. + * @param topic The topic to subscribe to. + */ KafkaProviderBuilder(std::string ip, int port, std::string topic); + + /** + * @brief Destructor. + */ ~KafkaProviderBuilder(); + /** + * @brief Resets the builder to its initial state. + */ void reset(); + /** + * @brief Configures the builder from a BaseConfigurator object. + * @param conf The BaseConfigurator object to configure from. + * @return A pointer to the KafkaProviderBuilder object. + */ KafkaProviderBuilder* configFrom(Configurators::BaseConfigurator &conf); + /** + * @brief Sets the IP address for the Kafka provider. + * @param ip The IP address to set. + * @return A pointer to the KafkaProviderBuilder object. + */ KafkaProviderBuilder* setIp(std::string ip); + /** + * @brief Sets the port number for the Kafka provider. + * @param port The port number to set. + * @return A pointer to the KafkaProviderBuilder object. + */ KafkaProviderBuilder* setPort(int port); + /** + * @brief Sets the topic for the Kafka provider. + * @param topic The topic to set. + * @return A pointer to the KafkaProviderBuilder object. + */ KafkaProviderBuilder* setTopic(std::string topic); + /** + * @brief Gets the built KafkaProvider object. + * @return A pointer to the KafkaProvider object. + */ KafkaProvider* getProvider(); }; } diff --git a/src/Kafka_Provider.cpp b/src/Kafka_Provider.cpp index 30af067739f33185ce3e4531dd1259ad97906696..628198445886bd269a333772052826a9a140985b 100755 --- a/src/Kafka_Provider.cpp +++ b/src/Kafka_Provider.cpp @@ -2,12 +2,26 @@ #include #include #include - #include #include using namespace inaf::oasbo::Providers; +class KafkaProvider::DeliveryReportCb: public RdKafka::DeliveryReportCb { +public: + void dr_cb(RdKafka::Message &message) { + /* If message.err() is non-zero the message delivery failed permanently + * for the message. */ + if (message.err()) { + time_t now = time(nullptr); + std::cerr << "[" + << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") + << "]\t[Kafka Provider]\t" << "Message delivery failed" + << std::endl; + } + } +}; + KafkaProvider::KafkaProvider() : KafkaProvider("127.0.0.1", 9092, "Astri_ADAS_topic") { } @@ -18,20 +32,23 @@ KafkaProvider::KafkaProvider(std::string ip, int port, std::string topic_name) : } void KafkaProvider::pollingThreadFunction() { - std::cout << "Kafka Provider: polling thread start..." << std::endl; + time_t now = time(nullptr); + std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") + << "]\t[Kafka Provider]\t" << "Polling thread start" << std::endl; while (!this->stopFlag) { if (isOpen()) this->producer->poll(0); std::this_thread::sleep_for(std::chrono::seconds(1)); // pool every second } - std::cout << "Kafka Provider: polling thread exit..." << std::endl; + std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") + << "]\t[Kafka Provider]\t" << "Polling thread stop" << std::endl; } -int KafkaProvider::write(PacketLib::BasePacket &packet) { +int KafkaProvider::write(Packets::BasePacket &packet) { return KafkaProvider::write(packet, this->dest); } -int KafkaProvider::write(PacketLib::BasePacket &packet, std::string dest) { +int KafkaProvider::write(Packets::BasePacket &packet, std::string dest) { if (!isOpen()) { time_t now = time(nullptr); std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") @@ -151,6 +168,15 @@ bool KafkaProvider::isOpen() { return this->producer != nullptr; } +void KafkaProvider::setDest(std::string dest){ + this->dest = dest; +} + +std::string KafkaProvider::getDest(){ + return this->dest; +} + + KafkaProvider::~KafkaProvider() { close(); }