Newer
Older
namespace DataImporter_ns
{
//==============================================================================
// ProtocolManager::ProtocolManager()
//==============================================================================
ProtocolManager::ProtocolManager(Tango::DeviceImpl* deviceImpl_p,
Configuration::SP configuration_sp, DBManager::SP dBManager_sp) :
Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p),
m_configuration_sp(configuration_sp), m_dBManager_sp(dBManager_sp)
{
DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl;
}
//==============================================================================
// ProtocolManager::~ProtocolManager()
//==============================================================================
ProtocolManager::~ProtocolManager()
{
DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl;
}
//==============================================================================
//==============================================================================
ProtocolManager::SP ProtocolManager::create(Tango::DeviceImpl* deviceImpl_p,
Configuration::SP configuration_sp, DBManager::SP dBManager_sp)
{
ProtocolManager::SP d_sp(new ProtocolManager(deviceImpl_p, configuration_sp,
dBManager_sp), ProtocolManager::Deleter());
return d_sp;
}
//==============================================================================
// ProtocolManager::setRemoteEndpoint()
//==============================================================================
void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
{
DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl;
m_remoteEndpoint = remoteEndpoint;
}
//==============================================================================
//==============================================================================
void ProtocolManager::updateFileLists() throw(std::runtime_error)
DEBUG_STREAM << "ProtocolManager::updateFileLists()" << endl;
boost::posix_time::ptime m_lastTimestamp =
m_dBManager_sp->retrieveLastTimestamp();
DEBUG_STREAM << "ProtocolManager::updateFileLists() last timestamp "
<< boost::posix_time::to_simple_string(m_lastTimestamp) << endl;
m_newFileRowset_sp = m_dBManager_sp->retrieveNewFiles(m_lastTimestamp);
m_newFileRowsetIt = m_newFileRowset_sp->begin();
m_failedFileRowset_sp = m_dBManager_sp->retrieveFailedFiles();
m_failedFileRowsetIt = m_failedFileRowset_sp->begin();
}
//==============================================================================
//==============================================================================
DEBUG_STREAM << "ProtocolManager::hasNextFile()" << endl;
if(m_newFileRowset_sp &&
m_newFileRowsetIt != m_newFileRowset_sp->end())
{
DEBUG_STREAM << "ProtocolManager::hasNextFile() from new list" << endl;
m_recoveryMode = false;
return true;
}
else if(m_failedFileRowset_sp &&
m_failedFileRowsetIt != m_failedFileRowset_sp->end())
{
DEBUG_STREAM << "ProtocolManager::hasNextFile() from failed list" << endl;
else
{
DEBUG_STREAM << "ProtocolManager::hasNextFile() lists empty" << endl;
m_recoveryMode = false;
return false;
}
}
//==============================================================================
//==============================================================================
DEBUG_STREAM << "ProtocolManager::nextFile()" << endl;
if(!m_recoveryMode)
{
if(m_newFileRowset_sp &&
m_newFileRowsetIt != m_newFileRowset_sp->end())
{
DEBUG_STREAM << "ProtocolManager::nextFile() new list" << endl;
++m_newFileRowsetIt;
}
}
else
{
if(m_failedFileRowset_sp &&
m_failedFileRowsetIt != m_failedFileRowset_sp->end())
{
DEBUG_STREAM << "ProtocolManager::nextFile() from failed list" << endl;
//==============================================================================
//==============================================================================
RequestSP ProtocolManager::createRequest()
throw(std::logic_error, std::runtime_error)
DEBUG_STREAM << "ProtocolManager::createRequest()" << endl;
RequestSP request_sp(new Request);
request_sp->set_username(m_configuration_sp->getDatabaseUsername());
request_sp->set_password(m_configuration_sp->getDatabasePassword());
request_sp->set_schema(m_configuration_sp->getDatabaseSchema());
request_sp->set_table(m_configuration_sp->getDatabaseTable());
int fileVersion;
std::string fileName;
if(!m_recoveryMode)
{
if(!m_newFileRowset_sp ||
m_newFileRowsetIt == m_newFileRowset_sp->end())
throw std::runtime_error("New list not initialized or empty");
throw std::invalid_argument("Empty file version found on new list");
throw std::invalid_argument("Empty file name found on new list");
INFO_STREAM << "ProtocolManager::createRequest() request new file "
<< fileName << " version " << fileVersion << endl;
}
else
{
if(!m_failedFileRowset_sp ||
m_failedFileRowsetIt == m_failedFileRowset_sp->end())
throw std::runtime_error("Failed list not initialized or empty");
throw std::invalid_argument("Empty file version found on failed list");
fileVersion = m_failedFileRowsetIt->get<0>().get();
throw std::invalid_argument("Empty file name found on failed list");
INFO_STREAM << "ProtocolManager::createRequest() request failed file "
<< fileName << " version " << fileVersion << endl;
}
request_sp->set_file_version(fileVersion);
request_sp->set_file_name(fileName);
if(!request_sp->IsInitialized())
throw std::runtime_error("Request not initialized");
//==============================================================================
//==============================================================================
FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp)
throw(std::logic_error, std::runtime_error)
DEBUG_STREAM << "ProtocolManager::processResponse()" << endl;
if(!response_sp->IsInitialized())
throw std::runtime_error("Response not initialized");
if(filePath.empty())
throw std::invalid_argument("Empty file path received");
if(fileName.empty())
throw std::invalid_argument("Empty file path received");
boost::uint64_t fileSize = response_sp->file_size();
if(response_sp->state() == Response::REQUEST_ACCEPTED)
{
INFO_STREAM << "ProtocolManager::processResponse() transfer file "
<< fileName << " version " << fileVersion << " size " << fileSize << endl;
return FileWrapper::create(m_deviceImpl_p,
m_configuration_sp->getStoragePath(), filePath,
fileVersion, fileName, fileSize);
}
else if(response_sp->state() == Response::METADATA_NOT_FOUND ||
response_sp->state() == Response::FILE_NOT_DOWNLOADED ||
response_sp->state() == Response::FILE_NOT_FOUND)
{
throw std::logic_error(response_sp->status());
}
else
throw std::runtime_error(response_sp->status());
//==============================================================================
// ProtocolManager::setFileTransfered()
//==============================================================================
void ProtocolManager::setFileTransfered(FileWrapper::SP fileWrapper_sp)
throw(std::logic_error, std::runtime_error)
DEBUG_STREAM << "ProtocolManager::setFileTransfered()" << endl;
std::string storagePath = fileWrapper_sp->getStoragePath();
std::string filePath = fileWrapper_sp->getFilePath();
if(!m_recoveryMode)
{
if(!m_newFileRowset_sp ||
m_newFileRowsetIt == m_newFileRowset_sp->end())
throw std::runtime_error("New list not initialized or empty");
if(!m_newFileRowsetIt->get<0>())
throw std::invalid_argument("Empty file version found on new list");
int fileVersion = m_newFileRowsetIt->get<0>().get();
if(!m_newFileRowsetIt->get<1>())
throw std::invalid_argument("Empty file name found on new list");
std::string fileName = m_newFileRowsetIt->get<1>().get();
if(!m_newFileRowsetIt->get<2>())
throw std::invalid_argument("Empty update time found on new list");
std::tm update_time = m_newFileRowsetIt->get<2>().get();
INFO_STREAM << "ProtocolManager::setFileTransfered() file "
<< fileName << " version " << fileVersion << " transfered" << endl;
boost::posix_time::ptime newPtime = boost::posix_time::ptime_from_tm(update_time);
if(m_currentPtime.is_not_a_date_time())
m_currentPtime = newPtime;
DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction();
DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction();
if(newPtime > m_currentPtime)
m_dBManager_sp->persistLastTimestamp(newPtime);
m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName);
auxTransaction_sp->commit();
mainTransaction_sp->commit();
}
else
{
if(!m_failedFileRowset_sp ||
m_failedFileRowsetIt == m_failedFileRowset_sp->end())
throw std::runtime_error("Failed list not initialized or empty");
if(!m_failedFileRowsetIt->get<0>())
throw std::invalid_argument("Empty file version found on failed list");
int fileVersion = m_failedFileRowsetIt->get<0>().get();
if(!m_failedFileRowsetIt->get<1>())
throw std::invalid_argument("Empty file name found on failed list");
string fileName = m_failedFileRowsetIt->get<1>().get();
DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction();
DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction();
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
m_dBManager_sp->removeFailedFile(fileVersion, fileName);
m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName);
auxTransaction_sp->commit();
mainTransaction_sp->commit();
}
}
//==============================================================================
// ProtocolManager::markAsFailed()
//==============================================================================
void ProtocolManager::setFileFailed() throw(std::logic_error, std::runtime_error)
{
DEBUG_STREAM << "ProtocolManager::markAsFailed()" << endl;
if(!m_recoveryMode)
{
if(!m_newFileRowset_sp ||
m_newFileRowsetIt == m_newFileRowset_sp->end())
throw std::runtime_error("New list not initialized or empty");
if(!m_newFileRowsetIt->get<0>())
throw std::invalid_argument("Empty file version found on new list");
int fileVersion = m_newFileRowsetIt->get<0>().get();
if(!m_newFileRowsetIt->get<1>())
throw std::invalid_argument("Empty file name found on new list");
string fileName = m_newFileRowsetIt->get<1>().get();
m_dBManager_sp->addFailedFile(fileVersion, fileName);
}
else
{
//TODO: file failed again -> what to do?
}