Loading proto/Response.proto +1 −1 Original line number Diff line number Diff line Loading @@ -19,5 +19,5 @@ message Response optional string file_path = 3; optional int32 file_version = 4; optional string file_name = 5; optional uint64 size = 6; optional uint64 file_size = 6; } src/Client.cpp +32 −17 Original line number Diff line number Diff line Loading @@ -185,9 +185,7 @@ void Client::startUpdateLists() try { m_protocolManager_sp->updateNewFileList(); m_protocolManager_sp->updateFiledFileList(); m_protocolManager_sp->updateFileLists(); writeState(Tango::ON); writeStatus("Looking for new files"); Loading Loading @@ -217,9 +215,7 @@ void Client::handleUpdateLists() { DEBUG_STREAM << "Client::handleUpdateLists()" << endl; if(readState() != Tango::ALARM && (!m_protocolManager_sp->isNewFileListEmpty() || !m_protocolManager_sp->isFailedFileListEmpty())) if(readState() != Tango::ALARM && m_protocolManager_sp->hasNewFile()) { startResolve(); } Loading Loading @@ -332,7 +328,23 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) { try { //TODO: handle response ResponseSP response_sp(new Response); response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE); if(response_sp->state() == Response::REQUEST_ACCEPTED) { startReadData(m_protocolManager_sp->processResponse(response_sp)); } else { ERROR_STREAM << "Client::handleResponse() " << response_sp->status() << endl; writeState(Tango::ALARM); writeStatus(response_sp->status()); } } catch(std::exception& ec) { Loading Loading @@ -361,29 +373,32 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) //============================================================================== // Client::handleReadData() //============================================================================== void Client::handleReadData(const boost::system::error_code& errorCode, std::size_t bytes_transferred) void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvBytes, const boost::system::error_code& errorCode) { if(!errorCode) { if(bytes_transferred>0) { m_outputStream.write(&m_fileBuff[0], (std::streamsize)bytes_transferred); } //TODO: if output stream is bad? if(m_outputStream.tellp()<m_outputStreamSize) if(recvBytes>0) fileWrapper_sp->write(m_fileBuff, recvBytes); if(!fileWrapper_sp->isCompleted()) { startReadData(); startReadData(fileWrapper_sp); } else { INFO_STREAM << "Client::handleReadData() transfer complete " << endl; m_outputStream.close(); if(m_protocolManager_sp->hasNewFile()) { m_protocolManager_sp->nextFile(); startWriteRequest(); } } } else if(errorCode == boost::asio::error::eof) { DEBUG_STREAM << "Client::handleReadData() end of file from " Loading src/Client.h +7 −7 Original line number Diff line number Diff line Loading @@ -6,6 +6,7 @@ #include <ProtocolManager.h> #include <Request.pb.h> #include <Response.pb.h> #include <FileWrapper.h> #include <tango.h> Loading Loading @@ -110,9 +111,10 @@ protected: //------------------------------------------------------------------------------ // [Protected] Read data methods //------------------------------------------------------------------------------ virtual void startReadData() = 0; virtual void startReadData(FileWrapper::SP) = 0; virtual void handleReadData(const boost::system::error_code&, std::size_t); virtual void handleReadData(FileWrapper::SP, std::size_t, const boost::system::error_code&); //------------------------------------------------------------------------------ // [Protected] Connection reset and timeout handler methods Loading Loading @@ -184,13 +186,11 @@ protected: //Address and port of remote endpoint std::string m_remoteEndpoint; const int BUFFER_SIZE = 40960; //Read buffer size const boost::uint64_t BUFFER_SIZE = 40960; //Buffer for file data read from stream (TODO: unify two buffers) std::vector<char> m_fileBuff; std::ofstream m_outputStream; int m_outputStreamSize; }; } //End of namespace Loading src/FileWrapper.cpp 0 → 100644 +81 −0 Original line number Diff line number Diff line #include <FileWrapper.h> namespace DataImporter_ns { //============================================================================== // FileWrapper::FileWrapper() //============================================================================== FileWrapper::FileWrapper(Tango::DeviceImpl* deviceImpl_p, boost::filesystem::path filePath, boost::uint64_t totalFileSize) : Tango::LogAdapter(deviceImpl_p), m_totalFileSize(totalFileSize) { DEBUG_STREAM << "FileWrapper::FileWrapper()" << endl; m_outputFileStream.open(filePath.string(), std::ios::binary); } //============================================================================== // FileWrapper::~FileWrapper() //============================================================================== FileWrapper::~FileWrapper() { DEBUG_STREAM << "FileWrapper::~FileWrapper()" << endl; m_outputFileStream.close(); } //============================================================================== // FileWrapper::create() //============================================================================== FileWrapper::SP FileWrapper::create(Tango::DeviceImpl* deviceImpl_p, boost::filesystem::path filePath, boost::uint64_t totalFileSize) { FileWrapper::SP d_sp(new FileWrapper(deviceImpl_p, filePath, totalFileSize), FileWrapper::Deleter()); return d_sp; } //============================================================================== // FileWrapper::isOpen() //============================================================================== bool FileWrapper::isOpen() { return m_outputFileStream.is_open(); } //============================================================================== // FileWrapper::isBad() //============================================================================== bool FileWrapper::isBad() { return m_outputFileStream.bad(); } //============================================================================== // FileWrapper::isCompleted() //============================================================================== bool FileWrapper::isCompleted() { return (boost::uint64_t)m_outputFileStream.tellp() >= m_totalFileSize; } //============================================================================== // FileWrapper::getLeftToWrite() //============================================================================== boost::uint64_t FileWrapper::getLeftToWrite() { return m_totalFileSize - (boost::uint64_t)m_outputFileStream.tellp(); } //============================================================================== // FileWrapper::read() //============================================================================== void FileWrapper::write(std::vector<char>& writeBuff, boost::uint64_t& recvBytes) { m_outputFileStream.write(&writeBuff[0], (std::streamsize)recvBytes); } } src/FileWrapper.h 0 → 100644 +68 −0 Original line number Diff line number Diff line #ifndef FILEWRAPPER_H #define FILEWRAPPER_H #include <tango.h> #include <boost/filesystem.hpp> namespace DataImporter_ns { class FileWrapper : public Tango::LogAdapter { public: //------------------------------------------------------------------------------ // [Public] Shared pointer typedef //------------------------------------------------------------------------------ typedef boost::shared_ptr<FileWrapper> SP; protected: //------------------------------------------------------------------------------ // [Protected] Constructor destructor deleter //------------------------------------------------------------------------------ FileWrapper(Tango::DeviceImpl*, boost::filesystem::path, boost::uint64_t); virtual ~FileWrapper(); class Deleter; friend Deleter; class Deleter { public: void operator()(FileWrapper* d) { delete d; } }; public: //------------------------------------------------------------------------------ // [Public] Class creation method //------------------------------------------------------------------------------ static FileWrapper::SP create(Tango::DeviceImpl*, boost::filesystem::path, boost::uint64_t); //------------------------------------------------------------------------------ // [Public] Input stream methods //------------------------------------------------------------------------------ virtual bool isOpen(); virtual bool isBad(); virtual bool isCompleted(); boost::uint64_t getLeftToWrite(); virtual void write(std::vector<char>&, boost::uint64_t&); protected: //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ //Input file size boost::uint64_t m_totalFileSize; //Input file stream std::ofstream m_outputFileStream; }; } //End of namespace #endif /* FILEWRAPPER_H */ Loading
proto/Response.proto +1 −1 Original line number Diff line number Diff line Loading @@ -19,5 +19,5 @@ message Response optional string file_path = 3; optional int32 file_version = 4; optional string file_name = 5; optional uint64 size = 6; optional uint64 file_size = 6; }
src/Client.cpp +32 −17 Original line number Diff line number Diff line Loading @@ -185,9 +185,7 @@ void Client::startUpdateLists() try { m_protocolManager_sp->updateNewFileList(); m_protocolManager_sp->updateFiledFileList(); m_protocolManager_sp->updateFileLists(); writeState(Tango::ON); writeStatus("Looking for new files"); Loading Loading @@ -217,9 +215,7 @@ void Client::handleUpdateLists() { DEBUG_STREAM << "Client::handleUpdateLists()" << endl; if(readState() != Tango::ALARM && (!m_protocolManager_sp->isNewFileListEmpty() || !m_protocolManager_sp->isFailedFileListEmpty())) if(readState() != Tango::ALARM && m_protocolManager_sp->hasNewFile()) { startResolve(); } Loading Loading @@ -332,7 +328,23 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) { try { //TODO: handle response ResponseSP response_sp(new Response); response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE); if(response_sp->state() == Response::REQUEST_ACCEPTED) { startReadData(m_protocolManager_sp->processResponse(response_sp)); } else { ERROR_STREAM << "Client::handleResponse() " << response_sp->status() << endl; writeState(Tango::ALARM); writeStatus(response_sp->status()); } } catch(std::exception& ec) { Loading Loading @@ -361,29 +373,32 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) //============================================================================== // Client::handleReadData() //============================================================================== void Client::handleReadData(const boost::system::error_code& errorCode, std::size_t bytes_transferred) void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvBytes, const boost::system::error_code& errorCode) { if(!errorCode) { if(bytes_transferred>0) { m_outputStream.write(&m_fileBuff[0], (std::streamsize)bytes_transferred); } //TODO: if output stream is bad? if(m_outputStream.tellp()<m_outputStreamSize) if(recvBytes>0) fileWrapper_sp->write(m_fileBuff, recvBytes); if(!fileWrapper_sp->isCompleted()) { startReadData(); startReadData(fileWrapper_sp); } else { INFO_STREAM << "Client::handleReadData() transfer complete " << endl; m_outputStream.close(); if(m_protocolManager_sp->hasNewFile()) { m_protocolManager_sp->nextFile(); startWriteRequest(); } } } else if(errorCode == boost::asio::error::eof) { DEBUG_STREAM << "Client::handleReadData() end of file from " Loading
src/Client.h +7 −7 Original line number Diff line number Diff line Loading @@ -6,6 +6,7 @@ #include <ProtocolManager.h> #include <Request.pb.h> #include <Response.pb.h> #include <FileWrapper.h> #include <tango.h> Loading Loading @@ -110,9 +111,10 @@ protected: //------------------------------------------------------------------------------ // [Protected] Read data methods //------------------------------------------------------------------------------ virtual void startReadData() = 0; virtual void startReadData(FileWrapper::SP) = 0; virtual void handleReadData(const boost::system::error_code&, std::size_t); virtual void handleReadData(FileWrapper::SP, std::size_t, const boost::system::error_code&); //------------------------------------------------------------------------------ // [Protected] Connection reset and timeout handler methods Loading Loading @@ -184,13 +186,11 @@ protected: //Address and port of remote endpoint std::string m_remoteEndpoint; const int BUFFER_SIZE = 40960; //Read buffer size const boost::uint64_t BUFFER_SIZE = 40960; //Buffer for file data read from stream (TODO: unify two buffers) std::vector<char> m_fileBuff; std::ofstream m_outputStream; int m_outputStreamSize; }; } //End of namespace Loading
src/FileWrapper.cpp 0 → 100644 +81 −0 Original line number Diff line number Diff line #include <FileWrapper.h> namespace DataImporter_ns { //============================================================================== // FileWrapper::FileWrapper() //============================================================================== FileWrapper::FileWrapper(Tango::DeviceImpl* deviceImpl_p, boost::filesystem::path filePath, boost::uint64_t totalFileSize) : Tango::LogAdapter(deviceImpl_p), m_totalFileSize(totalFileSize) { DEBUG_STREAM << "FileWrapper::FileWrapper()" << endl; m_outputFileStream.open(filePath.string(), std::ios::binary); } //============================================================================== // FileWrapper::~FileWrapper() //============================================================================== FileWrapper::~FileWrapper() { DEBUG_STREAM << "FileWrapper::~FileWrapper()" << endl; m_outputFileStream.close(); } //============================================================================== // FileWrapper::create() //============================================================================== FileWrapper::SP FileWrapper::create(Tango::DeviceImpl* deviceImpl_p, boost::filesystem::path filePath, boost::uint64_t totalFileSize) { FileWrapper::SP d_sp(new FileWrapper(deviceImpl_p, filePath, totalFileSize), FileWrapper::Deleter()); return d_sp; } //============================================================================== // FileWrapper::isOpen() //============================================================================== bool FileWrapper::isOpen() { return m_outputFileStream.is_open(); } //============================================================================== // FileWrapper::isBad() //============================================================================== bool FileWrapper::isBad() { return m_outputFileStream.bad(); } //============================================================================== // FileWrapper::isCompleted() //============================================================================== bool FileWrapper::isCompleted() { return (boost::uint64_t)m_outputFileStream.tellp() >= m_totalFileSize; } //============================================================================== // FileWrapper::getLeftToWrite() //============================================================================== boost::uint64_t FileWrapper::getLeftToWrite() { return m_totalFileSize - (boost::uint64_t)m_outputFileStream.tellp(); } //============================================================================== // FileWrapper::read() //============================================================================== void FileWrapper::write(std::vector<char>& writeBuff, boost::uint64_t& recvBytes) { m_outputFileStream.write(&writeBuff[0], (std::streamsize)recvBytes); } }
src/FileWrapper.h 0 → 100644 +68 −0 Original line number Diff line number Diff line #ifndef FILEWRAPPER_H #define FILEWRAPPER_H #include <tango.h> #include <boost/filesystem.hpp> namespace DataImporter_ns { class FileWrapper : public Tango::LogAdapter { public: //------------------------------------------------------------------------------ // [Public] Shared pointer typedef //------------------------------------------------------------------------------ typedef boost::shared_ptr<FileWrapper> SP; protected: //------------------------------------------------------------------------------ // [Protected] Constructor destructor deleter //------------------------------------------------------------------------------ FileWrapper(Tango::DeviceImpl*, boost::filesystem::path, boost::uint64_t); virtual ~FileWrapper(); class Deleter; friend Deleter; class Deleter { public: void operator()(FileWrapper* d) { delete d; } }; public: //------------------------------------------------------------------------------ // [Public] Class creation method //------------------------------------------------------------------------------ static FileWrapper::SP create(Tango::DeviceImpl*, boost::filesystem::path, boost::uint64_t); //------------------------------------------------------------------------------ // [Public] Input stream methods //------------------------------------------------------------------------------ virtual bool isOpen(); virtual bool isBad(); virtual bool isCompleted(); boost::uint64_t getLeftToWrite(); virtual void write(std::vector<char>&, boost::uint64_t&); protected: //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ //Input file size boost::uint64_t m_totalFileSize; //Input file stream std::ofstream m_outputFileStream; }; } //End of namespace #endif /* FILEWRAPPER_H */