Commit c4f2807a authored by Giovanni La Mura's avatar Giovanni La Mura
Browse files

Rebase cluster.cpp to MPI version

parent 3bda3f5f
Loading
Loading
Loading
Loading
+412 −220
Original line number Diff line number Diff line
/* Copyright 2004 INAF - Osservatorio Astronomico di Cagliari
/* Copyright 2024 INAF - Osservatorio Astronomico di Cagliari

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
@@ -25,6 +25,11 @@
#ifdef _OPENMP
#include <omp.h>
#endif
#ifdef USE_MPI
#ifndef MPI_VERSION
#include <mpi.h>
#endif
#endif

#ifndef INCLUDE_TYPES_H_
#include "../include/types.h"
@@ -74,14 +79,16 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf
 *  \param data_file: `string` Name of the input data file.
 *  \param output_path: `string` Directory to write the output files in.
 */
void cluster(const string& config_file, const string& data_file, const string& output_path) {
void cluster(const string& config_file, const string& data_file, const string& output_path, const mixMPI *mpidata) {
  chrono::time_point<chrono::high_resolution_clock> t_start = chrono::high_resolution_clock::now();
  chrono::duration<double> elapsed;
  string message;
  string timing_name = output_path + "/c_timing.log";
  string timing_name = output_path + "/c_timing_mpi"+ to_string(mpidata->rank) +".log";
  FILE *timing_file = fopen(timing_name.c_str(), "w");
  Logger *time_logger = new Logger(LOG_DEBG, timing_file);
  Logger *logger = new Logger(LOG_INFO);
  // the following only happens on MPI process 0
  if (mpidata->rank == 0) {
    logger->log("INFO: making legacy configuration...", LOG_INFO);
    ScattererConfiguration *sconf = NULL;
    try {
@@ -113,7 +120,7 @@ void cluster(const string& config_file, const string& data_file, const string& o
      ScatteringAngles *p_scattering_angles = new ScatteringAngles(gconf);
      double wp = sconf->wp;
      FILE *output = fopen((output_path + "/c_OCLU").c_str(), "w");
    ClusterIterationData *cid = new ClusterIterationData(gconf, sconf);
      ClusterIterationData *cid = new ClusterIterationData(gconf, sconf, mpidata);
      const int ndi = cid->c4->nsph * cid->c4->nlim;
      np_int ndit = 2 * ndi;
      logger->log("INFO: Size of matrices to invert: " + to_string((int64_t)ndit) + " x " + to_string((int64_t)ndit) +".\n");
@@ -162,9 +169,6 @@ void cluster(const string& config_file, const string& data_file, const string& o
      if (tppoan.is_open()) {
#ifdef USE_LAPACK
	logger->log("INFO: using LAPACK calls.\n", LOG_INFO);
#elif defined USE_MAGMA
      logger->log("INFO: using MAGMA calls.\n", LOG_INFO);
      magma_init();
#else
	logger->log("INFO: using fall-back lucin() calls.\n", LOG_INFO);
#endif
@@ -195,7 +199,7 @@ void cluster(const string& config_file, const string& data_file, const string& o
	int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid, output, output_path, tppoan);
	chrono::time_point<chrono::high_resolution_clock> end_iter_1 = chrono::high_resolution_clock::now();
	elapsed = start_iter_1 - t_start;
      message = "INFO: Calculation setup took " + to_string(elapsed.count()) + "s.\n";
	string message = "INFO: Calculation setup took " + to_string(elapsed.count()) + "s.\n";
	logger->log(message);
	time_logger->log(message);
	elapsed = end_iter_1 - start_iter_1;
@@ -203,6 +207,15 @@ void cluster(const string& config_file, const string& data_file, const string& o
	logger->log(message);
	time_logger->log(message);

	// here go the calls that send data to be duplicated on other MPI processes from process 0 to others, using MPI broadcasts, but only if MPI is actually used
#ifdef MPI_VERSION
	if (mpidata->mpirunning) {
	  gconf->mpibcast(mpidata);
	  sconf->mpibcast(mpidata);	    
	  cid->mpibcast(mpidata);
	  p_scattering_angles->mpibcast(mpidata);
	}	
#endif
	// Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled
	int ompnumthreads = 1;

@@ -227,9 +240,9 @@ void cluster(const string& config_file, const string& data_file, const string& o
	  } else {
	    // this is not thread 0, so do create fresh copies of all local variables
	    cid_2 = new ClusterIterationData(*cid);
	  output_2 = fopen((output_path + "/c_OCLU_" + to_string(myompthread)).c_str(), "w");
	    output_2 = fopen((output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), "w");
	    tppoanp_2 = new fstream;
	  tppoanp_2->open((output_path + "/c_TPPOAN_" + to_string(myompthread)).c_str(), ios::out | ios::binary);
	    tppoanp_2->open((output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), ios::out | ios::binary);
	  }
	  fstream &tppoan_2 = *tppoanp_2;
	  // make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads
@@ -237,7 +250,7 @@ void cluster(const string& config_file, const string& data_file, const string& o
	  if (myompthread==0) logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n");
	  // ok, now I can actually start the parallel calculations
#pragma omp for
	for (jxi488 = 2; jxi488 <= nxi; jxi488++) {
	  for (jxi488 = cid_2->firstxi; jxi488 <= cid_2->lastxi; jxi488++) {
	    int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid_2, output_2, output_path, *tppoanp_2);
	  }

@@ -249,21 +262,22 @@ void cluster(const string& config_file, const string& data_file, const string& o
	    tppoanp_2->close();
	    delete tppoanp_2;
	  }
#pragma omp barrier
	  {
	    string message = "INFO: Closing thread-local output files of thread " + to_string(myompthread) + " and syncing threads.\n";
	    logger->log(message);
	  }
	} // closes pragma omp parallel
#ifdef _OPENMP
#pragma omp barrier
	{
	message = "INFO: Thread-local output files closed and threads synchronized.\n";
	logger->log(message);
	  // thread 0 already wrote on global files, skip it and take care of appending the others
	chrono::time_point<chrono::high_resolution_clock> t_output_start = chrono::high_resolution_clock::now();
	  for (int ri = 1; ri < ompnumthreads; ri++) {
	  // Giovanni, please add here in this loop code to reopen the temporary files, reread them and append them respectively to the global output and tppoan, before closing them
	  string partial_file_name = output_path + "/c_OCLU_" + to_string(ri);
	  message = "Copying ASCII output of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
	    string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri);
	    string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
	    logger->log(message, LOG_DEBG);
	    FILE *partial_output = fopen(partial_file_name.c_str(), "r");
	  char c = fgetc(partial_output);
	    int c = fgetc(partial_output);
	    while (c != EOF) {
	      fputc(c, output);
	      c = fgetc(partial_output);
@@ -271,8 +285,8 @@ void cluster(const string& config_file, const string& data_file, const string& o
	    fclose(partial_output);
	    remove(partial_file_name.c_str());
	    logger->log("done.\n", LOG_DEBG);
	  partial_file_name = output_path + "/c_TPPOAN_" + to_string(ri);
	  message = "Copying binary output of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
	    partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri);
	    message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
	    logger->log(message, LOG_DEBG);
	    fstream partial_tppoan;
	    partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary);
@@ -287,21 +301,51 @@ void cluster(const string& config_file, const string& data_file, const string& o
	    remove(partial_file_name.c_str());
	    logger->log("done.\n", LOG_DEBG);
	  }
	chrono::time_point<chrono::high_resolution_clock> t_output_end = chrono::high_resolution_clock::now();
	elapsed = t_output_end - t_output_start;
	message = "INFO: Recombining output files took " + to_string(elapsed.count()) + "s.\n";
	logger->log(message);
	time_logger->log(message);
	}
#endif
	// here go the code to append the files written in MPI processes > 0 to the ones on MPI process 0
#ifdef MPI_VERSION
	if (mpidata->mpirunning) {
	  // only go through this if MPI has been actually used
	  for (int rr=1; rr<mpidata->nprocs; rr++) {
	    // get the data from process rr
	    // how many openmp threads did process rr use?
	    int remotethreads;
	    MPI_Recv(&remotethreads, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	    for (int ri=0; ri<remotethreads; ri++) {
	      // first get the ASCII local file
	      char *chunk_buffer;
	      int chunk_buffer_size = -1;
	      MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	      while (chunk_buffer_size != 0) {
		char *chunk_buffer = new char[chunk_buffer_size];
		MPI_Recv(chunk_buffer, chunk_buffer_size, MPI_CHAR, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
		fputs(chunk_buffer, output);
		delete[] chunk_buffer;
		MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	      }
	      fprintf(output, "\n");

	      // now get the binary local file
	      long buffer_size = 0;
	      // get the size of the buffer
	      MPI_Recv(&buffer_size, 1, MPI_LONG, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	      // allocate the buffer
	      char *binary_buffer = new char[buffer_size];
	      // actually receive the buffer
	      MPI_Recv(binary_buffer, buffer_size, MPI_CHAR, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	      // we can write it to disk
	      tppoan.write(binary_buffer, buffer_size);
	      delete[] binary_buffer;
	    }
	  }
	}
#endif
	tppoanp->close();
	delete tppoanp;
#ifdef USE_MAGMA
      magma_finalize();
#endif
      } else { // In case TPPOAN could not be opened. Should never happen.
	logger->err("\nERROR: failed to open TPPOAN file.\n");
    } // closes if (tppoan.is_open) ... else ...
      }
      fclose(output);
      // Clean memory
      delete cid;
@@ -315,10 +359,158 @@ void cluster(const string& config_file, const string& data_file, const string& o
    delete gconf;
    chrono::time_point<chrono::high_resolution_clock> t_end = chrono::high_resolution_clock::now();
    elapsed = t_end - t_start;
  message = "INFO: Calculation lasted " + to_string(elapsed.count()) + "s.\n";
    string message = "INFO: Calculation lasted " + to_string(elapsed.count()) + "s.\n";
    logger->log(message);
  time_logger->log(message);
    logger->log("Finished: output written to " + output_path + "/c_OCLU\n");
    time_logger->log(message);
  }

#ifdef MPI_VERSION
  else {
    // here go the code for MPI processes other than 0
    // copy gconf, sconf, cid and p_scattering_angles from MPI process 0
    GeometryConfiguration *gconf = new GeometryConfiguration(mpidata);
    ScattererConfiguration *sconf = new ScattererConfiguration(mpidata);
    ClusterIterationData *cid = new ClusterIterationData(mpidata);
    ScatteringAngles *p_scattering_angles = new ScatteringAngles(mpidata);
    // open separate files for other MPI processes
    // File *output = fopen((output_path + "/c_OCLU_mpi"+ to_string(mpidata->rank)).c_str(), "w");
    // fstream *tppoanp = new fstream;
    // fstream &tppoan = *tppoanp;
    // string tppoan_name = output_path + "/c_TPPOAN_mpi"+ to_string(mpidata->rank);
    // tppoan.open(tppoan_name.c_str(), ios::out | ios::binary);
    // Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled
    int ompnumthreads = 1;

#pragma omp parallel
    {
      // Create and initialise this variable here, so that if OpenMP is enabled it is local to the thread, and if OpenMP is not enabled it has a well-defined value anyway
      int myompthread = 0;
#ifdef _OPENMP
      // If OpenMP is enabled, give actual values to myompthread and ompnumthreads, and open thread-local output files
      myompthread = omp_get_thread_num();
      if (myompthread == 0) ompnumthreads = omp_get_num_threads();
#endif
      // To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and each one will behave as the first one. Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number
      ClusterIterationData *cid_2 = NULL;
      FILE *output_2 = NULL;
      fstream *tppoanp_2 = NULL;
      // for threads other than the 0, create distinct copies of all relevant data, while for thread 0 just define new references / pointers to the original ones
      if (myompthread == 0) {
	cid_2 = cid;
	// output_2 = output;
	// tppoanp_2 = tppoanp;
      } else {
	// this is not thread 0, so do create fresh copies of all local variables
	cid_2 = new ClusterIterationData(*cid);
      }
      output_2 = fopen((output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), "w");
      tppoanp_2 = new fstream;
      tppoanp_2->open((output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), ios::out | ios::binary);
      fstream &tppoan_2 = *tppoanp_2;
      // make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads
#pragma omp barrier
      if (myompthread==0) logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n");
      // ok, now I can actually start the parallel calculations
#pragma omp for
      for (int jxi488 = cid_2->firstxi; jxi488 <= cid_2->lastxi; jxi488++) {
	int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid_2, output_2, output_path, *tppoanp_2);
      }

#pragma omp barrier
      // only threads different from 0 have to free local copies of variables
      if (myompthread != 0) {
	delete cid_2;
      }
      fclose(output_2);
      tppoanp_2->close();
      delete tppoanp_2;
#pragma omp barrier
      {
	string message = "INFO: Closing thread-local output files of thread " + to_string(myompthread) + " and syncing threads.\n";
	logger->log(message);
      }
    } // closes pragma omp parallel
#pragma omp barrier
    {
      // tell MPI process 0 how many threads we have on this process (not necessarily the same across all processes)
      MPI_Send(&ompnumthreads, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
      // reopen local files, send them all to MPI process 0
      for (int ri = 0; ri < ompnumthreads; ri++) {
	string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri);
	string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
	logger->log(message, LOG_DEBG);
	fstream partial_output;
	partial_output.open(partial_file_name.c_str(), ios::in | ios::binary);
	partial_output.seekg(0, ios::end);
	const long partial_output_size = partial_output.tellg();
	partial_output.close();
	partial_output.open(partial_file_name.c_str(), ios::in | ios::binary);
	int chunk_buffer_size = 25165824; // Length of char array  with 24Mb size
	char *chunk_buffer = new char[chunk_buffer_size]();
	int full_chunks = (int)(partial_output_size / chunk_buffer_size);
	for (int fi = 0; fi < full_chunks; fi++) {
	  partial_output.read(chunk_buffer, chunk_buffer_size);
	  // If EOF is reached, do not send EOF character.
	  long ptr_position = partial_output.tellg();
	  if (ptr_position == partial_output_size) {
	    chunk_buffer[chunk_buffer_size - 1] = '\0';
	  }
	  // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not)
	  MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
	  // Actually send the file contents to Node-0
	  MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
	}
	long ptr_position = partial_output.tellg();
	if (ptr_position < partial_output_size) {
	  // Send the last partial buffer
	  chunk_buffer_size = partial_output_size - ptr_position;
	  delete[] chunk_buffer;
	  chunk_buffer = new char[chunk_buffer_size];
	  partial_output.read(chunk_buffer, chunk_buffer_size);
	  chunk_buffer[chunk_buffer_size - 1] = '\0';
	  // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not)
	  MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
	  // Actually send the file contents to Node-0
	  MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
	}
	// Send a size 0 flag to inform Node-0 that the transmission is complete
	chunk_buffer_size = 0;
	MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
	partial_output.close();
	delete[] chunk_buffer;
	remove(partial_file_name.c_str());
	logger->log("done.\n", LOG_DEBG);
	
	partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri);
	message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
	logger->log(message, LOG_DEBG);
	fstream partial_tppoan;
	partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary);
	partial_tppoan.seekg(0, ios::end);
	long buffer_size = partial_tppoan.tellg();
	char *binary_buffer = new char[buffer_size];
	partial_tppoan.seekg(0, ios::beg);
	partial_tppoan.read(binary_buffer, buffer_size);
	// tell MPI process 0 how large is the buffer
	MPI_Send(&buffer_size, 1, MPI_LONG, 0, 1, MPI_COMM_WORLD);
	// actually send the buffer
	MPI_Send(binary_buffer, buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
	// tppoan.write(binary_buffer, buffer_size);
	partial_tppoan.close();
	delete[] binary_buffer;
	remove(partial_file_name.c_str());
	logger->log("done.\n", LOG_DEBG);
      }
    }
    // Clean memory
    delete cid;
    delete p_scattering_angles;
    delete sconf;
    delete gconf;

  }
#endif
  fclose(timing_file);
  delete time_logger;
  delete logger;
@@ -327,8 +519,8 @@ void cluster(const string& config_file, const string& data_file, const string& o
int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConfiguration *gconf, ScatteringAngles *sa, ClusterIterationData *cid, FILE *output, const string& output_path, fstream& tppoan)
{
  int nxi = sconf->number_of_scales;
  Logger *logger = new Logger(LOG_INFO);
  string message = "INFO: running scale iteration " + to_string(jxi488) + " of " + to_string(nxi) + ".\n";
  Logger *logger = new Logger(LOG_INFO);
  logger->log(message);
  chrono::duration<double> elapsed;
  chrono::time_point<chrono::high_resolution_clock> interval_start, interval_end;
@@ -369,7 +561,6 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf
  hjv(exri, vkarg, jer, lcalc, cid->arg, cid->c1, cid->c1ao, cid->c4);
  if (jer != 0) {
    fprintf(output, "  STOP IN HJV\n");
    delete logger;
    return jer;
    // break; // rewrite this to go to the end of the function, to free locally allocated variables and return jer
  }
@@ -399,13 +590,11 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf
	  );
      if (jer != 0) {
	fprintf(output, "  STOP IN DME\n");
	delete logger;
	return jer;
	//break;
      }
    }
    if (jer != 0) {
      delete logger;
      return jer;
      //break;
    }
@@ -987,7 +1176,10 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf
  elapsed = interval_end - interval_start;
  message = "INFO: angle loop for scale " + to_string(jxi488) + " took " + to_string(elapsed.count()) + "s.\n";
  logger->log(message);
  
  logger->log("INFO: finished scale iteration " + to_string(jxi488) + " of " + to_string(nxi) + ".\n");

  delete logger;

  return jer;
}