Newer
Older
namespace DataImporter_ns
{
//==============================================================================
// ProtocolManager::ProtocolManager()
//==============================================================================
ProtocolManager::ProtocolManager(Tango::DeviceImpl* deviceImpl_p,
Configuration::SP configuration_sp, DBManager::SP dBManager_sp) :
Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p),
m_configuration_sp(configuration_sp), m_dBManager_sp(dBManager_sp)
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
{
DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl;
}
//==============================================================================
// ProtocolManager::ProtocolManager()
//==============================================================================
ProtocolManager::~ProtocolManager()
{
DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl;
}
//==============================================================================
// ProtocolManager::ProtocolManager()
//==============================================================================
ProtocolManager::SP ProtocolManager::create(Tango::DeviceImpl* deviceImpl_p,
Configuration::SP configuration_sp, DBManager::SP dBManager_sp)
{
ProtocolManager::SP d_sp(new ProtocolManager(deviceImpl_p, configuration_sp,
dBManager_sp), ProtocolManager::Deleter());
return d_sp;
}
//==============================================================================
// ProtocolManager::ProtocolManager()
//==============================================================================
void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
{
DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl;
m_remoteEndpoint = remoteEndpoint;
}
//==============================================================================
// ProtocolManager::updateNewFileList()
//==============================================================================
void ProtocolManager::updateFileLists() throw(soci::soci_error)
DEBUG_STREAM << "ProtocolManager::updateFileLists()" << endl;
boost::posix_time::ptime m_lastTimestamp =
m_dBManager_sp->retrieveLastTimestamp();
DEBUG_STREAM << "ProtocolManager::updateFileLists() last timestamp "
<< boost::posix_time::to_simple_string(m_lastTimestamp) << endl;
m_newFileRowset_sp = m_dBManager_sp->retrieveNewFiles(m_lastTimestamp);
m_newFileRowsetIt = m_newFileRowset_sp->begin();
m_failedFileRowset_sp = m_dBManager_sp->retrieveFailedFiles();
m_failedFileRowsetIt = m_failedFileRowset_sp->begin();
}
//==============================================================================
// ProtocolManager::isNewFileListEmpty()
//==============================================================================
DEBUG_STREAM << "ProtocolManager::isNewFileListEmpty()" << endl;
if(m_newFileRowset_sp &&
m_newFileRowsetIt != m_newFileRowset_sp->end())
{
DEBUG_STREAM << "NEW FILE MODE" << endl;
m_recoveryMode = false;
return true;
}
else if(m_failedFileRowset_sp &&
m_failedFileRowsetIt != m_failedFileRowset_sp->end())
{
DEBUG_STREAM << "RECOVERY MODE" << endl;
m_recoveryMode = true;
return true;
}
return false;
}
//==============================================================================
//==============================================================================
DEBUG_STREAM << "ProtocolManager::nextFile()" << endl;
if(!m_recoveryMode)
{
if(m_newFileRowset_sp &&
m_newFileRowsetIt != m_newFileRowset_sp->end())
{
DEBUG_STREAM << "NEXT NEW FILE MODE" << endl;
++m_newFileRowsetIt;
}
}
else
{
if(m_failedFileRowset_sp &&
m_failedFileRowsetIt != m_failedFileRowset_sp->end())
{
DEBUG_STREAM << "NEXT NEW FILE MODE" << endl;
++m_failedFileRowsetIt;
}
}
//==============================================================================
// ProtocolManager::createNewFileRequest()
//==============================================================================
RequestSP ProtocolManager::createRequest() throw(std::runtime_error)
DEBUG_STREAM << "ProtocolManager::createNewFileRequest()" << endl;
throw std::runtime_error("New file list is empty");
RequestSP request_sp(new Request);
request_sp->set_username(m_configuration_sp->getDatabaseUsername());
request_sp->set_password(m_configuration_sp->getDatabasePassword());
request_sp->set_schema(m_configuration_sp->getDatabaseSchema());
request_sp->set_table(m_configuration_sp->getDatabaseTable());
int fileVersion;
std::string fileName;
if(!m_recoveryMode)
{
if(!m_newFileRowsetIt->get<0>())
throw std::runtime_error("Empty file version found");
fileVersion = m_newFileRowsetIt->get<0>().get();
if(!m_newFileRowsetIt->get<1>())
throw std::runtime_error("Empty file name found");
fileName = m_newFileRowsetIt->get<1>().get();
INFO_STREAM << "ProtocolManager::createTransfer() request new file "
<< fileName << " version " << fileVersion << endl;
}
else
{
if(!m_failedFileRowsetIt->get<0>())
throw std::runtime_error("Empty file version found");
fileVersion = m_failedFileRowsetIt->get<0>().get();
if(!m_failedFileRowsetIt->get<1>())
throw std::runtime_error("Empty file name found");
fileName = m_failedFileRowsetIt->get<1>().get();
INFO_STREAM << "ProtocolManager::createTransfer() request recovery file "
<< fileName << " version " << fileVersion << endl;
}
request_sp->set_file_version(fileVersion);
request_sp->set_file_name(fileName);
if(!request_sp->IsInitialized())
throw std::runtime_error("Request not initialized");
//==============================================================================
//==============================================================================
FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp)
DEBUG_STREAM << "ProtocolManager::processResponse()" << endl;
std::string filePath = response_sp->file_path();
int fileVersion = response_sp->file_version();
std::string fileName = response_sp->file_name();
boost::uint64_t fileSize = response_sp->file_size();
INFO_STREAM << "ProtocolManager::processResponse() "
<< " transfer file " << fileName << " version "
<< fileVersion << " size " << fileSize << endl;
// boost::filesystem::path path = composePath(m_configuration_sp->getStoragePath(),
// filePath, fileVersion, fileName);
boost::filesystem::path path(fileName);
return FileWrapper::create(m_deviceImpl_p, path, fileSize);
}
//==============================================================================
// ProtocolManager::composePath()
//==============================================================================
boost::filesystem::path ProtocolManager::composePath(std::string storagePath,
std::string filePath, int fileVersion, std::string fileName)
{
DEBUG_STREAM << "ProtocolManager::composePath()" << endl;
boost::filesystem::path absolutePath(storagePath);
std::stringstream fileStream;
fileStream << "/" << fileVersion << "/" << fileName;
DEBUG_STREAM << "ProtocolManager::composePath() "
<< absolutePath.string() << endl;