-: 0:Source:/home/MPI/testing/mpich2/mpich2/src/mpid/ch3/src/ch3u_port.c
-: 0:Graph:ch3u_port.gcno
-: 0:Data:ch3u_port.gcda
-: 0:Runs:3459
-: 0:Programs:899
-: 1:/* -*- Mode: C; c-basic-offset:4 ; -*- */
-: 2:/*
-: 3: * (C) 2001 by Argonne National Laboratory.
-: 4: * See COPYRIGHT in top-level directory.
-: 5: */
-: 6:
-: 7:#include "mpidi_ch3_impl.h"
-: 8:
-: 9:/*
-: 10: * This file replaces ch3u_comm_connect.c and ch3u_comm_accept.c . These
-: 11: * routines need to be used together, particularly since they must exchange
-: 12: * information. In addition, many of the steps that they take are identical,
-: 13: * such as building the new process group information after a connection.
-: 14: * By having these routines in the same file, it is easier for them
-: 15: * to share internal routines and it is easier to ensure that communication
-: 16: * between the two root processes (the connector and acceptor) is
-: 17: * consistent.
-: 18: */
-: 19:
-: 20:/* FIXME: If dynamic processes are not supported, this file will contain
-: 21: no code and some compilers may warn about an "empty translation unit" */
-: 22:#ifndef MPIDI_CH3_HAS_NO_DYNAMIC_PROCESS
-: 23:
-: 24:/* FIXME: pg_translation is used for ? */
-: 25:typedef struct pg_translation {
-: 26: int pg_index; /* index of a process group (index in pg_node) */
-: 27: int pg_rank; /* rank in that process group */
-: 28:} pg_translation;
-: 29:
-: 30:
-: 31:typedef struct pg_node {
-: 32: int index; /* Internal index of process group
-: 33: (see pg_translation) */
-: 34: char *pg_id;
-: 35: char *str; /* String describing connection info for pg */
-: 36: int lenStr; /* Length of this string (including the null terminator(s)) */
-: 37: struct pg_node *next;
-: 38:} pg_node;
-: 39:
-: 40:/* These functions help implement the connect/accept algorithm */
-: 41:static int ExtractLocalPGInfo( MPID_Comm *, pg_translation [],
-: 42: pg_node **, int * );
-: 43:static int ReceivePGAndDistribute( MPID_Comm *, MPID_Comm *, int, int *,
-: 44: int, MPIDI_PG_t *[] );
-: 45:static int SendPGtoPeerAndFree( MPID_Comm *, int *, pg_node * );
-: 46:static int FreeNewVC( MPIDI_VC_t *new_vc );
-: 47:static int SetupNewIntercomm( MPID_Comm *comm_ptr, int remote_comm_size,
-: 48: pg_translation remote_translation[],
-: 49: MPIDI_PG_t **remote_pg,
-: 50: MPID_Comm *intercomm );
-: 51:static int MPIDI_CH3I_Initialize_tmp_comm(MPID_Comm **comm_pptr,
-: 52: MPIDI_VC_t *vc_ptr, int is_low_group, int context_id_offset);
-: 53:/* ------------------------------------------------------------------------- */
-: 54:/*
-: 55: * Structure of this file and the connect/accept algorithm:
-: 56: *
-: 57: * Here are the steps involved in implementing MPI_Comm_connect and
-: 58: * MPI_Comm_accept. These same steps are used within MPI_Comm_spawn
-: 59: * and MPI_Comm_spawn_multiple.
-: 60: *
-: 61: * First, the connecting process establishes a connection (not a virtual
-: 62: * connection!) to the designated accepting process.
-: 63: * This makes use of the usual (channel-specific) connection code.
-: 64: * Once this connection is established, the connecting process sends a packet
-: 65: * (type MPIDI_CH3I_PKT_SC_CONN_ACCEPT) to the accepting process.
-: 66: * This packet contains a "port_tag_name", which is a value that
-: 67: * is used to separate different MPI port names (values from MPI_Open_port)
-: 68: * on the same process (this is a way to multiplex many MPI port names on
-: 69: * a single communication connection port).
-: 70: *
-: 71: * At this point, the accepting process creates a virtual connection (VC)
-: 72: * for this connection, initializes it, sends a packet back with the type
-: 73: * MPIDI_CH3I_PKT_SC_OPEN_RESP. In addition, the connection is saved in
-: 74: * an accept queue with the port_tag_name.
-: 75: *
-: 76: * On the accepting side, the process waits until the progress engine
-: 77: * inserts the connect request into the accept queue (this is done with the
-: 78: * routine MPIDI_CH3I_Acceptq_dequeue). This routine returns the matched
-: 79: * virtual connection (VC).
-: 80: *
-: 81: * Once both sides have established their VC, they both invoke
-: 82: * MPIDI_CH3I_Initialize_tmp_comm to create a temporary intercommunicator.
-: 83: * A temporary intercommunicator is constructed so that we can use
-: 84: * MPI routines to send the other information that we need to complete
-: 85: * the connect/accept operation (described below).
-: 86: *
-: 87: * The above is implemented with the routines
-: 88: * MPIDI_Create_inter_root_communicator_connect
-: 89: * MPIDI_Create_inter_root_communicator_accept
-: 90: * MPIDI_CH3I_Initialize_tmp_comm
-: 91: *
-: 92: * At this point, the two "root" processes of the communicators that are
-: 93: * connecting can use MPI communication. They must then exchange the
-: 94: * following information:
-: 95: *
-: 96: * The size of the "remote" communicator
-: 97: * Description of all process groups; that is, all of the MPI_COMM_WORLDs
-: 98: * that they know.
-: 99: * The shared context id that will be used
-: 100: *
-: 101: *
-: 102: */
-: 103:/* ------------------------------------------------------------------------- */
-: 104:
-: 105:/*
-: 106: * These next two routines are used to create a virtual connection
-: 107: * (VC) and a temporary intercommunicator that can be used to
-: 108: * communicate between the two "root" processes for the
-: 109: * connect and accept.
-: 110: */
-: 111:
-: 112:#undef FUNCNAME
-: 113:#define FUNCNAME MPIDI_Create_inter_root_communicator_connect
-: 114:#undef FCNAME
-: 115:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 116:static int MPIDI_Create_inter_root_communicator_connect(const char *port_name,
-: 117: MPID_Comm **comm_pptr,
-: 118: MPIDI_VC_t **vc_pptr)
669: 119:{
669: 120: int mpi_errno = MPI_SUCCESS;
-: 121: MPID_Comm *tmp_comm;
669: 122: MPIDI_VC_t *connect_vc = NULL;
-: 123: int port_name_tag;
-: 124: MPIDI_STATE_DECL(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_CONNECT);
-: 125:
-: 126: MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_CONNECT);
-: 127:
-: 128: /* Connect to the root on the other side. Create a
-: 129: temporary intercommunicator between the two roots so that
-: 130: we can use MPI functions to communicate data between them. */
-: 131:
669: 132: mpi_errno = MPIU_CALL(MPIDI_CH3,Connect_to_root(port_name, &connect_vc));
669: 133: if (mpi_errno != MPI_SUCCESS) {
|
1: 134: MPIU_ERR_POP(mpi_errno);
-: 135: }
-: 136:
-: 137: /* extract the tag from the port_name */
|
668: 138: mpi_errno = MPIDI_GetTagFromPort( port_name, &port_name_tag);
668: 139: if (mpi_errno != MPIU_STR_SUCCESS) {
|
#####: 140: MPIU_ERR_POP(mpi_errno);
-: 141: }
-: 142:
|
668: 143: mpi_errno = MPIDI_CH3I_Initialize_tmp_comm(&tmp_comm, connect_vc, 1, port_name_tag);
668: 144: if (mpi_errno != MPI_SUCCESS) {
|
#####: 145: MPIU_ERR_POP(mpi_errno);
-: 146: }
-: 147:
|
668: 148: *comm_pptr = tmp_comm;
668: 149: *vc_pptr = connect_vc;
-: 150:
669: 151: fn_exit:
|
-: 152: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_CONNECT);
|
669: 153: return mpi_errno;
|
-: 154: fn_fail:
-: 155: goto fn_exit;
-: 156:}
-: 157:
-: 158:/* Creates a communicator for the purpose of communicating with one other
-: 159: process (the root of the other group). It also returns the virtual
-: 160: connection */
-: 161:#undef FUNCNAME
-: 162:#define FUNCNAME MPIDI_Create_inter_root_communicator_accept
-: 163:#undef FCNAME
-: 164:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 165:static int MPIDI_Create_inter_root_communicator_accept(const char *port_name,
-: 166: MPID_Comm **comm_pptr,
-: 167: MPIDI_VC_t **vc_pptr)
|
668: 168:{
668: 169: int mpi_errno = MPI_SUCCESS;
-: 170: MPID_Comm *tmp_comm;
668: 171: MPIDI_VC_t *new_vc = NULL;
-: 172: MPID_Progress_state progress_state;
-: 173: int port_name_tag;
-: 174: MPIDI_STATE_DECL(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_ACCEPT);
-: 175:
-: 176: MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_ACCEPT);
-: 177:
-: 178: /* extract the tag from the port_name */
668: 179: mpi_errno = MPIDI_GetTagFromPort( port_name, &port_name_tag);
668: 180: if (mpi_errno != MPIU_STR_SUCCESS) {
|
#####: 181: MPIU_ERR_POP(mpi_errno);
-: 182: }
-: 183:
-: 184: /* FIXME: Describe the algorithm used here, and what routine
-: 185: is used on the other side of this connection */
-: 186: /* dequeue the accept queue to see if a connection with the
-: 187: root on the connect side has been formed in the progress
-: 188: engine (the connection is returned in the form of a vc). If
-: 189: not, poke the progress engine. */
-: 190:
|
668: 191: MPID_Progress_start(&progress_state);
-: 192: for(;;)
-: 193: {
1342: 194: MPIDI_CH3I_Acceptq_dequeue(&new_vc, port_name_tag);
1342: 195: if (new_vc != NULL)
-: 196: {
668: 197: break;
-: 198: }
-: 199:
674: 200: mpi_errno = MPID_Progress_wait(&progress_state);
|
-: 201: /* --BEGIN ERROR HANDLING-- */
674: 202: if (mpi_errno)
-: 203: {
-: 204: MPID_Progress_end(&progress_state);
#####: 205: MPIU_ERR_POP(mpi_errno);
-: 206: }
-: 207: /* --END ERROR HANDLING-- */
-: 208: }
-: 209: MPID_Progress_end(&progress_state);
-: 210:
|
668: 211: mpi_errno = MPIDI_CH3I_Initialize_tmp_comm(&tmp_comm, new_vc, 0, port_name_tag);
668: 212: if (mpi_errno != MPI_SUCCESS) {
|
#####: 213: MPIU_ERR_POP(mpi_errno);
-: 214: }
-: 215:
|
668: 216: *comm_pptr = tmp_comm;
668: 217: *vc_pptr = new_vc;
-: 218:
-: 219: MPIU_DBG_MSG_FMT(CH3_CONNECT,VERBOSE,(MPIU_DBG_FDEST,
-: 220: "new_vc=%p", new_vc));
-: 221:
668: 222:fn_exit:
|
-: 223: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CREATE_INTER_ROOT_COMMUNICATOR_ACCEPT);
|
668: 224: return mpi_errno;
-: 225:
|
-: 226:fn_fail:
-: 227: goto fn_exit;
-: 228:}
-: 229:
-: 230:/* This is a utility routine used to initialize temporary communicators
-: 231: used in connect/accept operations, and is only used in the above two
-: 232: routines */
-: 233:#undef FUNCNAME
-: 234:#define FUNCNAME MPIDI_CH3I_Initialize_tmp_comm
-: 235:#undef FCNAME
-: 236:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 237:static int MPIDI_CH3I_Initialize_tmp_comm(MPID_Comm **comm_pptr,
-: 238: MPIDI_VC_t *vc_ptr, int is_low_group, int context_id_offset)
|
1336: 239:{
1336: 240: int mpi_errno = MPI_SUCCESS;
-: 241: MPID_Comm *tmp_comm, *commself_ptr;
-: 242: MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_INITIALIZE_TMP_COMM);
-: 243:
-: 244: MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_INITIALIZE_TMP_COMM);
-: 245:
1336: 246: MPID_Comm_get_ptr( MPI_COMM_SELF, commself_ptr );
-: 247:
-: 248: /* WDG-old code allocated a context id that was then discarded */
1336: 249: mpi_errno = MPIR_Comm_create(&tmp_comm);
1336: 250: if (mpi_errno != MPI_SUCCESS) {
|
#####: 251: MPIU_ERR_POP(mpi_errno);
-: 252: }
-: 253: /* fill in all the fields of tmp_comm. */
-: 254:
-: 255: /* We use the second half of the context ID bits for dynamic
-: 256: * processes. This assumes that the context ID mask array is made
-: 257: * up of uint32_t's. */
-: 258: /* FIXME: This code is still broken for the following case:
-: 259: * If the same process opens connections to multiple
-: 260: * processes, this context ID might get out of sync.
-: 261: */
|
1336: 262: tmp_comm->context_id = MPID_CONTEXT_SET_FIELD(DYNAMIC_PROC, context_id_offset, 1);
1336: 263: tmp_comm->recvcontext_id = tmp_comm->context_id;
-: 264:
-: 265: /* FIXME - we probably need a unique context_id. */
1336: 266: tmp_comm->remote_size = 1;
-: 267:
-: 268: /* Fill in new intercomm */
1336: 269: tmp_comm->local_size = 1;
1336: 270: tmp_comm->rank = 0;
1336: 271: tmp_comm->comm_kind = MPID_INTERCOMM;
1336: 272: tmp_comm->local_comm = NULL;
1336: 273: tmp_comm->is_low_group = is_low_group;
-: 274:
-: 275: /* No pg structure needed since vc has already been set up
-: 276: (connection has been established). */
-: 277:
-: 278: /* Point local vcr, vcrt at those of commself_ptr */
-: 279: /* FIXME: Explain why */
1336: 280: tmp_comm->local_vcrt = commself_ptr->vcrt;
1336: 281: MPID_VCRT_Add_ref(commself_ptr->vcrt);
1336: 282: tmp_comm->local_vcr = commself_ptr->vcr;
-: 283:
-: 284: /* No pg needed since connection has already been formed.
-: 285: FIXME - ensure that the comm_release code does not try to
-: 286: free an unallocated pg */
-: 287:
-: 288: /* Set up VC reference table */
1336: 289: mpi_errno = MPID_VCRT_Create(tmp_comm->remote_size, &tmp_comm->vcrt);
1336: 290: if (mpi_errno != MPI_SUCCESS) {
|
#####: 291: MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**init_vcrt");
-: 292: }
|
1336: 293: mpi_errno = MPID_VCRT_Get_ptr(tmp_comm->vcrt, &tmp_comm->vcr);
1336: 294: if (mpi_errno != MPI_SUCCESS) {
|
#####: 295: MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**init_getptr");
-: 296: }
-: 297:
-: 298: /* FIXME: Why do we do a dup here? */
|
1336: 299: MPID_VCR_Dup(vc_ptr, tmp_comm->vcr);
-: 300:
1336: 301: *comm_pptr = tmp_comm;
-: 302:
-: 303: /* FIXME: Who sets? Why? Where is this defined? Document.
-: 304: Why is this not done as part of the VC initialization? */
-: 305: /* channels/sshm/include/mpidi_ch3_pre.h defines this */
-: 306:#ifdef MPIDI_CH3_HAS_CONN_ACCEPT_HOOK
-: 307: /* If the VC creates non-duplex connections then the acceptor will
-: 308: * need to connect back to form the other half of the connection. */
-: 309: /* FIXME: A hook should not be such a specific function; instead,
-: 310: it should invoke a function pointer defined in the channel
-: 311: interface structure */
-: 312: mpi_errno = MPIDI_CH3_Complete_unidirectional_connection( vc_ptr );
-: 313:#endif
-: 314:
1336: 315:fn_exit:
|
-: 316: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_INITIALIZE_TMP_COMM);
|
1336: 317: return mpi_errno;
|
-: 318:fn_fail:
-: 319: goto fn_exit;
-: 320:}
-: 321:
-: 322:/* ------------------------------------------------------------------------- */
-: 323:/*
-: 324: MPIDI_Comm_connect()
-: 325:
-: 326: Algorithm: First create a connection (vc) between this root and the
-: 327: root on the accept side. Using this vc, create a temporary
-: 328: intercomm between the two roots. Use MPI functions to communicate
-: 329: the other information needed to create the real intercommunicator
-: 330: between the processes on the two sides. Then free the
-: 331: intercommunicator between the roots. Most of the complexity is
-: 332: because there can be multiple process groups on each side.
-: 333:*/
-: 334:
-: 335:#undef FUNCNAME
-: 336:#define FUNCNAME MPIDI_Comm_connect
-: 337:#undef FCNAME
-: 338:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 339:int MPIDI_Comm_connect(const char *port_name, MPID_Info *info, int root,
-: 340: MPID_Comm *comm_ptr, MPID_Comm **newcomm)
|
1318: 341:{
1318: 342: int mpi_errno=MPI_SUCCESS;
-: 343: int j, i, rank, recv_ints[3], send_ints[3], context_id;
1318: 344: int remote_comm_size=0;
1318: 345: MPID_Comm *tmp_comm = NULL, *intercomm;
1318: 346: MPIDI_VC_t *new_vc = NULL;
1318: 347: int sendtag=100, recvtag=100, n_remote_pgs;
1318: 348: int n_local_pgs=1, local_comm_size;
1318: 349: pg_translation *local_translation = NULL, *remote_translation = NULL;
1318: 350: pg_node *pg_list = NULL;
1318: 351: MPIDI_PG_t **remote_pg = NULL;
1318: 352: MPIU_CHKLMEM_DECL(3);
-: 353: MPIDI_STATE_DECL(MPID_STATE_MPIDI_COMM_CONNECT);
-: 354:
-: 355: MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_COMM_CONNECT);
-: 356:
-: 357: /* Create the new intercommunicator here. We need to send the
-: 358: context id to the other side. */
-: 359: /* FIXME: If we fail to connect, someone needs to free this newcomm */
1318: 360: mpi_errno = MPIR_Comm_create(newcomm);
1318: 361: if (mpi_errno) {
|
#####: 362: MPIU_ERR_POP(mpi_errno);
-: 363: }
|
1318: 364: mpi_errno = MPIR_Get_contextid( comm_ptr, &(*newcomm)->recvcontext_id );
|
1318: 365: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 366: /* FIXME why is this commented out? */
-: 367: /* (*newcomm)->context_id = (*newcomm)->recvcontext_id; */
-: 368:
|
1318: 369: rank = comm_ptr->rank;
1318: 370: local_comm_size = comm_ptr->local_size;
-: 371:
1318: 372: if (rank == root)
-: 373: {
-: 374: /* Establish a communicator to communicate with the root on the
-: 375: other side. */
669: 376: mpi_errno = MPIDI_Create_inter_root_communicator_connect(
-: 377: port_name, &tmp_comm, &new_vc);
669: 378: if (mpi_errno != MPI_SUCCESS) {
|
1: 379: MPIU_ERR_POP(mpi_errno);
-: 380: }
-: 381:
-: 382: /* Make an array to translate local ranks to process group index
-: 383: and rank */
|
668: 384: MPIU_CHKLMEM_MALLOC(local_translation,pg_translation*,
-: 385: local_comm_size*sizeof(pg_translation),
-: 386: mpi_errno,"local_translation");
-: 387:
-: 388: /* Make a list of the local communicator's process groups and encode
-: 389: them in strings to be sent to the other side.
-: 390: The encoded string for each process group contains the process
-: 391: group id, size and all its KVS values */
668: 392: mpi_errno = ExtractLocalPGInfo( comm_ptr, local_translation,
-: 393: &pg_list, &n_local_pgs );
-: 394:
-: 395:
-: 396: /* Send the remote root: n_local_pgs, local_comm_size,
-: 397: Recv from the remote root: n_remote_pgs, remote_comm_size,
-: 398: context_id for newcomm */
-: 399:
668: 400: send_ints[0] = n_local_pgs;
668: 401: send_ints[1] = local_comm_size;
668: 402: send_ints[2] = (*newcomm)->recvcontext_id;
-: 403:
-: 404: MPIU_DBG_MSG_FMT(CH3_CONNECT,VERBOSE,(MPIU_DBG_FDEST,
-: 405: "sending two ints, %d and %d, and receiving 3 ints",
-: 406: send_ints[0], send_ints[1]));
668: 407: mpi_errno = MPIC_Sendrecv(send_ints, 3, MPI_INT, 0,
-: 408: sendtag++, recv_ints, 3, MPI_INT,
-: 409: 0, recvtag++, tmp_comm->handle,
-: 410: MPI_STATUS_IGNORE);
668: 411: if (mpi_errno != MPI_SUCCESS) {
|
#####: 412: MPIU_ERR_POP(mpi_errno);
-: 413: }
-: 414: }
-: 415:
-: 416: /* broadcast the received info to local processes */
-: 417: MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"broadcasting the received 3 ints");
|
1317: 418: mpi_errno = MPIR_Bcast(recv_ints, 3, MPI_INT, root, comm_ptr);
1317: 419: if (mpi_errno) {
|
#####: 420: MPIU_ERR_POP(mpi_errno);
-: 421: }
-: 422:
|
1317: 423: n_remote_pgs = recv_ints[0];
1317: 424: remote_comm_size = recv_ints[1];
1317: 425: context_id = recv_ints[2];
-: 426:
1317: 427: MPIU_CHKLMEM_MALLOC(remote_pg,MPIDI_PG_t**,
-: 428: n_remote_pgs * sizeof(MPIDI_PG_t*),
-: 429: mpi_errno,"remote_pg");
1317: 430: MPIU_CHKLMEM_MALLOC(remote_translation,pg_translation*,
-: 431: remote_comm_size * sizeof(pg_translation),
-: 432: mpi_errno,"remote_translation");
-: 433: MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"allocated remote process groups");
-: 434:
-: 435: /* Exchange the process groups and their corresponding KVSes */
1317: 436: if (rank == root)
-: 437: {
668: 438: mpi_errno = SendPGtoPeerAndFree( tmp_comm, &sendtag, pg_list );
668: 439: mpi_errno = ReceivePGAndDistribute( tmp_comm, comm_ptr, root, &recvtag,
-: 440: n_remote_pgs, remote_pg );
-: 441: /* Receive the translations from remote process rank to process group
-: 442: index */
-: 443: MPIU_DBG_MSG_FMT(CH3_CONNECT,VERBOSE,(MPIU_DBG_FDEST,
-: 444: "sending %d ints, receiving %d ints",
-: 445: local_comm_size * 2, remote_comm_size * 2));
668: 446: mpi_errno = MPIC_Sendrecv(local_translation, local_comm_size * 2,
-: 447: MPI_INT, 0, sendtag++,
-: 448: remote_translation, remote_comm_size * 2,
-: 449: MPI_INT, 0, recvtag++, tmp_comm->handle,
-: 450: MPI_STATUS_IGNORE);
668: 451: if (mpi_errno) {
|
#####: 452: MPIU_ERR_POP(mpi_errno);
-: 453: }
-: 454:
-: 455:#ifdef MPICH_DBG_OUTPUT
-: 456: MPIU_DBG_PRINTF(("[%d]connect:Received remote_translation:\n", rank));
-: 457: for (i=0; i<remote_comm_size; i++)
-: 458: {
-: 459: MPIU_DBG_PRINTF((" remote_translation[%d].pg_index = %d\n remote_translation[%d].pg_rank = %d\n",
-: 460: i, remote_translation[i].pg_index, i, remote_translation[i].pg_rank));
-: 461: }
-: 462:#endif
-: 463: }
-: 464: else
-: 465: {
|
649: 466: mpi_errno = ReceivePGAndDistribute( tmp_comm, comm_ptr, root, &recvtag,
-: 467: n_remote_pgs, remote_pg );
-: 468: }
-: 469:
-: 470: /* Broadcast out the remote rank translation array */
-: 471: MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Broadcasting remote translation");
1317: 472: mpi_errno = MPIR_Bcast(remote_translation, remote_comm_size * 2, MPI_INT,
-: 473: root, comm_ptr);
1317: 474: if (mpi_errno) {
|
#####: 475: MPIU_ERR_POP(mpi_errno);
-: 476: }
-: 477:#ifdef MPICH_DBG_OUTPUT
-: 478: MPIU_DBG_PRINTF(("[%d]connect:Received remote_translation after broadcast:\n", rank));
-: 479: for (i=0; i<remote_comm_size; i++)
-: 480: {
-: 481: MPIU_DBG_PRINTF((" remote_translation[%d].pg_index = %d\n remote_translation[%d].pg_rank = %d\n",
-: 482: i, remote_translation[i].pg_index, i, remote_translation[i].pg_rank));
-: 483: }
-: 484:#endif
-: 485:
|
1317: 486: intercomm = *newcomm;
1317: 487: intercomm->context_id = context_id;
1317: 488: intercomm->is_low_group = 1;
-: 489:
1317: 490: mpi_errno = SetupNewIntercomm( comm_ptr, remote_comm_size,
-: 491: remote_translation, remote_pg, intercomm );
1317: 492: if (mpi_errno != MPI_SUCCESS) {
|
#####: 493: MPIU_ERR_POP(mpi_errno);
-: 494: }
-: 495:
-: 496: /* synchronize with remote root */
|
1317: 497: if (rank == root)
-: 498: {
-: 499: MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"sync with peer");
668: 500: mpi_errno = MPIC_Sendrecv(&i, 0, MPI_INT, 0,
-: 501: sendtag++, &j, 0, MPI_INT,
-: 502: 0, recvtag++, tmp_comm->handle,
-: 503: MPI_STATUS_IGNORE);
668: 504: if (mpi_errno != MPI_SUCCESS) {
|
#####: 505: MPIU_ERR_POP(mpi_errno);
-: 506: }
-: 507:
-: 508: /* All communication with remote root done. Release the communicator. */
|
668: 509: MPIR_Comm_release(tmp_comm,0);
-: 510: }
-: 511:
-: 512: /*printf("connect:barrier\n");fflush(stdout);*/
1317: 513: mpi_errno = MPIR_Barrier(comm_ptr);
1317: 514: if (mpi_errno != MPI_SUCCESS) {
|
#####: 515: MPIU_ERR_POP(mpi_errno);
-: 516: }
-: 517:
-: 518: /* Free new_vc. It was explicitly allocated in MPIDI_CH3_Connect_to_root.*/
|
1317: 519: if (rank == root) {
668: 520: FreeNewVC( new_vc );
-: 521: }
-: 522:
-: 523: fn_exit:
-: 524: MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Exiting ch3u_comm_connect");
3302: 525: MPIU_CHKLMEM_FREEALL();
|
-: 526: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_COMM_CONNECT);
|
1318: 527: return mpi_errno;
|
-: 528: fn_fail:
-: 529: goto fn_exit;
-: 530:}
-: 531:
-: 532:/*
-: 533: * Extract all of the process groups from the given communicator and
-: 534: * form a list (returned in pg_list) of those process groups.
-: 535: * Also returned is an array (local_translation) that contains tuples mapping
-: 536: * rank in process group to rank in that communicator (local translation
-: 537: * must be allocated before this routine is called). The number of
-: 538: * distinct process groups is returned in n_local_pgs_p .
-: 539: *
-: 540: * This allows an intercomm_create to exchange the full description of
-: 541: * all of the process groups that have made up the communicator that
-: 542: * will define the "remote group".
-: 543: */
-: 544:#undef FUNCNAME
-: 545:#define FUNCNAME ExtractLocalPGInfo
-: 546:#undef FCNAME
-: 547:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 548:static int ExtractLocalPGInfo( MPID_Comm *comm_p,
-: 549: pg_translation local_translation[],
-: 550: pg_node **pg_list_p,
-: 551: int *n_local_pgs_p )
|
1337: 552:{
1337: 553: pg_node *pg_list = 0, *pg_iter, *pg_trailer;
1337: 554: int i, cur_index = 0, local_comm_size, mpi_errno = 0;
1337: 555: MPIU_CHKPMEM_DECL(1);
-: 556: MPIDI_STATE_DECL(MPID_STATE_EXTRACTLOCALPGINFO);
-: 557:
-: 558: MPIDI_FUNC_ENTER(MPID_STATE_EXTRACTLOCALPGINFO);
-: 559:
-: 560: /* If we are in the case of singleton-init, we may need to reset the
-: 561: id string for comm world. We do this before doing anything else */
1337: 562: MPIDI_PG_CheckForSingleton();
-: 563:
1337: 564: local_comm_size = comm_p->local_size;
-: 565:
-: 566: /* Make a list of the local communicator's process groups and encode
-: 567: them in strings to be sent to the other side.
-: 568: The encoded string for each process group contains the process
-: 569: group id, size and all its KVS values */
-: 570:
1337: 571: cur_index = 0;
1337: 572: MPIU_CHKPMEM_MALLOC(pg_list,pg_node*,sizeof(pg_node),mpi_errno,
-: 573: "pg_list");
-: 574:
1337: 575: pg_list->pg_id = MPIU_Strdup(comm_p->vcr[0]->pg->id);
1337: 576: pg_list->index = cur_index++;
1337: 577: pg_list->next = NULL;
-: 578: /* XXX DJG FIXME-MT should we be checking this? the add/release macros already check this */
1337: 579: MPIU_Assert( MPIU_Object_get_ref(comm_p->vcr[0]->pg));
1337: 580: mpi_errno = MPIDI_PG_To_string(comm_p->vcr[0]->pg, &pg_list->str,
-: 581: &pg_list->lenStr );
1337: 582: if (mpi_errno != MPI_SUCCESS) {
|
#####: 583: MPIU_ERR_POP(mpi_errno);
-: 584: }
-: 585: MPIU_DBG_STMT(CH3_CONNECT,VERBOSE,MPIDI_PrintConnStr(__FILE__,__LINE__,"PG as string is", pg_list->str ));
|
1337: 586: local_translation[0].pg_index = 0;
1337: 587: local_translation[0].pg_rank = comm_p->vcr[0]->pg_rank;
1337: 588: pg_iter = pg_list;
2704: 589: for (i=1; i<local_comm_size; i++) {
1367: 590: pg_iter = pg_list;
1367: 591: pg_trailer = pg_list;
3134: 592: while (pg_iter != NULL) {
-: 593: /* Check to ensure pg is (probably) valid */
-: 594: /* XXX DJG FIXME-MT should we be checking this? the add/release macros already check this */
1567: 595: MPIU_Assert(MPIU_Object_get_ref(comm_p->vcr[i]->pg) != 0);
1567: 596: if (MPIDI_PG_Id_compare(comm_p->vcr[i]->pg->id, pg_iter->pg_id)) {
1167: 597: local_translation[i].pg_index = pg_iter->index;
1167: 598: local_translation[i].pg_rank = comm_p->vcr[i]->pg_rank;
1167: 599: break;
-: 600: }
400: 601: if (pg_trailer != pg_iter)
|
#####: 602: pg_trailer = pg_trailer->next;
|
400: 603: pg_iter = pg_iter->next;
-: 604: }
1367: 605: if (pg_iter == NULL) {
-: 606: /* We use MPIU_Malloc directly because we do not know in
-: 607: advance how many nodes we may allocate */
200: 608: pg_iter = (pg_node*)MPIU_Malloc(sizeof(pg_node));
200: 609: if (!pg_iter) {
|
#####: 610: MPIU_ERR_POP(mpi_errno);
-: 611: }
|
200: 612: pg_iter->pg_id = MPIU_Strdup(comm_p->vcr[i]->pg->id);
200: 613: pg_iter->index = cur_index++;
200: 614: pg_iter->next = NULL;
200: 615: mpi_errno = MPIDI_PG_To_string(comm_p->vcr[i]->pg, &pg_iter->str,
-: 616: &pg_iter->lenStr );
200: 617: if (mpi_errno != MPI_SUCCESS) {
|
#####: 618: MPIU_ERR_POP(mpi_errno);
-: 619: }
|
200: 620: local_translation[i].pg_index = pg_iter->index;
200: 621: local_translation[i].pg_rank = comm_p->vcr[i]->pg_rank;
200: 622: pg_trailer->next = pg_iter;
-: 623: }
-: 624: }
-: 625:
1337: 626: *n_local_pgs_p = cur_index;
1337: 627: *pg_list_p = pg_list;
-: 628:
-: 629:#ifdef MPICH_DBG_OUTPUT
-: 630: pg_iter = pg_list;
-: 631: while (pg_iter != NULL) {
-: 632: MPIU_DBG_PRINTF(("connect:PG: '%s'\n<%s>\n", pg_iter->pg_id, pg_iter->str));
-: 633: pg_iter = pg_iter->next;
-: 634: }
-: 635:#endif
-: 636:
-: 637:
1337: 638: fn_exit:
|
-: 639: MPIDI_FUNC_EXIT(MPID_STATE_EXTRACTLOCALPGINFO);
|
1337: 640: return mpi_errno;
|
-: 641: fn_fail:
|
#####: 642: MPIU_CHKPMEM_REAP();
-: 643: goto fn_exit;
-: 644:}
-: 645:
-: 646:
-: 647:/* The root process in comm_ptr receives strings describing the
-: 648: process groups and then distributes them to the other processes
-: 649: in comm_ptr.
-: 650: See SendPGtoPeerAndFree for the routine that sends the descriptions */
-: 651:#undef FUNCNAME
-: 652:#define FUNCNAME ReceivePGAndDistribute
-: 653:#undef FCNAME
-: 654:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 655:static int ReceivePGAndDistribute( MPID_Comm *tmp_comm, MPID_Comm *comm_ptr,
-: 656: int root, int *recvtag_p,
-: 657: int n_remote_pgs, MPIDI_PG_t *remote_pg[] )
|
2703: 658:{
2703: 659: char *pg_str = 0;
-: 660: int i, j, flag;
2703: 661: int rank = comm_ptr->rank;
2703: 662: int mpi_errno = 0;
2703: 663: int recvtag = *recvtag_p;
-: 664: MPIDI_STATE_DECL(MPID_STATE_RECEIVEPGANDDISTRIBUTE);
-: 665:
-: 666: MPIDI_FUNC_ENTER(MPID_STATE_RECEIVEPGANDDISTRIBUTE);
-: 667:
6106: 668: for (i=0; i<n_remote_pgs; i++) {
-: 669:
3403: 670: if (rank == root) {
-: 671: /* First, receive the pg description from the partner */
1536: 672: mpi_errno = MPIC_Recv(&j, 1, MPI_INT, 0, recvtag++,
-: 673: tmp_comm->handle, MPI_STATUS_IGNORE);
1536: 674: *recvtag_p = recvtag;
1536: 675: if (mpi_errno != MPI_SUCCESS) {
|
#####: 676: MPIU_ERR_POP(mpi_errno);
-: 677: }
|
1536: 678: pg_str = (char*)MPIU_Malloc(j);
1536: 679: if (pg_str == NULL) {
|
#####: 680: MPIU_ERR_POP(mpi_errno);
-: 681: }
|
1536: 682: mpi_errno = MPIC_Recv(pg_str, j, MPI_CHAR, 0, recvtag++,
-: 683: tmp_comm->handle, MPI_STATUS_IGNORE);
1536: 684: *recvtag_p = recvtag;
1536: 685: if (mpi_errno != MPI_SUCCESS) {
|
#####: 686: MPIU_ERR_POP(mpi_errno);
-: 687: }
-: 688: }
-: 689:
-: 690: /* Broadcast the size and data to the local communicator */
-: 691: /*printf("accept:broadcasting 1 int\n");fflush(stdout);*/
|
3403: 692: mpi_errno = MPIR_Bcast(&j, 1, MPI_INT, root, comm_ptr);
3403: 693: if (mpi_errno != MPI_SUCCESS) {
|
#####: 694: MPIU_ERR_POP(mpi_errno);
-: 695: }
-: 696:
|
3403: 697: if (rank != root) {
-: 698: /* The root has already allocated this string */
1867: 699: pg_str = (char*)MPIU_Malloc(j);
1867: 700: if (pg_str == NULL) {
|
#####: 701: MPIU_ERR_POP(mpi_errno);
-: 702: }
-: 703: }
-: 704: /*printf("accept:broadcasting string of length %d\n", j);fflush(stdout);*/
|
3403: 705: mpi_errno = MPIR_Bcast(pg_str, j, MPI_CHAR, root, comm_ptr);
3403: 706: if (mpi_errno != MPI_SUCCESS) {
|
#####: 707: MPIU_ERR_POP(mpi_errno);
-: 708: }
-: 709: /* Then reconstruct the received process group. This step
-: 710: also initializes the created process group */
-: 711:
-: 712: MPIU_DBG_STMT(CH3_CONNECT,VERBOSE,MPIDI_PrintConnStr(__FILE__,__LINE__,"Creating pg from string", pg_str ));
|
3403: 713: mpi_errno = MPIDI_PG_Create_from_string(pg_str, &remote_pg[i], &flag);
3403: 714: if (mpi_errno != MPI_SUCCESS) {
|
#####: 715: MPIU_ERR_POP(mpi_errno);
-: 716: }
-: 717:
|
3403: 718: MPIU_Free(pg_str);
3403: 719: if (flag) {
-: 720: /* FIXME: If this is really needed, make it a destroy callback
-: 721: on the process group rather than an SSHM-specific item */
-: 722:#ifdef MPIDI_CH3_USES_SSHM
-: 723: /* extra pg ref needed for shared memory modules because the
-: 724: * shm_XXXXing_list's
-: 725: * need to be walked though in the later stages of finalize to
-: 726: * free queue_info's.
-: 727: */
-: 728: /* FIXME: Need to understand this and either remove or make
-: 729: common to all channels */
-: 730: MPIDI_PG_add_ref(remote_pg[i]);
-: 731:#endif
-: 732: }
-: 733: }
2703: 734: fn_exit:
|
-: 735: MPIDI_FUNC_EXIT(MPID_STATE_RECEIVEPGANDDISTRIBUTE);
|
2703: 736: return mpi_errno;
|
-: 737: fn_fail:
-: 738: goto fn_exit;
-: 739:}
-: 740:
-: 741:/* Used internally to broadcast the process groups belonging to peercomm_p to
-: 742: all processes in comm_p. The process with rank root in comm_p is the
-: 743: process in peercomm_p from which the process groups are taken; each pg is
-: 744: sent as a length followed by its string form, and every non-root rank passes that string to MPIDI_PG_Create_from_string. This routine is collective over comm_p. Returns mpi_errno; note that the results of ExtractLocalPGInfo, NMPI_Bcast and MPIDI_PG_Create_from_string are not examined here. */
-: 745:#undef FUNCNAME
-: 746:#define FUNCNAME MPID_PG_BCast
-: 747:#undef FCNAME
-: 748:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 749:int MPID_PG_BCast( MPID_Comm *peercomm_p, MPID_Comm *comm_p, int root )
|
2: 750:{
2: 751: int n_local_pgs=0, mpi_errno = 0;
2: 752: pg_translation *local_translation = 0;
2: 753: pg_node *pg_list, *pg_next, *pg_head = 0;
-: 754: int rank, i, peer_comm_size;
2: 755: MPIU_CHKLMEM_DECL(1);
-: 756:
2: 757: peer_comm_size = comm_p->local_size;
2: 758: rank = comm_p->rank;
-: 759:
2: 760: MPIU_CHKLMEM_MALLOC(local_translation,pg_translation*,
-: 761: peer_comm_size*sizeof(pg_translation),
-: 762: mpi_errno,"local_translation");
-: 763:
2: 764: if (rank == root) {
-: 765: /* Get the process groups known to the *peercomm*. NOTE(review): local_translation was sized from comm_p->local_size above, not from peercomm_p - confirm the two sizes agree. */
1: 766: ExtractLocalPGInfo( peercomm_p, local_translation, &pg_head,
-: 767: &n_local_pgs );
-: 768: }
-: 769:
-: 770: /* Now, broadcast the number of local pgs */
2: 771: NMPI_Bcast( &n_local_pgs, 1, MPI_INT, root, comm_p->handle );
-: 772:
2: 773: pg_list = pg_head;
4: 774: for (i=0; i<n_local_pgs; i++) {
-: 775: int len, flag;
2: 776: char *pg_str=0;
-: 777: MPIDI_PG_t *pgptr;
-: 778:
2: 779: if (rank == root) {
1: 780: if (!pg_list) {
-: 781: /* FIXME: Error, the pg_list is broken. NOTE(review): only the root leaves the loop here; presumably the other ranks then wait in the NMPI_Bcast calls below - confirm. */
|
#####: 782: printf( "Unexpected end of pg_list\n" ); fflush(stdout);
#####: 783: break;
-: 784: }
-: 785:
|
1: 786: pg_str = pg_list->str;
1: 787: len = pg_list->lenStr;
1: 788: pg_list = pg_list->next;
-: 789: }
2: 790: NMPI_Bcast( &len, 1, MPI_INT, root, comm_p->handle );
2: 791: if (rank != root) {
1: 792: pg_str = (char *)MPIU_Malloc(len);
-: 793: }
2: 794: NMPI_Bcast( pg_str, len, MPI_CHAR, root, comm_p->handle );
2: 795: if (rank != root) {
-: 796: /* flag is true if the pg was created, false if it
-: 797: already existed. This step
-: 798: also initializes the created process group. pg_str is the receive buffer allocated above (its MPIU_Malloc result is not tested for NULL) and is released below. */
1: 799: MPIDI_PG_Create_from_string( pg_str, &pgptr, &flag );
1: 800: if (flag) {
-: 801: /*printf( "[%d]Added pg named %s to list\n", rank,
-: 802: (char *)pgptr->id );
-: 803: fflush(stdout); */
-: 804: }
1: 805: MPIU_Free( pg_str );
-: 806: }
-: 807: }
-: 808:
-: 809: /* Free pg_list (pg_head is only set on the root, so this loop is a no-op elsewhere) */
2: 810: pg_list = pg_head;
-: 811:
-: 812: /* FIXME: We should use the PG destroy function for this, and ensure that
-: 813: the PG fields are valid for that function */
5: 814: while (pg_list) {
1: 815: pg_next = pg_list->next;
1: 816: MPIU_Free( pg_list->str );
1: 817: if (pg_list->pg_id ) {
1: 818: MPIU_Free( pg_list->pg_id );
-: 819: }
1: 820: MPIU_Free( pg_list );
1: 821: pg_list = pg_next;
-: 822: }
-: 823:
-: 824: fn_exit:
2: 825: MPIU_CHKLMEM_FREEALL();
2: 826: return mpi_errno;
|
-: 827: fn_fail:
-: 828: goto fn_exit;
-: 829:}
-: 830:
-: 831:/* Sends each pg_node in pg_list to rank 0 of tmp_comm as two messages (the int lenStr, then lenStr chars of str) and frees each node once it has been sent.
-: 832: *sendtag_p is advanced by one for every message. Returns mpi_errno. If a send fails, MPIU_ERR_POP is invoked (presumably a jump to fn_fail) and the nodes not yet sent are not freed here. */
-: 833:#undef FUNCNAME
-: 834:#define FUNCNAME SendPGtoPeerAndFree
-: 835:#undef FCNAME
-: 836:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 837:static int SendPGtoPeerAndFree( MPID_Comm *tmp_comm, int *sendtag_p,
-: 838: pg_node *pg_list )
|
1336: 839:{
1336: 840: int mpi_errno = 0;
1336: 841: int sendtag = *sendtag_p, i;
-: 842: pg_node *pg_iter;
-: 843: MPIDI_STATE_DECL(MPID_STATE_SENDPGTOPEERANDFREE);
-: 844:
-: 845: MPIDI_FUNC_ENTER(MPID_STATE_SENDPGTOPEERANDFREE);
-: 846:
4208: 847: while (pg_list != NULL) {
1536: 848: pg_iter = pg_list;
1536: 849: i = pg_iter->lenStr;
-: 850: /*printf("connect:sending 1 int: %d\n", i);fflush(stdout);*/
1536: 851: mpi_errno = MPIC_Send(&i, 1, MPI_INT, 0, sendtag++, tmp_comm->handle);
1536: 852: *sendtag_p = sendtag;
1536: 853: if (mpi_errno != MPI_SUCCESS) {
|
#####: 854: MPIU_ERR_POP(mpi_errno);
-: 855: }
-: 856:
-: 857: /* printf("connect:sending string length %d\n", i);fflush(stdout); */
|
1536: 858: mpi_errno = MPIC_Send(pg_iter->str, i, MPI_CHAR, 0, sendtag++,
-: 859: tmp_comm->handle);
1536: 860: *sendtag_p = sendtag;
1536: 861: if (mpi_errno != MPI_SUCCESS) {
|
#####: 862: MPIU_ERR_POP(mpi_errno);
-: 863: }
-: 864:
|
1536: 865: pg_list = pg_list->next;
1536: 866: MPIU_Free(pg_iter->str);
1536: 867: MPIU_Free(pg_iter->pg_id);
1536: 868: MPIU_Free(pg_iter);
-: 869: }
-: 870:
1336: 871: fn_exit:
|
-: 872: MPIDI_FUNC_EXIT(MPID_STATE_SENDPGTOPEERANDFREE);
|
1336: 873: return mpi_errno;
|
-: 874: fn_fail:
-: 875: goto fn_exit;
-: 876:}
-: 877:
-: 878:/* ---------------------------------------------------------------------- */
-: 879:/*
-: 880: * MPIDI_Comm_accept()
-: 881:
-: 882: Algorithm: First dequeue the vc from the accept queue (it was
-: 883: enqueued by the progress engine in response to a connect request
-: 884: from the root process that is attempting the connection on
-: 885: the connect side). Use this vc to create an
-: 886: intercommunicator between this root and the root on the connect
-: 887: side. Use this temporary intercommunicator (tmp_comm) to exchange the other information
-: 888: needed to create the real intercommunicator between the processes
-: 889: on the two sides. Then free the intercommunicator between the
-: 890: roots. Most of the complexity arises because there can be multiple
-: 891: process groups on each side.
-: 892: Only the process with rank == root in comm_ptr communicates with the peer; what it receives is then broadcast over comm_ptr. Returns mpi_errno.
-: 893: */
-: 894:#undef FUNCNAME
-: 895:#define FUNCNAME MPIDI_Comm_accept
-: 896:#undef FCNAME
-: 897:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 898:int MPIDI_Comm_accept(const char *port_name, MPID_Info *info, int root,
-: 899: MPID_Comm *comm_ptr, MPID_Comm **newcomm)
|
1386: 900:{
1386: 901: int mpi_errno=MPI_SUCCESS;
-: 902: int i, j, rank, recv_ints[3], send_ints[3], context_id;
1386: 903: int remote_comm_size=0;
1386: 904: MPID_Comm *tmp_comm = NULL, *intercomm;
1386: 905: MPIDI_VC_t *new_vc = NULL;
1386: 906: int sendtag=100, recvtag=100, local_comm_size;
1386: 907: int n_local_pgs=1, n_remote_pgs;
1386: 908: pg_translation *local_translation = NULL, *remote_translation = NULL;
1386: 909: pg_node *pg_list = NULL;
1386: 910: MPIDI_PG_t **remote_pg = NULL;
1386: 911: MPIU_CHKLMEM_DECL(3);
-: 912: MPIDI_STATE_DECL(MPID_STATE_MPIDI_COMM_ACCEPT);
-: 913:
-: 914: MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_COMM_ACCEPT);
-: 915:
-: 916: /* Create the new intercommunicator here. We need to send the
-: 917: context id to the other side. */
1386: 918: mpi_errno = MPIR_Comm_create(newcomm);
1386: 919: if (mpi_errno != MPI_SUCCESS) {
|
#####: 920: MPIU_ERR_POP(mpi_errno);
-: 921: }
|
1386: 922: mpi_errno = MPIR_Get_contextid( comm_ptr, &(*newcomm)->recvcontext_id );
|
1386: 923: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 924: /* FIXME why is this commented out? */
-: 925: /* (*newcomm)->context_id = (*newcomm)->recvcontext_id; */
-: 926:
|
1386: 927: rank = comm_ptr->rank;
1386: 928: local_comm_size = comm_ptr->local_size;
-: 929:
1386: 930: if (rank == root)
-: 931: {
-: 932: /* Establish a communicator to communicate with the root on the
-: 933: other side. */
668: 934: mpi_errno = MPIDI_Create_inter_root_communicator_accept(port_name,
-: 935: &tmp_comm, &new_vc);
668: 936: if (mpi_errno != MPI_SUCCESS) {
|
#####: 937: MPIU_ERR_POP(mpi_errno);
-: 938: }
-: 939:
-: 940: /* Make an array to translate local ranks to process group index and
-: 941: rank */
|
668: 942: MPIU_CHKLMEM_MALLOC(local_translation,pg_translation*,
-: 943: local_comm_size*sizeof(pg_translation),
-: 944: mpi_errno,"local_translation");
-: 945:
-: 946: /* Make a list of the local communicator's process groups and encode
-: 947: them in strings to be sent to the other side.
-: 948: The encoded string for each process group contains the process
-: 949: group id, size and all its KVS values */
668: 950: mpi_errno = ExtractLocalPGInfo( comm_ptr, local_translation,
-: 951: &pg_list, &n_local_pgs );
-: 952: /* Send the remote root: n_local_pgs, local_comm_size, context_id for
-: 953: newcomm (taken from its recvcontext_id field).
-: 954: Recv from the remote root: n_remote_pgs, remote_comm_size and a third int that is later stored in intercomm->context_id. NOTE(review): the mpi_errno returned by ExtractLocalPGInfo above is overwritten by the MPIC_Sendrecv below without being checked. */
-: 955:
668: 956: send_ints[0] = n_local_pgs;
668: 957: send_ints[1] = local_comm_size;
668: 958: send_ints[2] = (*newcomm)->recvcontext_id;
-: 959:
-: 960: /*printf("accept:sending 3 ints, %d, %d, %d, and receiving 2 ints\n", send_ints[0], send_ints[1], send_ints[2]);fflush(stdout);*/
668: 961: mpi_errno = MPIC_Sendrecv(send_ints, 3, MPI_INT, 0,
-: 962: sendtag++, recv_ints, 3, MPI_INT,
-: 963: 0, recvtag++, tmp_comm->handle,
-: 964: MPI_STATUS_IGNORE);
668: 965: if (mpi_errno != MPI_SUCCESS) {
|
#####: 966: MPIU_ERR_POP(mpi_errno);
-: 967: }
-: 968: }
-: 969:
-: 970: /* broadcast the received info to local processes */
-: 971: /*printf("accept:broadcasting 2 ints - %d and %d\n", recv_ints[0], recv_ints[1]);fflush(stdout);*/
|
1386: 972: mpi_errno = MPIR_Bcast(recv_ints, 3, MPI_INT, root, comm_ptr);
1386: 973: if (mpi_errno) {
|
#####: 974: MPIU_ERR_POP(mpi_errno);
-: 975: }
-: 976:
|
1386: 977: n_remote_pgs = recv_ints[0];
1386: 978: remote_comm_size = recv_ints[1];
1386: 979: context_id = recv_ints[2];
1386: 980: MPIU_CHKLMEM_MALLOC(remote_pg,MPIDI_PG_t**,
-: 981: n_remote_pgs * sizeof(MPIDI_PG_t*),
-: 982: mpi_errno,"remote_pg");
1386: 983: MPIU_CHKLMEM_MALLOC(remote_translation,pg_translation*,
-: 984: remote_comm_size * sizeof(pg_translation),
-: 985: mpi_errno, "remote_translation");
-: 986: MPIU_DBG_PRINTF(("[%d]accept:remote process groups: %d\nremote comm size: %d\n", rank, n_remote_pgs, remote_comm_size));
-: 987:
-: 988: /* Exchange the process groups and their corresponding KVSes */
1386: 989: if (rank == root)
-: 990: {
-: 991: /* The root receives the PG from the peer (in tmp_comm) and
-: 992: distributes them to the processes in comm_ptr. NOTE(review): the mpi_errno values from ReceivePGAndDistribute and SendPGtoPeerAndFree are each overwritten by the next call without a check. */
668: 993: mpi_errno = ReceivePGAndDistribute( tmp_comm, comm_ptr, root, &recvtag,
-: 994: n_remote_pgs, remote_pg );
-: 995:
668: 996: mpi_errno = SendPGtoPeerAndFree( tmp_comm, &sendtag, pg_list );
-: 997:
-: 998: /* Exchange the rank translations with the remote root: send local_translation and receive remote_translation (two ints per process: pg_index and pg_rank). NOTE(review): the result of this MPIC_Sendrecv is not checked either. */
-: 999: /*printf("accept:sending %d ints and receiving %d ints\n", local_comm_size * 2, remote_comm_size * 2);fflush(stdout);*/
668: 1000: mpi_errno = MPIC_Sendrecv(local_translation, local_comm_size * 2,
-: 1001: MPI_INT, 0, sendtag++,
-: 1002: remote_translation, remote_comm_size * 2,
-: 1003: MPI_INT, 0, recvtag++, tmp_comm->handle,
-: 1004: MPI_STATUS_IGNORE);
-: 1005:#ifdef MPICH_DBG_OUTPUT
-: 1006: MPIU_DBG_PRINTF(("[%d]accept:Received remote_translation:\n", rank));
-: 1007: for (i=0; i<remote_comm_size; i++)
-: 1008: {
-: 1009: MPIU_DBG_PRINTF((" remote_translation[%d].pg_index = %d\n remote_translation[%d].pg_rank = %d\n",
-: 1010: i, remote_translation[i].pg_index, i, remote_translation[i].pg_rank));
-: 1011: }
-: 1012:#endif
-: 1013: }
-: 1014: else
-: 1015: {
718: 1016: mpi_errno = ReceivePGAndDistribute( tmp_comm, comm_ptr, root, &recvtag,
-: 1017: n_remote_pgs, remote_pg );
-: 1018: }
-: 1019:
-: 1020: /* Broadcast out the remote rank translation array. NOTE(review): the result of this MPIR_Bcast is overwritten by SetupNewIntercomm below before any check. */
-: 1021: MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Broadcast remote_translation");
1386: 1022: mpi_errno = MPIR_Bcast(remote_translation, remote_comm_size * 2, MPI_INT,
-: 1023: root, comm_ptr);
-: 1024:#ifdef MPICH_DBG_OUTPUT
-: 1025: MPIU_DBG_PRINTF(("[%d]accept:Received remote_translation after broadcast:\n", rank));
-: 1026: for (i=0; i<remote_comm_size; i++)
-: 1027: {
-: 1028: MPIU_DBG_PRINTF((" remote_translation[%d].pg_index = %d\n remote_translation[%d].pg_rank = %d\n",
-: 1029: i, remote_translation[i].pg_index, i, remote_translation[i].pg_rank));
-: 1030: }
-: 1031:#endif
-: 1032:
-: 1033:
-: 1034: /* Now fill in newcomm */
1386: 1035: intercomm = *newcomm;
1386: 1036: intercomm->context_id = context_id;
1386: 1037: intercomm->is_low_group = 0;
-: 1038:
1386: 1039: mpi_errno = SetupNewIntercomm( comm_ptr, remote_comm_size,
-: 1040: remote_translation, remote_pg, intercomm );
1386: 1041: if (mpi_errno != MPI_SUCCESS) {
|
#####: 1042: MPIU_ERR_POP(mpi_errno);
-: 1043: }
-: 1044:
-: 1045: /* synchronize with remote root (zero-count send/receive; i and j only supply buffer addresses) */
|
1386: 1046: if (rank == root)
-: 1047: {
-: 1048: MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"sync with peer");
668: 1049: mpi_errno = MPIC_Sendrecv(&i, 0, MPI_INT, 0,
-: 1050: sendtag++, &j, 0, MPI_INT,
-: 1051: 0, recvtag++, tmp_comm->handle,
-: 1052: MPI_STATUS_IGNORE);
668: 1053: if (mpi_errno != MPI_SUCCESS) {
|
#####: 1054: MPIU_ERR_POP(mpi_errno);
-: 1055: }
-: 1056:
-: 1057: /* All communication with remote root done. Release the communicator. */
|
668: 1058: MPIR_Comm_release(tmp_comm,0);
-: 1059: }
-: 1060:
-: 1061: MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Barrier");
1386: 1062: mpi_errno = MPIR_Barrier(comm_ptr);
1386: 1063: if (mpi_errno != MPI_SUCCESS) {
|
#####: 1064: MPIU_ERR_POP(mpi_errno);
-: 1065: }
-: 1066:
-: 1067: /* Free new_vc once the connection is completed. It was explicitly
-: 1068: allocated in ch3_progress.c and returned by
-: 1069: MPIDI_CH3I_Acceptq_dequeue. */
|
1386: 1070: if (rank == root) {
668: 1071: FreeNewVC( new_vc );
-: 1072: }
-: 1073:
-: 1074:fn_exit:
3440: 1075: MPIU_CHKLMEM_FREEALL();
|
-: 1076: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_COMM_ACCEPT);
|
1386: 1077: return mpi_errno;
-: 1078:
|
-: 1079:fn_fail:
-: 1080: goto fn_exit;
-: 1081:}
-: 1082:
-: 1083:/* ------------------------------------------------------------------------- */
-: 1084:
-: 1085:/* This routine initializes the new intercomm, setting up the
-: 1086: VCRT and other common structures. The is_low_group and context_id
-: 1087: fields are NOT set because they differ in the use of this
-: 1088: routine in Comm_accept and Comm_connect. The virtual connections
-: 1089: are initialized from a collection of process groups.
-: 1090:
-: 1091: Input parameters:
-: 1092:+ comm_ptr - communicator that gives the group for the "local" group on the
-: 1093: new intercommunicator
-: 1094:. remote_comm_size - size of remote group
-: 1095:. remote_translation - array that specifies the process group and rank in
-: 1096: that group for each of the processes to include in the remote group of the
-: 1097: new intercommunicator
-: 1098:- remote_pg - array of remote process groups, indexed by the pg_index field of each remote_translation entry
-: 1099:
-: 1100: Input/Output Parameter:
-: 1101:. intercomm - New intercommunicator. The intercommunicator must already
-: 1102: have been allocated; this routine initializes many of the fields
-: 1103: Returns MPI_SUCCESS, or an error code if MPID_VCRT_Create, MPID_VCRT_Get_ptr or MPIR_Barrier reports a failure.
-: 1104: Note:
-: 1105: This routine performs a barrier over 'comm_ptr'. Why?
-: 1106:*/
-: 1107:#undef FUNCNAME
-: 1108:#define FUNCNAME SetupNewIntercomm
-: 1109:#undef FCNAME
-: 1110:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 1111:static int SetupNewIntercomm( MPID_Comm *comm_ptr, int remote_comm_size,
-: 1112: pg_translation remote_translation[],
-: 1113: MPIDI_PG_t **remote_pg,
-: 1114: MPID_Comm *intercomm )
|
2703: 1115:{
2703: 1116: int mpi_errno = MPI_SUCCESS, i;
-: 1117:
-: 1118: /* FIXME: How much of this could/should be common with the
-: 1119: upper level (src/mpi/comm/ *.c) code? For best robustness,
-: 1120: this should use the same routine (not copy/paste code) as
-: 1121: in the upper level code. */
2703: 1122: intercomm->attributes = NULL;
2703: 1123: intercomm->remote_size = remote_comm_size;
2703: 1124: intercomm->local_size = comm_ptr->local_size;
2703: 1125: intercomm->rank = comm_ptr->rank;
2703: 1126: intercomm->local_group = NULL;
2703: 1127: intercomm->remote_group = NULL;
2703: 1128: intercomm->comm_kind = MPID_INTERCOMM;
2703: 1129: intercomm->local_comm = NULL;
2703: 1130: intercomm->coll_fns = NULL;
-: 1131:
-: 1132: /* Point local vcr, vcrt at those of incoming intracommunicator (a reference is added to the shared vcrt) */
2703: 1133: intercomm->local_vcrt = comm_ptr->vcrt;
2703: 1134: MPID_VCRT_Add_ref(comm_ptr->vcrt);
2703: 1135: intercomm->local_vcr = comm_ptr->vcr;
-: 1136:
-: 1137: /* Set up VC reference table for the remote group (remote_size entries), then fill each entry from its process group and rank */
2703: 1138: mpi_errno = MPID_VCRT_Create(intercomm->remote_size, &intercomm->vcrt);
2703: 1139: if (mpi_errno != MPI_SUCCESS) {
|
#####: 1140: MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**init_vcrt");
-: 1141: }
|
2703: 1142: mpi_errno = MPID_VCRT_Get_ptr(intercomm->vcrt, &intercomm->vcr);
2703: 1143: if (mpi_errno != MPI_SUCCESS) {
|
#####: 1144: MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**init_getptr");
-: 1145: }
|
9641: 1146: for (i=0; i < intercomm->remote_size; i++) {
6938: 1147: MPIDI_PG_Dup_vcr(remote_pg[remote_translation[i].pg_index],
-: 1148: remote_translation[i].pg_rank, &intercomm->vcr[i]);
-: 1149: }
-: 1150:
-: 1151: MPIU_DBG_MSG(CH3_CONNECT,VERBOSE,"Barrier");
2703: 1152: mpi_errno = MPIR_Barrier(comm_ptr);
2703: 1153: if (mpi_errno != MPI_SUCCESS) {
|
#####: 1154: MPIU_ERR_POP(mpi_errno);
-: 1155: }
-: 1156:
|
2703: 1157: fn_exit:
2703: 1158: return mpi_errno;
-: 1159:
|
-: 1160: fn_fail:
-: 1161: goto fn_exit;
-: 1162:}
-: 1163:
-: 1164:/* Free new_vc. It was explicitly allocated in MPIDI_CH3_Connect_to_root. If the vc is not yet in MPIDI_VC_STATE_INACTIVE, the progress engine is run until it is; the vc is then passed to VC_Destroy and released with MPIU_Free. Returns mpi_errno. NOTE(review): if MPID_Progress_wait fails, MPIU_ERR_POP presumably jumps to fn_fail, in which case the vc is neither destroyed nor freed here - confirm. */
-: 1165:/* FIXME: The free and the create routines should be in the same file */
-: 1166:#undef FUNCNAME
-: 1167:#define FUNCNAME FreeNewVC
-: 1168:#undef FCNAME
-: 1169:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 1170:static int FreeNewVC( MPIDI_VC_t *new_vc )
|
1336: 1171:{
-: 1172: MPID_Progress_state progress_state;
1336: 1173: int mpi_errno = MPI_SUCCESS;
-: 1174:
1336: 1175: if (new_vc->state != MPIDI_VC_STATE_INACTIVE) {
-: 1176: /* If the new_vc isn't done, run the progress engine until
-: 1177: the state of the new vc is complete */
1238: 1178: MPID_Progress_start(&progress_state);
4198: 1179: while (new_vc->state != MPIDI_VC_STATE_INACTIVE) {
1722: 1180: mpi_errno = MPID_Progress_wait(&progress_state);
|
-: 1181: /* --BEGIN ERROR HANDLING-- */
1722: 1182: if (mpi_errno != MPI_SUCCESS)
-: 1183: {
-: 1184: MPID_Progress_end(&progress_state);
#####: 1185: MPIU_ERR_POP(mpi_errno);
-: 1186: }
-: 1187: /* --END ERROR HANDLING-- */
-: 1188: }
-: 1189: MPID_Progress_end(&progress_state);
-: 1190: }
-: 1191:
-: 1192: /* FIXME: remove this ifdef - method on connection? */
-: 1193:#ifdef MPIDI_CH3_HAS_CONN_ACCEPT_HOOK
-: 1194: /* FIXME should this be an MPIU_CALL macro? */
-: 1195: mpi_errno = MPIDI_CH3_Cleanup_after_connection( new_vc );
-: 1196:#endif
-: 1197:
|
1336: 1198: MPIU_CALL(MPIDI_CH3,VC_Destroy(new_vc));
1336: 1199: MPIU_Free(new_vc);
-: 1200:
|
1336: 1201: fn_fail:
|
1336: 1202: return mpi_errno;
-: 1203:}
-: 1204:
-: 1205:/* ------------------------------------------------------------------------- */
-: 1206:/*
-: 1207: *
-: 1208: */
-: 1209:
-: 1210:/* FIXME: What is an Accept queue and who uses it?
-: 1211: Is this part of the connect/accept support?
-: 1212: These routines appear to be called by channel progress routines;
-: 1213: perhaps this belongs in util/sock (note the use of a port_name_tag in the
-: 1214: dequeue code, though this could be any string).
-: 1215:
-: 1216: Are the locks required? If this is only called within the progress
-: 1217: engine, then the progress engine locks should be sufficient. If a
-: 1218: finer grain lock model is used, it needs to be very carefully
-: 1219: designed and documented.
-: 1220:*/
-: 1221:/* One entry of the accept queue, a singly linked list: vc is the connection handed to MPIDI_CH3I_Acceptq_enqueue, port_name_tag is the value MPIDI_CH3I_Acceptq_dequeue compares against, and next links to the following entry. */
-: 1222:typedef struct MPIDI_CH3I_Acceptq_s
-: 1223:{
-: 1224: struct MPIDI_VC *vc;
-: 1225: int port_name_tag;
-: 1226: struct MPIDI_CH3I_Acceptq_s *next;
-: 1227:}
-: 1228:MPIDI_CH3I_Acceptq_t;
-: 1229:/* acceptq_head is the first entry of the list (0 when empty); AcceptQueueSize counts the entries currently queued and maxAcceptQueueSize records the largest value AcceptQueueSize has reached. */
-: 1230:static MPIDI_CH3I_Acceptq_t * acceptq_head=0;
-: 1231:static int maxAcceptQueueSize = 0;
-: 1232:static int AcceptQueueSize = 0;
-: 1233:/* Allocate a queue item for (vc, port_name_tag) and insert it at the head of the accept queue, updating the queue-size statistics. Returns MPI_SUCCESS, or an MPI_ERR_OTHER error code if the item cannot be allocated (the queue is then left unchanged). */
-: 1234:#undef FUNCNAME
-: 1235:#define FUNCNAME MPIDI_CH3I_Acceptq_enqueue
-: 1236:#undef FCNAME
-: 1237:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 1238:int MPIDI_CH3I_Acceptq_enqueue(MPIDI_VC_t * vc, int port_name_tag )
668: 1239:{
668: 1240: int mpi_errno=MPI_SUCCESS;
-: 1241: MPIDI_CH3I_Acceptq_t *q_item;
-: 1242: MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_ACCEPTQ_ENQUEUE);
-: 1243:
-: 1244: MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_ACCEPTQ_ENQUEUE);
-: 1245:
-: 1246: /* FIXME: Use CHKPMEM */
668: 1247: q_item = (MPIDI_CH3I_Acceptq_t *)
-: 1248: MPIU_Malloc(sizeof(MPIDI_CH3I_Acceptq_t));
|
-: 1249: /* --BEGIN ERROR HANDLING-- */
668: 1250: if (q_item == NULL)
-: 1251: {
#####: 1252: mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0);
#####: 1253: goto fn_exit;
-: 1254: }
-: 1255: /* --END ERROR HANDLING-- */
-: 1256:
|
668: 1257: q_item->vc = vc;
668: 1258: q_item->port_name_tag = port_name_tag;
-: 1259:
-: 1260: /* Keep some statistics on the accept queue (current length and the largest length seen) */
668: 1261: AcceptQueueSize++;
668: 1262: if (AcceptQueueSize > maxAcceptQueueSize)
83: 1263: maxAcceptQueueSize = AcceptQueueSize;
-: 1264:
-: 1265: /* FIXME: Stack or queue? (As written, the new item becomes the head of the list.) */
-: 1266: MPIU_DBG_MSG_P(CH3_CONNECT,TYPICAL,"vc=%p:Enqueuing accept connection",vc);
668: 1267: q_item->next = acceptq_head;
668: 1268: acceptq_head = q_item;
-: 1269:
668: 1270: fn_exit:
|
-: 1271: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_ACCEPTQ_ENQUEUE);
|
668: 1272: return mpi_errno;
-: 1273:}
-: 1274:
-: 1275:
-: 1276:/* Attempt to dequeue a vc from the accept queue. The list is searched from the head; the first item whose port_name_tag equals the argument is unlinked and freed, and its vc is stored in *vc. If the queue is
-: 1277: empty or no port_name_tag matches, *vc is set to NULL. In either case MPIDI_CH3_Complete_Acceptq_dequeue(*vc) is then called and its result is returned. */
-: 1278:#undef FUNCNAME
-: 1279:#define FUNCNAME MPIDI_CH3I_Acceptq_dequeue
-: 1280:#undef FCNAME
-: 1281:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 1282:int MPIDI_CH3I_Acceptq_dequeue(MPIDI_VC_t ** vc, int port_name_tag)
1342: 1283:{
1342: 1284: int mpi_errno=MPI_SUCCESS;
-: 1285: MPIDI_CH3I_Acceptq_t *q_item, *prev;
-: 1286: MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_ACCEPTQ_DEQUEUE);
-: 1287:
-: 1288: MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_ACCEPTQ_DEQUEUE);
-: 1289:
1342: 1290: *vc = NULL;
1342: 1291: q_item = acceptq_head;
1342: 1292: prev = q_item;
-: 1293:
2750: 1294: while (q_item != NULL)
-: 1295: {
734: 1296: if (q_item->port_name_tag == port_name_tag)
-: 1297: {
668: 1298: *vc = q_item->vc;
-: 1299:
668: 1300: if ( q_item == acceptq_head )
666: 1301: acceptq_head = q_item->next;
-: 1302: else
2: 1303: prev->next = q_item->next;
-: 1304:
668: 1305: MPIU_Free(q_item);
668: 1306: AcceptQueueSize--;
668: 1307: break;;
-: 1308: }
-: 1309: else
-: 1310: {
66: 1311: prev = q_item;
66: 1312: q_item = q_item->next;
-: 1313: }
-: 1314: }
-: 1315:
1342: 1316: mpi_errno = MPIDI_CH3_Complete_Acceptq_dequeue(*vc);
-: 1317:
-: 1318: MPIU_DBG_MSG_FMT(CH3_CONNECT,TYPICAL,
-: 1319: (MPIU_DBG_FDEST,"vc=%p:Dequeuing accept connection with tag %d",
-: 1320: *vc,port_name_tag));
-: 1321:
|
-: 1322: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_ACCEPTQ_DEQUEUE);
|
1342: 1323: return mpi_errno;
-: 1324:}
-: 1325:
-: 1326:#else /* MPIDI_CH3_HAS_NO_DYNAMIC_PROCESS is defined */
-: 1327:
-: 1328:#endif /* MPIDI_CH3_HAS_NO_DYNAMIC_PROCESS */
-: 1329:
|