2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
38 /* this file is included from p2p.c; it's not really a header file,
39 but this defines a lot of functions that probably need to be inlined.*/
41 /* Point-to-point communication protocol functions */
/* Initialize a per-thread free list of N receive envelopes.
   Allocates one contiguous array and links its elements into a
   singly-linked free stack headed by head_recv.
   NOTE(review): interior lines appear missing from this view; code
   kept verbatim. */
44 void tMPI_Free_env_list_init(struct free_envelope_list *evl, int N)
48 /* allocate the head element */
49 evl->recv_alloc_head=(struct envelope*)tMPI_Malloc(sizeof(struct envelope)
51 evl->head_recv=evl->recv_alloc_head;
/* chain each envelope to the next one in the array */
57 evl->head_recv[i].next=&(evl->head_recv[i+1]);
/* the last element terminates the free list */
61 evl->head_recv[i].next=NULL;
/* envelopes start out attached to no recv or send list */
63 evl->head_recv[i].rlist=NULL;
64 evl->head_recv[i].slist=NULL;
/* Free the envelope array allocated by tMPI_Free_env_list_init and
   reset the pointer so a use-after-destroy is detectable. */
68 void tMPI_Free_env_list_destroy(struct free_envelope_list *evl)
70 free(evl->recv_alloc_head);
72 evl->recv_alloc_head=NULL;
/* Pop a receive envelope from the free list.
   If the list is exhausted this currently only reports the condition
   (see TODO below) rather than recovering. */
75 static struct envelope* tMPI_Free_env_list_fetch_recv(struct
76 free_envelope_list *evl)
79 if (! evl->head_recv )
81 /* TODO: make this do something better than crash */
82 fprintf(stderr, "Ran out of recv envelopes!!!!\n");
/* advance the free-list head past the envelope we hand out */
87 evl->head_recv=ret->next;
/* Push a receive envelope back onto the free list (LIFO). No locking:
   the free list is a per-thread structure. */
95 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
101 rev->next=evl->head_recv;
114 /* tmpi_send_envelope_list functions */
/* Initialize a send envelope list with N pre-allocated envelopes.
   Sets up the free stack, the shared head_new/head_rts heads (atomic
   pointers when TMPI_LOCK_FREE_LISTS, spinlock-protected otherwise)
   and the circular head_old list with a dummy element. */
116 void tMPI_Send_env_list_init(struct send_envelope_list *evl, int N)
119 #ifndef TMPI_LOCK_FREE_LISTS
/* without lock-free lists the shared heads need spinlocks */
120 tMPI_Spinlock_init( &(evl->lock_rts) );
121 tMPI_Spinlock_init( &(evl->lock_new) );
125 evl->alloc_head=(struct envelope*)tMPI_Malloc(sizeof(struct envelope)*N);
128 evl->alloc_head[i].next=(i<(N-1)) ? &(evl->alloc_head[i+1]) : NULL;
129 evl->alloc_head[i].prev=NULL;
/* each envelope remembers which send list owns it */
130 evl->alloc_head[i].slist=evl;
131 evl->alloc_head[i].rlist=NULL;
132 #ifdef USE_SEND_RECV_COPY_BUFFER
/* per-envelope copy buffer for small, buffered sends */
133 evl->alloc_head[i].cb=(void*)tMPI_Malloc(sizeof(char)*COPY_BUFFER_SIZE);
137 #ifdef TMPI_LOCK_FREE_LISTS
138 tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
139 tMPI_Atomic_ptr_set(&(evl->head_rts), NULL);
141 evl->head_new = NULL;
142 evl->head_rts = NULL;
/* element 0 becomes the head_old dummy, so the free list starts at 1 */
144 evl->head_free = &(evl->alloc_head[1]);
145 /* initialize the head_old circular list with dummy element */
146 evl->head_old = evl->alloc_head; /* the first element is a dummy */
147 evl->head_old->next = evl->head_old;
148 evl->head_old->prev = evl->head_old;
/* Free all resources of a send envelope list: the per-envelope copy
   buffers (if enabled) and the envelope array itself; reset heads so
   use after tMPI_Finalize fails loudly. */
151 void tMPI_Send_env_list_destroy(struct send_envelope_list *evl)
153 #ifdef USE_SEND_RECV_COPY_BUFFER
155 for(i=0;i<evl->Nalloc;i++)
157 free(evl->alloc_head[i].cb);
160 free(evl->alloc_head);
161 evl->alloc_head=NULL;
162 #ifdef TMPI_LOCK_FREE_LISTS
163 tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
167 evl->head_old=NULL; /* make it crash if used after tMPI_Finalize */
/* Get a fresh send envelope from the list. First reclaims any
   envelopes the receiver has returned via head_rts (detaching that
   shared stack atomically or under the rts spinlock), splices them
   into head_free, then pops one off head_free. Running out of
   envelopes is handled differently depending on buffering support. */
171 static struct envelope*
172 tMPI_Send_env_list_fetch_new(struct send_envelope_list *evl)
174 struct envelope *ret;
178 /* first check whether any envelopes were returned to sender */
179 #ifdef TMPI_LOCK_FREE_LISTS
180 if ((ret=(struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts))))
185 /* detach the list */
186 #ifdef TMPI_LOCK_FREE_LISTS
187 /* we detach by swapping what we expect the pointer value to be,
188 with NULL. If there were a cross-platform way to atomically
189 swap without checking, we could do that, too. */
190 while(tMPI_Atomic_ptr_cas( &(evl->head_rts), ret, NULL ) !=
/* CAS failed: somebody pushed meanwhile; re-read the head and retry */
193 ret=(struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts));
196 tMPI_Spinlock_lock( &(evl->lock_rts) );
199 tMPI_Spinlock_unlock( &(evl->lock_rts) );
201 /* now add the items to head_free */
204 struct envelope *next=ret->next;
205 ret->next=evl->head_free;
211 /* get the last free one off the list */
214 #ifdef USE_SEND_RECV_COPY_BUFFER
216 /* There are no free send envelopes, so all we can do is handle
217 incoming requests until we get a free send envelope. */
218 printf("Ran out of send envelopes!!\n");
219 tMPI_Wait_process_incoming(tMPI_Get_current());
223 /* If this happens, it most likely indicates a bug in the
224 calling program. We could fix the situation by waiting,
225 but that would most likely lead to deadlocks - even
226 more difficult to debug than this. */
227 fprintf(stderr, "Ran out of send envelopes!!!!\n");
/* pop the envelope off the free list */
233 evl->head_free=ret->next;
/* Return a send envelope directly to its owning list's free stack.
   Only safe when called by the owner (no locking here). */
244 static void tMPI_Send_env_list_return(struct envelope *sev)
246 struct send_envelope_list *evl=sev->slist;
248 sev->next=evl->head_free;
253 #ifdef USE_SEND_RECV_COPY_BUFFER
254 static void tMPI_Send_env_list_rts(struct envelope *sev)
256 struct send_envelope_list *evl=sev->slist;
257 #ifdef TMPI_LOCK_FREE_LISTS
258 struct envelope *sevn;
262 sevn=(struct envelope*)tMPI_Atomic_ptr_get(&evl->head_rts);
264 /* the cmpxchg operation is a memory fence, so we shouldn't need
265 to worry about out-of-order evaluation */
267 while (tMPI_Atomic_ptr_cas( &(evl->head_rts), sevn, sev ) != (void*)sevn);
269 tMPI_Spinlock_lock( &(evl->lock_rts) );
270 ev->next=(struct envelope*)evl->head_rts;
272 tMPI_Spinlock_unlock( &(evl->lock_rts) );
/* Unlink a send envelope from the (thread-local) head_old circular
   doubly-linked list. */
279 static void tMPI_Send_env_list_remove_old(struct envelope *sev)
281 /* pretty straightforward because it isn't a shared list */
283 sev->next->prev=sev->prev;
285 sev->prev->next=sev->next;
/* Publish a freshly-prepared send envelope on the destination's
   shared head_new stack (lock-free CAS push, or spinlock-protected),
   then signal the destination thread's p2p event so it will process
   the new envelope. */
291 static void tMPI_Send_env_list_add_new(struct tmpi_thread *cur,
292 struct send_envelope_list *evl,
293 struct envelope *sev)
295 #ifdef TMPI_LOCK_FREE_LISTS
296 struct envelope *evl_head_new_orig;
297 struct envelope *evl_cas;
301 #ifdef TMPI_LOCK_FREE_LISTS
302 /* behold our lock-free shared linked list:
303 (it's actually quite simple because we only do operations at the head
304 of the list, either adding them - such as here - or detaching the whole
308 /* read the old head atomically */
309 evl_head_new_orig=(struct envelope*) tMPI_Atomic_ptr_get(
311 /* set our envelope to have that as its next */
312 sev->next=evl_head_new_orig;
313 /* do the compare-and-swap.
314 this operation is a memory fence, so we shouldn't need
315 to worry about out-of-order stores */
316 evl_cas=(struct envelope*)tMPI_Atomic_ptr_cas(&(evl->head_new),
317 evl_head_new_orig, sev);
318 /* and compare the results: if they aren't the same,
319 somebody else got there before us: */
320 } while (evl_cas != evl_head_new_orig);
322 tMPI_Spinlock_lock( &(evl->lock_new) );
323 /* we add to the start of the list */
324 sev->next=(struct send_envelope*)evl->head_new;
325 /* actually attach it to the list */
327 tMPI_Spinlock_unlock( &(evl->lock_new) );
330 #if defined(TMPI_PROFILE)
331 tMPI_Profile_wait_start(cur);
333 /* signal to the thread that there is a new envelope */
334 tMPI_Event_signal( &(sev->dest->p2p_event) );
335 #if defined(TMPI_PROFILE)
336 tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
/* Move a send envelope from the (already-detached) new list into the
   head_old circular list, appending it at the tail (before the dummy
   head). */
340 static void tMPI_Send_env_list_move_to_old(struct envelope *sev)
342 struct send_envelope_list *evl=sev->slist;
344 /* remove from old list. We assume the list has been detached! */
346 sev->next->prev=sev->prev;
348 sev->prev->next=sev->next;
350 /* we add to the end of the list */
351 sev->next=evl->head_old;
352 sev->prev=evl->head_old->prev;
365 /* tmpi_recv_envelope_list functions */
/* Initialize a receive envelope list as an empty circular
   doubly-linked list whose head is the embedded dummy element. */
367 void tMPI_Recv_env_list_init(struct recv_envelope_list *evl)
369 evl->head = &(evl->dummy);
370 evl->head->prev=evl->head;
371 evl->head->next=evl->head;
/* Destroy a receive envelope list. Nothing was heap-allocated by
   tMPI_Recv_env_list_init, so there is nothing to free here. */
374 void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl)
/* Append a receive envelope at the tail of the circular list (just
   before the dummy head). Per-thread list: no locking needed. */
379 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
380 struct envelope *rev)
383 /* we add to the end of the list */
385 rev->prev=evl->head->prev;
/* Unlink a receive envelope from its circular doubly-linked list. */
391 static void tMPI_Recv_env_list_remove(struct envelope *rev)
394 rev->next->prev=rev->prev;
396 rev->prev->next=rev->next;
409 /* tmpi_req functions */
/* Initialize a per-thread request list with N_reqs pre-allocated
   request objects, linked into a doubly-linked free list. */
411 void tMPI_Req_list_init(struct req_list *rl, int N_reqs)
415 rl->alloc_head=(struct tmpi_req_*)tMPI_Malloc(
416 sizeof(struct tmpi_req_)*N_reqs);
417 rl->head=rl->alloc_head;
418 for(i=0;i<N_reqs;i++)
/* first element has no predecessor */
421 rl->head[i].prev=NULL;
423 rl->head[i].prev=&(rl->head[i-1]);
/* last element has no successor */
426 rl->head[i].next=NULL;
428 rl->head[i].next=&(rl->head[i+1]);
/* Free the request array allocated by tMPI_Req_list_init. */
432 void tMPI_Req_list_destroy(struct req_list *rl)
434 free(rl->alloc_head);
/* Pop a request object off the per-thread free list. Raises
   TMPI_ERR_REQUESTS if the list is exhausted. */
441 static struct tmpi_req_ *tMPI_Get_req(struct req_list *rl)
443 struct tmpi_req_ *req=rl->head;
446 /* we don't need locks here because requests are a per-thread property */
449 /* this could be fixed */
450 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_REQUESTS);
/* Return a request object to the per-thread free list. */
459 static void tMPI_Return_req(struct req_list *rl, struct tmpi_req_ *req)
/* Initialize a request object for envelope 'ev' with default
   tag/error values before it is used. */
468 static void tMPI_Req_init(struct tmpi_req_ *rq, struct envelope *ev)
477 rq->tag=TMPI_ANY_TAG;
478 rq->error=TMPI_SUCCESS;
488 /* Point-to-point communication protocol functions */
/* Copy result fields (source, comm, error, transferred byte count)
   from an envelope into a request, for later status reporting.
   'transferred' is only set from bufsize once the envelope's state
   shows the transfer progressed past env_unmatched / reached
   env_finished. */
492 static void tMPI_Set_req(struct envelope *ev, struct tmpi_req_ *req)
494 req->source = ev->src;
495 req->comm = ev->comm;
497 req->error = ev->error;
500 if (tMPI_Atomic_get(&(ev->state))>env_unmatched)
501 req->transferred = ev->bufsize;
503 req->transferred = 0;
507 if (tMPI_Atomic_get(&(ev->state))==env_finished)
508 req->transferred = ev->bufsize;
510 req->transferred = 0;
/* Fill a tMPI_Status structure from a completed request. The source
   thread pointer is converted to a rank within the communicator. */
514 static void tMPI_Set_status(struct tmpi_req_ *req, tMPI_Status *st)
518 st->TMPI_SOURCE = tMPI_Comm_seek_rank(req->comm, req->source);
519 st->TMPI_TAG = req->tag;
520 st->TMPI_ERROR = req->error;
521 st->transferred = req->transferred;
522 st->cancelled = req->cancelled;
/* Check whether a send envelope matches a receive envelope: tag
   (honoring TMPI_ANY_TAG), communicator, source (NULL rev->src acts
   as a wildcard / any-source), destination and datatype must agree,
   and both envelopes must still be in a matchable state. */
527 static bool tMPI_Envelope_matches(const struct envelope *sev,
528 const struct envelope *rev)
/* debug trace of the candidate pair being compared */
531 printf("%5d: tMPI_Envelope_matches (%d->%d)==(%d->%d), tag=(%d==%d), \n datatype=(%ld==%ld), comm=(%ld,%ld),\n finished=(%d==%d)\n",
532 tMPI_This_threadnr(),
533 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
534 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
535 (int)(sev->tag), (int)(rev->tag),
536 (long int)sev->datatype, (long int)rev->datatype,
537 (long int)sev->comm, (long int)rev->comm,
538 (int)sev->state.value, (int)rev->state.value);
541 if ( ( (rev->tag == TMPI_ANY_TAG) || (rev->tag == sev->tag) ) &&
542 ( sev->comm == rev->comm ) &&
543 ( (!rev->src) || (rev->src == sev->src) ) &&
544 ( sev->dest == rev->dest ) &&
545 ( sev->datatype == rev->datatype ) &&
546 ( sev->state.value < env_finished &&
547 rev->state.value == env_unmatched ) )
550 printf("%5d: (%d->%d) tag=%d found match\n",
551 tMPI_This_threadnr(),
552 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
/* Walk the head_old circular list looking for a send envelope that
   matches the given receive envelope. On a match the envelope is
   unlinked from the list before being returned. */
564 static struct envelope*
565 tMPI_Send_env_list_search_old(struct send_envelope_list *evl,
566 struct envelope *rev)
568 struct envelope *sev;
/* start after the dummy head; stop when we wrap back to it */
570 sev=(struct envelope*)evl->head_old->next;
571 while(sev != evl->head_old)
573 if (tMPI_Envelope_matches(sev, rev))
575 /* remove the envelope */
576 tMPI_Send_env_list_remove_old(sev);
/* Walk the posted-receive circular list looking for a receive
   envelope that matches the given send envelope. */
585 static struct envelope*
586 tMPI_Recv_env_list_search_new(struct recv_envelope_list *evl,
587 struct envelope *sev)
589 struct envelope *rev;
591 while(rev != evl->head)
594 if (tMPI_Envelope_matches(sev, rev))
604 #ifdef USE_SEND_RECV_COPY_BUFFER
/* Copy the sender's data into the envelope's copy buffer so the send
   can complete before the receiver arrives. Races with the receiver
   via the envelope's atomic state: whoever wins the CAS decides who
   cleans up the envelope. */
605 static void tMPI_Send_copy_buffer(struct envelope *sev, struct tmpi_req_ *req)
607 /* Fill copy buffer, after having announced its possible use */
609 /* in the special case of a zero buffer size, we don't do anything and
610 always let the receiver handle it */
614 /* first check whether the other side hasn't started yet */
615 tMPI_Atomic_memory_barrier();
616 if ((tMPI_Atomic_get( &(sev->state) ) == env_unmatched ))
619 memcpy(sev->cb, sev->buf, sev->bufsize);
620 /* now set state, if other side hasn't started copying yet. */
621 if (tMPI_Atomic_cas( &(sev->state), env_unmatched, env_cb_available)
624 /* if it was originally unmatched, the receiver wasn't
625 copying the old buffer. We don't need to wait,
626 and the receiver is going to clean up this envelope. */
628 printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) completed\n",
629 tMPI_This_threadnr(),
630 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
637 /* and if we reached this point, the receiver had already started
638 copying, and we need to clean up the envelope ourselves.
640 we first need to wait until the receiver is finished copying. We
641 know this is a short wait (since the buffer was small enough to be
642 buffered in the first place), so we just spin-wait. */
643 while(tMPI_Atomic_get( &(sev->state) ) < env_cb_available)
645 tMPI_Atomic_memory_barrier();
648 printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) waiting-completed\n",
649 tMPI_This_threadnr(),
650 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest), (int)(sev->tag));
/* record results and recycle the envelope ourselves */
653 tMPI_Set_req(sev, req);
654 /* and now we clean up */
655 tMPI_Send_env_list_return(sev);
/* Allocate and fill a send envelope from the given send-envelope
   list: records endpoints, buffer, byte size (count * datatype size),
   tag, blocking mode and initial state. Flags TMPI_ERR_XFER_BUFSIZE
   on invalid size. */
660 static struct envelope* tMPI_Prep_send_envelope(struct send_envelope_list *evl,
661 tMPI_Comm comm, struct tmpi_thread *src,
662 struct tmpi_thread *dest, void *buf, int count,
663 tMPI_Datatype datatype, int tag, bool nonblock)
665 /* get an envelope from the send-envelope stack */
666 struct envelope *ev=tMPI_Send_env_list_fetch_new( evl );
669 ev->nonblock=nonblock;
677 ev->bufsize=count*datatype->size;
678 ev->datatype=datatype;
684 #ifdef USE_SEND_RECV_COPY_BUFFER
685 /* check whether we'll be double buffering */
686 ev->using_cb=(ev->bufsize < COPY_BUFFER_SIZE);
687 /* but don't do anything yet */
690 tMPI_Atomic_set(&(ev->state), env_unmatched);
692 ev->error=TMPI_SUCCESS;
/* report and record an invalid transfer size */
695 tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
696 ev->error=TMPI_ERR_XFER_BUFSIZE;
/* Allocate and fill a receive envelope from the calling thread's free
   list; mirror of tMPI_Prep_send_envelope for the receive side. */
702 static struct envelope* tMPI_Prep_recv_envelope(struct tmpi_thread *cur,
703 tMPI_Comm comm, struct tmpi_thread *src,
704 struct tmpi_thread *dest, void *buf, int count,
705 tMPI_Datatype datatype, int tag, bool nonblock)
707 /* get an envelope from the stack */
708 struct envelope *ev=tMPI_Free_env_list_fetch_recv( &(cur->envelopes) );
711 ev->nonblock=nonblock;
719 ev->bufsize=count*datatype->size;
720 ev->datatype=datatype;
727 tMPI_Atomic_set(&(ev->state), env_unmatched);
729 ev->error=TMPI_SUCCESS;
/* report and record an invalid transfer size */
732 tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
733 ev->error=TMPI_ERR_XFER_BUFSIZE;
/* Perform the actual point-to-point data transfer between a matched
   send envelope and receive envelope: validates buffer sizes and
   pointers, copies the data (possibly from the copy buffer), marks
   both envelopes finished, signals the sender, and removes/recycles
   envelopes as appropriate. */
746 static void tMPI_Xfer(struct tmpi_thread *cur, struct envelope *sev,
747 struct envelope *rev)
749 #ifdef USE_SEND_RECV_COPY_BUFFER
750 /* we remove the sender's envelope only if we do the transfer, which
751 we always do if the buffer size = 0 */
752 bool remove_sender = (sev->bufsize==0);
755 printf("%5d: tMPI_Xfer (%d->%d, tag=%d) started\n",
756 tMPI_This_threadnr(),
757 tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
760 /* first set data on the receiving end so status can be updated */
764 if (sev->bufsize) /* do the actual transfer */
766 void *sbuf=sev->buf; /* source buffer */
/* the receive buffer must be large enough for the message */
767 if (sev->bufsize > rev->bufsize)
769 tMPI_Error((rev->comm), TMPI_ERR_XFER_BUFSIZE);
770 tMPI_Atomic_set(&(rev->state), env_finished);
771 tMPI_Atomic_set(&(sev->state), env_finished);
772 rev->error = TMPI_ERR_XFER_BUFSIZE;
773 sev->error = TMPI_ERR_XFER_BUFSIZE;
777 #ifdef USE_SEND_RECV_COPY_BUFFER
780 /* check if the other side has already finished copying */
781 if (tMPI_Atomic_cas( &(sev->state), env_unmatched, env_copying)
784 /* it has, and we're copying from the new buffer.
785 We're now also tasked with removing the envelope */
789 tMPI_Profile_count_buffered_p2p_xfer(cur);
/* both buffers must be valid before memcpy */
795 if (!rev->buf || !sev->buf)
797 tMPI_Error((rev->comm), TMPI_ERR_BUF);
798 tMPI_Atomic_set(&(rev->state), env_finished);
799 tMPI_Atomic_set(&(sev->state), env_finished);
800 rev->error = TMPI_ERR_BUF;
801 sev->error = TMPI_ERR_BUF;
804 memcpy(rev->buf, sbuf, sev->bufsize);
806 tMPI_Profile_count_p2p_xfer(cur);
808 /* for status update */
810 rev->bufsize=sev->bufsize;
811 /* and mark that we're finished */
812 #if defined(TMPI_PROFILE)
814 tMPI_Profile_wait_start(cur);
816 tMPI_Atomic_set( &(rev->state), env_finished);
817 tMPI_Atomic_set( &(sev->state), env_finished);
819 /* signal to a potentially waiting thread that we're done. */
820 tMPI_Atomic_add_return( &(rev->src->ev_outgoing_received), 1);
821 tMPI_Event_signal(&(rev->src->p2p_event));
823 /* remove the receiving envelope if it's in a list */
824 tMPI_Recv_env_list_remove(rev);
825 #ifdef USE_SEND_RECV_COPY_BUFFER
/* hand the send envelope back to the sender via the rts stack */
828 tMPI_Send_env_list_rts(sev);
831 #if defined(TMPI_PROFILE)
832 tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
838 printf("%5d: tMPI_Xfer (%d->%d, tag=%d) done\n",
839 tMPI_This_threadnr(),
840 tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
/* Post a receive: prepare a receive envelope, search the existing
   send queues (a single source's queue, or all of them for
   any-source) for a matching send; if found, do the transfer now,
   otherwise post the envelope on this thread's receive list. */
851 static struct envelope* tMPI_Post_match_recv(struct tmpi_thread *cur,
853 struct tmpi_thread *src,
854 void *recv_buf, int recv_count,
855 tMPI_Datatype datatype,
856 int tag, bool nonblock)
858 struct tmpi_thread *dest=cur;
859 struct envelope *rev;
860 struct envelope *sev=NULL;
/* Nthreads acts as the sentinel for "any source" */
861 int src_threadnr=src ? tMPI_Threadnr(src) : Nthreads;
864 /* reserve an envelope to post */
865 rev=tMPI_Prep_recv_envelope(cur, comm, src, dest, recv_buf, recv_count,
866 datatype, tag, nonblock);
869 printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) started\n",
870 tMPI_This_threadnr(),
871 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest), (int)(rev->tag));
874 /* we now check the entire existing send queue */
877 sev=tMPI_Send_env_list_search_old( &(dest->evs[src_threadnr]), rev);
881 /* if we don't know the source, we look at all possible sources */
882 for(i=0;i<Nthreads;i++)
884 sev=tMPI_Send_env_list_search_old(&(dest->evs[i]), rev);
893 printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) found match\n",
894 tMPI_This_threadnr(),
895 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
899 /* we found a matching send */
900 tMPI_Xfer(cur, sev, rev);
905 printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) no match\n",
906 tMPI_This_threadnr(),
907 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
911 /* we post the envelope in the right list */
912 tMPI_Recv_env_list_add( &(dest->evr), rev);
/* Post a send: prepare a send envelope on the destination's
   per-source send list and publish it on that list's shared 'new'
   queue (which also signals the destination thread). */
920 static struct envelope *tMPI_Post_send(struct tmpi_thread *cur,
922 struct tmpi_thread *dest,
923 void *send_buf, int send_count,
924 tMPI_Datatype datatype, int tag,
927 struct tmpi_thread *src=cur;
928 struct envelope *sev;
929 int src_threadnr=tMPI_Threadnr(src);
/* the destination keeps one send-envelope list per source thread */
930 struct send_envelope_list *sevl=&(dest->evs[src_threadnr]);
932 /* reserve an envelope to post */
933 sev=tMPI_Prep_send_envelope(sevl, comm, src, dest, send_buf, send_count,
934 datatype, tag, nonblock);
937 printf("%5d: tMPI_Post_send (%d->%d, tag=%d)\n",
938 tMPI_This_threadnr(),
939 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
943 /* we post the envelope in the right list */
944 tMPI_Send_env_list_add_new(cur, &(dest->evs[src_threadnr]), sev);
/* Wait for and process incoming p2p events for this thread: detach
   each per-source 'new' send-envelope list (CAS or spinlock), walk
   the detached envelopes in order of arrival, transfer any that match
   posted receives and move the rest to the 'old' list, then
   acknowledge the processed events. */
952 static void tMPI_Wait_process_incoming(struct tmpi_thread *cur)
959 #if defined(TMPI_PROFILE)
960 tMPI_Profile_wait_start(cur);
962 /* we check for newly arrived send envelopes and finished send
964 check_id=tMPI_Event_wait( &(cur->p2p_event));
965 /* the outgoing_received items are handled 'automatically'
966 by the function calling this function */
967 #if defined(TMPI_PROFILE)
968 tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p)
970 n_handled = tMPI_Atomic_get(&(cur->ev_outgoing_received));
/* subtract the events that were already accounted for */
971 tMPI_Atomic_add_return( &(cur->ev_outgoing_received), -n_handled);
972 check_id -= n_handled;
976 /*int repl=check_id;*/
978 /* there were new send envelopes. Let's check them all */
979 for(i=0;i<Nthreads;i++)
981 struct envelope *sev_head;
983 #ifdef TMPI_LOCK_FREE_LISTS
984 /* Behold our lock-free shared linked list:
985 (see tMPI_Send_env_list_add_new for more info) */
986 struct envelope *evl_cas;
990 /* read old head atomically */
991 sev_head=(struct envelope*)
992 tMPI_Atomic_ptr_get( &(cur->evs[i].head_new) );
993 /* do the compare-and-swap to detach the list */
994 evl_cas=(struct envelope*)
995 tMPI_Atomic_ptr_cas(&(cur->evs[i].head_new), sev_head,
997 } while (evl_cas != sev_head);
999 tMPI_Spinlock_lock( &(cur->evs[i].lock_new) );
1000 sev_head=(struct send_envelope*)cur->evs[i].head_new;
1001 cur->evs[i].head_new=NULL; /* detach the list */
1002 tMPI_Spinlock_unlock( &(cur->evs[i].lock_new) );
1005 if (sev_head) /* there's a newly arrived send envelope from this
1008 struct envelope *sev=sev_head;
1009 struct envelope *prev_s=NULL;
1010 struct envelope *rev;
1012 /* first enable reversing order by creating a regular
1013 doubly-linked list from the singly-linked shared
1021 /* now walk through it backwards (in order of addition) */
1025 struct envelope *sevp=sev->prev;
1027 rev=tMPI_Recv_env_list_search_new(&(cur->evr), sev);
/* matching posted receive found: do the transfer now */
1030 tMPI_Xfer(cur, sev, rev);
/* no match yet: park the envelope on the old list */
1034 tMPI_Send_env_list_move_to_old( sev );
/* tell the event counter how many events we consumed */
1041 tMPI_Event_process( &(cur->p2p_event), n_handled);
/* Test a single request for completion without blocking. For
   copy-buffered sends, completes by filling the copy buffer; for
   other envelopes, checks the atomic state and, once finished,
   harvests the results and returns the envelope to its free list.
   Returns whether the request is finished. */
1044 static bool tMPI_Test_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
1046 struct envelope *ev=rq->ev;
1048 if (ev && !(rq->finished) )
1050 #ifdef USE_SEND_RECV_COPY_BUFFER
1051 if (ev->send && ev->using_cb)
1053 /* We buffer-copy. Just do the transfer to the buffer and
1054 return saying that we're done. It's now up to the
1055 receiver to return our envelope.*/
1056 /* do our transfer and are guaranteed a finished
1058 tMPI_Send_copy_buffer(ev, rq);
1059 /* get the results */
1060 rq->error=rq->ev->error;
1066 if( tMPI_Atomic_get( &(ev->state) ) >= env_finished )
1069 /* get the results */
1070 rq->error=rq->ev->error;
1071 tMPI_Set_req(ev, rq);
1072 /* and release the envelope. After this point, the envelope
1073 may be reused, so its contents shouldn't be relied on. */
1075 tMPI_Send_env_list_return(ev);
/* receive envelopes go back to the thread's free list */
1077 tMPI_Free_env_list_return_recv( &(cur->envelopes), ev);
1081 return rq->finished;
/* Block until a single request completes: repeatedly test it,
   processing incoming envelopes while it is not yet finished. */
1084 static void tMPI_Wait_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
1088 if (tMPI_Test_single(cur, rq))
1090 tMPI_Wait_process_incoming(cur);
1094 static bool tMPI_Test_multi(struct tmpi_thread *cur, struct tmpi_req_ *rqs,
1098 struct tmpi_req_ *creq=rqs;
1106 bool finished=tMPI_Test_single(cur, creq);
1109 /* now do the check */
1114 /* remove the request from the list we've been given. */
1116 creq->prev->next = creq->next;
1118 creq->next->prev = creq->prev;