-: 0:Source:/home/MPI/testing/mpich2/mpich2/src/mpi/coll/red_scat.c
-: 0:Graph:red_scat.gcno
-: 0:Data:red_scat.gcda
-: 0:Runs:4382
-: 0:Programs:1376
-: 1:/* -*- Mode: C; c-basic-offset:4 ; -*- */
-: 2:/*
-: 3: *
-: 4: * (C) 2001 by Argonne National Laboratory.
-: 5: * See COPYRIGHT in top-level directory.
-: 6: */
-: 7:
-: 8:#include "mpiimpl.h"
-: 9:
-: 10:/* -- Begin Profiling Symbol Block for routine MPI_Reduce_scatter */
-: 11:#if defined(HAVE_PRAGMA_WEAK)
-: 12:#pragma weak MPI_Reduce_scatter = PMPI_Reduce_scatter
-: 13:#elif defined(HAVE_PRAGMA_HP_SEC_DEF)
-: 14:#pragma _HP_SECONDARY_DEF PMPI_Reduce_scatter MPI_Reduce_scatter
-: 15:#elif defined(HAVE_PRAGMA_CRI_DUP)
-: 16:#pragma _CRI duplicate MPI_Reduce_scatter as PMPI_Reduce_scatter
-: 17:#endif
-: 18:/* -- End Profiling Symbol Block */
-: 19:
-: 20:/* Define MPICH_MPI_FROM_PMPI if weak symbols are not supported to build
-: 21: the MPI routines */
-: 22:#ifndef MPICH_MPI_FROM_PMPI
-: 23:#undef MPI_Reduce_scatter
-: 24:#define MPI_Reduce_scatter PMPI_Reduce_scatter
-: 25:
-: 26:
-: 27:/* Implements the "mirror permutation" of "bits" bits of an integer "x":
-: 28: the low-order "bits" bits of x are reversed, all higher bits are copied as-is.
-: 29: positions 76543210, bits==3 yields 76543012.
-: 30: Returns the permuted value as an int.
-: 31: This function could/should be moved to a common utility location for use in
-: 32: other collectives as well. */
-: 33:ATTRIBUTE((const)) /* tells the compiler that this func only depends on its args
-: 34: and may be optimized much more aggressively, similar to "pure" */
-: 35:static inline int mirror_permutation(unsigned int x, int bits)
-: 36:{
-: 37: /* a mask for the high order bits that should be copied as-is. NOTE(review): 0x1 is a signed int, so the shift is only well defined while bits is smaller than the width of int */
3360: 38: int high_mask = ~((0x1 << bits) - 1);
3360: 39: int retval = x & high_mask;
-: 40: int i;
-: 41: /* move source bit i to destination bit (bits - i - 1) */
6360: 42: for (i = 0; i < bits; ++i) {
3000: 43: unsigned int bitval = (x & (0x1 << i)) >> i; /* 0x1 or 0x0 */
3000: 44: retval |= bitval << ((bits - i) - 1);
-: 45: }
-: 46:
3360: 47: return retval;
-: 48:}
-: 49:
-: 50:/* FIXME should we be checking the op_errno here? */
-: 51:#ifdef HAVE_CXX_BINDING
-: 52:/* NOTE: assumes 'uop' is the operator function pointer and
-: 53: that 'is_cxx_uop' is a boolean that is nonzero when uop must be invoked through MPIR_Process.cxx_call_op_fn; both names are taken from the scope where the macro is expanded */
-: 54:#define call_uop(in_, inout_, count_, datatype_) \
-: 55:do { \
-: 56: if (is_cxx_uop) { \
-: 57: (*MPIR_Process.cxx_call_op_fn)((in_), (inout_), (count_), (datatype_), uop); \
-: 58: } \
-: 59: else { \
-: 60: (*uop)((in_), (inout_), &(count_), &(datatype_)); \
-: 61: } \
-: 62:} while (0)
-: 63: /* count_ and datatype_ must be lvalues in both variants: the direct call passes their addresses to uop */
-: 64:#else
-: 65:#define call_uop(in_, inout_, count_, datatype_) \
-: 66: (*uop)((in_), (inout_), &(count_), &(datatype_))
-: 67:#endif
-: 68:
-: 69:/* Implements the reduce-scatter butterfly algorithm described in J. L. Traff's
-: 70: * "An Improved Algorithm for (Non-commutative) Reduce-Scatter with an Application"
-: 71: * from EuroPVM/MPI 2005. This function currently only implements support for
-: 72: * the power-of-2, block-regular case (all receive counts are equal); both preconditions are only checked with MPIU_Assert. Returns mpi_errno. */
-: 73:#undef FUNCNAME
-: 74:#define FUNCNAME MPIR_Reduce_scatter_noncomm
-: 75:#undef FCNAME
-: 76:#define FCNAME MPIU_QUOTE(FUNCNAME)
-: 77:static int MPIR_Reduce_scatter_noncomm (
-: 78: void *sendbuf, /* input blocks, or MPI_IN_PLACE, in which case the input is read from recvbuf */
-: 79: void *recvbuf, /* receives this rank's reduced block; also the input when sendbuf is MPI_IN_PLACE */
-: 80: int *recvcnts, /* per-rank element counts; asserted below to be all equal, recvcnts[0] is used as the block size */
-: 81: MPI_Datatype datatype, /* element datatype used for every copy, transfer and reduction */
-: 82: MPI_Op op, /* reduction operation; resolved to the function pointer uop below */
-: 83: MPID_Comm *comm_ptr ) /* communicator; local_size, rank and handle are read from it */
1956: 84:{
1956: 85: int mpi_errno = MPI_SUCCESS;
1956: 86: int comm_size = comm_ptr->local_size;
1956: 87: int rank = comm_ptr->rank;
-: 88: int pof2;
-: 89: int log2_comm_size;
-: 90: int i, k;
-: 91: int recv_offset, send_offset;
-: 92: int block_size, total_count, size;
-: 93: MPI_Aint extent, true_extent, true_lb;
-: 94: int is_commutative; /* NOTE(review): assigned below but never read in this function */
-: 95: int buf0_was_inout; /* nonzero while tmp_buf0 holds the accumulated (in/out) data, zero while tmp_buf1 does */
-: 96: void *tmp_buf0;
-: 97: void *tmp_buf1;
-: 98: void *result_ptr;
1956: 99: MPI_Comm comm = comm_ptr->handle;
-: 100: MPI_User_function *uop;
-: 101: MPID_Op *op_ptr;
-: 102:#ifdef HAVE_CXX_BINDING
1956: 103: int is_cxx_uop = 0;
-: 104:#endif
1956: 105: MPIU_CHKLMEM_DECL(3); /* NOTE(review): only two MPIU_CHKLMEM_MALLOC calls appear in this function */
-: 106:
1956: 107: MPID_Datatype_get_extent_macro(datatype, extent); /* NOTE(review): extent is not read again in this function; all offsets below use true_extent */
-: 108: /* assumes nesting is handled by the caller right now, may not be true in the future */
1956: 109: mpi_errno = NMPI_Type_get_true_extent(datatype, &true_lb, &true_extent); /* NOTE(review): mpi_errno is not tested here before later assignments overwrite it */
-: 110: /* resolve op into the callable uop (and is_cxx_uop when C++ bindings are built) */
1956: 111: if (HANDLE_GET_KIND(op) == HANDLE_KIND_BUILTIN) {
|
#####: 112: is_commutative = 1;
-: 113: /* get the function by indexing into the op table */
#####: 114: uop = MPIR_Op_table[op%16 - 1];
-: 115: }
-: 116: else {
|
1956: 117: MPID_Op_get_ptr(op, op_ptr);
1956: 118: if (op_ptr->kind == MPID_OP_USER_NONCOMMUTE)
1956: 119: is_commutative = 0;
-: 120: else
|
#####: 121: is_commutative = 1;
-: 122:
-: 123:#ifdef HAVE_CXX_BINDING
|
1956: 124: if (op_ptr->language == MPID_LANG_CXX) {
|
#####: 125: uop = (MPI_User_function *) op_ptr->function.c_function;
#####: 126: is_cxx_uop = 1;
-: 127: }
-: 128: else
-: 129:#endif
|
1956: 130: if ((op_ptr->language == MPID_LANG_C))
1956: 131: uop = (MPI_User_function *) op_ptr->function.c_function;
-: 132: else
|
#####: 133: uop = (MPI_User_function *) op_ptr->function.f77_function;
-: 134: }
-: 135: /* compute the smallest power of two >= comm_size and its base-2 logarithm */
|
1956: 136: pof2 = 1;
1956: 137: log2_comm_size = 0;
5220: 138: while (pof2 < comm_size) {
1308: 139: pof2 <<= 1;
1308: 140: ++log2_comm_size;
-: 141: }
-: 142:
-: 143: /* begin error checking */
1956: 144: MPIU_Assert(pof2 == comm_size); /* FIXME this version only works for power of 2 procs */
-: 145:
3360: 146: for (i = 0; i < (comm_size - 1); ++i) {
1404: 147: MPIU_Assert(recvcnts[i] == recvcnts[i+1]);
-: 148: }
-: 149: /* end error checking */
-: 150:
-: 151: /* size of a block (count of datatype per block, NOT bytes per block) */
1956: 152: block_size = recvcnts[0];
1956: 153: total_count = block_size * comm_size;
-: 154: /* two scratch buffers of true_extent * total_count bytes each, used alternately by the exchange loop */
1956: 155: MPIU_CHKLMEM_MALLOC(tmp_buf0, void *, true_extent * total_count, mpi_errno, "tmp_buf0");
1956: 156: MPIU_CHKLMEM_MALLOC(tmp_buf1, void *, true_extent * total_count, mpi_errno, "tmp_buf1");
-: 157: /* adjust for potential negative lower bound in datatype */
1956: 158: tmp_buf0 = (void *)((char*)tmp_buf0 - true_lb);
1956: 159: tmp_buf1 = (void *)((char*)tmp_buf1 - true_lb);
-: 160:
-: 161: /* Copy our send data to tmp_buf0. We do this one block at a time and
-: 162: permute the blocks as we go according to the mirror permutation. */
5316: 163: for (i = 0; i < comm_size; ++i) {
6720: 164: mpi_errno = MPIR_Localcopy((char *)(sendbuf == MPI_IN_PLACE ? recvbuf : sendbuf) + (i * true_extent * block_size), block_size, datatype,
-: 165: (char *)tmp_buf0 + (mirror_permutation(i, log2_comm_size) * true_extent * block_size), block_size, datatype);
|
3360: 166: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 167: }
|
1956: 168: buf0_was_inout = 1; /* tmp_buf0 now holds this process's data */
-: 169: /* send_offset, recv_offset and size are element counts; they are scaled by true_extent when used as byte offsets */
1956: 170: send_offset = 0;
1956: 171: recv_offset = 0;
1956: 172: size = total_count;
3264: 173: for (k = 0; k < log2_comm_size; ++k) {
-: 174: /* use a double-buffering scheme to avoid local copies */
1308: 175: char *incoming_data = (buf0_was_inout ? tmp_buf1 : tmp_buf0);
1308: 176: char *outgoing_data = (buf0_was_inout ? tmp_buf0 : tmp_buf1);
1308: 177: int peer = rank ^ (0x1 << k);
1308: 178: size /= 2;
-: 179: /* in round k the partner differs from rank in bit k only, and the active region is halved */
1308: 180: if (rank > peer) {
-: 181: /* we have the higher rank: send top half, recv bottom half */
654: 182: recv_offset += size;
-: 183: }
-: 184: else {
-: 185: /* we have the lower rank: recv top half, send bottom half */
654: 186: send_offset += size;
-: 187: }
-: 188:
1308: 189: mpi_errno = MPIC_Sendrecv(outgoing_data + send_offset*true_extent,
-: 190: size, datatype, peer, MPIR_REDUCE_SCATTER_TAG,
-: 191: incoming_data + recv_offset*true_extent,
-: 192: size, datatype, peer, MPIR_REDUCE_SCATTER_TAG,
-: 193: comm, MPI_STATUS_IGNORE); /* NOTE(review): mpi_errno from this exchange is not tested before the reduction runs and is overwritten by a later assignment */
-: 194: /* always perform the reduction at recv_offset, the data at send_offset
-: 195: is now our peer's responsibility */
1308: 196: if (rank > peer) {
-: 197: /* higher ranked value so need to call op(received_data, my_data) */
654: 198: call_uop(incoming_data + recv_offset*true_extent,
-: 199: outgoing_data + recv_offset*true_extent,
-: 200: size, datatype);
654: 201: buf0_was_inout = buf0_was_inout; /* self-assignment, no effect: the in/out argument was outgoing_data, so the buffer roles stay as they are */
-: 202: }
-: 203: else {
-: 204: /* lower ranked value so need to call op(my_data, received_data) */
654: 205: call_uop(outgoing_data + recv_offset*true_extent,
-: 206: incoming_data + recv_offset*true_extent,
-: 207: size, datatype);
654: 208: buf0_was_inout = !buf0_was_inout; /* the in/out argument was incoming_data, so the buffer roles swap */
-: 209: }
-: 210:
-: 211: /* the next round of send/recv needs to happen within the block (of size
-: 212: "size") that we just received and reduced */
1308: 213: send_offset = recv_offset;
-: 214: }
-: 215:
1956: 216: MPIU_Assert(size == recvcnts[rank]);
-: 217:
-: 218: /* copy the reduced data to the recvbuf */
1956: 219: result_ptr = (char *)(buf0_was_inout ? tmp_buf0 : tmp_buf1) + recv_offset * true_extent;
1956: 220: mpi_errno = MPIR_Localcopy(result_ptr, size, datatype,
-: 221: recvbuf, size, datatype); /* execution continues into fn_exit, so this result becomes the return value */
-: 222:fn_exit:
3912: 223: MPIU_CHKLMEM_FREEALL();
1956: 224: return mpi_errno;
|
-: 225:fn_fail:
-: 226: goto fn_exit;
-: 227:}
-: 228:
-: 229:/* This is the default implementation of reduce_scatter. The algorithm is:
-: 230:
-: 231: Algorithm: MPI_Reduce_scatter
-: 232:
-: 233: If the operation is commutative, for short and medium-size
-: 234: messages, we use a recursive-halving
-: 235: algorithm in which the first p/2 processes send the second n/2 data
-: 236: to their counterparts in the other half and receive the first n/2
-: 237: data from them. This procedure continues recursively, halving the
-: 238: data communicated at each step, for a total of lgp steps. If the
-: 239: number of processes is not a power-of-two, we convert it to the
-: 240: nearest lower power-of-two by having the first few even-numbered
-: 241: processes send their data to the neighboring odd-numbered process
-: 242: at (rank+1). Those odd-numbered processes compute the result for
-: 243: their left neighbor as well in the recursive halving algorithm, and
-: 244: then at the end send the result back to the processes that didn't
-: 245: participate.
-: 246: Therefore, if p is a power-of-two,
-: 247: Cost = lgp.alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma
-: 248: If p is not a power-of-two,
-: 249: Cost = (floor(lgp)+2).alpha + n.(1+(p-1+n)/p).beta + n.(1+(p-1)/p).gamma
-: 250: The above cost in the non power-of-two case is approximate because
-: 251: there is some imbalance in the amount of work each process does
-: 252: because some processes do the work of their neighbors as well.
-: 253:
-: 254: For commutative operations and very long messages
-: 255: we use a pairwise exchange algorithm similar to
-: 256: the one used in MPI_Alltoall. At step i, each process sends n/p
-: 257: amount of data to (rank+i) and receives n/p amount of data from
-: 258: (rank-i).
-: 259: Cost = (p-1).alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma
-: 260:
-: 261:
-: 262: If the operation is not commutative, we do the following:
-: 263:
-: 264: We use a recursive doubling algorithm, which
-: 265: takes lgp steps. At step 1, processes exchange (n-n/p) amount of
-: 266: data; at step 2, (n-2n/p) amount of data; at step 3, (n-4n/p)
-: 267: amount of data, and so forth.
-: 268:
-: 269: Cost = lgp.alpha + n.(lgp-(p-1)/p).beta + n.(lgp-(p-1)/p).gamma
-: 270:
-: 271: Possible improvements:
-: 272:
-: 273: End Algorithm: MPI_Reduce_scatter
-: 274:*/
-: 275:
-: 276:#undef FUNCNAME
-: 277:#define FUNCNAME MPIR_Reduce_scatter
-: 278:#undef FCNAME
-: 279:#define FCNAME MPIU_QUOTE(FUNCNAME)
-: 280:/* begin:nested */
-: 281:/* not declared static because a machine-specific function may call this one in some cases */
-: 282:int MPIR_Reduce_scatter (
-: 283: void *sendbuf,
-: 284: void *recvbuf,
-: 285: int *recvcnts,
-: 286: MPI_Datatype datatype,
-: 287: MPI_Op op,
-: 288: MPID_Comm *comm_ptr )
|
51515: 289:{
-: 290: int rank, comm_size, i;
-: 291: MPI_Aint extent, true_extent, true_lb;
-: 292: int *disps;
-: 293: void *tmp_recvbuf, *tmp_results;
51515: 294: int mpi_errno = MPI_SUCCESS;
-: 295: int type_size, dis[2], blklens[2], total_count, nbytes, src, dst;
-: 296: int mask, dst_tree_root, my_tree_root, j, k;
-: 297: int *newcnts, *newdisps, rem, newdst, send_idx, recv_idx,
-: 298: last_idx, send_cnt, recv_cnt;
-: 299: int pof2, old_i, newrank, received;
-: 300: MPI_Datatype sendtype, recvtype;
-: 301: int nprocs_completed, tmp_mask, tree_root, is_commutative;
-: 302: MPI_User_function *uop;
-: 303: MPID_Op *op_ptr;
-: 304: MPI_Comm comm;
51515: 305: MPIU_THREADPRIV_DECL;
-: 306:#ifdef HAVE_CXX_BINDING
51515: 307: int is_cxx_uop = 0;
-: 308:#endif
51515: 309: MPIU_CHKLMEM_DECL(5);
-: 310:
51515: 311: comm = comm_ptr->handle;
51515: 312: comm_size = comm_ptr->local_size;
51515: 313: rank = comm_ptr->rank;
-: 314:
-: 315: /* set op_errno to 0. stored in perthread structure */
51515: 316: MPIU_THREADPRIV_GET;
51515: 317: MPIU_THREADPRIV_FIELD(op_errno) = 0;
-: 318:
51515: 319: MPIR_Nest_incr();
-: 320:
51515: 321: MPID_Datatype_get_extent_macro(datatype, extent);
51515: 322: mpi_errno = NMPI_Type_get_true_extent(datatype, &true_lb,
-: 323: &true_extent);
|
51515: 324: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 325:
|
51515: 326: if (HANDLE_GET_KIND(op) == HANDLE_KIND_BUILTIN) {
47339: 327: is_commutative = 1;
-: 328: /* get the function by indexing into the op table */
47339: 329: uop = MPIR_Op_table[op%16 - 1];
-: 330: }
-: 331: else {
4176: 332: MPID_Op_get_ptr(op, op_ptr);
4176: 333: if (op_ptr->kind == MPID_OP_USER_NONCOMMUTE)
2316: 334: is_commutative = 0;
-: 335: else
1860: 336: is_commutative = 1;
-: 337:
-: 338:#ifdef HAVE_CXX_BINDING
4176: 339: if (op_ptr->language == MPID_LANG_CXX) {
|
#####: 340: uop = (MPI_User_function *) op_ptr->function.c_function;
#####: 341: is_cxx_uop = 1;
-: 342: }
-: 343: else
-: 344:#endif
|
4176: 345: if ((op_ptr->language == MPID_LANG_C))
4176: 346: uop = (MPI_User_function *) op_ptr->function.c_function;
-: 347: else
|
#####: 348: uop = (MPI_User_function *) op_ptr->function.f77_function;
-: 349: }
-: 350:
|
51515: 351: MPIU_CHKLMEM_MALLOC(disps, int *, comm_size * sizeof(int), mpi_errno, "disps");
-: 352:
51515: 353: total_count = 0;
194226: 354: for (i=0; i<comm_size; i++) {
142711: 355: disps[i] = total_count;
142711: 356: total_count += recvcnts[i];
-: 357: }
-: 358:
51515: 359: if (total_count == 0) {
|
#####: 360: goto fn_exit;
-: 361: }
-: 362:
|
51515: 363: MPID_Datatype_get_size_macro(datatype, type_size);
51515: 364: nbytes = total_count * type_size;
-: 365:
-: 366: /* check if multiple threads are calling this collective function */
-: 367: MPIDU_ERR_CHECK_MULTIPLE_THREADS_ENTER( comm_ptr );
-: 368:
-: 369: /* total_count*extent eventually gets malloced. it isn't added to
-: 370: * a user-passed in buffer */
-: 371: MPID_Ensure_Aint_fits_in_pointer(total_count * MPIR_MAX(true_extent, extent));
-: 372:
51515: 373: if ((is_commutative) && (nbytes < MPIR_REDSCAT_COMMUTATIVE_LONG_MSG)) {
-: 374: /* commutative and short. use recursive halving algorithm */
-: 375:
-: 376: /* allocate temp. buffer to receive incoming data */
49199: 377: MPIU_CHKLMEM_MALLOC(tmp_recvbuf, void *, total_count*(MPIR_MAX(true_extent,extent)), mpi_errno, "tmp_recvbuf");
-: 378: /* adjust for potential negative lower bound in datatype */
49199: 379: tmp_recvbuf = (void *)((char*)tmp_recvbuf - true_lb);
-: 380:
-: 381: /* need to allocate another temporary buffer to accumulate
-: 382: results because recvbuf may not be big enough */
49199: 383: MPIU_CHKLMEM_MALLOC(tmp_results, void *, total_count*(MPIR_MAX(true_extent,extent)), mpi_errno, "tmp_results");
-: 384: /* adjust for potential negative lower bound in datatype */
49199: 385: tmp_results = (void *)((char*)tmp_results - true_lb);
-: 386:
-: 387: /* copy sendbuf into tmp_results */
49199: 388: if (sendbuf != MPI_IN_PLACE)
32376: 389: mpi_errno = MPIR_Localcopy(sendbuf, total_count, datatype,
-: 390: tmp_results, total_count, datatype);
-: 391: else
16823: 392: mpi_errno = MPIR_Localcopy(recvbuf, total_count, datatype,
-: 393: tmp_results, total_count, datatype);
-: 394:
|
49199: 395: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 396:
|
49199: 397: pof2 = 1;
49199: 398: while (pof2 <= comm_size) pof2 <<= 1;
49199: 399: pof2 >>=1;
-: 400:
49199: 401: rem = comm_size - pof2;
-: 402:
-: 403: /* In the non-power-of-two case, all even-numbered
-: 404: processes of rank < 2*rem send their data to
-: 405: (rank+1). These even-numbered processes no longer
-: 406: participate in the algorithm until the very end. The
-: 407: remaining processes form a nice power-of-two. */
-: 408:
49199: 409: if (rank < 2*rem) {
3300: 410: if (rank % 2 == 0) { /* even */
1650: 411: mpi_errno = MPIC_Send(tmp_results, total_count,
-: 412: datatype, rank+1,
-: 413: MPIR_REDUCE_SCATTER_TAG, comm);
|
-: 414: /* --BEGIN ERROR HANDLING-- */
1650: 415: if (mpi_errno)
-: 416: {
#####: 417: mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
#####: 418: return mpi_errno;
-: 419: }
-: 420: /* --END ERROR HANDLING-- */
-: 421:
-: 422: /* temporarily set the rank to -1 so that this
-: 423: process does not participate in recursive
-: 424: doubling */
|
1650: 425: newrank = -1;
-: 426: }
-: 427: else { /* odd */
1650: 428: mpi_errno = MPIC_Recv(tmp_recvbuf, total_count,
-: 429: datatype, rank-1,
-: 430: MPIR_REDUCE_SCATTER_TAG, comm,
-: 431: MPI_STATUS_IGNORE);
|
-: 432: /* --BEGIN ERROR HANDLING-- */
1650: 433: if (mpi_errno)
-: 434: {
#####: 435: mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
#####: 436: return mpi_errno;
-: 437: }
-: 438: /* --END ERROR HANDLING-- */
-: 439:
-: 440: /* do the reduction on received data. since the
-: 441: ordering is right, it doesn't matter whether
-: 442: the operation is commutative or not. */
-: 443:#ifdef HAVE_CXX_BINDING
|
1650: 444: if (is_cxx_uop) {
|
#####: 445: (*MPIR_Process.cxx_call_op_fn)( tmp_recvbuf, tmp_results,
-: 446: total_count,
-: 447: datatype,
-: 448: uop );
-: 449: }
-: 450: else
-: 451:#endif
|
1650: 452: (*uop)(tmp_recvbuf, tmp_results, &total_count, &datatype);
-: 453:
-: 454: /* change the rank */
1650: 455: newrank = rank / 2;
-: 456: }
-: 457: }
-: 458: else /* rank >= 2*rem */
45899: 459: newrank = rank - rem;
-: 460:
49199: 461: if (newrank != -1) {
-: 462: /* recalculate the recvcnts and disps arrays because the
-: 463: even-numbered processes who no longer participate will
-: 464: have their result calculated by the process to their
-: 465: right (rank+1). */
-: 466:
47549: 467: MPIU_CHKLMEM_MALLOC(newcnts, int *, pof2*sizeof(int), mpi_errno, "newcnts");
47549: 468: MPIU_CHKLMEM_MALLOC(newdisps, int *, pof2*sizeof(int), mpi_errno, "newdisps");
-: 469:
175516: 470: for (i=0; i<pof2; i++) {
-: 471: /* what does i map to in the old ranking? */
127967: 472: old_i = (i < rem) ? i*2 + 1 : i + rem;
127967: 473: if (old_i < 2*rem) {
-: 474: /* This process has to also do its left neighbor's
-: 475: work */
3350: 476: newcnts[i] = recvcnts[old_i] + recvcnts[old_i-1];
-: 477: }
-: 478: else
124617: 479: newcnts[i] = recvcnts[old_i];
-: 480: }
-: 481:
47549: 482: newdisps[0] = 0;
127967: 483: for (i=1; i<pof2; i++)
80418: 484: newdisps[i] = newdisps[i-1] + newcnts[i-1];
-: 485:
47549: 486: mask = pof2 >> 1;
47549: 487: send_idx = recv_idx = 0;
47549: 488: last_idx = pof2;
155856: 489: while (mask > 0) {
60758: 490: newdst = newrank ^ mask;
-: 491: /* find real rank of dest */
60758: 492: dst = (newdst < rem) ? newdst*2 + 1 : newdst + rem;
-: 493:
60758: 494: send_cnt = recv_cnt = 0;
60758: 495: if (newrank < newdst) {
30379: 496: send_idx = recv_idx + mask;
70588: 497: for (i=send_idx; i<last_idx; i++)
40209: 498: send_cnt += newcnts[i];
70588: 499: for (i=recv_idx; i<send_idx; i++)
40209: 500: recv_cnt += newcnts[i];
-: 501: }
-: 502: else {
30379: 503: recv_idx = send_idx + mask;
70588: 504: for (i=send_idx; i<recv_idx; i++)
40209: 505: send_cnt += newcnts[i];
70588: 506: for (i=recv_idx; i<last_idx; i++)
40209: 507: recv_cnt += newcnts[i];
-: 508: }
-: 509:
-: 510:/* printf("Rank %d, send_idx %d, recv_idx %d, send_cnt %d, recv_cnt %d, last_idx %d\n", newrank, send_idx, recv_idx,
-: 511: send_cnt, recv_cnt, last_idx);
-: 512:*/
-: 513: /* Send data from tmp_results. Recv into tmp_recvbuf */
60758: 514: if ((send_cnt != 0) && (recv_cnt != 0))
60758: 515: mpi_errno = MPIC_Sendrecv((char *) tmp_results +
-: 516: newdisps[send_idx]*extent,
-: 517: send_cnt, datatype,
-: 518: dst, MPIR_REDUCE_SCATTER_TAG,
-: 519: (char *) tmp_recvbuf +
-: 520: newdisps[recv_idx]*extent,
-: 521: recv_cnt, datatype, dst,
-: 522: MPIR_REDUCE_SCATTER_TAG, comm,
-: 523: MPI_STATUS_IGNORE);
|
#####: 524: else if ((send_cnt == 0) && (recv_cnt != 0))
#####: 525: mpi_errno = MPIC_Recv((char *) tmp_recvbuf +
-: 526: newdisps[recv_idx]*extent,
-: 527: recv_cnt, datatype, dst,
-: 528: MPIR_REDUCE_SCATTER_TAG, comm,
-: 529: MPI_STATUS_IGNORE);
#####: 530: else if ((recv_cnt == 0) && (send_cnt != 0))
#####: 531: mpi_errno = MPIC_Send((char *) tmp_results +
-: 532: newdisps[send_idx]*extent,
-: 533: send_cnt, datatype,
-: 534: dst, MPIR_REDUCE_SCATTER_TAG,
-: 535: comm);
-: 536:
|
60758: 537: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 538:
-: 539: /* tmp_recvbuf contains data received in this step.
-: 540: tmp_results contains data accumulated so far */
-: 541:
|
60758: 542: if (recv_cnt) {
-: 543:#ifdef HAVE_CXX_BINDING
60758: 544: if (is_cxx_uop) {
|
#####: 545: (*MPIR_Process.cxx_call_op_fn)((char *) tmp_recvbuf +
-: 546: newdisps[recv_idx]*extent,
-: 547: (char *) tmp_results +
-: 548: newdisps[recv_idx]*extent,
-: 549: recv_cnt, datatype, uop);
-: 550: }
-: 551: else
-: 552:#endif
|
60758: 553: (*uop)((char *) tmp_recvbuf + newdisps[recv_idx]*extent,
-: 554: (char *) tmp_results + newdisps[recv_idx]*extent,
-: 555: &recv_cnt, &datatype);
-: 556: }
-: 557:
-: 558: /* update send_idx for next iteration */
60758: 559: send_idx = recv_idx;
60758: 560: last_idx = recv_idx + mask;
60758: 561: mask >>= 1;
-: 562: }
-: 563:
-: 564: /* copy this process's result from tmp_results to recvbuf */
47549: 565: if (recvcnts[rank]) {
47549: 566: mpi_errno = MPIR_Localcopy((char *)tmp_results +
-: 567: disps[rank]*extent,
-: 568: recvcnts[rank], datatype, recvbuf,
-: 569: recvcnts[rank], datatype);
|
47549: 570: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 571: }
-: 572:
-: 573: }
-: 574:
-: 575: /* In the non-power-of-two case, all odd-numbered
-: 576: processes of rank < 2*rem send to (rank-1) the result they
-: 577: calculated for that process */
|
49199: 578: if (rank < 2*rem) {
3300: 579: if (rank % 2) { /* odd */
1650: 580: if (recvcnts[rank-1])
1650: 581: mpi_errno = MPIC_Send((char *) tmp_results +
-: 582: disps[rank-1]*extent, recvcnts[rank-1],
-: 583: datatype, rank-1,
-: 584: MPIR_REDUCE_SCATTER_TAG, comm);
-: 585: }
-: 586: else { /* even */
1650: 587: if (recvcnts[rank])
1650: 588: mpi_errno = MPIC_Recv(recvbuf, recvcnts[rank],
-: 589: datatype, rank+1,
-: 590: MPIR_REDUCE_SCATTER_TAG, comm,
-: 591: MPI_STATUS_IGNORE);
-: 592: }
|
3300: 593: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 594: }
-: 595: }
-: 596:
|
51515: 597: if (is_commutative && (nbytes >= MPIR_REDSCAT_COMMUTATIVE_LONG_MSG)) {
-: 598:
-: 599: /* commutative and long message (the condition above requires is_commutative).
-: 600: use (p-1) pairwise exchanges */
-: 601:
|
#####: 602: if (sendbuf != MPI_IN_PLACE) {
-: 603: /* copy local data into recvbuf */
#####: 604: mpi_errno = MPIR_Localcopy(((char *)sendbuf+disps[rank]*extent),
-: 605: recvcnts[rank], datatype, recvbuf,
-: 606: recvcnts[rank], datatype);
|
#####: 607: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 608: }
-: 609:
-: 610: /* allocate temporary buffer to store incoming data */
|
#####: 611: MPIU_CHKLMEM_MALLOC(tmp_recvbuf, void *, recvcnts[rank]*(MPIR_MAX(true_extent,extent))+1, mpi_errno, "tmp_recvbuf");
-: 612: /* adjust for potential negative lower bound in datatype */
#####: 613: tmp_recvbuf = (void *)((char*)tmp_recvbuf - true_lb);
-: 614:
#####: 615: for (i=1; i<comm_size; i++) {
#####: 616: src = (rank - i + comm_size) % comm_size;
#####: 617: dst = (rank + i) % comm_size;
-: 618:
-: 619: /* send the data that dst needs. recv data that this process
-: 620: needs from src into tmp_recvbuf */
#####: 621: if (sendbuf != MPI_IN_PLACE)
#####: 622: mpi_errno = MPIC_Sendrecv(((char *)sendbuf+disps[dst]*extent),
-: 623: recvcnts[dst], datatype, dst,
-: 624: MPIR_REDUCE_SCATTER_TAG, tmp_recvbuf,
-: 625: recvcnts[rank], datatype, src,
-: 626: MPIR_REDUCE_SCATTER_TAG, comm,
-: 627: MPI_STATUS_IGNORE);
-: 628: else
#####: 629: mpi_errno = MPIC_Sendrecv(((char *)recvbuf+disps[dst]*extent),
-: 630: recvcnts[dst], datatype, dst,
-: 631: MPIR_REDUCE_SCATTER_TAG, tmp_recvbuf,
-: 632: recvcnts[rank], datatype, src,
-: 633: MPIR_REDUCE_SCATTER_TAG, comm,
-: 634: MPI_STATUS_IGNORE);
-: 635:
|
#####: 636: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 637:
|
#####: 638: if (is_commutative || (src < rank)) {
#####: 639: if (sendbuf != MPI_IN_PLACE) {
-: 640:#ifdef HAVE_CXX_BINDING
#####: 641: if (is_cxx_uop) {
#####: 642: (*MPIR_Process.cxx_call_op_fn)(tmp_recvbuf,
-: 643: recvbuf,
-: 644: recvcnts[rank],
-: 645: datatype, uop );
-: 646: }
-: 647: else
-: 648:#endif
#####: 649: (*uop)(tmp_recvbuf, recvbuf, &recvcnts[rank],
-: 650: &datatype);
-: 651: }
-: 652: else {
-: 653:#ifdef HAVE_CXX_BINDING
#####: 654: if (is_cxx_uop) {
#####: 655: (*MPIR_Process.cxx_call_op_fn)( tmp_recvbuf,
-: 656: ((char *)recvbuf+disps[rank]*extent),
-: 657: recvcnts[rank], datatype, uop );
-: 658: }
-: 659: else
-: 660:#endif
#####: 661: (*uop)(tmp_recvbuf, ((char *)recvbuf+disps[rank]*extent),
-: 662: &recvcnts[rank], &datatype);
-: 663: /* we can't store the result at the beginning of
-: 664: recvbuf right here because there is useful data
-: 665: there that other process/processes need. at the
-: 666: end, we will copy back the result to the
-: 667: beginning of recvbuf. */
-: 668: }
-: 669: }
-: 670: else {
#####: 671: if (sendbuf != MPI_IN_PLACE) {
-: 672:#ifdef HAVE_CXX_BINDING
#####: 673: if (is_cxx_uop) {
#####: 674: (*MPIR_Process.cxx_call_op_fn)( recvbuf,
-: 675: tmp_recvbuf,
-: 676: recvcnts[rank],
-: 677: datatype, uop );
-: 678: }
-: 679: else
-: 680:#endif
#####: 681: (*uop)(recvbuf, tmp_recvbuf, &recvcnts[rank], &datatype);
-: 682: /* copy result back into recvbuf */
#####: 683: mpi_errno = MPIR_Localcopy(tmp_recvbuf, recvcnts[rank],
-: 684: datatype, recvbuf,
-: 685: recvcnts[rank], datatype);
-: 686: }
-: 687: else {
-: 688:#ifdef HAVE_CXX_BINDING
#####: 689: if (is_cxx_uop) {
#####: 690: (*MPIR_Process.cxx_call_op_fn)(
-: 691: ((char *)recvbuf+disps[rank]*extent),
-: 692: tmp_recvbuf, recvcnts[rank], datatype, uop );
-: 693:
-: 694: }
-: 695: else
-: 696:#endif
#####: 697: (*uop)(((char *)recvbuf+disps[rank]*extent),
-: 698: tmp_recvbuf, &recvcnts[rank], &datatype);
-: 699: /* copy result back into recvbuf */
#####: 700: mpi_errno = MPIR_Localcopy(tmp_recvbuf, recvcnts[rank],
-: 701: datatype,
-: 702: ((char *)recvbuf +
-: 703: disps[rank]*extent),
-: 704: recvcnts[rank], datatype);
-: 705: }
|
#####: 706: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 707: }
-: 708: }
-: 709:
-: 710: /* if MPI_IN_PLACE, move output data to the beginning of
-: 711: recvbuf. already done for rank 0. */
|
#####: 712: if ((sendbuf == MPI_IN_PLACE) && (rank != 0)) {
#####: 713: mpi_errno = MPIR_Localcopy(((char *)recvbuf +
-: 714: disps[rank]*extent),
-: 715: recvcnts[rank], datatype,
-: 716: recvbuf,
-: 717: recvcnts[rank], datatype);
|
#####: 718: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 719: }
-: 720: }
-: 721:
|
51515: 722: if (!is_commutative) {
2316: 723: int is_block_regular = 1;
6360: 724: for (i = 0; i < (comm_size - 1); ++i) {
4044: 725: if (recvcnts[i] != recvcnts[i+1]) {
|
#####: 726: is_block_regular = 0;
#####: 727: break;
-: 728: }
-: 729: }
-: 730:
-: 731: /* slightly retask pof2 to mean pof2 equal or greater, not always greater as it is above */
|
2316: 732: pof2 = 1;
2316: 733: while (pof2 < comm_size) pof2 <<= 1;
-: 734:
2316: 735: if (pof2 == comm_size && is_block_regular) {
-: 736: /* noncommutative, pof2 size, and block regular */
1956: 737: mpi_errno = MPIR_Reduce_scatter_noncomm(sendbuf, recvbuf, recvcnts, datatype, op, comm_ptr);
|
1956: 738: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 739: }
-: 740: else {
-: 741: /* noncommutative and (non-pof2 or block irregular), use recursive doubling. */
-: 742:
-: 743: /* need to allocate temporary buffer to receive incoming data*/
|
360: 744: MPIU_CHKLMEM_MALLOC(tmp_recvbuf, void *, total_count*(MPIR_MAX(true_extent,extent)), mpi_errno, "tmp_recvbuf");
-: 745: /* adjust for potential negative lower bound in datatype */
360: 746: tmp_recvbuf = (void *)((char*)tmp_recvbuf - true_lb);
-: 747:
-: 748: /* need to allocate another temporary buffer to accumulate
-: 749: results */
360: 750: MPIU_CHKLMEM_MALLOC(tmp_results, void *, total_count*(MPIR_MAX(true_extent,extent)), mpi_errno, "tmp_results");
-: 751: /* adjust for potential negative lower bound in datatype */
360: 752: tmp_results = (void *)((char*)tmp_results - true_lb);
-: 753:
-: 754: /* copy sendbuf into tmp_results */
360: 755: if (sendbuf != MPI_IN_PLACE)
360: 756: mpi_errno = MPIR_Localcopy(sendbuf, total_count, datatype,
-: 757: tmp_results, total_count, datatype);
-: 758: else
|
#####: 759: mpi_errno = MPIR_Localcopy(recvbuf, total_count, datatype,
-: 760: tmp_results, total_count, datatype);
-: 761:
|
360: 762: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 763:
|
360: 764: mask = 0x1;
360: 765: i = 0;
2040: 766: while (mask < comm_size) {
1320: 767: dst = rank ^ mask;
-: 768:
1320: 769: dst_tree_root = dst >> i;
1320: 770: dst_tree_root <<= i;
-: 771:
1320: 772: my_tree_root = rank >> i;
1320: 773: my_tree_root <<= i;
-: 774:
-: 775: /* At step 1, processes exchange (n-n/p) amount of
-: 776: data; at step 2, (n-2n/p) amount of data; at step 3, (n-4n/p)
-: 777: amount of data, and so forth. We use derived datatypes for this.
-: 778:
-: 779: At each step, a process does not need to send data
-: 780: indexed from my_tree_root to
-: 781: my_tree_root+mask-1. Similarly, a process won't receive
-: 782: data indexed from dst_tree_root to dst_tree_root+mask-1. */
-: 783:
-: 784: /* calculate sendtype */
1320: 785: blklens[0] = blklens[1] = 0;
5040: 786: for (j=0; j<my_tree_root; j++)
3720: 787: blklens[0] += recvcnts[j];
5040: 788: for (j=my_tree_root+mask; j<comm_size; j++)
3720: 789: blklens[1] += recvcnts[j];
-: 790:
1320: 791: dis[0] = 0;
1320: 792: dis[1] = blklens[0];
5280: 793: for (j=my_tree_root; (j<my_tree_root+mask) && (j<comm_size); j++)
3960: 794: dis[1] += recvcnts[j];
-: 795:
1320: 796: NMPI_Type_indexed(2, blklens, dis, datatype, &sendtype);
1320: 797: NMPI_Type_commit(&sendtype);
-: 798:
-: 799: /* calculate recvtype */
1320: 800: blklens[0] = blklens[1] = 0;
6720: 801: for (j=0; j<dst_tree_root && j<comm_size; j++)
5400: 802: blklens[0] += recvcnts[j];
4680: 803: for (j=dst_tree_root+mask; j<comm_size; j++)
3360: 804: blklens[1] += recvcnts[j];
-: 805:
1320: 806: dis[0] = 0;
1320: 807: dis[1] = blklens[0];
3960: 808: for (j=dst_tree_root; (j<dst_tree_root+mask) && (j<comm_size); j++)
2640: 809: dis[1] += recvcnts[j];
-: 810:
1320: 811: NMPI_Type_indexed(2, blklens, dis, datatype, &recvtype);
1320: 812: NMPI_Type_commit(&recvtype);
-: 813:
1320: 814: received = 0;
1320: 815: if (dst < comm_size) {
-: 816: /* tmp_results contains data to be sent in each step. Data is
-: 817: received in tmp_recvbuf and then accumulated into
-: 818: tmp_results. accumulation is done later below. */
-: 819:
960: 820: mpi_errno = MPIC_Sendrecv(tmp_results, 1, sendtype, dst,
-: 821: MPIR_REDUCE_SCATTER_TAG,
-: 822: tmp_recvbuf, 1, recvtype, dst,
-: 823: MPIR_REDUCE_SCATTER_TAG, comm,
-: 824: MPI_STATUS_IGNORE);
960: 825: received = 1;
|
960: 826: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 827: }
-: 828:
-: 829: /* if some processes in this process's subtree in this step
-: 830: did not have any destination process to communicate with
-: 831: because of non-power-of-two, we need to send them the
-: 832: result. We use a logarithmic recursive-halving algorithm
-: 833: for this. */
-: 834:
|
1320: 835: if (dst_tree_root + mask > comm_size) {
432: 836: nprocs_completed = comm_size - my_tree_root - mask;
-: 837: /* nprocs_completed is the number of processes in this
-: 838: subtree that have all the data. Send data to others
-: 839: in a tree fashion. First find root of current tree
-: 840: that is being divided into two. k is the number of
-: 841: least-significant bits in this process's rank that
-: 842: must be zeroed out to find the rank of the root */
432: 843: j = mask;
432: 844: k = 0;
2232: 845: while (j) {
1368: 846: j >>= 1;
1368: 847: k++;
-: 848: }
432: 849: k--;
-: 850:
432: 851: tmp_mask = mask >> 1;
1800: 852: while (tmp_mask) {
936: 853: dst = rank ^ tmp_mask;
-: 854:
936: 855: tree_root = rank >> k;
936: 856: tree_root <<= k;
-: 857:
-: 858: /* send only if this proc has data and destination
-: 859: doesn't have data. at any step, multiple processes
-: 860: can send if they have the data */
936: 861: if ((dst > rank) &&
-: 862: (rank < tree_root + nprocs_completed)
-: 863: && (dst >= tree_root + nprocs_completed)) {
-: 864: /* send the current result */
216: 865: mpi_errno = MPIC_Send(tmp_recvbuf, 1, recvtype,
-: 866: dst, MPIR_REDUCE_SCATTER_TAG,
-: 867: comm);
|
216: 868: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 869: }
-: 870: /* recv only if this proc. doesn't have data and sender
-: 871: has data */
|
720: 872: else if ((dst < rank) &&
-: 873: (dst < tree_root + nprocs_completed) &&
-: 874: (rank >= tree_root + nprocs_completed)) {
216: 875: mpi_errno = MPIC_Recv(tmp_recvbuf, 1, recvtype, dst,
-: 876: MPIR_REDUCE_SCATTER_TAG,
-: 877: comm, MPI_STATUS_IGNORE);
216: 878: received = 1;
|
216: 879: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 880: }
|
936: 881: tmp_mask >>= 1;
936: 882: k--;
-: 883: }
-: 884: }
-: 885:
-: 886: /* The following reduction is done here instead of after
-: 887: the MPIC_Sendrecv or MPIC_Recv above. This is
-: 888: because to do it above, in the noncommutative
-: 889: case, we would need an extra temp buffer so as not to
-: 890: overwrite temp_recvbuf, because temp_recvbuf may have
-: 891: to be communicated to other processes in the
-: 892: non-power-of-two case. To avoid that extra allocation,
-: 893: we do the reduce here. */
1320: 894: if (received) {
1176: 895: if (is_commutative || (dst_tree_root < my_tree_root)) {
-: 896:#ifdef HAVE_CXX_BINDING
480: 897: if (is_cxx_uop) {
|
#####: 898: (*MPIR_Process.cxx_call_op_fn)( tmp_recvbuf,
-: 899: tmp_results, blklens[0],
-: 900: datatype, uop);
#####: 901: (*MPIR_Process.cxx_call_op_fn)(
-: 902: ((char *)tmp_recvbuf + dis[1]*extent),
-: 903: ((char *)tmp_results + dis[1]*extent),
-: 904: blklens[1], datatype, uop );
-: 905: }
-: 906: else
-: 907:#endif
-: 908: {
|
480: 909: (*uop)(tmp_recvbuf, tmp_results, &blklens[0],
-: 910: &datatype);
480: 911: (*uop)(((char *)tmp_recvbuf + dis[1]*extent),
-: 912: ((char *)tmp_results + dis[1]*extent),
-: 913: &blklens[1], &datatype);
-: 914: }
-: 915: }
-: 916: else {
-: 917:#ifdef HAVE_CXX_BINDING
696: 918: if (is_cxx_uop) {
|
#####: 919: (*MPIR_Process.cxx_call_op_fn)( tmp_results,
-: 920: tmp_recvbuf, blklens[0],
-: 921: datatype, uop );
#####: 922: (*MPIR_Process.cxx_call_op_fn)(
-: 923: ((char *)tmp_results + dis[1]*extent),
-: 924: ((char *)tmp_recvbuf + dis[1]*extent),
-: 925: blklens[1], datatype, uop );
-: 926: }
-: 927: else
-: 928:#endif
-: 929: {
|
696: 930: (*uop)(tmp_results, tmp_recvbuf, &blklens[0],
-: 931: &datatype);
696: 932: (*uop)(((char *)tmp_results + dis[1]*extent),
-: 933: ((char *)tmp_recvbuf + dis[1]*extent),
-: 934: &blklens[1], &datatype);
-: 935: }
-: 936: /* copy result back into tmp_results */
696: 937: mpi_errno = MPIR_Localcopy(tmp_recvbuf, 1, recvtype,
-: 938: tmp_results, 1, recvtype);
|
696: 939: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 940: }
-: 941: }
-: 942:
|
1320: 943: NMPI_Type_free(&sendtype);
1320: 944: NMPI_Type_free(&recvtype);
-: 945:
1320: 946: mask <<= 1;
1320: 947: i++;
-: 948: }
-: 949:
-: 950: /* now copy final results from tmp_results to recvbuf */
360: 951: mpi_errno = MPIR_Localcopy(((char *)tmp_results+disps[rank]*extent),
-: 952: recvcnts[rank], datatype, recvbuf,
-: 953: recvcnts[rank], datatype);
|
360: 954: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 955: }
-: 956: }
-: 957:
-: 958:fn_exit:
|
245731: 959: MPIU_CHKLMEM_FREEALL();
-: 960:
51515: 961: MPIR_Nest_decr();
-: 962: /* check if multiple threads are calling this collective function */
-: 963: MPIDU_ERR_CHECK_MULTIPLE_THREADS_EXIT( comm_ptr );
-: 964:
51515: 965: if (MPIU_THREADPRIV_FIELD(op_errno))
|
#####: 966: mpi_errno = MPIU_THREADPRIV_FIELD(op_errno);
-: 967:
|
51515: 968: return (mpi_errno);
|
-: 969:fn_fail:
-: 970: goto fn_exit;
-: 971:}
-: 972:/* end:nested */
-: 973:
-: 974:#undef FUNCNAME
-: 975:#define FUNCNAME MPIR_Reduce_scatter
-: 976:#undef FCNAME
-: 977:#define FCNAME MPIU_QUOTE(FUNCNAME)
-: 978:/* begin:nested */
-: 979:/* not declared static because a machine-specific function may call this one in some cases */
-: 980:int MPIR_Reduce_scatter_inter (
-: 981: void *sendbuf,
-: 982: void *recvbuf,
-: 983: int *recvcnts,
-: 984: MPI_Datatype datatype,
-: 985: MPI_Op op,
-: 986: MPID_Comm *comm_ptr )
|
#####: 987:{
-: 988:/* Intercommunicator Reduce_scatter.
-: 989: We first do an intercommunicator reduce to rank 0 on left group,
-: 990: then an intercommunicator reduce to rank 0 on right group, followed
-: 991: by local intracommunicator scattervs in each group.
-: 992:*/
-: 993:
-: 994: int rank, mpi_errno, root, local_size, total_count, i;
#####: 995: MPI_Aint true_extent, true_lb = 0, extent;
#####: 996: void *tmp_buf=NULL;
#####: 997: int *disps=NULL;
#####: 998: MPID_Comm *newcomm_ptr = NULL;
-: 999:
#####: 1000: rank = comm_ptr->rank;
#####: 1001: local_size = comm_ptr->local_size;
-: 1002:
#####: 1003: total_count = 0;
#####: 1004: for (i=0; i<local_size; i++) total_count += recvcnts[i];
-: 1005:
#####: 1006: if (rank == 0) {
-: 1007: /* In each group, rank 0 allocates a temp. buffer for the
-: 1008: reduce */
-: 1009:
#####: 1010: disps = MPIU_Malloc(local_size*sizeof(int));
|
-: 1011: /* --BEGIN ERROR HANDLING-- */
#####: 1012: if (!disps) {
#####: 1013: mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0 );
#####: 1014: return mpi_errno;
-: 1015: }
-: 1016: /* --END ERROR HANDLING-- */
-: 1017:
|
#####: 1018: total_count = 0;
#####: 1019: for (i=0; i<local_size; i++) {
#####: 1020: disps[i] = total_count;
#####: 1021: total_count += recvcnts[i];
-: 1022: }
-: 1023:
#####: 1024: mpi_errno = NMPI_Type_get_true_extent(datatype, &true_lb,
-: 1025: &true_extent);
|
-: 1026: /* --BEGIN ERROR HANDLING-- */
#####: 1027: if (mpi_errno)
-: 1028: {
#####: 1029: mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
#####: 1030: return mpi_errno;
-: 1031: }
-: 1032: /* --END ERROR HANDLING-- */
|
#####: 1033: MPID_Datatype_get_extent_macro(datatype, extent);
-: 1034:
#####: 1035: tmp_buf = MPIU_Malloc(total_count*(MPIR_MAX(extent,true_extent)));
|
-: 1036: /* --BEGIN ERROR HANDLING-- */
#####: 1037: if (!tmp_buf) {
#####: 1038: mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0 );
#####: 1039: return mpi_errno;
-: 1040: }
-: 1041: /* --END ERROR HANDLING-- */
-: 1042: /* adjust for potential negative lower bound in datatype */
|
#####: 1043: tmp_buf = (void *)((char*)tmp_buf - true_lb);
-: 1044: }
-: 1045:
-: 1046: /* first do a reduce from right group to rank 0 in left group,
-: 1047: then from left group to rank 0 in right group*/
#####: 1048: if (comm_ptr->is_low_group) {
-: 1049: /* reduce from right group to rank 0*/
#####: 1050: root = (rank == 0) ? MPI_ROOT : MPI_PROC_NULL;
#####: 1051: mpi_errno = MPIR_Reduce_inter(sendbuf, tmp_buf, total_count, datatype, op,
-: 1052: root, comm_ptr);
|
-: 1053: /* --BEGIN ERROR HANDLING-- */
#####: 1054: if (mpi_errno)
-: 1055: {
#####: 1056: mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
#####: 1057: return mpi_errno;
-: 1058: }
-: 1059: /* --END ERROR HANDLING-- */
-: 1060:
-: 1061: /* reduce to rank 0 of right group */
|
#####: 1062: root = 0;
#####: 1063: mpi_errno = MPIR_Reduce_inter(sendbuf, tmp_buf, total_count, datatype, op,
-: 1064: root, comm_ptr);
|
-: 1065: /* --BEGIN ERROR HANDLING-- */
#####: 1066: if (mpi_errno)
-: 1067: {
#####: 1068: mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
#####: 1069: return mpi_errno;
-: 1070: }
-: 1071: /* --END ERROR HANDLING-- */
-: 1072: }
-: 1073: else {
-: 1074: /* reduce to rank 0 of left group */
|
#####: 1075: root = 0;
#####: 1076: mpi_errno = MPIR_Reduce_inter(sendbuf, tmp_buf, total_count, datatype, op,
-: 1077: root, comm_ptr);
|
-: 1078: /* --BEGIN ERROR HANDLING-- */
#####: 1079: if (mpi_errno)
-: 1080: {
#####: 1081: mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
#####: 1082: return mpi_errno;
-: 1083: }
-: 1084: /* --END ERROR HANDLING-- */
-: 1085:
-: 1086: /* reduce from right group to rank 0 */
|
#####: 1087: root = (rank == 0) ? MPI_ROOT : MPI_PROC_NULL;
#####: 1088: mpi_errno = MPIR_Reduce_inter(sendbuf, tmp_buf, total_count, datatype, op,
-: 1089: root, comm_ptr);
|
-: 1090: /* --BEGIN ERROR HANDLING-- */
#####: 1091: if (mpi_errno)
-: 1092: {
#####: 1093: mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
#####: 1094: return mpi_errno;
-: 1095: }
-: 1096: /* --END ERROR HANDLING-- */
-: 1097: }
-: 1098:
-: 1099: /* Get the local intracommunicator */
|
#####: 1100: if (!comm_ptr->local_comm)
#####: 1101: MPIR_Setup_intercomm_localcomm( comm_ptr );
-: 1102:
#####: 1103: newcomm_ptr = comm_ptr->local_comm;
-: 1104:
#####: 1105: mpi_errno = MPIR_Scatterv(tmp_buf, recvcnts, disps, datatype, recvbuf,
-: 1106: recvcnts[rank], datatype, 0, newcomm_ptr);
|
-: 1107: /* --BEGIN ERROR HANDLING-- */
#####: 1108: if (mpi_errno)
-: 1109: {
#####: 1110: mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
#####: 1111: return mpi_errno;
-: 1112: }
-: 1113: /* --END ERROR HANDLING-- */
-: 1114:
|
#####: 1115: if (rank == 0) {
#####: 1116: MPIU_Free(disps);
#####: 1117: MPIU_Free((char*)tmp_buf+true_lb);
-: 1118: }
-: 1119:
#####: 1120: return mpi_errno;
-: 1121:
-: 1122:}
-: 1123:/* end:nested */
-: 1124:#endif
-: 1125:
-: 1126:#undef FUNCNAME
-: 1127:#define FUNCNAME MPI_Reduce_scatter
-: 1128:#undef FCNAME
-: 1129:#define FCNAME MPIU_QUOTE(FUNCNAME)
-: 1130:/*@
-: 1131:
-: 1132:MPI_Reduce_scatter - Combines values and scatters the results
-: 1133:
-: 1134:Input Parameters:
-: 1135:+ sendbuf - starting address of send buffer (choice)
-: 1136:. recvcounts - integer array specifying the
-: 1137:number of elements in result distributed to each process.
-: 1138:Array must be identical on all calling processes.
-: 1139:. datatype - data type of elements of input buffer (handle)
-: 1140:. op - operation (handle)
-: 1141:- comm - communicator (handle)
-: 1142:
-: 1143:Output Parameter:
-: 1144:. recvbuf - starting address of receive buffer (choice)
-: 1145:
-: 1146:.N ThreadSafe
-: 1147:
-: 1148:.N Fortran
-: 1149:
-: 1150:.N collops
-: 1151:
-: 1152:.N Errors
-: 1153:.N MPI_SUCCESS
-: 1154:.N MPI_ERR_COMM
-: 1155:.N MPI_ERR_COUNT
-: 1156:.N MPI_ERR_TYPE
-: 1157:.N MPI_ERR_BUFFER
-: 1158:.N MPI_ERR_OP
-: 1159:.N MPI_ERR_BUFFER_ALIAS
-: 1160:@*/
-: 1161:int MPI_Reduce_scatter(void *sendbuf, void *recvbuf, int *recvcnts,
-: 1162: MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
|
51597: 1163:{ /* Public entry point: validates the arguments (only when HAVE_ERROR_CHECKING is defined), converts comm to an object pointer, dispatches to a Reduce_scatter implementation and returns the resulting mpi_errno. */
51597: 1164: int mpi_errno = MPI_SUCCESS;
51597: 1165: MPID_Comm *comm_ptr = NULL;
51597: 1166: MPIU_THREADPRIV_DECL;
-: 1167: MPID_MPI_STATE_DECL(MPID_STATE_MPI_REDUCE_SCATTER);
-: 1168:
51597: 1169: MPIR_ERRTEST_INITIALIZED_ORDIE();
-: 1170:
51597: 1171: MPIU_THREAD_CS_ENTER(ALLFUNC,); /* matched by MPIU_THREAD_CS_EXIT at fn_exit; the fn_fail path also jumps to fn_exit */
-: 1172: MPID_MPI_COLL_FUNC_ENTER(MPID_STATE_MPI_REDUCE_SCATTER);
-: 1173:
-: 1174: /* Validate parameters, especially handles needing to be converted */
|
-: 1175:# ifdef HAVE_ERROR_CHECKING
-: 1176: {
-: 1177: MPID_BEGIN_ERROR_CHECKS;
-: 1178: {
51597: 1179: MPIR_ERRTEST_COMM(comm, mpi_errno); /* the comm handle is checked before it is converted to comm_ptr below */
51597: 1180: if (mpi_errno != MPI_SUCCESS) goto fn_fail;
-: 1181: }
-: 1182: MPID_END_ERROR_CHECKS;
-: 1183: }
-: 1184:# endif /* HAVE_ERROR_CHECKING */
-: 1185:
-: 1186: /* Convert MPI object handles to object pointers */
|
51595: 1187: MPID_Comm_get_ptr( comm, comm_ptr );
-: 1188:
-: 1189: /* Validate parameters and objects (post conversion) */
|
-: 1190:# ifdef HAVE_ERROR_CHECKING
-: 1191: {
-: 1192: MPID_BEGIN_ERROR_CHECKS;
-: 1193: {
51595: 1194: MPID_Datatype *datatype_ptr = NULL;
51595: 1195: MPID_Op *op_ptr = NULL;
-: 1196: int i, size, sum;
-: 1197:
51595: 1198: MPID_Comm_valid_ptr( comm_ptr, mpi_errno );
51595: 1199: if (mpi_errno != MPI_SUCCESS) goto fn_fail;
-: 1200:
51593: 1201: size = comm_ptr->local_size;
-: 1202: /* even in intercomm. case, recvcnts is of size local_size */
-: 1203:
51593: 1204: sum = 0;
194460: 1205: for (i=0; i<size; i++) {
142867: 1206: MPIR_ERRTEST_COUNT(recvcnts[i],mpi_errno);
142867: 1207: sum += recvcnts[i]; /* total element count; passed to the sendbuf checks below */
-: 1208: }
-: 1209:
51593: 1210: MPIR_ERRTEST_DATATYPE(datatype, "datatype", mpi_errno);
51593: 1211: if (HANDLE_GET_KIND(datatype) != HANDLE_KIND_BUILTIN) { /* pointer validity and committed checks run for non-builtin datatypes only */
1684: 1212: MPID_Datatype_get_ptr(datatype, datatype_ptr);
1684: 1213: MPID_Datatype_valid_ptr( datatype_ptr, mpi_errno );
1684: 1214: MPID_Datatype_committed_ptr( datatype_ptr, mpi_errno );
-: 1215: }
-: 1216:
51593: 1217: MPIR_ERRTEST_RECVBUF_INPLACE(recvbuf, recvcnts[comm_ptr->rank], mpi_errno);
51593: 1218: if (comm_ptr->comm_kind == MPID_INTERCOMM) /* the sendbuf in-place check is applied to intercommunicators only */
#####: 1219: MPIR_ERRTEST_SENDBUF_INPLACE(sendbuf, sum, mpi_errno);
-: 1220:
51593: 1221: MPIR_ERRTEST_USERBUFFER(recvbuf,recvcnts[comm_ptr->rank],datatype,mpi_errno);
51593: 1222: MPIR_ERRTEST_USERBUFFER(sendbuf,sum,datatype,mpi_errno);
-: 1223:
51593: 1224: MPIR_ERRTEST_OP(op, mpi_errno);
-: 1225:
51567: 1226: if (mpi_errno != MPI_SUCCESS) goto fn_fail;
51541: 1227: if (HANDLE_GET_KIND(op) != HANDLE_KIND_BUILTIN) { /* non-builtin op: validate the op object pointer */
4176: 1228: MPID_Op_get_ptr(op, op_ptr);
4176: 1229: MPID_Op_valid_ptr( op_ptr, mpi_errno );
-: 1230: }
51541: 1231: if (HANDLE_GET_KIND(op) == HANDLE_KIND_BUILTIN) { /* builtin op: call the checker selected from MPIR_Op_check_dtype_table with datatype */
47365: 1232: mpi_errno =
-: 1233: ( * MPIR_Op_check_dtype_table[op%16 - 1] )(datatype); /* NOTE(review): index op%16 - 1 presumably relies on builtin op handles carrying a 1-based op number in their low bits -- confirm */
-: 1234: }
51541: 1235: if (mpi_errno != MPI_SUCCESS) goto fn_fail;
-: 1236: }
-: 1237: MPID_END_ERROR_CHECKS;
-: 1238: }
-: 1239:# endif /* HAVE_ERROR_CHECKING */
-: 1240:
-: 1241: /* ... body of routine ... */
-: 1242:
|
51515: 1243: if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Reduce_scatter != NULL) /* a Reduce_scatter entry installed in comm_ptr->coll_fns takes precedence over the default implementations */
-: 1244: {
|
#####: 1245: mpi_errno = comm_ptr->coll_fns->Reduce_scatter(sendbuf, recvbuf,
-: 1246: recvcnts, datatype,
-: 1247: op, comm_ptr);
-: 1248: }
-: 1249: else
-: 1250: {
|
51515: 1251: MPIU_THREADPRIV_GET;
-: 1252:
51515: 1253: MPIR_Nest_incr();
51515: 1254: if (comm_ptr->comm_kind == MPID_INTRACOMM)
-: 1255: /* intracommunicator */
51515: 1256: mpi_errno = MPIR_Reduce_scatter(sendbuf, recvbuf,
-: 1257: recvcnts, datatype,
-: 1258: op, comm_ptr);
-: 1259: else {
-: 1260: /* intercommunicator */
|
#####: 1261: mpi_errno = MPIR_Reduce_scatter_inter(sendbuf, recvbuf,
-: 1262: recvcnts, datatype,
-: 1263: op, comm_ptr);
-: 1264: }
|
51515: 1265: MPIR_Nest_decr(); /* matches the MPIR_Nest_incr above; reached for both communicator kinds */
-: 1266: }
-: 1267:
|
51515: 1268: if (mpi_errno != MPI_SUCCESS) goto fn_fail;
-: 1269:
-: 1270: /* ... end of body of routine ... */
-: 1271:
|
51597: 1272: fn_exit:
|
-: 1273: MPID_MPI_COLL_FUNC_EXIT(MPID_STATE_MPI_REDUCE_SCATTER);
|
51597: 1274: MPIU_THREAD_CS_EXIT(ALLFUNC,);
51597: 1275: return mpi_errno;
-: 1276:
|
82: 1277: fn_fail:
-: 1278: /* --BEGIN ERROR HANDLING-- */
-: 1279:# ifdef HAVE_ERROR_CHECKING
-: 1280: {
82: 1281: mpi_errno = MPIR_Err_create_code(
-: 1282: mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**mpi_reduce_scatter",
-: 1283: "**mpi_reduce_scatter %p %p %p %D %O %C", sendbuf, recvbuf, recvcnts, datatype, op, comm);
-: 1284: }
-: 1285:# endif
82: 1286: mpi_errno = MPIR_Err_return_comm( comm_ptr, FCNAME, mpi_errno ); /* comm_ptr is still NULL here when the comm handle check failed; the value returned by this call is what the caller receives */
82: 1287: goto fn_exit;
-: 1288: /* --END ERROR HANDLING-- */
-: 1289:}
|