Commit 25dce9c1 authored by lykos98's avatar lykos98
Browse files

added file export with ranks per node

parent 286638c6
Loading
Loading
Loading
Loading
+4 −1
Original line number Diff line number Diff line
@@ -2058,7 +2058,10 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo)
					int cidx = dp_info[i].cluster_idx;
					int halo_flag = dp_info[i].log_rho_c < max_border_den_array[cidx]; 
					//int halo_flag = max_border_den_array[cidx] > dp_info[i].log_rho_c  ; 
					dp_info[i].cluster_idx = halo_flag ? -1 : cidx;

                    //changed_here
					//dp_info[i].cluster_idx = halo_flag ? -1 : cidx;
					dp_info[i].halo_flag = halo_flag;
				}
			}
			free(max_border_den_array);
+148 −4
Original line number Diff line number Diff line
@@ -5,6 +5,7 @@
#define ARRAY_INCREMENT 100

#define FREE_NOT_NULL(x) if(x){free(x); x = NULL;}
#define PATH_LEN 500

void get_context(global_context_t* ctx)
{
@@ -26,6 +27,107 @@ void get_context(global_context_t* ctx)
    ctx -> __local_heap_buffers = NULL;
}

/*
 * Print the per-column mean and standard deviation of a dataset.
 *
 * data is read as ctx -> n_points consecutive rows of ctx -> dims values.
 * Two OpenMP passes are made over the rows: the first accumulates column
 * sums (turned into means), the second accumulates squared deviations from
 * those means. Every thread sums into a private buffer which is merged into
 * the shared array with atomic updates. The variance is divided by
 * (n_points - 1).
 *
 * One line per column is printed to stdout; data is only read.
 */
void get_dataset_diagnostics(global_context_t* ctx, float_t* data)
{
    /*
     * Sizes and offsets are kept in size_t: with an int row index the
     * product row * dims overflows as soon as the dataset holds more than
     * INT_MAX values.
     */
    const size_t dims     = (size_t)ctx -> dims;
    const size_t n_points = (size_t)ctx -> n_points;
    /* columns handled by the 4-way unrolled loops, the rest is the tail */
    const size_t jmax     = dims - (dims % 4);

    float_t* mean = (float_t*)MY_MALLOC(dims * sizeof(float_t));
    float_t* var  = (float_t*)MY_MALLOC(dims * sizeof(float_t));

    for(size_t j = 0; j < dims; ++j)
    {
        mean[j] = 0.;
        var[j]  = 0.;
    }

    /* first pass: column sums */
    #pragma omp parallel
    {
        float_t* pvt_mean = (float_t*)MY_MALLOC(dims * sizeof(float_t));
        for(size_t j = 0; j < dims; ++j) pvt_mean[j] = 0.;

        #pragma omp for
        for(size_t i = 0; i < n_points; ++i)
        {
            const float_t* row = data + i * dims;
            size_t j = 0;
            for(j = 0; j < jmax; j+=4)
            {
                pvt_mean[j    ] += row[j    ];
                pvt_mean[j + 1] += row[j + 1];
                pvt_mean[j + 2] += row[j + 2];
                pvt_mean[j + 3] += row[j + 3];
            }

            for(j = jmax; j < dims; ++j)
            {
                pvt_mean[j] += row[j];
            }
        }

        /* merge the thread-private partial sums */
        for(size_t j = 0; j < dims; ++j)
        {
            #pragma omp atomic update
            mean[j] += pvt_mean[j];
        }

        free(pvt_mean);
    }

    for(size_t j = 0; j < dims; ++j)
    {
        mean[j] = mean[j] / (float_t) n_points;
    }

    /* second pass: sums of squared deviations from the mean */
    #pragma omp parallel
    {
        float_t* pvt_var = (float_t*)MY_MALLOC(dims * sizeof(float_t));
        for(size_t j = 0; j < dims; ++j) pvt_var[j] = 0.;

        #pragma omp for
        for(size_t i = 0; i < n_points; ++i)
        {
            const float_t* row = data + i * dims;
            size_t j = 0;
            for(j = 0; j < jmax; j+=4)
            {
                float_t v0 = mean[j    ] - row[j    ];
                float_t v1 = mean[j + 1] - row[j + 1];
                float_t v2 = mean[j + 2] - row[j + 2];
                float_t v3 = mean[j + 3] - row[j + 3];

                pvt_var[j    ] += v0 * v0;
                pvt_var[j + 1] += v1 * v1;
                pvt_var[j + 2] += v2 * v2;
                pvt_var[j + 3] += v3 * v3;
            }

            for(j = jmax; j < dims; ++j)
            {
                float_t v = mean[j] - row[j];
                pvt_var[j] += v * v;
            }
        }

        /* merge the thread-private partial sums */
        for(size_t j = 0; j < dims; ++j)
        {
            #pragma omp atomic update
            var[j] += pvt_var[j];
        }

        free(pvt_var);
    }

    for(size_t j = 0; j < dims; ++j)
    {
        var[j]  = var[j]  / ((float_t) n_points - 1);
    }

    for(size_t j = 0; j < dims; ++j)
    {
        printf("dim%2d   mean %.2e std %.2e\n", (int)j, mean[j], sqrt(var[j]));
    }
    free(mean);
    free(var);
}

void print_error_code(int err)
{
    switch (err) 
@@ -225,7 +327,7 @@ static inline int get_unit_measure(size_t bytes)
float_t* read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims,
                        const int file_in_float32) 
{

    printf("Reading %s\n",fname);
    FILE *f = fopen(fname, "r");
    if (!f) 
    {
@@ -336,6 +438,24 @@ void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size,
    MPI_Barrier(ctx -> mpi_communicator);
}

/*
 * Write this rank's buffer to its own file, named "<fname>.<mpi_rank>".
 *
 * ctx     : context, ctx -> mpi_rank is used as the file name suffix
 * buffer  : start of the data to write
 * el_size : size in bytes of one element
 * n       : number of elements in buffer
 * fname   : common path prefix shared by all ranks
 *
 * The data is written as raw bytes (el_size * n of them). Failures to build
 * the path, open the file, write or close it are reported on stdout; the
 * function returns without writing when the file cannot be opened.
 */
void distributed_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname)
{
    char out_path_w_proc_name[PATH_LEN]; 
    MPI_DB_PRINT("[MASTER] writing to file %s.%s\n", fname, "[proc]");

    /* a truncated path could collide with another rank's file: refuse it */
    int path_len = snprintf(out_path_w_proc_name, PATH_LEN, "%s.%d", fname, ctx -> mpi_rank);
    if(path_len < 0 || path_len >= PATH_LEN)
    {
        printf("Output path %s.%d does not fit in %d characters ! Aborting \n", fname, ctx -> mpi_rank, PATH_LEN);
        return;
    }

    /* raw binary dump, hence "wb" */
    FILE* file = fopen(out_path_w_proc_name,"wb");
    if(!file)
    {
        /* report the per-rank path that actually failed, and do not fclose(NULL) */
        printf("Cannot open file %s ! Aborting \n", out_path_w_proc_name);
        return;
    }

    const size_t n_bytes = el_size * (size_t)n;
    size_t written = fwrite(buffer, 1, n_bytes, file);
    if(written != n_bytes)
    {
        printf("Short write on file %s: %zu of %zu bytes\n", out_path_w_proc_name, written, n_bytes);
    }

    /* fclose flushes buffered data, so a failure here means lost output */
    if(fclose(file) != 0)
    {
        printf("Cannot close file %s\n", out_path_w_proc_name);
    }
}

void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname)
{
    //MPI_Barrier(ctx -> mpi_communicator);
@@ -367,7 +487,6 @@ void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_s
        displs[0] = 0;
        for(int i = 0; i < ctx -> world_size; ++i) ppp[i]    = el_size  * ppp[i];
        for(int i = 1; i < ctx -> world_size; ++i) displs[i] = displs[i - 1] + ppp[i - 1];

        for(int i = 0; i < ctx -> world_size; ++i) already_recv[i] = 0;
            
    }
@@ -376,7 +495,7 @@ void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_s
    //Gather on master
    //
    
    uint64_t default_msg_len = 100000; //bytes
    uint64_t default_msg_len = 100000000; //bytes
    
    if(I_AM_MASTER)
    {
@@ -396,7 +515,14 @@ void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_s

                MPI_Recv(tmp_data + displs[r] + already_recv[r], ppp[r], MPI_BYTE, r, r, ctx -> mpi_communicator, MPI_STATUS_IGNORE);
                already_recv[r] += count_recv;

                #ifdef PRINT_ORDERED_BUFFER
                printf("[MASTER] recieved from %d %lu elements out of %lu\n", r, already_recv[r], ppp[r]);
                #endif
            }
            #ifdef PRINT_ORDERED_BUFFER
            printf("-----------\n");
            #endif
        }
    }
    else
@@ -490,3 +616,21 @@ void test_file_path(const char* fname)
    }
    
}

/*
 * Check that this rank can create its output file "<fname>.<mpi_rank>".
 *
 * The file is opened for writing and a short placeholder text is written to
 * it. If the path does not fit in PATH_LEN characters or the file cannot be
 * opened, a message is printed on stdout and the function returns.
 */
void test_distributed_file_path(global_context_t* ctx, const char* fname)
{
    char out_path_w_proc_name[PATH_LEN]; 

    /* a truncated path would not be the one used for the real output */
    int path_len = snprintf(out_path_w_proc_name, PATH_LEN, "%s.%d", fname, ctx -> mpi_rank);
    if(path_len < 0 || path_len >= PATH_LEN)
    {
        printf("Output path %s.%d does not fit in %d characters ! Aborting \n", fname, ctx -> mpi_rank, PATH_LEN);
        return;
    }

    FILE* file = fopen(out_path_w_proc_name,"w");
    if(!file)
    {
        /* nothing to close: calling fclose on a null stream is undefined */
        printf("Cannot open file %s ! Aborting \n", out_path_w_proc_name);
        return;
    }

    fprintf(file, "This is only to test if I can open a file in the desidered path\n");
    fprintf(file, "Here will be written the output of dadp\n");
    fclose(file);

}
+6 −0
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@
//#define PRINT_NGBH_EXCHANGE_SCHEME
//#define PRINT_H2_COMM_SCHEME
//#define PRINT_H1_CLUSTER_ASSIGN_COMPLETION
//#define PRINT_ORDERED_BUFFER

typedef struct datapoint_info_t {
    idx_t array_idx;
@@ -30,6 +31,7 @@ typedef struct datapoint_info_t {
    idx_t kstar;
    int is_center;
    int cluster_idx;
    int halo_flag;
} datapoint_info_t;

#define MAX(A,B) ((A) > (B) ? (A) : (B))
@@ -199,3 +201,7 @@ void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size,
void test_file_path(const char* fname);
void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname);
float_t* read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims, const int file_in_float32);
void get_dataset_diagnostics(global_context_t* ctx, float_t* data);

void test_distributed_file_path(global_context_t* ctx, const char* fname);
void distributed_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname);
+59 −15
Original line number Diff line number Diff line
@@ -9,10 +9,12 @@
#ifdef AMONRA
    #pragma message "Hi, you are on amonra"
    #define OUT_CLUSTER_ASSIGN "/beegfs/ftomba/phd/results/final_assignment.npy"
    #define OUT_HALO_FLAGS     "/beegfs/ftomba/phd/results/halo_flags.npy"
    #define OUT_DATA           "/beegfs/ftomba/phd/results/ordered_data.npy"
#else
    #define OUT_CLUSTER_ASSIGN "/leonardo_scratch/large/userexternal/ftomba00/final_assignment.npy"
    #define OUT_DATA           "/leonardo_scratch/large/userexternal/ftomba00/ordered_data.npy"
    #define OUT_CLUSTER_ASSIGN "/leonardo_scratch/large/userexternal/ftomba00/out_dadp/final_assignment.npy"
    #define OUT_HALO_FLAGS     "/leonardo_scratch/large/userexternal/ftomba00/out_dadp/halo_flags.npy"
    #define OUT_DATA           "/leonardo_scratch/large/userexternal/ftomba00/out_dadp/ordered_data.npy"
#endif

//
@@ -106,10 +108,22 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    TIME_DEF
    double elapsed_time;

    if(I_AM_MASTER)
    float_t z = 3;
    int halo = MY_TRUE;
    float_t tol = 0.002;
    int k = 300;

    if(I_AM_MASTER && ctx -> world_size <= 6)
    {
        test_file_path(OUT_DATA);
        test_file_path(OUT_CLUSTER_ASSIGN);
        if(halo) test_file_path(OUT_HALO_FLAGS);
    }
    else
    {
        test_distributed_file_path(ctx, OUT_DATA);
        test_distributed_file_path(ctx, OUT_CLUSTER_ASSIGN);
        if(halo) test_distributed_file_path(ctx, OUT_HALO_FLAGS);
    }


@@ -130,12 +144,15 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
        // data = read_data_file(ctx,"../norm_data/very_huge_blobs.npy",MY_FALSE);
        // data = read_data_file(ctx,"../norm_data/hd_blobs.npy",5,MY_FALSE);
        
        //1B points
        //data = read_data_file(ctx,"../norm_data/eds_box_acc_normalized",5,MY_FALSE);

        // 190M points
        // std_g2980844_091_0000
        data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",5,MY_TRUE);
        // data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",5,MY_TRUE);
        
        /* 1M points ca.*/
        //data = read_data_file(ctx,"../norm_data/std_LR_091_0001",5,MY_TRUE);
        data = read_data_file(ctx,"../norm_data/std_LR_091_0001",5,MY_TRUE);

        /* BOX */
        // data = read_data_file(ctx,"../norm_data/std_Box_256_30_092_0000",MY_TRUE);
@@ -155,6 +172,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
        // ctx->n_points = ctx->n_points / 2;
        // ctx->n_points = (ctx->n_points / 32) * ctx -> world_size;

        get_dataset_diagnostics(ctx, data);

    }
    
    /* communicate the total number of points*/
@@ -239,7 +258,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    original_ps.lb_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t));
    original_ps.ub_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t));

    float_t tol = 0.002;

    top_kdtree_t tree;
    TIME_START;
@@ -256,7 +274,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    TIME_START;
    kdtree_v2 local_tree;
    kdtree_v2_init( &local_tree, ctx -> local_data, ctx -> local_n_points, (unsigned int)ctx -> dims);
    int k = 300;

    datapoint_info_t* dp_info = (datapoint_info_t*)MY_MALLOC(ctx -> local_n_points * sizeof(datapoint_info_t));            
    for(uint64_t i = 0; i < ctx -> local_n_points; ++i)
@@ -272,6 +289,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
        dp_info[i].kstar = -1;
        dp_info[i].is_center = -1;
        dp_info[i].cluster_idx = -1;
        dp_info[i].halo_flag = 0;
    }
    ctx -> local_datapoints = dp_info;

@@ -301,7 +319,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)

    TIME_START;

    float_t z = 3;

    compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE);
    compute_correction(ctx, z);
@@ -321,19 +338,48 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)


    TIME_START;
    int halo = 1;
    Heuristic3(ctx, &clusters, z, halo);
    elapsed_time = TIME_STOP;
    LOG_WRITE("H3", elapsed_time)

    /* write final assignment and data */
    
    TIME_START;
    int* cl = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int));
    for(int i = 0; i < ctx -> local_n_points; ++i) cl[i] = ctx -> local_datapoints[i].cluster_idx;

    /*
     * With at most 6 ranks the results are written through
     * big_ordered_buffer_to_file; with more ranks every rank writes its own
     * file through distributed_buffer_to_file.
     */
    if(ctx -> world_size <= 6)
    {
        big_ordered_buffer_to_file(ctx, cl, sizeof(int), ctx -> local_n_points, OUT_CLUSTER_ASSIGN);
        big_ordered_buffer_to_file(ctx, ctx -> local_data, sizeof(double), ctx -> local_n_points * ctx -> dims, OUT_DATA);

        if(halo)
        {
            int* halo_flags = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int));
            /* fill the buffer that is written below (it used to be left uninitialised) */
            for(uint64_t i = 0; i < ctx -> local_n_points; ++i) halo_flags[i] = ctx -> local_datapoints[i].halo_flag;
            big_ordered_buffer_to_file(ctx, halo_flags, sizeof(int), ctx -> local_n_points, OUT_HALO_FLAGS);
            free(halo_flags);
        }

        free(cl);

    }
    else
    {
        distributed_buffer_to_file(ctx, cl, sizeof(int), ctx -> local_n_points, OUT_CLUSTER_ASSIGN);
        distributed_buffer_to_file(ctx, ctx -> local_data, sizeof(double), ctx -> local_n_points * ctx -> dims, OUT_DATA);

        if(halo)
        {
            int* halo_flags = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int));
            /* fill the buffer that is written below (it used to be left uninitialised) */
            for(uint64_t i = 0; i < ctx -> local_n_points; ++i) halo_flags[i] = ctx -> local_datapoints[i].halo_flag;
            distributed_buffer_to_file(ctx, halo_flags, sizeof(int), ctx -> local_n_points, OUT_HALO_FLAGS);
            free(halo_flags);
        }

        free(cl);

    }

    elapsed_time = TIME_STOP;
    LOG_WRITE("Write results to file", elapsed_time);
    
@@ -346,8 +392,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    free(displacements);
    //free(dp_info);
    

    
    original_ps.data = NULL;
    free_pointset(&original_ps);
    free(global_bin_counts_int);
+21 −5
Original line number Diff line number Diff line
@@ -447,6 +447,8 @@ void compute_pure_global_binning(global_context_t *ctx, pointset_t *ps,

    float_t bin_w = (ps-> ub_box[d] - ps->lb_box[d]) / (float_t)k_global;

    /* NOTE: this is problematic when ps -> ub_box[d] == ps -> lb_box[d], because bin_w is then 0 */

    #pragma omp parallel for
    for (size_t i = 0; i < ps->n_points; ++i) 
    {
@@ -688,7 +690,7 @@ top_kdtree_node_t* top_tree_generate_node(global_context_t* ctx, top_kdtree_t* t
    ptr -> lb_node_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t));
    ptr -> ub_node_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t));
    ptr -> owner       = -1;
    ptr -> split_dim   = 0.;
    ptr -> split_dim   = 0;
    ++tree -> count;
    return ptr;
 
@@ -810,13 +812,24 @@ void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree
        /* insert node */
        
        /*
        MPI_DB_PRINT("Handling partition: \n\tcurrent_node %p, \n\tdim %d, \n\tn_points %d, \n\tstart_proc %d, \n\tn_procs %d, \n\tparent %p\n", 
        MPI_DB_PRINT(   "[RANK %d] Handling partition:\n"\
                        "\tcurrent_node %p,\n"\
                        "\tdim %d,\n"\
                        "\tn_points %lu,\n"\
                        "\tstart_proc %d,\n"\
                        "\tn_procs %d\n"\
                        "\tparent %p\n"\
                        "\tbase_ptr %p\n"\
                        "\tlr %d\n", 
                ctx -> mpi_rank,
                current_node,
                current_partition.d,
                current_partition.n_points,
                current_partition.start_proc,
                current_partition.n_procs,
                current_partition.parent);
                current_partition.parent,
                current_partition.base_ptr,
                current_partition.lr);
        */

        switch (current_partition.lr) {
@@ -1079,6 +1092,9 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree)

    /*exchange points */


    /* this needs to change */

    MPI_Alltoallv(  ctx -> local_data, send_count, send_displs, MPI_MY_FLOAT, 
                    rcvbuffer, rcv_count, rcv_displs, MPI_MY_FLOAT, 
                    ctx -> mpi_communicator);