Skip to content
Commits on Source (4)
Subproject commit 8a0ea2d0e699863df5fe1c91caf2d7b0855957be
Subproject commit a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1
/*
*
* Created on: Mar 1, 2021
* Author: astrisw
*/
#pragma once
#include <Base_Provider.h>
......@@ -16,53 +10,74 @@
#include <mutex>
namespace inaf::oasbo::Providers {
/**
* @class KafkaAvroProvider
* @brief The KafkaAvroProvider class is a derived class of BaseProvider and provides functionality for sending
* BasePackets to a Kafka topic in Avro format.
* Check Base_Provider for other information.
*/
class KafkaAvroProvider: public BaseProvider {
protected:
//Kafka
RdKafka::Producer *producer = nullptr;
std::atomic<bool> stopFlag = false;
std::thread pollThread;
bool pollThreadFlag = false;
std::mutex closeMutex;
RdKafka::Producer *producer = nullptr; /**< Pointer to the Kafka producer object. */
std::atomic<bool> stopFlag = false; /**< Atomic flag to indicate if the provider should stop. */
std::thread pollThread; /**< Thread for polling Kafka events. */
bool pollThreadFlag = false; /**< Flag to indicate if the poll thread is running. */
std::mutex closeMutex; /**< Mutex for thread-safe closing of the provider. */
class KafkaDeliveryReportCb;
KafkaDeliveryReportCb *dr_cb = nullptr;
class KafkaConnectionCallback;
KafkaConnectionCallback *connection_cb = nullptr;
class KafkaDeliveryReportCb; /**< Forward declaration of the KafkaDeliveryReportCb class. */
KafkaDeliveryReportCb *dr_cb = nullptr; /**< Pointer to the Kafka delivery report callback object. */
class KafkaConnectionCallback; /**< Forward declaration of the KafkaConnectionCallback class. */
KafkaConnectionCallback *connection_cb = nullptr; /**< Pointer to the Kafka connection callback object. */
// AVRO
avro::EncoderPtr avroencoder;
avro::EncoderPtr avroencoder; /**< Pointer to the AVRO encoder object. */
KafkaAvroProvider();
KafkaAvroProvider(std::string ip, int port, std::string topic);
KafkaAvroProvider(); /**< Default constructor. */
KafkaAvroProvider(std::string ip, int port, std::string topic); /**< Constructor with parameters. */
/**
* @brief Function for the polling thread.
*/
void pollingThreadFunction();
void encodeToAvro(PacketLib::BasePacket&);
/**
* @brief Function for encoding a BasePacket to AVRO format.
* @param packet The BasePacket to encode.
*/
void encodeToAvro(Packets::BasePacket&);
public:
std::string brokerIp;
int brokerPort;
std::string brokerIp; /**< The IP address of the Kafka broker. */
int brokerPort; /**< The port number of the Kafka broker. */
void setDest(std::string dest) override;
void setDest(std::string dest) override {
this->dest = dest;
}
std::string getDest() override {
return dest;
}
std::string getDest() override;
int write(PacketLib::BasePacket&) override;
int write(PacketLib::BasePacket&, std::string dest) override;
int write(Packets::BasePacket&) override;
int write(Packets::BasePacket&, std::string dest) override;
int close() override;
int open() override;
bool isOpen() override;
~KafkaAvroProvider();
~KafkaAvroProvider(); /**< Destructor. */
friend class KafkaAvroProviderBuilder;
};
/**
* @class KafkaAvroProviderBuilder
* @brief The KafkaAvroProviderBuilder class is used to build KafkaAvroProvider objects with configurable parameters.
*/
class KafkaAvroProviderBuilder {
protected:
KafkaAvroProvider *provider;
......@@ -77,16 +92,44 @@ public:
KafkaAvroProviderBuilder(std::string ip, int port, std::string topic);
~KafkaAvroProviderBuilder();
/**
* @brief Resets the builder to its initial state.
*/
void reset();
/**
* @brief Configures the builder using a BaseConfigurator object.
* @param conf The BaseConfigurator object to configure from.
* @return A pointer to the KafkaAvroProviderBuilder.
*/
KafkaAvroProviderBuilder* configFrom(Configurators::BaseConfigurator &conf);
/**
* @brief Sets the IP address for the KafkaAvroProvider.
* @param ip The IP address to set.
* @return A pointer to the KafkaAvroProviderBuilder.
*/
KafkaAvroProviderBuilder* setIp(std::string ip);
/**
* @brief Sets the port number for the KafkaAvroProvider.
* @param port The port number to set.
* @return A pointer to the KafkaAvroProviderBuilder.
*/
KafkaAvroProviderBuilder* setPort(int port);
/**
* @brief Sets the topic for the KafkaAvroProvider.
* @param topic The topic to set.
* @return A pointer to the KafkaAvroProviderBuilder.
*/
KafkaAvroProviderBuilder* setTopic(std::string topic);
/**
* @brief Gets the built KafkaAvroProvider object.
* @return A pointer to the built KafkaAvroProvider.
*/
KafkaAvroProvider* getProvider();
};
}
} // namespace inaf::oasbo::Providers
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
#pragma once
#include <sstream>
......@@ -33,309 +32,286 @@ static const int TRIGGER_COUNT_SIZE = 64;
static const int NUM_PDM = 37;
struct PDMBlock {
bool pdmVal;
int32_t spare_1;
int32_t pdmID;
std::vector<int32_t > triggerCounts;
int32_t sipmTemp1;
int32_t sipmTemp2;
int32_t sipmTemp3;
int32_t sipmHighVoltage;
int32_t sipmCurrent;
PDMBlock() :
pdmVal(bool()),
spare_1(int32_t()),
pdmID(int32_t()),
triggerCounts(std::vector<int32_t >()),
sipmTemp1(int32_t()),
sipmTemp2(int32_t()),
sipmTemp3(int32_t()),
sipmHighVoltage(int32_t()),
sipmCurrent(int32_t())
{ }
bool pdmVal;
int32_t spare_1;
int32_t pdmID;
std::vector<int32_t> triggerCounts;
int32_t sipmTemp1;
int32_t sipmTemp2;
int32_t sipmTemp3;
int32_t sipmHighVoltage;
int32_t sipmCurrent;
PDMBlock() :
pdmVal(bool()), spare_1(int32_t()), pdmID(int32_t()), triggerCounts(
std::vector<int32_t>()), sipmTemp1(int32_t()), sipmTemp2(
int32_t()), sipmTemp3(int32_t()), sipmHighVoltage(
int32_t()), sipmCurrent(int32_t()) {
}
};
struct C11 {
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
int32_t pixelTriggerDiscriminatorThreshold;
bool ptm;
int32_t triggeredPixel;
int32_t timeWindow;
int32_t discriminatorChain;
std::vector<PDMBlock > PDMs;
C11() :
telescopeID(int32_t()),
type(int32_t()),
subType(int32_t()),
ssc(int32_t()),
packetLength(int32_t()),
year(int32_t()),
month(int32_t()),
day(int32_t()),
hours(int32_t()),
minutes(int32_t()),
seconds(int32_t()),
validTime(bool()),
timeTagNanosec(int32_t()),
eventCounter(int32_t()),
lid(bool()),
fibSt(bool()),
fibCont(bool()),
fibPuls(bool()),
rgbCont(int32_t()),
rgbPuls(int32_t()),
spare_0(int32_t()),
pixelTriggerDiscriminatorThreshold(int32_t()),
ptm(bool()),
triggeredPixel(int32_t()),
timeWindow(int32_t()),
discriminatorChain(int32_t()),
PDMs(std::vector<PDMBlock >())
{ }
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
int32_t pixelTriggerDiscriminatorThreshold;
bool ptm;
int32_t triggeredPixel;
int32_t timeWindow;
int32_t discriminatorChain;
std::vector<PDMBlock> PDMs;
C11() :
telescopeID(int32_t()), type(int32_t()), subType(int32_t()), ssc(
int32_t()), packetLength(int32_t()), year(int32_t()), month(
int32_t()), day(int32_t()), hours(int32_t()), minutes(
int32_t()), seconds(int32_t()), validTime(bool()), timeTagNanosec(
int32_t()), eventCounter(int32_t()), lid(bool()), fibSt(
bool()), fibCont(bool()), fibPuls(bool()), rgbCont(
int32_t()), rgbPuls(int32_t()), spare_0(int32_t()), pixelTriggerDiscriminatorThreshold(
int32_t()), ptm(bool()), triggeredPixel(int32_t()), timeWindow(
int32_t()), discriminatorChain(int32_t()), PDMs(
std::vector<PDMBlock>()) {
}
};
}
namespace avro {
template<> struct codec_traits<C11::PDMBlock> {
static void encode(Encoder& e, const C11::PDMBlock& v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.spare_1);
avro::encode(e, v.pdmID);
avro::encode(e, v.triggerCounts);
avro::encode(e, v.sipmTemp1);
avro::encode(e, v.sipmTemp2);
avro::encode(e, v.sipmTemp3);
avro::encode(e, v.sipmHighVoltage);
avro::encode(e, v.sipmCurrent);
}
static void decode(Decoder& d, C11::PDMBlock& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.spare_1);
break;
case 2:
avro::decode(d, v.pdmID);
break;
case 3:
avro::decode(d, v.triggerCounts);
break;
case 4:
avro::decode(d, v.sipmTemp1);
break;
case 5:
avro::decode(d, v.sipmTemp2);
break;
case 6:
avro::decode(d, v.sipmTemp3);
break;
case 7:
avro::decode(d, v.sipmHighVoltage);
break;
case 8:
avro::decode(d, v.sipmCurrent);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.spare_1);
avro::decode(d, v.pdmID);
avro::decode(d, v.triggerCounts);
avro::decode(d, v.sipmTemp1);
avro::decode(d, v.sipmTemp2);
avro::decode(d, v.sipmTemp3);
avro::decode(d, v.sipmHighVoltage);
avro::decode(d, v.sipmCurrent);
}
}
static void encode(Encoder &e, const C11::PDMBlock &v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.spare_1);
avro::encode(e, v.pdmID);
avro::encode(e, v.triggerCounts);
avro::encode(e, v.sipmTemp1);
avro::encode(e, v.sipmTemp2);
avro::encode(e, v.sipmTemp3);
avro::encode(e, v.sipmHighVoltage);
avro::encode(e, v.sipmCurrent);
}
static void decode(Decoder &d, C11::PDMBlock &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.spare_1);
break;
case 2:
avro::decode(d, v.pdmID);
break;
case 3:
avro::decode(d, v.triggerCounts);
break;
case 4:
avro::decode(d, v.sipmTemp1);
break;
case 5:
avro::decode(d, v.sipmTemp2);
break;
case 6:
avro::decode(d, v.sipmTemp3);
break;
case 7:
avro::decode(d, v.sipmHighVoltage);
break;
case 8:
avro::decode(d, v.sipmCurrent);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.spare_1);
avro::decode(d, v.pdmID);
avro::decode(d, v.triggerCounts);
avro::decode(d, v.sipmTemp1);
avro::decode(d, v.sipmTemp2);
avro::decode(d, v.sipmTemp3);
avro::decode(d, v.sipmHighVoltage);
avro::decode(d, v.sipmCurrent);
}
}
};
template<> struct codec_traits<C11::C11> {
static void encode(Encoder& e, const C11::C11& v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.pixelTriggerDiscriminatorThreshold);
avro::encode(e, v.ptm);
avro::encode(e, v.triggeredPixel);
avro::encode(e, v.timeWindow);
avro::encode(e, v.discriminatorChain);
avro::encode(e, v.PDMs);
}
static void decode(Decoder& d, C11::C11& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.pixelTriggerDiscriminatorThreshold);
break;
case 22:
avro::decode(d, v.ptm);
break;
case 23:
avro::decode(d, v.triggeredPixel);
break;
case 24:
avro::decode(d, v.timeWindow);
break;
case 25:
avro::decode(d, v.discriminatorChain);
break;
case 26:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.pixelTriggerDiscriminatorThreshold);
avro::decode(d, v.ptm);
avro::decode(d, v.triggeredPixel);
avro::decode(d, v.timeWindow);
avro::decode(d, v.discriminatorChain);
avro::decode(d, v.PDMs);
}
}
static void encode(Encoder &e, const C11::C11 &v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.pixelTriggerDiscriminatorThreshold);
avro::encode(e, v.ptm);
avro::encode(e, v.triggeredPixel);
avro::encode(e, v.timeWindow);
avro::encode(e, v.discriminatorChain);
avro::encode(e, v.PDMs);
}
static void decode(Decoder &d, C11::C11 &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.pixelTriggerDiscriminatorThreshold);
break;
case 22:
avro::decode(d, v.ptm);
break;
case 23:
avro::decode(d, v.triggeredPixel);
break;
case 24:
avro::decode(d, v.timeWindow);
break;
case 25:
avro::decode(d, v.discriminatorChain);
break;
case 26:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.pixelTriggerDiscriminatorThreshold);
avro::decode(d, v.ptm);
avro::decode(d, v.triggeredPixel);
avro::decode(d, v.timeWindow);
avro::decode(d, v.discriminatorChain);
avro::decode(d, v.PDMs);
}
}
};
void encodeC11(Encoder &avroencoder,
inaf::oasbo::PacketLib::BasePacket &packet) {
void encodeC11(Encoder &avroencoder, inaf::oasbo::Packets::BasePacket &packet) {
// PACKET HEADER
int offset;
for (offset = 0; offset < 11; offset++)
......@@ -371,9 +347,9 @@ void encodeC11(Encoder &avroencoder,
avroencoder.arrayStart();
avroencoder.setItemCount(C11::TRIGGER_COUNT_SIZE);
for(int i = 0; i < C11::TRIGGER_COUNT_SIZE; i++){
for (int i = 0; i < C11::TRIGGER_COUNT_SIZE; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
avro::encode(avroencoder, (int32_t) packet[offset + i].value());
}
offset += C11::TRIGGER_COUNT_SIZE;
avroencoder.arrayEnd();
......
This diff is collapsed.
......@@ -27,81 +27,77 @@
#include <Base_Packet.h>
namespace CMD151 {
struct CMD151 {
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t spare;
int32_t sync;
CMD151() :
telescopeID(int32_t()),
type(int32_t()),
subType(int32_t()),
ssc(int32_t()),
packetLength(int32_t()),
spare(int32_t()),
sync(int32_t())
{ }
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t spare;
int32_t sync;
CMD151() :
telescopeID(int32_t()), type(int32_t()), subType(int32_t()), ssc(
int32_t()), packetLength(int32_t()), spare(int32_t()), sync(
int32_t()) {
}
};
}
namespace avro {
template<> struct codec_traits<CMD151::CMD151> {
static void encode(Encoder& e, const CMD151::CMD151& v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.spare);
avro::encode(e, v.sync);
}
static void decode(Decoder& d, CMD151::CMD151& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.spare);
break;
case 6:
avro::decode(d, v.sync);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.spare);
avro::decode(d, v.sync);
}
}
static void encode(Encoder &e, const CMD151::CMD151 &v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.spare);
avro::encode(e, v.sync);
}
static void decode(Decoder &d, CMD151::CMD151 &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.spare);
break;
case 6:
avro::decode(d, v.sync);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.spare);
avro::decode(d, v.sync);
}
}
};
void encodeCMD151(Encoder &avroencoder,
inaf::oasbo::PacketLib::BasePacket &packet) {
inaf::oasbo::Packets::BasePacket &packet) {
// PACKET HEADER
int offset;
for (offset = 0; offset < 7; offset++)
......
This diff is collapsed.
......@@ -34,7 +34,6 @@ static const int LOW_GAINS_SIZE = 64;
static const int TIME_TRIGGERS_SIZE = 64;
static const int NUM_PDM = 37;
struct PDMBlock {
bool pdmVal;
bool trgEnb;
......@@ -360,8 +359,7 @@ template<> struct codec_traits<S22::S22> {
}
};
void encodeS22(Encoder &avroencoder,
inaf::oasbo::PacketLib::BasePacket &packet) {
void encodeS22(Encoder &avroencoder, inaf::oasbo::Packets::BasePacket &packet) {
// PACKET HEADER
int offset;
for (offset = 0; offset < 11; offset++)
......@@ -395,25 +393,25 @@ void encodeS22(Encoder &avroencoder,
avroencoder.arrayStart();
avroencoder.setItemCount(S22::HIGH_GAINS_SIZE);
for(int i = 0; i < S22::HIGH_GAINS_SIZE; i++){
for (int i = 0; i < S22::HIGH_GAINS_SIZE; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
avro::encode(avroencoder, (int32_t) packet[offset + i].value());
}
offset += S22::HIGH_GAINS_SIZE;
avroencoder.arrayEnd();
avroencoder.arrayStart();
avroencoder.setItemCount(S22::LOW_GAINS_SIZE);
for(int i = 0; i < S22::LOW_GAINS_SIZE; i++){
for (int i = 0; i < S22::LOW_GAINS_SIZE; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
avro::encode(avroencoder, (int32_t) packet[offset + i].value());
}
offset += S22::LOW_GAINS_SIZE;
avroencoder.arrayEnd();
avroencoder.arrayStart();
avroencoder.setItemCount(S22::TIME_TRIGGERS_SIZE);
for(int i = 0; i < S22::TIME_TRIGGERS_SIZE; i++){
for (int i = 0; i < S22::TIME_TRIGGERS_SIZE; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
avro::encode(avroencoder, (int32_t) packet[offset + i].value());
}
offset += S22::TIME_TRIGGERS_SIZE;
avroencoder.arrayEnd();
......
......@@ -31,238 +31,222 @@ static const int VAR_PIXELS_SIZE = 64;
static const int NUM_PDM = 37;
struct PDMBlock {
bool pdmVal;
int32_t samplingPar;
int32_t pdmID;
std::vector<int32_t > var_pixels;
PDMBlock() :
pdmVal(bool()),
samplingPar(int32_t()),
pdmID(int32_t()),
var_pixels(std::vector<int32_t >())
{ }
bool pdmVal;
int32_t samplingPar;
int32_t pdmID;
std::vector<int32_t> var_pixels;
PDMBlock() :
pdmVal(bool()), samplingPar(int32_t()), pdmID(int32_t()), var_pixels(
std::vector<int32_t>()) {
}
};
struct VAR102 {
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
std::vector<PDMBlock > PDMs;
VAR102() :
telescopeID(int32_t()),
type(int32_t()),
subType(int32_t()),
ssc(int32_t()),
packetLength(int32_t()),
year(int32_t()),
month(int32_t()),
day(int32_t()),
hours(int32_t()),
minutes(int32_t()),
seconds(int32_t()),
validTime(bool()),
timeTagNanosec(int32_t()),
eventCounter(int32_t()),
lid(bool()),
fibSt(bool()),
fibCont(bool()),
fibPuls(bool()),
rgbCont(int32_t()),
rgbPuls(int32_t()),
spare_0(int32_t()),
PDMs(std::vector<PDMBlock >())
{ }
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
std::vector<PDMBlock> PDMs;
VAR102() :
telescopeID(int32_t()), type(int32_t()), subType(int32_t()), ssc(
int32_t()), packetLength(int32_t()), year(int32_t()), month(
int32_t()), day(int32_t()), hours(int32_t()), minutes(
int32_t()), seconds(int32_t()), validTime(bool()), timeTagNanosec(
int32_t()), eventCounter(int32_t()), lid(bool()), fibSt(
bool()), fibCont(bool()), fibPuls(bool()), rgbCont(
int32_t()), rgbPuls(int32_t()), spare_0(int32_t()), PDMs(
std::vector<PDMBlock>()) {
}
};
}
namespace avro {
template<> struct codec_traits<VAR102::PDMBlock> {
static void encode(Encoder& e, const VAR102::PDMBlock& v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.samplingPar);
avro::encode(e, v.pdmID);
avro::encode(e, v.var_pixels);
}
static void decode(Decoder& d, VAR102::PDMBlock& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.samplingPar);
break;
case 2:
avro::decode(d, v.pdmID);
break;
case 3:
avro::decode(d, v.var_pixels);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.samplingPar);
avro::decode(d, v.pdmID);
avro::decode(d, v.var_pixels);
}
}
static void encode(Encoder &e, const VAR102::PDMBlock &v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.samplingPar);
avro::encode(e, v.pdmID);
avro::encode(e, v.var_pixels);
}
static void decode(Decoder &d, VAR102::PDMBlock &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.samplingPar);
break;
case 2:
avro::decode(d, v.pdmID);
break;
case 3:
avro::decode(d, v.var_pixels);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.samplingPar);
avro::decode(d, v.pdmID);
avro::decode(d, v.var_pixels);
}
}
};
template<> struct codec_traits<VAR102::VAR102> {
static void encode(Encoder& e, const VAR102::VAR102& v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.PDMs);
}
static void decode(Decoder& d, VAR102::VAR102& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.PDMs);
}
}
static void encode(Encoder &e, const VAR102::VAR102 &v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.PDMs);
}
static void decode(Decoder &d, VAR102::VAR102 &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.PDMs);
}
}
};
void encodeVAR102(Encoder &avroencoder,
inaf::oasbo::PacketLib::BasePacket &packet) {
inaf::oasbo::Packets::BasePacket &packet) {
// PACKET HEADER
int offset;
for (offset = 0; offset < 11; offset++)
......@@ -293,9 +277,9 @@ void encodeVAR102(Encoder &avroencoder,
avroencoder.arrayStart();
avroencoder.setItemCount(VAR102::VAR_PIXELS_SIZE);
for(int i = 0; i < VAR102::VAR_PIXELS_SIZE; i++){
for (int i = 0; i < VAR102::VAR_PIXELS_SIZE; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
avro::encode(avroencoder, (int32_t) packet[offset + i].value());
}
offset += VAR102::VAR_PIXELS_SIZE;
avroencoder.arrayEnd();
......
......@@ -32,238 +32,222 @@ static const int VAR_PIXELS_SIZE = 64;
static const int NUM_PDM = 37;
struct PDMBlock {
bool pdmVal;
int32_t samplingPar;
int32_t pdmID;
std::vector<int32_t > var_pixels;
PDMBlock() :
pdmVal(bool()),
samplingPar(int32_t()),
pdmID(int32_t()),
var_pixels(std::vector<int32_t >())
{ }
bool pdmVal;
int32_t samplingPar;
int32_t pdmID;
std::vector<int32_t> var_pixels;
PDMBlock() :
pdmVal(bool()), samplingPar(int32_t()), pdmID(int32_t()), var_pixels(
std::vector<int32_t>()) {
}
};
struct VAR103 {
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
std::vector<PDMBlock > PDMs;
VAR103() :
telescopeID(int32_t()),
type(int32_t()),
subType(int32_t()),
ssc(int32_t()),
packetLength(int32_t()),
year(int32_t()),
month(int32_t()),
day(int32_t()),
hours(int32_t()),
minutes(int32_t()),
seconds(int32_t()),
validTime(bool()),
timeTagNanosec(int32_t()),
eventCounter(int32_t()),
lid(bool()),
fibSt(bool()),
fibCont(bool()),
fibPuls(bool()),
rgbCont(int32_t()),
rgbPuls(int32_t()),
spare_0(int32_t()),
PDMs(std::vector<PDMBlock >())
{ }
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
std::vector<PDMBlock> PDMs;
VAR103() :
telescopeID(int32_t()), type(int32_t()), subType(int32_t()), ssc(
int32_t()), packetLength(int32_t()), year(int32_t()), month(
int32_t()), day(int32_t()), hours(int32_t()), minutes(
int32_t()), seconds(int32_t()), validTime(bool()), timeTagNanosec(
int32_t()), eventCounter(int32_t()), lid(bool()), fibSt(
bool()), fibCont(bool()), fibPuls(bool()), rgbCont(
int32_t()), rgbPuls(int32_t()), spare_0(int32_t()), PDMs(
std::vector<PDMBlock>()) {
}
};
}
namespace avro {
template<> struct codec_traits<VAR103::PDMBlock> {
static void encode(Encoder& e, const VAR103::PDMBlock& v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.samplingPar);
avro::encode(e, v.pdmID);
avro::encode(e, v.var_pixels);
}
static void decode(Decoder& d, VAR103::PDMBlock& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.samplingPar);
break;
case 2:
avro::decode(d, v.pdmID);
break;
case 3:
avro::decode(d, v.var_pixels);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.samplingPar);
avro::decode(d, v.pdmID);
avro::decode(d, v.var_pixels);
}
}
static void encode(Encoder &e, const VAR103::PDMBlock &v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.samplingPar);
avro::encode(e, v.pdmID);
avro::encode(e, v.var_pixels);
}
static void decode(Decoder &d, VAR103::PDMBlock &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.samplingPar);
break;
case 2:
avro::decode(d, v.pdmID);
break;
case 3:
avro::decode(d, v.var_pixels);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.samplingPar);
avro::decode(d, v.pdmID);
avro::decode(d, v.var_pixels);
}
}
};
template<> struct codec_traits<VAR103::VAR103> {
static void encode(Encoder& e, const VAR103::VAR103& v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.PDMs);
}
static void decode(Decoder& d, VAR103::VAR103& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.PDMs);
}
}
static void encode(Encoder &e, const VAR103::VAR103 &v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.PDMs);
}
static void decode(Decoder &d, VAR103::VAR103 &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.PDMs);
}
}
};
void encodeVAR103(Encoder &avroencoder,
inaf::oasbo::PacketLib::BasePacket &packet) {
inaf::oasbo::Packets::BasePacket &packet) {
// PACKET HEADER
int offset;
for (offset = 0; offset < 11; offset++)
......@@ -294,9 +278,9 @@ void encodeVAR103(Encoder &avroencoder,
avroencoder.arrayStart();
avroencoder.setItemCount(VAR103::VAR_PIXELS_SIZE);
for(int i = 0; i < VAR103::VAR_PIXELS_SIZE; i++){
for (int i = 0; i < VAR103::VAR_PIXELS_SIZE; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
avro::encode(avroencoder, (int32_t) packet[offset + i].value());
}
offset += VAR103::VAR_PIXELS_SIZE;
avroencoder.arrayEnd();
......
......@@ -63,8 +63,8 @@ KafkaAvroProvider::KafkaAvroProvider(std::string ip, int port,
KafkaAvroProvider::~KafkaAvroProvider() {
this->stopFlag = true; // Stopping the polling thread
if (pollThreadFlag){
if(this->pollThread.joinable()){
if (pollThreadFlag) {
if (this->pollThread.joinable()) {
pollThread.join();
}
}
......@@ -187,15 +187,15 @@ void KafkaAvroProvider::pollingThreadFunction() {
}
now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "polling thread exit..."
<< "]\t[KafkaAvro Provider]\t" << "polling thread stop..."
<< std::endl;
}
int KafkaAvroProvider::write(PacketLib::BasePacket &packet) {
int KafkaAvroProvider::write(Packets::BasePacket &packet) {
return KafkaAvroProvider::write(packet, this->dest);
}
int KafkaAvroProvider::write(PacketLib::BasePacket &packet, std::string dest) {
int KafkaAvroProvider::write(Packets::BasePacket &packet, std::string dest) {
if (!isOpen()) {
time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
......@@ -247,7 +247,7 @@ int KafkaAvroProvider::write(PacketLib::BasePacket &packet, std::string dest) {
return -1;
}
void KafkaAvroProvider::encodeToAvro(PacketLib::BasePacket &packet) {
void KafkaAvroProvider::encodeToAvro(Packets::BasePacket &packet) {
auto type = packet[1].value();
auto subtype = packet[2].value();
if (type == 2 && subtype == 2) {
......@@ -287,3 +287,12 @@ void KafkaAvroProvider::encodeToAvro(PacketLib::BasePacket &packet) {
}
}
void KafkaAvroProvider::setDest(std::string dest){
this->dest = dest;
}
std::string KafkaAvroProvider::getDest(){
return this->dest;
}