Skip to content
Commits on Source (4)
Subproject commit 8a0ea2d0e699863df5fe1c91caf2d7b0855957be
Subproject commit a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1
#pragma once
#include <librdkafka/rdkafkacpp.h>
class DeliveryReportCb: public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &message) {
/* If message.err() is non-zero the message delivery failed permanently
* for the message. */
if (message.err())
std::cerr << "Kafka Provider error: Message delivery failed: " << message.errstr()
<< std::endl;
}
};
......@@ -5,71 +5,127 @@
#include <librdkafka/rdkafkacpp.h>
#include <atomic>
#include <thread>
#include <Delivery_Report.h>
#include <Base_Configurator.h>
/**
* @brief The namespace inaf::oasbo::Providers contains classes related to providers.
*/
namespace inaf::oasbo::Providers {
/**
* @brief The KafkaProvider class is a derived class of BaseProvider and provides functionality for sendig packets to Kafka.
* BaseProvider.h for more information.
*/
class KafkaProvider: public BaseProvider {
protected:
RdKafka::Producer *producer = nullptr;
std::atomic<bool> stopFlag = false;
std::thread pollThread;
DeliveryReportCb *dr_cb = nullptr;
RdKafka::Producer *producer = nullptr; /**< Pointer to the Kafka producer object. */
std::atomic<bool> stopFlag = false; /**< Atomic flag to control the polling thread. */
std::thread pollThread; /**< Thread for polling Kafka events. */
class DeliveryReportCb;
DeliveryReportCb *dr_cb = nullptr; /**< Pointer to the delivery report callback object. */
void pollingThreadFunction();
void pollingThreadFunction(); /**< Function executed by the polling thread. */
KafkaProvider();
KafkaProvider(std::string ip, int port, std::string topic);
KafkaProvider(); /**< Default constructor. */
KafkaProvider(std::string ip, int port, std::string topic); /**< Constructor with parameters. */
public:
std::string brokerIp;
int brokerPort;
std::string brokerIp; /**< IP address of the Kafka broker. */
int brokerPort; /**< Port number of the Kafka broker. */
void setDest(std::string dest) override;
std::string getDest() override;
void setDest(std::string dest) override {
this->dest = dest;
}
std::string getDest() override {
return dest;
}
int write(Packets::BasePacket&) override;
int write(PacketLib::BasePacket&) override;
int write(PacketLib::BasePacket&, std::string dest) override;
int write(Packets::BasePacket&, std::string dest) override;
int close() override;
int open() override;
bool isOpen() override;
/**
* @brief Destructor.
*/
~KafkaProvider();
friend class KafkaProviderBuilder;
};
/**
* @brief The KafkaProviderBuilder class is responsible for building KafkaProvider objects.
*/
class KafkaProviderBuilder {
protected:
KafkaProvider *provider;
KafkaProvider *provider; /**< Pointer to the KafkaProvider object being built. */
public:
std::string config_target { "kafkaprovider" };
std::string ip_key { "ip" };
std::string port_key { "port" };
std::string topic_key { "topic" };
std::string config_target { "kafkaprovider" }; /**< Configuration target for the Kafka provider. */
std::string ip_key { "ip" }; /**< Configuration key for the IP address. */
std::string port_key { "port" }; /**< Configuration key for the port number. */
std::string topic_key { "topic" }; /**< Configuration key for the topic. */
/**
* @brief Default constructor.
*/
KafkaProviderBuilder();
/**
* @brief Constructor with parameters.
* @param ip The IP address of the Kafka broker.
* @param port The port number of the Kafka broker.
* @param topic The topic to subscribe to.
*/
KafkaProviderBuilder(std::string ip, int port, std::string topic);
/**
* @brief Destructor.
*/
~KafkaProviderBuilder();
/**
* @brief Resets the builder to its initial state.
*/
void reset();
/**
* @brief Configures the builder from a BaseConfigurator object.
* @param conf The BaseConfigurator object to configure from.
* @return A pointer to the KafkaProviderBuilder object.
*/
KafkaProviderBuilder* configFrom(Configurators::BaseConfigurator &conf);
/**
* @brief Sets the IP address for the Kafka provider.
* @param ip The IP address to set.
* @return A pointer to the KafkaProviderBuilder object.
*/
KafkaProviderBuilder* setIp(std::string ip);
/**
* @brief Sets the port number for the Kafka provider.
* @param port The port number to set.
* @return A pointer to the KafkaProviderBuilder object.
*/
KafkaProviderBuilder* setPort(int port);
/**
* @brief Sets the topic for the Kafka provider.
* @param topic The topic to set.
* @return A pointer to the KafkaProviderBuilder object.
*/
KafkaProviderBuilder* setTopic(std::string topic);
/**
* @brief Gets the built KafkaProvider object.
* @return A pointer to the KafkaProvider object.
*/
KafkaProvider* getProvider();
};
}
......@@ -2,12 +2,26 @@
#include <memory>
#include <chrono>
#include <unistd.h>
#include <ctime>
#include <iomanip>
using namespace inaf::oasbo::Providers;
class KafkaProvider::DeliveryReportCb: public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &message) {
/* If message.err() is non-zero the message delivery failed permanently
* for the message. */
if (message.err()) {
time_t now = time(nullptr);
std::cerr << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Kafka Provider]\t" << "Message delivery failed"
<< std::endl;
}
}
};
KafkaProvider::KafkaProvider() :
KafkaProvider("127.0.0.1", 9092, "Astri_ADAS_topic") {
}
......@@ -18,20 +32,23 @@ KafkaProvider::KafkaProvider(std::string ip, int port, std::string topic_name) :
}
void KafkaProvider::pollingThreadFunction() {
std::cout << "Kafka Provider: polling thread start..." << std::endl;
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Kafka Provider]\t" << "Polling thread start" << std::endl;
while (!this->stopFlag) {
if (isOpen())
this->producer->poll(0);
std::this_thread::sleep_for(std::chrono::seconds(1)); // pool every second
}
std::cout << "Kafka Provider: polling thread exit..." << std::endl;
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Kafka Provider]\t" << "Polling thread stop" << std::endl;
}
int KafkaProvider::write(PacketLib::BasePacket &packet) {
int KafkaProvider::write(Packets::BasePacket &packet) {
return KafkaProvider::write(packet, this->dest);
}
int KafkaProvider::write(PacketLib::BasePacket &packet, std::string dest) {
int KafkaProvider::write(Packets::BasePacket &packet, std::string dest) {
if (!isOpen()) {
time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
......@@ -151,6 +168,15 @@ bool KafkaProvider::isOpen() {
return this->producer != nullptr;
}
void KafkaProvider::setDest(std::string dest){
this->dest = dest;
}
std::string KafkaProvider::getDest(){
return this->dest;
}
KafkaProvider::~KafkaProvider() {
close();
}
......