Code beautification with uncrustify
[alexxy/gromacs.git] / src / gromacs / gmxlib / thread_mpi / alltoall.c
1 /*
2    This source code file is part of thread_mpi.
3    Written by Sander Pronk, Erik Lindahl, and possibly others.
4
5    Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6    All rights reserved.
7
8    Redistribution and use in source and binary forms, with or without
9    modification, are permitted provided that the following conditions are met:
10    1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12    2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15    3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19    THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20    EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22    DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23    DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24    (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26    ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30    If you want to redistribute modifications, please consider that
31    scientific software is very special. Version control is crucial -
32    bugs must be traceable. We will be happy to consider code for
33    inclusion in the official distribution, but derived work should not
34    be called official thread_mpi. Details are found in the README & COPYING
35    files.
36  */
37
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
40 #endif
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <errno.h>
52 #include <stdlib.h>
53 #include <stdio.h>
54 #include <stdarg.h>
55 #include <string.h>
56
57 #include "impl.h"
58 #include "collective.h"
59
60
61
62 int tMPI_Alltoall(void* sendbuf, int sendcount, tMPI_Datatype sendtype,
63                   void* recvbuf, int recvcount, tMPI_Datatype recvtype,
64                   tMPI_Comm comm)
65 {
66     int                 synct;
67     struct coll_env    *cev;
68     int                 myrank;
69     int                 ret = TMPI_SUCCESS;
70     int                 i;
71     size_t              sendsize = sendtype->size*sendcount;
72     size_t              recvsize = recvtype->size*recvcount;
73     int                 n_remaining;
74     struct tmpi_thread *cur = tMPI_Get_current();
75
76 #ifdef TMPI_PROFILE
77     tMPI_Profile_count_start(cur);
78 #endif
79 #ifdef TMPI_TRACE
80     tMPI_Trace_print("tMPI_Alltoall(%p, %d, %p, %p, %d, %p, %p)",
81                      sendbuf, sendcount, sendtype,
82                      recvbuf, recvcount, recvtype, comm);
83 #endif
84
85     if (!comm)
86     {
87         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
88     }
89     if (!sendbuf || !recvbuf) /* don't do pointer arithmetic on a NULL ptr */
90     {
91         return tMPI_Error(comm, TMPI_ERR_BUF);
92     }
93
94     myrank = tMPI_Comm_seek_rank(comm, cur);
95
96     /* we increase our counter, and determine which coll_env we get */
97     cev = tMPI_Get_cev(comm, myrank, &synct);
98
99     /* post our pointers */
100     /* we set up multiple posts, so no Post_multi */
101     cev->met[myrank].tag      = TMPI_ALLTOALL_TAG;
102     cev->met[myrank].datatype = sendtype;
103     tMPI_Atomic_set( &(cev->met[myrank].n_remaining), cev->N-1 );
104     for (i = 0; i < comm->grp.N; i++)
105     {
106         cev->met[myrank].bufsize[i]   = sendsize;
107         cev->met[myrank].buf[i]       = (char*)sendbuf+sendsize*i;
108         cev->met[myrank].read_data[i] = FALSE;
109     }
110     tMPI_Atomic_memory_barrier_rel();
111     tMPI_Atomic_set(&(cev->met[myrank].current_sync), synct);
112
113     /* post availability */
114     for (i = 0; i < cev->N; i++)
115     {
116         if (i != myrank)
117         {
118             tMPI_Event_signal( &(cev->met[i].recv_ev) );
119         }
120     }
121
122     /* we don't do the copy buffer thing here because it's pointless:
123        the processes have to synchronize anyway, because they all
124        send and receive. */
125
126     /* do root transfer */
127     tMPI_Coll_root_xfer(comm, sendtype, recvtype,
128                         sendsize, recvsize,
129                         (char*)sendbuf+sendsize*myrank,
130                         (char*)recvbuf+recvsize*myrank, &ret);
131     cev->met[myrank].read_data[myrank] = TRUE;
132     /* and poll data availability */
133     n_remaining = cev->N-1;
134     while (n_remaining > 0)
135     {
136 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
137         tMPI_Profile_wait_start(cur);
138 #endif
139         tMPI_Event_wait( &(cev->met[myrank]).recv_ev );
140 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
141         tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_recv);
142 #endif
143         for (i = 0; i < cev->N; i++)
144         {
145             if ((!cev->met[myrank].read_data[i]) &&
146                 (tMPI_Atomic_get(&(cev->met[i].current_sync)) == synct))
147             {
148                 tMPI_Event_process( &(cev->met[myrank]).recv_ev, 1);
149                 tMPI_Mult_recv(comm, cev, i, myrank, TMPI_ALLTOALL_TAG,
150                                recvtype, recvsize, (char*)recvbuf+recvsize*i,
151                                &ret);
152                 if (ret != TMPI_SUCCESS)
153                 {
154                     return ret;
155                 }
156                 cev->met[myrank].read_data[i] = TRUE;
157                 n_remaining--;
158             }
159         }
160     }
161
162
163     /* and wait until everybody is done copying our data */
164     tMPI_Wait_for_others(cev, myrank);
165
166 #ifdef TMPI_PROFILE
167     tMPI_Profile_count_stop(cur, TMPIFN_Alltoall);
168 #endif
169     return ret;
170 }
171
172
173 int tMPI_Alltoallv(void* sendbuf, int *sendcounts, int *sdispls,
174                    tMPI_Datatype sendtype,
175                    void* recvbuf, int *recvcounts, int *rdispls,
176                    tMPI_Datatype recvtype,
177                    tMPI_Comm comm)
178
179 {
180     int                 synct;
181     struct coll_env    *cev;
182     int                 myrank;
183     int                 ret = TMPI_SUCCESS;
184     int                 i;
185     int                 n_remaining;
186     struct tmpi_thread *cur = tMPI_Get_current();
187
188 #ifdef TMPI_PROFILE
189     tMPI_Profile_count_start(cur);
190 #endif
191 #ifdef TMPI_TRACE
192     tMPI_Trace_print("tMPI_Alltoallv(%p, %p, %p, %p, %p, %p, %p, %p, %p, %p)",
193                      sendbuf, sendcounts, sdispls, sendtype,
194                      recvbuf, recvcounts, rdispls, recvtype,
195                      comm);
196 #endif
197     if (!comm)
198     {
199         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
200     }
201     if (!sendbuf || !recvbuf) /* don't do pointer arithmetic on a NULL ptr */
202     {
203         return tMPI_Error(comm, TMPI_ERR_BUF);
204     }
205
206     myrank = tMPI_Comm_seek_rank(comm, cur);
207
208     /* we increase our counter, and determine which coll_env we get */
209     cev = tMPI_Get_cev(comm, myrank, &synct);
210
211     /* post our pointers */
212     /* we set up multiple posts, so no Post_multi */
213     cev->met[myrank].tag      = TMPI_ALLTOALLV_TAG;
214     cev->met[myrank].datatype = sendtype;
215     tMPI_Atomic_set( &(cev->met[myrank].n_remaining), cev->N-1 );
216     for (i = 0; i < comm->grp.N; i++)
217     {
218         cev->met[myrank].bufsize[i]   = sendtype->size*sendcounts[i];
219         cev->met[myrank].buf[i]       = (char*)sendbuf+sendtype->size*sdispls[i];
220         cev->met[myrank].read_data[i] = FALSE;
221     }
222     tMPI_Atomic_memory_barrier_rel();
223     tMPI_Atomic_set(&(cev->met[myrank].current_sync), synct);
224
225     /* post availability */
226     for (i = 0; i < cev->N; i++)
227     {
228         if (i != myrank)
229         {
230             tMPI_Event_signal( &(cev->met[i].recv_ev) );
231         }
232     }
233
234     /* we don't do the copy buffer thing here because it's pointless:
235        the processes have to synchronize anyway, because they all
236        send and receive. */
237
238     /* do root transfer */
239     tMPI_Coll_root_xfer(comm, sendtype, recvtype,
240                         sendtype->size*sendcounts[myrank],
241                         recvtype->size*recvcounts[myrank],
242                         (char*)sendbuf+sendtype->size*sdispls[myrank],
243                         (char*)recvbuf+recvtype->size*rdispls[myrank], &ret);
244     cev->met[myrank].read_data[myrank] = TRUE;
245
246     /* and poll data availability */
247     n_remaining = cev->N-1;
248     while (n_remaining > 0)
249     {
250 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
251         tMPI_Profile_wait_start(cur);
252 #endif
253         tMPI_Event_wait( &(cev->met[myrank]).recv_ev );
254 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
255         tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_recv);
256 #endif
257         for (i = 0; i < cev->N; i++)
258         {
259             if ((!cev->met[myrank].read_data[i]) &&
260                 (tMPI_Atomic_get(&(cev->met[i].current_sync)) == synct) )
261             {
262                 tMPI_Event_process( &(cev->met[myrank]).recv_ev, 1);
263                 tMPI_Mult_recv(comm, cev, i, myrank, TMPI_ALLTOALLV_TAG,
264                                recvtype, recvtype->size*recvcounts[i],
265                                (char*)recvbuf+recvtype->size*rdispls[i], &ret);
266                 if (ret != TMPI_SUCCESS)
267                 {
268                     return ret;
269                 }
270                 cev->met[myrank].read_data[i] = TRUE;
271                 n_remaining--;
272             }
273         }
274     }
275
276     /* and wait until everybody is done copying our data */
277     tMPI_Wait_for_others(cev, myrank);
278
279 #ifdef TMPI_PROFILE
280     tMPI_Profile_count_stop(cur, TMPIFN_Alltoallv);
281 #endif
282     return ret;
283 }