Commit 685191f9 authored by Marco De Marco's avatar Marco De Marco
Browse files

Experimental branch started

parent 60f15026
Loading
Loading
Loading
Loading

proto/Message.proto

0 → 100644
+76 −0
Original line number Diff line number Diff line
package MetadataMessage;

message Request
{
	enum Type
	{
		AUTHORIZATION = 0;
		METADATA = 1;
	}

	required Type type = 1;

	message Authorization
	{
		required string username = 1;
		required string password = 2;
	}

	optional Authorization authorization = 2;

	message Metadata
	{
		required string host = 1;
		required uint32 port = 2;
		required string schema = 3;
		required string table = 4;
	}

	optional Metadata metadata = 5;
}

message Response
{
	enum Type
	{
		AUTHORIZATION = 0;
		METADATA = 1;
	}

	required Type type = 1;

	message Authorization
	{
		enum State
		{
			ACCEPTED = 0;
			REJECTED = 1;
		}
	}

	optional Authorization authorization = 2;

	message Metadata
	{
		message Parameter
		{
			required string key = 4;

			enum Type
			{
				VARCHAR = 0;
				INTEGER = 1;
				DOUBLE = 2;
			}

		required Type type = 5;

			required string value = 6;
		}

		repeated Parameter parameters = 7;
	}

	optional Metadata metadata = 5;
}
+44 −33
Original line number Diff line number Diff line
@@ -9,7 +9,7 @@ namespace MetadataExporter_ns
//==============================================================================
PlainSession::PlainSession(Tango::DeviceImpl* deviceImpl_p,
    boost::shared_ptr<boost::asio::io_service> ioService_sp) :
    Session::Session(deviceImpl_p, ioService_sp), m_socket(*ioService_sp)
    Session::Session(deviceImpl_p, ioService_sp), m_plainSocket(*ioService_sp)
{
    DEBUG_STREAM << "PlainSession::PlainSession()" << endl;
}
@@ -20,6 +20,12 @@ PlainSession::PlainSession(Tango::DeviceImpl* deviceImpl_p,
PlainSession::~PlainSession()
{
    DEBUG_STREAM << "PlainSession::~PlainSession()" << endl;

    boost::system::error_code errorCode;

    m_plainSocket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, errorCode);

    m_plainSocket.close(errorCode);
}

//==============================================================================
@@ -41,7 +47,7 @@ boost::asio::ip::tcp::socket& PlainSession::getSocket()
{
    DEBUG_STREAM << "PlainSession::getSocket()" << endl;

    return m_socket;
    return m_plainSocket;
}

//==============================================================================
@@ -52,76 +58,81 @@ void PlainSession::start()
    DEBUG_STREAM << "PlainSession::start()" << endl;

    INFO_STREAM << "PlainSession::start() connection from "
        << m_socket.remote_endpoint() << endl;

    BufferSP recvBuffer_sp(new Buffer);

    m_socket.async_read_some(boost::asio::buffer(*recvBuffer_sp),
        m_strand.wrap(boost::bind( &PlainSession::handleRequest, shared_from_this(),
        recvBuffer_sp, boost::asio::placeholders::bytes_transferred,
        boost::asio::placeholders::error)));
        << m_plainSocket.remote_endpoint() << endl;

    boost::asio::async_read_until(m_plainSocket, m_request, "\r\n",
        m_strand.wrap(
            boost::bind(
                &PlainSession::handleRequest, shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred)));
}

//==============================================================================
//      PlainSession::handleRequest()
//==============================================================================
void PlainSession::handleRequest(BufferSP recvBuffer_sp, std::size_t size,
    const boost::system::error_code& ec)
void PlainSession::handleRequest(const boost::system::error_code& ec, std::size_t size)
{
    DEBUG_STREAM << "PlainSession::handleRequest()" << endl;

    if(!ec)
    {
        BufferSP sendBuffer_sp(new Buffer);
        m_request.commit(size);

        for(std::size_t i=0; i<size; ++i)
        {
            (*sendBuffer_sp)[i] = toupper( (*recvBuffer_sp)[i] );
        }
        std::istream iStream(&m_request);
        std::string request;
        iStream >> request;

        INFO_STREAM << "PlainSession::handleRequest() request \"" << request << "\"" << endl;

        boost::asio::async_write(m_socket, boost::asio::buffer(*sendBuffer_sp, size),
            m_strand.wrap(boost::bind(&PlainSession::handleResponse, shared_from_this(),
            sendBuffer_sp, boost::asio::placeholders::bytes_transferred,
            boost::asio::placeholders::error)));
        std::ostream oStream(&m_response);
        oStream << request << "\r\n";

        boost::asio::async_write(m_plainSocket, m_response,
            m_strand.wrap(
                boost::bind(&PlainSession::handleResponse, shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred)));
    }
    else if(ec == boost::asio::error::eof)
    {
        INFO_STREAM << "PlainSession::handleRequest() disconnection from "
            << m_socket.remote_endpoint() << endl;
            << m_plainSocket.remote_endpoint() << endl;
    }
    else
    {
        WARN_STREAM << "PlainSession::handleRequest() error " << ec.message()
            << " from " << m_socket.remote_endpoint() << endl;
            << " from " << m_plainSocket.remote_endpoint() << endl;
    }
}

//==============================================================================
//      PlainSession::handleResponse()
//==============================================================================
void PlainSession::handleResponse(BufferSP sendBuffer_sp, std::size_t size,
    const boost::system::error_code& ec)
void PlainSession::handleResponse(const boost::system::error_code& ec, std::size_t size)
{
    DEBUG_STREAM << "PlainSession::handleResponse()" << endl;

    if(!ec)
    {
        BufferSP recvBuffer_sp( new Buffer );

        m_socket.async_read_some(boost::asio::buffer(*recvBuffer_sp),
            m_strand.wrap(boost::bind(&PlainSession::handleRequest, shared_from_this(),
            recvBuffer_sp, boost::asio::placeholders::bytes_transferred,
            boost::asio::placeholders::error)));
        m_response.consume(size);

        boost::asio::async_read_until(m_plainSocket, m_request, "\r\n",
            m_strand.wrap(
                boost::bind(
                    &PlainSession::handleRequest, shared_from_this(),
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred)));
    }
    else if(ec == boost::asio::error::eof)
    {
        INFO_STREAM << "PlainSession::handleResponse() disconnection from "
            << m_socket.remote_endpoint() << endl;
            << m_plainSocket.remote_endpoint() << endl;
    }
    else
    {
        WARN_STREAM << "PlainSession::handleResponse() error " << ec.message()
            << " from " << m_socket.remote_endpoint() << endl;
            << " from " << m_plainSocket.remote_endpoint() << endl;
    }
}

+7 −5
Original line number Diff line number Diff line
@@ -44,17 +44,19 @@ protected:
//------------------------------------------------------------------------------
//  [Protected] Utilities methods
//------------------------------------------------------------------------------
    virtual void handleRequest(BufferSP, std::size_t,
        const boost::system::error_code&);
    virtual void handleRequest(const boost::system::error_code&, std::size_t);

    virtual void handleResponse(BufferSP, std::size_t,
        const boost::system::error_code&);
    virtual void handleResponse(const boost::system::error_code&, std::size_t);

//------------------------------------------------------------------------------
//	[Protected] Class variables
//------------------------------------------------------------------------------
    //TCP socket object
    boost::asio::ip::tcp::socket m_socket;
    boost::asio::ip::tcp::socket m_plainSocket;

    boost::asio::streambuf m_request;

    boost::asio::streambuf m_response;
};

}   //End of namespace
+3 −23
Original line number Diff line number Diff line
@@ -71,12 +71,7 @@ void SSLSession::handleHandShake(const boost::system::error_code& ec)

        if(!ec)
        {
            BufferSP recvBuffer_sp( new Buffer );
            
            m_sslSocket.async_read_some(boost::asio::buffer(*recvBuffer_sp),
                m_strand.wrap(boost::bind(&SSLSession::handleRequest, shared_from_this(),
                recvBuffer_sp, boost::asio::placeholders::bytes_transferred,
                boost::asio::placeholders::error)));
        }
        else
        {
@@ -88,22 +83,13 @@ void SSLSession::handleHandShake(const boost::system::error_code& ec)
//==============================================================================
//      SSLSession::handleRequest()
//==============================================================================
void SSLSession::handleRequest(BufferSP recvBuffer_sp, std::size_t size,
    const boost::system::error_code& ec)
void SSLSession::handleRequest(const boost::system::error_code& ec, std::size_t size)
{
    DEBUG_STREAM << "SSLSession::handleRequest()" << endl;

    if(!ec)
    {
        BufferSP sendBuffer_sp( new Buffer );

        for(std::size_t i=0; i<size; ++i)
            (*sendBuffer_sp)[i] = toupper( (*recvBuffer_sp)[i] );

        boost::asio::async_write(m_sslSocket, boost::asio::buffer(*sendBuffer_sp, size),
            m_strand.wrap(boost::bind(&SSLSession::handleResponse, shared_from_this(),
            sendBuffer_sp, boost::asio::placeholders::bytes_transferred,
            boost::asio::placeholders::error)));
    }
    else if(ec == boost::asio::error::eof)
    {
@@ -120,19 +106,13 @@ void SSLSession::handleRequest(BufferSP recvBuffer_sp, std::size_t size,
//==============================================================================
//      SSLSession::handleResponse()
//==============================================================================
void SSLSession::handleResponse(BufferSP sendBuffer_sp, std::size_t size,
        const boost::system::error_code& ec)
void SSLSession::handleResponse(const boost::system::error_code& ec, std::size_t size)
{
    DEBUG_STREAM << "SSLSession::handleResponse()" << endl;

    if(!ec)
    {
        BufferSP recvBuffer_sp( new Buffer );

        m_sslSocket.async_read_some(boost::asio::buffer(*recvBuffer_sp),
            m_strand.wrap(boost::bind(&SSLSession::handleRequest, shared_from_this(),
            recvBuffer_sp, boost::asio::placeholders::bytes_transferred,
            boost::asio::placeholders::error)));
    }
    else if(ec == boost::asio::error::eof)
    {
+2 −4
Original line number Diff line number Diff line
@@ -49,11 +49,9 @@ protected:
//------------------------------------------------------------------------------
    virtual void handleHandShake(const boost::system::error_code& ec);

    virtual void handleRequest(BufferSP, std::size_t,
        const boost::system::error_code&);
    virtual void handleRequest(const boost::system::error_code&, std::size_t);

    virtual void handleResponse(BufferSP, std::size_t,
        const boost::system::error_code&);
    virtual void handleResponse(const boost::system::error_code&, std::size_t);

//------------------------------------------------------------------------------
//	[Protected] Class variables
Loading