/*
-This source code file is part of thread_mpi.
-Written by Sander Pronk, Erik Lindahl, and possibly others.
+ This source code file is part of thread_mpi.
+ Written by Sander Pronk, Erik Lindahl, and possibly others.
-Copyright (c) 2009, Sander Pronk, Erik Lindahl.
-All rights reserved.
+ Copyright (c) 2009, Sander Pronk, Erik Lindahl.
+ All rights reserved.
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-1) Redistributions of source code must retain the above copyright
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+ 1) Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
-2) Redistributions in binary form must reproduce the above copyright
+ 2) Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
-3) Neither the name of the copyright holders nor the
+ 3) Neither the name of the copyright holders nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
-THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
-EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
-DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
-ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-If you want to redistribute modifications, please consider that
-scientific software is very special. Version control is crucial -
-bugs must be traceable. We will be happy to consider code for
-inclusion in the official distribution, but derived work should not
-be called official thread_mpi. Details are found in the README & COPYING
-files.
-*/
+ THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
+ EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
+ DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ If you want to redistribute modifications, please consider that
+ scientific software is very special. Version control is crucial -
+ bugs must be traceable. We will be happy to consider code for
+ inclusion in the official distribution, but derived work should not
+ be called official thread_mpi. Details are found in the README & COPYING
+ files.
+ */
#ifdef HAVE_TMPI_CONFIG_H
#include "tmpi_config.h"
/* initialize a copy buffer */
void tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size)
{
- cb->buf=tMPI_Malloc(size);
- cb->size=size;
+ cb->buf = tMPI_Malloc(size);
+ cb->size = size;
}
/* destroy a copy buffer */
{
int i;
- cbl->size=size;
- cbl->cb_alloc=(struct copy_buffer*)
- tMPI_Malloc(sizeof(struct copy_buffer)*Nbufs);
- cbl->cb=cbl->cb_alloc; /* the first one */
+ cbl->size = size;
+ cbl->cb_alloc = (struct copy_buffer*)
+ tMPI_Malloc(sizeof(struct copy_buffer)*Nbufs);
+ cbl->cb = cbl->cb_alloc; /* the first one */
cbl->Nbufs = Nbufs;
- for(i=0;i<Nbufs;i++)
+ for (i = 0; i < Nbufs; i++)
{
tMPI_Copy_buffer_init( &(cbl->cb_alloc[i]), size );
- if (i<Nbufs-1)
- cbl->cb_alloc[i].next=&(cbl->cb_alloc[i+1]);
+ if (i < Nbufs-1)
+ {
+ cbl->cb_alloc[i].next = &(cbl->cb_alloc[i+1]);
+ }
else
- cbl->cb_alloc[i].next=NULL;
+ {
+ cbl->cb_alloc[i].next = NULL;
+ }
}
}
{
int i;
- for(i=0;i<cbl->Nbufs;i++)
+ for (i = 0; i < cbl->Nbufs; i++)
{
tMPI_Copy_buffer_destroy( &(cbl->cb_alloc[i]) );
}
struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl)
{
- struct copy_buffer *ret=cbl->cb;
+ struct copy_buffer *ret = cbl->cb;
if (!ret)
{
- fprintf(stderr,"out of copy buffers!!");
+ fprintf(stderr, "out of copy buffers!!\n");
exit(1);
}
- cbl->cb=ret->next;
+ cbl->cb = ret->next;
return ret;
}
-void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
- struct copy_buffer *cb)
+void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
+ struct copy_buffer *cb)
{
- cb->next=cbl->cb;
- cbl->cb=cb;
+ cb->next = cbl->cb;
+ cbl->cb = cb;
}
#endif
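/* Editor's aside, not part of the patch: the get/return pair above is a
   plain LIFO free list -- get pops the head node, return pushes one back.
   A self-contained sketch of the same pattern, with hypothetical names
   (node, freelist): */
#include <stdio.h>
#include <stdlib.h>

struct node     { struct node *next; };
struct freelist { struct node *head; };

static struct node *freelist_get(struct freelist *fl)
{
    struct node *ret = fl->head;
    if (!ret)                 /* empty list: mirrors the exit(1) above */
    {
        fprintf(stderr, "out of nodes\n");
        exit(1);
    }
    fl->head = ret->next;     /* pop the head */
    return ret;
}

static void freelist_return(struct freelist *fl, struct node *n)
{
    n->next  = fl->head;      /* push back onto the head */
    fl->head = n;
}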
{
tMPI_Atomic_set(&(met->current_sync), 0);
tMPI_Atomic_set(&(met->n_remaining), 0);
- met->buf=(void**)tMPI_Malloc(sizeof(void*)*N);
- met->bufsize=(size_t*)tMPI_Malloc(sizeof(size_t)*N);
- met->read_data=(tmpi_bool*)tMPI_Malloc(sizeof(tmpi_bool)*N);
+ met->buf = (void**)tMPI_Malloc(sizeof(void*)*N);
+ met->bufsize = (size_t*)tMPI_Malloc(sizeof(size_t)*N);
+ met->read_data = (tmpi_bool*)tMPI_Malloc(sizeof(tmpi_bool)*N);
#ifdef USE_COLLECTIVE_COPY_BUFFER
- met->cpbuf=(tMPI_Atomic_ptr_t*)tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*N);
- met->cb=NULL;
- met->using_cb=FALSE;
+ met->cpbuf = (tMPI_Atomic_ptr_t*)tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*N);
+ met->cb = NULL;
+ met->using_cb = FALSE;
#endif
tMPI_Event_init( &(met->send_ev) );
tMPI_Event_init( &(met->recv_ev) );
{
int i;
- cev->met=(struct coll_env_thread*)tMPI_Malloc(
- sizeof(struct coll_env_thread)*N);
- cev->N=N;
+ cev->met = (struct coll_env_thread*)tMPI_Malloc(
+ sizeof(struct coll_env_thread)*N);
+ cev->N = N;
tMPI_Atomic_set(&(cev->coll.current_sync), 0);
tMPI_Atomic_set(&(cev->coll.n_remaining), 0);
- for(i=0;i<N;i++)
+ for (i = 0; i < N; i++)
{
tMPI_Coll_envt_init(&(cev->met[i]), N);
}
void tMPI_Coll_env_destroy(struct coll_env *cev)
{
int i;
- for(i=0;i<cev->N;i++)
+ for (i = 0; i < cev->N; i++)
{
tMPI_Coll_envt_destroy(&(cev->met[i]));
}
{
int i;
- csync->synct=0;
- csync->syncs=0;
- csync->N=N;
+ csync->synct = 0;
+ csync->syncs = 0;
+ csync->N = N;
- csync->events=(tMPI_Event*)tMPI_Malloc(sizeof(tMPI_Event)*N);
- for(i=0;i<N;i++)
+ csync->events = (tMPI_Event*)tMPI_Malloc(sizeof(tMPI_Event)*N);
+ for (i = 0; i < N; i++)
{
tMPI_Event_init( &(csync->events[i]) );
}
{
int i;
- csync->synct=0;
- csync->syncs=0;
+ csync->synct = 0;
+ csync->syncs = 0;
- for(i=0;i<csync->N;i++)
+ for (i = 0; i < csync->N; i++)
{
tMPI_Event_destroy( &(csync->events[i]) );
}
/* get a pointer to the next coll_env once it's ready. */
struct coll_env *tMPI_Get_cev(tMPI_Comm comm, int myrank, int *counter)
{
- struct coll_sync *csync=&(comm->csync[myrank]);
- struct coll_env *cev;
+ struct coll_sync *csync = &(comm->csync[myrank]);
+ struct coll_env *cev;
#ifdef USE_COLLECTIVE_COPY_BUFFER
- int N;
+ int N;
#endif
/* we increase our counter, and determine which coll_env we get */
csync->synct++;
- *counter=csync->synct;
- cev=&(comm->cev[csync->synct % N_COLL_ENV]);
+ *counter = csync->synct;
+ cev = &(comm->cev[csync->synct % N_COLL_ENV]);
#ifdef USE_COLLECTIVE_COPY_BUFFER
if (cev->met[myrank].using_cb)
{
- N=tMPI_Event_wait( &(cev->met[myrank].send_ev));
+ N = tMPI_Event_wait( &(cev->met[myrank].send_ev));
tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
}
#endif
#ifdef USE_COLLECTIVE_COPY_BUFFER
/* clean up old copy_buffer pointers */
- if (cev->met[myrank].cb)
+ if (cev->met[myrank].cb)
{
tMPI_Copy_buffer_list_return(&(tMPI_Get_current()->cbl_multi),
cev->met[myrank].cb);
- cev->met[myrank].cb=NULL;
- cev->met[myrank].using_cb=FALSE;
+ cev->met[myrank].cb = NULL;
+ cev->met[myrank].using_cb = FALSE;
}
#endif
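/* Editor's aside, not part of the patch: the "% N_COLL_ENV" indexing above
   rotates through a small ring of collective environments, so consecutive
   collective calls on the same communicator use different envs and can
   overlap in flight.  Minimal sketch of the rotation (hypothetical names): */
#define N_ENVS 2

struct env_ring
{
    struct coll_env *envs[N_ENVS]; /* the ring of environments */
    int              synct;        /* this rank's sync counter */
};

static struct coll_env *ring_next(struct env_ring *r)
{
    r->synct++;                        /* one more collective call */
    return r->envs[r->synct % N_ENVS]; /* rotate through the ring */
}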
void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
- int index, int expected_tag, tMPI_Datatype recvtype,
+ int index, int expected_tag, tMPI_Datatype recvtype,
size_t recvsize, void *recvbuf, int *ret)
{
- size_t sendsize=cev->met[rank].bufsize[index];
+ size_t sendsize = cev->met[rank].bufsize[index];
/* check tags, types */
- if ((cev->met[rank].datatype != recvtype ) ||
+ if ((cev->met[rank].datatype != recvtype ) ||
(cev->met[rank].tag != expected_tag))
{
- *ret=tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
+ *ret = tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
}
-
+
if (sendsize) /* we allow NULL ptrs if there's nothing to xmit */
{
- void *srcbuf;
+ void *srcbuf;
#ifdef USE_COLLECTIVE_COPY_BUFFER
- tmpi_bool decrease_ctr=FALSE;
+ tmpi_bool decrease_ctr = FALSE;
#endif
- if ( sendsize > recvsize )
+ if (sendsize > recvsize)
{
- *ret=tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
+ *ret = tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
return;
}
- if ( cev->met[rank].buf == recvbuf )
+ if (cev->met[rank].buf == recvbuf)
{
- *ret=tMPI_Error(TMPI_COMM_WORLD,TMPI_ERR_XFER_BUF_OVERLAP);
+ *ret = tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
return;
}
/* get source buffer */
#ifdef USE_COLLECTIVE_COPY_BUFFER
- if ( !(cev->met[rank].using_cb))
+ if (!(cev->met[rank].using_cb))
#endif
{
- srcbuf=cev->met[rank].buf[index];
+ srcbuf = cev->met[rank].buf[index];
}
#ifdef USE_COLLECTIVE_COPY_BUFFER
else
{
- srcbuf=tMPI_Atomic_ptr_get(&(cev->met[rank].cpbuf[index]));
+ srcbuf = tMPI_Atomic_ptr_get(&(cev->met[rank].cpbuf[index]));
tMPI_Atomic_memory_barrier_acq();
- if(!srcbuf)
- { /* there was (as of yet) no copied buffer */
+ if (!srcbuf)
+ { /* there was (as of yet) no copied buffer */
void *try_again_srcbuf;
/* we need to try checking the pointer again after we increase
the read counter, signaling that one more thread
tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount), 1);
/* a full memory barrier */
tMPI_Atomic_memory_barrier();
- try_again_srcbuf=tMPI_Atomic_ptr_get(
- &(cev->met[rank].cpbuf[index]));
+ try_again_srcbuf = tMPI_Atomic_ptr_get(
+ &(cev->met[rank].cpbuf[index]));
if (!try_again_srcbuf)
{
/* apparently the copied buffer is not ready yet. We
just use the real source buffer. We have already
indicated we're reading from the regular buf. */
- srcbuf=cev->met[rank].buf[index];
- decrease_ctr=TRUE;
+ srcbuf = cev->met[rank].buf[index];
+ decrease_ctr = TRUE;
}
else
{
- /* We tried again, and this time there was a copied buffer.
+ /* We tried again, and this time there was a copied buffer.
We use that, and indicate that we're not reading from the
regular buf. This case should be pretty rare. */
- tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount),-1);
+ tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount), -1);
tMPI_Atomic_memory_barrier_acq();
- srcbuf=try_again_srcbuf;
+ srcbuf = try_again_srcbuf;
}
}
#ifdef TMPI_PROFILE
if (srcbuf)
+ {
tMPI_Profile_count_buffered_coll_xfer(tMPI_Get_current());
+ }
#endif
}
#endif
#endif
}
/* signal one thread ready */
- {
+ {
int reta;
tMPI_Atomic_memory_barrier_rel();
- reta=tMPI_Atomic_add_return( &(cev->met[rank].n_remaining), -1);
+ reta = tMPI_Atomic_add_return( &(cev->met[rank].n_remaining), -1);
if (reta <= 0)
{
tMPI_Event_signal( &(cev->met[rank].send_ev) );
}
}
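/* Editor's aside, not part of the patch: the NULL-then-retry branch above
   is a double-checked read.  The receiver announces, through an atomic
   reader count, that it is about to read the sender's original buffer,
   issues a full barrier, and only then re-checks whether a copied buffer
   appeared.  A sketch of that protocol in C11 atomics (thread_mpi itself
   uses its own tMPI_Atomic_* layer; all names here are hypothetical): */
#include <stdatomic.h>

typedef struct
{
    _Atomic(void *) cpbuf;     /* copied buffer; NULL until published */
    atomic_int      readcount; /* readers still using the original buffer */
    void           *origbuf;   /* the sender's original buffer */
} xfer_slot;

static void *choose_srcbuf(xfer_slot *x, int *reading_orig)
{
    void *src = atomic_load_explicit(&x->cpbuf, memory_order_acquire);

    *reading_orig = 0;
    if (!src)
    {
        /* announce one more reader of the original buffer; the seq_cst
           fetch_add doubles as the full memory barrier */
        atomic_fetch_add(&x->readcount, 1);
        src = atomic_load_explicit(&x->cpbuf, memory_order_acquire);
        if (!src)
        {
            src           = x->origbuf; /* copy not ready: use the original */
            *reading_orig = 1;          /* caller decrements readcount later */
        }
        else
        {
            /* the copy arrived after all; withdraw our announcement */
            atomic_fetch_sub(&x->readcount, 1);
        }
    }
    return src;
}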
-void tMPI_Coll_root_xfer(tMPI_Comm comm, tMPI_Datatype sendtype,
- tMPI_Datatype recvtype,
- size_t sendsize, size_t recvsize,
+void tMPI_Coll_root_xfer(tMPI_Comm comm, tMPI_Datatype sendtype,
+ tMPI_Datatype recvtype,
+ size_t sendsize, size_t recvsize,
void* sendbuf, void* recvbuf, int *ret)
{
/* do root transfer */
if (recvsize < sendsize)
{
- *ret=tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
+ *ret = tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
return;
}
if (recvtype != sendtype)
{
- *ret=tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
+ *ret = tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
return;
}
- if ( sendbuf == recvbuf )
+ if (sendbuf == recvbuf)
{
- *ret=tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
+ *ret = tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
return;
}
memcpy(recvbuf, sendbuf, sendsize);
}
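/* Editor's aside, not part of the patch: the overlap test above exists
   because memcpy() has undefined behavior for overlapping source and
   destination, so an in-place root "transfer" must be rejected outright.
   Condensed form of the guard (hypothetical wrapper): */
#include <string.h>

static int safe_root_copy(void *recvbuf, const void *sendbuf, size_t n)
{
    if (sendbuf == recvbuf)
    {
        return -1; /* caller reports TMPI_ERR_XFER_BUF_OVERLAP */
    }
    memcpy(recvbuf, sendbuf, n); /* disjoint buffers: memcpy is safe */
    return 0;
}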
-void tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
- int tag, tMPI_Datatype datatype, size_t bufsize,
+void tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
+ int tag, tMPI_Datatype datatype, size_t bufsize,
void *buf, int n_remaining, int synct, int dest)
{
int i;
#ifdef USE_COLLECTIVE_COPY_BUFFER
/* decide based on the number of waiting threads */
- tmpi_bool using_cb=(bufsize < (size_t)(n_remaining*COPY_BUFFER_SIZE));
+ tmpi_bool using_cb = (bufsize < (size_t)(n_remaining*COPY_BUFFER_SIZE));
- cev->met[myrank].using_cb=using_cb;
+ cev->met[myrank].using_cb = using_cb;
if (using_cb)
{
/* we set it to NULL initially */
tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
}
#endif
- cev->met[myrank].tag=tag;
- cev->met[myrank].datatype=datatype;
- cev->met[myrank].buf[index]=buf;
- cev->met[myrank].bufsize[index]=bufsize;
+ cev->met[myrank].tag = tag;
+ cev->met[myrank].datatype = datatype;
+ cev->met[myrank].buf[index] = buf;
+ cev->met[myrank].bufsize[index] = bufsize;
tMPI_Atomic_set(&(cev->met[myrank].n_remaining), n_remaining);
tMPI_Atomic_memory_barrier_rel();
tMPI_Atomic_set(&(cev->met[myrank].current_sync), synct);
/* publish availability. */
- if (dest<0)
+ if (dest < 0)
{
- for(i=0;i<cev->N;i++)
+ for (i = 0; i < cev->N; i++)
{
if (i != myrank)
+ {
tMPI_Event_signal( &(cev->met[i].recv_ev) );
+ }
}
}
else
}
#ifdef USE_COLLECTIVE_COPY_BUFFER
- /* becase we've published availability, we can start copying --
+ /* because we've published availability, we can start copying --
possibly in parallel with the receiver */
if (using_cb)
{
- struct tmpi_thread *cur=tMPI_Get_current();
- /* copy the buffer locally. First allocate */
- cev->met[myrank].cb=tMPI_Copy_buffer_list_get( &(cur->cbl_multi) );
+ struct tmpi_thread *cur = tMPI_Get_current();
+ /* copy the buffer locally. First allocate */
+ cev->met[myrank].cb = tMPI_Copy_buffer_list_get( &(cur->cbl_multi) );
if (cev->met[myrank].cb->size < bufsize)
{
fprintf(stderr, "ERROR: cb size too small\n");
/* post the new buf */
tMPI_Atomic_memory_barrier_rel();
/*cev->met[myrank].cpbuf[index]=cev->met[myrank].cb->buf;*/
- tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]),
+ tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]),
cev->met[myrank].cb->buf);
}
#endif
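/* Editor's aside, not part of the patch: the tail of tMPI_Post_multi()
   above is a release-publish pattern -- fill in the plain payload fields,
   issue a release barrier, then store the sync counter that the readers
   poll.  C11 sketch of the same ordering (hypothetical names): */
#include <stdatomic.h>
#include <stddef.h>

typedef struct
{
    void      *buf;          /* payload: plain, non-atomic fields */
    size_t     bufsize;
    atomic_int current_sync; /* readers poll this to detect the post */
} post_slot;

static void publish(post_slot *s, void *buf, size_t n, int synct)
{
    s->buf     = buf; /* 1: write the payload */
    s->bufsize = n;
    /* 2: the release store orders the payload writes before the counter
       becomes visible, so a reader that sees synct also sees the payload */
    atomic_store_explicit(&s->current_sync, synct, memory_order_release);
}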
void tMPI_Wait_for_others(struct coll_env *cev, int myrank)
{
-#if defined(TMPI_PROFILE)
- struct tmpi_thread *cur=tMPI_Get_current();
+#if defined(TMPI_PROFILE)
+ struct tmpi_thread *cur = tMPI_Get_current();
tMPI_Profile_wait_start(cur);
#endif
#ifdef USE_COLLECTIVE_COPY_BUFFER
- if (! (cev->met[myrank].using_cb) )
+ if (!(cev->met[myrank].using_cb))
#endif
{
/* wait until everybody else is done copying the buffer */
#ifdef USE_COLLECTIVE_COPY_BUFFER
else
{
- /* wait until everybody else is done copying the original buffer.
+ /* wait until everybody else is done copying the original buffer.
We use atomic add-return because we want to be sure of coherency.
- This wait is bound to be very short (otherwise it wouldn't
+ This wait is bound to be very short (otherwise it wouldn't
be double-buffering) so we always spin here. */
/*tMPI_Atomic_memory_barrier_rel();*/
#if 0
while (!tMPI_Atomic_cas( &(cev->met[rank].buf_readcount), 0,
- -100000))
+ -100000))
#endif
#if 0
- while (tMPI_Atomic_add_return( &(cev->met[myrank].buf_readcount), 0)
+ while (tMPI_Atomic_add_return( &(cev->met[myrank].buf_readcount), 0)
!= 0)
#endif
#if 1
- while (tMPI_Atomic_get( &(cev->met[rank].buf_readcount) )>0)
+ while (tMPI_Atomic_get( &(cev->met[myrank].buf_readcount) ) > 0)
#endif
{
}
tMPI_Atomic_memory_barrier_acq();
}
#endif
-#if defined(TMPI_PROFILE)
+#if defined(TMPI_PROFILE)
tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_send);
#endif
}
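/* Editor's aside, not part of the patch: the "#if 1" branch above is the
   variant the author kept -- spin on a plain atomic read until the reader
   count reaches zero, then issue an acquire barrier.  C11 equivalent
   (hypothetical name): */
#include <stdatomic.h>

static void wait_for_readers(atomic_int *readcount)
{
    /* the comment above notes this wait is expected to be very short,
       which is why it spins instead of blocking */
    while (atomic_load_explicit(readcount, memory_order_relaxed) > 0)
    {
    }
    atomic_thread_fence(memory_order_acquire); /* pairs with readers' writes */
}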
-void tMPI_Wait_for_data(struct tmpi_thread *cur, struct coll_env *cev,
+void tMPI_Wait_for_data(struct tmpi_thread *cur, struct coll_env *cev,
int myrank)
{
-#if defined(TMPI_PROFILE)
+#if defined(TMPI_PROFILE)
tMPI_Profile_wait_start(cur);
#endif
tMPI_Event_wait( &(cev->met[myrank].recv_ev));
tMPI_Event_process( &(cev->met[myrank].recv_ev), 1);
-#if defined(TMPI_PROFILE)
+#if defined(TMPI_PROFILE)
tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_recv);
#endif
}
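/* Editor's aside, not part of the patch: tMPI_Event_wait()/_process()
   above follow thread_mpi's counting-event idiom -- wait returns how many
   signals are pending, and process consumes a given number of them.  A
   reduced C11 sketch of such a counting event (hypothetical names; the
   real implementation can also sleep instead of spinning): */
#include <stdatomic.h>

typedef struct { atomic_int pending; } counting_event;

static int event_wait(counting_event *ev)
{
    int n;
    /* busy-wait until at least one signal has arrived */
    while ((n = atomic_load_explicit(&ev->pending,
                                     memory_order_acquire)) == 0)
    {
    }
    return n; /* number of pending signals */
}

static void event_process(counting_event *ev, int n)
{
    atomic_fetch_sub(&ev->pending, n); /* mark n signals as handled */
}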
-int tMPI_Barrier(tMPI_Comm comm)
+int tMPI_Barrier(tMPI_Comm comm)
{
#ifdef TMPI_PROFILE
- struct tmpi_thread *cur=tMPI_Get_current();
+ struct tmpi_thread *cur = tMPI_Get_current();
tMPI_Profile_count_start(cur);
#endif
return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
}
- if (comm->grp.N>1)
+ if (comm->grp.N > 1)
{
-#if defined(TMPI_PROFILE)
+#if defined(TMPI_PROFILE)
tMPI_Profile_wait_start(cur);
#endif
tMPI_Barrier_wait( &(comm->barrier) );
-#if defined(TMPI_PROFILE)
+#if defined(TMPI_PROFILE)
tMPI_Profile_wait_stop(cur, TMPIWAIT_Barrier);
#endif
}
#endif
return TMPI_SUCCESS;
}
-
-
-
-
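/* Editor's usage note, not part of the patch: tMPI_Barrier() mirrors
   MPI_Barrier() -- every thread in the communicator must enter the call
   before any of them returns.  Typical call:

       tMPI_Barrier(TMPI_COMM_WORLD);
 */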