Redefine the default boolean type to gmx_bool.
[alexxy/gromacs.git] / src / gmxlib / thread_mpi / collective.c
1 /*
2 This source code file is part of thread_mpi.  
3 Written by Sander Pronk, Erik Lindahl, and possibly others. 
4
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6 All rights reserved.
7
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
35 files.
36 */
37
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
40 #endif
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <errno.h>
52 #include <stdlib.h>
53 #include <stdio.h>
54 #include <stdarg.h>
55 #include <string.h>
56
57 #include "impl.h"
58
59
60 /* get a pointer the next coll_env once it's ready */
61 static struct coll_env *tMPI_Get_cev(tMPI_Comm comm, int myrank, int *synct);
62
63 /* post the availability of data in a cev. 
64     cev         = the collective comm environment
65     myrank      = my rank
66     index       = the buffer index
67     tag         = the tag
68     datatype    = the datatype
69     busize      = the buffer size
70     buf         = the buffer to xfer
71     n_remaining = the number of remaining threads that need to transfer
72     synct       = the multicast sync number 
73     dest        = -1 for all theads, or a specific rank number.
74 */
75 static void tMPI_Post_multi(struct coll_env *cev, int myrank, int index, 
76                             int tag, tMPI_Datatype datatype, 
77                             size_t bufsize, void *buf, int n_remaining, 
78                             int synct, int dest);
79
80 /* transfer data from cev->met[rank] to recvbuf */
81 static void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
82                            int index, int expected_tag, tMPI_Datatype recvtype,
83                            size_t recvsize, void *recvbuf, int *ret);
84
85 /* do a root transfer (from root send buffer to root recv buffer) */
86 static void tMPI_Coll_root_xfer(tMPI_Comm comm, 
87                                 tMPI_Datatype sendtype, tMPI_Datatype recvtype, 
88                                 size_t sendsize, size_t recvsize, 
89                                 void* sendbuf, void* recvbuf, int *ret);
90
91 /* wait for other processes to copy data from my cev */
92 static void tMPI_Wait_for_others(struct coll_env *cev, int myrank);
93 /* wait for data to become available from a specific rank */
94 static void tMPI_Wait_for_data(struct tmpi_thread *cur, struct coll_env *cev, 
95                                int myrank);
96                                /*int rank, int myrank, int synct);*/
97
98 /* run a single binary reduce operation on src_a and src_b, producing dest. 
99       dest and src_a may be identical */
100 static int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
101                               tMPI_Datatype datatype, int count, tMPI_Op op,
102                               tMPI_Comm comm);
103
104
105
106
107 #ifdef USE_COLLECTIVE_COPY_BUFFER
108 /* initialize a copy buffer */
109 void tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size)
110 {
111     cb->buf=tMPI_Malloc(size);
112     cb->size=size;
113 }
114
115 /* destroy a copy buffer */
116 void tMPI_Copy_buffer_destroy(struct copy_buffer *cb)
117 {
118     free(cb->buf);
119 }
120
121 void tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
122                                 size_t size)
123 {
124     int i;
125
126     cbl->size=size;
127     cbl->cb_alloc=(struct copy_buffer*)
128                   tMPI_Malloc(sizeof(struct copy_buffer)*Nbufs);
129     cbl->cb=cbl->cb_alloc; /* the first one */
130     cbl->Nbufs = Nbufs;
131     for(i=0;i<Nbufs;i++)
132     {
133         tMPI_Copy_buffer_init( &(cbl->cb_alloc[i]), size );
134         if (i<Nbufs-1)
135             cbl->cb_alloc[i].next=&(cbl->cb_alloc[i+1]);
136         else
137             cbl->cb_alloc[i].next=NULL;
138     }
139 }
140
141 void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl)
142 {
143     int i;
144
145     for(i=0;i<cbl->Nbufs;i++)
146     {
147         tMPI_Copy_buffer_destroy( &(cbl->cb_alloc[i]) );
148     }
149     free(cbl->cb_alloc);
150 }
151
152 struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl)
153 {
154     struct copy_buffer *ret=cbl->cb;
155     if (!ret)
156     {
157         fprintf(stderr,"out of copy buffers!!");
158         exit(1);
159     }
160     cbl->cb=ret->next;
161
162     return ret;
163 }
164
165 void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl, 
166                                   struct copy_buffer *cb)
167 {
168     cb->next=cbl->cb;
169     cbl->cb=cb;
170 }
171 #endif
172
173
174
175
176
177
178
179
180 static void tMPI_Coll_envt_init(struct coll_env_thread *met, int N)
181 {
182     tMPI_Atomic_set(&(met->current_sync), 0);
183     tMPI_Atomic_set(&(met->n_remaining), 0);
184     met->buf=(void**)tMPI_Malloc(sizeof(void*)*N);
185     met->bufsize=(size_t*)tMPI_Malloc(sizeof(size_t)*N);
186     met->read_data=(gmx_bool*)tMPI_Malloc(sizeof(gmx_bool)*N);
187 #ifdef USE_COLLECTIVE_COPY_BUFFER
188     met->cpbuf=(tMPI_Atomic_ptr_t*)tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*N);
189     met->cb=NULL;
190     met->using_cb=FALSE;
191 #endif
192     tMPI_Event_init( &(met->send_ev) );
193     tMPI_Event_init( &(met->recv_ev) );
194 }
195
196
197 static void tMPI_Coll_envt_destroy(struct coll_env_thread *met)
198 {
199     free( (void*)met->buf );
200     free( (void*)met->bufsize );
201     free( (void*)met->read_data );
202
203 #ifdef USE_COLLECTIVE_COPY_BUFFER
204     free( (void*)met->cpbuf );
205 #endif
206 }
207
208 void tMPI_Coll_env_init(struct coll_env *cev, int N)
209 {
210     int i;
211
212     cev->met=(struct coll_env_thread*)tMPI_Malloc(
213                         sizeof(struct coll_env_thread)*N);
214     cev->N=N;
215     tMPI_Atomic_set(&(cev->coll.current_sync), 0);
216     tMPI_Atomic_set(&(cev->coll.n_remaining), 0);
217     for(i=0;i<N;i++)
218     {
219         tMPI_Coll_envt_init(&(cev->met[i]), N);
220     }
221 }
222
223 void tMPI_Coll_env_destroy(struct coll_env *cev)
224 {
225     int i;
226     for(i=0;i<cev->N;i++)
227     {
228         tMPI_Coll_envt_destroy(&(cev->met[i]));
229     }
230     free( (void*)cev->met );
231 }
232
233
234 void tMPI_Coll_sync_init(struct coll_sync *csync, int N)
235 {
236     int i;
237
238     csync->synct=0;
239     csync->syncs=0;
240     csync->N=N;
241
242     csync->events=(tMPI_Event*)tMPI_Malloc(sizeof(tMPI_Event)*N);
243     for(i=0;i<N;i++)
244     {
245         tMPI_Event_init( &(csync->events[i]) );
246     }
247 }
248
249 void tMPI_Coll_sync_destroy(struct coll_sync *csync)
250 {
251     int i;
252
253     csync->synct=0;
254     csync->syncs=0;
255
256     for(i=0;i<csync->N;i++)
257     {
258         tMPI_Event_destroy( &(csync->events[i]) );
259     }
260     free(csync->events);
261 }
262
263
264
265
266
267
268 /* get a pointer the next coll_env once it's ready. */
269 static struct coll_env *tMPI_Get_cev(tMPI_Comm comm, int myrank, int *counter)
270 {
271     struct coll_sync *csync=&(comm->csync[myrank]);
272     struct coll_env *cev;
273 #ifdef USE_COLLECTIVE_COPY_BUFFER
274     int N;
275 #endif
276
277     /* we increase our counter, and determine which coll_env we get */
278     csync->synct++;
279     *counter=csync->synct;
280     cev=&(comm->cev[csync->synct % N_COLL_ENV]);
281
282
283 #ifdef USE_COLLECTIVE_COPY_BUFFER
284     if (cev->met[myrank].using_cb)
285     {
286         N=tMPI_Event_wait( &(cev->met[myrank].send_ev));
287         tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
288     }
289 #endif
290 #ifdef USE_COLLECTIVE_COPY_BUFFER
291     /* clean up old copy_buffer pointers */
292     if (cev->met[myrank].cb)  
293     {
294         tMPI_Copy_buffer_list_return(&(tMPI_Get_current()->cbl_multi),
295                                      cev->met[myrank].cb);
296         cev->met[myrank].cb=NULL;
297         cev->met[myrank].using_cb=FALSE;
298     }
299 #endif
300
301     return cev;
302 }
303
304
305
306
307
308 static void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
309                            int index, int expected_tag, tMPI_Datatype recvtype, 
310                            size_t recvsize, void *recvbuf, int *ret)
311 {
312     size_t sendsize=cev->met[rank].bufsize[index];
313
314     /* check tags, types */
315     if ((cev->met[rank].datatype != recvtype ) || 
316         (cev->met[rank].tag != expected_tag))
317     {
318         *ret=tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
319     }
320   
321     if (sendsize) /* we allow NULL ptrs if there's nothing to xmit */
322     {
323         void *srcbuf;
324 #ifdef USE_COLLECTIVE_COPY_BUFFER
325         gmx_bool decrease_ctr=FALSE;
326 #endif
327
328         if ( sendsize > recvsize ) 
329         {
330             *ret=tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
331             return;
332         }
333
334         if ( cev->met[rank].buf == recvbuf )
335         {
336             *ret=tMPI_Error(TMPI_COMM_WORLD,TMPI_ERR_XFER_BUF_OVERLAP);
337             return;
338         }
339         /* get source buffer */
340 #ifdef USE_COLLECTIVE_COPY_BUFFER
341         if ( !(cev->met[rank].using_cb)) 
342 #endif
343         {
344             srcbuf=cev->met[rank].buf[index];
345         }
346 #ifdef USE_COLLECTIVE_COPY_BUFFER
347         else
348         {
349             tMPI_Atomic_memory_barrier();
350             srcbuf=tMPI_Atomic_ptr_get(&(cev->met[rank].cpbuf[index]));
351
352             if(!srcbuf)
353             { /* there was (as of yet) no copied buffer */
354                 void *try_again_srcbuf;
355                 /* we need to try checking the pointer again after we increase
356                    the read counter, signaling that one more thread
357                    is reading. */
358                 tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount), 1);
359                 tMPI_Atomic_memory_barrier();
360                 /*try_again_srcbuf=(char*) (cev->met[rank].cpbuf[index]);*/
361                 try_again_srcbuf=tMPI_Atomic_ptr_get(
362                                          &(cev->met[rank].cpbuf[index]));
363                 if (!try_again_srcbuf)
364                 {
365                     /* apparently the copied buffer is not ready yet. We
366                        just use the real source buffer. We have already
367                        indicated we're reading from the regular buf. */
368                     srcbuf=cev->met[rank].buf[index];
369                     decrease_ctr=TRUE;
370
371                 }
372                 else
373                 {
374                     /* We tried again, and this time there was a copied buffer. 
375                        We use that, and indicate that we're not reading from the
376                        regular buf. This case should be pretty rare.  */
377                     tMPI_Atomic_fetch_add(&(cev->met[rank].buf_readcount),-1);
378                     srcbuf=try_again_srcbuf;
379                 }
380             }
381
382 #ifdef TMPI_PROFILE
383             if (srcbuf)
384                 tMPI_Profile_count_buffered_coll_xfer(tMPI_Get_current());
385 #endif
386         }
387 #endif
388         /* copy data */
389         memcpy((char*)recvbuf, srcbuf, sendsize);
390 #ifdef TMPI_PROFILE
391         tMPI_Profile_count_coll_xfer(tMPI_Get_current());
392 #endif
393
394 #ifdef USE_COLLECTIVE_COPY_BUFFER
395         if (decrease_ctr)
396         {
397             /* we decrement the read count; potentially releasing the buffer. */
398             tMPI_Atomic_fetch_add( &(cev->met[rank].buf_readcount), -1);
399         }
400 #endif
401     }
402     /* signal one thread ready */
403    {
404         int reta;
405         reta=tMPI_Atomic_add_return( &(cev->met[rank].n_remaining), -1);
406         if (reta <= 0)
407         {
408             tMPI_Event_signal( &(cev->met[rank].send_ev) );
409         }
410     }
411 }
412
413 static void tMPI_Coll_root_xfer(tMPI_Comm comm, tMPI_Datatype sendtype, 
414                                 tMPI_Datatype recvtype, 
415                                 size_t sendsize, size_t recvsize, 
416                                 void* sendbuf, void* recvbuf, int *ret)
417 {
418     /* do root transfer */
419     if (recvsize < sendsize)
420     {
421         *ret=tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
422         return;
423     }
424     if (recvtype != sendtype)
425     {
426         *ret=tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
427         return;
428     }
429     if ( sendbuf == recvbuf )
430     {
431         *ret=tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
432         return;
433     }
434
435     memcpy(recvbuf, sendbuf, sendsize);
436 }
437
438 static void tMPI_Post_multi(struct coll_env *cev, int myrank, int index, 
439                             int tag, tMPI_Datatype datatype, size_t bufsize, 
440                             void *buf, int n_remaining, int synct, int dest)
441 {
442     int i;
443 #ifdef USE_COLLECTIVE_COPY_BUFFER
444     /* decide based on the number of waiting threads */
445     gmx_bool using_cb=(bufsize < (size_t)(n_remaining*COPY_BUFFER_SIZE));
446
447     cev->met[myrank].using_cb=using_cb;
448     if (using_cb)
449     {
450         /* we set it to NULL initially */
451         /*cev->met[myrank].cpbuf[index]=NULL;*/
452         tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]), NULL);
453
454         tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
455     }
456 #endif
457     cev->met[myrank].tag=tag;
458     cev->met[myrank].datatype=datatype;
459     cev->met[myrank].buf[index]=buf;
460     cev->met[myrank].bufsize[index]=bufsize;
461     tMPI_Atomic_set(&(cev->met[myrank].n_remaining), n_remaining);
462     tMPI_Atomic_set(&(cev->met[myrank].current_sync), synct);
463
464     /* publish availability. */
465     if (dest<0)
466     {
467         for(i=0;i<cev->N;i++)
468         {
469             if (i != myrank)
470                 tMPI_Event_signal( &(cev->met[i].recv_ev) );
471         }
472     }
473     else
474     {
475         tMPI_Event_signal( &(cev->met[dest].recv_ev) );
476     }
477
478 #ifdef USE_COLLECTIVE_COPY_BUFFER
479     /* becase we've published availability, we can start copying -- 
480        possibly in parallel with the receiver */
481     if (using_cb)
482     {
483         struct tmpi_thread *cur=tMPI_Get_current();
484          /* copy the buffer locally. First allocate */
485         cev->met[myrank].cb=tMPI_Copy_buffer_list_get( &(cur->cbl_multi) );
486         if (cev->met[myrank].cb->size < bufsize)
487         {
488             fprintf(stderr, "ERROR: cb size too small\n");
489             exit(1);
490         }
491         /* copy to the new buf */
492         memcpy(cev->met[myrank].cb->buf, buf, bufsize);
493
494         /* post the new buf */
495         tMPI_Atomic_memory_barrier();
496         /*cev->met[myrank].cpbuf[index]=cev->met[myrank].cb->buf;*/
497         tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]), 
498                             cev->met[myrank].cb->buf);
499     }
500 #endif
501 }
502
503
504 static void tMPI_Wait_for_others(struct coll_env *cev, int myrank)
505 {
506 #if defined(TMPI_PROFILE) 
507     struct tmpi_thread *cur=tMPI_Get_current();
508     tMPI_Profile_wait_start(cur);
509 #endif
510
511 #ifdef USE_COLLECTIVE_COPY_BUFFER
512     if (! (cev->met[myrank].using_cb) )
513 #endif
514     {
515         /* wait until everybody else is done copying the buffer */
516         tMPI_Event_wait( &(cev->met[myrank].send_ev));
517         tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
518     }
519 #ifdef USE_COLLECTIVE_COPY_BUFFER
520     else
521     {
522         /* wait until everybody else is done copying the original buffer. 
523            We use fetch_add because we want to be sure of coherency.
524            This wait is bound to be very short (otherwise it wouldn't 
525            be double-buffering) so we always spin here. */
526         tMPI_Atomic_memory_barrier();
527 #if 0
528         while (tMPI_Atomic_cas( &(cev->met[rank].buf_readcount), 0,
529                                     -100000) != 0)
530 #endif
531 #if 1
532         while (tMPI_Atomic_fetch_add( &(cev->met[myrank].buf_readcount), 0) 
533                != 0)
534 #endif
535 #if 0
536         while (tMPI_Atomic_get( &(cev->met[rank].buf_readcount) )>0)
537 #endif
538         {
539             tMPI_Atomic_memory_barrier();
540         }
541     }
542 #endif
543 #if defined(TMPI_PROFILE) 
544     tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_send);
545 #endif
546 }
547
548 static void tMPI_Wait_for_data(struct tmpi_thread *cur, struct coll_env *cev, 
549                                int myrank)
550                                /*int rank, int myrank, int synct)*/
551 {
552 #if defined(TMPI_PROFILE) 
553     tMPI_Profile_wait_start(cur);
554 #endif
555     tMPI_Event_wait( &(cev->met[myrank].recv_ev));
556     tMPI_Event_process( &(cev->met[myrank].recv_ev), 1);
557 #if defined(TMPI_PROFILE) 
558     tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_recv);
559 #endif
560 }
561
562
563
564
565
566
567 int tMPI_Barrier(tMPI_Comm comm) 
568 {
569 #ifdef TMPI_PROFILE
570     struct tmpi_thread *cur=tMPI_Get_current();
571     tMPI_Profile_count_start(cur);
572 #endif
573
574 #ifdef TMPI_TRACE
575     tMPI_Trace_print("tMPI_Barrier(%p, %d, %p, %d, %d, %p, %p)", comm);
576 #endif
577
578     if (!comm)
579     {
580         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
581     }
582
583     if (comm->grp.N>1)
584     {
585 #if defined(TMPI_PROFILE) 
586         tMPI_Profile_wait_start(cur);
587 #endif
588
589         tMPI_Barrier_wait( &(comm->barrier) );
590 #if defined(TMPI_PROFILE) 
591         tMPI_Profile_wait_stop(cur, TMPIWAIT_Barrier);
592 #endif
593     }
594
595 #ifdef TMPI_PROFILE
596     tMPI_Profile_count_stop(cur, TMPIFN_Barrier);
597 #endif
598     return TMPI_SUCCESS;
599 }
600
601
602
603
604 /* The actual collective functions are #included, so that the static
605    functions above are available to them and can get inlined if the
606    compiler deems it appropriate. */
607 #include "bcast.h"
608 #include "scatter.h"
609 #include "gather.h"
610 #include "alltoall.h"
611 #include "reduce.h"
612