/* start N threads with argc, argv (used by tMPI_Init)*/
-void tMPI_Start_threads(tmpi_bool main_returns, int N,
- tMPI_Affinity_strategy aff_strategy,
- int *argc, char ***argv,
- void (*start_fn)(void*), void *start_arg,
- int (*start_fn_main)(int, char**));
+int tMPI_Start_threads(tmpi_bool main_returns, int N,
+ tMPI_Affinity_strategy aff_strategy,
+ int *argc, char ***argv,
+ void (*start_fn)(void*), void *start_arg,
+ int (*start_fn_main)(int, char**));
/* starter function for threads; takes a void pointer to a
struct tmpi_starter_, which calls main() if tmpi_start_.fn == NULL */
static void* tMPI_Thread_starter(void *arg);
/* allocate and initialize the data associated with a thread structure */
-static void tMPI_Thread_init(struct tmpi_thread *th);
+static int tMPI_Thread_init(struct tmpi_thread *th);
/* deallocate the data associated with a thread structure */
static void tMPI_Thread_destroy(struct tmpi_thread *th);
struct tmpi_thread * th = NULL;
static tMPI_Thread_mutex_t mtx = TMPI_THREAD_MUTEX_INITIALIZER;
+ /* don't check for errors during trace */
tMPI_Thread_mutex_lock(&mtx);
if (threads)
{
#endif
-#if 0
-struct tmpi_thread *tMPI_Get_current(void)
-{
- if (!threads)
- {
- return NULL;
- }
-
- return (struct tmpi_thread*)tMPI_thread_getspecific(id_key);
-}
-
-
-unsigned int tMPI_Threadnr(struct tmpi_thread *thr)
-{
- return thr-threads;
-}
-#endif
-#if 0
-unsigned int tMPI_This_threadnr(void)
-{
- return tMPI_Get_current()-threads;
-}
-
-struct tmpi_thread *tMPI_Get_thread(tMPI_Comm comm, int rank)
-{
- /* check destination */
- if ( (rank < 0) || (rank > comm->grp.N) )
- {
- tMPI_Error(comm, TMPI_ERR_GROUP_RANK);
- return NULL;
- }
- return comm->grp.peers[rank];
-}
-#endif
-
tmpi_bool tMPI_Is_master(void)
{
/* if there are no other threads, we're the main thread */
return ret;
}
-static void tMPI_Thread_init(struct tmpi_thread *th)
+static int tMPI_Thread_init(struct tmpi_thread *th)
{
+ int ret;
int N_envelopes = (Nthreads+1)*N_EV_ALLOC;
int N_send_envelopes = N_EV_ALLOC;
int N_reqs = (Nthreads+1)*N_EV_ALLOC;
int i;
/* we set our thread id, as a thread-specific piece of global data. */
- tMPI_Thread_setspecific(id_key, th);
+ ret = tMPI_Thread_setspecific(id_key, th);
+ if (ret != 0)
+ {
+ return ret;
+ }
/* allocate comm.self */
- th->self_comm = tMPI_Comm_alloc(TMPI_COMM_WORLD, 1);
+ ret = tMPI_Comm_alloc( &(th->self_comm), TMPI_COMM_WORLD, 1);
+ if (ret != TMPI_SUCCESS)
+ {
+ return ret;
+ }
th->self_comm->grp.peers[0] = th;
/* allocate envelopes */
- tMPI_Free_env_list_init( &(th->envelopes), N_envelopes );
+ ret = tMPI_Free_env_list_init( &(th->envelopes), N_envelopes );
+ if (ret != TMPI_SUCCESS)
+ {
+ return ret;
+ }
/* recv list */
- tMPI_Recv_env_list_init( &(th->evr));
+ ret = tMPI_Recv_env_list_init( &(th->evr));
+ if (ret != TMPI_SUCCESS)
+ {
+ return ret;
+ }
/* send lists */
th->evs = (struct send_envelope_list*)tMPI_Malloc(
sizeof(struct send_envelope_list)*Nthreads);
+ if (th->evs == NULL)
+ {
+ return TMPI_ERR_NO_MEM;
+ }
for (i = 0; i < Nthreads; i++)
{
- tMPI_Send_env_list_init( &(th->evs[i]), N_send_envelopes);
+ ret = tMPI_Send_env_list_init( &(th->evs[i]), N_send_envelopes);
+ if (ret != TMPI_SUCCESS)
+ {
+ return ret;
+ }
}
tMPI_Atomic_set( &(th->ev_outgoing_received), 0);
tMPI_Event_init( &(th->p2p_event) );
/* allocate requests */
- tMPI_Req_list_init(&(th->rql), N_reqs);
+ ret = tMPI_Req_list_init(&(th->rql), N_reqs);
+ if (ret != TMPI_SUCCESS)
+ {
+ return ret;
+ }
+
#ifdef USE_COLLECTIVE_COPY_BUFFER
/* allcate copy_buffer list */
- tMPI_Copy_buffer_list_init(&(th->cbl_multi), (Nthreads+1)*(N_COLL_ENV+1),
- Nthreads*COPY_BUFFER_SIZE);
+ ret = tMPI_Copy_buffer_list_init(&(th->cbl_multi),
+ (Nthreads+1)*(N_COLL_ENV+1),
+ Nthreads*COPY_BUFFER_SIZE);
+ if (ret != TMPI_SUCCESS)
+ {
+ return ret;
+ }
#endif
#ifdef TMPI_PROFILE
- tMPI_Profile_init(&(th->profile));
+ ret = tMPI_Profile_init(&(th->profile));
+ if (ret != TMPI_SUCCESS)
+ {
+ return ret;
+ }
#endif
/* now wait for all other threads to come on line, before we
start the MPI program */
- tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
+ ret = tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
+ if (ret != 0)
+ {
+ return ret;;
+ }
+ return ret;
}
}
}
-static void tMPI_Global_init(struct tmpi_global *g, int Nthreads)
+static int tMPI_Global_init(struct tmpi_global *g, int Nthreads)
{
+ int ret;
+
g->usertypes = NULL;
g->N_usertypes = 0;
g->Nalloc_usertypes = 0;
- tMPI_Thread_mutex_init(&(g->timer_mutex));
+ ret = tMPI_Thread_mutex_init(&(g->timer_mutex));
+ if (ret != 0)
+ {
+ return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
+ }
tMPI_Spinlock_init(&(g->datatype_lock));
- tMPI_Thread_barrier_init( &(g->barrier), Nthreads);
+ ret = tMPI_Thread_barrier_init( &(g->barrier), Nthreads);
+ if (ret != 0)
+ {
+ return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
+ }
+
+ ret = tMPI_Thread_mutex_init(&(g->comm_link_lock));
+ if (ret != 0)
+ {
+ return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
+ }
- tMPI_Thread_mutex_init(&(g->comm_link_lock));
#if !(defined( _WIN32 ) || defined( _WIN64 ) )
    /* the time at initialization. */
    gettimeofday( &(g->timer_init), NULL);
#else
    /* the time at initialization. */
    g->timer_init = GetTickCount();
#endif
-
+ return TMPI_SUCCESS;
}
static void tMPI_Global_destroy(struct tmpi_global *g)
static void* tMPI_Thread_starter(void *arg)
{
+ int ret;
struct tmpi_thread *th = (struct tmpi_thread*)arg;
#ifdef TMPI_TRACE
tMPI_Trace_print("Created thread nr. %d", (int)(th-threads));
#endif
- tMPI_Thread_init(th);
+ ret = tMPI_Thread_init(th);
+ if (ret != TMPI_SUCCESS)
+ {
+ return NULL;
+ }
/* start_fn, start_arg, argc and argv were set by the calling function */
if (!th->start_fn)
}
}
- return 0;
+ return NULL;
}
-void tMPI_Start_threads(tmpi_bool main_returns, int N,
- tMPI_Affinity_strategy aff_strategy,
- int *argc, char ***argv,
- void (*start_fn)(void*), void *start_arg,
- int (*start_fn_main)(int, char**))
+int tMPI_Start_threads(tmpi_bool main_returns, int N,
+ tMPI_Affinity_strategy aff_strategy,
+ int *argc, char ***argv,
+ void (*start_fn)(void*), void *start_arg,
+ int (*start_fn_main)(int, char**))
{
+ int ret;
#ifdef TMPI_TRACE
tMPI_Trace_print("tMPI_Start_threads(%d, %d, %d, %d, %d, %p, %p, %p, %p)",
main_returns, N, aff_strategy, argc, argv, start_fn,
/* allocate global data */
tmpi_global = (struct tmpi_global*)
tMPI_Malloc(sizeof(struct tmpi_global));
- tMPI_Global_init(tmpi_global, N);
+ if (tmpi_global == 0)
+ {
+ return TMPI_ERR_NO_MEM;
+ }
+ ret = tMPI_Global_init(tmpi_global, N);
+ if (ret != TMPI_SUCCESS)
+ {
+ return ret;
+ }
/* allocate world and thread data */
- threads = (struct tmpi_thread*)tMPI_Malloc(sizeof(struct tmpi_thread)*N);
- TMPI_COMM_WORLD = tMPI_Comm_alloc(NULL, N);
+ threads = (struct tmpi_thread*)
+ tMPI_Malloc(sizeof(struct tmpi_thread)*N);
+ if (threads == NULL)
+ {
+ return TMPI_ERR_NO_MEM;
+ }
+ ret = tMPI_Comm_alloc(&TMPI_COMM_WORLD, NULL, N);
+ if (ret != TMPI_SUCCESS)
+ {
+ return ret;
+ }
TMPI_GROUP_EMPTY = tMPI_Group_alloc();
if (tMPI_Thread_key_create(&id_key, NULL))
{
- tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
+ return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
}
for (i = 0; i < N; i++)
{
for (i = 1; i < N; i++) /* zero is the main thread */
{
- int ret;
ret = tMPI_Thread_create(&(threads[i].thread_id),
tMPI_Thread_starter,
(void*)&(threads[i]) );
{
tMPI_Thread_setaffinity_single(threads[i].thread_id, i);
}
-        if (ret)
+        if (ret != 0)
         {
-            tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
+            return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
         }
}
/* the main thread also runs start_fn if we don't want
if (!main_returns)
{
tMPI_Thread_starter((void*)&(threads[0]));
+
}
else
{
- tMPI_Thread_init(&(threads[0]));
+ ret = tMPI_Thread_init(&(threads[0]));
+ if (ret != 0)
+ {
+ return ret;
+ }
}
}
+ return TMPI_SUCCESS;
}
int tMPI_Init(int *argc, char ***argv,
int (*start_function)(int, char**))
{
+ int ret;
#ifdef TMPI_TRACE
tMPI_Trace_print("tMPI_Init(%p, %p, %p)", argc, argv, start_function);
#endif
{
int N = 0;
tMPI_Get_N(argc, argv, "-nt", &N);
-    tMPI_Start_threads(TRUE, N, TMPI_AFFINITY_ALL_CORES, argc, argv,
-                       NULL, NULL, start_function);
+    ret = tMPI_Start_threads(TRUE, N, TMPI_AFFINITY_ALL_CORES, argc, argv,
+                             NULL, NULL, start_function);
+    if (ret != 0)
+    {
+        return ret;
+    }
}
else
{
tMPI_Affinity_strategy aff_strategy,
void (*start_function)(void*), void *arg)
{
+ int ret;
#ifdef TMPI_TRACE
tMPI_Trace_print("tMPI_Init_fn(%d, %p, %p)", N, start_function, arg);
#endif
if (TMPI_COMM_WORLD == 0 && N >= 1) /* we're the main process */
{
- tMPI_Start_threads(main_thread_returns, N, aff_strategy,
- 0, 0, start_function, arg, NULL);
+ ret = tMPI_Start_threads(main_thread_returns, N, aff_strategy,
+ 0, 0, start_function, arg, NULL);
+ if (ret != 0)
+ {
+ return ret;
+ }
}
return TMPI_SUCCESS;
}
int tMPI_Finalize(void)
{
int i;
+ int ret;
#ifdef TMPI_TRACE
tMPI_Trace_print("tMPI_Finalize()");
#endif
struct tmpi_thread *cur = tMPI_Get_current();
tMPI_Profile_stop( &(cur->profile) );
- tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
+ ret = tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
+ if (ret != 0)
+ {
+ return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
+ }
if (tMPI_Is_master())
{
}
}
#endif
- tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
+ ret = tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
+ if (ret != 0)
+ {
+ return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
+ }
+
+
if (tMPI_Is_master())
{
{
if (tMPI_Thread_join(threads[i].thread_id, NULL))
{
- tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_FINALIZE);
+ return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_FINALIZE);
}
tMPI_Thread_destroy(&(threads[i]));
}
{
tMPI_Comm cur;
- tMPI_Thread_mutex_lock(&(tmpi_global->comm_link_lock));
+ ret = tMPI_Thread_mutex_lock(&(tmpi_global->comm_link_lock));
+ if (ret != 0)
+ {
+ return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
+ }
cur = TMPI_COMM_WORLD->next;
while (cur && (cur != TMPI_COMM_WORLD) )
{
tMPI_Comm next = cur->next;
- tMPI_Comm_destroy(cur, FALSE);
+ ret = tMPI_Comm_destroy(cur, FALSE);
+ if (ret != 0)
+ {
+ tMPI_Thread_mutex_unlock(&(tmpi_global->comm_link_lock));
+ return ret;
+ }
cur = next;
}
- tMPI_Comm_destroy(TMPI_COMM_WORLD, FALSE);
- tMPI_Thread_mutex_unlock(&(tmpi_global->comm_link_lock));
+ ret = tMPI_Comm_destroy(TMPI_COMM_WORLD, FALSE);
+ if (ret != 0)
+ {
+ tMPI_Thread_mutex_unlock(&(tmpi_global->comm_link_lock));
+ return ret;
+ }
+ ret = tMPI_Thread_mutex_unlock(&(tmpi_global->comm_link_lock));
+ if (ret != 0)
+ {
+ return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
+ }
+
}
tMPI_Group_free(&TMPI_GROUP_EMPTY);
}
else
{
- fprintf(stderr, "tMPI_Abort called on main thread with errorcode=%d\n",
+ fprintf(stderr,
+ "tMPI_Abort called on main thread with errorcode=%d\n",
errorcode);
}
fflush(stderr);
#endif
}
-
-
-
-
-
-
int tMPI_Get_count(tMPI_Status *status, tMPI_Datatype datatype, int *count)
{
#ifdef TMPI_TRACE