Commit 71a93a6b authored by Marco De Marco's avatar Marco De Marco
Browse files

Long time from query exec and commit fix

parent ab4faa05
Loading
Loading
Loading
Loading
+1 −4
Original line number Original line Diff line number Diff line
@@ -323,8 +323,6 @@ void EventThread::eventLoop()
            {
            {
                event = ( struct inotify_event * ) &buffer[ i ];
                event = ( struct inotify_event * ) &buffer[ i ];


                DEBUG_STREAM << "EVENT: " << event->name << endl; //TODO: delete me

                //Add path to file name
                //Add path to file name
                boost::filesystem::path file(event->name);
                boost::filesystem::path file(event->name);
                boost::filesystem::path path(watchPath);
                boost::filesystem::path path(watchPath);
@@ -339,8 +337,7 @@ void EventThread::eventLoop()


            DEBUG_STREAM << "EventThread::eventLoop() sleep for " << sleepTime << endl;
            DEBUG_STREAM << "EventThread::eventLoop() sleep for " << sleepTime << endl;


			boost::posix_time::milliseconds sleepPosixTime(sleepTime);
            boost::this_thread::sleep_for(boost::chrono::milliseconds(sleepTime));
			boost::this_thread::sleep(sleepPosixTime);
		}
		}
		catch(boost::thread_interrupted& ex)
		catch(boost::thread_interrupted& ex)
		{
		{
+134 −113
Original line number Original line Diff line number Diff line
@@ -42,14 +42,13 @@ WorkerThread::~WorkerThread()
}
}


//==============================================================================
//==============================================================================
//	WorkerThread::workerLoop()()
//	WorkerThread::workerLoop()
//==============================================================================
//==============================================================================
void WorkerThread::workerLoop()
void WorkerThread::workerLoop()
{
{
    DEBUG_STREAM << "WorkerThread::workerLoop(): starting loop" << endl;
    DEBUG_STREAM << "WorkerThread::workerLoop(): starting loop" << endl;


	unsigned int waitTime = m_configuration_sp->getWaitTime();
	unsigned int waitTime = m_configuration_sp->getWaitTime();
	std::string watchPath = m_configuration_sp->getWatchPath();


    while(true)
    while(true)
    {
    {
@@ -60,8 +59,7 @@ void WorkerThread::workerLoop()
            std::string fileName = origPath.stem().string();
            std::string fileName = origPath.stem().string();


            //If configured wait after new file event
            //If configured wait after new file event
            boost::posix_time::milliseconds waitPosixTime(waitTime);
            boost::this_thread::sleep_for(boost::chrono::milliseconds(waitTime));
            boost::this_thread::sleep( waitPosixTime );


            bool completed = false;
            bool completed = false;


@@ -79,10 +77,14 @@ void WorkerThread::workerLoop()
                    //Try to ingest using an instrument in list
                    //Try to ingest using an instrument in list
                    ingestUsingInstrumentList(origPath, fitsFile_sp);
                    ingestUsingInstrumentList(origPath, fitsFile_sp);


                    //Notify archive correctly event
                    completed = true;
                    completed = true;
                    m_fitsImporter_p->incrementRegularCounter();
                    m_fitsImporter_p->incrementRegularCounter();
                    INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName
                    INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName
                        << "\" archived regularly" << endl;
                        << "\" archived regularly" << endl;

                    //Remove original file
                    boost::filesystem::remove(origPath);
                }
                }
                catch(CCfits::FitsException& ex)
                catch(CCfits::FitsException& ex)
                {
                {
@@ -98,10 +100,14 @@ void WorkerThread::workerLoop()
                    //Try to ingest using default instrument
                    //Try to ingest using default instrument
                    ingestUsingDefaultInstrument(origPath, fitsFile_sp);
                    ingestUsingDefaultInstrument(origPath, fitsFile_sp);


                    //Notify archiving warning event
                    completed = true;
                    completed = true;
                    m_fitsImporter_p->incrementWarningCounter();
                    m_fitsImporter_p->incrementWarningCounter();
                    WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName
                    WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName
                        << "\" archived in default instrument" << endl;
                        << "\" archived in default instrument" << endl;

                    //Remove original file
                    boost::filesystem::remove(origPath);
                }
                }
            }
            }
            catch(CCfits::FitsException& ex)
            catch(CCfits::FitsException& ex)
@@ -115,7 +121,7 @@ void WorkerThread::workerLoop()


            if(!completed)
            if(!completed)
            {
            {
                //Cannot ingest new file => notify error
                //Notify archiving error event
                m_fitsImporter_p->incrementErrorCounter();
                m_fitsImporter_p->incrementErrorCounter();
                ERROR_STREAM << "WorkerThread::workerLoop() \"" << fileName
                ERROR_STREAM << "WorkerThread::workerLoop() \"" << fileName
                    << "\" not archived" << endl;
                    << "\" not archived" << endl;
@@ -225,6 +231,83 @@ void WorkerThread::ingestUsingDefaultInstrument(boost::filesystem::path& origPat
    processFits(origPath, fitsFile_sp, m_defaultInstrument_sp, defaultDate);
    processFits(origPath, fitsFile_sp, m_defaultInstrument_sp, defaultDate);
}
}


//==============================================================================
//	WorkerThread::parseDate()
//==============================================================================
boost::gregorian::date WorkerThread::parseDate(std::string dateString)
	throw(boost::bad_lexical_cast, std::runtime_error)
{
    DEBUG_STREAM << "WorkerThread::parseDate()" << endl;

	std::stringstream dateStream;

	//Regex match date like DD/MM/YYYYY DD-MM-YYYY DD.MM.YYYY
	boost::regex pattern_1("^(0?[1-9]|[1-2][0-9]|3[01])(/|-|\\.)(0?[1-9]|1[012])(/|-|\\.)((19|20)\\d\\d).*$");

	boost::smatch matches_1;
	if(boost::regex_match(dateString, matches_1, pattern_1))
	{
        DEBUG_STREAM << "WorkerThread::parseDate() match date like DD/MM/YYYYY DD-MM-YYYY DD.MM.YYYY" << endl;

		if(matches_1.size() == 7)
			dateStream << matches_1[5] << "/" << matches_1[3] << "/" << matches_1[1];
		else
			throw std::runtime_error("Error parsing date like DD/MM/YYYYY DD-MM-YYYY DD.MM.YYYY");

		return boost::gregorian::date(boost::gregorian::from_string(dateStream.str()));
	}

	//Regex match date like DDMMYYYY
	boost::regex pattern_2("^(0?[1-9]|[1-2][0-9]|3[01])(0?[1-9]|1[012])((19|20)\\d\\d).*$");

	boost::smatch matches_2;
	if(boost::regex_match(dateString, matches_2, pattern_2))
	{
        DEBUG_STREAM << "WorkerThread::parseDate() match date like DDMMYYYY" << endl;

		if(matches_2.size() == 5)
			dateStream << matches_2[3] << "/" << matches_2[2] << "/" << matches_2[1];
		else
			throw std::runtime_error("Error parsing DDMMYYYY");

		return boost::gregorian::date(boost::gregorian::from_string(dateStream.str()));
	}

	//Regex match date like YYYY/MM/DD YYYY-MM-DD YYYY.MM.DD
	boost::regex pattern_3("^((19|20)\\d\\d)(/|-|\\.)(0?[1-9]|1[012])(/|-|\\.)(0+[1-9]|[1-2][0-9]|3[01]).*$");

	boost::smatch matches_3;
	if(boost::regex_match(dateString, matches_3, pattern_3))
	{
        DEBUG_STREAM << "WorkerThread::parseDate() match date like YYYY/MM/DD YYYY-MM-DD YYYY.MM.DD" << endl;

		if(matches_3.size() == 7)
			dateStream << matches_3[1] << "/" << matches_3[4] << "/" << matches_3[6];
		else
			throw std::runtime_error("Error parsing date like YYYY/MM/DD YYYY-MM-DD YYYY.MM.DD");

		return boost::gregorian::date(boost::gregorian::from_string(dateStream.str()));
	}

	//Regex match date like YYYYMMDD
	boost::regex pattern_4("^((19|20)\\d\\d)(0?[1-9]|1[012])(0+[1-9]|[1-2][0-9]|3[01]).*$");

	boost::smatch matches_4;
	if(boost::regex_match(dateString, matches_4, pattern_4))
	{
        DEBUG_STREAM << "WorkerThread::parseDate() match date like YYYYMMDD" << endl;

		if(matches_4.size() == 5)
			dateStream << matches_4[1] << "/" << matches_4[3] << "/" << matches_4[4];
		else
			throw std::runtime_error("Error parsing YYYYMMDD");

		return boost::gregorian::date(boost::gregorian::from_string(dateStream.str()));
	}

	throw std::runtime_error("Date string not match any known format");
}

//==============================================================================
//==============================================================================
//	WorkerThread::processFits()
//	WorkerThread::processFits()
//==============================================================================
//==============================================================================
@@ -255,11 +338,28 @@ void WorkerThread::processFits(boost::filesystem::path& origPath,
		fitsFile_sp->flush();
		fitsFile_sp->flush();
	}
	}


	//Insert metadata into database
	//Create metadata query
	execQuery(session_sp, destination_sp, date, duplicateMax, origPath, fitsFile_sp);
	std::string query = composeQuery(destination_sp, date, duplicateMax,
        origPath, fitsFile_sp);


	//Archive file to storage
    //Create destination path
	moveFile(origPath,destination_sp, date, duplicateMax);
    boost::filesystem::path destPath = createDestPath(origPath,
        destination_sp, date, duplicateMax);

	//Copy file to archive storage
	copyFile(origPath, destPath);

    try
    {
        //Insert metadata into database
        *session_sp << query;
    }
    catch(std::runtime_error& ex)
    {
        //In case of error remove archived file
        boost::filesystem::remove(destPath);
        throw(ex);
    }


	//Commit Transaction
	//Commit Transaction
	transaction.commit();
	transaction.commit();
@@ -306,14 +406,14 @@ int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP
}
}


//==============================================================================
//==============================================================================
//	WorkerThread::execQuery()
//	WorkerThread::composeQuery()
//==============================================================================
//==============================================================================
void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp,
std::string WorkerThread::composeQuery(Destination::SP destination_sp,
	Destination::SP destination_sp, boost::gregorian::date& date, int duplicateMax,
    boost::gregorian::date& date, int duplicateMax, boost::filesystem::path& origPath,
	boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp)
    boost::shared_ptr<CCfits::FITS> fitsFile_sp)
	throw(CCfits::FitsException, std::runtime_error)
	throw(CCfits::FitsException, std::runtime_error)
{
{
    DEBUG_STREAM << "WorkerThread::execQuery()" << endl;
    DEBUG_STREAM << "WorkerThread::composeQuery()" << endl;


	//Insert part of the query
	//Insert part of the query
	std::stringstream insertQuery;
	std::stringstream insertQuery;
@@ -351,8 +451,7 @@ void WorkerThread::execQuery(ConnectionManager::SessionSP session_sp,


    DEBUG_STREAM << "WorkerThread::execQuery() query " << insertQuery.str() << endl;
    DEBUG_STREAM << "WorkerThread::execQuery() query " << insertQuery.str() << endl;


	//Exec query
    return insertQuery.str();
	*session_sp << insertQuery.str();
}
}


//==============================================================================
//==============================================================================
@@ -660,18 +759,13 @@ std::string WorkerThread::readKey(CCfits::HDU& hdu, std::string keyName,
}
}


//==============================================================================
//==============================================================================
//	WorkerThread::moveFile()
//	WorkerThread::createDestPath()
//==============================================================================
//==============================================================================
void WorkerThread::moveFile(boost::filesystem::path& origPath,
boost::filesystem::path WorkerThread::createDestPath(boost::filesystem::path& origPath,
	Destination::SP destination_sp, boost::gregorian::date& date,
	Destination::SP destination_sp, boost::gregorian::date& date,
	int duplicateMax) throw(std::runtime_error)
	int duplicateMax) throw(std::runtime_error)
{
{
    DEBUG_STREAM << "WorkerThread::moveFile()" << endl;
    DEBUG_STREAM << "WorkerThread::createDestPath()" << endl;

	//Is origin path a regular file
	if(!boost::filesystem::is_regular_file(origPath))
		throw std::runtime_error( "Origin path \"" +
			origPath.string() + "\" is not a regular file");


	//Destination path = storage path + directory name
	//Destination path = storage path + directory name
    // + date path + duplicate max + file name
    // + date path + duplicate max + file name
@@ -703,6 +797,22 @@ void WorkerThread::moveFile(boost::filesystem::path& origPath,
	//Append file name to destination path
	//Append file name to destination path
	destPath /= destFile;
	destPath /= destFile;


    return destPath;
}

//==============================================================================
//	WorkerThread::copyFile()
//==============================================================================
void WorkerThread::copyFile(boost::filesystem::path& origPath,
	boost::filesystem::path& destPath) throw(std::runtime_error)
{
    DEBUG_STREAM << "WorkerThread::copyFile()" << endl;

	//Is origin path a regular file
	if(!boost::filesystem::is_regular_file(origPath))
		throw std::runtime_error( "Origin path \"" +
			origPath.string() + "\" is not a regular file");

#ifdef VERBOSE_DEBUG
#ifdef VERBOSE_DEBUG
    INFO_STREAM << "FROM: " << origPath.string() << " TO: " << destPath.string() << endl;
    INFO_STREAM << "FROM: " << origPath.string() << " TO: " << destPath.string() << endl;
    INFO_STREAM << "-------------------------------------------------" << endl;
    INFO_STREAM << "-------------------------------------------------" << endl;
@@ -724,95 +834,6 @@ void WorkerThread::moveFile(boost::filesystem::path& origPath,
	if(!boost::filesystem::is_regular_file(destPath))
	if(!boost::filesystem::is_regular_file(destPath))
		throw std::runtime_error( "Copy failure: storage path \"" +
		throw std::runtime_error( "Copy failure: storage path \"" +
			destPath.string() + "\" is not a regular file");
			destPath.string() + "\" is not a regular file");

	//Remove origin file after copy, do not throw exception in case,
	//file is already archived correctly, just produce an error log
	try
	{
		boost::filesystem::remove(origPath);
	}
	catch(std::runtime_error& ex)
	{
        ERROR_STREAM << "WorkerThread::moveFile() cannot remove file " <<
            origPath.string() << endl;
	}
}

//==============================================================================
//	WorkerThread::parseDate()
//==============================================================================
boost::gregorian::date WorkerThread::parseDate(std::string dateString)
	throw(boost::bad_lexical_cast, std::runtime_error)
{
    DEBUG_STREAM << "WorkerThread::parseDate()" << endl;

	std::stringstream dateStream;

	//Regex match date like DD/MM/YYYYY DD-MM-YYYY DD.MM.YYYY
	boost::regex pattern_1("^(0?[1-9]|[1-2][0-9]|3[01])(/|-|\\.)(0?[1-9]|1[012])(/|-|\\.)((19|20)\\d\\d).*$");

	boost::smatch matches_1;
	if(boost::regex_match(dateString, matches_1, pattern_1))
	{
        DEBUG_STREAM << "WorkerThread::parseDate() match date like DD/MM/YYYYY DD-MM-YYYY DD.MM.YYYY" << endl;

		if(matches_1.size() == 7)
			dateStream << matches_1[5] << "/" << matches_1[3] << "/" << matches_1[1];
		else
			throw std::runtime_error("Error parsing date like DD/MM/YYYYY DD-MM-YYYY DD.MM.YYYY");

		return boost::gregorian::date(boost::gregorian::from_string(dateStream.str()));
	}

	//Regex match date like DDMMYYYY
	boost::regex pattern_2("^(0?[1-9]|[1-2][0-9]|3[01])(0?[1-9]|1[012])((19|20)\\d\\d).*$");

	boost::smatch matches_2;
	if(boost::regex_match(dateString, matches_2, pattern_2))
	{
        DEBUG_STREAM << "WorkerThread::parseDate() match date like DDMMYYYY" << endl;

		if(matches_2.size() == 5)
			dateStream << matches_2[3] << "/" << matches_2[2] << "/" << matches_2[1];
		else
			throw std::runtime_error("Error parsing DDMMYYYY");

		return boost::gregorian::date(boost::gregorian::from_string(dateStream.str()));
	}

	//Regex match date like YYYY/MM/DD YYYY-MM-DD YYYY.MM.DD
	boost::regex pattern_3("^((19|20)\\d\\d)(/|-|\\.)(0?[1-9]|1[012])(/|-|\\.)(0+[1-9]|[1-2][0-9]|3[01]).*$");

	boost::smatch matches_3;
	if(boost::regex_match(dateString, matches_3, pattern_3))
	{
        DEBUG_STREAM << "WorkerThread::parseDate() match date like YYYY/MM/DD YYYY-MM-DD YYYY.MM.DD" << endl;

		if(matches_3.size() == 7)
			dateStream << matches_3[1] << "/" << matches_3[4] << "/" << matches_3[6];
		else
			throw std::runtime_error("Error parsing date like YYYY/MM/DD YYYY-MM-DD YYYY.MM.DD");

		return boost::gregorian::date(boost::gregorian::from_string(dateStream.str()));
	}

	//Regex match date like YYYYMMDD
	boost::regex pattern_4("^((19|20)\\d\\d)(0?[1-9]|1[012])(0+[1-9]|[1-2][0-9]|3[01]).*$");

	boost::smatch matches_4;
	if(boost::regex_match(dateString, matches_4, pattern_4))
	{
        DEBUG_STREAM << "WorkerThread::parseDate() match date like YYYYMMDD" << endl;

		if(matches_4.size() == 5)
			dateStream << matches_4[1] << "/" << matches_4[3] << "/" << matches_4[4];
		else
			throw std::runtime_error("Error parsing YYYYMMDD");

		return boost::gregorian::date(boost::gregorian::from_string(dateStream.str()));
	}

	throw std::runtime_error("Date string not match any known format");
}
}


}   //namespace
}   //namespace
+27 −9
Original line number Original line Diff line number Diff line
@@ -35,13 +35,13 @@ public:
	virtual ~WorkerThread();
	virtual ~WorkerThread();


//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
//	[Public] Users method
//	[Public] Thread loop method
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
	virtual void workerLoop();
	virtual void workerLoop();


protected:
protected:
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
//	[Protected] Utilities methods
//	[Protected] Instrument related archiving process methods
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
    virtual void ingestUsingInstrumentList(boost::filesystem::path&,
    virtual void ingestUsingInstrumentList(boost::filesystem::path&,
        boost::shared_ptr<CCfits::FITS>)
        boost::shared_ptr<CCfits::FITS>)
@@ -51,18 +51,32 @@ protected:
        boost::shared_ptr<CCfits::FITS>)
        boost::shared_ptr<CCfits::FITS>)
        throw(CCfits::FitsException, std::runtime_error);
        throw(CCfits::FitsException, std::runtime_error);


//------------------------------------------------------------------------------
//	[Protected] Date parsing methods
//------------------------------------------------------------------------------
	virtual boost::gregorian::date parseDate(std::string)
		throw(boost::bad_lexical_cast, std::runtime_error);

//------------------------------------------------------------------------------
//	[Protected] Common archiving process method
//------------------------------------------------------------------------------
	virtual void processFits(boost::filesystem::path&,
	virtual void processFits(boost::filesystem::path&,
		boost::shared_ptr<CCfits::FITS>, Instrument::SP,
		boost::shared_ptr<CCfits::FITS>, Instrument::SP,
		boost::gregorian::date&)
		boost::gregorian::date&)
		throw(CCfits::FitsException, std::runtime_error);
		throw(CCfits::FitsException, std::runtime_error);


//------------------------------------------------------------------------------
//	[Protected] Duplicate discovery method
//------------------------------------------------------------------------------
	virtual int findDuplicateMax(ConnectionManager::SessionSP,
	virtual int findDuplicateMax(ConnectionManager::SessionSP,
		Destination::SP, boost::filesystem::path&)
		Destination::SP, boost::filesystem::path&)
		throw(std::runtime_error);
		throw(std::runtime_error);


	virtual void execQuery(ConnectionManager::SessionSP, Destination::SP,
//------------------------------------------------------------------------------
		boost::gregorian::date&, int, boost::filesystem::path&,
//	[Protected] Metadata archiving methods
		boost::shared_ptr<CCfits::FITS>)
//------------------------------------------------------------------------------
	virtual std::string composeQuery(Destination::SP, boost::gregorian::date&,
		int, boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>)
        throw(CCfits::FitsException, std::runtime_error);
        throw(CCfits::FitsException, std::runtime_error);


    virtual void fillQueryFromFits(std::stringstream&, std::stringstream&,
    virtual void fillQueryFromFits(std::stringstream&, std::stringstream&,
@@ -78,11 +92,15 @@ protected:
	virtual std::string readKey(CCfits::HDU&, std::string, std::string)
	virtual std::string readKey(CCfits::HDU&, std::string, std::string)
        throw(CCfits::FitsException, std::runtime_error);
        throw(CCfits::FitsException, std::runtime_error);


	virtual void moveFile(boost::filesystem::path&, Destination::SP,
//------------------------------------------------------------------------------
		boost::gregorian::date&, int) throw(std::runtime_error);
//	[Protected] File archiving methods
//------------------------------------------------------------------------------
	virtual boost::filesystem::path createDestPath(boost::filesystem::path&,
		Destination::SP, boost::gregorian::date&, int)
		throw(std::runtime_error);


	virtual boost::gregorian::date parseDate(std::string)
	virtual void copyFile(boost::filesystem::path&, boost::filesystem::path& )
		throw(boost::bad_lexical_cast, std::runtime_error);
		throw(std::runtime_error);


//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
//	[Protected] Class variables
//	[Protected] Class variables