Commit 8239a0c5 authored by Nandhana Sakhtivel
Browse files

Code with correct ring reduce

parent 71bccbe1
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -107,7 +107,9 @@ typedef struct {
  double compose;    //
  double phase;      //
  double write;      //
  double total; } timing_t;
  double total;
  double reduce_ring;
 } timing_t;

extern timing_t wt_timing;      // wall-clock timings
extern timing_t pr_timing;      // process CPU timing
+14 −23
Original line number Diff line number Diff line
@@ -278,7 +278,6 @@ void gridding_data()

      double twt_r, tpr_r;
      TAKE_TIME(twt_r, tpr_r);

                                                     // ..................
     #ifndef USE_MPI                                 // REDUCE WITH NO MPI                
      
@@ -300,22 +299,19 @@ void gridding_data()
       
     #ifdef ONE_SIDE

     printf("One Side communication active");
     //printf("One Side communication active my rank %d target rank %d\n", global_rank, target_rank);

     //MPI_Win_lock(MPI_LOCK_SHARED,target_rank,0,slabwin);
     //MPI_Accumulate(gridss,size_of_grid,MPI_DOUBLE,target_rank,0,size_of_grid,MPI_DOUBLE,MPI_SUM,slabwin);
     //MPI_Win_unlock(target_rank,slabwin);
     printf("One Side communication active\n");

     #ifdef RING
     double _twt_;
     TAKE_TIMEwt(_twt_);
     int res = reduce_ring(target_rank);
     ADD_TIMEwt(reduce_ring, _twt_);
     #endif

     #ifdef BINOMIAL
     int res = reduce_binomial(target_rank);
     #endif

     //printf("I'm outside reduce global rank %d target rank %d local_rank %d \n", global_rank, target_rank, Me.Rank[HOSTS]);

     #else   // relates to #ifdef ONE_SIDE
      
@@ -332,7 +328,6 @@ void gridding_data()
      ADD_TIME(reduce, twt_r, tpr_r);

      

      // Deallocate all sector arrays
      free(uus);
      free(vvs);
@@ -342,22 +337,16 @@ void gridding_data()
      free(visimgs);
      // End of loop over sector    
    }
    

    #ifdef ONE_SIDE
    #ifdef RING

        if( (Me.Rank[HOSTS] >= 0) && (Me.Nhosts > 1 )) {
          MPI_Waitall( Me.Ntasks[WORLD], requests, MPI_STATUSES_IGNORE);
          free(requests);}

        MPI_Barrier(MPI_COMM_WORLD);

 //       if(Me.Nhosts>1)
		memcpy(Me.sfwins[Me.Rank[myHOST]].ptr, grid, size_of_grid);
   //     else
     //   	memcpy(Me.sfwins[global_rank].ptr, grid, size_of_grid);
         for( int jj = 0; jj < size_of_grid; jj++)
         {
       	    	*((double*)grid+jj) = *((double*)Me.fwin.ptr+jj);
         }

    #endif
    #endif

  free( histo_send );

@@ -366,8 +355,10 @@ void gridding_data()
 #endif

 #ifdef USE_MPI
  MPI_Win_fence(0,slabwin);
 // MPI_Win_fence(0,slabwin);
 #ifdef ONE_SIDE
  numa_shutdown(global_rank, 0, &MYMPI_COMM_WORLD, &Me);
 #endif
  MPI_Barrier(MPI_COMM_WORLD);
 #endif
  
+8 −2
Original line number Diff line number Diff line
@@ -30,9 +30,11 @@ void init(int index)
   yaxis = local_grid_size_y;

   #ifdef USE_MPI
   #ifdef ONE_SIDE
   numa_init( global_rank, size, &MYMPI_COMM_WORLD, &Me );
   numa_expose(&Me,0);
   #endif
   #endif

   TAKE_TIME_START(setup);

@@ -366,7 +368,11 @@ void allocate_memory() {
     gridss_real  = (double*) calloc(size_of_grid/2,sizeof(double));
     gridss_img   = (double*) calloc(size_of_grid/2,sizeof(double));
   
     #ifdef USE_MPI   
     #ifdef ONE_SIDE
     numa_allocate_shared_windows( &Me, size_of_grid*sizeof(double)*1.1, sizeof(double)*1.1 );
     #endif
     #endif
 
     // Create destination slab
      grid = (double*) calloc(size_of_grid,sizeof(double));
+35 −24
Original line number Diff line number Diff line
@@ -12,8 +12,8 @@


#if defined(DEBUG)
double check_host_value   = 0;
double check_global_value = 0;
double check_host_value ;
double check_global_value ;
#endif

struct { double rtime, ttotal, treduce, tspin, tspin_in, tmovmemory, tsum;} timing = {0};
@@ -34,6 +34,14 @@ int reduce_ring (int target_rank)
	  timing.rtime  = CPU_TIME_rt;
	  timing.ttotal = CPU_TIME_pr;

          #ifdef DEBUG
                check_host_value = 0;
		for( int jj = 0; jj < Me.Ntasks[myHOST]; jj++ )
		{
			check_host_value += (double)(Me.Ranks_to_myhost[jj]);				
		}
          #endif
         
	 #pragma omp parallel num_threads(2)
	  {
	    int thid         = omp_get_thread_num();
@@ -48,9 +56,11 @@ int reduce_ring (int target_rank)
		    		    
		    if( Ntasks_local > 1 )
		      {
                        memcpy(Me.win.ptr, gridss, sizeof(gridss));
                       	for( int jj = 0; jj < size_of_grid; jj++ )
                          *((double*)Me.win.ptr+jj) = *((double*)gridss+jj);

			int value = target_rank * Ntasks_local;
			
			for ( int jj = 0; jj < Me.Ntasks[Me.SHMEMl]; jj++ )
			  *((int*)Me.win_ctrl.ptr+CTRL_BLOCKS+jj) = value;

@@ -79,8 +89,11 @@ int reduce_ring (int target_rank)
		      {
			ACQUIRE_CTRL((int*)Me.win_ctrl.ptr+CTRL_FINAL_STATUS, FINAL_FREE, timing.tspin, != );
                                                                 		       // mimic the production of new data
                        memcpy(Me.win.ptr, gridss, sizeof(gridss));

                       for( int jj = 0; jj < size_of_grid; jj++ )
		       {
                          *((double*)Me.fwin.ptr+jj) = *((double*)gridss+jj);
			  *((double*)Me.win.ptr+size_of_grid+jj) = *((double*)gridss+jj);
		       }
			atomic_store(((int*)Me.win_ctrl.ptr+CTRL_FINAL_CONTRIB), Ntasks_local);
		      }

@@ -91,6 +104,7 @@ int reduce_ring (int target_rank)
		    if( Im_target || Im_NOT_target_but_Im_master )
		      {			
			ACQUIRE_CTRL((int*)Me.win_ctrl.ptr+CTRL_FINAL_CONTRIB, Ntasks_local, timing.tspin, !=);
                        
			atomic_store(((int*)Me.win_ctrl.ptr+CTRL_FINAL_STATUS), target_rank);
		      }

@@ -128,7 +142,6 @@ int reduce_ring (int target_rank)
			int    *ctrl_ptr    = (int*)Me.scwins[target].ptr+CTRL_FINAL_STATUS;

			double *send_buffer = ( Im_hosting_target ? MPI_IN_PLACE : (double*)Me.win.ptr+size_of_grid );
			
			double *recv_buffer = ( Im_hosting_target ? (double*)Me.sfwins[target].ptr : NULL );


@@ -143,11 +156,8 @@ int reduce_ring (int target_rank)
			tstart = CPU_TIME_tr;
			MPI_Ireduce(send_buffer, recv_buffer, size_of_grid, MPI_DOUBLE, MPI_SUM, target_task, COMM[HOSTS], &requests[target_rank]);			
			timingmpi.tmpi_reduce += CPU_TIME_tr - tstart;
									
			MPI_Wait( &requests[target_rank], MPI_STATUS_IGNORE );
			atomic_store(ctrl_ptr, FINAL_FREE);

		        //printf("Im after MPI_Ireduce and my global rank %d and local rank %d\n", global_rank, Me.Rank[HOSTS]);	
			timingmpi.tmpi += CPU_TIME_tr - start;
		      }

@@ -359,9 +369,9 @@ int shmem_reduce_ring( int sector, int target_rank, int_t size_of_grid, map_t *M
       my_source = __builtin_assume_aligned( my_source, 8);
       my_target = __builtin_assume_aligned( my_target, 8);

       dprintf(1, 0, 0, "+ SEC %d host %d l %d t %d <-> %d block %d from %llu to %llu\n",
	       sector, Me->myhost, t, local_rank, target, myblock, 
	       blocks->Bstart[myblock], blocks->Bstart[myblock]+dsize );
     //  dprintf(1, 0, 0, "+ SEC %d host %d l %d t %d <-> %d block %d from %llu to %llu\n",
//	       sector, Me->myhost, t, local_rank, target, myblock, 
//	       blocks->Bstart[myblock], blocks->Bstart[myblock]+dsize );
       
	                                                                      // check whether the data of the source rank
	                                                                      // are ready to be used (control tag must have
@@ -395,6 +405,7 @@ int shmem_reduce_ring( int sector, int target_rank, int_t size_of_grid, map_t *M
	       *(my_target+1) += *(my_source+1);
	       *(my_target+2) += *(my_source+2);
	       *(my_target+3) += *(my_source+3);
//                printf("The rank %d target value of 3 %lf\n", global_rank, (target+3));
	     } }
	 case 1: { for( ; my_source < my_end; my_source++, my_target++)
	       *my_target += *my_source; } break;
@@ -410,9 +421,9 @@ int shmem_reduce_ring( int sector, int target_rank, int_t size_of_grid, map_t *M
       ctrl++;
       atomic_store( ((int*)Me->win_ctrl.ptr+CTRL_BLOCKS+myblock), ctrl );
       //CPU_TIME_STAMP( local_rank, "R3");
       dprintf(1, 0, 0, "- SEC %d host %d l %d t %d ... writing tag %d on block %d = %d\n",
	       sector, Me->myhost, t, local_rank, ctrl, myblock, 
	       *((int*)Me->win_ctrl.ptr+CTRL_BLOCKS+myblock) );
  //     dprintf(1, 0, 0, "- SEC %d host %d l %d t %d ... writing tag %d on block %d = %d\n",
//	       sector, Me->myhost, t, local_rank, ctrl, myblock, 
//	       *((int*)Me->win_ctrl.ptr+CTRL_BLOCKS+myblock) );
       
       myblock = (Nt+(myblock-1)) % Nt;
       atomic_thread_fence(memory_order_release);
@@ -422,8 +433,8 @@ int shmem_reduce_ring( int sector, int target_rank, int_t size_of_grid, map_t *M
   int_t offset = blocks->Bstart[myblock];
   int_t dsize  = blocks->Bsize[myblock];

   dprintf(1,0,0, "c SEC %d host %d t %d (%d) ==> t %d, block %d %llu from %llu\n",
	   sector, Me->myhost, local_rank, global_rank, target_rank_on_myhost, myblock, dsize, offset );
 //  dprintf(1,0,0, "c SEC %d host %d t %d (%d) ==> t %d, block %d %llu from %llu\n",
//	   sector, Me->myhost, local_rank, global_rank, target_rank_on_myhost, myblock, dsize, offset );

   double tstart2 = CPU_TIME_tr;
   double * restrict my_source = data+offset;
@@ -454,6 +465,7 @@ int shmem_reduce_ring( int sector, int target_rank, int_t size_of_grid, map_t *M
   memmoves += dsize;
   
   //atomic_thread_fence(memory_order_release);
   
   return 0;
 }

@@ -508,8 +520,7 @@ int shmem_reduce_binomial( int sector, int target_rank, int dsize, map_t *Me, do
      while( (local_rank % (1<<my_maxlevel)) ) my_maxlevel--;

      //printf("my max_level %d max level %d my rank %d\n", my_maxlevel, max_level, global_rank);
      dprintf(1, 0, 0, "@ SEC %d t %d (%d), %d %d\n",
      sector, local_rank, oRank, *(int*)Me->win_ctrl.ptr, my_maxlevel);
      //dprintf(1, 0, 0, "@ SEC %d t %d (%d), %d %d\n",sector, local_rank, global_rank, *(int*)Me->win_ctrl.ptr, my_maxlevel);
     
      // main reduction loop
      //    
+4 −3
Original line number Diff line number Diff line
@@ -14,9 +14,10 @@ void write_result()
      printf("%14s time : %f sec\n", "Reduce", wt_timing.reduce);
     #if defined(USE_MPI)
     #if defined(ONE_SIDE)
      printf("%14s time : %f sec\n", "Reduce sh", wt_timing.reduce_sh);
      printf("%14s time : %f sec\n", "Mmove", wt_timing.mmove);
      printf("%14s time : %f sec\n", "ReduceMPI", wt_timing.reduce_mpi);
      //printf("%14s time : %f sec\n", "Reduce sh", wt_timing.reduce_sh);
      printf("%14s time : %f sec\n", "Reduce ring", wt_timing.reduce_ring);
      //printf("%14s time : %f sec\n", "Mmove", wt_timing.mmove);
      //printf("%14s time : %f sec\n", "ReduceMPI", wt_timing.reduce_mpi);
     #endif
      printf("%14s time : %f sec\n", "MPI", wt_timing.mpi);
     #endif
+1 −1

File changed.

Contains only whitespace changes.

Loading