2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
/* Include the defines that determine which thread library to use.
 * We do not use HAVE_PTHREAD_H directly, since we might want to
 * turn off thread support explicitly (e.g. for debugging).
 */
#ifdef HAVE_TMPI_CONFIG_H
#include "tmpi_config.h"
#endif

#ifdef THREAD_PTHREADS

#ifdef HAVE_PTHREAD_SETAFFINITY
/* must be defined before any system header for the *_np calls */
#define _GNU_SOURCE
#endif

/* pthread.h must be the first header, apart from the defines in config.h */
#include <pthread.h>

#include <errno.h>
#include <stdlib.h>
#ifdef HAVE_PTHREAD_SETAFFINITY
#include <sched.h>
#endif
#include <unistd.h>

#include "thread_mpi/atomic.h"
#include "thread_mpi/threads.h"
81 /* mutex for initializing mutexes */
82 static pthread_mutex_t mutex_init = PTHREAD_MUTEX_INITIALIZER;
83 /* mutex for initializing barriers */
84 static pthread_mutex_t once_init = PTHREAD_MUTEX_INITIALIZER;
85 /* mutex for initializing thread_conds */
86 static pthread_mutex_t cond_init = PTHREAD_MUTEX_INITIALIZER;
87 /* mutex for initializing barriers */
88 static pthread_mutex_t barrier_init = PTHREAD_MUTEX_INITIALIZER;
90 /* mutex for managing thread IDs */
91 static pthread_mutex_t thread_id_mutex = PTHREAD_MUTEX_INITIALIZER;
92 static pthread_key_t thread_id_key;
93 static int thread_id_key_initialized = 0;
98 enum tMPI_Thread_support tMPI_Thread_support(void)
100 return TMPI_THREAD_SUPPORT_YES;
104 int tMPI_Thread_get_hw_number(void)
108 #if defined(_SC_NPROCESSORS_ONLN)
109 ret = sysconf(_SC_NPROCESSORS_ONLN);
110 #elif defined(_SC_NPROC_ONLN)
111 ret = sysconf(_SC_NPROC_ONLN);
112 #elif defined(_SC_NPROCESSORS_CONF)
113 ret = sysconf(_SC_NPROCESSORS_CONF);
114 #elif defined(_SC_NPROC_CONF)
115 ret = sysconf(_SC_NPROC_CONF);
122 /* destructor for thread ids */
123 static void tMPI_Destroy_thread_id(void* thread_id)
125 struct tMPI_Thread *thread = (struct tMPI_Thread*)thread_id;
126 if (!thread->started_by_tmpi)
128 /* if the thread is started by tMPI, it must be freed in the join()
134 /* initialize the thread id vars if not already initialized */
135 static int tMPI_Init_thread_ids(void)
138 ret = pthread_mutex_lock( &thread_id_mutex );
144 if (!thread_id_key_initialized)
146 /* initialize and set the thread id thread-specific variable */
147 struct tMPI_Thread *main_thread;
149 thread_id_key_initialized = 1;
150 ret = pthread_key_create(&thread_id_key, tMPI_Destroy_thread_id);
155 main_thread = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
156 if (main_thread == NULL)
161 main_thread->th = pthread_self();
162 main_thread->started_by_tmpi = 0;
163 ret = pthread_setspecific(thread_id_key, main_thread);
170 ret = pthread_mutex_unlock( &thread_id_mutex );
173 pthread_mutex_unlock( &thread_id_mutex );
177 /* structure to hold the arguments for the thread_starter function */
178 struct tMPI_Thread_starter
180 struct tMPI_Thread *thread;
181 void *(*start_routine)(void*);
185 /* the thread_starter function that sets the thread id */
186 static void *tMPI_Thread_starter(void *arg)
188 struct tMPI_Thread_starter *starter = (struct tMPI_Thread_starter *)arg;
189 void *(*start_routine)(void*);
193 ret = pthread_setspecific(thread_id_key, starter->thread);
198 start_routine = starter->start_routine;
202 return (*start_routine)(parg);
205 int tMPI_Thread_create(tMPI_Thread_t *thread, void *(*start_routine)(void *),
209 struct tMPI_Thread_starter *starter;
215 tMPI_Init_thread_ids();
217 *thread = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
222 (*thread)->started_by_tmpi = 1;
223 starter = (struct tMPI_Thread_starter*)
224 malloc(sizeof(struct tMPI_Thread_starter)*1);
229 /* fill the starter structure */
230 starter->thread = *thread;
231 starter->start_routine = start_routine;
234 ret = pthread_create(&((*thread)->th), NULL, tMPI_Thread_starter,
242 int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
245 pthread_t th = thread->th;
247 ret = pthread_join( th, value_ptr );
257 tMPI_Thread_t tMPI_Thread_self(void)
262 /* make sure the key var is set */
263 ret = tMPI_Init_thread_ids();
269 th = pthread_getspecific(thread_id_key);
271 /* check if it is already in our list */
274 /* if not, create an ID, set it and return it */
275 th = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
280 th->th = pthread_self();
281 th->started_by_tmpi = 0;
282 /* we ignore errors here because they're not important -
283 the next iteration will do the same thing. */
284 pthread_setspecific(thread_id_key, th);
289 int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2)
291 return pthread_equal(t1->th, t2->th);
295 enum tMPI_Thread_setaffinity_support tMPI_Thread_setaffinity_support(void)
297 #ifdef HAVE_PTHREAD_SETAFFINITY
301 /* run getaffinity to check whether we get back ENOSYS */
302 ret = pthread_getaffinity_np(pthread_self(), sizeof(set), &set);
305 return TMPI_SETAFFINITY_SUPPORT_YES;
309 return TMPI_SETAFFINITY_SUPPORT_NO;
312 return TMPI_SETAFFINITY_SUPPORT_NO;
317 /* set thread's own affinity to a processor number n */
318 int tMPI_Thread_setaffinity_single(tMPI_Thread_t thread, unsigned int nr)
320 #ifdef HAVE_PTHREAD_SETAFFINITY
321 int nt = tMPI_Thread_get_hw_number();
326 return TMPI_ERR_PROCNR;
331 return pthread_setaffinity_np(thread->th, sizeof(set), &set);
339 int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx)
348 mtx->mutex = (struct tMPI_Mutex*)malloc(sizeof(struct tMPI_Mutex)*1);
349 if (mtx->mutex == NULL)
353 ret = pthread_mutex_init(&(mtx->mutex->mtx), NULL);
359 #ifndef TMPI_NO_ATOMICS
360 tMPI_Atomic_set(&(mtx->initialized), 1);
362 mtx->initialized.value = 1;
367 static inline int tMPI_Thread_mutex_init_once(tMPI_Thread_mutex_t *mtx)
371 #ifndef TMPI_NO_ATOMICS
372 /* check whether the mutex is initialized */
373 if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
376 /* we're relying on the memory barrier semantics of mutex_lock/unlock
377 for the check preceding this function call to have worked */
378 ret = pthread_mutex_lock( &(mutex_init) );
384 if (mtx->mutex == NULL)
386 mtx->mutex = (struct tMPI_Mutex*)malloc(sizeof(struct tMPI_Mutex));
387 if (mtx->mutex == NULL)
392 ret = pthread_mutex_init( &(mtx->mutex->mtx), NULL);
399 ret = pthread_mutex_unlock( &(mutex_init) );
402 pthread_mutex_unlock( &(mutex_init) );
407 int tMPI_Thread_mutex_destroy(tMPI_Thread_mutex_t *mtx)
416 ret = pthread_mutex_destroy( &(mtx->mutex->mtx) );
427 int tMPI_Thread_mutex_lock(tMPI_Thread_mutex_t *mtx)
431 /* check whether the mutex is initialized */
432 ret = tMPI_Thread_mutex_init_once(mtx);
438 ret = pthread_mutex_lock(&(mtx->mutex->mtx));
445 int tMPI_Thread_mutex_trylock(tMPI_Thread_mutex_t *mtx)
449 /* check whether the mutex is initialized */
450 ret = tMPI_Thread_mutex_init_once(mtx);
456 ret = pthread_mutex_trylock(&(mtx->mutex->mtx));
462 int tMPI_Thread_mutex_unlock(tMPI_Thread_mutex_t *mtx)
466 /* check whether the mutex is initialized */
467 ret = tMPI_Thread_mutex_init_once(mtx);
473 ret = pthread_mutex_unlock(&(mtx->mutex->mtx));
479 int tMPI_Thread_key_create(tMPI_Thread_key_t *key, void (*destructor)(void *))
489 key->key = (struct tMPI_Thread_key*)malloc(sizeof(struct
491 if (key->key == NULL)
495 ret = pthread_key_create(&((key)->key->pkey), destructor);
501 tMPI_Atomic_set(&(key->initialized), 1);
506 int tMPI_Thread_key_delete(tMPI_Thread_key_t key)
510 ret = pthread_key_delete((key.key->pkey));
522 void * tMPI_Thread_getspecific(tMPI_Thread_key_t key)
526 p = pthread_getspecific((key.key->pkey));
532 int tMPI_Thread_setspecific(tMPI_Thread_key_t key, void *value)
536 ret = pthread_setspecific((key.key->pkey), value);
542 int tMPI_Thread_once(tMPI_Thread_once_t *once_control,
543 void (*init_routine)(void))
546 if (!once_control || !init_routine)
551 /* really ugly hack - and it's slow... */
552 ret = pthread_mutex_lock( &once_init );
557 if (tMPI_Atomic_get(&(once_control->once)) == 0)
560 tMPI_Atomic_set(&(once_control->once), 1);
562 ret = pthread_mutex_unlock( &once_init );
570 int tMPI_Thread_cond_init(tMPI_Thread_cond_t *cond)
579 cond->condp = (struct tMPI_Thread_cond*)malloc(
580 sizeof(struct tMPI_Thread_cond));
581 if (cond->condp == NULL)
586 ret = pthread_cond_init(&(cond->condp->cond), NULL);
591 tMPI_Atomic_set(&(cond->initialized), 1);
592 tMPI_Atomic_memory_barrier();
598 static int tMPI_Thread_cond_init_once(tMPI_Thread_cond_t *cond)
602 /* we're relying on the memory barrier semantics of mutex_lock/unlock
603 for the check preceding this function call to have worked */
604 ret = pthread_mutex_lock( &(cond_init) );
609 if (cond->condp == NULL)
611 cond->condp = (struct tMPI_Thread_cond*)
612 malloc(sizeof(struct tMPI_Thread_cond)*1);
613 if (cond->condp == NULL)
618 ret = pthread_cond_init( &(cond->condp->cond), NULL);
624 ret = pthread_mutex_unlock( &(cond_init) );
627 /* try to unlock anyway */
628 pthread_mutex_unlock( &(cond_init) );
634 int tMPI_Thread_cond_destroy(tMPI_Thread_cond_t *cond)
643 ret = pthread_cond_destroy(&(cond->condp->cond));
654 int tMPI_Thread_cond_wait(tMPI_Thread_cond_t *cond, tMPI_Thread_mutex_t *mtx)
658 /* check whether the condition is initialized */
659 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
661 ret = tMPI_Thread_cond_init_once(cond);
667 /* the mutex must have been initialized because it should be locked here */
669 ret = pthread_cond_wait( &(cond->condp->cond), &(mtx->mutex->mtx) );
677 int tMPI_Thread_cond_signal(tMPI_Thread_cond_t *cond)
681 /* check whether the condition is initialized */
682 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
684 ret = tMPI_Thread_cond_init_once(cond);
691 ret = pthread_cond_signal( &(cond->condp->cond) );
698 int tMPI_Thread_cond_broadcast(tMPI_Thread_cond_t *cond)
702 /* check whether the condition is initialized */
703 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
705 ret = tMPI_Thread_cond_init_once(cond);
712 ret = pthread_cond_broadcast( &(cond->condp->cond) );
720 void tMPI_Thread_exit(void *value_ptr)
722 pthread_exit(value_ptr);
726 int tMPI_Thread_cancel(tMPI_Thread_t thread)
728 return pthread_cancel(thread->th);
734 int tMPI_Thread_barrier_init(tMPI_Thread_barrier_t *barrier, int n)
737 /*tMPI_Thread_pthread_barrier_t *p;*/
744 barrier->barrierp = (struct tMPI_Thread_barrier*)
745 malloc(sizeof(struct tMPI_Thread_barrier)*1);
746 if (barrier->barrierp == NULL)
751 ret = pthread_mutex_init(&(barrier->barrierp->mutex), NULL);
757 ret = pthread_cond_init(&(barrier->barrierp->cv), NULL);
763 barrier->threshold = n;
767 tMPI_Atomic_set(&(barrier->initialized), 1);
771 static int tMPI_Thread_barrier_init_once(tMPI_Thread_barrier_t *barrier)
775 /* we're relying on the memory barrier semantics of mutex_lock/unlock
776 for the check preceding this function call to have worked */
777 ret = pthread_mutex_lock( &(barrier_init) );
783 if (barrier->barrierp == NULL)
785 barrier->barrierp = (struct tMPI_Thread_barrier*)
786 malloc(sizeof(struct tMPI_Thread_barrier)*1);
787 if (barrier->barrierp == NULL)
793 ret = pthread_mutex_init(&(barrier->barrierp->mutex), NULL);
800 ret = pthread_cond_init(&(barrier->barrierp->cv), NULL);
807 ret = pthread_mutex_unlock( &(barrier_init) );
810 pthread_mutex_unlock( &(barrier_init) );
817 int tMPI_Thread_barrier_destroy(tMPI_Thread_barrier_t *barrier)
826 ret = pthread_mutex_destroy(&(barrier->barrierp->mutex));
831 ret = pthread_cond_destroy(&(barrier->barrierp->cv));
837 free(barrier->barrierp);
843 int tMPI_Thread_barrier_wait(tMPI_Thread_barrier_t * barrier)
848 /* check whether the barrier is initialized */
849 if (tMPI_Atomic_get( &(barrier->initialized) ) == 0)
851 tMPI_Thread_barrier_init_once(barrier);
855 ret = pthread_mutex_lock(&barrier->barrierp->mutex);
861 cycle = barrier->cycle;
863 /* Decrement the count atomically and check if it is zero.
864 * This will only be true for the last thread calling us.
866 if (--barrier->count <= 0)
868 barrier->cycle = !barrier->cycle;
869 barrier->count = barrier->threshold;
870 ret = pthread_cond_broadcast(&barrier->barrierp->cv);
879 while (cycle == barrier->cycle)
881 ret = pthread_cond_wait(&barrier->barrierp->cv,
882 &barrier->barrierp->mutex);
890 ret = pthread_mutex_unlock(&barrier->barrierp->mutex);
893 pthread_mutex_unlock(&barrier->barrierp->mutex);
900 /* just to have some symbols */
901 int tMPI_Thread_pthreads = 0;
903 #endif /* THREAD_PTHREADS */