From e93cd4cd9338d834afd886f72f265829ad9c0349 Mon Sep 17 00:00:00 2001 From: Ashley Pittman Date: Tue, 24 Dec 2013 14:21:06 +0000 Subject: [PATCH] Import a version of the collectives patch. Change-Id: Ia9a370e1ff5bc3c7498b4901b0f68e87e5e84e61 --- ompi/communicator/comm.c | 9 +++++++- ompi/communicator/communicator.h | 41 +++++++++++++++++++++++++++++++++++ ompi/debuggers/msgq_interface.h | 2 ++ ompi/debuggers/ompi_common_dll.c | 4 ++++ ompi/debuggers/ompi_common_dll_defs.h | 2 ++ ompi/debuggers/ompi_msgq_dll.c | 28 ++++++++++++++++++++++++ ompi/mpi/c/allgather.c | 2 ++ ompi/mpi/c/allgatherv.c | 2 ++ ompi/mpi/c/allreduce.c | 2 ++ ompi/mpi/c/alltoall.c | 2 ++ ompi/mpi/c/alltoallv.c | 2 ++ ompi/mpi/c/barrier.c | 3 +++ ompi/mpi/c/bcast.c | 2 ++ ompi/mpi/c/gather.c | 2 ++ ompi/mpi/c/gatherv.c | 2 ++ ompi/mpi/c/reduce.c | 2 ++ ompi/mpi/c/reduce_scatter.c | 2 ++ ompi/mpi/c/scan.c | 2 ++ ompi/mpi/c/scatter.c | 2 ++ ompi/mpi/c/scatterv.c | 2 ++ 20 files changed, 114 insertions(+), 1 deletion(-) diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c index 0ca001a..f962e51 100644 --- a/ompi/communicator/comm.c +++ b/ompi/communicator/comm.c @@ -130,7 +130,7 @@ int ompi_comm_set_nb ( ompi_communicator_t **ncomm, ompi_request_t **req ) { ompi_communicator_t *newcomm = NULL; - int ret; + int ret, i; *req = NULL; @@ -141,6 +141,13 @@ int ompi_comm_set_nb ( ompi_communicator_t **ncomm, newcomm->c_id_available = MPI_UNDEFINED; newcomm->c_id_start_index = MPI_UNDEFINED; + newcomm->c_comm_call_active = 0; + for ( i = 0 ; i < 14 ; i++ ) { + newcomm->c_call_counters_up[i] = 0; + newcomm->c_call_counters_down[i] = 0; + } + + if (NULL == local_group) { /* determine how the list of local_rank can be stored most efficiently */ diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index 76ee83e..4ee127b 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -144,6 +144,14 @@ struct ompi_communicator_t { /* index in Fortran <-> C translation array */ int c_f_to_c_index; + /* There values match the enum in mpi_interface.h, we increase "up" every time a collective + * function is called and increase "down" every time the library returns, comm_call_active + * is used to ensure we only change the counters for the top-level function if one collective + * is implemented in terms of another */ + int c_call_counters_up[15]; + int c_call_counters_down[15]; + int c_comm_call_active; + #ifdef OMPI_WANT_PERUSE /* * Place holder for the PERUSE events. @@ -249,6 +257,39 @@ OMPI_DECLSPEC extern ompi_predefined_communicator_t *ompi_mpi_comm_world_addr; OMPI_DECLSPEC extern ompi_predefined_communicator_t *ompi_mpi_comm_self_addr; OMPI_DECLSPEC extern ompi_predefined_communicator_t *ompi_mpi_comm_null_addr; +/* 14 of them */ +typedef enum + { + ompi_mqs_comm_barrier, + ompi_mqs_comm_bcast, + ompi_mqs_comm_allgather, + ompi_mqs_comm_allgatherv, + ompi_mqs_comm_allreduce, + ompi_mqs_comm_alltoall, + ompi_mqs_comm_alltoallv, + ompi_mqs_comm_reduce_scatter, + ompi_mqs_comm_reduce, + ompi_mqs_comm_gather, + ompi_mqs_comm_gatherv, + ompi_mqs_comm_scan, + ompi_mqs_comm_scatter, + ompi_mqs_comm_scatterv + } ompi_mqs_comm_class; + +static inline void ompi_comm_call_up (ompi_communicator_t* comm, ompi_mqs_comm_class op) +{ + if ( ! comm->c_comm_call_active++ ) { + comm->c_call_counters_up[op]++; + } +} + +static inline void ompi_comm_call_down (ompi_communicator_t* comm, ompi_mqs_comm_class op) +{ + if ( ! --comm->c_comm_call_active ) { + comm->c_call_counters_down[op]++; + } +} + /** * Is this a valid communicator? This is a complicated question. diff --git a/ompi/debuggers/msgq_interface.h b/ompi/debuggers/msgq_interface.h index bb34072..fdb5470 100644 --- a/ompi/debuggers/msgq_interface.h +++ b/ompi/debuggers/msgq_interface.h @@ -691,6 +691,8 @@ extern int mqs_next_new_process (mqs_process *, mqs_process_location *); extern int mqs_set_process_identity (mqs_process *, int); #endif +OMPI_DECLSPEC extern int mqs_get_comm_coll_state (mqs_process *, int, int *, int *); + END_C_DECLS #endif /* defined (_MPI_INTERFACE_INCLUDED) */ diff --git a/ompi/debuggers/ompi_common_dll.c b/ompi/debuggers/ompi_common_dll.c index e57e903..1a9f020 100644 --- a/ompi/debuggers/ompi_common_dll.c +++ b/ompi/debuggers/ompi_common_dll.c @@ -329,6 +329,10 @@ int ompi_fill_in_type_info(mqs_image *image, char **message) qh_type, ompi_communicator_t, c_topo); ompi_field_offset(i_info->ompi_communicator_t.offset.c_keyhash, qh_type, ompi_communicator_t, c_keyhash); + ompi_field_offset(i_info->ompi_communicator_t.offset.c_call_counters_up, + qh_type, ompi_communicator_t, c_call_counters_up); + ompi_field_offset(i_info->ompi_communicator_t.offset.c_call_counters_down, + qh_type, ompi_communicator_t, c_call_counters_down); } { mqs_type* qh_type, *cg_union_type, *cart_type, *graph_type, *dist_graph_type; diff --git a/ompi/debuggers/ompi_common_dll_defs.h b/ompi/debuggers/ompi_common_dll_defs.h index 0c472a2..a028050 100644 --- a/ompi/debuggers/ompi_common_dll_defs.h +++ b/ompi/debuggers/ompi_common_dll_defs.h @@ -199,6 +199,8 @@ typedef struct int c_f_to_c_index; int c_topo; int c_keyhash; + int c_call_counters_up; + int c_call_counters_down; } offset; } ompi_communicator_t; /* base topology information in a communicator */ diff --git a/ompi/debuggers/ompi_msgq_dll.c b/ompi/debuggers/ompi_msgq_dll.c index e5b7325..a743620 100644 --- a/ompi/debuggers/ompi_msgq_dll.c +++ b/ompi/debuggers/ompi_msgq_dll.c @@ -1317,6 +1317,34 @@ void mqs_destroy_image_info (mqs_image_info *info) mqs_free (info); } /* mqs_destroy_image_info */ + +int mqs_get_comm_coll_state (mqs_process *proc, int op, int *in, int *curr) +{ + mpi_process_info *p_info = (mpi_process_info *)mqs_get_process_info (proc); + mpi_process_info_extra *extra = (mpi_process_info_extra*) p_info->extra; + mqs_image * image = mqs_get_image (proc); + mpi_image_info *i_info = (mpi_image_info *)mqs_get_image_info (image); + + mqs_taddr_t comm_ptr = extra->current_communicator->comm_ptr; + int out; + + if ( op >= 14 ) + return mqs_no_information; + + *in = ompi_fetch_int( proc, + comm_ptr + i_info->ompi_communicator_t.offset.c_call_counters_up + ( op * sizeof(int)), + p_info ); + + out = ompi_fetch_int( proc, + comm_ptr + i_info->ompi_communicator_t.offset.c_call_counters_down + ( op * sizeof(int)), + p_info ); + + *curr = ( *in == out ? 0 : 1); + + return mqs_ok; +} + + /***********************************************************************/ /* Convert an error code into a printable string */ char * mqs_dll_error_string (int errcode) diff --git a/ompi/mpi/c/allgather.c b/ompi/mpi/c/allgather.c index 4be4755..704928a 100644 --- a/ompi/mpi/c/allgather.c +++ b/ompi/mpi/c/allgather.c @@ -115,12 +115,14 @@ int MPI_Allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_allgather); /* Invoke the coll component to perform the back-end operation */ /* XXX -- CONST -- do not cast away const -- update mca/coll */ err = comm->c_coll.coll_allgather((void *) sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, comm->c_coll.coll_allgather_module); + ompi_comm_call_down(comm,ompi_mqs_comm_allgather); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } diff --git a/ompi/mpi/c/allgatherv.c b/ompi/mpi/c/allgatherv.c index 44a912b..f6182dc 100644 --- a/ompi/mpi/c/allgatherv.c +++ b/ompi/mpi/c/allgatherv.c @@ -137,12 +137,14 @@ int MPI_Allgatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_allgatherv); /* Invoke the coll component to perform the back-end operation */ /* XXX -- CONST -- do not cast away const -- update mca/coll */ err = comm->c_coll.coll_allgatherv((void *) sendbuf, sendcount, sendtype, recvbuf, (int *) recvcounts, (int *) displs, recvtype, comm, comm->c_coll.coll_allgatherv_module); + ompi_comm_call_down(comm,ompi_mqs_comm_allgatherv); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } diff --git a/ompi/mpi/c/allreduce.c b/ompi/mpi/c/allreduce.c index 3b32264..f883da6 100644 --- a/ompi/mpi/c/allreduce.c +++ b/ompi/mpi/c/allreduce.c @@ -102,6 +102,7 @@ int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count, OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_allreduce); /* Invoke the coll component to perform the back-end operation */ OBJ_RETAIN(op); @@ -110,6 +111,7 @@ int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count, datatype, op, comm, comm->c_coll.coll_allreduce_module); OBJ_RELEASE(op); + ompi_comm_call_down(comm,ompi_mqs_comm_allreduce); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } diff --git a/ompi/mpi/c/alltoall.c b/ompi/mpi/c/alltoall.c index 477c8ae..81d205c 100644 --- a/ompi/mpi/c/alltoall.c +++ b/ompi/mpi/c/alltoall.c @@ -90,11 +90,13 @@ int MPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype, OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_alltoall); /* Invoke the coll component to perform the back-end operation */ /* XXX -- CONST -- do not cast away const -- update mca/coll */ err = comm->c_coll.coll_alltoall((void *) sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, comm->c_coll.coll_alltoall_module); + ompi_comm_call_down(comm,ompi_mqs_comm_alltoall); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } diff --git a/ompi/mpi/c/alltoallv.c b/ompi/mpi/c/alltoallv.c index 6b0ac96..11af002 100644 --- a/ompi/mpi/c/alltoallv.c +++ b/ompi/mpi/c/alltoallv.c @@ -115,11 +115,13 @@ int MPI_Alltoallv(const void *sendbuf, const int sendcounts[], OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_alltoallv); /* Invoke the coll component to perform the back-end operation */ /* XXX -- CONST -- do not cast away const -- update mca/coll */ err = comm->c_coll.coll_alltoallv((void *) sendbuf, (int *) sendcounts, (int *) sdispls, sendtype, recvbuf, (int *) recvcounts, (int *) rdispls, recvtype, comm, comm->c_coll.coll_alltoallv_module); + ompi_comm_call_down(comm,ompi_mqs_comm_alltoallv); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } diff --git a/ompi/mpi/c/barrier.c b/ompi/mpi/c/barrier.c index 8f5cccd..db4e043 100644 --- a/ompi/mpi/c/barrier.c +++ b/ompi/mpi/c/barrier.c @@ -54,6 +54,8 @@ int MPI_Barrier(MPI_Comm comm) OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_barrier); + /* Intracommunicators: Only invoke the back-end coll module barrier function if there's more than one process in the communicator */ @@ -70,6 +72,7 @@ int MPI_Barrier(MPI_Comm comm) err = comm->c_coll.coll_barrier(comm, comm->c_coll.coll_barrier_module); } + ompi_comm_call_down(comm,ompi_mqs_comm_barrier); /* All done */ OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); diff --git a/ompi/mpi/c/bcast.c b/ompi/mpi/c/bcast.c index 8044862..6c7f9d5 100644 --- a/ompi/mpi/c/bcast.c +++ b/ompi/mpi/c/bcast.c @@ -105,9 +105,11 @@ int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype, OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_bcast); /* Invoke the coll component to perform the back-end operation */ err = comm->c_coll.coll_bcast(buffer, count, datatype, root, comm, comm->c_coll.coll_bcast_module); + ompi_comm_call_down(comm,ompi_mqs_comm_bcast); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } diff --git a/ompi/mpi/c/gather.c b/ompi/mpi/c/gather.c index ed788a0..05516b0 100644 --- a/ompi/mpi/c/gather.c +++ b/ompi/mpi/c/gather.c @@ -176,10 +176,12 @@ int MPI_Gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_gather); /* Invoke the coll component to perform the back-end operation */ /* XXX -- CONST -- do not cast away const -- update mca/coll */ err = comm->c_coll.coll_gather((void *) sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, comm->c_coll.coll_gather_module); + ompi_comm_call_down(comm,ompi_mqs_comm_gather); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } diff --git a/ompi/mpi/c/gatherv.c b/ompi/mpi/c/gatherv.c index b3b384a..1adc08a 100644 --- a/ompi/mpi/c/gatherv.c +++ b/ompi/mpi/c/gatherv.c @@ -190,11 +190,13 @@ int MPI_Gatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_gatherv); /* Invoke the coll component to perform the back-end operation */ /* XXX -- CONST -- do not cast away const -- update mca/coll */ err = comm->c_coll.coll_gatherv((void *) sendbuf, sendcount, sendtype, recvbuf, (int *) recvcounts, (int *) displs, recvtype, root, comm, comm->c_coll.coll_gatherv_module); + ompi_comm_call_down(comm,ompi_mqs_comm_gatherv); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } diff --git a/ompi/mpi/c/reduce.c b/ompi/mpi/c/reduce.c index dbd2964..a01c0b9 100644 --- a/ompi/mpi/c/reduce.c +++ b/ompi/mpi/c/reduce.c @@ -129,6 +129,7 @@ int MPI_Reduce(const void *sendbuf, void *recvbuf, int count, OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_reduce); /* Invoke the coll component to perform the back-end operation */ OBJ_RETAIN(op); @@ -137,5 +138,6 @@ int MPI_Reduce(const void *sendbuf, void *recvbuf, int count, datatype, op, root, comm, comm->c_coll.coll_reduce_module); OBJ_RELEASE(op); + ompi_comm_call_down(comm,ompi_mqs_comm_reduce); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } diff --git a/ompi/mpi/c/reduce_scatter.c b/ompi/mpi/c/reduce_scatter.c index 3877737..63f8e36 100644 --- a/ompi/mpi/c/reduce_scatter.c +++ b/ompi/mpi/c/reduce_scatter.c @@ -124,6 +124,7 @@ int MPI_Reduce_scatter(const void *sendbuf, void *recvbuf, const int recvcounts[ OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_reduce_scatter); /* Invoke the coll component to perform the back-end operation */ OBJ_RETAIN(op); @@ -132,5 +133,6 @@ int MPI_Reduce_scatter(const void *sendbuf, void *recvbuf, const int recvcounts[ datatype, op, comm, comm->c_coll.coll_reduce_scatter_module); OBJ_RELEASE(op); + ompi_comm_call_down(comm,ompi_mqs_comm_reduce_scatter); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } diff --git a/ompi/mpi/c/scan.c b/ompi/mpi/c/scan.c index 5d44ea7..52fa3de 100644 --- a/ompi/mpi/c/scan.c +++ b/ompi/mpi/c/scan.c @@ -98,6 +98,7 @@ int MPI_Scan(const void *sendbuf, void *recvbuf, int count, OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_scan); /* Call the coll component to actually perform the allgather */ OBJ_RETAIN(op); @@ -106,5 +107,6 @@ int MPI_Scan(const void *sendbuf, void *recvbuf, int count, datatype, op, comm, comm->c_coll.coll_scan_module); OBJ_RELEASE(op); + ompi_comm_call_down(comm,ompi_mqs_comm_scan); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } diff --git a/ompi/mpi/c/scatter.c b/ompi/mpi/c/scatter.c index 56acc7d..be361f3 100644 --- a/ompi/mpi/c/scatter.c +++ b/ompi/mpi/c/scatter.c @@ -159,10 +159,12 @@ int MPI_Scatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype, OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_scatter); /* Invoke the coll component to perform the back-end operation */ /* XXX -- CONST -- do not cast away const -- update mca/coll */ err = comm->c_coll.coll_scatter((void *) sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, comm->c_coll.coll_scatter_module); + ompi_comm_call_down(comm,ompi_mqs_comm_scatterv); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } diff --git a/ompi/mpi/c/scatterv.c b/ompi/mpi/c/scatterv.c index 8410359..6833b45 100644 --- a/ompi/mpi/c/scatterv.c +++ b/ompi/mpi/c/scatterv.c @@ -188,10 +188,12 @@ int MPI_Scatterv(const void *sendbuf, const int sendcounts[], const int displs[] OPAL_CR_ENTER_LIBRARY(); + ompi_comm_call_up(comm,ompi_mqs_comm_scatterv); /* Invoke the coll component to perform the back-end operation */ /* XXX -- CONST -- do not cast away const -- update mca/coll */ err = comm->c_coll.coll_scatterv((void *) sendbuf, (int *) sendcounts, (int *) displs, sendtype, recvbuf, recvcount, recvtype, root, comm, comm->c_coll.coll_scatterv_module); + ompi_comm_call_down(comm,ompi_mqs_comm_scatterv); OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME); } -- 1.8.4.2