Skip to content
Commits on Source (3)
Subproject commit 8a0ea2d0e699863df5fe1c91caf2d7b0855957be
Subproject commit a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1
#pragma once
#include <Base_Receiver.h>
......@@ -9,24 +8,66 @@
#define RAW_FILES_PATH std::string(std::getenv("HOME")).append("/BIAS/config/RawTestFiles")
#endif
/**
* @brief The namespace inaf::oasbo::Receivers contains classes related to receiving data.
*/
namespace inaf::oasbo::Receivers {
/**
* @brief The FileReceiver class is a derived class of BaseReceiver and is responsible for receiving packets from files.
* check the Base_Receiver.h file for more information.
*/
class FileReceiver: public BaseReceiver {
protected:
/**
* @brief Constructs a FileReceiver object with a source and a rate.
* @param source The source files to receive.
* @param rate The rate at which the packets are received.
*/
FileReceiver(std::string source, int rate);
/**
* @brief Constructs a FileReceiver object with a source.
* @param source The source files to receive.
*/
FileReceiver(std::string source);
/**
* @brief Constructs a FileReceiver object.
*/
FileReceiver();
std::ifstream *ifile = nullptr;
ssize_t filesize = 0;
std::vector<std::string> filesToProcess;
size_t filesCount = 0;
std::ifstream *ifile = nullptr; /**< Pointer to the input file stream. */
ssize_t filesize = 0; /**< The size of the file being received. */
std::vector<std::string> filesToProcess; /**< Vector of files to process. */
size_t filesCount = 0; /**< The number of files to process. */
/**
* @brief Opens the next file to process.
* @return integer representing the status of the operation.
*/
int openNextFile();
void resetPacket(PacketLib::BasePacket &pack, int bytes);
/**
* @brief Resets the first bytes of the packet to 0.
* @param pack The packet to reset.
* @param bytes The number of bytes to reset.
*/
void resetPacket(Packets::BasePacket &pack, int bytes);
/**
* @brief Resolves the environment variable in the given path.
* @param path The path containing the environment variable.
* @return The resolved path.
*/
std::string resolveEnvVar(std::string path);
public:
int rate;
int rate; /**< The rate at which the packets are received. */
/**
* @brief Destructor for the FileReceiver class.
*/
~FileReceiver();
std::string getHost() override;
......@@ -35,25 +76,65 @@ public:
int connectToClient() override;
int closeConnectionToClient() override;
bool isConnectedToClient() const override;
int receiveFromClient(PacketLib::BasePacket&) override;
int receiveFromClient(Packets::BasePacket&) override;
friend class FileReceiverBuilder;
};
/**
* @brief The FileReceiverBuilder class is responsible for building FileReceiver objects.
*/
class FileReceiverBuilder {
protected:
FileReceiver *rcv;
std::string sourceFile;
FileReceiver *rcv; /**< Pointer to the FileReceiver object being built. */
std::string sourceFile; /**< The source file for the FileReceiver object. */
public:
std::string config_target { "filereceiver" };
std::string source_key { "source" };
std::string rate_key { "rate" };
std::string config_target { "filereceiver" }; /**< The configuration target for the FileReceiverBuilder. */
std::string source_key { "source" }; /**< The source key for the FileReceiverBuilder. */
std::string rate_key { "rate" }; /**< The rate key for the FileReceiverBuilder. */
/**
* @brief Constructs a FileReceiverBuilder object.
*/
FileReceiverBuilder();
/**
* @brief Destructor for the FileReceiverBuilder class.
*/
~FileReceiverBuilder();
/**
* @brief Resets the FileReceiverBuilder object.
*/
void reset();
/**
* @brief Configures the FileReceiverBuilder object from a BaseConfigurator object.
* @param conf The BaseConfigurator object to configure from.
* @return A pointer to the FileReceiverBuilder object.
*/
FileReceiverBuilder* configFrom(Configurators::BaseConfigurator &conf);
/**
* @brief Sets the source for the FileReceiverBuilder object.
* @param source The source of the files to receive.
* @return A pointer to the FileReceiverBuilder object.
*/
FileReceiverBuilder* setSource(std::string source);
/**
* @brief Sets the rate for the FileReceiverBuilder object.
* @param rate The rate at which the files are received.
* @return A pointer to the FileReceiverBuilder object.
*/
FileReceiverBuilder* setRate(int rate);
/**
* @brief Gets the built FileReceiver object.
* @return A pointer to the built FileReceiver object.
*/
FileReceiver* getReceiver();
};
}
} // namespace inaf::oasbo::Receivers
......@@ -13,16 +13,17 @@ void FileReceiverBuilder::reset() {
this->rcv = new FileReceiver();
}
FileReceiverBuilder* FileReceiverBuilder::configFrom(Configurators::BaseConfigurator &conf) {
FileReceiverBuilder* FileReceiverBuilder::configFrom(
Configurators::BaseConfigurator &conf) {
conf.readConfigFromSource(config_target);
std::map<std::string, std::string> params = conf.getConfig();
std::string key = config_target+"_"+source_key;
std::string key = config_target + "_" + source_key;
if (params.count(key) > 0)
rcv->setHost(params[key]);
key = config_target+"_"+rate_key;
if (params.count(key) > 0){
key = config_target + "_" + rate_key;
if (params.count(key) > 0) {
rcv->rate = std::stoi(params[key]);
}
return this;
......
......@@ -31,7 +31,7 @@ FileReceiver::FileReceiver(std::string source, int rate) :
this->setHost(source);
}
int FileReceiver::receiveFromClient(PacketLib::BasePacket &pack) {
int FileReceiver::receiveFromClient(Packets::BasePacket &pack) {
if (ifile->tellg() == filesize) { // end of file, open next
if (!openNextFile()) { // all file processed
closeConnectionToClient();
......@@ -75,18 +75,21 @@ int FileReceiver::connectToClient() {
namespace fs = std::filesystem;
std::string s = resolveEnvVar(host);
fs::path source(s);
if (!std::filesystem::exists(source)) {
time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") <<"]\t[File Receiver]\t" << host << " does not exists!" << std::endl;
return -1;
}
if (!std::filesystem::exists(source)) {
time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[File Receiver]\t" << host << " does not exists!"
<< std::endl;
return -1;
}
if (std::filesystem::is_regular_file(source)) { // host is only one file, not a folder.
filesToProcess.push_back(source);
}
else { // host is a folder, so iterates over all files into the folder
for (const auto &entry : std::filesystem::recursive_directory_iterator(source)) {
for (const auto &entry : std::filesystem::recursive_directory_iterator(
source)) {
if (entry.is_regular_file()) {
filesToProcess.push_back(entry.path().string());
}
......@@ -94,7 +97,7 @@ int FileReceiver::connectToClient() {
}
// sort alphabetically
std::sort(filesToProcess.begin(), filesToProcess.end());
std::sort(filesToProcess.begin(), filesToProcess.end());
if (!openNextFile()) // try to open first file
return -1;
......@@ -127,7 +130,7 @@ FileReceiver::~FileReceiver() {
}
}
void FileReceiver::resetPacket(PacketLib::BasePacket &pack, int bytes) {
void FileReceiver::resetPacket(Packets::BasePacket &pack, int bytes) {
uint8_t *buff = new uint8_t[bytes];
std::memset(buff, 0, bytes);
int toBeReset = std::min(
......@@ -141,27 +144,30 @@ int FileReceiver::openNextFile() {
ifile = new std::ifstream(filesToProcess.at(filesCount),
std::ios::binary);
if (!ifile->is_open()) {
time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") <<"]\t[File Receiver]\t" << "Cannot open "
time_t now = time(nullptr);
std::cerr << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[File Receiver]\t" << "Cannot open "
<< filesToProcess.at(filesCount) << std::endl;
ifile = nullptr;
filesCount += 1;
continue; // go to next file
}
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") <<"]\t[File Receiver]\t" << "Processing " << filesToProcess.at(filesCount) << std::endl;
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[File Receiver]\t" << "Processing "
<< filesToProcess.at(filesCount) << std::endl;
// compute filesize
ifile->seekg(0, std::ios::end);
filesize = ifile->tellg();
ifile->seekg(0, std::ios::beg);
filesCount += 1;
return 1;
}
return 0; // all file processed
}
std::string FileReceiver::resolveEnvVar(std::string path) {
// resolve env var if present
if (path.at(0) == '$') {
......