2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
39 #ifdef HAVE_TMPI_CONFIG_H
40 #include "tmpi_config.h"
55 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
72 /* there are a few global variables that maintain information about the
73 running threads. Some are defined by the MPI standard: */
74 /* TMPI_COMM_WORLD is in tmpi_malloc.c due to technical reasons */
75 tMPI_Group TMPI_GROUP_EMPTY = NULL;
78 /* the threads themselves (tmpi_comm only contains lists of pointers to this
80 struct tmpi_thread *threads = NULL;
84 tMPI_Thread_key_t id_key; /* the key to get the thread id */
88 /* whether MPI has been finalized (we need this to distinguish the
89 pre-initialized state from the post-finalized state) */
90 static tmpi_bool tmpi_finalized = FALSE;
92 /* misc. global information about MPI */
93 struct tmpi_global *tmpi_global = NULL;
100 /* start N threads with argc, argv (used by tMPI_Init and tMPI_Init_fn) */
101 int tMPI_Start_threads(tmpi_bool main_returns, int N,
102 tMPI_Affinity_strategy aff_strategy,
103 int *argc, char ***argv,
104 void (*start_fn)(void*), void *start_arg,
105 int (*start_fn_main)(int, char**));
107 /* starter function for threads; takes a void pointer to the thread's
108 struct tmpi_thread, initializes it and then runs start_fn_main or start_fn */
109 static void* tMPI_Thread_starter(void *arg);
111 /* allocate and initialize the data associated with a thread structure */
112 static int tMPI_Thread_init(struct tmpi_thread *th);
113 /* deallocate the data associated with a thread structure */
114 static void tMPI_Thread_destroy(struct tmpi_thread *th);
/* Print a trace message with printf, prefixed either with the calling
   thread's number ("THREAD nn: ") or with "THREAD main: ".
   The whole call is serialized through a function-local static mutex, and
   the lock/unlock return values are deliberately ignored.
   NOTE(review): the condition that selects between the two prefixes and the
   code that formats fmt and its variable arguments are not visible in this
   excerpt. */
120 void tMPI_Trace_print(const char *fmt, ...)
123 struct tmpi_thread * th = NULL;
124 static tMPI_Thread_mutex_t mtx = TMPI_THREAD_MUTEX_INITIALIZER;
126 /* don't check for errors during trace */
127 tMPI_Thread_mutex_lock(&mtx);
130 th = tMPI_Get_current();
/* the thread number is the offset of this thread's struct within the
   global threads array */
131 printf("THREAD %02d: ", (int)(th-threads));
135 printf("THREAD main: ");
142 tMPI_Thread_mutex_unlock(&mtx);
/* Report whether the calling thread is the master (main) thread.
   When TMPI_COMM_WORLD is unset or its group is empty, the caller is treated
   as the main thread (the statement executed in that branch is not visible
   in this excerpt). Otherwise the result is whether tMPI_Get_current()
   returns the first element of the global threads array. */
147 tmpi_bool tMPI_Is_master(void)
149 /* if there are no other threads, we're the main thread */
150 if ( (!TMPI_COMM_WORLD) || TMPI_COMM_WORLD->grp.N == 0)
155 /* otherwise we know this through thread specific data: */
156 /* whether the thread pointer points to the head of the threads array */
157 return (tmpi_bool)(tMPI_Get_current() == threads);
/* Return the self communicator (self_comm) of the calling thread, looked up
   through tMPI_Get_current().
   NOTE(review): th is dereferenced without a NULL check in the visible
   lines; confirm that callers only invoke this on initialized tMPI
   threads. */
160 tMPI_Comm tMPI_Get_comm_self(void)
162 struct tmpi_thread* th = tMPI_Get_current();
163 return th->self_comm;
/* Determine the number of threads to start.
   Scans (*argv)[1] .. (*argv)[*argc-1] for an argument equal to optname and
   parses the argument that follows it as a base-10 integer into *nthreads;
   the strtol end pointer is used to detect trailing non-numeric characters.
   The visible fallback path queries tMPI_Thread_get_hw_number() and forces
   the value to at least 1.
   NOTE(review): the bounds check guarding (*argv)[i+1], the conditions
   around the hardware fallback and the return statements are not visible in
   this excerpt. */
167 int tMPI_Get_N(int *argc, char ***argv, const char *optname, int *nthreads)
170 int ret = TMPI_SUCCESS;
179 for (i = 1; i < *argc; i++)
181 if (strcmp(optname, (*argv)[i]) == 0)
189 /* the number of processes is an argument */
/* NOTE(review): strtol returns long; the value is narrowed to int here
   without a visible range check */
191 *nthreads = strtol((*argv)[i+1], &end, 10);
192 if (!end || (*end != 0) )
200 int nth = tMPI_Thread_get_hw_number();
204 nth = 1; /* make sure it's at least 1 */
/* Allocate and initialize the per-thread data of th.
   In order: registers th as the thread-specific value of id_key, allocates
   the self communicator, the free/receive/send envelope lists, the request
   list, the collective copy-buffer list (only with
   USE_COLLECTIVE_COPY_BUFFER) and the profiling data, and finally waits on
   the global barrier until all threads have arrived.
   Every step that yields a status is followed by a check against
   TMPI_SUCCESS; TMPI_ERR_NO_MEM is returned on the send-envelope array path
   (presumably when tMPI_Malloc fails -- the condition is not visible).
   NOTE(review): the bodies of the other error branches, the declaration of
   Nthreads and the final return are not visible in this excerpt. */
212 static int tMPI_Thread_init(struct tmpi_thread *th)
215 int N_envelopes = (Nthreads+1)*N_EV_ALLOC;
216 int N_send_envelopes = N_EV_ALLOC;
217 int N_reqs = (Nthreads+1)*N_EV_ALLOC;
220 /* we set our thread id, as a thread-specific piece of global data. */
221 ret = tMPI_Thread_setspecific(id_key, th);
227 /* allocate comm.self */
228 ret = tMPI_Comm_alloc( &(th->self_comm), TMPI_COMM_WORLD, 1);
229 if (ret != TMPI_SUCCESS)
/* the self communicator has a single peer: this thread */
233 th->self_comm->grp.peers[0] = th;
235 /* allocate envelopes */
236 ret = tMPI_Free_env_list_init( &(th->envelopes), N_envelopes );
237 if (ret != TMPI_SUCCESS)
242 ret = tMPI_Recv_env_list_init( &(th->evr));
243 if (ret != TMPI_SUCCESS)
/* an array of Nthreads send-envelope lists, each initialized in the loop
   below */
248 th->evs = (struct send_envelope_list*)tMPI_Malloc(
249 sizeof(struct send_envelope_list)*Nthreads);
252 return TMPI_ERR_NO_MEM;
254 for (i = 0; i < Nthreads; i++)
256 ret = tMPI_Send_env_list_init( &(th->evs[i]), N_send_envelopes);
257 if (ret != TMPI_SUCCESS)
263 tMPI_Atomic_set( &(th->ev_outgoing_received), 0);
265 tMPI_Event_init( &(th->p2p_event) );
267 /* allocate requests */
268 ret = tMPI_Req_list_init(&(th->rql), N_reqs);
269 if (ret != TMPI_SUCCESS)
275 #ifdef USE_COLLECTIVE_COPY_BUFFER
276 /* allocate copy_buffer list */
277 ret = tMPI_Copy_buffer_list_init(&(th->cbl_multi),
278 (Nthreads+1)*(N_COLL_ENV+1),
279 Nthreads*COPY_BUFFER_SIZE);
280 if (ret != TMPI_SUCCESS)
287 ret = tMPI_Profile_init(&(th->profile));
288 if (ret != TMPI_SUCCESS)
293 /* now wait for all other threads to come on line, before we
294 start the MPI program */
295 ret = tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
/* Release the per-thread structures of th that tMPI_Thread_init set up:
   the receive envelope list, each of the Nthreads send envelope lists, the
   free envelope list, the p2p event, the request list and (only with
   USE_COLLECTIVE_COPY_BUFFER) the collective copy-buffer list.
   The final loop walks th->argc entries; its body is not visible here
   (presumably it frees the argv copies made in tMPI_Start_threads --
   TODO confirm). */
304 static void tMPI_Thread_destroy(struct tmpi_thread *th)
308 tMPI_Recv_env_list_destroy( &(th->evr));
309 for (i = 0; i < Nthreads; i++)
311 tMPI_Send_env_list_destroy( &(th->evs[i]));
314 tMPI_Free_env_list_destroy( &(th->envelopes) );
315 tMPI_Event_destroy( &(th->p2p_event) );
316 tMPI_Req_list_destroy( &(th->rql) );
318 #ifdef USE_COLLECTIVE_COPY_BUFFER
319 tMPI_Copy_buffer_list_destroy(&(th->cbl_multi));
322 for (i = 0; i < th->argc; i++)
/* Initialize the global structure g for a run with Nthreads threads:
   resets Nalloc_usertypes, initializes timer_mutex, the datatype spinlock,
   the start-up barrier (sized for Nthreads) and comm_link_lock, and records
   the initialization time in timer_init (gettimeofday() on non-Windows
   builds, GetTickCount() otherwise).
   The visible failure paths report TMPI_ERR_IO through tMPI_Error; the
   conditions guarding them and the success return are not visible in this
   excerpt. */
328 static int tMPI_Global_init(struct tmpi_global *g, int Nthreads)
334 g->Nalloc_usertypes = 0;
335 ret = tMPI_Thread_mutex_init(&(g->timer_mutex));
338 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
340 tMPI_Spinlock_init(&(g->datatype_lock));
342 ret = tMPI_Thread_barrier_init( &(g->barrier), Nthreads);
345 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
348 ret = tMPI_Thread_mutex_init(&(g->comm_link_lock));
351 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
355 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
356 /* the time at initialization. */
357 gettimeofday( &(g->timer_init), NULL);
359 /* the time at initialization. */
360 g->timer_init = GetTickCount();
/* Destroy the synchronization objects of g that tMPI_Global_init created:
   the barrier, timer_mutex and comm_link_lock.
   NOTE(review): no matching destroy call for datatype_lock is visible in
   this excerpt. */
365 static void tMPI_Global_destroy(struct tmpi_global *g)
367 tMPI_Thread_barrier_destroy(&(g->barrier));
368 tMPI_Thread_mutex_destroy(&(g->timer_mutex));
369 tMPI_Thread_mutex_destroy(&(g->comm_link_lock));
/* Entry point run by each tMPI thread. arg is the thread's own
   struct tmpi_thread (an element of the global threads array).
   The thread first initializes its per-thread data with tMPI_Thread_init
   and then runs the user code: start_fn_main(argc, argv) or
   start_fn(start_arg).
   NOTE(review): the condition choosing between the two start functions, the
   handling of an initialization failure and the return value are not
   visible in this excerpt. */
375 static void* tMPI_Thread_starter(void *arg)
378 struct tmpi_thread *th = (struct tmpi_thread*)arg;
381 tMPI_Trace_print("Created thread nr. %d", (int)(th-threads));
384 ret = tMPI_Thread_init(th);
385 if (ret != TMPI_SUCCESS)
390 /* start_fn, start_arg, argc and argv were set by the calling function */
393 th->start_fn_main(th->argc, th->argv);
397 th->start_fn(th->start_arg);
/* Set up the global tMPI state and start N threads.
   Steps visible in this excerpt:
   - clear tmpi_finalized, allocate and initialize tmpi_global;
   - allocate the threads array and TMPI_COMM_WORLD for N peers, allocate
     TMPI_GROUP_EMPTY and create the thread-specific key id_key;
   - for every thread: register it as a world peer, give it its own copy of
     argc/argv (strdup/_strdup per string) or a NULL argv, and store
     start_fn, start_fn_main and start_arg;
   - with TMPI_AFFINITY_ALL_CORES, compare the hardware thread count with N
     to decide on affinity (the assignment to set_affinity is not visible);
   - record the calling thread as thread 0, create threads 1..N-1 running
     tMPI_Thread_starter, and finally either run tMPI_Thread_starter or only
     tMPI_Thread_init for thread 0.
   Visible error returns: TMPI_ERR_NO_MEM for the two allocations shown with
   a return, and TMPI_ERR_INIT (through tMPI_Error) for key or thread
   creation failures.
   NOTE(review): many conditions (use of main_returns, argc/argv NULL
   handling, set_affinity checks) and the final return are not visible.
   NOTE(review): the result of the argv tMPI_Malloc and of strdup/_strdup is
   not checked in the visible lines. */
408 int tMPI_Start_threads(tmpi_bool main_returns, int N,
409 tMPI_Affinity_strategy aff_strategy,
410 int *argc, char ***argv,
411 void (*start_fn)(void*), void *start_arg,
412 int (*start_fn_main)(int, char**))
/* NOTE(review): argc and argv are pointers but are formatted with %d in the
   trace call below */
416 tMPI_Trace_print("tMPI_Start_threads(%d, %d, %d, %d, %d, %p, %p, %p, %p)",
417 main_returns, N, aff_strategy, argc, argv, start_fn,
423 int set_affinity = FALSE;
425 tmpi_finalized = FALSE;
428 /* allocate global data */
429 tmpi_global = (struct tmpi_global*)
430 tMPI_Malloc(sizeof(struct tmpi_global));
431 if (tmpi_global == 0)
433 return TMPI_ERR_NO_MEM;
435 ret = tMPI_Global_init(tmpi_global, N);
436 if (ret != TMPI_SUCCESS)
441 /* allocate world and thread data */
442 threads = (struct tmpi_thread*)
443 tMPI_Malloc(sizeof(struct tmpi_thread)*N);
446 return TMPI_ERR_NO_MEM;
448 ret = tMPI_Comm_alloc(&TMPI_COMM_WORLD, NULL, N);
449 if (ret != TMPI_SUCCESS)
453 TMPI_GROUP_EMPTY = tMPI_Group_alloc();
455 if (tMPI_Thread_key_create(&id_key, NULL))
457 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
459 for (i = 0; i < N; i++)
461 TMPI_COMM_WORLD->grp.peers[i] = &(threads[i]);
463 /* copy argc, argv */
467 threads[i].argc = *argc;
468 threads[i].argv = (char**)tMPI_Malloc(threads[i].argc*
470 for (j = 0; j < threads[i].argc; j++)
472 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
473 threads[i].argv[j] = strdup( (*argv)[j] );
475 threads[i].argv[j] = _strdup( (*argv)[j] );
482 threads[i].argv = NULL;
484 threads[i].start_fn = start_fn;
485 threads[i].start_fn_main = start_fn_main;
486 threads[i].start_arg = start_arg;
489 /* now check whether to set affinity */
490 if (aff_strategy == TMPI_AFFINITY_ALL_CORES)
492 int nhw = tMPI_Thread_get_hw_number();
493 if ((nhw > 1) && (nhw == N))
499 /* set thread 0's properties */
500 threads[0].thread_id = tMPI_Thread_self();
503 /* set the main thread's affinity */
504 tMPI_Thread_setaffinity_single(threads[0].thread_id, 0);
507 for (i = 1; i < N; i++) /* zero is the main thread */
509 ret = tMPI_Thread_create(&(threads[i].thread_id),
511 (void*)&(threads[i]) );
/* NOTE(review): in the visible order the affinity call precedes the check
   of the thread-creation result */
515 tMPI_Thread_setaffinity_single(threads[i].thread_id, i);
517 if (ret != TMPI_SUCCESS)
519 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
522 /* the main thread also runs start_fn if we don't want
526 tMPI_Thread_starter((void*)&(threads[0]));
531 ret = tMPI_Thread_init(&(threads[0]));
/* Initialize tMPI from a main()-style entry point.
   When TMPI_COMM_WORLD is still unset, the caller is the main process: the
   thread count N is obtained with tMPI_Get_N (option "-nt") and
   tMPI_Start_threads is called with TRUE for main_returns,
   TMPI_AFFINITY_ALL_CORES and start_function as the main-style start
   function.
   NOTE(review): the return value of tMPI_Get_N is ignored in the visible
   line.
   NOTE(review): "ret = tMPI_Start_threads(...) != 0" stores the result of
   the comparison (0 or 1) in ret, not the error code returned by
   tMPI_Start_threads -- confirm this is intended. */
542 int tMPI_Init(int *argc, char ***argv,
543 int (*start_function)(int, char**))
547 tMPI_Trace_print("tMPI_Init(%p, %p, %p)", argc, argv, start_function);
550 if (TMPI_COMM_WORLD == 0) /* we're the main process */
553 tMPI_Get_N(argc, argv, "-nt", &N);
554 ret = tMPI_Start_threads(TRUE, N, TMPI_AFFINITY_ALL_CORES, argc, argv,
555 NULL, NULL, start_function) != 0;
563 /* if we're a sub-thread we don't need to do anything, because
564 everything has already been set up by either the main thread,
565 or the thread runner function.*/
/* Initialize tMPI with an arbitrary start function instead of a main()-style
   one.
   On one path (condition not visible) N is replaced by
   tMPI_Thread_get_hw_number(), with a fallback to 1. When TMPI_COMM_WORLD is
   still unset and N >= 1, tMPI_Start_threads is called with null argc/argv
   pointers, start_function/arg as the per-thread start function and its
   argument, and no main-style function.
   NOTE(review): the conditions guarding the N adjustment and the return
   statement are not visible in this excerpt. */
573 int tMPI_Init_fn(int main_thread_returns, int N,
574 tMPI_Affinity_strategy aff_strategy,
575 void (*start_function)(void*), void *arg)
579 tMPI_Trace_print("tMPI_Init_fn(%d, %p, %p)", N, start_function, arg);
584 N = tMPI_Thread_get_hw_number();
587 N = 1; /*because that's what the fn returns if it doesn't know*/
591 if (TMPI_COMM_WORLD == 0 && N >= 1) /* we're the main process */
593 ret = tMPI_Start_threads(main_thread_returns, N, aff_strategy,
594 0, 0, start_function, arg, NULL);
/* Store in *flag whether tMPI is currently initialized: non-zero when
   TMPI_COMM_WORLD is set and tmpi_finalized is still FALSE, zero otherwise.
   The return statement is not visible in this excerpt. */
603 int tMPI_Initialized(int *flag)
606 tMPI_Trace_print("tMPI_Initialized(%p)", flag);
609 *flag = (TMPI_COMM_WORLD && !tmpi_finalized);
/* Shut tMPI down.
   Steps visible in this excerpt:
   - stop the calling thread's profile, wait on the global barrier, let the
     master summarize all profiles, and wait on the barrier again;
   - on the master thread: join threads 1..Nthreads-1 and destroy their
     per-thread data, destroy thread 0's data, delete id_key, and destroy
     every communicator linked after TMPI_COMM_WORLD followed by
     TMPI_COMM_WORLD itself while holding comm_link_lock;
   - free TMPI_GROUP_EMPTY, reset TMPI_COMM_WORLD and TMPI_GROUP_EMPTY to
     NULL, destroy the global structure and set tmpi_finalized.
   Visible error returns go through tMPI_Error with TMPI_ERR_IO (barrier and
   mutex failures) or TMPI_ERR_FINALIZE (join failure).
   NOTE(review): the preprocessor conditions around the debug printf and the
   profiling section, the error checks after tMPI_Comm_destroy, the loop
   advance (cur = next) and any free() of threads/tmpi_global are not
   visible in this excerpt. */
614 int tMPI_Finalize(void)
619 tMPI_Trace_print("tMPI_Finalize()");
622 printf("%5d: tMPI_Finalize called\n", tMPI_This_threadnr());
628 struct tmpi_thread *cur = tMPI_Get_current();
630 tMPI_Profile_stop( &(cur->profile) );
631 ret = tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
634 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
637 if (tMPI_Is_master())
639 tMPI_Profiles_summarize(Nthreads, threads);
643 ret = tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
646 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
651 if (tMPI_Is_master())
654 /* we just wait for all threads to finish; the order isn't very
655 relevant, as all threads should arrive at their endpoints soon. */
656 for (i = 1; i < Nthreads; i++)
658 if (tMPI_Thread_join(threads[i].thread_id, NULL))
660 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_FINALIZE);
662 tMPI_Thread_destroy(&(threads[i]));
664 /* at this point, we are the only thread left, so we can
665 destroy the global structures with impunity. */
666 tMPI_Thread_destroy(&(threads[0]));
669 tMPI_Thread_key_delete(id_key);
670 /* de-allocate all the comm structures. */
674 ret = tMPI_Thread_mutex_lock(&(tmpi_global->comm_link_lock));
677 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
/* walk the communicator list starting after TMPI_COMM_WORLD; next is saved
   before cur is destroyed */
679 cur = TMPI_COMM_WORLD->next;
680 while (cur && (cur != TMPI_COMM_WORLD) )
682 tMPI_Comm next = cur->next;
683 ret = tMPI_Comm_destroy(cur, FALSE);
686 tMPI_Thread_mutex_unlock(&(tmpi_global->comm_link_lock));
691 ret = tMPI_Comm_destroy(TMPI_COMM_WORLD, FALSE);
694 tMPI_Thread_mutex_unlock(&(tmpi_global->comm_link_lock));
697 ret = tMPI_Thread_mutex_unlock(&(tmpi_global->comm_link_lock));
700 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
705 tMPI_Group_free(&TMPI_GROUP_EMPTY);
707 TMPI_COMM_WORLD = NULL;
708 TMPI_GROUP_EMPTY = NULL;
711 /* deallocate the 'global' structure */
712 tMPI_Global_destroy(tmpi_global);
715 tmpi_finalized = TRUE;
/* Store the current value of tmpi_finalized in *flag. The flag is set to
   TRUE by tMPI_Finalize and reset to FALSE by tMPI_Start_threads.
   The return statement is not visible in this excerpt. */
725 int tMPI_Finalized(int *flag)
728 tMPI_Trace_print("tMPI_Finalized(%p)", flag);
730 *flag = tmpi_finalized;
/* Abort execution, reporting errorcode on stderr.
   Two variants are visible in this excerpt (the preprocessor lines that
   select between them are not):
   - one prints "tMPI_Abort called with error code ..." (adding
     " on TMPI_COMM_WORLD" when comm is the world communicator) and, per its
     comment, calls abort() so that a debugger can be attached;
   - the other distinguishes the master thread from other threads, prints a
     message accordingly, and ends a non-master thread with
     tMPI_Thread_exit().
   NOTE(review): the int allocated with malloc and passed to
   tMPI_Thread_exit is neither checked for NULL nor assigned a value in the
   visible lines. */
737 int tMPI_Abort(tMPI_Comm comm, int errorcode)
740 tMPI_Trace_print("tMPI_Abort(%p, %d)", comm, errorcode);
743 /* we abort(). This way we can run a debugger on it */
744 fprintf(stderr, "tMPI_Abort called with error code %d", errorcode);
745 if (comm == TMPI_COMM_WORLD)
747 fprintf(stderr, " on TMPI_COMM_WORLD");
749 fprintf(stderr, "\n");
754 /* we just kill all threads, but not the main process */
756 if (tMPI_Is_master())
758 if (comm == TMPI_COMM_WORLD)
761 "tMPI_Abort called on TMPI_COMM_WORLD main with errorcode=%d\n",
767 "tMPI_Abort called on main thread with errorcode=%d\n",
777 fprintf(stderr, "tMPI_Abort called with error code %d on thread %d\n",
778 errorcode, tMPI_This_threadnr());
780 ret = (int*)malloc(sizeof(int));
781 tMPI_Thread_exit(ret);
/* Write a name of the form "thread #<nr>" into name, where nr is the
   calling thread's number from tMPI_Threadnr(), and store the resulting
   string length in *resultlen.
   The number is formatted by hand, one base-10 digit at a time from the
   least significant end, because sprintf is avoided here (see the comment
   below). Digits that would fall at or beyond index
   TMPI_MAX_PROCESSOR_NAME-1 are skipped.
   NOTE(review): the digit-counting loop, the declaration and update of rest
   and the preprocessor lines selecting strcpy versus strncpy_s are not
   visible in this excerpt. */
788 int tMPI_Get_processor_name(char *name, int *resultlen)
790 int nr = tMPI_Threadnr(tMPI_Get_current());
791 unsigned int digits = 0;
792 const unsigned int base = 10;
795 tMPI_Trace_print("tMPI_Get_processor_name(%p, %p)", name, resultlen);
797 /* we don't want to call sprintf here (it turns out to be not entirely
798 thread-safe on Mac OS X, for example), so we do it our own way: */
800 /* first determine number of digits */
814 strcpy(name, "thread #");
816 strncpy_s(name, TMPI_MAX_PROCESSOR_NAME, "thread #", TMPI_MAX_PROCESSOR_NAME);
818 /* now construct the number */
820 size_t len = strlen(name);
824 for (i = 0; i < digits; i++)
/* digit i (counted from the least significant) goes to the position
   counted from the end of the number */
826 size_t pos = len + (digits-i-1);
827 if (pos < (TMPI_MAX_PROCESSOR_NAME -1) )
829 name[ pos ] = (char)('0' + rest%base);
833 if ( (digits+len) < TMPI_MAX_PROCESSOR_NAME)
835 name[digits + len] = '\0';
/* NOTE(review): if name holds exactly TMPI_MAX_PROCESSOR_NAME chars, this
   index is one past the end of the buffer -- confirm the required buffer
   size */
839 name[TMPI_MAX_PROCESSOR_NAME] = '\0';
845 *resultlen = (int)strlen(name); /* For some reason the MPI standard
846 uses ints instead of size_ts for
/* Compute the elapsed wall-clock time in seconds since the timestamp
   stored in tmpi_global->timer_init.
   Non-Windows builds take the difference of two gettimeofday() readings
   (seconds plus 1e-6 * microseconds); Windows builds take the difference of
   two GetTickCount() readings scaled by 1e-3.
   NOTE(review): the local declarations, the #else/#endif lines and the
   return statement are not visible in this excerpt. */
856 /* TODO: there must be better ways to do this */
857 double tMPI_Wtime(void)
862 tMPI_Trace_print("tMPI_Wtime()");
865 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
871 gettimeofday(&tv, NULL);
872 secdiff = tv.tv_sec - tmpi_global->timer_init.tv_sec;
873 usecdiff = tv.tv_usec - tmpi_global->timer_init.tv_usec;
875 ret = (double)secdiff + 1e-6*usecdiff;
879 DWORD tv = GetTickCount();
881 /* the windows absolute time GetTickCount() wraps around in ~49 days,
882 so it's safer to always use differences, and assume that our
883 program doesn't run that long.. */
884 ret = 1e-3*((unsigned int)(tv - tmpi_global->timer_init));
/* Report the resolution of the timer used by tMPI_Wtime as a double, with
   separate values for non-Windows and Windows builds.
   NOTE(review): the returned constants themselves are not visible in this
   excerpt. */
890 double tMPI_Wtick(void)
892 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
893 /* In Unix, we don't really know. Any modern OS should be at least
894 this precise, though */
897 /* According to the Windows documentation, this is about right: */
902 int tMPI_Get_count(tMPI_Status *status, tMPI_Datatype datatype, int *count)
905 tMPI_Trace_print("tMPI_Get_count(%p, %p, %p)", status, datatype, count);
909 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_STATUS);
911 *count = (int)(status->transferred/datatype->size);