Commit 656b6417 authored by Andrea Bignamini's avatar Andrea Bignamini
Browse files

Add checksum and notnew

The checksum computation has been added. The checksum is stored
in the checksum column. The check on the checksum has been added.
Only file with different checksum are ingested with file_version
increased by one.
The path notnew has been added for files already in the database
with the same checksum.
parent 27f93144
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -28,7 +28,7 @@ CXX_RELEASE_FLAGS=-O3
CXX_DEFAULT_FLAGS=-c -Wall -Wextra -std=c++0x
LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango \
	-lsoci_core -lsoci_mysql -lCCfits -lboost_thread -lboost_filesystem \
	-lboost_system -lboost_iostreams -lboost_date_time -lboost_regex
	-lboost_system -lboost_iostreams -lboost_date_time -lboost_regex -lssl
INC_PARM=$(foreach d, $(INC_DIR), -I$d)
LIB_PARM=$(foreach d, $(LIB_DIR), -L$d)
#================================================================================
+115 −70
Original line number Diff line number Diff line
#include <WorkerThread.h>
#include <FitsImporter.h>
#include <md5.h>

#include <cassert>
#include <fstream>
@@ -326,7 +327,7 @@ void WorkerThread::processFits(boost::filesystem::path& origPath,
{
    DEBUG_STREAM << "WorkerThread::processFits()" << endl;

	CCfits::PHDU& phdu = fitsFile_sp->pHDU();
    //	CCfits::PHDU& phdu = fitsFile_sp->pHDU();

	Destination::SP destination_sp = instrument_sp->getDestination();

@@ -337,27 +338,34 @@ void WorkerThread::processFits(boost::filesystem::path& origPath,
	//Start a transaction [RAII]
	soci::transaction transaction(*session_sp);

	//Search if file is already archived
	int duplicateMax = findDuplicateMax(session_sp, destination_sp, origPath);
	//Compute checksum
	std::string checksum = computeMd5(origPath);

	//In case of duplicate, add key IA2VERS with duplicateMax
	if(duplicateMax > 0)
	{
		phdu.addKey("IA2VERS", duplicateMax, "IA2 version for duplicate file");
		fitsFile_sp->flush();
	}
	//Search if file is already archived
	int duplicateMax = findDuplicateMax(session_sp, destination_sp, origPath, checksum);

	//Create metadata query
	std::string query = composeQuery(destination_sp, date, duplicateMax,
        origPath, fitsFile_sp, readHDUSet);
	//In case of duplicate, add key IA2VERS with duplicateMax [REMOVED]
//	if(duplicateMax > 0)
//	{
//		phdu.addKey("IA2VERS", duplicateMax, "IA2 version for duplicate file");
//		fitsFile_sp->flush();
//	}

	//Create destination path
	boost::filesystem::path destPath = createDestPath(origPath,
							  destination_sp, date, duplicateMax);


	//Copy file to archive storage
	copyFile(origPath, destPath);

	//Don't add the file to metadata if duplidateMax=-1 (i.e. if it already exists with the same checksum)
	if(duplicateMax!=-1)
	  {
	    //Create metadata query
	    std::string query = composeQuery(destination_sp, date, duplicateMax, checksum,
					     origPath, fitsFile_sp, readHDUSet);

	    try
	      {
		//Insert metadata into database
@@ -369,6 +377,7 @@ void WorkerThread::processFits(boost::filesystem::path& origPath,
		boost::filesystem::remove(destPath);
		throw(ex);
	      }
	  }
	
	//Commit Transaction
	transaction.commit();
@@ -378,23 +387,43 @@ void WorkerThread::processFits(boost::filesystem::path& origPath,
//	WorkerThread::findDuplicateMax()
//==============================================================================
int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP session_sp,
    Destination::SP destination_sp, boost::filesystem::path& origPath)
	Destination::SP destination_sp, boost::filesystem::path& origPath, std::string checksum)
	throw(std::runtime_error)
{
    DEBUG_STREAM << "WorkerThread::findDuplicateMax()" << endl;

	std::stringstream query_checksum;
	query_checksum << "select max(file_version) from " << destination_sp->getSchema()
	      << "." << destination_sp->getTable() << " where file_name = \'"
	      << origPath.filename().string() << ".gz\' and checksum = \'"
	      << checksum << "\'" << std::endl;

	std::stringstream query;
	query << "select max(file_version) from " << destination_sp->getSchema()
	      << "." << destination_sp->getTable() << " where file_name like \'"
	      << origPath.filename().string() << ".gz\'" << std::endl;

    DEBUG_STREAM << "WorkerThread::findDuplicateMax() query " << query_checksum.str() << endl;
    DEBUG_STREAM << "WorkerThread::findDuplicateMax() query " << query.str() << endl;

	boost::optional<int> result;
	*session_sp << query.str(), soci::into(result);
	boost::optional<int> result_checksum;
	*session_sp << query_checksum.str(), soci::into(result_checksum);

	int duplicateMax = 0;

	if(result_checksum)
	  {
	    duplicateMax=-1;
	    std::string fileName( origPath.filename().string() );
	   
	    WARN_STREAM << "WorkerThread::findDuplicateMax() file " << fileName
			<< " was already ingested -> moved to notnew " << endl;
	  }
	else
	  {
	    boost::optional<int> result;
	    *session_sp << query.str(), soci::into(result);

	    //If result != null
	    if(result)
	      {
@@ -410,6 +439,7 @@ int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP session_sp,
		WARN_STREAM << "WorkerThread::findDuplicateMax() file " << fileName
			    << " has " << duplicateMax << " duplicates " << endl;
	      }
	  }

	return duplicateMax;
}
@@ -418,7 +448,7 @@ int WorkerThread::findDuplicateMax(ConnectionManager::SessionSP session_sp,
//	WorkerThread::composeQuery()
//==============================================================================
std::string WorkerThread::composeQuery(Destination::SP destination_sp,
    boost::gregorian::date& date, int duplicateMax, boost::filesystem::path& origPath,
    boost::gregorian::date& date, int duplicateMax, std::string checksum, boost::filesystem::path& origPath,
    boost::shared_ptr<CCfits::FITS> fitsFile_sp, std::set<int>& readHDUSet)
	throw(CCfits::FitsException, std::runtime_error)
{
@@ -437,6 +467,10 @@ std::string WorkerThread::composeQuery(Destination::SP destination_sp,
	insertQuery << destination_sp->getTable() << ".storage_path";
	valuesQuery << "\'" << destination_sp->getStoragePath() << "\'";

	//Append checksum
	insertQuery << "," << destination_sp->getTable() << ".checksum";
	valuesQuery << ",\'" << checksum << "\'";

	//Append file path column to query
	insertQuery << "," << destination_sp->getTable() << ".file_path";
	int month = date.month();
@@ -791,12 +825,23 @@ boost::filesystem::path WorkerThread::createDestPath(boost::filesystem::path& or
    // + date path + duplicate max + file name
    boost::filesystem::path destPath(destination_sp->getStoragePath());
    
    //If duplicateMax is -1 use the today date
    //and add notnew to destPath
    if(duplicateMax == -1)
      {
	date = boost::gregorian::day_clock::local_day();
	destPath /= "/notnew";
      }
    
    //Create directory path, date and version part of destination path
    std::stringstream relPathStream;
    int month = date.month();
    relPathStream << "/" << date.year();
    relPathStream << "/" << std::setw(2) << std::setfill('0') << month ;
    relPathStream << "/" << std::setw(2) << std::setfill('0') << date.day();
    
    //If duplicateMax is -1 do not add getDirName and duplicateMax to destination path
    if(duplicateMax != -1)
      relPathStream << "/" << destination_sp->getDirName() << "/" << duplicateMax;
    boost::filesystem::path relPath( relPathStream.str() );
    
+2 −2
Original line number Diff line number Diff line
@@ -68,14 +68,14 @@ protected:
//	[Protected] Duplicate discovery method
//------------------------------------------------------------------------------
	virtual int findDuplicateMax(ConnectionManager::SessionSP,
		Destination::SP, boost::filesystem::path&)
    	     Destination::SP, boost::filesystem::path&, std::string checksum)
		throw(std::runtime_error);

//------------------------------------------------------------------------------
//	[Protected] Metadata archiving methods
//------------------------------------------------------------------------------
	virtual std::string composeQuery(Destination::SP, boost::gregorian::date&,
		int, boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>,
        int, std::string, boost::filesystem::path&, boost::shared_ptr<CCfits::FITS>,
        std::set<int>&) throw(CCfits::FitsException, std::runtime_error);

    virtual void fillQueryFromFits(std::stringstream&, std::stringstream&,

src/md5.h

0 → 100644
+32 −0
Original line number Diff line number Diff line
#include <stdio.h>
#include <openssl/md5.h>

std::string computeMd5(boost::filesystem::path& filePath)
{
  unsigned char digest[MD5_DIGEST_LENGTH];

  const char *filename=filePath.string().c_str();
  int i;
  FILE *inFile = fopen (filename, "rb");
  MD5_CTX mdContext;
  int bytes;
  unsigned char data[1024];

  if (inFile == NULL) {
    printf ("%s can't be opened.\n", filename);
    return 0;
  }

  MD5_Init (&mdContext);
  while ((bytes = fread (data, 1, 1024, inFile)) != 0)
    MD5_Update (&mdContext, data, bytes);
  MD5_Final (digest,&mdContext);

  char mdString[33];

  for(i = 0; i < MD5_DIGEST_LENGTH; i++)
    sprintf(&mdString[i*2], "%02x", (unsigned int)digest[i]);

  fclose (inFile);
  return mdString;
}