Loading Makefile +1 −1 Original line number Diff line number Diff line Loading @@ -24,7 +24,7 @@ CXX_RELEASE_FLAGS=-O3 CXX_DEFAULT_FLAGS=-c -Wall -Wextra -std=c++11 -std=gnu++11 LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango \ -lsoci_core -lsoci_mysql -lboost_system -lboost_thread -lboost_filesystem \ -lprotobuf -lssl -lboost_date_time -lprotobuf -lssl INC_PARM=$(foreach d, $(INC_DIR), -I$d) LIB_PARM=$(foreach d, $(LIB_DIR), -L$d) PROTOC :=/usr/local/protobuf-2.5.0/bin/protoc Loading proto/Request.proto +1 −3 Original line number Diff line number Diff line Loading @@ -46,9 +46,7 @@ message Request message Metadata { required string schema = 1; required string table = 2; required sfixed64 timestamp = 3; required sfixed64 timestamp = 1; } optional Metadata metadata = 4; Loading proto/Response.proto +61 −62 Original line number Diff line number Diff line Loading @@ -49,14 +49,6 @@ message Response message Metadata { //Metadata response partial required uint64 partial = 1; //Metadata response total required uint64 total = 2; //Metadata response state enum State Loading @@ -65,12 +57,16 @@ message Response REJECTED = 1; } required State state = 3; required State state = 1; //Metadata response status required string status = 4; required string status = 2; //Block of rows with same timestamp message Row { //Mysql: FLOAT, DOUBLE, DECIMAL //SOCI: dt_double //C++: double Loading @@ -81,7 +77,7 @@ message Response required double value = 2; } repeated DtDouble double_list = 5; repeated DtDouble double_list = 1; //Mysql: TINYINT, SMALLINT, INT, BIGINT //SOCI: dt_integer Loading @@ -93,7 +89,7 @@ message Response required int32 value = 2; } repeated DtInteger integer_list = 6; repeated DtInteger integer_list = 2; //Mysql: not used with mysql, needed for postgres //SOCI: dt_unsigned_long Loading @@ -105,7 +101,7 @@ message Response required uint64 value = 2; } repeated DtUnsignedLong unsinged_long_list = 7; repeated DtUnsignedLong unsinged_long_list = 3; //Mysql: not used with mysql, needed for postgres //SOCI: dt_long_long Loading @@ -117,7 +113,7 @@ message Response required int64 value = 2; } repeated DtLongLong long_long_list = 8; repeated DtLongLong long_long_list = 4; //Mysql: STRING/BINARY, VARCHAR/VARBINARY //SOCI: dt_string Loading @@ -129,7 +125,7 @@ message Response required string value = 2; } repeated DtString strings_list = 9; repeated DtString strings_list = 5; //Mysql: TIMESTAMP DATE, TIME, DATETIME //SOCI: dt_date Loading @@ -141,7 +137,10 @@ message Response required sfixed64 value = 2; } repeated DtDate date_list = 10; repeated DtDate date_list = 6; } repeated Row rows = 3; } optional Metadata metadata = 4; Loading src/DBManager.cpp +130 −2 Original line number Diff line number Diff line #include <DBManager.h> #include <ctime> #include <boost/date_time.hpp> #include <soci/mysql/soci-mysql.h> namespace MetadataImporter_ns Loading Loading @@ -126,4 +130,128 @@ std::tm DBManager::retrieveLastTimestamp(std::string schema, std::string table) return lastTimestamp; } //============================================================================== // DBManager::persistMetadata() //============================================================================== void DBManager::persistMetadata(std::string schema, std::string table, const Response::Metadata& metadata) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::persistMetadata()" << endl; boost::mutex::scoped_lock lock(m_connectionPoolMutex); if(m_session_sp->get_backend() == NULL) m_session_sp->reconnect(); soci::transaction transaction(*m_session_sp); const google::protobuf::RepeatedPtrField < Response::Metadata::Row >& rows = metadata.rows(); google::protobuf::RepeatedPtrField < Response::Metadata::Row >::const_iterator it; for(it=rows.begin(); it!=rows.end(); ++it) { *m_session_sp << composeInsertQuery(schema, table, *it); } transaction.commit(); } //============================================================================== // DBManager::composeInsertQuery() //============================================================================== std::string DBManager::composeInsertQuery(std::string schema, std::string table, const Response::Metadata::Row& row) { DEBUG_STREAM << "DBManager::composeInsertQuery()" << endl; std::stringstream keysStream; std::stringstream valuesStream; std::stringstream keyValuesStream; //@fixme: check against malformed timestamp //DtDate list for(int i = 0; i<row.date_list_size(); ++i) { int64_t rawTimestamp = row.date_list(i).value(); std::tm tmTimestamp = *localtime(&rawTimestamp); boost::posix_time::ptime timestamp = boost::posix_time::ptime_from_tm(tmTimestamp); keysStream << row.date_list(i).key() << ","; valuesStream << "'" << boost::posix_time::to_iso_string(timestamp) << "',"; keyValuesStream << row.date_list(i).key() << "='" << boost::posix_time::to_iso_string(timestamp) << "',"; } //DtDouble list for(int i = 0; i<row.double_list_size(); ++i) { keysStream << row.double_list(i).key() << ","; valuesStream << row.double_list(i).value() << ","; keyValuesStream << row.double_list(i).key() << "=" << row.double_list(i).value() << ","; } //DtInteger list for(int i = 0; i<row.integer_list_size(); ++i) { keysStream << row.integer_list(i).key() << ","; valuesStream << row.integer_list(i).value() << ","; keyValuesStream << row.integer_list(i).key() << "=" << row.integer_list(i).value() << ","; } //DtLongLong list for(int i = 0; i<row.long_long_list_size(); ++i) { keysStream << row.long_long_list(i).key() << ","; valuesStream << row.long_long_list(i).value() << ","; keyValuesStream << row.long_long_list(i).key() << "=" << row.long_long_list(i).value() << ","; } //DtString list for(int i = 0; i<row.strings_list_size(); ++i) { keysStream << row.strings_list(i).key() << ","; valuesStream << "'" << row.strings_list(i).value() << "',"; keyValuesStream << row.strings_list(i).key() << "='" << row.strings_list(i).value() << "',"; } //DtUnsignedLong list for(int i = 0; i<row.unsinged_long_list_size(); ++i) { keysStream << row.unsinged_long_list(i).key() << ","; valuesStream << row.unsinged_long_list(i).value() << ","; keyValuesStream << row.unsinged_long_list(i).key() << "=" << row.unsinged_long_list(i).value() << ","; } std::string keys = keysStream.str(); if(keys.at(keys.size()-1)==',') keys.erase(keys.size()-1); std::string values = valuesStream.str(); if(values.at(values.size()-1)==',') values.erase(values.size()-1); std::string keyValues = keyValuesStream.str(); if(keyValues.at(keyValues.size()-1)==',') keyValues.erase(keyValues.size()-1); stringstream query; query << "INSERT INTO " << schema << "." << table << " (" << keys << ") VALUES (" << values << ") " << " ON DUPLICATE KEY UPDATE " << keyValues; INFO_STREAM << "QUERY: " << query.str() << endl; return query.str(); } } //namespace src/DBManager.h +10 −0 Original line number Diff line number Diff line Loading @@ -9,6 +9,7 @@ #define DBMANAGER_H #include <Configuration.h> #include <Response.pb.h> #include <tango.h> Loading Loading @@ -81,7 +82,16 @@ public: virtual std::tm retrieveLastTimestamp(std::string, std::string) throw(soci::soci_error); virtual void persistMetadata(std::string, std::string, const Response::Metadata&) throw(soci::soci_error); protected: //------------------------------------------------------------------------------ // [Protected] Utilities method //------------------------------------------------------------------------------ virtual std::string composeInsertQuery(std::string, std::string, const Response::Metadata::Row&); //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ Loading Loading
Makefile +1 −1 Original line number Diff line number Diff line Loading @@ -24,7 +24,7 @@ CXX_RELEASE_FLAGS=-O3 CXX_DEFAULT_FLAGS=-c -Wall -Wextra -std=c++11 -std=gnu++11 LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango \ -lsoci_core -lsoci_mysql -lboost_system -lboost_thread -lboost_filesystem \ -lprotobuf -lssl -lboost_date_time -lprotobuf -lssl INC_PARM=$(foreach d, $(INC_DIR), -I$d) LIB_PARM=$(foreach d, $(LIB_DIR), -L$d) PROTOC :=/usr/local/protobuf-2.5.0/bin/protoc Loading
proto/Request.proto +1 −3 Original line number Diff line number Diff line Loading @@ -46,9 +46,7 @@ message Request message Metadata { required string schema = 1; required string table = 2; required sfixed64 timestamp = 3; required sfixed64 timestamp = 1; } optional Metadata metadata = 4; Loading
proto/Response.proto +61 −62 Original line number Diff line number Diff line Loading @@ -49,14 +49,6 @@ message Response message Metadata { //Metadata response partial required uint64 partial = 1; //Metadata response total required uint64 total = 2; //Metadata response state enum State Loading @@ -65,12 +57,16 @@ message Response REJECTED = 1; } required State state = 3; required State state = 1; //Metadata response status required string status = 4; required string status = 2; //Block of rows with same timestamp message Row { //Mysql: FLOAT, DOUBLE, DECIMAL //SOCI: dt_double //C++: double Loading @@ -81,7 +77,7 @@ message Response required double value = 2; } repeated DtDouble double_list = 5; repeated DtDouble double_list = 1; //Mysql: TINYINT, SMALLINT, INT, BIGINT //SOCI: dt_integer Loading @@ -93,7 +89,7 @@ message Response required int32 value = 2; } repeated DtInteger integer_list = 6; repeated DtInteger integer_list = 2; //Mysql: not used with mysql, needed for postgres //SOCI: dt_unsigned_long Loading @@ -105,7 +101,7 @@ message Response required uint64 value = 2; } repeated DtUnsignedLong unsinged_long_list = 7; repeated DtUnsignedLong unsinged_long_list = 3; //Mysql: not used with mysql, needed for postgres //SOCI: dt_long_long Loading @@ -117,7 +113,7 @@ message Response required int64 value = 2; } repeated DtLongLong long_long_list = 8; repeated DtLongLong long_long_list = 4; //Mysql: STRING/BINARY, VARCHAR/VARBINARY //SOCI: dt_string Loading @@ -129,7 +125,7 @@ message Response required string value = 2; } repeated DtString strings_list = 9; repeated DtString strings_list = 5; //Mysql: TIMESTAMP DATE, TIME, DATETIME //SOCI: dt_date Loading @@ -141,7 +137,10 @@ message Response required sfixed64 value = 2; } repeated DtDate date_list = 10; repeated DtDate date_list = 6; } repeated Row rows = 3; } optional Metadata metadata = 4; Loading
src/DBManager.cpp +130 −2 Original line number Diff line number Diff line #include <DBManager.h> #include <ctime> #include <boost/date_time.hpp> #include <soci/mysql/soci-mysql.h> namespace MetadataImporter_ns Loading Loading @@ -126,4 +130,128 @@ std::tm DBManager::retrieveLastTimestamp(std::string schema, std::string table) return lastTimestamp; } //============================================================================== // DBManager::persistMetadata() //============================================================================== void DBManager::persistMetadata(std::string schema, std::string table, const Response::Metadata& metadata) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::persistMetadata()" << endl; boost::mutex::scoped_lock lock(m_connectionPoolMutex); if(m_session_sp->get_backend() == NULL) m_session_sp->reconnect(); soci::transaction transaction(*m_session_sp); const google::protobuf::RepeatedPtrField < Response::Metadata::Row >& rows = metadata.rows(); google::protobuf::RepeatedPtrField < Response::Metadata::Row >::const_iterator it; for(it=rows.begin(); it!=rows.end(); ++it) { *m_session_sp << composeInsertQuery(schema, table, *it); } transaction.commit(); } //============================================================================== // DBManager::composeInsertQuery() //============================================================================== std::string DBManager::composeInsertQuery(std::string schema, std::string table, const Response::Metadata::Row& row) { DEBUG_STREAM << "DBManager::composeInsertQuery()" << endl; std::stringstream keysStream; std::stringstream valuesStream; std::stringstream keyValuesStream; //@fixme: check against malformed timestamp //DtDate list for(int i = 0; i<row.date_list_size(); ++i) { int64_t rawTimestamp = row.date_list(i).value(); std::tm tmTimestamp = *localtime(&rawTimestamp); boost::posix_time::ptime timestamp = boost::posix_time::ptime_from_tm(tmTimestamp); keysStream << row.date_list(i).key() << ","; valuesStream << "'" << boost::posix_time::to_iso_string(timestamp) << "',"; keyValuesStream << row.date_list(i).key() << "='" << boost::posix_time::to_iso_string(timestamp) << "',"; } //DtDouble list for(int i = 0; i<row.double_list_size(); ++i) { keysStream << row.double_list(i).key() << ","; valuesStream << row.double_list(i).value() << ","; keyValuesStream << row.double_list(i).key() << "=" << row.double_list(i).value() << ","; } //DtInteger list for(int i = 0; i<row.integer_list_size(); ++i) { keysStream << row.integer_list(i).key() << ","; valuesStream << row.integer_list(i).value() << ","; keyValuesStream << row.integer_list(i).key() << "=" << row.integer_list(i).value() << ","; } //DtLongLong list for(int i = 0; i<row.long_long_list_size(); ++i) { keysStream << row.long_long_list(i).key() << ","; valuesStream << row.long_long_list(i).value() << ","; keyValuesStream << row.long_long_list(i).key() << "=" << row.long_long_list(i).value() << ","; } //DtString list for(int i = 0; i<row.strings_list_size(); ++i) { keysStream << row.strings_list(i).key() << ","; valuesStream << "'" << row.strings_list(i).value() << "',"; keyValuesStream << row.strings_list(i).key() << "='" << row.strings_list(i).value() << "',"; } //DtUnsignedLong list for(int i = 0; i<row.unsinged_long_list_size(); ++i) { keysStream << row.unsinged_long_list(i).key() << ","; valuesStream << row.unsinged_long_list(i).value() << ","; keyValuesStream << row.unsinged_long_list(i).key() << "=" << row.unsinged_long_list(i).value() << ","; } std::string keys = keysStream.str(); if(keys.at(keys.size()-1)==',') keys.erase(keys.size()-1); std::string values = valuesStream.str(); if(values.at(values.size()-1)==',') values.erase(values.size()-1); std::string keyValues = keyValuesStream.str(); if(keyValues.at(keyValues.size()-1)==',') keyValues.erase(keyValues.size()-1); stringstream query; query << "INSERT INTO " << schema << "." << table << " (" << keys << ") VALUES (" << values << ") " << " ON DUPLICATE KEY UPDATE " << keyValues; INFO_STREAM << "QUERY: " << query.str() << endl; return query.str(); } } //namespace
src/DBManager.h +10 −0 Original line number Diff line number Diff line Loading @@ -9,6 +9,7 @@ #define DBMANAGER_H #include <Configuration.h> #include <Response.pb.h> #include <tango.h> Loading Loading @@ -81,7 +82,16 @@ public: virtual std::tm retrieveLastTimestamp(std::string, std::string) throw(soci::soci_error); virtual void persistMetadata(std::string, std::string, const Response::Metadata&) throw(soci::soci_error); protected: //------------------------------------------------------------------------------ // [Protected] Utilities method //------------------------------------------------------------------------------ virtual std::string composeInsertQuery(std::string, std::string, const Response::Metadata::Row&); //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ Loading