Commit 792e2512 authored by lykos98's avatar lykos98
Browse files

added implementation of centers elimination queues

parent 43b6d769
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -15,7 +15,8 @@ The suggestion is to run it with one mpi task per socket.

# Todo

 - [ ] H1: implementation of lock free centers elimination
 - [ ] argument parsing: find an elegant way to pass parameters and file (maybe a config file?)
 - [~] H1: implementation of lock free centers elimination (*work in progress*)
 - [ ] context: open all windows in a single shot, close them all together
 - [ ] io: curation of IO using mpi IO or other solutions 
 - [ ] kdtree: optimization and profiling
+2 −1
Original line number Diff line number Diff line
#!/bin/bash

#SBATCH --nodes=1
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=2
#SBATCH --cpus-per-task=56
#SBATCH --time=04:00:00
@@ -37,6 +37,7 @@ IN_DATA=/leonardo_work/IscrC_dadp

#10^6 points 
time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  ./main -t f32 -i ${IN_DATA}/norm_data/std_LR_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}
#time mpirun -n ${SLURM_NTASKS} --map-by core  ./main -t f32 -i ${IN_DATA}/norm_data/std_LR_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}

#34 * 10^6 points
#time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  ./main -t f32 -i ${IN_DATA}/norm_data/std_g1212639_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}
+67 −8
Original line number Diff line number Diff line
@@ -750,7 +750,7 @@ clusters_t Heuristic1(global_context_t *ctx)
    for(idx_t p = 0; p < n; ++p) 
    {
        to_remove_mask[p].array_idx = MY_SIZE_MAX;
        to_remove_mask[p].value = 9999999;
        to_remove_mask[p].value = -9999999;
    }
    qsort(dp_info_ptrs, n, sizeof(datapoint_info_t*), cmpPP);

@@ -796,14 +796,13 @@ clusters_t Heuristic1(global_context_t *ctx)
                if(owner == ctx -> mpi_rank)
                {
                    idx_t jpos = jidx - ctx -> idx_start;
                    omp_set_lock(lock_array + jpos);
                    if(i_point.g > to_remove_mask[jpos].value)
                    {
                        omp_set_lock(lock_array + jpos);
                        to_remove_mask[jpos].array_idx = i_point.array_idx;
                        to_remove_mask[jpos].value     = i_point.g;
                        omp_unset_lock(lock_array + jpos);
                    }
                    
                    omp_unset_lock(lock_array + jpos);
                }
                //otherwise enqueue for sending
                else
@@ -840,6 +839,13 @@ clusters_t Heuristic1(global_context_t *ctx)
    
    qsort(removal_buffer, tot_removal, sizeof(center_removal_t), compare_removal_by_target);

    // remove
    for(int i = 1; i < tot_removal; ++i)
    {
        if(removal_buffer[i - 1].rank > removal_buffer[i].rank)
            printf("Unsorted removal buffer");
    }

    //prepare for the sendrcv
    
    int* recv_counts = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
@@ -880,17 +886,65 @@ clusters_t Heuristic1(global_context_t *ctx)
    idx_t tot_recv_counts = 0;

    // count how many elements to receive
    MPI_DB_PRINT("Using centers elimination queue experiment\n");

    for(int i = 0; i < ctx -> world_size; ++i) tot_recv_counts += recv_counts[i];
    /*
    if(ctx -> mpi_rank == 0){
        for(int i = 0; i < ctx -> world_size; ++i){
            DB_PRINT("%d mpi rank recv_count %d to %d\n", ctx -> mpi_rank, recv_counts[i], i);
            DB_PRINT("%d mpi rank send_count %d to %d\n", ctx -> mpi_rank, send_counts[i], i);
        } 
    }
    DB_PRINT("rank %d: %lu recv counts\n", ctx -> mpi_rank, tot_recv_counts);
    */
    /*DB_PRINT("rank %d: %lu recv counts\n", ctx -> mpi_rank, tot_recv_counts);*/

    // change dimensions to bytes
    //

    //comm matrices

    /* DEBUG PRINT
     * COMMUNICATION MATRICES */

    /*
    int* all_send = (int*)MY_MALLOC(ctx -> world_size * ctx -> world_size * sizeof(int));
    int* all_recv = (int*)MY_MALLOC(ctx -> world_size * ctx -> world_size * sizeof(int));

    MPI_Gather(send_counts, ctx -> world_size, MPI_INT, 
                all_send, ctx -> world_size , MPI_INT, 0, ctx -> mpi_communicator);

    MPI_Gather(recv_counts, ctx -> world_size, MPI_INT, 
                all_recv, ctx -> world_size , MPI_INT, 0, ctx -> mpi_communicator);

    if(I_AM_MASTER)
    {
        printf("Send matrix\n");
        for(int i = 0; i < ctx -> world_size; ++i)
        {
            for(int j = 0; j < ctx -> world_size; ++j)
            {
                printf("%4d ", all_send[i*ctx -> world_size + j]);
            }
            printf("\n");
        }

        printf("Recv matrix\n");
        for(int i = 0; i < ctx -> world_size; ++i)
        {
            for(int j = 0; j < ctx -> world_size; ++j)
            {
                printf("%4d ", all_recv[i*ctx -> world_size + j]);
            }
            printf("\n");
        }
    }

    free(all_recv);
    free(all_send);

    */


    for(int i = 0; i < ctx -> world_size; ++i)
    {
@@ -914,13 +968,18 @@ clusters_t Heuristic1(global_context_t *ctx)
    for(idx_t i = 0; i < tot_recv_counts; ++i)
    {
        idx_t el_pos = recv_removals[i].target_id - ctx -> idx_start;
        int owner = foreign_owner(ctx, recv_removals[i].target_id);
        if(owner != ctx -> mpi_rank){
            printf("Error here\n");
            exit(1);
        }
        omp_set_lock(lock_array + el_pos);
        if(recv_removals[i].source_density > to_remove_mask[el_pos].value)
        {
            omp_set_lock(lock_array + el_pos);
            to_remove_mask[el_pos].array_idx = recv_removals[i].source_id;
            to_remove_mask[el_pos].value     = recv_removals[i].source_density;
            omp_unset_lock(lock_array + el_pos);
        }
        omp_unset_lock(lock_array + el_pos);
    }


@@ -974,7 +1033,7 @@ clusters_t Heuristic1(global_context_t *ctx)
    {
        removal_queues[i].count  = 0;
        removal_queues[i].size   = 0;
        removal_queues[i].data   = NULL;
        free(removal_queues[i].data);
        omp_destroy_lock(lock_array+ i);
    }

+3 −3
Original line number Diff line number Diff line
@@ -212,7 +212,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    double elapsed_time;

    float_t z = 3;
    int halo = MY_TRUE;
    int halo = MY_FALSE;
    float_t tol = 0.002;
    int k = 300;

@@ -343,7 +343,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
        dp_info[i].array_idx = -1;
        dp_info[i].kstar = -1;
        dp_info[i].is_center = -1;
        dp_info[i].cluster_idx = -1;
        dp_info[i].cluster_idx = -1717171717;
        //dp_info[i].halo_flag = 0;
    }
    ctx -> local_datapoints = dp_info;
@@ -402,7 +402,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    int* cl = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int));
    for(int i = 0; i < ctx -> local_n_points; ++i) cl[i] = ctx -> local_datapoints[i].cluster_idx;

    if(ctx -> world_size <= 6)
    if(ctx -> world_size <= 32)
    {
        big_ordered_buffer_to_file(ctx, cl, sizeof(int), ctx -> local_n_points, ctx -> output_assignment_file);
        big_ordered_buffer_to_file(ctx, ctx -> local_data, sizeof(double), ctx -> local_n_points * ctx -> dims, ctx -> output_data_file);