/*
   This source code file is part of thread_mpi.
   Written by Sander Pronk, Erik Lindahl, and possibly others.

   Copyright (c) 2009, Sander Pronk, Erik Lindahl.
   All rights reserved.

   Redistribution and use in source and binary forms, with or without
   modification, are permitted provided that the following conditions are met:
   1) Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
   2) Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in the
      documentation and/or other materials provided with the distribution.
   3) Neither the name of the copyright holders nor the
      names of its contributors may be used to endorse or promote products
      derived from this software without specific prior written permission.

   THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
   EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
   WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
   DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
   SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

   If you want to redistribute modifications, please consider that
   scientific software is very special. Version control is crucial -
   bugs must be traceable. We will be happy to consider code for
   inclusion in the official distribution, but derived work should not
   be called official thread_mpi. Details are found in the README & COPYING
   files.
 */

#ifdef HAVE_TMPI_CONFIG_H
#include "tmpi_config.h"
#endif

#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include "impl.h"
#include "collective.h"

#ifdef USE_COLLECTIVE_COPY_BUFFER
/* initialize a copy buffer */
void tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size)
{
    cb->buf=tMPI_Malloc(size);
    cb->size=size;
}

/* destroy a copy buffer */
void tMPI_Copy_buffer_destroy(struct copy_buffer *cb)
{
    free(cb->buf);
}

/* initialize a list of copy buffers as a singly linked free list */
void tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
                                size_t size)
{
    int i;

    cbl->size=size;
    cbl->cb_alloc=(struct copy_buffer*)
                  tMPI_Malloc(sizeof(struct copy_buffer)*Nbufs);
    cbl->cb=cbl->cb_alloc; /* the first one */
    cbl->Nbufs=Nbufs;
    for(i=0;i<Nbufs;i++)
    {
        tMPI_Copy_buffer_init( &(cbl->cb_alloc[i]), size );
        if (i<Nbufs-1)
            cbl->cb_alloc[i].next=&(cbl->cb_alloc[i+1]);
        else
            cbl->cb_alloc[i].next=NULL;
    }
}

void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl)
{
    int i;

    for(i=0;i<cbl->Nbufs;i++)
    {
        tMPI_Copy_buffer_destroy( &(cbl->cb_alloc[i]) );
    }
    free(cbl->cb_alloc);
}

/* pop a copy buffer off the head of the free list */
struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl)
{
    struct copy_buffer *ret=cbl->cb;
    if (!ret)
    {
        fprintf(stderr, "out of copy buffers!!\n");
        exit(1);
    }
    cbl->cb=ret->next;

    return ret;
}

/* push a copy buffer back onto the head of the free list */
void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
                                  struct copy_buffer *cb)
{
    cb->next=cbl->cb;
    cbl->cb=cb;
}
#endif

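/* Illustrative sketch (not part of the library): each thread keeps such a
   free list (cbl_multi; see tMPI_Get_cev below) and would use it like this.
   The buffer count of 8 and the names data/datasize are arbitrary choices
   for this example:

       struct copy_buffer_list cbl;
       struct copy_buffer *cb;

       tMPI_Copy_buffer_list_init(&cbl, 8, COPY_BUFFER_SIZE);
       cb=tMPI_Copy_buffer_list_get(&cbl);
       memcpy(cb->buf, data, datasize);
       tMPI_Copy_buffer_list_return(&cbl, cb);
       tMPI_Copy_buffer_list_destroy(&cbl);
 */
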
/* initialize a per-thread collective envelope for N threads */
void tMPI_Coll_envt_init(struct coll_env_thread *met, int N)
{
    tMPI_Atomic_set(&(met->current_sync), 0);
    tMPI_Atomic_set(&(met->n_remaining), 0);
    met->buf=(void**)tMPI_Malloc(sizeof(void*)*N);
    met->bufsize=(size_t*)tMPI_Malloc(sizeof(size_t)*N);
    met->read_data=(tmpi_bool*)tMPI_Malloc(sizeof(tmpi_bool)*N);
#ifdef USE_COLLECTIVE_COPY_BUFFER
    met->cpbuf=(tMPI_Atomic_ptr_t*)tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*N);
    met->cb=NULL;
    met->using_cb=FALSE;
#endif
    tMPI_Event_init( &(met->send_ev) );
    tMPI_Event_init( &(met->recv_ev) );
}

void tMPI_Coll_envt_destroy(struct coll_env_thread *met)
{
    free( (void*)met->buf );
    free( (void*)met->bufsize );
    free( (void*)met->read_data );
#ifdef USE_COLLECTIVE_COPY_BUFFER
    free( (void*)met->cpbuf );
#endif
}

/* initialize a collective environment for N threads */
void tMPI_Coll_env_init(struct coll_env *cev, int N)
{
    int i;

    cev->met=(struct coll_env_thread*)tMPI_Malloc(
                        sizeof(struct coll_env_thread)*N);
    cev->N=N;
    tMPI_Atomic_set(&(cev->coll.current_sync), 0);
    tMPI_Atomic_set(&(cev->coll.n_remaining), 0);
    for(i=0;i<N;i++)
    {
        tMPI_Coll_envt_init(&(cev->met[i]), N);
    }
}

void tMPI_Coll_env_destroy(struct coll_env *cev)
{
    int i;

    for(i=0;i<cev->N;i++)
    {
        tMPI_Coll_envt_destroy(&(cev->met[i]));
    }
    free( (void*)cev->met );
}

/* initialize a collective synchronization structure for N threads */
void tMPI_Coll_sync_init(struct coll_sync *csync, int N)
{
    int i;

    csync->synct=0;
    csync->syncs=0;
    csync->N=N;

    csync->events=(tMPI_Event*)tMPI_Malloc(sizeof(tMPI_Event)*N);
    for(i=0;i<N;i++)
    {
        tMPI_Event_init( &(csync->events[i]) );
    }
}

void tMPI_Coll_sync_destroy(struct coll_sync *csync)
{
    int i;

    csync->synct=0;
    csync->syncs=0;

    for(i=0;i<csync->N;i++)
    {
        tMPI_Event_destroy( &(csync->events[i]) );
    }
    free(csync->events);
}

/* get a pointer to the next coll_env once it's ready. */
struct coll_env *tMPI_Get_cev(tMPI_Comm comm, int myrank, int *counter)
{
    struct coll_sync *csync=&(comm->csync[myrank]);
    struct coll_env *cev;
#ifdef USE_COLLECTIVE_COPY_BUFFER
    int N;
#endif

    /* we increase our counter, and determine which coll_env we get */
    csync->synct++;
    *counter=csync->synct;
    cev=&(comm->cev[csync->synct % N_COLL_ENV]);

#ifdef USE_COLLECTIVE_COPY_BUFFER
    if (cev->met[myrank].using_cb)
    {
        N=tMPI_Event_wait( &(cev->met[myrank].send_ev));
        tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
    }
#endif
#ifdef USE_COLLECTIVE_COPY_BUFFER
    /* clean up old copy_buffer pointers */
    if (cev->met[myrank].cb)
    {
        tMPI_Copy_buffer_list_return(&(tMPI_Get_current()->cbl_multi),
                                     cev->met[myrank].cb);
        cev->met[myrank].cb=NULL;
        cev->met[myrank].using_cb=FALSE;
    }
#endif

    return cev;
}

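/* Explanatory note (derived from the code above): each communicator cycles
   through N_COLL_ENV collective environments, so a thread can already post
   collective number s while slower threads are still reading number s-1.
   The extra send_ev wait when a copy buffer was in use guarantees that the
   collective that last owned this slot has fully drained before its copy
   buffer is returned to the free list. */
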
/* receive data posted by 'rank' in buffer slot 'index' into recvbuf */
void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
                    int index, int expected_tag, tMPI_Datatype recvtype,
                    size_t recvsize, void *recvbuf, int *ret)
{
    size_t sendsize=cev->met[rank].bufsize[index];

    /* check tags, types */
    if ((cev->met[rank].datatype != recvtype ) ||
        (cev->met[rank].tag != expected_tag))
    {
        *ret=tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
    }

    if (sendsize) /* we allow NULL ptrs if there's nothing to xmit */
    {
        void *srcbuf;
#ifdef USE_COLLECTIVE_COPY_BUFFER
        tmpi_bool decrease_ctr=FALSE;
#endif

        if ( sendsize > recvsize )
        {
            *ret=tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
            return;
        }

        if ( cev->met[rank].buf == recvbuf )
        {
            *ret=tMPI_Error(TMPI_COMM_WORLD,TMPI_ERR_XFER_BUF_OVERLAP);
            return;
        }
        /* get source buffer */
#ifdef USE_COLLECTIVE_COPY_BUFFER
        if ( !(cev->met[rank].using_cb))
#endif
        {
            srcbuf=cev->met[rank].buf[index];
        }
#ifdef USE_COLLECTIVE_COPY_BUFFER
        else
        {
            srcbuf=tMPI_Atomic_ptr_get(&(cev->met[rank].cpbuf[index]));
            tMPI_Atomic_memory_barrier_acq();

            if (!srcbuf)
            { /* there was (as of yet) no copied buffer */
                void *try_again_srcbuf;
                /* we need to try checking the pointer again after we increase
                   the read counter, signaling that one more thread
                   is reading. */
                tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount), 1);
                /* a full memory barrier */
                tMPI_Atomic_memory_barrier();
                try_again_srcbuf=tMPI_Atomic_ptr_get(
                                      &(cev->met[rank].cpbuf[index]));
                if (!try_again_srcbuf)
                {
                    /* apparently the copied buffer is not ready yet. We
                       just use the real source buffer. We have already
                       indicated we're reading from the regular buf. */
                    srcbuf=cev->met[rank].buf[index];
                    decrease_ctr=TRUE;
                }
                else
                {
                    /* We tried again, and this time there was a copied buffer.
                       We use that, and indicate that we're not reading from the
                       regular buf. This case should be pretty rare. */
                    tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount),-1);
                    tMPI_Atomic_memory_barrier_acq();
                    srcbuf=try_again_srcbuf;
                }
            }
#ifdef TMPI_PROFILE
            tMPI_Profile_count_buffered_coll_xfer(tMPI_Get_current());
#endif
        }
#endif
        /* copy data */
        memcpy((char*)recvbuf, srcbuf, sendsize);
#ifdef TMPI_PROFILE
        tMPI_Profile_count_coll_xfer(tMPI_Get_current());
#endif

#ifdef USE_COLLECTIVE_COPY_BUFFER
        if (decrease_ctr)
        {
            /* we decrement the read count; potentially releasing the buffer. */
            tMPI_Atomic_memory_barrier_rel();
            tMPI_Atomic_add_return( &(cev->met[rank].buf_readcount), -1);
        }
#endif
    }
    /* signal one thread ready */
    {
        int reta;

        tMPI_Atomic_memory_barrier_rel();
        reta=tMPI_Atomic_add_return( &(cev->met[rank].n_remaining), -1);
        if (reta <= 0)
        {
            tMPI_Event_signal( &(cev->met[rank].send_ev) );
        }
    }
}

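/* Explanatory note on the race handling above: a reader either finds a
   non-NULL cpbuf[index] and copies from the copy buffer, or announces
   itself through buf_readcount and falls back to the sender's original
   buffer. The full memory barrier between the increment and the re-check
   pairs with the sender's release barrier in tMPI_Post_multi, so at least
   one side is guaranteed to see the other's write; the sender then spins
   on buf_readcount in tMPI_Wait_for_others until all such readers are
   done. */
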
/* do the transfer to the root of a collective */
void tMPI_Coll_root_xfer(tMPI_Comm comm, tMPI_Datatype sendtype,
                         tMPI_Datatype recvtype,
                         size_t sendsize, size_t recvsize,
                         void* sendbuf, void* recvbuf, int *ret)
{
    /* do root transfer */
    if (recvsize < sendsize)
    {
        *ret=tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
        return;
    }
    if (recvtype != sendtype)
    {
        *ret=tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
        return;
    }
    if ( sendbuf == recvbuf )
    {
        *ret=tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
        return;
    }

    memcpy(recvbuf, sendbuf, sendsize);
}

/* post data for receiving threads and announce its availability */
void tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
                     int tag, tMPI_Datatype datatype, size_t bufsize,
                     void *buf, int n_remaining, int synct, int dest)
{
    int i;
#ifdef USE_COLLECTIVE_COPY_BUFFER
    /* decide based on the number of waiting threads */
    tmpi_bool using_cb=(bufsize < (size_t)(n_remaining*COPY_BUFFER_SIZE));

    cev->met[myrank].using_cb=using_cb;
    if (using_cb)
    {
        /* we set it to NULL initially */
        /*cev->met[myrank].cpbuf[index]=NULL;*/
        tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]), NULL);

        tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
    }
#endif
    cev->met[myrank].tag=tag;
    cev->met[myrank].datatype=datatype;
    cev->met[myrank].buf[index]=buf;
    cev->met[myrank].bufsize[index]=bufsize;
    tMPI_Atomic_set(&(cev->met[myrank].n_remaining), n_remaining);
    tMPI_Atomic_memory_barrier_rel();
    tMPI_Atomic_set(&(cev->met[myrank].current_sync), synct);

    /* publish availability. */
    if (dest < 0)
    {
        for(i=0;i<cev->N;i++)
        {
            if (i != myrank)
                tMPI_Event_signal( &(cev->met[i].recv_ev) );
        }
    }
    else
    {
        tMPI_Event_signal( &(cev->met[dest].recv_ev) );
    }

#ifdef USE_COLLECTIVE_COPY_BUFFER
    /* because we've published availability, we can start copying --
       possibly in parallel with the receiver */
    if (using_cb)
    {
        struct tmpi_thread *cur=tMPI_Get_current();

        /* copy the buffer locally. First allocate */
        cev->met[myrank].cb=tMPI_Copy_buffer_list_get( &(cur->cbl_multi) );
        if (cev->met[myrank].cb->size < bufsize)
        {
            fprintf(stderr, "ERROR: cb size too small\n");
            exit(1);
        }
        /* copy to the new buf */
        memcpy(cev->met[myrank].cb->buf, buf, bufsize);

        /* post the new buf */
        tMPI_Atomic_memory_barrier_rel();
        /*cev->met[myrank].cpbuf[index]=cev->met[myrank].cb->buf;*/
        tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]),
                            cev->met[myrank].cb->buf);
    }
#endif
}

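/* Explanatory note on the ordering above: availability is published before
   the local copy is made, so receivers may start reading the original
   buffer in parallel with the memcpy. The release barriers ensure that a
   receiver that sees the new current_sync value also sees the posted
   metadata, and that one that sees a non-NULL cpbuf[index] sees a fully
   copied buffer. */
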
/* wait until all other threads are done reading our posted buffer */
void tMPI_Wait_for_others(struct coll_env *cev, int myrank)
{
#if defined(TMPI_PROFILE)
    struct tmpi_thread *cur=tMPI_Get_current();
    tMPI_Profile_wait_start(cur);
#endif

#ifdef USE_COLLECTIVE_COPY_BUFFER
    if (! (cev->met[myrank].using_cb) )
#endif
    {
        /* wait until everybody else is done copying the buffer */
        tMPI_Event_wait( &(cev->met[myrank].send_ev));
        tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
    }
#ifdef USE_COLLECTIVE_COPY_BUFFER
    else
    {
        /* wait until everybody else is done copying the original buffer.
           We poll the read counter atomically because we want to be sure
           of coherency. This wait is bound to be very short (otherwise it
           wouldn't be double-buffering) so we always spin here. */
        /*tMPI_Atomic_memory_barrier_rel();*/
#if 0
        while (!tMPI_Atomic_cas( &(cev->met[myrank].buf_readcount), 0,
                                 -100000))
#endif
#if 0
        while (tMPI_Atomic_add_return( &(cev->met[myrank].buf_readcount), 0)
               != 0)
#endif
        while (tMPI_Atomic_get( &(cev->met[myrank].buf_readcount) ) > 0)
        {
        }
        tMPI_Atomic_memory_barrier_acq();
    }
#endif
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_send);
#endif
}

/* wait until data is posted for us to receive */
void tMPI_Wait_for_data(struct tmpi_thread *cur, struct coll_env *cev,
                        int myrank)
{
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_start(cur);
#endif
    tMPI_Event_wait( &(cev->met[myrank].recv_ev));
    tMPI_Event_process( &(cev->met[myrank].recv_ev), 1);
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_recv);
#endif
}

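/* Illustrative sketch (not part of this file): roughly how a collective
   such as broadcast composes the primitives above. This is a simplified
   assumption of the call sequence; the real implementation also handles
   argument checking and error propagation, and names like TMPI_BCAST_TAG
   are used here on the assumption that they match the actual headers:

       int synct, ret=TMPI_SUCCESS;
       struct coll_env *cev=tMPI_Get_cev(comm, myrank, &synct);

       if (myrank == root)
       {
           tMPI_Post_multi(cev, myrank, 0, TMPI_BCAST_TAG, datatype,
                           count*datatype->size, buffer, comm->grp.N-1,
                           synct, -1);
           tMPI_Wait_for_others(cev, myrank);
       }
       else
       {
           tMPI_Wait_for_data(tMPI_Get_current(), cev, myrank);
           tMPI_Mult_recv(comm, cev, root, 0, TMPI_BCAST_TAG, datatype,
                          count*datatype->size, buffer, &ret);
       }
 */
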
int tMPI_Barrier(tMPI_Comm comm)
{
#ifdef TMPI_PROFILE
    struct tmpi_thread *cur=tMPI_Get_current();
    tMPI_Profile_count_start(cur);
#endif

#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Barrier(%p)", comm);
#endif

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }

    if (comm->grp.N > 1)
    {
#if defined(TMPI_PROFILE)
        tMPI_Profile_wait_start(cur);
#endif
        tMPI_Barrier_wait( &(comm->barrier) );
#if defined(TMPI_PROFILE)
        tMPI_Profile_wait_stop(cur, TMPIWAIT_Barrier);
#endif
    }

#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Barrier);
#endif
    return TMPI_SUCCESS;
}

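/* Usage sketch (illustration only): tMPI_Barrier must be called by every
   thread in the communicator, and returns once all comm->grp.N threads
   have entered it:

       if (tMPI_Barrier(TMPI_COMM_WORLD) != TMPI_SUCCESS)
       {
           ... handle the error ...
       }
 */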