Loading Makefile +2 −2 Original line number Diff line number Diff line CC=mpicc #CC=mpiicx #CFLAGS=-O3 -march=native -flto -funroll-loops -fopenmp CFLAGS=-O3 -fopenmp CFLAGS=-O3 -march=native -flto -funroll-loops -fopenmp #CFLAGS=-O3 -fopenmp LDFLAGS=-lm all: main Loading src/adp/adp.c +228 −83 Original line number Diff line number Diff line #include "adp.h" #include "mpi.h" #include <bits/time.h> #include <stdio.h> #include <time.h> #include <unistd.h> const border_t border_null = {.density = -1.0, .error = 0, .idx = NOBORDER}; Loading Loading @@ -133,7 +137,6 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int MPI_Barrier(ctx -> mpi_communicator); idx_t k = ctx -> k; struct timespec start_tot, finish_tot; double elapsed_tot; Loading @@ -159,21 +162,10 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int // Also, shall we give som info argument to it ? // MPI_Win exposed_ngbh; MPI_Win_create( ctx -> __local_heap_buffers, ctx -> local_n_points * k * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &exposed_ngbh); MPI_Win_fence(MPI_MODE_NOPUT, exposed_ngbh); MPI_Barrier(ctx -> mpi_communicator); #pragma omp parallel for for(idx_t i = 0; i < ctx -> local_n_points; ++i) { for(idx_t k = 0; k <= kMAX; ++k) for(idx_t k = 0; k <= ctx -> k; ++k) { local_datapoints[i].ngbh[k].value = omega * pow(local_datapoints[i].ngbh[k].value, d/2.); } Loading @@ -181,6 +173,15 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int local_datapoints[i].kstar = 0; } MPI_Win exposed_ngbh; MPI_Win_create( ctx -> __local_heap_buffers, ctx -> local_n_points * ctx -> k * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &exposed_ngbh); MPI_Win_fence(MPI_MODE_NOPUT, exposed_ngbh); int i_have_finished = 0; int all_have_finished = 0; int finished_points = 0; Loading Loading @@ -219,11 +220,12 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int } else { // if(pos > ctx -> rank_n_points[owner]) printf("NOPE HERE from rank %d %lu %d\n", ctx -> mpi_rank, pos, ctx -> rank_n_points[owner]); MPI_Get(scratch_heap_nodes + i, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * (ctx -> k) + ksel) * sizeof(heap_node)), (MPI_Aint)((pos * (idx_t)(ctx -> k) + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, exposed_ngbh); Loading Loading @@ -1542,7 +1544,15 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster) MPI_Send(&num_border_el, 1, MPI_UINT64_T, rank_to_send, rank_to_send, ctx -> mpi_communicator); MPI_Send(borders_to_send, num_border_el * sizeof(sparse_border_t), MPI_BYTE , rank_to_send, rank_to_send, ctx -> mpi_communicator); idx_t el_already_sent = 0; while( el_already_sent < num_border_el) { idx_t el_to_send = MIN(DEFAULT_MSG_LEN, num_border_el - el_already_sent); MPI_Send(borders_to_send + el_already_sent, el_to_send * sizeof(sparse_border_t), MPI_BYTE , rank_to_send, rank_to_send, ctx -> mpi_communicator); el_already_sent += el_to_send; } i_have_sent = 1; free(borders_to_send); Loading @@ -1558,7 +1568,27 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster) sparse_border_t* borders_to_recv = (sparse_border_t*)MY_MALLOC(num_borders_recv * sizeof(sparse_border_t)); MPI_Recv(borders_to_recv, num_borders_recv * sizeof(sparse_border_t), MPI_BYTE , MPI_ANY_SOURCE, ctx -> mpi_rank, ctx -> mpi_communicator, MPI_STATUS_IGNORE); idx_t el_already_recv = 0; while(el_already_recv < num_borders_recv) { MPI_Status status = {0}; MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &status); MPI_Request request; int count_recv; int source = status.MPI_SOURCE; MPI_Get_count(&status, MPI_BYTE, &count_recv); idx_t el_to_recv = count_recv/sizeof(sparse_border_t); MPI_Recv(borders_to_recv + el_already_recv, count_recv, MPI_BYTE , MPI_ANY_SOURCE, ctx -> mpi_rank, ctx -> mpi_communicator, MPI_STATUS_IGNORE); el_already_recv += el_to_recv; } for(int i = 0; i < num_borders_recv; ++i) { Loading Loading @@ -1618,16 +1648,25 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster) if(I_AM_MASTER) { printf("[MASTER] Writing borders to bb/borders.csv\n"); FILE* f = fopen("bb/borders.csv", "w"); // FILE* f = fopen("bb/borders.csv", "w"); // for(idx_t i = 0; i < nclus; ++i) // { // for(int j = 0; j < cluster -> sparse_borders[i].count; ++j) // { // sparse_border_t b = cluster -> sparse_borders[i].data[j]; // fprintf(f, "%lu,%lu,%lu,%lf,%lf\n", b.i, b.j, b.idx, b.density, b.error); // } // } // fclose(f); FILE* f = fopen("bb/borders.bin", "wb"); for(idx_t i = 0; i < nclus; ++i) { for(int j = 0; j < cluster -> sparse_borders[i].count; ++j) { sparse_border_t b = cluster -> sparse_borders[i].data[j]; fprintf(f, "%lu,%lu,%lu,%lf\n", b.i, b.j, b.idx, b.density); } fwrite(cluster -> sparse_borders[i].data, sizeof(sparse_border_t), cluster -> sparse_borders[i].count, f); } fclose(f); } Loading Loading @@ -1751,7 +1790,8 @@ static inline void delete_adj_list_element(clusters_t * c, const idx_t list_idx, c -> sparse_borders[list_idx].count -= 1; } void fix_sparse_borders_A_into_B(idx_t s,idx_t t, clusters_t* c) #ifdef PARALLEL_FIX_BORDERS void fix_sparse_borders_A_into_B(idx_t source,idx_t target, clusters_t* clusters_obj) { /* * Handle borders after two clusters are merged Loading @@ -1764,67 +1804,119 @@ void fix_sparse_borders_A_into_B(idx_t s,idx_t t, clusters_t* c) * density is kept */ for(idx_t el = 0; el < clusters_obj -> sparse_borders[target].count; ++el) { { for(idx_t el = 0; el < c -> sparse_borders[t].count; ++el) { sparse_border_t b = c -> sparse_borders[t].data[el]; if(b.i == t && b.j == s) sparse_border_t b = clusters_obj -> sparse_borders[target].data[el]; if(b.i == target && b.j == source) { //delete the border src trg delete_adj_list_element(c, t, el); delete_adj_list_element(clusters_obj, target, el); } } //find the border and delete it for(idx_t el = 0; el < clusters_obj -> sparse_borders[source].count; ++el) { sparse_border_t b = clusters_obj -> sparse_borders[source].data[el]; if(b.j != target) { //insert these borders as trg -> j and j -> trg b.i = target; sparse_border_insert(clusters_obj, b); } } //find the border and delete it, other insert them in correct place for(idx_t el = 0; el < c -> sparse_borders[s].count; ++el) // other insert them in correct place #pragma omp parallel for for(idx_t el = 0; el < clusters_obj -> sparse_borders[source].count; ++el) { sparse_border_t b = c -> sparse_borders[s].data[el]; // idx_t ii = b.i; if(b.j != t) sparse_border_t b = clusters_obj -> sparse_borders[source].data[el]; if(b.j != target) { // insert these borders as trg -> j and j -> trg b.i = t; sparse_border_insert(c, b); // fix the same, b.i should now be the target b.i = target; sparse_border_t bsym = b; bsym.i = b.j; bsym.j = b.i; sparse_border_insert(c, bsym); for(idx_t dl = 0; dl < c -> sparse_borders[b.j].count; ++dl) sparse_border_insert(clusters_obj, bsym); for(idx_t dl = 0; dl < clusters_obj -> sparse_borders[b.j].count; ++dl) { sparse_border_t b_del = c -> sparse_borders[b.j].data[dl]; if(b_del.j == s) sparse_border_t b_del = clusters_obj -> sparse_borders[b.j].data[dl]; if(b_del.j == source) { //delete the border src trg delete_adj_list_element(c, b.j, dl); delete_adj_list_element(clusters_obj, b.j, dl); } } } } //clean up all borders //delete the src list adj_list_reset((clusters_obj->sparse_borders) + source); } #else void fix_sparse_borders_A_into_B(idx_t source,idx_t target, clusters_t* clusters_obj) { /* * Handle borders after two clusters are merged * - idx_t s : source cluster, the one has to be merged * - idx_t t : target cluster, the one recieves the merge * - clusters* c : object containing all the data * * When s goes into t all the clusters which had a border with s now they must have * a border with t. If t already has a border like that, the border with higher * density is kept */ for(idx_t el = 0; el < clusters_obj -> sparse_borders[target].count; ++el) { sparse_border_t b = clusters_obj -> sparse_borders[target].data[el]; if(b.i == target && b.j == source) { adj_list_reset((c->sparse_borders) + s); //delete the border src trg delete_adj_list_element(clusters_obj, target, el); } //delete all borders containing src // for(idx_t i = 0; i < nclus; ++i) // { // for(idx_t el = 0; el < c -> sparse_borders[i].count; ++el) // { // sparse_border_t b = c -> sparse_borders[i].data[el]; // if(b.j == s) // { // //delete the border src trg // delete_adj_list_element(c, i, el); // } // } // // } } //find the border and delete it for(idx_t el = 0; el < clusters_obj -> sparse_borders[source].count; ++el) { sparse_border_t b = clusters_obj -> sparse_borders[source].data[el]; if(b.j != target) { //insert these borders as trg -> j and j -> trg b.i = target; sparse_border_insert(clusters_obj, b); sparse_border_t bsym = b; bsym.i = b.j; bsym.j = b.i; sparse_border_insert(clusters_obj, bsym); for(idx_t dl = 0; dl < clusters_obj -> sparse_borders[b.j].count; ++dl) { sparse_border_t b_del = clusters_obj -> sparse_borders[b.j].data[dl]; if(b_del.j == source) { //delete the border src trg delete_adj_list_element(clusters_obj, b.j, dl); } } } } //clean up all borders //delete the src list adj_list_reset((clusters_obj->sparse_borders) + source); } #endif void merge_A_into_B(idx_t* who_amI, idx_t cluster_A, idx_t cluster_B, idx_t n) { Loading Loading @@ -1919,9 +2011,24 @@ void master_finds_borders(global_context_t* ctx, clusters_t* cluster, float_t Z, #endif #ifdef WRITE_MERGES_INFO FILE* f = fopen("bb/merges_info.csv", "w"); struct timespec start_merge, end_merge; #endif for( idx_t m = 0; m < merge_count; m++ ) { #if defined(WRITE_MERGES_INFO) clock_gettime(CLOCK_MONOTONIC, &start_merge); #endif // print progress if(merge_count > 1e5) { int slice = merge_count / 20; if(m % slice == 0 || m == merge_count - 1) printf("Merging progress: %lu / %lu -> %.2f \n", m, merge_count, (float)m/(float)merge_count * 100.); } #define src surviving_clusters[merging_table[m].source] #define trg surviving_clusters[merging_table[m].target] Loading Loading @@ -1970,8 +2077,10 @@ void master_finds_borders(global_context_t* ctx, clusters_t* cluster, float_t Z, * first -> fix the borders, delete old ones and spawn new one in the correct position * second -> update the surviving_clusters buffer */ fix_sparse_borders_A_into_B(new_src, new_trg, cluster); merge_A_into_B(surviving_clusters, new_src, new_trg, nclus ); } break; Loading @@ -1979,10 +2088,26 @@ void master_finds_borders(global_context_t* ctx, clusters_t* cluster, float_t Z, break; } #if defined(WRITE_MERGES_INFO) clock_gettime(CLOCK_MONOTONIC, &end_merge); fprintf(f, "%lu,%lu,%lu,%lu,%f,%d\n", new_src, new_trg, cluster -> sparse_borders[new_src].count, cluster -> sparse_borders[new_trg].count, (float)(end_merge.tv_sec - start_merge.tv_sec) + (float)(end_merge.tv_nsec - start_merge.tv_nsec)/1e9, i_have_to_merge && src != trg); fflush(f); #endif #undef src #undef trg } #if defined(WRITE_MERGES_INFO) fclose(f); #endif free(merging_table); } Loading Loading @@ -2130,9 +2255,29 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo) } } MPI_Win_fence(0, dp_info_win); MPI_Win_free(&dp_info_win); qsort(centers_dp, cluster -> centers.count, sizeof(datapoint_info_t), compare_dp_by_cidx); #if defined(WRITE_CENTERS_PRE_MERGING) if(I_AM_MASTER) { printf("[MASTER] Writing centers pre merging bb/centers_pre_merging.csv\n"); // FILE* f = fopen("bb/centers_pre_merging.csv", "w"); // for(idx_t i = 0; i < cluster -> centers.count; ++i) // { // datapoint_info_t dp = centers_dp[i]; // fprintf(f, "%lu,%d,%lf,%lf\n", dp.array_idx, dp.cluster_idx, dp.log_rho_c, dp.log_rho_err); // } // fclose(f); FILE* f = fopen("bb/centers_pre_merging.bin", "wb"); fwrite(centers_dp, sizeof(datapoint_info_t), cluster -> centers.count, f); fclose(f); } #endif master_finds_borders(ctx, cluster, Z, surviving_clusters, centers_dp); master_fixes_border_matrix_and_centers(ctx, cluster, Z, old_to_new, surviving_clusters, nclus); free(centers_dp); Loading @@ -2140,8 +2285,8 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo) else { MPI_Win_fence(0, dp_info_win); } MPI_Win_free(&dp_info_win); } /* at this point master has the final border matrix * with the final list of surviving clusters Loading src/common/common.c +2 −2 Original line number Diff line number Diff line Loading @@ -15,8 +15,8 @@ void get_context(global_context_t* ctx) ctx -> local_data = NULL; ctx -> lb_box = NULL; ctx -> ub_box = NULL; ctx -> rank_n_points = (int*)malloc(ctx -> world_size * sizeof(int)); ctx -> rank_idx_start = (int*)malloc(ctx -> world_size * sizeof(int)); ctx -> rank_n_points = (idx_t*)malloc(ctx -> world_size * sizeof(idx_t)); ctx -> rank_idx_start = (idx_t*)malloc(ctx -> world_size * sizeof(idx_t)); ctx -> local_datapoints = NULL; ctx -> __local_heap_buffers = NULL; ctx -> input_data_in_float32 = -1; Loading src/common/common.h +19 −14 Original line number Diff line number Diff line Loading @@ -6,13 +6,18 @@ #include <stdint.h> #include <time.h> #include "../tree/heap.h" #define DEFAULT_MSG_LEN 10000000 //#include <stdarg.h> #define PARALLEL_FIX_BORDERS // #define WRITE_NGBH // #define WRITE_TOP_NODES // #define WRITE_DENSITY // #define WRITE_CLUSTER_ASSIGN_H1 // #define WRITE_BORDERS // #define WRITE_CENTERS_PRE_MERGING // #define WRITE_MERGES_INFO // #define WRITE_MERGING_TABLE // #define WRITE_FINAL_ASSIGNMENT Loading Loading @@ -159,8 +164,8 @@ typedef struct global_context_t size_t idx_start; //starting index of the points on the processor size_t local_n_points; //number of points stored in the current processor datapoint_info_t* local_datapoints; //pointer to the datapoint properties int* rank_idx_start; //starting index of datapoints in each processor int* rank_n_points; //processor name idx_t* rank_idx_start; //starting index of datapoints in each processor idx_t* rank_n_points; //processor name heap_node* __local_heap_buffers; //buffer that stores nearest neighbors MPI_Comm mpi_communicator; //mpi communicator char input_data_file[DEFAULT_STR_LEN]; Loading src/main/main.c +9 −11 Original line number Diff line number Diff line Loading @@ -299,7 +299,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) float_t *pvt_data = (float_t *)MY_MALLOC(send_counts[ctx->mpi_rank] * sizeof(float_t)); uint64_t default_msg_len = 10000000; //bytes if(I_AM_MASTER) { Loading @@ -310,7 +309,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) already_sent_points = 0; while(already_sent_points < send_counts[i]) { int count_send = MIN(default_msg_len, send_counts[i] - already_sent_points); int count_send = MIN(DEFAULT_MSG_LEN, send_counts[i] - already_sent_points); MPI_Send(data + displacements[i] + already_sent_points, count_send, MPI_MY_FLOAT, i, ctx -> mpi_rank, ctx -> mpi_communicator); already_sent_points += count_send; } Loading Loading @@ -394,7 +393,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) MPI_DB_PRINT("----- Performing ngbh search -----\n"); MPI_Barrier(ctx -> mpi_communicator); HERE mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, ctx -> k); MPI_Barrier(ctx -> mpi_communicator); Loading Loading
Makefile +2 −2 Original line number Diff line number Diff line CC=mpicc #CC=mpiicx #CFLAGS=-O3 -march=native -flto -funroll-loops -fopenmp CFLAGS=-O3 -fopenmp CFLAGS=-O3 -march=native -flto -funroll-loops -fopenmp #CFLAGS=-O3 -fopenmp LDFLAGS=-lm all: main Loading
src/adp/adp.c +228 −83 Original line number Diff line number Diff line #include "adp.h" #include "mpi.h" #include <bits/time.h> #include <stdio.h> #include <time.h> #include <unistd.h> const border_t border_null = {.density = -1.0, .error = 0, .idx = NOBORDER}; Loading Loading @@ -133,7 +137,6 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int MPI_Barrier(ctx -> mpi_communicator); idx_t k = ctx -> k; struct timespec start_tot, finish_tot; double elapsed_tot; Loading @@ -159,21 +162,10 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int // Also, shall we give som info argument to it ? // MPI_Win exposed_ngbh; MPI_Win_create( ctx -> __local_heap_buffers, ctx -> local_n_points * k * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &exposed_ngbh); MPI_Win_fence(MPI_MODE_NOPUT, exposed_ngbh); MPI_Barrier(ctx -> mpi_communicator); #pragma omp parallel for for(idx_t i = 0; i < ctx -> local_n_points; ++i) { for(idx_t k = 0; k <= kMAX; ++k) for(idx_t k = 0; k <= ctx -> k; ++k) { local_datapoints[i].ngbh[k].value = omega * pow(local_datapoints[i].ngbh[k].value, d/2.); } Loading @@ -181,6 +173,15 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int local_datapoints[i].kstar = 0; } MPI_Win exposed_ngbh; MPI_Win_create( ctx -> __local_heap_buffers, ctx -> local_n_points * ctx -> k * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &exposed_ngbh); MPI_Win_fence(MPI_MODE_NOPUT, exposed_ngbh); int i_have_finished = 0; int all_have_finished = 0; int finished_points = 0; Loading Loading @@ -219,11 +220,12 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int } else { // if(pos > ctx -> rank_n_points[owner]) printf("NOPE HERE from rank %d %lu %d\n", ctx -> mpi_rank, pos, ctx -> rank_n_points[owner]); MPI_Get(scratch_heap_nodes + i, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * (ctx -> k) + ksel) * sizeof(heap_node)), (MPI_Aint)((pos * (idx_t)(ctx -> k) + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, exposed_ngbh); Loading Loading @@ -1542,7 +1544,15 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster) MPI_Send(&num_border_el, 1, MPI_UINT64_T, rank_to_send, rank_to_send, ctx -> mpi_communicator); MPI_Send(borders_to_send, num_border_el * sizeof(sparse_border_t), MPI_BYTE , rank_to_send, rank_to_send, ctx -> mpi_communicator); idx_t el_already_sent = 0; while( el_already_sent < num_border_el) { idx_t el_to_send = MIN(DEFAULT_MSG_LEN, num_border_el - el_already_sent); MPI_Send(borders_to_send + el_already_sent, el_to_send * sizeof(sparse_border_t), MPI_BYTE , rank_to_send, rank_to_send, ctx -> mpi_communicator); el_already_sent += el_to_send; } i_have_sent = 1; free(borders_to_send); Loading @@ -1558,7 +1568,27 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster) sparse_border_t* borders_to_recv = (sparse_border_t*)MY_MALLOC(num_borders_recv * sizeof(sparse_border_t)); MPI_Recv(borders_to_recv, num_borders_recv * sizeof(sparse_border_t), MPI_BYTE , MPI_ANY_SOURCE, ctx -> mpi_rank, ctx -> mpi_communicator, MPI_STATUS_IGNORE); idx_t el_already_recv = 0; while(el_already_recv < num_borders_recv) { MPI_Status status = {0}; MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &status); MPI_Request request; int count_recv; int source = status.MPI_SOURCE; MPI_Get_count(&status, MPI_BYTE, &count_recv); idx_t el_to_recv = count_recv/sizeof(sparse_border_t); MPI_Recv(borders_to_recv + el_already_recv, count_recv, MPI_BYTE , MPI_ANY_SOURCE, ctx -> mpi_rank, ctx -> mpi_communicator, MPI_STATUS_IGNORE); el_already_recv += el_to_recv; } for(int i = 0; i < num_borders_recv; ++i) { Loading Loading @@ -1618,16 +1648,25 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster) if(I_AM_MASTER) { printf("[MASTER] Writing borders to bb/borders.csv\n"); FILE* f = fopen("bb/borders.csv", "w"); // FILE* f = fopen("bb/borders.csv", "w"); // for(idx_t i = 0; i < nclus; ++i) // { // for(int j = 0; j < cluster -> sparse_borders[i].count; ++j) // { // sparse_border_t b = cluster -> sparse_borders[i].data[j]; // fprintf(f, "%lu,%lu,%lu,%lf,%lf\n", b.i, b.j, b.idx, b.density, b.error); // } // } // fclose(f); FILE* f = fopen("bb/borders.bin", "wb"); for(idx_t i = 0; i < nclus; ++i) { for(int j = 0; j < cluster -> sparse_borders[i].count; ++j) { sparse_border_t b = cluster -> sparse_borders[i].data[j]; fprintf(f, "%lu,%lu,%lu,%lf\n", b.i, b.j, b.idx, b.density); } fwrite(cluster -> sparse_borders[i].data, sizeof(sparse_border_t), cluster -> sparse_borders[i].count, f); } fclose(f); } Loading Loading @@ -1751,7 +1790,8 @@ static inline void delete_adj_list_element(clusters_t * c, const idx_t list_idx, c -> sparse_borders[list_idx].count -= 1; } void fix_sparse_borders_A_into_B(idx_t s,idx_t t, clusters_t* c) #ifdef PARALLEL_FIX_BORDERS void fix_sparse_borders_A_into_B(idx_t source,idx_t target, clusters_t* clusters_obj) { /* * Handle borders after two clusters are merged Loading @@ -1764,67 +1804,119 @@ void fix_sparse_borders_A_into_B(idx_t s,idx_t t, clusters_t* c) * density is kept */ for(idx_t el = 0; el < clusters_obj -> sparse_borders[target].count; ++el) { { for(idx_t el = 0; el < c -> sparse_borders[t].count; ++el) { sparse_border_t b = c -> sparse_borders[t].data[el]; if(b.i == t && b.j == s) sparse_border_t b = clusters_obj -> sparse_borders[target].data[el]; if(b.i == target && b.j == source) { //delete the border src trg delete_adj_list_element(c, t, el); delete_adj_list_element(clusters_obj, target, el); } } //find the border and delete it for(idx_t el = 0; el < clusters_obj -> sparse_borders[source].count; ++el) { sparse_border_t b = clusters_obj -> sparse_borders[source].data[el]; if(b.j != target) { //insert these borders as trg -> j and j -> trg b.i = target; sparse_border_insert(clusters_obj, b); } } //find the border and delete it, other insert them in correct place for(idx_t el = 0; el < c -> sparse_borders[s].count; ++el) // other insert them in correct place #pragma omp parallel for for(idx_t el = 0; el < clusters_obj -> sparse_borders[source].count; ++el) { sparse_border_t b = c -> sparse_borders[s].data[el]; // idx_t ii = b.i; if(b.j != t) sparse_border_t b = clusters_obj -> sparse_borders[source].data[el]; if(b.j != target) { // insert these borders as trg -> j and j -> trg b.i = t; sparse_border_insert(c, b); // fix the same, b.i should now be the target b.i = target; sparse_border_t bsym = b; bsym.i = b.j; bsym.j = b.i; sparse_border_insert(c, bsym); for(idx_t dl = 0; dl < c -> sparse_borders[b.j].count; ++dl) sparse_border_insert(clusters_obj, bsym); for(idx_t dl = 0; dl < clusters_obj -> sparse_borders[b.j].count; ++dl) { sparse_border_t b_del = c -> sparse_borders[b.j].data[dl]; if(b_del.j == s) sparse_border_t b_del = clusters_obj -> sparse_borders[b.j].data[dl]; if(b_del.j == source) { //delete the border src trg delete_adj_list_element(c, b.j, dl); delete_adj_list_element(clusters_obj, b.j, dl); } } } } //clean up all borders //delete the src list adj_list_reset((clusters_obj->sparse_borders) + source); } #else void fix_sparse_borders_A_into_B(idx_t source,idx_t target, clusters_t* clusters_obj) { /* * Handle borders after two clusters are merged * - idx_t s : source cluster, the one has to be merged * - idx_t t : target cluster, the one recieves the merge * - clusters* c : object containing all the data * * When s goes into t all the clusters which had a border with s now they must have * a border with t. If t already has a border like that, the border with higher * density is kept */ for(idx_t el = 0; el < clusters_obj -> sparse_borders[target].count; ++el) { sparse_border_t b = clusters_obj -> sparse_borders[target].data[el]; if(b.i == target && b.j == source) { adj_list_reset((c->sparse_borders) + s); //delete the border src trg delete_adj_list_element(clusters_obj, target, el); } //delete all borders containing src // for(idx_t i = 0; i < nclus; ++i) // { // for(idx_t el = 0; el < c -> sparse_borders[i].count; ++el) // { // sparse_border_t b = c -> sparse_borders[i].data[el]; // if(b.j == s) // { // //delete the border src trg // delete_adj_list_element(c, i, el); // } // } // // } } //find the border and delete it for(idx_t el = 0; el < clusters_obj -> sparse_borders[source].count; ++el) { sparse_border_t b = clusters_obj -> sparse_borders[source].data[el]; if(b.j != target) { //insert these borders as trg -> j and j -> trg b.i = target; sparse_border_insert(clusters_obj, b); sparse_border_t bsym = b; bsym.i = b.j; bsym.j = b.i; sparse_border_insert(clusters_obj, bsym); for(idx_t dl = 0; dl < clusters_obj -> sparse_borders[b.j].count; ++dl) { sparse_border_t b_del = clusters_obj -> sparse_borders[b.j].data[dl]; if(b_del.j == source) { //delete the border src trg delete_adj_list_element(clusters_obj, b.j, dl); } } } } //clean up all borders //delete the src list adj_list_reset((clusters_obj->sparse_borders) + source); } #endif void merge_A_into_B(idx_t* who_amI, idx_t cluster_A, idx_t cluster_B, idx_t n) { Loading Loading @@ -1919,9 +2011,24 @@ void master_finds_borders(global_context_t* ctx, clusters_t* cluster, float_t Z, #endif #ifdef WRITE_MERGES_INFO FILE* f = fopen("bb/merges_info.csv", "w"); struct timespec start_merge, end_merge; #endif for( idx_t m = 0; m < merge_count; m++ ) { #if defined(WRITE_MERGES_INFO) clock_gettime(CLOCK_MONOTONIC, &start_merge); #endif // print progress if(merge_count > 1e5) { int slice = merge_count / 20; if(m % slice == 0 || m == merge_count - 1) printf("Merging progress: %lu / %lu -> %.2f \n", m, merge_count, (float)m/(float)merge_count * 100.); } #define src surviving_clusters[merging_table[m].source] #define trg surviving_clusters[merging_table[m].target] Loading Loading @@ -1970,8 +2077,10 @@ void master_finds_borders(global_context_t* ctx, clusters_t* cluster, float_t Z, * first -> fix the borders, delete old ones and spawn new one in the correct position * second -> update the surviving_clusters buffer */ fix_sparse_borders_A_into_B(new_src, new_trg, cluster); merge_A_into_B(surviving_clusters, new_src, new_trg, nclus ); } break; Loading @@ -1979,10 +2088,26 @@ void master_finds_borders(global_context_t* ctx, clusters_t* cluster, float_t Z, break; } #if defined(WRITE_MERGES_INFO) clock_gettime(CLOCK_MONOTONIC, &end_merge); fprintf(f, "%lu,%lu,%lu,%lu,%f,%d\n", new_src, new_trg, cluster -> sparse_borders[new_src].count, cluster -> sparse_borders[new_trg].count, (float)(end_merge.tv_sec - start_merge.tv_sec) + (float)(end_merge.tv_nsec - start_merge.tv_nsec)/1e9, i_have_to_merge && src != trg); fflush(f); #endif #undef src #undef trg } #if defined(WRITE_MERGES_INFO) fclose(f); #endif free(merging_table); } Loading Loading @@ -2130,9 +2255,29 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo) } } MPI_Win_fence(0, dp_info_win); MPI_Win_free(&dp_info_win); qsort(centers_dp, cluster -> centers.count, sizeof(datapoint_info_t), compare_dp_by_cidx); #if defined(WRITE_CENTERS_PRE_MERGING) if(I_AM_MASTER) { printf("[MASTER] Writing centers pre merging bb/centers_pre_merging.csv\n"); // FILE* f = fopen("bb/centers_pre_merging.csv", "w"); // for(idx_t i = 0; i < cluster -> centers.count; ++i) // { // datapoint_info_t dp = centers_dp[i]; // fprintf(f, "%lu,%d,%lf,%lf\n", dp.array_idx, dp.cluster_idx, dp.log_rho_c, dp.log_rho_err); // } // fclose(f); FILE* f = fopen("bb/centers_pre_merging.bin", "wb"); fwrite(centers_dp, sizeof(datapoint_info_t), cluster -> centers.count, f); fclose(f); } #endif master_finds_borders(ctx, cluster, Z, surviving_clusters, centers_dp); master_fixes_border_matrix_and_centers(ctx, cluster, Z, old_to_new, surviving_clusters, nclus); free(centers_dp); Loading @@ -2140,8 +2285,8 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo) else { MPI_Win_fence(0, dp_info_win); } MPI_Win_free(&dp_info_win); } /* at this point master has the final border matrix * with the final list of surviving clusters Loading
src/common/common.c +2 −2 Original line number Diff line number Diff line Loading @@ -15,8 +15,8 @@ void get_context(global_context_t* ctx) ctx -> local_data = NULL; ctx -> lb_box = NULL; ctx -> ub_box = NULL; ctx -> rank_n_points = (int*)malloc(ctx -> world_size * sizeof(int)); ctx -> rank_idx_start = (int*)malloc(ctx -> world_size * sizeof(int)); ctx -> rank_n_points = (idx_t*)malloc(ctx -> world_size * sizeof(idx_t)); ctx -> rank_idx_start = (idx_t*)malloc(ctx -> world_size * sizeof(idx_t)); ctx -> local_datapoints = NULL; ctx -> __local_heap_buffers = NULL; ctx -> input_data_in_float32 = -1; Loading
src/common/common.h +19 −14 Original line number Diff line number Diff line Loading @@ -6,13 +6,18 @@ #include <stdint.h> #include <time.h> #include "../tree/heap.h" #define DEFAULT_MSG_LEN 10000000 //#include <stdarg.h> #define PARALLEL_FIX_BORDERS // #define WRITE_NGBH // #define WRITE_TOP_NODES // #define WRITE_DENSITY // #define WRITE_CLUSTER_ASSIGN_H1 // #define WRITE_BORDERS // #define WRITE_CENTERS_PRE_MERGING // #define WRITE_MERGES_INFO // #define WRITE_MERGING_TABLE // #define WRITE_FINAL_ASSIGNMENT Loading Loading @@ -159,8 +164,8 @@ typedef struct global_context_t size_t idx_start; //starting index of the points on the processor size_t local_n_points; //number of points stored in the current processor datapoint_info_t* local_datapoints; //pointer to the datapoint properties int* rank_idx_start; //starting index of datapoints in each processor int* rank_n_points; //processor name idx_t* rank_idx_start; //starting index of datapoints in each processor idx_t* rank_n_points; //processor name heap_node* __local_heap_buffers; //buffer that stores nearest neighbors MPI_Comm mpi_communicator; //mpi communicator char input_data_file[DEFAULT_STR_LEN]; Loading
src/main/main.c +9 −11 Original line number Diff line number Diff line Loading @@ -299,7 +299,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) float_t *pvt_data = (float_t *)MY_MALLOC(send_counts[ctx->mpi_rank] * sizeof(float_t)); uint64_t default_msg_len = 10000000; //bytes if(I_AM_MASTER) { Loading @@ -310,7 +309,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) already_sent_points = 0; while(already_sent_points < send_counts[i]) { int count_send = MIN(default_msg_len, send_counts[i] - already_sent_points); int count_send = MIN(DEFAULT_MSG_LEN, send_counts[i] - already_sent_points); MPI_Send(data + displacements[i] + already_sent_points, count_send, MPI_MY_FLOAT, i, ctx -> mpi_rank, ctx -> mpi_communicator); already_sent_points += count_send; } Loading Loading @@ -394,7 +393,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) MPI_DB_PRINT("----- Performing ngbh search -----\n"); MPI_Barrier(ctx -> mpi_communicator); HERE mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, ctx -> k); MPI_Barrier(ctx -> mpi_communicator); Loading