Commit 7fb847f1 authored by lykos98's avatar lykos98

finally fixed the bug behind the ucx_rdma error

parent bb07cd30
+5 −5
@@ -1320,15 +1320,15 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster)
    #define RECV 0
    #define DO_NOTHING -1

    MPI_Barrier(ctx -> mpi_communicator);

    while(ranks > 1)
    {
        int dp = ranks % 2;
        ranks = ranks / 2 + dp;
        int send_rcv = ctx -> mpi_rank >= ranks;
        int send_rcv = (ctx -> mpi_rank >= ranks);

        MPI_Barrier(ctx -> mpi_communicator);

        if(dp && ctx -> mpi_rank == ranks - 1) send_rcv = DO_NOTHING;
        if(dp && ctx -> mpi_rank == (ranks - 1)) send_rcv = DO_NOTHING;

        switch (send_rcv) 
        {
@@ -1392,9 +1392,9 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster)
                #endif
                break;
        }
        MPI_Barrier(ctx -> mpi_communicator);
        #if defined(PRINT_H2_COMM_SCHEME)
        MPI_DB_PRINT("-----------------\n");
        MPI_Barrier(ctx -> mpi_communicator);
        #endif

    }
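
The hunk above is a halving reduction over the ranks: each round the surviving group shrinks to ranks / 2 + dp, the upper half sends, the lower half receives, and when the group size is odd the rank sitting at ranks - 1 does nothing for that round. As a reading aid, here is a minimal standalone sketch (no MPI) that only prints the role each rank would take per round; the SEND value (assumed to be 1), the rank count and the driver loop are illustrative, while the halving logic mirrors the diff.

#include <stdio.h>

#define SEND        1   /* assumed value; only RECV and DO_NOTHING appear in this hunk */
#define RECV        0
#define DO_NOTHING -1

int main(void)
{
    int world_size = 7;                        /* illustrative rank count */
    for (int rank = 0; rank < world_size; ++rank)
    {
        int ranks = world_size;
        int round = 0;
        while (ranks > 1)
        {
            int dp   = ranks % 2;              /* 1 when the surviving group is odd-sized    */
            ranks    = ranks / 2 + dp;         /* lower half (plus the odd one out) survives */
            int role = (rank >= ranks) ? SEND : RECV;
            if (dp && rank == (ranks - 1)) role = DO_NOTHING;   /* odd rank idles this round */
            printf("round %d, rank %d: %s\n", round++, rank,
                   role == SEND ? "SEND" : role == RECV ? "RECV" : "DO_NOTHING");
        }
    }
    return 0;
}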
+11 −0
@@ -8,6 +8,16 @@
#include "../tree/heap.h"
//#include <stdarg.h>

//#define WRITE_NGBH
//#define WRITE_TOP_NODES
#define WRITE_DENSITY
#define WRITE_CLUSTER_ASSIGN_H1
//#define WRITE_BORDERS
//#define WRITE_MERGING_TABLE
#define WRITE_FINAL_ASSIGNMENT

#define PRINT_NGBH_EXCHANGE_SCHEME

typedef struct datapoint_info_t {
    idx_t array_idx;
    heap ngbh;
@@ -183,3 +193,4 @@ void lu_dynamic_array_init(lu_dynamic_array_t * a);
void print_error_code(int err);

void ordered_data_to_file(global_context_t* ctx);
void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname);
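
The new ordered_buffer_to_file prototype generalizes the existing ordered_data_to_file: each rank passes its local slice of n elements of el_size bytes and the helper is expected to write them to fname in global point order. A hypothetical call site is sketched below; the include name and the output file name are assumptions, while float_t and ctx -> local_n_points come from elsewhere in this diff.

#include "common.h"   /* assumed name for the header patched above */

/* write one float_t per local point; ordering across ranks is the helper's job */
static void dump_final_density(global_context_t* ctx, float_t* density)
{
    ordered_buffer_to_file(ctx, density, sizeof(float_t),
                           (uint64_t)(ctx -> local_n_points), "final_density.dat");
}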
+4 −3
@@ -5,6 +5,7 @@
#include "../tree/tree.h"
#include "../adp/adp.h"


//

#ifdef THREAD_FUNNELED
@@ -58,9 +59,9 @@ int main(int argc, char** argv) {
    #endif

    #if defined (THREAD_FUNNELED)
        mpi_printf(&ctx,"/!\\ Code build with MPI_THREAD_FUNNELED level\n");
        mpi_printf(&ctx,"/!\\ Code built with MPI_THREAD_FUNNELED level\n");
    #else
        mpi_printf(&ctx,"/!\\ Code build with MPI_THREAD_MULTIPLE level\n");
        mpi_printf(&ctx,"/!\\ Code built with MPI_THREAD_MULTIPLE level\n");
    #endif

	/*
+194 −91
@@ -19,13 +19,6 @@
#include <omp.h>
#include <sys/sysinfo.h>

//#define WRITE_NGBH
//#define WRITE_TOP_NODES
#define WRITE_DENSITY
//#define WRITE_CLUSTER_ASSIGN_H1
//#define WRITE_BORDERS
//#define WRITE_MERGING_TABLE
#define WRITE_FINAL_ASSIGNMENT


/* 
@@ -37,7 +30,7 @@
//#define MAX_MSG_SIZE 4294967296

/* Use slices of 10 MB? Is that really good? Maybe the cause of the TID thing */
#define MAX_MSG_SIZE 1000000000 
#define MAX_MSG_SIZE (10000 * k * sizeof(heap_node))


#define TOP_TREE_RCH 1
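
The message-size cap in the hunk above is no longer a fixed byte count but a count of whole per-point heaps: 10000 points per message, k heap_node entries each, which is the same unit the chunked sends later in this diff count in. A rough standalone sketch of the arithmetic follows; k and sizeof(heap_node) are illustrative guesses, not values taken from the code.

#include <stdio.h>
#include <stdint.h>

int main(void)
{
    uint64_t k              = 300;            /* assumed neighbours per point                    */
    uint64_t heap_node_size = 16;             /* assumed: one float_t distance + one idx_t index */
    uint64_t points_per_msg = 10000;          /* the constant in the new macro                   */

    uint64_t new_cap_bytes = points_per_msg * k * heap_node_size;   /* new MAX_MSG_SIZE */
    uint64_t old_cap_bytes = 1000000000ULL;                         /* old MAX_MSG_SIZE */

    printf("new cap: %llu points/message ~ %.1f MB (old fixed cap: %.1f MB)\n",
           (unsigned long long)points_per_msg, new_cap_bytes / 1e6, old_cap_bytes / 1e6);
    return 0;
}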
@@ -1591,23 +1584,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
        point_to_snd_count[i]    = 0;
    }

    /* for each point walk the tree and find to which proc send data */
    /* actually compute intersection of ngbh radius of each point to node box */

    /* OLD VERSION SINGLE TREE WALK */
    /*
    #pragma omp parallel for
    for(int i = 0; i < ctx -> local_n_points; ++i)
    {
        float_t max_dist = dp_info[i].ngbh.data[0].value;
        float_t* point   = ctx -> local_data + (i * ctx -> dims);

        tree_walk(ctx, top_tree -> root, i, max_dist, 
                  point, data_to_send_per_proc, local_idx_of_the_point, 
                  point_to_snd_count, point_to_snd_capacity);
    }
    */

    /* NEW VERSION double tree walk */
    #pragma omp parallel for
    for(int i = 0; i < ctx -> local_n_points; ++i)
@@ -1709,14 +1685,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre

    /* Note that I then have to receive the same number of heaps as the ones I sent out before */
    heap_node* __heap_batches_to_snd = (heap_node*)MY_MALLOC((uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node));
    heap_node* __heap_batches_to_rcv = (heap_node*)MY_MALLOC((uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node));

    
    if( __heap_batches_to_rcv == NULL)
    {
        DB_PRINT("Rank %d failed to allocate rcv_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node));
        exit(1);
    }

    if( __heap_batches_to_snd == NULL)
    {
@@ -1805,11 +1773,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
    MPI_Barrier(ctx -> mpi_communicator);
    MPI_Type_commit(&MPI_my_heap);

    heap_node** rcv_heap_batches = (heap_node**)MY_MALLOC(ctx -> world_size * sizeof(heap_node*));
    for(int i = 0; i < ctx -> world_size; ++i)
    {
        rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k;
    }

    /* -------------------------------------
     * ALTERNATIVE TO ALL TO ALL FOR BIG MSG
@@ -1842,86 +1805,226 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre

    int req_idx = 0;

    /* ---------------------------------------------------- */ 
    // FROM HERE
    //heap_node* __heap_batches_to_rcv = (heap_node*)MY_MALLOC((uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node));
    //if( __heap_batches_to_rcv == NULL)
    //{
    //    DB_PRINT("Rank %d failed to allocate rcv_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node));
    //    exit(1);
    //}

    //heap_node** rcv_heap_batches = (heap_node**)MY_MALLOC(ctx -> world_size * sizeof(heap_node*));
    //for(int i = 0; i < ctx -> world_size; ++i)
    //{
    //    rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k;
    //}

    //HERE

    //for(int i = 0; i < ctx -> world_size; ++i)
    //{
    //    int count = 0;
    //    if(ngbh_to_send[i] > 0)
    //    {
    //        while(already_sent_points[i] < ngbh_to_send[i])
    //        {
    //            MPI_Request request;
    //            count = MIN(default_msg_len, ngbh_to_send[i] - already_sent_points[i] );
    //            MPI_Isend(  heap_batches_per_node[i] + k * already_sent_points[i], count,  
    //                    MPI_my_heap, i, 0, ctx -> mpi_communicator, &request);
    //            already_sent_points[i] += count;
    //            req_array[req_idx] = request;
    //            ++req_idx;
    //        }
    //    }
    //}

    ///* Here it breaks for six nodes */

    //HERE;
    //
    //MPI_Barrier(ctx -> mpi_communicator);
    //MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);
    ////DB_PRINT("%d %p %p\n",ctx -> mpi_rank, &flag, &status);
    //while(flag)
    //{
    //    MPI_Request request;
    //    int count; 
    //    int source = status.MPI_SOURCE;
    //    MPI_Get_count(&status, MPI_my_heap, &count);
    //    /* recieve each slice */

    //    MPI_Recv(rcv_heap_batches[source] + k * already_rcvd_points[source], 
    //            count, MPI_my_heap, source, MPI_ANY_TAG, ctx -> mpi_communicator, &status);

    //    already_rcvd_points[source] += count;
    //    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);

    //}
    //MPI_Barrier(ctx -> mpi_communicator);


    //MPI_Testall(req_num, req_array, &flag, MPI_STATUSES_IGNORE);

    //if(flag == 0)
    //{
    //    DB_PRINT("[!!!] Rank %d has unfinished communications\n", ctx -> mpi_rank);
    //    exit(1);
    //}
    //free(req_array);
    //free(already_sent_points);
    //free(already_rcvd_points);

    //elapsed_time = TIME_STOP;
    //LOG_WRITE("Sending results to other proc", elapsed_time);

    ///* merge old with new heaps */

    //MPI_Barrier(ctx -> mpi_communicator);

    //TIME_START;

    //for(int i = 0; i < ctx -> world_size; ++i)
    //{
    //    #pragma omp paralell for
    //    for(int b = 0; b < ngbh_to_recv[i]; ++b)
    //    {
    //        int idx = local_idx_of_the_point[i][b];
    //        /* retrieve the heap */
    //        heap H;
    //        H.count = k;
    //        H.N     = k;
    //        H.data  = rcv_heap_batches[i] + k*b;
    //        /* insert the points into the heap */
    //        for(int j = 0; j < k; ++j)
    //        {
    //            insert_max_heap(&(dp_info[idx].ngbh), H.data[j].value, H.data[j].array_idx);
    //        }
    //    }
    //}
    /* ----------- TO HERE ---------------------------- */

    /* find the maximum number of points to receive */

    
    idx_t max_n_recv = 0;
    for(int i = 0; i < ctx -> world_size; ++i)
    {
        int count = 0;
        if(ngbh_to_send[i] > 0)
        {
            while(already_sent_points[i] < ngbh_to_send[i])
            {
                MPI_Request request;
                count = MIN(default_msg_len, ngbh_to_send[i] - already_sent_points[i] );
                MPI_Isend(  heap_batches_per_node[i] + k * already_sent_points[i], count,  
                        MPI_my_heap, i, 0, ctx -> mpi_communicator, &request);
                already_sent_points[i] += count;
                req_array[req_idx] = request;
                ++req_idx;
            }
        max_n_recv = MAX(max_n_recv, (idx_t)ngbh_to_recv[i]);
    }

    MPI_DB_PRINT("Using default message lenght %lu\n", default_msg_len);

    heap_node* __heap_batches_to_rcv = (heap_node*)MY_MALLOC((uint64_t)k * (uint64_t)max_n_recv * sizeof(heap_node));
    if( __heap_batches_to_rcv == NULL)
    {
        DB_PRINT("Rank %d failed to allocate rcv_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)max_n_recv* sizeof(heap_node));
        exit(1);
    }

    /* make a ring */

    MPI_Barrier(ctx -> mpi_communicator);
    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);
    //DB_PRINT("%d %p %p\n",ctx -> mpi_rank, &flag, &status);
    while(flag)
    for(int i = 1; i < ctx -> world_size; ++i)
    {
        MPI_Request request;
        int count; 
        int source = status.MPI_SOURCE;
        MPI_Get_count(&status, MPI_my_heap, &count);
        /* recieve each slice */
        int rank_to_send = (ctx -> mpi_rank + i) % (ctx -> world_size);
        int rank_to_recv = (ctx -> world_size + ctx -> mpi_rank - i) % (ctx -> world_size);

        MPI_Recv(rcv_heap_batches[source] + k * already_rcvd_points[source], 
                count, MPI_my_heap, source, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
        /* exchange one batch with this round's partner pair */

        already_rcvd_points[source] += count;
        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);
        /* send out one batch */

    }
        #ifdef PRINT_NGBH_EXCHANGE_SCHEME
            MPI_DB_PRINT("[--- ROUND %d ----]\n", i);
            MPI_Barrier(ctx -> mpi_communicator);
            DB_PRINT("[RANK %d] sending to %d tot: %d [%luB]---- recieving from %d %d\n", ctx -> mpi_rank, 
                    rank_to_send, ngbh_to_send[rank_to_send], ngbh_to_send[rank_to_send]*sizeof(heap_node), rank_to_recv, ngbh_to_recv[rank_to_recv]);
        #endif
        if(ngbh_to_send[rank_to_send] > 0)
        {
            int count_send = 0;
            while(already_sent_points[rank_to_send] < ngbh_to_send[rank_to_send])
            {
                MPI_Request request;
                count_send = MIN((int)default_msg_len, (int)(ngbh_to_send[rank_to_send] - already_sent_points[rank_to_send] ));

                MPI_Isend(  heap_batches_per_node[rank_to_send] + k * already_sent_points[rank_to_send], count_send,  
                        MPI_my_heap, rank_to_send, 0, ctx -> mpi_communicator, &request);

    MPI_Testall(req_num, req_array, &flag, MPI_STATUSES_IGNORE);
                already_sent_points[rank_to_send] += count_send;
                req_array[req_idx] = request;
                ++req_idx;
            }
        }

    if(flag == 0)
        if(ngbh_to_send[rank_to_send] != already_sent_points[rank_to_send] || point_to_rcv_count[rank_to_send] != already_sent_points[rank_to_send])
        {
        DB_PRINT("[!!!] Rank %d has unfinished communications\n", ctx -> mpi_rank);
        exit(1);
            DB_PRINT("Madonnina del mare send [rank %d] %d %d %d\n", ctx -> mpi_rank, ngbh_to_send[rank_to_send], already_sent_points[rank_to_send], point_to_snd_count[rank_to_send]);

        }
    free(req_array);
    free(already_sent_points);
    free(already_rcvd_points);
        
    elapsed_time = TIME_STOP;
    LOG_WRITE("Sending results to other proc", elapsed_time);
        MPI_Barrier(ctx -> mpi_communicator);

    /* merge old with new heaps */
        if(ngbh_to_recv[rank_to_recv] > 0)
        {
            flag = 0;
            while(already_rcvd_points[rank_to_recv] < ngbh_to_recv[rank_to_recv])
            {
                MPI_Probe(rank_to_recv, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
                MPI_Request request;
                int count_recv; 
                int source = status.MPI_SOURCE;
                MPI_Get_count(&status, MPI_my_heap, &count_recv);
                /* receive each slice */

    MPI_Barrier(ctx -> mpi_communicator);
                MPI_Recv(__heap_batches_to_rcv + k * already_rcvd_points[rank_to_recv], 
                        count_recv, MPI_my_heap, source, MPI_ANY_TAG, ctx -> mpi_communicator, &status);

    TIME_START;
                already_rcvd_points[rank_to_recv] += count_recv;
            }
        }

    for(int i = 0; i < ctx -> world_size; ++i)
        if(ngbh_to_recv[rank_to_recv] != already_rcvd_points[rank_to_recv] || point_to_snd_count[rank_to_recv] != already_rcvd_points[rank_to_recv])
        {
            DB_PRINT("Madonnina del mare [rank %d] %d %d %d\n", ctx -> mpi_rank, ngbh_to_recv[rank_to_recv], already_rcvd_points[rank_to_recv], point_to_snd_count[rank_to_recv]);

        }
        /* merge lists */
        #pragma omp parallel for
        for(int b = 0; b < ngbh_to_recv[i]; ++b)
        for(int b = 0; b < ngbh_to_recv[rank_to_recv]; ++b)
        {
            int idx = local_idx_of_the_point[i][b];
            int idx = local_idx_of_the_point[rank_to_recv][b];
            /* retrieve the heap */
            heap H;
            H.count = k;
            H.N     = k;
            H.data  = rcv_heap_batches[i] + k*b;
            H.data  = __heap_batches_to_rcv + k*b;
            /* insert the points into the heap */
            for(int j = 0; j < k; ++j)
            {
                insert_max_heap(&(dp_info[idx].ngbh), H.data[j].value, H.data[j].array_idx);
            }
        }


        MPI_Barrier(ctx -> mpi_communicator);
    }

    
    MPI_Testall(req_idx, req_array, &flag, MPI_STATUSES_IGNORE);

    if(flag == 0)
    {
        DB_PRINT("[!!!] Rank %d has unfinished communications\n", ctx -> mpi_rank);
        exit(1);
    }
    free(req_array);
    free(already_sent_points);
    free(already_rcvd_points);
    

    /* -------------------------------------------------------- */
    /* heapsort them */

    #pragma omp parallel for
@@ -1964,7 +2067,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
    free(data_to_send_per_proc);
    free(local_idx_of_the_point);
    free(heap_batches_per_node);
    free(rcv_heap_batches);
    //free(rcv_heap_batches);
    free(rcv_work_batches);
    free(point_to_rcv_count);
    free(point_to_snd_count);
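
Taken together, the large hunk replaces the old probe-anything all-to-all (the commented block between FROM HERE and TO HERE) with a ring schedule: in round i every rank sends its batch of heaps to (rank + i) % world_size and drains the matching batch from (world_size + rank - i) % world_size, splitting each batch into chunks of at most default_msg_len heaps, merging the received heaps into the local k-NN heaps, and closing the round with a barrier, so each rank talks to exactly one partner pair per round. Below is a condensed, self-contained illustration of just that communication pattern: plain doubles stand in for heap_node, the batch length is made up, the merge step is reduced to a comment, and a plain MPI_Waitall replaces the diff's MPI_Testall-and-exit check.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

#define CHUNK 4                       /* stands in for default_msg_len (heaps per message) */

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank, world_size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    int batch_len = 10;                              /* made-up batch size per partner */
    double* snd = malloc(batch_len * sizeof(double));
    double* rcv = malloc(batch_len * sizeof(double));
    for (int j = 0; j < batch_len; ++j) snd[j] = rank + 0.01 * j;

    int chunks_per_round = (batch_len + CHUNK - 1) / CHUNK;
    MPI_Request* reqs = malloc((size_t)world_size * chunks_per_round * sizeof(MPI_Request));
    int req_idx = 0;

    for (int i = 1; i < world_size; ++i)
    {
        int rank_to_send = (rank + i) % world_size;                /* as in the diff */
        int rank_to_recv = (world_size + rank - i) % world_size;

        /* post the outgoing batch in CHUNK-sized pieces (non-blocking) */
        int sent = 0;
        while (sent < batch_len)
        {
            int count = (batch_len - sent < CHUNK) ? batch_len - sent : CHUNK;
            MPI_Isend(snd + sent, count, MPI_DOUBLE, rank_to_send, 0,
                      MPI_COMM_WORLD, &reqs[req_idx++]);
            sent += count;
        }

        /* drain the matching batch coming from this round's source, chunk by chunk */
        int rcvd = 0;
        while (rcvd < batch_len)
        {
            MPI_Status status;
            int count;
            MPI_Probe(rank_to_recv, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
            MPI_Get_count(&status, MPI_DOUBLE, &count);
            MPI_Recv(rcv + rcvd, count, MPI_DOUBLE, rank_to_recv, MPI_ANY_TAG,
                     MPI_COMM_WORLD, &status);
            rcvd += count;
        }

        /* here the real code runs the "merge lists" loop, pushing every received
         * heap entry into the local k-NN heap with insert_max_heap */

        MPI_Barrier(MPI_COMM_WORLD);                 /* close the round, as in the diff */
    }

    MPI_Waitall(req_idx, reqs, MPI_STATUSES_IGNORE); /* the diff uses MPI_Testall + exit */
    free(reqs); free(snd); free(rcv);
    MPI_Finalize();
    return 0;
}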