Commit dbb7c443 authored by Marco De Marco's avatar Marco De Marco
Browse files

Protocol buffer communication works

parent 17258480
Loading
Loading
Loading
Loading
+44 −3
Original line number Diff line number Diff line
@@ -9,11 +9,13 @@ namespace MetadataImporter_ns
//      Client::Client()
//==============================================================================
Client::Client(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) :
    Tango::LogAdapter(deviceImpl_p), m_configuration_sp(configuration_sp),
    m_work(m_ioService), m_resolver(m_ioService)
    Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p),
    m_configuration_sp(configuration_sp), m_work(m_ioService), m_resolver(m_ioService)
{
    DEBUG_STREAM << "Client::Client()" << endl;

    GOOGLE_PROTOBUF_VERIFY_VERSION;

    m_dBManager_sp = DBManager::create(deviceImpl_p, configuration_sp);

    m_state = Tango::OFF;
@@ -36,6 +38,8 @@ Client::~Client()

        m_thread_sp->join();
    }

    google::protobuf::ShutdownProtobufLibrary();
}

//==============================================================================
@@ -121,4 +125,41 @@ void Client::run()
    }   //while
}


//==============================================================================
//      Client::prepareResponse()
//==============================================================================
void Client::encodeHeader(std::vector<boost::uint8_t>& buf, boost::uint32_t size)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "Client::encodeHeader()" << endl;

    if(buf.size() < HEADER_SIZE)
        throw std::runtime_error("Buffer to small to contain header!");

    buf[0] = static_cast<boost::uint8_t>((size >> 24) & 0xFF);
    buf[1] = static_cast<boost::uint8_t>((size >> 16) & 0xFF);
    buf[2] = static_cast<boost::uint8_t>((size >> 8) & 0xFF);
    buf[3] = static_cast<boost::uint8_t>(size & 0xFF);
}

//==============================================================================
//      Client::prepareResponse()
//==============================================================================
boost::uint32_t Client::decodeHeader(std::vector<boost::uint8_t>& buf)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "Client::decodeHeader()" << endl;

    if(buf.size() < HEADER_SIZE)
        throw std::runtime_error("Buffer to small to contain header!");

    boost::uint32_t size = 0;

    for (unsigned i = 0; i < HEADER_SIZE; ++i)
        size = size * 256 + (static_cast<unsigned>(buf[i]) & 0xFF);

    return size;
}

}   //namespace
 No newline at end of file
+23 −1
Original line number Diff line number Diff line
@@ -3,6 +3,8 @@

#include <Configuration.h>
#include <DBManager.h>
#include <Request.pb.h>
#include <Response.pb.h>

#include <tango.h>

@@ -10,6 +12,10 @@
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/cstdint.hpp>

#include <Request.pb.h>
#include <Response.pb.h>

namespace MetadataImporter_ns
{
@@ -22,6 +28,12 @@ public:
//------------------------------------------------------------------------------
    typedef boost::shared_ptr<Client> SP;

//------------------------------------------------------------------------------
//  [Public] Request Response shared pointer typedef
//------------------------------------------------------------------------------
    typedef boost::shared_ptr<Request> RequestSP;
    typedef boost::shared_ptr<Response> ResponseSP;

protected:
//------------------------------------------------------------------------------
//  [Protected] Constructor destructor
@@ -48,11 +60,17 @@ protected:
//------------------------------------------------------------------------------
    virtual void run();

    virtual void encodeHeader(std::vector<boost::uint8_t>&, boost::uint32_t)
        throw(std::runtime_error);

    virtual boost::uint32_t decodeHeader(std::vector<boost::uint8_t>&)
        throw(std::runtime_error);

//------------------------------------------------------------------------------
//  [Protected] Class variables
//------------------------------------------------------------------------------
    //Tango server class pointer
    Tango::DeviceImpl* deviceImpl_p;
    Tango::DeviceImpl* m_deviceImpl_p;

    //Configuration shared pointer
    Configuration::SP m_configuration_sp;
@@ -83,6 +101,10 @@ protected:

    //Tango status property
    std::string m_status;

    const unsigned HEADER_SIZE = 4;

    std::vector<boost::uint8_t> m_readBuff;
};

}   //End of namespace
+83 −16
Original line number Diff line number Diff line
@@ -166,12 +166,31 @@ void PlainClient::startRequest()
{
    DEBUG_STREAM << "PlainClient::startRequest()" << endl;

    std::ostream requestStream(&m_request);
    requestStream << "Request \n";
    RequestSP request_sp(new Request);

    request_sp->set_type(Request::AUTHORIZATION);

    Request::Authorization* authorization = request_sp->mutable_authorization();
    authorization->set_username(m_configuration_sp->getRemoteUsername());
    authorization->set_password(m_configuration_sp->getRemotePassword());

    boost::uint32_t bodySize = request_sp->ByteSize();

    DEBUG_STREAM << "PlainClient::startRequest() SIZE " << bodySize << endl;

    if(!request_sp->IsInitialized())
        ERROR_STREAM << "NOT INITIALISED" << endl;

    std::vector<boost::uint8_t> writeBuff;
    writeBuff.resize(HEADER_SIZE + bodySize);

    encodeHeader(writeBuff, bodySize);

    request_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize);

    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));

    boost::asio::async_write(m_plainSocket, m_request,
    boost::asio::async_write(m_plainSocket, boost::asio::buffer(writeBuff),
        boost::bind(&PlainClient::handleRequest, this,
        boost::asio::placeholders::error));
}
@@ -185,7 +204,7 @@ void PlainClient::handleRequest(const boost::system::error_code& errorCode)

    if(!errorCode)
    {
        startResponse();
        startReadResponseHeader();
    }
    else
    {
@@ -194,31 +213,79 @@ void PlainClient::handleRequest(const boost::system::error_code& errorCode)
}

//==============================================================================
//      PlainClient::startResponse()
//      PlainClient::startReadResponseHeader()
//==============================================================================
void PlainClient::startResponse()
void PlainClient::startReadResponseHeader()
{
    DEBUG_STREAM << "PlainClient::startResponse()" << endl;
    DEBUG_STREAM << "PlainClient::startReadResponseHeader()" << endl;

    m_readBuff.resize(HEADER_SIZE);

    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));

    boost::asio::async_read_until(m_plainSocket, m_response, "\n",
        boost::bind(&PlainClient::handleResponse, this,
    boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_readBuff),
        boost::bind(
            &PlainClient::handleReadResponseHeader, this,
            boost::asio::placeholders::error));
}

//==============================================================================
//      PlainClient::handleReadResponseHeader()
//==============================================================================
void PlainClient::handleReadResponseHeader(const boost::system::error_code& errorCode)
{
    DEBUG_STREAM << "PlainClient::handleReadResponseHeader()" << endl;

    if(!errorCode)
    {
        boost::uint32_t bodySize = decodeHeader(m_readBuff);

        DEBUG_STREAM << "PlainClient::handleReadResponseHeader() SIZE: " << bodySize << endl;

    INFO_STREAM << "PlainClient::handleRequest() " << &m_response << endl;
        startReadResponseBody(bodySize);
    }
    else
    {
        ERROR_STREAM << "PlainClient::handleReadResponseHeader() " << errorCode.message() << endl;
    }
}

//==============================================================================
//      PlainClient::startReadResponseBody()
//==============================================================================
void PlainClient::startReadResponseBody(boost::uint32_t bodySize)
{
    DEBUG_STREAM << "PlainClient::startReadResponseBody()" << endl;

    m_readBuff.resize(HEADER_SIZE + bodySize);

    boost::asio::mutable_buffers_1 mutableBuffer =
        boost::asio::buffer(&m_readBuff[HEADER_SIZE], bodySize);

    boost::asio::async_read(m_plainSocket, mutableBuffer,
        boost::bind(&PlainClient::handleReadResponseBody, this,
            boost::asio::placeholders::error));
}

//==============================================================================
//      PlainClient::handleResponse()
//      PlainClient::handleReadResponseBody()
//==============================================================================
void PlainClient::handleResponse(const boost::system::error_code& errorCode)
void PlainClient::handleReadResponseBody(const boost::system::error_code& errorCode)
{
    DEBUG_STREAM << "PlainClient::handleResponse()" << endl;
    DEBUG_STREAM << "PlainClient::handleReadResponseBody()" << endl;

    if(!errorCode)
    {
        m_requestResponseTimer.expires_from_now(boost::posix_time::seconds(5));
        ResponseSP response_sp(new Response);

        response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE);

        const Response::Authorization& authorization = response_sp->authorization();

        INFO_STREAM << "STATUS " << authorization.status() << endl;

        m_requestResponseTimer.expires_from_now(boost::posix_time::seconds(10));

        m_requestResponseTimer.async_wait(boost::bind(&PlainClient::startRequest, this));
    }
    else
@@ -232,7 +299,7 @@ void PlainClient::handleResponse(const boost::system::error_code& errorCode)
//==============================================================================
void PlainClient::resetConnection()
{
        DEBUG_STREAM << "PlainClient::resetConnection()" << endl;
        //DEBUG_STREAM << "PlainClient::resetConnection()" << endl;

        if(m_resetConnectionTimer.expires_at() <=
                boost::asio::deadline_timer::traits_type::now())
+9 −6
Original line number Diff line number Diff line
@@ -52,24 +52,27 @@ protected:

    void handleRequest(const boost::system::error_code&);

    void startResponse();
    void startReadResponseHeader();

    void handleResponse(const boost::system::error_code&);
    void handleReadResponseHeader(const boost::system::error_code&);

    void startReadResponseBody(boost::uint32_t);

    void handleReadResponseBody(const boost::system::error_code&);

    void resetConnection();

//------------------------------------------------------------------------------
//  [Protected] Class variables
//------------------------------------------------------------------------------
    //Plain connection socket
    boost::asio::ip::tcp::socket m_plainSocket;

    //First connection timeout
    boost::asio::deadline_timer m_resetConnectionTimer;

    //Request response timeout
    boost::asio::deadline_timer m_requestResponseTimer;

    boost::asio::streambuf m_request;

    boost::asio::streambuf m_response;
};

}   //End of namespace