/* Code beautification with uncrustify
   [alexxy/gromacs.git] / src / gromacs / gmxlib / thread_mpi / p2p_protocol.c */
1 /*
2    This source code file is part of thread_mpi.
3    Written by Sander Pronk, Erik Lindahl, and possibly others.
4
5    Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6    All rights reserved.
7
8    Redistribution and use in source and binary forms, with or without
9    modification, are permitted provided that the following conditions are met:
10    1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12    2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15    3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19    THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20    EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22    DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23    DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24    (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26    ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30    If you want to redistribute modifications, please consider that
31    scientific software is very special. Version control is crucial -
32    bugs must be traceable. We will be happy to consider code for
33    inclusion in the official distribution, but derived work should not
34    be called official thread_mpi. Details are found in the README & COPYING
35    files.
36  */
37
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
40 #endif
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <errno.h>
52 #include <stdlib.h>
53 #include <stdio.h>
54 #include <stdarg.h>
55 #include <string.h>
56
57
58 #include "impl.h"
59 #include "p2p.h"
60
/* free envelopes: */
/* fetch a recv envelope from the free envelopes list */
63 static struct envelope *tMPI_Free_env_list_fetch_recv(struct free_envelope_list
64                                                       *evl);
65
66 /* return an envelope to the free envelopes list */
67 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
68                                            struct envelope           *rev);
69
70
71
72 /* send envelope lists: */
73 /* send envelopes: */
74 /* get a new envelope from the send list's free envelope list */
75 static struct envelope* tMPI_Send_env_list_fetch_new(struct
76                                                      send_envelope_list *evl);
77
78 /* return a send envelope to the send list's free envelope list,
79    (to be used by the sending thread, who owns the send_envelope_list) */
80 static void tMPI_Send_env_list_return(struct envelope *ev);
81 #ifdef USE_SEND_RECV_COPY_BUFFER
82 /* return a send envelope to the sender's send list.
83    (to be used by the receiving thread). */
84 static void tMPI_Send_env_list_rts(struct envelope *sev);
85 #endif
86
87
88
89
90 /* send envelopes: */
91 /* remove a send envelope from its head_old list. Does not lock */
92 static void tMPI_Send_env_list_remove_old(struct envelope *sev);
93
94 /* add a send envelope to the new envelopes queue in a list */
95 static void tMPI_Send_env_list_add_new(struct tmpi_thread        *cur,
96                                        struct send_envelope_list *evl,
97                                        struct envelope           *sev);
98 /* move a send envelope to the old envelopes queue in a list.
99    Assumes that this is safe to do without interference
100    from other threads, i.e. the list it's in must have been
101    detached. */
102 static void tMPI_Send_env_list_move_to_old(struct envelope *sev);
103
104
105
106
107 /* receive envelopes: */
108 /* add a receive envelope to a list */
109 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
110                                    struct envelope           *ev);
111 /* remove a receive envelope from its list */
112 static void tMPI_Recv_env_list_remove(struct envelope *ev);
113
114
115
116
117 /* do the actual point-to-point transfer */
118 static void tMPI_Xfer(struct tmpi_thread *cur, struct envelope *sev,
119                       struct envelope *rev);
120
121
122
123
124 /* Point-to-point communication protocol functions */
125 void tMPI_Free_env_list_init(struct free_envelope_list *evl, int N)
126 {
127     int i;
128
129     /* allocate the head element */
130     evl->recv_alloc_head = (struct envelope*)tMPI_Malloc(sizeof(struct envelope)
131                                                          *N);
132     evl->head_recv = evl->recv_alloc_head;
133
134     for (i = 0; i < N; i++)
135     {
136         if (i < N-1)
137         {
138             evl->head_recv[i].next = &(evl->head_recv[i+1]);
139         }
140         else
141         {
142             evl->head_recv[i].next = NULL;
143         }
144         evl->head_recv[i].rlist = NULL;
145         evl->head_recv[i].slist = NULL;
146     }
147 }
148
149 void tMPI_Free_env_list_destroy(struct free_envelope_list *evl)
150 {
151     free(evl->recv_alloc_head);
152     evl->head_recv       = NULL;
153     evl->recv_alloc_head = NULL;
154 }
155
156 static struct envelope* tMPI_Free_env_list_fetch_recv(struct
157                                                       free_envelope_list *evl)
158 {
159     struct envelope *ret;
160     if (!evl->head_recv)
161     {
162         /* TODO: make this do something better than crash */
163         fprintf(stderr, "Ran out of recv envelopes!!!!\n");
164         abort();
165     }
166
167     ret            = evl->head_recv;
168     evl->head_recv = ret->next;
169     ret->next      = NULL;
170     ret->prev      = NULL;
171     /*evl->N--;*/
172
173     return ret;
174 }
175
176 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
177                                            struct envelope           *rev)
178 {
179     rev->rlist     = NULL;
180     rev->slist     = NULL;
181     rev->prev      = NULL;
182     rev->next      = evl->head_recv;
183     evl->head_recv = rev;
184 }
185
186
187
188
189
190
191
192
193
194
195 /* tmpi_send_envelope_list functions */
196
/* Initialize send-envelope list evl with a pool of N envelopes.
   The pool is one contiguous allocation: element 0 becomes the dummy head
   of the circular head_old list, and the free list starts at element 1.
   NOTE(review): this layout assumes N >= 2 -- confirm all callers. */
void tMPI_Send_env_list_init(struct send_envelope_list *evl, int N)
{
    int i;
#ifndef TMPI_LOCK_FREE_LISTS
    /* without lock-free lists, the shared rts/new heads are guarded by
       spinlocks */
    tMPI_Spinlock_init( &(evl->lock_rts) );
    tMPI_Spinlock_init( &(evl->lock_new) );
#endif
    evl->Nalloc = N;

    evl->alloc_head = (struct envelope*)tMPI_Malloc(sizeof(struct envelope)*N);
    for (i = 0; i < N; i++)
    {
        /* chain the envelopes into a singly linked free list */
        evl->alloc_head[i].next  = (i < (N-1)) ? &(evl->alloc_head[i+1]) : NULL;
        evl->alloc_head[i].prev  = NULL;
        evl->alloc_head[i].slist = evl;
        evl->alloc_head[i].rlist = NULL;
#ifdef USE_SEND_RECV_COPY_BUFFER
        /* each envelope owns a private buffer for double-buffered small sends */
        evl->alloc_head[i].cb = (void*)tMPI_Malloc(sizeof(char)*COPY_BUFFER_SIZE);
#endif
    }

#ifdef TMPI_LOCK_FREE_LISTS
    tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
    tMPI_Atomic_ptr_set(&(evl->head_rts), NULL);
#else
    evl->head_new = NULL;
    evl->head_rts = NULL;
#endif
    evl->head_free = &(evl->alloc_head[1]);
    /* initialize the head_old circular list with dummy element */
    evl->head_old       = evl->alloc_head; /* the first element is a dummy */
    evl->head_old->next = evl->head_old;
    evl->head_old->prev = evl->head_old;
}
231
/* Tear down a send-envelope list: free the per-envelope copy buffers
   (when double buffering is compiled in) and the envelope pool, then
   poison the heads so use after tMPI_Finalize crashes immediately. */
void tMPI_Send_env_list_destroy(struct send_envelope_list *evl)
{
#ifdef USE_SEND_RECV_COPY_BUFFER
    size_t i;
    for (i = 0; i < evl->Nalloc; i++)
    {
        free(evl->alloc_head[i].cb);
    }
#endif
    free(evl->alloc_head);
    evl->alloc_head = NULL;
#ifdef TMPI_LOCK_FREE_LISTS
    tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
#else
    evl->head_new = NULL;
#endif
    evl->head_old = NULL; /* make it crash if used after tMPI_Finalize */
}
250
251
/* Get a fresh envelope from evl's free list, to be used by the sending
   thread (which owns evl).

   If the free list is empty, envelopes that receivers returned to the
   shared 'rts' (returned-to-sender) head are first reclaimed: the rts
   list is detached atomically (CAS loop or spinlock, depending on build)
   and its items are pushed onto head_free. If even that yields nothing,
   the thread either processes incoming requests until an envelope frees
   up (copy-buffer builds) or aborts.
   NOTE(review): the "Ran out of send envelopes!!" printf looks like
   leftover debug output -- consider removing or routing to stderr. */
static struct envelope* tMPI_Send_env_list_fetch_new(struct
                                                     send_envelope_list *evl)
{
    struct envelope *ret;

    do
    {
        /* first check whether any envelopes were returned to sender */
#ifdef TMPI_LOCK_FREE_LISTS
        if ((ret = (struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts))))
#else
        if (evl->head_rts)
#endif
        {
            /* detach the list */
#ifdef TMPI_LOCK_FREE_LISTS
            /* we detach by swapping what we expect the pointer value to be,
               with NULL. If there were a cross-platform way to atomically
               swap  without checking, we could do that, too. */
            while (!tMPI_Atomic_ptr_cas( &(evl->head_rts), ret, NULL ))
            {
                ret = (struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts));
            }
#else
            tMPI_Spinlock_lock( &(evl->lock_rts) );
            ret           = evl->head_rts;
            evl->head_rts = NULL;
            tMPI_Spinlock_unlock( &(evl->lock_rts) );
#endif
            /* now add the items to head_free; only this thread touches
               head_free, so no synchronization is needed here */
            while (ret)
            {
                struct envelope *next = ret->next;
                ret->next      = evl->head_free;
                evl->head_free = ret;
                ret            = next;
            }
        }

        /* get the last free one off the list */
        ret = evl->head_free;
        if (!ret)
#ifdef USE_SEND_RECV_COPY_BUFFER
        {
            /* There are no free send envelopes, so all we can do is handle
               incoming requests until we get a free send envelope. */
            printf("Ran out of send envelopes!!\n");
            tMPI_Wait_process_incoming(tMPI_Get_current());
        }
#else
        {
            /* If this happens, it most likely indicates a bug in the
               calling program. We could fix the situation by waiting,
               but that would most likely lead to deadlocks - even
               more difficult to debug than this. */
            fprintf(stderr, "Ran out of send envelopes!!!!\n");
            abort();
        }
#endif
    }
    while (!ret);

    evl->head_free = ret->next;

    /* initialize the envelope's link fields before handing it out */
    ret->next  = NULL;
    ret->prev  = NULL;
    ret->slist = evl;
    ret->rlist = NULL;

    /* and return it */
    return ret;
}
324
325 static void tMPI_Send_env_list_return(struct envelope *sev)
326 {
327     struct send_envelope_list *evl = sev->slist;
328
329     sev->next      = evl->head_free;
330     evl->head_free = sev;
331 }
332
333
#ifdef USE_SEND_RECV_COPY_BUFFER
/* Return a send envelope to the sender's ready-to-send (rts) list.
   Called by the *receiving* thread, so the push onto the shared head must
   be thread-safe: a lock-free CAS loop, or a spinlock-guarded insert.

   Fix: the spinlock branch wrote through an undeclared identifier 'ev'
   instead of the parameter 'sev', which cannot compile when
   TMPI_LOCK_FREE_LISTS is undefined. */
static void tMPI_Send_env_list_rts(struct envelope *sev)
{
    struct send_envelope_list *evl = sev->slist;
#ifdef TMPI_LOCK_FREE_LISTS
    struct envelope           *sevn;

    do
    {
        sevn      = (struct envelope*)tMPI_Atomic_ptr_get(&evl->head_rts);
        sev->next = sevn;
        /* the cmpxchg operation is a memory fence, so we shouldn't need
           to worry about out-of-order evaluation */
    }
    while (!tMPI_Atomic_ptr_cas( &(evl->head_rts), sevn, sev ));
#else
    tMPI_Spinlock_lock( &(evl->lock_rts) );
    sev->next     = (struct envelope*)evl->head_rts;
    evl->head_rts = sev;
    tMPI_Spinlock_unlock( &(evl->lock_rts) );
#endif
}
#endif
357
358
359
360 static void tMPI_Send_env_list_remove_old(struct envelope *sev)
361 {
362     /* pretty straighforward because it isn't a shared list */
363     if (sev->next)
364     {
365         sev->next->prev = sev->prev;
366     }
367     if (sev->prev)
368     {
369         sev->prev->next = sev->next;
370     }
371     sev->prev = NULL;
372     sev->next = NULL;
373 }
374
375
376 static void tMPI_Send_env_list_add_new(struct tmpi_thread        *cur,
377                                        struct send_envelope_list *evl,
378                                        struct envelope           *sev)
379 {
380 #ifdef TMPI_LOCK_FREE_LISTS
381     struct envelope *evl_head_new_orig;
382 #endif
383     sev->prev = NULL;
384
385 #ifdef TMPI_LOCK_FREE_LISTS
386     /* behold our lock-free shared linked list:
387        (it's actually quite simple because we only do operations at the head
388         of the list, either adding them - such as here - or detaching the whole
389         list) */
390     do
391     {
392         /* read the old head atomically */
393         evl_head_new_orig = (struct envelope*) tMPI_Atomic_ptr_get(
394                     &(evl->head_new) );
395         /* set our envelope to have that as its next */
396         sev->next = evl_head_new_orig;
397         /* do the compare-and-swap.
398            this operation is a memory fence, so we shouldn't need
399            to worry about out-of-order stores. If it returns false,
400            somebody else got there before us: */
401     }
402     while (!tMPI_Atomic_ptr_cas(&(evl->head_new), evl_head_new_orig, sev));
403
404 #else
405     tMPI_Spinlock_lock( &(evl->lock_new) );
406     /* we add to the start of the list */
407     sev->next = (struct send_envelope*)evl->head_new;
408     /* actually attach it to the list */
409     evl->head_new = sev;
410     tMPI_Spinlock_unlock( &(evl->lock_new) );
411 #endif
412
413 #if defined(TMPI_PROFILE)
414     tMPI_Profile_wait_start(cur);
415 #endif
416     /* signal to the thread that there is a new envelope */
417     tMPI_Event_signal( &(sev->dest->p2p_event) );
418 #if defined(TMPI_PROFILE)
419     tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
420 #endif
421 }
422
423 static void tMPI_Send_env_list_move_to_old(struct envelope *sev)
424 {
425     struct send_envelope_list *evl = sev->slist;
426
427     /* remove from old list. We assume the list has been detached! */
428     if (sev->next)
429     {
430         sev->next->prev = sev->prev;
431     }
432     if (sev->prev)
433     {
434         sev->prev->next = sev->next;
435     }
436
437     /* we add to the end of the list */
438     sev->next = evl->head_old;
439     sev->prev = evl->head_old->prev;
440
441     sev->next->prev = sev;
442     sev->prev->next = sev;
443 }
444
445
446
447
448
449
450
451
452 /* tmpi_recv_envelope_list functions */
453
454 void tMPI_Recv_env_list_init(struct recv_envelope_list *evl)
455 {
456     evl->head       = &(evl->dummy);
457     evl->head->prev = evl->head;
458     evl->head->next = evl->head;
459 }
460
/* Invalidate a receive-envelope list; the dummy element lives inside the
   list struct itself, so there is nothing to free. */
void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl)
{
    evl->head = NULL;
}
465
466 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
467                                    struct envelope           *rev)
468 {
469     rev->rlist = evl;
470     /* we add to the end of the list */
471     rev->next = evl->head;
472     rev->prev = evl->head->prev;
473
474     rev->next->prev = rev;
475     rev->prev->next = rev;
476 }
477
478 static void tMPI_Recv_env_list_remove(struct envelope *rev)
479 {
480     if (rev->next)
481     {
482         rev->next->prev = rev->prev;
483     }
484     if (rev->prev)
485     {
486         rev->prev->next = rev->next;
487     }
488     rev->prev  = NULL;
489     rev->next  = NULL;
490     rev->rlist = NULL;
491 }
492
493
494
495
496
497
498
499
500 /* tmpi_req functions */
501
502 void tMPI_Req_list_init(struct req_list *rl, int N_reqs)
503 {
504     int i;
505
506     rl->alloc_head = (struct tmpi_req_*)tMPI_Malloc(
507                 sizeof(struct tmpi_req_)*N_reqs);
508     rl->head = rl->alloc_head;
509     for (i = 0; i < N_reqs; i++)
510     {
511         if (i == 0)
512         {
513             rl->head[i].prev = NULL;
514         }
515         else
516         {
517             rl->head[i].prev = &(rl->head[i-1]);
518         }
519
520         if (i >= (N_reqs-1))
521         {
522             rl->head[i].next = NULL;
523         }
524         else
525         {
526             rl->head[i].next = &(rl->head[i+1]);
527         }
528     }
529 }
530
531 void tMPI_Req_list_destroy(struct req_list *rl)
532 {
533     free(rl->alloc_head);
534     rl->head       = NULL;
535     rl->alloc_head = NULL;
536 }
537
538
539
540 struct tmpi_req_ *tMPI_Get_req(struct req_list *rl)
541 {
542     struct tmpi_req_ *req = rl->head;
543
544
545     /* we don't need locks here because requests are a per-thread property */
546     if (!req)
547     {
548         /* this could be fixed */
549         tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_REQUESTS);
550         return NULL;
551     }
552     rl->head  = req->next;
553     req->next = NULL;
554
555     return req;
556 }
557
558 void tMPI_Return_req(struct req_list *rl, struct tmpi_req_ *req)
559 {
560     req->next = rl->head;
561     req->prev = NULL;
562     rl->head  = req;
563 }
564
565
566
/* Initialize request rq to track envelope ev: not finished, self-linked,
   with status fields reset (tag stays TMPI_ANY_TAG until matched). */
void tMPI_Req_init(struct tmpi_req_ *rq, struct envelope *ev)
{
    rq->ev       = ev;
    rq->finished = FALSE;
    /* a fresh request forms a one-element circular list */
    rq->next     = rq;
    rq->prev     = rq;

    rq->source      = ev->src;
    rq->comm        = ev->comm;
    rq->tag         = TMPI_ANY_TAG;
    rq->error       = TMPI_SUCCESS;
    rq->transferred = 0;
    rq->cancelled   = FALSE;
}
581
582
583
584
585
586
587 /* Point-to-point communication protocol functions */
588
589
590
591 void tMPI_Set_req(struct envelope *ev, struct tmpi_req_ *req)
592 {
593     req->source = ev->src;
594     req->comm   = ev->comm;
595     req->tag    = ev->tag;
596     req->error  = ev->error;
597     if (ev->send)
598     {
599         if (tMPI_Atomic_get(&(ev->state)) > env_unmatched)
600         {
601             req->transferred = ev->bufsize;
602         }
603         else
604         {
605             req->transferred = 0;
606         }
607     }
608     else
609     {
610         if (tMPI_Atomic_get(&(ev->state)) == env_finished)
611         {
612             req->transferred = ev->bufsize;
613         }
614         else
615         {
616             req->transferred = 0;
617         }
618     }
619 }
620
621 void tMPI_Set_status(struct tmpi_req_ *req, tMPI_Status *st)
622 {
623     if (st)
624     {
625         st->TMPI_SOURCE = tMPI_Comm_seek_rank(req->comm, req->source);
626         st->TMPI_TAG    = req->tag;
627         st->TMPI_ERROR  = req->error;
628         st->transferred = req->transferred;
629         st->cancelled   = req->cancelled;
630     }
631 }
632
633
/* Return TRUE when send envelope sev satisfies receive envelope rev:
   tags agree (or the receive uses TMPI_ANY_TAG), same communicator,
   source agrees (rev->src == NULL acts as the any-source wildcard),
   same destination and datatype, the send is not yet finished, and the
   receive is still unmatched.
   NOTE(review): state.value is read here without an atomic accessor;
   presumably callers guarantee sufficient synchronization -- confirm. */
tmpi_bool tMPI_Envelope_matches(const struct envelope *sev,
                                const struct envelope *rev)
{
#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Envelope_matches (%d->%d)==(%d->%d),  tag=(%d==%d),       \n       datatype=(%ld==%ld), comm=(%ld,%ld),\n              finished=(%d==%d)\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
           tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
           (int)(sev->tag), (int)(rev->tag),
           (long int)sev->datatype, (long int)rev->datatype,
           (long int)sev->comm, (long int)rev->comm,
           (int)sev->state.value, (int)rev->state.value);
    fflush(stdout);
#endif
    if ( ( (rev->tag == TMPI_ANY_TAG) || (rev->tag == sev->tag) ) &&
         ( sev->comm == rev->comm ) &&
         ( (!rev->src)  || (rev->src == sev->src) ) &&
         ( sev->dest == rev->dest ) &&
         ( sev->datatype == rev->datatype ) &&
         ( sev->state.value < env_finished  &&
           rev->state.value == env_unmatched ) )
    {
#ifdef TMPI_DEBUG
        printf("%5d: (%d->%d) tag=%d found match\n",
               tMPI_This_threadnr(),
               tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
               (int)(sev->tag));
        fflush(stdout);
#endif
        return TRUE;
    }
    return FALSE;
}
667
668
669
670
671 struct envelope* tMPI_Send_env_list_search_old(struct send_envelope_list *evl,
672                                                struct envelope           *rev)
673 {
674     struct envelope *sev;
675
676     sev = (struct envelope*)evl->head_old->next;
677     while (sev != evl->head_old)
678     {
679         if (tMPI_Envelope_matches(sev, rev))
680         {
681             /* remove the envelope */
682             tMPI_Send_env_list_remove_old(sev);
683             return sev;
684         }
685         sev = sev->next;
686     }
687     return NULL;
688 }
689
690
691 struct envelope* tMPI_Recv_env_list_search_new(struct recv_envelope_list *evl,
692                                                struct envelope           *sev)
693 {
694     struct envelope *rev;
695
696     rev = evl->head->next;
697     while (rev != evl->head)
698     {
699         if (tMPI_Envelope_matches(sev, rev))
700         {
701             return rev;
702         }
703         rev = rev->next;
704     }
705     return NULL;
706 }
707
708
#ifdef USE_SEND_RECV_COPY_BUFFER
/* Fill the send envelope's copy buffer, after its possible use was
   announced (ev->using_cb set in tMPI_Prep_send_envelope).

   If the receiver has not started yet, the data is copied into sev->cb
   and the state advanced to env_cb_available; the receiver then cleans
   up the envelope. If the receiver got there first, we spin until it has
   finished copying and clean up the envelope ourselves. */
void tMPI_Send_copy_buffer(struct envelope *sev, struct tmpi_req_ *req)
{
    int state;

    /* in the special case of a zero buffer size, we don't do anything and
       always let the receiver handle it */
    if (sev->bufsize == 0)
    {
        return;
    }

    /* first check whether the other side hasn't started yet */
    state = tMPI_Atomic_get( &(sev->state) );
    tMPI_Atomic_memory_barrier_acq();
    if (state == env_unmatched)
    {
        /* first copy */
        memcpy(sev->cb, sev->buf, sev->bufsize);
        /* now set state, if other side hasn't started copying yet. */
        tMPI_Atomic_memory_barrier_rel();
        if (tMPI_Atomic_cas( &(sev->state), env_unmatched, env_cb_available))
        {
            /* if it was originally unmatched, the receiver wasn't
               copying the old buffer. We don't need to wait,
               and the receiver is going to clean up this envelope. */
#ifdef TMPI_DEBUG
            printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) completed\n",
                   tMPI_This_threadnr(),
                   tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
                   (int)(sev->tag));
            fflush(stdout);
#endif
            return;
        }
    }
    /* and if we reached this point, the receiver had already started
       copying, and we need to clean up the envelope ourselves.

       we first need to wait until the receiver is finished copying. We
       know this is a short wait (since the buffer was small enough to be
       buffered in the first place), so we just spin-wait.  */
    while (tMPI_Atomic_get( &(sev->state) ) < env_cb_available)
    {
    }
    tMPI_Atomic_memory_barrier_acq();
#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) waiting-completed\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest), (int)(sev->tag));
    fflush(stdout);
#endif
    tMPI_Set_req(sev, req);
    /* and now we clean up */
    tMPI_Send_env_list_return(sev);
}
#endif
767
768
769 struct envelope* tMPI_Prep_send_envelope(struct send_envelope_list *evl,
770                                          tMPI_Comm comm,
771                                          struct tmpi_thread *src,
772                                          struct tmpi_thread *dest,
773                                          void *buf, int count,
774                                          tMPI_Datatype datatype,
775                                          int tag, tmpi_bool nonblock)
776 {
777     /* get an envelope from the send-envelope stack */
778     struct envelope *ev = tMPI_Send_env_list_fetch_new( evl );
779
780     ev->tag      = tag;
781     ev->nonblock = nonblock;
782
783     ev->comm = comm;
784
785     ev->src  = src;
786     ev->dest = dest;
787
788     ev->buf      = buf;
789     ev->bufsize  = count*datatype->size;
790     ev->datatype = datatype;
791
792     ev->send = TRUE;
793
794     ev->rlist = NULL;
795
796 #ifdef USE_SEND_RECV_COPY_BUFFER
797     /* check whether we'll be double buffering */
798     ev->using_cb = (ev->bufsize < COPY_BUFFER_SIZE);
799     /* but don't do anything yet */
800 #endif
801
802     tMPI_Atomic_set(&(ev->state), env_unmatched);
803
804     ev->error = TMPI_SUCCESS;
805     if (count < 0)
806     {
807         tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
808         ev->error = TMPI_ERR_XFER_BUFSIZE;
809     }
810
811     return ev;
812 }
813
814 struct envelope* tMPI_Prep_recv_envelope(struct tmpi_thread *cur,
815                                          tMPI_Comm comm,
816                                          struct tmpi_thread *src,
817                                          struct tmpi_thread *dest,
818                                          void *buf, int count,
819                                          tMPI_Datatype datatype, int tag,
820                                          tmpi_bool nonblock)
821 {
822     /* get an envelope from the stack */
823     struct envelope *ev = tMPI_Free_env_list_fetch_recv( &(cur->envelopes) );
824
825     ev->tag      = tag;
826     ev->nonblock = nonblock;
827
828     ev->comm = comm;
829
830     ev->src  = src;
831     ev->dest = dest;
832
833     ev->buf      = buf;
834     ev->bufsize  = count*datatype->size;
835     ev->datatype = datatype;
836
837     ev->send = FALSE;
838
839     ev->slist = NULL;
840     ev->rlist = NULL;
841
842     tMPI_Atomic_set(&(ev->state), env_unmatched);
843
844     ev->error = TMPI_SUCCESS;
845     if (count < 0)
846     {
847         tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
848         ev->error = TMPI_ERR_XFER_BUFSIZE;
849     }
850
851     return ev;
852 }
853
854
855
856
857
858
859
860
/* Perform the actual point-to-point transfer from send envelope sev to
   receive envelope rev, mark both envelopes finished, and signal/clean
   up afterwards. On buffer-size mismatch or NULL buffers, both
   envelopes are finished with an error code instead.

   NOTE(review): the '{' after the first TMPI_PROFILE check and the '}'
   before the last one are both inside matched #if defined(TMPI_PROFILE)
   blocks, so braces balance in either configuration -- fragile, but
   deliberate; do not "fix" one side without the other. */
static void tMPI_Xfer(struct tmpi_thread *cur, struct envelope *sev,
                      struct envelope *rev)
{
#ifdef USE_SEND_RECV_COPY_BUFFER
    /* we remove the sender's envelope only if we do the transfer, which
       we always do if the buffer size = 0 */
    tmpi_bool remove_sender = (sev->bufsize == 0);
#endif
#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Xfer (%d->%d, tag=%d) started\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
    fflush(stdout);
#endif
    /* first set data on the receiving end so status can be updated */
    rev->src = sev->src;
    rev->tag = sev->tag;

    if (sev->bufsize)          /* do the actual transfer */
    {
        void *sbuf = sev->buf; /* source buffer */
        if (sev->bufsize > rev->bufsize)
        {
            /* receive buffer too small: fail both sides */
            tMPI_Error((rev->comm), TMPI_ERR_XFER_BUFSIZE);
            tMPI_Atomic_set(&(rev->state), env_finished);
            tMPI_Atomic_set(&(sev->state), env_finished);
            rev->error = TMPI_ERR_XFER_BUFSIZE;
            sev->error = TMPI_ERR_XFER_BUFSIZE;
            return;
        }

#ifdef USE_SEND_RECV_COPY_BUFFER
        if (sev->using_cb)
        {
            /* check if the other side has already finished copying */
            if (!tMPI_Atomic_cas( &(sev->state), env_unmatched, env_copying))
            {
                /* it has, and we're copying from the new buffer.
                   We're now also tasked with removing the envelope */
                sbuf          = sev->cb;
                remove_sender = TRUE;
#ifdef TMPI_PROFILE
                tMPI_Profile_count_buffered_p2p_xfer(cur);
#endif
            }
        }
#endif

        if (!rev->buf || !sev->buf)
        {
            /* NULL user buffer with nonzero size: fail both sides */
            tMPI_Error((rev->comm), TMPI_ERR_BUF);
            tMPI_Atomic_set(&(rev->state), env_finished);
            tMPI_Atomic_set(&(sev->state), env_finished);
            rev->error = TMPI_ERR_BUF;
            sev->error = TMPI_ERR_BUF;
            return;
        }
        memcpy(rev->buf, sbuf, sev->bufsize);
#ifdef TMPI_PROFILE
        tMPI_Profile_count_p2p_xfer(cur);
#endif
        /* for status update */
    }
    rev->bufsize = sev->bufsize;
    /* and mark that we're finished */
#if defined(TMPI_PROFILE)
    {
        tMPI_Profile_wait_start(cur);
#endif
    tMPI_Atomic_set( &(rev->state), env_finished);
    tMPI_Atomic_set( &(sev->state), env_finished);

    /* signal to a potentially waiting thread that we're done. */
    tMPI_Atomic_add_return( &(rev->src->ev_outgoing_received), 1);
    tMPI_Event_signal(&(rev->src->p2p_event));

    /* remove the receiving envelope if it's in a list */
    tMPI_Recv_env_list_remove(rev);
#ifdef USE_SEND_RECV_COPY_BUFFER
    if (remove_sender)
    {
        tMPI_Send_env_list_rts(sev);
    }
#endif
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
}
#endif


#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Xfer (%d->%d, tag=%d) done\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
    fflush(stdout);
#endif
    return;
}
959
960
961
962
963
964
/* Post a receive: prepare a receive envelope and try to match it against
   already-posted sends. src == NULL means receive from any source, in
   which case the send queues of all threads are searched.
   If a matching send is found the transfer happens immediately via
   tMPI_Xfer; otherwise the envelope is queued on this thread's receive
   list. Returns the receive envelope in either case. */
struct envelope* tMPI_Post_match_recv(struct tmpi_thread *cur,
                                      tMPI_Comm comm,
                                      struct tmpi_thread *src,
                                      void *recv_buf, int recv_count,
                                      tMPI_Datatype datatype,
                                      int tag, tmpi_bool nonblock)
{
    struct tmpi_thread *dest = cur;
    struct envelope    *rev;
    struct envelope    *sev          = NULL;
    int                 src_threadnr = src ? tMPI_Threadnr(src) : Nthreads;
    int                 i;

    /* reserve an envelope to post */
    rev = tMPI_Prep_recv_envelope(cur, comm, src, dest, recv_buf, recv_count,
                                  datatype, tag, nonblock);

#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) started\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest), (int)(rev->tag));
    fflush(stdout);
#endif
    /* we now check the entire existing send queue */
    if (src)
    {
        sev = tMPI_Send_env_list_search_old( &(dest->evs[src_threadnr]), rev);
    }
    else
    {
        /* if we don't know the source, we look at all possible sources */
        for (i = 0; i < Nthreads; i++)
        {
            sev = tMPI_Send_env_list_search_old(&(dest->evs[i]), rev);
            if (sev)
            {
                break;
            }
        }
    }

    if (sev)
    {
#ifdef TMPI_DEBUG
        printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) found match\n",
               tMPI_This_threadnr(),
               tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
               (int)(rev->tag));
        fflush(stdout);
#endif
        /* we found a matching send */
        tMPI_Xfer(cur, sev, rev);
    }
    else
    {
#ifdef TMPI_DEBUG
        printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) no match\n",
               tMPI_This_threadnr(),
               tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
               (int)(rev->tag));
        fflush(stdout);
#endif
        /* we post the envelope in the right list */
        tMPI_Recv_env_list_add( &(dest->evr), rev);
    }
    return rev;
}
1032
1033
1034
1035
1036 struct envelope *tMPI_Post_send(struct tmpi_thread *cur,
1037                                 tMPI_Comm comm,
1038                                 struct tmpi_thread *dest,
1039                                 void *send_buf, int send_count,
1040                                 tMPI_Datatype datatype, int tag,
1041                                 tmpi_bool nonblock)
1042 {
1043     struct tmpi_thread        *src = cur;
1044     struct envelope           *sev;
1045     int                        src_threadnr = tMPI_Threadnr(src);
1046     struct send_envelope_list *sevl         = &(dest->evs[src_threadnr]);
1047
1048     /* reserve an envelope to post */
1049     sev = tMPI_Prep_send_envelope(sevl, comm, src, dest, send_buf, send_count,
1050                                   datatype, tag, nonblock);
1051
1052 #ifdef TMPI_DEBUG
1053     printf("%5d: tMPI_Post_send (%d->%d, tag=%d)\n",
1054            tMPI_This_threadnr(),
1055            tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
1056            (int)(sev->tag));
1057     fflush(stdout);
1058 #endif
1059     /* we post the envelope in the right list */
1060     tMPI_Send_env_list_add_new(cur, &(dest->evs[src_threadnr]), sev);
1061
1062     return sev;
1063 }
1064
1065
1066
1067
/* Wait for and process incoming p2p events for this thread: detach every
   newly posted send envelope list, match each envelope against posted
   receives (transferring on a match, otherwise moving the envelope to the
   'old' list), and acknowledge the handled events.
   NOTE: the lock-free detach below depends on the exact atomic-read/CAS
   ordering; do not reorder these statements. */
void tMPI_Wait_process_incoming(struct tmpi_thread *cur)
{
    int i;
    int check_id;
    int n_handled = 0;


#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_start(cur);
#endif
    /* we check for newly arrived send envelopes and finished send
       envelopes */
    check_id = tMPI_Event_wait( &(cur->p2p_event));
    /* the outgoing_received items are handled 'automatically'
       by the function calling this function */
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p);
#endif
    /* subtract events that correspond to returned (received) outgoing
       envelopes: those are accounted for elsewhere, so they must not be
       processed here. The counter is reset by subtracting the value we
       read, so concurrent increments between get and add are preserved. */
    n_handled = tMPI_Atomic_get(&(cur->ev_outgoing_received));
    tMPI_Atomic_add_return( &(cur->ev_outgoing_received), -n_handled);
    check_id -= n_handled;

    if (check_id > 0)
    {
        /*int repl=check_id;*/
        /*int n=0;*/
        /* there were new send envelopes. Let's check them all */
        for (i = 0; i < Nthreads; i++)
        {
            struct envelope *sev_head;

#ifdef TMPI_LOCK_FREE_LISTS
            /* Behold our lock-free shared linked list:
               (see tMPI_Send_env_list_add_new for more info) */
            do
            {
                /* read old head atomically */
                sev_head = (struct envelope*)
                    tMPI_Atomic_ptr_get( &(cur->evs[i].head_new) );
                /* do the compare-and-swap to detach the list */
            }
            /* retry until the head we read is still the head when we swap
               it for NULL; on success the whole chain is exclusively ours */
            while (!tMPI_Atomic_ptr_cas(&(cur->evs[i].head_new), sev_head,
                                        NULL));
#else
            tMPI_Spinlock_lock( &(cur->evs[i].lock_new) );
            sev_head             = (struct send_envelope*)cur->evs[i].head_new;
            cur->evs[i].head_new = NULL; /* detach the list */
            tMPI_Spinlock_unlock( &(cur->evs[i].lock_new) );
#endif

            if (sev_head) /* there's a newly arrived send envelope from this
                             thread*/
            {
                struct envelope *sev    = sev_head;
                struct envelope *prev_s = NULL;
                struct envelope *rev;

                /* first enable reversing order by creating a regular
                   doubly-linked list from the singly-linked shared
                   linked list */
                while (sev)
                {
                    sev->prev = prev_s;
                    prev_s    = sev;
                    sev       = sev->next;
                }
                /* now walk through it backwards (in order of addition) */
                sev = prev_s;
                while (sev)
                {
                    /* save prev now: a matched transfer or list move may
                       invalidate sev's links */
                    struct envelope *sevp = sev->prev;
                    n_handled++;
                    rev = tMPI_Recv_env_list_search_new(&(cur->evr), sev);
                    if (rev)
                    {
                        tMPI_Xfer(cur, sev, rev);
                    }
                    else
                    {
                        tMPI_Send_env_list_move_to_old( sev );
                    }
                    sev = sevp;
                }
            }
        }
    }
    /* tell the event counter how many events we consumed */
    tMPI_Event_process( &(cur->p2p_event), n_handled);
}
1156
/* Non-blocking completion check for a single request. Returns TRUE when
   the request has finished (possibly already finished on a prior call).
   On completion, copies the envelope's error status into the request and
   releases the envelope back to its free list.
   NOTE: the order here matters — the error must be read before the
   envelope is returned, because afterwards the envelope may be reused. */
tmpi_bool tMPI_Test_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
{
    struct envelope *ev = rq->ev;

    if (ev && !(rq->finished) )
    {
#ifdef USE_SEND_RECV_COPY_BUFFER
        if (ev->send && ev->using_cb)
        {
            /* We buffer-copy. Just do the transfer to the buffer and
               return saying that we're done. It's now up to the
               receiver to return our envelope.*/
            /* do our transfer and are guaranteed a finished
               envelope. */
            tMPI_Send_copy_buffer(ev, rq);
            /* get the results */
            rq->error    = rq->ev->error;
            rq->finished = TRUE;
        }
        else
#endif
        {
            /* env_finished (or beyond) means the transfer is complete */
            if (tMPI_Atomic_get( &(ev->state) ) >= env_finished)
            {
                rq->finished = TRUE;
                /* get the results */
                rq->error = rq->ev->error;
                tMPI_Set_req(ev, rq);
                /* and release the envelope. After this point, the envelope
                   may be reused, so its contents shouldn't be relied on. */
                if (ev->send)
                {
                    tMPI_Send_env_list_return(ev);
                }
                else
                {
                    tMPI_Free_env_list_return_recv( &(cur->envelopes), ev);
                }
            }
        }
    }
    return rq->finished;
}
1200
/* Block until the request rq completes, processing incoming envelopes
   between completion checks. */
void tMPI_Wait_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
{
    while (!tMPI_Test_single(cur, rq))
    {
        /* not done yet: handle whatever has arrived and re-test */
        tMPI_Wait_process_incoming(cur);
    }
}
1213
1214 tmpi_bool tMPI_Test_multi(struct tmpi_thread *cur, struct tmpi_req_ *rqs,
1215                           tmpi_bool *any_done)
1216 {
1217     tmpi_bool         all_done = TRUE;
1218     struct tmpi_req_ *creq     = rqs;
1219
1220     int               i = 0;
1221     if (any_done)
1222     {
1223         *any_done = FALSE;
1224     }
1225
1226     while (creq)
1227     {
1228         tmpi_bool finished = tMPI_Test_single(cur, creq);
1229         i++;
1230
1231         /* now do the check */
1232         if (!finished)
1233         {
1234             all_done = FALSE;
1235         }
1236         else
1237         {
1238             /* remove the request from the list we've been given. */
1239             if (creq->prev)
1240             {
1241                 creq->prev->next = creq->next;
1242             }
1243             if (creq->next)
1244             {
1245                 creq->next->prev = creq->prev;
1246             }
1247             if (any_done)
1248             {
1249                 *any_done = TRUE;
1250             }
1251         }
1252
1253         creq = creq->next;
1254     }
1255
1256     return all_done;
1257 }