/*
   This source code file is part of thread_mpi.
   Written by Sander Pronk, Erik Lindahl, and possibly others.

   Copyright (c) 2009, Sander Pronk, Erik Lindahl.

   Redistribution and use in source and binary forms, with or without
   modification, are permitted provided that the following conditions are met:
   1) Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
   2) Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in the
      documentation and/or other materials provided with the distribution.
   3) Neither the name of the copyright holders nor the
      names of its contributors may be used to endorse or promote products
      derived from this software without specific prior written permission.

   THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
   EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
   WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
   DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
   SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

   If you want to redistribute modifications, please consider that
   scientific software is very special. Version control is crucial -
   bugs must be traceable. We will be happy to consider code for
   inclusion in the official distribution, but derived work should not
   be called official thread_mpi. Details are found in the README & COPYING
   files.
 */
/* Include the defines that determine which thread library to use.
 * We do not use HAVE_PTHREAD_H directly, since we might want to
 * turn off thread support explicitly (e.g. for debugging).
 */
#ifdef HAVE_TMPI_CONFIG_H
#include "tmpi_config.h"
#endif
#ifdef THREAD_PTHREADS

#ifdef HAVE_PTHREAD_SETAFFINITY
#define _GNU_SOURCE
#endif

/* pthread.h must be the first header, apart from the defines in config.h */
#include <pthread.h>

#include <errno.h>
#include <stdlib.h>
#include <unistd.h>

#include "thread_mpi/atomic.h"
#include "thread_mpi/threads.h"
/* mutex for initializing mutexes */
static pthread_mutex_t mutex_init = PTHREAD_MUTEX_INITIALIZER;
/* mutex for initializing 'once' constructs */
static pthread_mutex_t once_init = PTHREAD_MUTEX_INITIALIZER;
/* mutex for initializing thread condition variables */
static pthread_mutex_t cond_init = PTHREAD_MUTEX_INITIALIZER;
/* mutex for initializing barriers */
static pthread_mutex_t barrier_init = PTHREAD_MUTEX_INITIALIZER;

/* mutex for managing thread IDs */
static pthread_mutex_t thread_id_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_key_t   thread_id_key;
static int             thread_id_key_initialized = 0;
enum tMPI_Thread_support tMPI_Thread_support(void)
{
    return TMPI_THREAD_SUPPORT_YES;
}
int tMPI_Thread_get_hw_number(void)
{
    int ret = 0;
#if defined(_SC_NPROCESSORS_ONLN)
    ret = sysconf(_SC_NPROCESSORS_ONLN);
#elif defined(_SC_NPROC_ONLN)
    ret = sysconf(_SC_NPROC_ONLN);
#elif defined(_SC_NPROCESSORS_CONF)
    ret = sysconf(_SC_NPROCESSORS_CONF);
#elif defined(_SC_NPROC_CONF)
    ret = sysconf(_SC_NPROC_CONF);
#endif
    return ret;
}
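
/* Usage sketch (illustrative, not compiled): choosing a default thread count
   from the detected hardware. tMPI_Thread_get_hw_number() falls through to 0
   when none of the sysconf names is available, so callers should clamp to at
   least 1. The name default_nthreads is hypothetical. */
#if 0
static int default_nthreads(void)
{
    int n = tMPI_Thread_get_hw_number();
    return (n > 0) ? n : 1; /* be safe if detection reported nothing */
}
#endif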
/* destructor for thread ids */
static void tMPI_Destroy_thread_id(void* thread_id)
{
    struct tMPI_Thread *thread = (struct tMPI_Thread*)thread_id;
    if (!thread->started_by_tmpi)
    {
        /* if the thread is started by tMPI, it must be freed in the join()
           call instead. */
        free(thread_id);
    }
}
/* Set the thread id var for this thread
   Returns a pointer to the thread object if successful, NULL if ENOMEM */
static struct tMPI_Thread* tMPI_Set_thread_id_key(int started_by_tmpi)
{
    struct tMPI_Thread *th;

    th = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread));
    if (th == NULL)
    {
        return NULL;
    }
    th->th              = pthread_self();
    th->started_by_tmpi = started_by_tmpi;
    /* we ignore errors here because any thread that needs this value will
       re-generate it in the next iteration. */
    pthread_setspecific(thread_id_key, th);
    return th;
}
/* initialize the thread id vars if not already initialized */
static int tMPI_Init_thread_ids(void)
{
    int ret;

    ret = pthread_mutex_lock( &thread_id_mutex );
    if (ret != 0)
    {
        return ret;
    }

    if (!thread_id_key_initialized)
    {
        /* initialize and set the thread id thread-specific variable */
        struct tMPI_Thread *th;

        thread_id_key_initialized = 1;
        ret = pthread_key_create(&thread_id_key, tMPI_Destroy_thread_id);
        if (ret != 0)
        {
            goto err;
        }
        th = tMPI_Set_thread_id_key(0);
        if (th == NULL)
        {
            ret = ENOMEM;
            goto err;
        }
    }

    ret = pthread_mutex_unlock( &thread_id_mutex );
    return ret;
err:
    pthread_mutex_unlock( &thread_id_mutex );
    return ret;
}
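
/* Background sketch (illustrative, not compiled): the mechanism above is
   plain pthreads thread-specific storage with a destructor. A minimal
   standalone version, with hypothetical names (my_key, my_destructor,
   example), looks like this; the destructor runs automatically at exit of
   every thread that set a value. */
#if 0
static pthread_key_t my_key;

static void my_destructor(void *p)
{
    free(p); /* called at thread exit for each thread that set a value */
}

static void example(void)
{
    int *my_data;

    pthread_key_create(&my_key, my_destructor); /* do this once */
    my_data  = (int*)malloc(sizeof(*my_data));
    *my_data = 42;
    pthread_setspecific(my_key, my_data); /* visible only to this thread */
}
#endif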
/* structure to hold the arguments for the thread_starter function */
struct tMPI_Thread_starter
{
    struct tMPI_Thread *thread;
    void               *(*start_routine)(void*);
    void               *arg;
    pthread_mutex_t     cond_lock; /* lock for initialization of the thread
                                      structure */
};
/* the thread_starter function that sets the thread id */
#ifdef __MINGW32__
__attribute__((force_align_arg_pointer))
#endif
static void *tMPI_Thread_starter(void *arg)
{
    struct tMPI_Thread_starter *starter = (struct tMPI_Thread_starter *)arg;
    void *(*start_routine)(void*);
    void *parg;
    int   ret;

    /* first wait for the parent thread to signal that the starter->thread
       structure is ready. That's done by unlocking the starter->cond_lock */
    ret = pthread_mutex_lock(&(starter->cond_lock));
    if (ret != 0)
    {
        return NULL;
    }
    ret = pthread_mutex_unlock(&(starter->cond_lock));
    if (ret != 0)
    {
        return NULL;
    }

    /* now remember the tMPI_thread_t structure for this thread */
    ret = pthread_setspecific(thread_id_key, starter->thread);
    if (ret != 0)
    {
        return NULL;
    }
    start_routine = starter->start_routine;
    parg          = starter->arg;

    /* deallocate the starter structure. Errors here are non-fatal. */
    pthread_mutex_destroy(&(starter->cond_lock));
    free(starter);
    return (*start_routine)(parg);
}
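
/* Pattern sketch (illustrative, not compiled): the cond_lock handshake above
   is the classic "pre-locked mutex" trick. The parent locks a mutex before
   pthread_create() and unlocks it only once the shared state is fully
   written; the child's first action is to lock/unlock that same mutex, so it
   can never observe half-initialized state. Names (guard, shared, child,
   parent) are hypothetical. */
#if 0
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;
static int             shared;

static void *child(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&guard);   /* blocks until the parent unlocks */
    pthread_mutex_unlock(&guard);
    /* at this point 'shared' is guaranteed to be fully initialized */
    return NULL;
}

static void parent(void)
{
    pthread_t th;

    pthread_mutex_lock(&guard);   /* lock BEFORE creating the child */
    pthread_create(&th, NULL, child, NULL);
    shared = 42;                  /* initialize the shared state */
    pthread_mutex_unlock(&guard); /* release the child */
    pthread_join(th, NULL);
}
#endif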
int tMPI_Thread_create(tMPI_Thread_t *thread, void *(*start_routine)(void *),
                       void *arg)
{
    int ret;
    struct tMPI_Thread_starter *starter;

    if (thread == NULL)
    {
        return EINVAL;
    }
    tMPI_Init_thread_ids();

    *thread = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread));
    if (*thread == NULL)
    {
        return ENOMEM;
    }
    (*thread)->started_by_tmpi = 1;
    starter = (struct tMPI_Thread_starter*)
        malloc(sizeof(struct tMPI_Thread_starter));
    if (starter == NULL)
    {
        return ENOMEM;
    }
    /* fill the starter structure */
    starter->thread        = *thread;
    starter->start_routine = start_routine;
    starter->arg           = arg;

    ret = pthread_mutex_init(&(starter->cond_lock), NULL);
    if (ret != 0)
    {
        return ret;
    }
    /* now lock the mutex so we can unlock it once we know the data in
       thread->th is safe. */
    ret = pthread_mutex_lock(&(starter->cond_lock));
    if (ret != 0)
    {
        return ret;
    }

    ret = pthread_create(&((*thread)->th), NULL, tMPI_Thread_starter,
                         (void*)starter);
    if (ret != 0)
    {
        return ret;
    }

    /* Here we know thread->th is safe. */
    ret = pthread_mutex_unlock(&(starter->cond_lock));
    return ret;
}
int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
{
    int       ret;
    pthread_t th = thread->th;

    ret = pthread_join( th, value_ptr );
    if (ret != 0)
    {
        return ret;
    }
    /* threads started by tMPI are freed here, not in the key destructor */
    free(thread);
    return 0;
}
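
/* Usage sketch (illustrative, not compiled): creating and joining a thread
   through the tMPI wrappers. The worker function and names (worker, run_one,
   value, result) are hypothetical. */
#if 0
static void *worker(void *arg)
{
    int *x = (int*)arg;
    *x    += 1;   /* do some work on the caller's data */
    return arg;   /* comes back through tMPI_Thread_join */
}

static int run_one(void)
{
    tMPI_Thread_t th;
    int           value  = 41;
    void         *result = NULL;

    if (tMPI_Thread_create(&th, worker, &value) != 0)
    {
        return 1;
    }
    tMPI_Thread_join(th, &result); /* also frees the thread structure */
    return (value == 42 && result == &value) ? 0 : 1;
}
#endif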
tMPI_Thread_t tMPI_Thread_self(void)
{
    tMPI_Thread_t th;
    int           ret;

    /* make sure the key var is set */
    ret = tMPI_Init_thread_ids();
    if (ret != 0)
    {
        return NULL;
    }
    th = pthread_getspecific(thread_id_key);

    /* check if it is already in our list */
    if (th == NULL)
    {
        th = tMPI_Set_thread_id_key(0);
    }
    return th;
}

int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2)
{
    return pthread_equal(t1->th, t2->th);
}
enum tMPI_Thread_setaffinity_support tMPI_Thread_setaffinity_support(void)
{
#ifdef HAVE_PTHREAD_SETAFFINITY
    cpu_set_t set;
    int       ret;

    /* run getaffinity to check whether we get back ENOSYS */
    ret = pthread_getaffinity_np(pthread_self(), sizeof(set), &set);
    if (ret == 0)
    {
        return TMPI_SETAFFINITY_SUPPORT_YES;
    }
    else
    {
        return TMPI_SETAFFINITY_SUPPORT_NO;
    }
#else
    return TMPI_SETAFFINITY_SUPPORT_NO;
#endif
}
/* set thread's own affinity to a processor number n */
int tMPI_Thread_setaffinity_single(tMPI_Thread_t tmpi_unused thread,
                                   unsigned int tmpi_unused nr)
{
#ifdef HAVE_PTHREAD_SETAFFINITY
    int       nt = tMPI_Thread_get_hw_number();
    cpu_set_t set;

    /* processor numbers are 0-based, so nr must be below the hw count */
    if (nt > 0 && nr >= (unsigned int)nt)
    {
        return TMPI_ERR_PROCNR;
    }

    CPU_ZERO(&set);
    CPU_SET(nr, &set);
    return pthread_setaffinity_np(thread->th, sizeof(set), &set);
#else
    return 0;
#endif
}
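
/* Usage sketch (illustrative, not compiled): pinning worker threads to
   consecutive cores, after first checking that affinity actually works on
   this platform. Names (pin_all, threads, n) are hypothetical. */
#if 0
static void pin_all(tMPI_Thread_t *threads, int n)
{
    int i;

    if (tMPI_Thread_setaffinity_support() != TMPI_SETAFFINITY_SUPPORT_YES)
    {
        return; /* no affinity support; let the scheduler decide */
    }
    for (i = 0; i < n; i++)
    {
        tMPI_Thread_setaffinity_single(threads[i], (unsigned int)i);
    }
}
#endif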
int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx)
{
    int ret;

    if (mtx == NULL)
    {
        return EINVAL;
    }

    mtx->mutex = (struct tMPI_Mutex*)malloc(sizeof(struct tMPI_Mutex));
    if (mtx->mutex == NULL)
    {
        return ENOMEM;
    }
    ret = pthread_mutex_init(&(mtx->mutex->mtx), NULL);
    if (ret != 0)
    {
        return ret;
    }

#ifndef TMPI_NO_ATOMICS
    tMPI_Atomic_set(&(mtx->initialized), 1);
#else
    mtx->initialized.value = 1;
#endif
    return 0;
}
static inline int tMPI_Thread_mutex_init_once(tMPI_Thread_mutex_t *mtx)
{
    int ret = 0;

#ifndef TMPI_NO_ATOMICS
    /* check whether the mutex is initialized */
    if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
#endif
    {
        /* we're relying on the memory barrier semantics of mutex_lock/unlock
           for the check preceding this function call to have worked */
        ret = pthread_mutex_lock( &(mutex_init) );
        if (ret != 0)
        {
            return ret;
        }

        if (mtx->mutex == NULL)
        {
            mtx->mutex = (struct tMPI_Mutex*)malloc(sizeof(struct tMPI_Mutex));
            if (mtx->mutex == NULL)
            {
                pthread_mutex_unlock( &(mutex_init) );
                return ENOMEM;
            }
            ret = pthread_mutex_init( &(mtx->mutex->mtx), NULL);
            if (ret != 0)
            {
                pthread_mutex_unlock( &(mutex_init) );
                return ret;
            }
        }
        ret = pthread_mutex_unlock( &(mutex_init) );
    }
    return ret;
}
int tMPI_Thread_mutex_destroy(tMPI_Thread_mutex_t *mtx)
{
    int ret;

    if (mtx == NULL)
    {
        return EINVAL;
    }

    ret = pthread_mutex_destroy( &(mtx->mutex->mtx) );
    if (ret != 0)
    {
        return ret;
    }
    free(mtx->mutex);
    return 0;
}
int tMPI_Thread_mutex_lock(tMPI_Thread_mutex_t *mtx)
{
    int ret;

    /* check whether the mutex is initialized */
    ret = tMPI_Thread_mutex_init_once(mtx);
    if (ret != 0)
    {
        return ret;
    }
    return pthread_mutex_lock(&(mtx->mutex->mtx));
}

int tMPI_Thread_mutex_trylock(tMPI_Thread_mutex_t *mtx)
{
    int ret;

    /* check whether the mutex is initialized */
    ret = tMPI_Thread_mutex_init_once(mtx);
    if (ret != 0)
    {
        return ret;
    }
    return pthread_mutex_trylock(&(mtx->mutex->mtx));
}

int tMPI_Thread_mutex_unlock(tMPI_Thread_mutex_t *mtx)
{
    int ret;

    /* check whether the mutex is initialized */
    ret = tMPI_Thread_mutex_init_once(mtx);
    if (ret != 0)
    {
        return ret;
    }
    return pthread_mutex_unlock(&(mtx->mutex->mtx));
}
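
/* Usage sketch (illustrative, not compiled): a statically allocated tMPI
   mutex guarding a counter, assuming the TMPI_THREAD_MUTEX_INITIALIZER macro
   from thread_mpi/threads.h. A statically initialized mutex has
   initialized == 0, so the first lock runs the lazy init path above. Names
   (counter_lock, counter, bump) are hypothetical. */
#if 0
static tMPI_Thread_mutex_t counter_lock = TMPI_THREAD_MUTEX_INITIALIZER;
static int                 counter      = 0;

static void bump(void)
{
    tMPI_Thread_mutex_lock(&counter_lock);   /* initializes on first use */
    counter++;
    tMPI_Thread_mutex_unlock(&counter_lock);
}
#endif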
int tMPI_Thread_key_create(tMPI_Thread_key_t *key, void (*destructor)(void *))
{
    int ret;

    if (key == NULL)
    {
        return EINVAL;
    }

    key->key = (struct tMPI_Thread_key*)malloc(sizeof(struct tMPI_Thread_key));
    if (key->key == NULL)
    {
        return ENOMEM;
    }
    ret = pthread_key_create(&((key)->key->pkey), destructor);
    if (ret != 0)
    {
        return ret;
    }

    tMPI_Atomic_set(&(key->initialized), 1);
    return 0;
}
int tMPI_Thread_key_delete(tMPI_Thread_key_t key)
{
    int ret;

    ret = pthread_key_delete((key.key->pkey));
    if (ret != 0)
    {
        return ret;
    }
    free(key.key);
    return 0;
}
void * tMPI_Thread_getspecific(tMPI_Thread_key_t key)
{
    void *p;

    p = pthread_getspecific((key.key->pkey));
    return p;
}

int tMPI_Thread_setspecific(tMPI_Thread_key_t key, void *value)
{
    int ret;

    ret = pthread_setspecific((key.key->pkey), value);
    return ret;
}
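
/* Usage sketch (illustrative, not compiled): per-thread scratch buffers via
   the tMPI key API. Each thread gets its own buffer; free() runs as the
   destructor at thread exit. init_keys() must run exactly once, e.g. via
   tMPI_Thread_once(). Names (scratch_key, init_keys, get_scratch) are
   hypothetical. */
#if 0
static tMPI_Thread_key_t scratch_key;

static void init_keys(void)
{
    tMPI_Thread_key_create(&scratch_key, free);
}

static double *get_scratch(size_t n)
{
    double *buf = (double*)tMPI_Thread_getspecific(scratch_key);
    if (buf == NULL)
    {
        buf = (double*)malloc(n * sizeof(*buf));
        tMPI_Thread_setspecific(scratch_key, buf); /* this thread only */
    }
    return buf;
}
#endif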
int tMPI_Thread_once(tMPI_Thread_once_t *once_control,
                     void (*init_routine)(void))
{
    int ret;

    if (!once_control || !init_routine)
    {
        return EINVAL;
    }

    /* really ugly hack - and it's slow... */
    ret = pthread_mutex_lock( &once_init );
    if (ret != 0)
    {
        return ret;
    }
    if (tMPI_Atomic_get(&(once_control->once)) == 0)
    {
        (*init_routine)();
        tMPI_Atomic_set(&(once_control->once), 1);
    }
    ret = pthread_mutex_unlock( &once_init );
    return ret;
}
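
/* Usage sketch (illustrative, not compiled): one-time initialization from
   many threads, assuming the TMPI_THREAD_ONCE_INIT macro from
   thread_mpi/threads.h. Whichever thread arrives first runs setup(); the
   others serialize on once_init and then skip it. Names (my_once, setup,
   ensure_setup) are hypothetical. */
#if 0
static tMPI_Thread_once_t my_once = TMPI_THREAD_ONCE_INIT;

static void setup(void)
{
    /* runs exactly once, no matter how many threads call ensure_setup() */
}

static void ensure_setup(void)
{
    tMPI_Thread_once(&my_once, setup);
}
#endif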
int tMPI_Thread_cond_init(tMPI_Thread_cond_t *cond)
{
    int ret;

    if (cond == NULL)
    {
        return EINVAL;
    }

    cond->condp = (struct tMPI_Thread_cond*)malloc(
                sizeof(struct tMPI_Thread_cond));
    if (cond->condp == NULL)
    {
        return ENOMEM;
    }
    ret = pthread_cond_init(&(cond->condp->cond), NULL);
    if (ret != 0)
    {
        return ret;
    }
    tMPI_Atomic_set(&(cond->initialized), 1);
    tMPI_Atomic_memory_barrier();

    return 0;
}
static int tMPI_Thread_cond_init_once(tMPI_Thread_cond_t *cond)
{
    int ret;

    /* we're relying on the memory barrier semantics of mutex_lock/unlock
       for the check preceding this function call to have worked */
    ret = pthread_mutex_lock( &(cond_init) );
    if (ret != 0)
    {
        return ret;
    }
    if (cond->condp == NULL)
    {
        cond->condp = (struct tMPI_Thread_cond*)
            malloc(sizeof(struct tMPI_Thread_cond));
        if (cond->condp == NULL)
        {
            pthread_mutex_unlock( &(cond_init) );
            return ENOMEM;
        }
        ret = pthread_cond_init( &(cond->condp->cond), NULL);
        if (ret != 0)
        {
            /* try to unlock anyway */
            pthread_mutex_unlock( &(cond_init) );
            return ret;
        }
    }
    ret = pthread_mutex_unlock( &(cond_init) );
    return ret;
}
int tMPI_Thread_cond_destroy(tMPI_Thread_cond_t *cond)
{
    int ret;

    if (cond == NULL)
    {
        return EINVAL;
    }

    ret = pthread_cond_destroy(&(cond->condp->cond));
    if (ret != 0)
    {
        return ret;
    }
    free(cond->condp);
    return 0;
}
int tMPI_Thread_cond_wait(tMPI_Thread_cond_t *cond, tMPI_Thread_mutex_t *mtx)
{
    int ret;

    /* check whether the condition is initialized */
    if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
    {
        ret = tMPI_Thread_cond_init_once(cond);
        if (ret != 0)
        {
            return ret;
        }
    }
    /* the mutex must have been initialized because it should be locked here */

    ret = pthread_cond_wait( &(cond->condp->cond), &(mtx->mutex->mtx) );
    return ret;
}
int tMPI_Thread_cond_signal(tMPI_Thread_cond_t *cond)
{
    int ret;

    /* check whether the condition is initialized */
    if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
    {
        ret = tMPI_Thread_cond_init_once(cond);
        if (ret != 0)
        {
            return ret;
        }
    }
    ret = pthread_cond_signal( &(cond->condp->cond) );
    return ret;
}

int tMPI_Thread_cond_broadcast(tMPI_Thread_cond_t *cond)
{
    int ret;

    /* check whether the condition is initialized */
    if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
    {
        ret = tMPI_Thread_cond_init_once(cond);
        if (ret != 0)
        {
            return ret;
        }
    }
    ret = pthread_cond_broadcast( &(cond->condp->cond) );
    return ret;
}
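
/* Usage sketch (illustrative, not compiled): the standard predicate loop for
   condition variables, assuming the TMPI_THREAD_MUTEX_INITIALIZER and
   TMPI_THREAD_COND_INITIALIZER macros from thread_mpi/threads.h. Waiters
   must re-check the predicate after every wake because pthread_cond_wait()
   permits spurious wakeups. Names (queue_lock, queue_nonempty, queue_len,
   consume, produce) are hypothetical. */
#if 0
static tMPI_Thread_mutex_t queue_lock     = TMPI_THREAD_MUTEX_INITIALIZER;
static tMPI_Thread_cond_t  queue_nonempty = TMPI_THREAD_COND_INITIALIZER;
static int                 queue_len      = 0;

static void consume(void)
{
    tMPI_Thread_mutex_lock(&queue_lock);
    while (queue_len == 0) /* loop, never a plain if */
    {
        tMPI_Thread_cond_wait(&queue_nonempty, &queue_lock);
    }
    queue_len--; /* take one item */
    tMPI_Thread_mutex_unlock(&queue_lock);
}

static void produce(void)
{
    tMPI_Thread_mutex_lock(&queue_lock);
    queue_len++;
    tMPI_Thread_cond_signal(&queue_nonempty);
    tMPI_Thread_mutex_unlock(&queue_lock);
}
#endif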
void tMPI_Thread_exit(void *value_ptr)
{
    pthread_exit(value_ptr);
}

int tMPI_Thread_cancel(tMPI_Thread_t thread)
{
#ifdef __native_client__
    return ENOSYS;
#else
    return pthread_cancel(thread->th);
#endif
}
int tMPI_Thread_barrier_init(tMPI_Thread_barrier_t *barrier, int n)
{
    int ret;

    if (barrier == NULL)
    {
        return EINVAL;
    }

    barrier->barrierp = (struct tMPI_Thread_barrier*)
        malloc(sizeof(struct tMPI_Thread_barrier));
    if (barrier->barrierp == NULL)
    {
        return ENOMEM;
    }
    ret = pthread_mutex_init(&(barrier->barrierp->mutex), NULL);
    if (ret != 0)
    {
        return ret;
    }
    ret = pthread_cond_init(&(barrier->barrierp->cv), NULL);
    if (ret != 0)
    {
        return ret;
    }

    barrier->threshold = n;
    barrier->count     = n;
    barrier->cycle     = 0;

    tMPI_Atomic_set(&(barrier->initialized), 1);
    return 0;
}
static int tMPI_Thread_barrier_init_once(tMPI_Thread_barrier_t *barrier)
{
    int ret;

    /* we're relying on the memory barrier semantics of mutex_lock/unlock
       for the check preceding this function call to have worked */
    ret = pthread_mutex_lock( &(barrier_init) );
    if (ret != 0)
    {
        return ret;
    }

    if (barrier->barrierp == NULL)
    {
        barrier->barrierp = (struct tMPI_Thread_barrier*)
            malloc(sizeof(struct tMPI_Thread_barrier));
        if (barrier->barrierp == NULL)
        {
            pthread_mutex_unlock( &(barrier_init) );
            return ENOMEM;
        }
        ret = pthread_mutex_init(&(barrier->barrierp->mutex), NULL);
        if (ret != 0)
        {
            pthread_mutex_unlock( &(barrier_init) );
            return ret;
        }
        ret = pthread_cond_init(&(barrier->barrierp->cv), NULL);
        if (ret != 0)
        {
            pthread_mutex_unlock( &(barrier_init) );
            return ret;
        }
    }
    ret = pthread_mutex_unlock( &(barrier_init) );
    return ret;
}
int tMPI_Thread_barrier_destroy(tMPI_Thread_barrier_t *barrier)
{
    int ret;

    if (barrier == NULL)
    {
        return EINVAL;
    }

    ret = pthread_mutex_destroy(&(barrier->barrierp->mutex));
    if (ret != 0)
    {
        return ret;
    }
    ret = pthread_cond_destroy(&(barrier->barrierp->cv));
    if (ret != 0)
    {
        return ret;
    }

    free(barrier->barrierp);
    return 0;
}
int tMPI_Thread_barrier_wait(tMPI_Thread_barrier_t * barrier)
{
    int cycle;
    int ret;

    /* check whether the barrier is initialized */
    if (tMPI_Atomic_get( &(barrier->initialized) ) == 0)
    {
        tMPI_Thread_barrier_init_once(barrier);
    }

    ret = pthread_mutex_lock(&barrier->barrierp->mutex);
    if (ret != 0)
    {
        return ret;
    }

    cycle = barrier->cycle;

    /* Decrement the count while we hold the mutex, and check whether it has
     * reached zero. That is only true for the last thread arriving here.
     */
    if (--barrier->count <= 0)
    {
        barrier->cycle = !barrier->cycle;
        barrier->count = barrier->threshold;
        ret            = pthread_cond_broadcast(&barrier->barrierp->cv);
        if (ret != 0)
        {
            goto err;
        }
    }
    else
    {
        while (cycle == barrier->cycle)
        {
            ret = pthread_cond_wait(&barrier->barrierp->cv,
                                    &barrier->barrierp->mutex);
            if (ret != 0)
            {
                goto err;
            }
        }
    }

    ret = pthread_mutex_unlock(&barrier->barrierp->mutex);
    return ret;
err:
    pthread_mutex_unlock(&barrier->barrierp->mutex);
    return ret;
}
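
/* Usage sketch (illustrative, not compiled): synchronizing n worker threads
   between two phases of a computation. The barrier must be initialized for
   exactly the number of participating threads, and every one of them must
   call the wait, or all of them block forever. Names (bar, worker_loop,
   phase1, phase2) are hypothetical. */
#if 0
extern void phase1(void *arg);
extern void phase2(void *arg);

static tMPI_Thread_barrier_t bar; /* tMPI_Thread_barrier_init(&bar, n) first */

static void *worker_loop(void *arg)
{
    phase1(arg);                    /* all threads run phase 1 */
    tMPI_Thread_barrier_wait(&bar); /* nobody starts phase 2 early */
    phase2(arg);                    /* safe: phase 1 is done everywhere */
    return NULL;
}
#endif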
#else

/* just to have some symbols */
int tMPI_Thread_pthreads = 0;

#endif /* THREAD_PTHREADS */