From ec892ad3727f830dcf9b4f9cb9b28aadf08ebb54 Mon Sep 17 00:00:00 2001 From: Valerio Pastore Date: Fri, 12 Jan 2024 18:23:28 +0100 Subject: [PATCH 1/3] adding doxygen --- include/Delivery_Report.h | 4 +- include/Kafka_Provider.h | 122 ++++++++++++++++++++++++++++++++++++++ src/Kafka_Provider.cpp | 9 +-- 3 files changed, 129 insertions(+), 6 deletions(-) diff --git a/include/Delivery_Report.h b/include/Delivery_Report.h index 7c9b778..de943c1 100644 --- a/include/Delivery_Report.h +++ b/include/Delivery_Report.h @@ -8,7 +8,7 @@ public: /* If message.err() is non-zero the message delivery failed permanently * for the message. */ if (message.err()) - std::cerr << "Kafka Provider error: Message delivery failed: " << message.errstr() - << std::endl; + std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") + << "]\t[Kafka Provider]\t" << "Message delivery failed" << std::endl; } }; diff --git a/include/Kafka_Provider.h b/include/Kafka_Provider.h index b303e36..0e94748 100755 --- a/include/Kafka_Provider.h +++ b/include/Kafka_Provider.h @@ -8,6 +8,128 @@ #include #include +/** + * @brief The namespace inaf::oasbo::Providers contains classes related to providers. + */ +namespace inaf::oasbo::Providers { + +/** + * @brief The KafkaProvider class is a derived class of BaseProvider and provides functionality for sendig packets to Kafka. + * BaseProvider.h for more information. + */ +class KafkaProvider: public BaseProvider { +protected: + + RdKafka::Producer *producer = nullptr; /**< Pointer to the Kafka producer object. */ + std::atomic stopFlag = false; /**< Atomic flag to control the polling thread. */ + std::thread pollThread; /**< Thread for polling Kafka events. */ + DeliveryReportCb *dr_cb = nullptr; /**< Pointer to the delivery report callback object. */ + + void pollingThreadFunction(); /**< Function executed by the polling thread. */ + + KafkaProvider(); /**< Default constructor. */ + KafkaProvider(std::string ip, int port, std::string topic); /**< Constructor with parameters. */ + +public: + + std::string brokerIp; /**< IP address of the Kafka broker. */ + int brokerPort; /**< Port number of the Kafka broker. */ + + void setDest(std::string dest) override; + + std::string getDest() override; + + int write(PacketLib::BasePacket&) override; + + int write(PacketLib::BasePacket&, std::string dest) override; + + int close() override; + + int open() override; + + bool isOpen() override; + + /** + * @brief Destructor. + */ + ~KafkaProvider(); + + friend class KafkaProviderBuilder; + +}; + +/** + * @brief The KafkaProviderBuilder class is responsible for building KafkaProvider objects. + */ +class KafkaProviderBuilder { +protected: + KafkaProvider *provider; /**< Pointer to the KafkaProvider object being built. */ + +public: + std::string config_target { "kafkaprovider" }; /**< Configuration target for the Kafka provider. */ + std::string ip_key { "ip" }; /**< Configuration key for the IP address. */ + std::string port_key { "port" }; /**< Configuration key for the port number. */ + std::string topic_key { "topic" }; /**< Configuration key for the topic. */ + + /** + * @brief Default constructor. + */ + KafkaProviderBuilder(); + + /** + * @brief Constructor with parameters. + * @param ip The IP address of the Kafka broker. + * @param port The port number of the Kafka broker. + * @param topic The topic to subscribe to. + */ + KafkaProviderBuilder(std::string ip, int port, std::string topic); + + /** + * @brief Destructor. + */ + ~KafkaProviderBuilder(); + + /** + * @brief Resets the builder to its initial state. + */ + void reset(); + + /** + * @brief Configures the builder from a BaseConfigurator object. + * @param conf The BaseConfigurator object to configure from. + * @return A pointer to the KafkaProviderBuilder object. + */ + KafkaProviderBuilder* configFrom(Configurators::BaseConfigurator &conf); + + /** + * @brief Sets the IP address for the Kafka provider. + * @param ip The IP address to set. + * @return A pointer to the KafkaProviderBuilder object. + */ + KafkaProviderBuilder* setIp(std::string ip); + + /** + * @brief Sets the port number for the Kafka provider. + * @param port The port number to set. + * @return A pointer to the KafkaProviderBuilder object. + */ + KafkaProviderBuilder* setPort(int port); + + /** + * @brief Sets the topic for the Kafka provider. + * @param topic The topic to set. + * @return A pointer to the KafkaProviderBuilder object. + */ + KafkaProviderBuilder* setTopic(std::string topic); + + /** + * @brief Gets the built KafkaProvider object. + * @return A pointer to the KafkaProvider object. + */ + KafkaProvider* getProvider(); +}; + +} // namespace inaf::oasbo::Providers namespace inaf::oasbo::Providers { class KafkaProvider: public BaseProvider { protected: diff --git a/src/Kafka_Provider.cpp b/src/Kafka_Provider.cpp index 30af067..7ef2494 100755 --- a/src/Kafka_Provider.cpp +++ b/src/Kafka_Provider.cpp @@ -18,14 +18,15 @@ KafkaProvider::KafkaProvider(std::string ip, int port, std::string topic_name) : } void KafkaProvider::pollingThreadFunction() { - std::cout << "Kafka Provider: polling thread start..." << std::endl; - while (!this->stopFlag) { + std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") + << "]\t[Kafka Provider]\t" << "Polling thread start" << std::endl; + while (!this->stopFlag) { if (isOpen()) this->producer->poll(0); std::this_thread::sleep_for(std::chrono::seconds(1)); // pool every second } - std::cout << "Kafka Provider: polling thread exit..." << std::endl; -} +std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") + << "]\t[Kafka Provider]\t" << "Polling thread stop" << std::endl; } int KafkaProvider::write(PacketLib::BasePacket &packet) { return KafkaProvider::write(packet, this->dest); -- GitLab From aa81787630e7a1545e4cde8b994b38fab360ac1b Mon Sep 17 00:00:00 2001 From: valerio pastore Date: Fri, 12 Jan 2024 18:43:57 +0100 Subject: [PATCH 2/3] . --- deps/Base-DAQ | 2 +- include/Delivery_Report.h | 14 -------- include/Kafka_Provider.h | 72 ++------------------------------------- src/Kafka_Provider.cpp | 30 ++++++++++++---- 4 files changed, 27 insertions(+), 91 deletions(-) diff --git a/deps/Base-DAQ b/deps/Base-DAQ index 8a0ea2d..a00f9a2 160000 --- a/deps/Base-DAQ +++ b/deps/Base-DAQ @@ -1 +1 @@ -Subproject commit 8a0ea2d0e699863df5fe1c91caf2d7b0855957be +Subproject commit a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1 diff --git a/include/Delivery_Report.h b/include/Delivery_Report.h index de943c1..e69de29 100644 --- a/include/Delivery_Report.h +++ b/include/Delivery_Report.h @@ -1,14 +0,0 @@ - -#pragma once -#include - -class DeliveryReportCb: public RdKafka::DeliveryReportCb { -public: - void dr_cb(RdKafka::Message &message) { - /* If message.err() is non-zero the message delivery failed permanently - * for the message. */ - if (message.err()) - std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") - << "]\t[Kafka Provider]\t" << "Message delivery failed" << std::endl; - } -}; diff --git a/include/Kafka_Provider.h b/include/Kafka_Provider.h index 0e94748..f828474 100755 --- a/include/Kafka_Provider.h +++ b/include/Kafka_Provider.h @@ -5,7 +5,6 @@ #include #include #include -#include #include /** @@ -23,6 +22,7 @@ protected: RdKafka::Producer *producer = nullptr; /**< Pointer to the Kafka producer object. */ std::atomic stopFlag = false; /**< Atomic flag to control the polling thread. */ std::thread pollThread; /**< Thread for polling Kafka events. */ + class DeliveryReportCb; DeliveryReportCb *dr_cb = nullptr; /**< Pointer to the delivery report callback object. */ void pollingThreadFunction(); /**< Function executed by the polling thread. */ @@ -39,9 +39,9 @@ public: std::string getDest() override; - int write(PacketLib::BasePacket&) override; + int write(Packets::BasePacket&) override; - int write(PacketLib::BasePacket&, std::string dest) override; + int write(Packets::BasePacket&, std::string dest) override; int close() override; @@ -128,70 +128,4 @@ public: */ KafkaProvider* getProvider(); }; - -} // namespace inaf::oasbo::Providers -namespace inaf::oasbo::Providers { -class KafkaProvider: public BaseProvider { -protected: - - RdKafka::Producer *producer = nullptr; - std::atomic stopFlag = false; - std::thread pollThread; - DeliveryReportCb *dr_cb = nullptr; - - void pollingThreadFunction(); - - KafkaProvider(); - KafkaProvider(std::string ip, int port, std::string topic); - -public: - - std::string brokerIp; - int brokerPort; - - void setDest(std::string dest) override { - this->dest = dest; - } - std::string getDest() override { - return dest; - } - - int write(PacketLib::BasePacket&) override; - int write(PacketLib::BasePacket&, std::string dest) override; - - int close() override; - int open() override; - bool isOpen() override; - ~KafkaProvider(); - - friend class KafkaProviderBuilder; - -}; - -class KafkaProviderBuilder { -protected: - KafkaProvider *provider; - -public: - std::string config_target { "kafkaprovider" }; - std::string ip_key { "ip" }; - std::string port_key { "port" }; - std::string topic_key { "topic" }; - - KafkaProviderBuilder(); - KafkaProviderBuilder(std::string ip, int port, std::string topic); - ~KafkaProviderBuilder(); - - void reset(); - - KafkaProviderBuilder* configFrom(Configurators::BaseConfigurator &conf); - - KafkaProviderBuilder* setIp(std::string ip); - - KafkaProviderBuilder* setPort(int port); - - KafkaProviderBuilder* setTopic(std::string topic); - - KafkaProvider* getProvider(); -}; } diff --git a/src/Kafka_Provider.cpp b/src/Kafka_Provider.cpp index 7ef2494..90eff0c 100755 --- a/src/Kafka_Provider.cpp +++ b/src/Kafka_Provider.cpp @@ -2,12 +2,26 @@ #include #include #include - #include #include using namespace inaf::oasbo::Providers; +class KafkaProvider::DeliveryReportCb: public RdKafka::DeliveryReportCb { +public: + void dr_cb(RdKafka::Message &message) { + /* If message.err() is non-zero the message delivery failed permanently + * for the message. */ + if (message.err()) { + time_t now = time(nullptr); + std::cerr << "[" + << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") + << "]\t[Kafka Provider]\t" << "Message delivery failed" + << std::endl; + } + } +}; + KafkaProvider::KafkaProvider() : KafkaProvider("127.0.0.1", 9092, "Astri_ADAS_topic") { } @@ -18,21 +32,23 @@ KafkaProvider::KafkaProvider(std::string ip, int port, std::string topic_name) : } void KafkaProvider::pollingThreadFunction() { + time_t now = time(nullptr); std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") - << "]\t[Kafka Provider]\t" << "Polling thread start" << std::endl; - while (!this->stopFlag) { + << "]\t[Kafka Provider]\t" << "Polling thread start" << std::endl; + while (!this->stopFlag) { if (isOpen()) this->producer->poll(0); std::this_thread::sleep_for(std::chrono::seconds(1)); // pool every second } -std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") - << "]\t[Kafka Provider]\t" << "Polling thread stop" << std::endl; } + std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") + << "]\t[Kafka Provider]\t" << "Polling thread stop" << std::endl; +} -int KafkaProvider::write(PacketLib::BasePacket &packet) { +int KafkaProvider::write(Packets::BasePacket &packet) { return KafkaProvider::write(packet, this->dest); } -int KafkaProvider::write(PacketLib::BasePacket &packet, std::string dest) { +int KafkaProvider::write(Packets::BasePacket &packet, std::string dest) { if (!isOpen()) { time_t now = time(nullptr); std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") -- GitLab From 5b7dc0ff506da6ff384a9d03f961675d4c5d68e1 Mon Sep 17 00:00:00 2001 From: valerio pastore Date: Fri, 12 Jan 2024 20:16:49 +0100 Subject: [PATCH 3/3] . --- src/Kafka_Provider.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/Kafka_Provider.cpp b/src/Kafka_Provider.cpp index 90eff0c..6281984 100755 --- a/src/Kafka_Provider.cpp +++ b/src/Kafka_Provider.cpp @@ -168,6 +168,15 @@ bool KafkaProvider::isOpen() { return this->producer != nullptr; } +void KafkaProvider::setDest(std::string dest){ + this->dest = dest; +} + +std::string KafkaProvider::getDest(){ + return this->dest; +} + + KafkaProvider::~KafkaProvider() { close(); } -- GitLab