diff --git a/CMakeLists.txt b/CMakeLists.txt index 159b76f761a6c730528db490c8341b0036459c68..f842ecf8077c0d64839454bbc7c3503b9edc6e4d 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,7 +13,6 @@ option(KAFKAPROVIDER_BUILD_SHARED "Build Kafkaprovider as a shared library." OFF # sources set(SOURCES src/Kafka_Provider.cpp - src/Kafka_Provider_Configurator.cpp ) # target diff --git a/deps/Base-DAQ b/deps/Base-DAQ index a2095e59d57271e3bef3422a67c6905fd237467c..2b4e8b039ec5da9ddd53720991c1b95f034e86fb 160000 --- a/deps/Base-DAQ +++ b/deps/Base-DAQ @@ -1 +1 @@ -Subproject commit a2095e59d57271e3bef3422a67c6905fd237467c +Subproject commit 2b4e8b039ec5da9ddd53720991c1b95f034e86fb diff --git a/include/Kafka_Provider.h b/include/Kafka_Provider.h index f318b86e680be0cd75cbd4d5be05e1865f5e2c58..ba68044effe8b558cf72829424ddf5fd709d166f 100755 --- a/include/Kafka_Provider.h +++ b/include/Kafka_Provider.h @@ -12,40 +12,68 @@ #include #include #include - +#include namespace inaf::oasbo::Providers { -class KafkaProvider : public BaseProvider{ +class KafkaProvider: public BaseProvider { protected: - std::string brokerIp; - int brokerPort; + RdKafka::Producer *producer = nullptr; std::atomic stopFlag = false; std::thread pollThread; DeliveryReportCb *dr_cb = nullptr; - std::string getBrokerIp(){return this->brokerIp;} - int getBrokerPort(){return this->brokerPort;} - void setBrokerIp(std::string ip){this->brokerIp=ip;} - void setBrokerPort(int port){this->brokerPort=port;} - void pollingThreadFunction(); - -public: KafkaProvider(); KafkaProvider(std::string ip, int port, std::string topic); - void setDest(std::string dest){this->dest = dest;} - std::string getDest(){return dest;} +public: + + std::string brokerIp; + int brokerPort; - int write(PacketLib::BasePacket &); - int write(PacketLib::BasePacket &, std::string dest); + void setDest(std::string dest) override { + this->dest = dest; + } + std::string getDest() override { + return dest; + } - int close(); - int open(); - bool isOpen(); + int write(PacketLib::BasePacket&) override; + int write(PacketLib::BasePacket&, std::string dest) override; + + int close() override; + int open() override; + bool isOpen() override; ~KafkaProvider(); + friend class Builder; + class Builder { + protected: + KafkaProvider *provider; + + public: + + std::string ip_key { "ip" }; + std::string port_key { "port" }; + std::string topic_key { "topic" }; + + Builder(); + Builder(std::string ip, int port, std::string topic); + ~Builder(); + + void reset(); + + Builder* configFrom(Configurators::BaseConfigurator *conf); + + Builder* setIp(std::string ip); + + Builder* setPort(int port); + + Builder* setTopic(std::string topic); + + KafkaProvider* getProvider(); + }; }; } diff --git a/src/Kafka_Provider.cpp b/src/Kafka_Provider.cpp index 4b7dcff28dad04742a026c48a1373b20a87efb83..4eb679b48dc6287567f72bdee5342a1bd8780d32 100755 --- a/src/Kafka_Provider.cpp +++ b/src/Kafka_Provider.cpp @@ -2,23 +2,23 @@ #include #include #include +#include using namespace inaf::oasbo::Providers; -KafkaProvider::KafkaProvider() { - KafkaProvider("127.0.0.1", 9092, "DAQ_topic"); +KafkaProvider::KafkaProvider() : + KafkaProvider("127.0.0.1", 9092, "DAQ_topic") { } -KafkaProvider::KafkaProvider(std::string ip, int port, std::string topic_name) { +KafkaProvider::KafkaProvider(std::string ip, int port, std::string topic_name) : + brokerIp(ip), brokerPort(port) { setDest(topic_name); - setBrokerIp(ip); - setBrokerPort(port); } void KafkaProvider::pollingThreadFunction() { std::cout << "Kafka Provider: polling thread start..." << std::endl; while (!this->stopFlag) { - if(isOpen()) + if (isOpen()) this->producer->poll(0); sleep(1); // pool every second } @@ -56,15 +56,15 @@ int KafkaProvider::write(PacketLib::BasePacket &packet, std::string dest) { attempts += 1; } - std::cerr << "Kafka Provider: Failed to produce to topic " << dest << ": " << err - << std::endl; + std::cerr << "Kafka Provider: Failed to produce to topic " << dest << ": " + << err << std::endl; return -1; } int KafkaProvider::open() { - if(isOpen()) + if (isOpen()) close(); - RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); std::string errstr; if (conf->set("bootstrap.servers", std::string(this->brokerIp).append(":").append( @@ -81,7 +81,8 @@ int KafkaProvider::open() { this->producer = RdKafka::Producer::create(conf, errstr); if (!this->producer) { this->producer = nullptr; - std::cerr << "Kafka Provider: Failed to create producer: " << errstr << std::endl; + std::cerr << "Kafka Provider: Failed to create producer: " << errstr + << std::endl; return -1; } delete conf; @@ -115,3 +116,59 @@ bool KafkaProvider::isOpen() { KafkaProvider::~KafkaProvider() { close(); } + +using K_B = KafkaProvider::Builder; +K_B::Builder() { + this->reset(); +} +K_B::Builder(std::string ip, int port, std::string topic) { + this->provider = new KafkaProvider(ip, port, topic); + +} +K_B::~Builder() { + delete provider; +} + +void K_B::reset() { + this->provider = new KafkaProvider(); +} + +K_B* K_B::configFrom(Configurators::BaseConfigurator *conf) { + std::string target = std::string("KafkaProvider"); + conf->readConfigFromSource(target); + std::map params = conf->getConfig(); + + std::string key = target+"_"+ip_key; + if (params.count(key) > 0) + provider->brokerIp = params[key]; + + key = target+"_"+port_key; + if (params.count(key) > 0) + provider->brokerPort = std::stoi(params[key]); + + key = target+"_"+topic_key; + if (params.count(key) > 0) + provider->setDest(params[key]); + return this; +} + +K_B* K_B::setIp(std::string ip) { + provider->brokerIp = ip; + return this; +} + +K_B* K_B::setPort(int port) { + provider->brokerPort = port; + return this; +} + +K_B* K_B::setTopic(std::string topic) { + provider->setDest(topic); + return this; +} + +KafkaProvider* K_B::getProvider() { + KafkaProvider *result = this->provider; + this->reset(); + return result; +}