Merge branch 'release-4-6'
[alexxy/gromacs.git] / src / gromacs / gmxlib / thread_mpi / p2p_protocol.c
1 /*
2    This source code file is part of thread_mpi.
3    Written by Sander Pronk, Erik Lindahl, and possibly others.
4
5    Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6    All rights reserved.
7
8    Redistribution and use in source and binary forms, with or without
9    modification, are permitted provided that the following conditions are met:
10    1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12    2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15    3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19    THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20    EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22    DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23    DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24    (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26    ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30    If you want to redistribute modifications, please consider that
31    scientific software is very special. Version control is crucial -
32    bugs must be traceable. We will be happy to consider code for
33    inclusion in the official distribution, but derived work should not
34    be called official thread_mpi. Details are found in the README & COPYING
35    files.
36  */
37
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
40 #endif
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <errno.h>
52 #include <stdlib.h>
53 #include <stdio.h>
54 #include <stdarg.h>
55 #include <string.h>
56
57
58 #include "impl.h"
59 #include "p2p.h"
60
/* free envelopes: */
/* fetch a receive envelope from the free envelopes list */
63 static struct envelope *tMPI_Free_env_list_fetch_recv(struct free_envelope_list
64                                                       *evl);
65
66 /* return an envelope to the free envelopes list */
67 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
68                                            struct envelope           *rev);
69
70
71
72 /* send envelope lists: */
73 /* send envelopes: */
74 /* get a new envelope from the send list's free envelope list */
75 static struct envelope* tMPI_Send_env_list_fetch_new(struct
76                                                      send_envelope_list *evl);
77
78 /* return a send envelope to the send list's free envelope list,
79    (to be used by the sending thread, who owns the send_envelope_list) */
80 static void tMPI_Send_env_list_return(struct envelope *ev);
81 #ifdef USE_SEND_RECV_COPY_BUFFER
82 /* return a send envelope to the sender's send list.
83    (to be used by the receiving thread). */
84 static void tMPI_Send_env_list_rts(struct envelope *sev);
85 #endif
86
87
88
89
90 /* send envelopes: */
91 /* remove a send envelope from its head_old list. Does not lock */
92 static void tMPI_Send_env_list_remove_old(struct envelope *sev);
93
94 /* add a send envelope to the new envelopes queue in a list */
95 static void tMPI_Send_env_list_add_new(struct tmpi_thread        *cur,
96                                        struct send_envelope_list *evl,
97                                        struct envelope           *sev);
98 /* move a send envelope to the old envelopes queue in a list.
99    Assumes that this is safe to do without interference
100    from other threads, i.e. the list it's in must have been
101    detached. */
102 static void tMPI_Send_env_list_move_to_old(struct envelope *sev);
103
104
105
106
107 /* receive envelopes: */
108 /* add a receive envelope to a list */
109 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
110                                    struct envelope           *ev);
111 /* remove a receive envelope from its list */
112 static void tMPI_Recv_env_list_remove(struct envelope *ev);
113
114
115
116
117 /* do the actual point-to-point transfer */
118 static void tMPI_Xfer(struct tmpi_thread *cur, struct envelope *sev,
119                       struct envelope *rev);
120
121
122
123
124 /* Point-to-point communication protocol functions */
125 int tMPI_Free_env_list_init(struct free_envelope_list *evl, int N)
126 {
127     int i;
128
129     /* allocate the head element */
130     evl->recv_alloc_head = (struct envelope*)tMPI_Malloc(sizeof(struct envelope)
131                                                          *N);
132     if (evl->recv_alloc_head == NULL)
133     {
134         return TMPI_ERR_NO_MEM;
135     }
136     evl->head_recv = evl->recv_alloc_head;
137
138     for (i = 0; i < N; i++)
139     {
140         if (i < N-1)
141         {
142             evl->head_recv[i].next = &(evl->head_recv[i+1]);
143         }
144         else
145         {
146             evl->head_recv[i].next = NULL;
147         }
148         evl->head_recv[i].rlist = NULL;
149         evl->head_recv[i].slist = NULL;
150     }
151     return TMPI_SUCCESS;
152 }
153
154 void tMPI_Free_env_list_destroy(struct free_envelope_list *evl)
155 {
156     free(evl->recv_alloc_head);
157     evl->head_recv       = NULL;
158     evl->recv_alloc_head = NULL;
159 }
160
161 static struct envelope* tMPI_Free_env_list_fetch_recv(struct
162                                                       free_envelope_list *evl)
163 {
164     struct envelope *ret;
165     if (!evl->head_recv)
166     {
167         tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_ENVELOPES);
168         return NULL;
169     }
170
171     ret            = evl->head_recv;
172     evl->head_recv = ret->next;
173     ret->next      = NULL;
174     ret->prev      = NULL;
175     /*evl->N--;*/
176
177     return ret;
178 }
179
180 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
181                                            struct envelope           *rev)
182 {
183     rev->rlist     = NULL;
184     rev->slist     = NULL;
185     rev->prev      = NULL;
186     rev->next      = evl->head_recv;
187     evl->head_recv = rev;
188 }
189
190
191
192
193
194
195
196
197
198
199 /* tmpi_send_envelope_list functions */
200
201 int tMPI_Send_env_list_init(struct send_envelope_list *evl, int N)
202 {
203     int i;
204 #ifndef TMPI_LOCK_FREE_LISTS
205     tMPI_Spinlock_init( &(evl->lock_rts) );
206     tMPI_Spinlock_init( &(evl->lock_new) );
207 #endif
208     evl->Nalloc = N;
209
210     evl->alloc_head = (struct envelope*)tMPI_Malloc(sizeof(struct envelope)*N);
211     if (evl->alloc_head == NULL)
212     {
213         return TMPI_ERR_NO_MEM;
214     }
215     for (i = 0; i < N; i++)
216     {
217         evl->alloc_head[i].next  = (i < (N-1)) ? &(evl->alloc_head[i+1]) : NULL;
218         evl->alloc_head[i].prev  = NULL;
219         evl->alloc_head[i].slist = evl;
220         evl->alloc_head[i].rlist = NULL;
221 #ifdef USE_SEND_RECV_COPY_BUFFER
222         evl->alloc_head[i].cb = (void*)tMPI_Malloc(sizeof(char)*
223                                                    COPY_BUFFER_SIZE);
224         if (evl->alloc_head[i].cb == NULL)
225         {
226             return TMPI_ERR_NO_MEM;
227         }
228 #endif
229     }
230
231 #ifdef TMPI_LOCK_FREE_LISTS
232     tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
233     tMPI_Atomic_ptr_set(&(evl->head_rts), NULL);
234 #else
235     evl->head_new = NULL;
236     evl->head_rts = NULL;
237 #endif
238     evl->head_free = &(evl->alloc_head[1]);
239     /* initialize the head_old circular list with dummy element */
240     evl->head_old       = evl->alloc_head; /* the first element is a dummy */
241     evl->head_old->next = evl->head_old;
242     evl->head_old->prev = evl->head_old;
243     return TMPI_SUCCESS;
244 }
245
246 void tMPI_Send_env_list_destroy(struct send_envelope_list *evl)
247 {
248 #ifdef USE_SEND_RECV_COPY_BUFFER
249     size_t i;
250     for (i = 0; i < evl->Nalloc; i++)
251     {
252         free(evl->alloc_head[i].cb);
253     }
254 #endif
255     free(evl->alloc_head);
256     evl->alloc_head = NULL;
257 #ifdef TMPI_LOCK_FREE_LISTS
258     tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
259 #else
260     evl->head_new = NULL;
261 #endif
262     evl->head_old = NULL; /* make it crash if used after tMPI_Finalize */
263 }
264
265
/* Get a free send envelope from the list, first recycling any envelopes
   the receiving side has returned-to-sender (rts).  Only called by the
   thread that owns the list.  May block (processing incoming requests)
   when copy buffers are in use; otherwise returns NULL after raising
   TMPI_ERR_ENVELOPES if no envelope is available. */
static struct envelope* tMPI_Send_env_list_fetch_new(struct
                                                     send_envelope_list *evl)
{
    struct envelope *ret;

    do
    {
        /* first check whether any envelopes were returned to sender */
#ifdef TMPI_LOCK_FREE_LISTS
        if ((ret = (struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts))))
#else
        if (evl->head_rts)
#endif
        {
            /* detach the list */
#ifdef TMPI_LOCK_FREE_LISTS
            /* we detach by swapping what we expect the pointer value to be,
               with NULL. If there were a cross-platform way to atomically
               swap  without checking, we could do that, too. */
            while (!tMPI_Atomic_ptr_cas( &(evl->head_rts), ret, NULL ))
            {
                /* somebody pushed in between; re-read the head and retry */
                ret = (struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts));
            }
#else
            tMPI_Spinlock_lock( &(evl->lock_rts) );
            ret           = evl->head_rts;
            evl->head_rts = NULL;
            tMPI_Spinlock_unlock( &(evl->lock_rts) );
#endif
            /* now add the items to head_free (head_free is private to the
               owning thread, so no synchronization is needed here) */
            while (ret)
            {
                struct envelope *next = ret->next;
                ret->next      = evl->head_free;
                evl->head_free = ret;
                ret            = next;
            }
        }

        /* get the last free one off the list */
        ret = evl->head_free;
        if (!ret)
#ifdef USE_SEND_RECV_COPY_BUFFER
        {
            /* There are no free send envelopes, so all we can do is handle
               incoming requests until we get a free send envelope. */
#if defined(TMPI_DEBUG)  || defined(TMPI_WARNINGS)
            printf("Ran out of send envelopes!!\n");
            fflush(stdout);
#endif
            tMPI_Wait_process_incoming(tMPI_Get_current());
        }
#else
        {
            /* If this happens, it most likely indicates a bug in the
               calling program. We could fix the situation by waiting,
               but that would most likely lead to deadlocks - even
               more difficult to debug than this. */
            tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_ENVELOPES);
            return NULL;
        }
#endif
    }
    while (!ret);

    /* unlink the envelope we obtained from the free list */
    evl->head_free = ret->next;

    ret->next  = NULL;
    ret->prev  = NULL;
    ret->slist = evl;
    ret->rlist = NULL;

    /* and return it */
    return ret;
}
341
342 static void tMPI_Send_env_list_return(struct envelope *sev)
343 {
344     struct send_envelope_list *evl = sev->slist;
345
346     sev->next      = evl->head_free;
347     evl->head_free = sev;
348 }
349
350
351 #ifdef USE_SEND_RECV_COPY_BUFFER
352 static void tMPI_Send_env_list_rts(struct envelope *sev)
353 {
354     struct send_envelope_list *evl = sev->slist;
355 #ifdef TMPI_LOCK_FREE_LISTS
356     struct envelope           *sevn;
357
358     do
359     {
360         sevn      = (struct envelope*)tMPI_Atomic_ptr_get(&evl->head_rts);
361         sev->next = sevn;
362         /* the cmpxchg operation is a memory fence, so we shouldn't need
363            to worry about out-of-order evaluation */
364     }
365     while (!tMPI_Atomic_ptr_cas( &(evl->head_rts), sevn, sev ));
366 #else
367     tMPI_Spinlock_lock( &(evl->lock_rts) );
368     ev->next      = (struct envelope*)evl->head_rts;
369     evl->head_rts = sev;
370     tMPI_Spinlock_unlock( &(evl->lock_rts) );
371 #endif
372 }
373 #endif
374
375
376
377 static void tMPI_Send_env_list_remove_old(struct envelope *sev)
378 {
379     /* pretty straighforward because it isn't a shared list */
380     if (sev->next)
381     {
382         sev->next->prev = sev->prev;
383     }
384     if (sev->prev)
385     {
386         sev->prev->next = sev->next;
387     }
388     sev->prev = NULL;
389     sev->next = NULL;
390 }
391
392
393 static void tMPI_Send_env_list_add_new(struct tmpi_thread        *cur,
394                                        struct send_envelope_list *evl,
395                                        struct envelope           *sev)
396 {
397 #ifdef TMPI_LOCK_FREE_LISTS
398     struct envelope *evl_head_new_orig;
399 #endif
400     sev->prev = NULL;
401
402 #ifdef TMPI_LOCK_FREE_LISTS
403     /* behold our lock-free shared linked list:
404        (it's actually quite simple because we only do operations at the head
405         of the list, either adding them - such as here - or detaching the whole
406         list) */
407     do
408     {
409         /* read the old head atomically */
410         evl_head_new_orig = (struct envelope*) tMPI_Atomic_ptr_get(
411                     &(evl->head_new) );
412         /* set our envelope to have that as its next */
413         sev->next = evl_head_new_orig;
414         /* do the compare-and-swap.
415            this operation is a memory fence, so we shouldn't need
416            to worry about out-of-order stores. If it returns false,
417            somebody else got there before us: */
418     }
419     while (!tMPI_Atomic_ptr_cas(&(evl->head_new), evl_head_new_orig, sev));
420
421 #else
422     tMPI_Spinlock_lock( &(evl->lock_new) );
423     /* we add to the start of the list */
424     sev->next = (struct send_envelope*)evl->head_new;
425     /* actually attach it to the list */
426     evl->head_new = sev;
427     tMPI_Spinlock_unlock( &(evl->lock_new) );
428 #endif
429
430 #if defined(TMPI_PROFILE)
431     tMPI_Profile_wait_start(cur);
432 #endif
433     /* signal to the thread that there is a new envelope */
434     tMPI_Event_signal( &(sev->dest->p2p_event) );
435 #if defined(TMPI_PROFILE)
436     tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
437 #endif
438 }
439
440 static void tMPI_Send_env_list_move_to_old(struct envelope *sev)
441 {
442     struct send_envelope_list *evl = sev->slist;
443
444     /* remove from old list. We assume the list has been detached! */
445     if (sev->next)
446     {
447         sev->next->prev = sev->prev;
448     }
449     if (sev->prev)
450     {
451         sev->prev->next = sev->next;
452     }
453
454     /* we add to the end of the list */
455     sev->next = evl->head_old;
456     sev->prev = evl->head_old->prev;
457
458     sev->next->prev = sev;
459     sev->prev->next = sev;
460 }
461
462
463
464
465
466
467
468
469 /* tmpi_recv_envelope_list functions */
470
471 int tMPI_Recv_env_list_init(struct recv_envelope_list *evl)
472 {
473     evl->head       = &(evl->dummy);
474     evl->head->prev = evl->head;
475     evl->head->next = evl->head;
476
477     return TMPI_SUCCESS;
478 }
479
480 void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl)
481 {
482     evl->head = NULL;
483 }
484
485 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl,
486                                    struct envelope           *rev)
487 {
488     rev->rlist = evl;
489     /* we add to the end of the list */
490     rev->next = evl->head;
491     rev->prev = evl->head->prev;
492
493     rev->next->prev = rev;
494     rev->prev->next = rev;
495 }
496
497 static void tMPI_Recv_env_list_remove(struct envelope *rev)
498 {
499     if (rev->next)
500     {
501         rev->next->prev = rev->prev;
502     }
503     if (rev->prev)
504     {
505         rev->prev->next = rev->next;
506     }
507     rev->prev  = NULL;
508     rev->next  = NULL;
509     rev->rlist = NULL;
510 }
511
512
513
514
515
516
517
518
519 /* tmpi_req functions */
520
521 int tMPI_Req_list_init(struct req_list *rl, int N_reqs)
522 {
523     int i;
524
525     rl->alloc_head = (struct tmpi_req_*)tMPI_Malloc(
526                 sizeof(struct tmpi_req_)*N_reqs);
527     if (rl->alloc_head == 0)
528     {
529         return TMPI_ERR_NO_MEM;
530     }
531     rl->head = rl->alloc_head;
532     for (i = 0; i < N_reqs; i++)
533     {
534         if (i == 0)
535         {
536             rl->head[i].prev = NULL;
537         }
538         else
539         {
540             rl->head[i].prev = &(rl->head[i-1]);
541         }
542
543         if (i >= (N_reqs-1))
544         {
545             rl->head[i].next = NULL;
546         }
547         else
548         {
549             rl->head[i].next = &(rl->head[i+1]);
550         }
551     }
552     return TMPI_SUCCESS;
553 }
554
555 void tMPI_Req_list_destroy(struct req_list *rl)
556 {
557     free(rl->alloc_head);
558     rl->head       = NULL;
559     rl->alloc_head = NULL;
560 }
561
562
563
564 struct tmpi_req_ *tMPI_Get_req(struct req_list *rl)
565 {
566     struct tmpi_req_ *req = rl->head;
567
568
569     /* we don't need locks here because requests are a per-thread property */
570     if (!req)
571     {
572         /* this could be fixed */
573         tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_REQUESTS);
574         return NULL;
575     }
576     rl->head  = req->next;
577     req->next = NULL;
578
579     return req;
580 }
581
582 void tMPI_Return_req(struct req_list *rl, struct tmpi_req_ *req)
583 {
584     req->next = rl->head;
585     req->prev = NULL;
586     rl->head  = req;
587 }
588
589
590
591 void tMPI_Req_init(struct tmpi_req_ *rq, struct envelope *ev)
592 {
593     rq->ev       = ev;
594     rq->finished = FALSE;
595     rq->next     = rq;
596     rq->prev     = rq;
597
598     rq->source      = ev->src;
599     rq->comm        = ev->comm;
600     rq->tag         = TMPI_ANY_TAG;
601     rq->error       = TMPI_SUCCESS;
602     rq->transferred = 0;
603     rq->cancelled   = FALSE;
604 }
605
606
607
608
609
610
611 /* Point-to-point communication protocol functions */
612
613
614
615 void tMPI_Set_req(struct envelope *ev, struct tmpi_req_ *req)
616 {
617     req->source = ev->src;
618     req->comm   = ev->comm;
619     req->tag    = ev->tag;
620     req->error  = ev->error;
621     if (ev->send)
622     {
623         if (tMPI_Atomic_get(&(ev->state)) > env_unmatched)
624         {
625             req->transferred = ev->bufsize;
626         }
627         else
628         {
629             req->transferred = 0;
630         }
631     }
632     else
633     {
634         if (tMPI_Atomic_get(&(ev->state)) == env_finished)
635         {
636             req->transferred = ev->bufsize;
637         }
638         else
639         {
640             req->transferred = 0;
641         }
642     }
643 }
644
645 void tMPI_Set_status(struct tmpi_req_ *req, tMPI_Status *st)
646 {
647     if (st)
648     {
649         st->TMPI_SOURCE = tMPI_Comm_seek_rank(req->comm, req->source);
650         st->TMPI_TAG    = req->tag;
651         st->TMPI_ERROR  = req->error;
652         st->transferred = req->transferred;
653         st->cancelled   = req->cancelled;
654     }
655 }
656
657
/* Decide whether send envelope sev and receive envelope rev describe the
   same message. They match when:
   - rev accepts any tag (TMPI_ANY_TAG) or the tags are equal,
   - the communicators are identical,
   - rev accepts any source (src == NULL) or the sources are equal,
   - destinations and datatypes agree,
   - sev is not yet finished and rev is still unmatched. */
tmpi_bool tMPI_Envelope_matches(const struct envelope *sev,
                                const struct envelope *rev)
{
#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Envelope_matches (%d->%d)==(%d->%d),  tag=(%d==%d),       \n       datatype=(%ld==%ld), comm=(%ld,%ld),\n              finished=(%d==%d)\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
           tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
           (int)(sev->tag), (int)(rev->tag),
           (long int)sev->datatype, (long int)rev->datatype,
           (long int)sev->comm, (long int)rev->comm,
           (int)sev->state.value, (int)rev->state.value);
    fflush(stdout);
#endif
    if ( ( (rev->tag == TMPI_ANY_TAG) || (rev->tag == sev->tag) ) &&
         ( sev->comm == rev->comm ) &&
         ( (!rev->src)  || (rev->src == sev->src) ) &&
         ( sev->dest == rev->dest ) &&
         ( sev->datatype == rev->datatype ) &&
         ( tMPI_Atomic_get(&(sev->state)) < env_finished  &&
           tMPI_Atomic_get(&(rev->state)) == env_unmatched ) )
    {
#ifdef TMPI_DEBUG
        printf("%5d: (%d->%d) tag=%d found match\n",
               tMPI_This_threadnr(),
               tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
               (int)(sev->tag));
        fflush(stdout);
#endif
        return TRUE;
    }
    return FALSE;
}
691
692
693
694
695 struct envelope* tMPI_Send_env_list_search_old(struct send_envelope_list *evl,
696                                                struct envelope           *rev)
697 {
698     struct envelope *sev;
699
700     sev = (struct envelope*)evl->head_old->next;
701     while (sev != evl->head_old)
702     {
703         if (tMPI_Envelope_matches(sev, rev))
704         {
705             /* remove the envelope */
706             tMPI_Send_env_list_remove_old(sev);
707             return sev;
708         }
709         sev = sev->next;
710     }
711     return NULL;
712 }
713
714
715 struct envelope* tMPI_Recv_env_list_search_new(struct recv_envelope_list *evl,
716                                                struct envelope           *sev)
717 {
718     struct envelope *rev;
719
720     rev = evl->head->next;
721     while (rev != evl->head)
722     {
723         if (tMPI_Envelope_matches(sev, rev))
724         {
725             return rev;
726         }
727         rev = rev->next;
728     }
729     return NULL;
730 }
731
732
733 #ifdef USE_SEND_RECV_COPY_BUFFER
/* Fill the send envelope's copy buffer so the sender can complete without
   waiting for the receiver, after its possible use has been announced
   (using_cb was set in tMPI_Prep_send_envelope).  If the receiver has
   already started copying from the original buffer, we instead wait for it
   to finish and clean up the envelope ourselves. */
void tMPI_Send_copy_buffer(struct envelope *sev, struct tmpi_req_ *req)
{
    int state;

    /* in the special case of a zero buffer size, we don't do anything and
       always let the receiver handle it */
    if (sev->bufsize == 0)
    {
        return;
    }

    /* first check whether the other side hasn't started yet */
    state = tMPI_Atomic_get( &(sev->state) );
    tMPI_Atomic_memory_barrier_acq();
    if (state == env_unmatched)
    {
        /* first copy into the envelope's own buffer */
        memcpy(sev->cb, sev->buf, sev->bufsize);
        /* now set state, if other side hasn't started copying yet. */
        tMPI_Atomic_memory_barrier_rel();
        if (tMPI_Atomic_cas( &(sev->state), env_unmatched, env_cb_available))
        {
            /* if it was originally unmatched, the receiver wasn't
               copying the old buffer. We don't need to wait,
               and the receiver is going to clean up this envelope. */
#ifdef TMPI_DEBUG
            printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) completed\n",
                   tMPI_This_threadnr(),
                   tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
                   (int)(sev->tag));
            fflush(stdout);
#endif
            return;
        }
    }
    /* and if we reached this point, the receiver had already started
       copying, and we need to clean up the envelope ourselves.

       we first need to wait until the receiver is finished copying. We
       know this is a short wait (since the buffer was small enough to be
       buffered in the first place), so we just spin-wait.  */
    tMPI_Atomic_memory_barrier(); /* a full barrier to make sure that the
                                     sending doesn't interfere with the
                                     waiting */
    while (tMPI_Atomic_get( &(sev->state) ) < env_cb_available)
    {
        tMPI_Atomic_memory_barrier_acq();
    }
    tMPI_Atomic_memory_barrier_acq();
#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) waiting-completed\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest), (int)(sev->tag));
    fflush(stdout);
#endif
    /* record the transfer result in the request before recycling */
    tMPI_Set_req(sev, req);
    /* and now we clean up */
    tMPI_Send_env_list_return(sev);
}
794 #endif
795
796
797 struct envelope* tMPI_Prep_send_envelope(struct send_envelope_list *evl,
798                                          tMPI_Comm comm,
799                                          struct tmpi_thread *src,
800                                          struct tmpi_thread *dest,
801                                          void *buf, int count,
802                                          tMPI_Datatype datatype,
803                                          int tag, tmpi_bool nonblock)
804 {
805     /* get an envelope from the send-envelope stack */
806     struct envelope *ev = tMPI_Send_env_list_fetch_new( evl );
807     if (ev == NULL)
808     {
809         return NULL;
810     }
811
812     ev->tag      = tag;
813     ev->nonblock = nonblock;
814
815     ev->comm = comm;
816
817     ev->src  = src;
818     ev->dest = dest;
819
820     ev->buf      = buf;
821     ev->bufsize  = count*datatype->size;
822     ev->datatype = datatype;
823
824     ev->send = TRUE;
825
826     ev->rlist = NULL;
827
828 #ifdef USE_SEND_RECV_COPY_BUFFER
829     /* check whether we'll be double buffering */
830     ev->using_cb = (ev->bufsize < COPY_BUFFER_SIZE);
831     /* but don't do anything yet */
832 #endif
833
834     tMPI_Atomic_set(&(ev->state), env_unmatched);
835
836     ev->error = TMPI_SUCCESS;
837     if (count < 0)
838     {
839         tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
840         ev->error = TMPI_ERR_XFER_BUFSIZE;
841     }
842
843     return ev;
844 }
845
846 struct envelope* tMPI_Prep_recv_envelope(struct tmpi_thread *cur,
847                                          tMPI_Comm comm,
848                                          struct tmpi_thread *src,
849                                          struct tmpi_thread *dest,
850                                          void *buf, int count,
851                                          tMPI_Datatype datatype, int tag,
852                                          tmpi_bool nonblock)
853 {
854     /* get an envelope from the stack */
855     struct envelope *ev = tMPI_Free_env_list_fetch_recv( &(cur->envelopes) );
856     if (ev == NULL)
857     {
858         return NULL;
859     }
860
861     ev->tag      = tag;
862     ev->nonblock = nonblock;
863
864     ev->comm = comm;
865
866     ev->src  = src;
867     ev->dest = dest;
868
869     ev->buf      = buf;
870     ev->bufsize  = count*datatype->size;
871     ev->datatype = datatype;
872
873     ev->send = FALSE;
874
875     ev->slist = NULL;
876     ev->rlist = NULL;
877
878     tMPI_Atomic_set(&(ev->state), env_unmatched);
879
880     ev->error = TMPI_SUCCESS;
881     if (count < 0)
882     {
883         tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
884         ev->error = TMPI_ERR_XFER_BUFSIZE;
885     }
886
887     return ev;
888 }
889
890
891
892
893
894
895
896
/* Perform the actual point-to-point data transfer from send envelope sev
   into receive envelope rev: copy the data (from the original buffer or
   the sender's copy buffer), mark both envelopes finished, signal the
   sender, and remove the envelopes from their lists.  NOTE: the braces
   opened/closed inside the two '#if defined(TMPI_PROFILE)' regions pair
   up with each other, so the function is brace-balanced in both
   configurations. */
static void tMPI_Xfer(struct tmpi_thread *cur, struct envelope *sev,
                      struct envelope *rev)
{
#ifdef USE_SEND_RECV_COPY_BUFFER
    /* we remove the sender's envelope only if we do the transfer, which
       we always do if the buffer size = 0 */
    tmpi_bool remove_sender = (sev->bufsize == 0);
#endif
#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Xfer (%d->%d, tag=%d) started\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
    fflush(stdout);
#endif
    /* first set data on the receiving end so status can be updated */
    rev->src = sev->src;
    rev->tag = sev->tag;

    if (sev->bufsize)          /* do the actual transfer */
    {
        void *sbuf = sev->buf; /* source buffer */
        /* a too-small receive buffer is an error on both ends */
        if (sev->bufsize > rev->bufsize)
        {
            tMPI_Error((rev->comm), TMPI_ERR_XFER_BUFSIZE);
            tMPI_Atomic_set(&(rev->state), env_finished);
            tMPI_Atomic_set(&(sev->state), env_finished);
            rev->error = TMPI_ERR_XFER_BUFSIZE;
            sev->error = TMPI_ERR_XFER_BUFSIZE;
            return;
        }

#ifdef USE_SEND_RECV_COPY_BUFFER
        if (sev->using_cb)
        {
            /* check if the other side has already finished copying */
            if (!tMPI_Atomic_cas( &(sev->state), env_unmatched, env_copying))
            {
                /* it has, and we're copying from the new buffer.
                   We're now also tasked with removing the envelope */
                sbuf          = sev->cb;
                remove_sender = TRUE;
#ifdef TMPI_PROFILE
                tMPI_Profile_count_buffered_p2p_xfer(cur);
#endif
            }
        }
#endif

        /* neither side may pass a NULL data buffer for a nonzero size */
        if (!rev->buf || !sev->buf)
        {
            tMPI_Error((rev->comm), TMPI_ERR_BUF);
            tMPI_Atomic_set(&(rev->state), env_finished);
            tMPI_Atomic_set(&(sev->state), env_finished);
            rev->error = TMPI_ERR_BUF;
            sev->error = TMPI_ERR_BUF;
            return;
        }
        memcpy(rev->buf, sbuf, sev->bufsize);
#ifdef TMPI_PROFILE
        tMPI_Profile_count_p2p_xfer(cur);
#endif
        /* for status update */
    }
    rev->bufsize = sev->bufsize;
    /* and mark that we're finished */
#if defined(TMPI_PROFILE)
    {
        tMPI_Profile_wait_start(cur);
#endif
    tMPI_Atomic_set( &(rev->state), env_finished);
    tMPI_Atomic_set( &(sev->state), env_finished);

    /* signal to a potentially waiting thread that we're done. */
    tMPI_Atomic_fetch_add( &(rev->src->ev_outgoing_received), 1);
    tMPI_Event_signal(&(rev->src->p2p_event));

    /* remove the receiving envelope if it's in a list */
    tMPI_Recv_env_list_remove(rev);
#ifdef USE_SEND_RECV_COPY_BUFFER
    if (remove_sender)
    {
        tMPI_Send_env_list_rts(sev);
    }
#endif
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
}
#endif


#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Xfer (%d->%d, tag=%d) done\n",
           tMPI_This_threadnr(),
           tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
    fflush(stdout);
#endif
    return;
}
995
996
997
998
999
1000
1001 struct envelope* tMPI_Post_match_recv(struct tmpi_thread *cur,
1002                                       tMPI_Comm comm,
1003                                       struct tmpi_thread *src,
1004                                       void *recv_buf, int recv_count,
1005                                       tMPI_Datatype datatype,
1006                                       int tag, tmpi_bool nonblock)
1007 {
1008     struct tmpi_thread *dest = cur;
1009     struct envelope    *rev;
1010     struct envelope    *sev          = NULL;
1011     int                 src_threadnr = src ? tMPI_Threadnr(src) : Nthreads;
1012     int                 i;
1013
1014     /* reserve an envelope to post */
1015     rev = tMPI_Prep_recv_envelope(cur, comm, src, dest, recv_buf, recv_count,
1016                                   datatype, tag, nonblock);
1017     if (rev == NULL)
1018     {
1019         return NULL;
1020     }
1021
1022 #ifdef TMPI_DEBUG
1023     printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) started\n",
1024            tMPI_This_threadnr(),
1025            tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest), (int)(rev->tag));
1026     fflush(stdout);
1027 #endif
1028     /* we now check the entire exisiting send queue */
1029     if (src)
1030     {
1031         sev = tMPI_Send_env_list_search_old( &(dest->evs[src_threadnr]), rev);
1032     }
1033     else
1034     {
1035         /* if we don't know the source, we look at all possible sources */
1036         for (i = 0; i < Nthreads; i++)
1037         {
1038             sev = tMPI_Send_env_list_search_old(&(dest->evs[i]), rev);
1039             if (sev)
1040             {
1041                 break;
1042             }
1043         }
1044     }
1045
1046     if (sev)
1047     {
1048 #ifdef TMPI_DEBUG
1049         printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) found match\n",
1050                tMPI_This_threadnr(),
1051                tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
1052                (int)(rev->tag));
1053         fflush(stdout);
1054 #endif
1055         /* we found a matching send */
1056         tMPI_Xfer(cur, sev, rev);
1057     }
1058     else
1059     {
1060 #ifdef TMPI_DEBUG
1061         printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) no match\n",
1062                tMPI_This_threadnr(),
1063                tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
1064                (int)(rev->tag));
1065         fflush(stdout);
1066 #endif
1067         /* we post the envelope in the right list */
1068         tMPI_Recv_env_list_add( &(dest->evr), rev);
1069     }
1070     return rev;
1071 }
1072
1073
1074
1075
1076 struct envelope *tMPI_Post_send(struct tmpi_thread *cur,
1077                                 tMPI_Comm comm,
1078                                 struct tmpi_thread *dest,
1079                                 void *send_buf, int send_count,
1080                                 tMPI_Datatype datatype, int tag,
1081                                 tmpi_bool nonblock)
1082 {
1083     struct tmpi_thread        *src = cur;
1084     struct envelope           *sev;
1085     int                        src_threadnr = tMPI_Threadnr(src);
1086     struct send_envelope_list *sevl         = &(dest->evs[src_threadnr]);
1087
1088     /* reserve an envelope to post */
1089     sev = tMPI_Prep_send_envelope(sevl, comm, src, dest, send_buf, send_count,
1090                                   datatype, tag, nonblock);
1091     if (sev == NULL)
1092     {
1093         return NULL;
1094     }
1095
1096 #ifdef TMPI_DEBUG
1097     printf("%5d: tMPI_Post_send (%d->%d, tag=%d)\n",
1098            tMPI_This_threadnr(),
1099            tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
1100            (int)(sev->tag));
1101     fflush(stdout);
1102 #endif
1103     /* we post the envelope in the right list */
1104     tMPI_Send_env_list_add_new(cur, &(dest->evs[src_threadnr]), sev);
1105
1106     return sev;
1107 }
1108
1109
1110
1111
1112 void tMPI_Wait_process_incoming(struct tmpi_thread *cur)
1113 {
1114     int i;
1115     int check_id;
1116     int n_handled = 0;
1117
1118
1119 #if defined(TMPI_PROFILE)
1120     tMPI_Profile_wait_start(cur);
1121 #endif
1122     /* we check for newly arrived send envelopes and finished send
1123        envelopes */
1124     check_id = tMPI_Event_wait( &(cur->p2p_event));
1125     /* the outgoing_received items are handled 'automatically'
1126        by the function calling this function */
1127 #if defined(TMPI_PROFILE)
1128     tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p);
1129 #endif
1130     n_handled = tMPI_Atomic_get(&(cur->ev_outgoing_received));
1131     tMPI_Atomic_fetch_add( &(cur->ev_outgoing_received), -n_handled);
1132     check_id -= n_handled;
1133
1134     if (check_id > 0)
1135     {
1136         /*int repl=check_id;*/
1137         /*int n=0;*/
1138         /* there were new send envelopes. Let's check them all */
1139         for (i = 0; i < Nthreads; i++)
1140         {
1141             struct envelope *sev_head;
1142
1143 #ifdef TMPI_LOCK_FREE_LISTS
1144             /* Behold our lock-free shared linked list:
1145                (see tMPI_Send_env_list_add_new for more info) */
1146             do
1147             {
1148                 /* read old head atomically */
1149                 sev_head = (struct envelope*)
1150                     tMPI_Atomic_ptr_get( &(cur->evs[i].head_new) );
1151                 /* do the compare-and-swap to detach the list */
1152             }
1153             while (!tMPI_Atomic_ptr_cas(&(cur->evs[i].head_new), sev_head,
1154                                         NULL));
1155 #else
1156             tMPI_Spinlock_lock( &(cur->evs[i].lock_new) );
1157             sev_head             = (struct send_envelope*)cur->evs[i].head_new;
1158             cur->evs[i].head_new = NULL; /* detach the list */
1159             tMPI_Spinlock_unlock( &(cur->evs[i].lock_new) );
1160 #endif
1161
1162             if (sev_head) /* there's a newly arrived send envelope from this
1163                              thread*/
1164             {
1165                 struct envelope *sev    = sev_head;
1166                 struct envelope *prev_s = NULL;
1167                 struct envelope *rev;
1168
1169                 /* first enable reversing order by creating a regular
1170                    doubly-linked list from the singly-linked shared
1171                    linked list */
1172                 while (sev)
1173                 {
1174                     sev->prev = prev_s;
1175                     prev_s    = sev;
1176                     sev       = sev->next;
1177                 }
1178                 /* now walk through it backwards (in order of addition) */
1179                 sev = prev_s;
1180                 while (sev)
1181                 {
1182                     struct envelope *sevp = sev->prev;
1183                     n_handled++;
1184                     rev = tMPI_Recv_env_list_search_new(&(cur->evr), sev);
1185                     if (rev)
1186                     {
1187                         tMPI_Xfer(cur, sev, rev);
1188                     }
1189                     else
1190                     {
1191                         tMPI_Send_env_list_move_to_old( sev );
1192                     }
1193                     sev = sevp;
1194                 }
1195             }
1196         }
1197     }
1198     tMPI_Event_process( &(cur->p2p_event), n_handled);
1199 }
1200
1201 tmpi_bool tMPI_Test_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
1202 {
1203     struct envelope *ev = rq->ev;
1204
1205     if (ev && !(rq->finished) )
1206     {
1207 #ifdef USE_SEND_RECV_COPY_BUFFER
1208         if (ev->send && ev->using_cb)
1209         {
1210             /* We buffer-copy. Just do the transfer to the buffer and
1211                return saying that we're done. It's now up to the
1212                receiver to return our envelope.*/
1213             /* do our transfer and are guaranteed a finished
1214                envelope. */
1215             tMPI_Send_copy_buffer(ev, rq);
1216             /* get the results */
1217             rq->error    = rq->ev->error;
1218             rq->finished = TRUE;
1219         }
1220         else
1221 #endif
1222         {
1223             if (tMPI_Atomic_get( &(ev->state) ) >= env_finished)
1224             {
1225                 rq->finished = TRUE;
1226                 /* get the results */
1227                 rq->error = rq->ev->error;
1228                 tMPI_Set_req(ev, rq);
1229                 /* and release the envelope. After this point, the envelope
1230                    may be reused, so its contents shouldn't be relied on. */
1231                 if (ev->send)
1232                 {
1233                     tMPI_Send_env_list_return(ev);
1234                 }
1235                 else
1236                 {
1237                     tMPI_Free_env_list_return_recv( &(cur->envelopes), ev);
1238                 }
1239             }
1240         }
1241     }
1242     return rq->finished;
1243 }
1244
/* Block until a single request has completed, processing incoming
   envelopes while waiting. */
void tMPI_Wait_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
{
    while (!tMPI_Test_single(cur, rq))
    {
        tMPI_Wait_process_incoming(cur);
    }
}
1257
1258 tmpi_bool tMPI_Test_multi(struct tmpi_thread *cur, struct tmpi_req_ *rqs,
1259                           tmpi_bool *any_done)
1260 {
1261     tmpi_bool         all_done = TRUE;
1262     struct tmpi_req_ *creq     = rqs;
1263
1264     int               i = 0;
1265     if (any_done)
1266     {
1267         *any_done = FALSE;
1268     }
1269
1270     while (creq)
1271     {
1272         tmpi_bool finished = tMPI_Test_single(cur, creq);
1273         i++;
1274
1275         /* now do the check */
1276         if (!finished)
1277         {
1278             all_done = FALSE;
1279         }
1280         else
1281         {
1282             /* remove the request from the list we've been given. */
1283             if (creq->prev)
1284             {
1285                 creq->prev->next = creq->next;
1286             }
1287             if (creq->next)
1288             {
1289                 creq->next->prev = creq->prev;
1290             }
1291             if (any_done)
1292             {
1293                 *any_done = TRUE;
1294             }
1295         }
1296
1297         creq = creq->next;
1298     }
1299
1300     return all_done;
1301 }