Commit 55582ae9 authored by lykos98's avatar lykos98
Browse files

added rma computation for density estimation

parent c566198d
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
#!/bin/bash

#SBATCH --nodes=4
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=2
#SBATCH --cpus-per-task=18
#SBATCH --time=01:00:00
@@ -27,7 +27,7 @@ export OMP_PROC_BIND=close
rm bb/*

#time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:node:PE=${SLURM_CPUS_PER_TASK}  main
time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  main
time mpirun -n ${SLURM_NTASKS} --mca orte_base_help_aggregate 0 --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  main
#time mpirun -n ${SLURM_NTASKS} main

#time python3 check.py
+39 −4
Original line number Diff line number Diff line
@@ -23,6 +23,38 @@ void get_context(global_context_t* ctx)
    ctx -> halo_datapoints  = NULL;
    ctx -> local_datapoints = NULL;
    ctx -> __recv_heap_buffers = NULL;
    ctx -> __local_heap_buffers = NULL;
}

void print_error_code(int err)
{
    switch (err) 
    {
        case MPI_SUCCESS:
            DB_PRINT("MPI_SUCCESS\n");
            break;
        case MPI_ERR_ARG:
            DB_PRINT("MPI_ERR_ARG\n");
            break;
        case MPI_ERR_COMM:
            DB_PRINT("MPI_ERR_COMM\n");
            break;
        case MPI_ERR_DISP:
            DB_PRINT("MPI_ERR_DISP\n");
            break;
        case MPI_ERR_INFO:
            DB_PRINT("MPI_ERR_INFO\n");
            break;
        case MPI_ERR_SIZE:
            DB_PRINT("MPI_ERR_SIZE\n");
            break;
        case MPI_ERR_OTHER:
            DB_PRINT("MPI_ERR_OTHER\n");
            break;
        default:
            break;
    
    }
}

void free_context(global_context_t* ctx)
@@ -31,11 +63,14 @@ void free_context(global_context_t* ctx)
    FREE_NOT_NULL(ctx -> local_data);
    FREE_NOT_NULL(ctx -> ub_box);
    FREE_NOT_NULL(ctx -> lb_box);
    //FREE_NOT_NULL(ctx -> __local_heap_buffers);
    if(ctx -> __local_heap_buffers) MPI_Free_mem(ctx -> __local_heap_buffers);

    if(ctx -> local_datapoints)
    {
        for(int i = 0; i < ctx -> local_n_points; ++i) FREE_NOT_NULL(ctx -> local_datapoints[i].ngbh.data);
    }

    //if(ctx -> local_datapoints)
    //{
    //    for(int i = 0; i < ctx -> local_n_points; ++i) FREE_NOT_NULL(ctx -> local_datapoints[i].ngbh.data);
    //}

    FREE_NOT_NULL(ctx -> local_datapoints);
    if(ctx -> halo_datapoints)
+11 −8
Original line number Diff line number Diff line
@@ -111,6 +111,7 @@ struct global_context_t
    int world_size; 
    int mpi_rank;
    int __processor_name_len;
    idx_t k;
	float_t* local_data;
	float_t* lb_box;
	float_t* ub_box;
@@ -130,6 +131,7 @@ struct global_context_t
	char processor_mame[MPI_MAX_PROCESSOR_NAME];
	MPI_Comm mpi_communicator;
    heap_node* __recv_heap_buffers;
    heap_node* __local_heap_buffers;
};

struct pointset_t
@@ -165,5 +167,6 @@ void lu_dynamic_array_pushBack(lu_dynamic_array_t * a, idx_t p);
void lu_dynamic_array_Reset(lu_dynamic_array_t * a);
void lu_dynamic_array_reserve(lu_dynamic_array_t * a, idx_t n);
void lu_dynamic_array_init(lu_dynamic_array_t * a);
void print_error_code(int err);

+2 −0
Original line number Diff line number Diff line
@@ -8,6 +8,8 @@
#define T double
#define DATA_DIMS 0 

#define HERE printf("%d reached line %d\n", ctx -> mpi_rank, __LINE__);

#ifdef USE_FLOAT32
    #define FLOAT_TYPE float
#else
+393 −31
Original line number Diff line number Diff line
@@ -41,7 +41,6 @@
#define MPI_MY_FLOAT MPI_DOUBLE
#endif

#define HERE printf("%d reached line %d\n", ctx -> mpi_rank, __LINE__);

#define I_AM_MASTER ctx->mpi_rank == 0

@@ -1547,18 +1546,28 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
    /* local search */
    /* print diagnostics */
    print_diagnositcs(ctx, k);
    ctx -> k = (idx_t)k;
    
    TIME_DEF;
    double elapsed_time;

    TIME_START;
    MPI_Barrier(ctx -> mpi_communicator);
    //ctx -> __local_heap_buffers = (heap_node*)malloc(ctx -> local_n_points * k * sizeof(heap_node));
    MPI_Alloc_mem(ctx -> local_n_points * k * sizeof(heap_node), MPI_INFO_NULL, &(ctx -> __local_heap_buffers));

    #pragma omp parallel for
    for(int p = 0; p < ctx -> local_n_points; ++p)
    {
        idx_t idx = local_tree -> _nodes[p].array_idx;
        /* actually we want to preserve the heap to then insert guesses from other nodes */
        dp_info[idx].ngbh = knn_kdtree_v2_no_heapsort(local_tree -> _nodes[p].data, local_tree -> root, k);
        dp_info[idx].ngbh.data  = ctx -> __local_heap_buffers + k * idx;
        dp_info[idx].ngbh.count = 0;
        dp_info[idx].ngbh.N     = k;

        //dp_info[idx].ngbh = knn_kdtree_v2_no_heapsort(local_tree -> _nodes[p].data, local_tree -> root, k);
        knn_sub_tree_search_kdtree_v2(local_tree -> _nodes[p].data, local_tree -> root, &(dp_info[idx].ngbh));

        convert_heap_idx_to_global(ctx, &(dp_info[idx].ngbh));
        dp_info[idx].cluster_idx = -1;
        dp_info[idx].is_center = 0;
@@ -2741,6 +2750,369 @@ void compute_density_kstarnn(global_context_t* ctx, const float_t d, int verbose
    return;


}

float_t get_j_ksel_dist(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win* exposed_ngbh)
{
    int owner = foreign_owner(ctx, j);
    idx_t k = ctx -> k;
    /* find if datapoint is halo or not */
    if(owner == ctx -> mpi_rank)
    {
        idx_t pos = j - ctx -> idx_start;
        return ctx -> local_datapoints[pos].ngbh.data[ksel].value;
    }
    else
    {
        //RMA
        heap_node el;
        idx_t pos  = j - ctx -> rank_idx_start[owner];
        MPI_Status status;
        MPI_Request request;
        MPI_Rget(&el, sizeof(heap_node), MPI_CHAR, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_CHAR, *exposed_ngbh, &request);
        MPI_Wait(&request,&status);
        return el.value;
    }                 
}

void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int verbose){

    /*
     * Point density computation:                       
     * args:                                            
     * - paricles: array of structs                   
     * - d       : intrinsic dimension of the dataset 
     * - points  : number of points in the dataset    
     */


    MPI_Win exposed_ngbh;
    MPI_Info info;


    MPI_Barrier(ctx -> mpi_communicator);
    idx_t k = ctx -> local_datapoints[0].ngbh.N;
    MPI_DB_PRINT("%lu %p\n", k,  ctx -> __local_heap_buffers);

    //int* test_buffer = (int*)malloc(k * sizeof(int));
    //for(int i = 0; i < k; ++i) test_buffer[i] = (ctx -> mpi_rank + 1) * 1024;

    MPI_Win_create( ctx -> __local_heap_buffers, 
                    ctx -> local_n_points * k * sizeof(heap_node), 
                    1, MPI_INFO_NULL, 
                    ctx -> mpi_communicator, 
                    &exposed_ngbh);

    MPI_Win_fence(0, exposed_ngbh);

    struct timespec start_tot, finish_tot;
    double elapsed_tot;

    datapoint_info_t* local_datapoints = ctx -> local_datapoints;

    //DB_PRINT("rank %d pos %lu own %d %lu %lu %lu\n", ctx -> mpi_rank, pos, owner, k, ksel, (pos * k + ksel) * sizeof(heap_node));
	if(verbose)
	{
		printf("Density and k* estimation:\n");
		clock_gettime(CLOCK_MONOTONIC, &start_tot);
	}

    idx_t kMAX = ctx -> local_datapoints[0].ngbh.N - 1;   

    float_t omega = 0.;  
    if(sizeof(float_t) == sizeof(float)){ omega = powf(PI_F,d/2)/tgammaf(d/2.0f + 1.0f);}  
    else{omega = pow(M_PI,d/2.)/tgamma(d/2.0 + 1.0);}

    #pragma omp parallel for
    for(idx_t i = 0; i < ctx -> local_n_points; ++i)
    {

        idx_t j = 4;
        idx_t k;
        float_t dL  = 0.;
        float_t vvi = 0.;
		float_t vvj = 0.;
		float_t vp  = 0.;
        while(j < kMAX  && dL < DTHR)
        {
            idx_t ksel = j - 1;
            vvi = omega * pow(local_datapoints[i].ngbh.data[ksel].value,d/2.);

            idx_t jj = local_datapoints[i].ngbh.data[j].array_idx;

            /* 
             * note jj can be an halo point 
             * need to search maybe for it in foreign nodes
             * */
            float_t dist_jj = get_j_ksel_dist(ctx, jj, ksel, &exposed_ngbh);
            vvj = omega * pow(dist_jj,d/2.);

            vp = (vvi + vvj)*(vvi + vvj);
            dL = -2.0 * ksel * log(4.*vvi*vvj/vp);
            j = j + 1;
        }
        if(j == kMAX)
        {
            k = j - 1;
            vvi = omega * pow(ctx -> local_datapoints[i].ngbh.data[k].value,d/2.);
        }
        else
        {
            k = j - 2;
        }
        local_datapoints[i].kstar = k;
        local_datapoints[i].log_rho = log((float_t)(k)/vvi/((float_t)(ctx -> n_points)));
        //dp_info[i].log_rho = log((float_t)(k)) - log(vvi) -log((float_t)(points));
        local_datapoints[i].log_rho_err =   1.0/sqrt((float_t)k); //(float_t)(-Q_rsqrt((float)k));
        local_datapoints[i].g = local_datapoints[i].log_rho - local_datapoints[i].log_rho_err;
    }

	if(verbose)
	{
		clock_gettime(CLOCK_MONOTONIC, &finish_tot);
		elapsed_tot = (finish_tot.tv_sec - start_tot.tv_sec);
		elapsed_tot += (finish_tot.tv_nsec - start_tot.tv_nsec) / 1000000000.0;
		printf("\tTotal time: %.3lfs\n\n", elapsed_tot);
	}
    
    MPI_Win_fence(0, exposed_ngbh);
    MPI_Barrier(ctx -> mpi_communicator);
    MPI_Win_free(&exposed_ngbh);

    #if defined(WRITE_DENSITY)
        /* densities */
        float_t* den = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t));
        idx_t* ks = (idx_t*)malloc(ctx -> local_n_points * sizeof(idx_t));
        for(int i = 0; i < ctx -> local_n_points; ++i) den[i] = ctx -> local_datapoints[i].log_rho;
        for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].kstar;

        ordered_buffer_to_file(ctx, den, sizeof(float_t), ctx -> local_n_points, "bb/ordered_density.npy");
        ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy");
        ordered_data_to_file(ctx);
        free(den);
        free(ks);
    #endif
    return;


}

float_t get_j_ksel_dist_v2(global_context_t* ctx, idx_t i, idx_t j, idx_t ksel, int* flags, heap_node* tmp_heap_nodes, MPI_Win* exposed_ngbh)
{
    if(flags[i])
    {
        int owner = foreign_owner(ctx, j);
        idx_t k = ctx -> k;
        /* find if datapoint is halo or not */
        if(owner == ctx -> mpi_rank)
        {
            idx_t pos = j - ctx -> idx_start;
            return ctx -> local_datapoints[pos].ngbh.data[ksel].value;
        }
        else
        {
            //RMA
            flags[i] = 0;
            idx_t pos  = j - ctx -> rank_idx_start[owner];
            MPI_Get(tmp_heap_nodes + i, sizeof(heap_node), MPI_CHAR, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_CHAR, *exposed_ngbh);
            //if(ctx -> mpi_rank == 0) DB_PRINT("rvcd %lu %lf\n", el.array_idx, el.value);
            return 0;
        }                 
    }
    else
    {
        flags[i] = 1;
        return tmp_heap_nodes[i].value;
    }
}

void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int verbose){

    /*
     * Point density computation:                       
     * args:                                            
     * - paricles: array of structs                   
     * - d       : intrinsic dimension of the dataset 
     * - points  : number of points in the dataset    
     */


    MPI_Win exposed_ngbh;
    MPI_Info info;


    MPI_Barrier(ctx -> mpi_communicator);
    idx_t k = ctx -> local_datapoints[0].ngbh.N;
    MPI_DB_PRINT("%lu %p\n", k,  ctx -> __local_heap_buffers);


    MPI_Win_create( ctx -> __local_heap_buffers, 
                    ctx -> local_n_points * k * sizeof(heap_node), 
                    1, MPI_INFO_NULL, 
                    ctx -> mpi_communicator, 
                    &exposed_ngbh);

    MPI_Win_fence(0, exposed_ngbh);

    struct timespec start_tot, finish_tot;
    double elapsed_tot;

    datapoint_info_t* local_datapoints = ctx -> local_datapoints;

	if(verbose)
	{
		printf("Density and k* estimation:\n");
		clock_gettime(CLOCK_MONOTONIC, &start_tot);
	}

    idx_t kMAX = ctx -> local_datapoints[0].ngbh.N - 1;   

    float_t omega = 0.;  
    if(sizeof(float_t) == sizeof(float)){ omega = powf(PI_F,d/2)/tgammaf(d/2.0f + 1.0f);}  
    else{omega = pow(M_PI,d/2.)/tgamma(d/2.0 + 1.0);}


    /*
     * Iterative, wait after each pass for communications to finish
     * 
     * */

    int finished = 0;
    int fin_num  = 0;

    int* flags      = (int*)malloc(ctx -> local_n_points * sizeof(int));
    int* last_j     = (int*)malloc(ctx -> local_n_points * sizeof(int));
    int* completed  = (int*)malloc(ctx -> local_n_points * sizeof(int));
    heap_node* tmp_heap_nodes = (heap_node*)malloc(ctx -> local_n_points * sizeof(heap_node));

    for(int i = 0; i < ctx -> local_n_points; ++i)
    {
        flags[i]  = 1;
        last_j[i] = 4;
        completed[i] = 0;
    }


    while(!finished)
    {
        finished = 1;
        #pragma omp parallel for
        for(idx_t i = 0; i < ctx -> local_n_points; ++i)
        {
            if(!completed[i])
            {

                idx_t j = last_j[i];
                idx_t k;
                float_t dL  = 0.;
                float_t vvi = 0.;
                float_t vvj = 0.;
                float_t vp  = 0.;
                int dl_DTHR_flag = 1;
                while(j < kMAX  && dl_DTHR_flag)
                {
                    idx_t ksel = j - 1;
                    vvi = omega * pow(local_datapoints[i].ngbh.data[ksel].value,d/2.);

                    idx_t jj = local_datapoints[i].ngbh.data[j].array_idx;

                    /* 
                     * note jj can be an halo point 
                     * need to search maybe for it in foreign nodes
                     * */
                    float_t dist_jj = get_j_ksel_dist_v2(ctx, i, jj, ksel, flags, tmp_heap_nodes, &exposed_ngbh);
                    if(flags[i])
                    {
                        //if the ngbh node is available compute it else
                        //you need to wait for comms
                        vvj = omega * pow(dist_jj,d/2.);

                        vp = (vvi + vvj)*(vvi + vvj);
                        dL = -2.0 * ksel * log(4.*vvi*vvj/vp);

                        dl_DTHR_flag = dL < DTHR;
                        j = j + 1;
                        last_j[i] = j;
                    }
                    else
                    {
                        break;
                    }
                }
                if(j == kMAX || !dl_DTHR_flag)
                {

                    if(j == kMAX)
                    {
                        k = j - 1;
                        vvi = omega * pow(ctx -> local_datapoints[i].ngbh.data[k].value,d/2.);
                    }
                    else
                    {
                        k = j - 2;
                    }
                    local_datapoints[i].kstar = k;
                    local_datapoints[i].log_rho = log((float_t)(k)/vvi/((float_t)(ctx -> n_points)));
                    //dp_info[i].log_rho = log((float_t)(k)) - log(vvi) -log((float_t)(points));
                    local_datapoints[i].log_rho_err =   1.0/sqrt((float_t)k); //(float_t)(-Q_rsqrt((float)k));
                    local_datapoints[i].g = local_datapoints[i].log_rho - local_datapoints[i].log_rho_err;

                    completed[i] = 1;
                    #pragma omp atomic update
                    finished = finished & 1; 

                    #pragma omp atomic update
                    fin_num++;
                }
                else
                {
                    completed[i] = 0;
                    #pragma omp atomic update
                    finished = finished & 0;
                }
            }

        }

        //DB_PRINT("Rank %d fin %d/%lu\n", ctx -> mpi_rank, fin_num, ctx -> local_n_points);
        //call the fence to get out all results
        MPI_Win_fence(0, exposed_ngbh);
        MPI_Allreduce(MPI_IN_PLACE, &finished, 1, MPI_INT, MPI_LAND, ctx -> mpi_communicator);
    }

	if(verbose)
	{
		clock_gettime(CLOCK_MONOTONIC, &finish_tot);
		elapsed_tot = (finish_tot.tv_sec - start_tot.tv_sec);
		elapsed_tot += (finish_tot.tv_nsec - start_tot.tv_nsec) / 1000000000.0;
		printf("\tTotal time: %.3lfs\n\n", elapsed_tot);
	}

    free(flags);
    free(tmp_heap_nodes);
    free(completed);
    free(last_j);
    
    MPI_Win_fence(0, exposed_ngbh);
    MPI_Barrier(ctx -> mpi_communicator);
    MPI_Win_free(&exposed_ngbh);

    #if defined(WRITE_DENSITY)
        /* densities */
        float_t* den = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t));
        idx_t* ks = (idx_t*)malloc(ctx -> local_n_points * sizeof(idx_t));
        for(int i = 0; i < ctx -> local_n_points; ++i) den[i] = ctx -> local_datapoints[i].log_rho;
        for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].kstar;

        ordered_buffer_to_file(ctx, den, sizeof(float_t), ctx -> local_n_points, "bb/ordered_density.npy");
        ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy");
        ordered_data_to_file(ctx);
        free(den);
        free(ks);
    #endif
    return;


}

void clusters_allocate(clusters_t * c, int s)
@@ -3612,10 +3984,10 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    
        // 190M points
        // std_g2980844_091_0000
        data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE);
        // data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE);
        
        /* 1M points ca.*/
        //data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);
        data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);

        /* BOX */
        // data = read_data_file(ctx,"../norm_data/std_Box_256_30_092_0000",MY_TRUE);
@@ -3632,7 +4004,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
        // data = read_data_file(ctx,"../norm_data/std_g1212639_091_0001",MY_TRUE);
        ctx->dims = 5;

        //ctx -> n_points = 48*5*2000;
        //ctx -> n_points = 5*2000;
        ctx->n_points = ctx->n_points / ctx->dims;
        //ctx->n_points = (ctx->n_points * 5) / 10;
        // ctx -> n_points = ctx -> world_size * 1000;
@@ -3740,12 +4112,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    LOG_WRITE("Total time for all knn search", elapsed_time)


    TIME_START;

    datapoint_info_t** foreign_dp_info = (datapoint_info_t**)malloc(ctx -> world_size * sizeof(datapoint_info_t*));
    find_foreign_nodes(ctx, dp_info, foreign_dp_info);
    elapsed_time = TIME_STOP;
    LOG_WRITE("Finding points to request the ngbh", elapsed_time)

    TIME_START;
    //float_t id = id_estimate(ctx, dp_info, ctx -> local_n_points, 0.9, MY_FALSE);
@@ -3758,16 +4124,27 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)

    MPI_DB_PRINT("ID %lf \n",id);


    //TIME_START;
    //datapoint_info_t** foreign_dp_info = (datapoint_info_t**)malloc(ctx -> world_size * sizeof(datapoint_info_t*));
    //find_foreign_nodes(ctx, dp_info, foreign_dp_info);
    //elapsed_time = TIME_STOP;
    //LOG_WRITE("Finding points to request the ngbh", elapsed_time)

    TIME_START;
    compute_density_kstarnn(ctx, id, MY_FALSE);

    ctx -> local_datapoints = dp_info;
    //compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE);
    compute_density_kstarnn_rma(ctx, id, MY_FALSE);
    //compute_density_kstarnn(ctx, id, MY_FALSE);
    compute_correction(ctx, 2);
    elapsed_time = TIME_STOP;
    LOG_WRITE("Density estimate", elapsed_time)

    TIME_START;
    Heuristic1(ctx, MY_FALSE);
    elapsed_time = TIME_STOP;
    LOG_WRITE("H1", elapsed_time)
    //TIME_START;
    //Heuristic1(ctx, MY_FALSE);
    //elapsed_time = TIME_STOP;
    //LOG_WRITE("H1", elapsed_time)
    

    /* find density */ 
@@ -3780,20 +4157,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    #endif

    /*
    for(int i = 0; i < ctx -> local_n_points; ++i)
    {
        free(dp_info[i].ngbh.data);
    }

    for(int i = 0; i < ctx -> world_size; ++i)
    {
        for(int j = 0; j < ctx -> n_halo_points_recv[i]; ++j)
        {
            free(foreign_dp_info[i][j].ngbh.data);
        }
        free(foreign_dp_info[i]);
    }

    free(foreign_dp_info);
    */
    
@@ -3810,5 +4173,4 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    original_ps.data = NULL;
    free_pointset(&original_ps);
    free(global_bin_counts_int);

}