Loading proto/Request.proto +16 −4 Original line number Diff line number Diff line Loading @@ -7,7 +7,9 @@ message Request enum Type { AUTHORIZATION = 0; DATA = 1; VALIDATION = 1; TRANSFER = 2; KEEPALIVE = 3; } required Type type = 1; Loading @@ -22,13 +24,23 @@ message Request optional Authorization authorization = 2; //Data request //Validation request message Data message Validation { required string schema = 1; required string table = 2; } optional Validation validation = 3; //Transfer request message Transfer { required int32 file_version = 1; required string file_name = 2; } optional Data data = 3; optional Transfer transfer = 4; } proto/Response.proto +27 −7 Original line number Diff line number Diff line Loading @@ -7,7 +7,9 @@ message Response enum Type { AUTHORIZATION = 0; DATA = 1; VALIDATION = 1; TRANSFER = 2; KEEPALIVE = 3; } required Type type = 1; Loading @@ -28,9 +30,9 @@ message Response optional Authorization authorization = 2; //Data response //Validation response message Data message Validation { enum State { Loading @@ -40,10 +42,28 @@ message Response required State state = 1; required string status = 2; } optional Validation validation = 3; //Transfer response message Transfer { enum State { ACCEPTED = 0; REJECTED = 1; } required string file_path = 3; required string file_version = 4; required string file_name = 5; required uint64 size = 6; required State state = 1; required string status = 2; optional string file_path = 3; optional int32 file_version = 4; optional string file_name = 5; optional uint64 size = 6; } optional Transfer transfer = 4; } src/DBManager.cpp +46 −0 Original line number Diff line number Diff line Loading @@ -77,4 +77,50 @@ void DBManager::disconnect() m_connectionPool_sp.reset(); } //============================================================================== // DBManager::retrieveFileInfo() //============================================================================== DBManager::FileTuple DBManager::retrieveFileInfo(std::string schema, std::string table, int version, std::string name) throw(soci::soci_error, std::runtime_error) { DEBUG_STREAM << "DBManager::retrieveFileInfo()" << endl; if(!m_connectionPool_sp) throw soci::soci_error("Connection pool not initialized"); soci::session session(*m_connectionPool_sp); if(session.get_backend() == NULL) session.reconnect(); soci::rowset<FileTuple> rows = (session.prepare << "select storage_path, " "file_path from " << schema << "." << table << " where " "file_version=:version and file_name like :name", soci::use(version, "version"), soci::use(name, "name")); std::vector<FileTuple> fileTupleList; std::copy(rows.begin(), rows.end(), std::back_inserter(fileTupleList)); if(fileTupleList.empty()) { std::stringstream errorStream; errorStream << "Table " << schema << "." << table << " does not contain file " << name << " version " << version; throw std::runtime_error(errorStream.str()); } if(fileTupleList.size()>1) { std::stringstream errorStream; errorStream << "Table " << schema << "." << table << " has duplicate for file " << name << " version " << version; throw std::runtime_error(errorStream.str()); } return fileTupleList.at(0); } } //namespace src/DBManager.h +9 −0 Original line number Diff line number Diff line Loading @@ -61,6 +61,15 @@ public: virtual void disconnect(); //------------------------------------------------------------------------------ // [Public] File method //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<std::string>, boost::optional<std::string> > FileTuple; virtual FileTuple retrieveFileInfo(std::string, std::string, int, std::string) throw(soci::soci_error, std::runtime_error); protected: //------------------------------------------------------------------------------ // [Protected] Class variables Loading src/ProtocolManager.cpp +263 −1 Original line number Diff line number Diff line #include <ProtocolManager.h> #include <boost/date_time.hpp> #include <boost/filesystem.hpp> namespace DataExporter_ns { Loading Loading @@ -51,10 +52,271 @@ void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint) //============================================================================== // ProtocolManager::setRemoteEndpoint() //============================================================================== ResponseSP ProtocolManager::prepareResponse(RequestSP) ResponseSP ProtocolManager::prepareResponse(RequestSP request_sp) throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::prepareResponse()" << endl; if(!request_sp->IsInitialized()) throw std::runtime_error("Not initialized request!"); ResponseSP response_sp; switch(request_sp->type()) { case Request::AUTHORIZATION: { response_sp = prepareAuthroisation(request_sp); break; } case Request::VALIDATION: { response_sp = prepareValidation(request_sp); break; } case Request::TRANSFER: { response_sp = prepareTransfer(request_sp); break; } case Request::KEEPALIVE: { response_sp = prepareKeepAlive(request_sp); break; } default: throw std::runtime_error("Unknown request type!"); } if(!response_sp->IsInitialized()) throw std::runtime_error("Not initialized response!"); return response_sp; } //============================================================================== // ProtocolManager::prepareAuthroisation() //============================================================================== ResponseSP ProtocolManager::prepareAuthroisation(RequestSP request_sp) { DEBUG_STREAM << "ProtocolManager::prepareAuthroisation()" << endl; ResponseSP response_sp(new Response()); response_sp->set_type(Response::AUTHORIZATION); Response::Authorization* authResp = response_sp->mutable_authorization(); if(!m_isAuthorised) { const Request::Authorization& authReq = request_sp->authorization(); std::string username = authReq.username(); std::string password = authReq.password(); if(m_configuration_sp->isUserAuthorized(username, password)) { INFO_STREAM << "ProtocolManager::prepareAuthroisation() " << "Authorization accepted from " << m_remoteEndpoint << endl; m_isAuthorised = true; authResp->set_state(Response::Authorization::ACCEPTED); authResp->set_status("Authorization accepted"); } else { WARN_STREAM << "ProtocolManager::prepareAuthroisation() " << "Invalid username or password from " << m_remoteEndpoint << endl; m_isAuthorised = false; authResp->set_state(Response::Authorization::REJECTED); authResp->set_status("Invalid username or password"); } } else { WARN_STREAM << "ProtocolManager::prepareAuthroisation() " << "Already authorized from " << m_remoteEndpoint << endl; authResp->set_state(Response::Authorization::REJECTED); authResp->set_status("Already authorized"); } return response_sp; } //============================================================================== // ProtocolManager::prepareValidation() //============================================================================== ResponseSP ProtocolManager::prepareValidation(RequestSP request_sp) { DEBUG_STREAM << "ProtocolManager::prepareValidation()" << endl; ResponseSP response_sp(new Response()); response_sp->set_type(Response::VALIDATION); Response::Validation* validationRes = response_sp->mutable_validation(); if(m_isAuthorised) { if(!m_isValidated) { const Request::Validation& validationReq = request_sp->validation(); m_validatedSchema = validationReq.schema(); m_validatedTable = validationReq.table(); if(m_configuration_sp->isTableExported(m_validatedSchema, m_validatedTable)) { INFO_STREAM << "ProtocolManager::prepareValidation() " << "Validation accepted for " << m_validatedSchema << "." << m_validatedTable << " from " << m_remoteEndpoint << endl; m_isValidated = true; validationRes->set_state(Response::Validation::ACCEPTED); validationRes->set_status("Table validated"); } else { WARN_STREAM << "ProtocolManager::prepareValidation() " << "Not exported table from " << m_remoteEndpoint << endl; validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status("Not exported table"); } } else { WARN_STREAM << "ProtocolManager::prepareValidation() " << "Already validated from " << m_remoteEndpoint << endl; validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status("Already validated"); } } else { WARN_STREAM << "ProtocolManager::prepareValidation() " << "Not authorised from " << m_remoteEndpoint << endl; validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status("Not authorised"); } return response_sp; } //============================================================================== // ProtocolManager::prepareTransfer() //============================================================================== ResponseSP ProtocolManager::prepareTransfer(RequestSP request_sp) { DEBUG_STREAM << "ProtocolManager::prepareTransfer()" << endl; ResponseSP response_sp(new Response()); response_sp->set_type(Response::TRANSFER); Response::Transfer* transferRes = response_sp->mutable_transfer(); if(m_isAuthorised) { if(m_isValidated) { const Request::Transfer& transferReq = request_sp->transfer(); int fileVersion = transferReq.file_version(); std::string fileName = transferReq.file_name(); try { DBManager::FileTuple fileTuple = m_dBManager_sp->retrieveFileInfo(m_validatedSchema, m_validatedTable, fileVersion, fileName); if(!fileTuple.get<0>() || !fileTuple.get<1>()) { std::stringstream errorStream; errorStream << "File " << fileName << " version " << fileVersion << " not yet downloaded in storage"; throw std::runtime_error(errorStream.str()); } std::string storagePath = fileTuple.get<0>().get(); boost::filesystem::path absPath(storagePath); std::string filePath = fileTuple.get<1>().get(); boost::filesystem::path relPath(filePath); absPath /= relPath; std::stringstream pathStream; pathStream << "/" << fileVersion << "/" << fileName; boost::filesystem::path lastPath(pathStream.str()); absPath /= lastPath; DEBUG_STREAM << "FILE: " << absPath.string() << endl; if(!boost::filesystem::is_regular_file(absPath)) { std::stringstream errorStream; errorStream << "File " << fileName << " version " << fileVersion << " not exists in storage"; throw std::runtime_error(errorStream.str()); } transferRes->set_file_path(filePath); transferRes->set_file_version(fileVersion); transferRes->set_file_name(fileName); transferRes->set_size(boost::filesystem::file_size(pathStream.str())); transferRes->set_state(Response::Transfer::ACCEPTED); transferRes->set_status("File found"); } catch(std::exception& ex) { WARN_STREAM << "ProtocolManager::prepareMetadata() " << ex.what() << " from " << m_remoteEndpoint << endl; transferRes->set_state(Response::Transfer::REJECTED); transferRes->set_status(ex.what()); } } else { WARN_STREAM << "ProtocolManager::prepareMetadata() " << "Not validated from " << m_remoteEndpoint << endl; transferRes->set_state(Response::Transfer::REJECTED); transferRes->set_status("Not validated"); } } else { WARN_STREAM << "ProtocolManager::prepareData() " << "Not authorised from " << m_remoteEndpoint << endl; transferRes->set_state(Response::Transfer::REJECTED); transferRes->set_status("Not authorised"); } return response_sp; } //============================================================================== // ProtocolManager::prepareKeepAlive() //============================================================================== ResponseSP ProtocolManager::prepareKeepAlive(RequestSP request_sp) { ResponseSP response_sp(new Response()); response_sp->set_type(Response::KEEPALIVE); return response_sp; } } //namespace Loading
proto/Request.proto +16 −4 Original line number Diff line number Diff line Loading @@ -7,7 +7,9 @@ message Request enum Type { AUTHORIZATION = 0; DATA = 1; VALIDATION = 1; TRANSFER = 2; KEEPALIVE = 3; } required Type type = 1; Loading @@ -22,13 +24,23 @@ message Request optional Authorization authorization = 2; //Data request //Validation request message Data message Validation { required string schema = 1; required string table = 2; } optional Validation validation = 3; //Transfer request message Transfer { required int32 file_version = 1; required string file_name = 2; } optional Data data = 3; optional Transfer transfer = 4; }
proto/Response.proto +27 −7 Original line number Diff line number Diff line Loading @@ -7,7 +7,9 @@ message Response enum Type { AUTHORIZATION = 0; DATA = 1; VALIDATION = 1; TRANSFER = 2; KEEPALIVE = 3; } required Type type = 1; Loading @@ -28,9 +30,9 @@ message Response optional Authorization authorization = 2; //Data response //Validation response message Data message Validation { enum State { Loading @@ -40,10 +42,28 @@ message Response required State state = 1; required string status = 2; } optional Validation validation = 3; //Transfer response message Transfer { enum State { ACCEPTED = 0; REJECTED = 1; } required string file_path = 3; required string file_version = 4; required string file_name = 5; required uint64 size = 6; required State state = 1; required string status = 2; optional string file_path = 3; optional int32 file_version = 4; optional string file_name = 5; optional uint64 size = 6; } optional Transfer transfer = 4; }
src/DBManager.cpp +46 −0 Original line number Diff line number Diff line Loading @@ -77,4 +77,50 @@ void DBManager::disconnect() m_connectionPool_sp.reset(); } //============================================================================== // DBManager::retrieveFileInfo() //============================================================================== DBManager::FileTuple DBManager::retrieveFileInfo(std::string schema, std::string table, int version, std::string name) throw(soci::soci_error, std::runtime_error) { DEBUG_STREAM << "DBManager::retrieveFileInfo()" << endl; if(!m_connectionPool_sp) throw soci::soci_error("Connection pool not initialized"); soci::session session(*m_connectionPool_sp); if(session.get_backend() == NULL) session.reconnect(); soci::rowset<FileTuple> rows = (session.prepare << "select storage_path, " "file_path from " << schema << "." << table << " where " "file_version=:version and file_name like :name", soci::use(version, "version"), soci::use(name, "name")); std::vector<FileTuple> fileTupleList; std::copy(rows.begin(), rows.end(), std::back_inserter(fileTupleList)); if(fileTupleList.empty()) { std::stringstream errorStream; errorStream << "Table " << schema << "." << table << " does not contain file " << name << " version " << version; throw std::runtime_error(errorStream.str()); } if(fileTupleList.size()>1) { std::stringstream errorStream; errorStream << "Table " << schema << "." << table << " has duplicate for file " << name << " version " << version; throw std::runtime_error(errorStream.str()); } return fileTupleList.at(0); } } //namespace
src/DBManager.h +9 −0 Original line number Diff line number Diff line Loading @@ -61,6 +61,15 @@ public: virtual void disconnect(); //------------------------------------------------------------------------------ // [Public] File method //------------------------------------------------------------------------------ typedef boost::tuple< boost::optional<std::string>, boost::optional<std::string> > FileTuple; virtual FileTuple retrieveFileInfo(std::string, std::string, int, std::string) throw(soci::soci_error, std::runtime_error); protected: //------------------------------------------------------------------------------ // [Protected] Class variables Loading
src/ProtocolManager.cpp +263 −1 Original line number Diff line number Diff line #include <ProtocolManager.h> #include <boost/date_time.hpp> #include <boost/filesystem.hpp> namespace DataExporter_ns { Loading Loading @@ -51,10 +52,271 @@ void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint) //============================================================================== // ProtocolManager::setRemoteEndpoint() //============================================================================== ResponseSP ProtocolManager::prepareResponse(RequestSP) ResponseSP ProtocolManager::prepareResponse(RequestSP request_sp) throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::prepareResponse()" << endl; if(!request_sp->IsInitialized()) throw std::runtime_error("Not initialized request!"); ResponseSP response_sp; switch(request_sp->type()) { case Request::AUTHORIZATION: { response_sp = prepareAuthroisation(request_sp); break; } case Request::VALIDATION: { response_sp = prepareValidation(request_sp); break; } case Request::TRANSFER: { response_sp = prepareTransfer(request_sp); break; } case Request::KEEPALIVE: { response_sp = prepareKeepAlive(request_sp); break; } default: throw std::runtime_error("Unknown request type!"); } if(!response_sp->IsInitialized()) throw std::runtime_error("Not initialized response!"); return response_sp; } //============================================================================== // ProtocolManager::prepareAuthroisation() //============================================================================== ResponseSP ProtocolManager::prepareAuthroisation(RequestSP request_sp) { DEBUG_STREAM << "ProtocolManager::prepareAuthroisation()" << endl; ResponseSP response_sp(new Response()); response_sp->set_type(Response::AUTHORIZATION); Response::Authorization* authResp = response_sp->mutable_authorization(); if(!m_isAuthorised) { const Request::Authorization& authReq = request_sp->authorization(); std::string username = authReq.username(); std::string password = authReq.password(); if(m_configuration_sp->isUserAuthorized(username, password)) { INFO_STREAM << "ProtocolManager::prepareAuthroisation() " << "Authorization accepted from " << m_remoteEndpoint << endl; m_isAuthorised = true; authResp->set_state(Response::Authorization::ACCEPTED); authResp->set_status("Authorization accepted"); } else { WARN_STREAM << "ProtocolManager::prepareAuthroisation() " << "Invalid username or password from " << m_remoteEndpoint << endl; m_isAuthorised = false; authResp->set_state(Response::Authorization::REJECTED); authResp->set_status("Invalid username or password"); } } else { WARN_STREAM << "ProtocolManager::prepareAuthroisation() " << "Already authorized from " << m_remoteEndpoint << endl; authResp->set_state(Response::Authorization::REJECTED); authResp->set_status("Already authorized"); } return response_sp; } //============================================================================== // ProtocolManager::prepareValidation() //============================================================================== ResponseSP ProtocolManager::prepareValidation(RequestSP request_sp) { DEBUG_STREAM << "ProtocolManager::prepareValidation()" << endl; ResponseSP response_sp(new Response()); response_sp->set_type(Response::VALIDATION); Response::Validation* validationRes = response_sp->mutable_validation(); if(m_isAuthorised) { if(!m_isValidated) { const Request::Validation& validationReq = request_sp->validation(); m_validatedSchema = validationReq.schema(); m_validatedTable = validationReq.table(); if(m_configuration_sp->isTableExported(m_validatedSchema, m_validatedTable)) { INFO_STREAM << "ProtocolManager::prepareValidation() " << "Validation accepted for " << m_validatedSchema << "." << m_validatedTable << " from " << m_remoteEndpoint << endl; m_isValidated = true; validationRes->set_state(Response::Validation::ACCEPTED); validationRes->set_status("Table validated"); } else { WARN_STREAM << "ProtocolManager::prepareValidation() " << "Not exported table from " << m_remoteEndpoint << endl; validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status("Not exported table"); } } else { WARN_STREAM << "ProtocolManager::prepareValidation() " << "Already validated from " << m_remoteEndpoint << endl; validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status("Already validated"); } } else { WARN_STREAM << "ProtocolManager::prepareValidation() " << "Not authorised from " << m_remoteEndpoint << endl; validationRes->set_state(Response::Validation::REJECTED); validationRes->set_status("Not authorised"); } return response_sp; } //============================================================================== // ProtocolManager::prepareTransfer() //============================================================================== ResponseSP ProtocolManager::prepareTransfer(RequestSP request_sp) { DEBUG_STREAM << "ProtocolManager::prepareTransfer()" << endl; ResponseSP response_sp(new Response()); response_sp->set_type(Response::TRANSFER); Response::Transfer* transferRes = response_sp->mutable_transfer(); if(m_isAuthorised) { if(m_isValidated) { const Request::Transfer& transferReq = request_sp->transfer(); int fileVersion = transferReq.file_version(); std::string fileName = transferReq.file_name(); try { DBManager::FileTuple fileTuple = m_dBManager_sp->retrieveFileInfo(m_validatedSchema, m_validatedTable, fileVersion, fileName); if(!fileTuple.get<0>() || !fileTuple.get<1>()) { std::stringstream errorStream; errorStream << "File " << fileName << " version " << fileVersion << " not yet downloaded in storage"; throw std::runtime_error(errorStream.str()); } std::string storagePath = fileTuple.get<0>().get(); boost::filesystem::path absPath(storagePath); std::string filePath = fileTuple.get<1>().get(); boost::filesystem::path relPath(filePath); absPath /= relPath; std::stringstream pathStream; pathStream << "/" << fileVersion << "/" << fileName; boost::filesystem::path lastPath(pathStream.str()); absPath /= lastPath; DEBUG_STREAM << "FILE: " << absPath.string() << endl; if(!boost::filesystem::is_regular_file(absPath)) { std::stringstream errorStream; errorStream << "File " << fileName << " version " << fileVersion << " not exists in storage"; throw std::runtime_error(errorStream.str()); } transferRes->set_file_path(filePath); transferRes->set_file_version(fileVersion); transferRes->set_file_name(fileName); transferRes->set_size(boost::filesystem::file_size(pathStream.str())); transferRes->set_state(Response::Transfer::ACCEPTED); transferRes->set_status("File found"); } catch(std::exception& ex) { WARN_STREAM << "ProtocolManager::prepareMetadata() " << ex.what() << " from " << m_remoteEndpoint << endl; transferRes->set_state(Response::Transfer::REJECTED); transferRes->set_status(ex.what()); } } else { WARN_STREAM << "ProtocolManager::prepareMetadata() " << "Not validated from " << m_remoteEndpoint << endl; transferRes->set_state(Response::Transfer::REJECTED); transferRes->set_status("Not validated"); } } else { WARN_STREAM << "ProtocolManager::prepareData() " << "Not authorised from " << m_remoteEndpoint << endl; transferRes->set_state(Response::Transfer::REJECTED); transferRes->set_status("Not authorised"); } return response_sp; } //============================================================================== // ProtocolManager::prepareKeepAlive() //============================================================================== ResponseSP ProtocolManager::prepareKeepAlive(RequestSP request_sp) { ResponseSP response_sp(new Response()); response_sp->set_type(Response::KEEPALIVE); return response_sp; } } //namespace