// Server.cpp
#include <Server.h>
#include <WorkerThread.h>

#include <boost/lexical_cast.hpp>

#include <google/protobuf/stubs/common.h>

namespace DataExporter_ns
{

//==============================================================================
//      Server::Server()
//==============================================================================
// Builds a stopped server.
//
// deviceImpl_p      Tango device used for logging (passed to Tango::LogAdapter)
//                   and forwarded to the DBManager.
// configuration_sp  shared configuration, stored for later use by start().
//
// No socket is opened here: the acceptor is only constructed on top of the
// io_service; the initial state is Tango::OFF with status "Disconnected".
Server::Server(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) :
    Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p),
    m_configuration_sp(configuration_sp)
{
    DEBUG_STREAM << "Server::Server()" << endl;

    // Checks that the protobuf headers and the linked library versions match.
    GOOGLE_PROTOBUF_VERIFY_VERSION;

    m_dBManager_sp = DBManager::create(deviceImpl_p, configuration_sp);

    m_ioService_sp.reset(new boost::asio::io_service);

    m_acceptor_sp.reset(new boost::asio::ip::tcp::acceptor(*m_ioService_sp));

    m_state = Tango::OFF;
    m_status="Disconnected";
}

//==============================================================================
//      Server::~Server()
//==============================================================================
// Tears the server down: closes the acceptor, drops the io_service work
// object, stops the io_service, waits for the worker threads (if any) and
// finally shuts down the protobuf library.
Server::~Server()
{
    DEBUG_STREAM << "Server::~Server()" << endl;

    // Error-code overload so that closing cannot throw from the destructor;
    // the resulting error code is intentionally ignored.
    boost::system::error_code errorCode;
    m_acceptor_sp->close(errorCode);

    m_work_sp.reset();

    m_ioService_sp->stop();

    if(m_threadGroup_sp)
    {
        //m_threadGroup_sp->interrupt_all();

        // Same sequence as stop(): wait for the workers to leave the
        // io_service before the members they use are destroyed.
        m_threadGroup_sp->join_all();
    }

    DEBUG_STREAM << "Server::~Server() END" << endl;

    google::protobuf::ShutdownProtobufLibrary();
}

//==============================================================================
//      Server::start()
//==============================================================================
void Server::start() throw(std::runtime_error)
{
    DEBUG_STREAM << "Server::start()" << endl;

Marco De Marco's avatar
Marco De Marco committed
    m_dBManager_sp->connect();

    m_ioService_sp->reset();

    m_work_sp.reset(new boost::asio::io_service::work(*m_ioService_sp));

    std::string localHost = m_configuration_sp->getLocalHost();
    unsigned int localPort = m_configuration_sp->getLocalPort();

    std::stringstream infoStream;
    infoStream << "Listening on " << localHost << ":" << localPort << endl;

    INFO_STREAM << "Server::start()" << infoStream.str() << endl;

    writeState(Tango::ON);
    writeStatus(infoStream.str());

    boost::asio::ip::tcp::resolver::query query(localHost,
        boost::lexical_cast<std::string>(localPort));

    boost::asio::ip::tcp::resolver resolver(*m_ioService_sp);
    boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);

    m_acceptor_sp->open(endpoint.protocol());
    m_acceptor_sp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
    m_acceptor_sp->bind(endpoint);
    m_acceptor_sp->listen();

    m_threadGroup_sp.reset(new boost::thread_group);

    unsigned int workerNumber = m_configuration_sp->getWorkerNumber();

    WorkerThread worker(m_deviceImpl_p, m_ioService_sp);

    for(unsigned int i=0; i<workerNumber; ++i)
        m_threadGroup_sp->add_thread(new boost::thread(&WorkerThread::run, worker));

    startAccept();
}

//==============================================================================
//      Server::stop()
//==============================================================================
void Server::stop() throw(std::runtime_error)
{
    DEBUG_STREAM << "Server::stop()" << endl;

    writeState(Tango::OFF);
    writeStatus("Disconnected");

    boost::system::error_code errorCode;
    m_acceptor_sp->close(errorCode);

    m_work_sp.reset();

    m_ioService_sp->stop();

    if(m_threadGroup_sp)
    {
        //m_threadGroup_sp->interrupt_all();

        m_threadGroup_sp->join_all();
    }

    m_threadGroup_sp.reset();

Marco De Marco's avatar
Marco De Marco committed
    m_dBManager_sp->disconnect();
}
//==============================================================================
//      Server::readState()
//==============================================================================
// Returns the current device state, read while holding m_stateMutex.
Tango::DevState Server::readState()
{
    DEBUG_STREAM << "Server::readState()" << endl;

    boost::unique_lock<boost::mutex> guard(m_stateMutex);
    const Tango::DevState currentState = m_state;

    return currentState;
}

//==============================================================================
//      Server::readStatus()
//==============================================================================
std::string Server::readStatus()
{
    DEBUG_STREAM << "Server::readStatus()" << endl;

    boost::mutex::scoped_lock statusLock(m_statusMutex);

    return m_status;
}

//==============================================================================
//      Server::writeState()
//==============================================================================
// Replaces the stored device state with `state` while holding m_stateMutex.
void Server::writeState(Tango::DevState state)
{
    DEBUG_STREAM << "Server::writeState()" << endl;

    {
        boost::unique_lock<boost::mutex> guard(m_stateMutex);
        m_state = state;
    }
}

//==============================================================================
//      Server::writeStatus()
//==============================================================================
void Server::writeStatus(std::string status)
{
    DEBUG_STREAM << "Server::writeStatus()" << endl;

    boost::mutex::scoped_lock statusLock(m_statusMutex);

    m_status = status;
}

//==============================================================================
//      Server::handleAccept()
//==============================================================================
// Completion handler for an accept operation.
//
// session_sp  session associated with the accept that just completed.
// ec          result of the accept.
//
// On success the session is started; any exception it throws is logged and
// swallowed so that it cannot escape the handler. On failure the error is
// logged. In both cases the next accept is scheduled through startAccept(),
// except when the operation was aborted: the acceptor is closed by stop()
// and by the destructor, which cancels pending accepts, and re-arming at
// that point would only issue a new accept on a closed acceptor.
void Server::handleAccept(Session::SP session_sp,
    const boost::system::error_code& ec)
{
    DEBUG_STREAM << "Server::handleAccept()" << endl;

    if(ec == boost::asio::error::operation_aborted)
    {
        DEBUG_STREAM << "Server::handleAccept() accept aborted" << endl;
        return;
    }

    if(!ec)
    {
        try
        {
            session_sp->start();
        }
        catch(const std::exception& ex)
        {
            ERROR_STREAM << "Server::handleAccept() " << ex.what() << endl;
        }
        catch(...)
        {
            ERROR_STREAM << "Server::handleAccept() unknown error... " << endl;
        }
    }
    else
    {
        ERROR_STREAM << "Server::handleAccept() " << ec.message() << endl;
    }

    startAccept();
}