diff --git a/src/common/common.c b/src/common/common.c index ee9e7669467a8be4a46b25174762d586313df8c5..1106d6492377c3e2d1c2dc3c2d9d921b60b0792c 100644 --- a/src/common/common.c +++ b/src/common/common.c @@ -359,6 +359,140 @@ float_t* read_data_file(global_context_t *ctx, const char *fname, const idx_t nd return data; } +float_t* mpiio_read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims, + const int file_in_float32, MPI_Comm comm, int rank, int size) +{ + int elem_size = file_in_float32 ? 4 : 8; + MPI_Datatype filetype; + MPI_Datatype mpicounttype; + + if (file_in_float32) { + mpicounttype = MPI_FLOAT; + } else { + mpicounttype = MPI_DOUBLE; + } + + MPI_File fh; + MPI_Status status; + MPI_Offset filesize; + size_t local_n_points; + size_t local_count; + MPI_Offset local_offset; + + if (rank == 0) { + printf("MPI-IO: Reading %s\n", fname); + } + + MPI_Barrier(comm); + + int ret = MPI_File_open(comm, fname, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh); + if (ret != MPI_SUCCESS) { + char error_string[MPI_MAX_ERROR_STRING]; + int error_len; + MPI_Error_string(ret, error_string, &error_len); + if (rank == 0) { + fprintf(stderr, "Cannot open file %s: %s\n", fname, error_string); + } + MPI_Abort(comm, 1); + } + + MPI_File_get_size(fh, &filesize); + + if (rank == 0) { + size_t total_elements = filesize / elem_size; + ctx->n_points = total_elements / ndims; + ctx->dims = ndims; + printf("MPI-IO: File size = %lld, total_elements = %zu, n_points = %zu, dims = %u\n", + (long long)filesize, total_elements, (size_t)ctx->n_points, ctx->dims); + } + + MPI_Bcast(&(ctx->n_points), 1, MPI_UINT64_T, 0, comm); + MPI_Bcast(&(ctx->dims), 1, MPI_UINT32_T, 0, comm); + + size_t n_points = (size_t)ctx->n_points; + uint32_t dims = ctx->dims; + + size_t remainder = n_points % size; + size_t base_points = n_points / size; + + if ((size_t)rank < remainder) { + local_n_points = base_points + 1; + } else { + local_n_points = base_points; + } + + local_count = local_n_points * dims; + local_offset = 0; + + for (int r = 0; r < rank; r++) { + size_t r_points; + if ((size_t)r < remainder) { + r_points = base_points + 1; + } else { + r_points = base_points; + } + local_offset += r_points * dims * elem_size; + } + + ctx->local_n_points = local_n_points; + + float_t *local_data = (float_t *)MY_MALLOC(local_count * sizeof(float_t)); + if (!local_data) { + fprintf(stderr, "Rank %d: Failed to allocate local_data\n", rank); + MPI_Abort(comm, 1); + } + + if (file_in_float32) { + float *tmp_buf = (float *)MY_MALLOC(local_count * sizeof(float)); + if (!tmp_buf) { + fprintf(stderr, "Rank %d: Failed to allocate tmp_buf\n", rank); + MPI_Abort(comm, 1); + } + + MPI_File_read_at(fh, local_offset, tmp_buf, local_count, MPI_FLOAT, &status); + + int count; + MPI_Get_count(&status, MPI_FLOAT, &count); + if ((size_t)count != local_count) { + fprintf(stderr, "Rank %d: Warning - read %d elements, expected %zu\n", rank, count, local_count); + } + + for (size_t i = 0; i < local_count; i++) { + local_data[i] = (float_t)(tmp_buf[i]); + } + + free(tmp_buf); + } else { + double *tmp_buf = (double *)MY_MALLOC(local_count * sizeof(double)); + if (!tmp_buf) { + fprintf(stderr, "Rank %d: Failed to allocate tmp_buf\n", rank); + MPI_Abort(comm, 1); + } + + MPI_File_read_at(fh, local_offset, tmp_buf, local_count, MPI_DOUBLE, &status); + + int count; + MPI_Get_count(&status, MPI_DOUBLE, &count); + if ((size_t)count != local_count) { + fprintf(stderr, "Rank %d: Warning - read %d elements, expected %zu\n", rank, count, local_count); + } + + for (size_t i = 0; i < local_count; i++) { + local_data[i] = (float_t)(tmp_buf[i]); + } + + free(tmp_buf); + } + + MPI_File_close(&fh); + + if (rank == 0) { + printf("MPI-IO: Read complete. Each rank has %zu points.\n", local_n_points); + } + + return local_data; +} + void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname) { //MPI_Barrier(ctx -> mpi_communicator); diff --git a/src/common/common.h b/src/common/common.h index c92c2b2020eaeff8091ddb21cf02653e8a798b21..c4a882758346dffed188ae5c8067636a00829780 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -238,6 +238,7 @@ void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, void test_file_path(const char* fname); void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname); float_t* read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims, const int file_in_float32); +float_t* mpiio_read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims, const int file_in_float32, MPI_Comm comm, int rank, int size); void get_dataset_diagnostics(global_context_t* ctx, float_t* data); void test_distributed_file_path(global_context_t* ctx, const char* fname); diff --git a/src/main/main.c b/src/main/main.c index 039eaa89a929938706ccebf09855db4aacf5f50b..deb2d209305cd2d50d9bece3b6c4c2e7e08c450e 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -154,6 +154,9 @@ void print_hello(global_context_t* ctx) } + +void mpiio_master_read_and_scatter(int dims, size_t n, global_context_t *ctx); + int main(int argc, char** argv) { #if defined (_OPENMP) int mpi_provided_thread_level; @@ -214,11 +217,11 @@ int main(int argc, char** argv) { if(ctx.mpi_rank == 0) { - simulate_master_read_and_scatter(5, 1000000, &ctx); + mpiio_master_read_and_scatter(5, 1000000, &ctx); } else { - simulate_master_read_and_scatter(0, 0, &ctx); + mpiio_master_read_and_scatter(0, 0, &ctx); } //free(data); @@ -228,7 +231,7 @@ int main(int argc, char** argv) { -void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) +void mpiio_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) { /* TODO * @@ -240,7 +243,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) int halo = MY_TRUE; float_t tol = 0.002; - if(ctx -> world_size <= 6) + if(ctx -> world_size <= 32) { if(I_AM_MASTER) { @@ -271,83 +274,12 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) TIME_START; - if (ctx->mpi_rank == 0) - { - data = read_data_file(ctx, ctx -> input_data_file, ctx -> dims, ctx -> input_data_in_float32); - get_dataset_diagnostics(ctx, data); - } + ctx->local_data = mpiio_read_data_file(ctx, ctx->input_data_file, ctx->dims, + ctx->input_data_in_float32, + ctx->mpi_communicator, ctx->mpi_rank, ctx->world_size); - /* communicate the total number of points*/ - MPI_Bcast(&(ctx->dims), 1, MPI_UINT32_T, 0, ctx->mpi_communicator); - MPI_Bcast(&(ctx->n_points), 1, MPI_UINT64_T, 0, ctx->mpi_communicator); - - /* compute the number of elements to recieve for each processor */ - idx_t *send_counts = (idx_t *)MY_MALLOC(ctx->world_size * sizeof(idx_t)); - idx_t *displacements = (idx_t *)MY_MALLOC(ctx->world_size * sizeof(idx_t)); - - displacements[0] = 0; - send_counts[0] = ctx->n_points / ctx->world_size; - send_counts[0] += (ctx->n_points % ctx->world_size) > 0 ? 1 : 0; - send_counts[0] = send_counts[0] * ctx->dims; - - for (int p = 1; p < ctx->world_size; ++p) - { - send_counts[p] = (ctx->n_points / ctx->world_size); - send_counts[p] += (ctx->n_points % ctx->world_size) > p ? 1 : 0; - send_counts[p] = send_counts[p] * ctx->dims; - displacements[p] = displacements[p - 1] + send_counts[p - 1]; - } - - - ctx->local_n_points = send_counts[ctx->mpi_rank] / ctx->dims; - - float_t *pvt_data = (float_t *)MY_MALLOC(send_counts[ctx->mpi_rank] * sizeof(float_t)); - - - if(I_AM_MASTER) - { - memcpy(pvt_data, data, ctx -> dims * ctx -> local_n_points * sizeof(float_t)); - int already_sent_points = 0; - for(int i = 1; i < ctx -> world_size; ++i) - { - already_sent_points = 0; - while(already_sent_points < send_counts[i]) - { - int count_send = MIN(DEFAULT_MSG_LEN, send_counts[i] - already_sent_points); - MPI_Send(data + displacements[i] + already_sent_points, count_send, MPI_MY_FLOAT, i, ctx -> mpi_rank, ctx -> mpi_communicator); - already_sent_points += count_send; - } - } - } - else - { - int already_recvd_points = 0; - while(already_recvd_points < send_counts[ctx -> mpi_rank]) - { - MPI_Status status; - MPI_Probe(0, MPI_ANY_TAG, ctx -> mpi_communicator, &status); - - MPI_Request request; - int count_recv; - int source = status.MPI_SOURCE; - MPI_Get_count(&status, MPI_MY_FLOAT, &count_recv); - - MPI_Recv(pvt_data + already_recvd_points, count_recv, MPI_MY_FLOAT, source, MPI_ANY_TAG, ctx -> mpi_communicator, MPI_STATUS_IGNORE); - already_recvd_points += count_recv; - } - } - elapsed_time = TIME_STOP; - LOG_WRITE("Importing file ad scattering", elapsed_time); - - if (I_AM_MASTER) free(data); - - ctx->local_data = pvt_data; - - int k_local = 20; - int k_global = 20; - - uint64_t *global_bin_counts_int = (uint64_t *)MY_MALLOC(k_global * sizeof(uint64_t)); + LOG_WRITE("Importing file with MPI-IO", elapsed_time); top_kdtree_t tree; TIME_START; @@ -548,9 +480,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) top_tree_free(ctx, &tree); //clusters_free(&clusters); - free(send_counts); - free(displacements); + //free(send_counts); + //free(displacements); //free(dp_info); - - free(global_bin_counts_int); }