Commit 286638c6 authored by lykos98's avatar lykos98
Browse files

added read file improvement

parent b9da39c5
Loading
Loading
Loading
Loading
+83 −1
Original line number Diff line number Diff line
@@ -202,6 +202,88 @@ void lu_dynamic_array_init(lu_dynamic_array_t * a)
    a -> size = 0;
}

const char*  __units[3] = {"MB", "GB", "TB"};
const double __multiplier[3] = {1e6, 1e9, 1e12}; 

static inline int get_unit_measure(size_t bytes)
{
    if((double)bytes < (1e9))
    {
        return 0;
    }
    else if ((double)bytes < (1e12)) {
        return 1; 
    }
    else
    {
        return 2;
    }


}

float_t* read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims,
                        const int file_in_float32) 
{

    FILE *f = fopen(fname, "r");
    if (!f) 
    {
        printf("Nope\n");
        exit(1);
    }
    fseek(f, 0, SEEK_END);
    size_t n = ftell(f);
    rewind(f);

    int InputFloatSize = file_in_float32 ? 4 : 8;

    n = n / (InputFloatSize);

    float_t *data = (float_t *)MY_MALLOC(n * sizeof(float_t));

    if (file_in_float32) 
    {
        float *df = (float *)MY_MALLOC(n * sizeof(float));
        size_t fff = fread(df, sizeof(float), n, f);

        int measure = get_unit_measure(fff * sizeof(float));
        double file_len_converted = (double)(fff * sizeof(float))/__multiplier[measure];

        mpi_printf(ctx, "Read %.2lf%s\n", file_len_converted, __units[measure]);

        ctx -> dims = ndims;
        ctx -> n_points = n / ctx -> dims;

        mpi_printf(ctx, "Got ndims %lu npoints %lu\n", ctx -> dims, ctx -> n_points);
        fclose(f);

        for (uint64_t i = 0; i < n; ++i) data[i] = (float_t)(df[i]);

        free(df);
    } 
    else 
    {
        double *df = (double *)MY_MALLOC(n * sizeof(double));
        size_t fff = fread(df, sizeof(double), n, f);

        int measure = get_unit_measure(fff * sizeof(double));
        double file_len_converted = (double)(fff * sizeof(double))/__multiplier[measure];
        mpi_printf(ctx, "Read %.2lf%s\n", file_len_converted, __units[measure]);

        ctx -> dims = ndims;
        ctx -> n_points = n / ctx -> dims;

        mpi_printf(ctx, "Got ndims %lu npoints %lu\n", ctx -> dims, ctx -> n_points);
        fclose(f);

        for (uint64_t i = 0; i < n; ++i) data[i] = (float_t)(df[i]);

        free(df);
    }
    return data;
}

void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname)
{
    //MPI_Barrier(ctx -> mpi_communicator);
@@ -294,7 +376,7 @@ void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_s
    //Gather on master
    //
    
    uint64_t default_msg_len = 10000000; //bytes
    uint64_t default_msg_len = 100000; //bytes
    
    if(I_AM_MASTER)
    {
+3 −2
Original line number Diff line number Diff line
@@ -54,9 +54,9 @@ typedef struct datapoint_info_t {
#define MY_TRUE  1
#define MY_FALSE 0

#define CHECK_ALLOCATION(x) if(!x){printf("[!!!] %d rank encountered failed allocation at line %s ", ctx -> mpi_rank, __LINE__ ); exit(1);};
#define CHECK_ALLOCATION(x) if(!x){printf("[!!!] %d rank encountered failed allocation at line %s \n", ctx -> mpi_rank, __LINE__ ); exit(1);};

#define CHECK_ALLOCATION_NO_CTX(x) if(!x){printf("[!!!] Failed allocation at line %d ", __LINE__ ); exit(1);}
#define CHECK_ALLOCATION_NO_CTX(x) if(!x){printf("[!!!] Failed allocation at line %d \n", __LINE__ ); exit(1);}
#define MY_MALLOC(n) ({void* p = calloc(n,1); CHECK_ALLOCATION_NO_CTX(p); p; })

#define DB_PRINT(...) printf(__VA_ARGS__)
@@ -198,3 +198,4 @@ void ordered_data_to_file(global_context_t* ctx, const char* fname);
void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname);
void test_file_path(const char* fname);
void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname);
float_t* read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims, const int file_in_float32);
+48 −13
Original line number Diff line number Diff line
@@ -113,11 +113,13 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    }


    TIME_START;
    if (ctx->mpi_rank == 0) 
    {
        //data = read_data_file(ctx, "../norm_data/50_blobs_more_var.npy", MY_TRUE);
        //ctx->dims = 2;
        //data = read_data_file(ctx, "../norm_data/blobs_small.npy", MY_FALSE);
        //data = read_data_file(ctx, "../norm_data/blobs_small.npy", MY_FALSE);
        // std_g0163178_Me14_091_0000
    
        // 100M points
@@ -126,13 +128,14 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
        //data = read_data_file(ctx,"../norm_data/huge_blobs.npy",MY_FALSE);
        // 2B points
        // data = read_data_file(ctx,"../norm_data/very_huge_blobs.npy",MY_FALSE);
        // data = read_data_file(ctx,"../norm_data/hd_blobs.npy",5,MY_FALSE);

        // 190M points
        // std_g2980844_091_0000
        // data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE);
        data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",5,MY_TRUE);
        
        /* 1M points ca.*/
        data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);
        //data = read_data_file(ctx,"../norm_data/std_LR_091_0001",5,MY_TRUE);

        /* BOX */
        // data = read_data_file(ctx,"../norm_data/std_Box_256_30_092_0000",MY_TRUE);
@@ -147,9 +150,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
        //
        //34 M
        //data = read_data_file(ctx,"../norm_data/std_g1212639_091_0001",MY_TRUE);
        ctx -> dims = 5;
        // ctx->dims = 2;
        ctx->n_points = ctx->n_points / ctx->dims;
        
        //for weak scalability 
        // ctx->n_points = ctx->n_points / 2;
@@ -162,8 +162,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    MPI_Bcast(&(ctx->n_points), 1, MPI_UINT64_T, 0, ctx->mpi_communicator);

    /* compute the number of elements to recieve for each processor */
    int *send_counts = (int *)MY_MALLOC(ctx->world_size * sizeof(int));
    int *displacements = (int *)MY_MALLOC(ctx->world_size * sizeof(int));
    idx_t *send_counts = (idx_t *)MY_MALLOC(ctx->world_size * sizeof(idx_t));
    idx_t *displacements = (idx_t *)MY_MALLOC(ctx->world_size * sizeof(idx_t));

    displacements[0] = 0;
    send_counts[0] = ctx->n_points / ctx->world_size;
@@ -183,7 +183,45 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)

    float_t *pvt_data = (float_t *)MY_MALLOC(send_counts[ctx->mpi_rank] * sizeof(float_t));

    MPI_Scatterv(data, send_counts, displacements, MPI_MY_FLOAT, pvt_data, send_counts[ctx->mpi_rank], MPI_MY_FLOAT, 0, ctx->mpi_communicator);
    uint64_t default_msg_len = 10000000; //bytes

    if(I_AM_MASTER)
    {
        memcpy(pvt_data, data, ctx -> dims * ctx -> local_n_points * sizeof(float_t));
        int already_sent_points = 0;
        for(int i = 1; i < ctx -> world_size; ++i)
        {
            already_sent_points = 0;
            while(already_sent_points < send_counts[i])
            {
                int count_send = MIN(default_msg_len, send_counts[i] - already_sent_points); 
                MPI_Send(data + displacements[i] + already_sent_points, count_send, MPI_MY_FLOAT, i, ctx -> mpi_rank, ctx -> mpi_communicator);
                already_sent_points += count_send;
                //DB_PRINT("[RANK 0] has sent to rank %d %d elements out of %lu\n",i, already_sent_points, send_counts[i]);
            }
            //DB_PRINT("------------------------------------------------\n");
        }
    }
    else
    {
        int already_recvd_points = 0;
        while(already_recvd_points < send_counts[ctx -> mpi_rank])
        {
            MPI_Status status;
            MPI_Probe(0, MPI_ANY_TAG, ctx -> mpi_communicator, &status);

            MPI_Request request;
            int count_recv; 
            int source = status.MPI_SOURCE;
            MPI_Get_count(&status, MPI_MY_FLOAT, &count_recv);

            MPI_Recv(pvt_data + already_recvd_points, count_recv, MPI_MY_FLOAT, source, MPI_ANY_TAG, ctx -> mpi_communicator, MPI_STATUS_IGNORE);
            already_recvd_points += count_recv;
        }
    }

    elapsed_time = TIME_STOP;
    LOG_WRITE("Importing file ad scattering", elapsed_time);

    if (I_AM_MASTER) free(data);

@@ -214,13 +252,11 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    exchange_points(ctx, &tree);
    elapsed_time = TIME_STOP;
    LOG_WRITE("Top kdtree build and domain decomposition", elapsed_time);
    //test_the_idea(ctx);

    TIME_START;
    kdtree_v2 local_tree;
    kdtree_v2_init( &local_tree, ctx -> local_data, ctx -> local_n_points, (unsigned int)ctx -> dims);
    int k = 300;
    //int k = 30;

    datapoint_info_t* dp_info = (datapoint_info_t*)MY_MALLOC(ctx -> local_n_points * sizeof(datapoint_info_t));            
    for(uint64_t i = 0; i < ctx -> local_n_points; ++i)
@@ -267,7 +303,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)

    float_t z = 3;

    //compute_density_kstarnn_rma(ctx, id, MY_FALSE);
    compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE);
    compute_correction(ctx, z);
    elapsed_time = TIME_STOP;
+14 −59
Original line number Diff line number Diff line
@@ -49,51 +49,6 @@ int cmp_float_t(const void* a, const void* b)



float_t* read_data_file(global_context_t *ctx, const char *fname,
                        const int file_in_float32) 
{

    FILE *f = fopen(fname, "r");
    if (!f) 
    {
        printf("Nope\n");
        exit(1);
    }
    fseek(f, 0, SEEK_END);
    size_t n = ftell(f);
    rewind(f);

    int InputFloatSize = file_in_float32 ? 4 : 8;

    n = n / (InputFloatSize);

    float_t *data = (float_t *)MY_MALLOC(n * sizeof(float_t));

    if (file_in_float32) 
    {
        float *df = (float *)MY_MALLOC(n * sizeof(float));
        size_t fff = fread(df, sizeof(float), n, f);
        mpi_printf(ctx, "Read %luB\n", fff);
        fclose(f);

        for (uint64_t i = 0; i < n; ++i) data[i] = (float_t)(df[i]);

        free(df);
    } 
    else 
    {
        double *df = (double *)MY_MALLOC(n * sizeof(double));
        size_t fff = fread(df, sizeof(double), n, f);
        mpi_printf(ctx, "Read %luB\n", fff);
        fclose(f);

        for (uint64_t i = 0; i < n; ++i) data[i] = (float_t)(df[i]);

        free(df);
    }
    ctx->n_points = n;
    return data;
}

/* quickselect for an element along a dimension */

+0 −1
Original line number Diff line number Diff line
@@ -90,7 +90,6 @@ typedef struct top_kdtree_t


void     simulate_master_read_and_scatter(int, size_t, global_context_t* ); 
float_t* read_data_file(global_context_t *ctx, const char *fname, const int file_in_float32);
void     top_tree_init(global_context_t *ctx, top_kdtree_t *tree); 
void     build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree_t *tree, int n_bins, float_t tolerance);
void     exchange_points(global_context_t* ctx, top_kdtree_t* tree);