Commit d20deb6c authored by Nandhana Sakhtivel's avatar Nandhana Sakhtivel
Browse files

Ring communication added for one-sided reduce communication

parent 65f67774
Loading
Loading
Loading
Loading
+13 −13
Original line number Diff line number Diff line
@@ -30,17 +30,17 @@ endif

#OPT += -DNVIDIA
# perform one-side communication (suggested) instead of reduce (only if MPI is active)
#OPT += -DONE_SIDE
OPT += -DONE_SIDE
# write the full 3D cube of gridded visibilities and its FFT transform
#OPT += -DWRITE_DATA
OPT += -DWRITE_DATA
# write the final image
#OPT += -DWRITE_IMAGE
OPT += -DWRITE_IMAGE
# perform w-stacking phase correction
OPT += -DPHASE_ON


DEPS = w-stacking.h main.c w-stacking.cu phase_correction.cu allvars.h init.c gridding.c fourier_transform.c result.c numa.h
COBJ = w-stacking.o main.o phase_correction.o allvars.o init.o gridding.o fourier_transform.o result.o numa.o
DEPS = w-stacking.h main.c w-stacking.cu phase_correction.cu allvars.h init.c gridding.c fourier_transform.c result.c reduce.c numa.h
COBJ = w-stacking.o main.o phase_correction.o allvars.o init.o gridding.o fourier_transform.o result.o reduce.o numa.o

w-stacking.c: w-stacking.cu
	cp w-stacking.cu w-stacking.c
@@ -50,35 +50,35 @@ phase_correction.c: phase_correction.cu

ifeq (USE_MPI,$(findstring USE_MPI,$(OPT)))
%.o: %.c $(DEPS)
	$(MPICC) $(OPTIMIZE) $(OPT) -c -o $@ $< $(CFLAGS)
	$(MPICC) $(OPTIMIZE) $(OPT) -fopenmp -c -g -O0 -o $@ $< $(CFLAGS)
else
%.o: %.c $(DEPS)
	$(CC) $(OPTIMIZE) $(OPT) -c -o $@ $< $(CFLAGS)
	$(CC) $(OPTIMIZE) $(OPT) -fopenmp -c -g -O0 -o $@ $< $(CFLAGS)
endif

serial: $(COBJ)
	$(CC) $(OPTIMIZE) $(OPT) -o w-stackingCfftw_serial  $^ $(LIBS)

serial_omp: phase_correction.c
	$(CC)  $(OPTIMIZE) $(OPT) -o w-stackingOMP_serial main.c init.c gridding.c fourier_transform.c result.c w-stacking_omp.c    $(CFLAGS) $(LIBS)
	$(CC)  $(OPTIMIZE) $(OPT) -o w-stackingOMP_serial main.c init.c gridding.c fourier_transform.c result.c reduce.c w-stacking_omp.c    $(CFLAGS) $(LIBS)

simple_mpi: phase_correction.c
	$(MPICC) $(OPTIMIZE) $(OPT) -o w-stackingMPI_simple w-stacking_omp.c main.c init.c gridding.c fourier_transform.c result.c phase_correction.c  $(CFLAGS) $(LIBS)
	$(MPICC) $(OPTIMIZE) $(OPT) -o w-stackingMPI_simple w-stacking_omp.c main.c init.c gridding.c fourier_transform.c result.c reduce.c phase_correction.c  $(CFLAGS) $(LIBS)

mpi_omp: phase_correction.c
	$(MPICC) $(OPTIMIZE) $(OPT) -o w-stackingMPI_omp w-stacking_omp.c main.c init.c gridding.c fourier_transform.c result.c phase_correction.c  $(CFLAGS) $(LIBS)
	$(MPICC) $(OPTIMIZE) $(OPT) -o w-stackingMPI_omp w-stacking_omp.c main.c init.c gridding.c fourier_transform.c result.c reduce.c phase_correction.c  $(CFLAGS) $(LIBS)

serial_cuda:
	$(NVCC) $(NVFLAGS) -c w-stacking.cu phase_correction.cu $(NVLIB)
	$(CC)  $(OPTIMIZE) $(OPT) -c main.c init.c gridding.c fourier_transform.c result.c $(CFLAGS) $(LIBS)
	$(CC)  $(OPTIMIZE) $(OPT) -c main.c init.c gridding.c fourier_transform.c result.c reduce.c $(CFLAGS) $(LIBS)
	$(CXX) $(OPTIMIZE) $(OPT) -o w-stackingfftw_serial w-stacking-fftw.o w-stacking.o phase_correction.o $(CFLAGS) $(NVLIB) -lm

mpi: $(COBJ)
	$(MPICC) $(OPTIMIZE) -o w-stackingCfftw $^  $(CFLAGS) $(LIBS)
	$(MPICC) $(OPTIMIZE) -fopenmp -o w-stackingCfftw $^  $(CFLAGS) $(LIBS)

mpi_cuda:
	$(NVCC)   $(NVFLAGS) -c w-stacking.cu phase_correction.cu $(NVLIB)
	$(MPICC)  $(OPTIMIZE) $(OPT) -c main.c init.c fourier_transform.c result.c $(CFLAGS) $(LIBS)
	$(MPICC)  $(OPTIMIZE) $(OPT) -c main.c init.c fourier_transform.c result.c gridding.c reduce.c $(CFLAGS) $(LIBS)
	$(MPIC++) $(OPTIMIZE) $(OPT)   -o w-stackingfftw w-stacking-fftw.o w-stacking.o phase_correction.o $(NVLIB) $(CFLAGS) $(LIBS)

clean:
+5 −0
Original line number Diff line number Diff line
@@ -36,3 +36,8 @@ double * grid, *gridss, *gridss_real, *gridss_img, *gridss_w;
#endif

long **sectorarray;

blocks_t blocks;
MPI_Request *requests;
int thid;
int Ntasks_local;
+57 −1
Original line number Diff line number Diff line
@@ -35,7 +35,8 @@
#include <time.h>
#include <unistd.h>
#include "numa.h"

#include <stdatomic.h>
#include <omp.h>

extern struct io
{
@@ -209,3 +210,58 @@ extern long **sectorarray;

#define STRINGIFY(a) #a
#define UNROLL(N) _Pragma(STRINGIFY(unroll(N)))


#define CPU_TIME_tr ({ struct timespec myts; (clock_gettime( CLOCK_THREAD_CPUTIME_ID, &myts ), (double)myts.tv_sec + (double)myts.tv_nsec * 1e-9);})

#define CPU_TIME_rt ({ struct timespec myts; (clock_gettime( CLOCK_REALTIME, &myts ), (double)myts.tv_sec + (double)myts.tv_nsec * 1e-9);})

#define CPU_TIME_STAMP(t, s) { struct timespec myts; clock_gettime( CLOCK_REALTIME, &myts ); printf("STAMP t %d %s - %ld %ld\n", (t), s, myts.tv_sec, myts.tv_nsec);}

#define NSLEEP( T ) {struct timespec tsleep={0, (T)}; nanosleep(&tsleep, NULL); }


#define ACQUIRE_CTRL( ADDR, V, TIME, CMP ) {                            \
    double tstart = CPU_TIME_tr;                                        \
    atomic_thread_fence(memory_order_acquire);                          \
    int read = atomic_load((ADDR));                                     \
    while( read CMP (V) ) { NSLEEP(50);                                 \
      read = atomic_load((ADDR)); }                                     \
    (TIME) += CPU_TIME_tr - tstart; }

              
 #define ACQUIRE_CTRL_DBG( ADDR, V, CMP, TAG ) {                  \
    atomic_thread_fence(memory_order_acquire);                          \
    int read = atomic_load((ADDR));                                     \
    printf("%s %s Task %d read is %d\n", TAG, #ADDR, global_rank, read);      \
    unsigned int counter = 0;                                           \
    while( read CMP (V) ) { NSLEEP(50);                                 \
    read = atomic_load((ADDR));                                         \
    if( (++counter) % 10000 == 0 )                                      \
                        printf("%s %p Task %d read %u is %d\n",         \
                        TAG, ADDR, global_rank, counter, read);}              \
    }



#define DATA_FREE   -1
#define FINAL_FREE  -1

#define CTRL_DATA          0
#define CTRL_FINAL_STATUS  0
#define CTRL_FINAL_CONTRIB 1
#define CTRL_SHMEM_STATUS  2
#define CTRL_BLOCKS        3
#define CTRL_END           (CTRL_BLOCKS+1)

#define CTRL_BARRIER_START 0
#define CTRL_BARRIER_END   1

#define BUNCH_FOR_VECTORIZATION 128

typedef long long unsigned int int_t;
typedef struct { int Nblocks; int_t *Bstart; int_t *Bsize; } blocks_t;
extern MPI_Request *requests;
extern int thid;
extern int Ntasks_local;
extern blocks_t blocks;
+56 −161
Original line number Diff line number Diff line
@@ -94,6 +94,44 @@ void gridding_data()
  MPI_Win_fence(0,slabwin);
 #endif

 #ifdef ONE_SIDE
   memset( (char*)Me.win.ptr, 0, size_of_grid*sizeof(double)*1.1);
   if( Me.Rank[myHOST] == 0 )
   {
       for( int tt = 1; tt < Me.Ntasks[myHOST]; tt++ )
           memset( (char*)Me.swins[tt].ptr, 0, size_of_grid*sizeof(double)*1.1);
   }

   MPI_Barrier(MPI_COMM_WORLD);

   if( Me.Rank[HOSTS] >= 0 )
       requests = (MPI_Request *)calloc( Me.Ntasks[WORLD], sizeof(MPI_Request) );

   if( Me.Rank[myHOST] == 0 ) {
       *((int*)win_ctrl_hostmaster_ptr+CTRL_BARRIER_END) = 0;
       *((int*)win_ctrl_hostmaster_ptr+CTRL_BARRIER_START) = 0; 
   }

   *((int*)Me.win_ctrl.ptr + CTRL_FINAL_STATUS) = FINAL_FREE;
   *((int*)Me.win_ctrl.ptr + CTRL_FINAL_CONTRIB) = 0;
   *((int*)Me.win_ctrl.ptr + CTRL_SHMEM_STATUS) = -1;
   MPI_Barrier(*(Me.COMM[myHOST]));
   
   blocks.Nblocks = Me.Ntasks[myHOST];
   blocks.Bstart  = (int_t*)calloc( blocks.Nblocks, sizeof(int_t));
   blocks.Bsize   = (int_t*)calloc( blocks.Nblocks, sizeof(int_t));
   int_t size  = size_of_grid / blocks.Nblocks;
   int_t rem   = size_of_grid % blocks.Nblocks;

   blocks.Bsize[0]  = size + (rem > 0);
   blocks.Bstart[0] = 0;
   for(int b = 1; b < blocks.Nblocks; b++ ) {
   blocks.Bstart[b] = blocks.Bstart[b-1]+blocks.Bsize[b-1];
   blocks.Bsize[b] = size + (b < rem); }
  

 #endif
  
 #ifndef USE_MPI
  file.pFile1 = fopen (out.outfile1,"w");
 #endif
@@ -135,9 +173,6 @@ void gridding_data()
      for(long iphi = histo_send[isector]-1; iphi>=0; iphi--)
        {
	  long ilocal = sectorarray[isector][iphi];
	  //double vvh = data.vv[ilocal];
	  //int binphi = (int)(vvh*nsectors);
	  //if (binphi == isector || boundary[ilocal] == isector) {
	  uus[icount] = data.uu[ilocal];
	  vvs[icount] = data.vv[ilocal]-isector*shift;
	  wws[icount] = data.ww[ilocal];	  
@@ -246,72 +281,12 @@ void gridding_data()
       
     #ifdef ONE_SIDE

      // for every task, gridss coincides with the 
      // that can be avoided if shared window coincides with gridss

     /* TAKE_TIME(twt, tpr);
      memcpy(Me.win.ptr, gridss, size_of_grid*sizeof(double));
      ADD_TIME(mmove, twt, tpr);
 
      dprintf(1, global_rank, 0, "reducing sector %ld..\n", isector);

      TAKE_TIME( twt, tpr);
      reduce( isector, target_rank );  // here the reduce is performed within every host 
      ADD_TIME(reduce_sh, twt, tpr);
    */
      /*if ( Me.Nhosts > 1 )
	{
	  // here the reduce is performed among hosts
	  MPI_Barrier(MPI_COMM_WORLD);
	   
	  int Im_in_the_new_communicator = MPI_UNDEFINED;
	  if(global_rank == target_rank)
	    Im_in_the_new_communicator = 1;
	  else
	    if( Me.Rank[HOSTS] == 0 )
	      {
		if( Me.Ranks_to_host[ target_rank ] != Me.myhost )
		  Im_in_the_new_communicator = 1;
	      }
	   
	  MPI_Comm Sector_Comm;
	  MPI_Comm_split( COMM[WORLD], Im_in_the_new_communicator, global_rank, &Sector_Comm);
	   
	  if( Sector_Comm != MPI_COMM_NULL )
	    {
	      double _twt_;
	      int sector_size;
	      int sector_rank = 0;
	      int sector_target;
	       
	      MPI_Comm_size( Sector_Comm, &sector_size);
	      MPI_Comm_rank( Sector_Comm, &sector_rank);
	      if ( global_rank == target_rank)
		{
		  MPI_Send( &sector_rank, 1, MPI_INT, 0, 0, Sector_Comm);
		  TAKE_TIMEwt( _twt_ );
		  memcpy(gridss, Me.swins[Me.Rank[myHOST]].ptr+isector*size_of_grid*sizeof(double),
			 size_of_grid * sizeof(double));
		  ADD_TIMEwt( mmove, _twt_);
		}
	       
	      if( sector_rank == 0 )
		{
		  MPI_Status status;
		  MPI_Recv( &sector_target, 1, MPI_INT, MPI_ANY_SOURCE, 0, Sector_Comm, &status);
		}

	      TAKE_TIMEwt(_twt_);
	      MPI_Bcast( &sector_target, 1, MPI_INT, 0, Sector_Comm );
	       
	      MPI_Reduce(gridss, grid, size_of_grid, MPI_DOUBLE,MPI_SUM, sector_target, Sector_Comm);
	      
	      MPI_Comm_free( &Sector_Comm );
	      ADD_TIMEwt(mpi, _twt_);
	    }
	}
      ADD_TIME(reduce_mpi, twt, tpr);
      */
     printf("One Side communication active\n");
     //MPI_Win_lock(MPI_LOCK_SHARED,target_rank,0,slabwin);
     //MPI_Accumulate(gridss,size_of_grid,MPI_DOUBLE,target_rank,0,size_of_grid,MPI_DOUBLE,MPI_SUM,slabwin);
     //MPI_Win_unlock(target_rank,slabwin);
     int res = reduce(target_rank);
     printf("I'm outside reduce global rank %d target rank %d local_rank %d \n", global_rank, target_rank, Me.Rank[HOSTS]);

     #else   // relates to #ifdef ONE_SIDE
      
@@ -329,7 +304,7 @@ void gridding_data()

      
      // wipe before getting to the next sector
      memset((void*)gridss, 0, size_of_grid * sizeof(double));
  //    memset((void*)gridss, 0, size_of_grid * sizeof(double));

      // Deallocate all sector arrays
      free(uus);
@@ -342,6 +317,16 @@ void gridding_data()
    }
    

    #ifdef ONE_SIDE
        if( (Me.Rank[HOSTS] >= 0) && (Me.Nhosts > 1 )) {
          MPI_Waitall( Me.Ntasks[WORLD], requests, MPI_STATUSES_IGNORE);
          free(requests);}
        MPI_Barrier(MPI_COMM_WORLD);
        memcpy(Me.sfwins[global_rank].ptr, grid, size_of_grid);
       /* int test = memcmp(Me.sfwins[global_rank].ptr, grid, size_of_grid);
        printf("The comparison value = %d\n", test);*/
    #endif

  free( histo_send );

 #ifndef USE_MPI
@@ -350,6 +335,7 @@ void gridding_data()

 #ifdef USE_MPI
  MPI_Win_fence(0,slabwin);
  numa_shutdown(global_rank, 0, &MYMPI_COMM_WORLD, &Me);
  MPI_Barrier(MPI_COMM_WORLD);
 #endif
  
@@ -438,96 +424,5 @@ void write_gridded_data()
    #endif //WRITE_DATA      
}

/*
void reduce( int sector, int target_rank )
 {   
   MPI_Barrier(*(Me.COMM[myHOST]));
   
   int local_rank = Me.Rank[myHOST];
   int target_rank_on_myhost = -1;
   
   if( Me.Ranks_to_host[ target_rank ] == Me.myhost )
     // exchange rank 0 with target rank
     // in this way the following log2 alogorithm,
     // which reduces to rank 0, will work for
     // every target rank
     {

       target_rank_on_myhost = 0;
       while( Me.Ranks_to_myhost[target_rank_on_myhost] != target_rank )
	 target_rank_on_myhost++;
       
       
       if( target_rank_on_myhost > 0 )
	 // the target is not the task that already has rank 0
	 // on my host
	 {
	   dprintf(2, Me.Rank[myHOST], 0,
		   "[SEC %d] swapping Host master with target rank %d (%d)\n",
		   sector, target_rank, target_rank_on_myhost);	   
	   
	   if( local_rank == 0 )
	     local_rank = target_rank_on_myhost;
	   else if( local_rank == target_rank_on_myhost )
	     local_rank = 0;

	   win_t temp = Me.swins[target_rank_on_myhost];
	   Me.swins[target_rank_on_myhost] = Me.swins[0];
	   Me.swins[0] = temp;

	   temp = Me.scwins[target_rank_on_myhost];
	   Me.scwins[target_rank_on_myhost] = Me.scwins[0];
	   Me.scwins[0] = temp;
	 }
     }
   
   
   int max_level = 0;
   while( (1<< (++max_level) ) < Me.Ntasks[myHOST] );

   *(int*)(Me.win_ctrl.ptr) = -1;
   MPI_Win_fence( 0, Me.win_ctrl.win);
   
   for(int l = 0; l < max_level; l++)
     {
       int threshold = 1 << (1+l);

       MPI_Win_fence( MPI_MODE_NOSTORE, Me.win_ctrl.win);
       if( local_rank % threshold == 0)
         {
	   int source = local_rank + (1<<l);
	   if( source < Me.Ntasks[myHOST] )
	     {
	       double * restrict my_base     = ((double*)Me.win.ptr);
	       double * my_end               = my_base + size_of_grid;
	       double * restrict source_base = ((double*)Me.swins[source].ptr);
	       for( ; my_base < my_end; my_base++, source_base++)
		 *my_base += *source_base;	       
	     }
	   
	   *(int*)(Me.win_ctrl.ptr) = l;
	   MPI_Win_fence( 0, Me.win_ctrl.win);
         }
       else
	 {
	   *(int*)(Me.win_ctrl.ptr) = l;
	   MPI_Win_fence( 0, Me.win_ctrl.win);
	 }
     }

   if ( target_rank_on_myhost > 0 )
     {
       win_t temp = Me.swins[target_rank_on_myhost];
       Me.swins[target_rank_on_myhost] = Me.swins[0];
       Me.swins[0] = temp;

       temp = Me.scwins[target_rank_on_myhost];
       Me.scwins[target_rank_on_myhost] = Me.scwins[0];
       Me.scwins[0] = temp;
     }
   
   return;
 }
*/

+1 −1
Original line number Diff line number Diff line
@@ -366,7 +366,7 @@ void allocate_memory() {
     gridss_real  = (double*) calloc(size_of_grid/2,sizeof(double));
     gridss_img   = (double*) calloc(size_of_grid/2,sizeof(double));

     numa_allocate_shared_windows( &Me, size_of_grid*sizeof(double)*1.1, size_of_grid*sizeof(double)*1.1 );
     numa_allocate_shared_windows( &Me, size_of_grid*sizeof(double)*1.1, sizeof(double)*1.1 );
     
     // Create destination slab
      grid = (double*) calloc(size_of_grid,sizeof(double));
Loading