/*
This source code file is part of thread_mpi.
Written by Sander Pronk, Erik Lindahl, and possibly others.

Copyright (c) 2009, Sander Pronk, Erik Lindahl.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1) Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
2) Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the distribution.
3) Neither the name of the copyright holders nor the
   names of its contributors may be used to endorse or promote products
   derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

If you want to redistribute modifications, please consider that
scientific software is very special. Version control is crucial -
bugs must be traceable. We will be happy to consider code for
inclusion in the official distribution, but derived work should not
be called official thread_mpi. Details are found in the README & COPYING
files.
*/
#ifdef HAVE_TMPI_CONFIG_H
#include "tmpi_config.h"
#endif

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "impl.h"
/* get a pointer to the next coll_env once it's ready */
static struct coll_env *tMPI_Get_cev(tMPI_Comm comm, int myrank, int *synct);
/* post the availability of data in a cev.
   cev         = the collective comm environment
   myrank      = the rank of the posting thread
   index       = the buffer index
   tag         = the message tag
   datatype    = the datatype
   bufsize     = the buffer size
   buf         = the buffer to xfer
   n_remaining = the number of remaining threads that need to transfer
   synct       = the multicast sync number
   dest        = -1 for all threads, or a specific rank number. */
static void tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
                            int tag, tMPI_Datatype datatype,
                            size_t bufsize, void *buf, int n_remaining,
                            int synct, int dest);
/* transfer data from cev->met[rank] to recvbuf */
static void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
                           int index, int expected_tag, tMPI_Datatype recvtype,
                           size_t recvsize, void *recvbuf, int *ret);

/* do a root transfer (from root send buffer to root recv buffer) */
static void tMPI_Coll_root_xfer(tMPI_Comm comm,
                                tMPI_Datatype sendtype, tMPI_Datatype recvtype,
                                size_t sendsize, size_t recvsize,
                                void* sendbuf, void* recvbuf, int *ret);
/* wait for other processes to copy data from my cev */
static void tMPI_Wait_for_others(struct coll_env *cev, int myrank);

/* wait for data to become available from a specific rank */
static void tMPI_Wait_for_data(struct tmpi_thread *cur, struct coll_env *cev,
                               int myrank);
                               /*int rank, int myrank, int synct);*/
/* run a single binary reduce operation on src_a and src_b, producing dest.
   dest and src_a may be identical */
static int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
                              tMPI_Datatype datatype, int count, tMPI_Op op,
                              tMPI_Comm comm);
#ifdef USE_COLLECTIVE_COPY_BUFFER
/* initialize a copy buffer */
void tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size)
{
    cb->buf=tMPI_Malloc(size);
    cb->size=size;
}

/* destroy a copy buffer */
void tMPI_Copy_buffer_destroy(struct copy_buffer *cb)
{
    free(cb->buf);
}

void tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
                                size_t size)
{
    int i;

    cbl->size=size;
    cbl->Nbufs=Nbufs;
    cbl->cb_alloc=(struct copy_buffer*)
                  tMPI_Malloc(sizeof(struct copy_buffer)*Nbufs);
    cbl->cb=cbl->cb_alloc; /* the first one */
    for(i=0;i<Nbufs;i++)
    {
        tMPI_Copy_buffer_init( &(cbl->cb_alloc[i]), size );
        if (i < Nbufs-1)
            cbl->cb_alloc[i].next=&(cbl->cb_alloc[i+1]);
        else
            cbl->cb_alloc[i].next=NULL;
    }
}

void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl)
{
    int i;

    for(i=0;i<cbl->Nbufs;i++)
    {
        tMPI_Copy_buffer_destroy( &(cbl->cb_alloc[i]) );
    }
    free(cbl->cb_alloc);
}

struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl)
{
    struct copy_buffer *ret=cbl->cb;
    if (!ret)
    {
        fprintf(stderr, "out of copy buffers!!\n");
        exit(1);
    }
    cbl->cb=ret->next;

    return ret;
}

void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
                                  struct copy_buffer *cb)
{
    cb->next=cbl->cb;
    cbl->cb=cb;
}
#endif
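
/* Illustrative sketch, not part of thread_mpi: the lifecycle of a
   copy_buffer_list. The function name and the buffer count 8 are made up;
   in the real code each thread keeps such a list in its
   tmpi_thread::cbl_multi field. */
#if 0
static void sketch_copy_buffer_lifecycle(void)
{
    struct copy_buffer_list cbl;
    struct copy_buffer     *cb;

    tMPI_Copy_buffer_list_init(&cbl, 8, COPY_BUFFER_SIZE);
    cb = tMPI_Copy_buffer_list_get(&cbl);   /* pop a free buffer */
    /* ... fill cb->buf and publish it via tMPI_Post_multi() ... */
    tMPI_Copy_buffer_list_return(&cbl, cb); /* push it back when done */
    tMPI_Copy_buffer_list_destroy(&cbl);
}
#endif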
static void tMPI_Coll_envt_init(struct coll_env_thread *met, int N)
{
    tMPI_Atomic_set(&(met->current_sync), 0);
    tMPI_Atomic_set(&(met->n_remaining), 0);
    met->buf=(void**)tMPI_Malloc(sizeof(void*)*N);
    met->bufsize=(size_t*)tMPI_Malloc(sizeof(size_t)*N);
    met->read_data=(gmx_bool*)tMPI_Malloc(sizeof(gmx_bool)*N);
#ifdef USE_COLLECTIVE_COPY_BUFFER
    met->cpbuf=(tMPI_Atomic_ptr_t*)tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*N);
    met->cb=NULL;
    met->using_cb=FALSE;
#endif
    tMPI_Event_init( &(met->send_ev) );
    tMPI_Event_init( &(met->recv_ev) );
}

static void tMPI_Coll_envt_destroy(struct coll_env_thread *met)
{
    free( (void*)met->buf );
    free( (void*)met->bufsize );
    free( (void*)met->read_data );
#ifdef USE_COLLECTIVE_COPY_BUFFER
    free( (void*)met->cpbuf );
#endif
}
void tMPI_Coll_env_init(struct coll_env *cev, int N)
{
    int i;

    cev->met=(struct coll_env_thread*)tMPI_Malloc(
                       sizeof(struct coll_env_thread)*N);
    cev->N=N;
    tMPI_Atomic_set(&(cev->coll.current_sync), 0);
    tMPI_Atomic_set(&(cev->coll.n_remaining), 0);
    for(i=0;i<N;i++)
    {
        tMPI_Coll_envt_init(&(cev->met[i]), N);
    }
}

void tMPI_Coll_env_destroy(struct coll_env *cev)
{
    int i;

    for(i=0;i<cev->N;i++)
    {
        tMPI_Coll_envt_destroy(&(cev->met[i]));
    }
    free( (void*)cev->met );
}
void tMPI_Coll_sync_init(struct coll_sync *csync, int N)
{
    int i;

    csync->synct=0;
    csync->N=N;
    csync->events=(tMPI_Event*)tMPI_Malloc(sizeof(tMPI_Event)*N);
    for(i=0;i<N;i++)
    {
        tMPI_Event_init( &(csync->events[i]) );
    }
}

void tMPI_Coll_sync_destroy(struct coll_sync *csync)
{
    int i;

    for(i=0;i<csync->N;i++)
    {
        tMPI_Event_destroy( &(csync->events[i]) );
    }
    free( (void*)csync->events );
}
/* get a pointer to the next coll_env once it's ready. */
static struct coll_env *tMPI_Get_cev(tMPI_Comm comm, int myrank, int *counter)
{
    struct coll_sync *csync=&(comm->csync[myrank]);
    struct coll_env *cev;
#ifdef USE_COLLECTIVE_COPY_BUFFER
    int N;
#endif

    /* we increase our counter, and determine which coll_env we get */
    csync->synct++;
    *counter=csync->synct;
    cev=&(comm->cev[csync->synct % N_COLL_ENV]);

#ifdef USE_COLLECTIVE_COPY_BUFFER
    if (cev->met[myrank].using_cb)
    {
        N=tMPI_Event_wait( &(cev->met[myrank].send_ev));
        tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
    }
#endif
#ifdef USE_COLLECTIVE_COPY_BUFFER
    /* clean up old copy_buffer pointers */
    if (cev->met[myrank].cb)
    {
        tMPI_Copy_buffer_list_return(&(tMPI_Get_current()->cbl_multi),
                                     cev->met[myrank].cb);
        cev->met[myrank].cb=NULL;
        cev->met[myrank].using_cb=FALSE;
    }
#endif

    return cev;
}
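
/* Note: coll_envs are recycled round-robin (synct % N_COLL_ENV), so the
   send_ev wait above makes sure that all readers of our previous posting
   in this slot are done before its copy buffer is returned to the free
   list. */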
static void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
                           int index, int expected_tag, tMPI_Datatype recvtype,
                           size_t recvsize, void *recvbuf, int *ret)
{
    size_t sendsize=cev->met[rank].bufsize[index];

    /* check tags, types */
    if ((cev->met[rank].datatype != recvtype ) ||
        (cev->met[rank].tag != expected_tag))
    {
        *ret=tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
    }

    if (sendsize) /* we allow NULL ptrs if there's nothing to xmit */
    {
        void *srcbuf;
#ifdef USE_COLLECTIVE_COPY_BUFFER
        gmx_bool decrease_ctr=FALSE;
#endif

        if ( sendsize > recvsize )
        {
            *ret=tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
            return;
        }
        if ( cev->met[rank].buf == recvbuf )
        {
            *ret=tMPI_Error(TMPI_COMM_WORLD,TMPI_ERR_XFER_BUF_OVERLAP);
            return;
        }
        /* get source buffer */
#ifdef USE_COLLECTIVE_COPY_BUFFER
        if ( !(cev->met[rank].using_cb))
#endif
        {
            srcbuf=cev->met[rank].buf[index];
        }
#ifdef USE_COLLECTIVE_COPY_BUFFER
        else
        {
            tMPI_Atomic_memory_barrier();
            srcbuf=tMPI_Atomic_ptr_get(&(cev->met[rank].cpbuf[index]));

            if (!srcbuf)
            { /* there was (as of yet) no copied buffer */
                void *try_again_srcbuf;
                /* we need to try checking the pointer again after we
                   increase the read counter, signaling that one more
                   thread is reading. */
                tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount), 1);
                tMPI_Atomic_memory_barrier();
                /*try_again_srcbuf=(char*) (cev->met[rank].cpbuf[index]);*/
                try_again_srcbuf=tMPI_Atomic_ptr_get(
                                      &(cev->met[rank].cpbuf[index]));
                if (!try_again_srcbuf)
                {
                    /* apparently the copied buffer is not ready yet. We
                       just use the real source buffer. We have already
                       indicated we're reading from the regular buf. */
                    srcbuf=cev->met[rank].buf[index];
                    decrease_ctr=TRUE;
                }
                else
                {
                    /* We tried again, and this time there was a copied
                       buffer. We use that, and indicate that we're not
                       reading from the regular buf. This case should be
                       pretty rare. */
                    tMPI_Atomic_fetch_add(&(cev->met[rank].buf_readcount),-1);
                    srcbuf=try_again_srcbuf;
                }
            }
#ifdef TMPI_PROFILE
            tMPI_Profile_count_buffered_coll_xfer(tMPI_Get_current());
#endif
        }
#endif
        /* copy data */
        memcpy((char*)recvbuf, srcbuf, sendsize);
#ifdef TMPI_PROFILE
        tMPI_Profile_count_coll_xfer(tMPI_Get_current());
#endif

#ifdef USE_COLLECTIVE_COPY_BUFFER
        if (decrease_ctr)
        {
            /* we decrement the read count; potentially releasing the
               buffer. */
            tMPI_Atomic_fetch_add( &(cev->met[rank].buf_readcount), -1);
        }
#endif
    }
    /* signal one thread ready */
    {
        int reta=tMPI_Atomic_add_return( &(cev->met[rank].n_remaining), -1);
        if (reta <= 0)
        {
            tMPI_Event_signal( &(cev->met[rank].send_ev) );
        }
    }
}
static void tMPI_Coll_root_xfer(tMPI_Comm comm, tMPI_Datatype sendtype,
                                tMPI_Datatype recvtype,
                                size_t sendsize, size_t recvsize,
                                void* sendbuf, void* recvbuf, int *ret)
{
    /* do root transfer */
    if (recvsize < sendsize)
    {
        *ret=tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
        return;
    }
    if (recvtype != sendtype)
    {
        *ret=tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
        return;
    }
    if ( sendbuf == recvbuf )
    {
        *ret=tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
        return;
    }

    memcpy(recvbuf, sendbuf, sendsize);
}
static void tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
                            int tag, tMPI_Datatype datatype, size_t bufsize,
                            void *buf, int n_remaining, int synct, int dest)
{
    int i;
#ifdef USE_COLLECTIVE_COPY_BUFFER
    /* decide based on the number of waiting threads */
    gmx_bool using_cb=(bufsize < (size_t)(n_remaining*COPY_BUFFER_SIZE));

    cev->met[myrank].using_cb=using_cb;
    if (using_cb)
    {
        /* we set it to NULL initially */
        /*cev->met[myrank].cpbuf[index]=NULL;*/
        tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]), NULL);

        tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
    }
#endif
    cev->met[myrank].tag=tag;
    cev->met[myrank].datatype=datatype;
    cev->met[myrank].buf[index]=buf;
    cev->met[myrank].bufsize[index]=bufsize;
    tMPI_Atomic_set(&(cev->met[myrank].n_remaining), n_remaining);
    tMPI_Atomic_set(&(cev->met[myrank].current_sync), synct);

    /* publish availability. */
    if (dest < 0)
    {
        for(i=0;i<cev->N;i++)
        {
            if (i != myrank)
                tMPI_Event_signal( &(cev->met[i].recv_ev) );
        }
    }
    else
    {
        tMPI_Event_signal( &(cev->met[dest].recv_ev) );
    }

#ifdef USE_COLLECTIVE_COPY_BUFFER
    /* because we've published availability, we can start copying --
       possibly in parallel with the receiver */
    if (using_cb)
    {
        struct tmpi_thread *cur=tMPI_Get_current();
        /* copy the buffer locally. First allocate */
        cev->met[myrank].cb=tMPI_Copy_buffer_list_get( &(cur->cbl_multi) );
        if (cev->met[myrank].cb->size < bufsize)
        {
            fprintf(stderr, "ERROR: cb size too small\n");
            exit(1);
        }
        /* copy to the new buf */
        memcpy(cev->met[myrank].cb->buf, buf, bufsize);

        /* post the new buf */
        tMPI_Atomic_memory_barrier();
        /*cev->met[myrank].cpbuf[index]=cev->met[myrank].cb->buf;*/
        tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]),
                            cev->met[myrank].cb->buf);
    }
#endif
}
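
/* Note on ordering: the memory barrier before tMPI_Atomic_ptr_set() above
   is the release half of a release/acquire pairing; the matching acquire
   is the barrier that follows tMPI_Atomic_ptr_get() in tMPI_Mult_recv().
   A reader that observes a non-NULL cpbuf pointer is therefore guaranteed
   to also see the completed memcpy() into the copy buffer. */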
static void tMPI_Wait_for_others(struct coll_env *cev, int myrank)
{
#if defined(TMPI_PROFILE)
    struct tmpi_thread *cur=tMPI_Get_current();
    tMPI_Profile_wait_start(cur);
#endif

#ifdef USE_COLLECTIVE_COPY_BUFFER
    if (! (cev->met[myrank].using_cb) )
#endif
    {
        /* wait until everybody else is done copying the buffer */
        tMPI_Event_wait( &(cev->met[myrank].send_ev));
        tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
    }
#ifdef USE_COLLECTIVE_COPY_BUFFER
    else
    {
        /* wait until everybody else is done copying the original buffer.
           We use fetch_add because we want to be sure of coherency.
           This wait is bound to be very short (otherwise it wouldn't
           be double-buffering) so we always spin here. */
        tMPI_Atomic_memory_barrier();
#if 0
        while (tMPI_Atomic_cas( &(cev->met[myrank].buf_readcount), 0,
                                -100000) == 0)
#endif
#if 0
        while (tMPI_Atomic_fetch_add( &(cev->met[myrank].buf_readcount), 0)
               != 0)
#endif
        while (tMPI_Atomic_get( &(cev->met[myrank].buf_readcount) ) > 0)
        {
        }
        tMPI_Atomic_memory_barrier();
    }
#endif
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_send);
#endif
}
static void tMPI_Wait_for_data(struct tmpi_thread *cur, struct coll_env *cev,
                               int myrank)
                               /*int rank, int myrank, int synct)*/
{
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_start(cur);
#endif
    tMPI_Event_wait( &(cev->met[myrank].recv_ev));
    tMPI_Event_process( &(cev->met[myrank].recv_ev), 1);
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_recv);
#endif
}
int tMPI_Barrier(tMPI_Comm comm)
{
#ifdef TMPI_PROFILE
    struct tmpi_thread *cur=tMPI_Get_current();
    tMPI_Profile_count_start(cur);
#endif
#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Barrier(%p)", comm);
#endif

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }

    if (comm->grp.N > 1)
    {
#if defined(TMPI_PROFILE)
        tMPI_Profile_wait_start(cur);
#endif
        tMPI_Barrier_wait( &(comm->barrier) );
#if defined(TMPI_PROFILE)
        tMPI_Profile_wait_stop(cur, TMPIWAIT_Barrier);
#endif
    }

#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Barrier);
#endif
    return TMPI_SUCCESS;
}
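
/* Illustrative sketch, not part of thread_mpi: a user-level barrier call.
   TMPI_COMM_WORLD is the predefined communicator spanning all threads. */
#if 0
static void sketch_barrier_usage(void)
{
    /* ... per-thread work ... */
    if (tMPI_Barrier(TMPI_COMM_WORLD) != TMPI_SUCCESS)
    {
        fprintf(stderr, "barrier failed\n");
    }
    /* every thread has now reached this point */
}
#endif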
/* The actual collective functions are #included, so that the static
   functions above are available to them and can get inlined if the
   compiler deems it appropriate. */
#include "alltoall.h"
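
/* Illustrative sketch, not part of thread_mpi: calling one of the
   #included collectives from user code. The signature mirrors
   MPI_Alltoall; here every thread contributes one int to every other
   thread. */
#if 0
static void sketch_alltoall_usage(int *sendbuf, int *recvbuf)
{
    tMPI_Alltoall(sendbuf, 1, TMPI_INT, recvbuf, 1, TMPI_INT,
                  TMPI_COMM_WORLD);
}
#endif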