Commit be12bbfb authored by lykos98

added slimmified heap

parent 92c5e4a9
+5 −4
@@ -15,11 +15,12 @@ The suggestion is to run it with one mpi task per socket.

# Todo

 - [ ] H1: implementation of lock free centers elimination
 - [ ] context: open all windows in a single shot, close them all together
 - [ ] io: curation of IO using mpi IO or other solutions 
 - [ ] kdtree: optimization and profiling
 - [ ] prettify overall stdout
 - [x] ~~argument parser~~
 - [x] ~~H2: graph reduction~~
 - [x] ~~kdtree: implement slim heap~~
 - [ ] prettify overall stdout
 - [ ] H1: implementation of lock free centers elimination
 - [ ] kdtree: optimization and profiling
 - [ ] io: curation of IO using mpi IO or other solutions 
+101 −10
@@ -595,6 +595,56 @@ void compute_correction(global_context_t* ctx, float_t Z)

}

/* maybe this should return an error? */

#define LOCK_ACQUIRED 1
#define LOCK_FREE     0

#define lock_t     int
#define MPI_LOCK_T MPI_INT

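/* Acquire a remote lock word: spin with MPI_Compare_and_swap until the value read back
 * from (owner, pos) is LOCK_FREE, i.e. this call is the one that installed LOCK_ACQUIRED. */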
lock_t h1_lock_acquire(global_context_t* ctx, MPI_Win lock_window, int owner, idx_t pos, lock_t state)
{
    if(state == LOCK_FREE)
    {
        state = LOCK_ACQUIRED;

        lock_t compare = LOCK_FREE;
        lock_t result  = LOCK_ACQUIRED;

        int err = MPI_SUCCESS;

        while(result == LOCK_ACQUIRED && err == MPI_SUCCESS)
        {
            err = MPI_Compare_and_swap(&state, &compare, &result, MPI_LOCK_T, owner, pos, lock_window);
        }

        if(err != MPI_SUCCESS)
        {
            printf("/!\\ Rank %d at line %u\n encountered an error while using MPI_RMA, aborting\n", ctx -> mpi_rank, __LINE__);
            print_error_code(err);
            exit(1);
        }

    }
    return state;
                
}

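/* Release a remote lock word: atomically write LOCK_FREE back with MPI_Accumulate(MPI_REPLACE). */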
lock_t h1_lock_free(global_context_t* ctx, MPI_Win lock_window, int owner, idx_t pos, lock_t state)
{
    if(state == LOCK_ACQUIRED)
    {
        state = LOCK_FREE;

        MPI_Accumulate(&state, 1, MPI_LOCK_T, owner, 
                        pos,   1, MPI_LOCK_T, MPI_REPLACE, lock_window);

    }
    return state;
    
}

clusters_t Heuristic1(global_context_t *ctx)
{
    /*
@@ -656,7 +706,7 @@ clusters_t Heuristic1(global_context_t *ctx)
        }
        if(dp_info[i].is_center)
        {
            #pragma omp critical
            #pragma omp critical (push_candidate_center)
            {
                lu_dynamic_array_pushBack(&all_centers, i);
            }
@@ -671,11 +721,14 @@ clusters_t Heuristic1(global_context_t *ctx)
	 * ends, center, removed centers, and max_rho arrays are populated
	 */
		
    lock_t*    lock_array     = (lock_t*)MY_MALLOC(n * sizeof(lock_t));
	heap_node* to_remove_mask = (heap_node*)MY_MALLOC(n*sizeof(heap_node));

    for(idx_t p = 0; p < n; ++p) 
    {
        to_remove_mask[p].array_idx = MY_SIZE_MAX;
        to_remove_mask[p].value = 9999999;
        lock_array[p] = LOCK_FREE;
    }
    qsort(dp_info_ptrs, n, sizeof(datapoint_info_t*), cmpPP);

@@ -684,6 +737,12 @@ clusters_t Heuristic1(global_context_t *ctx)
    MPI_Win_create(to_remove_mask, n * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &win_to_remove_mask);
    MPI_Win_fence(0, win_to_remove_mask);

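    /* one lock word per local point; the displacement unit is sizeof(lock_t), so RMA calls
       address it with plain element indices (win_to_remove_mask instead uses byte offsets) */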
    MPI_Win win_locks;
    MPI_Win_create(lock_array, n * sizeof(lock_t), sizeof(lock_t), MPI_INFO_NULL, ctx -> mpi_communicator, &win_locks);
    MPI_Win_fence(0, win_locks);



#if defined(THREAD_FUNNELED)
#else
    #pragma omp parallel for
@@ -699,8 +758,6 @@ clusters_t Heuristic1(global_context_t *ctx)
            datapoint_info_t j_point = find_possibly_halo_datapoint_rma(ctx, jidx,  win_datapoints);

            if(j_point.is_center && i_point.g > j_point.g)
            {
                #pragma omp critical
            {
                /*
                 *
@@ -711,6 +768,35 @@ clusters_t Heuristic1(global_context_t *ctx)
                int owner = foreign_owner(ctx, jidx);
                idx_t jpos = jidx - ctx -> rank_idx_start[owner];

                lock_t state = LOCK_FREE;

                state = h1_lock_acquire(ctx, win_locks, owner, jpos, state);

                heap_node mask_element;
                MPI_Request request;
                MPI_Rget(&mask_element, sizeof(heap_node), MPI_BYTE, 
                         owner, jpos * sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request);
                MPI_Wait(&request, MPI_STATUS_IGNORE);

                int flag = mask_element.array_idx == MY_SIZE_MAX;
                if(flag || i_point.g > mask_element.value )
                {
                    heap_node tmp_mask_element = {.array_idx = i_point.array_idx, .value = i_point.g};
                    MPI_Request request;
                    MPI_Rput(&tmp_mask_element, sizeof(heap_node), MPI_BYTE, owner, 
                            jpos*sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request);
                    MPI_Wait(&request, MPI_STATUS_IGNORE);

                }

                state = h1_lock_free(ctx, win_locks, owner, jpos, state);

                /*
                #pragma omp critical (h1_centers_elimination)
                {
                    int owner = foreign_owner(ctx, jidx);
                    idx_t jpos = jidx - ctx -> rank_idx_start[owner];

                    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, owner, 0, win_to_remove_mask);
                    heap_node mask_element;
                    MPI_Request request;
@@ -731,11 +817,13 @@ clusters_t Heuristic1(global_context_t *ctx)

                    MPI_Win_unlock(owner, win_to_remove_mask);
                }
                */
            }
        }
    }
    
    MPI_Win_fence(0, win_to_remove_mask);
    MPI_Win_fence(0, win_locks);
    MPI_Barrier(ctx -> mpi_communicator);

	/* populate the usual arrays */
@@ -785,6 +873,9 @@ clusters_t Heuristic1(global_context_t *ctx)
    MPI_Win_free(&win_to_remove_mask);
	free(to_remove_mask);
    
    MPI_Win_free(&win_locks);
	free(lock_array);

    int n_centers = (int)actual_centers.count;
    int tot_centers;
    MPI_Allreduce(&n_centers, &tot_centers, 1, MPI_INT, MPI_SUM, ctx -> mpi_communicator);
+0 −2
@@ -183,8 +183,6 @@ int main(int argc, char** argv) {
    //parse command line
    
    parse_args(&ctx, argc, argv);
    printf("DIO\n");

	/*
	 * Generate a random matrix of some length
	 */