2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
/* Forward declarations of the static helpers implementing the point-to-point
   message protocol: free-envelope list management, per-sender send-envelope
   lists, receive-envelope lists, and the actual data transfer.
   NOTE(review): this extraction is missing lines (the embedded original line
   numbers skip), so several declarations below appear truncated. */
/* get a (receive) envelope from the free envelopes list */
62 static struct envelope *tMPI_Free_env_list_fetch_recv(struct free_envelope_list
65 /* return an envelope to the free envelopes list */
66 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
67 struct envelope *rev);
71 /* send envelope lists: */
73 /* get a new envelope from the send list's free envelope list */
74 static struct envelope* tMPI_Send_env_list_fetch_new(struct
75 send_envelope_list *evl);
77 /* return a send envelope to the send list's free envelope list,
78 (to be used by the sending thread, who owns the send_envelope_list) */
79 static void tMPI_Send_env_list_return(struct envelope *ev);
80 #ifdef USE_SEND_RECV_COPY_BUFFER
81 /* return a send envelope to the sender's send list.
82 (to be used by the receiving thread). */
83 static void tMPI_Send_env_list_rts(struct envelope *sev);
90 /* remove a send envelope from its head_old list. Does not lock */
91 static void tMPI_Send_env_list_remove_old(struct envelope *sev);
93 /* add a send envelope to the new envelopes queue in a list */
94 static void tMPI_Send_env_list_add_new(struct tmpi_thread *cur,
95 struct send_envelope_list *evl,
96 struct envelope *sev);
97 /* move a send envelope to the old envelopes queue in a list.
98 Assumes that this is safe to do without interference
99 from other threads, i.e. the list it's in must have been
101 static void tMPI_Send_env_list_move_to_old(struct envelope *sev);
106 /* receive envelopes: */
107 /* add a receive envelope to a list */
108 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
109 struct envelope *ev);
110 /* remove a receive envelope from its list */
111 static void tMPI_Recv_env_list_remove(struct envelope *ev);
116 /* do the actual point-to-point transfer */
117 static void tMPI_Xfer(struct tmpi_thread *cur, struct envelope *sev,
118 struct envelope *rev);
123 /* Point-to-point communication protocol functions */
/* Initialize a free (receive) envelope list: one contiguous allocation of N
   envelopes, chained into a singly-linked free list via .next.
   Returns TMPI_ERR_NO_MEM on allocation failure; presumably returns
   TMPI_SUCCESS otherwise (return not visible in this extraction). */
124 int tMPI_Free_env_list_init(struct free_envelope_list *evl, int N)
128 /* allocate the head element */
129 evl->recv_alloc_head = (struct envelope*)tMPI_Malloc(sizeof(struct envelope)
131 if (evl->recv_alloc_head == NULL)
133 return TMPI_ERR_NO_MEM;
135 evl->head_recv = evl->recv_alloc_head;
137 for (i = 0; i < N; i++)
/* link each envelope to its array successor; the last one (not visible
   here) is terminated with NULL instead */
141 evl->head_recv[i].next = &(evl->head_recv[i+1]);
145 evl->head_recv[i].next = NULL;
/* receive envelopes start with no owning recv/send list */
147 evl->head_recv[i].rlist = NULL;
148 evl->head_recv[i].slist = NULL;
/* Free the single backing allocation of the free-envelope list and clear
   both pointers so accidental use after destruction faults early. */
153 void tMPI_Free_env_list_destroy(struct free_envelope_list *evl)
155 free(evl->recv_alloc_head);
156 evl->head_recv = NULL;
157 evl->recv_alloc_head = NULL;
/* Pop one envelope off the free receive-envelope list. No locking: the
   free list is a per-thread structure. Raises TMPI_ERR_ENVELOPES when the
   list is exhausted (the empty-check branch is not visible here). */
160 static struct envelope* tMPI_Free_env_list_fetch_recv(struct
161 free_envelope_list *evl)
163 struct envelope *ret;
166 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_ENVELOPES);
170 ret = evl->head_recv;
171 evl->head_recv = ret->next;
/* Push a receive envelope back onto the head of the free list.
   Per-thread structure, so no locking needed. */
179 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
180 struct envelope *rev)
185 rev->next = evl->head_recv;
186 evl->head_recv = rev;
189 /* tmpi_send_envelope_list functions */
/* Initialize a send-envelope list with N envelopes in one allocation.
   Sets up: the free stack (head_free), the shared incoming queues
   (head_new / head_rts — atomics when TMPI_LOCK_FREE_LISTS, spinlocked
   pointers otherwise), and head_old as a circular list whose first array
   element serves as a dummy node.
   Returns TMPI_ERR_NO_MEM on any allocation failure.
   NOTE(review): head_free starts at alloc_head[1] because element 0 is
   the head_old dummy. */
191 int tMPI_Send_env_list_init(struct send_envelope_list *evl, int N)
194 #ifndef TMPI_LOCK_FREE_LISTS
195 tMPI_Spinlock_init( &(evl->lock_rts) );
196 tMPI_Spinlock_init( &(evl->lock_new) );
200 evl->alloc_head = (struct envelope*)tMPI_Malloc(sizeof(struct envelope)*N);
201 if (evl->alloc_head == NULL)
203 return TMPI_ERR_NO_MEM;
205 for (i = 0; i < N; i++)
207 evl->alloc_head[i].next = (i < (N-1)) ? &(evl->alloc_head[i+1]) : NULL;
208 evl->alloc_head[i].prev = NULL;
209 evl->alloc_head[i].slist = evl;
210 evl->alloc_head[i].rlist = NULL;
211 #ifdef USE_SEND_RECV_COPY_BUFFER
/* each send envelope owns a small copy buffer for buffered sends */
212 evl->alloc_head[i].cb = (void*)tMPI_Malloc(sizeof(char)*
214 if (evl->alloc_head[i].cb == NULL)
216 return TMPI_ERR_NO_MEM;
221 #ifdef TMPI_LOCK_FREE_LISTS
222 tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
223 tMPI_Atomic_ptr_set(&(evl->head_rts), NULL);
225 evl->head_new = NULL;
226 evl->head_rts = NULL;
228 evl->head_free = &(evl->alloc_head[1]);
229 /* initialize the head_old circular list with dummy element */
230 evl->head_old = evl->alloc_head; /* the first element is a dummy */
231 evl->head_old->next = evl->head_old;
232 evl->head_old->prev = evl->head_old;
/* Free the send-envelope list: per-envelope copy buffers (when enabled),
   then the backing array. Pointers are nulled so any use after
   tMPI_Finalize crashes deterministically rather than corrupting memory. */
236 void tMPI_Send_env_list_destroy(struct send_envelope_list *evl)
238 #ifdef USE_SEND_RECV_COPY_BUFFER
240 for (i = 0; i < evl->Nalloc; i++)
242 free(evl->alloc_head[i].cb);
245 free(evl->alloc_head);
246 evl->alloc_head = NULL;
247 #ifdef TMPI_LOCK_FREE_LISTS
248 tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
250 evl->head_new = NULL;
252 evl->head_old = NULL; /* make it crash if used after tMPI_Finalize */
/* Get a fresh send envelope for the calling (sending) thread.
   First recycles any envelopes the receiver has posted to the shared
   head_rts queue (detached atomically or under the rts spinlock), then
   pops from the private free stack. If the free stack is empty, either
   processes incoming events until one frees up (copy-buffer build) or
   raises TMPI_ERR_ENVELOPES. */
256 static struct envelope* tMPI_Send_env_list_fetch_new(struct
257 send_envelope_list *evl)
259 struct envelope *ret;
263 /* first check whether any envelopes were returned to sender */
264 #ifdef TMPI_LOCK_FREE_LISTS
265 if ((ret = (struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts))))
270 /* detach the list */
271 #ifdef TMPI_LOCK_FREE_LISTS
272 /* we detach by swapping what we expect the pointer value to be,
273 with NULL. If there were a cross-platform way to atomically
274 swap without checking, we could do that, too. */
275 while (!tMPI_Atomic_ptr_cas( &(evl->head_rts), ret, NULL ))
/* CAS failed: another envelope was pushed meanwhile; re-read head */
277 ret = (struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts));
280 tMPI_Spinlock_lock( &(evl->lock_rts) );
282 evl->head_rts = NULL;
283 tMPI_Spinlock_unlock( &(evl->lock_rts) );
285 /* now add the items to head_free */
288 struct envelope *next = ret->next;
289 ret->next = evl->head_free;
290 evl->head_free = ret;
295 /* get the last free one off the list */
296 ret = evl->head_free;
298 #ifdef USE_SEND_RECV_COPY_BUFFER
300 /* There are no free send envelopes, so all we can do is handle
301 incoming requests until we get a free send envelope. */
302 #if defined(TMPI_DEBUG) || defined(TMPI_WARNINGS)
303 printf("Ran out of send envelopes!!\n");
306 tMPI_Wait_process_incoming(tMPI_Get_current());
310 /* If this happens, it most likely indicates a bug in the
311 calling program. We could fix the situation by waiting,
312 but that would most likely lead to deadlocks - even
313 more difficult to debug than this. */
314 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_ENVELOPES);
321 evl->head_free = ret->next;
/* Return a send envelope to its own list's free stack. Called by the
   sending thread, which owns the free stack, so no locking is needed. */
332 static void tMPI_Send_env_list_return(struct envelope *sev)
334 struct send_envelope_list *evl = sev->slist;
336 sev->next = evl->head_free;
337 evl->head_free = sev;
341 #ifdef USE_SEND_RECV_COPY_BUFFER
/* Return a send envelope to the sender's ready-to-send (head_rts) queue.
   Called by the RECEIVING thread, so head_rts is shared state: it is
   updated either with a CAS loop (TMPI_LOCK_FREE_LISTS) or under the
   rts spinlock. The sending thread later drains this queue in
   tMPI_Send_env_list_fetch_new(). */
342 static void tMPI_Send_env_list_rts(struct envelope *sev)
344 struct send_envelope_list *evl = sev->slist;
345 #ifdef TMPI_LOCK_FREE_LISTS
346 struct envelope *sevn;
350 sevn = (struct envelope*)tMPI_Atomic_ptr_get(&evl->head_rts);
352 /* the cmpxchg operation is a memory fence, so we shouldn't need
353 to worry about out-of-order evaluation */
355 while (!tMPI_Atomic_ptr_cas( &(evl->head_rts), sevn, sev ));
357 tMPI_Spinlock_lock( &(evl->lock_rts) );
/* BUG FIX: this line previously read `ev->next`, but no identifier `ev`
   exists in this function's scope — the envelope being returned is the
   parameter `sev`. */
358 sev->next = (struct envelope*)evl->head_rts;
360 tMPI_Spinlock_unlock( &(evl->lock_rts) );
/* Unlink a send envelope from the doubly-linked head_old list by fixing
   its neighbours' pointers. */
367 static void tMPI_Send_env_list_remove_old(struct envelope *sev)
369 /* pretty straightforward because it isn't a shared list */
372 sev->next->prev = sev->prev;
376 sev->prev->next = sev->next;
/* Push a newly prepared send envelope onto the head of the shared
   head_new queue of the destination thread's send list, then signal the
   destination thread's p2p event. Lock-free (CAS on the list head) or
   spinlocked depending on TMPI_LOCK_FREE_LISTS. `cur` is only used for
   profiling (hence tmpi_unused). */
383 static void tMPI_Send_env_list_add_new(struct tmpi_thread tmpi_unused *cur,
384 struct send_envelope_list *evl,
385 struct envelope *sev)
387 #ifdef TMPI_LOCK_FREE_LISTS
388 struct envelope *evl_head_new_orig;
392 #ifdef TMPI_LOCK_FREE_LISTS
393 /* behold our lock-free shared linked list:
394 (it's actually quite simple because we only do operations at the head
395 of the list, either adding them - such as here - or detaching the whole
399 /* read the old head atomically */
400 evl_head_new_orig = (struct envelope*) tMPI_Atomic_ptr_get(
402 /* set our envelope to have that as its next */
403 sev->next = evl_head_new_orig;
404 /* do the compare-and-swap.
405 this operation is a memory fence, so we shouldn't need
406 to worry about out-of-order stores. If it returns false,
407 somebody else got there before us: */
409 while (!tMPI_Atomic_ptr_cas(&(evl->head_new), evl_head_new_orig, sev));
412 tMPI_Spinlock_lock( &(evl->lock_new) );
413 /* we add to the start of the list */
/* NOTE(review): the cast names `struct send_envelope` while sev->next is a
   struct envelope* everywhere else in this file — looks like a stale cast;
   verify against the struct definitions before changing. */
414 sev->next = (struct send_envelope*)evl->head_new;
415 /* actually attach it to the list */
417 tMPI_Spinlock_unlock( &(evl->lock_new) );
420 #if defined(TMPI_PROFILE)
421 tMPI_Profile_wait_start(cur);
423 /* signal to the thread that there is a new envelope */
424 tMPI_Event_signal( &(sev->dest->p2p_event) );
425 #if defined(TMPI_PROFILE)
426 tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
/* Move a send envelope from the (already detached) new list to the tail
   of the head_old circular list — i.e. just before the dummy head — so
   old envelopes keep their order of arrival. */
430 static void tMPI_Send_env_list_move_to_old(struct envelope *sev)
432 struct send_envelope_list *evl = sev->slist;
434 /* remove from old list. We assume the list has been detached! */
437 sev->next->prev = sev->prev;
441 sev->prev->next = sev->next;
444 /* we add to the end of the list */
445 sev->next = evl->head_old;
446 sev->prev = evl->head_old->prev;
448 sev->next->prev = sev;
449 sev->prev->next = sev;
452 /* tmpi_recv_envelope_list functions */
/* Initialize a receive-envelope list as a circular doubly-linked list
   whose head is the embedded dummy element (so empty == head points to
   itself). */
454 int tMPI_Recv_env_list_init(struct recv_envelope_list *evl)
456 evl->head = &(evl->dummy);
457 evl->head->prev = evl->head;
458 evl->head->next = evl->head;
/* Tear down a receive-envelope list. Nothing to free: the list head is
   embedded in the structure (body not visible in this extraction). */
463 void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl)
/* Append a receive envelope at the tail of the circular list (just
   before the dummy head), preserving posting order. Per-thread list,
   so no locking. */
468 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
469 struct envelope *rev)
472 /* we add to the end of the list */
473 rev->next = evl->head;
474 rev->prev = evl->head->prev;
476 rev->next->prev = rev;
477 rev->prev->next = rev;
/* Unlink a receive envelope from its doubly-linked list (if it is in
   one) by splicing its neighbours together. */
480 static void tMPI_Recv_env_list_remove(struct envelope *rev)
484 rev->next->prev = rev->prev;
488 rev->prev->next = rev->next;
495 /* tmpi_req functions */
/* Initialize a request list: one contiguous allocation of N_reqs request
   objects, doubly linked (first/last get NULL prev/next respectively).
   Returns TMPI_ERR_NO_MEM on allocation failure. */
497 int tMPI_Req_list_init(struct req_list *rl, int N_reqs)
501 rl->alloc_head = (struct tmpi_req_*)tMPI_Malloc(
502 sizeof(struct tmpi_req_)*N_reqs);
503 if (rl->alloc_head == 0)
505 return TMPI_ERR_NO_MEM;
507 rl->head = rl->alloc_head;
508 for (i = 0; i < N_reqs; i++)
/* first element has no predecessor */
512 rl->head[i].prev = NULL;
516 rl->head[i].prev = &(rl->head[i-1]);
/* last element has no successor */
521 rl->head[i].next = NULL;
525 rl->head[i].next = &(rl->head[i+1]);
/* Free the single backing allocation of the request list. */
531 void tMPI_Req_list_destroy(struct req_list *rl)
533 free(rl->alloc_head);
535 rl->alloc_head = NULL;
/* Pop a request object from the per-thread request list. Raises
   TMPI_ERR_REQUESTS when the list is exhausted (empty-check branch not
   visible in this extraction). */
540 struct tmpi_req_ *tMPI_Get_req(struct req_list *rl)
542 struct tmpi_req_ *req = rl->head;
545 /* we don't need locks here because requests are a per-thread property */
548 /* this could be fixed */
549 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_REQUESTS);
552 rl->head = req->next;
/* Push a request object back onto the head of the per-thread request
   list (the `rl->head = req;` store is not visible in this extraction). */
558 void tMPI_Return_req(struct req_list *rl, struct tmpi_req_ *req)
560 req->next = rl->head;
/* Initialize a request from an envelope: not finished, not cancelled,
   no error, tag wildcarded, source taken from the envelope. */
565 void tMPI_Req_init(struct tmpi_req_ *rq, struct envelope *ev)
568 rq->finished = FALSE;
572 rq->source = ev->src;
574 rq->tag = TMPI_ANY_TAG;
575 rq->error = TMPI_SUCCESS;
577 rq->cancelled = FALSE;
580 /* Point-to-point communication protocol functions */
/* Copy completion information from an envelope into a request:
   source/comm/error, and the transferred byte count, which is only
   meaningful once the envelope has progressed past env_unmatched
   (resp. reached env_finished — the two visible branches appear to be
   the send/receive sides of an #ifdef not visible here). */
581 void tMPI_Set_req(struct envelope *ev, struct tmpi_req_ *req)
583 req->source = ev->src;
584 req->comm = ev->comm;
586 req->error = ev->error;
589 if (tMPI_Atomic_get(&(ev->state)) > env_unmatched)
591 req->transferred = ev->bufsize;
595 req->transferred = 0;
600 if (tMPI_Atomic_get(&(ev->state)) == env_finished)
602 req->transferred = ev->bufsize;
606 req->transferred = 0;
/* Fill a user-visible tMPI_Status from a completed request: source rank
   (looked up within the request's communicator), tag, error code,
   transferred byte count and cancellation flag. */
611 void tMPI_Set_status(struct tmpi_req_ *req, tMPI_Status *st)
615 st->TMPI_SOURCE = tMPI_Comm_seek_rank(req->comm, req->source);
616 st->TMPI_TAG = req->tag;
617 st->TMPI_ERROR = req->error;
618 st->transferred = req->transferred;
619 st->cancelled = req->cancelled;
/* Decide whether a send envelope matches a receive envelope: tag equal
   or receive tag is TMPI_ANY_TAG, same communicator, source matches
   (a NULL rev->src acts as "any source"), same destination and datatype,
   and neither envelope already finished/matched. The printf blocks are
   debug tracing (compiled in under TMPI_DEBUG, presumably). */
623 tmpi_bool tMPI_Envelope_matches(const struct envelope *sev,
624 const struct envelope *rev)
627 printf("%5d: tMPI_Envelope_matches (%d->%d)==(%d->%d), tag=(%d==%d), \n datatype=(%ld==%ld), comm=(%ld,%ld),\n finished=(%d==%d)\n",
628 tMPI_This_threadnr(),
629 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
630 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
631 (int)(sev->tag), (int)(rev->tag),
632 (long int)sev->datatype, (long int)rev->datatype,
633 (long int)sev->comm, (long int)rev->comm,
634 (int)sev->state.value, (int)rev->state.value);
637 if ( ( (rev->tag == TMPI_ANY_TAG) || (rev->tag == sev->tag) ) &&
638 ( sev->comm == rev->comm ) &&
639 ( (!rev->src) || (rev->src == sev->src) ) &&
640 ( sev->dest == rev->dest ) &&
641 ( sev->datatype == rev->datatype ) &&
642 ( tMPI_Atomic_get(&(sev->state)) < env_finished &&
643 tMPI_Atomic_get(&(rev->state)) == env_unmatched ) )
646 printf("%5d: (%d->%d) tag=%d found match\n",
647 tMPI_This_threadnr(),
648 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
/* Walk the circular head_old list looking for a send envelope matching
   the given receive envelope; on a match, unlink and (presumably) return
   it. Returns NULL when the walk wraps back to the dummy head without a
   match (return statements not visible in this extraction). */
657 struct envelope* tMPI_Send_env_list_search_old(struct send_envelope_list *evl,
658 struct envelope *rev)
660 struct envelope *sev;
662 sev = (struct envelope*)evl->head_old->next;
663 while (sev != evl->head_old)
665 if (tMPI_Envelope_matches(sev, rev))
667 /* remove the envelope */
668 tMPI_Send_env_list_remove_old(sev);
/* Walk the posted-receive list looking for a receive envelope that
   matches the given send envelope; presumably returns it on a match and
   NULL otherwise (return statements not visible in this extraction). */
676 struct envelope* tMPI_Recv_env_list_search_new(struct recv_envelope_list *evl,
677 struct envelope *sev)
679 struct envelope *rev;
681 rev = evl->head->next;
682 while (rev != evl->head)
684 if (tMPI_Envelope_matches(sev, rev))
693 #ifdef USE_SEND_RECV_COPY_BUFFER
/* Buffered-send completion: copy the sender's data into the envelope's
   copy buffer so the sender can return immediately. Races with the
   receiver via the envelope's atomic state:
   - if the CAS unmatched->cb_available wins, the receiver will copy
     from the buffer and clean up the envelope;
   - if the receiver started first, spin-wait until it has finished
     copying directly from our buffer, then clean up ourselves.
   Zero-size sends skip buffering entirely and leave the transfer to the
   receiver. */
694 void tMPI_Send_copy_buffer(struct envelope *sev, struct tmpi_req_ *req)
697 /* Fill copy buffer, after having announced its possible use */
699 /* in the special case of a zero buffer size, we don't do anything and
700 always let the receiver handle it */
701 if (sev->bufsize == 0)
706 /* first check whether the other side hasn't started yet */
707 state = tMPI_Atomic_get( &(sev->state) );
708 tMPI_Atomic_memory_barrier_acq();
709 if (state == env_unmatched)
712 memcpy(sev->cb, sev->buf, sev->bufsize);
713 /* now set state, if other side hasn't started copying yet. */
714 tMPI_Atomic_memory_barrier_rel();
715 if (tMPI_Atomic_cas( &(sev->state), env_unmatched, env_cb_available))
717 /* if it was originally unmatched, the receiver wasn't
718 copying the old buffer. We don't need to wait,
719 and the receiver is going to clean up this envelope. */
721 printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) completed\n",
722 tMPI_This_threadnr(),
723 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
730 /* and if we reached this point, the receiver had already started
731 copying, and we need to clean up the envelope ourselves.
733 we first need to wait until the receiver is finished copying. We
734 know this is a short wait (since the buffer was small enough to be
735 buffered in the first place), so we just spin-wait. */
736 tMPI_Atomic_memory_barrier(); /* a full barrier to make sure that the
737 sending doesn't interfere with the
739 while (tMPI_Atomic_get( &(sev->state) ) < env_cb_available)
741 tMPI_Atomic_memory_barrier_acq();
743 tMPI_Atomic_memory_barrier_acq();
745 printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) waiting-completed\n",
746 tMPI_This_threadnr(),
747 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest), (int)(sev->tag));
750 tMPI_Set_req(sev, req);
751 /* and now we clean up */
752 tMPI_Send_env_list_return(sev);
/* Prepare a send envelope: fetch one from the destination's send list,
   fill in endpoints/buffer/datatype/flags, decide on copy-buffering
   (bufsize below COPY_BUFFER_SIZE), set state to env_unmatched, and flag
   TMPI_ERR_XFER_BUFSIZE on an invalid buffer size (the condition guarding
   that branch is not visible in this extraction). */
756 struct envelope* tMPI_Prep_send_envelope(struct send_envelope_list *evl,
758 struct tmpi_thread *src,
759 struct tmpi_thread *dest,
760 void *buf, int count,
761 tMPI_Datatype datatype,
762 int tag, tmpi_bool nonblock)
764 /* get an envelope from the send-envelope stack */
765 struct envelope *ev = tMPI_Send_env_list_fetch_new( evl );
772 ev->nonblock = nonblock;
780 ev->bufsize = count*datatype->size;
781 ev->datatype = datatype;
787 #ifdef USE_SEND_RECV_COPY_BUFFER
788 /* check whether we'll be double buffering */
789 ev->using_cb = (ev->bufsize < COPY_BUFFER_SIZE);
790 /* but don't do anything yet */
793 tMPI_Atomic_set(&(ev->state), env_unmatched);
795 ev->error = TMPI_SUCCESS;
798 tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
799 ev->error = TMPI_ERR_XFER_BUFSIZE;
/* Prepare a receive envelope: fetch one from the current thread's free
   list, fill in endpoints/buffer/datatype/flags, set env_unmatched, and
   flag TMPI_ERR_XFER_BUFSIZE on an invalid buffer size (guard condition
   not visible in this extraction). Mirrors tMPI_Prep_send_envelope. */
805 struct envelope* tMPI_Prep_recv_envelope(struct tmpi_thread *cur,
807 struct tmpi_thread *src,
808 struct tmpi_thread *dest,
809 void *buf, int count,
810 tMPI_Datatype datatype, int tag,
813 /* get an envelope from the stack */
814 struct envelope *ev = tMPI_Free_env_list_fetch_recv( &(cur->envelopes) );
821 ev->nonblock = nonblock;
829 ev->bufsize = count*datatype->size;
830 ev->datatype = datatype;
837 tMPI_Atomic_set(&(ev->state), env_unmatched);
839 ev->error = TMPI_SUCCESS;
842 tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
843 ev->error = TMPI_ERR_XFER_BUFSIZE;
/* Perform the actual point-to-point transfer between a matched send and
   receive envelope pair: validate buffer sizes and pointers (setting
   TMPI_ERR_XFER_BUFSIZE / TMPI_ERR_BUF and finishing both envelopes on
   error), memcpy the data (or the sender's copy buffer when the sender
   already buffered — the CAS on sev->state detects that), mark both
   envelopes env_finished, signal the sending thread, and release the
   envelopes (the receive one from its list, the send one back to the
   sender via the rts queue when copy-buffering). `cur` is used only for
   profiling. */
849 static void tMPI_Xfer(struct tmpi_thread tmpi_unused *cur, struct envelope *sev,
850 struct envelope *rev)
852 #ifdef USE_SEND_RECV_COPY_BUFFER
853 /* we remove the sender's envelope only if we do the transfer, which
854 we always do if the buffer size = 0 */
855 tmpi_bool remove_sender = (sev->bufsize == 0);
858 printf("%5d: tMPI_Xfer (%d->%d, tag=%d) started\n",
859 tMPI_This_threadnr(),
860 tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
863 /* first set data on the receiving end so status can be updated */
867 if (sev->bufsize) /* do the actual transfer */
869 void *sbuf = sev->buf; /* source buffer */
870 if (sev->bufsize > rev->bufsize)
/* receive buffer too small: fail both sides with XFER_BUFSIZE */
872 tMPI_Error((rev->comm), TMPI_ERR_XFER_BUFSIZE);
873 tMPI_Atomic_set(&(rev->state), env_finished);
874 tMPI_Atomic_set(&(sev->state), env_finished);
875 rev->error = TMPI_ERR_XFER_BUFSIZE;
876 sev->error = TMPI_ERR_XFER_BUFSIZE;
880 #ifdef USE_SEND_RECV_COPY_BUFFER
883 /* check if the other side has already finished copying */
884 if (!tMPI_Atomic_cas( &(sev->state), env_unmatched, env_copying))
886 /* it has, and we're copying from the new buffer.
887 We're now also tasked with removing the envelope */
889 remove_sender = TRUE;
891 tMPI_Profile_count_buffered_p2p_xfer(cur);
897 if (!rev->buf || !sev->buf)
899 tMPI_Error((rev->comm), TMPI_ERR_BUF);
900 tMPI_Atomic_set(&(rev->state), env_finished);
901 tMPI_Atomic_set(&(sev->state), env_finished);
902 rev->error = TMPI_ERR_BUF;
903 sev->error = TMPI_ERR_BUF;
906 memcpy(rev->buf, sbuf, sev->bufsize);
908 tMPI_Profile_count_p2p_xfer(cur);
910 /* for status update */
912 rev->bufsize = sev->bufsize;
913 /* and mark that we're finished */
914 #if defined(TMPI_PROFILE)
916 tMPI_Profile_wait_start(cur);
918 tMPI_Atomic_set( &(rev->state), env_finished);
919 tMPI_Atomic_set( &(sev->state), env_finished);
921 /* signal to a potentially waiting thread that we're done. */
922 tMPI_Atomic_fetch_add( &(rev->src->ev_outgoing_received), 1);
923 tMPI_Event_signal(&(rev->src->p2p_event));
925 /* remove the receiving envelope if it's in a list */
926 tMPI_Recv_env_list_remove(rev);
927 #ifdef USE_SEND_RECV_COPY_BUFFER
/* hand the send envelope back to the sending thread's rts queue */
930 tMPI_Send_env_list_rts(sev);
933 #if defined(TMPI_PROFILE)
934 tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
940 printf("%5d: tMPI_Xfer (%d->%d, tag=%d) done\n",
941 tMPI_This_threadnr(),
942 tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
/* Post a receive: prepare a receive envelope, then search the already
   posted send envelopes (one specific sender's list, or all Nthreads
   lists for TMPI_ANY_SOURCE, signalled here by src == NULL). On a match,
   perform the transfer immediately; otherwise queue the receive envelope
   on this thread's receive list for later matching. */
948 struct envelope* tMPI_Post_match_recv(struct tmpi_thread *cur,
950 struct tmpi_thread *src,
951 void *recv_buf, int recv_count,
952 tMPI_Datatype datatype,
953 int tag, tmpi_bool nonblock)
955 struct tmpi_thread *dest = cur;
956 struct envelope *rev;
957 struct envelope *sev = NULL;
/* Nthreads as sentinel index when source is wildcarded */
958 int src_threadnr = src ? tMPI_Threadnr(src) : Nthreads;
961 /* reserve an envelope to post */
962 rev = tMPI_Prep_recv_envelope(cur, comm, src, dest, recv_buf, recv_count,
963 datatype, tag, nonblock);
970 printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) started\n",
971 tMPI_This_threadnr(),
972 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest), (int)(rev->tag));
975 /* we now check the entire existing send queue */
978 sev = tMPI_Send_env_list_search_old( &(dest->evs[src_threadnr]), rev);
982 /* if we don't know the source, we look at all possible sources */
983 for (i = 0; i < Nthreads; i++)
985 sev = tMPI_Send_env_list_search_old(&(dest->evs[i]), rev);
996 printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) found match\n",
997 tMPI_This_threadnr(),
998 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
1002 /* we found a matching send */
1003 tMPI_Xfer(cur, sev, rev);
1008 printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) no match\n",
1009 tMPI_This_threadnr(),
1010 tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
1014 /* we post the envelope in the right list */
1015 tMPI_Recv_env_list_add( &(dest->evr), rev);
/* Post a send: prepare a send envelope from the slot the destination
   thread keeps for this sender (dest->evs[src_threadnr]) and push it
   onto that list's shared head_new queue, which also signals the
   destination thread. */
1020 struct envelope *tMPI_Post_send(struct tmpi_thread *cur,
1022 struct tmpi_thread *dest,
1023 void *send_buf, int send_count,
1024 tMPI_Datatype datatype, int tag,
1027 struct tmpi_thread *src = cur;
1028 struct envelope *sev;
1029 int src_threadnr = tMPI_Threadnr(src);
1030 struct send_envelope_list *sevl = &(dest->evs[src_threadnr]);
1032 /* reserve an envelope to post */
1033 sev = tMPI_Prep_send_envelope(sevl, comm, src, dest, send_buf, send_count,
1034 datatype, tag, nonblock);
1041 printf("%5d: tMPI_Post_send (%d->%d, tag=%d)\n",
1042 tMPI_This_threadnr(),
1043 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
1047 /* we post the envelope in the right list */
1048 tMPI_Send_env_list_add_new(cur, &(dest->evs[src_threadnr]), sev);
/* Event loop body for the current thread: wait on the p2p event, account
   for "outgoing received" acknowledgements, then for every sender detach
   that sender's shared head_new queue (atomically or under the spinlock),
   rebuild it as a doubly-linked list so the envelopes can be processed in
   order of addition, try to match each against posted receives
   (transferring on a match, moving to head_old otherwise), and finally
   consume the processed events. */
1053 void tMPI_Wait_process_incoming(struct tmpi_thread *cur)
1060 #if defined(TMPI_PROFILE)
1061 tMPI_Profile_wait_start(cur);
1063 /* we check for newly arrived send envelopes and finished send
1065 check_id = tMPI_Event_wait( &(cur->p2p_event));
1066 /* the outgoing_received items are handled 'automatically'
1067 by the function calling this function */
1068 #if defined(TMPI_PROFILE)
1069 tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p)
1071 n_handled = tMPI_Atomic_get(&(cur->ev_outgoing_received));
1072 tMPI_Atomic_fetch_add( &(cur->ev_outgoing_received), -n_handled);
1073 check_id -= n_handled;
1077 /*int repl=check_id;*/
1079 /* there were new send envelopes. Let's check them all */
1080 for (i = 0; i < Nthreads; i++)
1082 struct envelope *sev_head;
1084 #ifdef TMPI_LOCK_FREE_LISTS
1085 /* Behold our lock-free shared linked list:
1086 (see tMPI_Send_env_list_add_new for more info) */
1089 /* read old head atomically */
1090 sev_head = (struct envelope*)
1091 tMPI_Atomic_ptr_get( &(cur->evs[i].head_new) );
1092 /* do the compare-and-swap to detach the list */
1094 while (!tMPI_Atomic_ptr_cas(&(cur->evs[i].head_new), sev_head,
1097 tMPI_Spinlock_lock( &(cur->evs[i].lock_new) );
1098 sev_head = (struct send_envelope*)cur->evs[i].head_new;
1099 cur->evs[i].head_new = NULL; /* detach the list */
1100 tMPI_Spinlock_unlock( &(cur->evs[i].lock_new) );
1103 if (sev_head) /* there's a newly arrived send envelope from this
1106 struct envelope *sev = sev_head;
1107 struct envelope *prev_s = NULL;
1108 struct envelope *rev;
1110 /* first enable reversing order by creating a regular
1111 doubly-linked list from the singly-linked shared
1119 /* now walk through it backwards (in order of addition) */
1123 struct envelope *sevp = sev->prev;
1125 rev = tMPI_Recv_env_list_search_new(&(cur->evr), sev);
1128 tMPI_Xfer(cur, sev, rev);
1132 tMPI_Send_env_list_move_to_old( sev );
1139 tMPI_Event_process( &(cur->p2p_event), n_handled);
/* Test a single request for completion without blocking. A buffered
   send (using_cb) is completed immediately through the copy buffer;
   otherwise completion is detected from the envelope state reaching
   env_finished, after which results are copied into the request and the
   envelope is released (send envelopes to their free stack, receive
   envelopes to the thread's free list). Returns rq->finished. */
1142 tmpi_bool tMPI_Test_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
1144 struct envelope *ev = rq->ev;
1146 if (ev && !(rq->finished) )
1148 #ifdef USE_SEND_RECV_COPY_BUFFER
1149 if (ev->send && ev->using_cb)
1151 /* We buffer-copy. Just do the transfer to the buffer and
1152 return saying that we're done. It's now up to the
1153 receiver to return our envelope.*/
1154 /* do our transfer and are guaranteed a finished
1156 tMPI_Send_copy_buffer(ev, rq);
1157 /* get the results */
1158 rq->error = rq->ev->error;
1159 rq->finished = TRUE;
1164 if (tMPI_Atomic_get( &(ev->state) ) >= env_finished)
1166 rq->finished = TRUE;
1167 /* get the results */
1168 rq->error = rq->ev->error;
1169 tMPI_Set_req(ev, rq);
1170 /* and release the envelope. After this point, the envelope
1171 may be reused, so its contents shouldn't be relied on. */
1174 tMPI_Send_env_list_return(ev);
1178 tMPI_Free_env_list_return_recv( &(cur->envelopes), ev);
1183 return rq->finished;
/* Block until a single request completes: repeatedly test, processing
   incoming envelopes between tests (loop construct not fully visible in
   this extraction). */
1186 void tMPI_Wait_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
1190 if (tMPI_Test_single(cur, rq))
1194 tMPI_Wait_process_incoming(cur);
1199 tmpi_bool tMPI_Test_multi(struct tmpi_thread *cur, struct tmpi_req_ *rqs,
1200 tmpi_bool *any_done)
1202 tmpi_bool all_done = TRUE;
1203 struct tmpi_req_ *creq = rqs;
1213 tmpi_bool finished = tMPI_Test_single(cur, creq);
1216 /* now do the check */
1223 /* remove the request from the list we've been given. */
1226 creq->prev->next = creq->next;
1230 creq->next->prev = creq->prev;