2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
40 /* Include the defines that determine which thread library to use.
41 * We do not use HAVE_PTHREAD_H directly, since we might want to
 * turn off thread support explicitly (e.g. for debugging). */
45 #ifdef HAVE_TMPI_CONFIG_H
46 #include "tmpi_config.h"
56 /* the win32 header */
66 #include "thread_mpi/atomic.h"
67 #include "thread_mpi/threads.h"
70 #include "winthreads.h"
/*! \brief System mutex for all one-time initialization
 *
 * This static variable is necessary in order to make the header file
 * independent of the thread library implementation. Anyway, it
 * will only be locked a handful of times at the start of program execution.
 */
static CRITICAL_SECTION mutex_init;       /* mutex for initializing mutexes */
static CRITICAL_SECTION once_init;        /* mutex for one-time (thread_once) initialization */
static CRITICAL_SECTION cond_init;        /* mutex for initializing thread_conds */
static CRITICAL_SECTION barrier_init;     /* mutex for initializing barriers */

/* spinlock for initializing the above mutexes */
static tMPI_Spinlock_t init_init = TMPI_SPINLOCK_INITIALIZER;

/* whether tMPI_Thread_create has initialized these mutexes */
static tMPI_Atomic_t init_inited = { 0 };

/* whether the main thread affinity has been set */
static tMPI_Spinlock_t main_thread_aff_lock = TMPI_SPINLOCK_INITIALIZER;
static tMPI_Atomic_t main_thread_aff_set = { 0 };

/* mutex for managing thread IDs */
static CRITICAL_SECTION thread_id_list_lock;
/* NOTE(review): the two fields below belong to the thread_id_list_t element
   type; the surrounding 'typedef struct' lines are elided in this excerpt. */
DWORD thread_id;        /* the thread ID as returned by GetCurrentThreadId() */
struct tMPI_Thread* th; /* the associated tMPI thread structure */
/* the allocated size of the thread id list */
static int Nalloc_thread_id_list = 0;
/* the number of elements in the thread id list */
static int N_thread_id_list = 0;
/* the thread ID list */
static thread_id_list_t *thread_id_list;
123 NUMA and Processor Group awareness support.
125 NUMA support is implemented to maximize the chance that memory access
patterns remain local to the NUMA node.
127 NUMA node processor affinity is utilized to prevent scheduler associated
128 drift across NUMA nodes.
129 Process Group support is implemented to enable > 64 processors to be
130 utilized. This is only supported when building 64bit.
132 The high level approach is:
133 1. Build a description of CPU topology, including processor numbers, NUMA
134 node numbers, and affinity masks.
135 2. For processor intensive worker threads, create threads such that
136 the processor affinity and thread stack is kept local within a NUMA node.
137 3. Employ simple round-robin affinity and node assignment approach when
139 4. Use GetProcAddress() to obtain function pointers to functions that
140 are operating system version dependent, to allow maximum binary
143 Scott Field (sfield@microsoft.com) Jan-2011
/* NOTE(review): the 'typedef struct' opener for this record is elided in
   this excerpt; these are the per-processor description fields. */
PROCESSOR_NUMBER ProcessorNumber;   /* processor group + number */
GROUP_AFFINITY GroupAffinity;       /* affinity mask within the group */
USHORT NumaNodeNumber;              /* NUMA node of this processor */
} MPI_NUMA_PROCESSOR_INFO;


/* thread/processor index, to allow setting round-robin affinity. */
volatile ULONG g_ulThreadIndex;
/* a value of zero implies the system is not NUMA */
ULONG g_ulHighestNumaNodeNumber = 0;
/* total number of processors in the g_MPI_ProcessorInfo array */
ULONG g_ulTotalProcessors;
/* array describing available processors, affinity masks, and NUMA node */
MPI_NUMA_PROCESSOR_INFO *g_MPI_ProcessorInfo = NULL;
/* function prototypes and variables to support obtaining function addresses
   dynamically -- supports down-level operating systems */

/* NOTE(review): several of these typedefs are missing parameter lines in
   this excerpt; the parameter lists shown are incomplete where truncated. */
typedef BOOL (WINAPI *func_GetNumaHighestNodeNumber_t)( PULONG
typedef DWORD (WINAPI *func_SetThreadIdealProcessor_t)( HANDLE hThread,
                                                        DWORD dwIdealProcessor );
typedef BOOL (WINAPI *func_SetThreadGroupAffinity_t)( HANDLE hThread,
                                                      const GROUP_AFFINITY *GroupAffinity,
                                                      PGROUP_AFFINITY PreviousGroupAffinity );
typedef BOOL (WINAPI *func_SetThreadIdealProcessorEx_t)( HANDLE hThread,
                                                         PPROCESSOR_NUMBER lpIdealProcessor,
                                                         PPROCESSOR_NUMBER lpPreviousIdealProcessor );
typedef BOOL (WINAPI *func_GetNumaNodeProcessorMaskEx_t)( USHORT Node,
                                                          PGROUP_AFFINITY ProcessorMask );
typedef BOOL (WINAPI *func_GetNumaProcessorNodeEx_t)(
        PPROCESSOR_NUMBER Processor,
        PUSHORT NodeNumber );
typedef VOID (WINAPI *func_GetCurrentProcessorNumberEx_t)(
        PPROCESSOR_NUMBER ProcNumber );
typedef HANDLE (WINAPI *func_CreateRemoteThreadEx_t)(
        LPSECURITY_ATTRIBUTES lpThreadAttributes,
        LPTHREAD_START_ROUTINE lpStartAddress,
        DWORD dwCreationFlags,
        LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList,
typedef BOOL (WINAPI *func_InitializeProcThreadAttributeList_t)(
        LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList,
        DWORD dwAttributeCount,
typedef BOOL (WINAPI *func_UpdateProcThreadAttribute_t)(
        LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList,
        PVOID lpPreviousValue,
        PSIZE_T lpReturnSize);
typedef VOID (WINAPI *func_DeleteProcThreadAttributeList_t)(
        LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList);
typedef DWORD (WINAPI *func_GetActiveProcessorCount_t)(WORD GroupNumber);
typedef WORD (WINAPI *func_GetActiveProcessorGroupCount_t)(void);

/* available since WinXP SP2, WinXP64, WinSrv 2003 */
func_GetNumaHighestNodeNumber_t func_GetNumaHighestNodeNumber;
func_SetThreadIdealProcessor_t func_SetThreadIdealProcessor;
/* available since Windows 7, WinSrv 2008R2 */
func_SetThreadGroupAffinity_t func_SetThreadGroupAffinity;
func_SetThreadIdealProcessorEx_t func_SetThreadIdealProcessorEx;
func_GetNumaNodeProcessorMaskEx_t func_GetNumaNodeProcessorMaskEx;
func_GetNumaProcessorNodeEx_t func_GetNumaProcessorNodeEx;
func_GetCurrentProcessorNumberEx_t func_GetCurrentProcessorNumberEx;
func_GetActiveProcessorCount_t func_GetActiveProcessorCount;
func_GetActiveProcessorGroupCount_t func_GetActiveProcessorGroupCount;
func_CreateRemoteThreadEx_t func_CreateRemoteThreadEx;
/* available since Windows Vista, WinSrv 2008 */
func_InitializeProcThreadAttributeList_t func_InitializeProcThreadAttributeList;
func_UpdateProcThreadAttribute_t func_UpdateProcThreadAttribute;
func_DeleteProcThreadAttributeList_t func_DeleteProcThreadAttributeList;
/* returns 0 on success.
   Success is returned if the system is non-NUMA, OR the system doesn't
   support appropriate NUMA APIs, OR the system is NUMA and we successfully
   initialized support.

   returns -1 on error.
   This can happen if an API returned an error, a memory allocation failed, or
   we failed to initialize affinity mapping information.
 */
int tMPI_Init_NUMA(void)
    /* module handle to kernel32.dll -- we already reference it, so it's already loaded */
    HMODULE hModKernel32 = NULL;
    /* 0-based NUMA node count -- does not imply all nodes have available (eg: hot-plug) processors */
    ULONG ulHighestNumaNodeNumber;
    /* total number of processors available per affinity masks */
    DWORD dwTotalProcessors = 0;
    /* calling thread PROCESSOR_NUMBER */
    PROCESSOR_NUMBER CurrentProcessorNumber;
    /* calling thread GROUP_AFFINITY */
    /*GROUP_AFFINITY CurrentThreadGroupAffinity; */
    /* calling thread NUMA node */
    /*USHORT CurrentNumaNodeNumber;*/
    WORD wActiveGroupCount;
    /* array of processor information structures */
    MPI_NUMA_PROCESSOR_INFO *pMPI_ProcessorInfo = NULL;

    /* assume an error condition */
    hModKernel32 = GetModuleHandleA("kernel32.dll");

    if (hModKernel32 == NULL)

    /* obtain addresses of relevant NUMA functions, most of which are
       Windows 7 / Windows Server 2008R2 only functions
       this is done using GetProcAddress to enable the binary to run on older
       operating systems */
    func_GetNumaHighestNodeNumber = (func_GetNumaHighestNodeNumber_t) GetProcAddress( hModKernel32, "GetNumaHighestNodeNumber" );
    func_SetThreadIdealProcessor = (func_SetThreadIdealProcessor_t) GetProcAddress( hModKernel32, "SetThreadIdealProcessor" );

    if (func_GetNumaHighestNodeNumber == NULL)

    /* determine if we're on a NUMA system and if so, determine the number of
       (potential) NUMA nodes */
    if (!func_GetNumaHighestNodeNumber( &ulHighestNumaNodeNumber ))

    func_SetThreadGroupAffinity = (func_SetThreadGroupAffinity_t)GetProcAddress( hModKernel32, "SetThreadGroupAffinity" );
    func_SetThreadIdealProcessorEx = (func_SetThreadIdealProcessorEx_t)GetProcAddress( hModKernel32, "SetThreadIdealProcessorEx" );
    func_CreateRemoteThreadEx = (func_CreateRemoteThreadEx_t)GetProcAddress( hModKernel32, "CreateRemoteThreadEx" );
    func_GetNumaNodeProcessorMaskEx = (func_GetNumaNodeProcessorMaskEx_t)GetProcAddress( hModKernel32, "GetNumaNodeProcessorMaskEx" );
    func_GetNumaProcessorNodeEx = (func_GetNumaProcessorNodeEx_t)GetProcAddress( hModKernel32, "GetNumaProcessorNodeEx" );
    func_GetCurrentProcessorNumberEx = (func_GetCurrentProcessorNumberEx_t)GetProcAddress( hModKernel32, "GetCurrentProcessorNumberEx" );
    func_GetActiveProcessorCount = (func_GetActiveProcessorCount_t)GetProcAddress( hModKernel32, "GetActiveProcessorCount" );
    func_GetActiveProcessorGroupCount = (func_GetActiveProcessorGroupCount_t)GetProcAddress( hModKernel32, "GetActiveProcessorGroupCount" );
    func_InitializeProcThreadAttributeList = (func_InitializeProcThreadAttributeList_t)GetProcAddress( hModKernel32, "InitializeProcThreadAttributeList" );
    func_UpdateProcThreadAttribute = (func_UpdateProcThreadAttribute_t)GetProcAddress( hModKernel32, "UpdateProcThreadAttribute" );
    func_DeleteProcThreadAttributeList = (func_DeleteProcThreadAttributeList_t)GetProcAddress( hModKernel32, "DeleteProcThreadAttributeList" );

    if ( (func_SetThreadGroupAffinity == NULL) ||
         (func_SetThreadIdealProcessorEx == NULL) ||
         (func_CreateRemoteThreadEx == NULL) ||
         (func_GetNumaNodeProcessorMaskEx == NULL) ||
         (func_GetNumaProcessorNodeEx == NULL) ||
         (func_GetCurrentProcessorNumberEx == NULL) ||
         (func_GetActiveProcessorCount == NULL) ||
         (func_GetActiveProcessorGroupCount == NULL) ||
         (func_InitializeProcThreadAttributeList == NULL) ||
         (func_UpdateProcThreadAttribute == NULL) ||
         (func_DeleteProcThreadAttributeList == NULL) )

    /* if any addresses couldn't be located, assume NUMA functionality
       is unavailable */

    if (ulHighestNumaNodeNumber == 0)
        /* system is not NUMA */

    /* count the active processors across the groups */
    func_GetCurrentProcessorNumberEx(&CurrentProcessorNumber);

    wActiveGroupCount = func_GetActiveProcessorGroupCount();

    dwTotalProcessors = func_GetActiveProcessorCount( ALL_PROCESSOR_GROUPS );

#if !((defined WIN64 || defined _WIN64))
    /* WOW64 doesn't allow setting the affinity correctly beyond 32
       processors -- the KAFFINITY mask is only 32 bits wide
       This check is only here for completeness -- large systems should be
       running 64bit Gromacs code, where the processor quantity is not
       an issue.
       By failing here, the WOW64 32bit client will use normal CreateThread(),
       which can schedule up to 64 un-affinitized threads */
    if (dwTotalProcessors > 32)

    /* allocate array of processor info blocks */
    pMPI_ProcessorInfo = tMPI_Malloc( sizeof(MPI_NUMA_PROCESSOR_INFO) *
    if (pMPI_ProcessorInfo == NULL)
        tMPI_Fatal_error(TMPI_FARGS, "tMPI_Malloc failed for processor information");

    /* zero fill to cover reserved must-be-zero fields */
    memset(pMPI_ProcessorInfo, 0, sizeof(MPI_NUMA_PROCESSOR_INFO) * dwTotalProcessors);

    /* loop through each processor group, and for each group, capture the
       processor numbers and NUMA node information. */
    for (GroupIndex = 0; GroupIndex < wActiveGroupCount; GroupIndex++)
        DWORD dwGroupProcessorCount;

        dwGroupProcessorCount = func_GetActiveProcessorCount( GroupIndex );

        for (ProcessorIndex = 0; ProcessorIndex < dwGroupProcessorCount;
            PROCESSOR_NUMBER *pProcessorNumber = &(pMPI_ProcessorInfo[i].ProcessorNumber);
            GROUP_AFFINITY *pGroupAffinity = &(pMPI_ProcessorInfo[i].GroupAffinity);
            USHORT *pNodeNumber = &(pMPI_ProcessorInfo[i].NumaNodeNumber);

            pProcessorNumber->Group = GroupIndex;
            pProcessorNumber->Number = ProcessorIndex;

            /* save an index to the processor array entry for the current processor
               this is used to enable subsequent threads to be created in a round
               robin fashion starting at the next array entry */
            if ( (CurrentProcessorNumber.Group == pProcessorNumber->Group ) &&
                 (CurrentProcessorNumber.Number == pProcessorNumber->Number) )
                /* set global: current thread index into processor array */

            /* capture the node number and group affinity associated with processor entry
               any failures here are assumed to be catastrophic and disable
               the group & NUMA aware thread support */
            if (!func_GetNumaProcessorNodeEx(pProcessorNumber, pNodeNumber))
                tMPI_Fatal_error(TMPI_FARGS,
                                 "Processor enumeration, GetNumaProcessorNodeEx failed, error code=%d",

            if (!func_GetNumaNodeProcessorMaskEx(*pNodeNumber, pGroupAffinity))
                tMPI_Fatal_error(TMPI_FARGS,
                                 "Processor enumeration, GetNumaNodeProcessorMaskEx failed, error code=%d",

            /* future enhancement: construct GroupAffinity (single) processor
               mask within NUMA node for this processor entry */

            /* increment processor array index */

    /* sanity check, should never happen */
    if (i > dwTotalProcessors)
        tMPI_Fatal_error(TMPI_FARGS, "Processor enumeration exceeds allocated memory!");

    /* capture number of processors, highest NUMA node number, and processor
       array in the module-level globals */
    g_ulTotalProcessors = dwTotalProcessors;
    g_ulHighestNumaNodeNumber = ulHighestNumaNodeNumber;
    g_MPI_ProcessorInfo = pMPI_ProcessorInfo;

    /* error/cleanup path (label elided in this excerpt) */
    if (pMPI_ProcessorInfo)
        tMPI_Free( pMPI_ProcessorInfo );
462 static void tMPI_Thread_id_list_init(void)
464 EnterCriticalSection( &thread_id_list_lock );
466 N_thread_id_list = 0;
467 Nalloc_thread_id_list = 4; /* number of initial allocation*/
468 thread_id_list = (thread_id_list_t*)tMPI_Malloc(
469 sizeof(thread_id_list_t)*
470 Nalloc_thread_id_list);
472 LeaveCriticalSection( &thread_id_list_lock );
476 /* add an entry to the thread ID list, assuming it's locked */
477 static void tMPI_Thread_id_list_add_locked(DWORD thread_id,
478 struct tMPI_Thread *th)
480 if (Nalloc_thread_id_list < N_thread_id_list + 1)
482 thread_id_list_t* new_list;
485 /* double the size */
486 Nalloc_thread_id_list *= 2;
487 new_list = (thread_id_list_t*)tMPI_Malloc(
488 sizeof(thread_id_list_t)*
489 Nalloc_thread_id_list);
490 /* and copy over all elements */
491 for (i = 0; i < N_thread_id_list; i++)
493 new_list[i] = thread_id_list[i];
495 /* free the old list */
496 tMPI_Free(thread_id_list);
497 thread_id_list = new_list;
499 thread_id_list[ N_thread_id_list ].thread_id = thread_id;
500 thread_id_list[ N_thread_id_list ].th = th;
507 /* add an entry to the thread ID list */
508 static void tMPI_Thread_id_list_add(DWORD thread_id, struct tMPI_Thread *th)
510 EnterCriticalSection( &thread_id_list_lock );
511 tMPI_Thread_id_list_add_locked(thread_id, th);
512 LeaveCriticalSection( &thread_id_list_lock );
/* Remove an entry from the thread_id list.  The caller must hold
   thread_id_list_lock.  Removal fills the vacated slot with the last list
   element, so ordering is not preserved.
   NOTE(review): the 'found' bookkeeping, the N_thread_id_list decrement and
   the not-found error path are elided in this excerpt -- verify against the
   full source. */
static void tMPI_Thread_id_list_remove_locked(DWORD thread_id)
    tmpi_bool found = FALSE;

    /* move the last thread_id_list item to the one we want to remove */
    for (i = 0; i < N_thread_id_list; i++)
        if (thread_id_list[i].thread_id == thread_id)
            thread_id_list[i] = thread_id_list[N_thread_id_list - 1];
539 /* Remove an entry from the thread_id list */
540 static void tMPI_Thread_id_list_remove(DWORD thread_id)
543 EnterCriticalSection( &thread_id_list_lock );
544 tMPI_Thread_id_list_remove_locked(thread_id);
545 LeaveCriticalSection( &thread_id_list_lock );
550 /* try to find a thread id in the thread id list. Return NULL when there is no
551 such thread id in the list. Assumes the list is locked.*/
552 static struct tMPI_Thread *tMPI_Thread_id_list_find_locked(DWORD thread_id)
555 struct tMPI_Thread *ret = NULL;
557 /* this is a linear search but it's only O(Nthreads). */
558 for (i = 0; i < N_thread_id_list; i++)
560 if (thread_id_list[i].thread_id == thread_id)
562 ret = thread_id_list[i].th;
570 /* try to find a thread id in the thread id list. Return NULL when there is no
571 such thread id in the list.*/
572 static struct tMPI_Thread *tMPI_Thread_id_list_find(DWORD thread_id)
574 struct tMPI_Thread *ret = NULL;
576 EnterCriticalSection( &thread_id_list_lock );
577 ret = tMPI_Thread_id_list_find_locked(thread_id);
579 LeaveCriticalSection( &thread_id_list_lock );
/* Register the running thread in the thread-ID list if it is not there yet,
   and return the tMPI_Thread struct associated with this thread.  Used so
   threads not created through tMPI (e.g. the main thread) still get a
   valid tMPI_Thread_self().
   NOTE(review): the 'not found' branch condition and most DuplicateHandle()
   arguments are elided in this excerpt. */
static struct tMPI_Thread *tMPI_Thread_id_list_add_self(void)
    struct tMPI_Thread *th = NULL;

    EnterCriticalSection( &thread_id_list_lock );

    thread_id = GetCurrentThreadId();
    th = tMPI_Thread_id_list_find_locked(thread_id);

        /* if not, create an ID, set it and return it */
        th = (struct tMPI_Thread*)tMPI_Malloc(sizeof(struct tMPI_Thread)*1);

        /* to create a handle that can be used outside of the current
           thread, the handle from GetCurrentThread() must first
           be duplicated */
        DuplicateHandle(GetCurrentProcess(),
                        DUPLICATE_SAME_ACCESS);

        /* This causes a small memory leak that is hard to fix. */
        th->started_by_tmpi = 0;
        tMPI_Thread_id_list_add_locked(thread_id, th);

    LeaveCriticalSection( &thread_id_list_lock );
/* One-time initialization of the global critical sections declared at the
   top of this file.  Uses the init_init spinlock plus the init_inited
   atomic flag in a double-checked pattern: a cheap atomic pre-check,
   then a re-check under the spinlock before initializing. */
static void tMPI_Init_initers(void)
    /* we can pre-check because it's atomic */
    if (tMPI_Atomic_get(&init_inited) == 0)
        /* this can be a spinlock because the chances of collision are low. */
        tMPI_Spinlock_lock( &init_init );

        /* re-read the flag under the lock before doing any work */
        state = tMPI_Atomic_get(&init_inited);
        tMPI_Atomic_memory_barrier_acq();

        InitializeCriticalSection(&mutex_init);
        InitializeCriticalSection(&once_init);
        InitializeCriticalSection(&cond_init);
        InitializeCriticalSection(&barrier_init);
        InitializeCriticalSection(&thread_id_list_lock);

        /* fatal errors are handled by the routine by calling
           tMPI_Fatal_error() */
        tMPI_Thread_id_list_init();

        /* release barrier before publishing the 'initialized' flag */
        tMPI_Atomic_memory_barrier_rel();
        tMPI_Atomic_set(&init_inited, 1);

        tMPI_Spinlock_unlock( &init_init );
/* Print a printf-style fatal error message, tagged with file and line,
   to stderr.
   TODO: this needs to go away! (there's another one in pthreads.c)
   fatal errors are thankfully really rare
   NOTE(review): the va_list declaration, va_end() and the abort/exit call
   are elided in this excerpt. */
void tMPI_Fatal_error(const char *file, int line, const char *message, ...)
    fprintf(stderr, "tMPI Fatal error in %s, line %d: ", file, line);
    va_start(ap, message);
    vfprintf(stderr, message, ap);
    fprintf(stderr, "\n");
671 enum tMPI_Thread_support tMPI_Thread_support(void)
673 return TMPI_THREAD_SUPPORT_YES;
/* Parameter record handed to the Win32 thread entry shim; bundles the
   user start routine with its argument.
   NOTE(review): the closing brace of this struct is elided in this excerpt. */
struct tMPI_Thread_starter_param
    void               *(*start_routine)(void*); /* the function */
    void               *param;                   /* its parameter */
    struct tMPI_Thread *thread;                  /* back-pointer to the tMPI thread */
683 static DWORD WINAPI tMPI_Win32_thread_starter( LPVOID lpParam )
685 struct tMPI_Thread_starter_param *prm =
686 (struct tMPI_Thread_starter_param*)lpParam;
688 (prm->start_routine)(prm->param);
693 int tMPI_Thread_get_hw_number(void)
698 GetSystemInfo( &sysinfo );
700 ret = sysinfo.dwNumberOfProcessors;
/* Create a new thread running start_routine(arg), register it in the
   thread-ID list, and inherit the creator's priority.
   NOTE(review): the thread_id declaration, NULL-pointer guards and
   error-path details are elided in this excerpt. */
int tMPI_Thread_create(tMPI_Thread_t *thread,
                       void *(*start_routine)(void *), void *arg)
    struct tMPI_Thread_starter_param *prm;

    /* a small memory leak to be sure that it doesn't get deallocated
       once this function ends, before the newly created thread uses it. */
    prm = (struct tMPI_Thread_starter_param*)
        tMPI_Malloc(sizeof(struct tMPI_Thread_starter_param));
    prm->start_routine = start_routine;

    *thread = (struct tMPI_Thread*)tMPI_Malloc(sizeof(struct tMPI_Thread)*1);
        tMPI_Fatal_error(TMPI_FARGS, "Invalid thread pointer.");

    /* this must be locked before the thread is created to prevent a race
       condition if the thread immediately wants to create its own entry */
    EnterCriticalSection( &thread_id_list_lock );
    /* just create a plain thread. */
    (*thread)->started_by_tmpi = 1;
    (*thread)->th = CreateThread(NULL,
                                 tMPI_Win32_thread_starter,
    (*thread)->id = thread_id;

    if ((*thread)->th == NULL)
        tMPI_Fatal_error(TMPI_FARGS, "Failed to create thread, error code=%d",

    tMPI_Thread_id_list_add_locked(thread_id, (*thread));
    LeaveCriticalSection( &thread_id_list_lock );

    /* inherit the thread priority from the parent thread. */
    /* TODO: is there value in setting this, vs. just allowing it to default
       from the process? currently, this limits the effectiveness of changing
       the priority in eg: TaskManager. */
    SetThreadPriority(((*thread)->th), GetThreadPriority(GetCurrentThread()));
767 int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
771 ret = WaitForSingleObject(thread->th, INFINITE);
775 tMPI_Fatal_error(TMPI_FARGS, "Failed to join thread. error code=%d",
782 if (!GetExitCodeThread(thread, &retval))
784 /* TODO: somehow assign value_ptr */
785 tMPI_Fatal_error(TMPI_FARGS,
786 "Failed to get thread exit code: error=%d",
791 CloseHandle(thread->th);
792 tMPI_Thread_id_list_remove(thread->id);
/* Terminate the calling thread.  value_ptr is currently unused (see TODOs).
   NOTE(review): the ExitThread() call is elided in this excerpt. */
void tMPI_Thread_exit(void *value_ptr)
    /* TODO: fix exit code */
    /* TODO: call destructors for thread-local storage */
809 int tMPI_Thread_cancel(tMPI_Thread_t thread)
811 if (!TerminateThread( thread, -1) )
813 tMPI_Fatal_error(TMPI_FARGS, "Failed thread_cancel, error code=%d",
817 tMPI_Thread_id_list_remove(thread->id);
/* Return the tMPI_Thread struct of the calling thread, registering the
   thread in the ID list first if it was not created through tMPI.
   NOTE(review): the error check and return statement are elided here. */
tMPI_Thread_t tMPI_Thread_self(void)
    th = tMPI_Thread_id_list_add_self();
833 int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2)
835 /* because the tMPI thread IDs are unique, we can compare them directly */
839 enum tMPI_Thread_setaffinity_support tMPI_Thread_setaffinity_support(void)
841 /* Windows supports seting of thread affinities */
842 return TMPI_SETAFFINITY_SUPPORT_YES;
/* Pin 'thread' to logical processor 'nr'.  When the NUMA/processor-group
   info array was initialized, set the ideal processor and restrict the
   thread to its NUMA node; otherwise fall back to the plain
   SetThreadIdealProcessor() call.
   NOTE(review): several closing arguments/braces of the nested calls are
   elided in this excerpt. */
int tMPI_Thread_setaffinity_single(tMPI_Thread_t thread, unsigned int nr)
    GROUP_AFFINITY   GroupAffinity;
    PROCESSOR_NUMBER IdealProcessorNumber;
    /* thread NUMA node */
    USHORT           NumaNodeNumber;

    /* check for a processor info array. This exists if NUMA
       style calls have been successfully initialized. */
    if (g_MPI_ProcessorInfo != NULL)
        /*func_GetCurrentProcessorNumberEx(&CurrentProcessorNumber);*/

        /* group affinity mask of the requested processor */
        memcpy(&GroupAffinity,
               &(g_MPI_ProcessorInfo[nr].GroupAffinity),
               sizeof(GROUP_AFFINITY));

        /* group, processor number */
        memcpy(&IdealProcessorNumber,
               &(g_MPI_ProcessorInfo[nr].ProcessorNumber),
               sizeof(PROCESSOR_NUMBER));

        /* set the NUMA node affinity for the current thread
           failures to set the current thread affinity are ignored,
           as a fringe case can arise on >32 processor systems with a 32bit
           build */
        func_SetThreadIdealProcessorEx(thread->th,
                                       &IdealProcessorNumber,

        if (func_GetNumaProcessorNodeEx(&IdealProcessorNumber,
            /* for the NUMA node number associated with the current processor
               number, get the group affinity mask */
            if (func_GetNumaNodeProcessorMaskEx(NumaNodeNumber,
                /* set the current thread affinity to prevent it from running
                   on other NUMA nodes */
                func_SetThreadGroupAffinity(thread->th,

    /* No NUMA-style calls. We just do a simpler thing. */
    if ( (func_SetThreadIdealProcessor != NULL) )
        return (func_SetThreadIdealProcessor(thread->th, nr) == -1);
/* Allocate and initialize the Win32 critical section backing 'mtx'.
   NOTE(review): the NULL-argument guard, the atomic 'initialized' flag
   update and the return statement are elided in this excerpt. */
int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx)
    mtx->mutex = (struct tMPI_Mutex*)tMPI_Malloc(sizeof(struct tMPI_Mutex)*1);
    InitializeCriticalSection(&(mtx->mutex->cs));
/* Delete the underlying critical section and free the mutex storage.
   NOTE(review): argument guards and the return statement are elided here. */
int tMPI_Thread_mutex_destroy(tMPI_Thread_mutex_t *mtx)
    DeleteCriticalSection(&(mtx->mutex->cs));
    tMPI_Free(mtx->mutex);
/* Lazily initialize a statically-declared mutex exactly once, guarded by
   the global mutex_init critical section. */
static int tMPI_Thread_mutex_init_once(tMPI_Thread_mutex_t *mtx)
    /* This is essentially a copy of the code from the one-time
     * initialization, but with a call to the mutex init routine instead.
     * It might seem like overkill, but it will only be executed the first
     * time you call a static mutex, and it is important to get all the
     * memory barriers right. Trust me, you don't want a deadlock here...
     */

    /* initialize the initializers */
    /* Lock the common one-time init mutex so we can check carefully */
    EnterCriticalSection( &mutex_init );

    /* Do the actual (locked) check - system mutex is locked if we get here */
    if (mtx->mutex == NULL)
        /* No need to keep the lock during execution -
         * Only one thread can do it anyway. */
        ret = tMPI_Thread_mutex_init(mtx);

    LeaveCriticalSection( &mutex_init );
/* Lock the mutex, lazily initializing a statically-declared mutex on
   first use (the 'initialized' atomic guards the one-time init).
   NOTE(review): the return statement is elided in this excerpt. */
int tMPI_Thread_mutex_lock(tMPI_Thread_mutex_t *mtx)
    /* check whether the mutex is initialized */
    if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
        tMPI_Thread_mutex_init_once(mtx);

    /* The mutex is now guaranteed to be valid. */
    EnterCriticalSection( &(mtx->mutex->cs) );
/* Try to lock the mutex without blocking, lazily initializing it first.
   NOTE(review): the conversion of the TryEnterCriticalSection result into
   the function's return value is elided in this excerpt. */
int tMPI_Thread_mutex_trylock(tMPI_Thread_mutex_t *mtx)
    /* check whether the mutex is initialized */
    if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
        tMPI_Thread_mutex_init_once(mtx);

    /* The mutex is now guaranteed to be valid. */
    ret = TryEnterCriticalSection( &(mtx->mutex->cs) );
1006 int tMPI_Thread_mutex_unlock(tMPI_Thread_mutex_t *mtx)
1008 /* we should have initialized our critical section anyway */
1009 LeaveCriticalSection( &(mtx->mutex->cs) );
/* Allocate a thread-local-storage slot via TlsAlloc().  The destructor
   argument is accepted for API compatibility but not yet honored (see
   TODO below).
   NOTE(review): the key==NULL guard around the first fatal error, the
   GetLastError() argument and the return statements are elided here. */
int tMPI_Thread_key_create(tMPI_Thread_key_t *key, void (*destructor)(void *))
    tMPI_Fatal_error(TMPI_FARGS, "Invalid key pointer.");

    /* TODO: make list of destructors for thread-local storage */
    key->key = (struct tMPI_Thread_key*)tMPI_Malloc(sizeof(struct
                                                           tMPI_Thread_key)*1);

    (key)->key->wkey = TlsAlloc();

    if ( (key)->key->wkey == TLS_OUT_OF_INDEXES)
        tMPI_Fatal_error(TMPI_FARGS,
                         "Failed to create thread key, error code=%d.",
/* Release the TLS slot behind 'key'.
   NOTE(review): freeing of key.key and the return statement are elided. */
int tMPI_Thread_key_delete(tMPI_Thread_key_t key)
    TlsFree(key.key->wkey);
/* Fetch the calling thread's value stored under 'key' via TlsGetValue().
   NOTE(review): the declaration of 'p' and the return statement are elided. */
void * tMPI_Thread_getspecific(tMPI_Thread_key_t key)
    p = TlsGetValue(key.key->wkey);
/* Store 'value' under 'key' for the calling thread via TlsSetValue().
   NOTE(review): the declaration of 'ret' and the return statement are elided. */
int tMPI_Thread_setspecific(tMPI_Thread_key_t key, void *value)
    ret = TlsSetValue(key.key->wkey, value);
/* use once Vista is minimum required version */
/* Adapter so a plain void(void) init routine can be driven by the Win32
   InitOnceExecuteOnce() machinery.
   NOTE(review): the remaining callback parameters (PVOID Parameter,
   PVOID *lpContext) and the call/return lines are elided in this excerpt. */
static BOOL CALLBACK InitHandleWrapperFunction(PINIT_ONCE InitOnce,
    void (*fn)(void) = (void (*)(void))Parameter;
/* State for the tMPI_Thread_once() fallback path: a critical section plus
   a spinlock/flag pair guarding its one-time initialization.
   NOTE(review): presumably the pre-Vista branch -- confirm against the
   preprocessor structure in the full source. */
CRITICAL_SECTION tMPI_Once_cs;
tMPI_Spinlock_t tMPI_Once_cs_lock = TMPI_SPINLOCK_INITIALIZER;
volatile int tMPI_Once_init = 0;
/* Run init_routine() exactly once per once_control.
   NOTE(review): the full source selects between the Vista
   InitOnceExecuteOnce() path and the critical-section fallback below with
   preprocessor conditionals that are elided in this excerpt. */
int tMPI_Thread_once(tMPI_Thread_once_t *once_control,
                     void (*init_routine)(void))
    /* use once Vista is minimum required version */
    bStatus = InitOnceExecuteOnce(once_control, InitHandleWrapperFunction,
                                  init_routine, NULL);

        tMPI_Fatal_error(TMPI_FARGS, "Failed to run thread_once routine");

    /* really ugly hack - and it's slow... */
    tMPI_Init_initers();
    EnterCriticalSection(&once_init);
    if (tMPI_Atomic_get(&(once_control->once)) == 0)
        tMPI_Atomic_set(&(once_control->once), 1);
    LeaveCriticalSection(&once_init);
/* Initialize a condition variable, allocating its backing record.
   NOTE(review): the Vista CONDITION_VARIABLE line and the event-based
   fallback below were originally separated by preprocessor conditionals
   that are elided in this excerpt. */
int tMPI_Thread_cond_init(tMPI_Thread_cond_t *cond)
    cond->condp = (struct tMPI_Thread_cond*)
        tMPI_Malloc(sizeof(struct tMPI_Thread_cond)*1);

    /* use this code once Vista is the minimum version required */
    InitializeConditionVariable( &(cond->cv) );

    cond->condp->Nwaiters = 0;
    InitializeCriticalSection(&(cond->condp->wtr_lock));
    cond->condp->Nrelease = 0;
    cond->condp->cycle = 0;
    /* a manual reset, unsignalled event */
    cond->condp->ev = CreateEvent(NULL, TRUE, FALSE, NULL);
/* Destroy a condition variable: delete its waiter lock and free storage.
   NOTE(review): cleanup of the event handle and the return statement are
   elided in this excerpt. */
int tMPI_Thread_cond_destroy(tMPI_Thread_cond_t *cond)
    /* use this code once Vista is the minimum version required */
    /* windows doesn't provide a CONDITION_VARIABLE destroy function */

    DeleteCriticalSection(&(cond->condp->wtr_lock));
    tMPI_Free(cond->condp);
/*! \brief Static init routine for condition variables
 *
 * This is only used as a wrapper to enable static initialization
 * of posix thread types together with our abstraction layer for tMPI_Thread.h
 *
 * \param cond  Condition variable, must be statically initialized
 *
 * \return status - 0 on success, or a standard error code.
 */
static int tMPI_Thread_cond_init_once(tMPI_Thread_cond_t *cond)
    /* This is essentially a copy of the code from the one-time
     * initialization, but with a call to the cond init routine instead.
     * It might seem like overkill, but it will only be executed the first
     * time you call a static condition variable, and it is important to get
     * the memory barriers right. Trust me, you don't want a deadlock here...
     */

    /* initialize the initializers */
    tMPI_Init_initers();
    /* Lock the common one-time init mutex so we can check carefully */
    EnterCriticalSection( &cond_init );

    /* Do the actual (locked) check - system mutex is locked if we get here */
    if (cond->condp == NULL)
        /* No need to keep the lock during execution -
         * Only one thread can do it anyway. */
        ret = tMPI_Thread_cond_init(cond);

    LeaveCriticalSection( &cond_init );
/* Block on the condition variable, atomically releasing 'mtx' while
   waiting and re-acquiring it before returning.
   NOTE(review): the Vista SleepConditionVariableCS path and the
   event-based fallback below were originally separated by preprocessor
   conditionals that are elided in this excerpt; several braces and
   error-path details are elided as well. */
int tMPI_Thread_cond_wait(tMPI_Thread_cond_t *cond, tMPI_Thread_mutex_t *mtx)
    BOOL wait_done = FALSE;
    BOOL last_waiter = FALSE;

    /* check whether the condition is initialized */
    if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
        tMPI_Thread_cond_init_once(cond);
    /* the mutex must have been initialized because it should be locked here */

    /* use this code once Vista is the minimum version required */
    ret = SleepConditionVariableCS (&(cond->cv), &(mtx->cs), INFINITE);

        tMPI_Fatal_error(TMPI_FARGS, "Failed wait for condition, error code=%d",

    /* serially increase waiter count */
    EnterCriticalSection(&(cond->condp->wtr_lock));
    cond->condp->Nwaiters++;
    my_cycle = cond->condp->cycle;
    LeaveCriticalSection(&(cond->condp->wtr_lock));

    /* now it's safe to release the mutex from the fn call */
    LeaveCriticalSection(&(mtx->mutex->cs));

    /* Loop a wait until we found out we've waited for the right event.
       Note that this loop is potentially a busy-wait loop in bad
       circumstances (higher priority threads, for example). */

        /* do the actual waiting */
        if (WaitForSingleObject( cond->condp->ev, INFINITE ) == WAIT_FAILED)
            tMPI_Fatal_error(TMPI_FARGS, "Failed event reset, error code=%d",

        /* serially check whether we got the right event. */
        EnterCriticalSection(&(cond->condp->wtr_lock));
        wait_done = (cond->condp->Nrelease > 0) &&
            (cond->condp->cycle != my_cycle);
        LeaveCriticalSection(&(cond->condp->wtr_lock));

    /* We obtain the mutex from the function call */
    EnterCriticalSection(&(mtx->mutex->cs));

    /* we serially decrease the waiter count and release count */
    EnterCriticalSection(&(cond->condp->wtr_lock));
    cond->condp->Nwaiters--;
    cond->condp->Nrelease--;
    last_waiter = (cond->condp->Nrelease == 0);
    LeaveCriticalSection(&(cond->condp->wtr_lock));

    /* manually release the event if everybody's done with it */
        if (!ResetEvent( cond->condp->ev ))
            tMPI_Fatal_error(TMPI_FARGS, "Failed event reset, error code=%d",
int tMPI_Thread_cond_signal(tMPI_Thread_cond_t *cond)
    /* Wake at most one thread waiting on 'cond' (pthread_cond_signal
       semantics).  Vista+ native call and the pre-Vista event/counter
       emulation are interleaved here; the #if lines are not visible. */
    /* check whether the condition is initialized (lazy static init) */
    if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
        tMPI_Thread_cond_init_once(cond);
    /* The condition variable is now guaranteed to be valid. */

    /* Vista+ branch -- use this code once Vista is the minimum version required */
    WakeConditionVariable( &(cond->cv) );

    /* pre-Vista emulation: */
    EnterCriticalSection(&(cond->condp->wtr_lock));
    /* check if we're not still busy with a release. If we are, do nothing. */
    if (cond->condp->Nwaiters > cond->condp->Nrelease)
        cond->condp->Nrelease++;  /* allow exactly one waiter through */
        cond->condp->cycle++;     /* start a new signal cycle so waiters can
                                     distinguish this signal from older ones */
        if (!SetEvent(cond->condp->ev)) /* actually release the
                                           waiting thread(s) */
            tMPI_Fatal_error(TMPI_FARGS, "Failed SetEvent, error code=%d",
    LeaveCriticalSection(&(cond->condp->wtr_lock));
int tMPI_Thread_cond_broadcast(tMPI_Thread_cond_t *cond)
    /* Wake all threads currently waiting on 'cond' (pthread_cond_broadcast
       semantics).  Vista+ native call and the pre-Vista event/counter
       emulation are interleaved here; the #if lines are not visible. */
    /* check whether the condition is initialized (lazy static init) */
    if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
        tMPI_Thread_cond_init_once(cond);
    /* The condition variable is now guaranteed to be valid. */

    /* Vista+ branch -- use this code once Vista is the minimum version required */
    WakeAllConditionVariable( &(cond->cv) );

    /* pre-Vista emulation: */
    EnterCriticalSection(&(cond->condp->wtr_lock));
    /* check whether there are any waiters */
    if (cond->condp->Nwaiters > 0)
        /* release every registered waiter in one go */
        cond->condp->Nrelease = cond->condp->Nwaiters;
        cond->condp->cycle++;  /* new cycle so waiters recognize this release */
        if (!SetEvent(cond->condp->ev)) /* actually release the
                                           waiting threads */
            tMPI_Fatal_error(TMPI_FARGS, "Failed SetEvent, error code=%d",
    LeaveCriticalSection(&(cond->condp->wtr_lock));
int tMPI_Thread_barrier_init(tMPI_Thread_barrier_t *barrier, int n)
    /* Initialize 'barrier' for 'n' participating threads.  Returns 0 on
       success or a standard error code (the early-return statements are
       not visible in this view). */
    if (barrier == NULL)

    /* allocate the platform-specific barrier payload */
    barrier->barrierp = (struct tMPI_Thread_barrier*)
        tMPI_Malloc(sizeof(struct tMPI_Thread_barrier)*1);

    /* Vista+ branch -- use this once Vista is the oldest supported windows version: */
    InitializeCriticalSection(&(barrier->barrierp->cs));
    InitializeConditionVariable(&(barrier->barrierp->cv));
    /* pre-Vista branch uses the tMPI wrappers instead: */
    tMPI_Thread_mutex_init(&(barrier->barrierp->cs));
    tMPI_Thread_cond_init(&(barrier->barrierp->cv));

    /* number of threads that must arrive before the barrier opens */
    barrier->threshold = n;
int tMPI_Thread_barrier_destroy(tMPI_Thread_barrier_t *barrier)
    /* Release the resources held by 'barrier'.  As elsewhere in this file,
       the Vista+ and pre-Vista teardown calls are interleaved behind
       stripped #if lines. */
    if (barrier == NULL)

    /* Vista+: a CONDITION_VARIABLE needs no explicit destruction,
       only the critical section does */
    DeleteCriticalSection(&(barrier->barrierp->cs));
    /* pre-Vista: tear down via the tMPI wrappers */
    tMPI_Thread_mutex_destroy(&(barrier->barrierp->cs));
    tMPI_Thread_cond_destroy(&(barrier->barrierp->cv));

    /* free the payload allocated in tMPI_Thread_barrier_init() */
    tMPI_Free(barrier->barrierp);
/*! \brief Static init routine for the barrier

 * This is only used as a wrapper to enable static initialization
 * of thread types together with our abstraction layer for tMPI_Thread.h

 * \param barrier Statically initialized barrier type
 * \param n Number of members in barrier

 * \return status - 0 on success, or a standard error code.
static int tMPI_Thread_barrier_init_once(tMPI_Thread_barrier_t *barrier, int n)
    /* This is essentially a copy of the code from the one-time
     * initialization, but with a call to the barrier init routine instead.
     * It might seem like overkill, but it will only be executed the first
     * time you use a static barrier, and it is important to get
     * the memory barriers right. Trust me, you don't want a deadlock here...
     */

    /* initialize the initializers */
    tMPI_Init_initers();

    /* Lock the common one-time init mutex so we can check carefully */
    EnterCriticalSection( &barrier_init );

    /* Do the actual (locked) check - system mutex is locked if we get here */
    if (barrier->barrierp == NULL)
        /* No need to keep the lock during execution -
         * Only one thread can do it anyway. */
        ret = tMPI_Thread_barrier_init(barrier, n);
    LeaveCriticalSection( &barrier_init );
int tMPI_Thread_barrier_wait(tMPI_Thread_barrier_t *barrier)
    /* Block until 'threshold' threads have reached this barrier; the last
       arrival flips the cycle flag, re-arms the count, and wakes everyone.
       Vista+ native and pre-Vista wrapper calls are interleaved in this
       view (the #if lines are not visible). */
    /*tMPI_Thread_pthread_barrier_t *p;*/

    /* check whether the barrier is initialized (lazy static init) */
    if (tMPI_Atomic_get( &(barrier->initialized) ) == 0)
        tMPI_Thread_barrier_init_once(barrier, barrier->threshold);

    /* Vista+: lock the barrier's critical section */
    EnterCriticalSection( &(barrier->barrierp->cs) );
    /* pre-Vista: same thing through the tMPI mutex wrapper */
    tMPI_Thread_mutex_lock( &(barrier->barrierp->cs) );

    /* remember the cycle we arrived in; it flips when the barrier opens,
       which lets waiters distinguish this barrier round from the next */
    cycle = barrier->cycle;

    /* Decrement the count atomically and check if it is zero.
     * This will only be true for the last thread calling us. */
    if (--(barrier->count) <= 0)
        barrier->cycle = !barrier->cycle;     /* open the barrier */
        barrier->count = barrier->threshold;  /* re-arm for the next round */
        /* Vista+: wake all waiters natively */
        WakeAllConditionVariable( &(barrier->barrierp->cv) );
        /* pre-Vista: wake all waiters through the tMPI wrapper */
        tMPI_Thread_cond_broadcast( &(barrier->barrierp->cv) );

    /* not the last arrival: wait until the cycle flips; the loop guards
       against spurious wakeups */
    while (cycle == barrier->cycle)
        /* Vista+: */
        rc = SleepConditionVariableCS (&(barrier->barrierp->cv),
                                       &(barrier->barrierp->cs),
        /* pre-Vista: */
        rc = tMPI_Thread_cond_wait(&barrier->barrierp->cv,
                                   &barrier->barrierp->cs);

    /* Vista+: unlock */
    LeaveCriticalSection( &(barrier->barrierp->cs) );
    /* pre-Vista: unlock through the wrapper */
    tMPI_Thread_mutex_unlock( &(barrier->barrierp->cs) );
/* just to have some symbols: a dummy definition so this translation unit
   is never empty when THREAD_WINDOWS is the selected backend */
int tMPI_Thread_winthreads = 0;

#endif /* THREAD_WINDOWS */