11a70977509818495f49eb1c557c11afb06b7e62
[alexxy/gromacs.git] / src / gromacs / gmxlib / thread_mpi / collective.c
1 /*
2    This source code file is part of thread_mpi.
3    Written by Sander Pronk, Erik Lindahl, and possibly others.
4
5    Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6    All rights reserved.
7
8    Redistribution and use in source and binary forms, with or without
9    modification, are permitted provided that the following conditions are met:
10    1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12    2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15    3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19    THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20    EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22    DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23    DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24    (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26    ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30    If you want to redistribute modifications, please consider that
31    scientific software is very special. Version control is crucial -
32    bugs must be traceable. We will be happy to consider code for
33    inclusion in the official distribution, but derived work should not
34    be called official thread_mpi. Details are found in the README & COPYING
35    files.
36  */
37
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
40 #endif
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <errno.h>
52 #include <stdlib.h>
53 #include <stdio.h>
54 #include <stdarg.h>
55 #include <string.h>
56
57 #include "impl.h"
58 #include "collective.h"
59 #include "unused.h"
60
61 #ifdef USE_COLLECTIVE_COPY_BUFFER
62 /* initialize a copy buffer */
63 int tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size)
64 {
65     cb->buf = tMPI_Malloc(size);
66     if (cb->buf == NULL)
67     {
68         return TMPI_ERR_NO_MEM;
69     }
70     cb->size = size;
71     return TMPI_SUCCESS;
72 }
73
74 /* destroy a copy buffer */
75 void tMPI_Copy_buffer_destroy(struct copy_buffer *cb)
76 {
77     free(cb->buf);
78 }
79
80 int tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
81                                size_t size)
82 {
83     int i;
84     int ret;
85
86     cbl->size     = size;
87     cbl->cb_alloc = (struct copy_buffer*)
88         tMPI_Malloc(sizeof(struct copy_buffer)*Nbufs);
89     if (cbl->cb_alloc == NULL)
90     {
91         return TMPI_ERR_NO_MEM;
92     }
93     cbl->cb    = cbl->cb_alloc; /* the first one */
94     cbl->Nbufs = Nbufs;
95     for (i = 0; i < Nbufs; i++)
96     {
97         ret = tMPI_Copy_buffer_init( &(cbl->cb_alloc[i]), size );
98         if (ret != TMPI_SUCCESS)
99         {
100             return ret;
101         }
102         if (i < Nbufs-1)
103         {
104             cbl->cb_alloc[i].next = &(cbl->cb_alloc[i+1]);
105         }
106         else
107         {
108             cbl->cb_alloc[i].next = NULL;
109         }
110     }
111     return TMPI_SUCCESS;
112 }
113
114 void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl)
115 {
116     int i;
117
118     for (i = 0; i < cbl->Nbufs; i++)
119     {
120         tMPI_Copy_buffer_destroy( &(cbl->cb_alloc[i]) );
121     }
122     free(cbl->cb_alloc);
123 }
124
125 struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl)
126 {
127     struct copy_buffer *ret = cbl->cb;
128     if (!ret)
129     {
130         tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COPY_NBUFFERS);
131         return NULL;
132     }
133     cbl->cb = ret->next;
134
135     return ret;
136 }
137
138 void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
139                                   struct copy_buffer      *cb)
140 {
141     cb->next = cbl->cb;
142     cbl->cb  = cb;
143 }
144 #endif
145
146 int tMPI_Coll_envt_init(struct coll_env_thread *met, int N)
147 {
148     tMPI_Atomic_set(&(met->current_sync), 0);
149     tMPI_Atomic_set(&(met->n_remaining), 0);
150     met->buf = (void**)tMPI_Malloc(sizeof(void*)*N);
151     if (met->buf == NULL)
152     {
153         return TMPI_ERR_NO_MEM;
154     }
155     met->bufsize = (size_t*)tMPI_Malloc(sizeof(size_t)*N);
156     if (met->bufsize == NULL)
157     {
158         return TMPI_ERR_NO_MEM;
159     }
160     met->read_data = (tmpi_bool*)tMPI_Malloc(sizeof(tmpi_bool)*N);
161     if (met->read_data == NULL)
162     {
163         return TMPI_ERR_NO_MEM;
164     }
165 #ifdef USE_COLLECTIVE_COPY_BUFFER
166     met->cpbuf = (tMPI_Atomic_ptr_t*)tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*
167                                                  N);
168     if (met->read_data == NULL)
169     {
170         return TMPI_ERR_NO_MEM;
171     }
172     met->cb       = NULL;
173     met->using_cb = FALSE;
174 #endif
175     tMPI_Event_init( &(met->send_ev) );
176     tMPI_Event_init( &(met->recv_ev) );
177     return TMPI_SUCCESS;
178 }
179
180 void tMPI_Coll_envt_destroy(struct coll_env_thread *met)
181 {
182     free( (void*)met->buf );
183     free( (void*)met->bufsize );
184     free( (void*)met->read_data );
185
186 #ifdef USE_COLLECTIVE_COPY_BUFFER
187     free( (void*)met->cpbuf );
188 #endif
189 }
190
191 int tMPI_Coll_env_init(struct coll_env *cev, int N)
192 {
193     int i;
194     int ret;
195
196     cev->met = (struct coll_env_thread*)tMPI_Malloc(
197                 sizeof(struct coll_env_thread)*N);
198     if (cev->met == NULL)
199     {
200         return TMPI_ERR_NO_MEM;
201     }
202     cev->N = N;
203     tMPI_Atomic_set(&(cev->coll.current_sync), 0);
204     tMPI_Atomic_set(&(cev->coll.n_remaining), 0);
205     for (i = 0; i < N; i++)
206     {
207         ret = tMPI_Coll_envt_init(&(cev->met[i]), N);
208         if (ret != TMPI_SUCCESS)
209         {
210             return ret;
211         }
212     }
213     return TMPI_SUCCESS;
214 }
215
216 void tMPI_Coll_env_destroy(struct coll_env *cev)
217 {
218     int i;
219     for (i = 0; i < cev->N; i++)
220     {
221         tMPI_Coll_envt_destroy(&(cev->met[i]));
222     }
223     free( (void*)cev->met );
224 }
225
226 int tMPI_Coll_sync_init(struct coll_sync *csync, int N)
227 {
228     int i;
229
230     csync->synct = 0;
231     csync->syncs = 0;
232     csync->N     = N;
233
234     csync->events = (tMPI_Event*)tMPI_Malloc(sizeof(tMPI_Event)*N);
235     if (csync->events == NULL)
236     {
237         return TMPI_ERR_NO_MEM;
238     }
239     for (i = 0; i < N; i++)
240     {
241         tMPI_Event_init( &(csync->events[i]) );
242     }
243     return TMPI_SUCCESS;
244 }
245
246 void tMPI_Coll_sync_destroy(struct coll_sync *csync)
247 {
248     int i;
249
250     csync->synct = 0;
251     csync->syncs = 0;
252
253     for (i = 0; i < csync->N; i++)
254     {
255         tMPI_Event_destroy( &(csync->events[i]) );
256     }
257     free(csync->events);
258 }
259
260 /* get a pointer the next coll_env once it's ready. */
261 struct coll_env *tMPI_Get_cev(tMPI_Comm comm, int myrank, int *counter)
262 {
263     struct coll_sync *csync = &(comm->csync[myrank]);
264     struct coll_env  *cev;
265 #ifdef USE_COLLECTIVE_COPY_BUFFER
266     int               N;
267 #endif
268
269     /* we increase our counter, and determine which coll_env we get */
270     csync->synct++;
271     *counter = csync->synct;
272     cev      = &(comm->cev[csync->synct % N_COLL_ENV]);
273
274
275 #ifdef USE_COLLECTIVE_COPY_BUFFER
276     if (cev->met[myrank].using_cb)
277     {
278         if (cev->N > 1)
279         {
280             N = tMPI_Event_wait( &(cev->met[myrank].send_ev));
281             tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
282         }
283     }
284 #endif
285 #ifdef USE_COLLECTIVE_COPY_BUFFER
286     /* clean up old copy_buffer pointers */
287     if (cev->met[myrank].cb)
288     {
289         tMPI_Copy_buffer_list_return(&(tMPI_Get_current()->cbl_multi),
290                                      cev->met[myrank].cb);
291         cev->met[myrank].cb       = NULL;
292         cev->met[myrank].using_cb = FALSE;
293     }
294 #endif
295
296     return cev;
297 }
298
299 void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
300                     int index, int expected_tag, tMPI_Datatype recvtype,
301                     size_t recvsize, void *recvbuf, int *ret)
302 {
303     size_t sendsize = cev->met[rank].bufsize[index];
304
305     /* check tags, types */
306     if ((cev->met[rank].datatype != recvtype ) ||
307         (cev->met[rank].tag != expected_tag))
308     {
309         *ret = tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
310     }
311
312     if (sendsize) /* we allow NULL ptrs if there's nothing to xmit */
313     {
314         void     *srcbuf;
315 #ifdef USE_COLLECTIVE_COPY_BUFFER
316         tmpi_bool decrease_ctr = FALSE;
317 #endif
318
319         if (sendsize > recvsize)
320         {
321             *ret = tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
322             return;
323         }
324
325         if (cev->met[rank].buf == recvbuf)
326         {
327             *ret = tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
328             return;
329         }
330         /* get source buffer */
331 #ifdef USE_COLLECTIVE_COPY_BUFFER
332         if (!(cev->met[rank].using_cb))
333 #endif
334         {
335             srcbuf = cev->met[rank].buf[index];
336         }
337 #ifdef USE_COLLECTIVE_COPY_BUFFER
338         else
339         {
340             srcbuf = tMPI_Atomic_ptr_get(&(cev->met[rank].cpbuf[index]));
341             tMPI_Atomic_memory_barrier_acq();
342
343             if (!srcbuf)
344             {   /* there was (as of yet) no copied buffer */
345                 void *try_again_srcbuf;
346                 /* we need to try checking the pointer again after we increase
347                    the read counter, signaling that one more thread
348                    is reading. */
349                 tMPI_Atomic_fetch_add(&(cev->met[rank].buf_readcount), 1);
350                 /* a full memory barrier */
351                 tMPI_Atomic_memory_barrier();
352                 try_again_srcbuf = tMPI_Atomic_ptr_get(
353                             &(cev->met[rank].cpbuf[index]));
354                 if (!try_again_srcbuf)
355                 {
356                     /* apparently the copied buffer is not ready yet. We
357                        just use the real source buffer. We have already
358                        indicated we're reading from the regular buf. */
359                     srcbuf       = cev->met[rank].buf[index];
360                     decrease_ctr = TRUE;
361
362                 }
363                 else
364                 {
365                     /* We tried again, and this time there was a copied buffer.
366                        We use that, and indicate that we're not reading from the
367                        regular buf. This case should be pretty rare.  */
368                     tMPI_Atomic_fetch_add(&(cev->met[rank].buf_readcount), -1);
369                     tMPI_Atomic_memory_barrier_acq();
370                     srcbuf = try_again_srcbuf;
371                 }
372             }
373
374 #ifdef TMPI_PROFILE
375             if (srcbuf)
376             {
377                 tMPI_Profile_count_buffered_coll_xfer(tMPI_Get_current());
378             }
379 #endif
380         }
381 #endif
382         /* copy data */
383         memcpy((char*)recvbuf, srcbuf, sendsize);
384 #ifdef TMPI_PROFILE
385         tMPI_Profile_count_coll_xfer(tMPI_Get_current());
386 #endif
387
388 #ifdef USE_COLLECTIVE_COPY_BUFFER
389         if (decrease_ctr)
390         {
391             /* we decrement the read count; potentially releasing the buffer. */
392             tMPI_Atomic_memory_barrier_rel();
393             tMPI_Atomic_fetch_add( &(cev->met[rank].buf_readcount), -1);
394         }
395 #endif
396     }
397     /* signal one thread ready */
398     {
399         int reta;
400         tMPI_Atomic_memory_barrier_rel();
401         reta = tMPI_Atomic_fetch_add( &(cev->met[rank].n_remaining), -1);
402         if (reta <= 1) /* n_remaining == 0 now. */
403         {
404             tMPI_Event_signal( &(cev->met[rank].send_ev) );
405         }
406     }
407 }
408
409 void tMPI_Coll_root_xfer(tMPI_Comm comm, tMPI_Datatype sendtype,
410                          tMPI_Datatype recvtype,
411                          size_t sendsize, size_t recvsize,
412                          void* sendbuf, void* recvbuf, int *ret)
413 {
414     /* do root transfer */
415     if (recvsize < sendsize)
416     {
417         *ret = tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
418         return;
419     }
420     if (recvtype != sendtype)
421     {
422         *ret = tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
423         return;
424     }
425     if (sendbuf == recvbuf)
426     {
427         *ret = tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
428         return;
429     }
430
431     memcpy(recvbuf, sendbuf, sendsize);
432 }
433
434 int tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
435                     int tag, tMPI_Datatype datatype, size_t bufsize,
436                     void *buf, int n_remaining, int synct, int dest)
437 {
438     int i;
439 #ifdef USE_COLLECTIVE_COPY_BUFFER
440     /* decide based on the number of waiting threads */
441     tmpi_bool using_cb = (bufsize < (size_t)(n_remaining*COPY_BUFFER_SIZE));
442
443     cev->met[myrank].using_cb = using_cb;
444     if (using_cb)
445     {
446         /* we set it to NULL initially */
447         /*cev->met[myrank].cpbuf[index]=NULL;*/
448         tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]), NULL);
449
450         tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
451     }
452 #endif
453     cev->met[myrank].tag            = tag;
454     cev->met[myrank].datatype       = datatype;
455     cev->met[myrank].buf[index]     = buf;
456     cev->met[myrank].bufsize[index] = bufsize;
457     tMPI_Atomic_set(&(cev->met[myrank].n_remaining), n_remaining);
458     tMPI_Atomic_memory_barrier_rel();
459     tMPI_Atomic_set(&(cev->met[myrank].current_sync), synct);
460
461     /* publish availability. */
462     if (dest < 0)
463     {
464         for (i = 0; i < cev->N; i++)
465         {
466             if (i != myrank)
467             {
468                 tMPI_Event_signal( &(cev->met[i].recv_ev) );
469             }
470         }
471     }
472     else
473     {
474         tMPI_Event_signal( &(cev->met[dest].recv_ev) );
475     }
476
477 #ifdef USE_COLLECTIVE_COPY_BUFFER
478     /* becase we've published availability, we can start copying --
479        possibly in parallel with the receiver */
480     if (using_cb)
481     {
482         struct tmpi_thread *cur = tMPI_Get_current();
483         /* copy the buffer locally. First allocate */
484         cev->met[myrank].cb = tMPI_Copy_buffer_list_get( &(cur->cbl_multi) );
485         if (cev->met[myrank].cb == NULL)
486         {
487             return TMPI_ERR_COPY_NBUFFERS;
488         }
489         if (cev->met[myrank].cb->size < bufsize)
490         {
491             return TMPI_ERR_COPY_BUFFER_SIZE;
492         }
493         /* copy to the new buf */
494         memcpy(cev->met[myrank].cb->buf, buf, bufsize);
495
496         /* post the new buf */
497         tMPI_Atomic_memory_barrier_rel();
498         /*cev->met[myrank].cpbuf[index]=cev->met[myrank].cb->buf;*/
499         tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]),
500                             cev->met[myrank].cb->buf);
501     }
502 #endif
503     return TMPI_SUCCESS;
504 }
505
506 void tMPI_Wait_for_others(struct coll_env *cev, int myrank)
507 {
508 #if defined(TMPI_PROFILE)
509     struct tmpi_thread *cur = tMPI_Get_current();
510     tMPI_Profile_wait_start(cur);
511 #endif
512
513     if (cev->N > 1)
514     {
515 #ifdef USE_COLLECTIVE_COPY_BUFFER
516         if (!(cev->met[myrank].using_cb) )
517 #endif
518         {
519             /* wait until everybody else is done copying the buffer */
520             tMPI_Event_wait( &(cev->met[myrank].send_ev));
521             tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
522         }
523 #ifdef USE_COLLECTIVE_COPY_BUFFER
524         else
525         {
526             /* wait until everybody else is done copying the original buffer.
527                This wait is bound to be very short (otherwise it wouldn't
528                be double-buffering) so we always spin here. */
529 #if 0
530             /* dummy compare-and-swap to a value that is non-zero. The
531                atomic read with barrier below is simpler, but we keep this
532                code here commented out for if there is ever a platform
533                where the simple read doesn't work because of, say, cache
534                coherency issues. */
535             while (!tMPI_Atomic_cas( &(cev->met[rank].buf_readcount), 0,
536                                      -100000))
537 #endif
538 #if 1
539             tMPI_Atomic_memory_barrier();         /* a full barrier to make
540                                                      sure that the sending
541                                                      doesn't interfere with the
542                                                      waiting */
543             while (tMPI_Atomic_get( &(cev->met[myrank].buf_readcount) ) > 0)
544 #endif
545             {
546                 tMPI_Atomic_memory_barrier_acq();
547             }
548             tMPI_Atomic_memory_barrier_acq();
549         }
550 #endif
551     }
552 #if defined(TMPI_PROFILE)
553     tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_send);
554 #endif
555 }
556
557 void tMPI_Wait_for_data(struct tmpi_thread tmpi_unused *cur, struct coll_env *cev,
558                         int myrank)
559 {
560 #if defined(TMPI_PROFILE)
561     tMPI_Profile_wait_start(cur);
562 #endif
563     tMPI_Event_wait( &(cev->met[myrank].recv_ev));
564     tMPI_Event_process( &(cev->met[myrank].recv_ev), 1);
565 #if defined(TMPI_PROFILE)
566     tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_recv);
567 #endif
568 }
569
570 int tMPI_Barrier(tMPI_Comm comm)
571 {
572 #ifdef TMPI_PROFILE
573     struct tmpi_thread *cur = tMPI_Get_current();
574     tMPI_Profile_count_start(cur);
575 #endif
576
577 #ifdef TMPI_TRACE
578     tMPI_Trace_print("tMPI_Barrier(%p, %d, %p, %d, %d, %p, %p)", comm);
579 #endif
580
581     if (!comm)
582     {
583         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
584     }
585
586     if (comm->grp.N > 1)
587     {
588 #if defined(TMPI_PROFILE)
589         tMPI_Profile_wait_start(cur);
590 #endif
591
592         tMPI_Barrier_wait( &(comm->barrier) );
593 #if defined(TMPI_PROFILE)
594         tMPI_Profile_wait_stop(cur, TMPIWAIT_Barrier);
595 #endif
596     }
597
598 #ifdef TMPI_PROFILE
599     tMPI_Profile_count_stop(cur, TMPIFN_Barrier);
600 #endif
601     return TMPI_SUCCESS;
602 }