Loading src/WorkerThread.cpp +40 −23 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ #include <cassert> #include <fstream> #include <iomanip> #include <set> #include <boost/scoped_ptr.hpp> #include <boost/iostreams/filtering_streambuf.hpp> Loading Loading @@ -73,10 +74,13 @@ void WorkerThread::workerLoop() boost::shared_ptr<CCfits::FITS> fitsFile_sp( new CCfits::FITS(origPath.string(), CCfits::Write)); //Keep track of already readed HDU to avoid memory leaks std::set<int> readHDUSet; try { //Try to ingest using an instrument in list ingestUsingInstrumentList(origPath, fitsFile_sp); ingestUsingInstrumentList(origPath, fitsFile_sp, readHDUSet); //Notify archive correctly event completed = true; Loading @@ -99,7 +103,7 @@ void WorkerThread::workerLoop() if(!completed) { //Try to ingest using default instrument ingestUsingDefaultInstrument(origPath, fitsFile_sp); ingestUsingDefaultInstrument(origPath, fitsFile_sp, readHDUSet); //Notify archiving warning event completed = true; Loading Loading @@ -151,13 +155,16 @@ void WorkerThread::workerLoop() // WorkerThread::ingestUsingInstrumentList() //============================================================================== void WorkerThread::ingestUsingInstrumentList(boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp) throw(CCfits::FitsException, std::runtime_error) boost::shared_ptr<CCfits::FITS> fitsFile_sp, std::set<int>& readHDUSet) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::ingestUsingInstrumentList()" << endl; CCfits::PHDU& phdu = fitsFile_sp->pHDU(); //Read all primary HDU keys and keep track phdu.readAllKeys(); readHDUSet.insert(0); Instrument::SP instrument_sp; boost::gregorian::date date; Loading Loading @@ -208,7 +215,7 @@ void WorkerThread::ingestUsingInstrumentList(boost::filesystem::path& origPath, } //for if(found) processFits(origPath, fitsFile_sp, instrument_sp, date); processFits(origPath, fitsFile_sp, readHDUSet, instrument_sp, date); else throw std::runtime_error("Instrument not found in instrument list"); } Loading @@ -217,8 +224,8 @@ void WorkerThread::ingestUsingInstrumentList(boost::filesystem::path& origPath, // WorkerThread::ingestUsingDefaultInstrument() //=============================================================================== void WorkerThread::ingestUsingDefaultInstrument(boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp) throw(CCfits::FitsException, std::runtime_error) boost::shared_ptr<CCfits::FITS> fitsFile_sp, std::set<int>& readHDUSet) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::ingestUsingDefaultInstrument()" << endl; Loading @@ -229,7 +236,7 @@ void WorkerThread::ingestUsingDefaultInstrument(boost::filesystem::path& origPat //boost::gregorian::date defaultDate = boost::gregorian::day_clock::universal_day(); boost::gregorian::date defaultDate = boost::gregorian::day_clock::local_day(); processFits(origPath, fitsFile_sp, m_defaultInstrument_sp, defaultDate); processFits(origPath, fitsFile_sp, readHDUSet, m_defaultInstrument_sp, defaultDate); } //============================================================================== Loading Loading @@ -313,8 +320,9 @@ boost::gregorian::date WorkerThread::parseDate(std::string dateString) // WorkerThread::processFits() //============================================================================== void WorkerThread::processFits(boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp, Instrument::SP instrument_sp, boost::gregorian::date& date) throw (CCfits::FitsException, std::runtime_error) boost::shared_ptr<CCfits::FITS> fitsFile_sp, std::set<int>& readHDUSet, Instrument::SP instrument_sp, boost::gregorian::date& date) throw (CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::processFits()" << endl; Loading @@ -341,7 +349,7 @@ void WorkerThread::processFits(boost::filesystem::path& origPath, //Create metadata query std::string query = composeQuery(destination_sp, date, duplicateMax, origPath, fitsFile_sp); origPath, fitsFile_sp, readHDUSet); //Create destination path boost::filesystem::path destPath = createDestPath(origPath, Loading Loading @@ -369,8 +377,8 @@ void WorkerThread::processFits(boost::filesystem::path& origPath, //============================================================================== // WorkerThread::findDuplicateMax() //============================================================================== int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP session_sp, Destination::SP destination_sp, boost::filesystem::path& origPath) int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP session_sp, Destination::SP destination_sp, boost::filesystem::path& origPath) throw(std::runtime_error) { DEBUG_STREAM << "WorkerThread::findDuplicateMax()" << endl; Loading Loading @@ -411,7 +419,7 @@ int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP //============================================================================== std::string WorkerThread::composeQuery(Destination::SP destination_sp, boost::gregorian::date& date, int duplicateMax, boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp) boost::shared_ptr<CCfits::FITS> fitsFile_sp, std::set<int>& readHDUSet) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::composeQuery()" << endl; Loading Loading @@ -447,7 +455,7 @@ std::string WorkerThread::composeQuery(Destination::SP destination_sp, valuesQuery << ",\'" << origPath.filename().string() << ".gz\'"; //Search key from mapping and fill query fillQueryFromFits(insertQuery, valuesQuery, destination_sp, fitsFile_sp); fillQueryFromFits(insertQuery, valuesQuery, destination_sp, fitsFile_sp, readHDUSet); //Merge insert and values queries insertQuery << ") " << valuesQuery.rdbuf() << ")"; Loading @@ -462,7 +470,7 @@ std::string WorkerThread::composeQuery(Destination::SP destination_sp, //============================================================================== void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, std::stringstream& valuesQuery, Destination::SP destination_sp, boost::shared_ptr<CCfits::FITS> fitsFile_sp) boost::shared_ptr<CCfits::FITS> fitsFile_sp, std::set<int>& readHDUSet) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::fillQueryFromFits()" << endl; Loading @@ -476,6 +484,19 @@ void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, { int fitsHDU = (*it)->getFitsHDU(); //If current HDU has not been already read if(readHDUSet.find(fitsHDU) == readHDUSet.end()) { //Read all currnet HDU keys if(fitsHDU == 0) fitsFile_sp->pHDU().readAllKeys(); else fitsFile_sp->extension(fitsHDU).readAllKeys(); //Keep track of read HDU readHDUSet.insert(fitsHDU); } std::string value; try Loading @@ -501,7 +522,7 @@ void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, } catch(CCfits::HDU::NoSuchKeyword& ex) { WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; WARN_STREAM << "WorkerThread::fillQueryFromFits() " << ex.message() << endl; try { Loading @@ -524,7 +545,7 @@ void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, } catch(CCfits::HDU::NoSuchKeyword& ex) { WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; WARN_STREAM << "WorkerThread::fillQueryFromFits() " << ex.message() << endl; //If key is mandatory throw and exception if((*it)->isMandatory()) Loading @@ -534,7 +555,7 @@ void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, } catch(CCfits::FITS::NoSuchHDU& ex) { WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; WARN_STREAM << "WorkerThread::fillQueryFromFits() " << ex.message() << endl; //If key is mandatory throw and exception if((*it)->isMandatory()) Loading @@ -551,8 +572,6 @@ std::string WorkerThread::readKeyFromPHDU(CCfits::PHDU& phdu, std::string keyNam { DEBUG_STREAM << "WorkerThread::readKeyFromPHDU()" << endl; phdu.readAllKeys(); std::stringstream resultStream; boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); Loading Loading @@ -621,8 +640,6 @@ std::string WorkerThread::readKeyFromHDU(CCfits::HDU& hdu, std::string keyName, { DEBUG_STREAM << "WorkerThread::readKeyFromHDU()" << endl; hdu.readAllKeys(); std::stringstream resultStream; boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); Loading src/WorkerThread.h +8 −9 Original line number Diff line number Diff line Loading @@ -44,11 +44,11 @@ protected: // [Protected] Instrument related archiving process methods //------------------------------------------------------------------------------ virtual void ingestUsingInstrumentList(boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>) boost::shared_ptr<CCfits::FITS>, std::set<int>&) throw(CCfits::FitsException, std::runtime_error); virtual void ingestUsingDefaultInstrument(boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>) boost::shared_ptr<CCfits::FITS>, std::set<int>&) throw(CCfits::FitsException, std::runtime_error); //------------------------------------------------------------------------------ Loading @@ -61,9 +61,8 @@ protected: // [Protected] Common archiving process method //------------------------------------------------------------------------------ virtual void processFits(boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>, Instrument::SP, boost::gregorian::date&) throw(CCfits::FitsException, std::runtime_error); boost::shared_ptr<CCfits::FITS>, std::set<int>&, Instrument::SP, boost::gregorian::date&) throw(CCfits::FitsException, std::runtime_error); //------------------------------------------------------------------------------ // [Protected] Duplicate discovery method Loading @@ -76,11 +75,11 @@ protected: // [Protected] Metadata archiving methods //------------------------------------------------------------------------------ virtual std::string composeQuery(Destination::SP, boost::gregorian::date&, int, boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>) throw(CCfits::FitsException, std::runtime_error); int, boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>, std::set<int>&) throw(CCfits::FitsException, std::runtime_error); virtual void fillQueryFromFits(std::stringstream&, std::stringstream&, Destination::SP, boost::shared_ptr<CCfits::FITS>) Destination::SP, boost::shared_ptr<CCfits::FITS>, std::set<int>&) throw(CCfits::FitsException, std::runtime_error); virtual std::string readKeyFromPHDU(CCfits::PHDU&, std::string, std::string) Loading Loading
src/WorkerThread.cpp +40 −23 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ #include <cassert> #include <fstream> #include <iomanip> #include <set> #include <boost/scoped_ptr.hpp> #include <boost/iostreams/filtering_streambuf.hpp> Loading Loading @@ -73,10 +74,13 @@ void WorkerThread::workerLoop() boost::shared_ptr<CCfits::FITS> fitsFile_sp( new CCfits::FITS(origPath.string(), CCfits::Write)); //Keep track of already readed HDU to avoid memory leaks std::set<int> readHDUSet; try { //Try to ingest using an instrument in list ingestUsingInstrumentList(origPath, fitsFile_sp); ingestUsingInstrumentList(origPath, fitsFile_sp, readHDUSet); //Notify archive correctly event completed = true; Loading @@ -99,7 +103,7 @@ void WorkerThread::workerLoop() if(!completed) { //Try to ingest using default instrument ingestUsingDefaultInstrument(origPath, fitsFile_sp); ingestUsingDefaultInstrument(origPath, fitsFile_sp, readHDUSet); //Notify archiving warning event completed = true; Loading Loading @@ -151,13 +155,16 @@ void WorkerThread::workerLoop() // WorkerThread::ingestUsingInstrumentList() //============================================================================== void WorkerThread::ingestUsingInstrumentList(boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp) throw(CCfits::FitsException, std::runtime_error) boost::shared_ptr<CCfits::FITS> fitsFile_sp, std::set<int>& readHDUSet) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::ingestUsingInstrumentList()" << endl; CCfits::PHDU& phdu = fitsFile_sp->pHDU(); //Read all primary HDU keys and keep track phdu.readAllKeys(); readHDUSet.insert(0); Instrument::SP instrument_sp; boost::gregorian::date date; Loading Loading @@ -208,7 +215,7 @@ void WorkerThread::ingestUsingInstrumentList(boost::filesystem::path& origPath, } //for if(found) processFits(origPath, fitsFile_sp, instrument_sp, date); processFits(origPath, fitsFile_sp, readHDUSet, instrument_sp, date); else throw std::runtime_error("Instrument not found in instrument list"); } Loading @@ -217,8 +224,8 @@ void WorkerThread::ingestUsingInstrumentList(boost::filesystem::path& origPath, // WorkerThread::ingestUsingDefaultInstrument() //=============================================================================== void WorkerThread::ingestUsingDefaultInstrument(boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp) throw(CCfits::FitsException, std::runtime_error) boost::shared_ptr<CCfits::FITS> fitsFile_sp, std::set<int>& readHDUSet) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::ingestUsingDefaultInstrument()" << endl; Loading @@ -229,7 +236,7 @@ void WorkerThread::ingestUsingDefaultInstrument(boost::filesystem::path& origPat //boost::gregorian::date defaultDate = boost::gregorian::day_clock::universal_day(); boost::gregorian::date defaultDate = boost::gregorian::day_clock::local_day(); processFits(origPath, fitsFile_sp, m_defaultInstrument_sp, defaultDate); processFits(origPath, fitsFile_sp, readHDUSet, m_defaultInstrument_sp, defaultDate); } //============================================================================== Loading Loading @@ -313,8 +320,9 @@ boost::gregorian::date WorkerThread::parseDate(std::string dateString) // WorkerThread::processFits() //============================================================================== void WorkerThread::processFits(boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp, Instrument::SP instrument_sp, boost::gregorian::date& date) throw (CCfits::FitsException, std::runtime_error) boost::shared_ptr<CCfits::FITS> fitsFile_sp, std::set<int>& readHDUSet, Instrument::SP instrument_sp, boost::gregorian::date& date) throw (CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::processFits()" << endl; Loading @@ -341,7 +349,7 @@ void WorkerThread::processFits(boost::filesystem::path& origPath, //Create metadata query std::string query = composeQuery(destination_sp, date, duplicateMax, origPath, fitsFile_sp); origPath, fitsFile_sp, readHDUSet); //Create destination path boost::filesystem::path destPath = createDestPath(origPath, Loading Loading @@ -369,8 +377,8 @@ void WorkerThread::processFits(boost::filesystem::path& origPath, //============================================================================== // WorkerThread::findDuplicateMax() //============================================================================== int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP session_sp, Destination::SP destination_sp, boost::filesystem::path& origPath) int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP session_sp, Destination::SP destination_sp, boost::filesystem::path& origPath) throw(std::runtime_error) { DEBUG_STREAM << "WorkerThread::findDuplicateMax()" << endl; Loading Loading @@ -411,7 +419,7 @@ int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP //============================================================================== std::string WorkerThread::composeQuery(Destination::SP destination_sp, boost::gregorian::date& date, int duplicateMax, boost::filesystem::path& origPath, boost::shared_ptr<CCfits::FITS> fitsFile_sp) boost::shared_ptr<CCfits::FITS> fitsFile_sp, std::set<int>& readHDUSet) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::composeQuery()" << endl; Loading Loading @@ -447,7 +455,7 @@ std::string WorkerThread::composeQuery(Destination::SP destination_sp, valuesQuery << ",\'" << origPath.filename().string() << ".gz\'"; //Search key from mapping and fill query fillQueryFromFits(insertQuery, valuesQuery, destination_sp, fitsFile_sp); fillQueryFromFits(insertQuery, valuesQuery, destination_sp, fitsFile_sp, readHDUSet); //Merge insert and values queries insertQuery << ") " << valuesQuery.rdbuf() << ")"; Loading @@ -462,7 +470,7 @@ std::string WorkerThread::composeQuery(Destination::SP destination_sp, //============================================================================== void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, std::stringstream& valuesQuery, Destination::SP destination_sp, boost::shared_ptr<CCfits::FITS> fitsFile_sp) boost::shared_ptr<CCfits::FITS> fitsFile_sp, std::set<int>& readHDUSet) throw(CCfits::FitsException, std::runtime_error) { DEBUG_STREAM << "WorkerThread::fillQueryFromFits()" << endl; Loading @@ -476,6 +484,19 @@ void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, { int fitsHDU = (*it)->getFitsHDU(); //If current HDU has not been already read if(readHDUSet.find(fitsHDU) == readHDUSet.end()) { //Read all currnet HDU keys if(fitsHDU == 0) fitsFile_sp->pHDU().readAllKeys(); else fitsFile_sp->extension(fitsHDU).readAllKeys(); //Keep track of read HDU readHDUSet.insert(fitsHDU); } std::string value; try Loading @@ -501,7 +522,7 @@ void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, } catch(CCfits::HDU::NoSuchKeyword& ex) { WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; WARN_STREAM << "WorkerThread::fillQueryFromFits() " << ex.message() << endl; try { Loading @@ -524,7 +545,7 @@ void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, } catch(CCfits::HDU::NoSuchKeyword& ex) { WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; WARN_STREAM << "WorkerThread::fillQueryFromFits() " << ex.message() << endl; //If key is mandatory throw and exception if((*it)->isMandatory()) Loading @@ -534,7 +555,7 @@ void WorkerThread::fillQueryFromFits(std::stringstream& insertQuery, } catch(CCfits::FITS::NoSuchHDU& ex) { WARN_STREAM << "WorkerThread::execQuery() " << ex.message() << endl; WARN_STREAM << "WorkerThread::fillQueryFromFits() " << ex.message() << endl; //If key is mandatory throw and exception if((*it)->isMandatory()) Loading @@ -551,8 +572,6 @@ std::string WorkerThread::readKeyFromPHDU(CCfits::PHDU& phdu, std::string keyNam { DEBUG_STREAM << "WorkerThread::readKeyFromPHDU()" << endl; phdu.readAllKeys(); std::stringstream resultStream; boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); Loading Loading @@ -621,8 +640,6 @@ std::string WorkerThread::readKeyFromHDU(CCfits::HDU& hdu, std::string keyName, { DEBUG_STREAM << "WorkerThread::readKeyFromHDU()" << endl; hdu.readAllKeys(); std::stringstream resultStream; boost::regex pattern("^(NAXIS)([0-9]+)$", boost::regex::icase); Loading
src/WorkerThread.h +8 −9 Original line number Diff line number Diff line Loading @@ -44,11 +44,11 @@ protected: // [Protected] Instrument related archiving process methods //------------------------------------------------------------------------------ virtual void ingestUsingInstrumentList(boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>) boost::shared_ptr<CCfits::FITS>, std::set<int>&) throw(CCfits::FitsException, std::runtime_error); virtual void ingestUsingDefaultInstrument(boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>) boost::shared_ptr<CCfits::FITS>, std::set<int>&) throw(CCfits::FitsException, std::runtime_error); //------------------------------------------------------------------------------ Loading @@ -61,9 +61,8 @@ protected: // [Protected] Common archiving process method //------------------------------------------------------------------------------ virtual void processFits(boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>, Instrument::SP, boost::gregorian::date&) throw(CCfits::FitsException, std::runtime_error); boost::shared_ptr<CCfits::FITS>, std::set<int>&, Instrument::SP, boost::gregorian::date&) throw(CCfits::FitsException, std::runtime_error); //------------------------------------------------------------------------------ // [Protected] Duplicate discovery method Loading @@ -76,11 +75,11 @@ protected: // [Protected] Metadata archiving methods //------------------------------------------------------------------------------ virtual std::string composeQuery(Destination::SP, boost::gregorian::date&, int, boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>) throw(CCfits::FitsException, std::runtime_error); int, boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>, std::set<int>&) throw(CCfits::FitsException, std::runtime_error); virtual void fillQueryFromFits(std::stringstream&, std::stringstream&, Destination::SP, boost::shared_ptr<CCfits::FITS>) Destination::SP, boost::shared_ptr<CCfits::FITS>, std::set<int>&) throw(CCfits::FitsException, std::runtime_error); virtual std::string readKeyFromPHDU(CCfits::PHDU&, std::string, std::string) Loading