Commit 97cd94f0 authored by lykos98's avatar lykos98
Browse files

refactored code to not use criticals, implemented request of neighbors of neighbors

parent 18fa41b9
Loading
Loading
Loading
Loading
+31 −17
Original line number Diff line number Diff line
@@ -2,6 +2,8 @@
#include "mpi.h"
#include <time.h>

#define FREE_NOT_NULL(x) if(x){free(x); x = NULL;}

void get_context(global_context_t* ctx)
{
	MPI_Comm_size(ctx -> mpi_communicator, &(ctx -> world_size));
@@ -12,29 +14,41 @@ void get_context(global_context_t* ctx)
	ctx -> ub_box 	  = NULL;
    ctx -> rank_n_points  = (int*)malloc(ctx -> world_size * sizeof(int));
    ctx -> rank_idx_start = (int*)malloc(ctx -> world_size * sizeof(int));
    ctx -> idx_halo_points_recv = NULL;
    ctx -> idx_halo_points_send = NULL;
    ctx -> n_halo_points_recv = NULL;
    ctx -> n_halo_points_send = NULL;
    ctx -> halo_datapoints = NULL;
}

void free_context(global_context_t* ctx)
{
	if(ctx -> local_data) 

    FREE_NOT_NULL(ctx -> local_data);
    FREE_NOT_NULL(ctx -> ub_box);
    FREE_NOT_NULL(ctx -> lb_box);

    if(ctx -> halo_datapoints)
    {
		free(ctx -> local_data);
		ctx -> local_data = NULL;
        for(int i = 0; i < ctx -> world_size; ++i) FREE_NOT_NULL(ctx -> halo_datapoints[i]);
    }
    FREE_NOT_NULL(ctx -> halo_datapoints);

	if(ctx -> ub_box)
    if(ctx -> idx_halo_points_recv)
    {
		free(ctx -> ub_box);
		ctx -> ub_box = NULL;	
        for(int i = 0; i < ctx -> world_size; ++i) FREE_NOT_NULL(ctx -> idx_halo_points_recv[i]);
    }
    FREE_NOT_NULL(ctx -> idx_halo_points_recv);

	if(ctx -> lb_box)
    if(ctx -> idx_halo_points_send)
    {
		free(ctx -> lb_box);
		ctx -> lb_box = NULL;	
        for(int i = 0; i < ctx -> world_size; ++i) FREE_NOT_NULL(ctx -> idx_halo_points_send[i]);
    }
    free(ctx -> rank_n_points);
    free(ctx -> rank_idx_start);
    FREE_NOT_NULL(ctx -> idx_halo_points_send);
    FREE_NOT_NULL(ctx -> n_halo_points_recv);
    FREE_NOT_NULL(ctx -> n_halo_points_send);
    FREE_NOT_NULL(ctx -> rank_n_points);
    FREE_NOT_NULL(ctx -> rank_idx_start);
}

void free_pointset(pointset_t* ps)
+20 −0
Original line number Diff line number Diff line
@@ -5,8 +5,21 @@
#include <mpi.h>
#include <stdint.h>
#include <time.h>
#include "../tree/heap.h"
//#include <stdarg.h>

typedef struct datapoint_info_t {
    idx_t array_idx;
  heap ngbh;
  float_t g;
  float_t log_rho;
  float_t log_rho_c;
  float_t log_rho_err;
  idx_t kstar;
  int is_center;
  int cluster_idx;
} datapoint_info_t;

#define MAX(A,B) ((A) > (B) ? (A) : (B))
#define MIN(A,B) ((A) < (B) ? (A) : (B))

@@ -98,9 +111,16 @@ struct global_context_t
	float_t* local_data;
	float_t* lb_box;
	float_t* ub_box;
    int* n_halo_points_recv;
    int* n_halo_points_send;
    idx_t** idx_halo_points_recv;
    idx_t** idx_halo_points_send;
    size_t n_points;
    size_t idx_start;
    size_t local_n_points;
    datapoint_info_t*  local_datapoints;
    datapoint_info_t** halo_datapoints;
    heap_node* __recieved_heap_data;
    uint32_t dims;
    int* rank_idx_start;
    int* rank_n_points;
+288 −36
Original line number Diff line number Diff line
@@ -2039,7 +2039,7 @@ void ordered_data_to_file(global_context_t* ctx)

static inline int foreign_owner(global_context_t* ctx, idx_t idx)
{
    int owner;
    int owner = ctx -> mpi_rank;
    if( idx >= ctx -> idx_start && idx < ctx -> idx_start + ctx -> local_n_points) 
    {
        return ctx -> mpi_rank;
@@ -2053,38 +2053,27 @@ static inline int foreign_owner(global_context_t* ctx, idx_t idx)
    return owner;
}

static inline void push_on_foreign_idx_list(idx_t element, int owner, int* counts, int* capacities, idx_t** lists)
static inline void append_foreign_idx_list(idx_t element, int owner, int* counts, idx_t** lists)
{

    if(capacities[owner] == counts[owner])
    {
        int new_cap = capacities[owner] * 1.1;
        lists[owner] = (idx_t*)realloc(lists[owner], new_cap * sizeof(idx_t));
        capacities[owner] = new_cap;
    }
   

    /* find the plausible place */
    int idx_to_insert = counts[owner]; 
    
    int flag = 1;
    /* check if point is already inserted */
    for(int i = 0; i < idx_to_insert; ++i)
    {
        flag = flag && lists[owner][i] != element; 
    }
    int idx_to_insert;

    /* if this is the case insert it */ 
    #pragma omp atomic capture
    idx_to_insert = counts[owner]++; 
    
    if(flag)
    {
        counts[owner]++;
    lists[owner][idx_to_insert] = element;
}

int cmp_idx(const void* a, const void* b)
{
    idx_t aa = *((idx_t*)a);
    idx_t bb = *((idx_t*)b);
    return (aa > bb) - (aa < bb);
}

void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp)
void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_info_t** foreign_dp)
{
    int k = dp[0].ngbh.count;
    
@@ -2092,13 +2081,15 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp)
    int*    count_to_request         = (int*)malloc(ctx -> world_size * sizeof(int));
    int*    capacities               = (int*)malloc(ctx -> world_size * sizeof(int));


    for(int i = 0; i < ctx -> world_size; ++i)
    {
        array_indexes_to_request[i] = (idx_t*)malloc(100 * sizeof(idx_t));
        capacities[i] = 100;
        capacities[i] = 0;
        count_to_request[i] = 0;
    }

    /* count them */
    
    #pragma omp parallel for
    for(uint32_t i = 0; i < ctx -> local_n_points; ++i)
    {
@@ -2111,27 +2102,272 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp)
            if(owner != ctx -> mpi_rank)
            {
                //DB_PRINT("Hehe");
                #pragma omp critical
                /* TODO: change this to a series of locks */
                #pragma  omp atomic update
                capacities[owner]++;
            }
        }
    }

    /* alloc */

    for(int i = 0; i < ctx -> world_size; ++i)
    {
        array_indexes_to_request[i] = (idx_t*)malloc(capacities[i] * sizeof(idx_t));
    }

    /* append them */
    
    #pragma omp parallel for
    for(uint32_t i = 0; i < ctx -> local_n_points; ++i)
    {
        for(int j = 0; j < k; ++j)
        {
            idx_t element = dp[i].ngbh.data[j].array_idx;        
            int owner = foreign_owner(ctx, element);
            //DB_PRINT("%lu %d\n", element, owner);
            
            if(owner != ctx -> mpi_rank)
            {
                    push_on_foreign_idx_list(element, owner, count_to_request, capacities, array_indexes_to_request);
                //DB_PRINT("Hehe");
                /* TODO: change this to a series of locks */
                append_foreign_idx_list(element, owner, count_to_request, array_indexes_to_request); 
            }
        }
    }

    /* prune them */
    int* unique_count = (int*)malloc(ctx -> world_size * sizeof(int));

    /*
    if(I_AM_MASTER)
    {
        FILE* f = fopen("uniq","w");
        fwrite(array_indexes_to_request[1], sizeof(idx_t), capacities[1],f);
        fclose(f);
    }
    */

    #pragma omp paralell for
    for(int i = 0; i < ctx -> world_size; ++i)
    {
       unique_count[i] = capacities[i] > 0; //init unique count 
       qsort(array_indexes_to_request[i], capacities[i], sizeof(idx_t), cmp_idx); 
       uint32_t prev = array_indexes_to_request[i][0];
       for(int el = 1; el < capacities[i]; ++el)
       {
            int flag = prev == array_indexes_to_request[i][el];
            if(!flag)
            {
                /* in place subsitution 
                 * if a different element is found then 
                 * copy in at the next free place
                 * */
                array_indexes_to_request[i][unique_count[i]] = array_indexes_to_request[i][el];
                unique_count[i]++;
            }
            prev = array_indexes_to_request[i][el];
       }
    }

    for(int i = 0; i < ctx -> world_size; ++i)
    {
        array_indexes_to_request[i] = (idx_t*)realloc(array_indexes_to_request[i], unique_count[i] * sizeof(idx_t));
    }


    
    DB_PRINT("[RANK %d elements to request]", ctx -> mpi_rank);
    for(int i = 0; i < ctx -> world_size; ++i)
    {
        DB_PRINT("%d\t", count_to_request[i]);
        DB_PRINT("%d\t", unique_count[i]);
    }
    DB_PRINT("\n");

    /*
    if(I_AM_MASTER)
    {
        FILE* f = fopen("uniq2","w");
        fwrite(array_indexes_to_request[1], sizeof(idx_t), unique_count[1],f);
        fclose(f);
    }
    */

    for(int i = 0; i < ctx -> world_size; ++i)
    {
        foreign_dp[i] = (datapoint_info_t*)calloc(sizeof(datapoint_info_t), unique_count[i]);
    }
    
    /* alias for unique counts */
    int* n_heap_to_recv = unique_count;
    int* n_heap_to_send = (int*)malloc(ctx -> world_size * sizeof(int));

    /* exchange how many to recv how many to send */

    MPI_Alltoall(n_heap_to_recv, 1, MPI_INT, n_heap_to_send, 1, MPI_INT , ctx -> mpi_communicator);

    /* compute displacements and yada yada */
    int* sdispls = (int*)calloc(ctx -> world_size , sizeof(int));
    int* rdispls = (int*)calloc(ctx -> world_size , sizeof(int));
    
    int tot_count_send = n_heap_to_send[0];
    int tot_count_recv = n_heap_to_recv[0];
    for(int i = 1; i < ctx -> world_size; ++i)
    {
        sdispls[i] = sdispls[i - 1] + n_heap_to_send[i - 1];
        rdispls[i] = rdispls[i - 1] + n_heap_to_recv[i - 1];

        tot_count_send += n_heap_to_send[i];
        tot_count_recv += n_heap_to_recv[i];
    }
    idx_t* idx_buffer_to_send = (idx_t*)malloc(tot_count_send * sizeof(idx_t));
    idx_t* idx_buffer_to_recv = (idx_t*)malloc(tot_count_recv * sizeof(idx_t));
    
    /* copy indexes on send buffer */
    for(int i = 0; i < ctx -> world_size; ++i)
    {
        memcpy(idx_buffer_to_recv + rdispls[i], array_indexes_to_request[i], n_heap_to_recv[i] * sizeof(idx_t));
    }
    
    MPI_Alltoallv(idx_buffer_to_recv, n_heap_to_recv, rdispls, MPI_UINT64_T, idx_buffer_to_send, n_heap_to_send, sdispls, MPI_UINT64_T, ctx -> mpi_communicator);


    /* allocate foreign dp */ 
    heap_node* heap_buffer_to_send = (heap_node*)malloc(tot_count_send * k * sizeof(heap_node));
    heap_node* heap_buffer_to_recv = (heap_node*)malloc(tot_count_recv * k * sizeof(heap_node));

    for(int i = 0; i < tot_count_send; ++i)
    {
        idx_t idx = idx_buffer_to_send[i] - ctx -> idx_start;
        memcpy(heap_buffer_to_send + i * k, dp[idx].ngbh.data, k * sizeof(heap_node));
    }
    /* exchange heaps */


    MPI_Barrier(ctx -> mpi_communicator);
    int default_msg_len = MAX_MSG_SIZE / (k * sizeof(heap_node));

    int* already_sent_points = (int*)malloc(ctx -> world_size * sizeof(int));
    int* already_rcvd_points = (int*)malloc(ctx -> world_size * sizeof(int));

    /* allocate a request array to keep track of all requests going out*/
    MPI_Request* req_array;
    int req_num = 0;
    for(int i = 0; i < ctx -> world_size; ++i)
    {
        req_num += n_heap_to_send[i] > 0 ? n_heap_to_send[i]/default_msg_len + 1 : 0;
    }

    req_array = (MPI_Request*)malloc(req_num * sizeof(MPI_Request));

    for(int i = 0; i < ctx -> world_size; ++i)
    {
        already_sent_points[i] = 0;
        already_rcvd_points[i] = 0;
    }

    int req_idx = 0;
    
    MPI_Datatype MPI_my_heap;
    MPI_Type_contiguous(k * sizeof(heap_node), MPI_CHAR, &MPI_my_heap);
    MPI_Barrier(ctx -> mpi_communicator);
    MPI_Type_commit(&MPI_my_heap);

    for(int i = 0; i < ctx -> world_size; ++i)
    {
        int count = 0;
        if(n_heap_to_send[i] > 0)
        {
            while(already_sent_points[i] < n_heap_to_send[i])
            {
                MPI_Request request;
                count = MIN(default_msg_len, n_heap_to_send[i] - already_sent_points[i] );
                MPI_Isend(  heap_buffer_to_send + k * (already_sent_points[i] + sdispls[i]), count,  
                        MPI_my_heap, i, 0, ctx -> mpi_communicator, &request);
                already_sent_points[i] += count;
                req_array[req_idx] = request;
                ++req_idx;
            }
        }
    }
    int flag; 
    MPI_Status status;
    MPI_Barrier(ctx -> mpi_communicator);
    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);
    //DB_PRINT("%d %p %p\n",ctx -> mpi_rank, &flag, &status);
    while(flag)
    {
        MPI_Request request;
        int count; 
        int source = status.MPI_SOURCE;
        MPI_Get_count(&status, MPI_my_heap, &count);
        /* recieve each slice */

        MPI_Recv(heap_buffer_to_recv + k * (already_rcvd_points[source] + rdispls[source]), 
                count, MPI_my_heap, source, MPI_ANY_TAG, ctx -> mpi_communicator, &status);

        already_rcvd_points[source] += count;
        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);

    }
    MPI_Barrier(ctx -> mpi_communicator);

    MPI_Type_free(&MPI_my_heap);

    MPI_Testall(req_num, req_array, &flag, MPI_STATUSES_IGNORE);

    if(flag == 0)
    {
        DB_PRINT("[!!!] Rank %d has unfinished communications\n", ctx -> mpi_rank);
        exit(1);
    }
    free(req_array);
    free(already_sent_points);
    free(already_rcvd_points);


    /* copy results where needed */
    for(int i = 0; i < ctx -> world_size; ++i)
    {
        for(int j = 0; j < n_heap_to_recv[i]; ++j)
        {
            foreign_dp[i][j].array_idx = array_indexes_to_request[i][j];
            init_heap(&(foreign_dp[i][j].ngbh));
            allocate_heap(&(foreign_dp[i][j].ngbh), k);
            foreign_dp[i][j].ngbh.N     = k;
            foreign_dp[i][j].ngbh.count = k;
            memcpy(foreign_dp[i][j].ngbh.data, heap_buffer_to_recv + k * (j + rdispls[i]), k * sizeof(heap_node));

            if(foreign_dp[i][j].ngbh.data[0].array_idx != array_indexes_to_request[i][j])
            {
                printf("Chahah %lu\n",array_indexes_to_request[i][j]);
            }
        }
    }
    

    /* put back indexes in the context */

    ctx -> idx_halo_points_recv = array_indexes_to_request; 
    ctx -> n_halo_points_recv   = n_heap_to_recv;

    ctx -> n_halo_points_send = n_heap_to_send;
    ctx -> idx_halo_points_send = (idx_t**)malloc(ctx -> world_size * sizeof(idx_t*));
    for(int i = 0; i < ctx -> world_size; ++i) ctx -> idx_halo_points_send[i] = NULL;

    for(int i = 0; i < ctx -> world_size; ++i)
    {
        ctx -> idx_halo_points_send[i] = (idx_t*)malloc( n_heap_to_send[i] * sizeof(idx_t));
        memcpy(ctx -> idx_halo_points_send[i], idx_buffer_to_send + sdispls[i], n_heap_to_send[i] * sizeof(idx_t) ); 
    }

    for(int i = 0; i < ctx -> world_size; ++i) free(array_indexes_to_request[i]);
    free(array_indexes_to_request);
    free(count_to_request);
    free(capacities);

    free(heap_buffer_to_recv);
    free(heap_buffer_to_send);
    free(idx_buffer_to_send);
    free(idx_buffer_to_recv);
}

void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) 
@@ -2168,7 +2404,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)

        //ctx -> n_points = 48*5*2000;
        ctx->n_points = ctx->n_points / ctx->dims;
        ctx->n_points = (ctx->n_points * 6) / 10;
        ctx->n_points = (ctx->n_points * 0.1) / 10;
        // ctx -> n_points = ctx -> world_size * 1000;

        //ctx -> n_points = 10000000 * ctx -> world_size;
@@ -2271,7 +2507,12 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    elapsed_time = TIME_STOP;
    LOG_WRITE("Total time for all knn search", elapsed_time)

    //find_foreign_nodes(ctx, dp_info);
    TIME_START;

    datapoint_info_t** foreign_dp_info = (datapoint_info_t**)malloc(ctx -> world_size * sizeof(datapoint_info_t*));
    find_foreign_nodes(ctx, dp_info, foreign_dp_info);
    elapsed_time = TIME_STOP;
    LOG_WRITE("Finding points to request the ngbh", elapsed_time)


    #if defined (WRITE_NGBH)
@@ -2283,6 +2524,17 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
        free(dp_info[i].ngbh.data);
    }

    for(int i = 0; i < ctx -> world_size; ++i)
    {
        for(int j = 0; j < ctx -> n_halo_points_recv[i]; ++j)
        {
            free(foreign_dp_info[i][j].ngbh.data);
        }
        free(foreign_dp_info[i]);
    }

    free(foreign_dp_info);
    
    
    top_tree_free(ctx, &tree);
    kdtree_v2_free(&local_tree);
+0 −11
Original line number Diff line number Diff line
@@ -82,17 +82,6 @@ typedef struct top_kdtree_t



typedef struct datapoint_info_t {
    idx_t array_idx;
  heap ngbh;
  float_t g;
  float_t log_rho;
  float_t log_rho_c;
  float_t log_rho_err;
  idx_t kstar;
  int is_center;
  int cluster_idx;
} datapoint_info_t;