-: 0:Source:/home/MPI/testing/mpich2/mpich2/src/mpi/coll/allreduce.c
-: 0:Graph:allreduce.gcno
-: 0:Data:allreduce.gcda
-: 0:Runs:4382
-: 0:Programs:1376
-: 1:/* -*- Mode: C; c-basic-offset:4 ; -*- */
-: 2:/*
-: 3: * (C) 2001 by Argonne National Laboratory.
-: 4: * See COPYRIGHT in top-level directory.
-: 5: */
-: 6:
-: 7:#include "mpiimpl.h"
-: 8:
-: 9:/* -- Begin Profiling Symbol Block for routine MPI_Allreduce */
-: 10:#if defined(HAVE_PRAGMA_WEAK)
-: 11:#pragma weak MPI_Allreduce = PMPI_Allreduce
-: 12:#elif defined(HAVE_PRAGMA_HP_SEC_DEF)
-: 13:#pragma _HP_SECONDARY_DEF PMPI_Allreduce MPI_Allreduce
-: 14:#elif defined(HAVE_PRAGMA_CRI_DUP)
-: 15:#pragma _CRI duplicate MPI_Allreduce as PMPI_Allreduce
-: 16:#endif
-: 17:/* -- End Profiling Symbol Block */
-: 18:
-: 19:/* Define MPICH_MPI_FROM_PMPI if weak symbols are not supported to build
-: 20: the MPI routines */
-: 21:#ifndef MPICH_MPI_FROM_PMPI
-: 22:#undef MPI_Allreduce
-: 23:#define MPI_Allreduce PMPI_Allreduce
-: 24:
-: 25:MPI_User_function *MPIR_Op_table[] = { MPIR_MAXF, MPIR_MINF, MPIR_SUM,
-: 26: MPIR_PROD, MPIR_LAND,
-: 27: MPIR_BAND, MPIR_LOR, MPIR_BOR,
-: 28: MPIR_LXOR, MPIR_BXOR,
-: 29: MPIR_MINLOC, MPIR_MAXLOC, };
-: 30:
-: 31:MPIR_Op_check_dtype_fn *MPIR_Op_check_dtype_table[] = {
-: 32: MPIR_MAXF_check_dtype, MPIR_MINF_check_dtype,
-: 33: MPIR_SUM_check_dtype,
-: 34: MPIR_PROD_check_dtype, MPIR_LAND_check_dtype,
-: 35: MPIR_BAND_check_dtype, MPIR_LOR_check_dtype, MPIR_BOR_check_dtype,
-: 36: MPIR_LXOR_check_dtype, MPIR_BXOR_check_dtype,
-: 37: MPIR_MINLOC_check_dtype, MPIR_MAXLOC_check_dtype, };
-: 38:
-: 39:
-: 40:/* This is the default implementation of allreduce. The algorithm is:
-: 41:
-: 42: Algorithm: MPI_Allreduce
-: 43:
-: 44: For the heterogeneous case, we call MPI_Reduce followed by MPI_Bcast
-: 45: in order to meet the requirement that all processes must have the
-: 46: same result. For the homogeneous case, we use the following algorithms.
-: 47:
-: 48:
-: 49: For long messages and for builtin ops and if count >= pof2 (where
-: 50: pof2 is the nearest power-of-two less than or equal to the number
-: 51: of processes), we use Rabenseifner's algorithm (see
-: 52: http://www.hlrs.de/organization/par/services/models/mpi/myreduce.html ).
-: 53: This algorithm implements the allreduce in two steps: first a
-: 54: reduce-scatter, followed by an allgather. A recursive-halving
-: 55: algorithm (beginning with processes that are distance 1 apart) is
-: 56: used for the reduce-scatter, and a recursive doubling
-: 57: algorithm is used for the allgather. The non-power-of-two case is
-: 58: handled by dropping to the nearest lower power-of-two: the first
-: 59: few even-numbered processes send their data to their right neighbors
-: 60: (rank+1), and the reduce-scatter and allgather happen among the remaining
-: 61: power-of-two processes. At the end, the first few even-numbered
-: 62: processes get the result from their right neighbors.
-: 63:
-: 64: For the power-of-two case, the cost for the reduce-scatter is
-: 65: lgp.alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma. The cost for the
-: 66: allgather lgp.alpha + n.((p-1)/p).beta. Therefore, the
-: 67: total cost is:
-: 68: Cost = 2.lgp.alpha + 2.n.((p-1)/p).beta + n.((p-1)/p).gamma
-: 69:
-: 70: For the non-power-of-two case,
-: 71: Cost = (2.floor(lgp)+2).alpha + (2.((p-1)/p) + 2).n.beta + n.(1+(p-1)/p).gamma
-: 72:
-: 73:
-: 74: For short messages, for user-defined ops, and for count < pof2
-: 75: we use a recursive doubling algorithm (similar to the one in
-: 76: MPI_Allgather). We use this algorithm in the case of user-defined ops
-: 77: because in this case derived datatypes are allowed, and the user
-: 78: could pass basic datatypes on one process and derived on another as
-: 79: long as the type maps are the same. Breaking up derived datatypes
-: 80: to do the reduce-scatter is tricky.
-: 81:
-: 82: Cost = lgp.alpha + n.lgp.beta + n.lgp.gamma
-: 83:
-: 84: Possible improvements:
-: 85:
-: 86: End Algorithm: MPI_Allreduce
-: 87:*/
-: 88:
-: 89:
-: 90:/* not declared static because a machine-specific function may call this one
-: 91: in some cases */
-: 92:#undef FCNAME
-: 93:#define FCNAME "MPIR_Allreduce"
-: 94:
-: 95:int MPIR_Allreduce (
-: 96: void *sendbuf,
-: 97: void *recvbuf,
-: 98: int count,
-: 99: MPI_Datatype datatype,
-: 100: MPI_Op op,
-: 101: MPID_Comm *comm_ptr )
479444: 102:{
-: 103: int is_homogeneous;
-: 104:#ifdef MPID_HAS_HETERO
-: 105: int rc;
-: 106:#endif
-: 107: int comm_size, rank, type_size;
479444: 108: int mpi_errno = MPI_SUCCESS;
-: 109: int mask, dst, is_commutative, pof2, newrank, rem, newdst, i,
-: 110: send_idx, recv_idx, last_idx, send_cnt, recv_cnt, *cnts, *disps;
-: 111: MPI_Aint true_extent, true_lb, extent;
-: 112: void *tmp_buf;
-: 113: MPI_User_function *uop;
-: 114: MPID_Op *op_ptr;
-: 115: MPI_Comm comm;
479444: 116: MPIU_THREADPRIV_DECL;
-: 117:#ifdef HAVE_CXX_BINDING
479444: 118: int is_cxx_uop = 0;
-: 119:#endif
479444: 120: MPIU_CHKLMEM_DECL(3);
-: 121:
479444: 122: if (count == 0) return MPI_SUCCESS;
479444: 123: comm = comm_ptr->handle;
-: 124:
479444: 125: MPIU_THREADPRIV_GET;
479444: 126: MPIR_Nest_incr();
-: 127:
479444: 128: is_homogeneous = 1;
-: 129:#ifdef MPID_HAS_HETERO
-: 130: if (comm_ptr->is_hetero)
-: 131: is_homogeneous = 0;
-: 132:#endif
-: 133:
-: 134:#ifdef MPID_HAS_HETERO
-: 135: if (!is_homogeneous) {
-: 136: /* heterogeneous. To get the same result on all processes, we
-: 137: do a reduce to 0 and then broadcast. */
-: 138: mpi_errno = NMPI_Reduce ( sendbuf, recvbuf, count, datatype,
-: 139: op, 0, comm );
-: 140: /* FIXME: mpi_errno is error CODE, not necessarily the error
-: 141: class MPI_ERR_OP. In MPICH2, we can get the error class
-: 142: with
-: 143: errorclass = mpi_errno & ERROR_CLASS_MASK;
-: 144: */
-: 145: if (mpi_errno == MPI_ERR_OP || mpi_errno == MPI_SUCCESS) {
-: 146: /* Allow MPI_ERR_OP since we can continue from this error */
-: 147: rc = NMPI_Bcast ( recvbuf, count, datatype, 0, comm );
-: 148: if (rc) mpi_errno = rc;
-: 149: }
-: 150: }
-: 151: else
-: 152:#endif /* MPID_HAS_HETERO */
-: 153: {
-: 154: /* homogeneous */
-: 155:
-: 156: /* set op_errno to 0. stored in perthread structure */
479444: 157: MPIU_THREADPRIV_FIELD(op_errno) = 0;
-: 158:
479444: 159: comm_size = comm_ptr->local_size;
479444: 160: rank = comm_ptr->rank;
-: 161:
479444: 162: if (HANDLE_GET_KIND(op) == HANDLE_KIND_BUILTIN) {
475917: 163: is_commutative = 1;
-: 164: /* get the function by indexing into the op table */
475917: 165: uop = MPIR_Op_table[op%16 - 1];
-: 166: }
-: 167: else {
3527: 168: MPID_Op_get_ptr(op, op_ptr);
3527: 169: if (op_ptr->kind == MPID_OP_USER_NONCOMMUTE)
2140: 170: is_commutative = 0;
-: 171: else
1387: 172: is_commutative = 1;
-: 173:#ifdef HAVE_CXX_BINDING
3527: 174: if (op_ptr->language == MPID_LANG_CXX) {
76: 175: uop = (MPI_User_function *) op_ptr->function.c_function;
76: 176: is_cxx_uop = 1;
-: 177: }
-: 178: else
-: 179:#endif
3451: 180: if ((op_ptr->language == MPID_LANG_C))
3451: 181: uop = (MPI_User_function *) op_ptr->function.c_function;
-: 182: else
|
#####: 183: uop = (MPI_User_function *) op_ptr->function.f77_function;
-: 184: }
-: 185:
-: 186: /* need to allocate temporary buffer to store incoming data*/
|
479444: 187: mpi_errno = NMPI_Type_get_true_extent(datatype, &true_lb,
-: 188: &true_extent);
479444: 189: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
479444: 190: MPID_Datatype_get_extent_macro(datatype, extent);
-: 191:
-: 192: MPID_Ensure_Aint_fits_in_pointer(count * MPIR_MAX(extent, true_extent));
479444: 193: MPIU_CHKLMEM_MALLOC(tmp_buf, void *, count*(MPIR_MAX(extent,true_extent)), mpi_errno, "temporary buffer");
-: 194:
-: 195: /* adjust for potential negative lower bound in datatype */
479444: 196: tmp_buf = (void *)((char*)tmp_buf - true_lb);
-: 197:
-: 198: /* copy local data into recvbuf */
479444: 199: if (sendbuf != MPI_IN_PLACE) {
9501: 200: mpi_errno = MPIR_Localcopy(sendbuf, count, datatype, recvbuf,
-: 201: count, datatype);
9501: 202: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 203: }
-: 204:
479444: 205: MPID_Datatype_get_size_macro(datatype, type_size);
-: 206:
-: 207: /* find nearest power-of-two less than or equal to comm_size */
479444: 208: pof2 = 1;
479444: 209: while (pof2 <= comm_size) pof2 <<= 1;
479444: 210: pof2 >>=1;
-: 211:
479444: 212: rem = comm_size - pof2;
-: 213:
-: 214: /* In the non-power-of-two case, all even-numbered
-: 215: processes of rank < 2*rem send their data to
-: 216: (rank+1). These even-numbered processes no longer
-: 217: participate in the algorithm until the very end. The
-: 218: remaining processes form a nice power-of-two. */
-: 219:
-: 220: /* check if multiple threads are calling this collective function */
-: 221: MPIDU_ERR_CHECK_MULTIPLE_THREADS_ENTER( comm_ptr );
-: 222:
479444: 223: if (rank < 2*rem) {
486: 224: if (rank % 2 == 0) { /* even */
243: 225: mpi_errno = MPIC_Send(recvbuf, count,
-: 226: datatype, rank+1,
-: 227: MPIR_ALLREDUCE_TAG, comm);
243: 228: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 229:
-: 230: /* temporarily set the rank to -1 so that this
-: 231: process does not participate in recursive
-: 232: doubling */
243: 233: newrank = -1;
-: 234: }
-: 235: else { /* odd */
243: 236: mpi_errno = MPIC_Recv(tmp_buf, count,
-: 237: datatype, rank-1,
-: 238: MPIR_ALLREDUCE_TAG, comm,
-: 239: MPI_STATUS_IGNORE);
243: 240: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 241:
-: 242: /* do the reduction on received data. since the
-: 243: ordering is right, it doesn't matter whether
-: 244: the operation is commutative or not. */
-: 245:#ifdef HAVE_CXX_BINDING
243: 246: if (is_cxx_uop) {
10: 247: (*MPIR_Process.cxx_call_op_fn)( tmp_buf, recvbuf,
-: 248: count,
-: 249: datatype,
-: 250: uop );
-: 251: }
-: 252: else
-: 253:#endif
233: 254: (*uop)(tmp_buf, recvbuf, &count, &datatype);
-: 255:
-: 256: /* change the rank */
243: 257: newrank = rank / 2;
-: 258: }
-: 259: }
-: 260: else /* rank >= 2*rem */
478958: 261: newrank = rank - rem;
-: 262:
-: 263: /* If op is user-defined or count is less than pof2, use
-: 264: recursive doubling algorithm. Otherwise do a reduce-scatter
-: 265: followed by allgather. (If op is user-defined,
-: 266: derived datatypes are allowed and the user could pass basic
-: 267: datatypes on one process and derived on another as long as
-: 268: the type maps are the same. Breaking up derived
-: 269: datatypes to do the reduce-scatter is tricky, therefore
-: 270: using recursive doubling in that case.) */
-: 271:
479444: 272: if (newrank != -1) {
479201: 273: if ((count*type_size <= MPIR_ALLREDUCE_SHORT_MSG) ||
-: 274: (HANDLE_GET_KIND(op) != HANDLE_KIND_BUILTIN) ||
-: 275: (count < pof2)) { /* use recursive doubling */
477987: 276: mask = 0x1;
960220: 277: while (mask < pof2) {
4246: 278: newdst = newrank ^ mask;
-: 279: /* find real rank of dest */
4246: 280: dst = (newdst < rem) ? newdst*2 + 1 : newdst + rem;
-: 281:
-: 282: /* Send the most current data, which is in recvbuf. Recv
-: 283: into tmp_buf */
4246: 284: mpi_errno = MPIC_Sendrecv(recvbuf, count, datatype,
-: 285: dst, MPIR_ALLREDUCE_TAG, tmp_buf,
-: 286: count, datatype, dst,
-: 287: MPIR_ALLREDUCE_TAG, comm,
-: 288: MPI_STATUS_IGNORE);
4246: 289: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 290:
-: 291: /* tmp_buf contains data received in this step.
-: 292: recvbuf contains data accumulated so far */
-: 293:
4246: 294: if (is_commutative || (dst < rank)) {
-: 295: /* op is commutative OR the order is already right */
-: 296:#ifdef HAVE_CXX_BINDING
3424: 297: if (is_cxx_uop) {
40: 298: (*MPIR_Process.cxx_call_op_fn)( tmp_buf, recvbuf,
-: 299: count,
-: 300: datatype,
-: 301: uop );
-: 302: }
-: 303: else
-: 304:#endif
3384: 305: (*uop)(tmp_buf, recvbuf, &count, &datatype);
-: 306: }
-: 307: else {
-: 308: /* op is noncommutative and the order is not right */
-: 309:#ifdef HAVE_CXX_BINDING
822: 310: if (is_cxx_uop) {
40: 311: (*MPIR_Process.cxx_call_op_fn)( recvbuf, tmp_buf,
-: 312: count,
-: 313: datatype,
-: 314: uop );
-: 315: }
-: 316: else
-: 317:#endif
782: 318: (*uop)(recvbuf, tmp_buf, &count, &datatype);
-: 319:
-: 320: /* copy result back into recvbuf */
822: 321: mpi_errno = MPIR_Localcopy(tmp_buf, count, datatype,
-: 322: recvbuf, count, datatype);
822: 323: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 324: }
4246: 325: mask <<= 1;
-: 326: }
-: 327: }
-: 328:
-: 329: else {
-: 330:
-: 331: /* do a reduce-scatter followed by allgather */
-: 332:
-: 333: /* for the reduce-scatter, calculate the count that
-: 334: each process receives and the displacement within
-: 335: the buffer */
-: 336:
1214: 337: MPIU_CHKLMEM_MALLOC(cnts, int *, pof2*sizeof(int), mpi_errno, "counts");
1214: 338: MPIU_CHKLMEM_MALLOC(disps, int *, pof2*sizeof(int), mpi_errno, "displacements");
-: 339:
1214: 340: for (i=0; i<(pof2-1); i++)
|
#####: 341: cnts[i] = count/pof2;
|
1214: 342: cnts[pof2-1] = count - (count/pof2)*(pof2-1);
-: 343:
1214: 344: disps[0] = 0;
1214: 345: for (i=1; i<pof2; i++)
|
#####: 346: disps[i] = disps[i-1] + cnts[i-1];
-: 347:
|
1214: 348: mask = 0x1;
1214: 349: send_idx = recv_idx = 0;
1214: 350: last_idx = pof2;
2428: 351: while (mask < pof2) {
|
#####: 352: newdst = newrank ^ mask;
-: 353: /* find real rank of dest */
#####: 354: dst = (newdst < rem) ? newdst*2 + 1 : newdst + rem;
-: 355:
#####: 356: send_cnt = recv_cnt = 0;
#####: 357: if (newrank < newdst) {
#####: 358: send_idx = recv_idx + pof2/(mask*2);
#####: 359: for (i=send_idx; i<last_idx; i++)
#####: 360: send_cnt += cnts[i];
#####: 361: for (i=recv_idx; i<send_idx; i++)
#####: 362: recv_cnt += cnts[i];
-: 363: }
-: 364: else {
#####: 365: recv_idx = send_idx + pof2/(mask*2);
#####: 366: for (i=send_idx; i<recv_idx; i++)
#####: 367: send_cnt += cnts[i];
#####: 368: for (i=recv_idx; i<last_idx; i++)
#####: 369: recv_cnt += cnts[i];
-: 370: }
-: 371:
-: 372:/* printf("Rank %d, send_idx %d, recv_idx %d, send_cnt %d, recv_cnt %d, last_idx %d\n", newrank, send_idx, recv_idx,
-: 373: send_cnt, recv_cnt, last_idx);
-: 374: */
-: 375: /* Send data from recvbuf. Recv into tmp_buf */
#####: 376: mpi_errno = MPIC_Sendrecv((char *) recvbuf +
-: 377: disps[send_idx]*extent,
-: 378: send_cnt, datatype,
-: 379: dst, MPIR_ALLREDUCE_TAG,
-: 380: (char *) tmp_buf +
-: 381: disps[recv_idx]*extent,
-: 382: recv_cnt, datatype, dst,
-: 383: MPIR_ALLREDUCE_TAG, comm,
-: 384: MPI_STATUS_IGNORE);
#####: 385: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 386:
-: 387: /* tmp_buf contains data received in this step.
-: 388: recvbuf contains data accumulated so far */
-: 389:
-: 390: /* This algorithm is used only for predefined ops
-: 391: and predefined ops are always commutative. */
-: 392:
#####: 393: (*uop)((char *) tmp_buf + disps[recv_idx]*extent,
-: 394: (char *) recvbuf + disps[recv_idx]*extent,
-: 395: &recv_cnt, &datatype);
-: 396:
-: 397: /* update send_idx for next iteration */
#####: 398: send_idx = recv_idx;
#####: 399: mask <<= 1;
-: 400:
-: 401: /* update last_idx, but not in last iteration
-: 402: because the value is needed in the allgather
-: 403: step below. */
#####: 404: if (mask < pof2)
#####: 405: last_idx = recv_idx + pof2/mask;
-: 406: }
-: 407:
-: 408: /* now do the allgather */
-: 409:
|
1214: 410: mask >>= 1;
2428: 411: while (mask > 0) {
|
#####: 412: newdst = newrank ^ mask;
-: 413: /* find real rank of dest */
#####: 414: dst = (newdst < rem) ? newdst*2 + 1 : newdst + rem;
-: 415:
#####: 416: send_cnt = recv_cnt = 0;
#####: 417: if (newrank < newdst) {
-: 418: /* update last_idx except on first iteration */
#####: 419: if (mask != pof2/2)
#####: 420: last_idx = last_idx + pof2/(mask*2);
-: 421:
#####: 422: recv_idx = send_idx + pof2/(mask*2);
#####: 423: for (i=send_idx; i<recv_idx; i++)
#####: 424: send_cnt += cnts[i];
#####: 425: for (i=recv_idx; i<last_idx; i++)
#####: 426: recv_cnt += cnts[i];
-: 427: }
-: 428: else {
#####: 429: recv_idx = send_idx - pof2/(mask*2);
#####: 430: for (i=send_idx; i<last_idx; i++)
#####: 431: send_cnt += cnts[i];
#####: 432: for (i=recv_idx; i<send_idx; i++)
#####: 433: recv_cnt += cnts[i];
-: 434: }
-: 435:
#####: 436: mpi_errno = MPIC_Sendrecv((char *) recvbuf +
-: 437: disps[send_idx]*extent,
-: 438: send_cnt, datatype,
-: 439: dst, MPIR_ALLREDUCE_TAG,
-: 440: (char *) recvbuf +
-: 441: disps[recv_idx]*extent,
-: 442: recv_cnt, datatype, dst,
-: 443: MPIR_ALLREDUCE_TAG, comm,
-: 444: MPI_STATUS_IGNORE);
#####: 445: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 446:
#####: 447: if (newrank > newdst) send_idx = recv_idx;
-: 448:
#####: 449: mask >>= 1;
-: 450: }
-: 451: }
-: 452: }
-: 453:
-: 454: /* In the non-power-of-two case, all odd-numbered
-: 455: processes of rank < 2*rem send the result to
-: 456: (rank-1), the ranks who didn't participate above. */
|
479444: 457: if (rank < 2*rem) {
486: 458: if (rank % 2) /* odd */
243: 459: mpi_errno = MPIC_Send(recvbuf, count,
-: 460: datatype, rank-1,
-: 461: MPIR_ALLREDUCE_TAG, comm);
-: 462: else /* even */
243: 463: mpi_errno = MPIC_Recv(recvbuf, count,
-: 464: datatype, rank+1,
-: 465: MPIR_ALLREDUCE_TAG, comm,
-: 466: MPI_STATUS_IGNORE);
486: 467: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 468: }
-: 469:
-: 470: /* check if multiple threads are calling this collective function */
-: 471: MPIDU_ERR_CHECK_MULTIPLE_THREADS_EXIT( comm_ptr );
-: 472:
479444: 473: if (MPIU_THREADPRIV_FIELD(op_errno))
|
#####: 474: mpi_errno = MPIU_THREADPRIV_FIELD(op_errno);
-: 475: }
-: 476:
-: 477: fn_exit:
|
481872: 478: MPIU_CHKLMEM_FREEALL();
479444: 479: MPIR_Nest_decr();
479444: 480: return (mpi_errno);
-: 481:
|
-: 482: fn_fail:
-: 483: goto fn_exit;
-: 484:}
-: 485:
-: 486:
-: 487:/* not declared static because a machine-specific function may call this one
-: 488: in some cases */
-: 489:#undef FCNAME
-: 490:#define FCNAME "MPIR_Allreduce_inter"
-: 491:int MPIR_Allreduce_inter (
-: 492: void *sendbuf,
-: 493: void *recvbuf,
-: 494: int count,
-: 495: MPI_Datatype datatype,
-: 496: MPI_Op op,
-: 497: MPID_Comm *comm_ptr )
|
1440: 498:{
-: 499:/* Intercommunicator Allreduce.
-: 500: We first do an intercommunicator reduce to rank 0 on left group,
-: 501: then an intercommunicator reduce to rank 0 on right group, followed
-: 502: by local intracommunicator broadcasts in each group.
-: 503:
-: 504: We don't do local reduces first and then intercommunicator
-: 505: broadcasts because it would require allocation of a temporary buffer.
-: 506:*/
-: 507: int rank, mpi_errno, root;
1440: 508: MPID_Comm *newcomm_ptr = NULL;
1440: 509: MPIU_THREADPRIV_DECL;
-: 510:
1440: 511: MPIU_THREADPRIV_GET;
-: 512:
1440: 513: MPIR_Nest_incr();
-: 514:
1440: 515: rank = comm_ptr->rank;
-: 516:
-: 517: /* first do a reduce from right group to rank 0 in left group,
-: 518: then from left group to rank 0 in right group*/
1440: 519: if (comm_ptr->is_low_group) {
-: 520: /* reduce from right group to rank 0*/
496: 521: root = (rank == 0) ? MPI_ROOT : MPI_PROC_NULL;
496: 522: mpi_errno = MPIR_Reduce_inter(sendbuf, recvbuf, count, datatype, op,
-: 523: root, comm_ptr);
496: 524: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 525:
-: 526: /* reduce to rank 0 of right group */
496: 527: root = 0;
496: 528: mpi_errno = MPIR_Reduce_inter(sendbuf, recvbuf, count, datatype, op,
-: 529: root, comm_ptr);
496: 530: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 531: }
-: 532: else {
-: 533: /* reduce to rank 0 of left group */
944: 534: root = 0;
944: 535: mpi_errno = MPIR_Reduce_inter(sendbuf, recvbuf, count, datatype, op,
-: 536: root, comm_ptr);
944: 537: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 538:
-: 539: /* reduce from left group to rank 0 */
944: 540: root = (rank == 0) ? MPI_ROOT : MPI_PROC_NULL;
944: 541: mpi_errno = MPIR_Reduce_inter(sendbuf, recvbuf, count, datatype, op,
-: 542: root, comm_ptr);
944: 543: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 544: }
-: 545:
-: 546: /* Get the local intracommunicator */
1440: 547: if (!comm_ptr->local_comm)
|
#####: 548: MPIR_Setup_intercomm_localcomm( comm_ptr );
-: 549:
|
1440: 550: newcomm_ptr = comm_ptr->local_comm;
-: 551:
1440: 552: mpi_errno = MPIR_Bcast(recvbuf, count, datatype, 0, newcomm_ptr);
1440: 553: MPIU_ERR_CHKANDJUMP((mpi_errno), mpi_errno, MPI_ERR_OTHER, "**fail");
-: 554:
1440: 555: fn_exit:
1440: 556: MPIR_Nest_decr();
1440: 557: return mpi_errno;
-: 558:
|
-: 559: fn_fail:
-: 560: goto fn_exit;
-: 561:}
-: 562:
-: 563:#endif
-: 564:
-: 565:#undef FUNCNAME
-: 566:#define FUNCNAME MPI_Allreduce
-: 567:#undef FCNAME
-: 568:
-: 569:/*@
-: 570:MPI_Allreduce - Combines values from all processes and distributes the result
-: 571: back to all processes
-: 572:
-: 573:Input Parameters:
-: 574:+ sendbuf - starting address of send buffer (choice)
-: 575:. count - number of elements in send buffer (integer)
-: 576:. datatype - data type of elements of send buffer (handle)
-: 577:. op - operation (handle)
-: 578:- comm - communicator (handle)
-: 579:
-: 580:Output Parameter:
-: 581:. recvbuf - starting address of receive buffer (choice)
-: 582:
-: 583:.N ThreadSafe
-: 584:
-: 585:.N Fortran
-: 586:
-: 587:.N collops
-: 588:
-: 589:.N Errors
-: 590:.N MPI_ERR_BUFFER
-: 591:.N MPI_ERR_COUNT
-: 592:.N MPI_ERR_TYPE
-: 593:.N MPI_ERR_OP
-: 594:.N MPI_ERR_COMM
-: 595:@*/
-: 596:int MPI_Allreduce ( void *sendbuf, void *recvbuf, int count,
-: 597: MPI_Datatype datatype, MPI_Op op, MPI_Comm comm )
|
1203525: 598:{
-: 599: static const char FCNAME[] = "MPI_Allreduce";
1203525: 600: int mpi_errno = MPI_SUCCESS;
1203525: 601: MPID_Comm *comm_ptr = NULL;
1203525: 602: MPIU_THREADPRIV_DECL;
-: 603: MPID_MPI_STATE_DECL(MPID_STATE_MPI_ALLREDUCE);
-: 604:
1203525: 605: MPIR_ERRTEST_INITIALIZED_ORDIE();
-: 606:
1203525: 607: MPIU_THREAD_CS_ENTER(ALLFUNC,);
-: 608: MPID_MPI_COLL_FUNC_ENTER(MPID_STATE_MPI_ALLREDUCE);
-: 609:
-: 610: /* Validate parameters, especially handles needing to be converted */
|
-: 611:# ifdef HAVE_ERROR_CHECKING
-: 612: {
-: 613: MPID_BEGIN_ERROR_CHECKS;
-: 614: {
1203525: 615: MPIR_ERRTEST_COMM(comm, mpi_errno);
1203525: 616: if (mpi_errno != MPI_SUCCESS) goto fn_fail;
-: 617: }
-: 618: MPID_END_ERROR_CHECKS;
-: 619: }
-: 620:# endif /* HAVE_ERROR_CHECKING */
-: 621:
-: 622: /* Convert MPI object handles to object pointers */
|
1203523: 623: MPID_Comm_get_ptr( comm, comm_ptr );
-: 624:
-: 625: /* Validate parameters and objects (post conversion) */
|
-: 626:# ifdef HAVE_ERROR_CHECKING
-: 627: {
-: 628: MPID_BEGIN_ERROR_CHECKS;
-: 629: {
1203523: 630: MPID_Datatype *datatype_ptr = NULL;
1203523: 631: MPID_Op *op_ptr = NULL;
-: 632:
1203523: 633: MPID_Comm_valid_ptr( comm_ptr, mpi_errno );
1203523: 634: if (mpi_errno != MPI_SUCCESS) goto fn_fail;
1203521: 635: MPIR_ERRTEST_COUNT(count, mpi_errno);
1203521: 636: MPIR_ERRTEST_DATATYPE(datatype, "datatype", mpi_errno);
1203521: 637: MPIR_ERRTEST_OP(op, mpi_errno);
-: 638:
1203495: 639: if (HANDLE_GET_KIND(datatype) != HANDLE_KIND_BUILTIN) {
2132: 640: MPID_Datatype_get_ptr(datatype, datatype_ptr);
2132: 641: MPID_Datatype_valid_ptr( datatype_ptr, mpi_errno );
2132: 642: MPID_Datatype_committed_ptr( datatype_ptr, mpi_errno );
-: 643: }
-: 644:
1203495: 645: if (comm_ptr->comm_kind == MPID_INTERCOMM)
1440: 646: MPIR_ERRTEST_SENDBUF_INPLACE(sendbuf, count, mpi_errno);
-: 647:
1203495: 648: if (sendbuf != MPI_IN_PLACE)
90812: 649: MPIR_ERRTEST_USERBUFFER(sendbuf,count,datatype,mpi_errno);
-: 650:
1203495: 651: MPIR_ERRTEST_RECVBUF_INPLACE(recvbuf, count, mpi_errno);
1203495: 652: MPIR_ERRTEST_USERBUFFER(recvbuf,count,datatype,mpi_errno);
-: 653:
1203495: 654: if (mpi_errno != MPI_SUCCESS) goto fn_fail;
-: 655:
1203469: 656: if (HANDLE_GET_KIND(op) != HANDLE_KIND_BUILTIN) {
4306: 657: MPID_Op_get_ptr(op, op_ptr);
4306: 658: MPID_Op_valid_ptr( op_ptr, mpi_errno );
-: 659: }
1203469: 660: if (HANDLE_GET_KIND(op) == HANDLE_KIND_BUILTIN) {
1199163: 661: mpi_errno =
-: 662: ( * MPIR_Op_check_dtype_table[op%16 - 1] )(datatype);
-: 663: }
1203469: 664: if (count != 0) {
1203469: 665: MPIR_ERRTEST_ALIAS_COLL(sendbuf, recvbuf, mpi_errno);
-: 666: }
1203469: 667: if (mpi_errno != MPI_SUCCESS) goto fn_fail;
-: 668: }
-: 669: MPID_END_ERROR_CHECKS;
-: 670: }
-: 671:# endif /* HAVE_ERROR_CHECKING */
-: 672:
-: 673: /* ... body of routine ... */
-: 674:
|
1203433: 675: if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Allreduce != NULL)
-: 676: {
|
#####: 677: mpi_errno = comm_ptr->coll_fns->Allreduce(sendbuf, recvbuf, count,
-: 678: datatype, op, comm_ptr);
-: 679: }
-: 680: else
-: 681: {
|
1203433: 682: if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-: 683: /* intracommunicator */
-: 684:#if defined(USE_SMP_COLLECTIVES)
-: 685: MPID_Op *op_ptr;
-: 686: int is_commutative;
-: 687:
-: 688: /* is the op commutative? We do SMP optimizations only if it is. */
1201993: 689: if (HANDLE_GET_KIND(op) == HANDLE_KIND_BUILTIN)
1197687: 690: is_commutative = 1;
-: 691: else {
4306: 692: MPID_Op_get_ptr(op, op_ptr);
4306: 693: is_commutative = (op_ptr->kind == MPID_OP_USER_NONCOMMUTE) ? 0 : 1;
-: 694: }
-: 695:
1201993: 696: if (MPIR_Comm_is_node_aware(comm_ptr) && is_commutative) {
-: 697: /* on each node, do a reduce to the local root */
1186699: 698: if (comm_ptr->node_comm != NULL)
-: 699: {
-: 700: /* take care of the MPI_IN_PLACE case. For reduce,
-: 701: MPI_IN_PLACE is specified only on the root;
-: 702: for allreduce it is specified on all processes. */
-: 703:
1855905: 704: if ((sendbuf == MPI_IN_PLACE) && (comm_ptr->node_comm->rank != 0)) {
-: 705: /* IN_PLACE and not root of reduce. Data supplied to this
-: 706: allreduce is in recvbuf. Pass that as the sendbuf to reduce. */
-: 707:
669206: 708: mpi_errno = MPIR_Reduce_or_coll_fn(recvbuf, NULL, count, datatype,
-: 709: op, 0, comm_ptr->node_comm);
-: 710: }
-: 711: else {
517493: 712: mpi_errno = MPIR_Reduce_or_coll_fn(sendbuf, recvbuf, count, datatype,
-: 713: op, 0, comm_ptr->node_comm);
-: 714: }
|
1186699: 715: if (mpi_errno) goto fn_fail;
-: 716: }
-: 717: else {
-: 718: /* only one process on the node. copy sendbuf to recvbuf */
|
#####: 719: if (sendbuf != MPI_IN_PLACE) {
#####: 720: mpi_errno = MPIR_Localcopy(sendbuf, count, datatype,
-: 721: recvbuf, count, datatype);
|
#####: 722: if (mpi_errno) goto fn_fail;
-: 723: }
-: 724: }
-: 725:
-: 726: /* now do an IN_PLACE allreduce among the local roots of all nodes */
|
1186699: 727: if (comm_ptr->node_roots_comm != NULL) {
464150: 728: mpi_errno = MPIR_Allreduce(MPI_IN_PLACE, recvbuf, count, datatype,
-: 729: op, comm_ptr->node_roots_comm);
|
464150: 730: if (mpi_errno) goto fn_fail;
-: 731: }
-: 732:
-: 733: /* now broadcast the result among local processes */
|
1186699: 734: if (comm_ptr->node_comm != NULL) {
1186699: 735: MPIU_THREADPRIV_GET;
-: 736:
1186699: 737: MPIR_Nest_incr();
1186699: 738: mpi_errno = MPIR_Bcast_or_coll_fn(recvbuf, count, datatype,
-: 739: 0, comm_ptr->node_comm);
1186699: 740: MPIR_Nest_decr();
-: 741: }
-: 742: }
-: 743: else {
15294: 744: mpi_errno = MPIR_Allreduce(sendbuf, recvbuf, count, datatype,
-: 745: op, comm_ptr);
-: 746: }
-: 747:#else
-: 748: mpi_errno = MPIR_Allreduce(sendbuf, recvbuf, count, datatype,
-: 749: op, comm_ptr);
-: 750:#endif
-: 751: }
-: 752: else {
-: 753: /* intercommunicator */
1440: 754: mpi_errno = MPIR_Allreduce_inter(sendbuf, recvbuf, count,
-: 755: datatype, op, comm_ptr);
-: 756: }
-: 757: }
-: 758:
|
1203433: 759: if (mpi_errno != MPI_SUCCESS) goto fn_fail;
-: 760:
-: 761: /* ... end of body of routine ... */
-: 762:
|
1203525: 763: fn_exit:
|
-: 764: MPID_MPI_COLL_FUNC_EXIT(MPID_STATE_MPI_ALLREDUCE);
|
1203525: 765: MPIU_THREAD_CS_EXIT(ALLFUNC,);
1203525: 766: return mpi_errno;
-: 767:
|
93: 768: fn_fail:
-: 769: /* --BEGIN ERROR HANDLING-- */
-: 770:# ifdef HAVE_ERROR_CHECKING
-: 771: {
93: 772: mpi_errno = MPIR_Err_create_code(
-: 773: mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**mpi_allreduce",
-: 774: "**mpi_allreduce %p %p %d %D %O %C", sendbuf, recvbuf, count, datatype, op, comm);
-: 775: }
-: 776:# endif
93: 777: mpi_errno = MPIR_Err_return_comm( comm_ptr, FCNAME, mpi_errno );
93: 778: goto fn_exit;
-: 779: /* --END ERROR HANDLING-- */
-: 780:}
|