2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009,2018,2021, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
39 #ifdef HAVE_TMPI_CONFIG_H
40 #include "tmpi_config.h"
55 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
/* File-scope runtime state shared by every thread of the library.
   NOTE(review): the embedded line numbers skip, so some original lines are
   absent from this excerpt; the comments describe only what is visible. */
73 /* there are a few global variables that maintain information about the
74 running threads. Some are defined by the MPI standard: */
75 /* TMPI_COMM_WORLD is in tmpi_malloc.c due to technical reasons */
/* The empty group: NULL until tMPI_Start_threads() assigns it from
   tMPI_Group_alloc(); tMPI_Finalize() frees it and resets it to NULL. */
76 tMPI_Group TMPI_GROUP_EMPTY = NULL;
79 /* the threads themselves (tmpi_comm only contains lists of pointers to this
81 struct tmpi_thread *threads = NULL;
85 tMPI_Thread_key_t id_key; /* the key to get the thread id */
/* threads is allocated in tMPI_Start_threads() with one element per thread;
   element 0 belongs to the main thread. id_key is the thread-specific-data
   key under which tMPI_Thread_init() stores the pointer to the calling
   thread's own element. */
89 /* whether MPI has finalized (we need this to distinguish pre-inited from
90 post-finalized states */
/* Cleared by tMPI_Start_threads() and set by tMPI_Finalize(). */
91 static tmpi_bool tmpi_finalized = FALSE;
93 /* misc. global information about MPI */
/* Allocated in tMPI_Start_threads() and set up by tMPI_Global_init(). */
94 struct tmpi_global *tmpi_global = NULL;
/* Forward declarations for functions defined further down in this file. */
101 /* start N threads with argc, argv (used by tMPI_Init)*/
/* Also called by tMPI_Init_fn(), which passes 0 for argc and argv and
   supplies start_fn/start_arg instead of start_fn_main. */
103 int tMPI_Start_threads(tmpi_bool main_returns, int N,
104 tMPI_Affinity_strategy aff_strategy,
105 int *argc, char ***argv,
106 void (*start_fn)(const void*), const void *start_arg,
107 int (*start_fn_main)(int, char**));
109 /* starter function for threads; takes a void pointer to a
110 struct tmpi_starter_, which calls main() if tmpi_start_.fn == NULL */
/* NOTE(review): the definition casts arg to struct tmpi_thread*, so the
   struct name in the comment above looks stale - confirm. */
111 static void* tMPI_Thread_starter(void *arg);
113 /* allocate and initialize the data associated with a thread structure */
114 static int tMPI_Thread_init(struct tmpi_thread *th);
115 /* deallocate the data associated with a thread structure */
116 static void tMPI_Thread_destroy(struct tmpi_thread *th);
/* Print one trace line on stdout for the calling thread.
   fmt is a printf-style format; the variadic handling itself falls on lines
   that are not part of this excerpt.
   The whole call is serialised through a function-local static mutex so that
   prefixes and messages from different threads do not interleave.
   NOTE(review): the condition that selects between the two prefixes below is
   not visible here; presumably it tests whether the threads array exists. */
122 void tMPI_Trace_print(const char *fmt, ...)
125 struct tmpi_thread * th = NULL;
126 static tMPI_Thread_mutex_t mtx = TMPI_THREAD_MUTEX_INITIALIZER;
128 /* don't check for errors during trace */
129 tMPI_Thread_mutex_lock(&mtx);
/* prefix with the index of the caller within the threads array */
132 th = tMPI_Get_current();
133 printf("THREAD %02d: ", (int)(th-threads));
/* alternative prefix that does not need a thread index */
137 printf("THREAD main: ");
144 tMPI_Thread_mutex_unlock(&mtx);
/* Report whether the calling thread is the main (master) thread.
   Returns the result of comparing the current thread pointer with the first
   element of the threads array. The value returned when TMPI_COMM_WORLD is
   unset or its group is empty is on a line not shown in this excerpt
   (presumably TRUE, per the comment below). */
149 tmpi_bool tMPI_Is_master(void)
151 /* if there are no other threads, we're the main thread */
152 if ( (!TMPI_COMM_WORLD) || TMPI_COMM_WORLD->grp.N == 0)
157 /* otherwise we know this through thread specific data: */
158 /* whether the thread pointer points to the head of the threads array */
159 return (tmpi_bool)(tMPI_Get_current() == threads);
/* Return the self communicator of the calling thread, i.e. the self_comm
   member that tMPI_Thread_init() allocates for it.
   No NULL check on the current-thread pointer is visible here, so the caller
   must be a thread that has already been initialised. */
162 tMPI_Comm tMPI_Get_comm_self(void)
164 struct tmpi_thread* th = tMPI_Get_current();
165 return th->self_comm;
/* Determine the number of threads to start and store it in *nthreads.
   argc/argv: pointers to the command line; scanned from index 1 for an
              argument equal to optname.
   optname:   option name whose following argument holds the thread count.
   Returns an int status that starts out as TMPI_SUCCESS; the failure
   assignment is on a line not shown in this excerpt.
   When no usable count is obtained the hardware thread number is used,
   clamped to at least 1 (the guarding conditions are not visible here).
   NOTE(review): strtol() returns long and is stored into an int without a
   range or errno check, so very large values are narrowed silently. */
169 int tMPI_Get_N(int *argc, char ***argv, const char *optname, int *nthreads)
172 int ret = TMPI_SUCCESS;
181 for (i = 1; i < *argc; i++)
183 if (strcmp(optname, (*argv)[i]) == 0)
191 /* the number of processes is an argument */
/* parse the argument that follows the matched option as a base-10 integer */
193 *nthreads = strtol((*argv)[i+1], &end, 10);
/* reject values with trailing non-numeric characters */
194 if (!end || (*end != 0) )
/* fall back to the number of hardware threads reported by the thread layer */
202 int nth = tMPI_Thread_get_hw_number();
206 nth = 1; /* make sure it's at least 1 */
/* Allocate and initialise the per-thread data of th; runs on the thread that
   owns th (it stores th as that thread's specific data).
   Visible steps, in order: register th under id_key, allocate the self
   communicator, set up the free/receive/send envelope lists, the outgoing
   counter, the point-to-point event, the request list, optionally the
   collective copy-buffer list, and the profile; finally wait on the global
   barrier so that no thread proceeds before all threads are initialised.
   Returns an int status; the visible failure return is TMPI_ERR_NO_MEM when
   the send-envelope array cannot be allocated. Handling of the other
   ret != TMPI_SUCCESS checks is on lines not shown in this excerpt.
   NOTE(review): Nthreads, N_EV_ALLOC, i and ret are declared outside the
   visible lines. */
214 static int tMPI_Thread_init(struct tmpi_thread *th)
/* list sizes scale with the number of threads */
217 int N_envelopes = (Nthreads+1)*N_EV_ALLOC;
218 int N_send_envelopes = N_EV_ALLOC;
219 int N_reqs = (Nthreads+1)*N_EV_ALLOC;
222 /* we set our thread id, as a thread-specific piece of global data. */
223 ret = tMPI_Thread_setspecific(id_key, th);
229 /* allocate comm.self */
230 ret = tMPI_Comm_alloc( &(th->self_comm), TMPI_COMM_WORLD, 1);
231 if (ret != TMPI_SUCCESS)
/* the self communicator has exactly one peer: this thread */
235 th->self_comm->grp.peers[0] = th;
237 /* allocate envelopes */
238 ret = tMPI_Free_env_list_init( &(th->envelopes), N_envelopes );
239 if (ret != TMPI_SUCCESS)
244 ret = tMPI_Recv_env_list_init( &(th->evr));
245 if (ret != TMPI_SUCCESS)
/* one send-envelope list per thread */
250 th->evs = (struct send_envelope_list*)tMPI_Malloc(
251 sizeof(struct send_envelope_list)*Nthreads);
254 return TMPI_ERR_NO_MEM;
256 for (i = 0; i < Nthreads; i++)
258 ret = tMPI_Send_env_list_init( &(th->evs[i]), N_send_envelopes);
259 if (ret != TMPI_SUCCESS)
265 tMPI_Atomic_set( &(th->ev_outgoing_received), 0);
267 tMPI_Event_init( &(th->p2p_event) );
269 /* allocate requests */
270 ret = tMPI_Req_list_init(&(th->rql), N_reqs);
271 if (ret != TMPI_SUCCESS)
277 #ifdef USE_COLLECTIVE_COPY_BUFFER
278 /* allocate copy_buffer list */
279 ret = tMPI_Copy_buffer_list_init(&(th->cbl_multi),
280 (Nthreads+1)*(N_COLL_ENV+1),
281 Nthreads*COPY_BUFFER_SIZE);
282 if (ret != TMPI_SUCCESS)
289 ret = tMPI_Profile_init(&(th->profile));
290 if (ret != TMPI_SUCCESS)
295 /* now wait for all other threads to come on line, before we
296 start the MPI program */
297 ret = tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
/* Release the per-thread data that tMPI_Thread_init() set up for th:
   the receive, send and free envelope lists, the point-to-point event, the
   request list and, when enabled, the collective copy-buffer list.
   The final loop walks the argc entries of th; its body is not shown in this
   excerpt (presumably it frees the duplicated argv strings made in
   tMPI_Start_threads() - confirm).
   NOTE(review): Nthreads and i are declared outside the visible lines. */
306 static void tMPI_Thread_destroy(struct tmpi_thread *th)
310 tMPI_Recv_env_list_destroy( &(th->evr));
311 for (i = 0; i < Nthreads; i++)
313 tMPI_Send_env_list_destroy( &(th->evs[i]));
316 tMPI_Free_env_list_destroy( &(th->envelopes) );
317 tMPI_Event_destroy( &(th->p2p_event) );
318 tMPI_Req_list_destroy( &(th->rql) );
320 #ifdef USE_COLLECTIVE_COPY_BUFFER
321 tMPI_Copy_buffer_list_destroy(&(th->cbl_multi));
324 for (i = 0; i < th->argc; i++)
/* Initialise the global structure g for a run with Nthreads threads.
   Sets up the timer mutex, the datatype spinlock, a barrier sized for
   Nthreads participants, the communicator-link mutex and the start time.
   Returns an int status; each visible failure path reports TMPI_ERR_IO
   through tMPI_Error(). The conditions guarding those returns are on lines
   not shown in this excerpt. */
330 static int tMPI_Global_init(struct tmpi_global *g, int Nthreads)
336 g->Nalloc_usertypes = 0;
337 ret = tMPI_Thread_mutex_init(&(g->timer_mutex));
340 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
342 tMPI_Spinlock_init(&(g->datatype_lock));
/* every thread waits on this barrier in tMPI_Thread_init() and tMPI_Finalize() */
344 ret = tMPI_Thread_barrier_init( &(g->barrier), Nthreads);
347 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
350 ret = tMPI_Thread_mutex_init(&(g->comm_link_lock));
353 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
/* record the reference time that tMPI_Wtime() subtracts; the two variants
   below are alternatives (the directive separating them is not shown) */
357 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
358 /* the time at initialization. */
359 gettimeofday( &(g->timer_init), NULL);
361 /* the time at initialization. */
362 g->timer_init = GetTickCount();
/* Tear down the members of the global structure g: the barrier, both
   mutexes, and every registered user type (its comps array, then the entry
   itself), followed by the usertypes array.
   No call that frees g itself is visible in these lines. */
367 static void tMPI_Global_destroy(struct tmpi_global *g)
369 tMPI_Thread_barrier_destroy(&(g->barrier));
370 tMPI_Thread_mutex_destroy(&(g->timer_mutex));
371 tMPI_Thread_mutex_destroy(&(g->comm_link_lock));
372 for (int i = 0; i < g->N_usertypes; i++)
/* free the component list before the entry that owns it */
374 tMPI_Free(g->usertypes[i]->comps);
375 tMPI_Free(g->usertypes[i]);
377 tMPI_Free(g->usertypes);
/* Thread entry point: arg is the struct tmpi_thread of the new thread.
   Initialises the thread through tMPI_Thread_init() and then runs the user
   code, either start_fn_main with the thread's argc/argv copy or start_fn
   with start_arg.
   NOTE(review): the test that chooses between the two entry functions, the
   handling of an init failure and the return value are on lines not shown
   in this excerpt. */
383 static void* tMPI_Thread_starter(void *arg)
386 struct tmpi_thread *th = (struct tmpi_thread*)arg;
389 tMPI_Trace_print("Created thread nr. %d", (int)(th-threads));
392 ret = tMPI_Thread_init(th);
393 if (ret != TMPI_SUCCESS)
398 /* start_fn, start_arg, argc and argv were set by the calling function */
401 th->start_fn_main(th->argc, th->argv);
405 th->start_fn(th->start_arg);
/* Create the runtime state and start N threads.
   main_returns:  NOTE(review): its use is not visible here; presumably it
                  selects between the two final calls on thread 0.
   N:             number of threads, including the calling (main) thread.
   aff_strategy:  affinity is only considered for TMPI_AFFINITY_ALL_CORES
                  when the hardware thread count is above 1 and equals N.
   argc, argv:    command line; a duplicate is stored per thread.
   start_fn, start_arg, start_fn_main: stored in every thread entry and
                  later invoked by tMPI_Thread_starter().
   Returns an int status: TMPI_ERR_NO_MEM on allocation failure, or
   TMPI_ERR_INIT via tMPI_Error() when key or thread creation fails.
   Visible sequence: allocate and initialise tmpi_global, allocate the
   threads array and TMPI_COMM_WORLD, create id_key, fill in each thread
   entry, decide on affinity, record the main thread as thread 0, create
   threads 1..N-1, then run thread 0 either through tMPI_Thread_starter()
   or through tMPI_Thread_init() alone.
   NOTE(review): many conditions and braces are on lines missing from this
   excerpt, so nesting is not shown. */
416 int tMPI_Start_threads(tmpi_bool main_returns, int N,
417 tMPI_Affinity_strategy aff_strategy,
418 int *argc, char ***argv,
419 void (*start_fn)(const void*), const void *start_arg,
420 int (*start_fn_main)(int, char**))
/* NOTE(review): the format prints the argc and argv pointers with %d, and
   the argument list is cut off in this excerpt - check conversions against
   the arguments actually passed. */
424 tMPI_Trace_print("tMPI_Start_threads(%d, %d, %d, %d, %d, %p, %p, %p, %p)",
425 main_returns, N, aff_strategy, argc, argv, start_fn,
431 int set_affinity = FALSE;
/* a new run is starting, so the runtime is no longer in the finalized state */
433 tmpi_finalized = FALSE;
436 /* allocate global data */
437 tmpi_global = (struct tmpi_global*)
438 tMPI_Malloc(sizeof(struct tmpi_global));
439 if (tmpi_global == 0)
441 return TMPI_ERR_NO_MEM;
443 ret = tMPI_Global_init(tmpi_global, N);
444 if (ret != TMPI_SUCCESS)
449 /* allocate world and thread data */
450 threads = (struct tmpi_thread*)
451 tMPI_Malloc(sizeof(struct tmpi_thread)*N);
454 return TMPI_ERR_NO_MEM;
456 ret = tMPI_Comm_alloc(&TMPI_COMM_WORLD, NULL, N);
457 if (ret != TMPI_SUCCESS)
/* NOTE(review): nullptr requires C++ or C23 */
461 assert(TMPI_COMM_WORLD != nullptr);
462 TMPI_GROUP_EMPTY = tMPI_Group_alloc();
/* key under which each thread later stores its own struct tmpi_thread */
464 if (tMPI_Thread_key_create(&id_key, NULL))
466 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
468 for (i = 0; i < N; i++)
/* thread i is peer i of the world communicator */
470 TMPI_COMM_WORLD->grp.peers[i] = &(threads[i]);
472 /* copy argc, argv */
/* NOTE(review): the guard for a null argc/argv (as passed by tMPI_Init_fn)
   is not visible; neither the array allocation nor the string duplicates
   are checked for failure in the visible lines. */
476 threads[i].argc = *argc;
477 threads[i].argv = (char**)tMPI_Malloc(threads[i].argc*
479 for (j = 0; j < threads[i].argc; j++)
481 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
482 threads[i].argv[j] = strdup( (*argv)[j] );
484 threads[i].argv[j] = _strdup( (*argv)[j] );
491 threads[i].argv = NULL;
493 threads[i].start_fn = start_fn;
494 threads[i].start_fn_main = start_fn_main;
495 threads[i].start_arg = start_arg;
498 /* now check whether to set affinity */
499 if (aff_strategy == TMPI_AFFINITY_ALL_CORES)
501 int nhw = tMPI_Thread_get_hw_number();
502 if ((nhw > 1) && (nhw == N))
508 /* set thread 0's properties */
509 threads[0].thread_id = tMPI_Thread_self();
512 /* set the main thread's affinity */
513 tMPI_Thread_setaffinity_single(threads[0].thread_id, 0);
516 for (i = 1; i < N; i++) /* zero is the main thread */
518 ret = tMPI_Thread_create(&(threads[i].thread_id),
520 (void*)&(threads[i]) );
/* NOTE(review): in the visible line order the affinity call precedes the
   check of the thread-creation result - confirm this is intended. */
524 tMPI_Thread_setaffinity_single(threads[i].thread_id, i);
526 if (ret != TMPI_SUCCESS)
528 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
531 /* the main thread also runs start_fn if we don't want
535 tMPI_Thread_starter((void*)&(threads[0]));
540 ret = tMPI_Thread_init(&(threads[0]));
/* Initialise the library from a main()-style entry point.
   argc, argv:      command line; scanned for the -nt option to obtain the
                    thread count.
   start_function:  main-style function each started thread runs.
   When TMPI_COMM_WORLD is not yet set, the caller is treated as the main
   process and tMPI_Start_threads() is invoked with main_returns TRUE and
   TMPI_AFFINITY_ALL_CORES.
   NOTE(review): the status of tMPI_Get_N() is ignored, and the != 0
   comparison reduces the status of tMPI_Start_threads() to 0 or 1, so the
   specific error code is lost. The lines that use ret afterwards are not
   shown in this excerpt. */
551 int tMPI_Init(int *argc, char ***argv,
552 int (*start_function)(int, char**))
556 tMPI_Trace_print("tMPI_Init(%p, %p, %p)", argc, argv, start_function);
559 if (TMPI_COMM_WORLD == 0) /* we're the main process */
562 tMPI_Get_N(argc, argv, "-nt", &N);
563 ret = tMPI_Start_threads(TRUE, N, TMPI_AFFINITY_ALL_CORES, argc, argv,
564 NULL, NULL, start_function) != 0;
572 /* if we're a sub-thread we don't need to do anything, because
573 everything has already been set up by either the main thread,
574 or the thread runner function.*/
/* Initialise the library with an explicit thread function instead of a
   main()-style entry point.
   main_thread_returns: forwarded as main_returns to tMPI_Start_threads().
   N:                   requested thread count; the visible lines replace it
                        with the hardware thread number and then with 1
                        under conditions not shown in this excerpt.
   aff_strategy:        forwarded unchanged.
   start_function, arg: function and argument every thread runs.
   Threads are started only when TMPI_COMM_WORLD is unset and N >= 1; no
   command line is passed on (argc and argv are given as 0). */
582 int tMPI_Init_fn(int main_thread_returns, int N,
583 tMPI_Affinity_strategy aff_strategy,
584 void (*start_function)(const void*), const void *arg)
588 tMPI_Trace_print("tMPI_Init_fn(%d, %p, %p)", N, start_function, arg);
593 N = tMPI_Thread_get_hw_number();
596 N = 1; /*because that's what the fn returns if it doesn't know*/
600 if (TMPI_COMM_WORLD == 0 && N >= 1) /* we're the main process */
602 ret = tMPI_Start_threads(main_thread_returns, N, aff_strategy,
603 0, 0, start_function, arg, NULL);
/* Store in *flag whether the library is currently initialised: non-zero
   when TMPI_COMM_WORLD is set and tMPI_Finalize() has not completed.
   No NULL check on flag is visible; the return statement is on a line not
   shown in this excerpt. */
612 int tMPI_Initialized(int *flag)
615 tMPI_Trace_print("tMPI_Initialized(%p)", flag);
618 *flag = (TMPI_COMM_WORLD && !tmpi_finalized);
/* Shut down the runtime.
   Visible sequence: the caller stops its profile and waits on the global
   barrier; the master summarises the profiles and a second barrier wait
   follows; the master then joins and destroys threads 1..Nthreads-1,
   destroys thread 0, deletes id_key, destroys every communicator linked
   after TMPI_COMM_WORLD and then TMPI_COMM_WORLD itself while holding
   comm_link_lock, frees the empty group, destroys the global structure and
   finally sets tmpi_finalized.
   The visible error returns go through tMPI_Error() with TMPI_ERR_IO after
   barrier and lock calls and TMPI_ERR_FINALIZE after a failed join.
   NOTE(review): conditions, braces and several declarations are on lines
   missing from this excerpt (for instance which statements are profiling or
   debug only, and the scopes of the two variables named cur), so nesting is
   not shown. */
623 int tMPI_Finalize(void)
628 tMPI_Trace_print("tMPI_Finalize()");
631 printf("%5d: tMPI_Finalize called\n", tMPI_This_threadnr());
637 struct tmpi_thread *cur = tMPI_Get_current();
639 tMPI_Profile_stop( &(cur->profile) );
/* wait until all threads have stopped profiling */
640 ret = tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
643 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
646 if (tMPI_Is_master())
648 tMPI_Profiles_summarize(Nthreads, threads);
652 ret = tMPI_Thread_barrier_wait( &(tmpi_global->barrier) );
655 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
660 if (tMPI_Is_master())
663 /* we just wait for all threads to finish; the order isn't very
664 relevant, as all threads should arrive at their endpoints soon. */
665 for (i = 1; i < Nthreads; i++)
667 if (tMPI_Thread_join(threads[i].thread_id, NULL))
669 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_FINALIZE);
671 tMPI_Thread_destroy(&(threads[i]));
673 /* at this point, we are the only thread left, so we can
674 destroy the global structures with impunity. */
675 tMPI_Thread_destroy(&(threads[0]));
678 tMPI_Thread_key_delete(id_key);
679 /* de-allocate all the comm structures. */
683 ret = tMPI_Thread_mutex_lock(&(tmpi_global->comm_link_lock));
686 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
/* walk the communicator list, which ends at NULL or back at the world
   communicator; the next pointer is saved before the node is destroyed */
688 cur = TMPI_COMM_WORLD->next;
689 while (cur && (cur != TMPI_COMM_WORLD) )
691 tMPI_Comm next = cur->next;
692 ret = tMPI_Comm_destroy(cur, FALSE);
/* release the lock on this path as well (presumably an error exit) */
695 tMPI_Thread_mutex_unlock(&(tmpi_global->comm_link_lock));
700 ret = tMPI_Comm_destroy(TMPI_COMM_WORLD, FALSE);
703 tMPI_Thread_mutex_unlock(&(tmpi_global->comm_link_lock));
706 ret = tMPI_Thread_mutex_unlock(&(tmpi_global->comm_link_lock));
709 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
714 tMPI_Group_free(&TMPI_GROUP_EMPTY);
/* reset the globals so that tMPI_Initialized() reports false afterwards */
716 TMPI_COMM_WORLD = NULL;
717 TMPI_GROUP_EMPTY = NULL;
720 /* deallocate the 'global' structure */
721 tMPI_Global_destroy(tmpi_global);
724 tmpi_finalized = TRUE;
/* Store in *flag the value of tmpi_finalized, which tMPI_Finalize() sets and
   tMPI_Start_threads() clears.
   No NULL check on flag is visible; the return statement is on a line not
   shown in this excerpt. */
734 int tMPI_Finalized(int *flag)
737 tMPI_Trace_print("tMPI_Finalized(%p)", flag);
739 *flag = tmpi_finalized;
/* Abort execution after reporting errorcode on stderr.
   comm:      communicator the abort was requested on; only compared with
              TMPI_COMM_WORLD to choose the message text.
   errorcode: value included in the diagnostic messages.
   Two variants are visible: one that prints a message and, per its comment,
   calls abort(), and one that distinguishes the master thread from other
   threads and ends a non-master thread through tMPI_Thread_exit(). The
   preprocessor or run-time condition selecting between them is on lines not
   shown in this excerpt.
   NOTE(review): the int allocated just before tMPI_Thread_exit() is neither
   checked for NULL nor assigned a value in the visible lines. */
746 int tMPI_Abort(tMPI_Comm comm, int errorcode)
749 tMPI_Trace_print("tMPI_Abort(%p, %d)", comm, errorcode);
752 /* we abort(). This way we can run a debugger on it */
753 fprintf(stderr, "tMPI_Abort called with error code %d", errorcode);
754 if (comm == TMPI_COMM_WORLD)
756 fprintf(stderr, " on TMPI_COMM_WORLD");
758 fprintf(stderr, "\n");
763 /* we just kill all threads, but not the main process */
765 if (tMPI_Is_master())
767 if (comm == TMPI_COMM_WORLD)
770 "tMPI_Abort called on TMPI_COMM_WORLD main with errorcode=%d\n",
776 "tMPI_Abort called on main thread with errorcode=%d\n",
786 fprintf(stderr, "tMPI_Abort called with error code %d on thread %d\n",
787 errorcode, tMPI_This_threadnr());
/* heap-allocated exit value handed to the thread layer */
789 ret = (int*)malloc(sizeof(int));
790 tMPI_Thread_exit(ret);
/* Write a name for the calling thread into name and its length into
   *resultlen.
   name:      output buffer; the code treats it as TMPI_MAX_PROCESSOR_NAME
              bytes long (see the bounded copy and the position checks).
   resultlen: receives strlen(name) converted to int.
   The name is the fixed text "thread #" followed by the decimal thread
   number, built digit by digit from the least significant end without
   calling sprintf.
   The fallback terminator is written at index TMPI_MAX_PROCESSOR_NAME - 1.
   Earlier code wrote it at index TMPI_MAX_PROCESSOR_NAME, one element past
   the end of a buffer of that size.
   NOTE(review): the digit-counting loop, the declarations of i and rest and
   the return statement are on lines not shown in this excerpt. */
797 int tMPI_Get_processor_name(char *name, int *resultlen)
799 int nr = tMPI_Threadnr(tMPI_Get_current());
800 unsigned int digits = 0;
801 const unsigned int base = 10;
804 tMPI_Trace_print("tMPI_Get_processor_name(%p, %p)", name, resultlen);
806 /* we don't want to call sprintf here (it turns out to be not entirely
807 thread-safe on Mac OS X, for example), so we do it our own way: */
809 /* first determine number of digits */
/* the two copies below are alternatives for different compilers */
823 strcpy(name, "thread #");
825 strncpy_s(name, TMPI_MAX_PROCESSOR_NAME, "thread #", TMPI_MAX_PROCESSOR_NAME);
827 /* now construct the number */
829 size_t len = strlen(name);
833 for (i = 0; i < digits; i++)
/* digit i counts from the least significant end, so it lands at the right */
835 size_t pos = len + (digits-i-1);
/* keep the last buffer element free for the terminator */
836 if (pos < (TMPI_MAX_PROCESSOR_NAME -1) )
838 name[ pos ] = (char)('0' + rest%base);
842 if ( (digits+len) < TMPI_MAX_PROCESSOR_NAME)
844 name[digits + len] = '\0';
/* name was truncated: terminate in the last valid element of the buffer */
848 name[TMPI_MAX_PROCESSOR_NAME - 1] = '\0';
854 *resultlen = (int)strlen(name); /* For some reason the MPI standard
855 uses ints instead of size_ts for
/* Return the wall-clock time in seconds elapsed since the reference time
   stored in tmpi_global->timer_init by tMPI_Global_init().
   Non-Windows: difference of two gettimeofday() readings, combining the
   second and microsecond parts.
   Windows: difference of two GetTickCount() readings, scaled from
   milliseconds.
   Requires tmpi_global to be set; no NULL check is visible.
   NOTE(review): declarations, the directive separating the two variants and
   the return statement are on lines not shown in this excerpt. */
865 /* TODO: there must be better ways to do this */
866 double tMPI_Wtime(void)
871 tMPI_Trace_print("tMPI_Wtime()");
874 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
880 gettimeofday(&tv, NULL);
/* subtract field by field; the microsecond difference may be negative */
881 secdiff = tv.tv_sec - tmpi_global->timer_init.tv_sec;
882 usecdiff = tv.tv_usec - tmpi_global->timer_init.tv_usec;
884 ret = (double)secdiff + 1e-6*usecdiff;
888 DWORD tv = GetTickCount();
890 /* the windows absolute time GetTickCount() wraps around in ~49 days,
891 so it's safer to always use differences, and assume that our
892 program doesn't run that long.. */
893 ret = 1e-3*((unsigned int)(tv - tmpi_global->timer_init));
/* Return a double describing the timer; by its name and the comments below
   this is presumably the resolution of tMPI_Wtime() in seconds.
   NOTE(review): both return statements are on lines not shown in this
   excerpt, so the actual values cannot be stated here. */
899 double tMPI_Wtick(void)
901 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
902 /* In Unix, we don't really know. Any modern OS should be at least
903 this precise, though */
906 /* According to the Windows documentation, this is about right: */
/* Compute how many elements of datatype a completed transfer contained.
   status:   transfer status; its transferred member is divided by the
             size member of datatype.
   datatype: element type.
   count:    receives the quotient converted to int.
   The visible error return reports TMPI_ERR_STATUS through tMPI_Error(); the
   condition guarding it is not shown (presumably a NULL status - confirm).
   NOTE(review): no guard against a zero datatype size or a NULL datatype or
   count is visible; the end of the function lies outside this excerpt. */
911 int tMPI_Get_count(tMPI_Status *status, tMPI_Datatype datatype, int *count)
914 tMPI_Trace_print("tMPI_Get_count(%p, %p, %p)", status, datatype, count);
918 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_STATUS);
920 *count = (int)(status->transferred/datatype->size);