Merge release-4-6 into master
[alexxy/gromacs.git] / src / gromacs / gmxlib / thread_mpi / reduce.c
1 /*
2 This source code file is part of thread_mpi.  
3 Written by Sander Pronk, Erik Lindahl, and possibly others. 
4
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6 All rights reserved.
7
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
35 files.
36 */
37
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
40 #endif
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <errno.h>
52 #include <stdlib.h>
53 #include <stdio.h>
54 #include <stdarg.h>
55 #include <string.h>
56
57 #include "impl.h"
58 #include "collective.h"
59
60
61 int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b, 
62                        tMPI_Datatype datatype, int count, tMPI_Op op, 
63                        tMPI_Comm comm)
64 {
65     tMPI_Op_fn fn=datatype->op_functions[op];
66
67     if (src_a==src_b)
68     {
69         return tMPI_Error(comm, TMPI_ERR_XFER_BUF_OVERLAP);
70     }
71     fn(dest, src_a, src_b, count);
72     return TMPI_SUCCESS;
73 }
74
75 int tMPI_Reduce_fast(void* sendbuf, void* recvbuf, int count, 
76                      tMPI_Datatype datatype, tMPI_Op op, int root, 
77                      tMPI_Comm comm)
78 {
79     struct tmpi_thread *cur=tMPI_Get_current();
80     int myrank=tMPI_Comm_seek_rank(comm, cur);
81
82     /* this function uses a binary tree-like reduction algorithm: */
83     int N=tMPI_Comm_N(comm);
84     int myrank_rtr=(N+myrank-root)%N; /* my rank relative to root */
85     int Nred=N; /* number of neighbours that still communicate 
86                    (decreases exponentially) */
87     int nbr_dist=1; /* distance between communicating neighbours 
88                        (increases exponentially) */
89     int stepping=2; /* distance between non-communicating neighbours
90                        (increases exponentially) */
91     int iteration=0;
92   
93     if (count==0)
94         return TMPI_SUCCESS;
95     if (!comm)
96     {
97         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
98     }
99     if (!recvbuf)
100     {
101         return tMPI_Error(comm, TMPI_ERR_BUF);
102     }
103     if ( (!datatype->op_functions) || (!datatype->op_functions[op]) )
104     {
105         return tMPI_Error(comm, TMPI_ERR_OP_FN);
106     }
107
108     if (sendbuf==TMPI_IN_PLACE)/* i.e. sendbuf == tMPI_IN_PLACE */
109     {
110         sendbuf=recvbuf;
111     }
112     /* we set our send and recv buffer s*/
113     tMPI_Atomic_ptr_set(&(comm->reduce_sendbuf[myrank]),sendbuf);
114     tMPI_Atomic_ptr_set(&(comm->reduce_recvbuf[myrank]),recvbuf);
115
116     while (Nred>1)
117     {
118         /* calculate neighbour rank (here we use the real rank) */
119         int nbr=(myrank_rtr%stepping==0) ?  
120                   (N+myrank+nbr_dist)%N : 
121                   (N+myrank-nbr_dist)%N;
122
123 #ifdef TMPI_DEBUG
124         printf("%d: iteration %d: myrank_rtr=%d, stepping=%d\n", 
125                myrank, iteration, myrank_rtr, stepping);
126         fflush(stdout);
127 #endif
128         /* check if I'm the reducing thread in this iteration's pair: */
129         if (myrank_rtr%stepping == 0)
130         {
131             void *a,*b;
132             int ret;
133
134             /* now wait for my neighbor's data to become ready. 
135                First check if I actually have a neighbor. */
136             if (myrank_rtr+nbr_dist<N)
137             {
138 #ifdef TMPI_DEBUG
139                 printf("%d: waiting to reduce with %d, iteration=%d\n", 
140                        myrank, nbr, iteration);
141                 fflush(stdout);
142 #endif
143
144 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
145                 tMPI_Profile_wait_start(cur);
146 #endif
147                 tMPI_Event_wait( &(comm->csync[myrank].events[nbr]) );
148                 tMPI_Event_process( &(comm->csync[myrank].events[nbr]), 1);
149 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
150                 tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
151 #endif
152
153 #ifdef TMPI_DEBUG
154                 printf("%d: reducing with %d, iteration=%d\n", 
155                        myrank, nbr, iteration);
156                 fflush(stdout);
157 #endif
158                 /* we reduce with our neighbour*/
159                 if (iteration==0)
160                 {
161                     /* for the first iteration, the inputs are in the 
162                        sendbuf*/
163                     a=sendbuf;
164                     b=(void*)tMPI_Atomic_ptr_get(&(comm->reduce_sendbuf[nbr]));
165                 }
166                 else
167                 {
168                     /* after the first operation, they're already in 
169                        the recvbuf */
170                     a=recvbuf;
171                     b=(void*)tMPI_Atomic_ptr_get(&(comm->reduce_recvbuf[nbr]));
172                 }
173
174                 if ((ret=tMPI_Reduce_run_op(recvbuf, a, b, datatype, 
175                                             count, op, comm)) != TMPI_SUCCESS)
176                     return ret;
177
178                 /* signal to my neighbour that I'm ready. */
179                 tMPI_Event_signal( &(comm->csync[nbr].events[myrank]) );
180             }
181             else
182             {
183 #ifdef TMPI_DEBUG
184             printf("%d: not waiting copying buffer\n", myrank);
185             fflush(stdout);
186 #endif
187                 /* we still need to put things in the right buffer for the next
188                    iteration. We need to check for overlapping buffers
189                    here because MPI_IN_PLACE might cause recvbuf to be the
190                    same as sendbuf. */
191                 if (iteration==0 && (recvbuf!=sendbuf))
192                     memcpy(recvbuf, sendbuf, datatype->size*count);
193             }
194
195         }
196         else
197         {
198             /* the other thread is doing the reducing; we can just
199                wait and break when ready */
200            /* Awake our neighbour */
201             tMPI_Event_signal( &(comm->csync[nbr].events[myrank]) );
202
203
204 #ifdef TMPI_DEBUG
205             printf("%d: signalled %d, now waiting: iteration=%d\n", 
206                    nbr, myrank,  iteration);
207             fflush(stdout);
208 #endif
209  
210             /* And wait for an incoming event from out neighbour */
211 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
212             tMPI_Profile_wait_start(cur);
213 #endif
214             tMPI_Event_wait( &(comm->csync[myrank].events[nbr]) );
215             tMPI_Event_process( &(comm->csync[myrank].events[nbr]), 1);
216 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
217             tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
218 #endif
219             /* now we can break because our data is reduced, and
220                our neighbour goes on reducing it further. */
221             break; 
222         }
223
224 #ifdef TMPI_DEBUG
225         printf("%d: iteration over, iteration=%d\n", myrank,  iteration);
226         fflush(stdout);
227 #endif
228
229         Nred = Nred/2 + Nred%2;
230         nbr_dist*=2;
231         stepping*=2;
232         iteration++;
233     }
234
235     return TMPI_SUCCESS;
236 }
237
238 int tMPI_Reduce(void* sendbuf, void* recvbuf, int count,
239                tMPI_Datatype datatype, tMPI_Op op, int root, tMPI_Comm comm)
240 {
241     struct tmpi_thread *cur=tMPI_Get_current();
242     int myrank=tMPI_Comm_seek_rank(comm, cur);
243     int ret;
244
245 #ifdef TMPI_PROFILE
246     tMPI_Profile_count_start(cur);
247 #endif
248 #ifdef TMPI_TRACE
249     tMPI_Trace_print("tMPI_Reduce(%p, %p, %d, %p, %p, %d, %p)",
250                        sendbuf, recvbuf, count, datatype, op, root, comm);
251 #endif
252
253     if (myrank==root)
254     {
255         if (sendbuf==TMPI_IN_PLACE) /* i.e. sendbuf == TMPI_IN_PLACE */
256         {
257             sendbuf=recvbuf;
258         }
259     }
260     else
261     {
262 #ifdef TMPI_WARN_MALLOC
263         fprintf(stderr, "Warning: malloc during tMPI_Reduce\n");
264 #endif
265         recvbuf=(void*)tMPI_Malloc(datatype->size*count);
266     }
267     ret=tMPI_Reduce_fast(sendbuf, recvbuf, count, datatype, op, root, comm);
268     if (myrank!=root)
269     {
270         free(recvbuf);
271     }
272 #ifdef TMPI_PROFILE
273     tMPI_Profile_count_stop(cur, TMPIFN_Reduce);
274 #endif
275     return ret;
276 }
277
278 int tMPI_Allreduce(void* sendbuf, void* recvbuf, int count,
279                    tMPI_Datatype datatype, tMPI_Op op, tMPI_Comm comm)
280 {
281     void *rootbuf=NULL; /* root process' receive buffer */
282     struct tmpi_thread *cur=tMPI_Get_current();
283     int myrank=tMPI_Comm_seek_rank(comm, cur);
284     int ret;
285
286 #ifdef TMPI_PROFILE
287     tMPI_Profile_count_start(cur);
288 #endif
289 #ifdef TMPI_TRACE
290     tMPI_Trace_print("tMPI_Allreduce(%p, %p, %d, %p, %p, %p)",
291                      sendbuf, recvbuf, count, datatype, op, comm);
292 #endif
293     if (count==0)
294         return TMPI_SUCCESS;
295     if (!recvbuf)
296     {
297         return tMPI_Error(comm, TMPI_ERR_BUF);
298     }
299     if (sendbuf==TMPI_IN_PLACE) /* i.e. sendbuf == TMPI_IN_PLACE */
300     {
301         sendbuf=recvbuf;
302     }
303
304     ret=tMPI_Reduce_fast(sendbuf, recvbuf, count, datatype, op, 0, comm);
305 #if defined(TMPI_PROFILE) 
306     tMPI_Profile_wait_start(cur);
307 #endif
308     tMPI_Barrier_wait( &(comm->barrier));
309 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
310     tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
311 #endif
312     /* distribute rootbuf */
313     rootbuf=(void*)tMPI_Atomic_ptr_get(&(comm->reduce_recvbuf[0]));
314
315     /* and now we just copy things back. We know that the root thread 
316        arrives last, so there's no point in using tMPI_Scatter with 
317        copy buffers, etc. */
318     if (myrank != 0)
319     {
320         if (rootbuf==recvbuf)
321         {
322             return tMPI_Error(comm, TMPI_ERR_XFER_BUF_OVERLAP);
323         }
324         memcpy(recvbuf, rootbuf, datatype->size*count );
325     }
326
327 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
328     tMPI_Profile_wait_start(cur);
329 #endif
330     tMPI_Barrier_wait( &(comm->barrier));
331 #if defined(TMPI_PROFILE)
332     tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
333     tMPI_Profile_count_stop(cur, TMPIFN_Allreduce);
334 #endif
335     return ret;
336 }
337
338