Merge release-4-6 into master
[alexxy/gromacs.git] / src / gromacs / gmxlib / thread_mpi / pthreads.c
1 /*
2 This source code file is part of thread_mpi.  
3 Written by Sander Pronk, Erik Lindahl, and possibly others. 
4
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6 All rights reserved.
7
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
35 files.
36 */
37
38
39
40 /* Include the defines that determine which thread library to use.
41 * We do not use HAVE_PTHREAD_H directly, since we might want to
42 * turn off thread support explicity (e.g. for debugging).
43 */
44 #ifdef HAVE_TMPI_CONFIG_H
45 #include "tmpi_config.h"
46 #endif
47
48 #ifdef HAVE_CONFIG_H
49 #include "config.h"
50 #endif
51
52
53 #ifdef THREAD_PTHREADS 
54
55 #ifdef HAVE_PTHREAD_SETAFFINITY
56 #define _GNU_SOURCE
57 #endif
58
59 /* pthread.h must be the first header, apart from the defines in config.h */
60 #include <pthread.h>
61
62
63 #ifdef HAVE_UNISTD_H
64 #include <unistd.h>
65 #endif
66
67 #include <errno.h>
68 #include <stdlib.h>
69 #include <stdio.h>
70 #include <stdarg.h>
71
72
73 #include "thread_mpi/atomic.h"
74 #include "thread_mpi/threads.h"
75 #include "impl.h"
76
77
78 #include "pthreads.h"
79
80
81 /* mutex for initializing mutexes */
82 static pthread_mutex_t mutex_init=PTHREAD_MUTEX_INITIALIZER;   
83 /* mutex for initializing barriers */
84 static pthread_mutex_t once_init=PTHREAD_MUTEX_INITIALIZER;    
85 /* mutex for initializing thread_conds */
86 static pthread_mutex_t cond_init=PTHREAD_MUTEX_INITIALIZER;    
87 /* mutex for initializing barriers */
88 static pthread_mutex_t barrier_init=PTHREAD_MUTEX_INITIALIZER; 
89
90 /* mutex for initializing barriers */
91 static pthread_mutex_t aff_init=PTHREAD_MUTEX_INITIALIZER; 
92 static int aff_thread_number=0;
93
94
95 /* TODO: this needs to go away!  (there's another one in winthreads.c)
96    fatal errors are thankfully really rare*/
97 void tMPI_Fatal_error(const char *file, int line, const char *message, ...)
98 {
99     va_list ap;
100
101     fprintf(stderr, "tMPI Fatal error in %s, line %d: ", file, line);
102     va_start(ap, message);
103     vfprintf(stderr, message, ap);
104     va_end(ap);
105     fprintf(stderr,"\n");
106     abort();
107 }
108
109
110 enum tMPI_Thread_support tMPI_Thread_support(void)
111 {
112     return TMPI_THREAD_SUPPORT_YES;
113 }
114
115
116 int tMPI_Thread_get_hw_number(void)
117 {
118     int ret=0;
119 #ifdef HAVE_SYSCONF
120 #if defined(_SC_NPROCESSORS_ONLN)
121     ret=sysconf(_SC_NPROCESSORS_ONLN);
122 #elif defined(_SC_NPROC_ONLN)
123     ret=sysconf(_SC_NPROC_ONLN);
124 #elif defined(_SC_NPROCESSORS_CONF)
125     ret=sysconf(_SC_NPROCESSORS_CONF);
126 #elif defined(_SC_NPROC_CONF)
127     ret=sysconf(_SC_NPROC_CONF);
128 #endif
129 #endif
130
131     return ret;
132 }
133
134 int tMPI_Thread_create(tMPI_Thread_t *thread, void *(*start_routine)(void *),
135                        void *arg)
136 {
137     int ret;
138
139     if(thread==NULL)
140     {
141         tMPI_Fatal_error(TMPI_FARGS,"Invalid thread pointer.");
142         return EINVAL;
143     }
144
145     *thread=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
146     ret=pthread_create(&((*thread)->th),NULL,start_routine,arg);
147
148     if(ret!=0)
149     {
150         /* Cannot use tMPI_error() since messages use threads for locking */
151         tMPI_Fatal_error(TMPI_FARGS,"Failed to create POSIX thread:%s, rc=%d",
152                          strerror(errno), ret);
153         /* Use system memory allocation routines */
154         return -1;
155     }
156
157     return 0;
158 }
159
160
161 /* set thread's own affinity to a processor number n */
162 static int tMPI_Set_affinity(int n)
163 {
164 #ifdef HAVE_PTHREAD_SETAFFINITY
165     cpu_set_t set;
166
167     CPU_ZERO(&set);
168     CPU_SET(n, &set);
169     return pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
170 #endif
171     return 0;
172 }
173
174 int tMPI_Thread_create_aff(tMPI_Thread_t *thread, 
175                            void *(*start_routine)(void *),
176                            void *arg)
177 {
178     int ret;
179
180 #ifdef TMPI_SET_AFFINITY
181     /* set the calling thread's affinity mask */
182     pthread_mutex_lock( &(aff_init) );
183     if (aff_thread_number==0)
184     {
185         tMPI_Set_affinity(aff_thread_number++);
186     }
187     pthread_mutex_unlock( &(aff_init) );
188 #endif
189
190     if(thread==NULL)
191     {
192         tMPI_Fatal_error(TMPI_FARGS,"Invalid thread pointer.");
193         return EINVAL;
194     }
195
196     *thread=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
197     ret=pthread_create(&((*thread)->th),NULL,start_routine,arg);
198
199     if(ret!=0)
200     {
201         /* Cannot use tMPI_error() since messages use threads for locking */
202         tMPI_Fatal_error(TMPI_FARGS,"Failed to create POSIX thread:%s, rc=%d",
203                          strerror(errno), ret);
204         /* Use system memory allocation routines */
205         return -1;
206     }
207     else
208     {
209 #ifdef TMPI_SET_AFFINITY
210         /* now set the affinity of the new thread */
211         pthread_mutex_lock( &(aff_init) );
212         ret=tMPI_Set_affinity(aff_thread_number++);
213         pthread_mutex_unlock( &(aff_init) );
214         /* failure is non-fatal, so we won't check the result */
215         return 0;
216 #else
217         return 0;
218 #endif
219     }
220 }
221
222
223
224
225
226 int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
227 {
228     int ret;
229     pthread_t th=thread->th;
230
231     free(thread);
232     
233     ret = pthread_join( th, value_ptr );
234
235     if(ret != 0 )
236     {
237         tMPI_Fatal_error(TMPI_FARGS,"Failed to join POSIX thread. rc=%d",ret);
238     }
239     return ret;
240 }
241
242
243
244 int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx) 
245 {
246     int ret;
247   
248     if (mtx == NULL)
249     {
250         return EINVAL;
251     }
252
253     mtx->mutex=(struct tMPI_Mutex*)tMPI_Malloc(sizeof(struct tMPI_Mutex)*1);
254     ret = pthread_mutex_init(&(mtx->mutex->mtx),NULL);
255     
256     if(ret!=0)
257     {
258         tMPI_Fatal_error(TMPI_FARGS,"Error initializing POSIX mutex. rc=%d");
259         /* Use system memory allocation routines */
260         return ret;
261     }
262
263     tMPI_Atomic_set(&(mtx->initialized), 1);
264     return 0;
265 }
266
267 static int tMPI_Thread_mutex_init_once(tMPI_Thread_mutex_t *mtx)
268 {
269     int ret=0;
270
271     /* we're relying on the memory barrier semantics of mutex_lock/unlock
272        for the check preceding this function call to have worked */
273     pthread_mutex_lock( &(mutex_init) );    
274     if(mtx->mutex==NULL)
275     {
276         mtx->mutex=(struct tMPI_Mutex*)tMPI_Malloc(sizeof(struct tMPI_Mutex)*1);
277         ret=pthread_mutex_init( &(mtx->mutex->mtx), NULL);
278     }
279     pthread_mutex_unlock( &(mutex_init) );    
280     return ret;
281 }
282
283
284 int tMPI_Thread_mutex_destroy(tMPI_Thread_mutex_t *mtx) 
285 {
286     int ret;
287
288     if(mtx == NULL)
289     {
290         return EINVAL;
291     }
292     
293     ret = pthread_mutex_destroy( &(mtx->mutex->mtx) );
294     free(mtx->mutex);
295     
296     if(ret!=0)
297     {
298         tMPI_Fatal_error(TMPI_FARGS,"Error destroying POSIX mutex. rc=%d",ret);
299         /* Use system memory allocation routines */
300     }
301     return ret;
302 }
303
304
305
306 int tMPI_Thread_mutex_lock(tMPI_Thread_mutex_t *mtx)
307 {
308     int ret;
309
310     /* check whether the mutex is initialized */
311     if (tMPI_Atomic_get( &(mtx->initialized)  ) == 0)
312     {
313         ret=tMPI_Thread_mutex_init_once(mtx);
314         if (ret)
315             return ret;
316     }
317    
318     ret=pthread_mutex_lock(&(mtx->mutex->mtx));
319
320     return ret;
321 }
322
323  
324
325
326 int tMPI_Thread_mutex_trylock(tMPI_Thread_mutex_t *mtx)
327 {
328     int ret;
329
330     /* check whether the mutex is initialized */
331     if (tMPI_Atomic_get( &(mtx->initialized)  ) == 0)
332     {
333         ret=tMPI_Thread_mutex_init_once(mtx);
334         if (ret)
335             return ret;
336     }
337
338     ret=pthread_mutex_trylock(&(mtx->mutex->mtx));
339     
340     return ret;
341 }
342
343
344
345 int tMPI_Thread_mutex_unlock(tMPI_Thread_mutex_t *mtx)
346 {
347     int ret;
348  
349     /* check whether the mutex is initialized */
350     if (tMPI_Atomic_get( &(mtx->initialized)  ) == 0)
351     {
352         ret=tMPI_Thread_mutex_init_once(mtx);
353         if (ret)
354             return ret;
355     }
356  
357     ret = pthread_mutex_unlock(&(mtx->mutex->mtx));
358     
359     return ret;
360 }
361
362
363
364 int tMPI_Thread_key_create(tMPI_Thread_key_t *key, void (*destructor)(void *))
365 {
366     int ret;
367
368     if(key==NULL)
369     {
370         tMPI_Fatal_error(TMPI_FARGS,"Invalid key pointer.");
371         return EINVAL;
372     }
373
374
375     key->key=(struct tMPI_Thread_key*)tMPI_Malloc(sizeof(struct 
376                                                          tMPI_Thread_key)*1);
377     ret = pthread_key_create(&((key)->key->pkey), destructor);
378     if(ret!=0)
379     {
380         tMPI_Fatal_error(TMPI_FARGS,"Failed to create thread key, rc=%d.",ret);
381         fflush(stderr);
382         return -1;
383     }
384
385     tMPI_Atomic_set(&(key->initialized), 1);
386     return 0;
387 }
388
389
390 int tMPI_Thread_key_delete(tMPI_Thread_key_t key)
391 {
392     int ret;
393
394     ret=pthread_key_delete((key.key->pkey));
395     free(key.key);
396
397     if(ret!=0)
398     {
399         tMPI_Fatal_error(TMPI_FARGS,"Failed to delete thread key, rc=%d.",ret);
400         fflush(stderr);
401     }
402     
403     return ret;
404 }
405
406
407
408 void * tMPI_Thread_getspecific(tMPI_Thread_key_t key)
409 {
410     void *p = NULL;
411
412     p=pthread_getspecific((key.key->pkey));
413
414     return p;
415 }
416
417
418 int tMPI_Thread_setspecific(tMPI_Thread_key_t key, void *value)
419 {
420     int ret;
421     
422     ret=pthread_setspecific((key.key->pkey),value);
423     
424     return ret;
425 }
426
427
428 int tMPI_Thread_once(tMPI_Thread_once_t *once_control,
429                      void (*init_routine)(void))
430 {
431     int ret;
432     if (!once_control || !init_routine)
433     {
434         return EINVAL;
435     }
436
437     /* really ugly hack - and it's slow... */
438     if ( (ret=pthread_mutex_lock( &once_init )) )
439         return ret;
440     if (tMPI_Atomic_get(&(once_control->once)) == 0)
441     {
442         (*init_routine)();
443         tMPI_Atomic_set(&(once_control->once), 1);
444     }
445     pthread_mutex_unlock( &once_init );
446
447     return 0;
448 }
449
450
451
452
453 int tMPI_Thread_cond_init(tMPI_Thread_cond_t *cond) 
454 {
455     int ret;
456     
457     if(cond==NULL)
458     {
459         return EINVAL;
460     }
461    
462     cond->condp=(struct tMPI_Thread_cond*)
463               tMPI_Malloc(sizeof(struct tMPI_Thread_cond)*1);
464     ret = pthread_cond_init(&(cond->condp->cond), NULL);
465     
466     if(ret!=0)
467     {
468         tMPI_Fatal_error(TMPI_FARGS,"Error initializing POSIX condition variable. rc=%d",ret);
469         fflush(stderr);
470     }
471     tMPI_Atomic_set(&(cond->initialized),1);
472     return ret;
473 }
474
475
476 static int tMPI_Thread_cond_init_once(tMPI_Thread_cond_t *cond)
477 {
478     int ret=0;
479
480     /* we're relying on the memory barrier semantics of mutex_lock/unlock
481        for the check preceding this function call to have worked */
482     pthread_mutex_lock( &(cond_init) );    
483     if(cond->condp==NULL)
484     {
485         cond->condp=(struct tMPI_Thread_cond*)
486                   tMPI_Malloc(sizeof(struct tMPI_Thread_cond)*1);
487         ret=pthread_cond_init( &(cond->condp->cond), NULL);
488     }
489     pthread_mutex_unlock( &(cond_init) );    
490     return ret;
491 }
492
493
494
495 int tMPI_Thread_cond_destroy(tMPI_Thread_cond_t *cond) 
496 {
497     int ret;
498     
499     if(cond == NULL)
500     {
501         return EINVAL;
502     }
503     
504     ret = pthread_cond_destroy(&(cond->condp->cond));
505     free(cond->condp);
506    
507     if(ret!=0)
508     {
509         tMPI_Fatal_error(TMPI_FARGS,
510                          "Error destroying POSIX condition variable. rc=%d",
511                          ret);
512         fflush(stderr);
513     }
514     return ret;
515 }
516
517
518 int tMPI_Thread_cond_wait(tMPI_Thread_cond_t *cond, tMPI_Thread_mutex_t *mtx)
519 {
520     int ret;
521
522     /* check whether the condition is initialized */
523     if (tMPI_Atomic_get( &(cond->initialized)  ) == 0)
524     {
525         tMPI_Thread_cond_init_once(cond);
526     }
527     /* the mutex must have been initialized because it should be locked here */
528    
529     ret = pthread_cond_wait( &(cond->condp->cond), &(mtx->mutex->mtx) );
530     
531     return ret;
532 }
533
534
535
536
537 int tMPI_Thread_cond_signal(tMPI_Thread_cond_t *cond)
538 {
539     int ret;
540
541     /* check whether the condition is initialized */
542     if (tMPI_Atomic_get( &(cond->initialized)  ) == 0)
543     {
544         tMPI_Thread_cond_init_once(cond);
545     }
546     
547     ret = pthread_cond_signal( &(cond->condp->cond) );
548     
549     return ret;
550 }
551
552
553
554 int tMPI_Thread_cond_broadcast(tMPI_Thread_cond_t *cond)
555 {
556     int ret;
557
558     /* check whether the condition is initialized */
559     if (tMPI_Atomic_get( &(cond->initialized)  ) == 0)
560     {
561         tMPI_Thread_cond_init_once(cond);
562     }
563    
564     ret = pthread_cond_broadcast( &(cond->condp->cond) );
565     
566     return ret;
567 }
568
569
570
571
572 void tMPI_Thread_exit(void *      value_ptr)
573 {
574     pthread_exit(value_ptr);
575 }
576
577
578 int tMPI_Thread_cancel(tMPI_Thread_t     thread)
579 {
580     return pthread_cancel(thread->th);
581 }
582
583
584
585
586 int tMPI_Thread_barrier_init(tMPI_Thread_barrier_t *barrier, int n)
587 {
588     int ret;
589     /*tMPI_Thread_pthread_barrier_t *p;*/
590     
591     if(barrier==NULL)
592     {
593         return EINVAL;
594     }
595     
596     barrier->barrierp=(struct tMPI_Thread_barrier*)
597               tMPI_Malloc(sizeof(struct tMPI_Thread_barrier)*1);
598     ret = pthread_mutex_init(&(barrier->barrierp->mutex),NULL);
599         
600     if(ret!=0)
601     {
602         tMPI_Fatal_error(TMPI_FARGS,"Error initializing POSIX mutex. rc=%d",
603                          ret);
604         return ret;
605     }
606     
607     ret = pthread_cond_init(&(barrier->barrierp->cv),NULL);
608     
609     if(ret!=0)
610     {
611         tMPI_Fatal_error(TMPI_FARGS,
612                          "Error initializing POSIX condition variable. rc=%d",
613                          ret);
614         return ret;
615     }
616         
617     barrier->threshold = n;
618     barrier->count     = n;
619     barrier->cycle     = 0;
620
621     tMPI_Atomic_set(&(barrier->initialized), 1);
622     return 0;
623 }
624
625 static int tMPI_Thread_barrier_init_once(tMPI_Thread_barrier_t *barrier)
626 {
627     int ret=0;
628
629     /* we're relying on the memory barrier semantics of mutex_lock/unlock
630        for the check preceding this function call to have worked */
631     pthread_mutex_lock( &(barrier_init) );    
632     if(barrier->barrierp==NULL)
633     {
634         barrier->barrierp=(struct tMPI_Thread_barrier*)
635                   tMPI_Malloc(sizeof(struct tMPI_Thread_barrier)*1);
636         ret = pthread_mutex_init(&(barrier->barrierp->mutex),NULL);
637
638         if(ret!=0)
639         {
640             tMPI_Fatal_error(TMPI_FARGS,"Error initializing POSIX mutex. rc=%d",
641                              ret);
642             return ret;
643         }
644
645         ret = pthread_cond_init(&(barrier->barrierp->cv),NULL);
646
647         if(ret!=0)
648         {
649             tMPI_Fatal_error(TMPI_FARGS,
650                              "Error initializing POSIX condition variable. rc=%d",
651                              ret);
652             return ret;
653         }
654     }
655     pthread_mutex_unlock( &(barrier_init) );    
656     return ret;
657 }
658
659
660
661
662 int tMPI_Thread_barrier_destroy(tMPI_Thread_barrier_t *barrier)
663 {
664     if(barrier==NULL)
665     {
666         return EINVAL;
667     }
668
669     pthread_mutex_destroy(&(barrier->barrierp->mutex));
670     pthread_cond_destroy(&(barrier->barrierp->cv));
671
672     free(barrier->barrierp);
673     
674     return 0;
675 }
676  
677
678 int tMPI_Thread_barrier_wait(tMPI_Thread_barrier_t *   barrier)
679 {
680     int    cycle;
681     int    rc;
682     
683     /* check whether the barrier is initialized */
684     if (tMPI_Atomic_get( &(barrier->initialized)  ) == 0)
685     {
686         tMPI_Thread_barrier_init_once(barrier);
687     }
688
689
690     rc = pthread_mutex_lock(&barrier->barrierp->mutex);
691
692     
693     if(rc != 0)
694         return EBUSY;
695
696     cycle = barrier->cycle;
697     
698     /* Decrement the count atomically and check if it is zero.
699         * This will only be true for the last thread calling us.
700         */
701     if( --barrier->count <= 0 )
702     { 
703         barrier->cycle = !barrier->cycle;
704         barrier->count = barrier->threshold;
705         rc = pthread_cond_broadcast(&barrier->barrierp->cv);
706         
707         if(rc == 0)
708             rc = -1;
709     }
710     else
711     {
712         while(cycle == barrier->cycle)
713         {
714             rc = pthread_cond_wait(&barrier->barrierp->cv,
715                                    &barrier->barrierp->mutex);
716             if(rc != 0) break;
717         }
718     }
719     
720     pthread_mutex_unlock(&barrier->barrierp->mutex);
721     return rc;
722 }
723
724 #else
725
726 /* just to have some symbols */
727 int tMPI_Thread_pthreads=0;
728
729 #endif /* THREAD_PTHREADS */
730