Code beautification with uncrustify
[alexxy/gromacs.git] / src / gromacs / gmxlib / thread_mpi / scatter.c
1 /*
2    This source code file is part of thread_mpi.
3    Written by Sander Pronk, Erik Lindahl, and possibly others.
4
5    Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6    All rights reserved.
7
8    Redistribution and use in source and binary forms, with or without
9    modification, are permitted provided that the following conditions are met:
10    1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12    2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15    3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19    THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20    EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22    DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23    DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24    (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26    ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30    If you want to redistribute modifications, please consider that
31    scientific software is very special. Version control is crucial -
32    bugs must be traceable. We will be happy to consider code for
33    inclusion in the official distribution, but derived work should not
34    be called official thread_mpi. Details are found in the README & COPYING
35    files.
36  */
37
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
40 #endif
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <errno.h>
52 #include <stdlib.h>
53 #include <stdio.h>
54 #include <stdarg.h>
55 #include <string.h>
56
57 #include "impl.h"
58 #include "collective.h"
59
60
61
/* Thread-MPI implementation of MPI_Scatter.

   The root thread distributes 'sendcount' elements of 'sendtype' to each
   rank in 'comm' (including itself, unless recvbuf == TMPI_IN_PLACE).
   Non-root ranks receive into 'recvbuf'.

   Synchronization scheme (root side): the root publishes per-rank buffer
   pointers/sizes in its coll_env slot, issues a release barrier, arms the
   n_remaining counter, then signals each peer's recv_ev event. The exact
   ordering of these statements is what makes the hand-off safe — do not
   reorder. Returns TMPI_SUCCESS or a tMPI_Error code. */
int tMPI_Scatter(void* sendbuf, int sendcount, tMPI_Datatype sendtype,
                 void* recvbuf, int recvcount, tMPI_Datatype recvtype,
                 int root, tMPI_Comm comm)
{
    int                 synct;   /* collective sync counter (set by tMPI_Get_cev) */
    struct coll_env    *cev;     /* the collective environment slot used for this call */
    int                 myrank;
    int                 ret = TMPI_SUCCESS;
    struct tmpi_thread *cur = tMPI_Get_current();

#ifdef TMPI_PROFILE
    tMPI_Profile_count_start(cur);
#endif
#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Scatter(%p, %d, %p, %p, %d, %p, %d, %p)",
                     sendbuf, sendcount, sendtype,
                     recvbuf, recvcount, recvtype, root, comm);
#endif
    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank = tMPI_Comm_seek_rank(comm, cur);

    /* we increase our counter, and determine which coll_env we get */
    cev = tMPI_Get_cev(comm, myrank, &synct);

    if (myrank == root)
    {
        int       i;
        size_t    sendsize        = sendtype->size*sendcount; /* bytes per rank */
        size_t    total_send_size = 0;
#ifdef USE_COLLECTIVE_COPY_BUFFER
        tmpi_bool using_cb;
#endif

        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* we set up multiple posts, so no Post_multi */
        cev->met[myrank].tag      = TMPI_SCATTER_TAG;
        cev->met[myrank].datatype = sendtype;
        /* release barrier: tag/datatype must be visible to peers before the
           n_remaining counter is armed below */
        tMPI_Atomic_memory_barrier_rel();
        tMPI_Atomic_set( &(cev->met[myrank].n_remaining), cev->N-1 );
        /* publish one contiguous slice of sendbuf per rank.
           NOTE(review): this loop bounds on comm->grp.N while the sibling
           tMPI_Scatterv uses cev->N — presumably these are equal; confirm. */
        for (i = 0; i < comm->grp.N; i++)
        {
            total_send_size            += sendtype->size*sendcount;
            cev->met[myrank].bufsize[i] = sendsize;
            cev->met[myrank].buf[i]     = (char*)sendbuf+sendsize*i;
        }
#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* we must copy our own data too, unfortunately (otherwise there's
           a hole) */
        /* only use the copy buffer when the whole payload fits in it */
        using_cb                  = (total_send_size < (size_t)((cev->N)*COPY_BUFFER_SIZE));
        cev->met[myrank].using_cb = using_cb;
        if (using_cb)
        {
            /* we set cpbuf stuff to NULL initially; receivers spin until a
               non-NULL pointer appears */
            for (i = 0; i < cev->N; i++)
            {
                /*cev->met[myrank].cpbuf[i]=NULL;*/
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]), NULL);
            }
            tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
        }
#endif

        /* post availability */
        for (i = 0; i < cev->N; i++)
        {
            if (i != myrank)
            {
                tMPI_Event_signal( &(cev->met[i].recv_ev) );
            }
        }




#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* do the copy_buffer thing */
        if (using_cb)
        {
            /* copy the buffer locally. First allocate */
            cev->met[myrank].cb = tMPI_Copy_buffer_list_get(
                        &(tMPI_Get_thread(comm, myrank)->cbl_multi));
            if (cev->met[myrank].cb->size < total_send_size)
            {
                /* fatal: the pre-sized copy buffer cannot hold the payload */
                fprintf(stderr, "ERROR: cb size too small\n");
                exit(1);
            }
            /* copy to the new buf */
            memcpy(cev->met[myrank].cb->buf, sendbuf, total_send_size);

            /* post the new buf; the release barrier each iteration ensures
               the memcpy'd data is visible before the pointer is published */
            for (i = 0; i < cev->N; i++)
            {
                tMPI_Atomic_memory_barrier_rel();
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]),
                                    (char*)cev->met[myrank].cb->buf+sendsize*i);
                /*cev->met[myrank].cpbuf[i] = (char*)cev->met[myrank].cb->buf +
                                            sendsize*i ;*/
            }
        }
#endif

        /* do root transfer: copy the root's own slice directly */
        if (recvbuf != TMPI_IN_PLACE)
        {
            tMPI_Coll_root_xfer(comm, sendtype, recvtype,
                                sendsize, recvtype->size*recvcount,
                                (char*)sendbuf+sendsize*myrank,
                                recvbuf, &ret);
        }

        /* and wait until everybody is done copying */
        tMPI_Wait_for_others(cev, myrank);
    }
    else
    {
        /* get the root cev */
        size_t bufsize = recvcount*recvtype->size;
        /* wait until root becomes available */
        tMPI_Wait_for_data(cur, cev, myrank);
        /* pull our slice out of the root's posted buffers */
        tMPI_Mult_recv(comm, cev, root, myrank, TMPI_SCATTER_TAG, recvtype,
                       bufsize, recvbuf, &ret);
    }
#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Scatter);
#endif
    return ret;
}
196
197
198
/* Thread-MPI implementation of MPI_Scatterv.

   Like tMPI_Scatter, but each rank i receives sendcounts[i] elements taken
   from offset displs[i] (in units of sendtype elements) within sendbuf.
   Non-root ranks receive 'recvcount' elements of 'recvtype' into 'recvbuf';
   recvbuf == TMPI_IN_PLACE skips the root's local copy.

   The root publishes per-rank (pointer, size) pairs in its coll_env slot,
   issues a release barrier, arms n_remaining, then signals the peers.
   Statement ordering is load-bearing for the lock-free hand-off — do not
   reorder. Returns TMPI_SUCCESS or a tMPI_Error code. */
int tMPI_Scatterv(void* sendbuf, int *sendcounts, int *displs,
                  tMPI_Datatype sendtype, void* recvbuf, int recvcount,
                  tMPI_Datatype recvtype, int root, tMPI_Comm comm)
{
    int                 synct;   /* collective sync counter (set by tMPI_Get_cev) */
    struct coll_env    *cev;     /* the collective environment slot used for this call */
    int                 myrank;
    int                 ret = TMPI_SUCCESS;
    struct tmpi_thread *cur = tMPI_Get_current();

#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Scatterv(%p, %p, %p, %p, %p, %d, %p, %d, %p)",
                     sendbuf, sendcounts, displs, sendtype, recvbuf,
                     recvcount, recvtype, root, comm);
#endif
#ifdef TMPI_PROFILE
    tMPI_Profile_count_start(cur);
#endif


    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank = tMPI_Comm_seek_rank(comm, cur);

    /* we increase our counter, and determine which coll_env we get */
    cev = tMPI_Get_cev(comm, myrank, &synct);

    if (myrank == root)
    {
        int       i;
        size_t    total_send_size = 0;
#ifdef USE_COLLECTIVE_COPY_BUFFER
        tmpi_bool using_cb;
#endif

        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* we set up multiple posts, so no Post_multi */
        cev->met[myrank].tag      = TMPI_SCATTERV_TAG;
        cev->met[myrank].datatype = sendtype;
        /* release barrier: tag/datatype must be visible to peers before the
           n_remaining counter is armed below */
        tMPI_Atomic_memory_barrier_rel();
        tMPI_Atomic_set( &(cev->met[myrank].n_remaining), cev->N-1 );
        /* publish each rank's variable-sized slice at its displacement */
        for (i = 0; i < cev->N; i++)
        {
            total_send_size            += sendtype->size*sendcounts[i];
            cev->met[myrank].bufsize[i] = sendtype->size*sendcounts[i];
            cev->met[myrank].buf[i]     = (char*)sendbuf+sendtype->size*displs[i];
        }
#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* we must copy our own data too, unfortunately (otherwise there's
           a hole) */
        /* only use the copy buffer when the whole payload fits in it */
        using_cb                  = (total_send_size < (size_t)((cev->N)*COPY_BUFFER_SIZE));
        cev->met[myrank].using_cb = using_cb;
        if (using_cb)
        {
            /* we set cpbuf stuff to NULL initially; receivers spin until a
               non-NULL pointer appears */
            for (i = 0; i < cev->N; i++)
            {
                /*cev->met[myrank].cpbuf[i]=NULL;*/
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]), NULL);
            }
            tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
        }
#endif

        /* post availability */
        for (i = 0; i < cev->N; i++)
        {
            if (i != myrank)
            {
                tMPI_Event_signal( &(cev->met[i].recv_ev) );
            }
        }



#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* do the copy_buffer thing */
        if (using_cb)
        {
            /* copy the buffer locally. First allocate */
            cev->met[myrank].cb = tMPI_Copy_buffer_list_get(
                        &(tMPI_Get_thread(comm, myrank)->cbl_multi));
            if (cev->met[myrank].cb->size < total_send_size)
            {
                /* fatal: the pre-sized copy buffer cannot hold the payload */
                fprintf(stderr, "ERROR: cb size too small\n");
                exit(1);
            }
            /* copy to the new buf */
            memcpy(cev->met[myrank].cb->buf, sendbuf, total_send_size);
            /* post the new buf; the release barrier each iteration ensures
               the memcpy'd data is visible before the pointer is published */
            for (i = 0; i < cev->N; i++)
            {
                tMPI_Atomic_memory_barrier_rel();
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]),
                                    (char*)cev->met[myrank].cb->buf +
                                    sendtype->size*displs[i]);
                /*cev->met[myrank].cpbuf[i]=(char*)cev->met[myrank].cb->buf +
                          sendtype->size*displs[i];*/
            }
        }
#endif

        /* do root transfer: copy the root's own slice directly */
        if (recvbuf != TMPI_IN_PLACE)
        {
            tMPI_Coll_root_xfer(comm, sendtype, recvtype,
                                sendtype->size*sendcounts[myrank],
                                recvtype->size*recvcount,
                                (char*)sendbuf+sendtype->size*displs[myrank],
                                recvbuf,
                                &ret);
        }

        /* and wait until everybody is done copying */
        tMPI_Wait_for_others(cev, myrank);
    }
    else
    {
        /* get the root cev */
        size_t bufsize = recvcount*recvtype->size;
        /* wait until root becomes available */
        tMPI_Wait_for_data(cur, cev, myrank);
        /* pull our slice out of the root's posted buffers */
        tMPI_Mult_recv(comm, cev, root, myrank, TMPI_SCATTERV_TAG,
                       recvtype, bufsize, recvbuf, &ret);
    }
#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Scatterv);
#endif
    return ret;
}