Commit 86a03a48 authored by lykos98

polished implementation

parent 7fb847f1
+3 −29
@@ -464,7 +464,6 @@ float_t get_j_ksel_dist_v2(global_context_t* ctx, idx_t i, idx_t j, idx_t ksel,
            flags[i] = 0;
            idx_t pos  = j - ctx -> rank_idx_start[owner];
            MPI_Get(tmp_heap_nodes + i, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, *exposed_ngbh);
            //if(ctx -> mpi_rank == 0) DB_PRINT("rvcd %lu %lf\n", el.array_idx, el.value);
            return 0;
        }                 
    }
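For context, the MPI_Get above performs a one-sided read of a single heap_node from the rank that owns point j: pos is j's index local to the owner, and each point's k neighbours are stored contiguously, so the ksel-th entry sits at byte offset (pos * k + ksel) * sizeof(heap_node). A minimal sketch of the assumed window setup and access pattern; local_heap_nodes and n_local are hypothetical names, and the creation call is an assumption, not this file's confirmed code:

    /* Sketch: each rank exposes its neighbour heaps, n_local * k
     * heap_node entries, through an RMA window with byte displacements
     * (disp_unit = 1, matching the byte offset used above). */
    MPI_Win win;
    MPI_Win_create(local_heap_nodes,
                   (MPI_Aint)(n_local * k * sizeof(heap_node)),
                   1, MPI_INFO_NULL, ctx -> mpi_communicator, &win);
    MPI_Win_lock_all(0, win);

    heap_node node;
    MPI_Get(&node, sizeof(heap_node), MPI_BYTE, owner,
            (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)),
            sizeof(heap_node), MPI_BYTE, win);
    MPI_Win_flush(owner, win);  /* the read is only complete after the flush */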
@@ -726,7 +725,6 @@ void compute_correction(global_context_t* ctx, float_t Z)

    }

    //printf("%lf\n",min_log_rho);
}

clusters_t Heuristic1(global_context_t *ctx, int verbose)
@@ -734,8 +732,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
    /*
     * Heuristic 1, from the paper by Errico, Facco, Laio & Rodriguez 
     * ( https://doi.org/10.1016/j.ins.2021.01.010 )              
     *                                                            
     * args:                                                      
     */

    datapoint_info_t* dp_info = ctx -> local_datapoints;
@@ -753,9 +749,8 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
    TIME_DEF;


    //idx_t ncenters = 0;
    //idx_t putativeCenters = n;
    lu_dynamic_array_t all_centers, removed_centers, actual_centers, max_rho;

    lu_dynamic_array_allocate(&all_centers);
    lu_dynamic_array_allocate(&removed_centers);
    lu_dynamic_array_allocate(&actual_centers);
@@ -811,7 +806,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
        }
        if(dp_info[i].is_center)
        {
            //lu_dynamic_array_pushBack(&all_centers, dp_info[i].array_idx);
            #pragma omp critical
            {
                lu_dynamic_array_pushBack(&all_centers, i);
@@ -981,19 +975,13 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
    int n_centers = (int)actual_centers.count;
    int tot_centers;
    MPI_Allreduce(&n_centers, &tot_centers, 1, MPI_INT, MPI_SUM, ctx -> mpi_communicator);
    // MPI_Allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
    /*
    DB_PRINT("rank %d got %d heheh\n", ctx -> mpi_rank, (int)actual_centers.count);
    DB_PRINT("rank %d tc %d rc %d\n", ctx -> mpi_rank, (int)all_centers.count, (int)removed_centers.count);
    */
    MPI_DB_PRINT("Found %d temporary centers\n", tot_centers);

    MPI_DB_PRINT("Found %d temporary centers\n", tot_centers);

    /* bring all centers to the master,
     * order them in ascending order of density, 
     * then re-scatter them to get unique cluster labels */ 


    center_t* private_centers_buffer = (center_t*)MY_MALLOC(actual_centers.count * sizeof(center_t));

    center_t* global_centers_buffer  = (center_t*)MY_MALLOC(tot_centers * sizeof(center_t));
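As the comment above describes, locally found centers are gathered, sorted by density, and re-scattered with unique labels. A minimal sketch of the gather step; since the code allocates global_centers_buffer on every rank, an all-gather variant is plausible, but the exact collective and the byte-count layout of center_counts / center_displs are assumptions, not the file's confirmed implementation:

    /* Sketch (assumptions as noted above): gather every rank's centers,
     * sent as raw bytes, into the globally replicated buffer. */
    int my_bytes = (int)(actual_centers.count * sizeof(center_t));
    MPI_Allgather(&my_bytes, 1, MPI_INT, center_counts, 1, MPI_INT,
                  ctx -> mpi_communicator);
    center_displs[0] = 0;
    for(int i = 1; i < ctx -> world_size; ++i)
        center_displs[i] = center_displs[i - 1] + center_counts[i - 1];
    MPI_Allgatherv(private_centers_buffer, my_bytes, MPI_BYTE,
                   global_centers_buffer, center_counts, center_displs,
                   MPI_BYTE, ctx -> mpi_communicator);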
@@ -1033,11 +1021,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
    free(center_counts);
    free(center_displs);

    /*
     * Sort all the dp_info based on g and then perform the cluster assignment
     * in ascending order
     */

    /*
     * Sort all the dp_info based on g and then perform the cluster assignment
     * in ascending order
@@ -1082,7 +1065,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
                        wait_for_comms = 1;
                        break;
                    }
                    //if(p -> array_idx == 1587636) printf("%lu k %d p_idx %lu %lf %lf\n", k, cluster, p_idx, p_retrieved.g, p -> g );
                }


@@ -1097,11 +1079,9 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
                        {
                            idx_t max_rho_idx = max_rho.data[m];
                            datapoint_info_t dp_max_rho = find_possibly_halo_datapoint_rma(ctx, max_rho_idx, win_datapoints);
                            //datapoint_info_t dp_max_rho = dp_info[max_rho_idx];
                            float_t gcand = dp_max_rho.g;
                            if(ngbh_index == removed_centers.data[m] && gcand > gmax)
                            {   
                                //printf("%lu -- %lu\n", ele, m);
                                gmax = gcand;
                                gm_index = max_rho.data[m];
                            }
@@ -1110,7 +1090,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
                    if(gm_index != SIZE_MAX)
                    {
                        datapoint_info_t dp_gm = find_possibly_halo_datapoint_rma(ctx, gm_index, win_datapoints);
                        //datapoint_info_t dp_gm = dp_info[gm_index];
                        cluster = dp_gm.cluster_idx;
                    }

@@ -1123,7 +1102,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)

        }

        //DB_PRINT("rank %d proc %d\n", ctx -> mpi_rank, proc_points);
        MPI_Allreduce(MPI_IN_PLACE, &completed, 1, MPI_INT, MPI_SUM, ctx -> mpi_communicator);
        completed = completed == ctx -> world_size ? 1 : 0;
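The termination test above is a simple distributed consensus: each rank contributes 1 once it has no pending assignments, and the loop exits only when the sum over the communicator equals world_size. A minimal sketch of the pattern, where local_done and pending_points are hypothetical names:

        /* Sketch: global termination detection. Every rank must re-enter
         * the Allreduce, so no rank can leave the loop before all are done. */
        int local_done = (pending_points == 0) ? 1 : 0;  /* hypothetical flag */
        int done_sum   = local_done;
        MPI_Allreduce(MPI_IN_PLACE, &done_sum, 1, MPI_INT, MPI_SUM,
                      ctx -> mpi_communicator);
        int all_done   = (done_sum == ctx -> world_size);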

@@ -1222,8 +1200,6 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster)
    MPI_Win_lock_all(0, ngbh_win);
    MPI_Barrier(ctx -> mpi_communicator);
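For reference, the lock_all/barrier pair above opens a passive-target RMA epoch: every rank can then read remote data with MPI_Get without the target executing any matching call. A self-contained sketch of the general pattern (not this file's code):

    #include <mpi.h>
    int main(int argc, char** argv)
    {
        MPI_Init(&argc, &argv);
        int rank, size;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &size);

        int local = rank;                    /* datum exposed to the others */
        MPI_Win win;
        MPI_Win_create(&local, sizeof(int), 1, MPI_INFO_NULL,
                       MPI_COMM_WORLD, &win);

        MPI_Win_lock_all(0, win);            /* shared epoch, no target-side code */
        int peer = (rank + 1) % size, got;
        MPI_Get(&got, 1, MPI_INT, peer, 0, 1, MPI_INT, win);
        MPI_Win_flush(peer, win);            /* reads are usable only after flush */
        MPI_Win_unlock_all(win);

        MPI_Win_free(&win);
        MPI_Finalize();
        return 0;
    }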



    #define borders cluster->borders

    struct timespec start_tot, finish_tot;
@@ -1254,7 +1230,6 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster)
                /* loop over the k nearest neighbours to find if n is the nearest */
                /* if the cluster of the particle in the neighbourhood is c, then check its neighbourhood */                                                

                //DB_PRINT("my rank %d l %lu %lu %d\n", ctx -> mpi_rank, k, j, foreign_owner(ctx, j));
                datapoint_info_t j_dp = find_possibly_halo_datapoint_rma(ctx, j, dp_info_win);
                if(j_dp.cluster_idx != c)
                {
@@ -1293,10 +1268,9 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster)
		{
            datapoint_info_t j_dp = find_possibly_halo_datapoint_rma(ctx, pp, dp_info_win);
			int ppc = j_dp.cluster_idx;
            //insert one and symmetric one

            sparse_border_t b = {.i = c, .j = ppc, .idx = ctx -> local_datapoints[i].array_idx, .density = ctx -> local_datapoints[i].g, .error = ctx -> local_datapoints[i].log_rho_err}; 
            sparse_border_insert(cluster, b);
            // get symmetric border
            sparse_border_t bsym = {.i = ppc, .j = c, .idx = ctx -> local_datapoints[i].array_idx, .density = ctx -> local_datapoints[i].g, .error = ctx -> local_datapoints[i].log_rho_err}; 
            sparse_border_insert(cluster, bsym);
		}
+0 −1
@@ -3,7 +3,6 @@

#define PREALLOC_BORDERS 100
#define MAX_SERIAL_MERGING 100
#define PRINT_H2_COMM_SCHEME

typedef struct border_t 
{
+4 −3
@@ -10,13 +10,14 @@

//#define WRITE_NGBH
//#define WRITE_TOP_NODES
#define WRITE_DENSITY
#define WRITE_CLUSTER_ASSIGN_H1
//#define WRITE_DENSITY
//#define WRITE_CLUSTER_ASSIGN_H1
//#define WRITE_BORDERS
//#define WRITE_MERGING_TABLE
#define WRITE_FINAL_ASSIGNMENT

#define PRINT_NGBH_EXCHANGE_SCHEME
//#define PRINT_NGBH_EXCHANGE_SCHEME
#define PRINT_H2_COMM_SCHEME

typedef struct datapoint_info_t {
    idx_t array_idx;
+2 −2
@@ -106,10 +106,10 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    
        // 190M points
        // std_g2980844_091_0000
        // data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE);
        data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE);
        
        /* 1M points ca.*/
        data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);
        // data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);

        /* BOX */
        // data = read_data_file(ctx,"../norm_data/std_Box_256_30_092_0000",MY_TRUE);
+10 −119
@@ -1572,14 +1572,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
    for(int i = 0; i < ctx -> world_size; ++i)
    {
        /* allocate it afterwards */

        /* OLD VERSION 
        data_to_send_per_proc[i]  = (float_t*)MY_MALLOC(100 * (ctx -> dims) * sizeof(float_t));    
        local_idx_of_the_point[i] = (int*)MY_MALLOC(100 * sizeof(int));    
        point_to_snd_capacity[i] = 100;
        */

        /* NEW VERSION with double tree walk */
        point_to_snd_capacity[i] = 0;
        point_to_snd_count[i]    = 0;
    }
@@ -1776,8 +1768,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre

    /* -------------------------------------
     * ALTERNATIVE TO ALL TO ALL FOR BIG MSG
     * HERE IT BREAKS: MPI cannot handle
     * messages larger than 4 GB
     * ------------------------------------- */
    
    MPI_Barrier(ctx -> mpi_communicator);
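The deleted note records why the all-to-all path was abandoned: MPI count arguments are int, so a single message cannot exceed 2^31 - 1 elements, and the full heap payload blows past that. The replacement streams each batch in chunks. A minimal sketch of the chunking idea, reusing names from the surrounding code; dest is a hypothetical loop variable and MPI_my_heap is assumed to be a committed datatype covering one point's k neighbours:

    /* Sketch: stream a large batch in chunks of at most default_msg_len
     * points so no single send overflows MPI's int count argument. */
    int sent = 0;
    while(sent < ngbh_to_send[dest])
    {
        int count = MIN(default_msg_len, ngbh_to_send[dest] - sent);
        MPI_Send(heap_batches_per_node[dest] + (size_t)k * sent, count,
                 MPI_my_heap, dest, 0, ctx -> mpi_communicator);
        sent += count;
    }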
@@ -1805,109 +1795,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre

    int req_idx = 0;

    /* ---------------------------------------------------- */ 
    // FROM HERE
    //heap_node* __heap_batches_to_rcv = (heap_node*)MY_MALLOC((uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node));
    //if( __heap_batches_to_rcv == NULL)
    //{
    //    DB_PRINT("Rank %d failed to allocate rcv_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node));
    //    exit(1);
    //}

    //heap_node** rcv_heap_batches = (heap_node**)MY_MALLOC(ctx -> world_size * sizeof(heap_node*));
    //for(int i = 0; i < ctx -> world_size; ++i)
    //{
    //    rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k;
    //}

    //HERE

    //for(int i = 0; i < ctx -> world_size; ++i)
    //{
    //    int count = 0;
    //    if(ngbh_to_send[i] > 0)
    //    {
    //        while(already_sent_points[i] < ngbh_to_send[i])
    //        {
    //            MPI_Request request;
    //            count = MIN(default_msg_len, ngbh_to_send[i] - already_sent_points[i] );
    //            MPI_Isend(  heap_batches_per_node[i] + k * already_sent_points[i], count,  
    //                    MPI_my_heap, i, 0, ctx -> mpi_communicator, &request);
    //            already_sent_points[i] += count;
    //            req_array[req_idx] = request;
    //            ++req_idx;
    //        }
    //    }
    //}

    ///* Here it breaks for six nodes */

    //HERE;
    //
    //MPI_Barrier(ctx -> mpi_communicator);
    //MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);
    ////DB_PRINT("%d %p %p\n",ctx -> mpi_rank, &flag, &status);
    //while(flag)
    //{
    //    MPI_Request request;
    //    int count; 
    //    int source = status.MPI_SOURCE;
    //    MPI_Get_count(&status, MPI_my_heap, &count);
    //    /* receive each slice */

    //    MPI_Recv(rcv_heap_batches[source] + k * already_rcvd_points[source], 
    //            count, MPI_my_heap, source, MPI_ANY_TAG, ctx -> mpi_communicator, &status);

    //    already_rcvd_points[source] += count;
    //    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);

    //}
    //MPI_Barrier(ctx -> mpi_communicator);


    //MPI_Testall(req_num, req_array, &flag, MPI_STATUSES_IGNORE);

    //if(flag == 0)
    //{
    //    DB_PRINT("[!!!] Rank %d has unfinished communications\n", ctx -> mpi_rank);
    //    exit(1);
    //}
    //free(req_array);
    //free(already_sent_points);
    //free(already_rcvd_points);

    //elapsed_time = TIME_STOP;
    //LOG_WRITE("Sending results to other proc", elapsed_time);

    ///* merge old with new heaps */

    //MPI_Barrier(ctx -> mpi_communicator);

    //TIME_START;

    //for(int i = 0; i < ctx -> world_size; ++i)
    //{
    //    #pragma omp parallel for
    //    for(int b = 0; b < ngbh_to_recv[i]; ++b)
    //    {
    //        int idx = local_idx_of_the_point[i][b];
    //        /* retrieve the heap */
    //        heap H;
    //        H.count = k;
    //        H.N     = k;
    //        H.data  = rcv_heap_batches[i] + k*b;
    //        /* insert the points into the heap */
    //        for(int j = 0; j < k; ++j)
    //        {
    //            insert_max_heap(&(dp_info[idx].ngbh), H.data[j].value, H.data[j].array_idx);
    //        }
    //    }
    //}
    /* ----------- TO HERE ---------------------------- */

    // find the maximum number of points to send
    
    
    idx_t max_n_recv = 0;
    for(int i = 0; i < ctx -> world_size; ++i)
    {
@@ -1958,10 +1847,11 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
            }
        }

        if(ngbh_to_send[rank_to_send] != already_sent_points[rank_to_send] || point_to_rcv_count[rank_to_send] != already_sent_points[rank_to_send])
        if(     ngbh_to_send[rank_to_send] != already_sent_points[rank_to_send] || 
                point_to_rcv_count[rank_to_send] != already_sent_points[rank_to_send])
        {
            DB_PRINT("Madonnina del mare send [rank %d] %d %d %d\n", ctx -> mpi_rank, ngbh_to_send[rank_to_send], already_sent_points[rank_to_send], point_to_snd_count[rank_to_send]);

            DB_PRINT("ERROR OCCURRED in sending points [rank %d] got %d expected %d\n", 
                    ctx -> mpi_rank, already_rcvd_points[rank_to_send], point_to_rcv_count[rank_to_send]);
        }
        
        MPI_Barrier(ctx -> mpi_communicator);
@@ -1985,11 +1875,13 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
            }
        }

        if(ngbh_to_recv[rank_to_recv] != already_rcvd_points[rank_to_recv] || point_to_snd_count[rank_to_recv] != already_rcvd_points[rank_to_recv])
        if(     ngbh_to_recv[rank_to_recv] != already_rcvd_points[rank_to_recv] || 
                point_to_snd_count[rank_to_recv] != already_rcvd_points[rank_to_recv])
        {
            DB_PRINT("Madonnina del mare [rank %d] %d %d %d\n", ctx -> mpi_rank, ngbh_to_recv[rank_to_recv], already_rcvd_points[rank_to_recv], point_to_snd_count[rank_to_recv]);

            DB_PRINT("ERROR OCCURRED in recieving points [rank %d] got %d expected %d\n", 
                    ctx -> mpi_rank, already_rcvd_points[rank_to_recv], point_to_snd_count[rank_to_recv]);
        }

        /* merge lists */
        #pragma omp parallel for
        for(int b = 0; b < ngbh_to_recv[rank_to_recv]; ++b)
@@ -2016,14 +1908,13 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre

    if(flag == 0)
    {
        DB_PRINT("[!!!] Rank %d has unfinished communications\n", ctx -> mpi_rank);
        DB_PRINT("ERROR OCCURRED Rank %d has unfinished communications\n", ctx -> mpi_rank);
        exit(1);
    }
    free(req_array);
    free(already_sent_points);
    free(already_rcvd_points);


    /* -------------------------------------------------------- */
    /* heapsort them */
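After merging, each point's k candidates sit in a max-heap keyed on distance, so an in-place heapsort yields the neighbour list in ascending distance order. A sketch of the standard step, assuming a sift_down(data, root, size) helper from the heap implementation (name assumed, a.k.a. max_heapify):

    /* Sketch: in-place heapsort of one neighbour max-heap. The root is
     * always the current farthest candidate; swapping it to the end of
     * the active range and sifting down leaves the array sorted in
     * ascending order of distance. */
    void heapsort_ngbh(heap* H)
    {
        for(idx_t n = H -> count; n > 1; --n)
        {
            heap_node tmp    = H -> data[0];
            H -> data[0]     = H -> data[n - 1];
            H -> data[n - 1] = tmp;
            sift_down(H -> data, 0, n - 1);  /* restore max-heap on the prefix */
        }
    }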