Commit c00fd663 authored by lykos98's avatar lykos98
Browse files

working ngbh search

parent 9b670d45
Loading
Loading
Loading
Loading
+141 −22

File changed.

Preview size limit exceeded, changes collapsed.

+1 −0
Original line number Diff line number Diff line
@@ -60,5 +60,6 @@ heap knn_kdtree_v2(FLOAT_TYPE* point, kdnode_v2* kdtree_root, int maxk);
heap knn_kdtree_v2_no_heapsort(FLOAT_TYPE* point, kdnode_v2* kdtree_root, int maxk);

kdnode_v2 * build_tree_kdtree_v2(kdnode_v2* kd_ptrs, size_t n, size_t dimensions);
void knn_sub_tree_search_kdtree_v2(FLOAT_TYPE* point, kdnode_v2* root, heap * H);
void kdtree_v2_init(kdtree_v2* tree, FLOAT_TYPE* data, size_t n_nodes, unsigned int dims);
void kdtree_v2_free(kdtree_v2* tree);
+278 −9
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@
 * neighbors
 */
#include "tree.h"
#include "heap.h"
#include "kdtreeV2.h"
#include "mpi.h"
#include <math.h>
@@ -22,6 +23,8 @@
#define MPI_MY_FLOAT MPI_DOUBLE
#endif

#define HERE printf("%d reached line %d\n", ctx -> mpi_rank, __LINE__);

#define I_AM_MASTER ctx->mpi_rank == 0

#define TOP_TREE_RCH 1
@@ -1039,6 +1042,16 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree)
	int* points_per_proc = (int*)malloc(ctx -> world_size * sizeof(int));	
	int* points_owners 	 = (int*)malloc(ctx -> dims * ctx -> local_n_points * sizeof(float_t));
	int* partition_offset = (int*)malloc(ctx -> world_size * sizeof(int));	
    for(int i = 0; i < ctx -> local_n_points; ++i)
    {
        float_t d1 = ctx -> local_data[i * ctx -> dims] + 8.33416939;
        float_t d2 = ctx -> local_data[i * ctx -> dims + 1] + 8.22858047;
        if (sqrt(d1 * d1 + d2 * d2) < 1e-5)
        {
            DB_PRINT("Rank %d found it!!! idx %d\n", ctx -> mpi_rank, i);
        }
    }

	/* compute owner */
	for(size_t i = 0; i < ctx -> local_n_points; ++i)
	{
@@ -1071,6 +1084,13 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree)
	int* 	 displs    = (int*)malloc(ctx -> world_size * sizeof(int));
	float_t* rcvbuffer = NULL;
	int tot_count = 0;

    //[-8.33416939 -8.22858047]
    

    MPI_Barrier(ctx -> mpi_communicator);


	for(int rcv = 0; rcv < ctx -> world_size; ++rcv)
	{
		/* recieve the number of points to recieve from each proc */
@@ -1093,22 +1113,43 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree)
			}
			//DB_PRINT("[RANK %d] is recieving %d elements %d points\n", rcv, tot_count, tot_count / ctx -> dims);
			rcvbuffer = (float_t*)malloc(tot_count * sizeof(float_t));

            DB_PRINT("Rank %d recieving: \t", ctx -> mpi_rank);
            for(int i = 0; i < ctx -> world_size; ++i)
            {
                DB_PRINT("%d:%d \t", i, rcvcount[i]);
            }
            DB_PRINT("\n");
		}
		MPI_Gatherv(send_buffer, ctx -> dims * points_per_proc[rcv], MPI_MY_FLOAT, rcvbuffer, rcvcount, displs, MPI_MY_FLOAT, rcv, ctx -> mpi_communicator);			
	}

	ctx -> local_n_points = tot_count / ctx -> dims; 
    int* ppp = (int*)malloc(ctx -> world_size * sizeof(int));
    MPI_Allgather(&(ctx -> local_n_points), 1, MPI_INT, ppp, 1, MPI_INT, ctx -> mpi_communicator);
	ctx -> idx_start = 0;
	for(int i = 0; i < ctx -> mpi_rank - 1; ++i)
	for(int i = 0; i < ctx -> mpi_rank; ++i)
	{
		ctx -> idx_start += points_per_proc[i];
		ctx -> idx_start += ppp[i];
	}
    DB_PRINT("rank %d start %lu\n", ctx -> mpi_rank, ctx -> idx_start);


	/* free prv pointer */
    free(ppp);
	free(ctx -> local_data);
	ctx -> local_data = rcvbuffer;

    for(int i = 0; i < ctx -> local_n_points; ++i)
    {
        float_t d1 = ctx -> local_data[i * ctx -> dims] + 8.33416939;
        float_t d2 = ctx -> local_data[i * ctx -> dims + 1] + 8.22858047;
        if (sqrt(d1 * d1 + d2 * d2) < 1e-5)
        {
            DB_PRINT("Rank %d found it!!! idx %d\n", ctx -> mpi_rank, i);
        }
    }
	
	/* check exchange */
	for(size_t i = 0; i < ctx -> local_n_points; ++i)
	{
@@ -1248,6 +1289,14 @@ void tree_walk(

}

void convert_heap_idx_to_global(global_context_t* ctx, heap* H)
{
    for(uint64_t i = 0; i < H -> count; ++i)
    {
        H -> data[i].array_idx = local_to_global_idx(ctx, H -> data[i].array_idx);
    }
}

void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtree_t* top_tree, kdtree_v2* local_tree, float_t* data, int k)
{
	/* local search */
@@ -1257,12 +1306,14 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
		idx_t idx = local_tree -> _nodes[p].array_idx;
		/* actually we want to preserve the heap to then insert guesses from other nodes */
		dp_info[idx].ngbh = knn_kdtree_v2_no_heapsort(local_tree -> _nodes[p].data, local_tree -> root, k);
        convert_heap_idx_to_global(ctx, &(dp_info[idx].ngbh));
		dp_info[idx].cluster_idx = -1;
		dp_info[idx].is_center = 0;
		dp_info[idx].array_idx = idx;
	}
	MPI_DB_PRINT("Done\n");


	/* find if a points needs a refine on the global tree */
	float_t** data_to_send_per_proc  	= (float_t**)malloc(ctx -> world_size * sizeof(float_t*));
	int** 	  local_idx_of_the_point 	= (int**)malloc(ctx -> world_size * sizeof(int*));
@@ -1295,11 +1346,186 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
                  point_to_send_count, point_to_send_capacity);
	}


    MPI_Barrier(ctx -> mpi_communicator);
    DB_PRINT("Rank %d wants to send:\t",ctx -> mpi_rank);
    for(int i = 0; i < ctx -> world_size; ++i)
    {
        DB_PRINT("%d:%d\t", i, point_to_send_count[i]);
    }
    DB_PRINT("\n");
    MPI_Barrier(ctx -> mpi_communicator);
    /* exchange points to work on*/

    int* count_rcv_work_batches = (int*)malloc(ctx -> world_size * sizeof(int));
    float_t** rcv_work_batches = (float_t**)malloc(ctx -> world_size * sizeof(float_t*));
    for(int i = 0; i < ctx -> world_size; ++i) 
    {
        count_rcv_work_batches[i] = 0;
        rcv_work_batches[i]       = NULL;
    }

	for(int i = 0; i < ctx -> world_size; ++i)
	{
        if(point_to_send_count[i] > 0)
        {
            MPI_Request request;
            //MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
            MPI_Isend( data_to_send_per_proc[i], 
                    point_to_send_count[i]*(1 + ctx -> dims) ,  /* 1 per max_dist + point*/
                    MPI_MY_FLOAT, i, 0, ctx -> mpi_communicator, &request);
        }
	}

	MPI_Status status;
	int flag;
	int cc = 0;
    MPI_Barrier(ctx -> mpi_communicator);
	MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_SOURCE, ctx -> mpi_communicator, &flag, &status);
	while(flag)
	{
		cc++;
		MPI_Request request;
        int count; 
        MPI_Get_count(&status, MPI_MY_FLOAT, &count);
        rcv_work_batches[status.MPI_SOURCE] = (float_t*)malloc(count * sizeof(float_t));
        //MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)

		MPI_Recv(rcv_work_batches[status.MPI_SOURCE], count, MPI_MY_FLOAT, status.MPI_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
        count_rcv_work_batches[status.MPI_SOURCE] = count / (1 + ctx -> dims);
		MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_SOURCE, ctx -> mpi_communicator, &flag, &status);

	}

    MPI_DB_PRINT("\n");

    MPI_Barrier(ctx -> mpi_communicator);
    //if(ctx -> mpi_rank == 1)
    {
        DB_PRINT("Rank %d has rcvd:\t",ctx -> mpi_rank);
        for(int i = 0; i < ctx -> world_size; ++i)
        {
		free(data_to_send_per_proc[i]);
		free(local_idx_of_the_point[i]);
            DB_PRINT("%d:%d\t", i,count_rcv_work_batches[i]);
        }
        DB_PRINT("\n");
    }
    MPI_Barrier(ctx -> mpi_communicator);

    /* prepare heap batches */

    int work_batch_stride = 1 + ctx -> dims;

    heap_node** heap_batches_per_node = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*));
    for(int p = 0; p < ctx -> world_size; ++p) heap_batches_per_node[p] = NULL;

    for(int p = 0; p < ctx -> world_size; ++p)
    {
        if(count_rcv_work_batches[p] > 0)
        {
            heap_batches_per_node[p] = (heap_node*)malloc(k * count_rcv_work_batches[p] * sizeof(heap_node));
            for(int batch = 0; batch < count_rcv_work_batches[p]; ++batch)
            {
                heap H;
                H.count = 0;
                H.N = k;
                H.data = heap_batches_per_node[p] + k * batch; 
                float_t* point = rcv_work_batches[p] + batch * work_batch_stride + 1; 
                knn_sub_tree_search_kdtree_v2(point, local_tree -> root, &H);
                convert_heap_idx_to_global(ctx, &H);
            }
        }
    }

    /* 
     * send out heaps 
     * and rcv counterparts
     */
	MPI_Barrier(ctx -> mpi_communicator);

    heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*));
    for(int i = 0; i < ctx -> world_size; ++i) rcv_heap_batches[i] = NULL;

	for(int i = 0; i < ctx -> world_size; ++i)
	{
        if(count_rcv_work_batches[i] > 0)
        {
            MPI_Request request;
            //MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
            MPI_Isend( heap_batches_per_node[i], 
                    sizeof(heap_node) * k * count_rcv_work_batches[i],  /* 1 per max_dist + point*/
                    MPI_CHAR, i, 0, ctx -> mpi_communicator, &request);
        }
	}


	MPI_Barrier(ctx -> mpi_communicator);
	MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);
    //DB_PRINT("%d %p %p\n",ctx -> mpi_rank, &flag, &status);
	while(flag)
	{
		MPI_Request request;
        int count; 
        MPI_Get_count(&status, MPI_CHAR, &count);
        rcv_heap_batches[status.MPI_SOURCE] = (heap_node*)malloc(count);

		MPI_Recv(rcv_heap_batches[status.MPI_SOURCE], count, MPI_CHAR, status.MPI_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
		MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);
	}

    MPI_Barrier(ctx -> mpi_communicator);
    /* merge old with new heaps */

	for(int i = 0; i < ctx -> world_size; ++i)
    {
        for(int b = 0; b < point_to_send_count[i]; ++b)
        {
            int idx = local_idx_of_the_point[i][b];
            //MPI_DB_PRINT("%d %d %d\n",i,b,idx);
            /* retrieve the heap */
            heap H;
            H.count = k;
            H.N     = k;
            H.data  = rcv_heap_batches[i] + k*b;
            /* insert the points into the heap */
            for(int j = 0; j < k; ++j)
            {
             //   MPI_DB_PRINT(" -- inserting %u %lf max_dist %lf \n", H.data[j].value, H.data[j].array_idx, dp_info[idx].ngbh.data[0].value);
                insert_max_heap(&(dp_info[idx].ngbh), H.data[j].value, H.data[j].array_idx);
            }
        }
    }
    
    /* heapsort them */

    for(int i = 0; i < ctx -> local_n_points; ++i)
    {
        heap_sort(&(dp_info[i].ngbh));
    }

    char ngbh_out[80];
    sprintf(ngbh_out, "./bb/rank_%d.ngbh",ctx -> mpi_rank);
    FILE* file = fopen(ngbh_out,"w");
    for(int i = 0; i < ctx -> local_n_points; ++i)
    {
        fwrite(dp_info[i].ngbh.data, sizeof(heap_node), k, file);
    }
    fclose(file);


    
	for(int i = 0; i < ctx -> world_size; ++i)
	{
        if(heap_batches_per_node[i])  free(heap_batches_per_node[i]);
		if(data_to_send_per_proc[i])  free(data_to_send_per_proc[i]);
		if(local_idx_of_the_point[i]) free(local_idx_of_the_point[i]);
        if(rcv_work_batches[i])       free(rcv_work_batches[i]);
        if(rcv_heap_batches[i])       free(rcv_heap_batches[i]);
	}

    free(heap_batches_per_node);
    free(rcv_heap_batches);
    free(rcv_work_batches);
    free(count_rcv_work_batches);

	free(point_to_send_count);
	free(point_to_send_capacity);
@@ -1338,6 +1564,47 @@ void build_local_tree(global_context_t* ctx, kdtree_v2* local_tree)
	local_tree -> root = build_tree_kdtree_v2(local_tree -> _nodes, local_tree -> n_nodes, ctx -> dims);
}

void ordered_data_to_file(global_context_t* ctx)
{
    //MPI_Barrier(ctx -> mpi_communicator);
    float_t* tmp_data; 
    int* ppp; 
    int* displs;

    MPI_Barrier(ctx -> mpi_communicator);
    if(I_AM_MASTER) 
    {
        tmp_data = (float_t*)malloc(ctx -> dims * ctx -> n_points * sizeof(float_t));
        ppp      = (int*)malloc(ctx -> world_size * sizeof(int));
        displs   = (int*)malloc(ctx -> world_size * sizeof(int));

    }
    
    MPI_Gather(&(ctx -> local_n_points), 1, MPI_INT, ppp, 1, MPI_INT, 0, ctx -> mpi_communicator);

    if(I_AM_MASTER)
    {
        displs[0] = 0;
        for(int i = 0; i < ctx -> world_size; ++i) ppp[i]    = ctx -> dims * ppp[i];
        for(int i = 1; i < ctx -> world_size; ++i) displs[i] = displs[i - 1] + ppp[i - 1];
            
    }
    MPI_Gatherv(ctx -> local_data, ctx -> dims * ctx -> local_n_points, 
            MPI_MY_FLOAT, tmp_data, ppp, displs, MPI_MY_FLOAT, 0, ctx -> mpi_communicator);

    if(I_AM_MASTER)
    {
        FILE* file = fopen("bb/ordered_data.npy","w");
        fwrite(tmp_data, sizeof(float_t), ctx -> dims * ctx -> n_points, file);
        fclose(file);
        free(tmp_data);
        free(ppp);
        free(displs);

    }
    MPI_Barrier(ctx -> mpi_communicator);
}


void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) 
{
@@ -1363,14 +1630,15 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)

	if (ctx->mpi_rank == 0) 
	{
		data = read_data_file(ctx, "../norm_data/50_blobs_more_var.npy", MY_TRUE);
		//data = read_data_file(ctx, "../norm_data/50_blobs_more_var.npy", MY_TRUE);
		data = read_data_file(ctx, "../norm_data/50_blobs.npy", MY_TRUE);
		ctx->dims = 2;
		// std_g0163178_Me14_091_0000
		// data =
		// read_data_file(ctx,"../norm_data/std_g0163178_Me14_091_0000",MY_TRUE);
		// ctx -> n_points = 48*5*2000;
		ctx->n_points = ctx->n_points / ctx->dims;
		//ctx -> n_points = 48 * 500;
		//ctx -> n_points = 6 * 500;
		mpi_printf(ctx, "Read %lu points in %u dims\n", ctx->n_points, ctx->dims);
	}
	
@@ -1399,9 +1667,10 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
		send_counts[p] = (ctx->n_points / ctx->world_size);
		send_counts[p] += (ctx->n_points % ctx->world_size) > p ? 1 : 0;
		send_counts[p] = send_counts[p] * ctx->dims;
		displacements[p] = displacements[p - 1] + send_counts[p];
		displacements[p] = displacements[p - 1] + send_counts[p - 1];
	}


	ctx->local_n_points = send_counts[ctx->mpi_rank] / ctx->dims;

	/*
@@ -1445,13 +1714,13 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
	build_local_tree(ctx, &local_tree);

	MPI_DB_PRINT("Mi pianto qua\n");
	mpi_ngbh_search(ctx, dp_info, &local_tree, ctx -> local_data, k);
	
	mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, k);
		

	top_tree_free(ctx, &tree);

	kdtree_v2_free(&local_tree);
    ordered_data_to_file(ctx);
	free(send_counts);
	free(displacements);
	free(dp_info);