Commit 3a151d09 authored by Marco De Marco's avatar Marco De Marco
Browse files

Experimental branch started

parent 6b4bade0
Loading
Loading
Loading
Loading
+36 −9
Original line number Diff line number Diff line
#================================================================================
EXEC_NAME=metadataImporter-srv
INST_NAME=test
DEBUG_LEV=-v1
DEBUG_LEV=-v3
INSTALL_DIR=/usr/local/bin
#================================================================================
INC_DIR=/usr/local/omniORB-4.1.7/include \
@@ -10,28 +10,38 @@ INC_DIR=/usr/local/omniORB-4.1.7/include \
	   /usr/local/soci-3.2.1/include \
	   /usr/local/soci-3.2.1/include/soci \
	   /usr/include/mysql \
	   /usr/local/protobuf-2.5.0/include \
	   ./src
LIB_DIR=/usr/local/omniORB-4.1.7/lib \
	   /usr/local/zeromq-3.2.3/lib \
	   /usr/local/tango-8.1.2/lib \
	   /usr/local/soci-3.2.1/lib64
	   /usr/local/soci-3.2.1/lib64 \
	   /usr/local/protobuf-2.5.0/lib
#================================================================================
CC=g++
CXX_DEBUG_FLAGS=-g -DVERBOSE_DEBUG
CXX_RELEASE_FLAGS=-O3
CXX_DEFAULT_FLAGS=-c -Wall -Wextra -std=c++11 -std=gnu++11
LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango \
	-lsoci_core -lsoci_mysql -lboost_system -lboost_thread
	-lsoci_core -lsoci_mysql -lboost_system -lboost_thread -lprotobuf
INC_PARM=$(foreach d, $(INC_DIR), -I$d)
LIB_PARM=$(foreach d, $(LIB_DIR), -L$d)
PROTOC :=/usr/local/protobuf-2.5.0/bin/protoc
#================================================================================
SRC_DIR=./src
OBJ_DIR=./obj
BIN_DIR=./bin
PROTO_DIR=./proto
#================================================================================
EXECUTABLE=$(BIN_DIR)/$(EXEC_NAME)
CPP_FILES=$(wildcard $(SRC_DIR)/*.cpp)
OBJ_FILES=$(addprefix $(OBJ_DIR)/,$(notdir $(CPP_FILES:.cpp=.o)))
EXECUTABLE := $(BIN_DIR)/$(EXEC_NAME)
CPP_FILES := $(wildcard $(SRC_DIR)/*.cpp)
OBJ_FILES := $(addprefix $(OBJ_DIR)/,$(notdir $(CPP_FILES:.cpp=.o)))
#================================================================================
PROTO_FILES := $(wildcard $(PROTO_DIR)/*.proto)
PROTO_HEADERS := $(addprefix $(SRC_DIR)/,$(notdir $(PROTO_FILES:.proto=.pb.h)))
PROTO_CLASSES := $(addprefix $(SRC_DIR)/,$(notdir $(PROTO_FILES:.proto=.pb.cc)))
CPP_FILES += $(PROTO_CLASSES)
OBJ_FILES += $(addprefix $(OBJ_DIR)/,$(notdir $(PROTO_CLASSES:.pb.cc=.pb.o)))
#================================================================================

.PHONY: all
@@ -49,12 +59,19 @@ release: $(EXECUTABLE)
debug: CXXFLAGS+=$(CXX_DEBUG_FLAGS) $(CXX_DEFAULT_FLAGS)
debug: $(EXECUTABLE)

$(EXECUTABLE): makedir $(OBJ_FILES)
$(EXECUTABLE): makedir protoc $(OBJ_FILES)
	$(CC) $(LDFLAGS) $(LIB_PARM) -o $@ $(OBJ_FILES)

$(OBJ_DIR)/%.o: $(SRC_DIR)/%.cpp
	$(CC) $(CXXFLAGS) $(INC_PARM) -o $@ $<

$(OBJ_DIR)/%.pb.o: $(SRC_DIR)/%.pb.cc
	$(CC) $(CXXFLAGS) $(INC_PARM) -o $@ $<

.PHONY: protoc
protoc:
	$(PROTOC) --proto_path=$(PROTO_DIR) --cpp_out=$(SRC_DIR) $(PROTO_FILES)

.PHONY: makedir
makedir:
	-mkdir -p $(OBJ_DIR) $(BIN_DIR)
@@ -63,6 +80,10 @@ makedir:
clean:
	-rm -rf $(OBJ_DIR) $(BIN_DIR)

.PHONY: deepclean
deepclean:
	-rm -rf $(OBJ_DIR) $(BIN_DIR) $(PROTO_HEADERS) $(PROTO_CLASSES)

.PHONY: install
install:
	-cp $(EXECUTABLE) $(INSTALL_DIR)
@@ -79,4 +100,10 @@ echo:
	@echo $(INC_PARM)
	@echo LIB_PARM
	@echo $(LIB_PARM)
	@echo PROTO_FILES
	@echo $(PROTO_FILES)
	@echo PROTO_CLASSES
	@echo $(PROTO_CLASSES)
	@echo PROTO_HEADERS
	@echo $(PROTO_HEADERS)
+138 −37
Original line number Diff line number Diff line
#include <PlainClient.h>

#include <boost/lexical_cast.hpp>
#include <boost/bind.hpp>

namespace MetadataImporter_ns
{
@@ -10,7 +11,8 @@ namespace MetadataImporter_ns
//==============================================================================
PlainClient::PlainClient(Tango::DeviceImpl* deviceImpl_p,
    Configuration::SP configuration_sp) : Client(deviceImpl_p, configuration_sp),
    m_plainSocket(m_ioService)
    m_plainSocket(m_ioService), m_resetConnectionTimer(m_ioService),
    m_requestResponseTimer(m_ioService)
{
    DEBUG_STREAM << "PlainClient::PlainClient()" << endl;
}
@@ -44,12 +46,7 @@ void PlainClient::start()

    Client::start();

    boost::asio::ip::tcp::resolver::query query(m_configuration_sp->getAddress(),
        boost::lexical_cast<std::string>(m_configuration_sp->getPort()));

    m_resolver.async_resolve(query, boost::bind(&PlainClient::handleResolve, this,
        boost::asio::placeholders::error, boost::asio::placeholders::iterator));

    startResolve();
}

//==============================================================================
@@ -59,9 +56,37 @@ void PlainClient::stop()
{
    DEBUG_STREAM << "PlainClient::stop()" << endl;

    boost::system::error_code errorCode;

    m_plainSocket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, errorCode);

    m_plainSocket.close(errorCode);

    Client::stop();
}

//==============================================================================
//      PlainClient::startResolve()
//==============================================================================
void PlainClient::startResolve()
{
    DEBUG_STREAM << "PlainClient::startResolve()" << endl;

    boost::asio::ip::tcp::resolver::query query(m_configuration_sp->getAddress(),
        boost::lexical_cast<std::string>(m_configuration_sp->getPort()));

    INFO_STREAM << "PlainClient::startResolve() host: "
        << m_configuration_sp->getAddress() << " port: "
        << m_configuration_sp->getPort() << endl;

    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));

    m_resolver.async_resolve(query, boost::bind(&PlainClient::handleResolve, this,
        boost::asio::placeholders::error, boost::asio::placeholders::iterator));

    m_resetConnectionTimer.async_wait(boost::bind(&PlainClient::resetConnection, this));
}

//==============================================================================
//      PlainClient::handleResolve()
//==============================================================================
@@ -70,43 +95,86 @@ void PlainClient::handleResolve(const boost::system::error_code& errorCode,
{
    DEBUG_STREAM << "PlainClient::handleResolve()" << endl;

    INFO_STREAM << "PlainClient::handleResolve() " << endPointIterator->host_name() << endl;

    if(!errorCode)
    {
        boost::asio::async_connect(m_plainSocket, endPointIterator,
        startConnect(endPointIterator);
    }
    else
    {
        ERROR_STREAM << "PlainClient::handleResolve() " << errorCode.message() << endl;
    }
}

//==============================================================================
//      PlainClient::startConnect()
//==============================================================================
void PlainClient::startConnect(boost::asio::ip::tcp::resolver::iterator endPointIterator)
{
    DEBUG_STREAM << "PlainClient::startConnect()" << endl;

    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));

    if(endPointIterator != boost::asio::ip::tcp::resolver::iterator())
    {
        INFO_STREAM << "PlainClient::startConnect() connecting to "
            << endPointIterator->endpoint() << endl;

        m_plainSocket.async_connect(endPointIterator->endpoint(),
            boost::bind(&PlainClient::handleConnect, this,
            boost::asio::placeholders::error));
            boost::asio::placeholders::error, endPointIterator));
    }
    else
    {
        ERROR_STREAM << "PlainClient::handleResolve() "
            << errorCode.message() << endl;
        ERROR_STREAM << "PlainClient::startConnect() no more endpoint" << endl;
    }
}

//==============================================================================
//      PlainClient::handleConnect()
//==============================================================================
void PlainClient::handleConnect(const boost::system::error_code& errorCode)
void PlainClient::handleConnect(const boost::system::error_code& errorCode,
    boost::asio::ip::tcp::resolver::iterator endPointIterator)
{
    DEBUG_STREAM << "PlainClient::handleConnect()" << endl;

//    if(!m_plainSocket.is_open())
//    {
//        ERROR_STREAM << "PlainClient::handleConnect() connection timeout!" << endl;
//
//        startConnect(++endPointIterator);
//    }
//    else
    if(!errorCode)
    {
        startRequest();
    }
    else
    {
        ERROR_STREAM << "PlainClient::handleConnect() " << errorCode.message() << endl;

        if(m_plainSocket.is_open())
            m_plainSocket.close();

        startConnect(++endPointIterator);
    }
}

//==============================================================================
//      PlainClient::startRequest()
//==============================================================================
void PlainClient::startRequest()
{
    DEBUG_STREAM << "PlainClient::startRequest()" << endl;

    std::ostream requestStream(&m_request);
        requestStream << "Request \r\n";
    requestStream << "Request \n";

    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));

    boost::asio::async_write(m_plainSocket, m_request,
        boost::bind(&PlainClient::handleRequest, this,
        boost::asio::placeholders::error));
}
    else
    {
        ERROR_STREAM << "PlainClient::handleConnect() "
            << errorCode.message() << endl;
    }
}

//==============================================================================
//      PlainClient::handleRequest()
@@ -117,19 +185,30 @@ void PlainClient::handleRequest(const boost::system::error_code& errorCode)

    if(!errorCode)
    {
        boost::asio::async_read_until(m_plainSocket, m_response, "\r\n",
            boost::bind(&PlainClient::handleResponse, this,
            boost::asio::placeholders::error));

        INFO_STREAM << "PlainClient::handleRequest() " << &m_response << endl;
        startResponse();
    }
    else
    {
        ERROR_STREAM << "PlainClient::handleRequest() "
            << errorCode.message() << endl;
        ERROR_STREAM << "PlainClient::handleRequest() " << errorCode.message() << endl;
    }
}

//==============================================================================
//      PlainClient::startResponse()
//==============================================================================
void PlainClient::startResponse()
{
    DEBUG_STREAM << "PlainClient::startResponse()" << endl;

    m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30));

    boost::asio::async_read_until(m_plainSocket, m_response, "\n",
        boost::bind(&PlainClient::handleResponse, this,
        boost::asio::placeholders::error));

    INFO_STREAM << "PlainClient::handleRequest() " << &m_response << endl;
}

//==============================================================================
//      PlainClient::handleResponse()
//==============================================================================
@@ -139,18 +218,40 @@ void PlainClient::handleResponse(const boost::system::error_code& errorCode)

    if(!errorCode)
    {
        std::ostream requestStream(&m_request);
        requestStream << "Request \r\n";

        boost::asio::async_write(m_plainSocket, m_request,
            boost::bind(&PlainClient::handleRequest, this,
            boost::asio::placeholders::error));
        m_requestResponseTimer.expires_from_now(boost::posix_time::seconds(5));
        m_requestResponseTimer.async_wait(boost::bind(&PlainClient::startRequest, this));
    }
    else
    {
        ERROR_STREAM << "PlainClient::handleResponse() "
            << errorCode.message() << endl;
        ERROR_STREAM << "PlainClient::handleResponse() " << errorCode.message() << endl;
    }
}

//==============================================================================
//      PlainClient::resetConnection()
//==============================================================================
void PlainClient::resetConnection()
{
        DEBUG_STREAM << "PlainClient::resetConnection()" << endl;

        if(m_resetConnectionTimer.expires_at() <=
                boost::asio::deadline_timer::traits_type::now())
        {
            INFO_STREAM << "PlainClient::resetConnection() closing socket" << endl;

            boost::system::error_code errorCode;

            m_plainSocket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, errorCode);

            m_plainSocket.close(errorCode);

            m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin);
            m_requestResponseTimer.expires_at(boost::posix_time::pos_infin);

            startResolve();
        }

        m_resetConnectionTimer.async_wait(boost::bind(&PlainClient::resetConnection, this));
}

}   //namespace
+16 −1
Original line number Diff line number Diff line
@@ -38,20 +38,35 @@ protected:
//------------------------------------------------------------------------------
//      [Protected] Class variables
//------------------------------------------------------------------------------
    void startResolve();

    void handleResolve(const boost::system::error_code&,
        boost::asio::ip::tcp::resolver::iterator);

    void handleConnect(const boost::system::error_code&);
    void startConnect(boost::asio::ip::tcp::resolver::iterator);

    void handleConnect(const boost::system::error_code&,
        boost::asio::ip::tcp::resolver::iterator);

    void startRequest();

    void handleRequest(const boost::system::error_code&);

    void startResponse();

    void handleResponse(const boost::system::error_code&);

    void resetConnection();

//------------------------------------------------------------------------------
//  [Protected] Class variables
//------------------------------------------------------------------------------
    boost::asio::ip::tcp::socket m_plainSocket;

    boost::asio::deadline_timer m_resetConnectionTimer;

    boost::asio::deadline_timer m_requestResponseTimer;

    boost::asio::streambuf m_request;

    boost::asio::streambuf m_response;