Commit 57bf64fc authored by Claudio Gheller's avatar Claudio Gheller
Browse files

First commit

parent fd550772
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -7,3 +7,4 @@ w-stackingCfftw
w-stackingfftw
w-stackingCfftw_serial
w-stackingfftw_serial
w-stacking_*
+3 −3
Original line number Diff line number Diff line
@@ -77,16 +77,16 @@ OPT += -DGAUSS_HI_PRECISION
#OPT += -DNVIDIA

# use CUDA for GPUs
OPT += -DCUDACC
#OPT += -DCUDACC

# use GPU acceleration via OMP 
#OPT += -DACCOMP

# perform stacking on GPUs
OPT += -DGPU_STACKING
#OPT += -DGPU_STACKING

# use NVIDIA GPU to perform the reduce
OPT += -DNCCL_REDUCE
#OPT += -DNCCL_REDUCE

# use GPU to perform FFT
#OPT += -DCUFFTMP
+3 −2
Original line number Diff line number Diff line
@@ -113,8 +113,8 @@ void gridding_data()
      uint ip = 0;
      uint inu = 0;

      #warning "this loop should be threaded"
      #warning "the counter of this loop should not be int"
      //warning "this loop should be threaded"
      //warning "the counter of this loop should not be int"
      for( int iphi = histo_send[isector]-1; iphi >=0 ; iphi--)
        {
	  
@@ -246,6 +246,7 @@ void gridding_data()
	 #endif
	  
	  timing_wt.reduce += CPU_TIME_wt - start;
	  printf("=========================\n");

	  // Go to next sector
	  memset ( gridss, 0, 2*param.num_w_planes*xaxis*yaxis * sizeof(double) );	  

gridding_rccl.cpp

deleted100644 → 0
+0 −273
Original line number Diff line number Diff line
#include "allvars_rccl.h"
#include "proto.h"
#include <hip/hip_runtime.h>
#include <rccl.h>

/* 
 * Implements the gridding of data on GPU
 * using the RCCL library (AMD's NCCL-compatible collectives)
 *
 */


#if defined( RCCL_REDUCE )

/*
#define NCCLCHECK(cmd) do {                         
ncclResult_t r = cmd;                             
if (r!= ncclSuccess) {                            
  printf("Failed, NCCL error %s:%d '%s'\n",       
	 __FILE__,__LINE__,ncclGetErrorString(r));   
  exit(EXIT_FAILURE);                             
 }                                                 
} while(0)
*/  

static uint64_t getHostHash(const char* string) {
  /* DJB2a string hash (h = h * 33 ^ byte, seed 5381).
   * Used to derive a per-host identifier so that MPI ranks on the
   * same node can be detected and mapped to distinct local GPUs. */
  uint64_t h = 5381;
  for (const char *p = string; *p != '\0'; ++p) {
    h = ((h << 5) + h) ^ *p;   /* (h<<5)+h == h*33 */
  }
  return h;
}


static void getHostName(char* hostname, int maxlen) {
  /* Fill `hostname` with the short host name (everything before the
   * first '.'), used as input to getHostHash() for local-rank discovery.
   *
   * Fix: POSIX gethostname() does not guarantee NUL termination when the
   * name is truncated to `maxlen`, so the original scan could run over an
   * unterminated buffer. Terminate explicitly before scanning, and guard
   * against a non-positive buffer size. */
  if (maxlen <= 0)
    return;
  hostname[0] = '\0';              /* defined content even if gethostname fails */
  gethostname(hostname, maxlen);
  hostname[maxlen - 1] = '\0';     /* guarantee termination on truncation */
  for (int i = 0; i < maxlen; i++) {
    if (hostname[i] == '.') {
      hostname[i] = '\0';
      return;
    }
  }
}




/* Grid the visibility data sector-by-sector and reduce the partial grids
 * across MPI ranks using RCCL (ncclReduce) on the GPU selected for this
 * rank's node-local index.
 *
 * Relies on file/project globals: dx, yaxis, xaxis, nsectors, rank, size,
 * resolution, size_of_grid, histo_send, sectorarray, data, metaData, param,
 * gridss, grid, timing_wt, and the CPU_TIME_wt timing macro.
 *
 * No parameters; no return value. Side effects: fills `grid` on the
 * reduction-target ranks and accumulates wall-clock timings in timing_wt. */
void gridding_data(){

  // vertical extent (in uv units) of one sector slab; used to shift vv
  // coordinates into sector-local space below
  double shift = (double)(dx*yaxis);

  timing_wt.kernel     = 0.0;
  timing_wt.reduce     = 0.0;
  timing_wt.reduce_mpi = 0.0;
  timing_wt.reduce_sh  = 0.0;
  timing_wt.compose    = 0.0;
  
  // calculate the resolution in radians
  resolution = 1.0/MAX(fabs(metaData.uvmin),fabs(metaData.uvmax));

  // calculate the resolution in arcsec 
  double resolution_asec = (3600.0*180.0)/MAX(fabs(metaData.uvmin),fabs(metaData.uvmax))/PI;
  if ( rank == 0 )
    printf("RESOLUTION = %f rad, %f arcsec\n", resolution, resolution_asec);

 

    //Initialize nccl

  double * grid_gpu, *gridss_gpu;
  int local_rank = 0;

  // Determine this rank's node-local index: every rank hashes its host
  // name, all hashes are gathered, and local_rank counts how many lower
  // ranks share the same host hash. The local index selects the GPU.
  // NOTE: uses a VLA sized by the MPI communicator size.
  uint64_t hostHashs[size];
  char hostname[1024];
  getHostName(hostname, 1024);
  hostHashs[rank] = getHostHash(hostname);
  MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD);
  for (int p=0; p<size; p++) {
     if (p == rank) break;
     if (hostHashs[p] == hostHashs[rank]) local_rank++;
  }
  
  ncclUniqueId id;
  ncclComm_t comm;
  hipStream_t stream_reduce;

  // Rank 0 creates the NCCL/RCCL unique id and broadcasts it so that all
  // ranks join the same communicator.
  if (rank == 0) ncclGetUniqueId(&id);
  MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);

  hipSetDevice(local_rank);

  // Device buffers sized like the host grids: 2 doubles (re,im) per cell
  // over num_w_planes planes of the local xaxis*yaxis slab.
  hipMalloc(&grid_gpu, 2*param.num_w_planes*xaxis*yaxis * sizeof(double));
  hipMalloc(&gridss_gpu, 2*param.num_w_planes*xaxis*yaxis * sizeof(double));
  hipStreamCreate(&stream_reduce);
  
  ncclCommInitRank(&comm, size, id, rank);

  for (uint isector = 0; isector < nsectors; isector++)
    {

      double start = CPU_TIME_wt;

      // Per-sector counts: visibilities, weights (x polarisations) and
      // complex samples (x frequency channels).
      uint Nsec            = histo_send[isector];
      uint Nweightss       = Nsec*metaData.polarisations;
      uint Nvissec         = Nweightss*metaData.freq_per_chan;
      // One arena allocation carved up into uu/vv/ww + weights + vis below.
      // NOTE(review): the cast is (double*) but the pointer is double_ty* —
      // presumably a typo for (double_ty*); harmless only if the types alias.
      double_ty *memory     = (double*) malloc ( (Nsec*3)*sizeof(double_ty) +
						(Nvissec*2+Nweightss)*sizeof(float_ty) );

      if ( memory == NULL )
	shutdown_wstacking(NOT_ENOUGH_MEM_STACKING, "Not enough memory for stacking", __FILE__, __LINE__);
  
      double_ty *uus        = (double_ty*) memory;
      double_ty *vvs        = (double_ty*) uus+Nsec;
      double_ty *wws        = (double_ty*) vvs+Nsec;
      // NOTE(review): (double_t*) is the <math.h> type double_t, almost
      // certainly a typo for (double_ty*) — verify they have the same size,
      // otherwise weightss is mis-offset into the arena.
      float_ty  *weightss   = (float_ty*)((double_t*)wws+Nsec);
      float_ty  *visreals   = (float_ty*)weightss + Nweightss;
      float_ty  *visimgs    = (float_ty*)visreals + Nvissec;
  
      
      
      // select data for this sector
      uint icount = 0;
      uint ip = 0;
      uint inu = 0;

      #warning "this loop should be threaded"
      #warning "the counter of this loop should not be int"
      // Walk the sector's index list backwards, gathering coordinates,
      // weights and visibilities into the contiguous per-sector arrays.
      for(int iphi = histo_send[isector]-1; iphi>=0; iphi--)
        {

	  uint ilocal = sectorarray[isector][iphi];

	  uus[icount] = data.uu[ilocal];
	  // shift vv into sector-local coordinates
	  vvs[icount] = data.vv[ilocal]-isector*shift;
	  wws[icount] = data.ww[ilocal];
	  for (uint ipol=0; ipol<metaData.polarisations; ipol++)
	    {
	      weightss[ip] = data.weights[ilocal*metaData.polarisations+ipol];
	      ip++;
	    }
	  for (uint ifreq=0; ifreq<metaData.polarisations*metaData.freq_per_chan; ifreq++)
	    {
	      visreals[inu] = data.visreal[ilocal*metaData.polarisations*metaData.freq_per_chan+ifreq];
	      visimgs[inu]  = data.visimg[ilocal*metaData.polarisations*metaData.freq_per_chan+ifreq];

	      inu++;
	    }
	  icount++;
	}

      // Diagnostic min/max of the sector's uv extent (result only used in
      // the commented-out printf below).
      double uumin = 1e20;
      double vvmin = 1e20;
      double uumax = -1e20;
      double vvmax = -1e20;

      #pragma omp parallel reduction( min: uumin, vvmin) reduction( max: uumax, vvmax) num_threads(param.num_threads)
      {
        double my_uumin = 1e20;
        double my_vvmin = 1e20;
        double my_uumax = -1e20;
        double my_vvmax = -1e20;

       #pragma omp for
        for (uint ipart=0; ipart<Nsec; ipart++)
          {
            my_uumin = MIN(my_uumin, uus[ipart]);
            my_uumax = MAX(my_uumax, uus[ipart]);
            my_vvmin = MIN(my_vvmin, vvs[ipart]);
            my_vvmax = MAX(my_vvmax, vvs[ipart]);
          }

        uumin = MIN( uumin, my_uumin );
        uumax = MAX( uumax, my_uumax );
        vvmin = MIN( vvmin, my_vvmin );
        vvmax = MAX( vvmax, my_vvmax );
      }

      timing_wt.compose += CPU_TIME_wt - start;

      //printf("UU, VV, min, max = %f %f %f %f\n", uumin, uumax, vvmin, vvmax);
      
      // Make convolution on the grid

     #ifdef VERBOSE
      // NOTE(review): %ld with a uint argument is a format mismatch (UB);
      // should be %u (and below as well).
      printf("Processing sector %ld\n",isector);
     #endif

      start = CPU_TIME_wt;

      // With a single rank there is no reduction step, so stack straight
      // into the final grid; otherwise stack into the per-sector buffer.
      double *stacking_target_array;
      if ( size > 1 )
	stacking_target_array = gridss;
      else
	stacking_target_array = grid;
      
     //We have to call different GPUs per MPI task!!! [GL]
      wstack(param.num_w_planes,
	     Nsec,
	     metaData.freq_per_chan,
	     metaData.polarisations,
	     uus,
	     vvs,
	     wws,
	     visreals,
	     visimgs,
	     weightss,
	     dx,
	     dw,
	     param.w_support,
	     xaxis,
	     yaxis,
	     stacking_target_array,
	     param.num_threads,
	     rank);
      //Allocate memory on devices non-blocking for the host                                                                                   
      ///////////////////////////////////////////////////////


      timing_wt.kernel += CPU_TIME_wt - start;
	    
      // Upload the stacked sector to the device asynchronously; the copy is
      // synchronized (stream_reduce) before ncclReduce consumes it.
      hipMemcpyAsync(gridss_gpu, gridss, 2*param.num_w_planes*xaxis*yaxis*sizeof(double), hipMemcpyHostToDevice, stream_reduce);
      
     #ifdef VERBOSE
      printf("Processed sector %ld\n",isector);
     #endif
      

      if( size > 1 )
	{
     
	  // Write grid in the corresponding remote slab
     
	  // int target_rank = (int)isector;    it implied that size >= nsectors
	  int target_rank = (int)(isector % size);
	  
	  	  
	  hipStreamSynchronize(stream_reduce);
    
          start = CPU_TIME_wt;

	  // Sum all ranks' sector grids onto target_rank's grid_gpu.
	  // NOTE(review): assumes size_of_grid == 2*num_w_planes*xaxis*yaxis
	  // (the element count used for the copies above) — confirm.
	  ncclReduce(gridss_gpu, grid_gpu, size_of_grid, ncclDouble, ncclSum, target_rank, comm, stream_reduce);
	  hipStreamSynchronize(stream_reduce);
      
	  timing_wt.reduce += CPU_TIME_wt - start;

		  
	  // Go to next sector
	  memset ( gridss, 0, 2*param.num_w_planes*xaxis*yaxis * sizeof(double) );	  
	}	

      free(memory);
    }

  //Copy data back from device to host (to be deleted in next steps)

  hipMemcpyAsync(grid, grid_gpu, 2*param.num_w_planes*xaxis*yaxis*sizeof(double), hipMemcpyDeviceToHost, stream_reduce);
  
  MPI_Barrier(MPI_COMM_WORLD);

  // Wait for the final download before releasing device resources.
  hipStreamSynchronize(stream_reduce);
  hipFree(gridss_gpu);
  hipFree(grid_gpu);
  
  hipStreamDestroy(stream_reduce);
  
  ncclCommDestroy(comm);

  return;
  
}

#endif
+3 −0
Original line number Diff line number Diff line
@@ -48,7 +48,9 @@ void init(int index)
   metaData_calculation();

   // Allocate Data Buffer
   // NOTE(claudio): assumes the data shape/size never changes across multiple data reads
   allocate_memory();
   //if (index == 0) allocate_memory();
  
   // Reading Data
   readData();
@@ -388,6 +390,7 @@ void allocate_memory() {
	 grid   = (double*)Me.fwin.ptr; // let grid point to the right memory location [GL]
       }
          
     // NOTE(claudio): these two arrays are needed ONLY for I/O purposes; this can be improved
     gridss_real = gridss_w + size_of_grid;
     gridss_img  = gridss_real + size_of_grid / 2;
          
Loading