Skip to content
GitLab
Explore
Sign in
BIAS
Providers
Kafka Provider
Compare revisions
3bc7181896b908f1bb89350f362394f9eae942c2 to bc62173caeaf98587b7d6dc900cd38f40206c5cc
Commits on Source (4)
adding doxygen
· ec892ad3
Valerio Pastore
authored
Jan 12, 2024
ec892ad3
.
· aa817876
Valerio Pastore
authored
Jan 12, 2024
aa817876
.
· 5b7dc0ff
Valerio Pastore
authored
Jan 12, 2024
5b7dc0ff
Merge branch 'dev' into 'main'
· bc62173c
Valerio Pastore
authored
Jan 14, 2024
Dev See merge request
!2
bc62173c
Hide whitespace changes
Inline
Side-by-side
Base-DAQ
@
a00f9a27
Compare
8a0ea2d0
...
a00f9a27
Subproject commit
8
a0
ea2d0e699863df5fe1c91caf2d7b0855957be
Subproject commit a0
0f9a27afbf5f75dab7db2368b9b9b6fcb395e1
include/Delivery_Report.h
View file @
bc62173c
#pragma once
#include
<librdkafka/rdkafkacpp.h>
class
DeliveryReportCb
:
public
RdKafka
::
DeliveryReportCb
{
public:
void
dr_cb
(
RdKafka
::
Message
&
message
)
{
/* If message.err() is non-zero the message delivery failed permanently
* for the message. */
if
(
message
.
err
())
std
::
cerr
<<
"Kafka Provider error: Message delivery failed: "
<<
message
.
errstr
()
<<
std
::
endl
;
}
};
include/Kafka_Provider.h
View file @
bc62173c
...
...
@@ -5,71 +5,127 @@
#include
<librdkafka/rdkafkacpp.h>
#include
<atomic>
#include
<thread>
#include
<Delivery_Report.h>
#include
<Base_Configurator.h>
/**
* @brief The namespace inaf::oasbo::Providers contains classes related to providers.
*/
namespace
inaf
::
oasbo
::
Providers
{
/**
* @brief The KafkaProvider class is a derived class of BaseProvider and provides functionality for sendig packets to Kafka.
* BaseProvider.h for more information.
*/
class
KafkaProvider
:
public
BaseProvider
{
protected:
RdKafka
::
Producer
*
producer
=
nullptr
;
std
::
atomic
<
bool
>
stopFlag
=
false
;
std
::
thread
pollThread
;
DeliveryReportCb
*
dr_cb
=
nullptr
;
RdKafka
::
Producer
*
producer
=
nullptr
;
/**< Pointer to the Kafka producer object. */
std
::
atomic
<
bool
>
stopFlag
=
false
;
/**< Atomic flag to control the polling thread. */
std
::
thread
pollThread
;
/**< Thread for polling Kafka events. */
class
DeliveryReportCb
;
DeliveryReportCb
*
dr_cb
=
nullptr
;
/**< Pointer to the delivery report callback object. */
void
pollingThreadFunction
();
void
pollingThreadFunction
();
/**< Function executed by the polling thread. */
KafkaProvider
();
KafkaProvider
(
std
::
string
ip
,
int
port
,
std
::
string
topic
);
KafkaProvider
();
/**< Default constructor. */
KafkaProvider
(
std
::
string
ip
,
int
port
,
std
::
string
topic
);
/**< Constructor with parameters. */
public:
std
::
string
brokerIp
;
int
brokerPort
;
std
::
string
brokerIp
;
/**< IP address of the Kafka broker. */
int
brokerPort
;
/**< Port number of the Kafka broker. */
void
setDest
(
std
::
string
dest
)
override
;
std
::
string
getDest
()
override
;
void
setDest
(
std
::
string
dest
)
override
{
this
->
dest
=
dest
;
}
std
::
string
getDest
()
override
{
return
dest
;
}
int
write
(
Packets
::
BasePacket
&
)
override
;
int
write
(
PacketLib
::
BasePacket
&
)
override
;
int
write
(
PacketLib
::
BasePacket
&
,
std
::
string
dest
)
override
;
int
write
(
Packets
::
BasePacket
&
,
std
::
string
dest
)
override
;
int
close
()
override
;
int
open
()
override
;
bool
isOpen
()
override
;
/**
* @brief Destructor.
*/
~
KafkaProvider
();
friend
class
KafkaProviderBuilder
;
};
/**
* @brief The KafkaProviderBuilder class is responsible for building KafkaProvider objects.
*/
class
KafkaProviderBuilder
{
protected:
KafkaProvider
*
provider
;
KafkaProvider
*
provider
;
/**< Pointer to the KafkaProvider object being built. */
public:
std
::
string
config_target
{
"kafkaprovider"
};
std
::
string
ip_key
{
"ip"
};
std
::
string
port_key
{
"port"
};
std
::
string
topic_key
{
"topic"
};
std
::
string
config_target
{
"kafkaprovider"
};
/**< Configuration target for the Kafka provider. */
std
::
string
ip_key
{
"ip"
};
/**< Configuration key for the IP address. */
std
::
string
port_key
{
"port"
};
/**< Configuration key for the port number. */
std
::
string
topic_key
{
"topic"
};
/**< Configuration key for the topic. */
/**
* @brief Default constructor.
*/
KafkaProviderBuilder
();
/**
* @brief Constructor with parameters.
* @param ip The IP address of the Kafka broker.
* @param port The port number of the Kafka broker.
* @param topic The topic to subscribe to.
*/
KafkaProviderBuilder
(
std
::
string
ip
,
int
port
,
std
::
string
topic
);
/**
* @brief Destructor.
*/
~
KafkaProviderBuilder
();
/**
* @brief Resets the builder to its initial state.
*/
void
reset
();
/**
* @brief Configures the builder from a BaseConfigurator object.
* @param conf The BaseConfigurator object to configure from.
* @return A pointer to the KafkaProviderBuilder object.
*/
KafkaProviderBuilder
*
configFrom
(
Configurators
::
BaseConfigurator
&
conf
);
/**
* @brief Sets the IP address for the Kafka provider.
* @param ip The IP address to set.
* @return A pointer to the KafkaProviderBuilder object.
*/
KafkaProviderBuilder
*
setIp
(
std
::
string
ip
);
/**
* @brief Sets the port number for the Kafka provider.
* @param port The port number to set.
* @return A pointer to the KafkaProviderBuilder object.
*/
KafkaProviderBuilder
*
setPort
(
int
port
);
/**
* @brief Sets the topic for the Kafka provider.
* @param topic The topic to set.
* @return A pointer to the KafkaProviderBuilder object.
*/
KafkaProviderBuilder
*
setTopic
(
std
::
string
topic
);
/**
* @brief Gets the built KafkaProvider object.
* @return A pointer to the KafkaProvider object.
*/
KafkaProvider
*
getProvider
();
};
}
src/Kafka_Provider.cpp
View file @
bc62173c
...
...
@@ -2,12 +2,26 @@
#include
<memory>
#include
<chrono>
#include
<unistd.h>
#include
<ctime>
#include
<iomanip>
using
namespace
inaf
::
oasbo
::
Providers
;
class
KafkaProvider
::
DeliveryReportCb
:
public
RdKafka
::
DeliveryReportCb
{
public:
void
dr_cb
(
RdKafka
::
Message
&
message
)
{
/* If message.err() is non-zero the message delivery failed permanently
* for the message. */
if
(
message
.
err
())
{
time_t
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[Kafka Provider]
\t
"
<<
"Message delivery failed"
<<
std
::
endl
;
}
}
};
KafkaProvider
::
KafkaProvider
()
:
KafkaProvider
(
"127.0.0.1"
,
9092
,
"Astri_ADAS_topic"
)
{
}
...
...
@@ -18,20 +32,23 @@ KafkaProvider::KafkaProvider(std::string ip, int port, std::string topic_name) :
}
void
KafkaProvider
::
pollingThreadFunction
()
{
std
::
cout
<<
"Kafka Provider: polling thread start..."
<<
std
::
endl
;
time_t
now
=
time
(
nullptr
);
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[Kafka Provider]
\t
"
<<
"Polling thread start"
<<
std
::
endl
;
while
(
!
this
->
stopFlag
)
{
if
(
isOpen
())
this
->
producer
->
poll
(
0
);
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
1
));
// pool every second
}
std
::
cout
<<
"Kafka Provider: polling thread exit..."
<<
std
::
endl
;
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[Kafka Provider]
\t
"
<<
"Polling thread stop"
<<
std
::
endl
;
}
int
KafkaProvider
::
write
(
Packet
Lib
::
BasePacket
&
packet
)
{
int
KafkaProvider
::
write
(
Packet
s
::
BasePacket
&
packet
)
{
return
KafkaProvider
::
write
(
packet
,
this
->
dest
);
}
int
KafkaProvider
::
write
(
Packet
Lib
::
BasePacket
&
packet
,
std
::
string
dest
)
{
int
KafkaProvider
::
write
(
Packet
s
::
BasePacket
&
packet
,
std
::
string
dest
)
{
if
(
!
isOpen
())
{
time_t
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
...
...
@@ -151,6 +168,15 @@ bool KafkaProvider::isOpen() {
return
this
->
producer
!=
nullptr
;
}
void
KafkaProvider
::
setDest
(
std
::
string
dest
){
this
->
dest
=
dest
;
}
std
::
string
KafkaProvider
::
getDest
(){
return
this
->
dest
;
}
KafkaProvider
::~
KafkaProvider
()
{
close
();
}
...
...