diff --git a/Makefile b/Makefile index 9c34660ebdf14daa9a557904972344b9d18bc740..99b4473d23b0af3722d2b57d562bb4c5a940794a 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,7 @@ CC=mpicc -CFLAGS=-O3 -g -fopenmp +#CC=mpiicx +#CFLAGS=-O3 -march=native -flto -funroll-loops -fopenmp +CFLAGS=-O3 -fopenmp LDFLAGS=-lm all: main diff --git a/README.md b/README.md index 7949127e45470faf53f2d4eb06bc4cd3a601eef5..7fd3b04b67b3779acc31528ae89bf0858e6311b0 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,8 @@ The suggestion is to run it with one mpi task per socket. # Todo - - [ ] H1: implementation of lock free centers elimination + - [ ] argument parsing: find an elegant way to pass parameters and file (maybe a config file?) + - [~] H1: implementation of lock free centers elimination (*work in progress*) - [ ] context: open all windows in a single shot, close them all togheter - [ ] io: curation of IO using mpi IO or other solutions - [ ] kdtree: optimization an profiling diff --git a/run_leo b/run_leo index 1f74f83b606d2ee7288716c7805ad0e6abc889c9..44e834a571ce59d5978a9b92c10bc34105ab73db 100644 --- a/run_leo +++ b/run_leo @@ -1,12 +1,12 @@ #!/bin/bash -#SBATCH --nodes=6 +#SBATCH --nodes=2 #SBATCH --ntasks-per-node=2 #SBATCH --cpus-per-task=56 #SBATCH --time=04:00:00 #SBATCH --job-name=dadp_test #SBATCH --partition=dcgp_usr_prod -#SBATCH --account=IscrC_dadp +#SBATCH --account=EUHPC_D18_045 #SBATCH --output=out_leo #SBATCH --error=err_leo #SBATCH --mem=480G @@ -14,8 +14,11 @@ cd $SLURM_SUBMIT_DIR -module restore my_gcc -#module restore my_intel + +#module load gcc +#module load openmpi +module load intel-oneapi-mpi + make clean make ulimit -s unlimited @@ -31,10 +34,18 @@ mkdir bb OUT_ASSIGNMENT=/leonardo_scratch/large/userexternal/ftomba00/assignment OUT_DATA=/leonardo_scratch/large/userexternal/ftomba00/data -IN_DATA=/leonardo_work/IscrC_dadp +IN_DATA=/leonardo_work/EUHPC_D18_045 #10^6 points time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK} ./main -t f32 -i ${IN_DATA}/norm_data/std_LR_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA} +#time mpirun -n ${SLURM_NTASKS} --map-by core ./main -t f32 -i ${IN_DATA}/norm_data/std_LR_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA} + +#34 * 10^6 points +#time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK} ./main -t f32 -i ${IN_DATA}/norm_data/std_g1212639_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA} + +#88 * 10^6 points +#time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK} ./main -t f32 -i ${IN_DATA}/norm_data/std_g5503149_091_0000 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA} + #200 * 10^6 points #time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK} ./main -t f32 -i ${IN_DATA}/norm_data/std_g2980844_091_0000 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA} diff --git a/src/adp/adp.c b/src/adp/adp.c index e796b8db7dc6a0ca93d79525026c54920b05850e..cbd0263796fb30948e22ca0466a12ecd72836e90 100644 --- a/src/adp/adp.c +++ b/src/adp/adp.c @@ -118,8 +118,8 @@ idx_t get_j_ksel_idx(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win exposed } -void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int verbose){ - +void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int verbose) +{ /* * Point density computation: * args: @@ -510,17 +510,13 @@ datapoint_info_t find_possibly_halo_datapoint_rma(global_context_t* ctx, idx_t i else { datapoint_info_t tmp_dp; - #pragma omp critical - { - idx_t i = idx - ctx -> 
rank_idx_start[owner]; - MPI_Request request; - MPI_Status status; - - MPI_Rget(&tmp_dp, sizeof(datapoint_info_t), MPI_BYTE, owner, - i * sizeof(datapoint_info_t), sizeof(datapoint_info_t), MPI_BYTE, win_datapoints, &request); - MPI_Wait(&request, MPI_STATUS_IGNORE); + idx_t i = idx - ctx -> rank_idx_start[owner]; + MPI_Request request; + MPI_Status status; - } + MPI_Rget(&tmp_dp, sizeof(datapoint_info_t), MPI_BYTE, owner, + i * sizeof(datapoint_info_t), sizeof(datapoint_info_t), MPI_BYTE, win_datapoints, &request); + MPI_Wait(&request, MPI_STATUS_IGNORE); return tmp_dp; } @@ -648,21 +644,43 @@ lock_t h1_lock_free(global_context_t* ctx, MPI_Win lock_window, int owner, idx_t } +void enqueue_center_removal(center_removal_queue_t* queue, + center_removal_t element) +{ + int default_incr_size = 100; + if(queue -> count >= queue -> size) + { + queue -> size += default_incr_size; + queue -> data = realloc(queue -> data, queue -> size * sizeof(center_removal_t)); + } + queue -> data[queue -> count] = element; + queue -> count++; +} + +int compare_removal_by_target(const void* a, const void* b) +{ + idx_t arg1 = (*(const center_removal_t *)a).target_id; + idx_t arg2 = (*(const center_removal_t *)b).target_id; + return (arg1 > arg2) - (arg1 < arg2); +} + clusters_t Heuristic1(global_context_t *ctx) { - /* + /** * Heurisitc 1, from paper of Errico, Facco, Laio & Rodriguez * ( https://doi.org/10.1016/j.ins.2021.01.010 ) - */ + **/ datapoint_info_t* dp_info = ctx -> local_datapoints; idx_t n = ctx -> local_n_points; struct timespec start_tot, finish_tot; double elapsed_tot; + double elapsed_time; TIME_DEF; + TIME_START; lu_dynamic_array_t all_centers, removed_centers, actual_centers, max_rho; lu_dynamic_array_allocate(&all_centers); @@ -679,7 +697,7 @@ clusters_t Heuristic1(global_context_t *ctx) 1, MPI_INFO_NULL, ctx -> mpi_communicator, &win_datapoints); MPI_Win_fence(0, win_datapoints); MPI_Win_lock_all(0, win_datapoints); - + // #if !defined(THREAD_FUNNELED) #pragma omp parallel for #endif @@ -715,45 +733,166 @@ clusters_t Heuristic1(global_context_t *ctx) } } - /* + /** * optimized version * * Generate a mask that keeps track of the point has been eliminating the * point considered. 
Each thread updates this mask, then after the procedure * ends, center, removed centers, and max_rho arrays are populated - */ - - lock_t* lock_array = (lock_t*)MY_MALLOC(n * sizeof(lock_t)); + * + * optimized v2 use a queue of center removal and then exchange them + **/ heap_node* to_remove_mask = (heap_node*)MY_MALLOC(n*sizeof(heap_node)); for(idx_t p = 0; p < n; ++p) { to_remove_mask[p].array_idx = MY_SIZE_MAX; - to_remove_mask[p].value = 9999999; - lock_array[p] = LOCK_FREE; + to_remove_mask[p].value = -9999999; } + + // sort by density + qsort(dp_info_ptrs, n, sizeof(datapoint_info_t*), cmpPP); + /** + * workflow + * find the elements we need to eliminate + * compact them to a single array + * send/recieve + **/ + + center_removal_queue_t* removal_queues = (center_removal_queue_t*)MY_MALLOC(n * sizeof(center_removal_queue_t)); + omp_lock_t* lock_array = (omp_lock_t*)MY_MALLOC(n * sizeof(omp_lock_t)); - MPI_Win win_to_remove_mask; - MPI_Win_create(to_remove_mask, n * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &win_to_remove_mask); - MPI_Win_fence(0, win_to_remove_mask); + //zero out all queues + + for(idx_t i = 0; i < n; ++i) + { + removal_queues[i].count = 0; + removal_queues[i].size = 0; + removal_queues[i].data = NULL; + omp_init_lock(lock_array + i); + } - MPI_Win win_locks; - MPI_Win_create(lock_array, n * sizeof(lock_t), sizeof(lock_t), MPI_INFO_NULL, ctx -> mpi_communicator, &win_locks); - MPI_Win_fence(0, win_locks); + elapsed_time = TIME_STOP; + LOG_WRITE("Putative centers", elapsed_time); + + TIME_START; -#ifdef EXP_H1 - MPI_Win_lock_all(0, win_to_remove_mask); - MPI_Win_lock_all(0, win_locks); -#endif + -#ifdef EXP_H1 - printf("Using experimental h1\n"); -#endif +//#define EXP_CENTER_PRUNING +#if defined(EXP_CENTER_PRUNING) + int all_have_finished = 0; + int i_have_finished = 0; + + datapoint_info_t* tmp_datapoints = (datapoint_info_t*)MY_MALLOC(ctx -> local_n_points * sizeof(datapoint_info_t)); + + // use strategy similar to the one employed in density estimation + // vertical loop, first on k the on n + // request points + // then process the points + + + MPI_Win_fence(MPI_MODE_NOPUT, win_datapoints); + //MPI_Win_lock_all(0, win_datapoints); + MPI_DB_PRINT("Using vertical loop on centers pruning\n"); + int kmax = ctx -> k; + for(idx_t j = 1; j < kmax; ++j) + { + i_have_finished = 1; + + // request points + #pragma omp parallel for schedule(dynamic) + for(idx_t p = 0; p < n; ++p) + { + + datapoint_info_t i_point = *(dp_info_ptrs[p]); + if(j < i_point.kstar + 1) + { + #pragma omp atomic write + i_have_finished = 0; + + idx_t jidx = i_point.ngbh[j].array_idx; + int owner = foreign_owner(ctx, jidx); + /* find if datapoint is halo or not */ + idx_t jpos = jidx - ctx -> rank_idx_start[owner]; + if(owner == ctx -> mpi_rank) + { + tmp_datapoints[p] = ctx -> local_datapoints[jpos]; + } + else + { + MPI_Get(tmp_datapoints + p, sizeof(datapoint_info_t), MPI_BYTE, owner, + jpos * sizeof(datapoint_info_t), sizeof(datapoint_info_t), MPI_BYTE, win_datapoints); + } + } + + } + + //MPI_Win_flush_all(win_datapoints); + MPI_Win_fence(MPI_MODE_NOPUT ^ MPI_MODE_NOSTORE, win_datapoints); + // wait for all comms to happen + + #pragma omp parallel for schedule(dynamic) + for(idx_t p = 0; p < n; ++p) + { + datapoint_info_t i_point = *(dp_info_ptrs[p]); + if(j < i_point.kstar + 1) + { + datapoint_info_t j_point = tmp_datapoints[p]; + idx_t jidx = j_point.array_idx; + + if(j_point.is_center && i_point.g > j_point.g) + { + // set the lock at position i + // actually is the p-th 
point + int owner = foreign_owner(ctx, jidx); + //if local process it + if(owner == ctx -> mpi_rank) + { + //acquire the lock + idx_t jpos = jidx - ctx -> idx_start; + omp_set_lock(lock_array + jpos); + if(i_point.g > to_remove_mask[jpos].value) + { + to_remove_mask[jpos].array_idx = i_point.array_idx; + to_remove_mask[jpos].value = i_point.g; + } + omp_unset_lock(lock_array + jpos); + } + ////otherwise enqueue for sending + else + { + center_removal_t element = {.rank = owner, .source_id = i_point.array_idx, + .target_id = j_point.array_idx, .source_density = i_point.g}; + enqueue_center_removal(removal_queues + p, element); + } + } + + } + + } + + //MPI_Allreduce(&i_have_finished, &all_have_finished, 1, MPI_INT, MPI_LAND, ctx -> mpi_communicator); + //if(all_have_finished) break; + if(i_have_finished) break; + } + + MPI_Win_unlock_all(win_datapoints); + MPI_Win_fence(0, win_datapoints); + free(tmp_datapoints); + +#else + //MPI_Win_unlock_all(win_datapoints); + //MPI_Win_fence(MPI_MODE_NOPUT, win_datapoints); + + idx_t n_foreign_req = 0; + idx_t n_local_req = 0; + float_t elapsed_proc = 0.;; #if !defined(THREAD_FUNNELED) - #pragma omp parallel for schedule(dynamic) + #pragma omp parallel for schedule(dynamic) reduction(+:elapsed_proc, n_local_req, n_foreign_req) #endif for(idx_t p = 0; p < n; ++p) { @@ -762,87 +901,234 @@ clusters_t Heuristic1(global_context_t *ctx) for(idx_t j = 1; j < i_point.kstar + 1; ++j) { idx_t jidx = i_point.ngbh[j].array_idx; - + struct timespec __start, __end; + clock_gettime(CLOCK_MONOTONIC,&__start); datapoint_info_t j_point = find_possibly_halo_datapoint_rma(ctx, jidx, win_datapoints); + clock_gettime(CLOCK_MONOTONIC,&__end); + elapsed_proc += (double)(__end.tv_sec - __start.tv_sec) + (__end.tv_nsec - __start.tv_nsec)/1e9; + + int owner = foreign_owner(ctx, jidx); + if(owner == ctx -> mpi_rank) + { + n_local_req++; + } + else + { + n_foreign_req++; + } if(j_point.is_center && i_point.g > j_point.g) { - /* - * - * TODO: Implement it without this but using private locks - * use an array of locks, and compare and swap to actually gain control of the thing - * - * */ - -#ifdef EXP_H1 - #pragma omp critical (h1_exp) + // set the lock at position i + // actually is the p-th point + // if local process it + if(owner == ctx -> mpi_rank) { - int owner = foreign_owner(ctx, jidx); - idx_t jpos = jidx - ctx -> rank_idx_start[owner]; + // acquire the lock + idx_t jpos = jidx - ctx -> idx_start; + omp_set_lock(lock_array + jpos); + if(i_point.g > to_remove_mask[jpos].value) + { + to_remove_mask[jpos].array_idx = i_point.array_idx; + to_remove_mask[jpos].value = i_point.g; + } + omp_unset_lock(lock_array + jpos); + } + // otherwise enqueue for sending + else + { + center_removal_t element = {.rank = owner, .source_id = i_point.array_idx, + .target_id = j_point.array_idx, .source_density = i_point.g}; + enqueue_center_removal(removal_queues + p, element); + } + } + } + } + //MPI_Win_fence(MPI_MODE_NOPUT, win_datapoints); - lock_t state = LOCK_FREE; + // MPI_Barrier(ctx -> mpi_communicator); + // DB_PRINT("Rank %d: foreign requests points %lu out of %lu -> fraction %.2lf time %.2lfs\n", + // ctx -> mpi_rank, n_foreign_req, n_local_req + n_foreign_req, (float)n_foreign_req/(float)n_local_req, elapsed_proc); + // MPI_Barrier(ctx -> mpi_communicator); +#endif - state = h1_lock_acquire(ctx, win_locks, owner, jpos, state); + //assemble arrays into a single buffer + - heap_node mask_element; - MPI_Request request; + elapsed_time = TIME_STOP; + LOG_WRITE("Finding centers to prune", 
elapsed_time); + TIME_START; + + idx_t tot_removal = 0; + for(idx_t p = 0; p < n; ++p) + { + tot_removal += removal_queues[p].count; + } + + idx_t buffer_idx = 0; + center_removal_t* removal_buffer = (center_removal_t*)MY_MALLOC(tot_removal * sizeof(center_removal_t)); - MPI_Rget(&mask_element, sizeof(heap_node), MPI_BYTE, - owner, jpos * sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request); - MPI_Wait(&request, MPI_STATUS_IGNORE); + for(idx_t p = 0; p < n; ++p) + { + if(removal_queues[p].count > 0) + { + memcpy(removal_buffer + buffer_idx, removal_queues[p].data, removal_queues[p].count * sizeof(center_removal_t)); + buffer_idx += removal_queues[p].count; + } + } - int flag = mask_element.array_idx == MY_SIZE_MAX; - if(flag || i_point.g > mask_element.value ) - { - heap_node tmp_mask_element = {.array_idx = i_point.array_idx, .value = i_point.g}; - MPI_Request request; - MPI_Rput(&tmp_mask_element, sizeof(heap_node), MPI_BYTE, owner, - jpos*sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request); - MPI_Wait(&request, MPI_STATUS_IGNORE); + //sort by array idx (it sorts also by rank) + + qsort(removal_buffer, tot_removal, sizeof(center_removal_t), compare_removal_by_target); - } + // remove + for(int i = 1; i < tot_removal; ++i) + { + if(removal_buffer[i - 1].rank > removal_buffer[i].rank) + printf("Unsorted removal buffer"); + } - state = h1_lock_free(ctx, win_locks, owner, jpos, state); - } -#else - #pragma omp critical (centers_elimination) - { - int owner = foreign_owner(ctx, jidx); - idx_t jpos = jidx - ctx -> rank_idx_start[owner]; + //prepare for the sendrcv + + int* recv_counts = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* send_counts = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); - MPI_Win_lock(MPI_LOCK_EXCLUSIVE, owner, 0, win_to_remove_mask); - heap_node mask_element; - MPI_Request request; - MPI_Rget(&mask_element, sizeof(heap_node), MPI_BYTE, - owner, jpos * sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request); - MPI_Wait(&request, MPI_STATUS_IGNORE); + int* recv_displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* send_displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); - int flag = mask_element.array_idx == MY_SIZE_MAX; - if(flag || i_point.g > mask_element.value ) - { - heap_node tmp_mask_element = {.array_idx = i_point.array_idx, .value = i_point.g}; - MPI_Request request; - MPI_Rput(&tmp_mask_element, sizeof(heap_node), MPI_BYTE, owner, - jpos*sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request); - MPI_Wait(&request, MPI_STATUS_IGNORE); + //zero_out_all - } + for(int i = 0; i < ctx -> mpi_rank; ++i) + { + recv_counts[i] = 0; + send_counts[i] = 0; + recv_displs[i] = 0; + send_displs[i] = 0; + } - MPI_Win_unlock(owner, win_to_remove_mask); - } -#endif + for(idx_t i = 0; i < tot_removal; ++i) + { + int rank_idx = removal_buffer[i].rank; + send_counts[rank_idx]++; + } + + // all to all to recieve counts + + MPI_Alltoall(send_counts, 1, MPI_INT, + recv_counts, 1, MPI_INT, ctx -> mpi_communicator); + + // compute displacements + + for(int i = 1; i < ctx -> world_size; ++i) + { + recv_displs[i] = recv_displs[i - 1] + recv_counts[i - 1]; + send_displs[i] = send_displs[i - 1] + send_counts[i - 1]; + } + + idx_t tot_recv_counts = 0; + + // count how many elements to recieve + // MPI_DB_PRINT("Using centers elimination queue experiment\n"); + + for(int i = 0; i < ctx -> world_size; ++i) tot_recv_counts += recv_counts[i]; + /* + if(ctx -> mpi_rank == 
0){ + for(int i = 0; i < ctx -> world_size; ++i){ + DB_PRINT("%d mpi rank recv_count %d to %d\n", ctx -> mpi_rank, recv_counts[i], i); + DB_PRINT("%d mpi rank send_count %d to %d\n", ctx -> mpi_rank, send_counts[i], i); + } + } + */ + /*DB_PRINT("rank %d: %lu recv counts\n", ctx -> mpi_rank, tot_recv_counts);*/ + + // change dimensions to bytes + // + + //comm matrices + + /* DEBUG PRINT + * COMMUNICATION MATRICES */ + + /* + int* all_send = (int*)MY_MALLOC(ctx -> world_size * ctx -> world_size * sizeof(int)); + int* all_recv = (int*)MY_MALLOC(ctx -> world_size * ctx -> world_size * sizeof(int)); + + MPI_Gather(send_counts, ctx -> world_size, MPI_INT, + all_send, ctx -> world_size , MPI_INT, 0, ctx -> mpi_communicator); + + MPI_Gather(recv_counts, ctx -> world_size, MPI_INT, + all_recv, ctx -> world_size , MPI_INT, 0, ctx -> mpi_communicator); + + if(I_AM_MASTER) + { + printf("Send matrix\n"); + for(int i = 0; i < ctx -> world_size; ++i) + { + for(int j = 0; j < ctx -> world_size; ++j) + { + printf("%4d ", all_send[i*ctx -> world_size + j]); } + printf("\n"); + } + + printf("Recv matrix\n"); + for(int i = 0; i < ctx -> world_size; ++i) + { + for(int j = 0; j < ctx -> world_size; ++j) + { + printf("%4d ", all_recv[i*ctx -> world_size + j]); + } + printf("\n"); } } -#ifdef EXP_H1 - MPI_Win_unlock_all(win_to_remove_mask); - MPI_Win_unlock_all(win_locks); -#endif + free(all_recv); + free(all_send); + + */ + + + for(int i = 0; i < ctx -> world_size; ++i) + { + recv_counts[i] = recv_counts[i] * sizeof(center_removal_t); + send_counts[i] = send_counts[i] * sizeof(center_removal_t); + recv_displs[i] = recv_displs[i] * sizeof(center_removal_t); + send_displs[i] = send_displs[i] * sizeof(center_removal_t); + } + + //allocate buffer to recieve center elminiations - MPI_Win_fence(0, win_to_remove_mask); - MPI_Win_fence(0, win_locks); - MPI_Barrier(ctx -> mpi_communicator); + + center_removal_t* recv_removals = (center_removal_t*)MY_MALLOC(tot_recv_counts * sizeof(center_removal_t)); + + // all to all + MPI_Alltoallv(removal_buffer, send_counts, send_displs, MPI_BYTE, + recv_removals, recv_counts, recv_displs, MPI_BYTE, ctx -> mpi_communicator); + + // merge into the mask + + elapsed_time = TIME_STOP; + LOG_WRITE("Communicating eliminations", elapsed_time); + TIME_START; + + #pragma omp parallel for schedule(dynamic) + for(idx_t i = 0; i < tot_recv_counts; ++i) + { + idx_t el_pos = recv_removals[i].target_id - ctx -> idx_start; + int owner = foreign_owner(ctx, recv_removals[i].target_id); + if(owner != ctx -> mpi_rank){ + printf("Error here\n"); + exit(1); + } + omp_set_lock(lock_array + el_pos); + if(recv_removals[i].source_density > to_remove_mask[el_pos].value) + { + to_remove_mask[el_pos].array_idx = recv_removals[i].source_id; + to_remove_mask[el_pos].value = recv_removals[i].source_density; + } + omp_unset_lock(lock_array + el_pos); + } + /* populate the usual arrays */ for(idx_t p = 0; p < all_centers.count; ++p) @@ -888,11 +1174,29 @@ clusters_t Heuristic1(global_context_t *ctx) } } - MPI_Win_free(&win_to_remove_mask); free(to_remove_mask); - - MPI_Win_free(&win_locks); - free(lock_array); + + for(idx_t i = 0; i < n; ++i) + { + removal_queues[i].count = 0; + removal_queues[i].size = 0; + free(removal_queues[i].data); + omp_destroy_lock(lock_array+ i); + } + + free(removal_queues); + free(removal_buffer); + free(send_counts); + free(send_displs); + free(recv_counts); + free(recv_displs); + free(lock_array); + free(recv_removals); + + elapsed_time = TIME_STOP; + LOG_WRITE("Merging", 
elapsed_time); + + TIME_START; int n_centers = (int)actual_centers.count; int tot_centers; @@ -900,9 +1204,11 @@ clusters_t Heuristic1(global_context_t *ctx) MPI_DB_PRINT("Found %d temporary centers\n", tot_centers); - /* bring on master all centers + /** + * bring on master all centers * order them in ascending order of density, - * then re-scatter them around to get unique cluster labels */ + * then re-scatter them around to get unique cluster labels + **/ center_t* private_centers_buffer = (center_t*)MY_MALLOC(actual_centers.count * sizeof(center_t)); center_t* global_centers_buffer = (center_t*)MY_MALLOC(tot_centers * sizeof(center_t)); @@ -1055,6 +1361,8 @@ clusters_t Heuristic1(global_context_t *ctx) free(ks); #endif + elapsed_time = TIME_STOP; + LOG_WRITE("Cluster assign", elapsed_time); free(actual_centers.data); actual_centers.size = tot_centers; diff --git a/src/adp/adp.h b/src/adp/adp.h index 4db27b2d8934a761dabecae7ab6973e231146f11..e06f0bf01785c793b7296bfedbcdb360603c5fe8 100644 --- a/src/adp/adp.h +++ b/src/adp/adp.h @@ -45,6 +45,21 @@ typedef struct merge_t float_t density; } merge_t; +typedef struct center_removal_t +{ + int rank; + idx_t source_id; + idx_t target_id; + float_t source_density; +} center_removal_t; + +typedef struct center_removal_queue_t +{ + center_removal_t* data; + idx_t count; + idx_t size; +} center_removal_queue_t; + void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int verbose); diff --git a/src/common/common.c b/src/common/common.c index 71f57f58f39e30ee4148e09479826868a77b5c00..49ad8204d75718c6a502e3a3b00acbaeba369147 100644 --- a/src/common/common.c +++ b/src/common/common.c @@ -21,6 +21,8 @@ void get_context(global_context_t* ctx) ctx -> __local_heap_buffers = NULL; ctx -> input_data_in_float32 = -1; ctx -> dims = 0; + ctx -> k = 300; + ctx -> z = 3; } void get_dataset_diagnostics(global_context_t* ctx, float_t* data) @@ -42,7 +44,7 @@ void get_dataset_diagnostics(global_context_t* ctx, float_t* data) for(int j = 0; j < ctx -> dims; ++j) pvt_mean[j] = 0.; #pragma omp for - for(int i = 0; i < ctx -> n_points; ++i) + for(idx_t i = 0; i < ctx -> n_points; ++i) { int j = 0; for(j = 0; j < jmax; j+=4) @@ -79,7 +81,7 @@ void get_dataset_diagnostics(global_context_t* ctx, float_t* data) for(int j = 0; j < ctx -> dims; ++j) pvt_var[j] = 0.; #pragma omp for - for(int i = 0; i < ctx -> n_points; ++i) + for(idx_t i = 0; i < ctx -> n_points; ++i) { int j = 0; for(j = 0; j < jmax; j+=4) diff --git a/src/common/common.h b/src/common/common.h index d9bdc7e799df5bcc60a1a8488d7a05002e168beb..de3ee63ee91ee2596ec6bad3b4d7030b44aaa80e 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -45,9 +45,11 @@ #define MY_TRUE 1 #define MY_FALSE 0 -#define CHECK_ALLOCATION(x) if(!x){printf("[!!!] %d rank encountered failed allocation at line %s \n", ctx -> mpi_rank, __LINE__ ); exit(1);}; +#define HERE printf("%d in file %s reached line %d\n", ctx -> mpi_rank, __FILE__, __LINE__); MPI_Barrier(ctx -> mpi_communicator); -#define CHECK_ALLOCATION_NO_CTX(x) if(!x){printf("[!!!] Failed allocation at line %d \n", __LINE__ ); exit(1);} +#define CHECK_ALLOCATION(x) if(!x){printf("[!!!] %d rank encountered failed allocation: %s at line %s \n", ctx -> mpi_rank, __FILE__, __LINE__ ); exit(1);}; + +#define CHECK_ALLOCATION_NO_CTX(x) if(!x){printf("[!!!] Failed allocation: %s at line %d \n", __FILE__, __LINE__ ); exit(1);} #define MY_MALLOC(n) ({void* p = calloc(n,1); CHECK_ALLOCATION_NO_CTX(p); p; }) #define DB_PRINT(...) 
printf(__VA_ARGS__) @@ -149,6 +151,7 @@ typedef struct global_context_t int mpi_rank; //rank of the processor uint32_t dims; //number of dimensions of the dataset idx_t k; //number of neighbors + float_t z; //z parameter float_t* local_data; //pointer to the dataset stored into the node float_t* lb_box; //bounding box of the dataset float_t* ub_box; //bounding box of the dataset diff --git a/src/main/main.c b/src/main/main.c index 3f3d8396418342a23acb5e31a4998869a480fdeb..e6153b1c839b2c7bbaba2ee5f7733f00e8a810be 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -44,6 +44,8 @@ struct option long_options[] = {"in-dims" , required_argument, 0, 'd'}, {"out-data", optional_argument, 0, 'o'}, {"out-assignment", optional_argument, 0, 'a'}, + {"kngbh", optional_argument, 0, 'k'}, + {"z", optional_argument, 0, 'z'}, {"help", optional_argument, 0, 'h'}, {0, 0, 0, 0} }; @@ -59,7 +61,9 @@ const char* help = "Distributed Advanced Density Peak\n"\ " mpi tasks and datapoints are ordered default is `out_data` \n"\ " -a --out-assignment (optional) output path for the cluster assignment output ranges [0 ... Nc - 1]\n"\ " for core points halo points have indices [-Nc ... -1] conversion\n"\ - " of idx for an halo point is cluster_idx = -halo_idx - 1, default is `out_assignment`\n"; + " of idx for an halo point is cluster_idx = -halo_idx - 1, default is `out_assignment`\n"\ + " -k --kngbh (optional) number of nearest neighbors to compute\n"\ + " -z --z (optional) number of nearest neighbors to compute\n"; void parse_args(global_context_t* ctx, int argc, char** argv) { @@ -70,7 +74,7 @@ void parse_args(global_context_t* ctx, int argc, char** argv) snprintf(ctx -> output_assignment_file, DEFAULT_STR_LEN, "%s", OUT_CLUSTER_ASSIGN); snprintf(ctx -> output_data_file, DEFAULT_STR_LEN, "%s", OUT_DATA); - while((opt = getopt_long(argc, argv, "i:t:d:o:a:h", long_options, NULL)) != -1) + while((opt = getopt_long(argc, argv, "i:t:d:o:a:k:z:h", long_options, NULL)) != -1) { switch(opt) { @@ -103,10 +107,17 @@ void parse_args(global_context_t* ctx, int argc, char** argv) case 'a': strncpy(ctx -> output_assignment_file, optarg, DEFAULT_STR_LEN); break; + case 'k': + ctx -> k = atoi(optarg); + break; + case 'z': + ctx -> z = atof(optarg); + break; case 'h': mpi_printf(ctx, "%s\n", help); MPI_Finalize(); exit(0); + default: mpi_printf(ctx, "%s\n", help); MPI_Finalize(); @@ -121,6 +132,42 @@ void parse_args(global_context_t* ctx, int argc, char** argv) if(err){MPI_Finalize(); exit(1);}; } +void print_hello(global_context_t* ctx) +{ + char * hello = "" +" _ _ \n" +" __| | __ _ __| |_ __ \n" +" / _` |/ _` |/ _` | '_ \\ \n" +"| (_| | (_| | (_| | |_) | \n" +" \\__,_|\\__,_|\\__,_| .__/ \n" +" |_| \n" +" Distributed Advanced Density Peak"; + mpi_printf(ctx, "%s\n\n", hello); + + #if defined (_OPENMP) + mpi_printf(ctx,"Running Hybrid (Openmp + MPI) code\n"); + #else + mpi_printf(ctx,"Running pure MPI code\n"); + #endif + + #if defined (THREAD_FUNNELED) + mpi_printf(ctx,"/!\\ Code built with MPI_THREAD_FUNNELED level\n"); + #else + mpi_printf(ctx,"/!\\ Code built with MPI_THREAD_MULTIPLE level\n"); + #endif + + mpi_printf(ctx, "\nConfigs: \n"); + mpi_printf(ctx, "Data file .............> %s\n", ctx -> input_data_file); + mpi_printf(ctx, "Input Type .............> float%d\n", ctx -> input_data_in_float32 ? 
32 : 64); + mpi_printf(ctx, "Dimensions .............> %d\n", ctx -> dims); + mpi_printf(ctx, "Output data file .......> %s\n", ctx -> output_data_file); + mpi_printf(ctx, "Output assignment file -> %s\n", ctx -> output_assignment_file); + mpi_printf(ctx, "k ......................> %lu\n", ctx -> k); + mpi_printf(ctx, "Z ......................> %.2lf\n", ctx -> z); + mpi_printf(ctx, "\nRUNNING!\n"); + +} + int main(int argc, char** argv) { #if defined (_OPENMP) int mpi_provided_thread_level; @@ -160,32 +207,25 @@ int main(int argc, char** argv) { ctx.mpi_communicator = MPI_COMM_WORLD; get_context(&ctx); - #if defined (_OPENMP) - mpi_printf(&ctx,"Running Hybrid (Openmp + MPI) code\n"); - #else - mpi_printf(&ctx,"Running pure MPI code\n"); - #endif - - #if defined (THREAD_FUNNELED) - mpi_printf(&ctx,"/!\\ Code built with MPI_THREAD_FUNNELED level\n"); - #else - mpi_printf(&ctx,"/!\\ Code built with MPI_THREAD_MULTIPLE level\n"); - #endif /* * Mock reading some files, one for each processor */ int d = 5; + int k = 300; float_t* data; //parse command line parse_args(&ctx, argc, argv); + print_hello(&ctx); /* * Generate a random matrix of lenght of some kind */ + + if(ctx.mpi_rank == 0) { simulate_master_read_and_scatter(5, 1000000, &ctx); @@ -211,10 +251,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) TIME_DEF double elapsed_time; - float_t z = 3; - int halo = MY_TRUE; + int halo = MY_FALSE; float_t tol = 0.002; - int k = 300; if(I_AM_MASTER && ctx -> world_size <= 6) { @@ -231,7 +269,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) TIME_START; if (ctx->mpi_rank == 0) { - data = read_data_file(ctx,ctx -> input_data_file, ctx -> dims, ctx -> input_data_in_float32); + data = read_data_file(ctx, ctx -> input_data_file, ctx -> dims, ctx -> input_data_in_float32); get_dataset_diagnostics(ctx, data); } @@ -343,7 +381,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) dp_info[i].array_idx = -1; dp_info[i].kstar = -1; dp_info[i].is_center = -1; - dp_info[i].cluster_idx = -1; + dp_info[i].cluster_idx = -1717171717; //dp_info[i].halo_flag = 0; } ctx -> local_datapoints = dp_info; @@ -356,7 +394,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) MPI_DB_PRINT("----- Performing ngbh search -----\n"); MPI_Barrier(ctx -> mpi_communicator); - mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, k); + HERE + mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, ctx -> k); MPI_Barrier(ctx -> mpi_communicator); elapsed_time = TIME_STOP; @@ -376,7 +415,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE); - compute_correction(ctx, z); + compute_correction(ctx, ctx -> z); elapsed_time = TIME_STOP; LOG_WRITE("Density estimate", elapsed_time) @@ -393,7 +432,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) TIME_START; - Heuristic3(ctx, &clusters, z, halo); + Heuristic3(ctx, &clusters, ctx -> z, halo); elapsed_time = TIME_STOP; LOG_WRITE("H3", elapsed_time) @@ -402,7 +441,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) int* cl = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int)); for(int i = 0; i < ctx -> local_n_points; ++i) cl[i] = ctx -> local_datapoints[i].cluster_idx; - if(ctx -> world_size <= 6) + if(ctx -> world_size <= 32) { big_ordered_buffer_to_file(ctx, cl, sizeof(int), ctx -> 
local_n_points, ctx -> output_assignment_file); big_ordered_buffer_to_file(ctx, ctx -> local_data, sizeof(double), ctx -> local_n_points * ctx -> dims, ctx -> output_data_file); diff --git a/src/tree/heap.h b/src/tree/heap.h index 9d86491c933d1c7e97a6f86224542520026b3484..117ff8c95e982279c5c77d60f901898f16147789 100644 --- a/src/tree/heap.h +++ b/src/tree/heap.h @@ -8,7 +8,6 @@ #define T double #define DATA_DIMS 0 -#define HERE printf("%d reached line %d\n", ctx -> mpi_rank, __LINE__); MPI_Barrier(ctx -> mpi_communicator); #ifdef USE_FLOAT32 #define FLOAT_TYPE float diff --git a/src/tree/kdtreeV2.c b/src/tree/kdtreeV2.c index 3547975ea8c13d49c359583429cc3b9384b57577..f7b5e7da8c3b041b4eeda67ebb3c6e8fdcfc18a3 100644 --- a/src/tree/kdtreeV2.c +++ b/src/tree/kdtreeV2.c @@ -4,92 +4,87 @@ #include #include -#define DEFAULT_LEAF_SIZE 30 - +#define DEFAULT_LEAF_SIZE 30 extern unsigned int data_dims; -FLOAT_TYPE eud_kdtree_v2(FLOAT_TYPE* restrict p1, FLOAT_TYPE* restrict p2){ - register FLOAT_TYPE d = 0; - for(unsigned int i = 0; i data, sizeof(FLOAT_TYPE)*data_dims); - memcpy(x -> data, y -> data, sizeof(FLOAT_TYPE)*data_dims); - memcpy(y -> data, swapMem_kdv2, sizeof(FLOAT_TYPE)*data_dims); - FLOAT_TYPE* tmpPtr = x -> data; - x -> data = y -> data; - y -> data = tmpPtr; - #endif + kdnode_v2 tmp; + tmp = *x; + *x = *y; + *y = tmp; + +#ifdef SWMEM + memcpy(swapMem_kdv2, x->data, sizeof(FLOAT_TYPE) * data_dims); + memcpy(x->data, y->data, sizeof(FLOAT_TYPE) * data_dims); + memcpy(y->data, swapMem_kdv2, sizeof(FLOAT_TYPE) * data_dims); + FLOAT_TYPE *tmpPtr = x->data; + x->data = y->data; + y->data = tmpPtr; +#endif } - /** - * - * KDtree implementation - * - * -*/ - -void initialize_kdnodes_v2(kdnode_v2* node_array, FLOAT_TYPE* d, idx_t n ) -{ - for(idx_t i = 0; i < n; ++i) - { - node_array[i].data = d + (i*data_dims); - node_array[i].array_idx = i; - node_array[i].lch = NULL; - node_array[i].rch = NULL; - node_array[i].parent = NULL; - node_array[i].level = -1; - node_array[i].is_leaf = 0; - node_array[i].split_var = -1; - node_array[i].node_list.data = NULL; - node_array[i].node_list.count = 0; - } -} + * + * KDtree implementation + * + * + */ + +void initialize_kdnodes_v2(kdnode_v2 *node_array, FLOAT_TYPE *d, idx_t n) { + for (idx_t i = 0; i < n; ++i) { + node_array[i].data = d + (i * data_dims); + node_array[i].array_idx = i; + node_array[i].lch = NULL; + node_array[i].rch = NULL; + node_array[i].parent = NULL; + node_array[i].level = -1; + node_array[i].is_leaf = 0; + node_array[i].split_var = -1; + node_array[i].node_list.data = NULL; + node_array[i].node_list.count = 0; + } +} /* void initializePTRS(kdnode_v2** node_ptr_array, kdnode_v2* node_array, idx_t n ) { @@ -100,300 +95,284 @@ void initializePTRS(kdnode_v2** node_ptr_array, kdnode_v2* node_array, idx_t n ) } */ -int cmpKDnodesV2(kdnode_v2* a, kdnode_v2* b, int var){ - - FLOAT_TYPE res = a->data[var] - b->data[var]; - return (res > 0); +int cmpKDnodesV2(kdnode_v2 *a, kdnode_v2 *b, int var) { + + FLOAT_TYPE res = a->data[var] - b->data[var]; + return (res > 0); } -void printKDnodeV2(kdnode_v2* node) -{ - printf("Node %p:\n",node); - printf("\t array_idx: %lu\n", (uint64_t)(node -> array_idx)); - printf("\t data: "); - for(unsigned int i=0; idata[i]); - printf("\n"); - printf("\t parent: %p\n",node -> parent); - printf("\t lch: %p\n",node -> lch); - printf("\t rch: %p\n",node -> rch); - printf("\t level: %d\n", node -> level); - printf("\t split_var: %d\n", node -> split_var); - printf("\n"); +void printKDnodeV2(kdnode_v2 *node) { + 
printf("Node %p:\n", node); + printf("\t array_idx: %lu\n", (uint64_t)(node->array_idx)); + printf("\t data: "); + for (unsigned int i = 0; i < data_dims; ++i) + printf(" %f ", node->data[i]); + printf("\n"); + printf("\t parent: %p\n", node->parent); + printf("\t lch: %p\n", node->lch); + printf("\t rch: %p\n", node->rch); + printf("\t level: %d\n", node->level); + printf("\t split_var: %d\n", node->split_var); + printf("\n"); } // Standard Lomuto partition function -int partition_kdnode_v2(kdnode_v2* arr, int low, int high, int split_var) -{ - kdnode_v2 pivot = arr[high]; - - int i = (low - 1); - for (int j = low; j <= high - 1; j++) { - if (!cmpKDnodesV2(arr + j,&pivot,split_var)) { - i++; - swap_kdnode_v2(arr + i, arr + j); - } +int partition_kdnode_v2(kdnode_v2 *arr, int low, int high, int split_var) { + kdnode_v2 pivot = arr[high]; + + int i = (low - 1); + for (int j = low; j <= high - 1; j++) { + if (!cmpKDnodesV2(arr + j, &pivot, split_var)) { + i++; + swap_kdnode_v2(arr + i, arr + j); } - swap_kdnode_v2(arr + i + 1, arr + high); - return (i + 1); + } + swap_kdnode_v2(arr + i + 1, arr + high); + return (i + 1); } // Implementation of QuickSelect -int median_of_nodes_kdnode_v2(kdnode_v2* a, int left, int right, int split_var) -{ - //printf("----------\n"); - int k = left + ((right - left + 1)/2); - - if(left == right) return left; - if(left == (right - 1)){ - if(cmpKDnodesV2(a + left,a + right,split_var)) {swap_kdnode_v2(a + left, a + right);} - return right; - } - while (left <= right) { - - // Partition a[left..right] around a pivot - // and find the position of the pivot - int pivotIndex = partition_kdnode_v2(a, left, right,split_var); - //printf("%d %d %d %d\n",left, right, k, pivotIndex); - - // If pivot itself is the k-th smallest element - if (pivotIndex == k) - return pivotIndex; - - // If there are more than k-1 elements on - // left of pivot, then k-th smallest must be - // on left side. - - else if (pivotIndex > k) - right = pivotIndex - 1; - - // Else k-th smallest is on right side. - else - left = pivotIndex + 1; +int median_of_nodes_kdnode_v2(kdnode_v2 *a, int left, int right, + int split_var) { + // printf("----------\n"); + int k = left + ((right - left + 1) / 2); + + if (left == right) + return left; + if (left == (right - 1)) { + if (cmpKDnodesV2(a + left, a + right, split_var)) { + swap_kdnode_v2(a + left, a + right); } - return -1; + return right; + } + while (left <= right) { + + // Partition a[left..right] around a pivot + // and find the position of the pivot + int pivotIndex = partition_kdnode_v2(a, left, right, split_var); + // printf("%d %d %d %d\n",left, right, k, pivotIndex); + + // If pivot itself is the k-th smallest element + if (pivotIndex == k) + return pivotIndex; + + // If there are more than k-1 elements on + // left of pivot, then k-th smallest must be + // on left side. + + else if (pivotIndex > k) + right = pivotIndex - 1; + + // Else k-th smallest is on right side. + else + left = pivotIndex + 1; + } + return -1; } -kdnode_v2* make_tree_kdnode_v2(kdnode_v2* t, int start, int end, kdnode_v2* parent, int level) -{ - kdnode_v2 *n = NULL; - int split_var = level % data_dims; - FLOAT_TYPE max_diff = -999999.; - for(unsigned int v = 0; v < data_dims; ++v) - { - FLOAT_TYPE max_v = -9999999.; - FLOAT_TYPE min_v = 9999999.; - for(int i = start; i <= end; ++i) - { - max_v = t[i].data[v] > max_v ? t[i].data[v] : max_v; - min_v = t[i].data[v] < min_v ? 
t[i].data[v] : min_v; - } - if((max_v - min_v) > max_diff) - { - max_diff = max_v - min_v; - split_var = v; - } +kdnode_v2 *make_tree_kdnode_v2(kdnode_v2 *t, int start, int end, + kdnode_v2 *parent, int level) { + kdnode_v2 *n = NULL; + int split_var = level % data_dims; + FLOAT_TYPE max_diff = -999999.; + for (unsigned int v = 0; v < data_dims; ++v) { + FLOAT_TYPE max_v = -9999999.; + FLOAT_TYPE min_v = 9999999.; + for (int i = start; i <= end; ++i) { + max_v = t[i].data[v] > max_v ? t[i].data[v] : max_v; + min_v = t[i].data[v] < min_v ? t[i].data[v] : min_v; } - - /* - #ifdef SWMEM - if(parent == NULL) - { - swapMem_kdv2 = (FLOAT_TYPE*)malloc(sizeof(FLOAT_TYPE)*data_dims); - } - #endif - */ - - - - if(end - start < DEFAULT_LEAF_SIZE) - { - n = t + start; - n -> is_leaf = 1; - n -> parent = parent; - n -> lch = NULL; - n -> rch = NULL; - size_t j = 0; - n -> node_list.count = (size_t)(end - start + 1); - n -> node_list.data = (kdnode_v2**)malloc(n -> node_list.count * sizeof(kdnode_v2*)); - for(int i = start; i <= end; ++i){ - t[i].parent = n; - t[i].is_leaf = 1; - t[i].lch = NULL; - t[i].rch = NULL; - n -> node_list.data[j] = t + i; - ++j; - } - return n; - + if ((max_v - min_v) > max_diff) { + max_diff = max_v - min_v; + split_var = v; } - - int median_idx = -1; - - //if ((end - start) < 0) return 0; - //if (end == start) { - // n = t + start; - // n -> split_var = split_var; - // n->parent = parent; - // n->level = level; - // n -> lch = NULL; - // n -> rch = NULL; - // return n; - //} - - median_idx = median_of_nodes_kdnode_v2(t, start, end, split_var); - //printf("%d median idx\n", median_idx); - if(median_idx > -1){ - swap_kdnode_v2(t + start, t + median_idx); - //n = t + median_idx; - n = t + start; - //n->lch = make_tree_kdnode_v2(t, start, median_idx - 1, n, level + 1); - n->lch = make_tree_kdnode_v2(t, start + 1, median_idx, n, level + 1); - //n->rch = make_tree_kdnode_v2(t, median_idx + 1, end, n, level + 1); - n->rch = make_tree_kdnode_v2(t, median_idx + 1, end, n, level + 1); - n -> split_var = split_var; - n->parent = parent; - n->level = level; + } + + /* + #ifdef SWMEM + if(parent == NULL) + { + swapMem_kdv2 = (FLOAT_TYPE*)malloc(sizeof(FLOAT_TYPE)*data_dims); + } + #endif + */ + + if (end - start < DEFAULT_LEAF_SIZE) { + n = t + start; + n->is_leaf = 1; + n->parent = parent; + n->lch = NULL; + n->rch = NULL; + size_t j = 0; + n->node_list.count = (size_t)(end - start + 1); + n->node_list.data = + (kdnode_v2 **)malloc(n->node_list.count * sizeof(kdnode_v2 *)); + for (int i = start; i <= end; ++i) { + t[i].parent = n; + t[i].is_leaf = 1; + t[i].lch = NULL; + t[i].rch = NULL; + n->node_list.data[j] = t + i; + ++j; } - - /* - #ifdef SWMEM - if(parent == NULL) - { - swapMem_kdv2 = malloc(sizeof(FLOAT_TYPE)*data_dims); - } - #endif - */ - return n; + } + + int median_idx = -1; + + // if ((end - start) < 0) return 0; + // if (end == start) { + // n = t + start; + // n -> split_var = split_var; + // n->parent = parent; + // n->level = level; + // n -> lch = NULL; + // n -> rch = NULL; + // return n; + // } + + median_idx = median_of_nodes_kdnode_v2(t, start, end, split_var); + // printf("%d median idx\n", median_idx); + if (median_idx > -1) { + swap_kdnode_v2(t + start, t + median_idx); + // n = t + median_idx; + n = t + start; + // n->lch = make_tree_kdnode_v2(t, start, median_idx - 1, n, level + 1); + n->lch = make_tree_kdnode_v2(t, start + 1, median_idx, n, level + 1); + // n->rch = make_tree_kdnode_v2(t, median_idx + 1, end, n, level + 1); + n->rch = make_tree_kdnode_v2(t, 
median_idx + 1, end, n, level + 1); + n->split_var = split_var; + n->parent = parent; + n->level = level; + } + + /* + #ifdef SWMEM + if(parent == NULL) + { + swapMem_kdv2 = malloc(sizeof(FLOAT_TYPE)*data_dims); + } + #endif + */ + + return n; } -static inline FLOAT_TYPE hyper_plane_dist(FLOAT_TYPE* p1, FLOAT_TYPE* p2, int var) -{ - return p1[var] - p2[var]; +static inline FLOAT_TYPE hyper_plane_dist(FLOAT_TYPE *p1, FLOAT_TYPE *p2, + int var) { + return p1[var] - p2[var]; } -static inline int hyper_plane_side(FLOAT_TYPE* p1, FLOAT_TYPE* p2, int var) -{ - return p1[var] > p2[var]; +static inline int hyper_plane_side(FLOAT_TYPE *p1, FLOAT_TYPE *p2, int var) { + return p1[var] > p2[var]; } -void knn_sub_tree_search_kdtree_v2(FLOAT_TYPE* point, kdnode_v2* root, heap * H) -{ - if(root -> is_leaf) - { - for(size_t i = 0; i < root -> node_list.count; ++i) - { - kdnode_v2* n = root -> node_list.data[i]; - //__builtin_prefetch(root -> node_list.data + i + 1, 0, 3); - FLOAT_TYPE distance = eud_kdtree_v2(point, n -> data); - insert_max_heap(H, distance,n -> array_idx); - } - return; +void knn_sub_tree_search_kdtree_v2(FLOAT_TYPE *point, kdnode_v2 *root, + heap *H) { + if (root->is_leaf) { + for (size_t i = 0; i < root->node_list.count; ++i) { + kdnode_v2 *n = root->node_list.data[i]; + //__builtin_prefetch(root -> node_list.data + i + 1, 0, 3); + FLOAT_TYPE distance = eud_kdtree_v2(point, n->data); + insert_max_heap(H, distance, n->array_idx); } + return; + } - FLOAT_TYPE current_distance = eud_kdtree_v2(point, root -> data); - FLOAT_TYPE hp_distance = hyper_plane_dist(point, root -> data, root -> split_var); - insert_max_heap(H, current_distance, root -> array_idx); - __builtin_prefetch(root -> lch, 0, 3); - __builtin_prefetch(root -> rch, 0, 3); + FLOAT_TYPE current_distance = eud_kdtree_v2(point, root->data); + FLOAT_TYPE hp_distance = hyper_plane_dist(point, root->data, root->split_var); + insert_max_heap(H, current_distance, root->array_idx); + __builtin_prefetch(root->lch, 0, 3); + __builtin_prefetch(root->rch, 0, 3); - int side = hp_distance > 0.f; + int side = hp_distance > 0.f; - switch (side) - { - case HP_LEFT_SIDE: - if(root -> lch) - { - knn_sub_tree_search_kdtree_v2(point, root -> lch, H); - } - break; - - case HP_RIGHT_SIDE: - if(root -> rch) - { - knn_sub_tree_search_kdtree_v2(point, root -> rch, H); - } - break; - - default: - break; + switch (side) { + case HP_LEFT_SIDE: + if (root->lch) { + knn_sub_tree_search_kdtree_v2(point, root->lch, H); } - FLOAT_TYPE max_d = H -> data[0].value; - int c = max_d > (hp_distance * hp_distance); + break; - if(c || (H -> count) < (H -> N)) - //if(c) - { - - switch (side) - { - case HP_LEFT_SIDE: - if(root -> rch) - { - knn_sub_tree_search_kdtree_v2(point, root -> rch, H); - } - break; - - case HP_RIGHT_SIDE: - if(root -> lch) - { - knn_sub_tree_search_kdtree_v2(point, root -> lch, H); - } - break; - - default: - break; - } + case HP_RIGHT_SIDE: + if (root->rch) { + knn_sub_tree_search_kdtree_v2(point, root->rch, H); } - return; + break; + + default: + break; + } + FLOAT_TYPE max_d = H->data[0].value; + int c = max_d > (hp_distance * hp_distance); + + if (c || (H->count) < (H->N)) + // if(c) + { + + switch (side) { + case HP_LEFT_SIDE: + if (root->rch) { + knn_sub_tree_search_kdtree_v2(point, root->rch, H); + } + break; + + case HP_RIGHT_SIDE: + if (root->lch) { + knn_sub_tree_search_kdtree_v2(point, root->lch, H); + } + break; + + default: + break; + } + } + return; } -void kdtree_v2_init(kdtree_v2* tree, FLOAT_TYPE* data, size_t n_nodes, unsigned 
int dims) -{ - data_dims = dims; - tree -> n_nodes = n_nodes; - tree -> _nodes = (kdnode_v2*)malloc(n_nodes * sizeof(kdnode_v2)); - initialize_kdnodes_v2(tree -> _nodes, data, n_nodes); - tree -> root = NULL; +void kdtree_v2_init(kdtree_v2 *tree, FLOAT_TYPE *data, size_t n_nodes, + unsigned int dims) { + data_dims = dims; + tree->n_nodes = n_nodes; + tree->_nodes = (kdnode_v2 *)malloc(n_nodes * sizeof(kdnode_v2)); + initialize_kdnodes_v2(tree->_nodes, data, n_nodes); + tree->root = NULL; } -void kdtree_v2_free(kdtree_v2* tree) -{ - for(uint64_t i = 0; i < tree->n_nodes; ++i) - if(tree -> _nodes[i].node_list.data) free(tree -> _nodes[i].node_list.data); +void kdtree_v2_free(kdtree_v2 *tree) { + for (uint64_t i = 0; i < tree->n_nodes; ++i) + if (tree->_nodes[i].node_list.data) + free(tree->_nodes[i].node_list.data); - free(tree -> _nodes); + free(tree->_nodes); } - -heap knn_kdtree_v2(FLOAT_TYPE* point, kdnode_v2* kdtree_root, int maxk) -{ - heap H; - allocate_heap(&H,maxk); - init_heap(&H); - knn_sub_tree_search_kdtree_v2(point, kdtree_root,&H); - heap_sort(&H); - return H; +heap knn_kdtree_v2(FLOAT_TYPE *point, kdnode_v2 *kdtree_root, int maxk) { + heap H; + allocate_heap(&H, maxk); + init_heap(&H); + knn_sub_tree_search_kdtree_v2(point, kdtree_root, &H); + heap_sort(&H); + return H; } -heap knn_kdtree_v2_no_heapsort(FLOAT_TYPE* point, kdnode_v2* kdtree_root, int maxk) -{ - heap H; - allocate_heap(&H,maxk); - init_heap(&H); - knn_sub_tree_search_kdtree_v2(point, kdtree_root,&H); - return H; +heap knn_kdtree_v2_no_heapsort(FLOAT_TYPE *point, kdnode_v2 *kdtree_root, + int maxk) { + heap H; + allocate_heap(&H, maxk); + init_heap(&H); + knn_sub_tree_search_kdtree_v2(point, kdtree_root, &H); + return H; } -kdnode_v2 * build_tree_kdtree_v2(kdnode_v2* kd_ptrs, size_t n, size_t dimensions ) -{ - /************************************************* - * Wrapper for make_tree function. * - * Simplifies interfaces and takes time measures * - *************************************************/ - data_dims = dimensions; - kdnode_v2* root = make_tree_kdnode_v2(kd_ptrs, 0, n-1, NULL ,0); - return root; - +kdnode_v2 *build_tree_kdtree_v2(kdnode_v2 *kd_ptrs, size_t n, + size_t dimensions) { + /************************************************* + * Wrapper for make_tree function. 
* + * Simplifies interfaces and takes time measures * + *************************************************/ + data_dims = dimensions; + kdnode_v2 *root = make_tree_kdnode_v2(kd_ptrs, 0, n - 1, NULL, 0); + return root; } diff --git a/src/tree/tree.c b/src/tree/tree.c index 876dd85c5c9ebe40f0093ee2f5d2d095c97bd4da..9ff820b165c5542d2517bb31b0d9d5136c41b49f 100644 --- a/src/tree/tree.c +++ b/src/tree/tree.c @@ -20,7 +20,6 @@ #include - /* * Maximum bytes to send with a single mpi send/recv, used * while communicating results of ngbh search @@ -51,9 +50,9 @@ int cmp_float_t(const void* a, const void* b) /* quickselect for an element along a dimension */ -void swap_data_element(float_t *a, float_t *b, int vec_len) { +void swap_data_element(float_t *a, float_t *b, size_t vec_len) { float_t tmp; - for (int i = 0; i < vec_len; ++i) + for (size_t i = 0; i < vec_len; ++i) { tmp = a[i]; a[i] = b[i]; @@ -249,15 +248,15 @@ float_t check_pc_pointset_parallel(global_context_t *ctx, pointset_t *ps, guess_ * gather on master all data * perform the count on master */ - int pvt_count = 0; - for (int i = 0; i < ps->n_points; ++i) + size_t pvt_count = 0; + for (size_t i = 0; i < ps->n_points; ++i) { pvt_count += ps->data[i * ps->dims + d] <= g.x_guess; } - int pvt_n_and_tot[2] = {pvt_count, ps->n_points}; - int tot_count[2]; - MPI_Allreduce(pvt_n_and_tot, tot_count, 2, MPI_INT, MPI_SUM, ctx->mpi_communicator); + size_t pvt_n_and_tot[2] = {pvt_count, ps->n_points}; + size_t tot_count[2]; + MPI_Allreduce(pvt_n_and_tot, tot_count, 2, MPI_UINT64_T, MPI_SUM, ctx->mpi_communicator); float_t ep = (float_t)tot_count[0] / (float_t)(tot_count[1]); /* @@ -404,7 +403,7 @@ void global_binning_check(global_context_t *ctx, float_t *data, int d, int k) float_t box_width = box_ub - box_lb; float_t bin_width = box_width / (float_t)k; - for (int i = 0; i < ctx->n_points; ++i) + for (size_t i = 0; i < ctx->n_points; ++i) { int bin_idx = (int)((data[i * ctx->dims + d] - box_lb) / bin_width); if (bin_idx < k) counts[bin_idx]++; @@ -453,14 +452,14 @@ void compute_pure_global_binning(global_context_t *ctx, pointset_t *ps, free(local_bin_count); } -int partition_data_around_value(float_t *array, int vec_len, int compare_dim, - int left, int right, float_t pivot_value) +size_t partition_data_around_value(float_t *array, size_t vec_len, size_t compare_dim, + size_t left, size_t right, float_t pivot_value) { /* * returns the number of elements less than the pivot */ - int store_index = left; - int i; + size_t store_index = left; + size_t i; /* Move pivot to end */ for (i = left; i < right; ++i) { @@ -602,9 +601,9 @@ void free_queue(partition_queue_t *pq) { free(pq->data); } void get_pointset_from_partition(pointset_t *ps, partition_t *part) { - ps->n_points = part->n_points; + ps->n_points = part->n_points; ps->data = part->base_ptr; - ps->n_points = part->n_points; + ps->n_points = part->n_points; } guess_t compute_median_pure_binning(global_context_t *ctx, pointset_t *ps, float_t fraction, int selected_dim, int n_bins, float_t tolerance) @@ -680,7 +679,7 @@ top_kdtree_node_t* top_tree_generate_node(global_context_t* ctx, top_kdtree_t* t ptr -> ub_node_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); ptr -> owner = -1; ptr -> split_dim = 0; - ++tree -> count; + ++(tree -> count); return ptr; } @@ -800,26 +799,24 @@ void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree top_kdtree_node_t* current_node = top_tree_generate_node(ctx, tree); /* insert node */ - /* - MPI_DB_PRINT( "[RANK %d] Handling 
partition:\n"\ - "\tcurrent_node %p,\n"\ - "\tdim %d,\n"\ - "\tn_points %lu,\n"\ - "\tstart_proc %d,\n"\ - "\tn_procs %d\n"\ - "\tparent %p\n"\ - "\tbase_ptr %p\n"\ - "\tlr %d\n", - ctx -> mpi_rank, - current_node, - current_partition.d, - current_partition.n_points, - current_partition.start_proc, - current_partition.n_procs, - current_partition.parent, - current_partition.base_ptr, - current_partition.lr); - */ + // MPI_DB_PRINT( "[RANK %d] Handling partition:\n"\ + // "\tcurrent_node %p,\n"\ + // "\tdim %d,\n"\ + // "\tn_points %lu,\n"\ + // "\tstart_proc %d,\n"\ + // "\tn_procs %d\n"\ + // "\tparent %p\n"\ + // "\tbase_ptr %p\n"\ + // "\tlr %d\n", + // ctx -> mpi_rank, + // current_node, + // current_partition.d, + // current_partition.n_points, + // current_partition.start_proc, + // current_partition.n_procs, + // current_partition.parent, + // current_partition.base_ptr, + // current_partition.lr); switch (current_partition.lr) { case TOP_TREE_LCH: @@ -876,13 +873,13 @@ void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree current_node -> lch = NULL; current_node -> rch = NULL; - + MPI_Barrier(ctx -> mpi_communicator); /* handle partition */ if(current_partition.n_procs > 1) { float_t fraction = (current_partition.n_procs / 2) / (float_t)current_partition.n_procs; guess_t g = compute_median_pure_binning(ctx, ¤t_pointset, fraction, current_partition.d, n_bins, tolerance); - int pv = partition_data_around_value(current_pointset.data, ctx->dims, current_partition.d, 0, current_pointset.n_points, g.x_guess); + size_t pv = partition_data_around_value(current_pointset.data, ctx->dims, current_partition.d, 0, current_pointset.n_points, g.x_guess); current_node -> split_val = g.x_guess; @@ -893,10 +890,8 @@ void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree int procs_right = current_partition.n_procs - procs_left; - /* - MPI_DB_PRINT("Chosing as guess: %lf, seareching for %lf, obtained %lf\n", g.x_guess, fraction, g.ep); - MPI_DB_PRINT("-------------------\n\n"); - */ + // MPI_DB_PRINT("Chosing as guess: %lf, seareching for %lf, obtained %lf\n", g.x_guess, fraction, g.ep); + // MPI_DB_PRINT("-------------------\n\n"); @@ -935,11 +930,12 @@ void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree MPI_DB_PRINT("Root is %p\n", tree -> root); if(I_AM_MASTER) { - //tree_print(ctx, tree -> root); + tree_print(ctx, tree -> root); write_nodes_to_file(ctx, tree, "bb/top_nodes.csv"); } #endif + // HERE free(current_pointset.lb_box); free(current_pointset.ub_box); @@ -1862,7 +1858,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre /* -------------------------------------------------------- */ /* heapsort them */ - #pragma omp parallel for + #pragma omp parallel for schedule(dynamic) for(int i = 0; i < ctx -> local_n_points; ++i) { heap tmp_heap;
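/*
 * Illustrative sketch (NOT part of the patch above): a minimal, self-contained
 * program showing the exchange pattern the reworked Heuristic1 uses in place of
 * the old RMA/lock-based center elimination -- each rank queues the eliminations
 * that target remote points, exchanges per-rank counts with MPI_Alltoall, ships
 * the payload with MPI_Alltoallv (counts/displacements scaled to bytes), and then
 * merges what it received into its local mask. The struct mirrors center_removal_t
 * from src/adp/adp.h; all other names, the toy data, and the build line are
 * assumptions made only for this example.
 *
 * Assumed build/run: mpicc -O2 removal_exchange_sketch.c -o sketch && mpirun -n 4 ./sketch
 */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct {
    int    rank;            /* owner of the target point            */
    long   source_id;       /* global idx of the eliminating center */
    long   target_id;       /* global idx of the eliminated center  */
    double source_density;  /* density of the eliminating center    */
} removal_t;

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* pretend each rank queued one removal addressed to every other rank,
     * already ordered by destination rank (as after the qsort in the patch) */
    int n_local = size - 1;
    removal_t *sendbuf = malloc((n_local > 0 ? n_local : 1) * sizeof(removal_t));
    int k = 0;
    for (int r = 0; r < size; ++r) {
        if (r == rank) continue;
        sendbuf[k].rank = r;
        sendbuf[k].source_id = rank;     /* toy ids and densities */
        sendbuf[k].target_id = r;
        sendbuf[k].source_density = (double)rank;
        ++k;
    }

    /* per-destination counts in elements */
    int *send_counts = calloc(size, sizeof(int));
    int *recv_counts = calloc(size, sizeof(int));
    int *send_displs = calloc(size, sizeof(int));
    int *recv_displs = calloc(size, sizeof(int));
    for (int i = 0; i < n_local; ++i) send_counts[sendbuf[i].rank]++;

    /* tell every rank how many removals it will receive from us */
    MPI_Alltoall(send_counts, 1, MPI_INT, recv_counts, 1, MPI_INT, MPI_COMM_WORLD);

    int tot_recv = 0;
    for (int r = 1; r < size; ++r) {
        send_displs[r] = send_displs[r - 1] + send_counts[r - 1];
        recv_displs[r] = recv_displs[r - 1] + recv_counts[r - 1];
    }
    for (int r = 0; r < size; ++r) tot_recv += recv_counts[r];

    /* the payload travels as MPI_BYTE, so scale counts and displacements */
    for (int r = 0; r < size; ++r) {
        send_counts[r] *= (int)sizeof(removal_t);
        recv_counts[r] *= (int)sizeof(removal_t);
        send_displs[r] *= (int)sizeof(removal_t);
        recv_displs[r] *= (int)sizeof(removal_t);
    }
    removal_t *recvbuf = malloc((tot_recv > 0 ? tot_recv : 1) * sizeof(removal_t));
    MPI_Alltoallv(sendbuf, send_counts, send_displs, MPI_BYTE,
                  recvbuf, recv_counts, recv_displs, MPI_BYTE, MPI_COMM_WORLD);

    /* merge step: in the real code each received removal updates
     * to_remove_mask[target_id - idx_start] under an omp lock if its source
     * density beats the stored one; here we only report what arrived */
    printf("rank %d received %d removal requests\n", rank, tot_recv);

    free(sendbuf); free(recvbuf);
    free(send_counts); free(recv_counts); free(send_displs); free(recv_displs);
    MPI_Finalize();
    return 0;
}

/* Design note (hedged): compared with the removed MPI_Win lock/Rget/Rput path,
 * this pattern trades fine-grained one-sided traffic for one collective round
 * per pass, which is why the patch also moves the per-point contention onto
 * local omp locks instead of window locks. */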