2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
/* Forward declarations of the static helpers that implement the
   point-to-point (p2p) protocol below.  They fall into three groups:
   free-envelope-list management, send-envelope-list management, and
   receive-envelope-list management, plus the actual transfer (tMPI_Xfer).
   NOTE(review): this excerpt is a sparse sample of the original file;
   some declarations between these lines are elided. */
63 static struct envelope *tMPI_Free_env_list_fetch_recv(struct free_envelope_list
66 /* return an envelope to the free envelopes list */
67 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
68 struct envelope *rev);
72 /* send envelope lists: */
74 /* get a new envelope from the send list's free envelope list */
75 static struct envelope* tMPI_Send_env_list_fetch_new(struct
76 send_envelope_list *evl);
78 /* return a send envelope to the send list's free envelope list,
79 (to be used by the sending thread, who owns the send_envelope_list) */
80 static void tMPI_Send_env_list_return(struct envelope *ev);
81 #ifdef USE_SEND_RECV_COPY_BUFFER
82 /* return a send envelope to the sender's send list.
83 (to be used by the receiving thread). */
84 static void tMPI_Send_env_list_rts(struct envelope *sev);
91 /* remove a send envelope from its head_old list. Does not lock */
92 static void tMPI_Send_env_list_remove_old(struct envelope *sev);
94 /* add a send envelope to the new envelopes queue in a list */
95 static void tMPI_Send_env_list_add_new(struct tmpi_thread *cur,
96 struct send_envelope_list *evl,
97 struct envelope *sev);
98 /* move a send envelope to the old envelopes queue in a list.
99 Assumes that this is safe to do without interference
100 from other threads, i.e. the list it's in must have been
102 static void tMPI_Send_env_list_move_to_old(struct envelope *sev);
107 /* receive envelopes: */
108 /* add a receive envelope to a list */
109 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
110 struct envelope *ev);
111 /* remove a receive envelope from its list */
112 static void tMPI_Recv_env_list_remove(struct envelope *ev);
117 /* do the actual point-to-point transfer */
118 static void tMPI_Xfer(struct tmpi_thread *cur, struct envelope *sev,
119 struct envelope *rev);
124 /* Point-to-point communication protocol functions */
/* Initialize a free (receive) envelope list: allocate N envelopes in one
   contiguous block (recv_alloc_head) and chain them through 'next' into a
   singly linked free stack rooted at head_recv.  The last element's next
   is NULL, and rlist/slist start out unset.
   NOTE(review): interior lines (braces, the i<N-1 condition separating the
   two 'next' assignments, the malloc size continuation) are elided in this
   excerpt. */
125 void tMPI_Free_env_list_init(struct free_envelope_list *evl, int N)
129 /* allocate the head element */
130 evl->recv_alloc_head = (struct envelope*)tMPI_Malloc(sizeof(struct envelope)
132 evl->head_recv = evl->recv_alloc_head;
134 for (i = 0; i < N; i++)
138 evl->head_recv[i].next = &(evl->head_recv[i+1]);
142 evl->head_recv[i].next = NULL;
144 evl->head_recv[i].rlist = NULL;
145 evl->head_recv[i].slist = NULL;
/* Free the single allocation backing a free-envelope list and NULL out
   its pointers so any use after destruction fails loudly. */
149 void tMPI_Free_env_list_destroy(struct free_envelope_list *evl)
151 free(evl->recv_alloc_head);
152 evl->head_recv = NULL;
153 evl->recv_alloc_head = NULL;
/* Pop one receive envelope off the free-list stack and return it.
   If the list is exhausted an error is reported to stderr (the elided
   lines presumably abort or otherwise handle the empty case — TODO
   confirm against the full source). */
156 static struct envelope* tMPI_Free_env_list_fetch_recv(struct
157 free_envelope_list *evl)
159 struct envelope *ret;
162 /* TODO: make this do something better than crash */
163 fprintf(stderr, "Ran out of recv envelopes!!!!\n");
167 ret = evl->head_recv;
168 evl->head_recv = ret->next;
/* Push a receive envelope back onto the free-list stack (LIFO).
   No locking is visible here; per the declarations above, recv envelopes
   are a per-thread resource. */
176 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
177 struct envelope *rev)
182 rev->next = evl->head_recv;
183 evl->head_recv = rev;
195 /* tmpi_send_envelope_list functions */
/* Initialize a send-envelope list: allocate N envelopes contiguously,
   chain them into a free list, and set up the three queues:
   - head_new / head_rts: shared with other threads, either lock-free
     (TMPI_LOCK_FREE_LISTS, via atomic ptr ops) or spinlock-protected;
   - head_old: a private circular doubly linked list with a dummy head
     (alloc_head[0]), so head_free starts at alloc_head[1].
   With USE_SEND_RECV_COPY_BUFFER each envelope also gets its own copy
   buffer of COPY_BUFFER_SIZE bytes. */
197 void tMPI_Send_env_list_init(struct send_envelope_list *evl, int N)
200 #ifndef TMPI_LOCK_FREE_LISTS
201 tMPI_Spinlock_init( &(evl->lock_rts) );
202 tMPI_Spinlock_init( &(evl->lock_new) );
206 evl->alloc_head = (struct envelope*)tMPI_Malloc(sizeof(struct envelope)*N);
207 for (i = 0; i < N; i++)
209 evl->alloc_head[i].next = (i < (N-1)) ? &(evl->alloc_head[i+1]) : NULL;
210 evl->alloc_head[i].prev = NULL;
211 evl->alloc_head[i].slist = evl;
212 evl->alloc_head[i].rlist = NULL;
213 #ifdef USE_SEND_RECV_COPY_BUFFER
214 evl->alloc_head[i].cb = (void*)tMPI_Malloc(sizeof(char)*COPY_BUFFER_SIZE);
218 #ifdef TMPI_LOCK_FREE_LISTS
219 tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
220 tMPI_Atomic_ptr_set(&(evl->head_rts), NULL);
/* non-lock-free branch (the #else is elided in this excerpt): */
222 evl->head_new = NULL;
223 evl->head_rts = NULL;
225 evl->head_free = &(evl->alloc_head[1]);
226 /* initialize the head_old circular list with dummy element */
227 evl->head_old = evl->alloc_head; /* the first element is a dummy */
228 evl->head_old->next = evl->head_old;
229 evl->head_old->prev = evl->head_old;
/* Tear down a send-envelope list: free each per-envelope copy buffer
   (when USE_SEND_RECV_COPY_BUFFER is set), free the envelope block, and
   poison the queue heads so use after tMPI_Finalize crashes visibly. */
232 void tMPI_Send_env_list_destroy(struct send_envelope_list *evl)
234 #ifdef USE_SEND_RECV_COPY_BUFFER
236 for (i = 0; i < evl->Nalloc; i++)
238 free(evl->alloc_head[i].cb);
241 free(evl->alloc_head);
242 evl->alloc_head = NULL;
243 #ifdef TMPI_LOCK_FREE_LISTS
244 tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
/* non-lock-free branch (the #else is elided in this excerpt): */
246 evl->head_new = NULL;
248 evl->head_old = NULL; /* make it crash if used after tMPI_Finalize */
/* Fetch a fresh send envelope for the calling (sending) thread.
   First, any envelopes that receivers returned via head_rts are drained
   back into head_free: the shared rts list is detached atomically
   (CAS loop, or spinlock in the non-lock-free build) and its elements
   pushed onto the private free list.  Then the head of head_free is
   popped and returned.  If the free list is empty, the thread either
   processes incoming traffic until an envelope frees up
   (USE_SEND_RECV_COPY_BUFFER) or reports a fatal error.
   NOTE(review): several interior lines (braces, NULL checks, the loop
   over the detached rts list) are elided in this excerpt. */
252 static struct envelope* tMPI_Send_env_list_fetch_new(struct
253 send_envelope_list *evl)
255 struct envelope *ret;
259 /* first check whether any envelopes were returned to sender */
260 #ifdef TMPI_LOCK_FREE_LISTS
261 if ((ret = (struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts))))
266 /* detach the list */
267 #ifdef TMPI_LOCK_FREE_LISTS
268 /* we detach by swapping what we expect the pointer value to be,
269 with NULL. If there were a cross-platform way to atomically
270 swap without checking, we could do that, too. */
271 while (!tMPI_Atomic_ptr_cas( &(evl->head_rts), ret, NULL ))
273 ret = (struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts));
276 tMPI_Spinlock_lock( &(evl->lock_rts) );
278 evl->head_rts = NULL;
279 tMPI_Spinlock_unlock( &(evl->lock_rts) );
281 /* now add the items to head_free */
284 struct envelope *next = ret->next;
285 ret->next = evl->head_free;
286 evl->head_free = ret;
291 /* get the last free one off the list */
292 ret = evl->head_free;
294 #ifdef USE_SEND_RECV_COPY_BUFFER
296 /* There are no free send envelopes, so all we can do is handle
297 incoming requests until we get a free send envelope. */
298 printf("Ran out of send envelopes!!\n");
299 tMPI_Wait_process_incoming(tMPI_Get_current());
303 /* If this happens, it most likely indicates a bug in the
304 calling program. We could fix the situation by waiting,
305 but that would most likely lead to deadlocks - even
306 more difficult to debug than this. */
307 fprintf(stderr, "Ran out of send envelopes!!!!\n");
314 evl->head_free = ret->next;
/* Return a send envelope to its list's private free stack.  Only the
   sending thread (the owner of the send_envelope_list) may call this,
   so no synchronization is needed (see the declaration comment above). */
325 static void tMPI_Send_env_list_return(struct envelope *sev)
327 struct send_envelope_list *evl = sev->slist;
329 sev->next = evl->head_free;
330 evl->head_free = sev;
334 #ifdef USE_SEND_RECV_COPY_BUFFER
/* "Return-to-sender": called by the *receiving* thread to hand a send
   envelope back to the sender by pushing it onto the shared head_rts
   stack.  The lock-free build uses a CAS loop; otherwise a spinlock
   protects the list.
   FIX(review): the spinlock branch wrote 'ev->next', but no identifier
   'ev' exists in this function — the parameter is 'sev'.  The branch
   could not compile; corrected to 'sev->next'. */
335 static void tMPI_Send_env_list_rts(struct envelope *sev)
337 struct send_envelope_list *evl = sev->slist;
338 #ifdef TMPI_LOCK_FREE_LISTS
339 struct envelope *sevn;
343 sevn = (struct envelope*)tMPI_Atomic_ptr_get(&evl->head_rts);
345 /* the cmpxchg operation is a memory fence, so we shouldn't need
346 to worry about out-of-order evaluation */
348 while (!tMPI_Atomic_ptr_cas( &(evl->head_rts), sevn, sev ));
350 tMPI_Spinlock_lock( &(evl->lock_rts) );
351 sev->next = (struct envelope*)evl->head_rts;
353 tMPI_Spinlock_unlock( &(evl->lock_rts) );
/* Unlink a send envelope from the (private, unshared) head_old circular
   doubly linked list by splicing its neighbors together.  Does not lock.
   NOTE(review): the guards around these two splices (if any) are elided
   in this excerpt. */
360 static void tMPI_Send_env_list_remove_old(struct envelope *sev)
362 /* pretty straighforward because it isn't a shared list */
365 sev->next->prev = sev->prev;
369 sev->prev->next = sev->next;
/* Publish a new send envelope on the destination thread's shared
   head_new stack and signal the destination's p2p event so it wakes up.
   Lock-free build: read the head, link sev in front of it, and CAS it
   in, retrying on contention.  Otherwise the list is spinlock-protected.
   FIX(review): the spinlock branch cast evl->head_new to the stale type
   'struct send_envelope*' and assigned it to sev->next, which is a
   'struct envelope*' — an incompatible-pointer assignment.  The cast is
   corrected to '(struct envelope*)', matching the type used everywhere
   else in this file. */
376 static void tMPI_Send_env_list_add_new(struct tmpi_thread *cur,
377 struct send_envelope_list *evl,
378 struct envelope *sev)
380 #ifdef TMPI_LOCK_FREE_LISTS
381 struct envelope *evl_head_new_orig;
385 #ifdef TMPI_LOCK_FREE_LISTS
386 /* behold our lock-free shared linked list:
387 (it's actually quite simple because we only do operations at the head
388 of the list, either adding them - such as here - or detaching the whole
392 /* read the old head atomically */
393 evl_head_new_orig = (struct envelope*) tMPI_Atomic_ptr_get(
395 /* set our envelope to have that as its next */
396 sev->next = evl_head_new_orig;
397 /* do the compare-and-swap.
398 this operation is a memory fence, so we shouldn't need
399 to worry about out-of-order stores. If it returns false,
400 somebody else got there before us: */
402 while (!tMPI_Atomic_ptr_cas(&(evl->head_new), evl_head_new_orig, sev));
405 tMPI_Spinlock_lock( &(evl->lock_new) );
406 /* we add to the start of the list */
407 sev->next = (struct envelope*)evl->head_new;
408 /* actually attach it to the list */
410 tMPI_Spinlock_unlock( &(evl->lock_new) );
413 #if defined(TMPI_PROFILE)
414 tMPI_Profile_wait_start(cur);
416 /* signal to the thread that there is a new envelope */
417 tMPI_Event_signal( &(sev->dest->p2p_event) );
418 #if defined(TMPI_PROFILE)
419 tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
/* Move a send envelope from its current (already-detached, so unshared)
   position to the tail of the head_old circular list: first splice it
   out of wherever it is, then insert it just before the dummy head
   (i.e. at the end, preserving arrival order). */
423 static void tMPI_Send_env_list_move_to_old(struct envelope *sev)
425 struct send_envelope_list *evl = sev->slist;
427 /* remove from old list. We assume the list has been detached! */
430 sev->next->prev = sev->prev;
434 sev->prev->next = sev->next;
437 /* we add to the end of the list */
438 sev->next = evl->head_old;
439 sev->prev = evl->head_old->prev;
441 sev->next->prev = sev;
442 sev->prev->next = sev;
452 /* tmpi_recv_envelope_list functions */
/* Initialize a receive-envelope list as an empty circular doubly linked
   list whose head is the embedded dummy element. */
454 void tMPI_Recv_env_list_init(struct recv_envelope_list *evl)
456 evl->head = &(evl->dummy);
457 evl->head->prev = evl->head;
458 evl->head->next = evl->head;
/* Destroy a receive-envelope list.  NOTE(review): the body is entirely
   elided in this excerpt; since init allocates nothing, it presumably
   just resets the head pointer — confirm against the full source. */
461 void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl)
/* Append a receive envelope at the tail of the list (just before the
   dummy head), preserving post order for message-matching fairness. */
466 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
467 struct envelope *rev)
470 /* we add to the end of the list */
471 rev->next = evl->head;
472 rev->prev = evl->head->prev;
474 rev->next->prev = rev;
475 rev->prev->next = rev;
/* Unlink a receive envelope from its circular list by splicing its
   neighbors together.  NOTE(review): the guards around these splices
   ("if it's in a list", per the caller in tMPI_Xfer) are elided here. */
478 static void tMPI_Recv_env_list_remove(struct envelope *rev)
482 rev->next->prev = rev->prev;
486 rev->prev->next = rev->next;
500 /* tmpi_req functions */
/* Initialize a request list: allocate N_reqs request objects contiguously
   and link them into a doubly linked free list (first element's prev and
   last element's next are NULL).  Requests are a per-thread resource.
   NOTE(review): the boundary conditions selecting between the two
   prev/next assignments are elided in this excerpt. */
502 void tMPI_Req_list_init(struct req_list *rl, int N_reqs)
506 rl->alloc_head = (struct tmpi_req_*)tMPI_Malloc(
507 sizeof(struct tmpi_req_)*N_reqs);
508 rl->head = rl->alloc_head;
509 for (i = 0; i < N_reqs; i++)
513 rl->head[i].prev = NULL;
517 rl->head[i].prev = &(rl->head[i-1]);
522 rl->head[i].next = NULL;
526 rl->head[i].next = &(rl->head[i+1]);
/* Free the single allocation backing the request list and NULL the
   pointer to guard against use after destruction. */
531 void tMPI_Req_list_destroy(struct req_list *rl)
533 free(rl->alloc_head);
535 rl->alloc_head = NULL;
/* Pop a request object off the per-thread free list.  Lock-free by
   design (requests are per-thread).  If the list is empty an error is
   raised via tMPI_Error (the elided lines contain the empty check). */
540 struct tmpi_req_ *tMPI_Get_req(struct req_list *rl)
542 struct tmpi_req_ *req = rl->head;
545 /* we don't need locks here because requests are a per-thread property */
548 /* this could be fixed */
549 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_REQUESTS);
552 rl->head = req->next;
/* Push a request object back onto the per-thread free list (LIFO).
   NOTE(review): the 'rl->head = req' line is elided in this excerpt. */
558 void tMPI_Return_req(struct req_list *rl, struct tmpi_req_ *req)
560 req->next = rl->head;
/* Initialize a request from an envelope: not finished, not cancelled,
   no error, source taken from the envelope, tag reset to wildcard.
   NOTE(review): additional field initializations are elided here. */
567 void tMPI_Req_init(struct tmpi_req_ *rq, struct envelope *ev)
570 rq->finished = FALSE;
574 rq->source = ev->src;
576 rq->tag = TMPI_ANY_TAG;
577 rq->error = TMPI_SUCCESS;
579 rq->cancelled = FALSE;
587 /* Point-to-point communication protocol functions */
/* Copy an envelope's result fields (source, comm, error, transferred
   byte count) into a request for status reporting.  'transferred' is
   ev->bufsize only once the envelope's atomic state shows the transfer
   progressed; otherwise 0.
   NOTE(review): the two state checks below (> env_unmatched vs.
   == env_finished) look like alternative branches of an elided #ifdef —
   confirm against the full source. */
591 void tMPI_Set_req(struct envelope *ev, struct tmpi_req_ *req)
593 req->source = ev->src;
594 req->comm = ev->comm;
596 req->error = ev->error;
599 if (tMPI_Atomic_get(&(ev->state)) > env_unmatched)
601 req->transferred = ev->bufsize;
605 req->transferred = 0;
610 if (tMPI_Atomic_get(&(ev->state)) == env_finished)
612 req->transferred = ev->bufsize;
616 req->transferred = 0;
/* Fill a tMPI_Status from a completed request: rank of the source
   within the comm, tag, error code, transferred byte count, and the
   cancelled flag.  (An elided guard presumably checks st != NULL.) */
621 void tMPI_Set_status(struct tmpi_req_ *req, tMPI_Status *st)
625 st->TMPI_SOURCE = tMPI_Comm_seek_rank(req->comm, req->source);
626 st->TMPI_TAG = req->tag;
627 st->TMPI_ERROR = req->error;
628 st->transferred = req->transferred;
629 st->cancelled = req->cancelled;
/* Decide whether a send envelope matches a receive envelope, MPI-style:
   tags must match (or the receive uses TMPI_ANY_TAG), comms and
   datatypes must be equal, destinations must agree, the receive's source
   must be unset (any-source) or equal to the send's source, the send
   must not be finished, and the receive must still be unmatched.
   The printf blocks are debug output (enabled by an elided #ifdef). */
634 tmpi_bool tMPI_Envelope_matches(const struct envelope *sev,
635 const struct envelope *rev)
638 printf("%5d: tMPI_Envelope_matches (%d->%d)==(%d->%d), tag=(%d==%d), \n datatype=(%ld==%ld), comm=(%ld,%ld),\n finished=(%d==%d)\n",
639 tMPI_This_threadnr(),
640 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
641 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
642 (int)(sev->tag), (int)(rev->tag),
643 (long int)sev->datatype, (long int)rev->datatype,
644 (long int)sev->comm, (long int)rev->comm,
645 (int)sev->state.value, (int)rev->state.value);
648 if ( ( (rev->tag == TMPI_ANY_TAG) || (rev->tag == sev->tag) ) &&
649 ( sev->comm == rev->comm ) &&
650 ( (!rev->src) || (rev->src == sev->src) ) &&
651 ( sev->dest == rev->dest ) &&
652 ( sev->datatype == rev->datatype ) &&
653 ( sev->state.value < env_finished &&
654 rev->state.value == env_unmatched ) )
657 printf("%5d: (%d->%d) tag=%d found match\n",
658 tMPI_This_threadnr(),
659 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
/* Walk the head_old circular list (skipping the dummy head) looking for
   a send envelope that matches the given receive envelope; on a match,
   unlink it and return it.  The elided tail presumably returns NULL
   when no match is found — confirm against the full source. */
671 struct envelope* tMPI_Send_env_list_search_old(struct send_envelope_list *evl,
672 struct envelope *rev)
674 struct envelope *sev;
676 sev = (struct envelope*)evl->head_old->next;
677 while (sev != evl->head_old)
679 if (tMPI_Envelope_matches(sev, rev))
681 /* remove the envelope */
682 tMPI_Send_env_list_remove_old(sev);
/* Walk the posted-receive circular list (skipping the dummy head)
   looking for a receive envelope that matches the given send envelope.
   The elided tail presumably returns the match or NULL — confirm
   against the full source. */
691 struct envelope* tMPI_Recv_env_list_search_new(struct recv_envelope_list *evl,
692 struct envelope *sev)
694 struct envelope *rev;
696 rev = evl->head->next;
697 while (rev != evl->head)
699 if (tMPI_Envelope_matches(sev, rev))
709 #ifdef USE_SEND_RECV_COPY_BUFFER
/* Sender-side double buffering: copy the send data into the envelope's
   private copy buffer (sev->cb) so the sender can complete immediately
   and let the receiver finish the exchange.  Protocol:
   - bufsize == 0: nothing to copy, receiver handles it (early return in
     elided lines);
   - if the envelope is still env_unmatched, memcpy into cb and CAS the
     state to env_cb_available; on CAS success the receiver will clean up
     the envelope and we are done;
   - if the CAS (or the initial check) shows the receiver already started
     copying directly from sev->buf, spin-wait until its state passes
     env_cb_available, then clean up the envelope ourselves via
     tMPI_Set_req + tMPI_Send_env_list_return.
   Acquire/release barriers bracket the state reads/writes. */
710 void tMPI_Send_copy_buffer(struct envelope *sev, struct tmpi_req_ *req)
713 /* Fill copy buffer, after having anounced its possible use */
715 /* in the special case of a zero buffer size, we don't do anything and
716 always let the receiver handle it */
717 if (sev->bufsize == 0)
722 /* first check whether the other side hasn't started yet */
723 state = tMPI_Atomic_get( &(sev->state) );
724 tMPI_Atomic_memory_barrier_acq();
725 if (state == env_unmatched)
728 memcpy(sev->cb, sev->buf, sev->bufsize);
729 /* now set state, if other side hasn't started copying yet. */
730 tMPI_Atomic_memory_barrier_rel();
731 if (tMPI_Atomic_cas( &(sev->state), env_unmatched, env_cb_available))
733 /* if it was originally unmatched, the receiver wasn't
734 copying the old buffer. We can don't need to wait,
735 and the receiver is going to clean up this envelope. */
737 printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) completed\n",
738 tMPI_This_threadnr(),
739 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
746 /* and if we reached this point, the receiver had already started
747 copying, and we need to clean up the envelope ourselves.
749 we first need to wait until the receiver is finished copying. We
750 know this is a short wait (since the buffer was small enough to be
751 buffered in the first place), so we just spin-wait. */
752 while (tMPI_Atomic_get( &(sev->state) ) < env_cb_available)
755 tMPI_Atomic_memory_barrier_acq();
757 printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) waiting-completed\n",
758 tMPI_This_threadnr(),
759 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest), (int)(sev->tag));
762 tMPI_Set_req(sev, req);
763 /* and now we clean up */
764 tMPI_Send_env_list_return(sev);
/* Prepare a send envelope: fetch one from the send list's free stack,
   fill in its transfer parameters (bufsize = count * datatype->size),
   decide on copy-buffer use (only when the payload fits in
   COPY_BUFFER_SIZE), set state to env_unmatched, and record an
   TMPI_ERR_XFER_BUFSIZE error for negative counts (the elided guard).
   NOTE(review): assignments of src/dest/buf/tag/comm are elided in this
   excerpt. */
769 struct envelope* tMPI_Prep_send_envelope(struct send_envelope_list *evl,
771 struct tmpi_thread *src,
772 struct tmpi_thread *dest,
773 void *buf, int count,
774 tMPI_Datatype datatype,
775 int tag, tmpi_bool nonblock)
777 /* get an envelope from the send-envelope stack */
778 struct envelope *ev = tMPI_Send_env_list_fetch_new( evl );
781 ev->nonblock = nonblock;
789 ev->bufsize = count*datatype->size;
790 ev->datatype = datatype;
796 #ifdef USE_SEND_RECV_COPY_BUFFER
797 /* check whether we'll be double buffering */
798 ev->using_cb = (ev->bufsize < COPY_BUFFER_SIZE);
799 /* but don't do anything yet */
802 tMPI_Atomic_set(&(ev->state), env_unmatched);
804 ev->error = TMPI_SUCCESS;
807 tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
808 ev->error = TMPI_ERR_XFER_BUFSIZE;
/* Prepare a receive envelope: fetch one from the current thread's free
   envelope list, fill in transfer parameters, set state to
   env_unmatched, and record TMPI_ERR_XFER_BUFSIZE for invalid counts
   (elided guard).  Mirrors tMPI_Prep_send_envelope without the
   copy-buffer logic.
   NOTE(review): assignments of src/dest/buf/tag/comm are elided in this
   excerpt. */
814 struct envelope* tMPI_Prep_recv_envelope(struct tmpi_thread *cur,
816 struct tmpi_thread *src,
817 struct tmpi_thread *dest,
818 void *buf, int count,
819 tMPI_Datatype datatype, int tag,
822 /* get an envelope from the stack */
823 struct envelope *ev = tMPI_Free_env_list_fetch_recv( &(cur->envelopes) );
826 ev->nonblock = nonblock;
834 ev->bufsize = count*datatype->size;
835 ev->datatype = datatype;
842 tMPI_Atomic_set(&(ev->state), env_unmatched);
844 ev->error = TMPI_SUCCESS;
847 tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
848 ev->error = TMPI_ERR_XFER_BUFSIZE;
/* Perform the actual point-to-point transfer between a matched send
   envelope (sev) and receive envelope (rev).  Outline:
   1. validate buffer sizes (receiver too small -> TMPI_ERR_XFER_BUFSIZE
      on both envelopes, both marked env_finished);
   2. with USE_SEND_RECV_COPY_BUFFER, CAS the sender's state from
      env_unmatched to env_copying; if that fails the sender has already
      filled its copy buffer, so we copy from sev->cb instead (elided)
      and become responsible for returning the sender's envelope
      (remove_sender = TRUE);
   3. check for NULL buffers (TMPI_ERR_BUF), then memcpy the payload;
   4. record the transferred size in rev->bufsize, mark both envelopes
      env_finished, bump the sender's ev_outgoing_received counter and
      signal its p2p event so a blocked sender wakes up;
   5. unlink rev from its posted-receive list, and return sev to the
      sender via the rts list when we own its cleanup.
   The printf blocks are debug output behind an elided #ifdef.
   NOTE(review): many interior lines (braces, returns, the copy-buffer
   source selection) are elided in this excerpt. */
861 static void tMPI_Xfer(struct tmpi_thread *cur, struct envelope *sev,
862 struct envelope *rev)
864 #ifdef USE_SEND_RECV_COPY_BUFFER
865 /* we remove the sender's envelope only if we do the transfer, which
866 we always do if the buffer size = 0 */
867 tmpi_bool remove_sender = (sev->bufsize == 0);
870 printf("%5d: tMPI_Xfer (%d->%d, tag=%d) started\n",
871 tMPI_This_threadnr(),
872 tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
875 /* first set data on the receiving end so status can be updated */
879 if (sev->bufsize) /* do the actual transfer */
881 void *sbuf = sev->buf; /* source buffer */
882 if (sev->bufsize > rev->bufsize)
884 tMPI_Error((rev->comm), TMPI_ERR_XFER_BUFSIZE);
885 tMPI_Atomic_set(&(rev->state), env_finished);
886 tMPI_Atomic_set(&(sev->state), env_finished);
887 rev->error = TMPI_ERR_XFER_BUFSIZE;
888 sev->error = TMPI_ERR_XFER_BUFSIZE;
892 #ifdef USE_SEND_RECV_COPY_BUFFER
895 /* check if the other side has already finished copying */
896 if (!tMPI_Atomic_cas( &(sev->state), env_unmatched, env_copying))
898 /* it has, and we're copying from the new buffer.
899 We're now also tasked with removing the envelope */
901 remove_sender = TRUE;
903 tMPI_Profile_count_buffered_p2p_xfer(cur);
909 if (!rev->buf || !sev->buf)
911 tMPI_Error((rev->comm), TMPI_ERR_BUF);
912 tMPI_Atomic_set(&(rev->state), env_finished);
913 tMPI_Atomic_set(&(sev->state), env_finished);
914 rev->error = TMPI_ERR_BUF;
915 sev->error = TMPI_ERR_BUF;
918 memcpy(rev->buf, sbuf, sev->bufsize);
920 tMPI_Profile_count_p2p_xfer(cur);
922 /* for status update */
924 rev->bufsize = sev->bufsize;
925 /* and mark that we're finished */
926 #if defined(TMPI_PROFILE)
928 tMPI_Profile_wait_start(cur);
930 tMPI_Atomic_set( &(rev->state), env_finished);
931 tMPI_Atomic_set( &(sev->state), env_finished);
933 /* signal to a potentially waiting thread that we're done. */
934 tMPI_Atomic_add_return( &(rev->src->ev_outgoing_received), 1);
935 tMPI_Event_signal(&(rev->src->p2p_event));
937 /* remove the receiving envelope if it's in a list */
938 tMPI_Recv_env_list_remove(rev);
939 #ifdef USE_SEND_RECV_COPY_BUFFER
942 tMPI_Send_env_list_rts(sev);
945 #if defined(TMPI_PROFILE)
946 tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
952 printf("%5d: tMPI_Xfer (%d->%d, tag=%d) done\n",
953 tMPI_This_threadnr(),
954 tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
/* Post a receive and try to match it immediately: prepare a receive
   envelope, then search the destination thread's old send queues —
   only the queue for the known source, or all Nthreads queues when
   src is NULL (any-source, src_threadnr then defaults to Nthreads).
   On a match, run the transfer via tMPI_Xfer; otherwise post the
   envelope on the thread's receive list (dest->evr) for later matching.
   The printf blocks are debug output behind an elided #ifdef; the
   return statements are elided in this excerpt. */
965 struct envelope* tMPI_Post_match_recv(struct tmpi_thread *cur,
967 struct tmpi_thread *src,
968 void *recv_buf, int recv_count,
969 tMPI_Datatype datatype,
970 int tag, tmpi_bool nonblock)
972 struct tmpi_thread *dest = cur;
973 struct envelope *rev;
974 struct envelope *sev = NULL;
975 int src_threadnr = src ? tMPI_Threadnr(src) : Nthreads;
978 /* reserve an envelope to post */
979 rev = tMPI_Prep_recv_envelope(cur, comm, src, dest, recv_buf, recv_count,
980 datatype, tag, nonblock);
983 printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) started\n",
984 tMPI_This_threadnr(),
985 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest), (int)(rev->tag));
988 /* we now check the entire exisiting send queue */
991 sev = tMPI_Send_env_list_search_old( &(dest->evs[src_threadnr]), rev);
995 /* if we don't know the source, we look at all possible sources */
996 for (i = 0; i < Nthreads; i++)
998 sev = tMPI_Send_env_list_search_old(&(dest->evs[i]), rev);
1009 printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) found match\n",
1010 tMPI_This_threadnr(),
1011 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
1015 /* we found a matching send */
1016 tMPI_Xfer(cur, sev, rev);
1021 printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) no match\n",
1022 tMPI_This_threadnr(),
1023 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
1027 /* we post the envelope in the right list */
1028 tMPI_Recv_env_list_add( &(dest->evr), rev);
/* Post a send: prepare a send envelope from the per-(src,dest) send
   list (dest->evs indexed by the sender's thread number) and publish it
   on that list's shared 'new' queue via tMPI_Send_env_list_add_new,
   which also signals the destination thread.  The printf block is debug
   output behind an elided #ifdef; the return is elided in this excerpt. */
1036 struct envelope *tMPI_Post_send(struct tmpi_thread *cur,
1038 struct tmpi_thread *dest,
1039 void *send_buf, int send_count,
1040 tMPI_Datatype datatype, int tag,
1043 struct tmpi_thread *src = cur;
1044 struct envelope *sev;
1045 int src_threadnr = tMPI_Threadnr(src);
1046 struct send_envelope_list *sevl = &(dest->evs[src_threadnr]);
1048 /* reserve an envelope to post */
1049 sev = tMPI_Prep_send_envelope(sevl, comm, src, dest, send_buf, send_count,
1050 datatype, tag, nonblock);
1053 printf("%5d: tMPI_Post_send (%d->%d, tag=%d)\n",
1054 tMPI_This_threadnr(),
1055 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
1059 /* we post the envelope in the right list */
1060 tMPI_Send_env_list_add_new(cur, &(dest->evs[src_threadnr]), sev);
/* Block on the thread's p2p event and process what arrived:
   1. tMPI_Event_wait returns the number of pending signals (check_id);
      signals that merely report completed outgoing sends
      (ev_outgoing_received) are consumed by decrementing check_id;
   2. for each source thread, atomically detach that source's shared
      head_new stack (CAS loop in the lock-free build, spinlock
      otherwise);
   3. rebuild the detached singly linked stack into a doubly linked
      list so envelopes can be walked in arrival order (interior lines
      elided), matching each against the posted receives (cur->evr):
      matched ones are transferred via tMPI_Xfer, unmatched ones are
      moved to the private head_old queue;
   4. acknowledge the handled signals with tMPI_Event_process.
   FIX(review): the spinlock branch cast cur->evs[i].head_new to the
   stale type 'struct send_envelope*' and assigned it to sev_head, a
   'struct envelope*' — an incompatible-pointer assignment.  Corrected
   to '(struct envelope*)', the type used everywhere else in this file. */
1068 void tMPI_Wait_process_incoming(struct tmpi_thread *cur)
1075 #if defined(TMPI_PROFILE)
1076 tMPI_Profile_wait_start(cur);
1078 /* we check for newly arrived send envelopes and finished send
1080 check_id = tMPI_Event_wait( &(cur->p2p_event));
1081 /* the outgoing_received items are handled 'automatically'
1082 by the function calling this function */
1083 #if defined(TMPI_PROFILE)
1084 tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p);
1086 n_handled = tMPI_Atomic_get(&(cur->ev_outgoing_received));
1087 tMPI_Atomic_add_return( &(cur->ev_outgoing_received), -n_handled);
1088 check_id -= n_handled;
1092 /*int repl=check_id;*/
1094 /* there were new send envelopes. Let's check them all */
1095 for (i = 0; i < Nthreads; i++)
1097 struct envelope *sev_head;
1099 #ifdef TMPI_LOCK_FREE_LISTS
1100 /* Behold our lock-free shared linked list:
1101 (see tMPI_Send_env_list_add_new for more info) */
1104 /* read old head atomically */
1105 sev_head = (struct envelope*)
1106 tMPI_Atomic_ptr_get( &(cur->evs[i].head_new) );
1107 /* do the compare-and-swap to detach the list */
1109 while (!tMPI_Atomic_ptr_cas(&(cur->evs[i].head_new), sev_head,
1112 tMPI_Spinlock_lock( &(cur->evs[i].lock_new) );
1113 sev_head = (struct envelope*)cur->evs[i].head_new;
1114 cur->evs[i].head_new = NULL; /* detach the list */
1115 tMPI_Spinlock_unlock( &(cur->evs[i].lock_new) );
1118 if (sev_head) /* there's a newly arrived send envelope from this
1121 struct envelope *sev = sev_head;
1122 struct envelope *prev_s = NULL;
1123 struct envelope *rev;
1125 /* first enable reversing order by creating a regular
1126 doubly-linked list from the singly-linked shared
1134 /* now walk through it backwards (in order of addition) */
1138 struct envelope *sevp = sev->prev;
1140 rev = tMPI_Recv_env_list_search_new(&(cur->evr), sev);
1143 tMPI_Xfer(cur, sev, rev);
1147 tMPI_Send_env_list_move_to_old( sev );
1154 tMPI_Event_process( &(cur->p2p_event), n_handled);
/* Test whether a single request has completed; returns rq->finished.
   For a buffered send (ev->send && ev->using_cb) the sender completes
   itself via tMPI_Send_copy_buffer and the receiver returns the
   envelope.  Otherwise, when the envelope's atomic state has reached
   env_finished, the request absorbs the result (error, transfer size
   via tMPI_Set_req) and the envelope is released back to its list —
   send envelopes via tMPI_Send_env_list_return, receive envelopes via
   tMPI_Free_env_list_return_recv (the elided branch selects between
   them, presumably on ev->send — confirm against the full source).
   After release the envelope may be reused and must not be touched. */
1157 tmpi_bool tMPI_Test_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
1159 struct envelope *ev = rq->ev;
1161 if (ev && !(rq->finished) )
1163 #ifdef USE_SEND_RECV_COPY_BUFFER
1164 if (ev->send && ev->using_cb)
1166 /* We buffer-copy. Just do the transfer to the buffer and
1167 return saying that we're done. It's now up to the
1168 receiver to return our envelope.*/
1169 /* do our transfer and are guaranteed a finished
1171 tMPI_Send_copy_buffer(ev, rq);
1172 /* get the results */
1173 rq->error = rq->ev->error;
1174 rq->finished = TRUE;
1179 if (tMPI_Atomic_get( &(ev->state) ) >= env_finished)
1181 rq->finished = TRUE;
1182 /* get the results */
1183 rq->error = rq->ev->error;
1184 tMPI_Set_req(ev, rq);
1185 /* and release the envelope. After this point, the envelope
1186 may be reused, so its contents shouldn't be relied on. */
1189 tMPI_Send_env_list_return(ev);
1193 tMPI_Free_env_list_return_recv( &(cur->envelopes), ev);
1198 return rq->finished;
/* Block until a single request completes: repeatedly test it, and
   while it is not finished, process incoming p2p traffic (which blocks
   on the thread's p2p event rather than busy-waiting).  The enclosing
   loop construct is elided in this excerpt. */
1201 void tMPI_Wait_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
1205 if (tMPI_Test_single(cur, rq))
1209 tMPI_Wait_process_incoming(cur);
1214 tmpi_bool tMPI_Test_multi(struct tmpi_thread *cur, struct tmpi_req_ *rqs,
1215 tmpi_bool *any_done)
1217 tmpi_bool all_done = TRUE;
1218 struct tmpi_req_ *creq = rqs;
1228 tmpi_bool finished = tMPI_Test_single(cur, creq);
1231 /* now do the check */
1238 /* remove the request from the list we've been given. */
1241 creq->prev->next = creq->next;
1245 creq->next->prev = creq->prev;