#include #include #include #include namespace DataExporter_ns { //============================================================================== // Server::Server() //============================================================================== Server::Server(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p), m_configuration_sp(configuration_sp) { DEBUG_STREAM << "Server::Server()" << endl; GOOGLE_PROTOBUF_VERIFY_VERSION; m_dBManager_sp = DBManager::create(deviceImpl_p, configuration_sp); m_ioService_sp.reset(new boost::asio::io_service); m_acceptor_sp.reset(new boost::asio::ip::tcp::acceptor(*m_ioService_sp)); m_state = Tango::OFF; m_status="Disconnected"; } //============================================================================== // Server::~Server() //============================================================================== Server::~Server() { DEBUG_STREAM << "Server::~Server()" << endl; boost::system::error_code errorCode; m_acceptor_sp->close(errorCode); m_work_sp.reset(); m_ioService_sp->stop(); if(m_threadGroup_sp) { //m_threadGroup_sp->interrupt_all(); m_threadGroup_sp->join_all(); } DEBUG_STREAM << "Server::~Server() END" << endl; google::protobuf::ShutdownProtobufLibrary(); } //============================================================================== // Server::start() //============================================================================== void Server::start() throw(std::runtime_error) { DEBUG_STREAM << "Server::start()" << endl; m_dBManager_sp->connect(); m_ioService_sp->reset(); m_work_sp.reset(new boost::asio::io_service::work(*m_ioService_sp)); std::string localHost = m_configuration_sp->getLocalHost(); unsigned int localPort = m_configuration_sp->getLocalPort(); std::stringstream infoStream; infoStream << "Listening on " << localHost << ":" << localPort << endl; INFO_STREAM << "Server::start()" << infoStream.str() << endl; writeState(Tango::ON); writeStatus(infoStream.str()); boost::asio::ip::tcp::resolver::query query(localHost, boost::lexical_cast(localPort)); boost::asio::ip::tcp::resolver resolver(*m_ioService_sp); boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query); m_acceptor_sp->open(endpoint.protocol()); m_acceptor_sp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); m_acceptor_sp->bind(endpoint); m_acceptor_sp->listen(); m_threadGroup_sp.reset(new boost::thread_group); unsigned int workerNumber = m_configuration_sp->getWorkerNumber(); WorkerThread worker(m_deviceImpl_p, m_ioService_sp); for(unsigned int i=0; iadd_thread(new boost::thread(&WorkerThread::run, worker)); startAccept(); } //============================================================================== // Server::stop() //============================================================================== void Server::stop() throw(std::runtime_error) { DEBUG_STREAM << "Server::stop()" << endl; writeState(Tango::OFF); writeStatus("Disconnected"); boost::system::error_code errorCode; m_acceptor_sp->close(errorCode); m_work_sp.reset(); m_ioService_sp->stop(); if(m_threadGroup_sp) { //m_threadGroup_sp->interrupt_all(); m_threadGroup_sp->join_all(); } m_threadGroup_sp.reset(); m_dBManager_sp->disconnect(); } //============================================================================== // Server::readState() //============================================================================== Tango::DevState Server::readState() { DEBUG_STREAM << "Server::readState()" << endl; boost::mutex::scoped_lock stateLock(m_stateMutex); return m_state; } //============================================================================== // Server::readStatus() //============================================================================== std::string Server::readStatus() { DEBUG_STREAM << "Server::readStatus()" << endl; boost::mutex::scoped_lock statusLock(m_statusMutex); return m_status; } //============================================================================== // Server::writeState() //============================================================================== void Server::writeState(Tango::DevState state) { DEBUG_STREAM << "Server::writeState()" << endl; boost::mutex::scoped_lock stateLock(m_stateMutex); m_state = state; } //============================================================================== // Server::writeStatus() //============================================================================== void Server::writeStatus(std::string status) { DEBUG_STREAM << "Server::writeStatus()" << endl; boost::mutex::scoped_lock statusLock(m_statusMutex); m_status = status; } //============================================================================== // Server::handleAccept() //============================================================================== void Server::handleAccept(Session::SP session_sp, const boost::system::error_code& ec) { DEBUG_STREAM << "Server::handleAccept()" << endl; if(!ec) { try { session_sp->start(); } catch(std::exception& ex) { ERROR_STREAM << "Server::handleAccept() " << ex.what() << endl; } catch(...) { ERROR_STREAM << "Server::handleAccept() unknown error... " << endl; } } else { ERROR_STREAM << "Server::handleAccept() " << ec.message() << endl; } startAccept(); } } //namespace