Commit bdc03726 authored by Marco De Marco's avatar Marco De Marco
Browse files

Event buffer enhanced

parent 6fa93292
Loading
Loading
Loading
Loading
+52 −22
Original line number Diff line number Diff line
@@ -43,7 +43,7 @@ void EventBuffer::insertNew(boost::filesystem::path path)
	boost::mutex::scoped_lock lock(m_mutex);

    bool inserted = m_buffer.insert(
        std::pair<boost::filesystem::path, bool>(path, false) ).second;
        std::pair<boost::filesystem::path, EventStatus>(path, UNPROCESSED) ).second;
    
	if(inserted)
    {
@@ -70,15 +70,15 @@ boost::filesystem::path EventBuffer::waitNew()

	do
	{
		std::map<boost::filesystem::path, bool>::iterator it;   
		std::map<boost::filesystem::path, EventStatus>::iterator it;   
		for(it=m_buffer.begin(); it!=m_buffer.end(); it++)
		{
			if(it->second == false)
			if(it->second == UNPROCESSED)
			{
                DEBUG_STREAM << "EventBuffer::waitNew() found new element:"
                    << it->first.string() << endl;                 
                
				it->second = true;
				it->second = ASSIGNED;
				return it->first;
			}
		}
@@ -91,36 +91,66 @@ boost::filesystem::path EventBuffer::waitNew()
}

//==============================================================================
//	EventBuffer::removeProcessed()
//	EventBuffer::markAsProcessed()
//==============================================================================
void EventBuffer::removeProcessed(boost::filesystem::path path)
void EventBuffer::markAsProcessed(boost::filesystem::path path)
{
    DEBUG_STREAM << "EventBuffer::removeProcessed()" << endl;
    DEBUG_STREAM << "EventBuffer::markAsProcessed()" << endl;
    
    boost::mutex::scoped_lock lock(m_mutex);
    
    std::map<boost::filesystem::path, bool>::iterator it;
    std::map<boost::filesystem::path, EventStatus>::iterator it;

	it = m_buffer.find(path);

	if(it != m_buffer.end())
    {        
        if(it->second == true)
        switch(it->second)
        {
            m_buffer.erase(it);

            DEBUG_STREAM << "EventBuffer::removeProcessed() element "
                << path.string() << " removed" << endl;
            case UNPROCESSED:
                ERROR_STREAM << "EventBuffer::markAsProcessed() element "
                    << path.string() << " is marked not processed" << endl;                
                break;            
            case ASSIGNED:
                it->second = PROCESSED;
                DEBUG_STREAM << "EventBuffer::markAsProcessed() element "
                    << path.string() << " marked as processed" << endl;
                break;
            case PROCESSED:
                ERROR_STREAM << "EventBuffer::markAsProcessed() element "
                    << path.string() << " already marked as processed" << endl;                
                break;        
        }           
        else
            ERROR_STREAM << "EventBuffer::removeProcessed() element" 
                << path.string() << " to remove not processed" << endl;    
    }
    else
        ERROR_STREAM << "EventBuffer::removeProcessed() element" 
        ERROR_STREAM << "EventBuffer::markAsProcessed() element" 
            << path.string() << " not found" << endl;        
}

//==============================================================================
//	EventBuffer::removeAllProcessed()
//==============================================================================
void EventBuffer::removeAllProcessed()
{
    DEBUG_STREAM << "EventBuffer::removeAllProcessed()" << endl;
    
    boost::mutex::scoped_lock lock(m_mutex);
    
    std::map<boost::filesystem::path, EventStatus>::iterator it;
    for(it=m_buffer.begin(); it!=m_buffer.end(); ++it)
    {
        if(it->second == PROCESSED)
        {
            DEBUG_STREAM << "EventBuffer::removeAllProcessed() element "
                << it->first << "will be removed" << endl;
            
            std::map<boost::filesystem::path, EventStatus>::iterator to_delete = it;
            
            m_buffer.erase(to_delete);
        }
    }
}

//==============================================================================
//	EventBuffer::size()
//==============================================================================
+10 −3
Original line number Diff line number Diff line
@@ -50,11 +50,18 @@ public:

	virtual boost::filesystem::path waitNew();
    
	virtual void removeProcessed(boost::filesystem::path);
    virtual void markAsProcessed(boost::filesystem::path);
    
    virtual void removeAllProcessed();

	virtual std::size_t size(); 

protected:
//------------------------------------------------------------------------------
//	[Protected] Event status enumeration
//------------------------------------------------------------------------------    
    enum EventStatus { UNPROCESSED=0, ASSIGNED=1, PROCESSED=2 };
    
//------------------------------------------------------------------------------
//	[Protected] Class variables
//------------------------------------------------------------------------------    
@@ -65,7 +72,7 @@ protected:
	boost::condition_variable m_conditionVariable;
	
	//File buffer
	std::map<boost::filesystem::path, bool> m_buffer;
	std::map<boost::filesystem::path, EventStatus> m_buffer;
};

}   //End of namespace
+2 −0
Original line number Diff line number Diff line
@@ -305,6 +305,8 @@ void EventThread::eventLoop()
                    m_eventBuffer_sp->insertNew(path);
            }
		
            m_eventBuffer_sp->removeAllProcessed();
            
            DEBUG_STREAM << "EventThread::eventLoop() sleep for " << sleepTime << endl;
            
			boost::posix_time::milliseconds sleepPosixTime(sleepTime);
+1 −1
Original line number Diff line number Diff line
@@ -125,7 +125,7 @@ void WorkerThread::workerLoop()
                    << "\" not archived" << endl;                                    
            }

            m_eventBuffer_sp->removeProcessed(origPath);        
            m_eventBuffer_sp->markAsProcessed(origPath);        
        }
		catch(boost::thread_interrupted& ex)
		{