Merge release-4-6 into master
[alexxy/gromacs.git] / src / gromacs / gmxlib / thread_mpi / collective.c
1 /*
2 This source code file is part of thread_mpi.  
3 Written by Sander Pronk, Erik Lindahl, and possibly others. 
4
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6 All rights reserved.
7
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
35 files.
36 */
37
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
40 #endif
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <errno.h>
52 #include <stdlib.h>
53 #include <stdio.h>
54 #include <stdarg.h>
55 #include <string.h>
56
57 #include "impl.h"
58 #include "collective.h"
59
60
61
62
63
64 #ifdef USE_COLLECTIVE_COPY_BUFFER
65 /* initialize a copy buffer */
66 void tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size)
67 {
68     cb->buf=tMPI_Malloc(size);
69     cb->size=size;
70 }
71
72 /* destroy a copy buffer */
73 void tMPI_Copy_buffer_destroy(struct copy_buffer *cb)
74 {
75     free(cb->buf);
76 }
77
78 void tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
79                                 size_t size)
80 {
81     int i;
82
83     cbl->size=size;
84     cbl->cb_alloc=(struct copy_buffer*)
85                   tMPI_Malloc(sizeof(struct copy_buffer)*Nbufs);
86     cbl->cb=cbl->cb_alloc; /* the first one */
87     cbl->Nbufs = Nbufs;
88     for(i=0;i<Nbufs;i++)
89     {
90         tMPI_Copy_buffer_init( &(cbl->cb_alloc[i]), size );
91         if (i<Nbufs-1)
92             cbl->cb_alloc[i].next=&(cbl->cb_alloc[i+1]);
93         else
94             cbl->cb_alloc[i].next=NULL;
95     }
96 }
97
98 void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl)
99 {
100     int i;
101
102     for(i=0;i<cbl->Nbufs;i++)
103     {
104         tMPI_Copy_buffer_destroy( &(cbl->cb_alloc[i]) );
105     }
106     free(cbl->cb_alloc);
107 }
108
109 struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl)
110 {
111     struct copy_buffer *ret=cbl->cb;
112     if (!ret)
113     {
114         fprintf(stderr,"out of copy buffers!!");
115         exit(1);
116     }
117     cbl->cb=ret->next;
118
119     return ret;
120 }
121
122 void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl, 
123                                   struct copy_buffer *cb)
124 {
125     cb->next=cbl->cb;
126     cbl->cb=cb;
127 }
128 #endif
129
130
131
132
133
134
135
136
137 void tMPI_Coll_envt_init(struct coll_env_thread *met, int N)
138 {
139     tMPI_Atomic_set(&(met->current_sync), 0);
140     tMPI_Atomic_set(&(met->n_remaining), 0);
141     met->buf=(void**)tMPI_Malloc(sizeof(void*)*N);
142     met->bufsize=(size_t*)tMPI_Malloc(sizeof(size_t)*N);
143     met->read_data=(tmpi_bool*)tMPI_Malloc(sizeof(tmpi_bool)*N);
144 #ifdef USE_COLLECTIVE_COPY_BUFFER
145     met->cpbuf=(tMPI_Atomic_ptr_t*)tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*N);
146     met->cb=NULL;
147     met->using_cb=FALSE;
148 #endif
149     tMPI_Event_init( &(met->send_ev) );
150     tMPI_Event_init( &(met->recv_ev) );
151 }
152
153
154 void tMPI_Coll_envt_destroy(struct coll_env_thread *met)
155 {
156     free( (void*)met->buf );
157     free( (void*)met->bufsize );
158     free( (void*)met->read_data );
159
160 #ifdef USE_COLLECTIVE_COPY_BUFFER
161     free( (void*)met->cpbuf );
162 #endif
163 }
164
165 void tMPI_Coll_env_init(struct coll_env *cev, int N)
166 {
167     int i;
168
169     cev->met=(struct coll_env_thread*)tMPI_Malloc(
170                         sizeof(struct coll_env_thread)*N);
171     cev->N=N;
172     tMPI_Atomic_set(&(cev->coll.current_sync), 0);
173     tMPI_Atomic_set(&(cev->coll.n_remaining), 0);
174     for(i=0;i<N;i++)
175     {
176         tMPI_Coll_envt_init(&(cev->met[i]), N);
177     }
178 }
179
180 void tMPI_Coll_env_destroy(struct coll_env *cev)
181 {
182     int i;
183     for(i=0;i<cev->N;i++)
184     {
185         tMPI_Coll_envt_destroy(&(cev->met[i]));
186     }
187     free( (void*)cev->met );
188 }
189
190
191 void tMPI_Coll_sync_init(struct coll_sync *csync, int N)
192 {
193     int i;
194
195     csync->synct=0;
196     csync->syncs=0;
197     csync->N=N;
198
199     csync->events=(tMPI_Event*)tMPI_Malloc(sizeof(tMPI_Event)*N);
200     for(i=0;i<N;i++)
201     {
202         tMPI_Event_init( &(csync->events[i]) );
203     }
204 }
205
206 void tMPI_Coll_sync_destroy(struct coll_sync *csync)
207 {
208     int i;
209
210     csync->synct=0;
211     csync->syncs=0;
212
213     for(i=0;i<csync->N;i++)
214     {
215         tMPI_Event_destroy( &(csync->events[i]) );
216     }
217     free(csync->events);
218 }
219
220
221
222
223
224
225 /* get a pointer the next coll_env once it's ready. */
226 struct coll_env *tMPI_Get_cev(tMPI_Comm comm, int myrank, int *counter)
227 {
228     struct coll_sync *csync=&(comm->csync[myrank]);
229     struct coll_env *cev;
230 #ifdef USE_COLLECTIVE_COPY_BUFFER
231     int N;
232 #endif
233
234     /* we increase our counter, and determine which coll_env we get */
235     csync->synct++;
236     *counter=csync->synct;
237     cev=&(comm->cev[csync->synct % N_COLL_ENV]);
238
239
240 #ifdef USE_COLLECTIVE_COPY_BUFFER
241     if (cev->met[myrank].using_cb)
242     {
243         N=tMPI_Event_wait( &(cev->met[myrank].send_ev));
244         tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
245     }
246 #endif
247 #ifdef USE_COLLECTIVE_COPY_BUFFER
248     /* clean up old copy_buffer pointers */
249     if (cev->met[myrank].cb)  
250     {
251         tMPI_Copy_buffer_list_return(&(tMPI_Get_current()->cbl_multi),
252                                      cev->met[myrank].cb);
253         cev->met[myrank].cb=NULL;
254         cev->met[myrank].using_cb=FALSE;
255     }
256 #endif
257
258     return cev;
259 }
260
261
262
263
264
265 void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
266                     int index, int expected_tag, tMPI_Datatype recvtype, 
267                     size_t recvsize, void *recvbuf, int *ret)
268 {
269     size_t sendsize=cev->met[rank].bufsize[index];
270
271     /* check tags, types */
272     if ((cev->met[rank].datatype != recvtype ) || 
273         (cev->met[rank].tag != expected_tag))
274     {
275         *ret=tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
276     }
277   
278     if (sendsize) /* we allow NULL ptrs if there's nothing to xmit */
279     {
280         void *srcbuf;
281 #ifdef USE_COLLECTIVE_COPY_BUFFER
282         tmpi_bool decrease_ctr=FALSE;
283 #endif
284
285         if ( sendsize > recvsize ) 
286         {
287             *ret=tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
288             return;
289         }
290
291         if ( cev->met[rank].buf == recvbuf )
292         {
293             *ret=tMPI_Error(TMPI_COMM_WORLD,TMPI_ERR_XFER_BUF_OVERLAP);
294             return;
295         }
296         /* get source buffer */
297 #ifdef USE_COLLECTIVE_COPY_BUFFER
298         if ( !(cev->met[rank].using_cb)) 
299 #endif
300         {
301             srcbuf=cev->met[rank].buf[index];
302         }
303 #ifdef USE_COLLECTIVE_COPY_BUFFER
304         else
305         {
306             srcbuf=tMPI_Atomic_ptr_get(&(cev->met[rank].cpbuf[index]));
307             tMPI_Atomic_memory_barrier_acq();
308
309             if(!srcbuf)
310             { /* there was (as of yet) no copied buffer */
311                 void *try_again_srcbuf;
312                 /* we need to try checking the pointer again after we increase
313                    the read counter, signaling that one more thread
314                    is reading. */
315                 tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount), 1);
316                 /* a full memory barrier */
317                 tMPI_Atomic_memory_barrier();
318                 try_again_srcbuf=tMPI_Atomic_ptr_get(
319                                          &(cev->met[rank].cpbuf[index]));
320                 if (!try_again_srcbuf)
321                 {
322                     /* apparently the copied buffer is not ready yet. We
323                        just use the real source buffer. We have already
324                        indicated we're reading from the regular buf. */
325                     srcbuf=cev->met[rank].buf[index];
326                     decrease_ctr=TRUE;
327
328                 }
329                 else
330                 {
331                     /* We tried again, and this time there was a copied buffer. 
332                        We use that, and indicate that we're not reading from the
333                        regular buf. This case should be pretty rare.  */
334                     tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount),-1);
335                     tMPI_Atomic_memory_barrier_acq();
336                     srcbuf=try_again_srcbuf;
337                 }
338             }
339
340 #ifdef TMPI_PROFILE
341             if (srcbuf)
342                 tMPI_Profile_count_buffered_coll_xfer(tMPI_Get_current());
343 #endif
344         }
345 #endif
346         /* copy data */
347         memcpy((char*)recvbuf, srcbuf, sendsize);
348 #ifdef TMPI_PROFILE
349         tMPI_Profile_count_coll_xfer(tMPI_Get_current());
350 #endif
351
352 #ifdef USE_COLLECTIVE_COPY_BUFFER
353         if (decrease_ctr)
354         {
355             /* we decrement the read count; potentially releasing the buffer. */
356             tMPI_Atomic_memory_barrier_rel();
357             tMPI_Atomic_add_return( &(cev->met[rank].buf_readcount), -1);
358         }
359 #endif
360     }
361     /* signal one thread ready */
362    {
363         int reta;
364         tMPI_Atomic_memory_barrier_rel();
365         reta=tMPI_Atomic_add_return( &(cev->met[rank].n_remaining), -1);
366         if (reta <= 0)
367         {
368             tMPI_Event_signal( &(cev->met[rank].send_ev) );
369         }
370     }
371 }
372
373 void tMPI_Coll_root_xfer(tMPI_Comm comm, tMPI_Datatype sendtype, 
374                          tMPI_Datatype recvtype, 
375                          size_t sendsize, size_t recvsize, 
376                          void* sendbuf, void* recvbuf, int *ret)
377 {
378     /* do root transfer */
379     if (recvsize < sendsize)
380     {
381         *ret=tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
382         return;
383     }
384     if (recvtype != sendtype)
385     {
386         *ret=tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
387         return;
388     }
389     if ( sendbuf == recvbuf )
390     {
391         *ret=tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
392         return;
393     }
394
395     memcpy(recvbuf, sendbuf, sendsize);
396 }
397
398 void tMPI_Post_multi(struct coll_env *cev, int myrank, int index, 
399                      int tag, tMPI_Datatype datatype, size_t bufsize, 
400                      void *buf, int n_remaining, int synct, int dest)
401 {
402     int i;
403 #ifdef USE_COLLECTIVE_COPY_BUFFER
404     /* decide based on the number of waiting threads */
405     tmpi_bool using_cb=(bufsize < (size_t)(n_remaining*COPY_BUFFER_SIZE));
406
407     cev->met[myrank].using_cb=using_cb;
408     if (using_cb)
409     {
410         /* we set it to NULL initially */
411         /*cev->met[myrank].cpbuf[index]=NULL;*/
412         tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]), NULL);
413
414         tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
415     }
416 #endif
417     cev->met[myrank].tag=tag;
418     cev->met[myrank].datatype=datatype;
419     cev->met[myrank].buf[index]=buf;
420     cev->met[myrank].bufsize[index]=bufsize;
421     tMPI_Atomic_set(&(cev->met[myrank].n_remaining), n_remaining);
422     tMPI_Atomic_memory_barrier_rel();
423     tMPI_Atomic_set(&(cev->met[myrank].current_sync), synct);
424
425     /* publish availability. */
426     if (dest<0)
427     {
428         for(i=0;i<cev->N;i++)
429         {
430             if (i != myrank)
431                 tMPI_Event_signal( &(cev->met[i].recv_ev) );
432         }
433     }
434     else
435     {
436         tMPI_Event_signal( &(cev->met[dest].recv_ev) );
437     }
438
439 #ifdef USE_COLLECTIVE_COPY_BUFFER
440     /* becase we've published availability, we can start copying -- 
441        possibly in parallel with the receiver */
442     if (using_cb)
443     {
444         struct tmpi_thread *cur=tMPI_Get_current();
445          /* copy the buffer locally. First allocate */
446         cev->met[myrank].cb=tMPI_Copy_buffer_list_get( &(cur->cbl_multi) );
447         if (cev->met[myrank].cb->size < bufsize)
448         {
449             fprintf(stderr, "ERROR: cb size too small\n");
450             exit(1);
451         }
452         /* copy to the new buf */
453         memcpy(cev->met[myrank].cb->buf, buf, bufsize);
454
455         /* post the new buf */
456         tMPI_Atomic_memory_barrier_rel();
457         /*cev->met[myrank].cpbuf[index]=cev->met[myrank].cb->buf;*/
458         tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]), 
459                             cev->met[myrank].cb->buf);
460     }
461 #endif
462 }
463
464
465 void tMPI_Wait_for_others(struct coll_env *cev, int myrank)
466 {
467 #if defined(TMPI_PROFILE) 
468     struct tmpi_thread *cur=tMPI_Get_current();
469     tMPI_Profile_wait_start(cur);
470 #endif
471
472 #ifdef USE_COLLECTIVE_COPY_BUFFER
473     if (! (cev->met[myrank].using_cb) )
474 #endif
475     {
476         /* wait until everybody else is done copying the buffer */
477         tMPI_Event_wait( &(cev->met[myrank].send_ev));
478         tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
479     }
480 #ifdef USE_COLLECTIVE_COPY_BUFFER
481     else
482     {
483         /* wait until everybody else is done copying the original buffer. 
484            We use atomic add-return because we want to be sure of coherency.
485            This wait is bound to be very short (otherwise it wouldn't 
486            be double-buffering) so we always spin here. */
487         /*tMPI_Atomic_memory_barrier_rel();*/
488 #if 0
489         while (!tMPI_Atomic_cas( &(cev->met[rank].buf_readcount), 0,
490                                     -100000))
491 #endif
492 #if 0
493         while (tMPI_Atomic_add_return( &(cev->met[myrank].buf_readcount), 0) 
494                != 0)
495 #endif
496 #if 1
497         while (tMPI_Atomic_get( &(cev->met[rank].buf_readcount) )>0)
498 #endif
499         {
500         }
501         tMPI_Atomic_memory_barrier_acq();
502     }
503 #endif
504 #if defined(TMPI_PROFILE) 
505     tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_send);
506 #endif
507 }
508
509 void tMPI_Wait_for_data(struct tmpi_thread *cur, struct coll_env *cev, 
510                         int myrank)
511 {
512 #if defined(TMPI_PROFILE) 
513     tMPI_Profile_wait_start(cur);
514 #endif
515     tMPI_Event_wait( &(cev->met[myrank].recv_ev));
516     tMPI_Event_process( &(cev->met[myrank].recv_ev), 1);
517 #if defined(TMPI_PROFILE) 
518     tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_recv);
519 #endif
520 }
521
522
523
524
525
526
527 int tMPI_Barrier(tMPI_Comm comm) 
528 {
529 #ifdef TMPI_PROFILE
530     struct tmpi_thread *cur=tMPI_Get_current();
531     tMPI_Profile_count_start(cur);
532 #endif
533
534 #ifdef TMPI_TRACE
535     tMPI_Trace_print("tMPI_Barrier(%p, %d, %p, %d, %d, %p, %p)", comm);
536 #endif
537
538     if (!comm)
539     {
540         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
541     }
542
543     if (comm->grp.N>1)
544     {
545 #if defined(TMPI_PROFILE) 
546         tMPI_Profile_wait_start(cur);
547 #endif
548
549         tMPI_Barrier_wait( &(comm->barrier) );
550 #if defined(TMPI_PROFILE) 
551         tMPI_Profile_wait_stop(cur, TMPIWAIT_Barrier);
552 #endif
553     }
554
555 #ifdef TMPI_PROFILE
556     tMPI_Profile_count_stop(cur, TMPIFN_Barrier);
557 #endif
558     return TMPI_SUCCESS;
559 }
560
561
562
563