Commit c629603d authored by Francesco Tomba's avatar Francesco Tomba
Browse files

solved output data problems

parent 9d256282
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
CC=mpicc
#CC=mpiicx
CFLAGS=-O3 -march=native -flto -funroll-loops -fopenmp
CFLAGS=-O3 -march=native -funroll-loops -fopenmp
#CFLAGS=-O3 -fopenmp 
LDFLAGS=-lm 

all: main

obj=src/main/main.c src/tree/tree.c src/common/common.c src/tree/kdtreeV2.c src/tree/heap.c src/adp/adp.c
obj=src/main/main.c src/tree/tree.c src/common/common.c src/adp/adp.c

main: ${obj} 
	${CC} ${CFLAGS} ${LDFLAGS} ${obj} -o $@
	${CC} ${CFLAGS} ${obj} -o $@ ${LDFLAGS}

# Remove the built binary. `clean` is declared phony so a stray file named
# "clean" cannot mask the rule, and -f keeps the target from failing when
# the binary has not been built yet.
.PHONY: clean
clean:
	rm -f main
+2 −2
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@
#SBATCH --time=04:00:00
#SBATCH --job-name=dadp_test
#SBATCH --partition=dcgp_usr_prod 
#SBATCH --account=EUHPC_D18_045
#SBATCH --account=EUHPC_D31_027
#SBATCH --output=out_leo
#SBATCH --error=err_leo
#SBATCH --mem=480G
@@ -37,7 +37,7 @@ OUT_DATA=/leonardo_scratch/large/userexternal/ftomba00/data
IN_DATA=/leonardo_work/EUHPC_D18_045

#10^6 points 
time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  ./main -t f32 -i ${IN_DATA}/norm_data/std_LR_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}
time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  ./main -t f32 -k 100 -i ${IN_DATA}/norm_data/std_LR_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}
#time mpirun -n ${SLURM_NTASKS} --map-by core  ./main -t f32 -i ${IN_DATA}/norm_data/std_LR_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}

#34 * 10^6 points
+27 −23
Original line number Diff line number Diff line
@@ -92,10 +92,10 @@ float_t get_j_ksel_dist(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win expo
    }
    else
    {
        heap_node el;
        heap_node_t el;
        idx_t pos  = j - ctx -> rank_idx_start[owner];
        MPI_Request request;
        int err = MPI_Rget(&el, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, exposed_ngbh, &request);
        int err = MPI_Rget(&el, sizeof(heap_node_t), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node_t)), sizeof(heap_node_t), MPI_BYTE, exposed_ngbh, &request);
        MPI_Wait(&request,MPI_STATUS_IGNORE);
        return el.value;
    }                 
@@ -113,10 +113,10 @@ idx_t get_j_ksel_idx(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win exposed
    }
    else
    {
        heap_node el;
        heap_node_t el;
        idx_t pos  = j - ctx -> rank_idx_start[owner];
        MPI_Request request;
        int err = MPI_Rget(&el, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, exposed_ngbh, &request);
        int err = MPI_Rget(&el, sizeof(heap_node_t), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node_t)), sizeof(heap_node_t), MPI_BYTE, exposed_ngbh, &request);
        MPI_Wait(&request,MPI_STATUS_IGNORE);
        return el.array_idx;
    }                 
@@ -176,7 +176,7 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int

    MPI_Win exposed_ngbh;
    MPI_Win_create( ctx -> __local_heap_buffers, 
                    ctx -> local_n_points * ctx -> k * sizeof(heap_node), 
                    ctx -> local_n_points * ctx -> k * sizeof(heap_node_t), 
                    1, MPI_INFO_NULL, 
                    ctx -> mpi_communicator, 
                    &exposed_ngbh);
@@ -186,7 +186,7 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int
    int i_have_finished = 0;
    int all_have_finished = 0;
    int finished_points = 0;
    heap_node* scratch_heap_nodes = (heap_node*)MY_MALLOC(ctx -> local_n_points * sizeof(heap_node));  
    heap_node_t* scratch_heap_node_ts = (heap_node_t*)MY_MALLOC(ctx -> local_n_points * sizeof(heap_node_t));  

        for(idx_t j = 4; j < kMAX - 1; ++j)
        {
@@ -217,17 +217,17 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int

                    if(owner == ctx -> mpi_rank)
                    {
                        scratch_heap_nodes[i] = ctx -> local_datapoints[pos].ngbh[ksel];
                        scratch_heap_node_ts[i] = ctx -> local_datapoints[pos].ngbh[ksel];
                    }
                    else
                    {
                        // if(pos > ctx -> rank_n_points[owner]) printf("NOPE HERE from rank %d %lu %d\n", ctx -> mpi_rank, pos, ctx -> rank_n_points[owner]);
                        MPI_Get(scratch_heap_nodes + i, 
                                sizeof(heap_node), 
                        MPI_Get(scratch_heap_node_ts + i, 
                                sizeof(heap_node_t), 
                                MPI_BYTE, 
                                owner, 
                                (MPI_Aint)((pos * (idx_t)(ctx -> k) + ksel) * sizeof(heap_node)), 
                                sizeof(heap_node), 
                                (MPI_Aint)((pos * (idx_t)(ctx -> k) + ksel) * sizeof(heap_node_t)), 
                                sizeof(heap_node_t), 
                                MPI_BYTE, 
                                exposed_ngbh);
                    }
@@ -246,7 +246,7 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int
                {
                    float_t vvi, vvj, vp, dL;
                    vvi = local_datapoints[i].ngbh[ksel].value;
                    vvj = scratch_heap_nodes[i].value;
                    vvj = scratch_heap_node_ts[i].value;
                    vp = (vvi + vvj)*(vvi + vvj);
                    dL = -2.0 * ksel * log(4.*vvi*vvj/vp);

@@ -293,7 +293,7 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int
    MPI_Win_fence(0, exposed_ngbh);
    MPI_Win_free(&exposed_ngbh);

    free(scratch_heap_nodes);
    free(scratch_heap_node_ts);

    #if defined(WRITE_DENSITY)
        /* densities */
@@ -301,24 +301,28 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int
        float_t* gs = (float_t*)MY_MALLOC(ctx -> local_n_points * sizeof(float_t));
        idx_t* ks = (idx_t*)MY_MALLOC(ctx -> local_n_points * sizeof(idx_t));

        for(int i = 0; i < ctx -> local_n_points; ++i) den[i] = ctx -> local_datapoints[i].log_rho;
        for(int i = 0; i < ctx -> local_n_points; ++i) ks[i]  = ctx -> local_datapoints[i].kstar;
        for(int i = 0; i < ctx -> local_n_points; ++i) gs[i]  = ctx -> local_datapoints[i].g;
        for(int i = 0; i < ctx -> local_n_points; ++i) 
        {
            den[i] = ctx -> local_datapoints[i].log_rho;
            ks[i]  = ctx -> local_datapoints[i].kstar;
            gs[i]  = ctx -> local_datapoints[i].g;
        }

        ordered_buffer_to_file(ctx, den, sizeof(float_t), ctx -> local_n_points, "bb/ordered_density.npy");
        ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy");
        ordered_buffer_to_file(ctx, gs, sizeof(float_t), ctx -> local_n_points, "bb/g.npy");

        ordered_data_to_file(ctx, "bb/ordered_data.npy");
        free(gs);
        free(den);
        free(ks);

    #endif
    return;


}

float_t get_j_ksel_dist_v2(global_context_t* ctx, idx_t i, idx_t j, idx_t ksel, int* flags, heap_node* tmp_heap_nodes, MPI_Win* exposed_ngbh)
float_t get_j_ksel_dist_v2(global_context_t* ctx, idx_t i, idx_t j, idx_t ksel, int* flags, heap_node_t* tmp_heap_node_ts, MPI_Win* exposed_ngbh)
{
    if(flags[i])
    {
@@ -335,14 +339,14 @@ float_t get_j_ksel_dist_v2(global_context_t* ctx, idx_t i, idx_t j, idx_t ksel,
            //RMA
            flags[i] = 0;
            idx_t pos  = j - ctx -> rank_idx_start[owner];
            MPI_Get(tmp_heap_nodes + i, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, *exposed_ngbh);
            MPI_Get(tmp_heap_node_ts + i, sizeof(heap_node_t), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node_t)), sizeof(heap_node_t), MPI_BYTE, *exposed_ngbh);
            return 0;
        }                 
    }
    else
    {
        flags[i] = 1;
        return tmp_heap_nodes[i].value;
        return tmp_heap_node_ts[i].value;
    }
}

@@ -745,7 +749,7 @@ clusters_t Heuristic1(global_context_t *ctx)
     *
     * optimized v2 use a queue of center removal and then exchange them
	 **/
	heap_node* to_remove_mask = (heap_node*)MY_MALLOC(n*sizeof(heap_node));
	heap_node_t* to_remove_mask = (heap_node_t*)MY_MALLOC(n*sizeof(heap_node_t));

    for(idx_t p = 0; p < n; ++p) 
    {
@@ -1401,7 +1405,7 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster)

    MPI_Win dp_info_win, ngbh_win;
    MPI_Win_create(ctx -> local_datapoints, ctx -> local_n_points * sizeof(datapoint_info_t), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &dp_info_win);
    MPI_Win_create(ctx -> __local_heap_buffers, ctx -> local_n_points * ctx -> k * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &ngbh_win);
    MPI_Win_create(ctx -> __local_heap_buffers, ctx -> local_n_points * ctx -> k * sizeof(heap_node_t), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &ngbh_win);

    MPI_Win_fence(0, dp_info_win);
    MPI_Win_fence(0, ngbh_win);
@@ -2478,7 +2482,7 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo)
        for(int i = 0; i < ctx -> local_n_points; ++i) cl[i] = ctx -> local_datapoints[i].cluster_idx;

        ordered_buffer_to_file(ctx, cl, sizeof(int), ctx -> local_n_points, "bb/final_assignment.npy");
        ordered_data_to_file(ctx, "bb/ordered_data.npy");
        //ordered_data_to_file(ctx, "bb/ordered_data.npy");

        free(cl);
        
+10 −7
Original line number Diff line number Diff line
#include "common.h"
#include "mpi.h"
#include <stdlib.h>
#include <time.h>

#define ARRAY_INCREMENT 100
@@ -15,6 +16,7 @@ void get_context(global_context_t* ctx)
	ctx -> local_data = NULL;
	ctx -> lb_box     = NULL;
	ctx -> ub_box     = NULL;
    ctx -> og_idxs    = NULL;
    ctx -> rank_n_points  = (idx_t*)malloc(ctx -> world_size * sizeof(idx_t));
    ctx -> rank_idx_start = (idx_t*)malloc(ctx -> world_size * sizeof(idx_t));
    ctx -> local_datapoints = NULL;
@@ -160,10 +162,10 @@ void print_error_code(int err)
void free_context(global_context_t* ctx)
{

    FREE_NOT_NULL(ctx -> local_data);
    // FREE_NOT_NULL(ctx -> local_data);
    FREE_NOT_NULL(ctx -> ub_box);
    FREE_NOT_NULL(ctx -> lb_box);
    //FREE_NOT_NULL(ctx -> __local_heap_buffers);
    FREE_NOT_NULL(ctx -> og_idxs);
    if(ctx -> __local_heap_buffers) MPI_Free_mem(ctx -> __local_heap_buffers);


@@ -179,10 +181,10 @@ void free_context(global_context_t* ctx)

void free_pointset(pointset_t* ps)
{
	if(ps -> data) 
	if(ps -> datapoints) 
	{
		free(ps -> data);
		ps -> data = NULL;
		free(ps -> datapoints);
		ps -> datapoints = NULL;
	}

	if(ps -> ub_box)
@@ -210,6 +212,7 @@ void mpi_printf(global_context_t* ctx, const char *fmt, ...)
		//        myflush(stdout);
		va_end(l);
	}
    fflush(stdout);
}

void generate_random_matrix(
+33 −13
Original line number Diff line number Diff line
@@ -10,21 +10,25 @@
#define DEFAULT_MSG_LEN 10000000
//#include <stdarg.h>

#define PARALLEL_FIX_BORDERS
// #define WRITE_NGBH
// #define PARALLEL_FIX_BORDERS
#define WRITE_SHUFFLED_DATA
#define WRITE_NGBH
// #define WRITE_TOP_NODES
// #define WRITE_DENSITY
#define WRITE_DENSITY
// #define WRITE_CLUSTER_ASSIGN_H1
// #define WRITE_BORDERS
// #define WRITE_CENTERS_PRE_MERGING
// #define WRITE_MERGES_INFO
// #define WRITE_MERGING_TABLE
// #define WRITE_FINAL_ASSIGNMENT
#define WRITE_FINAL_ASSIGNMENT

// #define PRINT_NGBH_EXCHANGE_SCHEME
// #define PRINT_H2_COMM_SCHEME
// #define PRINT_H1_CLUSTER_ASSIGN_COMPLETION
// #define PRINT_ORDERED_BUFFER
// #define PRINT_BALANCE_FACTOR
// #define CHECK_CORRECT_EXCHANGE
// #define DUMP_NGBH_KNN

#define DEFAULT_STR_LEN 200

@@ -50,14 +54,17 @@
#define MY_TRUE  1
#define MY_FALSE 0

#define HERE printf("%d in file %s reached line %d\n", ctx -> mpi_rank, __FILE__, __LINE__); MPI_Barrier(ctx -> mpi_communicator);
#define HERE printf("%d in file %s reached line %d\n", ctx -> mpi_rank, __FILE__, __LINE__); fflush(stdout); MPI_Barrier(ctx -> mpi_communicator);

/*
 * Abort the program with a diagnostic when the allocation result `x` is NULL.
 * Requires a pointer named `ctx` with an `mpi_rank` member in the calling scope.
 * The argument is parenthesized so compound expressions are negated as a whole,
 * and __LINE__ (an int) is printed with %d.
 */
#define CHECK_ALLOCATION(x) if(!(x)){printf("[!!!] %d rank encountered failed allocation: %s at line %d \n", ctx -> mpi_rank, __FILE__, __LINE__ ); exit(1);};


/*
 * Abort the program with a diagnostic when the allocation result `x` is NULL.
 * Variant for code that has no `ctx` in scope. The argument is parenthesized
 * so compound expressions are negated as a whole.
 */
#define CHECK_ALLOCATION_NO_CTX(x) if(!(x)){printf("[!!!] Failed allocation: %s at line %d \n", __FILE__, __LINE__ ); exit(1);}
#ifndef MY_MALLOC
/*
 * Allocate `n` zero-initialised bytes with calloc and abort on failure.
 * Evaluates to the allocated pointer (GNU statement expression).
 * The temporary uses a reserved-looking name so that an argument mentioning a
 * caller variable called `p` is not captured by the macro's own local.
 */
#define MY_MALLOC(n) ({void* my_malloc_ptr_ = calloc((n),1); CHECK_ALLOCATION_NO_CTX(my_malloc_ptr_); my_malloc_ptr_; })
#endif

#define DB_PRINT(...) printf(__VA_ARGS__)
/*
 * Debug print: forwards its arguments to printf and flushes stdout right away.
 * Wrapped in do/while(0) so the two calls behave as a single statement, e.g.
 * under an unbraced if/else.
 */
#define DB_PRINT(...) do { printf(__VA_ARGS__); fflush(stdout); } while(0)
#ifdef NDEBUG
	/* #undef takes only the macro name; a parameter list here is extra tokens. */
	#undef DB_PRINT
	#define DB_PRINT(...)
@@ -93,11 +100,14 @@
            MPI_Reduce(&time, &avg, 1, MPI_DOUBLE, MPI_SUM, 0, ctx -> mpi_communicator); \
            MPI_Reduce(&time, &min, 1, MPI_DOUBLE, MPI_MIN, 0, ctx -> mpi_communicator); \
            MPI_Reduce(&time, &max, 1, MPI_DOUBLE, MPI_MAX, 0, ctx -> mpi_communicator); \
            MPI_Barrier(ctx->mpi_communicator); \
            MPI_DB_PRINT("%50.50s -> [avg: %.2lfs, min: %.2lfs, max: %.2lfs]\n", sec_name, avg/((double)ctx -> world_size), min, max); \
            fflush(stdout); \
        } \
        else \
        { \
            MPI_DB_PRINT("%s\n", sec_name);\
            fflush(stdout); \
        }\
    }
    
@@ -128,11 +138,20 @@
    #define LOG_END
#endif

#ifndef POINT
    #define POINT
    /*
     * A point record pairing an index with a pointer to floating-point data.
     * The POINT guard skips this typedef when POINT is already defined.
     */
    typedef struct point_t
    {
        idx_t    array_idx;   // index of the point -- presumably its global position in the dataset; TODO confirm
        float_t* data;        // presumably the point's coordinate values; NOTE(review): ownership is not visible here
    } point_t;
#endif

typedef struct datapoint_info_t {
    /*
     * datapoint object 
     */
    heap_node* ngbh;        //heap object stores nearest neighbors of each point
    heap_node_t* ngbh;      //heap object stores nearest neighbors of each point
    int is_center;          //flag signaling if a point is a cluster center
    int cluster_idx;        //cluster index
    idx_t array_idx;        //global index of the point in the dataset
@@ -154,8 +173,8 @@ typedef struct global_context_t
    int __processor_name_len;                       //processor name len
    int world_size;  
    int mpi_rank;                                   //rank of the processor
    uint32_t dims;                                  //number of dimensions of the dataset
    idx_t k;                                        //number of neighbors
    uint32_t dims;                                  //number of dimensions of the dataset
    float_t z;                                      //z parameter
	float_t* local_data;                            //pointer to the dataset stored into the node
	float_t* lb_box;                                //bounding box of the dataset
@@ -166,10 +185,11 @@ typedef struct global_context_t
    datapoint_info_t*  local_datapoints;            //pointer to the datapoint properties
    idx_t* rank_idx_start;                          //starting index of datapoints in each processor
    idx_t* rank_n_points;                           //number of datapoints held by each processor
    heap_node* __local_heap_buffers;                //buffer that stores nearest neighbors
    idx_t* og_idxs;                                 //original indexes
    heap_node_t* __local_heap_buffers;              //buffer that stores nearest neighbors
	MPI_Comm mpi_communicator;                      //mpi communicator
    char input_data_file[DEFAULT_STR_LEN];
    int input_data_in_float32;
    char input_data_file[DEFAULT_STR_LEN];
    char output_assignment_file[DEFAULT_STR_LEN];
    char output_data_file[DEFAULT_STR_LEN];
} global_context_t;
@@ -184,7 +204,7 @@ typedef struct pointset_t
	size_t n_points;
	size_t __capacity;
	uint32_t dims;
	float_t* data;
	point_t* datapoints;
	float_t* lb_box;
	float_t* ub_box;
} pointset_t;
Loading