2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
40 /* Include the defines that determine which thread library to use.
41 * We do not use HAVE_PTHREAD_H directly, since we might want to
42 * turn off thread support explicity (e.g. for debugging).
44 #ifdef HAVE_TMPI_CONFIG_H
45 #include "tmpi_config.h"
53 #ifdef THREAD_PTHREADS
55 #ifdef HAVE_PTHREAD_SETAFFINITY
59 /* pthread.h must be the first header, apart from the defines in config.h */
72 #include "thread_mpi/atomic.h"
73 #include "thread_mpi/threads.h"
/* mutex for initializing mutexes */
static pthread_mutex_t mutex_init = PTHREAD_MUTEX_INITIALIZER;
/* mutex for serializing tMPI_Thread_once (the old comment said "barriers";
   that was a copy-paste error from barrier_init below) */
static pthread_mutex_t once_init = PTHREAD_MUTEX_INITIALIZER;
/* mutex for initializing thread_conds */
static pthread_mutex_t cond_init = PTHREAD_MUTEX_INITIALIZER;
/* mutex for initializing barriers */
static pthread_mutex_t barrier_init = PTHREAD_MUTEX_INITIALIZER;

/* mutex for managing thread IDs */
static pthread_mutex_t thread_id_mutex = PTHREAD_MUTEX_INITIALIZER;
/* thread-specific key holding this thread's struct tMPI_Thread* */
static pthread_key_t   thread_id_key;
/* whether thread_id_key has been created; protected by thread_id_mutex */
static int             thread_id_key_initialized = 0;
96 enum tMPI_Thread_support tMPI_Thread_support(void)
98 return TMPI_THREAD_SUPPORT_YES;
/* Return the number of hardware threads (processors) reported by sysconf().
   Returns 0 if none of the known sysconf keys is available on this platform. */
int tMPI_Thread_get_hw_number(void)
{
    int ret = 0;
#if defined(_SC_NPROCESSORS_ONLN)
    ret = sysconf(_SC_NPROCESSORS_ONLN);
#elif defined(_SC_NPROC_ONLN)
    ret = sysconf(_SC_NPROC_ONLN);
#elif defined(_SC_NPROCESSORS_CONF)
    ret = sysconf(_SC_NPROCESSORS_CONF);
#elif defined(_SC_NPROC_CONF)
    ret = sysconf(_SC_NPROC_CONF);
#endif
    return ret;
}
120 /* destructor for thread ids */
121 static void tMPI_Destroy_thread_id(void* thread_id)
123 struct tMPI_Thread *thread = (struct tMPI_Thread*)thread_id;
124 if (!thread->started_by_tmpi)
126 /* if the thread is started by tMPI, it must be freed in the join()
133 /* Set the thread id var for this thread
134 Returns a pointer to the thread object if succesful, NULL if ENOMEM */
135 static struct tMPI_Thread* tMPI_Set_thread_id_key(int started_by_tmpi)
137 struct tMPI_Thread *th;
139 th = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
144 th->th = pthread_self();
145 th->started_by_tmpi = started_by_tmpi;
146 /* we ignore errors because any thread that needs this value will
147 re-generate it in the next iteration. */
148 pthread_setspecific(thread_id_key, th);
152 /* initialize the thread id vars if not already initialized */
153 static int tMPI_Init_thread_ids(void)
156 ret = pthread_mutex_lock( &thread_id_mutex );
162 if (!thread_id_key_initialized)
164 /* initialize and set the thread id thread-specific variable */
165 struct tMPI_Thread *th;
167 thread_id_key_initialized = 1;
168 ret = pthread_key_create(&thread_id_key, tMPI_Destroy_thread_id);
173 th = tMPI_Set_thread_id_key(0);
181 ret = pthread_mutex_unlock( &thread_id_mutex );
184 pthread_mutex_unlock( &thread_id_mutex );
/* structure to hold the arguments for the thread_starter function */
struct tMPI_Thread_starter
{
    struct tMPI_Thread *thread;        /* the thread object for the new thread */
    void               *(*start_routine)(void*); /* user entry point */
    void               *arg;           /* user argument passed to start_routine
                                          (field reconstructed: the starter
                                          function reads it — confirm name) */
    pthread_mutex_t     cond_lock;     /* lock for initialization of thread
                                          structure; held by the creator until
                                          thread->th is safely written */
};
198 /* the thread_starter function that sets the thread id */
200 __attribute__((force_align_arg_pointer))
202 static void *tMPI_Thread_starter(void *arg)
204 struct tMPI_Thread_starter *starter = (struct tMPI_Thread_starter *)arg;
205 void *(*start_routine)(void*);
209 /* first wait for the parent thread to signal that the starter->thread
210 structure is ready. That's done by unlocking the starter->cond_lock */
211 ret = pthread_mutex_lock(&(starter->cond_lock));
216 ret = pthread_mutex_unlock(&(starter->cond_lock));
222 /* now remember the tMPI_thread_t structure for this thread */
223 ret = pthread_setspecific(thread_id_key, starter->thread);
228 start_routine = starter->start_routine;
231 /* deallocate the starter structure. Errors here are non-fatal. */
232 pthread_mutex_destroy(&(starter->cond_lock));
234 return (*start_routine)(parg);
237 int tMPI_Thread_create(tMPI_Thread_t *thread, void *(*start_routine)(void *),
241 struct tMPI_Thread_starter *starter;
247 tMPI_Init_thread_ids();
249 *thread = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
254 (*thread)->started_by_tmpi = 1;
255 starter = (struct tMPI_Thread_starter*)
256 malloc(sizeof(struct tMPI_Thread_starter)*1);
261 /* fill the starter structure */
262 starter->thread = *thread;
263 starter->start_routine = start_routine;
266 ret = pthread_mutex_init(&(starter->cond_lock), NULL);
271 /* now lock the mutex so we can unlock it once we know the data in
272 thread->th is safe. */
273 ret = pthread_mutex_lock(&(starter->cond_lock));
279 ret = pthread_create(&((*thread)->th), NULL, tMPI_Thread_starter,
286 /* Here we know thread->th is safe. */
287 ret = pthread_mutex_unlock(&(starter->cond_lock));
294 int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
297 pthread_t th = thread->th;
299 ret = pthread_join( th, value_ptr );
309 tMPI_Thread_t tMPI_Thread_self(void)
314 /* make sure the key var is set */
315 ret = tMPI_Init_thread_ids();
321 th = pthread_getspecific(thread_id_key);
323 /* check if it is already in our list */
326 th = tMPI_Set_thread_id_key(0);
331 int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2)
333 return pthread_equal(t1->th, t2->th);
337 enum tMPI_Thread_setaffinity_support tMPI_Thread_setaffinity_support(void)
339 #ifdef HAVE_PTHREAD_SETAFFINITY
343 /* run getaffinity to check whether we get back ENOSYS */
344 ret = pthread_getaffinity_np(pthread_self(), sizeof(set), &set);
347 return TMPI_SETAFFINITY_SUPPORT_YES;
351 return TMPI_SETAFFINITY_SUPPORT_NO;
354 return TMPI_SETAFFINITY_SUPPORT_NO;
359 /* set thread's own affinity to a processor number n */
360 int tMPI_Thread_setaffinity_single(tMPI_Thread_t tmpi_unused thread,
361 unsigned int tmpi_unused nr)
363 #ifdef HAVE_PTHREAD_SETAFFINITY
364 int nt = tMPI_Thread_get_hw_number();
369 return TMPI_ERR_PROCNR;
374 return pthread_setaffinity_np(thread->th, sizeof(set), &set);
382 int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx)
391 mtx->mutex = (struct tMPI_Mutex*)malloc(sizeof(struct tMPI_Mutex)*1);
392 if (mtx->mutex == NULL)
396 ret = pthread_mutex_init(&(mtx->mutex->mtx), NULL);
402 #ifndef TMPI_NO_ATOMICS
403 tMPI_Atomic_set(&(mtx->initialized), 1);
405 mtx->initialized.value = 1;
410 static inline int tMPI_Thread_mutex_init_once(tMPI_Thread_mutex_t *mtx)
414 #ifndef TMPI_NO_ATOMICS
415 /* check whether the mutex is initialized */
416 if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
419 /* we're relying on the memory barrier semantics of mutex_lock/unlock
420 for the check preceding this function call to have worked */
421 ret = pthread_mutex_lock( &(mutex_init) );
427 if (mtx->mutex == NULL)
429 mtx->mutex = (struct tMPI_Mutex*)malloc(sizeof(struct tMPI_Mutex));
430 if (mtx->mutex == NULL)
435 ret = pthread_mutex_init( &(mtx->mutex->mtx), NULL);
441 ret = pthread_mutex_unlock( &(mutex_init) );
445 pthread_mutex_unlock( &(mutex_init) );
450 int tMPI_Thread_mutex_destroy(tMPI_Thread_mutex_t *mtx)
459 ret = pthread_mutex_destroy( &(mtx->mutex->mtx) );
470 int tMPI_Thread_mutex_lock(tMPI_Thread_mutex_t *mtx)
474 /* check whether the mutex is initialized */
475 ret = tMPI_Thread_mutex_init_once(mtx);
481 ret = pthread_mutex_lock(&(mtx->mutex->mtx));
488 int tMPI_Thread_mutex_trylock(tMPI_Thread_mutex_t *mtx)
492 /* check whether the mutex is initialized */
493 ret = tMPI_Thread_mutex_init_once(mtx);
499 ret = pthread_mutex_trylock(&(mtx->mutex->mtx));
505 int tMPI_Thread_mutex_unlock(tMPI_Thread_mutex_t *mtx)
509 /* check whether the mutex is initialized */
510 ret = tMPI_Thread_mutex_init_once(mtx);
516 ret = pthread_mutex_unlock(&(mtx->mutex->mtx));
522 int tMPI_Thread_key_create(tMPI_Thread_key_t *key, void (*destructor)(void *))
532 key->key = (struct tMPI_Thread_key*)malloc(sizeof(struct
534 if (key->key == NULL)
538 ret = pthread_key_create(&((key)->key->pkey), destructor);
544 tMPI_Atomic_set(&(key->initialized), 1);
549 int tMPI_Thread_key_delete(tMPI_Thread_key_t key)
553 ret = pthread_key_delete((key.key->pkey));
565 void * tMPI_Thread_getspecific(tMPI_Thread_key_t key)
569 p = pthread_getspecific((key.key->pkey));
575 int tMPI_Thread_setspecific(tMPI_Thread_key_t key, void *value)
579 ret = pthread_setspecific((key.key->pkey), value);
585 int tMPI_Thread_once(tMPI_Thread_once_t *once_control,
586 void (*init_routine)(void))
589 if (!once_control || !init_routine)
594 /* really ugly hack - and it's slow... */
595 ret = pthread_mutex_lock( &once_init );
600 if (tMPI_Atomic_get(&(once_control->once)) == 0)
603 tMPI_Atomic_set(&(once_control->once), 1);
605 ret = pthread_mutex_unlock( &once_init );
613 int tMPI_Thread_cond_init(tMPI_Thread_cond_t *cond)
622 cond->condp = (struct tMPI_Thread_cond*)malloc(
623 sizeof(struct tMPI_Thread_cond));
624 if (cond->condp == NULL)
629 ret = pthread_cond_init(&(cond->condp->cond), NULL);
634 tMPI_Atomic_set(&(cond->initialized), 1);
635 tMPI_Atomic_memory_barrier();
641 static int tMPI_Thread_cond_init_once(tMPI_Thread_cond_t *cond)
645 /* we're relying on the memory barrier semantics of mutex_lock/unlock
646 for the check preceding this function call to have worked */
647 ret = pthread_mutex_lock( &(cond_init) );
652 if (cond->condp == NULL)
654 cond->condp = (struct tMPI_Thread_cond*)
655 malloc(sizeof(struct tMPI_Thread_cond)*1);
656 if (cond->condp == NULL)
661 ret = pthread_cond_init( &(cond->condp->cond), NULL);
667 ret = pthread_mutex_unlock( &(cond_init) );
670 /* try to unlock anyway */
671 pthread_mutex_unlock( &(cond_init) );
677 int tMPI_Thread_cond_destroy(tMPI_Thread_cond_t *cond)
686 ret = pthread_cond_destroy(&(cond->condp->cond));
697 int tMPI_Thread_cond_wait(tMPI_Thread_cond_t *cond, tMPI_Thread_mutex_t *mtx)
701 /* check whether the condition is initialized */
702 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
704 ret = tMPI_Thread_cond_init_once(cond);
710 /* the mutex must have been initialized because it should be locked here */
712 ret = pthread_cond_wait( &(cond->condp->cond), &(mtx->mutex->mtx) );
720 int tMPI_Thread_cond_signal(tMPI_Thread_cond_t *cond)
724 /* check whether the condition is initialized */
725 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
727 ret = tMPI_Thread_cond_init_once(cond);
734 ret = pthread_cond_signal( &(cond->condp->cond) );
741 int tMPI_Thread_cond_broadcast(tMPI_Thread_cond_t *cond)
745 /* check whether the condition is initialized */
746 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
748 ret = tMPI_Thread_cond_init_once(cond);
755 ret = pthread_cond_broadcast( &(cond->condp->cond) );
/* Terminate the calling thread, making value_ptr available to joiners. */
void tMPI_Thread_exit(void *value_ptr)
{
    pthread_exit(value_ptr);
}
769 int tMPI_Thread_cancel(tMPI_Thread_t thread)
771 #ifdef __native_client__
774 return pthread_cancel(thread->th);
780 int tMPI_Thread_barrier_init(tMPI_Thread_barrier_t *barrier, int n)
783 /*tMPI_Thread_pthread_barrier_t *p;*/
790 barrier->barrierp = (struct tMPI_Thread_barrier*)
791 malloc(sizeof(struct tMPI_Thread_barrier)*1);
792 if (barrier->barrierp == NULL)
797 ret = pthread_mutex_init(&(barrier->barrierp->mutex), NULL);
803 ret = pthread_cond_init(&(barrier->barrierp->cv), NULL);
809 barrier->threshold = n;
813 tMPI_Atomic_set(&(barrier->initialized), 1);
817 static int tMPI_Thread_barrier_init_once(tMPI_Thread_barrier_t *barrier)
821 /* we're relying on the memory barrier semantics of mutex_lock/unlock
822 for the check preceding this function call to have worked */
823 ret = pthread_mutex_lock( &(barrier_init) );
829 if (barrier->barrierp == NULL)
831 barrier->barrierp = (struct tMPI_Thread_barrier*)
832 malloc(sizeof(struct tMPI_Thread_barrier)*1);
833 if (barrier->barrierp == NULL)
839 ret = pthread_mutex_init(&(barrier->barrierp->mutex), NULL);
846 ret = pthread_cond_init(&(barrier->barrierp->cv), NULL);
853 ret = pthread_mutex_unlock( &(barrier_init) );
856 pthread_mutex_unlock( &(barrier_init) );
863 int tMPI_Thread_barrier_destroy(tMPI_Thread_barrier_t *barrier)
872 ret = pthread_mutex_destroy(&(barrier->barrierp->mutex));
877 ret = pthread_cond_destroy(&(barrier->barrierp->cv));
883 free(barrier->barrierp);
889 int tMPI_Thread_barrier_wait(tMPI_Thread_barrier_t * barrier)
894 /* check whether the barrier is initialized */
895 if (tMPI_Atomic_get( &(barrier->initialized) ) == 0)
897 tMPI_Thread_barrier_init_once(barrier);
901 ret = pthread_mutex_lock(&barrier->barrierp->mutex);
907 cycle = barrier->cycle;
909 /* Decrement the count atomically and check if it is zero.
910 * This will only be true for the last thread calling us.
912 if (--barrier->count <= 0)
914 barrier->cycle = !barrier->cycle;
915 barrier->count = barrier->threshold;
916 ret = pthread_cond_broadcast(&barrier->barrierp->cv);
925 while (cycle == barrier->cycle)
927 ret = pthread_cond_wait(&barrier->barrierp->cv,
928 &barrier->barrierp->mutex);
936 ret = pthread_mutex_unlock(&barrier->barrierp->mutex);
939 pthread_mutex_unlock(&barrier->barrierp->mutex);
/* just to have some symbols in this translation unit */
int tMPI_Thread_pthreads = 0;
949 #endif /* THREAD_PTHREADS */