#include #include #include namespace DataImporter_ns { //============================================================================== // ProtocolManager::ProtocolManager() //============================================================================== ProtocolManager::ProtocolManager(DataImporter* dataImporter_p, Configuration::SP configuration_sp, DBManager::SP dBManager_sp) : Tango::LogAdapter(dataImporter_p), m_dataImporter_p(dataImporter_p), m_configuration_sp(configuration_sp), m_dBManager_sp(dBManager_sp) { DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl; } //============================================================================== // ProtocolManager::~ProtocolManager() //============================================================================== ProtocolManager::~ProtocolManager() { DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl; } //============================================================================== // ProtocolManager::create() //============================================================================== ProtocolManager::SP ProtocolManager::create(DataImporter* dataImporter_p, Configuration::SP configuration_sp, DBManager::SP dBManager_sp) { ProtocolManager::SP d_sp(new ProtocolManager(dataImporter_p, configuration_sp, dBManager_sp), ProtocolManager::Deleter()); return d_sp; } //============================================================================== // ProtocolManager::setRemoteEndpoint() //============================================================================== void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint) { DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl; m_remoteEndpoint = remoteEndpoint; } //============================================================================== // ProtocolManager::retrieveFiles() //============================================================================== void ProtocolManager::retrieveFiles() throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::retrieveFiles()" << endl; boost::posix_time::ptime m_lastTimestamp = m_dBManager_sp->retrieveLastTimestamp(); DEBUG_STREAM << "ProtocolManager::updateNewList() last timestamp " << boost::posix_time::to_simple_string(m_lastTimestamp) << endl; m_newFileRowset_sp = m_dBManager_sp->retrieveNewFiles(m_lastTimestamp); m_newFileRowsetIt = m_newFileRowset_sp->begin(); m_failedFileRowset_sp = m_dBManager_sp->retrieveFailedFiles(); m_failedFileRowsetIt = m_failedFileRowset_sp->begin(); } //============================================================================== // ProtocolManager::hasFilesToTransfer() //============================================================================== bool ProtocolManager::hasFilesToTransfer() { DEBUG_STREAM << "ProtocolManager::hasFilesToTransfer()" << endl; if(m_newFileRowset_sp && m_newFileRowsetIt != m_newFileRowset_sp->end()) { INFO_STREAM << "ProtocolManager::hasFilesToTransfer() in new list" << endl; return true; } else if(m_failedFileRowset_sp && m_failedFileRowsetIt != m_failedFileRowset_sp->end()) { if(isRecoveryTimeElapsed()) { INFO_STREAM << "ProtocolManager::hasFilesToTransfer() in failed list" << endl; return true; } } return false; } //============================================================================== // ProtocolManager::hasNextFile() //============================================================================== bool ProtocolManager::hasNextFile() { DEBUG_STREAM << "ProtocolManager::hasNextFile()" << endl; if(m_newFileRowset_sp && m_newFileRowsetIt != m_newFileRowset_sp->end()) { return true; } else if(m_failedFileRowset_sp && m_failedFileRowsetIt != m_failedFileRowset_sp->end()) { return true; } else { return false; } } //============================================================================== // ProtocolManager::createtRequest() //============================================================================== RequestSP ProtocolManager::createtRequest() throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::createtRequest()" << endl; if(m_newFileRowset_sp && m_newFileRowsetIt != m_newFileRowset_sp->end()) { return fillRequest(m_newFileRowsetIt); } else if(m_failedFileRowset_sp && m_failedFileRowsetIt != m_failedFileRowset_sp->end()) { return fillRequest(m_failedFileRowsetIt); } else { throw std::runtime_error("Lists not initialized"); } } //============================================================================== // ProtocolManager::processResponse() //============================================================================== FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp) throw(std::logic_error, std::runtime_error) { DEBUG_STREAM << "ProtocolManager::processResponse()" << endl; if(!response_sp->IsInitialized()) throw std::runtime_error("Response not initialized"); if(response_sp->state() == Response::REQUEST_ACCEPTED) { std::string filePath = response_sp->file_path(); if(filePath.empty()) throw std::runtime_error("Empty file path received"); int fileVersion = response_sp->file_version(); std::string fileName = response_sp->file_name(); if(fileName.empty()) throw std::runtime_error("Empty file path received"); boost::uint64_t fileSize = response_sp->file_size(); INFO_STREAM << "ProtocolManager::processResponse() transfer file " << fileName << " version " << fileVersion << " size " << fileSize << " from " << m_remoteEndpoint << endl; return FileWrapper::create(m_dataImporter_p, m_configuration_sp->getStoragePath(), filePath, fileVersion, fileName, fileSize); } else if(response_sp->state() == Response::METADATA_NOT_FOUND || response_sp->state() == Response::FILE_NOT_DOWNLOADED || response_sp->state() == Response::FILE_NOT_FOUND) { throw std::logic_error(response_sp->status()); } else throw std::runtime_error(response_sp->status()); } //============================================================================== // ProtocolManager::setCurrentFileDownloaded() //============================================================================== void ProtocolManager::setCurrentFileDownloaded(FileWrapper::SP fileWrapper_sp) throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::setCurrentFileDownloaded()" << endl; std::string storagePath = fileWrapper_sp->getStoragePath(); std::string filePath = fileWrapper_sp->getFilePath(); if(m_newFileRowset_sp && m_newFileRowsetIt != m_newFileRowset_sp->end()) { if(!m_newFileRowsetIt->get<2>()) throw std::runtime_error("Empty file version found"); int fileVersion = m_newFileRowsetIt->get<2>().get(); if(!m_newFileRowsetIt->get<3>()) throw std::runtime_error("Empty file name found"); std::string fileName = m_newFileRowsetIt->get<3>().get(); if(!m_newFileRowsetIt->get<4>()) throw std::runtime_error("Empty update time found"); std::tm currentTm = m_newFileRowsetIt->get<4>().get(); INFO_STREAM << "ProtocolManager::setNewFileTransfered() file " << fileName << " version " << fileVersion << " transfered" << endl; boost::posix_time::ptime currentPtime = boost::posix_time::ptime_from_tm(currentTm); boost::posix_time::ptime nextPtime(boost::posix_time::pos_infin); ++m_newFileRowsetIt; if(m_newFileRowsetIt != m_newFileRowset_sp->end()) { if(!m_newFileRowsetIt->get<4>()) throw std::runtime_error("Empty next update time found"); std::tm nextTm = m_newFileRowsetIt->get<4>().get(); nextPtime =boost::posix_time::ptime_from_tm(nextTm); } DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction(); DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction(); if(nextPtime > currentPtime) m_dBManager_sp->persistLastTimestamp(currentPtime); m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName); auxTransaction_sp->commit(); mainTransaction_sp->commit(); m_dataImporter_p->incrementRegularCounter(); } else if(m_failedFileRowset_sp && m_failedFileRowsetIt != m_failedFileRowset_sp->end()) { if(!m_failedFileRowsetIt->get<2>()) throw std::runtime_error("Empty file version found"); int fileVersion = m_failedFileRowsetIt->get<2>().get(); if(!m_failedFileRowsetIt->get<3>()) throw std::runtime_error("Empty file name found"); string fileName = m_failedFileRowsetIt->get<3>().get(); ++m_failedFileRowsetIt; DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction(); DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction(); m_dBManager_sp->removeFailedFile(fileVersion, fileName); m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName); auxTransaction_sp->commit(); mainTransaction_sp->commit(); m_dataImporter_p->decrementFailedCounter(); m_dataImporter_p->incrementRegularCounter(); } else { throw std::runtime_error("Lists not initialized"); } } //============================================================================== // ProtocolManager::setCurrentFileFailed() //============================================================================== void ProtocolManager::setCurrentFileFailed() throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::setCurrentFileFailed()" << endl; if(m_newFileRowset_sp && m_newFileRowsetIt != m_newFileRowset_sp->end()) { if(!m_newFileRowsetIt->get<2>()) throw std::runtime_error("Empty file version found"); int fileVersion = m_newFileRowsetIt->get<2>().get(); if(!m_newFileRowsetIt->get<3>()) throw std::runtime_error("Empty file name found"); string fileName = m_newFileRowsetIt->get<3>().get(); if(!m_newFileRowsetIt->get<4>()) throw std::runtime_error("Empty update time found"); std::tm currentTm = m_newFileRowsetIt->get<4>().get(); INFO_STREAM << "ProtocolManager::setFileFailed() file " << fileName << " version " << fileVersion << " not transfered" << endl; boost::posix_time::ptime currentPtime = boost::posix_time::ptime_from_tm(currentTm); boost::posix_time::ptime nextPtime(boost::posix_time::pos_infin); ++m_newFileRowsetIt; if(m_newFileRowsetIt != m_newFileRowset_sp->end()) { if(!m_newFileRowsetIt->get<4>()) throw std::runtime_error("Empty next update time found"); std::tm nextTm = m_newFileRowsetIt->get<4>().get(); nextPtime =boost::posix_time::ptime_from_tm(nextTm); } DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction(); DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction(); if(nextPtime > currentPtime) m_dBManager_sp->persistLastTimestamp(currentPtime); m_dBManager_sp->addFailedFile(fileVersion, fileName); auxTransaction_sp->commit(); mainTransaction_sp->commit(); m_dataImporter_p->incrementFailedCounter(); } else if(m_failedFileRowset_sp && m_failedFileRowsetIt != m_failedFileRowset_sp->end()) { ++m_failedFileRowsetIt; } else { throw std::runtime_error("Lists not initialized"); } } //============================================================================== // ProtocolManager::isRecoveryTimeElapsed() //============================================================================== bool ProtocolManager::isRecoveryTimeElapsed() { DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed()" << endl; boost::posix_time::ptime now(boost::posix_time::second_clock::local_time()); if(m_recoveryModeTime.is_not_a_date_time()) m_recoveryModeTime = now; boost::posix_time::time_duration diff = now - m_recoveryModeTime; DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed() " << diff.total_seconds() << "/" << (int)m_configuration_sp->getRecoveryTime() << endl; if(diff.total_seconds() > (int)m_configuration_sp->getRecoveryTime()) { m_recoveryModeTime = now; return true; } else { return false; } } //============================================================================== // ProtocolManager::fillRequest() //============================================================================== RequestSP ProtocolManager::fillRequest(DBManager::FileRowset::const_iterator it) throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::fillRequest()" << endl; RequestSP request_sp(new Request); request_sp->set_username(m_configuration_sp->getDatabaseUsername()); request_sp->set_password(m_configuration_sp->getDatabasePassword()); request_sp->set_schema(m_configuration_sp->getRemoteSchema()); request_sp->set_table(m_configuration_sp->getRemoteTable()); if(!it->get<2>()) throw std::runtime_error("Empty file version found"); int fileVersion = it->get<2>().get(); if(!it->get<3>()) throw std::runtime_error("Empty file name found"); std::string fileName = it->get<3>().get(); request_sp->set_file_version(fileVersion); request_sp->set_file_name(fileName); INFO_STREAM << "ProtocolManager::fillRequest() file " << fileName << " version " << fileVersion << " to " << m_remoteEndpoint << endl; if(!request_sp->IsInitialized()) throw std::runtime_error("Request not initialized"); return request_sp; } } //namespace