Merge branch 'release-4-6'
[alexxy/gromacs.git] / src / gromacs / gmxlib / thread_mpi / collective.c
index 069db3f1d8958947cc1d1b139f79c90d3f103c3a..5434dc425d3297a1d0c429e9c08d3aa602a544bc 100644 (file)
 
 #ifdef USE_COLLECTIVE_COPY_BUFFER
 /* initialize a copy buffer */
-void tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size)
+int tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size)
 {
-    cb->buf  = tMPI_Malloc(size);
+    cb->buf = tMPI_Malloc(size);
+    if (cb->buf == NULL)
+    {
+        return TMPI_ERR_NO_MEM;
+    }
     cb->size = size;
+    return TMPI_SUCCESS;
 }
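
This hunk sets the pattern for the whole change: initializers that used to
return void now return int, reporting tMPI_Malloc() failure as
TMPI_ERR_NO_MEM instead of relying on the allocator to abort the process.
A minimal caller sketch (hypothetical function, not part of the patch):

    static int setup_one_buffer(struct copy_buffer *cb)
    {
        int ret = tMPI_Copy_buffer_init(cb, 4096);
        if (ret != TMPI_SUCCESS)
        {
            return ret; /* propagate TMPI_ERR_NO_MEM upward */
        }
        return TMPI_SUCCESS;
    }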
 
 /* destroy a copy buffer */
@@ -75,19 +80,28 @@ void tMPI_Copy_buffer_destroy(struct copy_buffer *cb)
     free(cb->buf);
 }
 
-void tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
-                                size_t size)
+int tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
+                               size_t size)
 {
     int i;
+    int ret;
 
     cbl->size     = size;
     cbl->cb_alloc = (struct copy_buffer*)
         tMPI_Malloc(sizeof(struct copy_buffer)*Nbufs);
+    if (cbl->cb_alloc == NULL)
+    {
+        return TMPI_ERR_NO_MEM;
+    }
     cbl->cb    = cbl->cb_alloc; /* the first one */
     cbl->Nbufs = Nbufs;
     for (i = 0; i < Nbufs; i++)
     {
-        tMPI_Copy_buffer_init( &(cbl->cb_alloc[i]), size );
+        ret = tMPI_Copy_buffer_init( &(cbl->cb_alloc[i]), size );
+        if (ret != TMPI_SUCCESS)
+        {
+            return ret;
+        }
         if (i < Nbufs-1)
         {
             cbl->cb_alloc[i].next = &(cbl->cb_alloc[i+1]);
@@ -97,6 +111,7 @@ void tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
             cbl->cb_alloc[i].next = NULL;
         }
     }
+    return TMPI_SUCCESS;
 }
 
 void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl)
@@ -115,8 +130,8 @@ struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl)
     struct copy_buffer *ret = cbl->cb;
     if (!ret)
     {
-        fprintf(stderr, "out of copy buffers!!");
-        exit(1);
+        tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COPY_NBUFFERS);
+        return NULL;
     }
     cbl->cb = ret->next;
 
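Note on the free list: tMPI_Copy_buffer_list_get() pops the head of a singly
linked list of preallocated buffers. An exhausted list now raises
TMPI_ERR_COPY_NBUFFERS through tMPI_Error() and returns NULL instead of
calling exit(1), so callers must check the result. The intended get/return
pairing, sketched under the assumption that tMPI_Copy_buffer_list_return()
takes the buffer as its second argument (as the hunk below suggests):

    struct copy_buffer *cb = tMPI_Copy_buffer_list_get(&cur->cbl_multi);
    if (cb == NULL)
    {
        return TMPI_ERR_COPY_NBUFFERS; /* exhausted: report, don't abort */
    }
    /* ... fill and publish cb->buf ... */
    tMPI_Copy_buffer_list_return(&cur->cbl_multi, cb);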
@@ -138,20 +153,38 @@ void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
 
 
 
-void tMPI_Coll_envt_init(struct coll_env_thread *met, int N)
+int tMPI_Coll_envt_init(struct coll_env_thread *met, int N)
 {
     tMPI_Atomic_set(&(met->current_sync), 0);
     tMPI_Atomic_set(&(met->n_remaining), 0);
-    met->buf       = (void**)tMPI_Malloc(sizeof(void*)*N);
-    met->bufsize   = (size_t*)tMPI_Malloc(sizeof(size_t)*N);
+    met->buf = (void**)tMPI_Malloc(sizeof(void*)*N);
+    if (met->buf == NULL)
+    {
+        return TMPI_ERR_NO_MEM;
+    }
+    met->bufsize = (size_t*)tMPI_Malloc(sizeof(size_t)*N);
+    if (met->bufsize == NULL)
+    {
+        return TMPI_ERR_NO_MEM;
+    }
     met->read_data = (tmpi_bool*)tMPI_Malloc(sizeof(tmpi_bool)*N);
+    if (met->read_data == NULL)
+    {
+        return TMPI_ERR_NO_MEM;
+    }
 #ifdef USE_COLLECTIVE_COPY_BUFFER
-    met->cpbuf    = (tMPI_Atomic_ptr_t*)tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*N);
+    met->cpbuf = (tMPI_Atomic_ptr_t*)
+        tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*N);
+    if (met->cpbuf == NULL)
+    {
+        return TMPI_ERR_NO_MEM;
+    }
     met->cb       = NULL;
     met->using_cb = FALSE;
 #endif
     tMPI_Event_init( &(met->send_ev) );
     tMPI_Event_init( &(met->recv_ev) );
+    return TMPI_SUCCESS;
 }
 
 
@@ -166,19 +199,29 @@ void tMPI_Coll_envt_destroy(struct coll_env_thread *met)
 #endif
 }
 
-void tMPI_Coll_env_init(struct coll_env *cev, int N)
+int tMPI_Coll_env_init(struct coll_env *cev, int N)
 {
     int i;
+    int ret;
 
     cev->met = (struct coll_env_thread*)tMPI_Malloc(
                 sizeof(struct coll_env_thread)*N);
+    if (cev->met == NULL)
+    {
+        return TMPI_ERR_NO_MEM;
+    }
     cev->N = N;
     tMPI_Atomic_set(&(cev->coll.current_sync), 0);
     tMPI_Atomic_set(&(cev->coll.n_remaining), 0);
     for (i = 0; i < N; i++)
     {
-        tMPI_Coll_envt_init(&(cev->met[i]), N);
+        ret = tMPI_Coll_envt_init(&(cev->met[i]), N);
+        if (ret != TMPI_SUCCESS)
+        {
+            return ret;
+        }
     }
+    return TMPI_SUCCESS;
 }
 
 void tMPI_Coll_env_destroy(struct coll_env *cev)
@@ -192,7 +235,7 @@ void tMPI_Coll_env_destroy(struct coll_env *cev)
 }
 
 
-void tMPI_Coll_sync_init(struct coll_sync *csync, int N)
+int tMPI_Coll_sync_init(struct coll_sync *csync, int N)
 {
     int i;
 
@@ -201,10 +244,15 @@ void tMPI_Coll_sync_init(struct coll_sync *csync, int N)
     csync->N     = N;
 
     csync->events = (tMPI_Event*)tMPI_Malloc(sizeof(tMPI_Event)*N);
+    if (csync->events == NULL)
+    {
+        return TMPI_ERR_NO_MEM;
+    }
     for (i = 0; i < N; i++)
     {
         tMPI_Event_init( &(csync->events[i]) );
     }
+    return TMPI_SUCCESS;
 }
 
 void tMPI_Coll_sync_destroy(struct coll_sync *csync)
@@ -244,8 +292,11 @@ struct coll_env *tMPI_Get_cev(tMPI_Comm comm, int myrank, int *counter)
 #ifdef USE_COLLECTIVE_COPY_BUFFER
     if (cev->met[myrank].using_cb)
     {
-        N = tMPI_Event_wait( &(cev->met[myrank].send_ev));
-        tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
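+        /* with one thread, no other rank will ever signal send_ev */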
+        if (cev->N > 1)
+        {
+            N = tMPI_Event_wait( &(cev->met[myrank].send_ev));
+            tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
+        }
     }
 #endif
 #ifdef USE_COLLECTIVE_COPY_BUFFER
@@ -316,7 +367,7 @@ void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
                 /* we need to try checking the pointer again after we increase
                    the read counter, signaling that one more thread
                    is reading. */
-                tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount), 1);
+                tMPI_Atomic_fetch_add(&(cev->met[rank].buf_readcount), 1);
                 /* a full memory barrier */
                 tMPI_Atomic_memory_barrier();
                 try_again_srcbuf = tMPI_Atomic_ptr_get(
@@ -335,7 +386,7 @@ void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
                     /* We tried again, and this time there was a copied buffer.
                        We use that, and indicate that we're not reading from the
                        regular buf. This case should be pretty rare.  */
-                    tMPI_Atomic_add_return(&(cev->met[rank].buf_readcount), -1);
+                    tMPI_Atomic_fetch_add(&(cev->met[rank].buf_readcount), -1);
                     tMPI_Atomic_memory_barrier_acq();
                     srcbuf = try_again_srcbuf;
                 }
@@ -360,7 +411,7 @@ void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
         {
             /* we decrement the read count; potentially releasing the buffer. */
             tMPI_Atomic_memory_barrier_rel();
-            tMPI_Atomic_add_return( &(cev->met[rank].buf_readcount), -1);
+            tMPI_Atomic_fetch_add( &(cev->met[rank].buf_readcount), -1);
         }
 #endif
     }
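
The reason a reader bumps buf_readcount before re-checking cpbuf (in the
hunks above): the poster reclaims the original buffer only once the count
is zero, so a reader must announce itself first and then re-read the
published pointer, with a full barrier between the two steps so that
neither side can miss the other. The handshake, reduced from the
surrounding code (the slot index is abbreviated as "index" here):

    /* reader: announce, then look again */
    tMPI_Atomic_fetch_add(&(cev->met[rank].buf_readcount), 1);
    tMPI_Atomic_memory_barrier();
    try_again_srcbuf = tMPI_Atomic_ptr_get(&(cev->met[rank].cpbuf[index]));
    /* ... copy the data, then leave: */
    tMPI_Atomic_memory_barrier_rel();
    tMPI_Atomic_fetch_add(&(cev->met[rank].buf_readcount), -1);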
@@ -368,8 +419,8 @@ void tMPI_Mult_recv(tMPI_Comm comm, struct coll_env *cev, int rank,
     {
         int reta;
         tMPI_Atomic_memory_barrier_rel();
-        reta = tMPI_Atomic_add_return( &(cev->met[rank].n_remaining), -1);
-        if (reta <= 0)
+        reta = tMPI_Atomic_fetch_add( &(cev->met[rank].n_remaining), -1);
+        if (reta <= 1) /* fetch_add returned 1 => n_remaining is 0 now. */
         {
             tMPI_Event_signal( &(cev->met[rank].send_ev) );
         }
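
The threshold change from "reta <= 0" to "reta <= 1" follows from the two
primitives' return conventions: tMPI_Atomic_add_return() yields the value
after the addition, while tMPI_Atomic_fetch_add() yields the value before
it. Both conditions detect the same 1 -> 0 transition of n_remaining:

    /* old: */ reta = tMPI_Atomic_add_return(&n_remaining, -1); /* == 0 */
    /* new: */ reta = tMPI_Atomic_fetch_add(&n_remaining, -1);  /* == 1 */

so exactly the last receiver signals send_ev.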
@@ -401,9 +452,9 @@ void tMPI_Coll_root_xfer(tMPI_Comm comm, tMPI_Datatype sendtype,
     memcpy(recvbuf, sendbuf, sendsize);
 }
 
-void tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
-                     int tag, tMPI_Datatype datatype, size_t bufsize,
-                     void *buf, int n_remaining, int synct, int dest)
+int tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
+                    int tag, tMPI_Datatype datatype, size_t bufsize,
+                    void *buf, int n_remaining, int synct, int dest)
 {
     int i;
 #ifdef USE_COLLECTIVE_COPY_BUFFER
@@ -452,10 +503,13 @@ void tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
         struct tmpi_thread *cur = tMPI_Get_current();
         /* copy the buffer locally. First allocate */
         cev->met[myrank].cb = tMPI_Copy_buffer_list_get( &(cur->cbl_multi) );
+        if (cev->met[myrank].cb == NULL)
+        {
+            return TMPI_ERR_COPY_NBUFFERS;
+        }
         if (cev->met[myrank].cb->size < bufsize)
         {
-            fprintf(stderr, "ERROR: cb size too small\n");
-            exit(1);
+            return TMPI_ERR_COPY_BUFFER_SIZE;
         }
         /* copy to the new buf */
         memcpy(cev->met[myrank].cb->buf, buf, bufsize);
@@ -467,6 +521,7 @@ void tMPI_Post_multi(struct coll_env *cev, int myrank, int index,
                             cev->met[myrank].cb->buf);
     }
 #endif
+    return TMPI_SUCCESS;
 }
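
The copy-buffer branch above is the sender side of the double buffering:
data is copied into a private buffer first and only then made visible
through the atomic pointer store, so a receiver that observes the new
pointer also observes complete data. In outline (the explicit release
barrier shows the required ordering; in this file it may equally be
provided inside or around tMPI_Atomic_ptr_set()):

    memcpy(cev->met[myrank].cb->buf, buf, bufsize); /* fill private copy  */
    tMPI_Atomic_memory_barrier_rel();               /* data before pointer */
    tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]),
                        cev->met[myrank].cb->buf);  /* publish */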
 
 
@@ -477,38 +532,45 @@ void tMPI_Wait_for_others(struct coll_env *cev, int myrank)
     tMPI_Profile_wait_start(cur);
 #endif
 
-#ifdef USE_COLLECTIVE_COPY_BUFFER
-    if (!(cev->met[myrank].using_cb) )
-#endif
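+    /* with a single thread there is nobody to wait for */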
+    if (cev->N > 1)
     {
-        /* wait until everybody else is done copying the buffer */
-        tMPI_Event_wait( &(cev->met[myrank].send_ev));
-        tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
-    }
 #ifdef USE_COLLECTIVE_COPY_BUFFER
-    else
-    {
-        /* wait until everybody else is done copying the original buffer.
-           We use atomic add-return because we want to be sure of coherency.
-           This wait is bound to be very short (otherwise it wouldn't
-           be double-buffering) so we always spin here. */
-        /*tMPI_Atomic_memory_barrier_rel();*/
-#if 0
-        while (!tMPI_Atomic_cas( &(cev->met[rank].buf_readcount), 0,
-                                 -100000))
+        if (!(cev->met[myrank].using_cb) )
 #endif
+        {
+            /* wait until everybody else is done copying the buffer */
+            tMPI_Event_wait( &(cev->met[myrank].send_ev));
+            tMPI_Event_process( &(cev->met[myrank].send_ev), 1);
+        }
+#ifdef USE_COLLECTIVE_COPY_BUFFER
+        else
+        {
+            /* wait until everybody else is done copying the original buffer.
+               This wait is bound to be very short (otherwise it wouldn't
+               be double-buffering) so we always spin here. */
 #if 0
-        while (tMPI_Atomic_add_return( &(cev->met[myrank].buf_readcount), 0)
-               != 0)
+            /* dummy compare-and-swap to a value that is non-zero. The
+               atomic read with barrier below is simpler, but we keep this
+               code here commented out for if there is ever a platform
+               where the simple read doesn't work because of, say, cache
+               coherency issues. */
+            while (!tMPI_Atomic_cas( &(cev->met[myrank].buf_readcount), 0,
+                                     -100000))
 #endif
 #if 1
-        while (tMPI_Atomic_get( &(cev->met[rank].buf_readcount) ) > 0)
+            tMPI_Atomic_memory_barrier();         /* a full barrier to make
+                                                     sure that the sending
+                                                     doesn't interfere with the
+                                                     waiting */
+            while (tMPI_Atomic_get( &(cev->met[myrank].buf_readcount) ) > 0)
 #endif
-        {
+            {
+                tMPI_Atomic_memory_barrier_acq();
+            }
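+            /* the acquire barriers pair with the release barrier the
+               readers issue just before decrementing buf_readcount in
+               tMPI_Mult_recv() */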
+            tMPI_Atomic_memory_barrier_acq();
         }
-        tMPI_Atomic_memory_barrier_acq();
-    }
 #endif
+    }
 #if defined(TMPI_PROFILE)
     tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_send);
 #endif