2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
58 #include "collective.h"
/* Apply the reduction operator elementwise: dest[i] = op(src_a[i], src_b[i])
   for 'count' elements of 'datatype'.  Returns TMPI_SUCCESS or a tMPI error
   code.  NOTE(review): interior lines are elided in this view (numbering
   gaps); the condition guarding the buffer-overlap error return is not
   visible here. */
61 int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
62 tMPI_Datatype datatype, int count, tMPI_Op op,
/* look up the per-datatype implementation of this operator */
65 tMPI_Op_fn fn = datatype->op_functions[op];
/* overlapping transfer buffers are rejected (guard condition elided in
   this view — presumably checks dest against src_a/src_b; verify) */
69 return tMPI_Error(comm, TMPI_ERR_XFER_BUF_OVERLAP);
/* perform the actual elementwise reduction */
71 fn(dest, src_a, src_b, count);
/* Internal reduction engine: pairwise binary-tree reduction of 'count'
   elements of 'datatype' onto rank 'root', using the communicator's shared
   atomic buffer pointers (reduce_sendbuf/reduce_recvbuf) and per-pair event
   signalling (csync).  Every rank must supply a usable recvbuf (the public
   tMPI_Reduce allocates a scratch one for non-root ranks).
   NOTE(review): several interior lines (braces, the iteration loop header,
   some error-check conditions) are elided in this view; comments below
   describe only what is visible. */
75 int tMPI_Reduce_fast(void* sendbuf, void* recvbuf, int count,
76 tMPI_Datatype datatype, tMPI_Op op, int root,
79 struct tmpi_thread *cur = tMPI_Get_current();
80 int myrank = tMPI_Comm_seek_rank(comm, cur);
82 /* this function uses a binary tree-like reduction algorithm: */
83 int N = tMPI_Comm_N(comm);
84 int myrank_rtr = (N+myrank-root)%N; /* my rank relative to root */
85 int Nred = N; /* number of neighbours that still communicate
86 (decreases exponentially) */
87 int nbr_dist = 1; /* distance between communicating neighbours
88 (increases exponentially) */
89 int stepping = 2; /* distance between non-communicating neighbours
90 (increases exponentially) */
/* argument validation: bad communicator, bad buffers, or a datatype/op
   combination with no registered reduction function */
99 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
103 return tMPI_Error(comm, TMPI_ERR_BUF);
105 if ( (!datatype->op_functions) || (!datatype->op_functions[op]) )
107 return tMPI_Error(comm, TMPI_ERR_OP_FN);
110 if (sendbuf == TMPI_IN_PLACE) /* in-place: the input already lives in recvbuf */
114 /* publish our send and recv buffers so the partner thread can read them */
115 tMPI_Atomic_ptr_set(&(comm->reduce_sendbuf[myrank]), sendbuf);
116 tMPI_Atomic_ptr_set(&(comm->reduce_recvbuf[myrank]), recvbuf);
120 /* calculate neighbour rank (here we use the real rank) */
121 int nbr = (myrank_rtr%stepping == 0) ?
122 (N+myrank+nbr_dist)%N :
123 (N+myrank-nbr_dist)%N;
/* debug trace of the current tree level */
126 printf("%d: iteration %d: myrank_rtr=%d, stepping=%d\n",
127 myrank, iteration, myrank_rtr, stepping);
130 /* check if I'm the reducing thread in this iteration's pair: */
131 if (myrank_rtr%stepping == 0)
136 /* now wait for my neighbor's data to become ready.
137 First check if I actually have a neighbor. */
138 if (myrank_rtr+nbr_dist < N)
141 printf("%d: waiting to reduce with %d, iteration=%d\n",
142 myrank, nbr, iteration);
/* block (with optional cycle-count profiling) until the neighbour
   signals its data ready, then consume exactly one event */
146 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
147 tMPI_Profile_wait_start(cur);
149 tMPI_Event_wait( &(comm->csync[myrank].events[nbr]) );
150 tMPI_Event_process( &(comm->csync[myrank].events[nbr]), 1);
151 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
152 tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
156 printf("%d: reducing with %d, iteration=%d\n",
157 myrank, nbr, iteration);
160 /* we reduce with our neighbour*/
163 /* for the first iteration, the inputs are in the
   send buffers (nothing has been reduced yet) */
166 b = (void*)tMPI_Atomic_ptr_get(&(comm->reduce_sendbuf[nbr]));
170 /* after the first operation, they're already in
   the recv buffers (partial results of earlier levels) */
173 b = (void*)tMPI_Atomic_ptr_get(&(comm->reduce_recvbuf[nbr]));
/* fold the neighbour's data into our recvbuf; propagate any error */
176 if ((ret = tMPI_Reduce_run_op(recvbuf, a, b, datatype,
177 count, op, comm)) != TMPI_SUCCESS)
182 /* signal to my neighbour that I'm ready. */
183 tMPI_Event_signal( &(comm->csync[nbr].events[myrank]) );
188 printf("%d: not waiting copying buffer\n", myrank);
191 /* we still need to put things in the right buffer for the next
192 iteration. We need to check for overlapping buffers
193 here because MPI_IN_PLACE might cause recvbuf to be the
   same as sendbuf, in which case the copy must be skipped. */
195 if (iteration == 0 && (recvbuf != sendbuf))
197 memcpy(recvbuf, sendbuf, datatype->size*count);
204 /* the other thread is doing the reducing; we can just
205 wait and break when ready */
206 /* Awake our neighbour */
207 tMPI_Event_signal( &(comm->csync[nbr].events[myrank]) );
/* NOTE(review): argument order (nbr, myrank) looks swapped relative to
   the format string "%d: signalled %d" — every sibling debug print puts
   myrank first.  Debug-only output; verify against the full file. */
211 printf("%d: signalled %d, now waiting: iteration=%d\n",
212 nbr, myrank, iteration);
216 /* And wait for an incoming event from our neighbour */
217 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
218 tMPI_Profile_wait_start(cur);
220 tMPI_Event_wait( &(comm->csync[myrank].events[nbr]) );
221 tMPI_Event_process( &(comm->csync[myrank].events[nbr]), 1);
222 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
223 tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
225 /* now we can break because our data is reduced, and
226 our neighbour goes on reducing it further. */
231 printf("%d: iteration over, iteration=%d\n", myrank, iteration);
/* halve the number of active participants, rounding up (odd rank left
   over at a level is absorbed in a later iteration) */
235 Nred = Nred/2 + Nred%2;
/* Public MPI_Reduce entry point: reduce 'count' elements of 'datatype'
   with operator 'op' onto rank 'root' of 'comm'.  tMPI_Reduce_fast needs
   a valid receive buffer on every rank, so non-root ranks get a scratch
   buffer allocated here.  NOTE(review): interior lines are elided in this
   view; the matching free() of the scratch buffer is not visible — confirm
   it is released after the reduction. */
244 int tMPI_Reduce(void* sendbuf, void* recvbuf, int count,
245 tMPI_Datatype datatype, tMPI_Op op, int root, tMPI_Comm comm)
247 struct tmpi_thread *cur = tMPI_Get_current();
248 int myrank = tMPI_Comm_seek_rank(comm, cur);
252 tMPI_Profile_count_start(cur);
255 tMPI_Trace_print("tMPI_Reduce(%p, %p, %d, %p, %p, %d, %p)",
256 sendbuf, recvbuf, count, datatype, op, root, comm);
261 if (sendbuf == TMPI_IN_PLACE) /* in-place: input is already in recvbuf */
268 #ifdef TMPI_WARN_MALLOC
269 fprintf(stderr, "Warning: malloc during tMPI_Reduce\n");
/* scratch receive buffer for a non-root rank */
271 recvbuf = (void*)tMPI_Malloc(datatype->size*count);
/* delegate the actual tree reduction */
273 ret = tMPI_Reduce_fast(sendbuf, recvbuf, count, datatype, op, root, comm);
279 tMPI_Profile_count_stop(cur, TMPIFN_Reduce);
/* Public MPI_Allreduce: reduce onto rank 0 with tMPI_Reduce_fast, then
   distribute rank 0's result to every rank by direct memcpy after a
   barrier (the root is known to arrive last, so no scatter machinery is
   needed).  NOTE(review): this definition continues past the last line
   visible in this view; interior lines are elided. */
284 int tMPI_Allreduce(void* sendbuf, void* recvbuf, int count,
285 tMPI_Datatype datatype, tMPI_Op op, tMPI_Comm comm)
287 void *rootbuf = NULL; /* root process' receive buffer */
288 struct tmpi_thread *cur = tMPI_Get_current();
289 int myrank = tMPI_Comm_seek_rank(comm, cur);
293 tMPI_Profile_count_start(cur);
296 tMPI_Trace_print("tMPI_Allreduce(%p, %p, %d, %p, %p, %p)",
297 sendbuf, recvbuf, count, datatype, op, comm);
305 return tMPI_Error(comm, TMPI_ERR_BUF);
307 if (sendbuf == TMPI_IN_PLACE) /* in-place: input is already in recvbuf */
/* reduce everything onto rank 0 first */
312 ret = tMPI_Reduce_fast(sendbuf, recvbuf, count, datatype, op, 0, comm);
/* all ranks must pass this barrier before reading rank 0's buffer */
313 #if defined(TMPI_PROFILE)
314 tMPI_Profile_wait_start(cur);
316 tMPI_Barrier_wait( &(comm->barrier));
317 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
318 tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
320 /* distribute rootbuf */
321 rootbuf = (void*)tMPI_Atomic_ptr_get(&(comm->reduce_recvbuf[0]));
323 /* and now we just copy things back. We know that the root thread
324 arrives last, so there's no point in using tMPI_Scatter with
325 copy buffers, etc. */
/* NOTE(review): on rank 0 rootbuf equals recvbuf by construction, so
   this overlap check is presumably guarded by a rank test on an elided
   line — verify.  Also note an error return here occurs between two
   barriers; confirm the other ranks cannot deadlock waiting below. */
328 if (rootbuf == recvbuf)
330 return tMPI_Error(comm, TMPI_ERR_XFER_BUF_OVERLAP);
332 memcpy(recvbuf, rootbuf, datatype->size*count );
/* second barrier: rank 0's buffer must not be reused until every rank
   has finished copying from it */
335 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
336 tMPI_Profile_wait_start(cur);
338 tMPI_Barrier_wait( &(comm->barrier));
339 #if defined(TMPI_PROFILE)
340 tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
341 tMPI_Profile_count_stop(cur, TMPIFN_Allreduce);