Commit 5835143e authored by Luca Tornatore's avatar Luca Tornatore
Browse files

update on reduce: (i) added the mapping from global_rank to ranks in myHOST...

update on reduce: (i) added the mapping from global_rank to ranks in myHOST (ii) added a ctrl window to synchronize reduce operations
parent 19c1352f
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -14,7 +14,7 @@ struct fileData data;
char filename[1000], buf[30], num_buf[30];
char datapath[900];
int xaxis, yaxis;
int rank;
int global_rank;
int size;
long nsectors;
long startrow;
+1 −1
Original line number Diff line number Diff line
@@ -118,7 +118,7 @@ extern struct fileData
extern char filename[1000], buf[30], num_buf[30];
extern char datapath[900];
extern int xaxis, yaxis;
extern int rank;
extern int global_rank;
extern int size;
extern long nsectors;
extern long startrow;
+6 −6
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@ void fftw_data(){

    #ifdef USE_FFTW
        // FFT transform the data (using distributed FFTW)
        if(rank == 0)printf("PERFORMING FFT\n");
        if(global_rank == 0)printf("PERFORMING FFT\n");
        clock_gettime(CLOCK_MONOTONIC, &begin);
        start = clock();
        fftw_plan plan;
@@ -84,7 +84,7 @@ void write_fftw_data(){
        MPI_Win_create(gridss, size_of_grid*sizeof(double), sizeof(double), MPI_INFO_NULL, MPI_COMM_WORLD, &writewin);
        MPI_Win_fence(0,writewin);
        #endif
        if (rank == 0)
        if (global_rank == 0)
        {
          printf("WRITING FFT TRANSFORMED DATA\n");
          file.pFilereal = fopen (out.fftfile2,"wb");
@@ -157,7 +157,7 @@ void write_fftw_data(){

     clock_gettime(CLOCK_MONOTONIC, &begin);
     start = clock();
     if(rank == 0)printf("PHASE CORRECTION\n");
     if(global_rank == 0)printf("PHASE CORRECTION\n");
     double* image_real = (double*) calloc(xaxis*yaxis,sizeof(double));
     double* image_imag = (double*) calloc(xaxis*yaxis,sizeof(double));

@@ -171,7 +171,7 @@ void write_fftw_data(){

     #ifdef WRITE_IMAGE

        if(rank == 0)
        if(global_rank == 0)
        {
            file.pFilereal = fopen (out.fftfile2,"wb");
            file.pFileimg = fopen (out.fftfile3,"wb");
@@ -181,13 +181,13 @@ void write_fftw_data(){
        #ifdef USE_MPI
            MPI_Barrier(MPI_COMM_WORLD);
        #endif
        if(rank == 0)printf("WRITING IMAGE\n");
        if(global_rank == 0)printf("WRITING IMAGE\n");
        for (int isector=0; isector<size; isector++)
        {
            #ifdef USE_MPI
                MPI_Barrier(MPI_COMM_WORLD);
            #endif
            if(isector == rank)
            if(isector == global_rank)
            {
               printf("%d writing\n",isector);
               file.pFilereal = fopen (out.fftfile2,"ab");
+88 −34
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@

void gridding(){

    if(rank == 0)printf("GRIDDING DATA\n");
    if(global_rank == 0)printf("GRIDDING DATA\n");

    // Create histograms and linked lists
    
@@ -71,21 +71,8 @@ void initialize_array(){
    }
     
  
   #ifdef PIPPO
        long iiii = 0;
        for (int j=0; j<nsectors; j++)
        {
                iiii = 0;
                for(long iphi = histo_send[j]-1; iphi>=0; iphi--)
                {
                      printf("%d %d %ld %ld %ld\n",rank,j,iiii,histo_send[j],sectorarray[j][iphi]);
                      iiii++;
                }
        }
   #endif

    #ifdef VERBOSE
        for (int iii=0; iii<nsectors+1; iii++)printf("HISTO %d %d %ld\n",rank, iii, histo_send[iii]);
        for (int iii=0; iii<nsectors+1; iii++)printf("HISTO %d %d %ld\n", global_rank, iii, histo_send[iii]);
    #endif
}

@@ -253,20 +240,56 @@ void gridding_data(){
      // MPI_Accumulate(gridss,size_of_grid,MPI_DOUBLE,target_rank,0,size_of_grid,MPI_DOUBLE,MPI_SUM,slabwin);
      // MPI_Win_unlock(target_rank,slabwin);

       // for every task, gridss coincides with the 
       // that can be avoided if shared window coincides with gridss
       memcpy(Me.win.ptr+isector*sizeof(gridss), gridss, sizeof(gridss));

       reduce( isector );      // here the reduce is performed within every host 
       
       reduce( isector, target_rank );  // here the reduce is performed within every host 


                                  // here the reduce is performed among hosts
       MPI_Barrier(MPI_COMM_WORLD);
       if((Me.MAXl > myHOST) && (Me.Rank[HOSTS] != -1))
       
          MPI_Reduce(grid, grid, size_of_grid, MPI_DOUBLE,MPI_SUM,target_rank,*Me.COMM[HOSTS]);
       int Im_in_the_new_communicator = MPI_UNDEFINED;
       if(global_rank == target_rank)
	 Im_in_the_new_communicator = 1;
       else
	 if( Me.Rank[HOSTS] == 0 )
	   {
	     if( Me.Ranks_to_host[ target_rank ] != Me.myhost )
	       Im_in_the_new_communicator = 1;
	   }

       // that can be avoided if shared window coincides with gridss
       MPI_Comm Sector_Comm;
       MPI_Comm_split( COMM[WORLD], Im_in_the_new_communicator, global_rank, &Sector_Comm);

       if( Sector_Comm != MPI_COMM_NULL )
	 {
	   int sector_size;
	   int sector_rank = 0;
	   int sector_target;
	   
	   MPI_Comm_size( Sector_Comm, &sector_size);
	   MPI_Comm_rank( Sector_Comm, &sector_rank);
	   if ( global_rank == target_rank)
	     {
	       MPI_Send( &sector_rank, 1, MPI_INT, 0, 0, Sector_Comm);
	       memcpy(grid, Me.swins[Me.Rank[myHOST]].ptr+isector*sizeof(gridss), sizeof(grid));    
	     }
	   
	   if( sector_rank == 0 )
	     {
	       MPI_Status status;
	       MPI_Recv( &sector_target, 1, MPI_INT, MPI_ANY_SOURCE, 0, Sector_Comm, &status);
	     }
		       
	   MPI_Bcast( &sector_target, 1, MPI_INT, 0, Sector_Comm );

	   MPI_Reduce(grid, grid, size_of_grid, MPI_DOUBLE,MPI_SUM,sector_target, Sector_Comm);

	   MPI_Comm_free( &Sector_Comm );
	 }

       
       
@@ -321,7 +344,7 @@ void write_grided_data()

   #ifdef WRITE_DATA
     // Write results
     if (rank == 0)
     if (global_rank == 0)
     {
        printf("WRITING GRIDDED DATA\n");
        file.pFilereal = fopen (out.outfile2,"wb");
@@ -399,23 +422,54 @@ void write_grided_data()
}


 void reduce( int sector )
// NSLEEP( T ): suspend the calling thread for T nanoseconds using nanosleep().
//   T is stored in tv_nsec (tv_sec is 0); nanosleep() rejects tv_nsec values
//   outside [0, 999999999], so longer waits cannot be expressed with this macro.
//   The return value of nanosleep() is ignored and no remaining-time buffer is
//   passed, so a sleep cut short by a signal is not resumed.
//   The body is wrapped in do { } while (0) so that `NSLEEP(x);` is exactly one
//   statement and remains valid as the unbraced body of an if/else or a loop.
#define NSLEEP( T ) do { struct timespec tsleep = {0, (T)}; nanosleep(&tsleep, NULL); } while (0)

void reduce( int sector, int target_rank )
 {   
   
   int local_rank = Me.Rank[myHOST];

   if( Me.Ranks_to_host[ target_rank ] == Me.myhost )
     {

       int r = 0;
       while( Me.Ranks_to_myhost[r] != target_rank )
	 r++;
       
       if( r > 0 )
	 {       
	   if( local_rank == 0 )
	     local_rank = r;
	   if( local_rank == r )
	     local_rank = 0;
	 }
     }
   
   
   int max_level = 0;
   while( (1<< (++max_level) ) < Me.Ntasks[myHOST] );

   *(int*)(Me.win_ctrl.ptr) = -1;
   
   for(int l = 0; l < max_level; l++)
     {
       int threshold = 1 << (1+l);

       if( Me.Rank[myHOST] % threshold == 0)
       if( local_rank % threshold == 0)
         {	   
	   int target = Me.Rank[myHOST] + (1<<l);
	   int source = local_rank + (1<<l);
	   while( *(int*)(Me.scwins[source].ptr) < l )
	     // sleep 5 usec if the source target is not ready
	     NSLEEP( 5000 );
	   
	   for(int j = 0; j < size_of_grid; j++)
	     *((double*)Me.swins[Me.Rank[myHOST]].ptr+sector*sizeof(gridss)+j) +=
	       *((double*)Me.swins[target].ptr+sector*sizeof(gridss)+j);
	     *((double*)Me.swins[local_rank].ptr+sector*sizeof(gridss)+j) +=
	       *((double*)Me.swins[source].ptr+sector*sizeof(gridss)+j);
	   
	   *(int*)(Me.win_ctrl.ptr) = l;
         }
       else
	 *(int*)(Me.win_ctrl.ptr) = l;
     }
   
   return;
+12 −12
Original line number Diff line number Diff line
@@ -31,10 +31,10 @@ void init(int index)
   yaxis = local_grid_size_y;

   #ifdef USE_MPI
   	init_numa( rank, size, &MYMPI_COMM_WORLD, &Me );
   	init_numa( global_rank, size, &MYMPI_COMM_WORLD, &Me );

        if(rank == 0)
    	    printf("\nTask %d sees %d topology levels\n", rank, Me.MAXl);
        if(global_rank == 0)
    	    printf("\nTask %d sees %d topology levels\n", global_rank, Me.MAXl);
   #endif

   clock_gettime(CLOCK_MONOTONIC, &begin);
@@ -75,7 +75,7 @@ void init(int index)

void op_filename() {

   if(rank == 0)
   if(global_rank == 0)
   {   
   	strcpy(buf, num_buf);
   	strcat(buf, outparam.outfile);
@@ -127,7 +127,7 @@ void op_filename() {

void read_parameter_file(char *fname)
{
   if(rank == 0)
   if(global_rank == 0)
   {
   	if(file.pFile = fopen (fname,"r"))
   	{
@@ -273,7 +273,7 @@ void fileName(char datapath[900], char file[30]) {

void readMetaData(char fileLocal[1000]) {

     if(rank == 0) 
     if(global_rank == 0) 
     {
        if(file.pFile = fopen (fileLocal,"r"))
        {
@@ -315,7 +315,7 @@ void metaData_calculation() {
     // calculate the coordinates of the center
     double uvshift = metaData.uvmin/(metaData.uvmax-metaData.uvmin);

     if (rank == 0)
     if (global_rank == 0)
     {
          printf("N. measurements %ld\n",metaData.Nmeasures);
          printf("N. visibilities %ld\n",metaData.Nvis);
@@ -325,8 +325,8 @@ void metaData_calculation() {
     long nm_pe = (long)(metaData.Nmeasures/size);
     long remaining = metaData.Nmeasures%size;

     startrow = rank*nm_pe;
     if (rank == size-1)nm_pe = nm_pe+remaining;
     startrow = global_rank*nm_pe;
     if (global_rank == size-1)nm_pe = nm_pe+remaining;

     long Nmeasures_tot = metaData.Nmeasures;
     metaData.Nmeasures = nm_pe;
@@ -335,8 +335,8 @@ void metaData_calculation() {
     metaData.Nweights = metaData.Nmeasures*metaData.polarisations;

     #ifdef VERBOSE
          printf("N. measurements on %d %ld\n",rank,metaData.Nmeasures);
          printf("N. visibilities on %d %ld\n",rank,metaData.Nvis);
          printf("N. measurements on %d %ld\n",global_rank,metaData.Nmeasures);
          printf("N. visibilities on %d %ld\n",global_rank,metaData.Nvis);
     #endif

}
@@ -376,7 +376,7 @@ void allocate_memory() {

void readData() {

     if(rank == 0) 
     if(global_rank == 0) 
     {

          printf("READING DATA\n");
Loading