-: 0:Source:/home/MPI/testing/mpich2/mpich2/src/mpid/ch3/channels/sock/src/ch3_progress.c
-: 0:Graph:ch3_progress.gcno
-: 0:Data:ch3_progress.gcda
-: 0:Runs:4382
-: 0:Programs:1376
-: 1:/* -*- Mode: C; c-basic-offset:4 ; -*- */
-: 2:/*
-: 3: * (C) 2001 by Argonne National Laboratory.
-: 4: * See COPYRIGHT in top-level directory.
-: 5: */
-: 6:
-: 7:#include "mpidi_ch3_impl.h"
-: 8:#include "pmi.h"
-: 9:#include "mpidu_sock.h"
-: 10:
-: 11:#ifdef HAVE_STRING_H
-: 12:#include <string.h>
-: 13:#endif
-: 14:/* Packet handler table indexed by packet type; filled by MPIDI_CH3_PktHandler_Init() in MPIDI_CH3I_Progress_init() and used when a read event delivers a packet. */
-: 15:static MPIDI_CH3_PktHandler_Fcn *pktArray[MPIDI_CH3_PKT_END_CH3+1];
-: 16:
-: 17:static int ReadMoreData( MPIDI_CH3I_Connection_t *, MPID_Request * );
-: 18:/* File-local progress entry points: the blocking wait and the test variant. */
-: 19:static int MPIDI_CH3i_Progress_wait(MPID_Progress_state * );
-: 20:static int MPIDI_CH3i_Progress_test(void);
-: 21:/* The completion count below is compared with a caller's saved copy in MPIDI_CH3i_Progress_wait() to decide when to stop waiting. */
-: 22:/* FIXME: Move thread stuff into some set of abstractions in order to remove
-: 23: ifdefs */
-: 24:volatile unsigned int MPIDI_CH3I_progress_completion_count = 0;
-: 25:#ifdef MPICH_IS_THREADED
-: 26: volatile int MPIDI_CH3I_progress_blocked = FALSE;
-: 27: volatile int MPIDI_CH3I_progress_wakeup_signalled = FALSE;
-: 28:/* In threaded code paths, progress_blocked is TRUE only around the MPIDU_Sock_wait() call in MPIDI_CH3i_Progress_wait(); wakeup_signalled is reset after that call. */
-: 29:# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
-: 30:/* This value must be static so that it isn't an uninitialized
-: 31: common symbol */
-: 32:static MPID_Thread_cond_t MPIDI_CH3I_progress_completion_cond;
-: 33:# endif
-: 34:/* Delay waits on the completion condition variable; continue broadcasts on it (both only act for the global-mutex thread implementation). */
-: 35: static int MPIDI_CH3I_Progress_delay(unsigned int completion_count);
-: 36: static int MPIDI_CH3I_Progress_continue(unsigned int completion_count);
-: 37:#endif
-: 38:
-: 39:/* Sock set used by the progress engine; created in MPIDI_CH3I_Progress_init() and destroyed in MPIDI_CH3I_Progress_finalize(). */
-: 40:MPIDU_Sock_set_t MPIDI_CH3I_sock_set = NULL;
-: 41:static int MPIDI_CH3I_Progress_handle_sock_event(MPIDU_Sock_event_t * event);
-: 42:/* Per-connection helpers: advance the send queue, and post a read for the next packet header. */
-: 43:static inline int connection_pop_sendq_req(MPIDI_CH3I_Connection_t * conn);
-: 44:static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn);
-: 45:/* Moves an iov array/count pair past nb bytes that were already transferred. */
-: 46:static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb);
-: 47:
-: 48:/* MPIDI_CH3i_Progress_test - test entry point of the progress engine. NOTE(review): source lines 54, 56, 66, 88, 90, 98, 100-101, 107 and 109 are absent from this listing, so the control flow shown here is incomplete. */
-: 49:#undef FUNCNAME
-: 50:#define FUNCNAME MPIDI_CH3i_Progress_test
-: 51:#undef FCNAME
-: 52:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 53:static int MPIDI_CH3i_Progress_test(void)
-: 55: MPIDU_Sock_event_t event;
-: 57: MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_TEST);
-: 58:
-: 59: MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_TEST);
-: 60:
-: 61:# ifdef MPICH_IS_THREADED
-: 62: {
-: 63: /* We don't bother testing whether threads are enabled in the
-: 64: runtime-checking case because this simple test will always be false
-: 65: if threads are not enabled. */
-: 67: {
-: 68: /*
-: 69: * Another thread is already blocking in the progress engine.
-: 70: * We are not going to block waiting for progress, so we
-: 71: * simply return. It might make sense to yield before returning,
-: 72: * giving the PE thread a chance to make progress.
-: 73: *
-: 74: * MT: Another thread is already blocking in poll. Right now,
-: 75: * calls to the progress routines are effectively
-: 76: * serialized by the device. The only way another thread may
-: 77: * enter this function is if MPIDU_Sock_wait() blocks. If
-: 78: * this changes, a flag other than MPIDI_CH3I_progress_blocked
-: 79: * may be required to determine if another thread is in
-: 80: * the progress engine.
-: 81: */
-: 82:
|
#####: 83: goto fn_exit;
-: 84: }
-: 85: }
-: 86:# endif
-: 87:
-: 89:/* NOTE(review): the call that fills in event (presumably a non-blocking MPIDU_Sock_wait) and the condition guarding the block below are not visible in this listing - confirm against the source. */
-: 91: {
|
96959: 92: mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event);
96959: 93: if (mpi_errno != MPI_SUCCESS) {
|
#####: 94: MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,
-: 95: "**ch3|sock|handle_sock_event");
-: 96: }
-: 97: }
-: 99: {
-: 102: }
-: 103: else {
#####: 104: MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**progress_sock_wait");
-: 105: }
-: 106:
-: 108: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_TEST);
-: 110: fn_fail:
-: 111: goto fn_exit;
-: 112:}
-: 113:/* end MPIDI_CH3i_Progress_test() */
-: 114:/* MPIDI_CH3i_Progress_wait - block in MPIDU_Sock_wait() and handle events until MPIDI_CH3I_progress_completion_count differs from progress_state->ch.completion_count. */
-: 115:/* Returns MPI_SUCCESS or an MPI error code; progress_state->ch.completion_count is refreshed from the global count at fn_exit, which the fn_fail path also reaches. */
-: 116:#undef FUNCNAME
-: 117:#define FUNCNAME MPIDI_CH3i_Progress_wait
-: 118:#undef FCNAME
-: 119:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 120:static int MPIDI_CH3i_Progress_wait(MPID_Progress_state * progress_state)
|
16154110: 121:{
-: 122: MPIDU_Sock_event_t event;
16154110: 123: int mpi_errno = MPI_SUCCESS;
-: 124: MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_WAIT);
-: 125:
-: 126: MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_WAIT);
-: 127:
-: 128: /*
-: 129: * MT: the following code will be needed if progress can occur between
-: 130: * MPIDI_CH3_Progress_start() and
-: 131: * MPIDI_CH3_Progress_wait(), or iterations of MPIDI_CH3_Progress_wait().
-: 132: *
-: 133: * This is presently not possible, and thus the code is commented out.
-: 134: */
-: 135:# if 0
-: 136: /* FIXME: Was (USE_THREAD_IMPL == MPICH_THREAD_IMPL_NOT_IMPLEMENTED),
-: 137: which really meant not-using-global-mutex-thread model . This
-: 138: was true for the single threaded case, but was probably not intended
-: 139: for that case*/
-: 140: {
-: 141: if (progress_state->ch.completion_count != MPIDI_CH3I_progress_completion_count)
-: 142: {
-: 143: goto fn_exit;
-: 144: }
-: 145: }
-: 146:# endif
-: 147:
-: 148:# ifdef MPICH_IS_THREADED
16154110: 149: MPIU_THREAD_CHECK_BEGIN
-: 150: {
2713302: 151: if (MPIDI_CH3I_progress_blocked == TRUE)
-: 152: {
-: 153: /*
-: 154: * Another thread is already blocking in the progress engine.
-: 155: *
-: 156: * MT: Another thread is already blocking in poll. Right now,
-: 157: * calls to MPIDI_CH3_Progress_wait() are effectively
-: 158: * serialized by the device. The only way another thread may
-: 159: * enter this function is if MPIDU_Sock_wait() blocks. If
-: 160: * this changes, a flag other than MPIDI_CH3I_progress_blocked
-: 161: * may be required to determine if another thread is in
-: 162: * the progress engine.
-: 163: */
189787: 164: MPIDI_CH3I_Progress_delay(MPIDI_CH3I_progress_completion_count);
-: 165:/* The return value of the delay call is not examined; this thread leaves without calling MPIDU_Sock_wait() itself. */
189787: 166: goto fn_exit;
-: 167: }
-: 168: }
-: 169: MPIU_THREAD_CHECK_END
-: 170:# endif
-: 171:
-: 172: do
-: 173: {
-: 174:# ifdef MPICH_IS_THREADED
-: 175:/* progress_blocked is set only around the blocking call so that the check at the top of this function can detect a thread that is inside MPIDU_Sock_wait(). */
-: 176: /* The logic for this case is just complicated enough that
-: 177: we write separate code for each possibility */
-: 178:# ifdef HAVE_RUNTIME_THREADCHECK
16118343: 179: if (MPIR_ThreadInfo.isThreaded) {
2537222: 180: MPIDI_CH3I_progress_blocked = TRUE;
2537222: 181: mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set,
-: 182: MPIDU_SOCK_INFINITE_TIME, &event);
2537222: 183: MPIDI_CH3I_progress_blocked = FALSE;
2537222: 184: MPIDI_CH3I_progress_wakeup_signalled = FALSE;
-: 185: }
-: 186: else {
13581121: 187: mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set,
-: 188: MPIDU_SOCK_INFINITE_TIME, &event);
-: 189: }
-: 190:# else
-: 191: MPIDI_CH3I_progress_blocked = TRUE;
-: 192: mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set,
-: 193: MPIDU_SOCK_INFINITE_TIME, &event);
-: 194: MPIDI_CH3I_progress_blocked = FALSE;
-: 195: MPIDI_CH3I_progress_wakeup_signalled = FALSE;
-: 196:# endif /* HAVE_RUNTIME_THREADCHECK */
-: 197:
-: 198:# else
-: 199: mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set,
-: 200: MPIDU_SOCK_INFINITE_TIME, &event);
-: 201:# endif
-: 202:
|
-: 203: /* --BEGIN ERROR HANDLING-- */
16118343: 204: if (mpi_errno != MPI_SUCCESS)
-: 205: {
#####: 206: MPIU_Assert(MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT);
#####: 207: MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER,"**progress_sock_wait");
#####: 208: goto fn_fail;
-: 209: }
-: 210: /* --END ERROR HANDLING-- */
-: 211:/* Dispatch the event that MPIDU_Sock_wait() returned to the per-operation handler. */
|
16118343: 212: mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event);
16118343: 213: if (mpi_errno != MPI_SUCCESS) {
|
2: 214: MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,
-: 215: "**ch3|sock|handle_sock_event");
-: 216: }
-: 217: }
|
16118341: 218: while (progress_state->ch.completion_count == MPIDI_CH3I_progress_completion_count);
-: 219:
-: 220: /*
-: 221: * We could continue to call MPIU_Sock_wait in a non-blocking fashion
-: 222: * and process any other events; however, this would not
-: 223: * give the application a chance to post new receives, and thus could
-: 224: * result in an increased number of unexpected messages
-: 225: * that would need to be buffered.
-: 226: */
-: 227:
-: 228:# if MPICH_IS_THREADED
-: 229: {
-: 230: /*
-: 231: * Awaken any threads which are waiting for the progress that just
-: 232: * occurred
-: 233: */
15964321: 234: MPIDI_CH3I_Progress_continue(MPIDI_CH3I_progress_completion_count);
-: 235: }
-: 236:# endif
-: 237:/* NOTE(review): the guard above is written with #if while the other guards in this file use #ifdef MPICH_IS_THREADED. */
16154110: 238: fn_exit:
-: 239: /*
-: 240: * Reset the progress state so it is fresh for the next iteration
-: 241: */
16154110: 242: progress_state->ch.completion_count = MPIDI_CH3I_progress_completion_count;
-: 243:
|
-: 244: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_WAIT);
|
16154110: 245: return mpi_errno;
|
-: 246: fn_fail:
-: 247: goto fn_exit;
-: 248:}
-: 249:/* end MPIDI_CH3i_Progress_wait() */
-: 250:/* MPIDI_CH3_Connection_terminate - set the state of the VC's connection to CONN_STATE_CLOSING and post a close on the VC's sock. */
-: 251:/* vc: virtual connection; its channel_private field is read as an MPIDI_CH3I_VC. Returns MPI_SUCCESS, or an MPI error code when MPIDU_Sock_post_close() fails. */
-: 252:#undef FUNCNAME
-: 253:#define FUNCNAME MPIDI_CH3_Connection_terminate
-: 254:#undef FCNAME
-: 255:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 256:int MPIDI_CH3_Connection_terminate(MPIDI_VC_t * vc)
|
14436: 257:{
14436: 258: int mpi_errno = MPI_SUCCESS;
14436: 259: MPIDI_CH3I_VC *vcch = (MPIDI_CH3I_VC *)vc->channel_private;
-: 260:/* The connection state is updated before the close request is posted. */
-: 261: MPIU_DBG_CONNSTATECHANGE(vc,vcch->conn,CONN_STATE_CLOSING);
14436: 262: vcch->conn->state = CONN_STATE_CLOSING;
-: 263: MPIU_DBG_MSG(CH3_DISCONNECT,TYPICAL,"Closing sock (Post_close)");
14436: 264: mpi_errno = MPIDU_Sock_post_close(vcch->sock);
14436: 265: if (mpi_errno != MPI_SUCCESS) {
|
#####: 266: MPIU_ERR_POP(mpi_errno);
-: 267: }
-: 268:
|
14436: 269: fn_exit:
14436: 270: return mpi_errno;
|
-: 271: fn_fail:
-: 272: goto fn_exit;
-: 273:}
-: 274:/* end MPIDI_CH3_Connection_terminate() */
-: 275:/* MPIDI_CH3I_Progress_init - set up the progress engine: completion condition variable (global-mutex thread implementation), sock library, sock set, listener, and packet handler table. */
-: 276:/* Returns MPI_SUCCESS or the MPI error code of a failing step (MPIU_ERR_POP is assumed to jump to fn_fail, skipping the later steps - confirm against its definition). */
-: 277:#undef FUNCNAME
-: 278:#define FUNCNAME MPIDI_CH3I_Progress_init
-: 279:#undef FCNAME
-: 280:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 281:int MPIDI_CH3I_Progress_init(void)
|
4382: 282:{
4382: 283: int mpi_errno = MPI_SUCCESS;
-: 284: MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);
-: 285:
-: 286: MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);
-: 287:/* Inside the MPIU_THREAD_CHECK_BEGIN/END pair: create the completion condition variable; the call's result is not checked. */
4382: 288: MPIU_THREAD_CHECK_BEGIN
-: 289:# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
-: 290: {
186: 291: MPID_Thread_cond_create(&MPIDI_CH3I_progress_completion_cond, NULL);
-: 292: }
-: 293:# endif
-: 294: MPIU_THREAD_CHECK_END
-: 295:
4382: 296: mpi_errno = MPIDU_Sock_init();
4382: 297: if (mpi_errno != MPI_SUCCESS) {
|
#####: 298: MPIU_ERR_POP(mpi_errno);
-: 299: }
-: 300:
-: 301: /* create sock set */
|
4382: 302: mpi_errno = MPIDU_Sock_create_set(&MPIDI_CH3I_sock_set);
4382: 303: if (mpi_errno != MPI_SUCCESS) {
|
#####: 304: MPIU_ERR_POP(mpi_errno);
-: 305: }
-: 306:
-: 307: /* establish non-blocking listener */
|
4382: 308: mpi_errno = MPIDU_CH3I_SetupListener( MPIDI_CH3I_sock_set );
|
4382: 309: if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
-: 310:
-: 311: /* Initialize the code to handle incoming packets */
|
4382: 312: mpi_errno = MPIDI_CH3_PktHandler_Init( pktArray, MPIDI_CH3_PKT_END_CH3+1 );
|
4382: 313: if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
-: 314:
|
4382: 315: fn_exit:
|
-: 316: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);
|
4382: 317: return mpi_errno;
|
-: 318: fn_fail:
-: 319: goto fn_exit;
-: 320:}
-: 321:/* end MPIDI_CH3I_Progress_init() */
-: 322:/* MPIDI_CH3I_Progress_finalize - shut down the listener, destroy the progress sock set, finalize the sock library, and destroy the completion condition variable (global-mutex thread implementation). */
-: 323:/* Returns mpi_errno as set by the listener shutdown; the results of MPIDU_Sock_destroy_set(), MPIDU_Sock_finalize() and MPID_Thread_cond_destroy() are not checked. */
-: 324:#undef FUNCNAME
-: 325:#define FUNCNAME MPIDI_CH3I_Progress_finalize
-: 326:#undef FCNAME
-: 327:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 328:int MPIDI_CH3I_Progress_finalize(void)
|
4374: 329:{
-: 330: int mpi_errno;
-: 331: MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);
-: 332:
-: 333: MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);
-: 334:
-: 335: /* Shut down the listener */
4374: 336: mpi_errno = MPIDU_CH3I_ShutdownListener();
|
4374: 337: if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
-: 338:/* NOTE(review): if the shutdown fails and MPIU_ERR_POP jumps to fn_fail, none of the teardown below runs. */
-: 339: /* FIXME: Cleanly shutdown other socks and free connection structures.
-: 340: (close protocol?) */
-: 341:
-: 342:
-: 343: /*
-: 344: * MT: in a multi-threaded environment, finalize() should signal any
-: 345: * thread(s) blocking on MPIDU_Sock_wait() and wait for
-: 346: * those threads to complete before destroying the progress engine
-: 347: * data structures.
-: 348: */
-: 349:
|
4374: 350: MPIDU_Sock_destroy_set(MPIDI_CH3I_sock_set);
4374: 351: MPIDU_Sock_finalize();
-: 352:
4374: 353: MPIU_THREAD_CHECK_BEGIN
-: 354:# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
-: 355: {
186: 356: MPID_Thread_cond_destroy(&MPIDI_CH3I_progress_completion_cond, NULL);
-: 357: }
-: 358:# endif
-: 359: MPIU_THREAD_CHECK_END
-: 360:
4374: 361: fn_exit:
|
-: 362: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);
|
4374: 363: return mpi_errno;
|
-: 364: fn_fail:
-: 365: goto fn_exit;
-: 366:}
-: 367:/* end MPIDI_CH3I_Progress_finalize() */
-: 368:/* MPIDI_CH3I_Progress_wakeup - call MPIDU_Sock_wakeup() on the progress engine's sock set; compiled only when MPICH_IS_THREADED is defined. */
-: 369:/* Presumably this surfaces as an MPIDU_SOCK_OP_WAKEUP event in MPIDI_CH3I_Progress_handle_sock_event() - confirm against the sock implementation. */
-: 370:#ifdef MPICH_IS_THREADED
-: 371:#undef FUNCNAME
-: 372:#define FUNCNAME MPIDI_CH3I_Progress_wakeup
-: 373:#undef FCNAME
-: 374:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 375:void MPIDI_CH3I_Progress_wakeup(void)
|
101: 376:{
101: 377: MPIDU_Sock_wakeup(MPIDI_CH3I_sock_set);
101: 378:}
-: 379:#endif
-: 380:/* MPIDI_CH3_Get_business_card - forward to MPIDI_CH3U_Get_business_card_sock() and return its result. The addresses passed are those of this function's own by-value parameters, so any update the callee makes to the pointer or length is not seen by the caller. */
-: 381:#undef FUNCNAME
-: 382:#define FUNCNAME MPIDI_CH3_Get_business_card
-: 383:#undef FCNAME
-: 384:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 385:int MPIDI_CH3_Get_business_card(int myRank, char *value, int length)
372: 386:{
372: 387: return MPIDI_CH3U_Get_business_card_sock(myRank, &value, &length);
-: 388:}
-: 389:/* MPIDI_CH3I_Progress_handle_sock_event - handle one sock event, dispatching on event->op_type (READ, WRITE, ACCEPT, CONNECT, CLOSE, WAKEUP). */
-: 390:/* event->user_ptr is used as the MPIDI_CH3I_Connection_t for READ, WRITE, CONNECT and CLOSE events. Returns MPI_SUCCESS or an MPI error code. */
-: 391:#undef FUNCNAME
-: 392:#define FUNCNAME MPIDI_CH3I_Progress_handle_sock_event
-: 393:#undef FCNAME
-: 394:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 395:static int MPIDI_CH3I_Progress_handle_sock_event(MPIDU_Sock_event_t * event)
16215302: 396:{
16215302: 397: int mpi_errno = MPI_SUCCESS;
-: 398: MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT);
-: 399:
-: 400: MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT);
-: 401:
-: 402: MPIU_DBG_MSG_D(CH3_OTHER,VERBOSE,"Socket event of type %d", event->op_type );
-: 403:
16215302: 404: switch (event->op_type)
-: 405: {
-: 406: case MPIDU_SOCK_OP_READ:
-: 407: {
-: 408: MPIDI_CH3I_Connection_t * conn =
16123209: 409: (MPIDI_CH3I_Connection_t *) event->user_ptr;
-: 410:/* rreq starts as the request with a read in flight on this connection; it is NULL when conn->pkt is to be handled as a new packet (see below). */
16123209: 411: MPID_Request * rreq = conn->recv_active;
-: 412:
|
-: 413: /* --BEGIN ERROR HANDLING-- */
16123209: 414: if (event->error != MPI_SUCCESS)
-: 415: {
-: 416: /* FIXME: the following should be handled by the close
-: 417: protocol */
2: 418: if (MPIR_ERR_GET_CLASS(event->error) != MPIDU_SOCK_ERR_CONN_CLOSED) {
2: 419: mpi_errno = event->error;
2: 420: MPIU_ERR_POP(mpi_errno);
-: 421: }
-: 422: break; /* an error of class MPIDU_SOCK_ERR_CONN_CLOSED is not reported to the caller */
-: 423: }
-: 424: /* --END ERROR HANDLING-- */
-: 425:
|
16123207: 426: if (conn->state == CONN_STATE_CONNECTED)
-: 427: {
16099756: 428: if (conn->recv_active == NULL)
-: 429: {
16054957: 430: MPIDI_msg_sz_t buflen = sizeof (MPIDI_CH3_Pkt_t);
16054957: 431: MPIU_Assert(conn->pkt.type < MPIDI_CH3_PKT_END_CH3);
-: 432:/* Invoke the handler registered for this packet type; it may hand back a request through rreq. */
16054957: 433: mpi_errno = pktArray[conn->pkt.type]( conn->vc, &conn->pkt,
-: 434: &buflen, &rreq );
16054957: 435: if (mpi_errno != MPI_SUCCESS) {
|
#####: 436: MPIU_ERR_POP(mpi_errno);
-: 437: }
|
16054957: 438: MPIU_Assert(buflen == sizeof (MPIDI_CH3_Pkt_t));
-: 439:/* No request returned: post the read for the next packet unless the connection is now in CONN_STATE_CLOSING. Otherwise read the request's data. */
16054957: 440: if (rreq == NULL)
-: 441: {
5969332: 442: if (conn->state != CONN_STATE_CLOSING)
-: 443: {
-: 444: /* conn->recv_active = NULL; --
-: 445: already set to NULL */
5954896: 446: mpi_errno = connection_post_recv_pkt(conn);
5954896: 447: if (mpi_errno != MPI_SUCCESS) {
|
#####: 448: MPIU_ERR_POP(mpi_errno);
-: 449: }
-: 450: }
-: 451: }
-: 452: else
-: 453: {
|
10085625: 454: mpi_errno = ReadMoreData( conn, rreq );
|
10085625: 455: if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
-: 456: }
-: 457: }
-: 458: else /* incoming data */
-: 459: {
-: 460: int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
-: 461: int complete;
-: 462:/* A request without an OnDataAvail callback is completed directly; otherwise the callback reports through complete whether the request is done. */
|
44799: 463: reqFn = rreq->dev.OnDataAvail;
44799: 464: if (!reqFn) {
26333: 465: MPIU_Assert(MPIDI_Request_get_type(rreq)!=MPIDI_REQUEST_TYPE_GET_RESP);
26333: 466: MPIDI_CH3U_Request_complete(rreq);
26333: 467: complete = TRUE;
-: 468: }
-: 469: else {
18466: 470: mpi_errno = reqFn( conn->vc, rreq, &complete );
|
18466: 471: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 472: }
-: 473:
|
44799: 474: if (complete)
-: 475: {
44696: 476: conn->recv_active = NULL;
44696: 477: mpi_errno = connection_post_recv_pkt(conn);
44696: 478: if (mpi_errno != MPI_SUCCESS) {
|
#####: 479: MPIU_ERR_POP(mpi_errno);
-: 480: }
-: 481: }
-: 482: else /* more data to be read */
-: 483: {
|
103: 484: mpi_errno = ReadMoreData( conn, rreq );
|
103: 485: if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
-: 486: }
-: 487: }
-: 488: }
|
23451: 489: else if (conn->state == CONN_STATE_OPEN_LRECV_DATA)
-: 490: {
7372: 491: mpi_errno = MPIDI_CH3_Sockconn_handle_connopen_event( conn );
|
7372: 492: if (mpi_errno) { MPIU_ERR_POP( mpi_errno ); }
-: 493: }
-: 494: else /* Handling some internal connection establishment or
-: 495: tear down packet */
-: 496: {
|
16079: 497: mpi_errno = MPIDI_CH3_Sockconn_handle_conn_event( conn );
|
16079: 498: if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
-: 499: }
-: 500: break;
-: 501: }
-: 502:
-: 503: /* END OF SOCK_OP_READ */
-: 504:
-: 505: case MPIDU_SOCK_OP_WRITE:
-: 506: {
-: 507: MPIDI_CH3I_Connection_t * conn =
|
55467: 508: (MPIDI_CH3I_Connection_t *) event->user_ptr;
|
-: 509: /* --BEGIN ERROR HANDLING-- */
55467: 510: if (event->error != MPI_SUCCESS) {
#####: 511: mpi_errno = event->error;
#####: 512: MPIU_ERR_POP(mpi_errno);
-: 513: }
-: 514: /* --END ERROR HANDLING-- */
-: 515:/* send_active non-NULL: a posted write for that request finished. Otherwise the write belonged to the connection protocol (else branch below). */
|
55467: 516: if (conn->send_active)
-: 517: {
39388: 518: MPID_Request * sreq = conn->send_active;
-: 519: int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
-: 520: int complete;
-: 521:
39388: 522: reqFn = sreq->dev.OnDataAvail;
39388: 523: if (!reqFn) {
33324: 524: MPIU_Assert(MPIDI_Request_get_type(sreq)!=MPIDI_REQUEST_TYPE_GET_RESP);
33324: 525: MPIDI_CH3U_Request_complete(sreq);
33324: 526: complete = TRUE;
-: 527: }
-: 528: else {
6064: 529: mpi_errno = reqFn( conn->vc, sreq, &complete );
|
6064: 530: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 531: }
-: 532:
|
39388: 533: if (complete)
-: 534: {
39337: 535: mpi_errno = connection_pop_sendq_req(conn);
39337: 536: if (mpi_errno != MPI_SUCCESS) {
|
#####: 537: MPIU_ERR_POP(mpi_errno);
-: 538: }
-: 539: }
-: 540: else /* more data to send */
-: 541: {
-: 542: for(;;)
-: 543: {
-: 544: MPID_IOV * iovp;
-: 545: MPIU_Size_t nb;
-: 546:
|
103: 547: iovp = sreq->dev.iov;
-: 548:/* Attempt an immediate writev of the request's iov; nb receives the byte count reported by the call. */
103: 549: mpi_errno = MPIDU_Sock_writev(conn->sock, iovp, sreq->dev.iov_count, &nb);
|
-: 550: /* --BEGIN ERROR HANDLING-- */
103: 551: if (mpi_errno != MPI_SUCCESS)
-: 552: {
#####: 553: mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
-: 554: "**ch3|sock|immedwrite", "ch3|sock|immedwrite %p %p %p",
-: 555: sreq, conn, conn->vc);
#####: 556: goto fn_fail;
-: 557: }
-: 558: /* --END ERROR HANDLING-- */
-: 559:
-: 560: MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,
-: 561: (MPIU_DBG_FDEST,"immediate writev, vc=%p, sreq=0x%08x, nb=" MPIDI_MSG_SZ_FMT,
-: 562: conn->vc, sreq->handle, nb));
-: 563:/* adjust_iov() returns nonzero only when the whole iov was consumed; a partial write (or nb == 0) takes the else branch and posts the remainder. */
|
103: 564: if (nb > 0 && adjust_iov(&iovp, &sreq->dev.iov_count, nb))
-: 565: {
-: 566: int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
-: 567: int complete;
-: 568:
96: 569: reqFn = sreq->dev.OnDataAvail;
96: 570: if (!reqFn) {
43: 571: MPIU_Assert(MPIDI_Request_get_type(sreq)!=MPIDI_REQUEST_TYPE_GET_RESP);
43: 572: MPIDI_CH3U_Request_complete(sreq);
43: 573: complete = TRUE;
-: 574: }
-: 575: else {
53: 576: mpi_errno = reqFn( conn->vc, sreq, &complete );
|
53: 577: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 578: }
|
96: 579: if (complete)
-: 580: {
44: 581: mpi_errno = connection_pop_sendq_req(conn);
44: 582: if (mpi_errno != MPI_SUCCESS) {
|
#####: 583: MPIU_ERR_POP(mpi_errno);
-: 584: }
-: 585: break;
-: 586: }
-: 587: }
-: 588: else
-: 589: {
-: 590: MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,
-: 591: (MPIU_DBG_FDEST,"posting writev, vc=%p, conn=%p, sreq=0x%08x",
-: 592: conn->vc, conn, sreq->handle));
|
7: 593: mpi_errno = MPIDU_Sock_post_writev(conn->sock, iovp, sreq->dev.iov_count, NULL);
|
-: 594: /* --BEGIN ERROR HANDLING-- */
7: 595: if (mpi_errno != MPI_SUCCESS)
-: 596: {
#####: 597: mpi_errno = MPIR_Err_create_code(
-: 598: mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|postwrite",
-: 599: "ch3|sock|postwrite %p %p %p", sreq, conn, conn->vc);
#####: 600: goto fn_fail;
-: 601: }
-: 602: /* --END ERROR HANDLING-- */
-: 603:
-: 604: break;
-: 605: }
-: 606: }
-: 607: }
-: 608: }
-: 609: else /* finished writing internal packet header */
-: 610: {
-: 611: /* the connection is not active yet */
|
16079: 612: mpi_errno = MPIDI_CH3_Sockconn_handle_connwrite( conn );
|
16079: 613: if (mpi_errno) { MPIU_ERR_POP( mpi_errno ); }
-: 614: }
-: 615: break;
-: 616: }
-: 617: /* END OF SOCK_OP_WRITE */
-: 618:
-: 619: case MPIDU_SOCK_OP_ACCEPT:
-: 620: {
|
8040: 621: mpi_errno = MPIDI_CH3_Sockconn_handle_accept_event();
|
8040: 622: if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
-: 623: break;
-: 624: }
-: 625:
-: 626: case MPIDU_SOCK_OP_CONNECT:
-: 627: {
|
8039: 628: mpi_errno = MPIDI_CH3_Sockconn_handle_connect_event(
-: 629: (MPIDI_CH3I_Connection_t *) event->user_ptr,
-: 630: event->error );
|
8039: 631: if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
-: 632: break;
-: 633: }
-: 634:
-: 635: case MPIDU_SOCK_OP_CLOSE:
-: 636: {
|
20446: 637: mpi_errno = MPIDI_CH3_Sockconn_handle_close_event(
-: 638: (MPIDI_CH3I_Connection_t *) event->user_ptr );
|
20446: 639: if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
-: 640: break;
-: 641: }
-: 642:
-: 643: case MPIDU_SOCK_OP_WAKEUP:
-: 644: {
|
101: 645: MPIDI_CH3_Progress_signal_completion();
-: 646: /* MPIDI_CH3I_progress_completion_count++; */
-: 647: break;
-: 648: }
-: 649: }
-: 650:/* NOTE(review): the switch has no default case; an op_type not listed above reaches fn_exit with MPI_SUCCESS. */
16215302: 651: fn_exit:
|
-: 652: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT);
|
16215302: 653: return mpi_errno;
|
-: 654: fn_fail:
-: 655: goto fn_exit;
-: 656:}
-: 657:/* end MPIDI_CH3I_Progress_handle_sock_event() */
-: 658:
-: 659:
-: 660:#ifdef MPICH_IS_THREADED
-: 661:/* MPIDI_CH3I_Progress_delay - with the global-mutex thread implementation, wait on the completion condition variable until MPIDI_CH3I_progress_completion_count differs from completion_count; otherwise does nothing. Always returns MPI_SUCCESS. */
-: 662:/* Note that this routine is only called if threads are enabled;
-: 663: it does not need to check whether runtime threads are enabled */
-: 664:#undef FUNCNAME
-: 665:#define FUNCNAME MPIDI_CH3I_Progress_delay
-: 666:#undef FCNAME
-: 667:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 668:static int MPIDI_CH3I_Progress_delay(unsigned int completion_count)
|
189787: 669:{
189787: 670: int mpi_errno = MPI_SUCCESS;
-: 671:/* The count is re-tested after every return from the wait, so a wakeup without a change in the count waits again. */
-: 672:# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
-: 673: {
569361: 674: while (completion_count == MPIDI_CH3I_progress_completion_count)
-: 675: {
189787: 676: MPID_Thread_cond_wait(&MPIDI_CH3I_progress_completion_cond,
-: 677: &MPIR_ThreadInfo.global_mutex);
-: 678: }
-: 679: }
-: 680:# endif
-: 681:
189787: 682: return mpi_errno;
-: 683:}
-: 684:/* end MPIDI_CH3I_Progress_delay() */
-: 685:/* MPIDI_CH3I_Progress_continue - broadcast on the completion condition variable (global-mutex thread implementation, inside the MPIU_THREAD_CHECK pair), which is the variable MPIDI_CH3I_Progress_delay() waits on. */
-: 686:/* The completion_count parameter is not used in the body. Always returns MPI_SUCCESS. */
-: 687:#undef FUNCNAME
-: 688:#define FUNCNAME MPIDI_CH3I_Progress_continue
-: 689:#undef FCNAME
-: 690:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 691:static int MPIDI_CH3I_Progress_continue(unsigned int completion_count)
15964321: 692:{
15964321: 693: int mpi_errno = MPI_SUCCESS;
-: 694:
15964321: 695: MPIU_THREAD_CHECK_BEGIN
-: 696:# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
-: 697: {
2523515: 698: MPID_Thread_cond_broadcast(&MPIDI_CH3I_progress_completion_cond);
-: 699: }
-: 700:# endif
-: 701: MPIU_THREAD_CHECK_END
-: 702:
15964321: 703: return mpi_errno;
-: 704:}
-: 705:/* end MPIDI_CH3I_Progress_continue() */
-: 706:
-: 707:#endif /* MPICH_IS_THREADED */
-: 708:
-: 709:
-: 710:/* FIXME: (a) what does this do and where is it used and (b)
-: 711: we could replace it with a #define for the single-method case */ /* As written: passes vc to MPIDI_CH3I_VC_post_sockconnect() and returns that call's result unchanged. */
-: 712:#undef FUNCNAME
-: 713:#define FUNCNAME MPIDI_CH3I_VC_post_connect
-: 714:#undef FCNAME
-: 715:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 716:int MPIDI_CH3I_VC_post_connect(MPIDI_VC_t * vc)
7371: 717:{
7371: 718: return MPIDI_CH3I_VC_post_sockconnect( vc );
-: 719:}
-: 720:/* end MPIDI_CH3I_VC_post_connect() */
-: 721:/* connection_pop_sendq_req - dequeue from the VC's send queue, make the new queue head (possibly NULL) the connection's send_active, and if non-NULL post a writev of its iov. Returns MPI_SUCCESS or an MPI error code. */
-: 722:/* FIXME: This function also used in ch3u_connect_sock.c */
-: 723:#undef FUNCNAME
-: 724:#define FUNCNAME connection_pop_sendq_req
-: 725:#undef FCNAME
-: 726:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 727:static inline int connection_pop_sendq_req(MPIDI_CH3I_Connection_t * conn)
39381: 728:{
39381: 729: int mpi_errno = MPI_SUCCESS;
39381: 730: MPIDI_CH3I_VC *vcch = (MPIDI_CH3I_VC *)conn->vc->channel_private;
-: 731: MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POP_SENDQ_REQ);
-: 732:
-: 733:
-: 734: MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POP_SENDQ_REQ);
-: 735: /* post send of next request on the send queue */
-: 736:
-: 737: /* FIXME: Is dequeue/get next the operation we really want? */
39381: 738: MPIDI_CH3I_SendQ_dequeue(vcch);
39381: 739: conn->send_active = MPIDI_CH3I_SendQ_head(vcch); /* MT */
39381: 740: if (conn->send_active != NULL)
-: 741: {
-: 742: MPIU_DBG_MSG_P(CH3_CONNECT,TYPICAL,"conn=%p: Posting message from connection send queue", conn );
14915: 743: mpi_errno = MPIDU_Sock_post_writev(conn->sock, conn->send_active->dev.iov, conn->send_active->dev.iov_count, NULL);
14915: 744: if (mpi_errno != MPI_SUCCESS) {
|
#####: 745: MPIU_ERR_POP(mpi_errno);
-: 746: }
-: 747: }
-: 748:/* The fn_fail label below is also the normal exit path; there is no separate fn_exit in this function. */
39381: 749: fn_fail:
-: 750: MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POP_SENDQ_REQ);
|
39381: 751: return mpi_errno;
-: 752:}
-: 753:/* connection_post_recv_pkt - post a read on conn->sock into conn->pkt. Both length arguments are sizeof(conn->pkt) (presumably minimum and maximum byte counts - confirm against MPIDU_Sock_post_read). */
-: 754:/* Returns MPI_SUCCESS, or an MPI error code set with the "**fail" message when the post fails. */
-: 755:
-: 756:#undef FUNCNAME
-: 757:#define FUNCNAME connection_post_recv_pkt
-: 758:#undef FCNAME
-: 759:#define FCNAME MPIDI_QUOTE(FUNCNAME)
-: 760:static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn)
16040521: 761:{
16040521: 762: int mpi_errno = MPI_SUCCESS;
-: 763: MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_RECV_PKT);
-: 764:
-: 765: MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_RECV_PKT);
-: 766:
16040521: 767: mpi_errno = MPIDU_Sock_post_read(conn->sock, &conn->pkt, sizeof(conn->pkt), sizeof(conn->pkt), NULL);
16040521: 768: if (mpi_errno != MPI_SUCCESS) {
|
#####: 769: MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER, "**fail");
-: 770: }
-: 771:
-: 772: MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_RECV_PKT);
|
16040521: 773: return mpi_errno;
-: 774:}
-: 775:/* adjust_iov - account for nb transferred bytes: skip leading iov entries that nb fully covers, trim the first partially covered entry in place (buffer pointer advanced, length reduced), then advance *iovp and reduce *countp by the number of skipped entries. Returns nonzero iff *countp is then 0. */
-: 776:/* FIXME: What is this routine for? */
-: 777:#undef FUNCNAME
-: 778:#define FUNCNAME adjust_iov
-: 779:#undef FCNAME
-: 780:#define FCNAME MPIU_QUOTE(FUNCNAME)
-: 781:static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb)
10087593: 782:{
10087593: 783: MPID_IOV * const iov = *iovp;
10087593: 784: const int count = *countp;
10087593: 785: int offset = 0;
-: 786:/* offset counts the entries consumed in full; nb shrinks by the length of each such entry. */
30560642: 787: while (offset < count)
-: 788: {
10430262: 789: if (iov[offset].MPID_IOV_LEN <= nb)
-: 790: {
10385456: 791: nb -= iov[offset].MPID_IOV_LEN;
10385456: 792: offset++;
-: 793: }
-: 794: else
-: 795: {
44806: 796: iov[offset].MPID_IOV_BUF =
-: 797: (MPID_IOV_BUF_CAST)((char *) iov[offset].MPID_IOV_BUF + nb);
44806: 798: iov[offset].MPID_IOV_LEN -= nb;
44806: 799: break;
-: 800: }
-: 801: }
-: 802:/* Only the caller's pointer and count are moved here; the entries that were skipped are left unmodified. */
10087593: 803: *iovp += offset;
10087593: 804: *countp -= offset;
-: 805:
10087593: 806: return (*countp == 0);
-: 807:}
-: 808:/* end adjust_iov() */
-: 809:/* ReadMoreData - read data for rreq from conn->sock with immediate readv calls; when the iov is not fully satisfied, store rreq in conn->recv_active and post a readv for the rest. When the request completes, clear recv_active and post the next packet read. Returns MPI_SUCCESS or an MPI error code. */
-: 810:/* NOTE(review): no FUNCNAME/FCNAME is defined for this function, so FCNAME in the error codes below still expands to the preceding definition (adjust_iov). */
-: 811:static int ReadMoreData( MPIDI_CH3I_Connection_t * conn, MPID_Request *rreq )
10085728: 812:{
10085728: 813: int mpi_errno = MPI_SUCCESS;
-: 814:/* Each pass reads into the request's current iov; the loop repeats only when the iov was filled and the OnDataAvail callback reports the request as not yet complete. */
-: 815: while (1) {
-: 816: MPID_IOV * iovp;
-: 817: MPIU_Size_t nb;
-: 818:
10087490: 819: iovp = rreq->dev.iov;
-: 820:
10087490: 821: mpi_errno = MPIDU_Sock_readv(conn->sock, iovp,
-: 822: rreq->dev.iov_count, &nb);
|
-: 823: /* --BEGIN ERROR HANDLING-- */
10087490: 824: if (mpi_errno != MPI_SUCCESS) {
#####: 825: mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER,
-: 826: "**ch3|sock|immedread", "ch3|sock|immedread %p %p %p",
-: 827: rreq, conn, conn->vc);
#####: 828: goto fn_fail;
-: 829: }
-: 830: /* --END ERROR HANDLING-- */
-: 831:
-: 832: MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,
-: 833: (MPIU_DBG_FDEST,"immediate readv, vc=%p nb=" MPIDI_MSG_SZ_FMT ", rreq=0x%08x",
-: 834: conn->vc, nb, rreq->handle));
-: 835:/* adjust_iov() returns nonzero only when nb covered the whole iov; a short read (or nb == 0) takes the else branch and posts a readv. */
|
10087490: 836: if (nb > 0 && adjust_iov(&iovp, &rreq->dev.iov_count, nb)) {
-: 837: int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
-: 838: int complete;
-: 839:
10042691: 840: reqFn = rreq->dev.OnDataAvail;
10042691: 841: if (!reqFn) {
9010771: 842: MPIU_Assert(MPIDI_Request_get_type(rreq)!=MPIDI_REQUEST_TYPE_GET_RESP);
9010771: 843: MPIDI_CH3U_Request_complete(rreq);
9010771: 844: complete = TRUE;
-: 845: }
-: 846: else {
1031920: 847: mpi_errno = reqFn( conn->vc, rreq, &complete );
|
1031920: 848: if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-: 849: }
-: 850:
|
10042691: 851: if (complete) {
10040929: 852: conn->recv_active = NULL; /* already NULL when called for a new packet; non-NULL when called to continue a posted read */
10040929: 853: mpi_errno = connection_post_recv_pkt(conn);
10040929: 854: if (mpi_errno != MPI_SUCCESS) {
|
#####: 855: MPIU_ERR_POP(mpi_errno);
-: 856: }
-: 857:
-: 858: break;
-: 859: }
-: 860: }
-: 861: else {
-: 862: MPIU_DBG_MSG_FMT(CH3_CHANNEL,VERBOSE,
-: 863: (MPIU_DBG_FDEST,"posting readv, vc=%p, rreq=0x%08x",
-: 864: conn->vc, rreq->handle));
|
44799: 865: conn->recv_active = rreq;
44799: 866: mpi_errno = MPIDU_Sock_post_readv(conn->sock, iovp, rreq->dev.iov_count, NULL);
|
-: 867: /* --BEGIN ERROR HANDLING-- */
44799: 868: if (mpi_errno != MPI_SUCCESS) {
#####: 869: mpi_errno = MPIR_Err_create_code(
-: 870: mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|postread",
-: 871: "ch3|sock|postread %p %p %p", rreq, conn, conn->vc);
#####: 872: goto fn_fail;
-: 873: }
-: 874: /* --END ERROR HANDLING-- */
-: 875: break;
-: 876: }
-: 877: }
-: 878:/* The fn_fail label below is also reached on success; it is the single exit of this function. */
10085728: 879: fn_fail:
|
10085728: 880: return mpi_errno;
-: 881:}
-: 882:
-: 883:/*
-: 884: * The dynamic-library interface requires a unified Progress routine.
-: 885: * This is that routine.
-: 886: */
-: 887:int MPIDI_CH3I_Progress( int blocking, MPID_Progress_state *state )
-: 889: int mpi_errno;
-: 892:/* NOTE(review): source lines 888, 890-891 and 893 are absent from this listing; presumably the body selects MPIDI_CH3i_Progress_wait(state) or MPIDI_CH3i_Progress_test() based on blocking and returns mpi_errno - confirm against the source. */
-: 894:}
|