src/gmxlib/thread_mpi/scatter.h
/*
This source code file is part of thread_mpi.
Written by Sander Pronk, Erik Lindahl, and possibly others.

Copyright (c) 2009, Sander Pronk, Erik Lindahl.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1) Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
2) Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the distribution.
3) Neither the name of the copyright holders nor the
   names of its contributors may be used to endorse or promote products
   derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

If you want to redistribute modifications, please consider that
scientific software is very special. Version control is crucial -
bugs must be traceable. We will be happy to consider code for
inclusion in the official distribution, but derived work should not
be called official thread_mpi. Details are found in the README & COPYING
files.
*/

/* this file is #included from collective.c; it is not really a header file,
   but it defines a number of functions that probably need to be inlined. */

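/* Overview (a summary of the code below, not an authoritative spec): the
   root fills its slot in the communicator's collective environment
   (coll_env) with one buffer pointer and size per receiving rank, then
   signals every receiver's recv_ev event. If USE_COLLECTIVE_COPY_BUFFER is
   enabled and the total payload is small enough, the root additionally
   copies the data into a private copy buffer and re-posts those pointers,
   so that receivers need not read directly from the caller's sendbuf.
   Finally the root waits in tMPI_Wait_for_others() until every receiver has
   copied its slice; a non-root thread just waits for the post in
   tMPI_Wait_for_data() and copies its part via tMPI_Mult_recv(). */
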
int tMPI_Scatter(void* sendbuf, int sendcount, tMPI_Datatype sendtype,
                 void* recvbuf, int recvcount, tMPI_Datatype recvtype,
                 int root, tMPI_Comm comm)
{
    int synct;
    struct coll_env *cev;
    int myrank;
    int ret=TMPI_SUCCESS;
    struct tmpi_thread *cur=tMPI_Get_current();

#ifdef TMPI_PROFILE
    tMPI_Profile_count_start(cur);
#endif
#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Scatter(%p, %d, %p, %p, %d, %p, %d, %p)",
                     sendbuf, sendcount, sendtype,
                     recvbuf, recvcount, recvtype, root, comm);
#endif
    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, cur);

    /* we increase our counter, and determine which coll_env we get */
    cev=tMPI_Get_cev(comm, myrank, &synct);

    if (myrank==root)
    {
        int i;
        size_t sendsize=sendtype->size*sendcount;
        size_t total_send_size=0;
#ifdef USE_COLLECTIVE_COPY_BUFFER
        gmx_bool using_cb;
#endif

        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* we set up multiple posts, so no Post_multi */
        cev->met[myrank].tag=TMPI_SCATTER_TAG;
        cev->met[myrank].datatype=sendtype;
        tMPI_Atomic_set( &(cev->met[myrank].n_remaining), cev->N-1 );
        for(i=0;i<comm->grp.N;i++)
        {
            total_send_size += sendsize;
            cev->met[myrank].bufsize[i]=sendsize;
            cev->met[myrank].buf[i]=(char*)sendbuf+sendsize*i;
        }
#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* we must copy our own data too, unfortunately (otherwise there's
           a hole) */
        using_cb=(total_send_size < (size_t)((cev->N)*COPY_BUFFER_SIZE));
        cev->met[myrank].using_cb=using_cb;
        if (using_cb)
        {
            /* we set cpbuf stuff to NULL initially */
            for(i=0;i<cev->N;i++)
            {
                /*cev->met[myrank].cpbuf[i]=NULL;*/
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]), NULL);
            }
            tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
        }
#endif

        /* post availability */
        for(i=0;i<cev->N;i++)
        {
            if (i != myrank)
                tMPI_Event_signal( &(cev->met[i].recv_ev) );
        }

#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* do the copy_buffer thing */
        if (using_cb)
        {
            /* copy the buffer locally. First allocate */
            cev->met[myrank].cb=tMPI_Copy_buffer_list_get(
                                    &(tMPI_Get_thread(comm,myrank)->cbl_multi));
            if (cev->met[myrank].cb->size < total_send_size)
            {
                fprintf(stderr, "ERROR: cb size too small\n");
                exit(1);
            }
            /* copy to the new buf */
            memcpy(cev->met[myrank].cb->buf, sendbuf, total_send_size);

            /* post the new buf */
            for(i=0;i<cev->N;i++)
            {
                tMPI_Atomic_memory_barrier();
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]),
                                    (char*)cev->met[myrank].cb->buf+sendsize*i);
                /*cev->met[myrank].cpbuf[i] = (char*)cev->met[myrank].cb->buf +
                                              sendsize*i;*/
            }
        }
#endif

        /* do root transfer */
        if (recvbuf!=TMPI_IN_PLACE)
        {
            tMPI_Coll_root_xfer(comm, sendtype, recvtype,
                                sendsize, recvtype->size*recvcount,
                                (char*)sendbuf+sendsize*myrank,
                                recvbuf, &ret);
        }

        /* and wait until everybody is done copying */
        tMPI_Wait_for_others(cev, myrank);
    }
    else
    {
        /* get the root cev */
        size_t bufsize=recvcount*recvtype->size;
        /* wait until root becomes available */
        tMPI_Wait_for_data(cur, cev, myrank);
        tMPI_Mult_recv(comm, cev, root, myrank, TMPI_SCATTER_TAG, recvtype,
                       bufsize, recvbuf, &ret);
    }
#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Scatter);
#endif
    return ret;
}

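/* Usage sketch (illustrative only, not part of this file): scatter one int
   to each thread from root 0. TMPI_INT, TMPI_COMM_WORLD and tMPI_Comm_rank()
   are the usual public thread_mpi API; NTHREADS is a hypothetical thread
   count.

       int rank;
       int recvval;
       int sendbuf[NTHREADS];   // contents only matter at the root
       tMPI_Comm_rank(TMPI_COMM_WORLD, &rank);
       tMPI_Scatter(sendbuf, 1, TMPI_INT, &recvval, 1, TMPI_INT,
                    0, TMPI_COMM_WORLD);
       // each rank now holds sendbuf[rank] in recvval
*/
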
int tMPI_Scatterv(void* sendbuf, int *sendcounts, int *displs,
                  tMPI_Datatype sendtype, void* recvbuf, int recvcount,
                  tMPI_Datatype recvtype, int root, tMPI_Comm comm)
{
    int synct;
    struct coll_env *cev;
    int myrank;
    int ret=TMPI_SUCCESS;
    struct tmpi_thread *cur=tMPI_Get_current();

#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Scatterv(%p, %p, %p, %p, %p, %d, %p, %d, %p)",
                     sendbuf, sendcounts, displs, sendtype, recvbuf,
                     recvcount, recvtype, root, comm);
#endif
#ifdef TMPI_PROFILE
    tMPI_Profile_count_start(cur);
#endif

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, cur);

    /* we increase our counter, and determine which coll_env we get */
    cev=tMPI_Get_cev(comm, myrank, &synct);

    if (myrank==root)
    {
        int i;
        size_t total_send_size=0;
#ifdef USE_COLLECTIVE_COPY_BUFFER
        gmx_bool using_cb;
#endif

        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* we set up multiple posts, so no Post_multi */
        cev->met[myrank].tag=TMPI_SCATTERV_TAG;
        cev->met[myrank].datatype=sendtype;
        tMPI_Atomic_set( &(cev->met[myrank].n_remaining), cev->N-1 );
        for(i=0;i<cev->N;i++)
        {
            total_send_size += sendtype->size*sendcounts[i];
            cev->met[myrank].bufsize[i]=sendtype->size*sendcounts[i];
            cev->met[myrank].buf[i]=(char*)sendbuf+sendtype->size*displs[i];
        }
#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* we must copy our own data too, unfortunately (otherwise there's
           a hole) */
        using_cb=(total_send_size < (size_t)((cev->N)*COPY_BUFFER_SIZE));
        cev->met[myrank].using_cb=using_cb;
        if (using_cb)
        {
            /* we set cpbuf stuff to NULL initially */
            for(i=0;i<cev->N;i++)
            {
                /*cev->met[myrank].cpbuf[i]=NULL;*/
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]), NULL);
            }
            tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
        }
#endif

        /* post availability */
        for(i=0;i<cev->N;i++)
        {
            if (i != myrank)
                tMPI_Event_signal( &(cev->met[i].recv_ev) );
        }

#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* do the copy_buffer thing */
        if (using_cb)
        {
            /* copy the buffer locally. First allocate */
            cev->met[myrank].cb=tMPI_Copy_buffer_list_get(
                                    &(tMPI_Get_thread(comm,myrank)->cbl_multi));
            if (cev->met[myrank].cb->size < total_send_size)
            {
                fprintf(stderr, "ERROR: cb size too small\n");
                exit(1);
            }
            /* copy to the new buf */
            memcpy(cev->met[myrank].cb->buf, sendbuf, total_send_size);
            /* post the new buf */
            for(i=0;i<cev->N;i++)
            {
                tMPI_Atomic_memory_barrier();
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]),
                                    (char*)cev->met[myrank].cb->buf +
                                    sendtype->size*displs[i]);
                /*cev->met[myrank].cpbuf[i]=(char*)cev->met[myrank].cb->buf +
                          sendtype->size*displs[i];*/
            }
            tMPI_Atomic_memory_barrier();
        }
#endif

        /* do root transfer */
        if (recvbuf!=TMPI_IN_PLACE)
        {
            tMPI_Coll_root_xfer(comm, sendtype, recvtype,
                                sendtype->size*sendcounts[myrank],
                                recvtype->size*recvcount,
                                (char*)sendbuf+sendtype->size*displs[myrank],
                                recvbuf,
                                &ret);
        }

        /* and wait until everybody is done copying */
        tMPI_Wait_for_others(cev, myrank);
    }
    else
    {
        /* get the root cev */
        size_t bufsize=recvcount*recvtype->size;
        /* wait until root becomes available */
        tMPI_Wait_for_data(cur, cev, myrank);
        tMPI_Mult_recv(comm, cev, root, myrank, TMPI_SCATTERV_TAG,
                       recvtype, bufsize, recvbuf, &ret);
    }
#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Scatterv);
#endif
    return ret;
}

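/* Usage sketch (illustrative only): scatter a variable number of doubles to
   each thread from root 0. sendcounts[i] elements, starting at element
   displs[i] of sendbuf, go to rank i; counts and displacements are in
   elements, not bytes. TMPI_DOUBLE and TMPI_COMM_WORLD are the usual
   thread_mpi API; nthreads, sendbuf, recvbuf and mycount are hypothetical.

       int sendcounts[nthreads];
       int displs[nthreads];
       // ... the root fills sendcounts[] and displs[] ...
       tMPI_Scatterv(sendbuf, sendcounts, displs, TMPI_DOUBLE,
                     recvbuf, mycount, TMPI_DOUBLE, 0, TMPI_COMM_WORLD);
*/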