// Implementation of DataImporter_ns::ProtocolManager.
#include <boost/filesystem.hpp>
namespace DataImporter_ns
{
//==============================================================================
// ProtocolManager::ProtocolManager()
//==============================================================================
// Constructs a ProtocolManager.
//
// dataImporter_p   - DataImporter instance; forwarded to Tango::LogAdapter and
//                    kept in m_dataImporter_p
// configuration_sp - shared configuration, kept in m_configuration_sp
// dBManager_sp     - shared database manager, kept in m_dBManager_sp
ProtocolManager::ProtocolManager(DataImporter* dataImporter_p,
    Configuration::SP configuration_sp, DBManager::SP dBManager_sp)
    : Tango::LogAdapter(dataImporter_p)
    , m_dataImporter_p(dataImporter_p)
    , m_configuration_sp(configuration_sp)
    , m_dBManager_sp(dBManager_sp)
{
    DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl;
}
//==============================================================================
// ProtocolManager::~ProtocolManager()
//==============================================================================
// Destructor: emits a debug trace only; no explicit cleanup is done here.
ProtocolManager::~ProtocolManager()
{
    DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl;
}
//==============================================================================
// ProtocolManager::create()
//==============================================================================
// Factory: allocates a ProtocolManager and wraps it in a ProtocolManager::SP
// that releases it through ProtocolManager::Deleter.
//
// The three arguments are passed unchanged to the constructor.
ProtocolManager::SP ProtocolManager::create(DataImporter* dataImporter_p,
    Configuration::SP configuration_sp, DBManager::SP dBManager_sp)
{
    return ProtocolManager::SP(
        new ProtocolManager(dataImporter_p, configuration_sp, dBManager_sp),
        ProtocolManager::Deleter());
}
//==============================================================================
// ProtocolManager::setRemoteEndpoint()
//==============================================================================
void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
{
DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl;
m_remoteEndpoint = remoteEndpoint;
}
//==============================================================================
// ProtocolManager::retrieveFiles()
//==============================================================================
void ProtocolManager::retrieveFiles() throw(std::runtime_error)
DEBUG_STREAM << "ProtocolManager::retrieveFiles()" << endl;
boost::posix_time::ptime m_lastTimestamp =
m_dBManager_sp->retrieveLastTimestamp();
DEBUG_STREAM << "ProtocolManager::updateNewList() last timestamp "
<< boost::posix_time::to_simple_string(m_lastTimestamp) << endl;
m_newFileRowset_sp = m_dBManager_sp->retrieveNewFiles(m_lastTimestamp);
m_newFileRowsetIt = m_newFileRowset_sp->begin();
m_failedFileRowset_sp = m_dBManager_sp->retrieveFailedFiles();
m_failedFileRowsetIt = m_failedFileRowset_sp->begin();
}
//==============================================================================
// ProtocolManager::hasFilesToTransfer()
//==============================================================================
// Judging by the visible statements, answers whether a file is pending: first
// in the new-file rowset, otherwise in the failed-file rowset, logging at INFO
// level which list matched; falls through to `return false`.
//
// NOTE(review): the braces of this definition are unbalanced as written. There
// is no '{' opening the function body or either branch, the new-list branch
// has no `return true;`, and two '}' follow the only `return true;`. Lines
// appear to have been lost -- possibly including a nested condition guarding
// the failed-list branch. TODO restore from version control.
bool ProtocolManager::hasFilesToTransfer()
DEBUG_STREAM << "ProtocolManager::hasFilesToTransfer()" << endl;
// New files: rowset exists and its cursor has not reached the end.
if(m_newFileRowset_sp &&
m_newFileRowsetIt != m_newFileRowset_sp->end())
INFO_STREAM << "ProtocolManager::hasFilesToTransfer() in new list" << endl;
// Failed files: same test on the failed-file rowset.
else if(m_failedFileRowset_sp &&
m_failedFileRowsetIt != m_failedFileRowset_sp->end())
INFO_STREAM << "ProtocolManager::hasFilesToTransfer() in failed list" << endl;
return true;
}
}
return false;
}
//==============================================================================
// ProtocolManager::hasNextFile()
//==============================================================================
// NOTE(review): the signature and opening '{' of this definition are missing.
// The trace message below suggests it is ProtocolManager::hasNextFile() and the
// return statements suggest a bool result -- TODO confirm against the class
// declaration and restore.
DEBUG_STREAM << "ProtocolManager::hasNextFile()" << endl;
// True while the new-file cursor has not reached the end of its rowset.
if(m_newFileRowset_sp &&
m_newFileRowsetIt != m_newFileRowset_sp->end())
{
return true;
}
// NOTE(review): the start of this condition appears to be missing; presumably
// `else if(m_failedFileRowset_sp &&`, as in the sibling methods -- TODO confirm.
m_failedFileRowsetIt != m_failedFileRowset_sp->end())
{
return true;
}
else
{
return false;
}
// NOTE(review): the '}' closing the function body is missing here.
//==============================================================================
// ProtocolManager::createtRequest()
//==============================================================================
// Builds the request for the file the current cursor points at.
//
// The new-file list takes precedence; the failed-file list is used only when
// the new-file cursor is exhausted (or its rowset is unset).
//
// Returns the request produced by fillRequest() for the selected row.
// Throws std::runtime_error("Lists not initialized") when neither list has a
// current row; fillRequest() may also throw std::runtime_error.
RequestSP ProtocolManager::createtRequest() throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::createtRequest()" << endl;

    if(m_newFileRowset_sp &&
        m_newFileRowsetIt != m_newFileRowset_sp->end())
    {
        // Branch body restored by symmetry with the failed-list branch.
        return fillRequest(m_newFileRowsetIt);
    }
    else if(m_failedFileRowset_sp &&
        m_failedFileRowsetIt != m_failedFileRowset_sp->end())
    {
        return fillRequest(m_failedFileRowsetIt);
    }
    else
    {
        throw std::runtime_error("Lists not initialized");
    }
}
//==============================================================================
// ProtocolManager::processResponse()
//==============================================================================
// Validates a server response and, when the request was accepted, creates the
// FileWrapper that will receive the file.
//
// response_sp - response received from the remote endpoint
//
// Returns a FileWrapper built from the configured storage path and the file
// path, version, name and size carried by the response.
//
// Throws std::runtime_error when the response is not initialized, when the
//        accepted response carries an empty file path or file name, or when
//        the state is none of the ones handled below (message: status()).
// Throws std::logic_error when the state is METADATA_NOT_FOUND,
//        FILE_NOT_DOWNLOADED or FILE_NOT_FOUND (message: status()).
FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp)
    throw(std::logic_error, std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::processResponse()" << endl;

    if(!response_sp->IsInitialized())
        throw std::runtime_error("Response not initialized");

    if(response_sp->state() == Response::REQUEST_ACCEPTED)
    {
        std::string filePath = response_sp->file_path();

        // Guard the throw: without it every accepted response was rejected and
        // the rest of this branch was unreachable.
        if(filePath.empty())
            throw std::runtime_error("Empty file path received");

        int fileVersion = response_sp->file_version();

        std::string fileName = response_sp->file_name();

        // Message corrected: this check is about the file name, not the path.
        if(fileName.empty())
            throw std::runtime_error("Empty file name received");

        boost::uint64_t fileSize = response_sp->file_size();

        INFO_STREAM << "ProtocolManager::processResponse() transfer file "
            << fileName << " version " << fileVersion << " size " << fileSize
            << " from " << m_remoteEndpoint << endl;

        return FileWrapper::create(m_dataImporter_p,
            m_configuration_sp->getStoragePath(), filePath,
            fileVersion, fileName, fileSize);
    }
    else if(response_sp->state() == Response::METADATA_NOT_FOUND ||
        response_sp->state() == Response::FILE_NOT_DOWNLOADED ||
        response_sp->state() == Response::FILE_NOT_FOUND)
    {
        throw std::logic_error(response_sp->status());
    }
    else
    {
        throw std::runtime_error(response_sp->status());
    }
}
//==============================================================================
// ProtocolManager::setCurrentFileDownloaded()
//==============================================================================
// Records in the database that the file at the current cursor was downloaded.
//
// fileWrapper_sp - wrapper of the downloaded file; supplies the storage path
//                  and file path written by updateNewFilePath().
//
// New-list entry: reads row columns 2, 3 and 4 (used here as file version,
// file name and update time), conditionally persists the last timestamp,
// updates the file path, commits both transactions and increments the
// regular counter.
// Failed-list entry: removes the file from the failed list, updates the file
// path, commits both transactions, decrements the failed counter and
// increments the regular counter.
//
// Throws std::runtime_error when a required column is empty or when neither
// list has a current row ("Lists not initialized").
//
// NOTE(review): this definition is syntactically incomplete as written: the
// '{' opening the function body, the '}' closing the new-list branch before
// `else if`, and the closing braces after the final throw are all missing.
// TODO restore from version control.
void ProtocolManager::setCurrentFileDownloaded(FileWrapper::SP fileWrapper_sp)
throw(std::runtime_error)
DEBUG_STREAM << "ProtocolManager::setCurrentFileDownloaded()" << endl;
std::string storagePath = fileWrapper_sp->getStoragePath();
std::string filePath = fileWrapper_sp->getFilePath();
if(m_newFileRowset_sp &&
m_newFileRowsetIt != m_newFileRowset_sp->end())
{
// Each column is tested for emptiness before its value is extracted.
if(!m_newFileRowsetIt->get<2>())
throw std::runtime_error("Empty file version found");
int fileVersion = m_newFileRowsetIt->get<2>().get();
if(!m_newFileRowsetIt->get<3>())
throw std::runtime_error("Empty file name found");
std::string fileName = m_newFileRowsetIt->get<3>().get();
if(!m_newFileRowsetIt->get<4>())
throw std::runtime_error("Empty update time found");
std::tm currentTm = m_newFileRowsetIt->get<4>().get();
INFO_STREAM << "ProtocolManager::setNewFileTransfered() file "
<< fileName << " version " << fileVersion << " transfered" << endl;
boost::posix_time::ptime currentPtime =
boost::posix_time::ptime_from_tm(currentTm);
// Defaults to +infinity so that, with no following row, the comparison below
// would hold.
boost::posix_time::ptime nextPtime(boost::posix_time::pos_infin);
// NOTE(review): the cursor is not advanced between the outer end() check and
// this one, so this re-reads the same row: nextPtime equals currentPtime and
// `nextPtime > currentPtime` below can never be true. Presumably a
// `++m_newFileRowsetIt;` was lost just before this check -- TODO confirm.
if(m_newFileRowsetIt != m_newFileRowset_sp->end())
{
if(!m_newFileRowsetIt->get<4>())
throw std::runtime_error("Empty next update time found");
std::tm nextTm = m_newFileRowsetIt->get<4>().get();
nextPtime =boost::posix_time::ptime_from_tm(nextTm);
}
// Both transactions are obtained before the writes and committed after them.
DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction();
DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction();
// The last timestamp is persisted only when the next row is strictly newer.
if(nextPtime > currentPtime)
m_dBManager_sp->persistLastTimestamp(currentPtime);
m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName);
auxTransaction_sp->commit();
mainTransaction_sp->commit();
m_dataImporter_p->incrementRegularCounter();
// NOTE(review): the '}' closing the new-list branch is missing here.
else if(m_failedFileRowset_sp &&
m_failedFileRowsetIt != m_failedFileRowset_sp->end())
{
if(!m_failedFileRowsetIt->get<2>())
throw std::runtime_error("Empty file version found");
int fileVersion = m_failedFileRowsetIt->get<2>().get();
if(!m_failedFileRowsetIt->get<3>())
throw std::runtime_error("Empty file name found");
string fileName = m_failedFileRowsetIt->get<3>().get();
DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction();
DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction();
// A recovered file leaves the failed list and gets its path recorded.
m_dBManager_sp->removeFailedFile(fileVersion, fileName);
m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName);
auxTransaction_sp->commit();
mainTransaction_sp->commit();
m_dataImporter_p->decrementFailedCounter();
m_dataImporter_p->incrementRegularCounter();
// NOTE(review): the failed-list cursor is not advanced in the visible code --
// TODO confirm whether an increment was lost here.
}
else
{
throw std::runtime_error("Lists not initialized");
// NOTE(review): the closing braces of this branch and of the function body
// are missing.
//==============================================================================
// ProtocolManager::setCurrentFileFailed()
//==============================================================================
// Records in the database that the transfer of the file at the current cursor
// failed.
//
// New-list entry: reads row columns 2, 3 and 4 (used here as file version,
// file name and update time), conditionally persists the last timestamp, adds
// the file to the failed list, commits both transactions and increments the
// failed counter.
// Failed-list entry: nothing is done in the visible code.
//
// Throws std::runtime_error when a required column is empty or when neither
// list has a current row ("Lists not initialized").
//
// NOTE(review): the '{' opening the function body and its closing '}' are
// missing as written. TODO restore from version control.
void ProtocolManager::setCurrentFileFailed() throw(std::runtime_error)
DEBUG_STREAM << "ProtocolManager::setCurrentFileFailed()" << endl;
if(m_newFileRowset_sp &&
m_newFileRowsetIt != m_newFileRowset_sp->end())
{
// Each column is tested for emptiness before its value is extracted.
if(!m_newFileRowsetIt->get<2>())
throw std::runtime_error("Empty file version found");
int fileVersion = m_newFileRowsetIt->get<2>().get();
if(!m_newFileRowsetIt->get<3>())
throw std::runtime_error("Empty file name found");
string fileName = m_newFileRowsetIt->get<3>().get();
if(!m_newFileRowsetIt->get<4>())
throw std::runtime_error("Empty update time found");
std::tm currentTm = m_newFileRowsetIt->get<4>().get();
INFO_STREAM << "ProtocolManager::setFileFailed() file "
<< fileName << " version " << fileVersion << " not transfered" << endl;
boost::posix_time::ptime currentPtime =
boost::posix_time::ptime_from_tm(currentTm);
// Defaults to +infinity so that, with no following row, the comparison below
// would hold.
boost::posix_time::ptime nextPtime(boost::posix_time::pos_infin);
// NOTE(review): the cursor is not advanced between the outer end() check and
// this one, so this re-reads the same row: nextPtime equals currentPtime and
// `nextPtime > currentPtime` below can never be true. Presumably a
// `++m_newFileRowsetIt;` was lost just before this check -- TODO confirm.
if(m_newFileRowsetIt != m_newFileRowset_sp->end())
{
if(!m_newFileRowsetIt->get<4>())
throw std::runtime_error("Empty next update time found");
std::tm nextTm = m_newFileRowsetIt->get<4>().get();
nextPtime =boost::posix_time::ptime_from_tm(nextTm);
}
// Both transactions are obtained before the writes and committed after them.
DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction();
DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction();
// The last timestamp is persisted only when the next row is strictly newer.
if(nextPtime > currentPtime)
m_dBManager_sp->persistLastTimestamp(currentPtime);
m_dBManager_sp->addFailedFile(fileVersion, fileName);
auxTransaction_sp->commit();
mainTransaction_sp->commit();
m_dataImporter_p->incrementFailedCounter();
}
else if(m_failedFileRowset_sp &&
m_failedFileRowsetIt != m_failedFileRowset_sp->end())
{
// NOTE(review): empty branch -- a file from the failed list that fails again
// triggers no action and the cursor is not advanced in the visible code.
// Presumably a `++m_failedFileRowsetIt;` was lost -- TODO confirm.
}
else
{
throw std::runtime_error("Lists not initialized");
}
// NOTE(review): the '}' closing the function body is missing here.
//==============================================================================
// ProtocolManager::isRecoveryTimeElapsed()
//==============================================================================
bool ProtocolManager::isRecoveryTimeElapsed()
DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed()" << endl;
boost::posix_time::ptime now(boost::posix_time::second_clock::local_time());
if(m_recoveryModeTime.is_not_a_date_time())
m_recoveryModeTime = now;
boost::posix_time::time_duration diff = now - m_recoveryModeTime;
DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed() " << diff.total_seconds()
<< "/" << (int)m_configuration_sp->getRecoveryTime() << endl;
if(diff.total_seconds() > (int)m_configuration_sp->getRecoveryTime())
m_recoveryModeTime = now;
return true;
}
else
{
return false;
//==============================================================================
// ProtocolManager::fillRequest()
//==============================================================================
RequestSP ProtocolManager::fillRequest(DBManager::FileRowset::const_iterator it)
throw(std::runtime_error)
{
DEBUG_STREAM << "ProtocolManager::fillRequest()" << endl;
request_sp->set_username(m_configuration_sp->getDatabaseUsername());
request_sp->set_password(m_configuration_sp->getDatabasePassword());
request_sp->set_schema(m_configuration_sp->getRemoteSchema());
request_sp->set_table(m_configuration_sp->getRemoteTable());
if(!it->get<2>())
throw std::runtime_error("Empty file version found");
int fileVersion = it->get<2>().get();
if(!it->get<3>())
throw std::runtime_error("Empty file name found");
std::string fileName = it->get<3>().get();
request_sp->set_file_version(fileVersion);
request_sp->set_file_name(fileName);
INFO_STREAM << "ProtocolManager::fillRequest() file " << fileName
<< " version " << fileVersion << " to " << m_remoteEndpoint << endl;
if(!request_sp->IsInitialized())
throw std::runtime_error("Request not initialized");