2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
39 #ifdef HAVE_TMPI_CONFIG_H
40 #include "tmpi_config.h"
55 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
72 /* there are a few global variables that maintain information about the
73 running threads. Some are defined by the MPI standard: */
74 /* TMPI_COMM_WORLD is in tmpi_malloc.c due to technical reasons */
/* The empty group: allocated in tMPI_Start_threads() with tMPI_Group_alloc()
   and released again in tMPI_Finalize(). */
75 tMPI_Group TMPI_GROUP_EMPTY=NULL;
78 /* the threads themselves (tmpi_comm only contains lists of pointers to this
80 struct tmpi_thread *threads=NULL;
84 tMPI_Thread_key_t id_key; /* the key to get the thread id */
88 /* whether MPI has finalized (we need this to distinguish pre-inited from
89 post-finalized states */
90 static tmpi_bool tmpi_finalized=FALSE;
92 /* misc. global information about MPI */
/* Allocated and initialized in tMPI_Start_threads(); torn down with
   tMPI_Global_destroy() in tMPI_Finalize(). */
93 struct tmpi_global *tmpi_global=NULL;
100 /* start N threads with argc, argv (used by tMPI_Init)*/
/* Parameters, as used by the definition in this file:
     main_returns  presumably selects whether the calling thread only gets
                   initialized or also runs the start function (the test
                   itself is not visible here -- confirm);
     N             number of thread structures allocated; threads 1..N-1 are
                   created, thread 0 is the calling thread;
     aff_strategy  compared against TMPI_AFFINITY_ALL_CORES;
     argc, argv    command line, duplicated for every thread;
     start_fn, start_arg, start_fn_main
                   stored verbatim in every thread structure. */
101 void tMPI_Start_threads(tmpi_bool main_returns, int N,
102 tMPI_Affinity_strategy aff_strategy,
103 int *argc, char ***argv,
104 void (*start_fn)(void*), void *start_arg,
105 int (*start_fn_main)(int, char**));
107 /* starter function for threads; takes a void pointer to a
108 struct tmpi_starter_, which calls main() if tmpi_start_.fn == NULL */
/* NOTE(review): the definition in this file casts its argument to
   struct tmpi_thread*, not to a struct tmpi_starter_ -- the comment above
   looks stale; confirm. */
109 static void* tMPI_Thread_starter(void *arg);
111 /* allocate and initialize the data associated with a thread structure */
112 static void tMPI_Thread_init(struct tmpi_thread *th);
113 /* deallocate the data associated with a thread structure */
114 static void tMPI_Thread_destroy(struct tmpi_thread *th);
/* Print a trace message preceded by a tag identifying the calling thread.
   fmt and the variable arguments are presumably handed to a printf-family
   call (that call is not visible here). Everything between the lock and
   unlock below runs while holding a function-local static mutex. */
120 void tMPI_Trace_print(const char *fmt, ...)
123 struct tmpi_thread* th=NULL;
124 static tMPI_Thread_mutex_t mtx=TMPI_THREAD_MUTEX_INITIALIZER;
126 tMPI_Thread_mutex_lock(&mtx);
/* tag with the thread's index in the global threads array ... */
129 th=tMPI_Get_current();
130 printf("THREAD %02d: ", (int)(th-threads));
/* ... or with a fixed tag; the condition selecting between the two tags is
   not visible here */
134 printf("THREAD main: ");
141 tMPI_Thread_mutex_unlock(&mtx);
/* Return the thread structure of the calling thread, i.e. the value stored
   under id_key (tMPI_Thread_init() stores the thread pointer there).
   NOTE(review): the getter is spelled tMPI_thread_getspecific (lower-case
   't') while the setter used in tMPI_Thread_init() is
   tMPI_Thread_setspecific -- confirm that this name exists. */
147 struct tmpi_thread *tMPI_Get_current(void)
152 return (struct tmpi_thread*)tMPI_thread_getspecific(id_key);
/* Return the number of thread thr.
   NOTE(review): the body is not visible here; presumably the index of thr
   in the global threads array, by analogy with tMPI_This_threadnr() --
   confirm. */
156 unsigned int tMPI_Threadnr(struct tmpi_thread *thr)
162 unsigned int tMPI_This_threadnr(void)
164 return tMPI_Get_current()-threads;
167 struct tmpi_thread *tMPI_Get_thread(tMPI_Comm comm, int rank)
169 /* check destination */
170 if ( (rank < 0) || (rank > comm->grp.N) )
172 tMPI_Error(comm, TMPI_ERR_GROUP_RANK);
175 return comm->grp.peers[rank];
/* Tell whether the calling thread is the master (main) thread. Two cases
   are distinguished: no world communicator / an empty world group, and the
   normal case where the caller's thread structure is compared with the
   first element of the global threads array. */
179 tmpi_bool tMPI_Is_master(void)
181 /* if there are no other threads, we're the main thread */
182 if ( (!TMPI_COMM_WORLD) || TMPI_COMM_WORLD->grp.N==0)
185 /* otherwise we know this through thread specific data: */
186 /* whether the thread pointer points to the head of the threads array */
187 return (tmpi_bool)(tMPI_Get_current() == threads);
190 tMPI_Comm tMPI_Get_comm_self(void)
192 struct tmpi_thread* th=tMPI_Get_current();
193 return th->self_comm;
/* Determine the number of threads to start and store it in *nthreads.
   The argument list is searched for an entry equal to optname; the entry
   following it is parsed as a base-10 integer. The hardware thread count
   (at least 1) is also computed here, presumably as the fallback when the
   option is absent (the branch structure is not visible here).
   Returns an int status initialized to TMPI_SUCCESS. */
197 int tMPI_Get_N(int *argc, char ***argv, const char *optname, int *nthreads)
200 int ret=TMPI_SUCCESS;
211 if (strcmp(optname, (*argv)[i]) == 0)
219 /* the number of processes is an argument */
/* NOTE(review): this reads (*argv)[i+1]; the check that i+1 is still inside
   the argument list is not visible here -- confirm. strtol() returns a long
   that is narrowed to int on assignment. */
221 *nthreads=strtol((*argv)[i+1], &end, 10);
/* reject values followed by trailing non-numeric characters */
222 if ( !end || (*end != 0) )
230 int nth=tMPI_Thread_get_hw_number();
232 if (nth<1) nth=1; /* make sure it's at least 1 */
/* Set up the per-thread data of th. As written, this must run on the thread
   that th describes: it stores th as that thread's thread-specific value
   and ends by waiting on the global start-up barrier.
   Nthreads is not declared in the visible part of this function; it comes
   from outside (not visible here). */
239 static void tMPI_Thread_init(struct tmpi_thread *th)
241 int N_envelopes=(Nthreads+1)*N_EV_ALLOC;
242 int N_send_envelopes=N_EV_ALLOC;
243 int N_reqs=(Nthreads+1)*N_EV_ALLOC;
246 /* we set our thread id, as a thread-specific piece of global data. */
247 tMPI_Thread_setspecific(id_key, th);
249 /* allocate comm.self */
/* a communicator of size 1 whose only peer is this thread */
250 th->self_comm=tMPI_Comm_alloc(TMPI_COMM_WORLD, 1);
251 th->self_comm->grp.peers[0]=th;
253 /* allocate envelopes */
254 tMPI_Free_env_list_init( &(th->envelopes), N_envelopes );
256 tMPI_Recv_env_list_init( &(th->evr));
/* one send envelope list per thread */
258 th->evs=(struct send_envelope_list*)tMPI_Malloc(
259 sizeof(struct send_envelope_list)*Nthreads);
260 for(i=0;i<Nthreads;i++)
262 tMPI_Send_env_list_init( &(th->evs[i]), N_send_envelopes);
265 tMPI_Atomic_set( &(th->ev_outgoing_received), 0);
267 tMPI_Event_init( &(th->p2p_event) );
269 /* allocate requests */
270 tMPI_Req_list_init(&(th->rql), N_reqs);
272 #ifdef USE_COLLECTIVE_COPY_BUFFER
273 /* allocate copy_buffer list */
274 tMPI_Copy_buffer_list_init(&(th->cbl_multi), (Nthreads+1)*(N_COLL_ENV+1),
275 Nthreads*COPY_BUFFER_SIZE);
279 tMPI_Profile_init(&(th->profile));
281 /* now wait for all other threads to come on line, before we
282 start the MPI program */
283 tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
/* Release the per-thread data of th: the receive and send envelope lists,
   the free envelope list, the p2p event, the request list and (when
   compiled in) the copy buffer list. */
287 static void tMPI_Thread_destroy(struct tmpi_thread *th)
291 tMPI_Recv_env_list_destroy( &(th->evr));
292 for(i=0;i<Nthreads;i++)
294 tMPI_Send_env_list_destroy( &(th->evs[i]));
297 tMPI_Free_env_list_destroy( &(th->envelopes) );
298 tMPI_Event_destroy( &(th->p2p_event) );
299 tMPI_Req_list_destroy( &(th->rql) );
301 #ifdef USE_COLLECTIVE_COPY_BUFFER
302 tMPI_Copy_buffer_list_destroy(&(th->cbl_multi));
/* walks over the thread's argv entries; the loop body is not visible here
   (presumably it frees the strings duplicated in tMPI_Start_threads() --
   confirm) */
305 for(i=0;i<th->argc;i++)
/* Initialize the global bookkeeping structure g: the user type counter,
   the timer mutex, the datatype spinlock, a barrier for Nthreads
   participants, the communicator-list mutex, and the start time that
   tMPI_Wtime() measures against. */
311 static void tMPI_Global_init(struct tmpi_global *g, int Nthreads)
315 g->Nalloc_usertypes=0;
316 tMPI_Thread_mutex_init(&(g->timer_mutex));
317 tMPI_Spinlock_init(&(g->datatype_lock));
319 tMPI_Thread_barrier_init( &(g->barrier), Nthreads);
321 tMPI_Thread_mutex_init(&(g->comm_link_lock));
323 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
324 /* the time at initialization. */
325 gettimeofday( &(g->timer_init), NULL);
327 /* the time at initialization. */
328 g->timer_init=GetTickCount();
/* Destroy the barrier and the two mutexes of g that were set up by
   tMPI_Global_init(). The visible code does not touch g->datatype_lock and
   does not free g itself. */
333 static void tMPI_Global_destroy(struct tmpi_global *g)
335 tMPI_Thread_barrier_destroy(&(g->barrier));
336 tMPI_Thread_mutex_destroy(&(g->timer_mutex));
337 tMPI_Thread_mutex_destroy(&(g->comm_link_lock));
/* Thread entry point. arg is the thread's own struct tmpi_thread. The
   thread data is initialized first (which includes waiting on the start-up
   barrier, see tMPI_Thread_init()), then one of the two stored entry points
   is called. */
343 static void* tMPI_Thread_starter(void *arg)
345 struct tmpi_thread *th=(struct tmpi_thread*)arg;
348 tMPI_Trace_print("Created thread nr. %d", (int)(th-threads));
351 tMPI_Thread_init(th);
353 /* start_fn, start_arg, argc and argv were set by the calling function */
/* main()-style entry point, given this thread's private copy of argc/argv;
   its int return value is not used in the visible code */
356 th->start_fn_main(th->argc, th->argv);
/* or: generic entry point with an opaque argument (the condition selecting
   between the two calls is not visible here) */
360 th->start_fn(th->start_arg);
/* Allocate the global state and start the threads.
   Sets up tmpi_global, the threads array (N entries), TMPI_COMM_WORLD,
   TMPI_GROUP_EMPTY and the thread-specific key; gives every thread a private
   copy of the command line plus the start function pointers; then creates
   threads 1..N-1 with tMPI_Thread_create(). Thread 0 is the calling thread.
   Failures of key or thread creation are reported through tMPI_Error() with
   TMPI_ERR_INIT. */
369 void tMPI_Start_threads(tmpi_bool main_returns, int N,
370 tMPI_Affinity_strategy aff_strategy,
371 int *argc, char ***argv,
372 void (*start_fn)(void*), void *start_arg,
373 int (*start_fn_main)(int, char**))
/* NOTE(review): argc and argv are pointers but are formatted with %d in the
   trace format below; %p would match their types. */
376 tMPI_Trace_print("tMPI_Start_threads(%d, %d, %d, %d, %d, %p, %p, %p, %p)",
377 main_returns, N, aff_strategy, argc, argv, start_fn,
383 int set_affinity=FALSE;
/* (re)starting: clear the finalized flag set by an earlier run */
385 tmpi_finalized=FALSE;
388 /* allocate global data */
389 tmpi_global=(struct tmpi_global*)
390 tMPI_Malloc(sizeof(struct tmpi_global));
391 tMPI_Global_init(tmpi_global, N);
393 /* allocate world and thread data */
394 threads=(struct tmpi_thread*)tMPI_Malloc(sizeof(struct tmpi_thread)*N);
395 TMPI_COMM_WORLD=tMPI_Comm_alloc(NULL, N);
396 TMPI_GROUP_EMPTY=tMPI_Group_alloc();
/* key under which each thread stores its own struct tmpi_thread pointer */
398 if (tMPI_Thread_key_create(&id_key, NULL))
400 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
/* world rank i is thread i */
404 TMPI_COMM_WORLD->grp.peers[i]=&(threads[i]);
406 /* copy argc, argv */
/* every thread gets its own duplicated argv strings */
410 threads[i].argc=*argc;
411 threads[i].argv=(char**)tMPI_Malloc(threads[i].argc*
413 for(j=0;j<threads[i].argc;j++)
415 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
416 threads[i].argv[j]=strdup( (*argv)[j] );
418 threads[i].argv[j]=_strdup( (*argv)[j] );
/* alternative when no command line is copied (condition not visible here) */
425 threads[i].argv=NULL;
427 threads[i].start_fn=start_fn;
428 threads[i].start_fn_main=start_fn_main;
429 threads[i].start_arg=start_arg;
432 /* now check whether to set affinity */
/* considered only for TMPI_AFFINITY_ALL_CORES, with more than one hardware
   thread and exactly one tMPI thread per hardware thread */
433 if (aff_strategy == TMPI_AFFINITY_ALL_CORES)
435 int nhw=tMPI_Thread_get_hw_number();
436 if ((nhw > 1) && (nhw == N))
442 /* set thread 0's properties */
443 threads[0].thread_id=tMPI_Thread_self();
446 /* set the main thread's affinity */
447 tMPI_Thread_setaffinity_single(threads[0].thread_id, 0);
450 for(i=1;i<N;i++) /* zero is the main thread */
453 ret=tMPI_Thread_create(&(threads[i].thread_id),
455 (void*)&(threads[i]) ) ;
/* thread i is bound to index i */
459 tMPI_Thread_setaffinity_single(threads[i].thread_id, i);
463 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
466 /* the main thread also runs start_fn if we don't want
469 tMPI_Thread_starter((void*)&(threads[0]));
471 tMPI_Thread_init(&(threads[0]));
/* Initialize the library from a main()-style program.
   When TMPI_COMM_WORLD is not set yet, the thread count is taken from
   tMPI_Get_N() (option "-nt") and the threads are started with
   start_function as their main()-style entry point.
   NOTE(review): the int status returned by tMPI_Get_N() is not used in the
   visible code. */
476 int tMPI_Init(int *argc, char ***argv,
477 int (*start_function)(int, char**))
480 tMPI_Trace_print("tMPI_Init(%p, %p, %p)", argc, argv, start_function);
483 if (TMPI_COMM_WORLD==0) /* we're the main process */
486 tMPI_Get_N(argc, argv, "-nt", &N);
487 tMPI_Start_threads(TRUE, N, TMPI_AFFINITY_ALL_CORES, argc, argv,
488 NULL, NULL, start_function);
492 /* if we're a sub-thread we don't need to do anything, because
493 everything has already been set up by either the main thread,
494 or the thread runner function.*/
/* Initialize the library with an explicit start function.
   N can be replaced by the hardware thread count (at least 1); the condition
   for doing so is not visible here. When TMPI_COMM_WORLD is not set yet and
   N >= 1, threads are started without a command line (argc/argv passed
   as 0), with start_function(arg) as the entry point. */
502 int tMPI_Init_fn(int main_thread_returns, int N,
503 tMPI_Affinity_strategy aff_strategy,
504 void (*start_function)(void*), void *arg)
507 tMPI_Trace_print("tMPI_Init_fn(%d, %p, %p)", N, start_function, arg);
512 N=tMPI_Thread_get_hw_number();
513 if (N<1) N=1; /*because that's what the fn returns if it doesn't know*/
516 if (TMPI_COMM_WORLD==0 && N>=1) /* we're the main process */
518 tMPI_Start_threads(main_thread_returns, N, aff_strategy,
519 0, 0, start_function, arg, NULL);
/* Set *flag to non-zero when the library is currently initialized, i.e.
   TMPI_COMM_WORLD exists and tMPI_Finalize() has not completed since. */
524 int tMPI_Initialized(int *flag)
527 tMPI_Trace_print("tMPI_Initialized(%p)", flag);
530 *flag=(TMPI_COMM_WORLD && !tmpi_finalized);
/* Shut the library down. Called by every thread; the threads meet at the
   global barrier, after which the master thread joins all other threads,
   destroys the per-thread data, the thread key, all communicators, the
   empty group and finally the global structure. */
535 int tMPI_Finalize(void)
539 tMPI_Trace_print("tMPI_Finalize()");
542 printf("%5d: tMPI_Finalize called\n", tMPI_This_threadnr());
548 struct tmpi_thread *cur=tMPI_Get_current();
/* stop this thread's profiling, then wait until all threads have done so
   before the master summarizes */
550 tMPI_Profile_stop( &(cur->profile) );
551 tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
553 if (tMPI_Is_master())
555 tMPI_Profiles_summarize(Nthreads, threads);
/* all threads meet here before tear-down starts */
559 tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
561 if (tMPI_Is_master())
564 /* we just wait for all threads to finish; the order isn't very
565 relevant, as all threads should arrive at their endpoints soon. */
566 for(i=1;i<Nthreads;i++)
568 if (tMPI_Thread_join(threads[i].thread_id, NULL))
570 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_FINALIZE);
572 tMPI_Thread_destroy(&(threads[i]));
574 /* at this point, we are the only thread left, so we can
575 destroy the global structures with impunity. */
576 tMPI_Thread_destroy(&(threads[0]));
579 tMPI_Thread_key_delete(id_key);
580 /* de-allocate all the comm structures. */
/* the communicators are linked through their next pointers; walk the list
   from TMPI_COMM_WORLD until it ends or comes back to TMPI_COMM_WORLD,
   then destroy TMPI_COMM_WORLD itself */
584 tMPI_Thread_mutex_lock(&(tmpi_global->comm_link_lock));
585 cur=TMPI_COMM_WORLD->next;
586 while(cur && (cur!=TMPI_COMM_WORLD) )
/* remember the successor before cur is destroyed */
588 tMPI_Comm next=cur->next;
589 tMPI_Comm_destroy(cur, FALSE);
592 tMPI_Comm_destroy(TMPI_COMM_WORLD, FALSE);
593 tMPI_Thread_mutex_unlock(&(tmpi_global->comm_link_lock));
596 tMPI_Group_free(&TMPI_GROUP_EMPTY);
/* reset so that a later tMPI_Init() sees an uninitialized library */
598 TMPI_COMM_WORLD=NULL;
599 TMPI_GROUP_EMPTY=NULL;
602 /* deallocate the 'global' structure */
603 tMPI_Global_destroy(tmpi_global);
/* Store the current value of the file-local tmpi_finalized flag in *flag. */
616 int tMPI_Finalized(int *flag)
619 tMPI_Trace_print("tMPI_Finalized(%p)", flag);
621 *flag=tmpi_finalized;
/* Abort execution, reporting errorcode on stderr. Two variants are present
   in the visible code: one that reports and (per its comment) calls abort(),
   and one that ends only the calling thread with tMPI_Thread_exit(); the
   preprocessor/branch structure selecting between them is not visible
   here. */
628 int tMPI_Abort(tMPI_Comm comm, int errorcode)
631 tMPI_Trace_print("tMPI_Abort(%p, %d)", comm, errorcode);
634 /* we abort(). This way we can run a debugger on it */
635 fprintf(stderr, "tMPI_Abort called with error code %d",errorcode);
636 if (comm==TMPI_COMM_WORLD)
637 fprintf(stderr, " on TMPI_COMM_WORLD");
638 fprintf(stderr,"\n");
643 /* we just kill all threads, but not the main process */
645 if (tMPI_Is_master())
647 if (comm==TMPI_COMM_WORLD)
649 "tMPI_Abort called on TMPI_COMM_WORLD main with errorcode=%d\n",
652 fprintf(stderr, "tMPI_Abort called on main thread with errorcode=%d\n",
661 fprintf(stderr, "tMPI_Abort called with error code %d on thread %d\n",
662 errorcode, tMPI_This_threadnr());
/* NOTE(review): the freshly allocated int is handed to tMPI_Thread_exit()
   as the thread's exit value; in the visible lines it is neither checked
   for NULL nor assigned a value -- confirm. */
664 ret=(int*)malloc(sizeof(int));
665 tMPI_Thread_exit(ret);
672 int tMPI_Get_processor_name(char *name, int *resultlen)
674 int nr=tMPI_Threadnr(tMPI_Get_current());
675 unsigned int digits=0;
676 const unsigned int base=10;
679 tMPI_Trace_print("tMPI_Get_processor_name(%p, %p)", name, resultlen);
681 /* we don't want to call sprintf here (it turns out to be not entirely
682 thread-safe on Mac OS X, for example), so we do it our own way: */
684 /* first determine number of digits */
695 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
696 strcpy(name, "thread #");
698 strncpy_s(name, TMPI_MAX_PROCESSOR_NAME, "thread #", TMPI_MAX_PROCESSOR_NAME);
700 /* now construct the number */
702 size_t len=strlen(name);
706 for(i=0;i<digits;i++)
708 size_t pos=len + (digits-i-1);
709 if (pos < (TMPI_MAX_PROCESSOR_NAME -1) )
710 name[ pos ]=(char)('0' + rest%base);
713 if ( (digits+len) < TMPI_MAX_PROCESSOR_NAME)
714 name[digits + len]='\0';
716 name[TMPI_MAX_PROCESSOR_NAME]='\0';
720 *resultlen=(int)strlen(name); /* For some reason the MPI standard
721 uses ints instead of size_ts for
730 /* TODO: there must be better ways to do this */
/* Return the time in seconds elapsed since tmpi_global->timer_init, which
   tMPI_Global_init() records. It reads tmpi_global, so the library must
   have been started. Non-Windows builds use gettimeofday() (microsecond
   fields), Windows builds use GetTickCount() (scaled by 1e-3). */
731 double tMPI_Wtime(void)
736 tMPI_Trace_print("tMPI_Wtime()");
739 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
745 gettimeofday(&tv, NULL);
/* seconds and microseconds are differenced separately; usecdiff can be
   negative, which the sum below accounts for */
746 secdiff = tv.tv_sec - tmpi_global->timer_init.tv_sec;
747 usecdiff = tv.tv_usec - tmpi_global->timer_init.tv_usec;
749 ret=(double)secdiff + 1e-6*usecdiff;
753 DWORD tv=GetTickCount();
755 /* the windows absolute time GetTickCount() wraps around in ~49 days,
756 so it's safer to always use differences, and assume that our
757 program doesn't run that long.. */
758 ret=1e-3*((unsigned int)(tv - tmpi_global->timer_init));
/* Return the resolution of tMPI_Wtime(), presumably in seconds. The
   returned constants (one per platform branch) are not visible here. */
764 double tMPI_Wtick(void)
766 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
767 /* In Unix, we don't really know. Any modern OS should be at least
768 this precise, though */
771 /* According to the Windows documentation, this is about right: */
/* Compute the number of elements of type datatype described by status and
   store it in *count, as status->transferred divided by datatype->size.
   An error path returns tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_STATUS); its
   condition is not visible here (presumably a missing status -- confirm).
   NOTE(review): datatype is dereferenced and datatype->size is used as a
   divisor without a visible check. */
782 int tMPI_Get_count(tMPI_Status *status, tMPI_Datatype datatype, int *count)
785 tMPI_Trace_print("tMPI_Get_count(%p, %p, %p)", status, datatype, count);
789 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_STATUS);
791 *count = (int)(status->transferred/datatype->size);