Commit 025c92bd authored by Francesco Tomba's avatar Francesco Tomba
Browse files

Merge branch 'update-tree' into 'main'

added main

See merge request !5
parents e38c4697 1828cb00
Loading
Loading
Loading
Loading
+134 −0
Original line number Diff line number Diff line
@@ -359,6 +359,140 @@ float_t* read_data_file(global_context_t *ctx, const char *fname, const idx_t nd
    return data;
}

float_t* mpiio_read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims,
                                const int file_in_float32, MPI_Comm comm, int rank, int size)
{
    int elem_size = file_in_float32 ? 4 : 8;
    MPI_Datatype filetype;
    MPI_Datatype mpicounttype;
    
    if (file_in_float32) {
        mpicounttype = MPI_FLOAT;
    } else {
        mpicounttype = MPI_DOUBLE;
    }

    MPI_File fh;
    MPI_Status status;
    MPI_Offset filesize;
    size_t local_n_points;
    size_t local_count;
    MPI_Offset local_offset;

    if (rank == 0) {
        printf("MPI-IO: Reading %s\n", fname);
    }

    MPI_Barrier(comm);

    int ret = MPI_File_open(comm, fname, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
    if (ret != MPI_SUCCESS) {
        char error_string[MPI_MAX_ERROR_STRING];
        int error_len;
        MPI_Error_string(ret, error_string, &error_len);
        if (rank == 0) {
            fprintf(stderr, "Cannot open file %s: %s\n", fname, error_string);
        }
        MPI_Abort(comm, 1);
    }

    MPI_File_get_size(fh, &filesize);
    
    if (rank == 0) {
        size_t total_elements = filesize / elem_size;
        ctx->n_points = total_elements / ndims;
        ctx->dims = ndims;
        printf("MPI-IO: File size = %lld, total_elements = %zu, n_points = %zu, dims = %u\n",
               (long long)filesize, total_elements, (size_t)ctx->n_points, ctx->dims);
    }

    MPI_Bcast(&(ctx->n_points), 1, MPI_UINT64_T, 0, comm);
    MPI_Bcast(&(ctx->dims), 1, MPI_UINT32_T, 0, comm);

    size_t n_points = (size_t)ctx->n_points;
    uint32_t dims = ctx->dims;

    size_t remainder = n_points % size;
    size_t base_points = n_points / size;
    
    if ((size_t)rank < remainder) {
        local_n_points = base_points + 1;
    } else {
        local_n_points = base_points;
    }
    
    local_count = local_n_points * dims;
    local_offset = 0;
    
    for (int r = 0; r < rank; r++) {
        size_t r_points;
        if ((size_t)r < remainder) {
            r_points = base_points + 1;
        } else {
            r_points = base_points;
        }
        local_offset += r_points * dims * elem_size;
    }

    ctx->local_n_points = local_n_points;

    float_t *local_data = (float_t *)MY_MALLOC(local_count * sizeof(float_t));
    if (!local_data) {
        fprintf(stderr, "Rank %d: Failed to allocate local_data\n", rank);
        MPI_Abort(comm, 1);
    }

    if (file_in_float32) {
        float *tmp_buf = (float *)MY_MALLOC(local_count * sizeof(float));
        if (!tmp_buf) {
            fprintf(stderr, "Rank %d: Failed to allocate tmp_buf\n", rank);
            MPI_Abort(comm, 1);
        }
        
        MPI_File_read_at(fh, local_offset, tmp_buf, local_count, MPI_FLOAT, &status);
        
        int count;
        MPI_Get_count(&status, MPI_FLOAT, &count);
        if ((size_t)count != local_count) {
            fprintf(stderr, "Rank %d: Warning - read %d elements, expected %zu\n", rank, count, local_count);
        }
        
        for (size_t i = 0; i < local_count; i++) {
            local_data[i] = (float_t)(tmp_buf[i]);
        }
        
        free(tmp_buf);
    } else {
        double *tmp_buf = (double *)MY_MALLOC(local_count * sizeof(double));
        if (!tmp_buf) {
            fprintf(stderr, "Rank %d: Failed to allocate tmp_buf\n", rank);
            MPI_Abort(comm, 1);
        }
        
        MPI_File_read_at(fh, local_offset, tmp_buf, local_count, MPI_DOUBLE, &status);
        
        int count;
        MPI_Get_count(&status, MPI_DOUBLE, &count);
        if ((size_t)count != local_count) {
            fprintf(stderr, "Rank %d: Warning - read %d elements, expected %zu\n", rank, count, local_count);
        }
        
        for (size_t i = 0; i < local_count; i++) {
            local_data[i] = (float_t)(tmp_buf[i]);
        }
        
        free(tmp_buf);
    }

    MPI_File_close(&fh);

    if (rank == 0) {
        printf("MPI-IO: Read complete. Each rank has %zu points.\n", local_n_points);
    }

    return local_data;
}

void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname)
{
    //MPI_Barrier(ctx -> mpi_communicator);
+1 −0
Original line number Diff line number Diff line
@@ -238,6 +238,7 @@ void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size,
void test_file_path(const char* fname);
void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname);
float_t* read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims, const int file_in_float32);
float_t* mpiio_read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims, const int file_in_float32, MPI_Comm comm, int rank, int size);
void get_dataset_diagnostics(global_context_t* ctx, float_t* data);

void test_distributed_file_path(global_context_t* ctx, const char* fname);
+13 −83
Original line number Diff line number Diff line
@@ -154,6 +154,9 @@ void print_hello(global_context_t* ctx)

}


void mpiio_master_read_and_scatter(int dims, size_t n, global_context_t *ctx);

int main(int argc, char** argv) {
    #if defined (_OPENMP)
        int mpi_provided_thread_level;
@@ -214,11 +217,11 @@ int main(int argc, char** argv) {

	if(ctx.mpi_rank == 0)
	{
		simulate_master_read_and_scatter(5, 1000000, &ctx);	
		mpiio_master_read_and_scatter(5, 1000000, &ctx);	
	}
	else
	{
		simulate_master_read_and_scatter(0, 0, &ctx);	
		mpiio_master_read_and_scatter(0, 0, &ctx);	
	}

	//free(data);
@@ -228,7 +231,7 @@ int main(int argc, char** argv) {



void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) 
void mpiio_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) 
{
    /* TODO
     *
@@ -240,7 +243,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    int halo = MY_TRUE;
    float_t tol = 0.002;

    if(ctx -> world_size <= 6)
    if(ctx -> world_size <= 32)
    {
        if(I_AM_MASTER)
        {
@@ -271,83 +274,12 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    

    TIME_START;
    if (ctx->mpi_rank == 0) 
    {
        data = read_data_file(ctx, ctx -> input_data_file, ctx -> dims, ctx -> input_data_in_float32);
        get_dataset_diagnostics(ctx, data);
    }
    
    /* communicate the total number of points*/
    MPI_Bcast(&(ctx->dims), 1, MPI_UINT32_T, 0, ctx->mpi_communicator);
    MPI_Bcast(&(ctx->n_points), 1, MPI_UINT64_T, 0, ctx->mpi_communicator);

    /* compute the number of elements to recieve for each processor */
    idx_t *send_counts = (idx_t *)MY_MALLOC(ctx->world_size * sizeof(idx_t));
    idx_t *displacements = (idx_t *)MY_MALLOC(ctx->world_size * sizeof(idx_t));

    displacements[0] = 0;
    send_counts[0] = ctx->n_points / ctx->world_size;
    send_counts[0] += (ctx->n_points % ctx->world_size) > 0 ? 1 : 0;
    send_counts[0] = send_counts[0] * ctx->dims;

    for (int p = 1; p < ctx->world_size; ++p) 
    {
        send_counts[p] = (ctx->n_points / ctx->world_size);
        send_counts[p] += (ctx->n_points % ctx->world_size) > p ? 1 : 0;
        send_counts[p] = send_counts[p] * ctx->dims;
        displacements[p] = displacements[p - 1] + send_counts[p - 1];
    }


    ctx->local_n_points = send_counts[ctx->mpi_rank] / ctx->dims;

    float_t *pvt_data = (float_t *)MY_MALLOC(send_counts[ctx->mpi_rank] * sizeof(float_t));


    if(I_AM_MASTER)
    {
        memcpy(pvt_data, data, ctx -> dims * ctx -> local_n_points * sizeof(float_t));
        int already_sent_points = 0;
        for(int i = 1; i < ctx -> world_size; ++i)
        {
            already_sent_points = 0;
            while(already_sent_points < send_counts[i])
            {
                int count_send = MIN(DEFAULT_MSG_LEN, send_counts[i] - already_sent_points); 
                MPI_Send(data + displacements[i] + already_sent_points, count_send, MPI_MY_FLOAT, i, ctx -> mpi_rank, ctx -> mpi_communicator);
                already_sent_points += count_send;
            }
        }
    }
    else
    {
        int already_recvd_points = 0;
        while(already_recvd_points < send_counts[ctx -> mpi_rank])
        {
            MPI_Status status;
            MPI_Probe(0, MPI_ANY_TAG, ctx -> mpi_communicator, &status);

            MPI_Request request;
            int count_recv; 
            int source = status.MPI_SOURCE;
            MPI_Get_count(&status, MPI_MY_FLOAT, &count_recv);

            MPI_Recv(pvt_data + already_recvd_points, count_recv, MPI_MY_FLOAT, source, MPI_ANY_TAG, ctx -> mpi_communicator, MPI_STATUS_IGNORE);
            already_recvd_points += count_recv;
        }
    }
    ctx->local_data = mpiio_read_data_file(ctx, ctx->input_data_file, ctx->dims, 
                                            ctx->input_data_in_float32,
                                            ctx->mpi_communicator, ctx->mpi_rank, ctx->world_size);
    
    elapsed_time = TIME_STOP;
    LOG_WRITE("Importing file ad scattering", elapsed_time);

    if (I_AM_MASTER) free(data);

    ctx->local_data = pvt_data;

    int k_local  = 20;
    int k_global = 20;

    uint64_t *global_bin_counts_int = (uint64_t *)MY_MALLOC(k_global * sizeof(uint64_t));
    LOG_WRITE("Importing file with MPI-IO", elapsed_time);

    top_kdtree_t tree;
    TIME_START;
@@ -548,9 +480,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    top_tree_free(ctx, &tree);
    //clusters_free(&clusters);

    free(send_counts);
    free(displacements);
    //free(send_counts);
    //free(displacements);
    //free(dp_info);
    
    free(global_bin_counts_int);
}