Fix use of inline in IMD
[alexxy/gromacs.git] / src / external / thread_mpi / src / p2p_protocol.c
1 /*
2    This source code file is part of thread_mpi.
3    Written by Sander Pronk, Erik Lindahl, and possibly others.
4
5    Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6    All rights reserved.
7
8    Redistribution and use in source and binary forms, with or without
9    modification, are permitted provided that the following conditions are met:
10    1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12    2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15    3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19    THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20    EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22    DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23    DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24    (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26    ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30    If you want to redistribute modifications, please consider that
31    scientific software is very special. Version control is crucial -
32    bugs must be traceable. We will be happy to consider code for
33    inclusion in the official distribution, but derived work should not
34    be called official thread_mpi. Details are found in the README & COPYING
35    files.
36  */
37
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
40 #endif
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <errno.h>
52 #include <stdlib.h>
53 #include <stdio.h>
54 #include <stdarg.h>
55 #include <string.h>
56
57 #include "impl.h"
58 #include "p2p.h"
59 #include "unused.h"
60
61 /* free envelopes: */
62 static struct envelope *tMPI_Free_env_list_fetch_recv(struct free_envelope_list
63                                                       *evl);
64
65 /* return an envelope to the free envelopes list */
66 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
67                                            struct envelope           *rev);
68
69
70
71 /* send envelope lists: */
72 /* send envelopes: */
73 /* get a new envelope from the send list's free envelope list */
74 static struct envelope* tMPI_Send_env_list_fetch_new(struct
75                                                      send_envelope_list *evl);
76
77 /* return a send envelope to the send list's free envelope list,
78    (to be used by the sending thread, who owns the send_envelope_list) */
79 static void tMPI_Send_env_list_return(struct envelope *ev);
80 #ifdef USE_SEND_RECV_COPY_BUFFER
81 /* return a send envelope to the sender's send list.
82    (to be used by the receiving thread). */
83 static void tMPI_Send_env_list_rts(struct envelope *sev);
84 #endif
85
86
87
88
89 /* send envelopes: */
90 /* remove a send envelope from its head_old list. Does not lock */
91 static void tMPI_Send_env_list_remove_old(struct envelope *sev);
92
93 /* add a send envelope to the new envelopes queue in a list */
94 static void tMPI_Send_env_list_add_new(struct tmpi_thread        *cur,
95                                        struct send_envelope_list *evl,
96                                        struct envelope           *sev);
97 /* move a send envelope to the old envelopes queue in a list.
98    Assumes that this is safe to do without interference
99    from other threads, i.e. the list it's in must have been
100    detached. */
101 static void tMPI_Send_env_list_move_to_old(struct envelope *sev);
102
103
104
105
106 /* receive envelopes: */
107 /* add a receive envelope to a list */
108 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
109                                    struct envelope           *ev);
110 /* remove a receive envelope from its list */
111 static void tMPI_Recv_env_list_remove(struct envelope *ev);
112
113
114
115
116 /* do the actual point-to-point transfer */
117 static void tMPI_Xfer(struct tmpi_thread *cur, struct envelope *sev,
118                       struct envelope *rev);
119
120
121
122
123 /* Point-to-point communication protocol functions */
124 int tMPI_Free_env_list_init(struct free_envelope_list *evl, int N)
125 {
126     int i;
127
128     /* allocate the head element */
129     evl->recv_alloc_head = (struct envelope*)tMPI_Malloc(sizeof(struct envelope)
130                                                          *N);
131     if (evl->recv_alloc_head == NULL)
132     {
133         return TMPI_ERR_NO_MEM;
134     }
135     evl->head_recv = evl->recv_alloc_head;
136
137     for (i = 0; i < N; i++)
138     {
139         if (i < N-1)
140         {
141             evl->head_recv[i].next = &(evl->head_recv[i+1]);
142         }
143         else
144         {
145             evl->head_recv[i].next = NULL;
146         }
147         evl->head_recv[i].rlist = NULL;
148         evl->head_recv[i].slist = NULL;
149     }
150     return TMPI_SUCCESS;
151 }
152
153 void tMPI_Free_env_list_destroy(struct free_envelope_list *evl)
154 {
155     free(evl->recv_alloc_head);
156     evl->head_recv       = NULL;
157     evl->recv_alloc_head = NULL;
158 }
159
160 static struct envelope* tMPI_Free_env_list_fetch_recv(struct
161                                                       free_envelope_list *evl)
162 {
163     struct envelope *ret;
164     if (!evl->head_recv)
165     {
166         tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_ENVELOPES);
167         return NULL;
168     }
169
170     ret            = evl->head_recv;
171     evl->head_recv = ret->next;
172     ret->next      = NULL;
173     ret->prev      = NULL;
174     /*evl->N--;*/
175
176     return ret;
177 }
178
179 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
180                                            struct envelope           *rev)
181 {
182     rev->rlist     = NULL;
183     rev->slist     = NULL;
184     rev->prev      = NULL;
185     rev->next      = evl->head_recv;
186     evl->head_recv = rev;
187 }
188
189 /* tmpi_send_envelope_list functions */
190
191 int tMPI_Send_env_list_init(struct send_envelope_list *evl, int N)
192 {
193     int i;
194 #ifndef TMPI_LOCK_FREE_LISTS
195     tMPI_Spinlock_init( &(evl->lock_rts) );
196     tMPI_Spinlock_init( &(evl->lock_new) );
197 #endif
198     evl->Nalloc = N;
199
200     evl->alloc_head = (struct envelope*)tMPI_Malloc(sizeof(struct envelope)*N);
201     if (evl->alloc_head == NULL)
202     {
203         return TMPI_ERR_NO_MEM;
204     }
205     for (i = 0; i < N; i++)
206     {
207         evl->alloc_head[i].next  = (i < (N-1)) ? &(evl->alloc_head[i+1]) : NULL;
208         evl->alloc_head[i].prev  = NULL;
209         evl->alloc_head[i].slist = evl;
210         evl->alloc_head[i].rlist = NULL;
211 #ifdef USE_SEND_RECV_COPY_BUFFER
212         evl->alloc_head[i].cb = (void*)tMPI_Malloc(sizeof(char)*
213                                                    COPY_BUFFER_SIZE);
214         if (evl->alloc_head[i].cb == NULL)
215         {
216             return TMPI_ERR_NO_MEM;
217         }
218 #endif
219     }
220
221 #ifdef TMPI_LOCK_FREE_LISTS
222     tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
223     tMPI_Atomic_ptr_set(&(evl->head_rts), NULL);
224 #else
225     evl->head_new = NULL;
226     evl->head_rts = NULL;
227 #endif
228     evl->head_free = &(evl->alloc_head[1]);
229     /* initialize the head_old circular list with dummy element */
230     evl->head_old       = evl->alloc_head; /* the first element is a dummy */
231     evl->head_old->next = evl->head_old;
232     evl->head_old->prev = evl->head_old;
233     return TMPI_SUCCESS;
234 }
235
/* Free all resources of a send-envelope list: the per-envelope copy
   buffers (copy-buffer builds only) and the contiguous envelope array.
   List heads are reset so use after tMPI_Finalize fails visibly. */
void tMPI_Send_env_list_destroy(struct send_envelope_list *evl)
{
#ifdef USE_SEND_RECV_COPY_BUFFER
    size_t i;
    /* NOTE(review): i is size_t while Nalloc is assigned from an int in
       _init; the comparison mixes signedness — harmless as Nalloc is
       non-negative, but worth confirming against impl.h. */
    for (i = 0; i < evl->Nalloc; i++)
    {
        free(evl->alloc_head[i].cb);
    }
#endif
    free(evl->alloc_head);
    evl->alloc_head = NULL;
#ifdef TMPI_LOCK_FREE_LISTS
    tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
#else
    evl->head_new = NULL;
#endif
    evl->head_old = NULL; /* make it crash if used after tMPI_Finalize */
}
254
255
/* Fetch a fresh send envelope from the list's private free stack.
   First recycles any envelopes receivers have handed back through the
   shared 'rts' (returned-to-sender) head. If the free stack is empty,
   copy-buffer builds block handling incoming events until one frees up;
   other builds raise TMPI_ERR_ENVELOPES and return NULL. */
static struct envelope* tMPI_Send_env_list_fetch_new(struct
                                                     send_envelope_list *evl)
{
    struct envelope *ret;

    do
    {
        /* first check whether any envelopes were returned to sender */
#ifdef TMPI_LOCK_FREE_LISTS
        if ((ret = (struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts))))
#else
        if (evl->head_rts)
#endif
        {
            /* detach the list */
#ifdef TMPI_LOCK_FREE_LISTS
            /* we detach by swapping what we expect the pointer value to be,
               with NULL. If there were a cross-platform way to atomically
               swap  without checking, we could do that, too. */
            while (!tMPI_Atomic_ptr_cas( &(evl->head_rts), ret, NULL ))
            {
                /* CAS failed: re-read the head and retry the detach */
                ret = (struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts));
            }
#else
            tMPI_Spinlock_lock( &(evl->lock_rts) );
            ret           = evl->head_rts;
            evl->head_rts = NULL;
            tMPI_Spinlock_unlock( &(evl->lock_rts) );
#endif
            /* now add the items to head_free; head_free is private to
               the sending thread, so plain pointer ops suffice here */
            while (ret)
            {
                struct envelope *next = ret->next;
                ret->next      = evl->head_free;
                evl->head_free = ret;
                ret            = next;
            }
        }

        /* get the last free one off the list */
        ret = evl->head_free;
        if (!ret)
#ifdef USE_SEND_RECV_COPY_BUFFER
        {
            /* There are no free send envelopes, so all we can do is handle
               incoming requests until we get a free send envelope. */
#if defined(TMPI_DEBUG)  || defined(TMPI_WARNINGS)
            printf("Ran out of send envelopes!!\n");
            fflush(stdout);
#endif
            tMPI_Wait_process_incoming(tMPI_Get_current());
        }
#else
        {
            /* If this happens, it most likely indicates a bug in the
               calling program. We could fix the situation by waiting,
               but that would most likely lead to deadlocks - even
               more difficult to debug than this. */
            tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_ENVELOPES);
            return NULL;
        }
#endif
    }
    while (!ret);

    /* pop the envelope off the free stack and reset its links */
    evl->head_free = ret->next;

    ret->next  = NULL;
    ret->prev  = NULL;
    ret->slist = evl;
    ret->rlist = NULL;

    /* and return it */
    return ret;
}
331
332 static void tMPI_Send_env_list_return(struct envelope *sev)
333 {
334     struct send_envelope_list *evl = sev->slist;
335
336     sev->next      = evl->head_free;
337     evl->head_free = sev;
338 }
339
340
#ifdef USE_SEND_RECV_COPY_BUFFER
/* Return a send envelope to the sender's shared rts (returned-to-sender)
   list. Called by the receiving thread, so the push must be safe against
   the sender concurrently detaching the list.
   Fix: the spinlock branch referenced 'ev->next', but no 'ev' exists in
   this scope (the parameter is 'sev') — a compile error in builds with
   USE_SEND_RECV_COPY_BUFFER but without TMPI_LOCK_FREE_LISTS. */
static void tMPI_Send_env_list_rts(struct envelope *sev)
{
    struct send_envelope_list *evl = sev->slist;
#ifdef TMPI_LOCK_FREE_LISTS
    struct envelope           *sevn;

    do
    {
        sevn      = (struct envelope*)tMPI_Atomic_ptr_get(&evl->head_rts);
        sev->next = sevn;
        /* the cmpxchg operation is a memory fence, so we shouldn't need
           to worry about out-of-order evaluation */
    }
    while (!tMPI_Atomic_ptr_cas( &(evl->head_rts), sevn, sev ));
#else
    tMPI_Spinlock_lock( &(evl->lock_rts) );
    sev->next     = (struct envelope*)evl->head_rts;
    evl->head_rts = sev;
    tMPI_Spinlock_unlock( &(evl->lock_rts) );
#endif
}
#endif
364
365
366
367 static void tMPI_Send_env_list_remove_old(struct envelope *sev)
368 {
369     /* pretty straighforward because it isn't a shared list */
370     if (sev->next)
371     {
372         sev->next->prev = sev->prev;
373     }
374     if (sev->prev)
375     {
376         sev->prev->next = sev->next;
377     }
378     sev->prev = NULL;
379     sev->next = NULL;
380 }
381
382
383 static void tMPI_Send_env_list_add_new(struct tmpi_thread        tmpi_unused *cur,
384                                        struct send_envelope_list             *evl,
385                                        struct envelope                       *sev)
386 {
387 #ifdef TMPI_LOCK_FREE_LISTS
388     struct envelope *evl_head_new_orig;
389 #endif
390     sev->prev = NULL;
391
392 #ifdef TMPI_LOCK_FREE_LISTS
393     /* behold our lock-free shared linked list:
394        (it's actually quite simple because we only do operations at the head
395         of the list, either adding them - such as here - or detaching the whole
396         list) */
397     do
398     {
399         /* read the old head atomically */
400         evl_head_new_orig = (struct envelope*) tMPI_Atomic_ptr_get(
401                     &(evl->head_new) );
402         /* set our envelope to have that as its next */
403         sev->next = evl_head_new_orig;
404         /* do the compare-and-swap.
405            this operation is a memory fence, so we shouldn't need
406            to worry about out-of-order stores. If it returns false,
407            somebody else got there before us: */
408     }
409     while (!tMPI_Atomic_ptr_cas(&(evl->head_new), evl_head_new_orig, sev));
410
411 #else
412     tMPI_Spinlock_lock( &(evl->lock_new) );
413     /* we add to the start of the list */
414     sev->next = (struct send_envelope*)evl->head_new;
415     /* actually attach it to the list */
416     evl->head_new = sev;
417     tMPI_Spinlock_unlock( &(evl->lock_new) );
418 #endif
419
420 #if defined(TMPI_PROFILE)
421     tMPI_Profile_wait_start(cur);
422 #endif
423     /* signal to the thread that there is a new envelope */
424     tMPI_Event_signal( &(sev->dest->p2p_event) );
425 #if defined(TMPI_PROFILE)
426     tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
427 #endif
428 }
429
430 static void tMPI_Send_env_list_move_to_old(struct envelope *sev)
431 {
432     struct send_envelope_list *evl = sev->slist;
433
434     /* remove from old list. We assume the list has been detached! */
435     if (sev->next)
436     {
437         sev->next->prev = sev->prev;
438     }
439     if (sev->prev)
440     {
441         sev->prev->next = sev->next;
442     }
443
444     /* we add to the end of the list */
445     sev->next = evl->head_old;
446     sev->prev = evl->head_old->prev;
447
448     sev->next->prev = sev;
449     sev->prev->next = sev;
450 }
451
452 /* tmpi_recv_envelope_list functions */
453
454 int tMPI_Recv_env_list_init(struct recv_envelope_list *evl)
455 {
456     evl->head       = &(evl->dummy);
457     evl->head->prev = evl->head;
458     evl->head->next = evl->head;
459
460     return TMPI_SUCCESS;
461 }
462
/* Invalidate a receive-envelope list. The dummy head lives inside the
   list struct itself, so there is nothing to free; clearing head makes
   any later use fail fast. */
void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl)
{
    evl->head = NULL;
}
467
468 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
469                                    struct envelope           *rev)
470 {
471     rev->rlist = evl;
472     /* we add to the end of the list */
473     rev->next = evl->head;
474     rev->prev = evl->head->prev;
475
476     rev->next->prev = rev;
477     rev->prev->next = rev;
478 }
479
480 static void tMPI_Recv_env_list_remove(struct envelope *rev)
481 {
482     if (rev->next)
483     {
484         rev->next->prev = rev->prev;
485     }
486     if (rev->prev)
487     {
488         rev->prev->next = rev->next;
489     }
490     rev->prev  = NULL;
491     rev->next  = NULL;
492     rev->rlist = NULL;
493 }
494
495 /* tmpi_req functions */
496
497 int tMPI_Req_list_init(struct req_list *rl, int N_reqs)
498 {
499     int i;
500
501     rl->alloc_head = (struct tmpi_req_*)tMPI_Malloc(
502                 sizeof(struct tmpi_req_)*N_reqs);
503     if (rl->alloc_head == 0)
504     {
505         return TMPI_ERR_NO_MEM;
506     }
507     rl->head = rl->alloc_head;
508     for (i = 0; i < N_reqs; i++)
509     {
510         if (i == 0)
511         {
512             rl->head[i].prev = NULL;
513         }
514         else
515         {
516             rl->head[i].prev = &(rl->head[i-1]);
517         }
518
519         if (i >= (N_reqs-1))
520         {
521             rl->head[i].next = NULL;
522         }
523         else
524         {
525             rl->head[i].next = &(rl->head[i+1]);
526         }
527     }
528     return TMPI_SUCCESS;
529 }
530
531 void tMPI_Req_list_destroy(struct req_list *rl)
532 {
533     free(rl->alloc_head);
534     rl->head       = NULL;
535     rl->alloc_head = NULL;
536 }
537
538
539
540 struct tmpi_req_ *tMPI_Get_req(struct req_list *rl)
541 {
542     struct tmpi_req_ *req = rl->head;
543
544
545     /* we don't need locks here because requests are a per-thread property */
546     if (!req)
547     {
548         /* this could be fixed */
549         tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_REQUESTS);
550         return NULL;
551     }
552     rl->head  = req->next;
553     req->next = NULL;
554
555     return req;
556 }
557
558 void tMPI_Return_req(struct req_list *rl, struct tmpi_req_ *req)
559 {
560     req->next = rl->head;
561     req->prev = NULL;
562     rl->head  = req;
563 }
564
/* Initialize a request for envelope ev: not finished, not cancelled,
   self-linked (i.e. a member of no list), with source/comm taken from
   the envelope and tag/error/transferred reset to defaults. */
void tMPI_Req_init(struct tmpi_req_ *rq, struct envelope *ev)
{
    rq->ev       = ev;
    rq->finished = FALSE;
    /* a request points at itself when it is in no list */
    rq->next     = rq;
    rq->prev     = rq;

    rq->source      = ev->src;
    rq->comm        = ev->comm;
    rq->tag         = TMPI_ANY_TAG;
    rq->error       = TMPI_SUCCESS;
    rq->transferred = 0;
    rq->cancelled   = FALSE;
}
579
580 /* Point-to-point communication protocol functions */
581 void tMPI_Set_req(struct envelope *ev, struct tmpi_req_ *req)
582 {
583     req->source = ev->src;
584     req->comm   = ev->comm;
585     req->tag    = ev->tag;
586     req->error  = ev->error;
587     if (ev->send)
588     {
589         if (tMPI_Atomic_get(&(ev->state)) > env_unmatched)
590         {
591             req->transferred = ev->bufsize;
592         }
593         else
594         {
595             req->transferred = 0;
596         }
597     }
598     else
599     {
600         if (tMPI_Atomic_get(&(ev->state)) == env_finished)
601         {
602             req->transferred = ev->bufsize;
603         }
604         else
605         {
606             req->transferred = 0;
607         }
608     }
609 }
610
611 void tMPI_Set_status(struct tmpi_req_ *req, tMPI_Status *st)
612 {
613     if (st)
614     {
615         st->TMPI_SOURCE = tMPI_Comm_seek_rank(req->comm, req->source);
616         st->TMPI_TAG    = req->tag;
617         st->TMPI_ERROR  = req->error;
618         st->transferred = req->transferred;
619         st->cancelled   = req->cancelled;
620     }
621 }
622
/* Decide whether send envelope sev satisfies receive envelope rev:
   the tags must match (or the receiver used TMPI_ANY_TAG), same comm,
   the receiver's source must match or be NULL (any-source wildcard),
   same destination and datatype, and both envelopes must still be in a
   matchable state (send not yet finished, receive still unmatched). */
tmpi_bool tMPI_Envelope_matches(const struct envelope *sev,
                                const struct envelope *rev)
{
#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Envelope_matches (%d->%d)==(%d->%d),  tag=(%d==%d),       \n       datatype=(%ld==%ld), comm=(%ld,%ld),\n              finished=(%d==%d)\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
           tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
           (int)(sev->tag), (int)(rev->tag),
           (long int)sev->datatype, (long int)rev->datatype,
           (long int)sev->comm, (long int)rev->comm,
           (int)sev->state.value, (int)rev->state.value);
    fflush(stdout);
#endif
    /* the cheap field comparisons come first; the atomic state reads
       are evaluated last thanks to && short-circuiting */
    if ( ( (rev->tag == TMPI_ANY_TAG) || (rev->tag == sev->tag) ) &&
         ( sev->comm == rev->comm ) &&
         ( (!rev->src)  || (rev->src == sev->src) ) &&
         ( sev->dest == rev->dest ) &&
         ( sev->datatype == rev->datatype ) &&
         ( tMPI_Atomic_get(&(sev->state)) < env_finished  &&
           tMPI_Atomic_get(&(rev->state)) == env_unmatched ) )
    {
#ifdef TMPI_DEBUG
        printf("%5d: (%d->%d) tag=%d found match\n",
               tMPI_This_threadnr(),
               tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
               (int)(sev->tag));
        fflush(stdout);
#endif
        return TRUE;
    }
    return FALSE;
}
656
657 struct envelope* tMPI_Send_env_list_search_old(struct send_envelope_list *evl,
658                                                struct envelope           *rev)
659 {
660     struct envelope *sev;
661
662     sev = (struct envelope*)evl->head_old->next;
663     while (sev != evl->head_old)
664     {
665         if (tMPI_Envelope_matches(sev, rev))
666         {
667             /* remove the envelope */
668             tMPI_Send_env_list_remove_old(sev);
669             return sev;
670         }
671         sev = sev->next;
672     }
673     return NULL;
674 }
675
676 struct envelope* tMPI_Recv_env_list_search_new(struct recv_envelope_list *evl,
677                                                struct envelope           *sev)
678 {
679     struct envelope *rev;
680
681     rev = evl->head->next;
682     while (rev != evl->head)
683     {
684         if (tMPI_Envelope_matches(sev, rev))
685         {
686             return rev;
687         }
688         rev = rev->next;
689     }
690     return NULL;
691 }
692
#ifdef USE_SEND_RECV_COPY_BUFFER
/* Copy the sender's data into the envelope's copy buffer so the sender
   can return before the receiver arrives. If the receiver has already
   begun copying directly from the user buffer, this instead waits for
   it to finish and then cleans up the envelope on the sender's side. */
void tMPI_Send_copy_buffer(struct envelope *sev, struct tmpi_req_ *req)
{
    int state;
    /* Fill copy buffer, after having announced its possible use */

    /* in the special case of a zero buffer size, we don't do anything and
       always let the receiver handle it */
    if (sev->bufsize == 0)
    {
        return;
    }

    /* first check whether the other side hasn't started yet */
    state = tMPI_Atomic_get( &(sev->state) );
    tMPI_Atomic_memory_barrier_acq();
    if (state == env_unmatched)
    {
        /* first copy */
        memcpy(sev->cb, sev->buf, sev->bufsize);
        /* now set state, if other side hasn't started copying yet. */
        tMPI_Atomic_memory_barrier_rel();
        if (tMPI_Atomic_cas( &(sev->state), env_unmatched, env_cb_available))
        {
            /* if it was originally unmatched, the receiver wasn't
               copying the old buffer. We don't need to wait,
               and the receiver is going to clean up this envelope. */
#ifdef TMPI_DEBUG
            printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) completed\n",
                   tMPI_This_threadnr(),
                   tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
                   (int)(sev->tag));
            fflush(stdout);
#endif
            return;
        }
    }
    /* and if we reached this point, the receiver had already started
       copying, and we need to clean up the envelope ourselves.

       we first need to wait until the receiver is finished copying. We
       know this is a short wait (since the buffer was small enough to be
       buffered in the first place), so we just spin-wait.  */
    tMPI_Atomic_memory_barrier(); /* a full barrier to make sure that the
                                     sending doesn't interfere with the
                                     waiting */
    while (tMPI_Atomic_get( &(sev->state) ) < env_cb_available)
    {
        tMPI_Atomic_memory_barrier_acq();
    }
    tMPI_Atomic_memory_barrier_acq();
#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) waiting-completed\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest), (int)(sev->tag));
    fflush(stdout);
#endif
    /* record the outcome in the request before recycling the envelope */
    tMPI_Set_req(sev, req);
    /* and now we clean up */
    tMPI_Send_env_list_return(sev);
}
#endif
755
756 struct envelope* tMPI_Prep_send_envelope(struct send_envelope_list *evl,
757                                          tMPI_Comm comm,
758                                          struct tmpi_thread *src,
759                                          struct tmpi_thread *dest,
760                                          void *buf, int count,
761                                          tMPI_Datatype datatype,
762                                          int tag, tmpi_bool nonblock)
763 {
764     /* get an envelope from the send-envelope stack */
765     struct envelope *ev = tMPI_Send_env_list_fetch_new( evl );
766     if (ev == NULL)
767     {
768         return NULL;
769     }
770
771     ev->tag      = tag;
772     ev->nonblock = nonblock;
773
774     ev->comm = comm;
775
776     ev->src  = src;
777     ev->dest = dest;
778
779     ev->buf      = buf;
780     ev->bufsize  = count*datatype->size;
781     ev->datatype = datatype;
782
783     ev->send = TRUE;
784
785     ev->rlist = NULL;
786
787 #ifdef USE_SEND_RECV_COPY_BUFFER
788     /* check whether we'll be double buffering */
789     ev->using_cb = (ev->bufsize < COPY_BUFFER_SIZE);
790     /* but don't do anything yet */
791 #endif
792
793     tMPI_Atomic_set(&(ev->state), env_unmatched);
794
795     ev->error = TMPI_SUCCESS;
796     if (count < 0)
797     {
798         tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
799         ev->error = TMPI_ERR_XFER_BUFSIZE;
800     }
801
802     return ev;
803 }
804
805 struct envelope* tMPI_Prep_recv_envelope(struct tmpi_thread *cur,
806                                          tMPI_Comm comm,
807                                          struct tmpi_thread *src,
808                                          struct tmpi_thread *dest,
809                                          void *buf, int count,
810                                          tMPI_Datatype datatype, int tag,
811                                          tmpi_bool nonblock)
812 {
813     /* get an envelope from the stack */
814     struct envelope *ev = tMPI_Free_env_list_fetch_recv( &(cur->envelopes) );
815     if (ev == NULL)
816     {
817         return NULL;
818     }
819
820     ev->tag      = tag;
821     ev->nonblock = nonblock;
822
823     ev->comm = comm;
824
825     ev->src  = src;
826     ev->dest = dest;
827
828     ev->buf      = buf;
829     ev->bufsize  = count*datatype->size;
830     ev->datatype = datatype;
831
832     ev->send = FALSE;
833
834     ev->slist = NULL;
835     ev->rlist = NULL;
836
837     tMPI_Atomic_set(&(ev->state), env_unmatched);
838
839     ev->error = TMPI_SUCCESS;
840     if (count < 0)
841     {
842         tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
843         ev->error = TMPI_ERR_XFER_BUFSIZE;
844     }
845
846     return ev;
847 }
848
/* Perform the actual point-to-point transfer from send envelope sev into
   receive envelope rev, mark both envelopes env_finished, signal the
   sender's thread, and detach the envelopes from their lists.
   cur is only used in profiling builds. */
static void tMPI_Xfer(struct tmpi_thread tmpi_unused *cur, struct envelope *sev,
                      struct envelope *rev)
{
#ifdef USE_SEND_RECV_COPY_BUFFER
    /* we remove the sender's envelope only if we do the transfer, which
       we always do if the buffer size = 0 */
    tmpi_bool remove_sender = (sev->bufsize == 0);
#endif
#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Xfer (%d->%d, tag=%d) started\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
    fflush(stdout);
#endif
    /* first set data on the receiving end so status can be updated */
    rev->src = sev->src;
    rev->tag = sev->tag;

    if (sev->bufsize)          /* do the actual transfer */
    {
        void *sbuf = sev->buf; /* source buffer */
        if (sev->bufsize > rev->bufsize)
        {
            /* receive buffer too small: finish both envelopes with an
               error so neither side blocks waiting for the other */
            tMPI_Error((rev->comm), TMPI_ERR_XFER_BUFSIZE);
            tMPI_Atomic_set(&(rev->state), env_finished);
            tMPI_Atomic_set(&(sev->state), env_finished);
            rev->error = TMPI_ERR_XFER_BUFSIZE;
            sev->error = TMPI_ERR_XFER_BUFSIZE;
            return;
        }

#ifdef USE_SEND_RECV_COPY_BUFFER
        if (sev->using_cb)
        {
            /* check if the other side has already finished copying */
            if (!tMPI_Atomic_cas( &(sev->state), env_unmatched, env_copying))
            {
                /* it has, and we're copying from the new buffer.
                   We're now also tasked with removing the envelope */
                sbuf          = sev->cb;
                remove_sender = TRUE;
#ifdef TMPI_PROFILE
                tMPI_Profile_count_buffered_p2p_xfer(cur);
#endif
            }
        }
#endif

        if (!rev->buf || !sev->buf)
        {
            /* a NULL buffer with nonzero size is a caller error */
            tMPI_Error((rev->comm), TMPI_ERR_BUF);
            tMPI_Atomic_set(&(rev->state), env_finished);
            tMPI_Atomic_set(&(sev->state), env_finished);
            rev->error = TMPI_ERR_BUF;
            sev->error = TMPI_ERR_BUF;
            return;
        }
        memcpy(rev->buf, sbuf, sev->bufsize);
#ifdef TMPI_PROFILE
        tMPI_Profile_count_p2p_xfer(cur);
#endif
        /* for status update */
    }
    rev->bufsize = sev->bufsize;
    /* and mark that we're finished */
    /* NOTE(review): the brace opened in this TMPI_PROFILE block is closed
       inside the matching TMPI_PROFILE block further down; braces are
       balanced in both configurations, but this is fragile to edit. */
#if defined(TMPI_PROFILE)
    {
        tMPI_Profile_wait_start(cur);
#endif
    tMPI_Atomic_set( &(rev->state), env_finished);
    tMPI_Atomic_set( &(sev->state), env_finished);

    /* signal to a potentially waiting thread that we're done. */
    tMPI_Atomic_fetch_add( &(rev->src->ev_outgoing_received), 1);
    tMPI_Event_signal(&(rev->src->p2p_event));

    /* remove the receiving envelope if it's in a list */
    tMPI_Recv_env_list_remove(rev);
#ifdef USE_SEND_RECV_COPY_BUFFER
    if (remove_sender)
    {
        tMPI_Send_env_list_rts(sev);
    }
#endif
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
}
#endif


#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Xfer (%d->%d, tag=%d) done\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
    fflush(stdout);
#endif
    return;
}
947
/* Post a receive: build a receive envelope and try to match it against
   already-posted sends. On a match the transfer happens immediately via
   tMPI_Xfer; otherwise the envelope is queued on the receiver's list for
   a later sender to find. Returns the receive envelope, or NULL if no
   free envelope was available. */
struct envelope* tMPI_Post_match_recv(struct tmpi_thread *cur,
                                      tMPI_Comm comm,
                                      struct tmpi_thread *src,
                                      void *recv_buf, int recv_count,
                                      tMPI_Datatype datatype,
                                      int tag, tmpi_bool nonblock)
{
    struct tmpi_thread *dest = cur;
    struct envelope    *rev;
    struct envelope    *sev          = NULL;
    /* src == NULL means any-source; Nthreads then acts as a placeholder
       index that is never used (see the branch below) */
    int                 src_threadnr = src ? tMPI_Threadnr(src) : Nthreads;
    int                 i;

    /* reserve an envelope to post */
    rev = tMPI_Prep_recv_envelope(cur, comm, src, dest, recv_buf, recv_count,
                                  datatype, tag, nonblock);
    if (rev == NULL)
    {
        return NULL;
    }

#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) started\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest), (int)(rev->tag));
    fflush(stdout);
#endif
    /* we now check the entire existing send queue */
    if (src)
    {
        sev = tMPI_Send_env_list_search_old( &(dest->evs[src_threadnr]), rev);
    }
    else
    {
        /* if we don't know the source, we look at all possible sources */
        for (i = 0; i < Nthreads; i++)
        {
            sev = tMPI_Send_env_list_search_old(&(dest->evs[i]), rev);
            if (sev)
            {
                break;
            }
        }
    }

    if (sev)
    {
#ifdef TMPI_DEBUG
        printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) found match\n",
               tMPI_This_threadnr(),
               tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
               (int)(rev->tag));
        fflush(stdout);
#endif
        /* we found a matching send */
        tMPI_Xfer(cur, sev, rev);
    }
    else
    {
#ifdef TMPI_DEBUG
        printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) no match\n",
               tMPI_This_threadnr(),
               tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
               (int)(rev->tag));
        fflush(stdout);
#endif
        /* we post the envelope in the right list */
        tMPI_Recv_env_list_add( &(dest->evr), rev);
    }
    return rev;
}
1019
1020 struct envelope *tMPI_Post_send(struct tmpi_thread *cur,
1021                                 tMPI_Comm comm,
1022                                 struct tmpi_thread *dest,
1023                                 void *send_buf, int send_count,
1024                                 tMPI_Datatype datatype, int tag,
1025                                 tmpi_bool nonblock)
1026 {
1027     struct tmpi_thread        *src = cur;
1028     struct envelope           *sev;
1029     int                        src_threadnr = tMPI_Threadnr(src);
1030     struct send_envelope_list *sevl         = &(dest->evs[src_threadnr]);
1031
1032     /* reserve an envelope to post */
1033     sev = tMPI_Prep_send_envelope(sevl, comm, src, dest, send_buf, send_count,
1034                                   datatype, tag, nonblock);
1035     if (sev == NULL)
1036     {
1037         return NULL;
1038     }
1039
1040 #ifdef TMPI_DEBUG
1041     printf("%5d: tMPI_Post_send (%d->%d, tag=%d)\n",
1042            tMPI_This_threadnr(),
1043            tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
1044            (int)(sev->tag));
1045     fflush(stdout);
1046 #endif
1047     /* we post the envelope in the right list */
1048     tMPI_Send_env_list_add_new(cur, &(dest->evs[src_threadnr]), sev);
1049
1050     return sev;
1051 }
1052
/* Wait for point-to-point activity directed at this thread and process
   newly arrived send envelopes.

   cur - the calling thread's bookkeeping structure.

   Blocks in tMPI_Event_wait() until the thread's p2p event fires, then
   detaches each per-source incoming send list, walks the envelopes in
   arrival order, and either transfers each one to a matching posted
   receive or moves it to the 'old' list for later matching. Finally
   acknowledges the handled events on the p2p event counter. */
void tMPI_Wait_process_incoming(struct tmpi_thread *cur)
{
    int i;
    int check_id;
    int n_handled = 0;


#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_start(cur);
#endif
    /* we check for newly arrived send envelopes and finished send
       envelopes */
    check_id = tMPI_Event_wait( &(cur->p2p_event));
    /* the outgoing_received items are handled 'automatically'
       by the function calling this function */
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p);
#endif
    /* events counted for envelopes returned to us as the sender need no
       processing here; subtract them from the pending-event count */
    n_handled = tMPI_Atomic_get(&(cur->ev_outgoing_received));
    tMPI_Atomic_fetch_add( &(cur->ev_outgoing_received), -n_handled);
    check_id -= n_handled;

    if (check_id > 0)
    {
        /*int repl=check_id;*/
        /*int n=0;*/
        /* there were new send envelopes. Let's check them all */
        for (i = 0; i < Nthreads; i++)
        {
            struct envelope *sev_head;

#ifdef TMPI_LOCK_FREE_LISTS
            /* Behold our lock-free shared linked list:
               (see tMPI_Send_env_list_add_new for more info) */
            do
            {
                /* read old head atomically */
                sev_head = (struct envelope*)
                    tMPI_Atomic_ptr_get( &(cur->evs[i].head_new) );
                /* do the compare-and-swap to detach the list */
            }
            while (!tMPI_Atomic_ptr_cas(&(cur->evs[i].head_new), sev_head,
                                        NULL));
#else
            /* locked variant: detach the list under the spinlock */
            tMPI_Spinlock_lock( &(cur->evs[i].lock_new) );
            sev_head             = (struct send_envelope*)cur->evs[i].head_new;
            cur->evs[i].head_new = NULL; /* detach the list */
            tMPI_Spinlock_unlock( &(cur->evs[i].lock_new) );
#endif

            if (sev_head) /* there's a newly arrived send envelope from this
                             thread*/
            {
                struct envelope *sev    = sev_head;
                struct envelope *prev_s = NULL;
                struct envelope *rev;

                /* first enable reversing order by creating a regular
                   doubly-linked list from the singly-linked shared
                   linked list */
                while (sev)
                {
                    sev->prev = prev_s;
                    prev_s    = sev;
                    sev       = sev->next;
                }
                /* now walk through it backwards (in order of addition) */
                sev = prev_s;
                while (sev)
                {
                    /* save the predecessor: sev may be consumed below */
                    struct envelope *sevp = sev->prev;
                    n_handled++;
                    rev = tMPI_Recv_env_list_search_new(&(cur->evr), sev);
                    if (rev)
                    {
                        /* a matching receive was already posted: transfer */
                        tMPI_Xfer(cur, sev, rev);
                    }
                    else
                    {
                        /* no receive yet: park the envelope for later */
                        tMPI_Send_env_list_move_to_old( sev );
                    }
                    sev = sevp;
                }
            }
        }
    }
    /* acknowledge all events we handled in this pass */
    tMPI_Event_process( &(cur->p2p_event), n_handled);
}
1141
/* Check a single request for completion without blocking.

   cur - the calling thread
   rq  - the request to test; rq->finished and rq->error are updated.

   Returns TRUE when the request has finished (now or previously).
   On completion the envelope is released back to its free list, so it
   must not be touched afterwards. */
tmpi_bool tMPI_Test_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
{
    struct envelope *ev = rq->ev;

    /* only requests with a live, unfinished envelope need work */
    if (ev && !(rq->finished) )
    {
#ifdef USE_SEND_RECV_COPY_BUFFER
        if (ev->send && ev->using_cb)
        {
            /* We buffer-copy. Just do the transfer to the buffer and
               return saying that we're done. It's now up to the
               receiver to return our envelope.*/
            /* do our transfer and are guaranteed a finished
               envelope. */
            tMPI_Send_copy_buffer(ev, rq);
            /* get the results */
            rq->error    = rq->ev->error;
            rq->finished = TRUE;
        }
        else
#endif
        {
            /* env_finished (or beyond) means the other side completed
               the transfer */
            if (tMPI_Atomic_get( &(ev->state) ) >= env_finished)
            {
                rq->finished = TRUE;
                /* get the results */
                rq->error = rq->ev->error;
                tMPI_Set_req(ev, rq);
                /* and release the envelope. After this point, the envelope
                   may be reused, so its contents shouldn't be relied on. */
                if (ev->send)
                {
                    tMPI_Send_env_list_return(ev);
                }
                else
                {
                    tMPI_Free_env_list_return_recv( &(cur->envelopes), ev);
                }
            }
        }
    }
    return rq->finished;
}
1185
/* Block until the given request completes.

   Spins on tMPI_Test_single(), servicing incoming envelopes between
   completion checks so matching sends/receives can make progress. */
void tMPI_Wait_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
{
    while (!tMPI_Test_single(cur, rq))
    {
        tMPI_Wait_process_incoming(cur);
    }
}
1198
1199 tmpi_bool tMPI_Test_multi(struct tmpi_thread *cur, struct tmpi_req_ *rqs,
1200                           tmpi_bool *any_done)
1201 {
1202     tmpi_bool         all_done = TRUE;
1203     struct tmpi_req_ *creq     = rqs;
1204
1205     int               i = 0;
1206     if (any_done)
1207     {
1208         *any_done = FALSE;
1209     }
1210
1211     while (creq)
1212     {
1213         tmpi_bool finished = tMPI_Test_single(cur, creq);
1214         i++;
1215
1216         /* now do the check */
1217         if (!finished)
1218         {
1219             all_done = FALSE;
1220         }
1221         else
1222         {
1223             /* remove the request from the list we've been given. */
1224             if (creq->prev)
1225             {
1226                 creq->prev->next = creq->next;
1227             }
1228             if (creq->next)
1229             {
1230                 creq->next->prev = creq->prev;
1231             }
1232             if (any_done)
1233             {
1234                 *any_done = TRUE;
1235             }
1236         }
1237
1238         creq = creq->next;
1239     }
1240
1241     return all_done;
1242 }