2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
40 /* Include the defines that determine which thread library to use.
41 * We do not use HAVE_PTHREAD_H directly, since we might want to
42 * turn off thread support explicity (e.g. for debugging).
44 #ifdef HAVE_TMPI_CONFIG_H
45 #include "tmpi_config.h"
53 #ifdef THREAD_PTHREADS
55 #ifdef HAVE_PTHREAD_SETAFFINITY
59 /* pthread.h must be the first header, apart from the defines in config.h */
73 #include "thread_mpi/atomic.h"
74 #include "thread_mpi/threads.h"
81 /* mutex for initializing mutexes */
82 static pthread_mutex_t mutex_init=PTHREAD_MUTEX_INITIALIZER;
83 /* mutex for initializing barriers */
84 static pthread_mutex_t once_init=PTHREAD_MUTEX_INITIALIZER;
85 /* mutex for initializing thread_conds */
86 static pthread_mutex_t cond_init=PTHREAD_MUTEX_INITIALIZER;
87 /* mutex for initializing barriers */
88 static pthread_mutex_t barrier_init=PTHREAD_MUTEX_INITIALIZER;
90 /* mutex for initializing barriers */
91 static pthread_mutex_t aff_init=PTHREAD_MUTEX_INITIALIZER;
92 static int aff_thread_number=0;
95 /* TODO: this needs to go away! (there's another one in winthreads.c)
96 fatal errors are thankfully really rare*/
97 void tMPI_Fatal_error(const char *file, int line, const char *message, ...)
101 fprintf(stderr, "tMPI Fatal error in %s, line %d: ", file, line);
102 va_start(ap, message);
103 vfprintf(stderr, message, ap);
105 fprintf(stderr,"\n");
110 enum tMPI_Thread_support tMPI_Thread_support(void)
112 return TMPI_THREAD_SUPPORT_YES;
116 int tMPI_Thread_get_hw_number(void)
120 #if defined(_SC_NPROCESSORS_ONLN)
121 ret=sysconf(_SC_NPROCESSORS_ONLN);
122 #elif defined(_SC_NPROC_ONLN)
123 ret=sysconf(_SC_NPROC_ONLN);
124 #elif defined(_SC_NPROCESSORS_CONF)
125 ret=sysconf(_SC_NPROCESSORS_CONF);
126 #elif defined(_SC_NPROC_CONF)
127 ret=sysconf(_SC_NPROC_CONF);
134 int tMPI_Thread_create(tMPI_Thread_t *thread, void *(*start_routine)(void *),
141 tMPI_Fatal_error(TMPI_FARGS,"Invalid thread pointer.");
145 *thread=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
146 ret=pthread_create(&((*thread)->th),NULL,start_routine,arg);
150 /* Cannot use tMPI_error() since messages use threads for locking */
151 tMPI_Fatal_error(TMPI_FARGS,"Failed to create POSIX thread:%s, rc=%d",
152 strerror(errno), ret);
153 /* Use system memory allocation routines */
161 /* set thread's own affinity to a processor number n */
162 static int tMPI_Set_affinity(int n)
164 #ifdef HAVE_PTHREAD_SETAFFINITY
169 return pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
174 int tMPI_Thread_create_aff(tMPI_Thread_t *thread,
175 void *(*start_routine)(void *),
180 #ifdef TMPI_SET_AFFINITY
181 /* set the calling thread's affinity mask */
182 pthread_mutex_lock( &(aff_init) );
183 if (aff_thread_number==0)
185 tMPI_Set_affinity(aff_thread_number++);
187 pthread_mutex_unlock( &(aff_init) );
192 tMPI_Fatal_error(TMPI_FARGS,"Invalid thread pointer.");
196 *thread=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
197 ret=pthread_create(&((*thread)->th),NULL,start_routine,arg);
201 /* Cannot use tMPI_error() since messages use threads for locking */
202 tMPI_Fatal_error(TMPI_FARGS,"Failed to create POSIX thread:%s, rc=%d",
203 strerror(errno), ret);
204 /* Use system memory allocation routines */
209 #ifdef TMPI_SET_AFFINITY
210 /* now set the affinity of the new thread */
211 pthread_mutex_lock( &(aff_init) );
212 ret=tMPI_Set_affinity(aff_thread_number++);
213 pthread_mutex_unlock( &(aff_init) );
214 /* failure is non-fatal, so we won't check the result */
226 int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
229 pthread_t th=thread->th;
233 ret = pthread_join( th, value_ptr );
237 tMPI_Fatal_error(TMPI_FARGS,"Failed to join POSIX thread. rc=%d",ret);
244 int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx)
253 mtx->mutex=(struct tMPI_Mutex*)tMPI_Malloc(sizeof(struct tMPI_Mutex)*1);
254 ret = pthread_mutex_init(&(mtx->mutex->mtx),NULL);
258 tMPI_Fatal_error(TMPI_FARGS,"Error initializing POSIX mutex. rc=%d");
259 /* Use system memory allocation routines */
263 tMPI_Atomic_set(&(mtx->initialized), 1);
267 static int tMPI_Thread_mutex_init_once(tMPI_Thread_mutex_t *mtx)
271 /* we're relying on the memory barrier semantics of mutex_lock/unlock
272 for the check preceding this function call to have worked */
273 pthread_mutex_lock( &(mutex_init) );
276 mtx->mutex=(struct tMPI_Mutex*)tMPI_Malloc(sizeof(struct tMPI_Mutex)*1);
277 ret=pthread_mutex_init( &(mtx->mutex->mtx), NULL);
279 pthread_mutex_unlock( &(mutex_init) );
284 int tMPI_Thread_mutex_destroy(tMPI_Thread_mutex_t *mtx)
293 ret = pthread_mutex_destroy( &(mtx->mutex->mtx) );
298 tMPI_Fatal_error(TMPI_FARGS,"Error destroying POSIX mutex. rc=%d",ret);
299 /* Use system memory allocation routines */
306 int tMPI_Thread_mutex_lock(tMPI_Thread_mutex_t *mtx)
310 /* check whether the mutex is initialized */
311 if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
313 ret=tMPI_Thread_mutex_init_once(mtx);
318 ret=pthread_mutex_lock(&(mtx->mutex->mtx));
326 int tMPI_Thread_mutex_trylock(tMPI_Thread_mutex_t *mtx)
330 /* check whether the mutex is initialized */
331 if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
333 ret=tMPI_Thread_mutex_init_once(mtx);
338 ret=pthread_mutex_trylock(&(mtx->mutex->mtx));
345 int tMPI_Thread_mutex_unlock(tMPI_Thread_mutex_t *mtx)
349 /* check whether the mutex is initialized */
350 if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
352 ret=tMPI_Thread_mutex_init_once(mtx);
357 ret = pthread_mutex_unlock(&(mtx->mutex->mtx));
364 int tMPI_Thread_key_create(tMPI_Thread_key_t *key, void (*destructor)(void *))
370 tMPI_Fatal_error(TMPI_FARGS,"Invalid key pointer.");
375 key->key=(struct tMPI_Thread_key*)tMPI_Malloc(sizeof(struct
377 ret = pthread_key_create(&((key)->key->pkey), destructor);
380 tMPI_Fatal_error(TMPI_FARGS,"Failed to create thread key, rc=%d.",ret);
385 tMPI_Atomic_set(&(key->initialized), 1);
390 int tMPI_Thread_key_delete(tMPI_Thread_key_t key)
394 ret=pthread_key_delete((key.key->pkey));
399 tMPI_Fatal_error(TMPI_FARGS,"Failed to delete thread key, rc=%d.",ret);
408 void * tMPI_Thread_getspecific(tMPI_Thread_key_t key)
412 p=pthread_getspecific((key.key->pkey));
418 int tMPI_Thread_setspecific(tMPI_Thread_key_t key, void *value)
422 ret=pthread_setspecific((key.key->pkey),value);
428 int tMPI_Thread_once(tMPI_Thread_once_t *once_control,
429 void (*init_routine)(void))
432 if (!once_control || !init_routine)
437 /* really ugly hack - and it's slow... */
438 if ( (ret=pthread_mutex_lock( &once_init )) )
440 if (tMPI_Atomic_get(&(once_control->once)) == 0)
443 tMPI_Atomic_set(&(once_control->once), 1);
445 pthread_mutex_unlock( &once_init );
453 int tMPI_Thread_cond_init(tMPI_Thread_cond_t *cond)
462 cond->condp=(struct tMPI_Thread_cond*)
463 tMPI_Malloc(sizeof(struct tMPI_Thread_cond)*1);
464 ret = pthread_cond_init(&(cond->condp->cond), NULL);
468 tMPI_Fatal_error(TMPI_FARGS,"Error initializing POSIX condition variable. rc=%d",ret);
471 tMPI_Atomic_set(&(cond->initialized),1);
476 static int tMPI_Thread_cond_init_once(tMPI_Thread_cond_t *cond)
480 /* we're relying on the memory barrier semantics of mutex_lock/unlock
481 for the check preceding this function call to have worked */
482 pthread_mutex_lock( &(cond_init) );
483 if(cond->condp==NULL)
485 cond->condp=(struct tMPI_Thread_cond*)
486 tMPI_Malloc(sizeof(struct tMPI_Thread_cond)*1);
487 ret=pthread_cond_init( &(cond->condp->cond), NULL);
489 pthread_mutex_unlock( &(cond_init) );
495 int tMPI_Thread_cond_destroy(tMPI_Thread_cond_t *cond)
504 ret = pthread_cond_destroy(&(cond->condp->cond));
509 tMPI_Fatal_error(TMPI_FARGS,
510 "Error destroying POSIX condition variable. rc=%d",
518 int tMPI_Thread_cond_wait(tMPI_Thread_cond_t *cond, tMPI_Thread_mutex_t *mtx)
522 /* check whether the condition is initialized */
523 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
525 tMPI_Thread_cond_init_once(cond);
527 /* the mutex must have been initialized because it should be locked here */
529 ret = pthread_cond_wait( &(cond->condp->cond), &(mtx->mutex->mtx) );
537 int tMPI_Thread_cond_signal(tMPI_Thread_cond_t *cond)
541 /* check whether the condition is initialized */
542 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
544 tMPI_Thread_cond_init_once(cond);
547 ret = pthread_cond_signal( &(cond->condp->cond) );
554 int tMPI_Thread_cond_broadcast(tMPI_Thread_cond_t *cond)
558 /* check whether the condition is initialized */
559 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
561 tMPI_Thread_cond_init_once(cond);
564 ret = pthread_cond_broadcast( &(cond->condp->cond) );
572 void tMPI_Thread_exit(void * value_ptr)
574 pthread_exit(value_ptr);
578 int tMPI_Thread_cancel(tMPI_Thread_t thread)
580 return pthread_cancel(thread->th);
586 int tMPI_Thread_barrier_init(tMPI_Thread_barrier_t *barrier, int n)
589 /*tMPI_Thread_pthread_barrier_t *p;*/
596 barrier->barrierp=(struct tMPI_Thread_barrier*)
597 tMPI_Malloc(sizeof(struct tMPI_Thread_barrier)*1);
598 ret = pthread_mutex_init(&(barrier->barrierp->mutex),NULL);
602 tMPI_Fatal_error(TMPI_FARGS,"Error initializing POSIX mutex. rc=%d",
607 ret = pthread_cond_init(&(barrier->barrierp->cv),NULL);
611 tMPI_Fatal_error(TMPI_FARGS,
612 "Error initializing POSIX condition variable. rc=%d",
617 barrier->threshold = n;
621 tMPI_Atomic_set(&(barrier->initialized), 1);
625 static int tMPI_Thread_barrier_init_once(tMPI_Thread_barrier_t *barrier)
629 /* we're relying on the memory barrier semantics of mutex_lock/unlock
630 for the check preceding this function call to have worked */
631 pthread_mutex_lock( &(barrier_init) );
632 if(barrier->barrierp==NULL)
634 barrier->barrierp=(struct tMPI_Thread_barrier*)
635 tMPI_Malloc(sizeof(struct tMPI_Thread_barrier)*1);
636 ret = pthread_mutex_init(&(barrier->barrierp->mutex),NULL);
640 tMPI_Fatal_error(TMPI_FARGS,"Error initializing POSIX mutex. rc=%d",
645 ret = pthread_cond_init(&(barrier->barrierp->cv),NULL);
649 tMPI_Fatal_error(TMPI_FARGS,
650 "Error initializing POSIX condition variable. rc=%d",
655 pthread_mutex_unlock( &(barrier_init) );
662 int tMPI_Thread_barrier_destroy(tMPI_Thread_barrier_t *barrier)
669 pthread_mutex_destroy(&(barrier->barrierp->mutex));
670 pthread_cond_destroy(&(barrier->barrierp->cv));
672 free(barrier->barrierp);
678 int tMPI_Thread_barrier_wait(tMPI_Thread_barrier_t * barrier)
683 /* check whether the barrier is initialized */
684 if (tMPI_Atomic_get( &(barrier->initialized) ) == 0)
686 tMPI_Thread_barrier_init_once(barrier);
690 rc = pthread_mutex_lock(&barrier->barrierp->mutex);
696 cycle = barrier->cycle;
698 /* Decrement the count atomically and check if it is zero.
699 * This will only be true for the last thread calling us.
701 if( --barrier->count <= 0 )
703 barrier->cycle = !barrier->cycle;
704 barrier->count = barrier->threshold;
705 rc = pthread_cond_broadcast(&barrier->barrierp->cv);
712 while(cycle == barrier->cycle)
714 rc = pthread_cond_wait(&barrier->barrierp->cv,
715 &barrier->barrierp->mutex);
720 pthread_mutex_unlock(&barrier->barrierp->mutex);
726 /* just to have some symbols */
727 int tMPI_Thread_pthreads=0;
729 #endif /* THREAD_PTHREADS */