2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
/* free envelope lists: */
/* get a (receive) envelope from the free envelopes list */
static struct envelope *tMPI_Free_env_list_fetch_recv(struct free_envelope_list
                                                      *evl);

/* return an envelope to the free envelopes list */
static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
                                           struct envelope *rev);


/* send envelope lists: */

/* get a new envelope from the send list's free envelope list */
static struct envelope* tMPI_Send_env_list_fetch_new(struct
                                                     send_envelope_list *evl);

/* return a send envelope to the send list's free envelope list,
   (to be used by the sending thread, who owns the send_envelope_list) */
static void tMPI_Send_env_list_return(struct envelope *ev);

#ifdef USE_SEND_RECV_COPY_BUFFER
/* return a send envelope to the sender's send list.
   (to be used by the receiving thread). */
static void tMPI_Send_env_list_rts(struct envelope *sev);
#endif

/* remove a send envelope from its head_old list. Does not lock */
static void tMPI_Send_env_list_remove_old(struct envelope *sev);

/* add a send envelope to the new envelopes queue in a list */
static void tMPI_Send_env_list_add_new(struct tmpi_thread *cur,
                                       struct send_envelope_list *evl,
                                       struct envelope *sev);

/* move a send envelope to the old envelopes queue in a list.
   Assumes that this is safe to do without interference
   from other threads, i.e. the list it's in must have been
   detached beforehand. */
static void tMPI_Send_env_list_move_to_old(struct envelope *sev);


/* receive envelopes: */
/* add a receive envelope to a list */
static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
                                   struct envelope *ev);
/* remove a receive envelope from its list */
static void tMPI_Recv_env_list_remove(struct envelope *ev);


/* do the actual point-to-point transfer */
static void tMPI_Xfer(struct tmpi_thread *cur, struct envelope *sev,
                      struct envelope *rev);
124 /* Point-to-point communication protocol functions */
125 int tMPI_Free_env_list_init(struct free_envelope_list *evl, int N)
129 /* allocate the head element */
130 evl->recv_alloc_head = (struct envelope*)tMPI_Malloc(sizeof(struct envelope)
132 if (evl->recv_alloc_head == NULL)
134 return TMPI_ERR_NO_MEM;
136 evl->head_recv = evl->recv_alloc_head;
138 for (i = 0; i < N; i++)
142 evl->head_recv[i].next = &(evl->head_recv[i+1]);
146 evl->head_recv[i].next = NULL;
148 evl->head_recv[i].rlist = NULL;
149 evl->head_recv[i].slist = NULL;
154 void tMPI_Free_env_list_destroy(struct free_envelope_list *evl)
156 free(evl->recv_alloc_head);
157 evl->head_recv = NULL;
158 evl->recv_alloc_head = NULL;
161 static struct envelope* tMPI_Free_env_list_fetch_recv(struct
162 free_envelope_list *evl)
164 struct envelope *ret;
167 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_ENVELOPES);
171 ret = evl->head_recv;
172 evl->head_recv = ret->next;
180 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
181 struct envelope *rev)
186 rev->next = evl->head_recv;
187 evl->head_recv = rev;
199 /* tmpi_send_envelope_list functions */
201 int tMPI_Send_env_list_init(struct send_envelope_list *evl, int N)
204 #ifndef TMPI_LOCK_FREE_LISTS
205 tMPI_Spinlock_init( &(evl->lock_rts) );
206 tMPI_Spinlock_init( &(evl->lock_new) );
210 evl->alloc_head = (struct envelope*)tMPI_Malloc(sizeof(struct envelope)*N);
211 if (evl->alloc_head == NULL)
213 return TMPI_ERR_NO_MEM;
215 for (i = 0; i < N; i++)
217 evl->alloc_head[i].next = (i < (N-1)) ? &(evl->alloc_head[i+1]) : NULL;
218 evl->alloc_head[i].prev = NULL;
219 evl->alloc_head[i].slist = evl;
220 evl->alloc_head[i].rlist = NULL;
221 #ifdef USE_SEND_RECV_COPY_BUFFER
222 evl->alloc_head[i].cb = (void*)tMPI_Malloc(sizeof(char)*
224 if (evl->alloc_head[i].cb == NULL)
226 return TMPI_ERR_NO_MEM;
231 #ifdef TMPI_LOCK_FREE_LISTS
232 tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
233 tMPI_Atomic_ptr_set(&(evl->head_rts), NULL);
235 evl->head_new = NULL;
236 evl->head_rts = NULL;
238 evl->head_free = &(evl->alloc_head[1]);
239 /* initialize the head_old circular list with dummy element */
240 evl->head_old = evl->alloc_head; /* the first element is a dummy */
241 evl->head_old->next = evl->head_old;
242 evl->head_old->prev = evl->head_old;
246 void tMPI_Send_env_list_destroy(struct send_envelope_list *evl)
248 #ifdef USE_SEND_RECV_COPY_BUFFER
250 for (i = 0; i < evl->Nalloc; i++)
252 free(evl->alloc_head[i].cb);
255 free(evl->alloc_head);
256 evl->alloc_head = NULL;
257 #ifdef TMPI_LOCK_FREE_LISTS
258 tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
260 evl->head_new = NULL;
262 evl->head_old = NULL; /* make it crash if used after tMPI_Finalize */
/* Get a free send envelope from the send list, first recycling any
   envelopes the receiving thread has pushed onto the shared
   'return to sender' (head_rts) stack; if none are free, process
   incoming requests until one becomes available.
   NOTE(review): this block appears truncated in the visible text
   (several branches, braces and #else/#endif lines are missing);
   code is left byte-identical, comments only. */
static struct envelope* tMPI_Send_env_list_fetch_new(struct
                                                     send_envelope_list *evl)
    struct envelope *ret;

    /* first check whether any envelopes were returned to sender */
#ifdef TMPI_LOCK_FREE_LISTS
    if ((ret = (struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts))))
    /* detach the list */
#ifdef TMPI_LOCK_FREE_LISTS
    /* we detach by swapping what we expect the pointer value to be,
       with NULL. If there were a cross-platform way to atomically
       swap without checking, we could do that, too. */
    while (!tMPI_Atomic_ptr_cas( &(evl->head_rts), ret, NULL ))
    ret = (struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts));
    /* non-lock-free alternative: protect head_rts with its spinlock */
    tMPI_Spinlock_lock( &(evl->lock_rts) );
    evl->head_rts = NULL;
    tMPI_Spinlock_unlock( &(evl->lock_rts) );
    /* now add the items to head_free */
    struct envelope *next = ret->next;
    ret->next = evl->head_free;
    evl->head_free = ret;
    /* get the last free one off the list */
    ret = evl->head_free;
#ifdef USE_SEND_RECV_COPY_BUFFER
    /* There are no free send envelopes, so all we can do is handle
       incoming requests until we get a free send envelope. */
#if defined(TMPI_DEBUG) || defined(TMPI_WARNINGS)
    printf("Ran out of send envelopes!!\n");
    /* processing incoming messages may return one of our own envelopes */
    tMPI_Wait_process_incoming(tMPI_Get_current());
    /* If this happens, it most likely indicates a bug in the
       calling program. We could fix the situation by waiting,
       but that would most likely lead to deadlocks - even
       more difficult to debug than this. */
    tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_ENVELOPES);
    /* detach the envelope we hand out from the free list */
    evl->head_free = ret->next;
342 static void tMPI_Send_env_list_return(struct envelope *sev)
344 struct send_envelope_list *evl = sev->slist;
346 sev->next = evl->head_free;
347 evl->head_free = sev;
#ifdef USE_SEND_RECV_COPY_BUFFER
/* Return a send envelope to the sender's send list by pushing it onto
   the shared 'return to sender' (head_rts) stack.
   (to be used by the receiving thread). */
static void tMPI_Send_env_list_rts(struct envelope *sev)
{
    struct send_envelope_list *evl = sev->slist;
#ifdef TMPI_LOCK_FREE_LISTS
    struct envelope *sevn;

    do
    {
        /* read the current head and make it our next pointer */
        sevn      = (struct envelope*)tMPI_Atomic_ptr_get(&evl->head_rts);
        sev->next = sevn;
        /* the cmpxchg operation is a memory fence, so we shouldn't need
           to worry about out-of-order evaluation */
    }
    while (!tMPI_Atomic_ptr_cas( &(evl->head_rts), sevn, sev ));
#else
    tMPI_Spinlock_lock( &(evl->lock_rts) );
    /* BUGFIX: this read 'ev->next', but no 'ev' is declared in this
       scope - the parameter is 'sev' */
    sev->next     = (struct envelope*)evl->head_rts;
    evl->head_rts = sev;
    tMPI_Spinlock_unlock( &(evl->lock_rts) );
#endif
}
#endif
377 static void tMPI_Send_env_list_remove_old(struct envelope *sev)
379 /* pretty straighforward because it isn't a shared list */
382 sev->next->prev = sev->prev;
386 sev->prev->next = sev->next;
393 static void tMPI_Send_env_list_add_new(struct tmpi_thread *cur,
394 struct send_envelope_list *evl,
395 struct envelope *sev)
397 #ifdef TMPI_LOCK_FREE_LISTS
398 struct envelope *evl_head_new_orig;
402 #ifdef TMPI_LOCK_FREE_LISTS
403 /* behold our lock-free shared linked list:
404 (it's actually quite simple because we only do operations at the head
405 of the list, either adding them - such as here - or detaching the whole
409 /* read the old head atomically */
410 evl_head_new_orig = (struct envelope*) tMPI_Atomic_ptr_get(
412 /* set our envelope to have that as its next */
413 sev->next = evl_head_new_orig;
414 /* do the compare-and-swap.
415 this operation is a memory fence, so we shouldn't need
416 to worry about out-of-order stores. If it returns false,
417 somebody else got there before us: */
419 while (!tMPI_Atomic_ptr_cas(&(evl->head_new), evl_head_new_orig, sev));
422 tMPI_Spinlock_lock( &(evl->lock_new) );
423 /* we add to the start of the list */
424 sev->next = (struct send_envelope*)evl->head_new;
425 /* actually attach it to the list */
427 tMPI_Spinlock_unlock( &(evl->lock_new) );
430 #if defined(TMPI_PROFILE)
431 tMPI_Profile_wait_start(cur);
433 /* signal to the thread that there is a new envelope */
434 tMPI_Event_signal( &(sev->dest->p2p_event) );
435 #if defined(TMPI_PROFILE)
436 tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
440 static void tMPI_Send_env_list_move_to_old(struct envelope *sev)
442 struct send_envelope_list *evl = sev->slist;
444 /* remove from old list. We assume the list has been detached! */
447 sev->next->prev = sev->prev;
451 sev->prev->next = sev->next;
454 /* we add to the end of the list */
455 sev->next = evl->head_old;
456 sev->prev = evl->head_old->prev;
458 sev->next->prev = sev;
459 sev->prev->next = sev;
469 /* tmpi_recv_envelope_list functions */
471 int tMPI_Recv_env_list_init(struct recv_envelope_list *evl)
473 evl->head = &(evl->dummy);
474 evl->head->prev = evl->head;
475 evl->head->next = evl->head;
480 void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl)
485 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
486 struct envelope *rev)
489 /* we add to the end of the list */
490 rev->next = evl->head;
491 rev->prev = evl->head->prev;
493 rev->next->prev = rev;
494 rev->prev->next = rev;
497 static void tMPI_Recv_env_list_remove(struct envelope *rev)
501 rev->next->prev = rev->prev;
505 rev->prev->next = rev->next;
519 /* tmpi_req functions */
521 int tMPI_Req_list_init(struct req_list *rl, int N_reqs)
525 rl->alloc_head = (struct tmpi_req_*)tMPI_Malloc(
526 sizeof(struct tmpi_req_)*N_reqs);
527 if (rl->alloc_head == 0)
529 return TMPI_ERR_NO_MEM;
531 rl->head = rl->alloc_head;
532 for (i = 0; i < N_reqs; i++)
536 rl->head[i].prev = NULL;
540 rl->head[i].prev = &(rl->head[i-1]);
545 rl->head[i].next = NULL;
549 rl->head[i].next = &(rl->head[i+1]);
555 void tMPI_Req_list_destroy(struct req_list *rl)
557 free(rl->alloc_head);
559 rl->alloc_head = NULL;
564 struct tmpi_req_ *tMPI_Get_req(struct req_list *rl)
566 struct tmpi_req_ *req = rl->head;
569 /* we don't need locks here because requests are a per-thread property */
572 /* this could be fixed */
573 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_REQUESTS);
576 rl->head = req->next;
582 void tMPI_Return_req(struct req_list *rl, struct tmpi_req_ *req)
584 req->next = rl->head;
591 void tMPI_Req_init(struct tmpi_req_ *rq, struct envelope *ev)
594 rq->finished = FALSE;
598 rq->source = ev->src;
600 rq->tag = TMPI_ANY_TAG;
601 rq->error = TMPI_SUCCESS;
603 rq->cancelled = FALSE;
611 /* Point-to-point communication protocol functions */
615 void tMPI_Set_req(struct envelope *ev, struct tmpi_req_ *req)
617 req->source = ev->src;
618 req->comm = ev->comm;
620 req->error = ev->error;
623 if (tMPI_Atomic_get(&(ev->state)) > env_unmatched)
625 req->transferred = ev->bufsize;
629 req->transferred = 0;
634 if (tMPI_Atomic_get(&(ev->state)) == env_finished)
636 req->transferred = ev->bufsize;
640 req->transferred = 0;
645 void tMPI_Set_status(struct tmpi_req_ *req, tMPI_Status *st)
649 st->TMPI_SOURCE = tMPI_Comm_seek_rank(req->comm, req->source);
650 st->TMPI_TAG = req->tag;
651 st->TMPI_ERROR = req->error;
652 st->transferred = req->transferred;
653 st->cancelled = req->cancelled;
658 tmpi_bool tMPI_Envelope_matches(const struct envelope *sev,
659 const struct envelope *rev)
662 printf("%5d: tMPI_Envelope_matches (%d->%d)==(%d->%d), tag=(%d==%d), \n datatype=(%ld==%ld), comm=(%ld,%ld),\n finished=(%d==%d)\n",
663 tMPI_This_threadnr(),
664 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
665 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
666 (int)(sev->tag), (int)(rev->tag),
667 (long int)sev->datatype, (long int)rev->datatype,
668 (long int)sev->comm, (long int)rev->comm,
669 (int)sev->state.value, (int)rev->state.value);
672 if ( ( (rev->tag == TMPI_ANY_TAG) || (rev->tag == sev->tag) ) &&
673 ( sev->comm == rev->comm ) &&
674 ( (!rev->src) || (rev->src == sev->src) ) &&
675 ( sev->dest == rev->dest ) &&
676 ( sev->datatype == rev->datatype ) &&
677 ( tMPI_Atomic_get(&(sev->state)) < env_finished &&
678 tMPI_Atomic_get(&(rev->state)) == env_unmatched ) )
681 printf("%5d: (%d->%d) tag=%d found match\n",
682 tMPI_This_threadnr(),
683 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
695 struct envelope* tMPI_Send_env_list_search_old(struct send_envelope_list *evl,
696 struct envelope *rev)
698 struct envelope *sev;
700 sev = (struct envelope*)evl->head_old->next;
701 while (sev != evl->head_old)
703 if (tMPI_Envelope_matches(sev, rev))
705 /* remove the envelope */
706 tMPI_Send_env_list_remove_old(sev);
715 struct envelope* tMPI_Recv_env_list_search_new(struct recv_envelope_list *evl,
716 struct envelope *sev)
718 struct envelope *rev;
720 rev = evl->head->next;
721 while (rev != evl->head)
723 if (tMPI_Envelope_matches(sev, rev))
#ifdef USE_SEND_RECV_COPY_BUFFER
/* Fill the send envelope's copy buffer so the sender can complete a
   small send without waiting for the receiver; races with the receiver
   via CAS on sev->state.
   NOTE(review): this block appears truncated in the visible text
   (braces and some #ifdef lines are missing); code left byte-identical,
   comments only. */
void tMPI_Send_copy_buffer(struct envelope *sev, struct tmpi_req_ *req)
    /* Fill copy buffer, after having anounced its possible use */

    /* in the special case of a zero buffer size, we don't do anything and
       always let the receiver handle it */
    if (sev->bufsize == 0)

    /* first check whether the other side hasn't started yet */
    state = tMPI_Atomic_get( &(sev->state) );
    tMPI_Atomic_memory_barrier_acq();
    if (state == env_unmatched)
    /* copy our data into the envelope's own buffer */
    memcpy(sev->cb, sev->buf, sev->bufsize);
    /* now set state, if other side hasn't started copying yet. */
    tMPI_Atomic_memory_barrier_rel();
    if (tMPI_Atomic_cas( &(sev->state), env_unmatched, env_cb_available))
    /* if it was originally unmatched, the receiver wasn't
       copying the old buffer. We can don't need to wait,
       and the receiver is going to clean up this envelope. */
    printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) completed\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
    /* and if we reached this point, the receiver had already started
       copying, and we need to clean up the envelope ourselves.

       we first need to wait until the receiver is finished copying. We
       know this is a short wait (since the buffer was small enough to be
       buffered in the first place), so we just spin-wait. */
    tMPI_Atomic_memory_barrier(); /* a full barrier to make sure that the
                                     sending doesn't interfere with the
                                     receiving side */
    while (tMPI_Atomic_get( &(sev->state) ) < env_cb_available)
    tMPI_Atomic_memory_barrier_acq();
    tMPI_Atomic_memory_barrier_acq();
    printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) waiting-completed\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest), (int)(sev->tag));
    /* record the transfer results in the request */
    tMPI_Set_req(sev, req);
    /* and now we clean up */
    tMPI_Send_env_list_return(sev);
/* Reserve a send envelope from the send-envelope list and fill it with
   the parameters of a send operation.
   NOTE(review): truncated in the visible text - a tMPI_Comm parameter
   (used as 'comm' below) and several field assignments (src, dest,
   buf, tag, comm, the bufsize validity check) appear to be on missing
   lines; code left byte-identical, comments only. */
struct envelope* tMPI_Prep_send_envelope(struct send_envelope_list *evl,
                                         struct tmpi_thread *src,
                                         struct tmpi_thread *dest,
                                         void *buf, int count,
                                         tMPI_Datatype datatype,
                                         int tag, tmpi_bool nonblock)
    /* get an envelope from the send-envelope stack */
    struct envelope *ev = tMPI_Send_env_list_fetch_new( evl );

    ev->nonblock = nonblock;
    /* transfer size in bytes */
    ev->bufsize = count*datatype->size;
    ev->datatype = datatype;
#ifdef USE_SEND_RECV_COPY_BUFFER
    /* check whether we'll be double buffering */
    ev->using_cb = (ev->bufsize < COPY_BUFFER_SIZE);
    /* but don't do anything yet */
    tMPI_Atomic_set(&(ev->state), env_unmatched);
    ev->error = TMPI_SUCCESS;
    /* error path: flagged when the buffer size is invalid (the guarding
       condition is on a line not visible here - TODO confirm) */
    tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
    ev->error = TMPI_ERR_XFER_BUFSIZE;
/* Reserve a receive envelope from the current thread's free list and
   fill it with the parameters of a receive operation.
   NOTE(review): truncated in the visible text - the trailing
   'nonblock' parameter, a tMPI_Comm parameter (used as 'comm' below)
   and several field assignments appear to be on missing lines; code
   left byte-identical, comments only. */
struct envelope* tMPI_Prep_recv_envelope(struct tmpi_thread *cur,
                                         struct tmpi_thread *src,
                                         struct tmpi_thread *dest,
                                         void *buf, int count,
                                         tMPI_Datatype datatype, int tag,
    /* get an envelope from the stack */
    struct envelope *ev = tMPI_Free_env_list_fetch_recv( &(cur->envelopes) );

    ev->nonblock = nonblock;
    /* transfer size in bytes */
    ev->bufsize = count*datatype->size;
    ev->datatype = datatype;
    tMPI_Atomic_set(&(ev->state), env_unmatched);
    ev->error = TMPI_SUCCESS;
    /* error path: flagged when the buffer size is invalid (the guarding
       condition is on a line not visible here - TODO confirm) */
    tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
    ev->error = TMPI_ERR_XFER_BUFSIZE;
/* Do the actual point-to-point transfer from send envelope 'sev' to
   receive envelope 'rev', mark both envelopes env_finished, signal the
   sending thread, and clean up the envelopes.
   NOTE(review): this block appears truncated in the visible text
   (braces and several #else/#endif lines are missing); code left
   byte-identical, comments only. */
static void tMPI_Xfer(struct tmpi_thread *cur, struct envelope *sev,
                      struct envelope *rev)
#ifdef USE_SEND_RECV_COPY_BUFFER
    /* we remove the sender's envelope only if we do the transfer, which
       we always do if the buffer size = 0 */
    tmpi_bool remove_sender = (sev->bufsize == 0);
    printf("%5d: tMPI_Xfer (%d->%d, tag=%d) started\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
    /* first set data on the receiving end so status can be updated */
    if (sev->bufsize) /* do the actual transfer */
    void *sbuf = sev->buf; /* source buffer */
    if (sev->bufsize > rev->bufsize)
    /* receiver's buffer is too small: flag the error on both sides */
    tMPI_Error((rev->comm), TMPI_ERR_XFER_BUFSIZE);
    tMPI_Atomic_set(&(rev->state), env_finished);
    tMPI_Atomic_set(&(sev->state), env_finished);
    rev->error = TMPI_ERR_XFER_BUFSIZE;
    sev->error = TMPI_ERR_XFER_BUFSIZE;
#ifdef USE_SEND_RECV_COPY_BUFFER
    /* check if the other side has already finished copying */
    if (!tMPI_Atomic_cas( &(sev->state), env_unmatched, env_copying))
    /* it has, and we're copying from the new buffer.
       We're now also tasked with removing the envelope */
    remove_sender = TRUE;
    tMPI_Profile_count_buffered_p2p_xfer(cur);
    /* guard against NULL transfer buffers */
    if (!rev->buf || !sev->buf)
    tMPI_Error((rev->comm), TMPI_ERR_BUF);
    tMPI_Atomic_set(&(rev->state), env_finished);
    tMPI_Atomic_set(&(sev->state), env_finished);
    rev->error = TMPI_ERR_BUF;
    sev->error = TMPI_ERR_BUF;
    /* the actual data copy */
    memcpy(rev->buf, sbuf, sev->bufsize);
    tMPI_Profile_count_p2p_xfer(cur);
    /* for status update */
    rev->bufsize = sev->bufsize;
    /* and mark that we're finished */
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_start(cur);
    tMPI_Atomic_set( &(rev->state), env_finished);
    tMPI_Atomic_set( &(sev->state), env_finished);
    /* signal to a potentially waiting thread that we're done. */
    tMPI_Atomic_fetch_add( &(rev->src->ev_outgoing_received), 1);
    tMPI_Event_signal(&(rev->src->p2p_event));
    /* remove the receiving envelope if it's in a list */
    tMPI_Recv_env_list_remove(rev);
#ifdef USE_SEND_RECV_COPY_BUFFER
    /* hand the send envelope back to the sender's rts stack */
    tMPI_Send_env_list_rts(sev);
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
    printf("%5d: tMPI_Xfer (%d->%d, tag=%d) done\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
/* Post a receive and try to match it against already-posted sends.
   If a match is found the transfer is done immediately; otherwise the
   receive envelope is added to this thread's receive list.
   NOTE(review): truncated in the visible text - a tMPI_Comm parameter
   (used as 'comm' below), braces and some branch structure are on
   missing lines; code left byte-identical, comments only. */
struct envelope* tMPI_Post_match_recv(struct tmpi_thread *cur,
                                      struct tmpi_thread *src,
                                      void *recv_buf, int recv_count,
                                      tMPI_Datatype datatype,
                                      int tag, tmpi_bool nonblock)
    struct tmpi_thread *dest = cur;
    struct envelope *rev;
    struct envelope *sev = NULL;
    /* NULL src means "receive from any source": Nthreads acts as a
       sentinel index */
    int src_threadnr = src ? tMPI_Threadnr(src) : Nthreads;

    /* reserve an envelope to post */
    rev = tMPI_Prep_recv_envelope(cur, comm, src, dest, recv_buf, recv_count,
                                  datatype, tag, nonblock);
    printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) started\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest), (int)(rev->tag));
    /* we now check the entire exisiting send queue */
    sev = tMPI_Send_env_list_search_old( &(dest->evs[src_threadnr]), rev);
    /* if we don't know the source, we look at all possible sources */
    for (i = 0; i < Nthreads; i++)
    sev = tMPI_Send_env_list_search_old(&(dest->evs[i]), rev);
    printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) found match\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
    /* we found a matching send */
    tMPI_Xfer(cur, sev, rev);
    printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) no match\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
    /* we post the envelope in the right list */
    tMPI_Recv_env_list_add( &(dest->evr), rev);
1076 struct envelope *tMPI_Post_send(struct tmpi_thread *cur,
1078 struct tmpi_thread *dest,
1079 void *send_buf, int send_count,
1080 tMPI_Datatype datatype, int tag,
1083 struct tmpi_thread *src = cur;
1084 struct envelope *sev;
1085 int src_threadnr = tMPI_Threadnr(src);
1086 struct send_envelope_list *sevl = &(dest->evs[src_threadnr]);
1088 /* reserve an envelope to post */
1089 sev = tMPI_Prep_send_envelope(sevl, comm, src, dest, send_buf, send_count,
1090 datatype, tag, nonblock);
1097 printf("%5d: tMPI_Post_send (%d->%d, tag=%d)\n",
1098 tMPI_This_threadnr(),
1099 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
1103 /* we post the envelope in the right list */
1104 tMPI_Send_env_list_add_new(cur, &(dest->evs[src_threadnr]), sev);
/* Wait for and process incoming p2p events for thread 'cur': newly
   arrived send envelopes (matched against posted receives, or moved to
   the 'old' list) and notifications that our own outgoing sends were
   received.
   NOTE(review): truncated in the visible text (loop bodies, braces and
   several #else/#endif lines are missing); code left byte-identical,
   comments only. */
void tMPI_Wait_process_incoming(struct tmpi_thread *cur)
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_start(cur);
    /* we check for newly arrived send envelopes and finished send
       envelopes */
    check_id = tMPI_Event_wait( &(cur->p2p_event));
    /* the outgoing_received items are handled 'automatically'
       by the function calling this function */
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p);
    /* subtract the 'our outgoing send was received' notifications from
       the event count: only new envelopes remain to be processed */
    n_handled = tMPI_Atomic_get(&(cur->ev_outgoing_received));
    tMPI_Atomic_fetch_add( &(cur->ev_outgoing_received), -n_handled);
    check_id -= n_handled;
    /*int repl=check_id;*/
    /* there were new send envelopes. Let's check them all */
    for (i = 0; i < Nthreads; i++)
    struct envelope *sev_head;

#ifdef TMPI_LOCK_FREE_LISTS
    /* Behold our lock-free shared linked list:
       (see tMPI_Send_env_list_add_new for more info) */
    /* read old head atomically */
    sev_head = (struct envelope*)
        tMPI_Atomic_ptr_get( &(cur->evs[i].head_new) );
    /* do the compare-and-swap to detach the list */
    while (!tMPI_Atomic_ptr_cas(&(cur->evs[i].head_new), sev_head,
    /* non-lock-free alternative: detach under the spinlock */
    tMPI_Spinlock_lock( &(cur->evs[i].lock_new) );
    sev_head = (struct send_envelope*)cur->evs[i].head_new;
    cur->evs[i].head_new = NULL; /* detach the list */
    tMPI_Spinlock_unlock( &(cur->evs[i].lock_new) );
    if (sev_head) /* there's a newly arrived send envelope from this
                     thread */
    struct envelope *sev = sev_head;
    struct envelope *prev_s = NULL;
    struct envelope *rev;

    /* first enable reversing order by creating a regular
       doubly-linked list from the singly-linked shared
       list */
    /* now walk through it backwards (in order of addition) */
    struct envelope *sevp = sev->prev;
    /* try to match against a posted receive */
    rev = tMPI_Recv_env_list_search_new(&(cur->evr), sev);
    tMPI_Xfer(cur, sev, rev);
    /* no matching receive: park the envelope in the 'old' list */
    tMPI_Send_env_list_move_to_old( sev );
    tMPI_Event_process( &(cur->p2p_event), n_handled);
/* Test whether a single request has completed; finish the transfer
   bookkeeping and release the envelope if it has. Returns rq->finished.
   NOTE(review): truncated in the visible text (braces and the
   send-vs-recv release branch structure are incomplete); code left
   byte-identical, comments only. */
tmpi_bool tMPI_Test_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
    struct envelope *ev = rq->ev;

    if (ev && !(rq->finished) )
#ifdef USE_SEND_RECV_COPY_BUFFER
    if (ev->send && ev->using_cb)
    /* We buffer-copy. Just do the transfer to the buffer and
       return saying that we're done. It's now up to the
       receiver to return our envelope.*/
    /* do our transfer and are guaranteed a finished
       envelope */
    tMPI_Send_copy_buffer(ev, rq);
    /* get the results */
    rq->error = rq->ev->error;
    rq->finished = TRUE;
    if (tMPI_Atomic_get( &(ev->state) ) >= env_finished)
    rq->finished = TRUE;
    /* get the results */
    rq->error = rq->ev->error;
    tMPI_Set_req(ev, rq);
    /* and release the envelope. After this point, the envelope
       may be reused, so its contents shouldn't be relied on. */
    tMPI_Send_env_list_return(ev);
    /* receive envelopes go back to the thread's free list instead */
    tMPI_Free_env_list_return_recv( &(cur->envelopes), ev);
    return rq->finished;
1245 void tMPI_Wait_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
1249 if (tMPI_Test_single(cur, rq))
1253 tMPI_Wait_process_incoming(cur);
1258 tmpi_bool tMPI_Test_multi(struct tmpi_thread *cur, struct tmpi_req_ *rqs,
1259 tmpi_bool *any_done)
1261 tmpi_bool all_done = TRUE;
1262 struct tmpi_req_ *creq = rqs;
1272 tmpi_bool finished = tMPI_Test_single(cur, creq);
1275 /* now do the check */
1282 /* remove the request from the list we've been given. */
1285 creq->prev->next = creq->next;
1289 creq->next->prev = creq->prev;