Commit 43b6d769 authored by lykos98's avatar lykos98
Browse files

added experimental h1 optimization

parent 9b4a2351
Loading
Loading
Loading
Loading
+13 −4
Original line number Diff line number Diff line
#!/bin/bash

#SBATCH --nodes=6
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=2
#SBATCH --cpus-per-task=56
#SBATCH --time=04:00:00
#SBATCH --job-name=dadp_test
#SBATCH --partition=dcgp_usr_prod 
#SBATCH --account=IscrC_dadp
#SBATCH --account=EUHPC_D18_045
#SBATCH --output=out_leo
#SBATCH --error=err_leo
#SBATCH --mem=480G
@@ -14,8 +14,10 @@


cd $SLURM_SUBMIT_DIR
module restore my_gcc
#module restore my_intel

module load gcc
module load openmpi

make clean
make
ulimit -s unlimited
@@ -36,5 +38,12 @@ IN_DATA=/leonardo_work/IscrC_dadp
#10^6 points 
time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  ./main -t f32 -i ${IN_DATA}/norm_data/std_LR_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}

#34 * 10^6 points
#time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  ./main -t f32 -i ${IN_DATA}/norm_data/std_g1212639_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}

#88 * 10^6 points 
#time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  ./main -t f32 -i ${IN_DATA}/norm_data/std_g5503149_091_0000 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}


#200 * 10^6 points
#time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  ./main -t f32 -i ${IN_DATA}/norm_data/std_g2980844_091_0000 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}
+185 −90
Original line number Diff line number Diff line
@@ -118,8 +118,8 @@ idx_t get_j_ksel_idx(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win exposed
}


void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int verbose){

void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int verbose)
{
    /*
     * Point density computation:                       
     * args:                                            
@@ -648,12 +648,32 @@ lock_t h1_lock_free(global_context_t* ctx, MPI_Win lock_window, int owner, idx_t
    
}

void enqueue_center_removal(center_removal_queue_t* queue, 
                            center_removal_t element)
{
    int default_incr_size = 100;
    if(queue -> count >= queue -> size)
    {
        queue -> size += default_incr_size;
        queue -> data = realloc(queue -> data, queue -> size * sizeof(center_removal_t));
    }
    queue -> data[queue -> count] = element;
    queue -> count++;
}

int compare_removal_by_target(const void* a, const void* b)
{
    idx_t arg1 = (*(const center_removal_t *)a).target_id;
    idx_t arg2 = (*(const center_removal_t *)b).target_id;
    return (arg1 > arg2) - (arg1 < arg2);
}

clusters_t Heuristic1(global_context_t *ctx)
{
    /*
    /**
     * Heurisitc 1, from paper of Errico, Facco, Laio & Rodriguez 
     * ( https://doi.org/10.1016/j.ins.2021.01.010 )              
     */
     **/

    datapoint_info_t* dp_info = ctx -> local_datapoints;
    idx_t n = ctx -> local_n_points; 
@@ -721,36 +741,38 @@ clusters_t Heuristic1(global_context_t *ctx)
	 * Generate a mask that keeps track of the point has been eliminating the 
	 * point considered. Each thread updates this mask, then after the procedure
	 * ends, center, removed centers, and max_rho arrays are populated
     *
     * optimized v2 use a queue of center removal and then exchange them
	 */
		
    lock_t*    lock_array     = (lock_t*)MY_MALLOC(n * sizeof(lock_t));
	heap_node* to_remove_mask = (heap_node*)MY_MALLOC(n*sizeof(heap_node));

    for(idx_t p = 0; p < n; ++p) 
    {
        to_remove_mask[p].array_idx = MY_SIZE_MAX;
        to_remove_mask[p].value = 9999999;
        lock_array[p] = LOCK_FREE;
    }
    qsort(dp_info_ptrs, n, sizeof(datapoint_info_t*), cmpPP);

    /**
     * workflow
     * find the elements we need to eliminate
     * compact them to a single array
     * send/recieve 
     **/

    MPI_Win win_to_remove_mask;
    MPI_Win_create(to_remove_mask, n * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &win_to_remove_mask);
    MPI_Win_fence(0, win_to_remove_mask);

    MPI_Win win_locks;
    MPI_Win_create(lock_array, n * sizeof(lock_t), sizeof(lock_t), MPI_INFO_NULL, ctx -> mpi_communicator, &win_locks);
    MPI_Win_fence(0, win_locks);
    center_removal_queue_t* removal_queues = (center_removal_queue_t*)MY_MALLOC(n * sizeof(center_removal_queue_t));
    omp_lock_t* lock_array = (omp_lock_t*)MY_MALLOC(n * sizeof(omp_lock_t));

#ifdef EXP_H1
    MPI_Win_lock_all(0, win_to_remove_mask);
    MPI_Win_lock_all(0, win_locks);
#endif
    //zero out all queues
    
#ifdef EXP_H1
    printf("Using experimental h1\n");
#endif
    for(idx_t i = 0; i < n; ++i)
    {
        removal_queues[i].count  = 0;
        removal_queues[i].size   = 0;
        removal_queues[i].data   = NULL;
        omp_init_lock(lock_array + i);
    }

#if !defined(THREAD_FUNNELED)
    #pragma omp parallel for schedule(dynamic)
@@ -767,82 +789,140 @@ clusters_t Heuristic1(global_context_t *ctx)

            if(j_point.is_center && i_point.g > j_point.g)
            {
                /*
                 *
                 * TODO: Implement it without this but using private locks
                 * use an array of locks, and compare and swap to actually gain control of the thing
                 *
                 * */

#ifdef EXP_H1
                #pragma omp critical (h1_exp)
                {
                // set the lock at position i 
                // actually is the p-th point
                int owner = foreign_owner(ctx, jidx);
                    idx_t jpos = jidx - ctx -> rank_idx_start[owner];
                //if local process it
                if(owner == ctx -> mpi_rank)
                {
                    idx_t jpos = jidx - ctx -> idx_start;
                    if(i_point.g > to_remove_mask[jpos].value)
                    {
                        omp_set_lock(lock_array + jpos);
                        to_remove_mask[jpos].array_idx = i_point.array_idx;
                        to_remove_mask[jpos].value     = i_point.g;
                        omp_unset_lock(lock_array + jpos);
                    }
                    
                    lock_t state = LOCK_FREE;
                }
                //otherwise enqueue for sending
                else
                {
                    center_removal_t element = {.rank = owner, .source_id = i_point.array_idx, 
                        .target_id = j_point.array_idx, .source_density = i_point.g};
                    enqueue_center_removal(removal_queues + p, element);
                }
            }
        }
    }

                    state = h1_lock_acquire(ctx, win_locks, owner, jpos, state);
    //assemble arrays into a single buffer
    
                    heap_node mask_element;
                    MPI_Request request;
    idx_t tot_removal = 0;
    for(idx_t p = 0; p < n; ++p)
    {
        tot_removal += removal_queues[p].count;
    }
    
                    MPI_Rget(&mask_element, sizeof(heap_node), MPI_BYTE, 
                            owner, jpos * sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request);
                    MPI_Wait(&request, MPI_STATUS_IGNORE);
    idx_t buffer_idx = 0;
    center_removal_t* removal_buffer = (center_removal_t*)MY_MALLOC(tot_removal * sizeof(center_removal_t));

                    int flag = mask_element.array_idx == MY_SIZE_MAX;							
                    if(flag || i_point.g > mask_element.value )
    for(idx_t p = 0; p < n; ++p)
    {
                        heap_node tmp_mask_element = {.array_idx = i_point.array_idx, .value = i_point.g};
                        MPI_Request request;
                        MPI_Rput(&tmp_mask_element, sizeof(heap_node), MPI_BYTE, owner, 
                                jpos*sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request);
                        MPI_Wait(&request, MPI_STATUS_IGNORE);

        if(removal_queues[p].count > 0)        
        {
            memcpy(removal_buffer + buffer_idx, removal_queues[p].data, removal_queues[p].count * sizeof(center_removal_t));
            buffer_idx += removal_queues[p].count;
        }

                    state = h1_lock_free(ctx, win_locks, owner, jpos, state);
    }
#else
                #pragma omp critical (centers_elimination)                 
                {
                    int owner = foreign_owner(ctx, jidx);
                    idx_t jpos = jidx - ctx -> rank_idx_start[owner];

                    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, owner, 0, win_to_remove_mask);
                    heap_node mask_element;
                    MPI_Request request;
                    MPI_Rget(&mask_element, sizeof(heap_node), MPI_BYTE, 
                             owner, jpos * sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request);
                    MPI_Wait(&request, MPI_STATUS_IGNORE);
    //sort by array idx (it sorts also by rank)
    
    qsort(removal_buffer, tot_removal, sizeof(center_removal_t), compare_removal_by_target);

    //prepare for the sendrcv
    
    int* recv_counts = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
    int* send_counts = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));

                    int flag = mask_element.array_idx == MY_SIZE_MAX;							
                    if(flag || i_point.g > mask_element.value )
    int* recv_displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
    int* send_displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));

    //zero_out_all

    for(int i = 0; i < ctx -> mpi_rank; ++i)
    {
                        heap_node tmp_mask_element = {.array_idx = i_point.array_idx, .value = i_point.g};
                        MPI_Request request;
                        MPI_Rput(&tmp_mask_element, sizeof(heap_node), MPI_BYTE, owner, 
                                jpos*sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request);
                        MPI_Wait(&request, MPI_STATUS_IGNORE);
        recv_counts[i] = 0;
        send_counts[i] = 0;
        recv_displs[i] = 0;
        send_displs[i] = 0;
    }

    for(idx_t i = 0; i < tot_removal; ++i)
    {
        int rank_idx = removal_buffer[i].rank;
        send_counts[rank_idx]++;
    }

                    MPI_Win_unlock(owner, win_to_remove_mask);
    // all to all to recieve counts

    MPI_Alltoall(send_counts, 1, MPI_INT, 
                 recv_counts, 1, MPI_INT, ctx -> mpi_communicator);

    // compute displacements

    for(int i = 1; i < ctx -> world_size; ++i)
    {
        recv_displs[i] = recv_displs[i - 1] + recv_counts[i - 1];
        send_displs[i] = send_displs[i - 1] + send_counts[i - 1];
    }
#endif

    idx_t tot_recv_counts = 0;

    // count how many elements to recieve

    for(int i = 0; i < ctx -> world_size; ++i) tot_recv_counts += recv_counts[i];
    if(ctx -> mpi_rank == 0){
        for(int i = 0; i < ctx -> world_size; ++i){
            DB_PRINT("%d mpi rank recv_count %d to %d\n", ctx -> mpi_rank, recv_counts[i], i);
            DB_PRINT("%d mpi rank send_count %d to %d\n", ctx -> mpi_rank, send_counts[i], i);
        } 
    }
    DB_PRINT("rank %d: %lu recv counts\n", ctx -> mpi_rank, tot_recv_counts);

    // change dimensions to bytes

    for(int i = 0; i < ctx -> world_size; ++i)
    {
        recv_counts[i] = recv_counts[i] * sizeof(center_removal_t);
        send_counts[i] = send_counts[i] * sizeof(center_removal_t);
        recv_displs[i] = recv_displs[i] * sizeof(center_removal_t);
        send_displs[i] = send_displs[i] * sizeof(center_removal_t);
    }

#ifdef EXP_H1
    MPI_Win_unlock_all(win_to_remove_mask);
    MPI_Win_unlock_all(win_locks);
#endif
    //allocate buffer to recieve center elminiations

    center_removal_t* recv_removals = (center_removal_t*)MY_MALLOC(tot_recv_counts * sizeof(center_removal_t));

    // all to all
    MPI_Alltoallv(removal_buffer, send_counts, send_displs, MPI_BYTE, 
                   recv_removals, recv_counts, recv_displs, MPI_BYTE, ctx -> mpi_communicator);

    // merge into the mask
    
    #pragma omp parallel for
    for(idx_t i = 0; i < tot_recv_counts; ++i)
    {
        idx_t el_pos = recv_removals[i].target_id - ctx -> idx_start;
        if(recv_removals[i].source_density > to_remove_mask[el_pos].value)
        {
            omp_set_lock(lock_array + el_pos);
            to_remove_mask[el_pos].array_idx = recv_removals[i].source_id;
            to_remove_mask[el_pos].value     = recv_removals[i].source_density;
            omp_unset_lock(lock_array + el_pos);
        }
    }

    MPI_Win_fence(0, win_to_remove_mask);
    MPI_Win_fence(0, win_locks);
    MPI_Barrier(ctx -> mpi_communicator);

	/* populate the usual arrays */
    for(idx_t p = 0; p < all_centers.count; ++p)
@@ -888,11 +968,24 @@ clusters_t Heuristic1(global_context_t *ctx)
        }
    }

    MPI_Win_free(&win_to_remove_mask);
	free(to_remove_mask);

    MPI_Win_free(&win_locks);
    for(idx_t i = 0; i < n; ++i)
    {
        removal_queues[i].count  = 0;
        removal_queues[i].size   = 0;
        removal_queues[i].data   = NULL;
        omp_destroy_lock(lock_array+ i);
    }

    free(removal_queues);
    free(removal_buffer);
    free(send_counts);
    free(send_displs);
    free(recv_counts);
    free(recv_displs);
    free(lock_array);
    free(recv_removals);

    int n_centers = (int)actual_centers.count;
    int tot_centers;
@@ -900,9 +993,11 @@ clusters_t Heuristic1(global_context_t *ctx)

    MPI_DB_PRINT("Found %d temporary centers\n", tot_centers);

    /* bring on master all centers 
    /** 
     * bring on master all centers 
     * order them in ascending order of density, 
     * then re-scatter them around to get unique cluster labels */ 
     * then re-scatter them around to get unique cluster labels 
     **/ 

    center_t* private_centers_buffer = (center_t*)MY_MALLOC(actual_centers.count * sizeof(center_t));
    center_t* global_centers_buffer  = (center_t*)MY_MALLOC(tot_centers * sizeof(center_t));
+15 −0
Original line number Diff line number Diff line
@@ -45,6 +45,21 @@ typedef struct merge_t
    float_t density;
} merge_t;

typedef struct center_removal_t
{
    int     rank;
    idx_t   source_id;
    idx_t   target_id;
    float_t source_density;
} center_removal_t;

typedef struct center_removal_queue_t
{
    center_removal_t* data;
    idx_t count;
    idx_t size;
} center_removal_queue_t;



void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int verbose);