2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
58 #include "collective.h"
/* Apply reduction operator `op` element-wise over `count` elements of
   `datatype`: dest[i] = fn(src_a[i], src_b[i]).
   Returns TMPI_SUCCESS or an error code via tMPI_Error().
   NOTE(review): this extracted view is missing the original lines 63-64,
   66-68, 70 and 72+ (the trailing `tMPI_Comm comm` parameter, opening
   brace, the buffer-overlap check guarding the error return below, and
   the closing `return TMPI_SUCCESS;`). Comments annotate visible lines only. */
61 int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
62 tMPI_Datatype datatype, int count, tMPI_Op op,
/* look up the per-datatype implementation of this reduction operator */
65 tMPI_Op_fn fn=datatype->op_functions[op];
/* reached when source and destination buffers overlap illegally
   (the guarding condition is on a line not visible in this view) */
69 return tMPI_Error(comm, TMPI_ERR_XFER_BUF_OVERLAP);
/* perform the element-wise reduction into dest */
71 fn(dest, src_a, src_b, count);
/* Core reduction: combine `count` elements of `datatype` from every rank's
   sendbuf into recvbuf on rank `root`, applying `op`, using a binary-tree
   pairing of threads synchronized with per-pair events. Shared buffer
   pointers are published through atomic slots in the communicator
   (comm->reduce_sendbuf / comm->reduce_recvbuf). Used by both tMPI_Reduce
   and tMPI_Allreduce.
   NOTE(review): this extracted view is missing many original lines
   (opening braces, the iteration loop header, several conditions and
   #endif lines); comments below annotate only the visible code. */
75 int tMPI_Reduce_fast(void* sendbuf, void* recvbuf, int count,
76 tMPI_Datatype datatype, tMPI_Op op, int root,
/* identify the calling thread and its rank within comm */
79 struct tmpi_thread *cur=tMPI_Get_current();
80 int myrank=tMPI_Comm_seek_rank(comm, cur);
82 /* this function uses a binary tree-like reduction algorithm: */
83 int N=tMPI_Comm_N(comm);
84 int myrank_rtr=(N+myrank-root)%N; /* my rank relative to root */
85 int Nred=N; /* number of neighbours that still communicate
86 (decreases exponentially) */
87 int nbr_dist=1; /* distance between communicating neighbours
88 (increases exponentially) */
89 int stepping=2; /* distance between non-communicating neighbours
90 (increases exponentially) */
/* argument checks (guard conditions not visible here): bad communicator... */
97 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
/* ...missing receive buffer... */
101 return tMPI_Error(comm, TMPI_ERR_BUF);
/* ...and an operator with no implementation for this datatype */
103 if ( (!datatype->op_functions) || (!datatype->op_functions[op]) )
105 return tMPI_Error(comm, TMPI_ERR_OP_FN);
/* MPI_IN_PLACE: input already resides in recvbuf (handling body elided
   from this view) */
108 if (sendbuf==TMPI_IN_PLACE)/* i.e. sendbuf == tMPI_IN_PLACE */
112 /* we set our send and recv buffers (published so other threads can
   read them through the communicator's atomic pointer slots) */
113 tMPI_Atomic_ptr_set(&(comm->reduce_sendbuf[myrank]),sendbuf);
114 tMPI_Atomic_ptr_set(&(comm->reduce_recvbuf[myrank]),recvbuf);
/* ---- reduction iteration loop (its header is not visible here) ---- */
118 /* calculate neighbour rank (here we use the real rank) */
119 int nbr=(myrank_rtr%stepping==0) ?
120 (N+myrank+nbr_dist)%N :
121 (N+myrank-nbr_dist)%N;
/* debug trace — presumably guarded by an #ifdef TMPI_DEBUG not visible here */
124 printf("%d: iteration %d: myrank_rtr=%d, stepping=%d\n",
125 myrank, iteration, myrank_rtr, stepping);
128 /* check if I'm the reducing thread in this iteration's pair: */
129 if (myrank_rtr%stepping == 0)
134 /* now wait for my neighbor's data to become ready.
135 First check if I actually have a neighbor. */
136 if (myrank_rtr+nbr_dist<N)
/* debug trace */
139 printf("%d: waiting to reduce with %d, iteration=%d\n",
140 myrank, nbr, iteration);
/* block until the neighbour signals readiness, then consume one event */
144 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
145 tMPI_Profile_wait_start(cur);
147 tMPI_Event_wait( &(comm->csync[myrank].events[nbr]) );
148 tMPI_Event_process( &(comm->csync[myrank].events[nbr]), 1);
149 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
150 tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
/* debug trace */
154 printf("%d: reducing with %d, iteration=%d\n",
155 myrank, nbr, iteration);
158 /* we reduce with our neighbour*/
161 /* for the first iteration, the inputs are in the
   published send buffers */
164 b=(void*)tMPI_Atomic_ptr_get(&(comm->reduce_sendbuf[nbr]));
168 /* after the first operation, they're already in
   the published receive buffers */
171 b=(void*)tMPI_Atomic_ptr_get(&(comm->reduce_recvbuf[nbr]));
/* combine our partial result with the neighbour's into recvbuf;
   propagate any error immediately */
174 if ((ret=tMPI_Reduce_run_op(recvbuf, a, b, datatype,
175 count, op, comm)) != TMPI_SUCCESS)
178 /* signal to my neighbour that I'm ready. */
179 tMPI_Event_signal( &(comm->csync[nbr].events[myrank]) );
/* no neighbour this iteration (debug trace) */
184 printf("%d: not waiting copying buffer\n", myrank);
187 /* we still need to put things in the right buffer for the next
188 iteration. We need to check for overlapping buffers
189 here because MPI_IN_PLACE might cause recvbuf to be the
   same as sendbuf */
191 if (iteration==0 && (recvbuf!=sendbuf))
192 memcpy(recvbuf, sendbuf, datatype->size*count);
198 /* the other thread is doing the reducing; we can just
199 wait and break when ready */
200 /* Awake our neighbour */
201 tMPI_Event_signal( &(comm->csync[nbr].events[myrank]) );
/* debug trace */
205 printf("%d: signalled %d, now waiting: iteration=%d\n",
206 nbr, myrank, iteration);
210 /* And wait for an incoming event from out neighbour */
211 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
212 tMPI_Profile_wait_start(cur);
214 tMPI_Event_wait( &(comm->csync[myrank].events[nbr]) );
215 tMPI_Event_process( &(comm->csync[myrank].events[nbr]), 1);
216 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
217 tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
219 /* now we can break because our data is reduced, and
220 our neighbour goes on reducing it further. */
/* debug trace */
225 printf("%d: iteration over, iteration=%d\n", myrank, iteration);
/* halve the number of active participants, rounding up (odd counts
   leave one unpaired thread) */
229 Nred = Nred/2 + Nred%2;
/* Public MPI-style entry point: reduce `count` elements of `datatype` with
   `op` from all ranks into recvbuf on `root`. Handles TMPI_IN_PLACE on
   non-root ranks by allocating a scratch receive buffer, then delegates to
   tMPI_Reduce_fast().
   NOTE(review): several original lines (braces, #ifdef guards around
   profiling/tracing, scratch-buffer free, final return) are missing from
   this extracted view; comments annotate visible lines only. */
238 int tMPI_Reduce(void* sendbuf, void* recvbuf, int count,
239 tMPI_Datatype datatype, tMPI_Op op, int root, tMPI_Comm comm)
241 struct tmpi_thread *cur=tMPI_Get_current();
242 int myrank=tMPI_Comm_seek_rank(comm, cur);
/* profiling hook — presumably inside an #ifdef TMPI_PROFILE block */
246 tMPI_Profile_count_start(cur);
/* call tracing */
249 tMPI_Trace_print("tMPI_Reduce(%p, %p, %d, %p, %p, %d, %p)",
250 sendbuf, recvbuf, count, datatype, op, root, comm);
/* IN_PLACE handling (root/non-root branches partly elided in this view) */
255 if (sendbuf==TMPI_IN_PLACE) /* i.e. sendbuf == TMPI_IN_PLACE */
262 #ifdef TMPI_WARN_MALLOC
263 fprintf(stderr, "Warning: malloc during tMPI_Reduce\n");
/* scratch receive buffer for a rank that supplied none */
265 recvbuf=(void*)tMPI_Malloc(datatype->size*count);
/* the actual tree reduction */
267 ret=tMPI_Reduce_fast(sendbuf, recvbuf, count, datatype, op, root, comm);
/* profiling hook — presumably inside an #ifdef TMPI_PROFILE block */
273 tMPI_Profile_count_stop(cur, TMPIFN_Reduce);
/* Public MPI-style entry point: reduce to rank 0 via tMPI_Reduce_fast(),
   then broadcast the result by having every rank copy rank 0's receive
   buffer after a barrier, so all ranks end up with the reduced data.
   NOTE(review): this function runs past the end of the extracted chunk
   (its closing lines are not visible), and several interior lines
   (braces, #ifdef/#endif guards, error-path breaks) are missing;
   comments annotate visible lines only. */
278 int tMPI_Allreduce(void* sendbuf, void* recvbuf, int count,
279 tMPI_Datatype datatype, tMPI_Op op, tMPI_Comm comm)
281 void *rootbuf=NULL; /* root process' receive buffer */
282 struct tmpi_thread *cur=tMPI_Get_current();
283 int myrank=tMPI_Comm_seek_rank(comm, cur);
/* profiling hook — presumably inside an #ifdef TMPI_PROFILE block */
287 tMPI_Profile_count_start(cur);
/* call tracing */
290 tMPI_Trace_print("tMPI_Allreduce(%p, %p, %d, %p, %p, %p)",
291 sendbuf, recvbuf, count, datatype, op, comm);
/* every rank needs a receive buffer here (guard condition elided) */
297 return tMPI_Error(comm, TMPI_ERR_BUF);
/* IN_PLACE handling (body elided in this view) */
299 if (sendbuf==TMPI_IN_PLACE) /* i.e. sendbuf == TMPI_IN_PLACE */
/* reduce everything onto rank 0 */
304 ret=tMPI_Reduce_fast(sendbuf, recvbuf, count, datatype, op, 0, comm);
/* wait until rank 0's result is complete before anyone reads it */
305 #if defined(TMPI_PROFILE)
306 tMPI_Profile_wait_start(cur);
308 tMPI_Barrier_wait( &(comm->barrier));
309 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
310 tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
312 /* distribute rootbuf */
313 rootbuf=(void*)tMPI_Atomic_ptr_get(&(comm->reduce_recvbuf[0]));
315 /* and now we just copy things back. We know that the root thread
316 arrives last, so there's no point in using tMPI_Scatter with
317 copy buffers, etc. */
/* a non-root rank whose recvbuf aliases rank 0's buffer would copy onto
   itself — flag it as an overlap error */
320 if (rootbuf==recvbuf)
322 return tMPI_Error(comm, TMPI_ERR_XFER_BUF_OVERLAP);
324 memcpy(recvbuf, rootbuf, datatype->size*count );
/* second barrier: nobody may leave (and reuse buffers) until all copies
   from rootbuf are done */
327 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
328 tMPI_Profile_wait_start(cur);
330 tMPI_Barrier_wait( &(comm->barrier));
331 #if defined(TMPI_PROFILE)
332 tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
333 tMPI_Profile_count_stop(cur, TMPIFN_Allreduce);