/*
   This source code file is part of thread_mpi.
   Written by Sander Pronk, Erik Lindahl, and possibly others.

   Copyright (c) 2009, Sander Pronk, Erik Lindahl.

   Redistribution and use in source and binary forms, with or without
   modification, are permitted provided that the following conditions are met:
   1) Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
   2) Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in the
      documentation and/or other materials provided with the distribution.
   3) Neither the name of the copyright holders nor the
      names of its contributors may be used to endorse or promote products
      derived from this software without specific prior written permission.

   THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
   EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
   WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
   DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
   SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

   If you want to redistribute modifications, please consider that
   scientific software is very special. Version control is crucial -
   bugs must be traceable. We will be happy to consider code for
   inclusion in the official distribution, but derived work should not
   be called official thread_mpi. Details are found in the README & COPYING
   files.
 */
#ifdef HAVE_TMPI_CONFIG_H
#include "tmpi_config.h"
#endif

#include <stdlib.h>
#include <string.h>

#include "collective.h"
#ifdef USE_COLLECTIVE_COPY_BUFFER
/* initialize a copy buffer */
int tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size)
{
    cb->buf = tMPI_Malloc(size);
    if (cb->buf == NULL)
    {
        return TMPI_ERR_NO_MEM;
    }
    cb->size = size;
    return TMPI_SUCCESS;
}

/* destroy a copy buffer */
void tMPI_Copy_buffer_destroy(struct copy_buffer *cb)
{
    free(cb->buf);
}
int tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
                               size_t size)
{
    int i;
    int ret;

    cbl->Nbufs    = Nbufs;
    cbl->cb_alloc = (struct copy_buffer*)
        tMPI_Malloc(sizeof(struct copy_buffer)*Nbufs);
    if (cbl->cb_alloc == NULL)
    {
        return TMPI_ERR_NO_MEM;
    }
    cbl->cb = cbl->cb_alloc; /* the first one */

    for (i = 0; i < Nbufs; i++)
    {
        ret = tMPI_Copy_buffer_init( &(cbl->cb_alloc[i]), size );
        if (ret != TMPI_SUCCESS)
        {
            return ret;
        }
        /* link the buffers into a singly-linked free list */
        if (i < Nbufs-1)
        {
            cbl->cb_alloc[i].next = &(cbl->cb_alloc[i+1]);
        }
        else
        {
            cbl->cb_alloc[i].next = NULL;
        }
    }
    return TMPI_SUCCESS;
}
void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl)
{
    int i;

    for (i = 0; i < cbl->Nbufs; i++)
    {
        tMPI_Copy_buffer_destroy( &(cbl->cb_alloc[i]) );
    }
    free(cbl->cb_alloc);
}
struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl)
{
    struct copy_buffer *ret = cbl->cb;
    if (!ret)
    {
        tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COPY_NBUFFERS);
        return NULL;
    }
    cbl->cb = ret->next; /* advance the head of the free list */

    return ret;
}
void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
                                  struct copy_buffer *cb)
{
    /* put the returned buffer back at the head of the free list */
    cb->next = cbl->cb;
    cbl->cb  = cb;
}
#endif
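
/* Illustrative only, not part of thread_mpi: a minimal sketch of how the
   copy-buffer free list above is meant to be used by a single thread.
   The names `nbufs` and `bufsize` are placeholders chosen for this example;
   the sketch is kept inside #if 0 so it is never compiled into the library. */
#if 0
static void copy_buffer_list_example(void)
{
    struct copy_buffer_list cbl;
    struct copy_buffer     *cb;
    int                     nbufs   = 4;                /* a few buffers */
    size_t                  bufsize = COPY_BUFFER_SIZE; /* per-buffer size */

    if (tMPI_Copy_buffer_list_init(&cbl, nbufs, bufsize) != TMPI_SUCCESS)
    {
        return; /* out of memory */
    }
    cb = tMPI_Copy_buffer_list_get(&cbl); /* take a buffer off the list */
    if (cb != NULL)
    {
        /* ... memcpy() collective data into cb->buf ... */
        tMPI_Copy_buffer_list_return(&cbl, cb); /* give it back when done */
    }
    tMPI_Copy_buffer_list_destroy(&cbl);
}
#endif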
int tMPI_Coll_envt_init(struct coll_env_thread *met, int N)
{
    tMPI_Atomic_set(&(met->current_sync), 0);
    tMPI_Atomic_set(&(met->n_remaining), 0);
    met->buf = (void**)tMPI_Malloc(sizeof(void*)*N);
    if (met->buf == NULL)
    {
        return TMPI_ERR_NO_MEM;
    }
    met->bufsize = (size_t*)tMPI_Malloc(sizeof(size_t)*N);
    if (met->bufsize == NULL)
    {
        return TMPI_ERR_NO_MEM;
    }
    met->read_data = (tmpi_bool*)tMPI_Malloc(sizeof(tmpi_bool)*N);
    if (met->read_data == NULL)
    {
        return TMPI_ERR_NO_MEM;
    }
#ifdef USE_COLLECTIVE_COPY_BUFFER
    met->cpbuf = (tMPI_Atomic_ptr_t*)tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*
                                                 N);
    if (met->cpbuf == NULL)
    {
        return TMPI_ERR_NO_MEM;
    }
    met->cb       = NULL;
    met->using_cb = FALSE;
#endif
    tMPI_Event_init( &(met->send_ev) );
    tMPI_Event_init( &(met->recv_ev) );
    return TMPI_SUCCESS;
}
void tMPI_Coll_envt_destroy(struct coll_env_thread *met)
{
    free( (void*)met->buf );
    free( (void*)met->bufsize );
    free( (void*)met->read_data );
#ifdef USE_COLLECTIVE_COPY_BUFFER
    free( (void*)met->cpbuf );
#endif
}
int tMPI_Coll_env_init(struct coll_env *cev, int N)
{
    int i;
    int ret;

    cev->met = (struct coll_env_thread*)tMPI_Malloc(
        sizeof(struct coll_env_thread)*N);
    if (cev->met == NULL)
    {
        return TMPI_ERR_NO_MEM;
    }
    cev->N = N;
    tMPI_Atomic_set(&(cev->coll.current_sync), 0);
    tMPI_Atomic_set(&(cev->coll.n_remaining), 0);
    for (i = 0; i < N; i++)
    {
        ret = tMPI_Coll_envt_init(&(cev->met[i]), N);
        if (ret != TMPI_SUCCESS)
        {
            return ret;
        }
    }
    return TMPI_SUCCESS;
}
void tMPI_Coll_env_destroy(struct coll_env *cev)
{
    int i;

    for (i = 0; i < cev->N; i++)
    {
        tMPI_Coll_envt_destroy(&(cev->met[i]));
    }
    free( (void*)cev->met );
}
int tMPI_Coll_sync_init(struct coll_sync *csync, int N)
{
    int i;

    csync->synct = 0;
    csync->N     = N;

    csync->events = (tMPI_Event*)tMPI_Malloc(sizeof(tMPI_Event)*N);
    if (csync->events == NULL)
    {
        return TMPI_ERR_NO_MEM;
    }
    for (i = 0; i < N; i++)
    {
        tMPI_Event_init( &(csync->events[i]) );
    }
    return TMPI_SUCCESS;
}
void tMPI_Coll_sync_destroy(struct coll_sync *csync)
{
    int i;

    for (i = 0; i < csync->N; i++)
    {
        tMPI_Event_destroy( &(csync->events[i]) );
    }
    free(csync->events);
}
/* get a pointer to the next coll_env once it's ready. */
struct coll_env *tMPI_Get_cev(tMPI_Comm comm, int myrank, int *counter)
{
    struct coll_sync *csync = &(comm->csync[myrank]);
    struct coll_env  *cev;
#ifdef USE_COLLECTIVE_COPY_BUFFER
    int               N;
#endif

    /* we increase our counter, and determine which coll_env we get */
    csync->synct++;
    *counter = csync->synct;
    cev      = &(comm->cev[csync->synct % N_COLL_ENV]);

#ifdef USE_COLLECTIVE_COPY_BUFFER
    if (cev->met[myrank].using_cb)
    {
        /* wait until the previous (copy-buffered) use of this coll_env
           is done before we reuse it */
        N = tMPI_Event_wait( &(cev->met[myrank].send_ev));
        tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
    }
#endif
#ifdef USE_COLLECTIVE_COPY_BUFFER
    /* clean up old copy_buffer pointers */
    if (cev->met[myrank].cb)
    {
        tMPI_Copy_buffer_list_return(&(tMPI_Get_current()->cbl_multi),
                                     cev->met[myrank].cb);
        cev->met[myrank].cb       = NULL;
        cev->met[myrank].using_cb = FALSE;
    }
#endif

    return cev;
}
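
/* Note: comm->cev acts as a small ring of N_COLL_ENV collective environments,
   indexed here by the per-thread counter csync->synct. Rotating environments
   lets a fast thread start posting the next collective while slower threads
   may still be reading the previous one; the wait on send_ev above only
   becomes necessary once the same environment comes around again and its
   copy buffer may still be referenced. */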
void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
                    int index, int expected_tag, tMPI_Datatype recvtype,
                    size_t recvsize, void *recvbuf, int *ret)
{
    size_t sendsize = cev->met[rank].bufsize[index];

    /* check tags, types */
    if ((cev->met[rank].datatype != recvtype ) ||
        (cev->met[rank].tag != expected_tag))
    {
        *ret = tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
    }

    if (sendsize) /* we allow NULL ptrs if there's nothing to xmit */
    {
        void      *srcbuf;
#ifdef USE_COLLECTIVE_COPY_BUFFER
        tmpi_bool  decrease_ctr = FALSE;
#endif

        if (sendsize > recvsize)
        {
            *ret = tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
            return;
        }
        if (cev->met[rank].buf == recvbuf)
        {
            *ret = tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
            return;
        }
        /* get source buffer */
#ifdef USE_COLLECTIVE_COPY_BUFFER
        if (!(cev->met[rank].using_cb))
#endif
        {
            srcbuf = cev->met[rank].buf[index];
        }
#ifdef USE_COLLECTIVE_COPY_BUFFER
        else
        {
            srcbuf = tMPI_Atomic_ptr_get(&(cev->met[rank].cpbuf[index]));
            tMPI_Atomic_memory_barrier_acq();

            if (!srcbuf)
            { /* there was (as of yet) no copied buffer */
                void *try_again_srcbuf;
                /* we need to try checking the pointer again after we increase
                   the read counter, signaling that one more thread
                   is reading from the regular buffer. */
                tMPI_Atomic_fetch_add(&(cev->met[rank].buf_readcount), 1);
                /* a full memory barrier */
                tMPI_Atomic_memory_barrier();
                try_again_srcbuf = tMPI_Atomic_ptr_get(
                    &(cev->met[rank].cpbuf[index]));
                if (!try_again_srcbuf)
                {
                    /* apparently the copied buffer is not ready yet. We
                       just use the real source buffer. We have already
                       indicated we're reading from the regular buf. */
                    srcbuf       = cev->met[rank].buf[index];
                    decrease_ctr = TRUE;
                }
                else
                {
                    /* We tried again, and this time there was a copied buffer.
                       We use that, and indicate that we're not reading from the
                       regular buf. This case should be pretty rare. */
                    tMPI_Atomic_fetch_add(&(cev->met[rank].buf_readcount), -1);
                    tMPI_Atomic_memory_barrier_acq();
                    srcbuf = try_again_srcbuf;
                }
            }
#if defined(TMPI_PROFILE)
            tMPI_Profile_count_buffered_coll_xfer(tMPI_Get_current());
#endif
        }
#endif
        /* copy data */
        memcpy((char*)recvbuf, srcbuf, sendsize);
#if defined(TMPI_PROFILE)
        tMPI_Profile_count_coll_xfer(tMPI_Get_current());
#endif

#ifdef USE_COLLECTIVE_COPY_BUFFER
        if (decrease_ctr)
        {
            /* we decrement the read count; potentially releasing the buffer. */
            tMPI_Atomic_memory_barrier_rel();
            tMPI_Atomic_fetch_add( &(cev->met[rank].buf_readcount), -1);
        }
#endif
    }
    /* signal one thread ready */
    {
        int reta;

        tMPI_Atomic_memory_barrier_rel();
        reta = tMPI_Atomic_fetch_add( &(cev->met[rank].n_remaining), -1);
        if (reta <= 1) /* n_remaining == 0 now. */
        {
            tMPI_Event_signal( &(cev->met[rank].send_ev) );
        }
    }
}
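
/* Summary of the reader-side protocol above, stated here for reference:
   a receiver first looks for a posted copy buffer in cpbuf[index]. If none
   is there yet, it advertises itself as a reader of the original buffer by
   incrementing buf_readcount and then checks cpbuf once more. Either the
   copy has appeared in the meantime (rare; the reader then takes it and
   withdraws its readcount), or the receiver reads the original buffer and
   decrements buf_readcount after the memcpy. The posting thread spins on
   buf_readcount in tMPI_Wait_for_others() before its caller may reuse the
   original buffer. */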
void tMPI_Coll_root_xfer(tMPI_Comm comm, tMPI_Datatype sendtype,
                         tMPI_Datatype recvtype,
                         size_t sendsize, size_t recvsize,
                         void* sendbuf, void* recvbuf, int *ret)
{
    /* do root transfer */
    if (recvsize < sendsize)
    {
        *ret = tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
        return;
    }
    if (recvtype != sendtype)
    {
        *ret = tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
        return;
    }
    if (sendbuf == recvbuf)
    {
        *ret = tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
        return;
    }

    memcpy(recvbuf, sendbuf, sendsize);
}
int tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
                    int tag, tMPI_Datatype datatype, size_t bufsize,
                    void *buf, int n_remaining, int synct, int dest)
{
    int i;
#ifdef USE_COLLECTIVE_COPY_BUFFER
    /* decide based on the number of waiting threads */
    tmpi_bool using_cb = (bufsize < (size_t)(n_remaining*COPY_BUFFER_SIZE));

    cev->met[myrank].using_cb = using_cb;
    if (using_cb)
    {
        /* we set it to NULL initially */
        /*cev->met[myrank].cpbuf[index]=NULL;*/
        tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]), NULL);

        tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
    }
#endif
    cev->met[myrank].tag            = tag;
    cev->met[myrank].datatype       = datatype;
    cev->met[myrank].buf[index]     = buf;
    cev->met[myrank].bufsize[index] = bufsize;
    tMPI_Atomic_set(&(cev->met[myrank].n_remaining), n_remaining);
    tMPI_Atomic_memory_barrier_rel();
    tMPI_Atomic_set(&(cev->met[myrank].current_sync), synct);

    /* publish availability. */
    if (dest < 0)
    {
        for (i = 0; i < cev->N; i++)
        {
            if (i != myrank)
            {
                tMPI_Event_signal( &(cev->met[i].recv_ev) );
            }
        }
    }
    else
    {
        tMPI_Event_signal( &(cev->met[dest].recv_ev) );
    }

#ifdef USE_COLLECTIVE_COPY_BUFFER
    /* because we've published availability, we can start copying --
       possibly in parallel with the receiver */
    if (using_cb)
    {
        struct tmpi_thread *cur = tMPI_Get_current();
        /* copy the buffer locally. First allocate */
        cev->met[myrank].cb = tMPI_Copy_buffer_list_get( &(cur->cbl_multi) );
        if (cev->met[myrank].cb == NULL)
        {
            return TMPI_ERR_COPY_NBUFFERS;
        }
        if (cev->met[myrank].cb->size < bufsize)
        {
            return TMPI_ERR_COPY_BUFFER_SIZE;
        }
        /* copy to the new buf */
        memcpy(cev->met[myrank].cb->buf, buf, bufsize);

        /* post the new buf */
        tMPI_Atomic_memory_barrier_rel();
        /*cev->met[myrank].cpbuf[index]=cev->met[myrank].cb->buf;*/
        tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]),
                            cev->met[myrank].cb->buf);
    }
#endif

    return TMPI_SUCCESS;
}
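
/* Illustrative only, not part of thread_mpi: a rough sketch of how the
   primitives in this file fit together for a broadcast-like transfer.
   The root posts its buffer once; every other thread waits for the post and
   then pulls the data out. Error handling and the bookkeeping done by the
   real collectives are omitted; `root`, `myrank`, `nthreads`, `buf` and
   `count` are placeholders, and tag/index 0 is an arbitrary choice for this
   example. Kept inside #if 0 so it is never compiled into the library. */
#if 0
static int multi_xfer_sketch(tMPI_Comm comm, int root, int myrank,
                             int nthreads, void *buf, size_t count)
{
    struct tmpi_thread *cur = tMPI_Get_current();
    int                 synct;
    int                 ret  = TMPI_SUCCESS;
    struct coll_env    *cev  = tMPI_Get_cev(comm, myrank, &synct);

    if (myrank == root)
    {
        /* publish the buffer for everyone (dest = -1), then wait until all
           readers have signalled that they are done with it */
        ret = tMPI_Post_multi(cev, myrank, 0, 0, TMPI_BYTE, count, buf,
                              nthreads - 1, synct, -1);
        if (ret == TMPI_SUCCESS)
        {
            tMPI_Wait_for_others(cev, myrank);
        }
    }
    else
    {
        /* wait until the root's post is visible, then copy the data out */
        tMPI_Wait_for_data(cur, cev, myrank);
        tMPI_Mult_recv(comm, cev, root, 0, 0, TMPI_BYTE, count, buf, &ret);
    }
    return ret;
}
#endif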
void tMPI_Wait_for_others(struct coll_env *cev, int myrank)
{
#if defined(TMPI_PROFILE)
    struct tmpi_thread *cur = tMPI_Get_current();
    tMPI_Profile_wait_start(cur);
#endif

#ifdef USE_COLLECTIVE_COPY_BUFFER
    if (!(cev->met[myrank].using_cb) )
#endif
    {
        /* wait until everybody else is done copying the buffer */
        tMPI_Event_wait( &(cev->met[myrank].send_ev));
        tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
    }
#ifdef USE_COLLECTIVE_COPY_BUFFER
    else
    {
        /* wait until everybody else is done copying the original buffer.
           This wait is bound to be very short (otherwise it wouldn't
           be double-buffering) so we always spin here. */
#if 0
        /* dummy compare-and-swap to a value that is non-zero. The
           atomic read with barrier below is simpler, but we keep this
           code here commented out for if there is ever a platform
           where the simple read doesn't work because of, say, cache
           coherency issues. */
        while (!tMPI_Atomic_cas( &(cev->met[myrank].buf_readcount), 0,
                                 ... ))
#endif
        tMPI_Atomic_memory_barrier(); /* a full barrier to make
                                         sure that the sending
                                         doesn't interfere with the
                                         waiting */
        while (tMPI_Atomic_get( &(cev->met[myrank].buf_readcount) ) > 0)
        {
            tMPI_Atomic_memory_barrier_acq();
        }
        tMPI_Atomic_memory_barrier_acq();
    }
#endif

#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_send);
#endif
}
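
/* Note: the spin on buf_readcount above pairs with tMPI_Mult_recv(), where a
   receiver that falls back to the original buffer increments the counter
   before its memcpy and decrements it again (with a release barrier) when it
   is done. Once the counter drops to zero, the posting thread knows that no
   receiver still reads its original buffer, so the caller may safely reuse
   that buffer. */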
void tMPI_Wait_for_data(struct tmpi_thread tmpi_unused *cur, struct coll_env *cev,
                        int myrank)
{
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_start(cur);
#endif
    tMPI_Event_wait( &(cev->met[myrank].recv_ev));
    tMPI_Event_process( &(cev->met[myrank].recv_ev), 1);
#if defined(TMPI_PROFILE)
    tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_recv);
#endif
}
int tMPI_Barrier(tMPI_Comm comm)
{
#ifdef TMPI_PROFILE
    struct tmpi_thread *cur = tMPI_Get_current();
    tMPI_Profile_count_start(cur);
#endif

#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Barrier(%p)", comm);
#endif

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }

    if (comm->grp.N > 1)
    {
#if defined(TMPI_PROFILE)
        tMPI_Profile_wait_start(cur);
#endif
        tMPI_Barrier_wait( &(comm->barrier) );
#if defined(TMPI_PROFILE)
        tMPI_Profile_wait_stop(cur, TMPIWAIT_Barrier);
#endif
    }

#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Barrier);
#endif
    return TMPI_SUCCESS;
}