Commit c36add8a authored by lykos98

added args and hello mesg

parent 04c3417b
+1 −1
CC=mpicc
CFLAGS=-O3 -g -fopenmp
CFLAGS=-O3 -march=native -flto -funroll-loops -fopenmp
LDFLAGS=-lm 

all: main
+135 −10
@@ -733,7 +733,7 @@ clusters_t Heuristic1(global_context_t *ctx)
        }
    }

	/* 
	/** 
	 * optimized version
	 *
	 * Generate a mask that keeps track of which point has been eliminating the 
@@ -741,7 +741,7 @@ clusters_t Heuristic1(global_context_t *ctx)
	 * ends, center, removed centers, and max_rho arrays are populated
     *
     * optimized v2 uses a queue of center removals and then exchanges them
	 */
	 **/
	heap_node* to_remove_mask = (heap_node*)MY_MALLOC(n*sizeof(heap_node));
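	// one entry per local point: during the scan each entry keeps the array_idx and
	// density g of the strongest point found so far that dominates it, i.e. the best
	// candidate for removing that centre (see the locked updates further below)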

    for(idx_t p = 0; p < n; ++p) 
@@ -779,10 +779,118 @@ clusters_t Heuristic1(global_context_t *ctx)
		
    TIME_START;


//#define EXP_CENTER_PRUNING
#if defined(EXP_CENTER_PRUNING)
    int all_have_finished = 0;
    int i_have_finished   = 0;

    datapoint_info_t* tmp_datapoints = (datapoint_info_t*)MY_MALLOC(ctx -> local_n_points * sizeof(datapoint_info_t));

    // use a strategy similar to the one employed in density estimation:
    // vertical loop, first on k then on n
    // request points
    // then process the points
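    // per neighbour level j: issue every MPI_Get into tmp_datapoints first, close the
    // epoch with a fence, then consume the staged copies; local points are copied directly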


    MPI_Win_fence(MPI_MODE_NOPUT, win_datapoints);
    //MPI_Win_lock_all(0, win_datapoints);
    MPI_DB_PRINT("Using vertical loop on centers pruning\n");
    int kmax = ctx -> k;
    for(idx_t j = 1; j < kmax; ++j)        
    {
        i_have_finished = 1;

#if !defined(THREAD_FUNNELED)
        // request points
        #pragma omp parallel for schedule(dynamic)
        for(idx_t p = 0; p < n; ++p)
        {

            datapoint_info_t i_point = *(dp_info_ptrs[p]);
            if(j < i_point.kstar + 1)
            {
                #pragma omp atomic write
                i_have_finished = 0;

                idx_t jidx = i_point.ngbh[j].array_idx; 
                int owner = foreign_owner(ctx, jidx);
                /* find if datapoint is halo or not */
                idx_t jpos = jidx - ctx -> rank_idx_start[owner];
                if(owner == ctx -> mpi_rank)
                {
                    tmp_datapoints[p] = ctx -> local_datapoints[jpos];
                }
                else
                {
                    MPI_Get(tmp_datapoints + p, sizeof(datapoint_info_t), MPI_BYTE, owner,
                            jpos * sizeof(datapoint_info_t), sizeof(datapoint_info_t), MPI_BYTE, win_datapoints);
                }                 
            }

        }

        //MPI_Win_flush_all(win_datapoints);
        MPI_Win_fence(MPI_MODE_NOPUT | MPI_MODE_NOSTORE, win_datapoints);
        // wait for all comms to happen

        #pragma omp parallel for schedule(dynamic)
        for(idx_t p = 0; p < n; ++p)
        {
            datapoint_info_t i_point = *(dp_info_ptrs[p]);
            if(j < i_point.kstar + 1)
            {
                datapoint_info_t j_point = tmp_datapoints[p];
                idx_t jidx = j_point.array_idx;

                if(j_point.is_center && i_point.g > j_point.g)
                {
                    // lock the mask entry of the candidate centre j
                    int owner = foreign_owner(ctx, jidx);
                    // if local, update the mask directly
                    if(owner == ctx -> mpi_rank)
                    {
                        //acquire the lock
                        idx_t jpos = jidx - ctx -> idx_start;
                        omp_set_lock(lock_array + jpos);
                        if(i_point.g > to_remove_mask[jpos].value)
                        {
                            to_remove_mask[jpos].array_idx = i_point.array_idx;
                            to_remove_mask[jpos].value     = i_point.g;
                        }
                        omp_unset_lock(lock_array + jpos);
                    }
                    // otherwise enqueue for sending
                    else
                    {
                        center_removal_t element = {.rank = owner, .source_id = i_point.array_idx, 
                            .target_id = j_point.array_idx, .source_density = i_point.g};
                        enqueue_center_removal(removal_queues + p, element);
                    }
                }

            }

        }

        //MPI_Allreduce(&i_have_finished, &all_have_finished, 1, MPI_INT, MPI_LAND, ctx -> mpi_communicator);
        //if(all_have_finished) break;
        if(i_have_finished) break;
    }

    //MPI_Win_unlock_all(win_datapoints);
    MPI_Win_fence(0, win_datapoints);
    free(tmp_datapoints);
    
#else
    MPI_Win_fence(MPI_MODE_NOPUT, win_datapoints);

    idx_t n_foreign_req = 0;
    idx_t n_local_req   = 0;
    float_t elapsed_proc = 0.;

#if !defined(THREAD_FUNNELED)
    #pragma omp parallel for schedule(dynamic) reduction(+:elapsed_proc, n_local_req, n_foreign_req)
#endif
    for(idx_t p = 0; p < n; ++p)
    {
@@ -791,19 +899,31 @@ clusters_t Heuristic1(global_context_t *ctx)
        for(idx_t j = 1; j < i_point.kstar + 1; ++j)
        {
            idx_t jidx = i_point.ngbh[j].array_idx; 

            struct timespec __start, __end;
            clock_gettime(CLOCK_MONOTONIC,&__start); 
            datapoint_info_t j_point = find_possibly_halo_datapoint_rma(ctx, jidx,  win_datapoints);
            clock_gettime(CLOCK_MONOTONIC,&__end);
            elapsed_proc += (double)(__end.tv_sec - __start.tv_sec) + (__end.tv_nsec - __start.tv_nsec)/1e9;

            int owner = foreign_owner(ctx, jidx);
            if(owner == ctx -> mpi_rank)
            {
                n_local_req++;
            }
            else
            {
                n_foreign_req++;
            }

            if(j_point.is_center && i_point.g > j_point.g)
            {
                // lock the mask entry of the candidate centre j
                int owner = foreign_owner(ctx, jidx);
                // if local, update the mask directly
                idx_t jpos = jidx - ctx -> idx_start;
                if(owner == ctx -> mpi_rank)
                {
                    // acquire the lock
                    idx_t jpos = jidx - ctx -> idx_start;
                    omp_set_lock(lock_array + jpos);
                    if(i_point.g > to_remove_mask[jpos].value)
                    {
@@ -822,9 +942,14 @@ clusters_t Heuristic1(global_context_t *ctx)
            }
        }
    }

    MPI_Win_fence(MPI_MODE_NOPUT, win_datapoints);

    MPI_Barrier(ctx -> mpi_communicator);
    DB_PRINT("Rank %d: foreign requests points %lu out of %lu -> fraction %.2lf time %.2lfs\n",
            ctx -> mpi_rank, n_foreign_req, n_local_req + n_foreign_req, (float)n_foreign_req/(float)n_local_req, elapsed_proc);
    MPI_Barrier(ctx -> mpi_communicator);
#endif

    //assemble arrays into a single buffer

    elapsed_time = TIME_STOP;
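
The experimental pruning loop above relies on a fence-delimited RMA pattern: each rank first issues all of its MPI_Get calls into a private staging buffer, a second MPI_Win_fence completes the transfers, and only then is the staged data processed. A minimal, self-contained sketch of that pattern, with made-up sizes, a toy neighbour choice and plain doubles instead of the project's datapoint_info_t:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank, world;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world);

    const int n_local = 4;                               /* points owned by this rank     */
    double* owned = malloc(n_local * sizeof(double));
    for(int i = 0; i < n_local; ++i) owned[i] = 100.0 * rank + i;

    /* expose the locally owned array through an RMA window */
    MPI_Win win;
    MPI_Win_create(owned, n_local * sizeof(double), sizeof(double),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    double* staging = malloc(n_local * sizeof(double));

    /* open a read-only epoch on the window */
    MPI_Win_fence(MPI_MODE_NOPUT, win);

    /* phase 1: issue every request; local entries are copied directly,
     * remote ones are fetched with MPI_Get into the staging buffer      */
    for(int p = 0; p < n_local; ++p)
    {
        int owner = (rank + 1) % world;                  /* toy "neighbour" owner        */
        int pos   = p;                                   /* toy offset on the owner rank */
        if(owner == rank) staging[p] = owned[pos];
        else MPI_Get(staging + p, 1, MPI_DOUBLE, owner, pos, 1, MPI_DOUBLE, win);
    }

    /* phase 2: the fence completes all outstanding gets; only now is the
     * staging buffer safe to read and process locally                    */
    MPI_Win_fence(MPI_MODE_NOPUT | MPI_MODE_NOSTORE, win);

    for(int p = 0; p < n_local; ++p)
        printf("rank %d staged value %.0f\n", rank, staging[p]);

    MPI_Win_free(&win);
    free(staging);
    free(owned);
    MPI_Finalize();
    return 0;
}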
+2 −0
@@ -21,6 +21,8 @@ void get_context(global_context_t* ctx)
    ctx -> __local_heap_buffers = NULL;
    ctx -> input_data_in_float32 = -1;
    ctx -> dims = 0;
    ctx -> k = 300;
    ctx -> z = 3;
}

void get_dataset_diagnostics(global_context_t* ctx, float_t* data)
+1 −0
@@ -149,6 +149,7 @@ typedef struct global_context_t
    int mpi_rank;                                   //rank of the processor
    uint32_t dims;                                  //number of dimensions of the dataset
    idx_t k;                                        //number of neighbors
    float_t z;                                      //z parameter
	float_t* local_data;                            //pointer to the dataset stored into the node
	float_t* lb_box;                                //bounding box of the dataset
	float_t* ub_box;                                //bounding box of the dataset
+44 −7
@@ -44,6 +44,8 @@ struct option long_options[] =
    {"in-dims" , required_argument, 0, 'd'},
    {"out-data", optional_argument, 0, 'o'},
    {"out-assignment", optional_argument, 0, 'a'},
    {"kngbh", optional_argument, 0, 'k'},
    {"z", optional_argument, 0, 'z'},
    {"help", optional_argument, 0, 'h'},
    {0, 0, 0, 0}
};
@@ -59,7 +61,9 @@ const char* help = "Distributed Advanced Density Peak\n"\
                   "                                   mpi tasks and datapoints are ordered default is `out_data` \n"\
                   "    -a --out-assignment (optional) output path for the cluster assignment output ranges [0 ... Nc - 1]\n"\
                   "                                   for core points halo points have indices [-Nc ... -1] conversion\n"\
                   "                                   of idx for an halo point is cluster_idx = -halo_idx - 1, default is `out_assignment`\n";
                   "                                   of idx for an halo point is cluster_idx = -halo_idx - 1, default is `out_assignment`\n"\
                   "    -k --kngbh          (optional) number of nearest neighbors to compute\n"\
                   "    -z --z              (optional) number of nearest neighbors to compute\n";

void parse_args(global_context_t* ctx, int argc, char** argv)
{
@@ -70,7 +74,7 @@ void parse_args(global_context_t* ctx, int argc, char** argv)
    snprintf(ctx -> output_assignment_file, DEFAULT_STR_LEN, "%s", OUT_CLUSTER_ASSIGN);
    snprintf(ctx -> output_data_file, DEFAULT_STR_LEN, "%s", OUT_DATA);

    while((opt = getopt_long(argc, argv, "i:t:d:o:a:h", long_options, NULL)) != -1)
    while((opt = getopt_long(argc, argv, "i:t:d:o:a:k:z:h", long_options, NULL)) != -1)
    {
        switch(opt)
        {
@@ -103,10 +107,17 @@ void parse_args(global_context_t* ctx, int argc, char** argv)
            case 'a':
                strncpy(ctx -> output_assignment_file, optarg, DEFAULT_STR_LEN);
                break;
            case 'k':
                ctx -> k = atoi(optarg);
                break;
            case 'z':
                ctx -> z = atof(optarg);
                break;
            case 'h':
                mpi_printf(ctx, "%s\n", help);
                MPI_Finalize();
                exit(0);

            default:
                mpi_printf(ctx, "%s\n", help);
                MPI_Finalize();
@@ -121,6 +132,30 @@ void parse_args(global_context_t* ctx, int argc, char** argv)
    if(err){MPI_Finalize(); exit(1);};
}
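
With -k and -z wired into getopt, both parameters can now be set from the command line; an illustrative invocation (the binary name comes from the Makefile, file names and the -t value format are placeholders):

    mpirun -np 4 ./main -i input.dat -t f32 -d 5 -k 300 -z 3 -o out_data -a out_assignment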

void print_hello(global_context_t* ctx)
{
   char * hello = ""
"     _           _       \n"
"  __| | __ _  __| |_ __  \n"
" / _` |/ _` |/ _` | '_ \\ \n"
"| (_| | (_| | (_| | |_) | \n"
" \\__,_|\\__,_|\\__,_| .__/ \n"
"                  |_|    \n"
" Distributed Advanced Density Peak";

    mpi_printf(ctx, "%s\n\n", hello);
    mpi_printf(ctx, "Configs: \n");
    mpi_printf(ctx, "Data file              -> %s\n", ctx -> input_data_file);
    mpi_printf(ctx, "Input Type             -> float%d\n", ctx -> input_data_in_float32 ? 32 : 64);
    mpi_printf(ctx, "Dimensions             -> %d\n", ctx -> dims);
    mpi_printf(ctx, "Output data file       -> %s\n", ctx -> output_data_file);
    mpi_printf(ctx, "Output assignment file -> %s\n", ctx -> output_assignment_file);
    mpi_printf(ctx, "k                      -> %lu\n", ctx -> k);
    mpi_printf(ctx, "Z                      -> %lf\n", ctx -> z);
    mpi_printf(ctx, "\nRUNNING!\n");

}

int main(int argc, char** argv) {
    #if defined (_OPENMP)
        int mpi_provided_thread_level;
@@ -177,15 +212,19 @@ int main(int argc, char** argv) {
	 */

	int d = 5;
    int k = 300;
	
	float_t* data;
    
    //parse command line
    
    parse_args(&ctx, argc, argv);
    print_hello(&ctx);
	/*
	 * Generate a random matrix of some length
	 */


	if(ctx.mpi_rank == 0)
	{
		simulate_master_read_and_scatter(5, 1000000, &ctx);	
@@ -211,10 +250,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    TIME_DEF
    double elapsed_time;

    float_t z = 3;
    int halo = MY_FALSE;
    float_t tol = 0.002;
    int k = 300;

    if(I_AM_MASTER && ctx -> world_size <= 6)
    {
@@ -356,7 +393,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    MPI_DB_PRINT("----- Performing ngbh search -----\n");
    MPI_Barrier(ctx -> mpi_communicator);

    mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, k);
    mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, ctx -> k);

    MPI_Barrier(ctx -> mpi_communicator);
    elapsed_time = TIME_STOP;
@@ -376,7 +413,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)


    compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE);
    compute_correction(ctx, z);
    compute_correction(ctx, ctx -> z);
    elapsed_time = TIME_STOP;
    LOG_WRITE("Density estimate", elapsed_time)

@@ -393,7 +430,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)


    TIME_START;
    Heuristic3(ctx, &clusters, z, halo);
    Heuristic3(ctx, &clusters, ctx -> z, halo);
    elapsed_time = TIME_STOP;
    LOG_WRITE("H3", elapsed_time)