Commit a28c2edb authored by Marco De Marco's avatar Marco De Marco
Browse files

Protocol manager class added

parent e193d775
Loading
Loading
Loading
Loading
+66 −79
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@

#include <boost/lexical_cast.hpp>
#include <boost/bind.hpp>
#include <stdexcept>

namespace MetadataImporter_ns
{
@@ -11,9 +12,8 @@ namespace MetadataImporter_ns
//==============================================================================
Client::Client(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) :
    Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p),
    m_configuration_sp(configuration_sp), m_work(m_ioService),
    m_resolver(m_ioService), m_resetConnectionTimer(m_ioService),
    m_requestResponseTimer(m_ioService)
    m_configuration_sp(configuration_sp),  m_resolver(m_ioService),
    m_resetConnectionTimer(m_ioService), m_requestResponseTimer(m_ioService)
{
    DEBUG_STREAM << "Client::Client()" << endl;

@@ -21,8 +21,10 @@ Client::Client(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_

    m_dBManager_sp = DBManager::create(deviceImpl_p, configuration_sp);

    m_state = Tango::OFF;
    m_protocolManager_sp = ProtocolManager::create(deviceImpl_p,
        configuration_sp, m_dBManager_sp);

    m_state = Tango::OFF;
    m_status="Disconnected";
}

@@ -54,6 +56,8 @@ void Client::start()

    m_ioService.reset();

    m_work_sp.reset(new boost::asio::io_service::work(m_ioService));

    m_thread_sp.reset(new boost::thread(boost::bind(&Client::run, this)));
}

@@ -66,12 +70,16 @@ void Client::stop()

    m_ioService.stop();

    m_work_sp.reset();

    if(m_thread_sp)
    {
        m_thread_sp->interrupt();

        m_thread_sp->join();
    }

    m_thread_sp.reset();
}

//==============================================================================
@@ -98,6 +106,30 @@ std::string Client::readStatus()
    return m_status;
}

//==============================================================================
//      Client::writeState()
//==============================================================================
void Client::writeState(Tango::DevState state)
{
    DEBUG_STREAM << "Client::writeState()" << endl;

    boost::mutex::scoped_lock stateLock(m_stateMutex);

    m_state = state;
}

//==============================================================================
//      Client::writeStatus()
//==============================================================================
void Client::writeStatus(std::string status)
{
    DEBUG_STREAM << "Client::writeStatus()" << endl;

    boost::mutex::scoped_lock statusLock(m_stateMutex);

    m_status = status;
}

//==============================================================================
//      Client::run()
//==============================================================================
@@ -137,7 +169,6 @@ void Client::startResolve()
{
    DEBUG_STREAM << "Client::startResolve()" << endl;

    //@warning: check lexical cast
    boost::asio::ip::tcp::resolver::query query(m_configuration_sp->getRemoteHost(),
        boost::lexical_cast<std::string>(m_configuration_sp->getRemotePort()));

@@ -215,16 +246,30 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode)
    DEBUG_STREAM << "Client::handleReadResponseBody()" << endl;

    if(!errorCode)
    {
        try
        {
            ResponseSP response_sp(new Response);

            response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE);

        processAuthorisationResponse(response_sp);
            m_protocolManager_sp->processResponse(response_sp);

            //@todo: if no more data wait before starWriteRequest,
            //       else starWriteRequest now

            m_requestResponseTimer.expires_from_now(boost::posix_time::seconds(10));
            m_requestResponseTimer.async_wait(boost::bind(&Client::startWriteRequest, this));
        }
        catch(std::runtime_error& ec)
        {
            ERROR_STREAM << "Client::handleResponse() " << ec.what() << endl;
        }
        catch(...)
        {
            ERROR_STREAM << "Client::handleResponse() unknown error" << endl;
        }
    }
    else
    {
        ERROR_STREAM << "Client::handleResponse() " << errorCode.message() << endl;
@@ -241,6 +286,8 @@ void Client::resetConnection()
    if(m_resetConnectionTimer.expires_at() <=
            boost::asio::deadline_timer::traits_type::now())
    {
        INFO_STREAM << "CONNECTION RESET AFTER TIMEOUT" << endl;

        closeConnection();

        m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin);
@@ -252,66 +299,6 @@ void Client::resetConnection()
    m_resetConnectionTimer.async_wait(boost::bind(&Client::resetConnection, this));
}

//==============================================================================
//      Client::createAuthorisationRequest()
//==============================================================================
Client::RequestSP Client::createAuthorisationRequest()
{
    DEBUG_STREAM << "Client::createAuthorisationRequest()" << endl;

    RequestSP request_sp(new Request);

    request_sp->set_type(Request::AUTHORIZATION);

    Request::Authorization* authorization = request_sp->mutable_authorization();
    authorization->set_username(m_configuration_sp->getRemoteUsername());
    authorization->set_password(m_configuration_sp->getRemotePassword());

    return request_sp;
}

//==============================================================================
//      Client::processAuthorisationResponse()
//==============================================================================
void Client::processAuthorisationResponse(Client::ResponseSP response_sp)
{
    DEBUG_STREAM << "Client::processAuthorisationResponse()" << endl;

        const Response::Authorization& authorization = response_sp->authorization();

        INFO_STREAM << "STATUS " << authorization.status() << endl;

        //@todo: verify if connected or not
}

//==============================================================================
//      Client::createMetadataRequest()
//==============================================================================
Client::RequestSP Client::createMetadataRequest()
{
    DEBUG_STREAM << "Client::createMetadataRequest()" << endl;

    RequestSP request_sp(new Request);

    request_sp->set_type(Request::METADATA);

    Request::Metadata* metadata = request_sp->mutable_metadata();
    metadata->set_schema(m_configuration_sp->getDatabaseSchema());
    metadata->set_table(m_configuration_sp->getDatabaseTable());

    return request_sp;
}

//==============================================================================
//      Client::processMetadataResponse()
//==============================================================================
void Client::processMetadataResponse(Client::ResponseSP response_sp)
{
    DEBUG_STREAM << "Client::processMetadataResponse()" << endl;

    //@todo: what to do in case of error
}

//==============================================================================
//      Client::encodeHeader()
//==============================================================================
+13 −19
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@

#include <Configuration.h>
#include <DBManager.h>
#include <ProtocolManager.h>
#include <Request.pb.h>
#include <Response.pb.h>

@@ -28,12 +29,6 @@ public:
//------------------------------------------------------------------------------
    typedef boost::shared_ptr<Client> SP;

//------------------------------------------------------------------------------
//  [Public] Request Response classes shared pointer typedef
//------------------------------------------------------------------------------
    typedef boost::shared_ptr<Request> RequestSP;
    typedef boost::shared_ptr<Response> ResponseSP;

protected:
//------------------------------------------------------------------------------
//  [Protected] Constructor destructor
@@ -51,13 +46,20 @@ public:
    virtual void stop() = 0;

//------------------------------------------------------------------------------
//  [Public] State status methods
//  [Public] Read state and status methods
//------------------------------------------------------------------------------
    virtual Tango::DevState readState();

    virtual std::string readStatus();

protected:
//------------------------------------------------------------------------------
//  [Protected] Write state and status methods
//------------------------------------------------------------------------------
    virtual void writeState(Tango::DevState);

    virtual void writeStatus(std::string);

//------------------------------------------------------------------------------
//  [Protected] IO service run thread method
//------------------------------------------------------------------------------
@@ -107,17 +109,6 @@ protected:

    virtual void resetConnection();

//------------------------------------------------------------------------------
//  [Protected] Request response methods
//------------------------------------------------------------------------------
    virtual RequestSP createAuthorisationRequest();

    virtual void processAuthorisationResponse(ResponseSP);

    virtual RequestSP createMetadataRequest();

    virtual void processMetadataResponse(ResponseSP);

//------------------------------------------------------------------------------
//  [Protected] Header encoding decoding methods
//------------------------------------------------------------------------------
@@ -139,11 +130,14 @@ protected:
    //Database manager shared pointer
    DBManager::SP m_dBManager_sp;

    //Protocol manager shared pointer
    ProtocolManager::SP m_protocolManager_sp;

    //IO service instance
    boost::asio::io_service m_ioService;

    //Work IO service instance
    boost::asio::io_service::work m_work;
    boost::scoped_ptr<boost::asio::io_service::work> m_work_sp;

    //DNS resolver instance
    boost::asio::ip::tcp::resolver m_resolver;
+17 −1
Original line number Diff line number Diff line
@@ -22,7 +22,7 @@ DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p,
//==============================================================================
DBManager::~DBManager()
{
    DEBUG_STREAM << "DBManager::DBManager()" << endl;
    DEBUG_STREAM << "DBManager::~DBManager()" << endl;

    m_session_sp->close();
}
@@ -71,4 +71,20 @@ void DBManager::disconnect()
    m_session_sp->close();
}

//==============================================================================
//      DBManager::()
//==============================================================================

//==============================================================================
//      DBManager::()
//==============================================================================

//==============================================================================
//      DBManager::()
//==============================================================================

//==============================================================================
//      DBManager::()
//==============================================================================

}   //namespace
+6 −3
Original line number Diff line number Diff line
@@ -46,19 +46,22 @@ protected:

public:
//------------------------------------------------------------------------------
//  [Public] Users methods
//  [Public] Class creation method
//------------------------------------------------------------------------------
    static DBManager::SP create(Tango::DeviceImpl*, Configuration::SP);

//------------------------------------------------------------------------------
//  [Public] Connection handling methods
//------------------------------------------------------------------------------
    virtual void connect() throw(soci::soci_error);

    virtual void disconnect();

protected:
//------------------------------------------------------------------------------
//  [Protected] Utilities methods
//  [Public] Queries methods
//------------------------------------------------------------------------------

protected:
//------------------------------------------------------------------------------
//  [Protected] Class variables
//------------------------------------------------------------------------------
+44 −20
Original line number Diff line number Diff line
#include <PlainClient.h>

#include <boost/bind.hpp>
#include <sstream>

namespace MetadataImporter_ns
{
@@ -75,6 +76,12 @@ void PlainClient::startConnect(boost::asio::ip::tcp::resolver::iterator endPoint
        DEBUG_STREAM << "PlainClient::startConnect() connecting to "
            << endPointIterator->endpoint() << endl;

        writeState(Tango::OFF);

        std::stringstream statusStream;
        statusStream << "Connecting to " << endPointIterator->endpoint();
        writeStatus(statusStream.str());

        m_plainSocket.async_connect(endPointIterator->endpoint(),
            boost::bind(&PlainClient::handleConnect, this,
            boost::asio::placeholders::error, endPointIterator));
@@ -97,6 +104,12 @@ void PlainClient::handleConnect(const boost::system::error_code& errorCode,
    {
        INFO_STREAM << m_plainSocket.remote_endpoint() << " CONNECTED" << endl;

        writeState(Tango::ON);

        std::stringstream statusStream;
        statusStream << "Connected to " << m_plainSocket.remote_endpoint();
        writeStatus(statusStream.str());

        startWriteRequest();
    }
    else
@@ -117,7 +130,9 @@ void PlainClient::startWriteRequest()
{
    DEBUG_STREAM << "PlainClient::startRequest()" << endl;

    RequestSP request_sp = createAuthorisationRequest();
    try
    {
        RequestSP request_sp = m_protocolManager_sp->createRequest();

        boost::uint32_t bodySize = request_sp->ByteSize();

@@ -137,6 +152,15 @@ void PlainClient::startWriteRequest()
            boost::bind(&PlainClient::handleWriteRequest, this,
            boost::asio::placeholders::error));
    }
    catch(std::runtime_error& ec)
    {
        ERROR_STREAM << "PlainClient::startWriteRequest() " << ec.what() << endl;
    }
    catch(...)
    {
        ERROR_STREAM << "PlainClient::startWriteRequest() unknown error" << endl;
    }
}

//==============================================================================
//      PlainClient::startReadResponseHeader()
@@ -150,8 +174,7 @@ void PlainClient::startReadResponseHeader()
    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));

    boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_readBuff),
        boost::bind(
            &PlainClient::handleReadResponseHeader, this,
        boost::bind(&PlainClient::handleReadResponseHeader, this,
            boost::asio::placeholders::error));
}

@@ -182,7 +205,8 @@ void PlainClient::closeConnection()
{
    DEBUG_STREAM << "PlainClient::closeConnection()" << endl;

        INFO_STREAM << m_plainSocket.remote_endpoint() << " DISCONNECTED" << endl;
    writeState(Tango::OFF);
    writeStatus("Disconnected");

    boost::system::error_code errorCode;

Loading