Commit b9da39c5 authored by lykos98's avatar lykos98
Browse files

resolved a bug on H1 on removed centers

parent 86a03a48
Loading
Loading
Loading
Loading
+15 −8
Original line number Diff line number Diff line
@@ -52,7 +52,7 @@ float_t compute_ID_two_NN_ML(global_context_t* ctx, datapoint_info_t* dp_info, i
		clock_gettime(CLOCK_MONOTONIC, &start_tot);
	}
    
    float_t log_mus = 0;
    float_t log_mus = 0.;
    for(idx_t i = 0; i < n; ++i)
    {
        log_mus += 0.5 * log(dp_info[i].ngbh.data[2].value/dp_info[i].ngbh.data[1].value);
@@ -244,7 +244,7 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver
        ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy");
        ordered_buffer_to_file(ctx, gs, sizeof(float_t), ctx -> local_n_points, "bb/g.npy");

        ordered_data_to_file(ctx);
        ordered_data_to_file(ctx, "bb/ordered_data.npy");
        free(den);
        free(ks);
    #endif
@@ -437,7 +437,7 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int
        ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy");
        ordered_buffer_to_file(ctx, gs, sizeof(float_t), ctx -> local_n_points, "bb/g.npy");

        ordered_data_to_file(ctx);
        ordered_data_to_file(ctx, "bb/ordered_data.npy");
        free(den);
        free(ks);
    #endif
@@ -822,7 +822,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
		clock_gettime(CLOCK_MONOTONIC, &start);
	}


	/* 
	 * optimized version
	 *
@@ -846,7 +845,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)



    /* check into internal nodes */

    /* 
     * to remove 
@@ -934,7 +932,9 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
        {
            case 1:
                {
                    lu_dynamic_array_pushBack(&removed_centers,i);
                    //lu_dynamic_array_pushBack(&removed_centers,i);
                    lu_dynamic_array_pushBack(&removed_centers,dp_info[i].array_idx);
                    //here it sets is_center to 0
                    dp_info[i].is_center = 0;
                    for(idx_t c = 0; c < removed_centers.count - 1; ++c)
                    {
@@ -1011,6 +1011,8 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
    {
        idx_t idx = actual_centers.data[i];
        dp_info[idx].cluster_idx += center_displs[ctx -> mpi_rank];

        //this translates them to global indexing
        actual_centers.data[i] += ctx -> idx_start;
    }

@@ -1067,7 +1069,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
                    }
                }


                if(cluster == -1 && !wait_for_comms)
                {
                    float_t gmax = -99999.;               
@@ -1102,6 +1103,11 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)

        }

        #ifdef PRINT_H1_CLUSTER_ASSIGN_COMPLETION
        DB_PRINT("[RANK %d] proc points %d completed %d %lu\n", ctx -> mpi_rank, proc_points, completed, ctx -> local_n_points);
        MPI_Barrier(ctx -> mpi_communicator);
        #endif

        MPI_Allreduce(MPI_IN_PLACE, &completed, 1, MPI_INT, MPI_SUM, ctx -> mpi_communicator);
        completed = completed == ctx -> world_size ? 1 : 0;

@@ -1133,7 +1139,7 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
        for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].cluster_idx;

        ordered_buffer_to_file(ctx, ks, sizeof(int), ctx -> local_n_points, "bb/cl.npy");
        ordered_data_to_file(ctx);
        ordered_data_to_file(ctx, "bb/ordered_data.npy");
        free(ks);
    #endif

@@ -2078,6 +2084,7 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo)
        for(int i = 0; i < ctx -> local_n_points; ++i) cl[i] = ctx -> local_datapoints[i].cluster_idx;

        ordered_buffer_to_file(ctx, cl, sizeof(int), ctx -> local_n_points, "bb/final_assignment.npy");
        ordered_data_to_file(ctx, "bb/ordered_data.npy");

        free(cl);
        
+1 −0
Original line number Diff line number Diff line
@@ -55,3 +55,4 @@ void clusters_allocate(clusters_t * c, int s);
clusters_t Heuristic1(global_context_t *ctx, int verbose);
void Heuristic2(global_context_t* ctx, clusters_t* cluster);
void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo);
void clusters_free(clusters_t * c);
+155 −0
Original line number Diff line number Diff line
@@ -253,3 +253,158 @@ void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size,
    }
    MPI_Barrier(ctx -> mpi_communicator);
}

/*
 * Gather one buffer per rank on the master, concatenated in rank order, and
 * write the result to `fname` as raw bytes.
 *
 * Unlike a single collective gather, the payload travels in point-to-point
 * messages of at most `default_msg_len` bytes each, so every individual MPI
 * count fits in an `int` even when a rank's buffer is larger than that.
 *
 * ctx      context providing the communicator, rank and world size
 * buffer   local data to send; n * el_size bytes are read from it
 * el_size  size in bytes of one element
 * n        number of elements held by the calling rank
 * fname    path of the output file, opened only by the master
 *
 * Collective: every rank of ctx -> mpi_communicator must call it.
 * The master temporarily allocates el_size * (sum of n over all ranks) bytes.
 * If the output file cannot be opened the whole job is aborted.
 */
void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname)
{
    //MPI_Barrier(ctx -> mpi_communicator);
    MPI_DB_PRINT("[MASTER] writing to file %s\n", fname);

    /* only the master allocates these; NULL elsewhere so that no rank hands
     * an indeterminate pointer to MPI */
    void*  tmp_data     = NULL;
    idx_t* ppp          = NULL;
    idx_t* displs       = NULL;
    idx_t* already_recv = NULL;
    idx_t  already_sent = 0;

    MPI_Barrier(ctx -> mpi_communicator);

    uint64_t tot_n = 0;
    MPI_Reduce(&n, &tot_n, 1, MPI_UINT64_T , MPI_SUM, 0, ctx -> mpi_communicator);

    if(I_AM_MASTER)
    {
        tmp_data     = (void*)MY_MALLOC(el_size * tot_n );
        ppp          = (idx_t*)MY_MALLOC(ctx -> world_size * sizeof(idx_t));
        displs       = (idx_t*)MY_MALLOC(ctx -> world_size * sizeof(idx_t));
        already_recv = (idx_t*)MY_MALLOC(ctx -> world_size * sizeof(idx_t));
    }

    /* NOTE(review): idx_t is gathered as MPI_UINT64_T, as in the original;
     * this assumes idx_t is a 64 bit unsigned integer -- confirm */
    MPI_Gather(&n, 1, MPI_UINT64_T, ppp, 1, MPI_UINT64_T, 0, ctx -> mpi_communicator);

    if(I_AM_MASTER)
    {
        /* from here on ppp[] and displs[] are expressed in bytes */
        displs[0] = 0;
        for(int i = 0; i < ctx -> world_size; ++i) ppp[i]    = el_size  * ppp[i];
        for(int i = 1; i < ctx -> world_size; ++i) displs[i] = displs[i - 1] + ppp[i - 1];

        for(int i = 0; i < ctx -> world_size; ++i) already_recv[i] = 0;
    }

    //Gather on master
    //

    uint64_t default_msg_len = 10000000; //bytes

    if(I_AM_MASTER)
    {
        //receive from itself
        memcpy(tmp_data, buffer, n * el_size);
        for(int r = 1; r < ctx -> world_size; ++r)
        {
            while(already_recv[r] < ppp[r])
            {
                /* probe exactly the message that is received below (source r,
                 * tag r) so that the byte count refers to that message */
                MPI_Status status;
                MPI_Probe(r, r, ctx -> mpi_communicator, &status);

                int count_recv;
                MPI_Get_count(&status, MPI_BYTE, &count_recv);

                /* receive only the probed chunk: the total ppp[r] may not fit
                 * in the int count argument of MPI_Recv */
                MPI_Recv((char*)tmp_data + displs[r] + already_recv[r], count_recv, MPI_BYTE, r, r, ctx -> mpi_communicator, MPI_STATUS_IGNORE);
                already_recv[r] += (idx_t)count_recv;
            }
        }
    }
    else
    {
        while(already_sent < n * el_size)
        {
            int count_send = (int)MIN(default_msg_len, n * el_size - already_sent);
            MPI_Send((char*)buffer + already_sent, count_send, MPI_BYTE, 0, ctx -> mpi_rank, ctx -> mpi_communicator);
            already_sent += (idx_t)count_send;
        }
    }

    if(I_AM_MASTER)
    {
        /* raw bytes: open in binary mode */
        FILE* file = fopen(fname,"wb");
        int opened = (file != NULL);
        if(opened)
        {
            fwrite(tmp_data, 1, el_size * tot_n, file);
            fclose(file);
        }
        else
        {
            printf("Cannot open file %s ! Aborting \n", fname);
        }

        free(tmp_data);
        free(ppp);
        free(displs);
        free(already_recv);

        /* do what the message announces instead of writing through a NULL FILE* */
        if(!opened) MPI_Abort(ctx -> mpi_communicator, 1);
    }
    MPI_Barrier(ctx -> mpi_communicator);
}

/*
 * Gather the local data of every rank (ctx -> local_data) on the master, in
 * rank order, and write it to `fname` as raw float_t values.
 *
 * ctx    context providing the communicator, dims, n_points, local_n_points
 *        and local_data
 * fname  path of the output file, opened only by the master
 *
 * Collective: every rank of ctx -> mpi_communicator must call it.
 * Counts and displacements are plain ints because MPI_Gatherv requires them,
 * so dims * n_points has to fit in an int. If the file cannot be opened a
 * message is printed and nothing is written.
 */
void ordered_data_to_file(global_context_t* ctx, const char* fname)
{
    //MPI_Barrier(ctx -> mpi_communicator);
    MPI_DB_PRINT("[MASTER] writing DATA to file\n");

    /* only the master allocates these; NULL elsewhere so that no rank hands
     * an indeterminate pointer to MPI */
    float_t* tmp_data = NULL;
    int*     ppp      = NULL;
    int*     displs   = NULL;

    MPI_Barrier(ctx -> mpi_communicator);
    if(I_AM_MASTER)
    {
        tmp_data = (float_t*)MY_MALLOC(ctx -> dims * ctx -> n_points * sizeof(float_t));
        ppp      = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
        displs   = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
    }

    /* send a real int: the send buffer then matches MPI_INT whatever the
     * declared width of local_n_points is */
    int local_count = (int)(ctx -> local_n_points);
    MPI_Gather(&local_count, 1, MPI_INT, ppp, 1, MPI_INT, 0, ctx -> mpi_communicator);

    if(I_AM_MASTER)
    {
        /* from points per rank to float_t elements per rank */
        displs[0] = 0;
        for(int i = 0; i < ctx -> world_size; ++i) ppp[i]    = ctx -> dims * ppp[i];
        for(int i = 1; i < ctx -> world_size; ++i) displs[i] = displs[i - 1] + ppp[i - 1];
    }

    MPI_Gatherv(ctx -> local_data, ctx -> dims * ctx -> local_n_points,
            MPI_MY_FLOAT, tmp_data, ppp, displs, MPI_MY_FLOAT, 0, ctx -> mpi_communicator);

    if(I_AM_MASTER)
    {
        /* raw values: open in binary mode */
        FILE* file = fopen(fname,"wb");
        if(file)
        {
            fwrite(tmp_data, sizeof(float_t), ctx -> dims * ctx -> n_points, file);
            fclose(file);
        }
        else
        {
            printf("Cannot open file %s\n", fname);
        }
        free(tmp_data);
        free(ppp);
        free(displs);
    }
    MPI_Barrier(ctx -> mpi_communicator);
}

/*
 * Check that `fname` can be opened for writing.
 *
 * On success the file is created (or truncated) and left containing two
 * placeholder lines. On failure a message is printed on stdout and the
 * process terminates with exit status 1.
 */
void test_file_path(const char* fname)
{
    FILE* probe = fopen(fname, "w");

    /* bail out right away when the path is not writable */
    if(probe == NULL)
    {
        printf("Cannot open file %s\n", fname);
        exit(1);
    }

    fputs("This is only to test if I can open a file in the desidered path\n", probe);
    fputs("Here willbe written the output of dadp\n", probe);
    fclose(probe);
}
+6 −3
Original line number Diff line number Diff line
@@ -14,10 +14,11 @@
//#define WRITE_CLUSTER_ASSIGN_H1
//#define WRITE_BORDERS
//#define WRITE_MERGING_TABLE
#define WRITE_FINAL_ASSIGNMENT
//#define WRITE_FINAL_ASSIGNMENT

//#define PRINT_NGBH_EXCHANGE_SCHEME
#define PRINT_H2_COMM_SCHEME
//#define PRINT_H2_COMM_SCHEME
//#define PRINT_H1_CLUSTER_ASSIGN_COMPLETION

typedef struct datapoint_info_t {
    idx_t array_idx;
@@ -193,5 +194,7 @@ void lu_dynamic_array_reserve(lu_dynamic_array_t * a, idx_t n);
void lu_dynamic_array_init(lu_dynamic_array_t * a);
void print_error_code(int err);

void ordered_data_to_file(global_context_t* ctx);
void ordered_data_to_file(global_context_t* ctx, const char* fname);
void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname);
void test_file_path(const char* fname);
void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname);
+49 −26
Original line number Diff line number Diff line
@@ -6,6 +6,15 @@
#include "../adp/adp.h"


/*
 * Hard-coded output locations, selected at compile time.
 *   OUT_CLUSTER_ASSIGN  file receiving the final cluster assignment
 *   OUT_DATA            file receiving the ordered data
 * Building with AMONRA defined selects the /beegfs paths (and emits a
 * compile-time message); any other build uses the /leonardo_scratch paths.
 */
#ifdef AMONRA
    #pragma message "Hi, you are on amonra"
    #define OUT_CLUSTER_ASSIGN "/beegfs/ftomba/phd/results/final_assignment.npy"
    #define OUT_DATA           "/beegfs/ftomba/phd/results/ordered_data.npy"
#else
    /* default when AMONRA is not defined */
    #define OUT_CLUSTER_ASSIGN "/leonardo_scratch/large/userexternal/ftomba00/final_assignment.npy"
    #define OUT_DATA           "/leonardo_scratch/large/userexternal/ftomba00/ordered_data.npy"
#endif

//

#ifdef THREAD_FUNNELED
@@ -97,19 +106,33 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    TIME_DEF
    double elapsed_time;

    if(I_AM_MASTER)
    {
        test_file_path(OUT_DATA);
        test_file_path(OUT_CLUSTER_ASSIGN);
    }


    if (ctx->mpi_rank == 0) 
    {
        //data = read_data_file(ctx, "../norm_data/50_blobs_more_var.npy", MY_TRUE);
        //ctx->dims = 2;
        //data = read_data_file(ctx, "../norm_data/50_blobs.npy", MY_TRUE);
        //data = read_data_file(ctx, "../norm_data/blobs_small.npy", MY_FALSE);
        // std_g0163178_Me14_091_0000
    
        // 100M points
        // 2D
        // std_g2980844_091_0000
        //data = read_data_file(ctx,"../norm_data/huge_blobs.npy",MY_FALSE);
        // 2B points
        // data = read_data_file(ctx,"../norm_data/very_huge_blobs.npy",MY_FALSE);

        // 190M points
        // std_g2980844_091_0000
        data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE);
        // data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE);
        
        /* 1M points ca.*/
        // data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);
        data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);

        /* BOX */
        // data = read_data_file(ctx,"../norm_data/std_Box_256_30_092_0000",MY_TRUE);
@@ -125,17 +148,14 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
        //34 M
        //data = read_data_file(ctx,"../norm_data/std_g1212639_091_0001",MY_TRUE);
        ctx -> dims = 5;

        //ctx -> n_points = 5 * 100000;
        // ctx->dims = 2;
        ctx->n_points = ctx->n_points / ctx->dims;
        //ctx->n_points = (ctx->n_points * 5) / 10;
        // ctx -> n_points = ctx -> world_size * 1000;

        //ctx -> n_points = 10000000 * ctx -> world_size;
        //generate_random_matrix(&data, ctx -> dims, ctx -> n_points, ctx);
        //mpi_printf(ctx, "Read %lu points in %u dims\n", ctx->n_points, ctx->dims);
        //for weak scalability 
        // ctx->n_points = ctx->n_points / 2;
        // ctx->n_points = (ctx->n_points / 32) * ctx -> world_size;

    }
    //MPI_DB_PRINT("[MASTER] Reading file and scattering\n");
    
    /* communicate the total number of points*/
    MPI_Bcast(&(ctx->dims), 1, MPI_UINT32_T, 0, ctx->mpi_communicator);
@@ -165,6 +185,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)

    MPI_Scatterv(data, send_counts, displacements, MPI_MY_FLOAT, pvt_data, send_counts[ctx->mpi_rank], MPI_MY_FLOAT, 0, ctx->mpi_communicator);

    if (I_AM_MASTER) free(data);

    ctx->local_data = pvt_data;

    int k_local = 20;
@@ -201,7 +223,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    //int k = 30;

    datapoint_info_t* dp_info = (datapoint_info_t*)MY_MALLOC(ctx -> local_n_points * sizeof(datapoint_info_t));            
    /* initialize, to cope with valgrind */
    for(uint64_t i = 0; i < ctx -> local_n_points; ++i)
    {
        dp_info[i].ngbh.data = NULL;
@@ -216,6 +237,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
        dp_info[i].is_center = -1;
        dp_info[i].cluster_idx = -1;
    }
    ctx -> local_datapoints = dp_info;

    build_local_tree(ctx, &local_tree);
    elapsed_time = TIME_STOP;
@@ -243,9 +265,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)

    TIME_START;

    float_t  z = 2;
    float_t z = 3;

    ctx -> local_datapoints = dp_info;
    //compute_density_kstarnn_rma(ctx, id, MY_FALSE);
    compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE);
    compute_correction(ctx, z);
@@ -265,30 +286,32 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)


    TIME_START;
    int halo = 0;
    int halo = 1;
    Heuristic3(ctx, &clusters, z, halo);
    elapsed_time = TIME_STOP;
    LOG_WRITE("H3", elapsed_time)

    /* write final assignment and data */
    
    /* find density */ 
    #if defined (WRITE_NGBH)
        ordered_data_to_file(ctx);
    #endif

    /*
    free(foreign_dp_info);
    */
    TIME_START;
    int* cl = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int));
    for(int i = 0; i < ctx -> local_n_points; ++i) cl[i] = ctx -> local_datapoints[i].cluster_idx;
    big_ordered_buffer_to_file(ctx, cl, sizeof(int), ctx -> local_n_points, OUT_CLUSTER_ASSIGN);
    big_ordered_buffer_to_file(ctx, ctx -> local_data, sizeof(double), ctx -> local_n_points * ctx -> dims, OUT_DATA);
    free(cl);
    elapsed_time = TIME_STOP;
    LOG_WRITE("Write results to file", elapsed_time);
    
    
    top_tree_free(ctx, &tree);
    kdtree_v2_free(&local_tree);
    //clusters_free(&clusters);

    free(send_counts);
    free(displacements);
    //free(dp_info);

    if (ctx->mpi_rank == 0) free(data);

    
    original_ps.data = NULL;
    free_pointset(&original_ps);
Loading