src/gromacs/gmxlib/thread_mpi/reduce.c (alexxy/gromacs.git)
/*
   This source code file is part of thread_mpi.
   Written by Sander Pronk, Erik Lindahl, and possibly others.

   Copyright (c) 2009, Sander Pronk, Erik Lindahl.
   All rights reserved.

   Redistribution and use in source and binary forms, with or without
   modification, are permitted provided that the following conditions are met:
   1) Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
   2) Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the distribution.
   3) Neither the name of the copyright holders nor the
   names of its contributors may be used to endorse or promote products
   derived from this software without specific prior written permission.

   THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
   EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
   WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
   DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
   SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

   If you want to redistribute modifications, please consider that
   scientific software is very special. Version control is crucial -
   bugs must be traceable. We will be happy to consider code for
   inclusion in the official distribution, but derived work should not
   be called official thread_mpi. Details are found in the README & COPYING
   files.
 */

#ifdef HAVE_TMPI_CONFIG_H
#include "tmpi_config.h"
#endif

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif


#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include <string.h>

#include "impl.h"
#include "collective.h"

int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
                       tMPI_Datatype datatype, int count, tMPI_Op op,
                       tMPI_Comm comm)
{
    tMPI_Op_fn fn = datatype->op_functions[op];

    if (src_a == src_b)
    {
        return tMPI_Error(comm, TMPI_ERR_XFER_BUF_OVERLAP);
    }
    fn(dest, src_a, src_b, count);
    return TMPI_SUCCESS;
}

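/* Illustration of what the op function called by tMPI_Reduce_run_op does:
   it combines its two inputs element-wise into dest. For a summation op
   such as TMPI_SUM on count ints, it behaves conceptually like

       for (int i = 0; i < count; i++)
       {
           ((int*)dest)[i] = ((int*)src_a)[i] + ((int*)src_b)[i];
       }

   dest may alias src_a (tMPI_Reduce_fast below passes recvbuf as both),
   but src_a and src_b must not overlap, which is what the
   TMPI_ERR_XFER_BUF_OVERLAP check above guards against. */
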
int tMPI_Reduce_fast(void* sendbuf, void* recvbuf, int count,
                     tMPI_Datatype datatype, tMPI_Op op, int root,
                     tMPI_Comm comm)
{
    struct tmpi_thread *cur    = tMPI_Get_current();
    int                 myrank = tMPI_Comm_seek_rank(comm, cur);

    /* this function uses a binary tree-like reduction algorithm: */
    int N          = tMPI_Comm_N(comm);
    int myrank_rtr = (N+myrank-root)%N; /* my rank relative to root */
    int Nred       = N;                 /* number of neighbours that still communicate
                                           (decreases exponentially) */
    int nbr_dist   = 1;                 /* distance between communicating neighbours
                                           (increases exponentially) */
    int stepping   = 2;                 /* distance between non-communicating neighbours
                                           (increases exponentially) */
    int iteration  = 0;

    if (count == 0)
    {
        return TMPI_SUCCESS;
    }
    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    if (!recvbuf)
    {
        return tMPI_Error(comm, TMPI_ERR_BUF);
    }
    if ( (!datatype->op_functions) || (!datatype->op_functions[op]) )
    {
        return tMPI_Error(comm, TMPI_ERR_OP_FN);
    }

    if (sendbuf == TMPI_IN_PLACE) /* in-place: the input data is already in recvbuf */
    {
        sendbuf = recvbuf;
    }
    /* we publish our send and receive buffers so the other threads can
       find them */
    tMPI_Atomic_ptr_set(&(comm->reduce_sendbuf[myrank]), sendbuf);
    tMPI_Atomic_ptr_set(&(comm->reduce_recvbuf[myrank]), recvbuf);

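    /* Example of how the loop below pairs up threads, assuming N = 4 and
       root = 0 (so myrank_rtr == myrank):

         iteration 0 (stepping 2, nbr_dist 1): 0 reduces with 1, 2 with 3;
                                               1 and 3 signal and drop out.
         iteration 1 (stepping 4, nbr_dist 2): 0 reduces with 2; 2 drops out.

       After ceil(log2(N)) iterations only the root is left, and its recvbuf
       holds the fully reduced result. */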
    while (Nred > 1)
    {
        /* calculate neighbour rank (here we use the real rank) */
        int nbr = (myrank_rtr%stepping == 0) ?
            (N+myrank+nbr_dist)%N :
            (N+myrank-nbr_dist)%N;

#ifdef TMPI_DEBUG
        printf("%d: iteration %d: myrank_rtr=%d, stepping=%d\n",
               myrank, iteration, myrank_rtr, stepping);
        fflush(stdout);
#endif
        /* check if I'm the reducing thread in this iteration's pair: */
        if (myrank_rtr%stepping == 0)
        {
            void *a, *b;
            int   ret;

            /* now wait for my neighbour's data to become ready.
               First check if I actually have a neighbour. */
            if (myrank_rtr+nbr_dist < N)
            {
#ifdef TMPI_DEBUG
                printf("%d: waiting to reduce with %d, iteration=%d\n",
                       myrank, nbr, iteration);
                fflush(stdout);
#endif

#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
                tMPI_Profile_wait_start(cur);
#endif
                tMPI_Event_wait( &(comm->csync[myrank].events[nbr]) );
                tMPI_Event_process( &(comm->csync[myrank].events[nbr]), 1);
#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
                tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
#endif

#ifdef TMPI_DEBUG
                printf("%d: reducing with %d, iteration=%d\n",
                       myrank, nbr, iteration);
                fflush(stdout);
#endif
                /* we reduce with our neighbour */
                if (iteration == 0)
                {
                    /* for the first iteration, the inputs are in the
                       sendbuf */
                    a = sendbuf;
                    b = (void*)tMPI_Atomic_ptr_get(&(comm->reduce_sendbuf[nbr]));
                }
                else
                {
                    /* after the first operation, they're already in
                       the recvbuf */
                    a = recvbuf;
                    b = (void*)tMPI_Atomic_ptr_get(&(comm->reduce_recvbuf[nbr]));
                }

                if ((ret = tMPI_Reduce_run_op(recvbuf, a, b, datatype,
                                              count, op, comm)) != TMPI_SUCCESS)
                {
                    return ret;
                }

                /* signal to my neighbour that I'm ready. */
                tMPI_Event_signal( &(comm->csync[nbr].events[myrank]) );
            }
            else
            {
#ifdef TMPI_DEBUG
                printf("%d: not waiting, just copying buffer\n", myrank);
                fflush(stdout);
#endif
                /* we still need to put things in the right buffer for the
                   next iteration. We need to check for overlapping buffers
                   here, because TMPI_IN_PLACE may have made recvbuf the
                   same as sendbuf. */
                if (iteration == 0 && (recvbuf != sendbuf))
                {
                    memcpy(recvbuf, sendbuf, datatype->size*count);
                }
            }

        }
        else
        {
            /* the other thread is doing the reducing; we can just
               wait and break out when it is ready */
            /* Wake our neighbour */
            tMPI_Event_signal( &(comm->csync[nbr].events[myrank]) );


#ifdef TMPI_DEBUG
            printf("%d: signalled %d, now waiting: iteration=%d\n",
                   myrank, nbr, iteration);
            fflush(stdout);
#endif

            /* And wait for an incoming event from our neighbour */
#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
            tMPI_Profile_wait_start(cur);
#endif
            tMPI_Event_wait( &(comm->csync[myrank].events[nbr]) );
            tMPI_Event_process( &(comm->csync[myrank].events[nbr]), 1);
#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
            tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
#endif
            /* now we can break because our data is reduced, and
               our neighbour goes on reducing it further. */
            break;
        }

#ifdef TMPI_DEBUG
        printf("%d: iteration over, iteration=%d\n", myrank, iteration);
        fflush(stdout);
#endif

        Nred      = Nred/2 + Nred%2;
        nbr_dist *= 2;
        stepping *= 2;
        iteration++;
    }

    return TMPI_SUCCESS;
}
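/* Note on tMPI_Reduce_fast: only the root thread is guaranteed to end up
   with the complete result in its recvbuf; the other threads' recvbufs are
   used as scratch space for partial reductions. That is why tMPI_Reduce
   below hands non-root threads a temporary buffer, and why tMPI_Allreduce
   copies the root's recvbuf back to every thread afterwards. */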

int tMPI_Reduce(void* sendbuf, void* recvbuf, int count,
                tMPI_Datatype datatype, tMPI_Op op, int root, tMPI_Comm comm)
{
    struct tmpi_thread *cur    = tMPI_Get_current();
    int                 myrank = tMPI_Comm_seek_rank(comm, cur);
    int                 ret;

#ifdef TMPI_PROFILE
    tMPI_Profile_count_start(cur);
#endif
#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Reduce(%p, %p, %d, %p, %p, %d, %p)",
                     sendbuf, recvbuf, count, datatype, op, root, comm);
#endif

    if (myrank == root)
    {
        if (sendbuf == TMPI_IN_PLACE) /* in-place: the root's input data is already in recvbuf */
        {
            sendbuf = recvbuf;
        }
    }
    else
    {
        /* non-root threads get a temporary receive buffer for the partial
           results; it is freed again below. */
#ifdef TMPI_WARN_MALLOC
        fprintf(stderr, "Warning: malloc during tMPI_Reduce\n");
#endif
        recvbuf = (void*)tMPI_Malloc(datatype->size*count);
    }
    ret = tMPI_Reduce_fast(sendbuf, recvbuf, count, datatype, op, root, comm);
    if (myrank != root)
    {
        free(recvbuf);
    }
#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Reduce);
#endif
    return ret;
}
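/* Example call of tMPI_Reduce (a sketch, assuming the usual thread_mpi
   handles TMPI_INT, TMPI_SUM and TMPI_COMM_WORLD):

       int value = 1;     // each thread contributes 1
       int total = 0;
       tMPI_Reduce(&value, &total, 1, TMPI_INT, TMPI_SUM, 0, TMPI_COMM_WORLD);

   On rank 0, total then holds the number of threads; on the other ranks
   the user's recvbuf is left untouched, because tMPI_Reduce above
   substitutes a temporary buffer for it. */
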

int tMPI_Allreduce(void* sendbuf, void* recvbuf, int count,
                   tMPI_Datatype datatype, tMPI_Op op, tMPI_Comm comm)
{
    void               *rootbuf = NULL; /* root thread's receive buffer */
    struct tmpi_thread *cur     = tMPI_Get_current();
    int                 myrank  = tMPI_Comm_seek_rank(comm, cur);
    int                 ret;

#ifdef TMPI_PROFILE
    tMPI_Profile_count_start(cur);
#endif
#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Allreduce(%p, %p, %d, %p, %p, %p)",
                     sendbuf, recvbuf, count, datatype, op, comm);
#endif
    if (count == 0)
    {
        return TMPI_SUCCESS;
    }
    if (!recvbuf)
    {
        return tMPI_Error(comm, TMPI_ERR_BUF);
    }
    if (sendbuf == TMPI_IN_PLACE) /* in-place: the input data is already in recvbuf */
    {
        sendbuf = recvbuf;
    }

    ret = tMPI_Reduce_fast(sendbuf, recvbuf, count, datatype, op, 0, comm);
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_start(cur);
#endif
    tMPI_Barrier_wait( &(comm->barrier));
#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
#endif
    /* get the root thread's receive buffer, so we can distribute its
       contents */
    rootbuf = (void*)tMPI_Atomic_ptr_get(&(comm->reduce_recvbuf[0]));

    /* and now we just copy things back. We know that the root thread
       arrives last, so there's no point in using tMPI_Scatter with
       copy buffers, etc. */
    if (myrank != 0)
    {
        if (rootbuf == recvbuf)
        {
            return tMPI_Error(comm, TMPI_ERR_XFER_BUF_OVERLAP);
        }
        memcpy(recvbuf, rootbuf, datatype->size*count);
    }

#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
    tMPI_Profile_wait_start(cur);
#endif
    tMPI_Barrier_wait( &(comm->barrier));
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
    tMPI_Profile_count_stop(cur, TMPIFN_Allreduce);
#endif
    return ret;
}
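
/* Example call of tMPI_Allreduce (a sketch, with the same assumptions as
   the tMPI_Reduce example above, plus the TMPI_DOUBLE and TMPI_MAX handles):

       double local_max  = compute_local_max();   // hypothetical helper
       double global_max = 0.0;
       tMPI_Allreduce(&local_max, &global_max, 1, TMPI_DOUBLE, TMPI_MAX,
                      TMPI_COMM_WORLD);

   Every thread returns with global_max equal to the largest local_max,
   because rank 0's reduced buffer is copied back to all threads between
   the two barriers above. */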