Commit 20412939 authored by Giovanni La Mura's avatar Giovanni La Mura
Browse files

Merge branch 'fix_mpi_barriers' into 'master'

Fix mpi barriers

See merge request giacomo.mulas/np_tmcode!37
parents 3c6365e2 c93c88ec
Loading
Loading
Loading
Loading
+13 −184
Original line number Diff line number Diff line
@@ -386,11 +386,6 @@ void cluster(const string& config_file, const string& data_file, const string& o
	  // only threads different from 0 have to free local copies of variables and close local files
	  if (myompthread != 0) {
	    delete cid_2;
	    //fclose(output_2);
	    // p_output_2->write_to_disk(output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread));
	    // delete p_output_2;
	    // tppoanp_2->close();
	    // delete tppoanp_2;
	  }
#pragma omp barrier
	  {
@@ -418,43 +413,6 @@ void cluster(const string& config_file, const string& data_file, const string& o
	  vtppoanarray[0]->append_to_disk(output_path + "/c_TPPOAN");
	  delete vtppoanarray[0];
	  delete[] vtppoanarray;
	  // for (int ri = 1; ri < ompnumthreads; ri++) {
	    // string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri);
	    // string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
	    // logger->log(message, LOG_DEBG);
	    // FILE *partial_output = fopen(partial_file_name.c_str(), "r");
	    // // have a look at the getline() or fgets() C functions to read one line at a time, instead of char by char, it would simplify things
	    // char virtual_line[256];
	    // int index = 0;
	    // int c = fgetc(partial_output);
	    // while (c != EOF) {
	    //   virtual_line[index++] = c;
	    //   if (c == '\n') {
	    // 	virtual_line[index] = '\0';
	    // 	p_output->append_line(virtual_line);
	    // 	index = 0;
	    //   }
	    //   c = fgetc(partial_output);
	    // }
	    // fclose(partial_output);
	    // remove(partial_file_name.c_str());
	    // logger->log("done.\n", LOG_DEBG);
	  //   string partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri);
	  //   string message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
	  //   logger->log(message, LOG_DEBG);
	  //   fstream partial_tppoan;
	  //   partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary);
	  //   partial_tppoan.seekg(0, ios::end);
	  //   long buffer_size = partial_tppoan.tellg();
	  //   char *binary_buffer = new char[buffer_size];
	  //   partial_tppoan.seekg(0, ios::beg);
	  //   partial_tppoan.read(binary_buffer, buffer_size);
	  //   // tppoan.write(binary_buffer, buffer_size);
	  //   partial_tppoan.close();
	  //   delete[] binary_buffer;
	  //   remove(partial_file_name.c_str());
	  //   logger->log("done.\n", LOG_DEBG);
	  // }
	}
	// here goes the code to append the files written in MPI processes > 0 to the ones on MPI process 0
#ifdef MPI_VERSION
@@ -466,48 +424,9 @@ void cluster(const string& config_file, const string& data_file, const string& o
	    p_output->append_to_disk(output_path + "/c_OCLU");
	    delete p_output;
	    VirtualBinaryFile *vtppoanp = new VirtualBinaryFile(mpidata, rr);
	    vtppoanp->append_to_disk(output_path + "/c_TPPOAN_bis");
	    vtppoanp->append_to_disk(output_path + "/c_TPPOAN");
	    delete vtppoanp;
	    // // how many openmp threads did process rr use?
	    // int remotethreads;
	    // MPI_Recv(&remotethreads, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	    // for (int ri=0; ri<remotethreads; ri++) {
	      //   // first get the ASCII local file
	      //   char *chunk_buffer;
	      //   int chunk_buffer_size = -1;
	      //   MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	      //   while (chunk_buffer_size != 0) {
	      // 	char *chunk_buffer = new char[chunk_buffer_size];
	      // 	MPI_Recv(chunk_buffer, chunk_buffer_size, MPI_CHAR, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	      // 	// fputs(chunk_buffer, output);
	      // 	int index = 0, last_char = 0;
	      // 	char c;
	      // 	while (last_char < chunk_buffer_size) {
	      // 	  c = chunk_buffer[last_char++];
	      // 	  virtual_line[index++] = c;
	      // 	  if (c == '\n') {
	      // 	    virtual_line[index] = '\0';
	      // 	    index = 0;
	      // 	    p_output->append_line(virtual_line);
	      // 	  }
	      // 	}
	      // 	delete[] chunk_buffer;
	      // 	MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	      //   }
	      //   // if (ri<remotethreads-1) fprintf(output, "\n");

	    //   // now get the binary local file
	    //   long buffer_size = 0;
	    //   // get the size of the buffer
	    //   MPI_Recv(&buffer_size, 1, MPI_LONG, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	    //   // allocate the buffer
	    //   char *binary_buffer = new char[buffer_size];
	    //   // actually receive the buffer
	    //   MPI_Recv(binary_buffer, buffer_size, MPI_CHAR, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	    //   // we can write it to disk
	    //   // tppoan.write(binary_buffer, buffer_size);
	    //   delete[] binary_buffer;
	    // }
	    int test = MPI_Barrier(MPI_COMM_WORLD);
	  }
	}

@@ -515,14 +434,6 @@ void cluster(const string& config_file, const string& data_file, const string& o
#ifdef USE_NVTX
	nvtxRangePop();
#endif
	// tppoanp->close();
	// delete tppoanp;
      // } else { // In case TPPOAN could not be opened. Should never happen.
      // 	logger->err("\nERROR: failed to open TPPOAN file.\n");
      // }
      // fclose(output);
      // p_output->write_to_disk(output_path + "/c_OCLU");
      // delete p_output;
      // Clean memory
      delete cid;
      delete p_scattering_angles;
@@ -550,11 +461,6 @@ void cluster(const string& config_file, const string& data_file, const string& o
    ClusterIterationData *cid = new ClusterIterationData(mpidata);
    ScatteringAngles *p_scattering_angles = new ScatteringAngles(mpidata);
    // open separate files for other MPI processes
    // File *output = fopen((output_path + "/c_OCLU_mpi"+ to_string(mpidata->rank)).c_str(), "w");
    // fstream *tppoanp = new fstream;
    // fstream &tppoan = *tppoanp;
    // string tppoan_name = output_path + "/c_TPPOAN_mpi"+ to_string(mpidata->rank);
    // tppoan.open(tppoan_name.c_str(), ios::out | ios::binary);
    // Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled
    int ompnumthreads = 1;
    VirtualAsciiFile **p_outarray = NULL;
@@ -575,27 +481,19 @@ void cluster(const string& config_file, const string& data_file, const string& o
      }
      // To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and each one will behave as the first one. Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number
      ClusterIterationData *cid_2 = NULL;
      //FILE *output_2 = NULL;
      VirtualAsciiFile *p_output_2 = NULL;
      VirtualBinaryFile *vtppoanp_2 = NULL;
      // fstream *tppoanp_2 = NULL;
      // for threads other than the 0, create distinct copies of all relevant data, while for thread 0 just define new references / pointers to the original ones
      if (myompthread == 0) {
	cid_2 = cid;
	// output_2 = output;
	// tppoanp_2 = tppoanp;
      } else {
	// this is not thread 0, so do create fresh copies of all local variables
	cid_2 = new ClusterIterationData(*cid);
      }
      // output_2 = fopen((output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), "w");
      p_output_2 = new VirtualAsciiFile();
      p_outarray[myompthread] = p_output_2;
      vtppoanp_2 = new VirtualBinaryFile();
      vtppoanarray[myompthread] = vtppoanp_2;
      // tppoanp_2 = new fstream;
      // tppoanp_2->open((output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), ios::out | ios::binary);
      // fstream &tppoan_2 = *tppoanp_2;
      // make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads
#pragma omp barrier
      if (myompthread==0) logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n");
@@ -610,11 +508,6 @@ void cluster(const string& config_file, const string& data_file, const string& o
      if (myompthread != 0) {
	delete cid_2;
      }
      // fclose(output_2);
      // p_output_2->write_to_disk(output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread));
      // delete p_output_2;
      // tppoanp_2->close();
      // delete tppoanp_2;
#pragma omp barrier
      {
	string message = "INFO: Closing thread-local output files of thread " + to_string(myompthread) + " and syncing threads.\n";
@@ -629,81 +522,17 @@ void cluster(const string& config_file, const string& data_file, const string& o
	vtppoanarray[0]->append(*(vtppoanarray[ti]));
	delete vtppoanarray[ti];
      }
      for (int rr = 1; rr < mpidata->nprocs; rr++) {
	if (rr == mpidata->rank) {
	  p_outarray[0]->mpisend(mpidata);
	  delete p_outarray[0];
	  delete[] p_outarray;
	  vtppoanarray[0]->mpisend(mpidata);
	  delete vtppoanarray[0];
	  delete[] vtppoanarray;
      // // tell MPI process 0 how many threads we have on this process (not necessarily the same across all processes)
      // MPI_Send(&ompnumthreads, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
      // // reopen local files, send them all to MPI process 0
      // for (int ri = 0; ri < ompnumthreads; ri++) {
	// 	string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri);
	// 	string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
	// 	logger->log(message, LOG_DEBG);
	// 	fstream partial_output;
	// 	partial_output.open(partial_file_name.c_str(), ios::in);
	// 	partial_output.seekg(0, ios::end);
	// 	const long partial_output_size = partial_output.tellg();
	// 	partial_output.close();
	// 	partial_output.open(partial_file_name.c_str(), ios::in);
	// 	int chunk_buffer_size = 25165824; // Length of char array  with 24Mb size
	// 	char *chunk_buffer = new char[chunk_buffer_size]();
	// 	int full_chunks = (int)(partial_output_size / chunk_buffer_size);
	// 	for (int fi = 0; fi < full_chunks; fi++) {
	// 	  partial_output.read(chunk_buffer, chunk_buffer_size);
	// 	  // If EOF is reached, do not send EOF character.
	// 	  long ptr_position = partial_output.tellg();
	// 	  if (ptr_position == partial_output_size) {
	// 	    chunk_buffer[chunk_buffer_size - 1] = '\0';
	// 	  }
	// 	  // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not)
	// 	  MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
	// 	  // Actually send the file contents to Node-0
	// 	  MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
	// 	}
	// 	long ptr_position = partial_output.tellg();
	// 	if (ptr_position < partial_output_size) {
	// 	  // Send the last partial buffer
	// 	  chunk_buffer_size = partial_output_size - ptr_position;
	// 	  delete[] chunk_buffer;
	// 	  chunk_buffer = new char[chunk_buffer_size];
	// 	  partial_output.read(chunk_buffer, chunk_buffer_size);
	// 	  // chunk_buffer[chunk_buffer_size - 1] = '\0';
	// 	  // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not)
	// 	  MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
	// 	  // Actually send the file contents to Node-0
	// 	  MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
	// 	}
	// 	// Send a size 0 flag to inform Node-0 that the transmission is complete
	// 	chunk_buffer_size = 0;
	// 	MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
	// 	partial_output.close();
	// 	delete[] chunk_buffer;
	// 	remove(partial_file_name.c_str());
	// 	logger->log("done.\n", LOG_DEBG);
	
      // 	string partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri);
      // 	string message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
      // 	logger->log(message, LOG_DEBG);
      // 	fstream partial_tppoan;
      // 	partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary);
      // 	partial_tppoan.seekg(0, ios::end);
      // 	long buffer_size = partial_tppoan.tellg();
      // 	char *binary_buffer = new char[buffer_size];
      // 	partial_tppoan.seekg(0, ios::beg);
      // 	partial_tppoan.read(binary_buffer, buffer_size);
      // 	// tell MPI process 0 how large is the buffer
      // 	MPI_Send(&buffer_size, 1, MPI_LONG, 0, 1, MPI_COMM_WORLD);
      // 	// actually send the buffer
      // 	MPI_Send(binary_buffer, buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
      // 	// // tppoan.write(binary_buffer, buffer_size);
      // 	partial_tppoan.close();
      // 	delete[] binary_buffer;
      // 	remove(partial_file_name.c_str());
      // 	logger->log("done.\n", LOG_DEBG);
      // }
	}
	int test = MPI_Barrier(MPI_COMM_WORLD);
      }
    }
    // Clean memory
    delete cid;
+61 −47
Original line number Diff line number Diff line
@@ -277,9 +277,10 @@ VirtualAsciiFile::VirtualAsciiFile(const mixMPI *mpidata, int rr) {
VirtualAsciiFile::~VirtualAsciiFile() {
  // is it necessary to pop them out one by one? isn't there the dedicated method of std::vector to clean the vector?
  // besides, shouldn't this be done anyway by the destructor of std::vector?
  while (!_file_lines->size() > 0) {
    _file_lines->pop_back();
  }
  _file_lines->clear();
  // while (_file_lines->size() > 0) {
  //   _file_lines->pop_back();
  // }
  if (_file_lines != NULL) delete _file_lines;
}

@@ -300,6 +301,7 @@ void VirtualAsciiFile::append_line(const string& line) {
int VirtualAsciiFile::append_to_disk(const std::string& file_name) {
  // dump to disk the contents of the virtualasciifile, appending at the end of the given file_name
  int result = 0;
  if (_file_lines->size() > 0) {
    fstream output_file;
    output_file.open(file_name, ios::app);
    if (output_file.is_open()) {
@@ -310,6 +312,7 @@ int VirtualAsciiFile::append_to_disk(const std::string& file_name) {
    } else {
      result = 1;
    }
  }
  return result;
}

@@ -334,6 +337,7 @@ int VirtualAsciiFile::insert(int32_t position, VirtualAsciiFile& rhs, int32_t st
int VirtualAsciiFile::write_to_disk(const std::string& file_name) {
  // dump to disk the contents of the virtualasciifile, replacing the given file_name
  int result = 0;
  if (_file_lines->size() > 0) {
    fstream output_file;
    output_file.open(file_name, ios::out);
    if (output_file.is_open()) {
@@ -344,6 +348,7 @@ int VirtualAsciiFile::write_to_disk(const std::string& file_name) {
    } else {
      result = 1;
    }
  }
  return result;
}

@@ -354,6 +359,7 @@ void VirtualAsciiFile::mpisend(const mixMPI *mpidata) {
  int32_t num_lines =  _file_lines->size();
  MPI_Send(&num_lines, 1, MPI_INT32_T, 0, 10, MPI_COMM_WORLD);
  // now loop over data to send
  if (num_lines>0) {
    int32_t mysize;
    for (vector<string>::iterator it = _file_lines->begin(); it != _file_lines->end(); ++it) {
      // send the length of each string
@@ -363,6 +369,7 @@ void VirtualAsciiFile::mpisend(const mixMPI *mpidata) {
      MPI_Send(it->c_str(), mysize+1, MPI_CHAR, 0, 10, MPI_COMM_WORLD);
    }
  }
}
#endif

/* >>> End of VirtualAsciiFile class implementation <<< */
@@ -483,9 +490,10 @@ VirtualBinaryFile::~VirtualBinaryFile() {
  // for (vector<VirtualBinaryLine>::iterator it = _file_lines->begin(); it != _file_lines->end(); ++it) {
  //   delete it;
  // }
  while (!_file_lines->size() > 0) {
    _file_lines->pop_back();
  }
  _file_lines->clear();
  // while (_file_lines->size() > 0) {
  //   _file_lines->pop_back();
  // }
  if (_file_lines != NULL) delete _file_lines;
}

@@ -506,6 +514,7 @@ void VirtualBinaryFile::append_line(const VirtualBinaryLine& line) {
int VirtualBinaryFile::append_to_disk(const std::string& file_name) {
  // dump to disk the contents of the virtualasciifile, appending at the end of the given file_name
  int result = 0;
  if (_file_lines->size() > 0) {
    fstream output_file;
    output_file.open(file_name, ios::app | ios::binary);
    if (output_file.is_open()) {
@@ -516,6 +525,7 @@ int VirtualBinaryFile::append_to_disk(const std::string& file_name) {
    } else {
      result = 1;
    }
  }
  return result;
}

@@ -541,6 +551,7 @@ int VirtualBinaryFile::append_to_disk(const std::string& file_name) {
int VirtualBinaryFile::write_to_disk(const std::string& file_name) {
  // dump to disk the contents of the virtualasciifile, replacing the given file_name
  int result = 0;
  if (_file_lines->size() > 0) {
    fstream output_file;
    output_file.open(file_name, ios::out | ios::binary);
    if (output_file.is_open()) {
@@ -551,6 +562,7 @@ int VirtualBinaryFile::write_to_disk(const std::string& file_name) {
    } else {
      result = 1;
    }
  }
  return result;
}

@@ -561,10 +573,12 @@ void VirtualBinaryFile::mpisend(const mixMPI *mpidata) {
  int32_t num_lines =  _file_lines->size();
  MPI_Send(&num_lines, 1, MPI_INT32_T, 0, 10, MPI_COMM_WORLD);
  // now loop over data to send
  if (num_lines>0) {
    for (vector<VirtualBinaryLine>::iterator it = _file_lines->begin(); it != _file_lines->end(); ++it) {
      it->mpisend(mpidata);
    }
  }
}
#endif

/* >>> End of VirtualBinaryFile class implementation <<< */