#ifdef USE_COLLECTIVE_COPY_BUFFER
/* initialize a copy buffer */
-void tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size)
+int tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size)
{
- cb->buf = tMPI_Malloc(size);
+ cb->buf = tMPI_Malloc(size);
+ if (cb->buf == NULL)
+ {
+ return TMPI_ERR_NO_MEM;
+ }
cb->size = size;
+ return TMPI_SUCCESS;
}
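
With tMPI_Copy_buffer_init() now returning an error code instead of relying
on tMPI_Malloc() to abort, callers are expected to propagate the result. A
minimal caller sketch (illustrative only; it reuses the
TMPI_SUCCESS/TMPI_ERR_NO_MEM codes from the patch):

    struct copy_buffer cb;
    int                err;

    err = tMPI_Copy_buffer_init(&cb, 1024);
    if (err != TMPI_SUCCESS)
    {
        return err; /* pass TMPI_ERR_NO_MEM up the call chain */
    }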
/* destroy a copy buffer */
void tMPI_Copy_buffer_destroy(struct copy_buffer *cb)
{
    free(cb->buf);
}
-void tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
- size_t size)
+int tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
+ size_t size)
{
int i;
+ int ret;
cbl->size = size;
cbl->cb_alloc = (struct copy_buffer*)
tMPI_Malloc(sizeof(struct copy_buffer)*Nbufs);
+ if (cbl->cb_alloc == NULL)
+ {
+ return TMPI_ERR_NO_MEM;
+ }
cbl->cb = cbl->cb_alloc; /* the first one */
cbl->Nbufs = Nbufs;
for (i = 0; i < Nbufs; i++)
{
- tMPI_Copy_buffer_init( &(cbl->cb_alloc[i]), size );
+ ret = tMPI_Copy_buffer_init( &(cbl->cb_alloc[i]), size );
+ if (ret != TMPI_SUCCESS)
+ {
+ return ret;
+ }
        if (i < Nbufs-1)
        {
            cbl->cb_alloc[i].next = &(cbl->cb_alloc[i+1]);
        }
        else
        {
            cbl->cb_alloc[i].next = NULL;
        }
}
+ return TMPI_SUCCESS;
}
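
After this loop the Nbufs preallocated buffers form a NULL-terminated,
singly linked free list with cbl->cb pointing at the head. For Nbufs == 3
the layout is:

    cbl->cb -> cb_alloc[0] -> cb_alloc[1] -> cb_alloc[2] -> NULL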
/* destroy a copy buffer list */
void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl)
{
    int i;

    for (i = 0; i < cbl->Nbufs; i++)
    {
        tMPI_Copy_buffer_destroy( &(cbl->cb_alloc[i]) );
    }
    free(cbl->cb_alloc);
}

/* get a copy buffer from the list */
struct copy_buffer* tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl)
{
    struct copy_buffer *ret = cbl->cb;
if (!ret)
{
- fprintf(stderr, "out of copy buffers!!");
- exit(1);
+ tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COPY_NBUFFERS);
+ return NULL;
}
    cbl->cb = ret->next;

    return ret;
}
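
tMPI_Copy_buffer_list_get() simply pops the head of the free list; since the
list lives in per-thread state (cur->cbl_multi, below), no locking is needed.
The matching push is not part of this hunk; a hypothetical sketch of what
returning a buffer would look like:

    void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
                                      struct copy_buffer      *cb)
    {
        cb->next = cbl->cb; /* old head becomes the second element */
        cbl->cb  = cb;      /* returned buffer is the new head */
    }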
-void tMPI_Coll_envt_init(struct coll_env_thread *met, int N)
+int tMPI_Coll_envt_init(struct coll_env_thread *met, int N)
{
tMPI_Atomic_set(&(met->current_sync), 0);
tMPI_Atomic_set(&(met->n_remaining), 0);
- met->buf = (void**)tMPI_Malloc(sizeof(void*)*N);
- met->bufsize = (size_t*)tMPI_Malloc(sizeof(size_t)*N);
+ met->buf = (void**)tMPI_Malloc(sizeof(void*)*N);
+ if (met->buf == NULL)
+ {
+ return TMPI_ERR_NO_MEM;
+ }
+ met->bufsize = (size_t*)tMPI_Malloc(sizeof(size_t)*N);
+ if (met->bufsize == NULL)
+ {
+ return TMPI_ERR_NO_MEM;
+ }
met->read_data = (tmpi_bool*)tMPI_Malloc(sizeof(tmpi_bool)*N);
+ if (met->read_data == NULL)
+ {
+ return TMPI_ERR_NO_MEM;
+ }
#ifdef USE_COLLECTIVE_COPY_BUFFER
- met->cpbuf = (tMPI_Atomic_ptr_t*)tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*N);
+ met->cpbuf = (tMPI_Atomic_ptr_t*)tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*
+ N);
+    if (met->cpbuf == NULL)
+ {
+ return TMPI_ERR_NO_MEM;
+ }
met->cb = NULL;
met->using_cb = FALSE;
#endif
tMPI_Event_init( &(met->send_ev) );
tMPI_Event_init( &(met->recv_ev) );
+ return TMPI_SUCCESS;
}
/* destroy a collective communication thread environment */
void tMPI_Coll_envt_destroy(struct coll_env_thread *met)
{
    free( (void*)met->buf );
    free( (void*)met->bufsize );
    free( (void*)met->read_data );
#ifdef USE_COLLECTIVE_COPY_BUFFER
    free( (void*)met->cpbuf );
#endif
}
-void tMPI_Coll_env_init(struct coll_env *cev, int N)
+int tMPI_Coll_env_init(struct coll_env *cev, int N)
{
int i;
+ int ret;
cev->met = (struct coll_env_thread*)tMPI_Malloc(
sizeof(struct coll_env_thread)*N);
+ if (cev->met == NULL)
+ {
+ return TMPI_ERR_NO_MEM;
+ }
cev->N = N;
tMPI_Atomic_set(&(cev->coll.current_sync), 0);
tMPI_Atomic_set(&(cev->coll.n_remaining), 0);
for (i = 0; i < N; i++)
{
- tMPI_Coll_envt_init(&(cev->met[i]), N);
+ ret = tMPI_Coll_envt_init(&(cev->met[i]), N);
+ if (ret != TMPI_SUCCESS)
+ {
+ return ret;
+ }
}
+ return TMPI_SUCCESS;
}
void tMPI_Coll_env_destroy(struct coll_env *cev)
{
    int i;

    for (i = 0; i < cev->N; i++)
    {
        tMPI_Coll_envt_destroy( &(cev->met[i]) );
    }
    free( (void*)cev->met );
}
-void tMPI_Coll_sync_init(struct coll_sync *csync, int N)
+int tMPI_Coll_sync_init(struct coll_sync *csync, int N)
{
int i;
csync->N = N;
csync->events = (tMPI_Event*)tMPI_Malloc(sizeof(tMPI_Event)*N);
+ if (csync->events == NULL)
+ {
+ return TMPI_ERR_NO_MEM;
+ }
for (i = 0; i < N; i++)
{
tMPI_Event_init( &(csync->events[i]) );
}
+ return TMPI_SUCCESS;
}
void tMPI_Coll_sync_destroy(struct coll_sync *csync)
{
    free( (void*)csync->events );
}
#ifdef USE_COLLECTIVE_COPY_BUFFER
if (cev->met[myrank].using_cb)
{
- N = tMPI_Event_wait( &(cev->met[myrank].send_ev));
- tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
+ if (cev->N > 1)
+ {
+ N = tMPI_Event_wait( &(cev->met[myrank].send_ev));
+ tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
+ }
}
#endif
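
tMPI_Event_wait() blocks until at least one signal is pending on the event
and returns the number of pending signals; tMPI_Event_process() then
acknowledges how many of them were handled. A minimal sketch of that
pairing, assuming a hypothetical handle_one() consumer:

    int n = tMPI_Event_wait(&ev);   /* blocks; returns #pending signals */
    while (n > 0)
    {
        handle_one();               /* hypothetical per-signal work */
        tMPI_Event_process(&ev, 1); /* acknowledge one handled signal */
        n--;
    }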
#ifdef USE_COLLECTIVE_COPY_BUFFER
/* we need to try checking the pointer again after we increase
the read counter, signaling that one more thread
is reading. */
- tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount), 1);
+ tMPI_Atomic_fetch_add(&(cev->met[rank].buf_readcount), 1);
/* a full memory barrier */
tMPI_Atomic_memory_barrier();
try_again_srcbuf = tMPI_Atomic_ptr_get(
/* We tried again, and this time there was a copied buffer.
We use that, and indicate that we're not reading from the
regular buf. This case should be pretty rare. */
- tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount), -1);
+ tMPI_Atomic_fetch_add(&(cev->met[rank].buf_readcount), -1);
tMPI_Atomic_memory_barrier_acq();
srcbuf = try_again_srcbuf;
}
{
/* we decrement the read count; potentially releasing the buffer. */
tMPI_Atomic_memory_barrier_rel();
- tMPI_Atomic_add_return( &(cev->met[rank].buf_readcount), -1);
+ tMPI_Atomic_fetch_add( &(cev->met[rank].buf_readcount), -1);
}
#endif
}
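
Taken together, the fragments above implement the reader side of the
copy-buffer handshake: announce yourself on the read counter, re-check the
copy-buffer pointer under a full barrier, and either back out to the copy or
read the original and release it. Condensed into one sketch (hypothetical
names readcount, cpbuf_slot, dst, original, size; not a verbatim excerpt):

    tMPI_Atomic_fetch_add(&readcount, 1);  /* announce: one more reader */
    tMPI_Atomic_memory_barrier();
    if (tMPI_Atomic_ptr_get(&cpbuf_slot) != NULL)
    {
        /* the sender finished its copy meanwhile: use the copy buffer
           and back out of the original immediately */
        tMPI_Atomic_fetch_add(&readcount, -1);
    }
    else
    {
        /* read from the original buffer, then release it */
        memcpy(dst, original, size);
        tMPI_Atomic_memory_barrier_rel();
        tMPI_Atomic_fetch_add(&readcount, -1);
    }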
{
int reta;
tMPI_Atomic_memory_barrier_rel();
- reta = tMPI_Atomic_add_return( &(cev->met[rank].n_remaining), -1);
- if (reta <= 0)
+ reta = tMPI_Atomic_fetch_add( &(cev->met[rank].n_remaining), -1);
+ if (reta <= 1) /* n_remaining == 0 now. */
{
tMPI_Event_signal( &(cev->met[rank].send_ev) );
}
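
The test changes because tMPI_Atomic_fetch_add() returns the value the
counter had *before* the addition, whereas the old tMPI_Atomic_add_return()
returned the value after it. A worked example of the last-thread check,
with a free-standing send_ev for illustration:

    tMPI_Atomic_t n_remaining;
    int           old;

    tMPI_Atomic_set(&n_remaining, 1);              /* one reader left */
    old = tMPI_Atomic_fetch_add(&n_remaining, -1); /* old == 1, now 0 */
    if (old <= 1)                                  /* we were the last */
    {
        tMPI_Event_signal(&send_ev);               /* wake the sender */
    }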
memcpy(recvbuf, sendbuf, sendsize);
}
-void tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
- int tag, tMPI_Datatype datatype, size_t bufsize,
- void *buf, int n_remaining, int synct, int dest)
+int tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
+ int tag, tMPI_Datatype datatype, size_t bufsize,
+ void *buf, int n_remaining, int synct, int dest)
{
int i;
#ifdef USE_COLLECTIVE_COPY_BUFFER
struct tmpi_thread *cur = tMPI_Get_current();
/* copy the buffer locally. First allocate */
cev->met[myrank].cb = tMPI_Copy_buffer_list_get( &(cur->cbl_multi) );
+ if (cev->met[myrank].cb == NULL)
+ {
+ return TMPI_ERR_COPY_NBUFFERS;
+ }
if (cev->met[myrank].cb->size < bufsize)
{
- fprintf(stderr, "ERROR: cb size too small\n");
- exit(1);
+ return TMPI_ERR_COPY_BUFFER_SIZE;
}
/* copy to the new buf */
memcpy(cev->met[myrank].cb->buf, buf, bufsize);
        /* post the new buf */
        tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[index]),
                            cev->met[myrank].cb->buf);
}
#endif
+ return TMPI_SUCCESS;
}
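
This is the sender side of the double-buffering scheme: the original buffer
is published first, so receivers can start copying right away; the sender
then duplicates the data into its private copy buffer and publishes that
pointer, allowing it to reclaim the original without waiting for slow
readers. A condensed sketch (hypothetical cpbuf_slot and publish()
stand-ins for the posting code not shown in this hunk):

    tMPI_Atomic_ptr_set(&cpbuf_slot, NULL);    /* no copy available yet */
    publish(buf);                              /* receivers may now read buf */
    memcpy(cb->buf, buf, bufsize);             /* copy runs in parallel */
    tMPI_Atomic_ptr_set(&cpbuf_slot, cb->buf); /* late readers take the copy */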
tMPI_Profile_wait_start(cur);
#endif
-#ifdef USE_COLLECTIVE_COPY_BUFFER
- if (!(cev->met[myrank].using_cb) )
-#endif
+ if (cev->N > 1)
{
- /* wait until everybody else is done copying the buffer */
- tMPI_Event_wait( &(cev->met[myrank].send_ev));
- tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
- }
#ifdef USE_COLLECTIVE_COPY_BUFFER
- else
- {
- /* wait until everybody else is done copying the original buffer.
- We use atomic add-return because we want to be sure of coherency.
- This wait is bound to be very short (otherwise it wouldn't
- be double-buffering) so we always spin here. */
- /*tMPI_Atomic_memory_barrier_rel();*/
-#if 0
- while (!tMPI_Atomic_cas( &(cev->met[rank].buf_readcount), 0,
- -100000))
+ if (!(cev->met[myrank].using_cb) )
#endif
+ {
+ /* wait until everybody else is done copying the buffer */
+ tMPI_Event_wait( &(cev->met[myrank].send_ev));
+ tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
+ }
+#ifdef USE_COLLECTIVE_COPY_BUFFER
+ else
+ {
+ /* wait until everybody else is done copying the original buffer.
+ This wait is bound to be very short (otherwise it wouldn't
+ be double-buffering) so we always spin here. */
#if 0
- while (tMPI_Atomic_add_return( &(cev->met[myrank].buf_readcount), 0)
- != 0)
+ /* dummy compare-and-swap to a value that is non-zero. The
+ atomic read with barrier below is simpler, but we keep this
+ code here commented out for if there is ever a platform
+ where the simple read doesn't work because of, say, cache
+ coherency issues. */
+        while (!tMPI_Atomic_cas( &(cev->met[myrank].buf_readcount), 0,
+                                 -100000))
#endif
#if 1
- while (tMPI_Atomic_get( &(cev->met[rank].buf_readcount) ) > 0)
+ tMPI_Atomic_memory_barrier(); /* a full barrier to make
+ sure that the sending
+ doesn't interfere with the
+ waiting */
+ while (tMPI_Atomic_get( &(cev->met[myrank].buf_readcount) ) > 0)
#endif
- {
+ {
+ tMPI_Atomic_memory_barrier_acq();
+ }
+ tMPI_Atomic_memory_barrier_acq();
}
- tMPI_Atomic_memory_barrier_acq();
- }
#endif
+ }
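
The rewritten wait is a standard acquire spin: a full barrier first, so the
spin cannot be reordered with the sender's earlier stores, then
buf_readcount is polled until every announced reader has backed out, with
acquire barriers so that no later access of the buffer can float above the
loop. In isolation (readcount standing in for
cev->met[myrank].buf_readcount):

    tMPI_Atomic_memory_barrier();           /* order prior stores vs. spin */
    while (tMPI_Atomic_get(&readcount) > 0) /* readers still on our buffer */
    {
        tMPI_Atomic_memory_barrier_acq();
    }
    tMPI_Atomic_memory_barrier_acq();       /* now safe to reuse the buffer */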
#if defined(TMPI_PROFILE)
tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_send);
#endif