2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
/* Include the defines that determine which thread library to use.
 * We do not use HAVE_PTHREAD_H directly, since we might want to
 * turn off thread support explicitly (e.g. for debugging).
 */
#ifdef HAVE_TMPI_CONFIG_H
#include "tmpi_config.h"
#endif

#ifdef THREAD_PTHREADS

#ifdef HAVE_PTHREAD_SETAFFINITY
#define _GNU_SOURCE
#endif

/* pthread.h must be the first header, apart from the defines in config.h */
#include <pthread.h>

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include "thread_mpi/atomic.h"
#include "thread_mpi/threads.h"
/* mutex for initializing mutexes */
static pthread_mutex_t mutex_init = PTHREAD_MUTEX_INITIALIZER;
/* mutex serializing tMPI_Thread_once() execution (NOT barriers) */
static pthread_mutex_t once_init = PTHREAD_MUTEX_INITIALIZER;
/* mutex for initializing thread_conds */
static pthread_mutex_t cond_init = PTHREAD_MUTEX_INITIALIZER;
/* mutex for initializing barriers */
static pthread_mutex_t barrier_init = PTHREAD_MUTEX_INITIALIZER;

/* mutex for managing thread IDs */
static pthread_mutex_t thread_id_mutex = PTHREAD_MUTEX_INITIALIZER;
/* thread-specific key holding this thread's struct tMPI_Thread* */
static pthread_key_t thread_id_key;
/* guarded by thread_id_mutex: nonzero once thread_id_key is created */
static int thread_id_key_initialized = 0;
96 enum tMPI_Thread_support tMPI_Thread_support(void)
98 return TMPI_THREAD_SUPPORT_YES;
/* Return the number of hardware threads (processors) reported by the OS,
   or 0 if none of the sysconf queries is available on this platform. */
int tMPI_Thread_get_hw_number(void)
{
    int ret = 0;
#if defined(_SC_NPROCESSORS_ONLN)
    ret = sysconf(_SC_NPROCESSORS_ONLN);
#elif defined(_SC_NPROC_ONLN)
    ret = sysconf(_SC_NPROC_ONLN);
#elif defined(_SC_NPROCESSORS_CONF)
    ret = sysconf(_SC_NPROCESSORS_CONF);
#elif defined(_SC_NPROC_CONF)
    ret = sysconf(_SC_NPROC_CONF);
#endif
    return ret;
}
120 /* destructor for thread ids */
121 static void tMPI_Destroy_thread_id(void* thread_id)
123 struct tMPI_Thread *thread = (struct tMPI_Thread*)thread_id;
124 if (!thread->started_by_tmpi)
126 /* if the thread is started by tMPI, it must be freed in the join()
133 /* Set the thread id var for this thread
134 Returns a pointer to the thread object if succesful, NULL if ENOMEM */
135 static struct tMPI_Thread* tMPI_Set_thread_id_key(int started_by_tmpi)
137 struct tMPI_Thread *th;
139 th = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
144 th->th = pthread_self();
145 th->started_by_tmpi = started_by_tmpi;
146 /* we ignore errors because any thread that needs this value will
147 re-generate it in the next iteration. */
148 pthread_setspecific(thread_id_key, th);
152 /* initialize the thread id vars if not already initialized */
153 static int tMPI_Init_thread_ids(void)
156 ret = pthread_mutex_lock( &thread_id_mutex );
162 if (!thread_id_key_initialized)
164 /* initialize and set the thread id thread-specific variable */
165 struct tMPI_Thread *th;
167 thread_id_key_initialized = 1;
168 ret = pthread_key_create(&thread_id_key, tMPI_Destroy_thread_id);
173 th = tMPI_Set_thread_id_key(0);
181 ret = pthread_mutex_unlock( &thread_id_mutex );
184 pthread_mutex_unlock( &thread_id_mutex );
/* structure to hold the arguments for the thread_starter function */
struct tMPI_Thread_starter
{
    struct tMPI_Thread *thread;          /* ID structure of the new thread */
    void *(*start_routine)(void*);       /* user-supplied entry point */
    void               *arg;             /* user-supplied argument */
    pthread_mutex_t     cond_lock;       /* lock for initialization of thread
                                            structure: held by the parent
                                            until thread->th is valid */
};
198 /* the thread_starter function that sets the thread id */
199 static void *tMPI_Thread_starter(void *arg)
201 struct tMPI_Thread_starter *starter = (struct tMPI_Thread_starter *)arg;
202 void *(*start_routine)(void*);
206 /* first wait for the parent thread to signal that the starter->thread
207 structure is ready. That's done by unlocking the starter->cond_lock */
208 ret = pthread_mutex_lock(&(starter->cond_lock));
213 ret = pthread_mutex_unlock(&(starter->cond_lock));
219 /* now remember the tMPI_thread_t structure for this thread */
220 ret = pthread_setspecific(thread_id_key, starter->thread);
225 start_routine = starter->start_routine;
228 /* deallocate the starter structure. Errors here are non-fatal. */
229 pthread_mutex_destroy(&(starter->cond_lock));
231 return (*start_routine)(parg);
234 int tMPI_Thread_create(tMPI_Thread_t *thread, void *(*start_routine)(void *),
238 struct tMPI_Thread_starter *starter;
244 tMPI_Init_thread_ids();
246 *thread = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
251 (*thread)->started_by_tmpi = 1;
252 starter = (struct tMPI_Thread_starter*)
253 malloc(sizeof(struct tMPI_Thread_starter)*1);
258 /* fill the starter structure */
259 starter->thread = *thread;
260 starter->start_routine = start_routine;
263 ret = pthread_mutex_init(&(starter->cond_lock), NULL);
268 /* now lock the mutex so we can unlock it once we know the data in
269 thread->th is safe. */
270 ret = pthread_mutex_lock(&(starter->cond_lock));
276 ret = pthread_create(&((*thread)->th), NULL, tMPI_Thread_starter,
283 /* Here we know thread->th is safe. */
284 ret = pthread_mutex_unlock(&(starter->cond_lock));
291 int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
294 pthread_t th = thread->th;
296 ret = pthread_join( th, value_ptr );
306 tMPI_Thread_t tMPI_Thread_self(void)
311 /* make sure the key var is set */
312 ret = tMPI_Init_thread_ids();
318 th = pthread_getspecific(thread_id_key);
320 /* check if it is already in our list */
323 th = tMPI_Set_thread_id_key(0);
328 int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2)
330 return pthread_equal(t1->th, t2->th);
334 enum tMPI_Thread_setaffinity_support tMPI_Thread_setaffinity_support(void)
336 #ifdef HAVE_PTHREAD_SETAFFINITY
340 /* run getaffinity to check whether we get back ENOSYS */
341 ret = pthread_getaffinity_np(pthread_self(), sizeof(set), &set);
344 return TMPI_SETAFFINITY_SUPPORT_YES;
348 return TMPI_SETAFFINITY_SUPPORT_NO;
351 return TMPI_SETAFFINITY_SUPPORT_NO;
356 /* set thread's own affinity to a processor number n */
357 int tMPI_Thread_setaffinity_single(tMPI_Thread_t tmpi_unused thread,
358 unsigned int tmpi_unused nr)
360 #ifdef HAVE_PTHREAD_SETAFFINITY
361 int nt = tMPI_Thread_get_hw_number();
366 return TMPI_ERR_PROCNR;
371 return pthread_setaffinity_np(thread->th, sizeof(set), &set);
379 int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx)
388 mtx->mutex = (struct tMPI_Mutex*)malloc(sizeof(struct tMPI_Mutex)*1);
389 if (mtx->mutex == NULL)
393 ret = pthread_mutex_init(&(mtx->mutex->mtx), NULL);
399 #ifndef TMPI_NO_ATOMICS
400 tMPI_Atomic_set(&(mtx->initialized), 1);
402 mtx->initialized.value = 1;
407 static inline int tMPI_Thread_mutex_init_once(tMPI_Thread_mutex_t *mtx)
411 #ifndef TMPI_NO_ATOMICS
412 /* check whether the mutex is initialized */
413 if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
416 /* we're relying on the memory barrier semantics of mutex_lock/unlock
417 for the check preceding this function call to have worked */
418 ret = pthread_mutex_lock( &(mutex_init) );
424 if (mtx->mutex == NULL)
426 mtx->mutex = (struct tMPI_Mutex*)malloc(sizeof(struct tMPI_Mutex));
427 if (mtx->mutex == NULL)
432 ret = pthread_mutex_init( &(mtx->mutex->mtx), NULL);
438 ret = pthread_mutex_unlock( &(mutex_init) );
442 pthread_mutex_unlock( &(mutex_init) );
447 int tMPI_Thread_mutex_destroy(tMPI_Thread_mutex_t *mtx)
456 ret = pthread_mutex_destroy( &(mtx->mutex->mtx) );
467 int tMPI_Thread_mutex_lock(tMPI_Thread_mutex_t *mtx)
471 /* check whether the mutex is initialized */
472 ret = tMPI_Thread_mutex_init_once(mtx);
478 ret = pthread_mutex_lock(&(mtx->mutex->mtx));
485 int tMPI_Thread_mutex_trylock(tMPI_Thread_mutex_t *mtx)
489 /* check whether the mutex is initialized */
490 ret = tMPI_Thread_mutex_init_once(mtx);
496 ret = pthread_mutex_trylock(&(mtx->mutex->mtx));
502 int tMPI_Thread_mutex_unlock(tMPI_Thread_mutex_t *mtx)
506 /* check whether the mutex is initialized */
507 ret = tMPI_Thread_mutex_init_once(mtx);
513 ret = pthread_mutex_unlock(&(mtx->mutex->mtx));
519 int tMPI_Thread_key_create(tMPI_Thread_key_t *key, void (*destructor)(void *))
529 key->key = (struct tMPI_Thread_key*)malloc(sizeof(struct
531 if (key->key == NULL)
535 ret = pthread_key_create(&((key)->key->pkey), destructor);
541 tMPI_Atomic_set(&(key->initialized), 1);
546 int tMPI_Thread_key_delete(tMPI_Thread_key_t key)
550 ret = pthread_key_delete((key.key->pkey));
562 void * tMPI_Thread_getspecific(tMPI_Thread_key_t key)
566 p = pthread_getspecific((key.key->pkey));
572 int tMPI_Thread_setspecific(tMPI_Thread_key_t key, void *value)
576 ret = pthread_setspecific((key.key->pkey), value);
582 int tMPI_Thread_once(tMPI_Thread_once_t *once_control,
583 void (*init_routine)(void))
586 if (!once_control || !init_routine)
591 /* really ugly hack - and it's slow... */
592 ret = pthread_mutex_lock( &once_init );
597 if (tMPI_Atomic_get(&(once_control->once)) == 0)
600 tMPI_Atomic_set(&(once_control->once), 1);
602 ret = pthread_mutex_unlock( &once_init );
610 int tMPI_Thread_cond_init(tMPI_Thread_cond_t *cond)
619 cond->condp = (struct tMPI_Thread_cond*)malloc(
620 sizeof(struct tMPI_Thread_cond));
621 if (cond->condp == NULL)
626 ret = pthread_cond_init(&(cond->condp->cond), NULL);
631 tMPI_Atomic_set(&(cond->initialized), 1);
632 tMPI_Atomic_memory_barrier();
638 static int tMPI_Thread_cond_init_once(tMPI_Thread_cond_t *cond)
642 /* we're relying on the memory barrier semantics of mutex_lock/unlock
643 for the check preceding this function call to have worked */
644 ret = pthread_mutex_lock( &(cond_init) );
649 if (cond->condp == NULL)
651 cond->condp = (struct tMPI_Thread_cond*)
652 malloc(sizeof(struct tMPI_Thread_cond)*1);
653 if (cond->condp == NULL)
658 ret = pthread_cond_init( &(cond->condp->cond), NULL);
664 ret = pthread_mutex_unlock( &(cond_init) );
667 /* try to unlock anyway */
668 pthread_mutex_unlock( &(cond_init) );
674 int tMPI_Thread_cond_destroy(tMPI_Thread_cond_t *cond)
683 ret = pthread_cond_destroy(&(cond->condp->cond));
694 int tMPI_Thread_cond_wait(tMPI_Thread_cond_t *cond, tMPI_Thread_mutex_t *mtx)
698 /* check whether the condition is initialized */
699 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
701 ret = tMPI_Thread_cond_init_once(cond);
707 /* the mutex must have been initialized because it should be locked here */
709 ret = pthread_cond_wait( &(cond->condp->cond), &(mtx->mutex->mtx) );
717 int tMPI_Thread_cond_signal(tMPI_Thread_cond_t *cond)
721 /* check whether the condition is initialized */
722 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
724 ret = tMPI_Thread_cond_init_once(cond);
731 ret = pthread_cond_signal( &(cond->condp->cond) );
738 int tMPI_Thread_cond_broadcast(tMPI_Thread_cond_t *cond)
742 /* check whether the condition is initialized */
743 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
745 ret = tMPI_Thread_cond_init_once(cond);
752 ret = pthread_cond_broadcast( &(cond->condp->cond) );
/* Terminate the calling thread, making value_ptr available to a joiner.
   Does not return. */
void tMPI_Thread_exit(void *value_ptr)
{
    pthread_exit(value_ptr);
}
766 int tMPI_Thread_cancel(tMPI_Thread_t thread)
768 #ifdef __native_client__
771 return pthread_cancel(thread->th);
777 int tMPI_Thread_barrier_init(tMPI_Thread_barrier_t *barrier, int n)
780 /*tMPI_Thread_pthread_barrier_t *p;*/
787 barrier->barrierp = (struct tMPI_Thread_barrier*)
788 malloc(sizeof(struct tMPI_Thread_barrier)*1);
789 if (barrier->barrierp == NULL)
794 ret = pthread_mutex_init(&(barrier->barrierp->mutex), NULL);
800 ret = pthread_cond_init(&(barrier->barrierp->cv), NULL);
806 barrier->threshold = n;
810 tMPI_Atomic_set(&(barrier->initialized), 1);
814 static int tMPI_Thread_barrier_init_once(tMPI_Thread_barrier_t *barrier)
818 /* we're relying on the memory barrier semantics of mutex_lock/unlock
819 for the check preceding this function call to have worked */
820 ret = pthread_mutex_lock( &(barrier_init) );
826 if (barrier->barrierp == NULL)
828 barrier->barrierp = (struct tMPI_Thread_barrier*)
829 malloc(sizeof(struct tMPI_Thread_barrier)*1);
830 if (barrier->barrierp == NULL)
836 ret = pthread_mutex_init(&(barrier->barrierp->mutex), NULL);
843 ret = pthread_cond_init(&(barrier->barrierp->cv), NULL);
850 ret = pthread_mutex_unlock( &(barrier_init) );
853 pthread_mutex_unlock( &(barrier_init) );
860 int tMPI_Thread_barrier_destroy(tMPI_Thread_barrier_t *barrier)
869 ret = pthread_mutex_destroy(&(barrier->barrierp->mutex));
874 ret = pthread_cond_destroy(&(barrier->barrierp->cv));
880 free(barrier->barrierp);
886 int tMPI_Thread_barrier_wait(tMPI_Thread_barrier_t * barrier)
891 /* check whether the barrier is initialized */
892 if (tMPI_Atomic_get( &(barrier->initialized) ) == 0)
894 tMPI_Thread_barrier_init_once(barrier);
898 ret = pthread_mutex_lock(&barrier->barrierp->mutex);
904 cycle = barrier->cycle;
906 /* Decrement the count atomically and check if it is zero.
907 * This will only be true for the last thread calling us.
909 if (--barrier->count <= 0)
911 barrier->cycle = !barrier->cycle;
912 barrier->count = barrier->threshold;
913 ret = pthread_cond_broadcast(&barrier->barrierp->cv);
922 while (cycle == barrier->cycle)
924 ret = pthread_cond_wait(&barrier->barrierp->cv,
925 &barrier->barrierp->mutex);
933 ret = pthread_mutex_unlock(&barrier->barrierp->mutex);
936 pthread_mutex_unlock(&barrier->barrierp->mutex);
943 /* just to have some symbols */
944 int tMPI_Thread_pthreads = 0;
946 #endif /* THREAD_PTHREADS */