2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
39 #ifdef HAVE_TMPI_CONFIG_H
40 #include "tmpi_config.h"
55 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
/* File-scope state of the thread_mpi runtime. TMPI_COMM_WORLD, the threads
   array and tmpi_global are allocated in tMPI_Start_threads and
   TMPI_COMM_WORLD/tMPI_GROUP_EMPTY are reset to NULL in tMPI_Finalize, so a
   NULL TMPI_COMM_WORLD means "not (or no longer) initialized". */
72 /* there are a few global variables that maintain information about the
73 running threads. Some are defined by the MPI standard: */
74 tMPI_Comm TMPI_COMM_WORLD=NULL;
75 tMPI_Group tMPI_GROUP_EMPTY=NULL;
78 /* the threads themselves (tmpi_comm only contains lists of pointers to this
80 struct tmpi_thread *threads=NULL;
84 tMPI_Thread_key_t id_key; /* the key to get the thread id */
88 /* whether MPI has finalized (we need this to distinguish pre-inited from
89 post-finalized states */
90 static gmx_bool tmpi_finalized=FALSE;
92 /* misc. global information about MPI */
93 struct tmpi_global *tmpi_global=NULL;
/* Forward declarations for functions defined further down in this file
   (tMPI_Start_threads is non-static; the other three are file-local). */
102 /* start N threads with argc, argv (used by tMPI_Init)*/
103 void tMPI_Start_threads(gmx_bool main_returns, int N, int *argc, char ***argv,
104 void (*start_fn)(void*), void *start_arg,
105 int (*start_fn_main)(int, char**));
107 /* starter function for threads; takes a void pointer to a
108 struct tmpi_starter_, which calls main() if tmpi_start_.fn == NULL */
109 static void* tMPI_Thread_starter(void *arg);
111 /* allocate and initialize the data associated with a thread structure */
112 static void tMPI_Thread_init(struct tmpi_thread *th);
113 /* deallocate the data associated with a thread structure */
114 static void tMPI_Thread_destroy(struct tmpi_thread *th);
/* Trace helper: prints a "THREAD nn: " prefix (nn = index of the caller's
   struct in the threads array) or "THREAD main: ", all while holding a
   function-local static mutex so trace output from concurrent threads is
   serialized.
   NOTE(review): the condition choosing between the two prefixes and the
   printing of fmt/varargs are on lines not visible here — presumably the
   "main" prefix is used when th is NULL; confirm. */
120 void tMPI_Trace_print(const char *fmt, ...)
123 struct tmpi_thread* th=tMPI_Get_current();
124 static tMPI_Thread_mutex_t mtx=TMPI_THREAD_MUTEX_INITIALIZER;
126 tMPI_Thread_mutex_lock(&mtx);
128 printf("THREAD %02d: ", (int)(th-threads));
130 printf("THREAD main: ");
136 tMPI_Thread_mutex_unlock(&mtx);
/* malloc() wrapper that reports TMPI_ERR_MALLOC on TMPI_COMM_WORLD through
   tMPI_Error.
   NOTE(review): the failure test and the return statement are on lines not
   visible here — presumably `if (!ret)` ... `return ret;`; confirm. */
141 void *tMPI_Malloc(size_t size)
143 void *ret=(void*)malloc(size);
147 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_MALLOC);
/* realloc() wrapper that reports TMPI_ERR_MALLOC on TMPI_COMM_WORLD through
   tMPI_Error.
   NOTE(review): the failure test and return are not visible here. If it
   returns NULL on failure, callers writing `p=tMPI_Realloc(p, n)` lose the
   original block — check call sites. */
152 void *tMPI_Realloc(void *p, size_t size)
154 void *ret=(void*)realloc(p, size);
157 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_MALLOC);
/* Return the calling thread's tmpi_thread struct, read from thread-specific
   storage under id_key (tMPI_Thread_init stores it there with
   tMPI_Thread_setspecific(id_key, th)).
   NOTE(review): the getter is spelled tMPI_thread_getspecific (lower-case
   't') while the setter is tMPI_Thread_setspecific — confirm both names
   exist in the threads API. Lines between the signature and the return are
   not visible; there may be an early return (e.g. before threads exist). */
164 struct tmpi_thread *tMPI_Get_current(void)
169 return (struct tmpi_thread*)tMPI_thread_getspecific(id_key);
/* Return the thread number of thr.
   NOTE(review): the body is not visible here; presumably thr-threads, like
   tMPI_This_threadnr — confirm. */
173 unsigned int tMPI_Threadnr(struct tmpi_thread *thr)
/* Return the calling thread's index in the threads array (index 0 is the
   main thread, per tMPI_Start_threads).
   NOTE(review): the pointer difference is only meaningful if
   tMPI_Get_current() points into threads; confirm this is never called
   before initialization or after finalization. */
179 unsigned int tMPI_This_threadnr(void)
181 return tMPI_Get_current()-threads;
/* Return the thread with rank `rank` in communicator comm.
   grp.peers holds grp.N entries (e.g. tMPI_Thread_init allocates self_comm
   with size 1 and sets only peers[0]), so the valid ranks are 0..grp.N-1.
   Any other rank is reported as TMPI_ERR_GROUP_RANK on comm. The error
   path's return statement is on a line not visible here. */
184 struct tmpi_thread *tMPI_Get_thread(tMPI_Comm comm, int rank)
186 /* check destination */
187 if ( (rank < 0) || (rank >= comm->grp.N) ) /* rank==N is one past peers[] */
189 tMPI_Error(comm, TMPI_ERR_GROUP_RANK);
192 return comm->grp.peers[rank];
/* Return whether the calling thread is the master (main) thread.
   If TMPI_COMM_WORLD does not exist or has no members, the early-return
   branch is taken (its return value is on a line not visible here; the
   comment says "we're the main thread"). Otherwise the result is whether
   this thread's struct is the first element of the threads array. */
196 gmx_bool tMPI_Is_master(void)
198 /* if there are no other threads, we're the main thread */
199 if ( (!TMPI_COMM_WORLD) || TMPI_COMM_WORLD->grp.N==0)
202 /* otherwise we know this through thread specific data: */
203 /* whether the thread pointer points to the head of the threads array */
204 return (gmx_bool)(tMPI_Get_current() == threads);
/* Return the calling thread's single-member communicator, th->self_comm,
   which tMPI_Thread_init allocates.
   NOTE(review): th is dereferenced without a NULL check, so this must only
   be called from an initialized tMPI thread. */
207 tMPI_Comm tMPI_Get_comm_self(void)
209 struct tmpi_thread* th=tMPI_Get_current();
210 return th->self_comm;
/* Determine the number of threads to start.
   argv entries are compared against optname; on a match, the following
   argument is parsed as a base-10 integer into *nthreads. Otherwise
   (presumably when the option is absent — the controlling lines are not
   visible) *nthreads falls back to tMPI_Get_recommended_nthreads().
   Returns an error code initialized to TMPI_SUCCESS.
   NOTE(review): strtol always stores a non-NULL end pointer, so `!end` is
   never true. An empty argument ("") also passes the `*end != 0` test and
   yields 0. The long result is not range-checked before being stored into
   an int. Confirm that i+1 is bounds-checked on the hidden lines. */
214 int tMPI_Get_N(int *argc, char ***argv, const char *optname, int *nthreads)
217 int ret=TMPI_SUCCESS;
228 if (strcmp(optname, (*argv)[i]) == 0)
236 /* the number of processes is an argument */
238 *nthreads=strtol((*argv)[i+1], &end, 10);
239 if ( !end || (*end != 0) )
247 *nthreads=tMPI_Get_recommended_nthreads();
/* Per-thread initialization. It is called from tMPI_Thread_starter for
   created threads and directly for threads[0] in tMPI_Start_threads.
   Steps: register th as this thread's thread-specific data under id_key;
   create its one-member self communicator; set up its envelope lists,
   atomic counter, point-to-point event, request list, optional collective
   copy buffers and profile; then wait on the global barrier so that no
   thread starts the MPI program before all threads are initialized.
   NOTE(review): Nthreads is not defined in this block — presumably a
   file-level macro or variable holding the thread count; confirm. */
253 static void tMPI_Thread_init(struct tmpi_thread *th)
255 int N_envelopes=(Nthreads+1)*N_EV_ALLOC;
256 int N_send_envelopes=N_EV_ALLOC;
257 int N_reqs=(Nthreads+1)*N_EV_ALLOC;
260 /* we set our thread id, as a thread-specific piece of global data. */
261 tMPI_Thread_setspecific(id_key, th);
263 /* allocate comm.self */
264 th->self_comm=tMPI_Comm_alloc(TMPI_COMM_WORLD, 1);
265 th->self_comm->grp.peers[0]=th;
267 /* allocate envelopes */
268 tMPI_Free_env_list_init( &(th->envelopes), N_envelopes );
270 tMPI_Recv_env_list_init( &(th->evr));
/* one send_envelope_list per thread, indexed 0..Nthreads-1 */
272 th->evs=(struct send_envelope_list*)tMPI_Malloc(
273 sizeof(struct send_envelope_list)*Nthreads);
274 for(i=0;i<Nthreads;i++)
276 tMPI_Send_env_list_init( &(th->evs[i]), N_send_envelopes);
279 tMPI_Atomic_set( &(th->ev_outgoing_received), 0);
281 tMPI_Event_init( &(th->p2p_event) );
283 /* allocate requests */
284 tMPI_Req_list_init(&(th->rql), N_reqs);
286 #ifdef USE_COLLECTIVE_COPY_BUFFER
287 /* allocate copy_buffer list */
288 tMPI_Copy_buffer_list_init(&(th->cbl_multi), (Nthreads+1)*(N_COLL_ENV+1),
289 Nthreads*COPY_BUFFER_SIZE);
293 tMPI_Profile_init(&(th->profile));
295 /* now wait for all other threads to come on line, before we
296 start the MPI program */
297 tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
/* Release the per-thread resources set up by tMPI_Thread_init: the receive
   and per-thread send envelope lists, the free-envelope list, the p2p event,
   the request list and (optionally) the collective copy buffers. The final
   loop over th->argc presumably frees the strdup'ed argv copies made in
   tMPI_Start_threads (its body is not visible here).
   NOTE(review): no release of th->evs, th->self_comm or th->profile is
   visible in the shown lines — confirm they are freed elsewhere. */
301 static void tMPI_Thread_destroy(struct tmpi_thread *th)
305 tMPI_Recv_env_list_destroy( &(th->evr));
306 for(i=0;i<Nthreads;i++)
308 tMPI_Send_env_list_destroy( &(th->evs[i]));
311 tMPI_Free_env_list_destroy( &(th->envelopes) );
312 tMPI_Event_destroy( &(th->p2p_event) );
313 tMPI_Req_list_destroy( &(th->rql) );
315 #ifdef USE_COLLECTIVE_COPY_BUFFER
316 tMPI_Copy_buffer_list_destroy(&(th->cbl_multi));
319 for(i=0;i<th->argc;i++)
/* Initialize the shared tmpi_global structure for Nthreads threads:
   - reset the user-datatype allocation count;
   - create the timer mutex and the datatype spinlock;
   - set up a barrier for Nthreads participants;
   - record the start time used by tMPI_Wtime (gettimeofday on non-Windows,
     GetTickCount on Windows). */
325 static void tMPI_Global_init(struct tmpi_global *g, int Nthreads)
329 g->Nalloc_usertypes=0;
330 tMPI_Thread_mutex_init(&(g->timer_mutex));
331 tMPI_Spinlock_init(&(g->datatype_lock));
333 tMPI_Thread_barrier_init( &(g->barrier), Nthreads);
335 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
336 /* the time at initialization. */
337 gettimeofday( &(g->timer_init), NULL);
339 /* the time at initialization. */
340 g->timer_init=GetTickCount();
/* Tear down the shared tmpi_global structure; the visible line destroys the
   timer mutex.
   NOTE(review): the barrier and datatype spinlock created in
   tMPI_Global_init are not visibly destroyed here — confirm they are
   handled on the hidden lines. */
345 static void tMPI_Global_destroy(struct tmpi_global *g)
347 tMPI_Thread_mutex_destroy(&(g->timer_mutex));
/* Thread entry point: arg is the thread's own struct tmpi_thread. It
   initializes the thread with tMPI_Thread_init (which ends on the global
   barrier), then runs either the main()-style start_fn_main(argc, argv) or
   start_fn(start_arg). The condition selecting between them is not visible
   here; per the forward declaration's comment, main() is used when no plain
   start function was given. */
353 static void* tMPI_Thread_starter(void *arg)
355 struct tmpi_thread *th=(struct tmpi_thread*)arg;
358 tMPI_Trace_print("Created thread nr. %d", (int)(th-threads));
361 tMPI_Thread_init(th);
363 /* start_fn, start_arg, argc and argv were set by the calling function */
366 th->start_fn_main(th->argc, th->argv);
370 th->start_fn(th->start_arg);
/* Set up the runtime and launch N threads, with the calling thread acting
   as thread 0. Steps:
   - clear tmpi_finalized;
   - allocate and initialize tmpi_global (N-party barrier), the threads
     array, TMPI_COMM_WORLD (size N) and tMPI_GROUP_EMPTY;
   - create the thread-id key;
   - give each thread its own strdup'ed copy of argv, or a NULL argv
     (tMPI_Init_fn passes argc=argv=0), plus the start function/argument;
   - create threads 1..N-1 with tMPI_Thread_create;
   - finally, either run the start function on thread 0 too
     (tMPI_Thread_starter) or only initialize it (tMPI_Thread_init),
     depending on main_returns — that branch is not visible here.
   NOTE(review): the strdup/_strdup results are not checked for NULL in the
   visible lines. */
379 void tMPI_Start_threads(gmx_bool main_returns, int N, int *argc, char ***argv,
380 void (*start_fn)(void*), void *start_arg,
381 int (*start_fn_main)(int, char**))
384 tMPI_Trace_print("tMPI_Start_threads(%d, %p, %p, %p, %p)", N, argc,
385 argv, start_fn, start_arg);
391 tmpi_finalized=FALSE;
394 /* allocate global data */
395 tmpi_global=(struct tmpi_global*)
396 tMPI_Malloc(sizeof(struct tmpi_global));
397 tMPI_Global_init(tmpi_global, N);
399 /* allocate world and thread data */
400 threads=(struct tmpi_thread*)tMPI_Malloc(sizeof(struct tmpi_thread)*N);
401 TMPI_COMM_WORLD=tMPI_Comm_alloc(NULL, N);
402 tMPI_GROUP_EMPTY=tMPI_Group_alloc();
404 if (tMPI_Thread_key_create(&id_key, NULL))
406 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
/* world rank i is threads[i] */
410 TMPI_COMM_WORLD->grp.peers[i]=&(threads[i]);
412 /* copy argc, argv */
416 threads[i].argc=*argc;
417 threads[i].argv=(char**)tMPI_Malloc(threads[i].argc*
419 for(j=0;j<threads[i].argc;j++)
421 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
422 threads[i].argv[j]=strdup( (*argv)[j] );
424 threads[i].argv[j]=_strdup( (*argv)[j] );
431 threads[i].argv=NULL;
433 threads[i].start_fn=start_fn;
434 threads[i].start_fn_main=start_fn_main;
435 threads[i].start_arg=start_arg;
/* spawn the worker threads; the calling thread is threads[0] */
437 for(i=1;i<N;i++) /* zero is the main thread */
439 if (tMPI_Thread_create(&(threads[i].thread_id),
441 (void*)&(threads[i]) ) )
443 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
446 /* the main thread now also runs start_fn if we don't want
449 tMPI_Thread_starter((void*)&(threads[0]));
451 tMPI_Thread_init(&(threads[0]));
/* MPI_Init equivalent. When no world communicator exists yet (the main
   process), the thread count is taken from the "-nt" command-line option
   (or the recommended default), and the threads are started with
   start_function as their main()-style entry; main_returns is FALSE.
   Threads that are already running do nothing here. */
456 int tMPI_Init(int *argc, char ***argv, int (*start_function)(int, char**))
459 tMPI_Trace_print("tMPI_Init(%p, %p, %p)", argc, argv, start_function);
463 if (TMPI_COMM_WORLD==0) /* we're the main process */
466 tMPI_Get_N(argc, argv, "-nt", &N);
467 tMPI_Start_threads(FALSE, N, argc, argv, NULL, NULL, start_function);
471 /* if we're a sub-thread we don't need to do anything, because
472 everything has already been set up by either the main thread,
473 or the thread runner function.*/
/* Start N threads that each run start_function(arg), without argc/argv
   (zeros are passed to tMPI_Start_threads). N is replaced by
   tMPI_Get_recommended_nthreads() under a condition not visible here
   (presumably N < 1). Threads are started only when no world communicator
   exists yet and N >= 1. main_thread_returns is forwarded as main_returns. */
478 int tMPI_Init_fn(int main_thread_returns, int N,
479 void (*start_function)(void*), void *arg)
482 tMPI_Trace_print("tMPI_Init_fn(%d, %p, %p)", N, start_function, arg);
487 N=tMPI_Get_recommended_nthreads();
490 if (TMPI_COMM_WORLD==0 && N>=1) /* we're the main process */
492 tMPI_Start_threads(main_thread_returns, N, 0, 0, start_function, arg,
/* MPI_Initialized equivalent: *flag is nonzero iff a world communicator
   exists and tMPI has not been finalized. */
498 int tMPI_Initialized(int *flag)
501 tMPI_Trace_print("tMPI_Initialized(%p)", flag);
504 *flag=(TMPI_COMM_WORLD && !tmpi_finalized);
/* MPI_Finalize equivalent, called by every thread. Each thread stops its
   profile and synchronizes on the global barrier; the master summarizes the
   profiles, and all threads synchronize again. The master then:
   - joins threads 1..Nthreads-1 and destroys their per-thread data, then
     its own;
   - deletes the thread-id key;
   - destroys all communicators on the ring starting at TMPI_COMM_WORLD,
     then the world communicator itself, and frees the empty group;
   - resets both globals to NULL and destroys tmpi_global. */
509 int tMPI_Finalize(void)
513 tMPI_Trace_print("tMPI_Finalize()");
516 printf("%5d: tMPI_Finalize called\n", tMPI_This_threadnr());
522 struct tmpi_thread *cur=tMPI_Get_current();
524 tMPI_Profile_stop( &(cur->profile) );
525 tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
527 if (tMPI_Is_master())
529 tMPI_Profiles_summarize(Nthreads, threads);
533 tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
535 if (tMPI_Is_master())
538 /* we just wait for all threads to finish; the order isn't very
539 relevant, as all threads should arrive at their endpoints soon. */
540 for(i=1;i<Nthreads;i++)
542 if (tMPI_Thread_join(threads[i].thread_id, NULL))
544 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_FINALIZE);
546 tMPI_Thread_destroy(&(threads[i]));
548 /* at this point, we are the only thread left, so we can
549 destroy the global structures with impunity. */
550 tMPI_Thread_destroy(&(threads[0]));
553 tMPI_Thread_key_delete(id_key);
554 /* de-allocate all the comm structures. */
/* NOTE(review): this tMPI_Comm `cur` shadows the struct tmpi_thread *cur
   declared above; consider renaming it. */
556 tMPI_Comm cur=TMPI_COMM_WORLD->next;
557 while(cur && (cur!=TMPI_COMM_WORLD) )
559 tMPI_Comm next=cur->next;
560 tMPI_Comm_destroy(cur);
563 tMPI_Comm_destroy(TMPI_COMM_WORLD);
566 tMPI_Group_free(&tMPI_GROUP_EMPTY);
568 TMPI_COMM_WORLD=NULL;
569 tMPI_GROUP_EMPTY=NULL;
572 /* deallocate the 'global' structure */
573 tMPI_Global_destroy(tmpi_global);
/* MPI_Finalized equivalent: copies the tmpi_finalized flag into *flag. */
586 int tMPI_Finalized(int *flag)
589 tMPI_Trace_print("tMPI_Finalized(%p)", flag);
591 *flag=tmpi_finalized;
598 int tMPI_Abort(tMPI_Comm comm, int errorcode)
601 tMPI_Trace_print("tMPI_Abort(%p, %d)", comm, errorcode);
604 /* we abort(). This way we can run a debugger on it */
605 fprintf(stderr, "tMPI_Abort called with error code %d",errorcode);
606 if (comm==TMPI_COMM_WORLD)
607 fprintf(stderr, " on TMPI_COMM_WORLD");
608 fprintf(stderr,"\n");
613 /* we just kill all threads, but not the main process */
615 if (tMPI_Is_master())
617 if (comm==TMPI_COMM_WORLD)
619 "tMPI_Abort called on TMPI_COMM_WORLD main with errorcode=%d\n",
622 fprintf(stderr, "tMPI_Abort called on main thread with errorcode=%d\n",
631 fprintf(stderr, "tMPI_Abort called with error code %d on thread %d\n",
632 errorcode, tMPI_This_threadnr());
634 ret=(int*)malloc(sizeof(int));
635 tMPI_Thread_exit(ret);
/* MPI_Get_processor_name equivalent. It fills name with "thread #<n>",
   where n is tMPI_Threadnr(tMPI_Get_current()), writes the digits by hand
   instead of calling sprintf, and stores strlen(name) in *resultlen.
   name is treated as a buffer of TMPI_MAX_PROCESSOR_NAME chars; the Windows
   path passes that value as strncpy_s's destination size. The last valid
   index is therefore TMPI_MAX_PROCESSOR_NAME-1, and the truncation path now
   terminates there rather than one byte past the end. The digit-count and
   digit-extraction setup are on lines not visible here. */
642 int tMPI_Get_processor_name(char *name, int *resultlen)
644 int nr=tMPI_Threadnr(tMPI_Get_current());
645 unsigned int digits=0;
646 const unsigned int base=10;
649 tMPI_Trace_print("tMPI_Get_processor_name(%p, %p)", name, resultlen);
651 /* we don't want to call sprintf here (it turns out to be not entirely
652 thread-safe on Mac OS X, for example), so we do it our own way: */
654 /* first determine number of digits */
665 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
666 strcpy(name, "thread #");
668 strncpy_s(name, TMPI_MAX_PROCESSOR_NAME, "thread #", TMPI_MAX_PROCESSOR_NAME);
670 /* now construct the number */
672 size_t len=strlen(name);
676 for(i=0;i<digits;i++)
678 size_t pos=len + (digits-i-1);
679 if (pos < (TMPI_MAX_PROCESSOR_NAME -1) )
680 name[ pos ]=(char)('0' + rest%base);
683 if ( (digits+len) < TMPI_MAX_PROCESSOR_NAME)
684 name[digits + len]='\0';
686 name[TMPI_MAX_PROCESSOR_NAME-1]='\0'; /* last byte of the buffer, not one past it */
690 *resultlen=(int)strlen(name); /* For some reason the MPI standard
691 uses ints instead of size_ts for
/* MPI_Wtime equivalent: wall-clock seconds elapsed since tMPI_Global_init
   stored tmpi_global->timer_init.
   - POSIX: seconds and microseconds are differenced separately. usecdiff may
     be negative, and adding 1e-6*usecdiff performs the borrow.
   - Windows: the GetTickCount() difference is taken in unsigned arithmetic
     before scaling from milliseconds to seconds.
   NOTE(review): dereferences tmpi_global, so this is only valid between
   tMPI_Start_threads and tMPI_Finalize. */
700 /* TODO: there must be better ways to do this */
701 double tMPI_Wtime(void)
706 tMPI_Trace_print("tMPI_Wtime()");
709 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
715 gettimeofday(&tv, NULL);
716 secdiff = tv.tv_sec - tmpi_global->timer_init.tv_sec;
717 usecdiff = tv.tv_usec - tmpi_global->timer_init.tv_usec;
719 ret=(double)secdiff + 1e-6*usecdiff;
723 DWORD tv=GetTickCount();
725 /* the windows absolute time GetTickCount() wraps around in ~49 days,
726 so it's safer to always use differences, and assume that our
727 program doesn't run that long.. */
728 ret=1e-3*((unsigned int)(tv - tmpi_global->timer_init));
/* MPI_Wtick equivalent: the resolution of tMPI_Wtime in seconds, with
   separate Unix and Windows values. The return statements with the actual
   values are on lines not visible here. */
734 double tMPI_Wtick(void)
736 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
737 /* In Unix, we don't really know. Any modern OS should be at least
738 this precise, though */
741 /* According to the Windows documentation, this is about right: */
752 int tMPI_Get_count(tMPI_Status *status, tMPI_Datatype datatype, int *count)
755 tMPI_Trace_print("tMPI_Get_count(%p, %p, %p)", status, datatype, count);
759 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_STATUS);
761 *count = (int)(status->transferred/datatype->size);