#include #include #include namespace DataImporter_ns { //============================================================================== // ProtocolManager::ProtocolManager() //============================================================================== ProtocolManager::ProtocolManager(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp, DBManager::SP dBManager_sp) : Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p), m_configuration_sp(configuration_sp), m_dBManager_sp(dBManager_sp) { DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl; } //============================================================================== // ProtocolManager::~ProtocolManager() //============================================================================== ProtocolManager::~ProtocolManager() { DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl; } //============================================================================== // ProtocolManager::create() //============================================================================== ProtocolManager::SP ProtocolManager::create(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp, DBManager::SP dBManager_sp) { ProtocolManager::SP d_sp(new ProtocolManager(deviceImpl_p, configuration_sp, dBManager_sp), ProtocolManager::Deleter()); return d_sp; } //============================================================================== // ProtocolManager::setRemoteEndpoint() //============================================================================== void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint) { DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl; m_remoteEndpoint = remoteEndpoint; } //============================================================================== // ProtocolManager::updateFileLists() //============================================================================== void ProtocolManager::updateFileLists() throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::updateFileLists()" << endl; boost::posix_time::ptime m_lastTimestamp = m_dBManager_sp->retrieveLastTimestamp(); DEBUG_STREAM << "ProtocolManager::updateFileLists() last timestamp " << boost::posix_time::to_simple_string(m_lastTimestamp) << endl; m_newFileRowset_sp = m_dBManager_sp->retrieveNewFiles(m_lastTimestamp); m_newFileRowsetIt = m_newFileRowset_sp->begin(); m_failedFileRowset_sp = m_dBManager_sp->retrieveFailedFiles(); m_failedFileRowsetIt = m_failedFileRowset_sp->begin(); } //============================================================================== // ProtocolManager::hasNextFile() //============================================================================== bool ProtocolManager::hasNextFile() { DEBUG_STREAM << "ProtocolManager::hasNextFile()" << endl; if(m_newFileRowset_sp && m_newFileRowsetIt != m_newFileRowset_sp->end()) { DEBUG_STREAM << "ProtocolManager::hasNextFile() from new list" << endl; m_recoveryMode = false; return true; } else if(m_failedFileRowset_sp && m_failedFileRowsetIt != m_failedFileRowset_sp->end()) { DEBUG_STREAM << "ProtocolManager::hasNextFile() from failed list" << endl; m_recoveryMode = true; return true; } else { DEBUG_STREAM << "ProtocolManager::hasNextFile() lists empty" << endl; m_recoveryMode = false; return false; } } //============================================================================== // ProtocolManager::nextFile() //============================================================================== void ProtocolManager::nextFile() { DEBUG_STREAM << "ProtocolManager::nextFile()" << endl; if(!m_recoveryMode) { if(m_newFileRowset_sp && m_newFileRowsetIt != m_newFileRowset_sp->end()) { DEBUG_STREAM << "ProtocolManager::nextFile() new list" << endl; ++m_newFileRowsetIt; } } else { if(m_failedFileRowset_sp && m_failedFileRowsetIt != m_failedFileRowset_sp->end()) { DEBUG_STREAM << "ProtocolManager::nextFile() from failed list" << endl; ++m_failedFileRowsetIt; } } } //============================================================================== // ProtocolManager::createRequest() //============================================================================== RequestSP ProtocolManager::createRequest() throw(std::logic_error, std::runtime_error) { DEBUG_STREAM << "ProtocolManager::createRequest()" << endl; RequestSP request_sp(new Request); request_sp->set_username(m_configuration_sp->getDatabaseUsername()); request_sp->set_password(m_configuration_sp->getDatabasePassword()); request_sp->set_schema(m_configuration_sp->getDatabaseSchema()); request_sp->set_table(m_configuration_sp->getDatabaseTable()); int fileVersion; std::string fileName; if(!m_recoveryMode) { if(!m_newFileRowset_sp || m_newFileRowsetIt == m_newFileRowset_sp->end()) throw std::runtime_error("New list not initialized or empty"); if(!m_newFileRowsetIt->get<0>()) throw std::invalid_argument("Empty file version found on new list"); fileVersion = m_newFileRowsetIt->get<0>().get(); if(!m_newFileRowsetIt->get<1>()) throw std::invalid_argument("Empty file name found on new list"); fileName = m_newFileRowsetIt->get<1>().get(); INFO_STREAM << "ProtocolManager::createRequest() request new file " << fileName << " version " << fileVersion << endl; } else { if(!m_failedFileRowset_sp || m_failedFileRowsetIt == m_failedFileRowset_sp->end()) throw std::runtime_error("Failed list not initialized or empty"); if(!m_failedFileRowsetIt->get<0>()) throw std::invalid_argument("Empty file version found on failed list"); fileVersion = m_failedFileRowsetIt->get<0>().get(); if(!m_failedFileRowsetIt->get<1>()) throw std::invalid_argument("Empty file name found on failed list"); fileName = m_failedFileRowsetIt->get<1>().get(); INFO_STREAM << "ProtocolManager::createRequest() request failed file " << fileName << " version " << fileVersion << endl; } request_sp->set_file_version(fileVersion); request_sp->set_file_name(fileName); if(!request_sp->IsInitialized()) throw std::runtime_error("Request not initialized"); return request_sp; } //============================================================================== // ProtocolManager::processResponse() //============================================================================== FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp) throw(std::logic_error, std::runtime_error) { DEBUG_STREAM << "ProtocolManager::processResponse()" << endl; if(!response_sp->IsInitialized()) throw std::runtime_error("Response not initialized"); std::string filePath = response_sp->file_path(); if(filePath.empty()) throw std::invalid_argument("Empty file path received"); int fileVersion = response_sp->file_version(); std::string fileName = response_sp->file_name(); if(fileName.empty()) throw std::invalid_argument("Empty file path received"); boost::uint64_t fileSize = response_sp->file_size(); if(response_sp->state() == Response::REQUEST_ACCEPTED) { INFO_STREAM << "ProtocolManager::processResponse() transfer file " << fileName << " version " << fileVersion << " size " << fileSize << endl; return FileWrapper::create(m_deviceImpl_p, m_configuration_sp->getStoragePath(), filePath, fileVersion, fileName, fileSize); } else if(response_sp->state() == Response::METADATA_NOT_FOUND || response_sp->state() == Response::FILE_NOT_DOWNLOADED || response_sp->state() == Response::FILE_NOT_FOUND) { throw std::logic_error(response_sp->status()); } else throw std::runtime_error(response_sp->status()); } //============================================================================== // ProtocolManager::setFileTransfered() //============================================================================== void ProtocolManager::setFileTransfered(FileWrapper::SP fileWrapper_sp) throw(std::logic_error, std::runtime_error) { DEBUG_STREAM << "ProtocolManager::setFileTransfered()" << endl; std::string storagePath = fileWrapper_sp->getStoragePath(); std::string filePath = fileWrapper_sp->getFilePath(); if(!m_recoveryMode) { if(!m_newFileRowset_sp || m_newFileRowsetIt == m_newFileRowset_sp->end()) throw std::runtime_error("New list not initialized or empty"); if(!m_newFileRowsetIt->get<0>()) throw std::invalid_argument("Empty file version found on new list"); int fileVersion = m_newFileRowsetIt->get<0>().get(); if(!m_newFileRowsetIt->get<1>()) throw std::invalid_argument("Empty file name found on new list"); std::string fileName = m_newFileRowsetIt->get<1>().get(); if(!m_newFileRowsetIt->get<2>()) throw std::invalid_argument("Empty update time found on new list"); std::tm update_time = m_newFileRowsetIt->get<2>().get(); INFO_STREAM << "ProtocolManager::setFileTransfered() file " << fileName << " version " << fileVersion << " transfered" << endl; boost::posix_time::ptime newPtime = boost::posix_time::ptime_from_tm(update_time); if(m_currentPtime.is_not_a_date_time()) m_currentPtime = newPtime; DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction(); DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction(); if(newPtime > m_currentPtime) m_dBManager_sp->persistLastTimestamp(newPtime); m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName); auxTransaction_sp->commit(); mainTransaction_sp->commit(); } else { if(!m_failedFileRowset_sp || m_failedFileRowsetIt == m_failedFileRowset_sp->end()) throw std::runtime_error("Failed list not initialized or empty"); if(!m_failedFileRowsetIt->get<0>()) throw std::invalid_argument("Empty file version found on failed list"); int fileVersion = m_failedFileRowsetIt->get<0>().get(); if(!m_failedFileRowsetIt->get<1>()) throw std::invalid_argument("Empty file name found on failed list"); string fileName = m_failedFileRowsetIt->get<1>().get(); DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction(); DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction(); m_dBManager_sp->removeFailedFile(fileVersion, fileName); m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName); auxTransaction_sp->commit(); mainTransaction_sp->commit(); } } //============================================================================== // ProtocolManager::markAsFailed() //============================================================================== void ProtocolManager::setFileFailed() throw(std::logic_error, std::runtime_error) { DEBUG_STREAM << "ProtocolManager::markAsFailed()" << endl; if(!m_recoveryMode) { if(!m_newFileRowset_sp || m_newFileRowsetIt == m_newFileRowset_sp->end()) throw std::runtime_error("New list not initialized or empty"); if(!m_newFileRowsetIt->get<0>()) throw std::invalid_argument("Empty file version found on new list"); int fileVersion = m_newFileRowsetIt->get<0>().get(); if(!m_newFileRowsetIt->get<1>()) throw std::invalid_argument("Empty file name found on new list"); string fileName = m_newFileRowsetIt->get<1>().get(); m_dBManager_sp->addFailedFile(fileVersion, fileName); } else { //TODO: file failed again -> what to do? } } } //namespace