diff --git a/Makefile b/Makefile index 99b4473d23b0af3722d2b57d562bb4c5a940794a..ceb6a13a6805c67953d67ca4b5cb3c58c78cd8c6 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ CC=mpicc #CC=mpiicx -#CFLAGS=-O3 -march=native -flto -funroll-loops -fopenmp -CFLAGS=-O3 -fopenmp +CFLAGS=-O3 -march=native -flto -funroll-loops -fopenmp +#CFLAGS=-O3 -fopenmp LDFLAGS=-lm all: main diff --git a/src/adp/adp.c b/src/adp/adp.c index cbd0263796fb30948e22ca0466a12ecd72836e90..56f946852ec1efe91b6a973217a4306a47435f70 100644 --- a/src/adp/adp.c +++ b/src/adp/adp.c @@ -1,4 +1,8 @@ #include "adp.h" +#include "mpi.h" +#include +#include +#include #include const border_t border_null = {.density = -1.0, .error = 0, .idx = NOBORDER}; @@ -133,7 +137,6 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int MPI_Barrier(ctx -> mpi_communicator); - idx_t k = ctx -> k; struct timespec start_tot, finish_tot; double elapsed_tot; @@ -158,22 +161,11 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int // heuristics // Also, shall we give som info argument to it ? // - - MPI_Win exposed_ngbh; - MPI_Win_create( ctx -> __local_heap_buffers, - ctx -> local_n_points * k * sizeof(heap_node), - 1, MPI_INFO_NULL, - ctx -> mpi_communicator, - &exposed_ngbh); - - MPI_Win_fence(MPI_MODE_NOPUT, exposed_ngbh); - - MPI_Barrier(ctx -> mpi_communicator); - + #pragma omp parallel for for(idx_t i = 0; i < ctx -> local_n_points; ++i) { - for(idx_t k = 0; k <= kMAX; ++k) + for(idx_t k = 0; k <= ctx -> k; ++k) { local_datapoints[i].ngbh[k].value = omega * pow(local_datapoints[i].ngbh[k].value, d/2.); } @@ -181,6 +173,15 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int local_datapoints[i].kstar = 0; } + MPI_Win exposed_ngbh; + MPI_Win_create( ctx -> __local_heap_buffers, + ctx -> local_n_points * ctx -> k * sizeof(heap_node), + 1, MPI_INFO_NULL, + ctx -> mpi_communicator, + &exposed_ngbh); + + MPI_Win_fence(MPI_MODE_NOPUT, exposed_ngbh); + int i_have_finished = 0; int all_have_finished = 0; int finished_points = 0; @@ -219,11 +220,12 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int } else { + // if(pos > ctx -> rank_n_points[owner]) printf("NOPE HERE from rank %d %lu %d\n", ctx -> mpi_rank, pos, ctx -> rank_n_points[owner]); MPI_Get(scratch_heap_nodes + i, sizeof(heap_node), MPI_BYTE, owner, - (MPI_Aint)((pos * (ctx -> k) + ksel) * sizeof(heap_node)), + (MPI_Aint)((pos * (idx_t)(ctx -> k) + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, exposed_ngbh); @@ -1542,7 +1544,15 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster) MPI_Send(&num_border_el, 1, MPI_UINT64_T, rank_to_send, rank_to_send, ctx -> mpi_communicator); - MPI_Send(borders_to_send, num_border_el * sizeof(sparse_border_t), MPI_BYTE , rank_to_send, rank_to_send, ctx -> mpi_communicator); + idx_t el_already_sent = 0; + + while( el_already_sent < num_border_el) + { + idx_t el_to_send = MIN(DEFAULT_MSG_LEN, num_border_el - el_already_sent); + MPI_Send(borders_to_send + el_already_sent, el_to_send * sizeof(sparse_border_t), MPI_BYTE , rank_to_send, rank_to_send, ctx -> mpi_communicator); + el_already_sent += el_to_send; + } + i_have_sent = 1; free(borders_to_send); @@ -1558,7 +1568,27 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster) sparse_border_t* borders_to_recv = (sparse_border_t*)MY_MALLOC(num_borders_recv * sizeof(sparse_border_t)); - MPI_Recv(borders_to_recv, num_borders_recv * sizeof(sparse_border_t), MPI_BYTE , 
MPI_ANY_SOURCE, ctx -> mpi_rank, ctx -> mpi_communicator, MPI_STATUS_IGNORE); + idx_t el_already_recv = 0; + + while(el_already_recv < num_borders_recv) + { + MPI_Status status = {0}; + + MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &status); + + MPI_Request request; + int count_recv; + int source = status.MPI_SOURCE; + MPI_Get_count(&status, MPI_BYTE, &count_recv); + + idx_t el_to_recv = count_recv/sizeof(sparse_border_t); + MPI_Recv(borders_to_recv + el_already_recv, + count_recv, + MPI_BYTE , MPI_ANY_SOURCE, ctx -> mpi_rank, + ctx -> mpi_communicator, MPI_STATUS_IGNORE); + el_already_recv += el_to_recv; + } + for(int i = 0; i < num_borders_recv; ++i) { @@ -1618,16 +1648,25 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster) if(I_AM_MASTER) { printf("[MASTER] Writing borders to bb/borders.csv\n"); - FILE* f = fopen("bb/borders.csv", "w"); + // FILE* f = fopen("bb/borders.csv", "w"); + // for(idx_t i = 0; i < nclus; ++i) + // { + // for(int j = 0; j < cluster -> sparse_borders[i].count; ++j) + // { + // sparse_border_t b = cluster -> sparse_borders[i].data[j]; + // fprintf(f, "%lu,%lu,%lu,%lf,%lf\n", b.i, b.j, b.idx, b.density, b.error); + // } + // } + // fclose(f); + + FILE* f = fopen("bb/borders.bin", "wb"); for(idx_t i = 0; i < nclus; ++i) { - for(int j = 0; j < cluster -> sparse_borders[i].count; ++j) - { - sparse_border_t b = cluster -> sparse_borders[i].data[j]; - fprintf(f, "%lu,%lu,%lu,%lf\n", b.i, b.j, b.idx, b.density); - } + fwrite(cluster -> sparse_borders[i].data, sizeof(sparse_border_t), + cluster -> sparse_borders[i].count, f); } fclose(f); + } @@ -1751,7 +1790,8 @@ static inline void delete_adj_list_element(clusters_t * c, const idx_t list_idx, c -> sparse_borders[list_idx].count -= 1; } -void fix_sparse_borders_A_into_B(idx_t s,idx_t t, clusters_t* c) +#ifdef PARALLEL_FIX_BORDERS +void fix_sparse_borders_A_into_B(idx_t source,idx_t target, clusters_t* clusters_obj) { /* * Handle borders after two clusters are merged @@ -1764,67 +1804,119 @@ void fix_sparse_borders_A_into_B(idx_t s,idx_t t, clusters_t* c) * density is kept */ - { - { - for(idx_t el = 0; el < c -> sparse_borders[t].count; ++el) - { - sparse_border_t b = c -> sparse_borders[t].data[el]; - if(b.i == t && b.j == s) - { - //delete the border src trg - delete_adj_list_element(c, t, el); - } - } - } - //find the border and delete it, other insert them in correct place - for(idx_t el = 0; el < c -> sparse_borders[s].count; ++el) - { - sparse_border_t b = c -> sparse_borders[s].data[el]; - // idx_t ii = b.i; - if(b.j != t) - { - //insert these borders as trg -> j and j -> trg - b.i = t; - sparse_border_insert(c, b); - sparse_border_t bsym = b; - bsym.i = b.j; - bsym.j = b.i; - sparse_border_insert(c, bsym); - for(idx_t dl = 0; dl < c -> sparse_borders[b.j].count; ++dl) - { - sparse_border_t b_del = c -> sparse_borders[b.j].data[dl]; - if(b_del.j == s) - { - //delete the border src trg - delete_adj_list_element(c, b.j, dl); - } - } - - } - } - //clean up all borders - //delete the src list - { - adj_list_reset((c->sparse_borders) + s); - } - //delete all borders containing src - // for(idx_t i = 0; i < nclus; ++i) - // { - // for(idx_t el = 0; el < c -> sparse_borders[i].count; ++el) - // { - // sparse_border_t b = c -> sparse_borders[i].data[el]; - // if(b.j == s) - // { - // //delete the border src trg - // delete_adj_list_element(c, i, el); - // } - // } - // - // } - } + for(idx_t el = 0; el < clusters_obj -> sparse_borders[target].count; ++el) + { + sparse_border_t b = 
clusters_obj -> sparse_borders[target].data[el]; + if(b.i == target && b.j == source) + { + //delete the border src trg + delete_adj_list_element(clusters_obj, target, el); + } + } + + //find the border and delete it + for(idx_t el = 0; el < clusters_obj -> sparse_borders[source].count; ++el) + { + sparse_border_t b = clusters_obj -> sparse_borders[source].data[el]; + if(b.j != target) + { + //insert these borders as trg -> j and j -> trg + b.i = target; + sparse_border_insert(clusters_obj, b); + } + } + + // other insert them in correct place + + #pragma omp parallel for + for(idx_t el = 0; el < clusters_obj -> sparse_borders[source].count; ++el) + { + sparse_border_t b = clusters_obj -> sparse_borders[source].data[el]; + if(b.j != target) + { + // insert these borders as trg -> j and j -> trg + // fix the same, b.i should now be the target + b.i = target; + sparse_border_t bsym = b; + bsym.i = b.j; + bsym.j = b.i; + sparse_border_insert(clusters_obj, bsym); + for(idx_t dl = 0; dl < clusters_obj -> sparse_borders[b.j].count; ++dl) + { + sparse_border_t b_del = clusters_obj -> sparse_borders[b.j].data[dl]; + if(b_del.j == source) + { + delete_adj_list_element(clusters_obj, b.j, dl); + } + } + + } + } + //clean up all borders + //delete the src list + + adj_list_reset((clusters_obj->sparse_borders) + source); } +#else + +void fix_sparse_borders_A_into_B(idx_t source,idx_t target, clusters_t* clusters_obj) +{ + /* + * Handle borders after two clusters are merged + * - idx_t s : source cluster, the one has to be merged + * - idx_t t : target cluster, the one recieves the merge + * - clusters* c : object containing all the data + * + * When s goes into t all the clusters which had a border with s now they must have + * a border with t. If t already has a border like that, the border with higher + * density is kept + */ + + for(idx_t el = 0; el < clusters_obj -> sparse_borders[target].count; ++el) + { + sparse_border_t b = clusters_obj -> sparse_borders[target].data[el]; + if(b.i == target && b.j == source) + { + //delete the border src trg + delete_adj_list_element(clusters_obj, target, el); + } + } + + //find the border and delete it + for(idx_t el = 0; el < clusters_obj -> sparse_borders[source].count; ++el) + { + sparse_border_t b = clusters_obj -> sparse_borders[source].data[el]; + if(b.j != target) + { + //insert these borders as trg -> j and j -> trg + b.i = target; + sparse_border_insert(clusters_obj, b); + sparse_border_t bsym = b; + bsym.i = b.j; + bsym.j = b.i; + sparse_border_insert(clusters_obj, bsym); + for(idx_t dl = 0; dl < clusters_obj -> sparse_borders[b.j].count; ++dl) + { + sparse_border_t b_del = clusters_obj -> sparse_borders[b.j].data[dl]; + if(b_del.j == source) + { + //delete the border src trg + delete_adj_list_element(clusters_obj, b.j, dl); + } + } + + } + } + + //clean up all borders + //delete the src list + + adj_list_reset((clusters_obj->sparse_borders) + source); + +} +#endif void merge_A_into_B(idx_t* who_amI, idx_t cluster_A, idx_t cluster_B, idx_t n) { @@ -1919,9 +2011,24 @@ void master_finds_borders(global_context_t* ctx, clusters_t* cluster, float_t Z, #endif + #ifdef WRITE_MERGES_INFO + FILE* f = fopen("bb/merges_info.csv", "w"); + struct timespec start_merge, end_merge; + #endif + for( idx_t m = 0; m < merge_count; m++ ) { + #if defined(WRITE_MERGES_INFO) + clock_gettime(CLOCK_MONOTONIC, &start_merge); + #endif + // print progress + if(merge_count > 1e5) + { + int slice = merge_count / 20; + if(m % slice == 0 || m == merge_count - 1) printf("Merging 
progress: %lu / %lu -> %.2f \n", + m, merge_count, (float)m/(float)merge_count * 100.); + } #define src surviving_clusters[merging_table[m].source] #define trg surviving_clusters[merging_table[m].target] @@ -1970,19 +2077,37 @@ void master_finds_borders(global_context_t* ctx, clusters_t* cluster, float_t Z, * first -> fix the borders, delete old ones and spawn new one in the correct position * second -> update the surviving_clusters buffer */ + fix_sparse_borders_A_into_B(new_src, new_trg, cluster); merge_A_into_B(surviving_clusters, new_src, new_trg, nclus ); + } break; default: break; } + + #if defined(WRITE_MERGES_INFO) + clock_gettime(CLOCK_MONOTONIC, &end_merge); + fprintf(f, "%lu,%lu,%lu,%lu,%f,%d\n", new_src, new_trg, + cluster -> sparse_borders[new_src].count, + cluster -> sparse_borders[new_trg].count, + (float)(end_merge.tv_sec - start_merge.tv_sec) + + (float)(end_merge.tv_nsec - start_merge.tv_nsec)/1e9, + i_have_to_merge && src != trg); + fflush(f); + #endif + #undef src #undef trg } + #if defined(WRITE_MERGES_INFO) + fclose(f); + #endif + free(merging_table); } @@ -2130,9 +2255,29 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo) } } MPI_Win_fence(0, dp_info_win); + MPI_Win_free(&dp_info_win); qsort(centers_dp, cluster -> centers.count, sizeof(datapoint_info_t), compare_dp_by_cidx); + + #if defined(WRITE_CENTERS_PRE_MERGING) + if(I_AM_MASTER) + { + printf("[MASTER] Writing centers pre merging bb/centers_pre_merging.csv\n"); + // FILE* f = fopen("bb/centers_pre_merging.csv", "w"); + // for(idx_t i = 0; i < cluster -> centers.count; ++i) + // { + // datapoint_info_t dp = centers_dp[i]; + // fprintf(f, "%lu,%d,%lf,%lf\n", dp.array_idx, dp.cluster_idx, dp.log_rho_c, dp.log_rho_err); + // } + // fclose(f); + + FILE* f = fopen("bb/centers_pre_merging.bin", "wb"); + fwrite(centers_dp, sizeof(datapoint_info_t), cluster -> centers.count, f); + fclose(f); + } + #endif + master_finds_borders(ctx, cluster, Z, surviving_clusters, centers_dp); master_fixes_border_matrix_and_centers(ctx, cluster, Z, old_to_new, surviving_clusters, nclus); free(centers_dp); @@ -2140,8 +2285,8 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo) else { MPI_Win_fence(0, dp_info_win); + MPI_Win_free(&dp_info_win); } - MPI_Win_free(&dp_info_win); /* at this point master has the final border matrix * with the final list of surviving clusters diff --git a/src/common/common.c b/src/common/common.c index 49ad8204d75718c6a502e3a3b00acbaeba369147..3ee1a5d78453af1cfbc74099bf0ef655f94b0c6d 100644 --- a/src/common/common.c +++ b/src/common/common.c @@ -15,8 +15,8 @@ void get_context(global_context_t* ctx) ctx -> local_data = NULL; ctx -> lb_box = NULL; ctx -> ub_box = NULL; - ctx -> rank_n_points = (int*)malloc(ctx -> world_size * sizeof(int)); - ctx -> rank_idx_start = (int*)malloc(ctx -> world_size * sizeof(int)); + ctx -> rank_n_points = (idx_t*)malloc(ctx -> world_size * sizeof(idx_t)); + ctx -> rank_idx_start = (idx_t*)malloc(ctx -> world_size * sizeof(idx_t)); ctx -> local_datapoints = NULL; ctx -> __local_heap_buffers = NULL; ctx -> input_data_in_float32 = -1; diff --git a/src/common/common.h b/src/common/common.h index de3ee63ee91ee2596ec6bad3b4d7030b44aaa80e..dec2d61d731b49f1a1c0569be4520b3d59bc9329 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -6,20 +6,25 @@ #include #include #include "../tree/heap.h" -//#include -//#define WRITE_NGBH -//#define WRITE_TOP_NODES -//#define WRITE_DENSITY -//#define WRITE_CLUSTER_ASSIGN_H1 -//#define 
WRITE_BORDERS -//#define WRITE_MERGING_TABLE -//#define WRITE_FINAL_ASSIGNMENT +#define DEFAULT_MSG_LEN 10000000 +//#include -//#define PRINT_NGBH_EXCHANGE_SCHEME -//#define PRINT_H2_COMM_SCHEME -//#define PRINT_H1_CLUSTER_ASSIGN_COMPLETION -//#define PRINT_ORDERED_BUFFER +#define PARALLEL_FIX_BORDERS +// #define WRITE_NGBH +// #define WRITE_TOP_NODES +// #define WRITE_DENSITY +// #define WRITE_CLUSTER_ASSIGN_H1 +// #define WRITE_BORDERS +// #define WRITE_CENTERS_PRE_MERGING +// #define WRITE_MERGES_INFO +// #define WRITE_MERGING_TABLE +// #define WRITE_FINAL_ASSIGNMENT + +// #define PRINT_NGBH_EXCHANGE_SCHEME +// #define PRINT_H2_COMM_SCHEME +// #define PRINT_H1_CLUSTER_ASSIGN_COMPLETION +// #define PRINT_ORDERED_BUFFER #define DEFAULT_STR_LEN 200 @@ -159,8 +164,8 @@ typedef struct global_context_t size_t idx_start; //starting index of the points on the processor size_t local_n_points; //number of points stored in the current processor datapoint_info_t* local_datapoints; //pointer to the datapoint properties - int* rank_idx_start; //starting index of datapoints in each processor - int* rank_n_points; //processor name + idx_t* rank_idx_start; //starting index of datapoints in each processor + idx_t* rank_n_points; //processor name heap_node* __local_heap_buffers; //buffer that stores nearest neighbors MPI_Comm mpi_communicator; //mpi communicator char input_data_file[DEFAULT_STR_LEN]; diff --git a/src/main/main.c b/src/main/main.c index e6153b1c839b2c7bbaba2ee5f7733f00e8a810be..0863c9917120ca8984abd91ed1decd2803018798 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -134,14 +134,14 @@ void parse_args(global_context_t* ctx, int argc, char** argv) void print_hello(global_context_t* ctx) { - char * hello = "" -" _ _ \n" -" __| | __ _ __| |_ __ \n" -" / _` |/ _` |/ _` | '_ \\ \n" -"| (_| | (_| | (_| | |_) | \n" -" \\__,_|\\__,_|\\__,_| .__/ \n" -" |_| \n" -" Distributed Advanced Density Peak"; + char * hello = "" + " _ _ \n" + " __| | __ _ __| |_ __ \n" + " / _` |/ _` |/ _` | '_ \\ \n" + "| (_| | (_| | (_| | |_) | \n" + " \\__,_|\\__,_|\\__,_| .__/ \n" + " |_| \n" + " Distributed Advanced Density Peak"; mpi_printf(ctx, "%s\n\n", hello); #if defined (_OPENMP) @@ -299,7 +299,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) float_t *pvt_data = (float_t *)MY_MALLOC(send_counts[ctx->mpi_rank] * sizeof(float_t)); - uint64_t default_msg_len = 10000000; //bytes if(I_AM_MASTER) { @@ -310,7 +309,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) already_sent_points = 0; while(already_sent_points < send_counts[i]) { - int count_send = MIN(default_msg_len, send_counts[i] - already_sent_points); + int count_send = MIN(DEFAULT_MSG_LEN, send_counts[i] - already_sent_points); MPI_Send(data + displacements[i] + already_sent_points, count_send, MPI_MY_FLOAT, i, ctx -> mpi_rank, ctx -> mpi_communicator); already_sent_points += count_send; } @@ -394,7 +393,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) MPI_DB_PRINT("----- Performing ngbh search -----\n"); MPI_Barrier(ctx -> mpi_communicator); - HERE mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, ctx -> k); MPI_Barrier(ctx -> mpi_communicator); diff --git a/src/tree/tree.c b/src/tree/tree.c index 9ff820b165c5542d2517bb31b0d9d5136c41b49f..3014c3e369a48ba1101b047af748f1673fab6dab 100644 --- a/src/tree/tree.c +++ b/src/tree/tree.c @@ -935,7 +935,6 @@ void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree } 
#endif - // HERE free(current_pointset.lb_box); free(current_pointset.ub_box); @@ -1077,9 +1076,9 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree) ctx -> mpi_communicator); ctx -> local_n_points = tot_count / ctx -> dims; - int* ppp = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + idx_t* ppp = (idx_t*)MY_MALLOC(ctx -> world_size * sizeof(idx_t)); - MPI_Allgather(&(ctx -> local_n_points), 1, MPI_INT, ppp, 1, MPI_INT, ctx -> mpi_communicator); + MPI_Allgather(&(ctx -> local_n_points), 1, MPI_UINT64_T, ppp, 1, MPI_UINT64_T, ctx -> mpi_communicator); ctx -> idx_start = 0; for(int i = 0; i < ctx -> mpi_rank; ++i) { @@ -1426,7 +1425,7 @@ void convert_heap_idx_to_global(global_context_t* ctx, heap* H) { for(uint64_t i = 0; i < H -> count; ++i) { - H -> data[i].array_idx = local_to_global_idx(ctx, H -> data[i].array_idx); + if(H -> data[i].array_idx != MY_SIZE_MAX) H -> data[i].array_idx = local_to_global_idx(ctx, H -> data[i].array_idx); } } @@ -1584,6 +1583,9 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre float_t* __rcv_points = (float_t*)MY_MALLOC(tot_points_rcv * (ctx -> dims) * sizeof(float_t)); float_t* __snd_points = (float_t*)MY_MALLOC(tot_points_snd * (ctx -> dims) * sizeof(float_t)); + float_t* __max_dist_snd = (float_t*)MY_MALLOC(tot_points_snd * sizeof(float_t)); + float_t* __max_dist_rcv = (float_t*)MY_MALLOC(tot_points_rcv * sizeof(float_t)); + /* copy data to send in contiguous memory */ @@ -1592,6 +1594,20 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre memcpy(__snd_points + snd_displ[i], data_to_send_per_proc[i], snd_count[i] * sizeof(float_t)); } + size_t dist_idx = 0; + for(idx_t i = 0; i < ctx -> world_size; ++i) + { + for(idx_t j = 0; j < point_to_snd_count[i]; ++j) + { + idx_t point_idx = local_idx_of_the_point[i][j]; + // for each point take the distance of its furthest nearest neighbor + // then communicate it in order to prune the search on foreign nodes + __max_dist_snd[dist_idx] = ctx -> local_datapoints[point_idx].ngbh[0].value; + ++dist_idx; + } + } + + MPI_Alltoallv(__snd_points, snd_count, snd_displ, MPI_MY_FLOAT, __rcv_points, rcv_count, rcv_displ, MPI_MY_FLOAT, ctx -> mpi_communicator); @@ -1599,12 +1615,43 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre float_t** rcv_work_batches = (float_t**)MY_MALLOC(ctx -> world_size * sizeof(float_t*)); for(int i = 0; i < ctx -> world_size; ++i) { - //rcv_work_batches[i] = NULL; rcv_work_batches[i] = __rcv_points + rcv_displ[i]; } MPI_Status status; int flag; + + for(int i = 0; i < ctx -> world_size; ++i) + { + rcv_count[i] = rcv_count[i] / ctx -> dims; + rcv_displ[i] = rcv_displ[i] / ctx -> dims; + snd_count[i] = snd_count[i] / ctx -> dims; + snd_displ[i] = snd_displ[i] / ctx -> dims; + + } + + MPI_Alltoallv(__max_dist_snd, snd_count, snd_displ, MPI_MY_FLOAT, + __max_dist_rcv, rcv_count, rcv_displ, MPI_MY_FLOAT, ctx -> mpi_communicator); + + + // THIS CAN BE SUBSTITUTED BY DIVIDING BY ctx -> dims + // rcv_displ[0] = 0; + // snd_displ[0] = 0; + // rcv_count[0] = point_to_rcv_count[0]; + // snd_count[0] = point_to_snd_count[0]; + // + // + // for(int i = 1; i < ctx -> world_size; ++i) + // { + // + // rcv_count[i] = point_to_rcv_count[i]; + // snd_count[i] = point_to_snd_count[i]; + // + // rcv_displ[i] = rcv_displ[i - 1] + rcv_count[i - 1]; + // snd_displ[i] = snd_displ[i - 1] + snd_count[i - 1]; + // } + // + /* prepare heap batches */ //int work_batch_stride = 1 + ctx -> dims; @@ -1621,27 
+1668,15 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre MPI_Barrier(ctx -> mpi_communicator); - rcv_displ[0] = 0; - snd_displ[0] = 0; - rcv_count[0] = point_to_rcv_count[0]; - snd_count[0] = point_to_snd_count[0]; - - - for(int i = 1; i < ctx -> world_size; ++i) - { - - rcv_count[i] = point_to_rcv_count[i]; - snd_count[i] = point_to_snd_count[i]; - - rcv_displ[i] = rcv_displ[i - 1] + rcv_count[i - 1]; - snd_displ[i] = snd_displ[i - 1] + snd_count[i - 1]; - } - + // max dists will contain the maximum distance to have to search heap_node** heap_batches_per_node = (heap_node**)MY_MALLOC(ctx -> world_size * sizeof(heap_node*)); + float_t** max_dists_per_node = (float_t**)MY_MALLOC(ctx -> world_size * sizeof(float_t*)); + for(int p = 0; p < ctx -> world_size; ++p) { heap_batches_per_node[p] = __heap_batches_to_snd + (uint64_t)rcv_displ[p] * (uint64_t)k; + max_dists_per_node[p] = __max_dist_rcv + (uint64_t)rcv_displ[p]; } /* compute everything */ @@ -1656,17 +1691,21 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre for(int p = 0; p < ctx -> world_size; ++p) { if(point_to_rcv_count[p] > 0 && p != ctx -> mpi_rank) - //if(count_rcv_work_batches[p] > 0) { - //heap_batches_per_node[p] = (heap_node*)MY_MALLOC(k * point_to_rcv_count[p] * sizeof(heap_node)); #pragma omp parallel for schedule(dynamic) for(int batch = 0; batch < point_to_rcv_count[p]; ++batch) { heap H; - H.count = 0; + H.count = k; H.N = k; H.data = heap_batches_per_node[p] + (uint64_t)k * (uint64_t)batch; - init_heap(&H); + float_t max_dist = max_dists_per_node[p][batch]; + for(idx_t i = 0; i < H.N; ++i) + { + H.data[i].value = max_dist; + H.data[i].array_idx = MY_SIZE_MAX; + } + //float_t* point = rcv_work_batches[p] + batch * work_batch_stride + 1; float_t* point = rcv_work_batches[p] + (uint64_t)batch * (uint64_t)work_batch_stride; knn_sub_tree_search_kdtree_v2(point, local_tree -> root, &H); @@ -1834,7 +1873,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre tmp_heap.N = k; tmp_heap.count = k; tmp_heap.data = dp_info[idx].ngbh; - insert_max_heap(&(tmp_heap), H.data[j].value, H.data[j].array_idx); + // insert it only of != from max + if(H.data[j].array_idx != MY_SIZE_MAX) insert_max_heap(&(tmp_heap), H.data[j].value, H.data[j].array_idx); } } @@ -1885,7 +1925,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre { for(int i = 0; i < ctx -> local_n_points; ++i) { - fwrite(dp_info[i].ngbh.data, sizeof(heap_node), k, file); + fwrite(dp_info[i].ngbh, sizeof(heap_node), k, file); } fclose(file); } @@ -1903,6 +1943,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre free(data_to_send_per_proc); free(local_idx_of_the_point); free(heap_batches_per_node); + free(max_dists_per_node); //free(rcv_heap_batches); free(rcv_work_batches); free(point_to_rcv_count); @@ -1917,6 +1958,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre free(__heap_batches_to_snd); free(__rcv_points); free(__snd_points); + free(__max_dist_snd); + free(__max_dist_rcv); }
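The patch above replaces the single large MPI_Send/MPI_Recv of the sparse-border lists in Heuristic2 (and the master scatter in main.c) with a chunked exchange bounded by DEFAULT_MSG_LEN, where the receiver probes each incoming message for its size. Below is a minimal standalone sketch of that pattern, not code from this repository: the helper names chunked_send_bytes / chunked_recv_bytes are hypothetical, while DEFAULT_MSG_LEN and the byte-wise MPI_BYTE transfers mirror the values used in the diff. One deliberate difference is noted in the comments: the receive is posted against status.MPI_SOURCE from the probe instead of MPI_ANY_SOURCE, a common hardening of probe-then-receive when several ranks might send with the same tag.

    /* Sketch only: chunked point-to-point exchange of a large byte buffer.
     * Assumes the receiver already knows the total payload size (as in the
     * diff, where the element count is sent ahead as MPI_UINT64_T). */
    #include <mpi.h>
    #include <stddef.h>

    #define DEFAULT_MSG_LEN 10000000              /* bytes per message, as in common.h */
    #define MIN(a, b) ((a) < (b) ? (a) : (b))

    /* Send nbytes bytes to dest with the given tag, at most DEFAULT_MSG_LEN per message. */
    static void chunked_send_bytes(const char *buf, size_t nbytes, int dest, int tag, MPI_Comm comm)
    {
        size_t sent = 0;
        while (sent < nbytes)
        {
            int count = (int)MIN((size_t)DEFAULT_MSG_LEN, nbytes - sent);
            MPI_Send(buf + sent, count, MPI_BYTE, dest, tag, comm);
            sent += (size_t)count;
        }
    }

    /* Receive exactly nbytes bytes carrying the given tag, probing each
     * incoming message for its actual size before posting the receive. */
    static void chunked_recv_bytes(char *buf, size_t nbytes, int tag, MPI_Comm comm)
    {
        size_t received = 0;
        while (received < nbytes)
        {
            MPI_Status status;
            int count;

            /* Learn the size and the source of the next matching message. */
            MPI_Probe(MPI_ANY_SOURCE, tag, comm, &status);
            MPI_Get_count(&status, MPI_BYTE, &count);

            /* Receive from the probed source so chunks from different senders
             * cannot interleave between the probe and the receive (the diff
             * itself receives from MPI_ANY_SOURCE, which is fine as long as
             * only one rank sends with this tag at a time). */
            MPI_Recv(buf + received, count, MPI_BYTE, status.MPI_SOURCE, tag,
                     comm, MPI_STATUS_IGNORE);
            received += (size_t)count;
        }
    }

A typical use, matching the Heuristic2 flow, would be to send the element count first, have the receiver allocate num_border_el * sizeof(sparse_border_t) bytes, and then call these helpers on the raw buffer.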