Loading proto/Request.proto +2 −12 Original line number Diff line number Diff line Loading @@ -32,19 +32,9 @@ message Request message Column { enum Type { DT_DOUBLE = 0; DT_INTEGER = 1; DT_UNSIGNED_LONG = 2; DT_LONG_LONG = 3; DT_STRING = 4; DT_DATE = 5; } required string name = 1; required Type type = 2; required bool nullable = 3; required string type = 2; required string nullable = 3; } repeated Column columns = 3; Loading src/DBManager.cpp +66 −32 Original line number Diff line number Diff line #include <DBManager.h> #include <soci/mysql/soci-mysql.h> #include <soci/use.h> namespace MetadataExporter_ns { Loading Loading @@ -46,22 +47,24 @@ void DBManager::connect() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::connect()" << endl; // std::stringstream connection; // connection << " host=" << m_configuration_sp->getDatabaseHost(); // connection << " port=" << m_configuration_sp->getDatabasePort(); // connection << " user=" << m_configuration_sp->getDatabaseUsername(); // connection << " password=" << m_configuration_sp->getDatabasePassword(); // // unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber(); // // for(unsigned int i=0; i<connectionNumber; ++i) // { // m_connectionPool_sp->at(i).open(soci::mysql, connection.str()); // // #ifdef VERBOSE_DEBUG // INFO_STREAM << "CONNECTION: " << connection.str() << " -> OPEN" << endl; // #endif // } boost::mutex::scoped_lock lock(m_connectionPoolMutex); std::stringstream connection; connection << " host=" << m_configuration_sp->getDatabaseHost(); connection << " port=" << m_configuration_sp->getDatabasePort(); connection << " user=" << m_configuration_sp->getDatabaseUsername(); connection << " password=" << m_configuration_sp->getDatabasePassword(); #ifdef VERBOSE_DEBUG INFO_STREAM << "CONNECTION: " << connection.str() << endl; #endif unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber(); for(unsigned int i=0; i<connectionNumber; ++i) { m_connectionPool_sp->at(i).open(soci::mysql, connection.str()); } } //============================================================================== Loading @@ -71,22 +74,53 @@ void DBManager::disconnect() { DEBUG_STREAM << "DBManager::disconnect()" << endl; // std::stringstream connection; // connection << " host=" << m_configuration_sp->getDatabaseHost(); // connection << " port=" << m_configuration_sp->getDatabasePort(); // connection << " user=" << m_configuration_sp->getDatabaseUsername(); // connection << " password=" << m_configuration_sp->getDatabasePassword(); // // unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber(); // // for(unsigned int i=0; i<connectionNumber; ++i) // { // m_connectionPool_sp->at(i).close(); // // #ifdef VERBOSE_DEBUG // INFO_STREAM << "CONNECTION: " << connection.str() << " -> CLOSE" << endl; // #endif // } boost::mutex::scoped_lock lock(m_connectionPoolMutex); unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber(); for(unsigned int i=0; i<connectionNumber; ++i) { m_connectionPool_sp->at(i).close(); } } //============================================================================== // DBManager::retrieveInformation() //============================================================================== DBManager::InformationList DBManager::retrieveInformation(std::string schema, std::string table) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveInformation()" << endl; soci::session session(*m_connectionPool_sp); soci::rowset<InformationTuple> rows = (session.prepare << "select " "column_name, column_type, is_nullable from information_schema.columns " "where table_schema like :schema and table_name like :table", soci::use(schema, "schema"), soci::use(table, "table")); InformationList informationList; std::copy(rows.begin(), rows.end(), std::back_inserter(informationList)); return informationList; } //============================================================================== // DBManager::retrieveInformation() //============================================================================== soci::rowset<soci::row> DBManager::searchNewTuples(std::string schema, std::string table, std::tm update_time) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::searchNewTuples()" << endl; soci::session session(*m_connectionPool_sp); soci::rowset<soci::row> rows = (session.prepare << "select * from " << schema << "." << table << " where update_time>=:timestamp", soci::use(update_time,"timestamp")); return rows; } } //namespace src/DBManager.h +23 −1 Original line number Diff line number Diff line Loading @@ -5,10 +5,19 @@ #include <tango.h> #include <ctime> #include <boost/tuple/tuple.hpp> #include <boost/optional/optional.hpp> #include <boost/scoped_ptr.hpp> #include <boost/thread/mutex.hpp> #include <soci/soci.h> #include <soci/error.h> #include <soci/row.h> #include <soci/rowset.h> #include <soci/boost-tuple.h> #include <soci/boost-optional.h> #include <soci/session.h> #include <soci/connection-pool.h> Loading Loading @@ -53,8 +62,21 @@ public: virtual void disconnect(); //------------------------------------------------------------------------------ // [Public] Data access methods // [Public] Retrieve information schema method //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<std::string>, boost::optional<std::string>, boost::optional<std::string> > InformationTuple; typedef std::vector< InformationTuple > InformationList; virtual InformationList retrieveInformation(std::string, std::string) throw(soci::soci_error); //------------------------------------------------------------------------------ // [Public] Search new tuple method //------------------------------------------------------------------------------ virtual soci::rowset<soci::row> searchNewTuples(std::string, std::string, std::tm) throw(soci::soci_error); protected: //------------------------------------------------------------------------------ Loading src/ProtocolManager.cpp +137 −19 Original line number Diff line number Diff line Loading @@ -13,7 +13,7 @@ ProtocolManager::ProtocolManager(Tango::DeviceImpl* deviceImpl_p, { DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl; m_isAuthenticated = false; m_isAuthorised = false; m_isValidated = false; } Loading Loading @@ -88,7 +88,7 @@ ResponseSP ProtocolManager::prepareAuthroisation(RequestSP request_sp) Response::Authorization* auth_resp = response_sp->mutable_authorization(); if(!m_isAuthenticated) if(!m_isAuthorised) { const Request::Authorization& auth_req = request_sp->authorization(); std::string username = auth_req.username(); Loading @@ -99,7 +99,7 @@ ResponseSP ProtocolManager::prepareAuthroisation(RequestSP request_sp) INFO_STREAM << "ProtocolManager::prepareAuthroisation() " << "Authorization accepted" << endl; m_isAuthenticated = true; m_isAuthorised = true; auth_resp->set_state(Response::Authorization::ACCEPTED); auth_resp->set_status("Authorization accepted"); Loading @@ -109,7 +109,7 @@ ResponseSP ProtocolManager::prepareAuthroisation(RequestSP request_sp) WARN_STREAM << "ProtocolManager::prepareAuthroisation() " << "Invalid username or password" << endl; m_isAuthenticated = false; m_isAuthorised = false; auth_resp->set_state(Response::Authorization::REJECTED); auth_resp->set_status("Invalid username or password"); Loading Loading @@ -139,25 +139,68 @@ ResponseSP ProtocolManager::prepareValidation(RequestSP request_sp) response_sp->set_type(Response::VALIDATION); Response::Validation* validation = response_sp->mutable_validation(); Response::Validation* validationRes = response_sp->mutable_validation(); if(m_isAuthorised) { if(!m_isValidated) { INFO_STREAM << "ProtocolManager::prepareValidation() " << "Validation accepted" << endl; const Request::Validation& validationReq = request_sp->validation(); const std::string& schema = validationReq.schema(); const std::string& table = validationReq.table(); DBManager::InformationList informationList = m_dBManager_sp->retrieveInformation(schema, table); if(validationReq.columns_size() == (int)informationList.size()) { const google::protobuf::RepeatedPtrField < Request::Validation::Column >& columns = validationReq.columns(); google::protobuf::RepeatedPtrField < Request::Validation::Column >::const_iterator it; try { for(it=columns.begin(); it!=columns.end();++it) { validateColumn(*it, informationList); } m_isValidated = true; validation->set_state(Response::Validation::ACCEPTED); validation->set_status("Validation accepted"); validationRes->set_state(Response::Validation::ACCEPTED); validationRes->set_status("Table validated"); } catch(std::runtime_error& ex) { validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status(ex.what()); } } else { validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status("Columns number does not match"); } } else { WARN_STREAM << "ProtocolManager::prepareValidation() " << "Already validated" << endl; validation->set_state(Response::Validation::REJECTED); validation->set_status("Already validated"); validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status("Already validated"); } } else { WARN_STREAM << "ProtocolManager::prepareValidation() " << "Not authorised" << endl; validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status("Not authorised"); } return response_sp; Loading @@ -177,12 +220,87 @@ ResponseSP ProtocolManager::prepareMetadata(RequestSP request_sp) Response::Metadata* metadata = response_sp->mutable_metadata(); if(m_isAuthorised) { if(m_isValidated) { metadata->set_state(Response::Metadata::ACCEPTED); metadata->set_status("Metadata ready"); metadata->set_partial(1); metadata->set_total(1); metadata->set_partial(0); metadata->set_total(0); } else { metadata->set_state(Response::Metadata::REJECTED); metadata->set_status("Not validated"); metadata->set_partial(0); metadata->set_total(0); } } else { metadata->set_state(Response::Metadata::REJECTED); metadata->set_status("Not authorised"); metadata->set_partial(0); metadata->set_total(0); } return response_sp; } //============================================================================== // ProtocolManager::validateColumn() //============================================================================== void ProtocolManager::validateColumn(const Request::Validation::Column& column, DBManager::InformationList& informationList) throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::validateColumn()" << endl; bool found = false; DBManager::InformationList::const_iterator it; for(it=informationList.begin(); it!=informationList.end(); ++it) { if(!it->get<0>()) throw std::runtime_error("Empty column name"); std::string columnName = it->get<0>().get(); if(column.name().compare(columnName)==0) { found = true; if(!it->get<1>()) throw std::runtime_error("Empty column type"); std::string columnType = it->get<1>().get(); if(column.type().compare(columnType)!=0) { std::stringstream errorStream; errorStream << "Column " << column.name() << " type error " << "server " << columnType << " client " << column.type(); throw std::runtime_error(errorStream.str()); } if(!it->get<2>()) throw std::runtime_error("Empty is nullable"); std::string isNullable = it->get<2>().get(); if(column.nullable().compare(isNullable)!=0) { std::stringstream errorStream; errorStream << "Column " << column.name() << " nullable error " << "server " << isNullable << " client " << column.nullable(); throw std::runtime_error(errorStream.str()); } } } if(!found) { std::stringstream errorStream; errorStream << "Column " << column.name() << " not found on server"; throw std::runtime_error(errorStream.str()); } } } //namespace src/ProtocolManager.h +8 −2 Original line number Diff line number Diff line Loading @@ -67,6 +67,12 @@ protected: virtual ResponseSP prepareMetadata(RequestSP) throw(std::runtime_error); //------------------------------------------------------------------------------ // [Protected] Validation related methods //------------------------------------------------------------------------------ virtual void validateColumn(const Request::Validation::Column&, DBManager::InformationList&) throw(std::runtime_error); //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ Loading @@ -76,8 +82,8 @@ protected: //Database manger shared pointer DBManager::SP m_dBManager_sp; //Client is authenticated bool m_isAuthenticated; //Client is authorised bool m_isAuthorised; //Table structure is validated bool m_isValidated; Loading Loading
proto/Request.proto +2 −12 Original line number Diff line number Diff line Loading @@ -32,19 +32,9 @@ message Request message Column { enum Type { DT_DOUBLE = 0; DT_INTEGER = 1; DT_UNSIGNED_LONG = 2; DT_LONG_LONG = 3; DT_STRING = 4; DT_DATE = 5; } required string name = 1; required Type type = 2; required bool nullable = 3; required string type = 2; required string nullable = 3; } repeated Column columns = 3; Loading
src/DBManager.cpp +66 −32 Original line number Diff line number Diff line #include <DBManager.h> #include <soci/mysql/soci-mysql.h> #include <soci/use.h> namespace MetadataExporter_ns { Loading Loading @@ -46,22 +47,24 @@ void DBManager::connect() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::connect()" << endl; // std::stringstream connection; // connection << " host=" << m_configuration_sp->getDatabaseHost(); // connection << " port=" << m_configuration_sp->getDatabasePort(); // connection << " user=" << m_configuration_sp->getDatabaseUsername(); // connection << " password=" << m_configuration_sp->getDatabasePassword(); // // unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber(); // // for(unsigned int i=0; i<connectionNumber; ++i) // { // m_connectionPool_sp->at(i).open(soci::mysql, connection.str()); // // #ifdef VERBOSE_DEBUG // INFO_STREAM << "CONNECTION: " << connection.str() << " -> OPEN" << endl; // #endif // } boost::mutex::scoped_lock lock(m_connectionPoolMutex); std::stringstream connection; connection << " host=" << m_configuration_sp->getDatabaseHost(); connection << " port=" << m_configuration_sp->getDatabasePort(); connection << " user=" << m_configuration_sp->getDatabaseUsername(); connection << " password=" << m_configuration_sp->getDatabasePassword(); #ifdef VERBOSE_DEBUG INFO_STREAM << "CONNECTION: " << connection.str() << endl; #endif unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber(); for(unsigned int i=0; i<connectionNumber; ++i) { m_connectionPool_sp->at(i).open(soci::mysql, connection.str()); } } //============================================================================== Loading @@ -71,22 +74,53 @@ void DBManager::disconnect() { DEBUG_STREAM << "DBManager::disconnect()" << endl; // std::stringstream connection; // connection << " host=" << m_configuration_sp->getDatabaseHost(); // connection << " port=" << m_configuration_sp->getDatabasePort(); // connection << " user=" << m_configuration_sp->getDatabaseUsername(); // connection << " password=" << m_configuration_sp->getDatabasePassword(); // // unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber(); // // for(unsigned int i=0; i<connectionNumber; ++i) // { // m_connectionPool_sp->at(i).close(); // // #ifdef VERBOSE_DEBUG // INFO_STREAM << "CONNECTION: " << connection.str() << " -> CLOSE" << endl; // #endif // } boost::mutex::scoped_lock lock(m_connectionPoolMutex); unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber(); for(unsigned int i=0; i<connectionNumber; ++i) { m_connectionPool_sp->at(i).close(); } } //============================================================================== // DBManager::retrieveInformation() //============================================================================== DBManager::InformationList DBManager::retrieveInformation(std::string schema, std::string table) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveInformation()" << endl; soci::session session(*m_connectionPool_sp); soci::rowset<InformationTuple> rows = (session.prepare << "select " "column_name, column_type, is_nullable from information_schema.columns " "where table_schema like :schema and table_name like :table", soci::use(schema, "schema"), soci::use(table, "table")); InformationList informationList; std::copy(rows.begin(), rows.end(), std::back_inserter(informationList)); return informationList; } //============================================================================== // DBManager::retrieveInformation() //============================================================================== soci::rowset<soci::row> DBManager::searchNewTuples(std::string schema, std::string table, std::tm update_time) throw(soci::soci_error) { DEBUG_STREAM << "DBManager::searchNewTuples()" << endl; soci::session session(*m_connectionPool_sp); soci::rowset<soci::row> rows = (session.prepare << "select * from " << schema << "." << table << " where update_time>=:timestamp", soci::use(update_time,"timestamp")); return rows; } } //namespace
src/DBManager.h +23 −1 Original line number Diff line number Diff line Loading @@ -5,10 +5,19 @@ #include <tango.h> #include <ctime> #include <boost/tuple/tuple.hpp> #include <boost/optional/optional.hpp> #include <boost/scoped_ptr.hpp> #include <boost/thread/mutex.hpp> #include <soci/soci.h> #include <soci/error.h> #include <soci/row.h> #include <soci/rowset.h> #include <soci/boost-tuple.h> #include <soci/boost-optional.h> #include <soci/session.h> #include <soci/connection-pool.h> Loading Loading @@ -53,8 +62,21 @@ public: virtual void disconnect(); //------------------------------------------------------------------------------ // [Public] Data access methods // [Public] Retrieve information schema method //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<std::string>, boost::optional<std::string>, boost::optional<std::string> > InformationTuple; typedef std::vector< InformationTuple > InformationList; virtual InformationList retrieveInformation(std::string, std::string) throw(soci::soci_error); //------------------------------------------------------------------------------ // [Public] Search new tuple method //------------------------------------------------------------------------------ virtual soci::rowset<soci::row> searchNewTuples(std::string, std::string, std::tm) throw(soci::soci_error); protected: //------------------------------------------------------------------------------ Loading
src/ProtocolManager.cpp +137 −19 Original line number Diff line number Diff line Loading @@ -13,7 +13,7 @@ ProtocolManager::ProtocolManager(Tango::DeviceImpl* deviceImpl_p, { DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl; m_isAuthenticated = false; m_isAuthorised = false; m_isValidated = false; } Loading Loading @@ -88,7 +88,7 @@ ResponseSP ProtocolManager::prepareAuthroisation(RequestSP request_sp) Response::Authorization* auth_resp = response_sp->mutable_authorization(); if(!m_isAuthenticated) if(!m_isAuthorised) { const Request::Authorization& auth_req = request_sp->authorization(); std::string username = auth_req.username(); Loading @@ -99,7 +99,7 @@ ResponseSP ProtocolManager::prepareAuthroisation(RequestSP request_sp) INFO_STREAM << "ProtocolManager::prepareAuthroisation() " << "Authorization accepted" << endl; m_isAuthenticated = true; m_isAuthorised = true; auth_resp->set_state(Response::Authorization::ACCEPTED); auth_resp->set_status("Authorization accepted"); Loading @@ -109,7 +109,7 @@ ResponseSP ProtocolManager::prepareAuthroisation(RequestSP request_sp) WARN_STREAM << "ProtocolManager::prepareAuthroisation() " << "Invalid username or password" << endl; m_isAuthenticated = false; m_isAuthorised = false; auth_resp->set_state(Response::Authorization::REJECTED); auth_resp->set_status("Invalid username or password"); Loading Loading @@ -139,25 +139,68 @@ ResponseSP ProtocolManager::prepareValidation(RequestSP request_sp) response_sp->set_type(Response::VALIDATION); Response::Validation* validation = response_sp->mutable_validation(); Response::Validation* validationRes = response_sp->mutable_validation(); if(m_isAuthorised) { if(!m_isValidated) { INFO_STREAM << "ProtocolManager::prepareValidation() " << "Validation accepted" << endl; const Request::Validation& validationReq = request_sp->validation(); const std::string& schema = validationReq.schema(); const std::string& table = validationReq.table(); DBManager::InformationList informationList = m_dBManager_sp->retrieveInformation(schema, table); if(validationReq.columns_size() == (int)informationList.size()) { const google::protobuf::RepeatedPtrField < Request::Validation::Column >& columns = validationReq.columns(); google::protobuf::RepeatedPtrField < Request::Validation::Column >::const_iterator it; try { for(it=columns.begin(); it!=columns.end();++it) { validateColumn(*it, informationList); } m_isValidated = true; validation->set_state(Response::Validation::ACCEPTED); validation->set_status("Validation accepted"); validationRes->set_state(Response::Validation::ACCEPTED); validationRes->set_status("Table validated"); } catch(std::runtime_error& ex) { validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status(ex.what()); } } else { validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status("Columns number does not match"); } } else { WARN_STREAM << "ProtocolManager::prepareValidation() " << "Already validated" << endl; validation->set_state(Response::Validation::REJECTED); validation->set_status("Already validated"); validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status("Already validated"); } } else { WARN_STREAM << "ProtocolManager::prepareValidation() " << "Not authorised" << endl; validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status("Not authorised"); } return response_sp; Loading @@ -177,12 +220,87 @@ ResponseSP ProtocolManager::prepareMetadata(RequestSP request_sp) Response::Metadata* metadata = response_sp->mutable_metadata(); if(m_isAuthorised) { if(m_isValidated) { metadata->set_state(Response::Metadata::ACCEPTED); metadata->set_status("Metadata ready"); metadata->set_partial(1); metadata->set_total(1); metadata->set_partial(0); metadata->set_total(0); } else { metadata->set_state(Response::Metadata::REJECTED); metadata->set_status("Not validated"); metadata->set_partial(0); metadata->set_total(0); } } else { metadata->set_state(Response::Metadata::REJECTED); metadata->set_status("Not authorised"); metadata->set_partial(0); metadata->set_total(0); } return response_sp; } //============================================================================== // ProtocolManager::validateColumn() //============================================================================== void ProtocolManager::validateColumn(const Request::Validation::Column& column, DBManager::InformationList& informationList) throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::validateColumn()" << endl; bool found = false; DBManager::InformationList::const_iterator it; for(it=informationList.begin(); it!=informationList.end(); ++it) { if(!it->get<0>()) throw std::runtime_error("Empty column name"); std::string columnName = it->get<0>().get(); if(column.name().compare(columnName)==0) { found = true; if(!it->get<1>()) throw std::runtime_error("Empty column type"); std::string columnType = it->get<1>().get(); if(column.type().compare(columnType)!=0) { std::stringstream errorStream; errorStream << "Column " << column.name() << " type error " << "server " << columnType << " client " << column.type(); throw std::runtime_error(errorStream.str()); } if(!it->get<2>()) throw std::runtime_error("Empty is nullable"); std::string isNullable = it->get<2>().get(); if(column.nullable().compare(isNullable)!=0) { std::stringstream errorStream; errorStream << "Column " << column.name() << " nullable error " << "server " << isNullable << " client " << column.nullable(); throw std::runtime_error(errorStream.str()); } } } if(!found) { std::stringstream errorStream; errorStream << "Column " << column.name() << " not found on server"; throw std::runtime_error(errorStream.str()); } } } //namespace
src/ProtocolManager.h +8 −2 Original line number Diff line number Diff line Loading @@ -67,6 +67,12 @@ protected: virtual ResponseSP prepareMetadata(RequestSP) throw(std::runtime_error); //------------------------------------------------------------------------------ // [Protected] Validation related methods //------------------------------------------------------------------------------ virtual void validateColumn(const Request::Validation::Column&, DBManager::InformationList&) throw(std::runtime_error); //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ Loading @@ -76,8 +82,8 @@ protected: //Database manger shared pointer DBManager::SP m_dBManager_sp; //Client is authenticated bool m_isAuthenticated; //Client is authorised bool m_isAuthorised; //Table structure is validated bool m_isValidated; Loading