From 62c19b84b7660270e5df8f67a951c06167ce0d48 Mon Sep 17 00:00:00 2001 From: astri Date: Thu, 18 May 2023 20:25:31 +0200 Subject: [PATCH 1/3] configuration refactoring --- CMakeLists.txt | 1 - include/Kafka_Provider.h | 30 +++++++++--------- src/Kafka_Provider.cpp | 66 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 79 insertions(+), 18 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 159b76f..f842ecf 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,7 +13,6 @@ option(KAFKAPROVIDER_BUILD_SHARED "Build Kafkaprovider as a shared library." OFF # sources set(SOURCES src/Kafka_Provider.cpp - src/Kafka_Provider_Configurator.cpp ) # target diff --git a/include/Kafka_Provider.h b/include/Kafka_Provider.h index f318b86..308098e 100755 --- a/include/Kafka_Provider.h +++ b/include/Kafka_Provider.h @@ -17,35 +17,35 @@ namespace inaf::oasbo::Providers { class KafkaProvider : public BaseProvider{ protected: - std::string brokerIp; - int brokerPort; + RdKafka::Producer *producer = nullptr; std::atomic stopFlag = false; std::thread pollThread; DeliveryReportCb *dr_cb = nullptr; - std::string getBrokerIp(){return this->brokerIp;} - int getBrokerPort(){return this->brokerPort;} - void setBrokerIp(std::string ip){this->brokerIp=ip;} - void setBrokerPort(int port){this->brokerPort=port;} void pollingThreadFunction(); - -public: KafkaProvider(); KafkaProvider(std::string ip, int port, std::string topic); - void setDest(std::string dest){this->dest = dest;} - std::string getDest(){return dest;} +public: + + std::string brokerIp; + int brokerPort; + + + void setDest(std::string dest) override {this->dest = dest;} + std::string getDest() override {return dest;} - int write(PacketLib::BasePacket &); - int write(PacketLib::BasePacket &, std::string dest); + int write(PacketLib::BasePacket &) override; + int write(PacketLib::BasePacket &, std::string dest) override; - int close(); - int open(); - bool isOpen(); + int close() override; + int open() override; + bool isOpen() override; ~KafkaProvider(); + class Builder; }; } diff --git a/src/Kafka_Provider.cpp b/src/Kafka_Provider.cpp index 4b7dcff..b2a8d24 100755 --- a/src/Kafka_Provider.cpp +++ b/src/Kafka_Provider.cpp @@ -2,6 +2,7 @@ #include #include #include +#include using namespace inaf::oasbo::Providers; @@ -10,9 +11,9 @@ KafkaProvider::KafkaProvider() { } KafkaProvider::KafkaProvider(std::string ip, int port, std::string topic_name) { + this->brokerIp = ip; + this->brokerPort = port; setDest(topic_name); - setBrokerIp(ip); - setBrokerPort(port); } void KafkaProvider::pollingThreadFunction() { @@ -115,3 +116,64 @@ bool KafkaProvider::isOpen() { KafkaProvider::~KafkaProvider() { close(); } + + + +class KafkaProvider::Builder { +protected: + KafkaProvider *provider; + +public: + + std::string ip_key{"ip"}; + std::string port_key{"port"}; + std::string topic_key{"topic"}; + + Builder() { + this->reset(); + } + Builder(std::string ip, int port, std::string topic) { + this->provider = new KafkaProvider(ip, port, topic); + + } + ~Builder() { + delete provider; + } + + void reset() { + this->provider = new KafkaProvider(); + } + + Builder* configFrom(Configurators::BaseConfigurator *conf) { + std::map params = + conf->readConfig(); + if (params.count(ip_key) > 0) + provider->brokerIp = params[ip_key]; + if (params.count(port_key) > 0) + provider->brokerPort = std::stoi(params[port_key]); + if (params.count(topic_key) > 0) + provider->setDest(params[topic_key]); + return this; + } + + Builder* setIp(std::string ip) { + provider->brokerIp = ip; + return this; + } + + Builder* setPort(int port) { + provider->brokerPort = port; + return this; + } + + Builder* setTopic(std::string topic) { + provider->setDest(topic); + return this; + } + + KafkaProvider* getProvider() { + KafkaProvider *result = this->provider; + this->reset(); + return result; + } +}; -- GitLab From 9f6965c61748b1020d17803a73aa485519266ea5 Mon Sep 17 00:00:00 2001 From: astri Date: Thu, 25 May 2023 15:56:14 +0200 Subject: [PATCH 2/3] builder refactor --- include/Kafka_Provider.h | 46 +++++++++++++---- src/Kafka_Provider.cpp | 106 +++++++++++++++++---------------------- 2 files changed, 84 insertions(+), 68 deletions(-) diff --git a/include/Kafka_Provider.h b/include/Kafka_Provider.h index 308098e..ba68044 100755 --- a/include/Kafka_Provider.h +++ b/include/Kafka_Provider.h @@ -12,10 +12,10 @@ #include #include #include - +#include namespace inaf::oasbo::Providers { -class KafkaProvider : public BaseProvider{ +class KafkaProvider: public BaseProvider { protected: RdKafka::Producer *producer = nullptr; @@ -23,7 +23,6 @@ protected: std::thread pollThread; DeliveryReportCb *dr_cb = nullptr; - void pollingThreadFunction(); KafkaProvider(); @@ -34,18 +33,47 @@ public: std::string brokerIp; int brokerPort; + void setDest(std::string dest) override { + this->dest = dest; + } + std::string getDest() override { + return dest; + } - void setDest(std::string dest) override {this->dest = dest;} - std::string getDest() override {return dest;} - - int write(PacketLib::BasePacket &) override; - int write(PacketLib::BasePacket &, std::string dest) override; + int write(PacketLib::BasePacket&) override; + int write(PacketLib::BasePacket&, std::string dest) override; int close() override; int open() override; bool isOpen() override; ~KafkaProvider(); - class Builder; + friend class Builder; + class Builder { + protected: + KafkaProvider *provider; + + public: + + std::string ip_key { "ip" }; + std::string port_key { "port" }; + std::string topic_key { "topic" }; + + Builder(); + Builder(std::string ip, int port, std::string topic); + ~Builder(); + + void reset(); + + Builder* configFrom(Configurators::BaseConfigurator *conf); + + Builder* setIp(std::string ip); + + Builder* setPort(int port); + + Builder* setTopic(std::string topic); + + KafkaProvider* getProvider(); + }; }; } diff --git a/src/Kafka_Provider.cpp b/src/Kafka_Provider.cpp index b2a8d24..34225b2 100755 --- a/src/Kafka_Provider.cpp +++ b/src/Kafka_Provider.cpp @@ -19,7 +19,7 @@ KafkaProvider::KafkaProvider(std::string ip, int port, std::string topic_name) { void KafkaProvider::pollingThreadFunction() { std::cout << "Kafka Provider: polling thread start..." << std::endl; while (!this->stopFlag) { - if(isOpen()) + if (isOpen()) this->producer->poll(0); sleep(1); // pool every second } @@ -57,15 +57,15 @@ int KafkaProvider::write(PacketLib::BasePacket &packet, std::string dest) { attempts += 1; } - std::cerr << "Kafka Provider: Failed to produce to topic " << dest << ": " << err - << std::endl; + std::cerr << "Kafka Provider: Failed to produce to topic " << dest << ": " + << err << std::endl; return -1; } int KafkaProvider::open() { - if(isOpen()) + if (isOpen()) close(); - RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); std::string errstr; if (conf->set("bootstrap.servers", std::string(this->brokerIp).append(":").append( @@ -82,7 +82,8 @@ int KafkaProvider::open() { this->producer = RdKafka::Producer::create(conf, errstr); if (!this->producer) { this->producer = nullptr; - std::cerr << "Kafka Provider: Failed to create producer: " << errstr << std::endl; + std::cerr << "Kafka Provider: Failed to create producer: " << errstr + << std::endl; return -1; } delete conf; @@ -117,63 +118,50 @@ KafkaProvider::~KafkaProvider() { close(); } +using K_B = KafkaProvider::Builder; +K_B::Builder() { + this->reset(); +} +K_B::Builder(std::string ip, int port, std::string topic) { + this->provider = new KafkaProvider(ip, port, topic); +} +K_B::~Builder() { + delete provider; +} -class KafkaProvider::Builder { -protected: - KafkaProvider *provider; - -public: - - std::string ip_key{"ip"}; - std::string port_key{"port"}; - std::string topic_key{"topic"}; - - Builder() { - this->reset(); - } - Builder(std::string ip, int port, std::string topic) { - this->provider = new KafkaProvider(ip, port, topic); - - } - ~Builder() { - delete provider; - } - - void reset() { - this->provider = new KafkaProvider(); - } +void K_B::reset() { + this->provider = new KafkaProvider(); +} - Builder* configFrom(Configurators::BaseConfigurator *conf) { - std::map params = - conf->readConfig(); - if (params.count(ip_key) > 0) - provider->brokerIp = params[ip_key]; - if (params.count(port_key) > 0) - provider->brokerPort = std::stoi(params[port_key]); - if (params.count(topic_key) > 0) - provider->setDest(params[topic_key]); - return this; - } +K_B* K_B::configFrom(Configurators::BaseConfigurator *conf) { + std::map params = conf->readConfig(); + if (params.count(ip_key) > 0) + provider->brokerIp = params[ip_key]; + if (params.count(port_key) > 0) + provider->brokerPort = std::stoi(params[port_key]); + if (params.count(topic_key) > 0) + provider->setDest(params[topic_key]); + return this; +} - Builder* setIp(std::string ip) { - provider->brokerIp = ip; - return this; - } +K_B* K_B::setIp(std::string ip) { + provider->brokerIp = ip; + return this; +} - Builder* setPort(int port) { - provider->brokerPort = port; - return this; - } +K_B* K_B::setPort(int port) { + provider->brokerPort = port; + return this; +} - Builder* setTopic(std::string topic) { - provider->setDest(topic); - return this; - } +K_B* K_B::setTopic(std::string topic) { + provider->setDest(topic); + return this; +} - KafkaProvider* getProvider() { - KafkaProvider *result = this->provider; - this->reset(); - return result; - } -}; +KafkaProvider* K_B::getProvider() { + KafkaProvider *result = this->provider; + this->reset(); + return result; +} -- GitLab From 6e72c1ca3af5508353ac9f265fec3c3219a38d0c Mon Sep 17 00:00:00 2001 From: astri Date: Sat, 3 Jun 2023 18:48:51 +0200 Subject: [PATCH 3/3] config refactorig ok --- deps/Base-DAQ | 2 +- src/Kafka_Provider.cpp | 31 +++++++++++++++++++------------ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/deps/Base-DAQ b/deps/Base-DAQ index a2095e5..2b4e8b0 160000 --- a/deps/Base-DAQ +++ b/deps/Base-DAQ @@ -1 +1 @@ -Subproject commit a2095e59d57271e3bef3422a67c6905fd237467c +Subproject commit 2b4e8b039ec5da9ddd53720991c1b95f034e86fb diff --git a/src/Kafka_Provider.cpp b/src/Kafka_Provider.cpp index 34225b2..4eb679b 100755 --- a/src/Kafka_Provider.cpp +++ b/src/Kafka_Provider.cpp @@ -6,13 +6,12 @@ using namespace inaf::oasbo::Providers; -KafkaProvider::KafkaProvider() { - KafkaProvider("127.0.0.1", 9092, "DAQ_topic"); +KafkaProvider::KafkaProvider() : + KafkaProvider("127.0.0.1", 9092, "DAQ_topic") { } -KafkaProvider::KafkaProvider(std::string ip, int port, std::string topic_name) { - this->brokerIp = ip; - this->brokerPort = port; +KafkaProvider::KafkaProvider(std::string ip, int port, std::string topic_name) : + brokerIp(ip), brokerPort(port) { setDest(topic_name); } @@ -135,13 +134,21 @@ void K_B::reset() { } K_B* K_B::configFrom(Configurators::BaseConfigurator *conf) { - std::map params = conf->readConfig(); - if (params.count(ip_key) > 0) - provider->brokerIp = params[ip_key]; - if (params.count(port_key) > 0) - provider->brokerPort = std::stoi(params[port_key]); - if (params.count(topic_key) > 0) - provider->setDest(params[topic_key]); + std::string target = std::string("KafkaProvider"); + conf->readConfigFromSource(target); + std::map params = conf->getConfig(); + + std::string key = target+"_"+ip_key; + if (params.count(key) > 0) + provider->brokerIp = params[key]; + + key = target+"_"+port_key; + if (params.count(key) > 0) + provider->brokerPort = std::stoi(params[key]); + + key = target+"_"+topic_key; + if (params.count(key) > 0) + provider->setDest(params[key]); return this; } -- GitLab