Commit 15745d4e authored by Marco De Marco's avatar Marco De Marco
Browse files

Connection manager check if connection still alive, debug messages added

parent ee388f40
Loading
Loading
Loading
Loading
+7 −3
Original line number Original line Diff line number Diff line
@@ -54,7 +54,7 @@ ConnectionManager::ConnectionManager(Tango::DeviceImpl* deviceImpl_p,
				connection << " dbname=" << schema;
				connection << " dbname=" << schema;


                #ifdef VERBOSE_DEBUG
                #ifdef VERBOSE_DEBUG
                    INFO_STREAM << "Connection: " << connection.str() << endl;
                    INFO_STREAM << "CONNECTION: " << connection.str() << endl;
                #endif                
                #endif                
                
                
				pool_sp->at(i).open(soci::mysql, connection.str());
				pool_sp->at(i).open(soci::mysql, connection.str());
@@ -62,13 +62,13 @@ ConnectionManager::ConnectionManager(Tango::DeviceImpl* deviceImpl_p,


			m_pool.insert( std::pair<PoolID, PoolSP>(poolID, pool_sp) ); 
			m_pool.insert( std::pair<PoolID, PoolSP>(poolID, pool_sp) ); 


            DEBUG_STREAM << "ConnectionManager::ConnectionManager() connection" 
            INFO_STREAM << "ConnectionManager::ConnectionManager() connection" 
                << " host: " << host.c_str() << " port: " << port 
                << " host: " << host.c_str() << " port: " << port 
                << " user: " << user.c_str() << " added" << endl;
                << " user: " << user.c_str() << " added" << endl;
		}
		}
		else
		else
		{
		{
            DEBUG_STREAM << "ConnectionManager::ConnectionManager() connection" 
            INFO_STREAM << "ConnectionManager::ConnectionManager() connection" 
                << " host: " << host.c_str() << " port: " << port 
                << " host: " << host.c_str() << " port: " << port 
                << " user: " << user.c_str() << " skipped" << endl;
                << " user: " << user.c_str() << " skipped" << endl;
		}
		}
@@ -121,6 +121,10 @@ ConnectionManager::SessionSP ConnectionManager::getSession(


	SessionSP session_sp( new soci::session( *(it->second) ) );
	SessionSP session_sp( new soci::session( *(it->second) ) );


    //Check if connection is still alive
    if(session_sp->get_backend() == NULL)
        session_sp->reconnect();
    
	return session_sp;
	return session_sp;
}
}


+10 −8
Original line number Original line Diff line number Diff line
@@ -66,7 +66,8 @@ void DMDBImporter::connect() throw(soci::soci_error)
    connection << " dbname=" << m_schema;
    connection << " dbname=" << m_schema;


#ifdef VERBOSE_DEBUG
#ifdef VERBOSE_DEBUG
    INFO_STREAM << "Connection: " << connection.str() << endl;
    INFO_STREAM << "CONNECTION: " << connection.str() << endl;
    INFO_STREAM << "-------------------------------------------------" << endl;
#endif
#endif
    
    
    m_session_sp->open(soci::mysql, connection.str());       
    m_session_sp->open(soci::mysql, connection.str());       
@@ -183,8 +184,8 @@ Instrument::SP DMDBImporter::getInstrument(std::string instrumentName)
    int destinationId = instrumentTuple.get<5>().get();
    int destinationId = instrumentTuple.get<5>().get();
    
    
#ifdef VERBOSE_DEBUG    
#ifdef VERBOSE_DEBUG    
    INFO_STREAM << "*" << instId << "|" << name << "|" << fitsKey << "|"
    INFO_STREAM << "INSTRUMENT: " << instId << " " << name << " " << fitsKey 
        << fitsValue << "|" << fitsDate << "|" << destinationId << "*" << endl;
        << " " << fitsValue << " " << fitsDate << " " << destinationId << endl;
    INFO_STREAM << "-------------------------------------------------" << endl;
    INFO_STREAM << "-------------------------------------------------" << endl;
#endif
#endif
    
    
@@ -249,9 +250,9 @@ Destination::SP DMDBImporter::getDestination(int destinationId)
    std::string storage = destinationTuple.get<7>().get();
    std::string storage = destinationTuple.get<7>().get();


#ifdef VERBOSE_DEBUG
#ifdef VERBOSE_DEBUG
    INFO_STREAM << "*" << destinationId << "|" << host << "|" << port << "|"
    INFO_STREAM << "DESTINATION: " << destinationId << " " << host << " " << port 
        << user << "|" << password << "|" << schema << "|" << table << "|"
        << " " << user << " " << password << " " << schema << " " << table
        << storage << "*" << endl;
        << " " << storage << endl;
    INFO_STREAM << "-------------------------------------------------" << endl;    
    INFO_STREAM << "-------------------------------------------------" << endl;    
#endif
#endif
    
    
@@ -308,8 +309,9 @@ Mapping::SPVector DMDBImporter::getMapping(int destinationId)
        int mandatory = it->get<5>().get();
        int mandatory = it->get<5>().get();


#ifdef VERBOSE_DEBUG
#ifdef VERBOSE_DEBUG
        INFO_STREAM << "*" << mapId << "|" << columnName << "|" << columnType << "|"
        INFO_STREAM << "MAPPING: " << mapId << " " << columnName << " " 
            << fitsPrimary << "|" << fitsSecondary << "|" << mandatory << "*" << endl;            
            << columnType << " " << fitsPrimary << " " << fitsSecondary << " " 
            << mandatory << endl;            
        INFO_STREAM << "-------------------------------------------------" << endl;
        INFO_STREAM << "-------------------------------------------------" << endl;
#endif        
#endif        
        
        
+7 −5
Original line number Original line Diff line number Diff line
@@ -67,9 +67,9 @@ void DMDBVerifier::testInstrumentMapping(Instrument::SP instrument_sp)
    Destination::SP destination_sp = instrument_sp->getDestination();
    Destination::SP destination_sp = instrument_sp->getDestination();
  
  
#ifdef VERBOSE_DEBUG
#ifdef VERBOSE_DEBUG
    INFO_STREAM << "Table: " << destination_sp->getHost() << ":" 
    INFO_STREAM << "TABLE: " << destination_sp->getHost() << " " 
        << destination_sp->getPort() << " " << destination_sp->getSchema() << "."
        << destination_sp->getPort() << " " << destination_sp->getSchema()
        << destination_sp->getTable() << endl;
         << " " << destination_sp->getTable() << endl;
    INFO_STREAM << "-------------------------------------------------" << endl;
    INFO_STREAM << "-------------------------------------------------" << endl;
#endif    
#endif    
    
    
@@ -89,12 +89,14 @@ void DMDBVerifier::testInstrumentMapping(Instrument::SP instrument_sp)
    
    
    try
    try
    {
    {
        //Check nadir mandatory columns in table definition
        testColumnDefinition(columnTuple_vector, "id", "mediumint", true);
        testColumnDefinition(columnTuple_vector, "id", "mediumint", true);
        testColumnDefinition(columnTuple_vector,"file_path","varchar", true);
        testColumnDefinition(columnTuple_vector,"file_path","varchar", true);
        testColumnDefinition(columnTuple_vector,"file_version","smallint", true);
        testColumnDefinition(columnTuple_vector,"file_version","smallint", true);
        testColumnDefinition(columnTuple_vector,"file_name","varchar", true);
        testColumnDefinition(columnTuple_vector,"file_name","varchar", true);
        testColumnDefinition(columnTuple_vector,"update_time","timestamp", true);
        testColumnDefinition(columnTuple_vector,"update_time","timestamp", true);
                
                
        //Check mapping columns in table definition
        Mapping::SPVector::iterator it;
        Mapping::SPVector::iterator it;
        for(it=mapping_spvector.begin(); it!=mapping_spvector.end(); ++it)
        for(it=mapping_spvector.begin(); it!=mapping_spvector.end(); ++it)
            testColumnDefinition(columnTuple_vector, (*it)->getColumnName(),
            testColumnDefinition(columnTuple_vector, (*it)->getColumnName(),
@@ -158,8 +160,8 @@ void DMDBVerifier::testColumnDefinition(ColumnTupleVector& columnTuple_vector,
    }
    }
    
    
#ifdef VERBOSE_DEBUG
#ifdef VERBOSE_DEBUG
    INFO_STREAM << "Column name: " << columnName << " column type: " 
    INFO_STREAM << "COLUMN: " << columnName << " " << columnType
        << columnType << " is nullable: " << isNullable << " -> OK" << endl;
        << " " << isNullable << " -> OK" << endl;
    INFO_STREAM << "-------------------------------------------------" << endl;
    INFO_STREAM << "-------------------------------------------------" << endl;
#endif    
#endif    
}
}
+1 −1
Original line number Original line Diff line number Diff line
@@ -75,7 +75,7 @@ boost::filesystem::path EventBuffer::waitNew()
		{
		{
			if(it->second == false)
			if(it->second == false)
			{
			{
                DEBUG_STREAM << "EventBuffer::insertNew() found new element:"
                DEBUG_STREAM << "EventBuffer::waitNew() found new element:"
                    << it->first.string() << endl;                 
                    << it->first.string() << endl;                 
                
                
				it->second = true;
				it->second = true;
+27 −18
Original line number Original line Diff line number Diff line
@@ -90,9 +90,9 @@ void EventThread::stop()
		m_threadGroup_sp->join_all();
		m_threadGroup_sp->join_all();
	}
	}


	inotify_rm_watch(fd, wd);
	inotify_rm_watch(m_fileDescriptor, m_watchDescriptor);


	if(fd) { close(fd); }
	if(m_fileDescriptor) { close(m_fileDescriptor); }
}
}


//==============================================================================
//==============================================================================
@@ -142,6 +142,7 @@ void EventThread::initEventBuffer() throw(std::runtime_error)
        throw std::runtime_error("Watch path \"" +
        throw std::runtime_error("Watch path \"" +
            watchPath + "\" is not a valid directory");
            watchPath + "\" is not a valid directory");


    //All files in watch path are inserted into event buffer
    boost::filesystem::directory_iterator startIt(path);
    boost::filesystem::directory_iterator startIt(path);
    boost::filesystem::directory_iterator endIt;
    boost::filesystem::directory_iterator endIt;


@@ -161,22 +162,23 @@ void EventThread::initINotify() throw(std::runtime_error)
{	
{	
    DEBUG_STREAM << "EventThread::initINotify()" << endl;
    DEBUG_STREAM << "EventThread::initINotify()" << endl;


    if((fd = inotify_init ()) < 0)
    if((m_fileDescriptor = inotify_init ()) < 0)
        throw std::runtime_error("INotify initialization");
        throw std::runtime_error("INotify initialization error");


	std::string watchPath(m_configuration_sp->getWatchPath());
	std::string watchPath(m_configuration_sp->getWatchPath());


	uint32_t iNotifyMask = m_configuration_sp->getINotifyMask();
	uint32_t iNotifyMask = m_configuration_sp->getINotifyMask();


    if((wd = inotify_add_watch(fd, watchPath.c_str(), iNotifyMask)) < 0)
    if((m_watchDescriptor = inotify_add_watch(
        throw std::runtime_error("INotify add watch");
        m_fileDescriptor, watchPath.c_str(), iNotifyMask)) < 0)
            throw std::runtime_error("INotify add watch error");


    int flags;
    int flags;
    if((flags = fcntl(fd,F_GETFL,0)) < 0)
    if((flags = fcntl(m_fileDescriptor,F_GETFL,0)) < 0)
        throw std::runtime_error("File descriptor get flags");
        throw std::runtime_error("File descriptor get flags error");


    if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
    if(fcntl(m_fileDescriptor, F_SETFL, flags | O_NONBLOCK) < 0)
        throw std::runtime_error("File descriptor set flags");
        throw std::runtime_error("File descriptor set flags error");
}
}


//==============================================================================
//==============================================================================
@@ -186,6 +188,7 @@ void EventThread::initConnectionManager() throw(std::runtime_error)
{
{
    DEBUG_STREAM << "EventThread::initConnectionManager()" << endl;
    DEBUG_STREAM << "EventThread::initConnectionManager()" << endl;


    //Initialize databases connections
	m_connectionManager_sp = ConnectionManager::create(m_fitsImporter_p,
	m_connectionManager_sp = ConnectionManager::create(m_fitsImporter_p,
		 m_configuration_sp, m_instrumentList_spvector);
		 m_configuration_sp, m_instrumentList_spvector);
}
}
@@ -217,17 +220,20 @@ void EventThread::initThreadGroup() throw(std::runtime_error)


	int workerNumber = m_configuration_sp->getWorkerNumber();
	int workerNumber = m_configuration_sp->getWorkerNumber();


    //Create a worker thread and pass all arguments
	WorkerThread worker(m_fitsImporter_p, m_eventBuffer_sp, m_connectionManager_sp,
	WorkerThread worker(m_fitsImporter_p, m_eventBuffer_sp, m_connectionManager_sp,
        m_configuration_sp, m_instrumentList_spvector, m_defaultInstrument_sp);        
        m_configuration_sp, m_instrumentList_spvector, m_defaultInstrument_sp);        
	
	
	try
	try
	{
	{
		//Add to thread group event thread
		//Add to thread group event thread
		m_threadGroup_sp->add_thread(new boost::thread(&EventThread::eventLoop, this));			
		m_threadGroup_sp->add_thread(
            new boost::thread(&EventThread::eventLoop, this));			


		//Add to thread group worker threads	
		//Add to thread group worker threads	
		for(int i=0; i<workerNumber; i++)
		for(int i=0; i<workerNumber; i++)
			m_threadGroup_sp->add_thread(new boost::thread(&WorkerThread::workerLoop, worker));
			m_threadGroup_sp->add_thread(
                new boost::thread(&WorkerThread::workerLoop, worker));
	}
	}
	catch(boost::thread_resource_error& ex)
	catch(boost::thread_resource_error& ex)
	{
	{
@@ -252,9 +258,9 @@ void EventThread::eventLoop()
		try
		try
		{
		{
			char buffer[BUF_LEN];
			char buffer[BUF_LEN];
			int length;
			int length = 0;


			if((length = read( fd, buffer, BUF_LEN )) < 0)
			if((length = read( m_fileDescriptor, buffer, BUF_LEN )) < 0)
			{
			{
				if(errno != EINTR && errno != EAGAIN)
				if(errno != EINTR && errno != EAGAIN)
				{
				{
@@ -287,6 +293,9 @@ void EventThread::eventLoop()
            {
            {
                event = ( struct inotify_event * ) &buffer[ i ];
                event = ( struct inotify_event * ) &buffer[ i ];


                DEBUG_STREAM << "EVENT: " << event->name << endl; //TODO: delete me
                
                //Add path to file name
                boost::filesystem::path file(event->name);
                boost::filesystem::path file(event->name);
                boost::filesystem::path path(watchPath);
                boost::filesystem::path path(watchPath);
                path /= file;
                path /= file;
Loading