Skip to content
Commits on Source (3)
cmake_minimum_required(VERSION 3.9) cmake_minimum_required(VERSION 3.9)
project(Redis_Receiver) project(Redis_Receiver)
set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall")
option(REDISRECEIVER_BUILD_SHARED "Build Redisreceiver as a shared library." OFF) option(REDISRECEIVER_BUILD_SHARED "Build Redisreceiver as a shared library." OFF)
......
Subproject commit 8a0ea2d0e699863df5fe1c91caf2d7b0855957be Subproject commit a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1
#pragma once #pragma once
#include <Base_Configurator.h> #include <Base_Configurator.h>
#include <hiredis/hiredis.h> #include <hiredis/hiredis.h>
#include <Base_Receiver.h> #include <Base_Receiver.h>
/**
* @brief Namespace containing classes related to receiving data from Redis.
*/
namespace inaf::oasbo::Receivers { namespace inaf::oasbo::Receivers {
/**
* @brief Class for receiving data from Redis.
* @details This class inherits from BaseReceiver and provides functionality to connect to a Redis server,
* receive data from it, and manage the connection.
* check the Base_Receiver.h file for more information.
*/
class RedisReceiver: public BaseReceiver { class RedisReceiver: public BaseReceiver {
protected: protected:
redisContext *context = nullptr; redisContext *context = nullptr; /**< Pointer to the Redis context. */
std::string ip { }; std::string ip { }; /**< IP address of the Redis server. */
int port; int port; /**< Port number of the Redis server. */
std::string key; std::string key; /**< Key used to access data in the Redis server. */
/**
* @brief Constructor for RedisReceiver.
* @param ip The IP address of the Redis server.
* @param prt The port number of the Redis server.
* @param key The key used to access data in the Redis server.
*/
RedisReceiver(std::string ip, int prt, std::string key); RedisReceiver(std::string ip, int prt, std::string key);
/**
* @brief Default constructor for RedisReceiver.
*/
RedisReceiver(); RedisReceiver();
/**
* @brief Splits the IP address, port number, and key from a string.
* @param ip_port_key The string containing the IP address, port number, and key.
* @param ip The extracted IP address.
* @param port The extracted port number.
* @param key The extracted key.
* @return True if the string was successfully split, false otherwise.
*/
bool split_ip_port_key(const std::string &ip_port_key, std::string &ip, bool split_ip_port_key(const std::string &ip_port_key, std::string &ip,
int &port, std::string &key); int &port, std::string &key);
void resetPacket(PacketLib::BasePacket&, int bytes);
/**
* @brief Resets the first bytes of the packet to 0.
* @param pack The packet to reset.
* @param bytes The number of bytes to reset.
*/
void resetPacket(Packets::BasePacket &packet, int bytes);
public: public:
/**
* @brief Destructor for RedisReceiver.
*/
~RedisReceiver(); ~RedisReceiver();
int check_interval; int check_interval; /**< The interval at which to check for new data in Redis. */
/**
* @brief Get the host name of the Redis server.
* @return The host name.
*/
std::string getHost() override; std::string getHost() override;
/**
* @brief Set the host name of the Redis server.
* @param host The host name to set.
*/
void setHost(std::string host) override; void setHost(std::string host) override;
void setIp(std::string); /**
* @brief Set the IP address of the Redis server.
* @param ip The IP address to set.
*/
void setIp(std::string ip);
/**
* @brief Get the IP address of the Redis server.
* @return The IP address.
*/
std::string getIp() { std::string getIp() {
return this->ip; return this->ip;
} }
void setPort(int);
/**
* @brief Set the port number of the Redis server.
* @param port The port number to set.
*/
void setPort(int port);
/**
* @brief Get the port number of the Redis server.
* @return The port number.
*/
int getPort() { int getPort() {
return this->port; return this->port;
} }
void setKey(std::string);
/**
* @brief Set the key used to access data in the Redis server.
* @param key The key to set.
*/
void setKey(std::string key);
/**
* @brief Get the key used to access data in the Redis server.
* @return The key.
*/
std::string getKey() { std::string getKey() {
return this->key; return this->key;
} }
int connectToClient() override; int connectToClient() override;
int closeConnectionToClient() override; int closeConnectionToClient() override;
bool isConnectedToClient() const override; bool isConnectedToClient() const override;
int receiveFromClient(PacketLib::BasePacket&) override;
int receiveFromClient(Packets::BasePacket &packet) override;
friend class RedisReceiverBuilder; friend class RedisReceiverBuilder;
}; };
/**
* @brief Builder class for creating RedisReceiver objects.
*/
class RedisReceiverBuilder { class RedisReceiverBuilder {
protected: protected:
RedisReceiver *receiver; RedisReceiver *receiver; /**< Pointer to the RedisReceiver object being built. */
public: public:
std::string config_target { "redisreceiver" }; std::string config_target { "redisreceiver" }; /**< The configuration target for the builder. */
std::string ip_key { "ip" }; std::string ip_key { "ip" }; /**< The configuration key for the IP address. */
std::string port_key { "port" }; std::string port_key { "port" }; /**< The configuration key for the port number. */
std::string key_key { "key" }; std::string key_key { "key" }; /**< The configuration key for the Redis key. */
std::string checkinterval_key { "checkinterval" }; std::string checkinterval_key { "checkinterval" }; /**< The configuration key for the check interval. */
/**
* @brief Default constructor for RedisReceiverBuilder.
*/
RedisReceiverBuilder(); RedisReceiverBuilder();
/**
* @brief Constructor for RedisReceiverBuilder.
* @param ip The IP address of the Redis server.
* @param port The port number of the Redis server.
* @param key The key used to access data in the Redis server.
*/
RedisReceiverBuilder(std::string ip, int port, std::string key); RedisReceiverBuilder(std::string ip, int port, std::string key);
/**
* @brief Destructor for RedisReceiverBuilder.
*/
~RedisReceiverBuilder(); ~RedisReceiverBuilder();
/**
* @brief Reset the builder to its initial state.
*/
void reset(); void reset();
/**
* @brief Configure the builder from a configurator object.
* @param conf The configurator object to configure from.
* @return A pointer to the builder.
*/
RedisReceiverBuilder* configFrom(Configurators::BaseConfigurator &conf); RedisReceiverBuilder* configFrom(Configurators::BaseConfigurator &conf);
/**
* @brief Set the IP address of the Redis server.
* @param ip The IP address to set.
* @return A pointer to the builder.
*/
RedisReceiverBuilder* setIp(std::string ip); RedisReceiverBuilder* setIp(std::string ip);
/**
* @brief Set the port number of the Redis server.
* @param port The port number to set.
* @return A pointer to the builder.
*/
RedisReceiverBuilder* setPort(int port); RedisReceiverBuilder* setPort(int port);
/**
* @brief Set the key used to access data in the Redis server.
* @param key The key to set.
* @return A pointer to the builder.
*/
RedisReceiverBuilder* setKey(std::string key); RedisReceiverBuilder* setKey(std::string key);
/**
* @brief Get the built RedisReceiver object.
* @return A pointer to the RedisReceiver object.
*/
RedisReceiver* getReceiver(); RedisReceiver* getReceiver();
}; };
} }
...@@ -51,8 +51,7 @@ int RedisReceiver::connectToClient() { ...@@ -51,8 +51,7 @@ int RedisReceiver::connectToClient() {
} }
time_t now = time(nullptr); time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Redis Receiver]\t" << "Connected" << "]\t[Redis Receiver]\t" << "Connected" << std::endl;
<< std::endl;
return 1; return 1;
} }
...@@ -68,7 +67,7 @@ bool RedisReceiver::isConnectedToClient() const { ...@@ -68,7 +67,7 @@ bool RedisReceiver::isConnectedToClient() const {
return !(context == nullptr || context->err); return !(context == nullptr || context->err);
} }
int RedisReceiver::receiveFromClient(PacketLib::BasePacket &pack) { int RedisReceiver::receiveFromClient(Packets::BasePacket &pack) {
if (!isConnectedToClient()) { if (!isConnectedToClient()) {
if (context) { if (context) {
time_t now = time(nullptr); time_t now = time(nullptr);
...@@ -92,25 +91,24 @@ int RedisReceiver::receiveFromClient(PacketLib::BasePacket &pack) { ...@@ -92,25 +91,24 @@ int RedisReceiver::receiveFromClient(PacketLib::BasePacket &pack) {
std::this_thread::sleep_for(std::chrono::seconds(this->check_interval)); std::this_thread::sleep_for(std::chrono::seconds(this->check_interval));
return -1; return -1;
} }
if(r->type == REDIS_REPLY_NIL){ if (r->type == REDIS_REPLY_NIL) {
freeReplyObject(r); freeReplyObject(r);
std::this_thread::sleep_for(std::chrono::seconds(this->check_interval)); std::this_thread::sleep_for(std::chrono::seconds(this->check_interval));
return 0; return 0;
} }
if (r->type == REDIS_REPLY_ERROR ) { if (r->type == REDIS_REPLY_ERROR) {
freeReplyObject(r); freeReplyObject(r);
std::this_thread::sleep_for(std::chrono::seconds(this->check_interval)); std::this_thread::sleep_for(std::chrono::seconds(this->check_interval));
return -1; return -1;
} }
if (r->type == REDIS_REPLY_STRING && r->type != REDIS_REPLY_NIL) { if (r->type == REDIS_REPLY_STRING && r->type != REDIS_REPLY_NIL) { // Received
int size = r->len; int size = r->len;
ssize_t header_size = pack.getHeaderSize(); ssize_t header_size = pack.getHeaderSize();
if (size < header_size) { if (size < header_size) {
freeReplyObject(r); freeReplyObject(r);
return -1; return -1;
} }
pack.copyToMemory(reinterpret_cast<uint8_t*>(r->str), pack.copyToMemory(reinterpret_cast<uint8_t*>(r->str), header_size);
header_size);
ssize_t tot_packet_size = pack.getHeaderSize() + pack.getPayloadSize() ssize_t tot_packet_size = pack.getHeaderSize() + pack.getPayloadSize()
+ pack.getTailSize(); + pack.getTailSize();
...@@ -203,7 +201,7 @@ bool RedisReceiver::split_ip_port_key(const std::string &ip_port_key, ...@@ -203,7 +201,7 @@ bool RedisReceiver::split_ip_port_key(const std::string &ip_port_key,
return true; return true;
} }
void RedisReceiver::resetPacket(PacketLib::BasePacket &pack, int bytes) { void RedisReceiver::resetPacket(Packets::BasePacket &pack, int bytes) {
uint8_t *buff = new uint8_t[bytes]; uint8_t *buff = new uint8_t[bytes];
std::memset(buff, 0, bytes); std::memset(buff, 0, bytes);
int toBeReset = std::min( int toBeReset = std::min(
......