074d74aa205a3025a0eeedf8fc5dc8e26dbc0baf
[alexxy/gromacs.git] / src / gmxlib / thread_mpi / p2p_protocol.h
1 /*
2 This source code file is part of thread_mpi.  
3 Written by Sander Pronk, Erik Lindahl, and possibly others. 
4
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6 All rights reserved.
7
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
35 files.
36 */
37
38 /* this file is included from p2p.c; it's not really a header file,
39    but this defines a lot of functions that probably need to be inlined.*/
40
41 /* Point-to-point communication protocol functions */
42
43
44 void tMPI_Free_env_list_init(struct free_envelope_list *evl, int N)
45 {
46     int i;
47
48     /* allocate the head element */
49     evl->recv_alloc_head=(struct envelope*)tMPI_Malloc(sizeof(struct envelope)
50                                                        *N);
51     evl->head_recv=evl->recv_alloc_head;
52
53     for(i=0;i<N;i++)
54     {
55         if (i < N-1)
56         {
57             evl->head_recv[i].next=&(evl->head_recv[i+1]);
58         }
59         else
60         {
61             evl->head_recv[i].next=NULL;
62         }
63         evl->head_recv[i].rlist=NULL;
64         evl->head_recv[i].slist=NULL;
65     }
66 }
67
68 void tMPI_Free_env_list_destroy(struct free_envelope_list *evl)
69 {
70     free(evl->recv_alloc_head);
71     evl->head_recv=NULL;
72     evl->recv_alloc_head=NULL;
73 }
74
75 static struct envelope* tMPI_Free_env_list_fetch_recv(struct 
76                                                       free_envelope_list *evl)
77 {
78     struct envelope *ret;
79     if (! evl->head_recv )
80     {
81         /* TODO: make this do something better than crash */
82         fprintf(stderr, "Ran out of recv envelopes!!!!\n");
83         abort();
84     }
85
86     ret=evl->head_recv;
87     evl->head_recv=ret->next;
88     ret->next=NULL;
89     ret->prev=NULL;
90     /*evl->N--;*/
91
92     return ret;
93 }
94
95 static void tMPI_Free_env_list_return_recv(struct free_envelope_list *evl,
96                                            struct envelope *rev)
97 {
98     rev->rlist=NULL;
99     rev->slist=NULL;
100     rev->prev=NULL;
101     rev->next=evl->head_recv;
102     evl->head_recv=rev;
103 }
104
105
106
107
108
109
110
111
112
113
114 /* tmpi_send_envelope_list functions */
115
/* Initialize a per-(sender,receiver) send-envelope list: allocate N
   envelopes in one block, build the free stack, and set up the shared
   'new' and 'rts' (returned-to-sender) heads plus the private circular
   'old' list.
   Note: element 0 of the allocation serves as the dummy head of the
   circular head_old list, so only N-1 envelopes are actually usable. */
void tMPI_Send_env_list_init(struct send_envelope_list *evl, int N)
{
    int i;
#ifndef TMPI_LOCK_FREE_LISTS
    /* without lock-free lists, the shared heads are spinlock-protected */
    tMPI_Spinlock_init( &(evl->lock_rts) );
    tMPI_Spinlock_init( &(evl->lock_new) );
#endif
    evl->Nalloc=N;

    evl->alloc_head=(struct envelope*)tMPI_Malloc(sizeof(struct envelope)*N);
    for(i=0;i<N;i++) 
    { 
        evl->alloc_head[i].next=(i<(N-1)) ? &(evl->alloc_head[i+1]) : NULL;
        evl->alloc_head[i].prev=NULL;
        evl->alloc_head[i].slist=evl;
        evl->alloc_head[i].rlist=NULL;
#ifdef USE_SEND_RECV_COPY_BUFFER
        /* each envelope owns a fixed-size copy buffer for small messages */
        evl->alloc_head[i].cb=(void*)tMPI_Malloc(sizeof(char)*COPY_BUFFER_SIZE);
#endif
    }
 
#ifdef TMPI_LOCK_FREE_LISTS
    tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
    tMPI_Atomic_ptr_set(&(evl->head_rts), NULL);
#else
    evl->head_new = NULL;
    evl->head_rts = NULL;
#endif
    /* the free stack starts at element 1; element 0 becomes the dummy */
    evl->head_free = &(evl->alloc_head[1]);
    /* initialize the head_old circular list with dummy element */
    evl->head_old = evl->alloc_head; /* the first element is a dummy */
    evl->head_old->next = evl->head_old;
    evl->head_old->prev = evl->head_old;
}
150
/* Free all resources of a send-envelope list (per-envelope copy buffers
   first, then the envelope block itself) and poison the heads so that
   any use after tMPI_Finalize crashes instead of corrupting memory. */
void tMPI_Send_env_list_destroy(struct send_envelope_list *evl)
{
#ifdef USE_SEND_RECV_COPY_BUFFER
    size_t i;
    /* NOTE(review): Nalloc is set from an int in tMPI_Send_env_list_init,
       so this signed/unsigned comparison is harmless as long as Nalloc
       is non-negative — confirm the field's declared type. */
    for(i=0;i<evl->Nalloc;i++)
    {
        free(evl->alloc_head[i].cb);
    }
#endif
    free(evl->alloc_head);
    evl->alloc_head=NULL;
#ifdef TMPI_LOCK_FREE_LISTS
    tMPI_Atomic_ptr_set(&(evl->head_new), NULL);
#else
    evl->head_new=NULL; 
#endif
    evl->head_old=NULL; /* make it crash if used after tMPI_Finalize */
}
169
170
/* Fetch a free send envelope from the list. First recycles any envelopes
   the receiving side has returned-to-sender (head_rts) into the free
   stack. If no free envelope is available:
   - with copy buffers enabled, process incoming messages and retry,
     since the receiver eventually returns buffered envelopes;
   - otherwise abort, because blocking here would likely deadlock. */
static struct envelope* 
tMPI_Send_env_list_fetch_new(struct send_envelope_list *evl)
{
    struct envelope *ret;

    do
    {
        /* first check whether any envelopes were returned to sender */
#ifdef TMPI_LOCK_FREE_LISTS
        if ((ret=(struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts))))
#else
        if (evl->head_rts)
#endif
        {
            /* detach the list */
#ifdef TMPI_LOCK_FREE_LISTS
            /* we detach by swapping what we expect the pointer value to be,
               with NULL. If there were a cross-platform way to atomically 
               swap  without checking, we could do that, too. */
            while(tMPI_Atomic_ptr_cas( &(evl->head_rts), ret, NULL ) !=
                  (void*)ret)
            {
                /* CAS failed: re-read the head and try again */
                ret=(struct envelope*)tMPI_Atomic_ptr_get(&(evl->head_rts));
            }
#else
            tMPI_Spinlock_lock( &(evl->lock_rts) );
            ret=evl->head_rts;
            evl->head_rts=NULL;
            tMPI_Spinlock_unlock( &(evl->lock_rts) );
#endif
            /* now add the items to head_free (only this thread touches
               head_free, so no synchronization is needed here) */
            while(ret)
            {
                struct envelope *next=ret->next;
                ret->next=evl->head_free;
                evl->head_free=ret;
                ret=next;
            }
        }

        /* get the last free one off the list */
        ret=evl->head_free;
        if (!ret)
#ifdef USE_SEND_RECV_COPY_BUFFER
        {
            /* There are no free send envelopes, so all we can do is handle
               incoming requests until we get a free send envelope. */
            printf("Ran out of send envelopes!!\n");
            tMPI_Wait_process_incoming(tMPI_Get_current());
        }
#else
        {
            /* If this happens, it most likely indicates a bug in the 
               calling program. We could fix the situation by waiting,
               but that would most likely lead to deadlocks - even
               more difficult to debug than this. */
            fprintf(stderr, "Ran out of send envelopes!!!!\n");
            abort();
        }
#endif
    } while(!ret);

    /* pop the envelope off the free stack and reset its links */
    evl->head_free=ret->next;

    ret->next=NULL;
    ret->prev=NULL;
    ret->slist=evl;
    ret->rlist=NULL;

    /* and return it */
    return ret;
}
243
244 static void tMPI_Send_env_list_return(struct envelope *sev)
245 {
246     struct send_envelope_list *evl=sev->slist;
247
248     sev->next=evl->head_free;
249     evl->head_free=sev;
250 }
251
252
#ifdef USE_SEND_RECV_COPY_BUFFER
/* Return-to-sender: hand a send envelope back to the sending thread by
   pushing it onto the shared head_rts stack of its send list. Called by
   the receiver once it is done with a copy-buffered envelope; the sender
   recycles these in tMPI_Send_env_list_fetch_new(). */
static void tMPI_Send_env_list_rts(struct envelope *sev)
{
    struct send_envelope_list *evl=sev->slist;
#ifdef TMPI_LOCK_FREE_LISTS
    struct envelope *sevn;

    do
    {
        sevn=(struct envelope*)tMPI_Atomic_ptr_get(&evl->head_rts);
        sev->next=sevn;
        /* the cmpxchg operation is a memory fence, so we shouldn't need
           to worry about out-of-order evaluation */
    }
    while (tMPI_Atomic_ptr_cas( &(evl->head_rts), sevn, sev ) != (void*)sevn);
#else
    tMPI_Spinlock_lock( &(evl->lock_rts) );
    /* bugfix: this used to read 'ev->next', an undeclared identifier;
       the envelope being returned is 'sev'. */
    sev->next=(struct envelope*)evl->head_rts;
    evl->head_rts=sev;
    tMPI_Spinlock_unlock( &(evl->lock_rts) );
#endif
}
#endif
276
277
278
279 static void tMPI_Send_env_list_remove_old(struct envelope *sev)
280 {
281     /* pretty straighforward because it isn't a shared list */
282     if (sev->next)
283         sev->next->prev=sev->prev; 
284     if (sev->prev)
285         sev->prev->next=sev->next; 
286     sev->prev=NULL;
287     sev->next=NULL;
288 }
289
290
291 static void tMPI_Send_env_list_add_new(struct tmpi_thread *cur,
292                                        struct send_envelope_list *evl, 
293                                        struct envelope *sev)
294 {
295 #ifdef TMPI_LOCK_FREE_LISTS
296     struct envelope *evl_head_new_orig;
297     struct envelope *evl_cas;
298 #endif
299     sev->prev=NULL;
300
301 #ifdef TMPI_LOCK_FREE_LISTS
302     /* behold our lock-free shared linked list: 
303        (it's actually quite simple because we only do operations at the head 
304         of the list, either adding them - such as here - or detaching the whole
305         list) */
306     do 
307     {
308         /* read the old head atomically */
309         evl_head_new_orig=(struct envelope*) tMPI_Atomic_ptr_get( 
310                                                         &(evl->head_new) );
311         /* set our envelope to have that as its next */
312         sev->next=evl_head_new_orig;
313         /* do the compare-and-swap. 
314            this operation is a memory fence, so we shouldn't need
315            to worry about out-of-order stores */
316         evl_cas=(struct envelope*)tMPI_Atomic_ptr_cas(&(evl->head_new), 
317                                                       evl_head_new_orig, sev);
318         /* and compare the results: if they aren't the same,
319            somebody else got there before us: */
320     } while (evl_cas != evl_head_new_orig); 
321 #else
322     tMPI_Spinlock_lock( &(evl->lock_new) );
323     /* we add to the start of the list */
324     sev->next=(struct send_envelope*)evl->head_new;
325     /* actually attach it to the list */
326     evl->head_new=sev;
327     tMPI_Spinlock_unlock( &(evl->lock_new) );
328 #endif
329
330 #if defined(TMPI_PROFILE)
331     tMPI_Profile_wait_start(cur);
332 #endif
333     /* signal to the thread that there is a new envelope */
334     tMPI_Event_signal( &(sev->dest->p2p_event) );
335 #if defined(TMPI_PROFILE)
336     tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
337 #endif
338 }
339
340 static void tMPI_Send_env_list_move_to_old(struct envelope *sev)
341 {
342     struct send_envelope_list *evl=sev->slist;
343
344     /* remove from old list. We assume the list has been detached! */
345     if (sev->next)
346         sev->next->prev=sev->prev; 
347     if (sev->prev)
348         sev->prev->next=sev->next; 
349
350     /* we add to the end of the list */
351     sev->next=evl->head_old;
352     sev->prev=evl->head_old->prev;
353
354     sev->next->prev=sev;
355     sev->prev->next=sev;
356 }
357
358
359
360
361
362
363
364
365 /* tmpi_recv_envelope_list functions */
366
367 void tMPI_Recv_env_list_init(struct recv_envelope_list *evl)
368 {
369     evl->head = &(evl->dummy);
370     evl->head->prev=evl->head;
371     evl->head->next=evl->head;
372 }
373
374 void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl)
375 {
376     evl->head=NULL;
377 }
378
379 static void tMPI_Recv_env_list_add(struct recv_envelope_list *evl, 
380                                    struct envelope *rev)
381 {
382     rev->rlist=evl;
383     /* we add to the end of the list */
384     rev->next=evl->head;
385     rev->prev=evl->head->prev;
386
387     rev->next->prev=rev;
388     rev->prev->next=rev;
389 }
390
391 static void tMPI_Recv_env_list_remove(struct envelope *rev)
392 {
393     if (rev->next)
394         rev->next->prev=rev->prev; 
395     if (rev->prev)
396         rev->prev->next=rev->next; 
397     rev->prev=NULL;
398     rev->next=NULL;
399     rev->rlist=NULL;
400 }
401
402
403
404
405
406
407
408
409 /* tmpi_req functions */
410
411 void tMPI_Req_list_init(struct req_list *rl, int N_reqs)
412 {
413     int i;
414
415     rl->alloc_head=(struct tmpi_req_*)tMPI_Malloc(
416                                         sizeof(struct tmpi_req_)*N_reqs);
417     rl->head=rl->alloc_head;
418     for(i=0;i<N_reqs;i++)
419     {
420         if (i==0)
421             rl->head[i].prev=NULL;
422         else
423             rl->head[i].prev=&(rl->head[i-1]);
424
425         if (i >= (N_reqs-1))
426             rl->head[i].next=NULL;
427         else
428             rl->head[i].next=&(rl->head[i+1]);
429     }
430 }
431
432 void tMPI_Req_list_destroy(struct req_list *rl)
433 {
434     free(rl->alloc_head);
435     rl->head=NULL;
436     rl->alloc_head=NULL;
437 }
438
439
440
441 static struct tmpi_req_ *tMPI_Get_req(struct req_list *rl)
442 {
443     struct tmpi_req_ *req=rl->head;
444     
445
446     /* we don't need locks here because requests are a per-thread property */
447     if (!req)
448     {
449         /* this could be fixed */
450         tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_REQUESTS);
451         return NULL;
452     }
453     rl->head=req->next;
454     req->next=NULL;
455
456     return req;
457 }
458
459 static void tMPI_Return_req(struct req_list *rl, struct tmpi_req_ *req)
460 {
461     req->next=rl->head;
462     req->prev=NULL;
463     rl->head=req;
464 }
465
466
467
468 static void tMPI_Req_init(struct tmpi_req_ *rq, struct envelope *ev)
469 {
470     rq->ev=ev;
471     rq->finished=FALSE;
472     rq->next=rq;
473     rq->prev=rq;
474
475     rq->source=ev->src;
476     rq->comm=ev->comm;
477     rq->tag=TMPI_ANY_TAG;
478     rq->error=TMPI_SUCCESS;
479     rq->transferred=0;
480     rq->cancelled=FALSE;
481 }
482
483
484
485
486
487
488 /* Point-to-point communication protocol functions */
489
490
491
492 static void tMPI_Set_req(struct envelope *ev, struct tmpi_req_ *req)
493 {
494     req->source = ev->src;
495     req->comm = ev->comm;
496     req->tag = ev->tag;
497     req->error = ev->error;
498     if (ev->send)
499     {
500         if (tMPI_Atomic_get(&(ev->state))>env_unmatched)
501             req->transferred = ev->bufsize;
502         else
503             req->transferred = 0;
504     }
505     else
506     {
507         if (tMPI_Atomic_get(&(ev->state))==env_finished)
508             req->transferred = ev->bufsize;
509         else
510             req->transferred = 0;
511     }
512 }
513
514 static void tMPI_Set_status(struct tmpi_req_ *req, tMPI_Status *st)
515 {
516     if (st)
517     {
518         st->TMPI_SOURCE = tMPI_Comm_seek_rank(req->comm, req->source);
519         st->TMPI_TAG = req->tag;
520         st->TMPI_ERROR = req->error;
521         st->transferred = req->transferred;
522         st->cancelled = req->cancelled;
523     }
524 }
525
526
527 static bool tMPI_Envelope_matches(const struct envelope *sev,
528                                   const struct envelope *rev)
529 {
530 #ifdef TMPI_DEBUG
531     printf("%5d: tMPI_Envelope_matches (%d->%d)==(%d->%d),  tag=(%d==%d),       \n       datatype=(%ld==%ld), comm=(%ld,%ld),\n              finished=(%d==%d)\n",
532             tMPI_This_threadnr(),
533             tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
534             tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest),
535             (int)(sev->tag), (int)(rev->tag),
536             (long int)sev->datatype, (long int)rev->datatype,
537             (long int)sev->comm, (long int)rev->comm,
538             (int)sev->state.value, (int)rev->state.value);
539     fflush(stdout);
540 #endif
541     if ( ( (rev->tag == TMPI_ANY_TAG) || (rev->tag == sev->tag) ) &&
542             ( sev->comm == rev->comm ) &&
543             ( (!rev->src)  || (rev->src == sev->src) ) &&
544             ( sev->dest == rev->dest ) &&
545             ( sev->datatype == rev->datatype ) &&
546             ( sev->state.value < env_finished  &&
547               rev->state.value == env_unmatched ) )
548     {
549 #ifdef TMPI_DEBUG
550         printf("%5d: (%d->%d) tag=%d found match\n",
551                 tMPI_This_threadnr(),
552                 tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest),
553                 (int)(sev->tag));
554         fflush(stdout);
555 #endif
556         return TRUE;
557     }
558     return FALSE;
559 }
560
561
562
563
564 static struct envelope* 
565 tMPI_Send_env_list_search_old(struct send_envelope_list *evl,
566                               struct envelope *rev)
567 {
568     struct envelope *sev;
569
570     sev=(struct envelope*)evl->head_old->next;
571     while(sev != evl->head_old)
572     {
573         if (tMPI_Envelope_matches(sev, rev))
574         {
575             /* remove the envelope */
576             tMPI_Send_env_list_remove_old(sev);
577             return sev;
578         }
579         sev=sev->next;
580     }
581     return NULL;
582 }
583
584
585 static struct envelope* 
586 tMPI_Recv_env_list_search_new(struct recv_envelope_list *evl,
587                               struct envelope *sev)
588 {
589     struct envelope *rev;
590
591     rev=evl->head->next;
592     while(rev != evl->head)
593     {
594         if (tMPI_Envelope_matches(sev, rev))
595         {
596             return rev;
597         }
598         rev=rev->next;
599     }
600     return NULL;
601 }
602
603
#ifdef USE_SEND_RECV_COPY_BUFFER
/* Fill the copy buffer of a buffered send, after its possible use has
   been announced (using_cb was set in tMPI_Prep_send_envelope).
   If the receiver has already started copying from the original buffer,
   wait for it to finish and clean up the envelope here instead. */
static void tMPI_Send_copy_buffer(struct envelope *sev, struct tmpi_req_ *req)
{
    /* in the special case of a zero buffer size, we don't do anything and
       always let the receiver handle it */
    if (sev->bufsize==0) 
        return;

    /* first check whether the other side hasn't started yet */
    tMPI_Atomic_memory_barrier();
    if ((tMPI_Atomic_get( &(sev->state) ) == env_unmatched )) 
    {
        /* first copy */
        memcpy(sev->cb, sev->buf, sev->bufsize);
        /* now set state, if other side hasn't started copying yet. */
        if (tMPI_Atomic_cas( &(sev->state), env_unmatched, env_cb_available)
            == env_unmatched)
        {
            /* if it was originally unmatched, the receiver wasn't 
               copying the old buffer. We don't need to wait,
               and the receiver is going to clean up this envelope. */
#ifdef TMPI_DEBUG
            printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) completed\n", 
                   tMPI_This_threadnr(), 
                   tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest), 
                   (int)(sev->tag));
            fflush(stdout);
#endif
            return;
        }
    }
    /* and if we reached this point, the receiver had already started 
       copying, and we need to clean up the envelope ourselves. 

       we first need to wait until the receiver is finished copying. We 
       know this is a short wait (since the buffer was small enough to be
       buffered in the first place), so we just spin-wait.  */
    while(tMPI_Atomic_get( &(sev->state) ) < env_cb_available)
    {
        tMPI_Atomic_memory_barrier();
    }
#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Send_copy_buffer(%d->%d, tag=%d) waiting-completed\n", 
           tMPI_This_threadnr(), 
           tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest), (int)(sev->tag));
    fflush(stdout);
#endif
    /* record the result in the request before recycling the envelope */
    tMPI_Set_req(sev, req);
    /* and now we clean up */
    tMPI_Send_env_list_return(sev);
}
#endif
658
659
660 static struct envelope* tMPI_Prep_send_envelope(struct send_envelope_list *evl, 
661                         tMPI_Comm comm, struct tmpi_thread *src, 
662                         struct tmpi_thread *dest, void *buf, int count, 
663                         tMPI_Datatype datatype, int tag, bool nonblock)
664 {
665     /* get an envelope from the send-envelope stack */
666     struct envelope *ev=tMPI_Send_env_list_fetch_new( evl );
667
668     ev->tag=tag;
669     ev->nonblock=nonblock;
670
671     ev->comm=comm;
672
673     ev->src=src;
674     ev->dest=dest;
675
676     ev->buf=buf;
677     ev->bufsize=count*datatype->size;
678     ev->datatype=datatype;
679
680     ev->send=TRUE;
681
682     ev->rlist=NULL;
683
684 #ifdef USE_SEND_RECV_COPY_BUFFER
685     /* check whether we'll be double buffering */
686     ev->using_cb=(ev->bufsize < COPY_BUFFER_SIZE);
687     /* but don't do anything yet */
688 #endif
689
690     tMPI_Atomic_set(&(ev->state), env_unmatched);
691
692     ev->error=TMPI_SUCCESS;
693     if (count < 0)
694     {
695         tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
696         ev->error=TMPI_ERR_XFER_BUFSIZE;
697     }
698
699     return ev;
700 }
701
702 static struct envelope* tMPI_Prep_recv_envelope(struct tmpi_thread *cur, 
703                         tMPI_Comm comm, struct tmpi_thread *src, 
704                         struct tmpi_thread *dest, void *buf, int count, 
705                         tMPI_Datatype datatype, int tag, bool nonblock)
706 {
707     /* get an envelope from the stack */
708     struct envelope *ev=tMPI_Free_env_list_fetch_recv( &(cur->envelopes) );
709
710     ev->tag=tag;
711     ev->nonblock=nonblock;
712
713     ev->comm=comm;
714
715     ev->src=src;
716     ev->dest=dest;
717
718     ev->buf=buf;
719     ev->bufsize=count*datatype->size;
720     ev->datatype=datatype;
721     
722     ev->send=FALSE;
723
724     ev->slist=NULL;
725     ev->rlist=NULL;
726
727     tMPI_Atomic_set(&(ev->state), env_unmatched);
728
729     ev->error=TMPI_SUCCESS;
730     if (count < 0)
731     {
732         tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
733         ev->error=TMPI_ERR_XFER_BUFSIZE;
734     }
735
736     return ev;
737 }
738
739
740
741
742
743
744
745
/* Perform the actual data transfer from send envelope sev to receive
   envelope rev, mark both envelopes env_finished, and notify the sending
   thread. Errors (receive buffer too small, NULL buffers) are reported
   on both envelopes instead of transferring. */
static void tMPI_Xfer(struct tmpi_thread *cur, struct envelope *sev, 
                      struct envelope *rev)
{
#ifdef USE_SEND_RECV_COPY_BUFFER
    /* we remove the sender's envelope only if we do the transfer, which 
       we always do if the buffer size = 0 */ 
    bool remove_sender = (sev->bufsize==0);
#endif
#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Xfer (%d->%d, tag=%d) started\n", 
            tMPI_This_threadnr(), 
            tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
    fflush(stdout);
#endif
    /* first set data on the receiving end so status can be updated */
    rev->src = sev->src;
    rev->tag = sev->tag;

    if (sev->bufsize) /* do the actual transfer */
    {
        void *sbuf=sev->buf; /* source buffer */
        if (sev->bufsize > rev->bufsize)
        {
            /* the receive buffer is too small: fail the transfer on
               both sides */
            tMPI_Error((rev->comm), TMPI_ERR_XFER_BUFSIZE);
            tMPI_Atomic_set(&(rev->state), env_finished);
            tMPI_Atomic_set(&(sev->state), env_finished);
            rev->error = TMPI_ERR_XFER_BUFSIZE;
            sev->error = TMPI_ERR_XFER_BUFSIZE;
            return;
        }

#ifdef USE_SEND_RECV_COPY_BUFFER
        if (sev->using_cb)
        {
            /* check if the other side has already finished copying */
            if (tMPI_Atomic_cas( &(sev->state), env_unmatched, env_copying)
                != env_unmatched)
            {
                /* it has, and we're copying from the new buffer. 
                   We're now also tasked with removing the envelope */
                sbuf=sev->cb;
                remove_sender=TRUE;
#ifdef TMPI_PROFILE
                tMPI_Profile_count_buffered_p2p_xfer(cur);
#endif
            }
        }
#endif

        if (!rev->buf || !sev->buf)
        {
            /* NULL user buffer: report TMPI_ERR_BUF on both sides */
            tMPI_Error((rev->comm), TMPI_ERR_BUF);
            tMPI_Atomic_set(&(rev->state), env_finished);
            tMPI_Atomic_set(&(sev->state), env_finished);
            rev->error = TMPI_ERR_BUF;
            sev->error = TMPI_ERR_BUF;
            return;
        }
        memcpy(rev->buf, sbuf, sev->bufsize);
#ifdef TMPI_PROFILE
        tMPI_Profile_count_p2p_xfer(cur);
#endif
        /* for status update */
    }
    rev->bufsize=sev->bufsize;
    /* and mark that we're finished */
#if defined(TMPI_PROFILE)
    {
        /* NOTE: this open brace is matched by the close brace in the
           second TMPI_PROFILE block below; both are guarded by the same
           macro, so the function stays balanced either way. */
        tMPI_Profile_wait_start(cur);
#endif
        tMPI_Atomic_set( &(rev->state), env_finished);
        tMPI_Atomic_set( &(sev->state), env_finished);

        /* signal to a potentially waiting thread that we're done. */
        tMPI_Atomic_add_return( &(rev->src->ev_outgoing_received), 1); 
        tMPI_Event_signal(&(rev->src->p2p_event));

        /* remove the receiving envelope if it's in a list */
        tMPI_Recv_env_list_remove(rev);
#ifdef USE_SEND_RECV_COPY_BUFFER
        if (remove_sender)
        {
            /* return the send envelope to the sending thread */
            tMPI_Send_env_list_rts(sev);
        }
#endif
#if defined(TMPI_PROFILE)
        tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p_signal);
    }
#endif
 

#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Xfer (%d->%d, tag=%d) done\n", 
            tMPI_This_threadnr(), 
            tMPI_Threadnr(sev->src), tMPI_Threadnr(rev->dest), (int)(sev->tag));
    fflush(stdout);
#endif
    return;
}
845
846
847
848
849
850
/* Post a receive: reserve a receive envelope, search the already-posted
   sends for a match (transferring immediately if found), and otherwise
   add the envelope to this thread's posted-receive list.
   src==NULL means receive from any source.
   Returns the receive envelope in either case. */
static struct envelope* tMPI_Post_match_recv(struct tmpi_thread *cur,
                                             tMPI_Comm comm, 
                                             struct tmpi_thread *src, 
                                             void *recv_buf, int recv_count, 
                                             tMPI_Datatype datatype, 
                                             int tag, bool nonblock)
{
    struct tmpi_thread *dest=cur;
    struct envelope *rev;
    struct envelope *sev=NULL;
    /* src_threadnr is only meaningful when src is known */
    int src_threadnr=src ? tMPI_Threadnr(src) : Nthreads;
    int i;

    /* reserve an envelope to post */
    rev=tMPI_Prep_recv_envelope(cur, comm, src, dest, recv_buf, recv_count, 
                                datatype, tag, nonblock);

#ifdef TMPI_DEBUG
    printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) started\n", 
            tMPI_This_threadnr(), 
            tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest), (int)(rev->tag));
    fflush(stdout);
#endif
    /* we now check the entire existing send queue */
    if (src)
    {
        sev=tMPI_Send_env_list_search_old( &(dest->evs[src_threadnr]), rev);
    }
    else
    {
        /* if we don't know the source, we look at all possible sources */
        for(i=0;i<Nthreads;i++)
        {
            sev=tMPI_Send_env_list_search_old(&(dest->evs[i]), rev);
            if (sev)
                break;
        } 
    }

    if (sev)
    {
#ifdef TMPI_DEBUG
        printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) found match\n", 
                tMPI_This_threadnr(), 
                tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest), 
                (int)(rev->tag));
        fflush(stdout);
#endif
        /* we found a matching send */
        tMPI_Xfer(cur, sev, rev);
    }
    else
    {
#ifdef TMPI_DEBUG
        printf("%5d: tMPI_Post_match_recv (%d->%d, tag=%d) no match\n", 
                tMPI_This_threadnr(), 
                tMPI_Threadnr(rev->src), tMPI_Threadnr(rev->dest), 
                (int)(rev->tag));
        fflush(stdout);
#endif
        /* we post the envelope in the right list */
        tMPI_Recv_env_list_add( &(dest->evr), rev);
    }
    return rev;
}
916
917
918
919
920 static struct envelope *tMPI_Post_send(struct tmpi_thread *cur,
921                                        tMPI_Comm comm, 
922                                        struct tmpi_thread *dest, 
923                                        void *send_buf, int send_count,
924                                        tMPI_Datatype datatype, int tag, 
925                                        bool nonblock)
926 {
927     struct tmpi_thread *src=cur;
928     struct envelope *sev;
929     int src_threadnr=tMPI_Threadnr(src);
930     struct send_envelope_list *sevl=&(dest->evs[src_threadnr]);
931
932     /* reserve an envelope to post */
933     sev=tMPI_Prep_send_envelope(sevl, comm, src, dest, send_buf, send_count, 
934                                 datatype, tag, nonblock);
935
936 #ifdef TMPI_DEBUG
937     printf("%5d: tMPI_Post_send (%d->%d, tag=%d)\n", 
938            tMPI_This_threadnr(), 
939            tMPI_Threadnr(sev->src), tMPI_Threadnr(sev->dest), 
940            (int)(sev->tag));
941     fflush(stdout);
942 #endif
943     /* we post the envelope in the right list */
944     tMPI_Send_env_list_add_new(cur, &(dest->evs[src_threadnr]), sev);
945    
946     return sev;
947 }
948
949
950
951
/* Block on this thread's p2p event, then drain all newly posted send
   envelopes from every peer thread: each new envelope is either matched
   against a pending receive (and the data transferred via tMPI_Xfer) or
   moved to the 'old' send list for later matching. Finally, the number of
   events handled here is reported back to the event counter. */
static void tMPI_Wait_process_incoming(struct tmpi_thread *cur)
{
    int i;
    int check_id;          /* event count returned by tMPI_Event_wait */
    int n_handled=0;       /* events we account for in tMPI_Event_process */


#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_start(cur);
#endif
    /* we check for newly arrived send envelopes and finished send
       envelopes */
    check_id=tMPI_Event_wait( &(cur->p2p_event));
    /* the outgoing_received items are handled 'automatically' 
       by the function calling this function */ 
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_P2p);
#endif
    /* subtract the events already consumed elsewhere (outgoing envelopes
       returned to us) so they are not double-counted below; the atomic
       add with the negated value leaves any concurrently added events
       intact. */
    n_handled = tMPI_Atomic_get(&(cur->ev_outgoing_received));
    tMPI_Atomic_add_return( &(cur->ev_outgoing_received), -n_handled);
    check_id -= n_handled;

    if ( check_id > 0)
    {
        /*int repl=check_id;*/
        /*int n=0;*/
        /* there were new send envelopes. Let's check them all */
        for(i=0;i<Nthreads;i++)
        {
            struct envelope *sev_head;

#ifdef TMPI_LOCK_FREE_LISTS
            /* Behold our lock-free shared linked list:
               (see tMPI_Send_env_list_add_new for more info) */
            struct envelope *evl_cas;

            do
            {
                /* read old head atomically */
                sev_head=(struct envelope*)
                          tMPI_Atomic_ptr_get( &(cur->evs[i].head_new) );
                /* do the compare-and-swap to detach the list; retry if
                   another thread pushed a new head in between the read
                   and the CAS */
                evl_cas=(struct envelope*)
                          tMPI_Atomic_ptr_cas(&(cur->evs[i].head_new), sev_head,
                                              NULL);
            } while (evl_cas != sev_head);
#else
            /* locked variant: detach the whole new-envelope list under
               the spinlock */
            tMPI_Spinlock_lock( &(cur->evs[i].lock_new) );
            /* NOTE(review): cast to struct send_envelope* while sev_head is
               struct envelope* — looks like a stale cast from an earlier
               type name; verify it matches the lock-free branch. */
            sev_head=(struct send_envelope*)cur->evs[i].head_new;
            cur->evs[i].head_new=NULL; /* detach the list */
            tMPI_Spinlock_unlock( &(cur->evs[i].lock_new) );
#endif

            if (sev_head) /* there's a newly arrived send envelope from this 
                             thread*/
            {
                struct envelope *sev=sev_head;
                struct envelope *prev_s=NULL;
                struct envelope *rev;

                /* first enable reversing order by creating a regular 
                   doubly-linked list from the singly-linked shared
                   linked list (the shared list is LIFO: newest first) */
                while(sev) 
                {
                    sev->prev=prev_s;
                    prev_s=sev;
                    sev=sev->next;
                }
                /* now walk through it backwards (in order of addition) */ 
                sev=prev_s;
                while(sev)
                {
                    struct envelope *sevp=sev->prev;
                    n_handled++;
                    /* try to match this send against a posted receive */
                    rev=tMPI_Recv_env_list_search_new(&(cur->evr), sev);
                    if (rev)
                    {
                        tMPI_Xfer(cur, sev, rev);
                    }
                    else
                    {
                        /* no matching receive yet: park it on the 'old'
                           list so a later receive can find it */
                        tMPI_Send_env_list_move_to_old( sev );
                    }
                    sev=sevp;
                }
            }
        }
    }
    tMPI_Event_process( &(cur->p2p_event), n_handled);
}
1043
1044 static bool tMPI_Test_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
1045 {
1046     struct envelope *ev=rq->ev;
1047
1048     if (ev && !(rq->finished) )
1049     {
1050 #ifdef USE_SEND_RECV_COPY_BUFFER
1051         if (ev->send && ev->using_cb)
1052         {
1053             /* We buffer-copy. Just do the transfer to the buffer and 
1054                return saying that we're done. It's now up to the 
1055                receiver to return our envelope.*/
1056             /* do our transfer and are guaranteed a finished 
1057                envelope. */
1058             tMPI_Send_copy_buffer(ev, rq);
1059             /* get the results */
1060             rq->error=rq->ev->error;
1061             rq->finished=TRUE;
1062         }
1063         else
1064 #endif
1065         {
1066             if( tMPI_Atomic_get( &(ev->state) ) >= env_finished ) 
1067             {
1068                 rq->finished=TRUE;
1069                 /* get the results */
1070                 rq->error=rq->ev->error;
1071                 tMPI_Set_req(ev, rq);
1072                 /* and release the envelope. After this point, the envelope
1073                    may be reused, so its contents shouldn't be relied on. */
1074                 if (ev->send)
1075                     tMPI_Send_env_list_return(ev);
1076                 else
1077                     tMPI_Free_env_list_return_recv( &(cur->envelopes), ev);
1078             }
1079         }
1080     }
1081     return rq->finished;
1082 }
1083
/* Block until a single request completes: poll the request, and while it
   is unfinished, wait for and process incoming p2p events. */
static void tMPI_Wait_single(struct tmpi_thread *cur, struct tmpi_req_ *rq)
{
    while (!tMPI_Test_single(cur, rq))
    {
        tMPI_Wait_process_incoming(cur);
    }
}
1093
1094 static bool tMPI_Test_multi(struct tmpi_thread *cur, struct tmpi_req_ *rqs,
1095                             bool *any_done)
1096 {
1097     bool all_done=TRUE;
1098     struct tmpi_req_ *creq=rqs;
1099
1100     int i=0;
1101     if (any_done)
1102         *any_done=FALSE;
1103
1104     while(creq)
1105     {
1106         bool finished=tMPI_Test_single(cur, creq);
1107         i++;
1108
1109         /* now do the check */
1110         if (! finished)
1111             all_done=FALSE;
1112         else 
1113         {
1114             /* remove the request from the list we've been given. */
1115             if (creq->prev)
1116                 creq->prev->next = creq->next;
1117             if (creq->next)
1118                 creq->next->prev = creq->prev;
1119             if (any_done)
1120                 *any_done=TRUE;
1121         }
1122
1123         creq = creq->next;
1124     } 
1125
1126     return all_done;
1127 }
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137