2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
40 /* Include the defines that determine which thread library to use.
41 * We do not use HAVE_PTHREAD_H directly, since we might want to
42 * turn off thread support explicitly (e.g. for debugging).
45 #ifdef HAVE_TMPI_CONFIG_H
46 #include "tmpi_config.h"
56 /* the win32 header */
66 #include "thread_mpi/atomic.h"
67 #include "thread_mpi/threads.h"
70 #include "winthreads.h"
72 /*! \brief System mutex for all one-time initialization
74 * This static variable is necessary in order to make the header file
75 * independent of the thread library implementation. Anyway, it
76 * will only be locked a handful of times at the start of program execution.
/* File-scope synchronization primitives protecting lazy ("init once")
   initialization of the various tMPI object types, plus the bookkeeping
   list that maps Win32 thread IDs to tMPI_Thread structures. */
79 static CRITICAL_SECTION mutex_init; /* mutex for initializing mutexes */
80 static CRITICAL_SECTION once_init; /* mutex for one-time (once) initialization */
81 static CRITICAL_SECTION cond_init; /* mutex for initializing thread_conds */
82 static CRITICAL_SECTION barrier_init; /* mutex for initializing barriers */
85 /* spinlock for initializing the above mutexes */
86 static tMPI_Spinlock_t init_init = TMPI_SPINLOCK_INITIALIZER;
88 /* whether tMPI_Thread_create has initialized these mutexes */
89 static tMPI_Atomic_t init_inited = { 0 };
91 /* whether the main thread affinity has been set */
92 static tMPI_Spinlock_t main_thread_aff_lock = TMPI_SPINLOCK_INITIALIZER;
93 static tMPI_Atomic_t main_thread_aff_set = { 0 };
95 /* mutex for managing thread IDs */
96 static CRITICAL_SECTION thread_id_list_lock;
/* One entry of the thread-id list: pairs a native Win32 thread ID with
   its tMPI thread structure. (The struct/typedef opener is outside this
   view.) */
99 DWORD thread_id; /* the thread ID as returned by GetCurrentThreadId() */
100 struct tMPI_Thread* th; /* the associated tMPI thread structure */
102 /* the allocated capacity of the thread id list */
103 static int Nalloc_thread_id_list = 0;
104 /* the number of elements in the thread id list */
105 static int N_thread_id_list = 0;
106 /* the thread ID list */
107 static thread_id_list_t *thread_id_list;
111 /* data structure to keep track of thread key destructors. */
114 void (*destructor)(void*);
116 } thread_key_destructors;
118 static thread_key_destructors *destructors = NULL;
123 NUMA and Processor Group awareness support.
125 NUMA support is implemented to maximize the chance that memory access
126 patterns remain local to the NUMA node.
127 NUMA node processor affinity is utilized to prevent scheduler associated
128 drift across NUMA nodes.
129 Process Group support is implemented to enable > 64 processors to be
130 utilized. This is only supported when building 64bit.
132 The high level approach is:
133 1. Build a description of CPU topology, including processor numbers, NUMA
134 node numbers, and affinity masks.
135 2. For processor intensive worker threads, create threads such that
136 the processor affinity and thread stack is kept local within a NUMA node.
137 3. Employ simple round-robin affinity and node assignment approach when
139 4. Use GetProcAddress() to obtain function pointers to functions that
140 are operating system version dependent, to allow maximum binary
143 Scott Field (sfield@microsoft.com) Jan-2011
/* Per-processor description used by the NUMA-aware thread creation path:
   group-relative processor number, its group affinity mask, and the NUMA
   node it belongs to. (The struct opener is outside this view.) */
148 PROCESSOR_NUMBER ProcessorNumber;
149 GROUP_AFFINITY GroupAffinity;
150 USHORT NumaNodeNumber;
151 } MPI_NUMA_PROCESSOR_INFO;
154 /* thread/processor index, to allow setting round-robin affinity. */
155 volatile ULONG g_ulThreadIndex;
156 /* a value of zero implies the system is not NUMA */
157 ULONG g_ulHighestNumaNodeNumber = 0;
158 /* total number of processors in the g_MPI_ProcessorInfo array */
159 ULONG g_ulTotalProcessors;
160 /* array describing available processors, affinity masks, and NUMA node */
161 MPI_NUMA_PROCESSOR_INFO *g_MPI_ProcessorInfo = NULL;
163 /* function prototypes and variables to support obtaining function addresses
164 dynamically -- supports down-level operating systems */
/* Function-pointer typedefs for Win32 APIs that do not exist on all
   supported Windows versions; they are resolved at runtime with
   GetProcAddress() so the binary still loads on older systems. */
166 typedef BOOL (WINAPI *func_GetNumaHighestNodeNumber_t)( PULONG
168 typedef DWORD (WINAPI *func_SetThreadIdealProcessor_t)( HANDLE hThread,
169 DWORD dwIdealProcessor );
170 typedef BOOL (WINAPI *func_SetThreadGroupAffinity_t)( HANDLE hThread,
171 const GROUP_AFFINITY *GroupAffinity,
172 PGROUP_AFFINITY PreviousGroupAffinity );
173 typedef BOOL (WINAPI *func_SetThreadIdealProcessorEx_t)( HANDLE hThread,
174 PPROCESSOR_NUMBER lpIdealProcessor,
175 PPROCESSOR_NUMBER lpPreviousIdealProcessor );
176 typedef BOOL (WINAPI *func_GetNumaNodeProcessorMaskEx_t)( USHORT Node,
177 PGROUP_AFFINITY ProcessorMask );
178 typedef BOOL (WINAPI *func_GetNumaProcessorNodeEx_t)(
179 PPROCESSOR_NUMBER Processor,
180 PUSHORT NodeNumber );
181 typedef VOID (WINAPI *func_GetCurrentProcessorNumberEx_t)(
182 PPROCESSOR_NUMBER ProcNumber );
184 typedef HANDLE (WINAPI *func_CreateRemoteThreadEx_t)(
186 LPSECURITY_ATTRIBUTES lpThreadAttributes,
188 LPTHREAD_START_ROUTINE lpStartAddress,
190 DWORD dwCreationFlags,
191 LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList,
194 typedef BOOL (WINAPI *func_InitializeProcThreadAttributeList_t)(
195 LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList,
196 DWORD dwAttributeCount,
199 typedef BOOL (WINAPI *func_UpdateProcThreadAttribute_t)(
200 LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList,
205 PVOID lpPreviousValue,
206 PSIZE_T lpReturnSize);
207 typedef VOID (WINAPI *func_DeleteProcThreadAttributeList_t)(
208 LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList);
209 typedef DWORD (WINAPI *func_GetActiveProcessorCount_t)(WORD GroupNumber);
210 typedef WORD (WINAPI *func_GetActiveProcessorGroupCount_t)(void);
/* The resolved function pointers, grouped by the oldest Windows version
   that provides them. All remain NULL if resolution fails. */
213 /* WinXP SP2, WinXP64, WinSrv 2003 */
214 func_GetNumaHighestNodeNumber_t func_GetNumaHighestNodeNumber;
215 func_SetThreadIdealProcessor_t func_SetThreadIdealProcessor;
216 /* Windows 7, WinSrv 2008R2 */
217 func_SetThreadGroupAffinity_t func_SetThreadGroupAffinity;
218 func_SetThreadIdealProcessorEx_t func_SetThreadIdealProcessorEx;
219 func_GetNumaNodeProcessorMaskEx_t func_GetNumaNodeProcessorMaskEx;
220 func_GetNumaProcessorNodeEx_t func_GetNumaProcessorNodeEx;
221 func_GetCurrentProcessorNumberEx_t func_GetCurrentProcessorNumberEx;
222 func_GetActiveProcessorCount_t func_GetActiveProcessorCount;
223 func_GetActiveProcessorGroupCount_t func_GetActiveProcessorGroupCount;
224 func_CreateRemoteThreadEx_t func_CreateRemoteThreadEx;
225 /* Windows Vista, WinSrv 2008 */
226 func_InitializeProcThreadAttributeList_t func_InitializeProcThreadAttributeList;
227 func_UpdateProcThreadAttribute_t func_UpdateProcThreadAttribute;
228 func_DeleteProcThreadAttributeList_t func_DeleteProcThreadAttributeList;
232 /* returns 0 on success.
233 Success is returned if the system is non-NUMA, OR the system doesn't
234 support appropriate NUMA APIs, OR the system is NUMA and we successfully
238 This can happen if an API returned an error, a memory allocation failed, or
239 we failed to initialize affinity mapping information.
/* Initialize NUMA/processor-group support.

   Resolves the needed Win32 APIs via GetProcAddress, counts processors
   across all groups, and fills g_MPI_ProcessorInfo with the processor
   number, group affinity, and NUMA node of every active processor.
   On success it publishes the results in the g_* globals.

   Returns 0 on success; success includes "system is not NUMA" and
   "OS lacks the NUMA/group APIs" (in which case the caller falls back
   to plain CreateThread()). Non-zero only on real failures (API error,
   allocation failure). */
241 int tMPI_Init_NUMA(void)
243 /* module handle to kernel32.dll -- we already reference it, so it's already loaded */
244 HMODULE hModKernel32 = NULL;
245 /* 0-based NUMA node count -- does not imply all nodes have available (eg: hot-plug) processors */
246 ULONG ulHighestNumaNodeNumber;
247 /* total number of processors available per affinity masks */
248 DWORD dwTotalProcessors = 0;
251 /* calling thread PROCESSOR_NUMBER */
252 PROCESSOR_NUMBER CurrentProcessorNumber;
253 /* calling thread GROUP_AFFINITY */
254 /*GROUP_AFFINITY CurrentThreadGroupAffinity; */
255 /* calling thread NUMA node */
256 /*USHORT CurrentNumaNodeNumber;*/
258 WORD wActiveGroupCount;
261 /* array of processor information structures */
262 MPI_NUMA_PROCESSOR_INFO *pMPI_ProcessorInfo = NULL;
264 /* assume an error condition */
267 hModKernel32 = GetModuleHandleA("kernel32.dll");
269 if (hModKernel32 == NULL)
274 /* obtain addresses of relevant NUMA functions, most of which are
275 Windows 7 / Windows Server 2008R2 only functions
276 this is done using GetProcAddress to enable the binary to run on older
280 func_GetNumaHighestNodeNumber = (func_GetNumaHighestNodeNumber_t) GetProcAddress( hModKernel32, "GetNumaHighestNodeNumber" );
281 func_SetThreadIdealProcessor = (func_SetThreadIdealProcessor_t) GetProcAddress( hModKernel32, "SetThreadIdealProcessor" );
283 if (func_GetNumaHighestNodeNumber == NULL)
288 /* determine if we're on a NUMA system and if so, determine the number of
291 if (!func_GetNumaHighestNodeNumber( &ulHighestNumaNodeNumber ))
/* Resolve the Windows 7+ processor-group/NUMA API family. */
298 func_SetThreadGroupAffinity = (func_SetThreadGroupAffinity_t)GetProcAddress( hModKernel32, "SetThreadGroupAffinity" );
299 func_SetThreadIdealProcessorEx = (func_SetThreadIdealProcessorEx_t)GetProcAddress( hModKernel32, "SetThreadIdealProcessorEx" );
300 func_CreateRemoteThreadEx = (func_CreateRemoteThreadEx_t)GetProcAddress( hModKernel32, "CreateRemoteThreadEx" );
301 func_GetNumaNodeProcessorMaskEx = (func_GetNumaNodeProcessorMaskEx_t)GetProcAddress( hModKernel32, "GetNumaNodeProcessorMaskEx" );
302 func_GetNumaProcessorNodeEx = (func_GetNumaProcessorNodeEx_t)GetProcAddress( hModKernel32, "GetNumaProcessorNodeEx" );
303 func_GetCurrentProcessorNumberEx = (func_GetCurrentProcessorNumberEx_t)GetProcAddress( hModKernel32, "GetCurrentProcessorNumberEx" );
304 func_GetActiveProcessorCount = (func_GetActiveProcessorCount_t)GetProcAddress( hModKernel32, "GetActiveProcessorCount" );
305 func_GetActiveProcessorGroupCount = (func_GetActiveProcessorGroupCount_t)GetProcAddress( hModKernel32, "GetActiveProcessorGroupCount" );
306 func_InitializeProcThreadAttributeList = (func_InitializeProcThreadAttributeList_t)GetProcAddress( hModKernel32, "InitializeProcThreadAttributeList" );
307 func_UpdateProcThreadAttribute = (func_UpdateProcThreadAttribute_t)GetProcAddress( hModKernel32, "UpdateProcThreadAttribute" );
308 func_DeleteProcThreadAttributeList = (func_DeleteProcThreadAttributeList_t)GetProcAddress( hModKernel32, "DeleteProcThreadAttributeList" );
310 if ( (func_SetThreadGroupAffinity == NULL) ||
311 (func_SetThreadIdealProcessorEx == NULL) ||
312 (func_CreateRemoteThreadEx == NULL) ||
313 (func_GetNumaNodeProcessorMaskEx == NULL) ||
314 (func_GetNumaProcessorNodeEx == NULL) ||
315 (func_GetCurrentProcessorNumberEx == NULL) ||
316 (func_GetActiveProcessorCount == NULL) ||
317 (func_GetActiveProcessorGroupCount == NULL) ||
318 (func_InitializeProcThreadAttributeList == NULL) ||
319 (func_UpdateProcThreadAttribute == NULL) ||
320 (func_DeleteProcThreadAttributeList == NULL) )
322 /* if any addresses couldn't be located, assume NUMA functionality
327 if (ulHighestNumaNodeNumber == 0)
329 /* system is not NUMA */
334 /* count the active processors across the groups */
336 func_GetCurrentProcessorNumberEx(&CurrentProcessorNumber);
338 wActiveGroupCount = func_GetActiveProcessorGroupCount();
340 dwTotalProcessors = func_GetActiveProcessorCount( ALL_PROCESSOR_GROUPS );
342 #if !((defined WIN64 || defined _WIN64))
343 /* WOW64 doesn't allow setting the affinity correctly beyond 32
344 processors -- the KAFFINITY mask is only 32 bits wide
345 This check is only here for completeness -- large systems should be
346 running 64bit Gromacs code, where the processor quantity is not
348 By failing here, the WOW64 32bit client will use normal CreateThread(),
349 which can schedule up to 64 un-affinitized threads
352 if (dwTotalProcessors > 32)
358 /* allocate array of processor info blocks */
360 pMPI_ProcessorInfo = malloc( sizeof(MPI_NUMA_PROCESSOR_INFO) *
362 if (pMPI_ProcessorInfo == NULL)
367 /* zero fill to cover reserved must be-zero fields */
368 memset(pMPI_ProcessorInfo, 0, sizeof(MPI_NUMA_PROCESSOR_INFO) * dwTotalProcessors);
370 /* loop through each processor group, and for each group, capture the
371 processor numbers and NUMA node information. */
373 for (GroupIndex = 0; GroupIndex < wActiveGroupCount; GroupIndex++)
375 DWORD dwGroupProcessorCount;
378 dwGroupProcessorCount = func_GetActiveProcessorCount( GroupIndex );
380 for (ProcessorIndex = 0; ProcessorIndex < dwGroupProcessorCount;
383 PROCESSOR_NUMBER *pProcessorNumber = &(pMPI_ProcessorInfo[i].ProcessorNumber);
384 GROUP_AFFINITY *pGroupAffinity = &(pMPI_ProcessorInfo[i].GroupAffinity);
385 USHORT *pNodeNumber = &(pMPI_ProcessorInfo[i].NumaNodeNumber);
387 pProcessorNumber->Group = GroupIndex;
388 pProcessorNumber->Number = ProcessorIndex;
390 /* save an index to the processor array entry for the current processor
391 this is used to enable subsequent threads to be created in a round
392 robin fashion starting at the next array entry
395 if ( (CurrentProcessorNumber.Group == pProcessorNumber->Group ) &&
396 (CurrentProcessorNumber.Number == pProcessorNumber->Number) )
398 /* set global: current thread index into processor array */
402 /* capture the node number and group affinity associated with processor entry
403 any failures here are assumed to be catastrophic and disable
404 the group & NUMA aware thread support
407 if (!func_GetNumaProcessorNodeEx(pProcessorNumber, pNodeNumber))
412 if (!func_GetNumaNodeProcessorMaskEx(*pNodeNumber, pGroupAffinity))
417 /* future enhancement: construct GroupAffinity (single) processor
418 mask within NUMA node for this processor entry */
420 /* increment processor array index */
423 /* sanity check, should never happen */
425 if (i > dwTotalProcessors)
433 /* capture number of processors, highest NUMA node number, and processor
435 g_ulTotalProcessors = dwTotalProcessors;
436 g_ulHighestNumaNodeNumber = ulHighestNumaNodeNumber;
437 g_MPI_ProcessorInfo = pMPI_ProcessorInfo;
/* Error path: release the partially-built processor array. */
445 if (pMPI_ProcessorInfo)
447 tMPI_Free( pMPI_ProcessorInfo );
/* Allocate the initial thread-id list (capacity 4, empty) under
   thread_id_list_lock. Returns 0 on success, non-zero if the
   allocation fails (error path lines not visible here). */
454 static int tMPI_Thread_id_list_init(void)
458 EnterCriticalSection( &thread_id_list_lock );
460 N_thread_id_list = 0;
461 Nalloc_thread_id_list = 4; /* number of initial allocation*/
462 thread_id_list = (thread_id_list_t*)malloc(sizeof(thread_id_list_t)*
463 Nalloc_thread_id_list);
464 if (thread_id_list == NULL)
469 LeaveCriticalSection( &thread_id_list_lock );
474 /* add an entry to the thread ID list, assuming it's locked */
/* Appends (thread_id, th) to thread_id_list, doubling the capacity
   with a malloc+copy+free cycle when full. Caller must already hold
   thread_id_list_lock. */
475 static int tMPI_Thread_id_list_add_locked(DWORD thread_id,
476 struct tMPI_Thread *th)
478 if (Nalloc_thread_id_list < N_thread_id_list + 1)
480 thread_id_list_t* new_list;
483 /* double the size */
484 Nalloc_thread_id_list *= 2;
485 /* and allocate the new list */
486 new_list = (thread_id_list_t*)malloc(sizeof(thread_id_list_t)*
487 Nalloc_thread_id_list);
488 if (new_list == NULL)
492 /* and copy over all elements */
493 for (i = 0; i < N_thread_id_list; i++)
495 new_list[i] = thread_id_list[i];
497 /* free the old list */
498 tMPI_Free(thread_id_list);
499 thread_id_list = new_list;
501 thread_id_list[ N_thread_id_list ].thread_id = thread_id;
502 thread_id_list[ N_thread_id_list ].th = th;
509 /* add an entry to the thread ID list */
/* Locking wrapper around tMPI_Thread_id_list_add_locked(). */
510 static int tMPI_Thread_id_list_add(DWORD thread_id, struct tMPI_Thread *th)
513 EnterCriticalSection( &thread_id_list_lock );
514 ret = tMPI_Thread_id_list_add_locked(thread_id, th);
515 LeaveCriticalSection( &thread_id_list_lock );
519 /* Remove an entry from the thread_id list, assuming it's locked.
520 Does nothing if an entry is not found.*/
/* Removal is O(1) once found: the matching slot is overwritten with the
   last element (order is not preserved). */
521 static void tMPI_Thread_id_list_remove_locked(DWORD thread_id)
524 tmpi_bool found = FALSE;
526 /* move the last thread_id_list item to the one we want to remove */
527 for (i = 0; i < N_thread_id_list; i++)
529 if (thread_id_list[i].thread_id == thread_id)
531 thread_id_list[i] = thread_id_list[N_thread_id_list - 1];
544 /* Remove an entry from the thread_id list */
/* Locking wrapper around tMPI_Thread_id_list_remove_locked(). */
545 static void tMPI_Thread_id_list_remove(DWORD thread_id)
548 EnterCriticalSection( &thread_id_list_lock );
549 tMPI_Thread_id_list_remove_locked(thread_id);
550 LeaveCriticalSection( &thread_id_list_lock );
555 /* try to find a thread id in the thread id list. Return NULL when there is no
556 such thread id in the list. Assumes the list is locked.*/
557 static struct tMPI_Thread *tMPI_Thread_id_list_find_locked(DWORD thread_id)
560 struct tMPI_Thread *ret = NULL;
562 /* this is a linear search but it's only O(Nthreads). */
563 for (i = 0; i < N_thread_id_list; i++)
565 if (thread_id_list[i].thread_id == thread_id)
567 ret = thread_id_list[i].th;
575 /* try to find a thread id in the thread id list. Return NULL when there is no
576 such thread id in the list.*/
/* Locking wrapper around tMPI_Thread_id_list_find_locked(). */
577 static struct tMPI_Thread *tMPI_Thread_id_list_find(DWORD thread_id)
579 struct tMPI_Thread *ret = NULL;
581 EnterCriticalSection( &thread_id_list_lock );
582 ret = tMPI_Thread_id_list_find_locked(thread_id);
583 LeaveCriticalSection( &thread_id_list_lock );
587 /* try to add the running thread to the list. Returns the tMPI_Thread struct
588 associated with this thread, or NULL in case of an error.*/
/* Used to adopt threads that were NOT created by tMPI_Thread_create
   (e.g. the main thread): if the current thread ID is not yet in the
   list, a tMPI_Thread struct is allocated for it, marked
   started_by_tmpi = 0, and registered. */
589 static struct tMPI_Thread *tMPI_Thread_id_list_add_self(void)
592 struct tMPI_Thread *th = NULL;
595 EnterCriticalSection( &thread_id_list_lock );
597 thread_id = GetCurrentThreadId();
598 th = tMPI_Thread_id_list_find_locked(thread_id);
601 /* if not, create an ID, set it and return it */
602 th = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
604 /* to create a handle that can be used outside of the current
605 thread, the handle from GetCurrentThread() must first
/* GetCurrentThread() returns a pseudo-handle, hence the duplication
   into a real handle usable from other threads. */
607 DuplicateHandle(GetCurrentProcess(),
613 DUPLICATE_SAME_ACCESS);
615 /* This causes a small memory leak that is hard to fix. */
616 th->started_by_tmpi = 0;
617 ret = tMPI_Thread_id_list_add_locked(thread_id, th);
624 LeaveCriticalSection( &thread_id_list_lock );
/* One-time initialization of all static CRITICAL_SECTIONs, the NUMA
   support, and the thread-id list.

   Uses double-checked locking: a lock-free pre-check of init_inited,
   then the init_init spinlock plus a re-check, then memory barriers
   around publishing init_inited = 1. Returns 0 on success. */
629 static int tMPI_Init_initers(void)
634 /* we can pre-check because it's atomic */
635 if (tMPI_Atomic_get(&init_inited) == 0)
637 /* this can be a spinlock because the chances of collision are low. */
638 tMPI_Spinlock_lock( &init_init );
640 state = tMPI_Atomic_get(&init_inited);
641 tMPI_Atomic_memory_barrier_acq();
644 InitializeCriticalSection(&mutex_init);
645 InitializeCriticalSection(&once_init);
646 InitializeCriticalSection(&cond_init);
647 InitializeCriticalSection(&barrier_init);
648 InitializeCriticalSection(&thread_id_list_lock);
650 ret = tMPI_Init_NUMA();
657 ret = tMPI_Thread_id_list_init();
/* release barrier before publishing, so other threads that observe
   init_inited == 1 also observe the initialized structures */
663 tMPI_Atomic_memory_barrier_rel();
664 tMPI_Atomic_set(&init_inited, 1);
667 tMPI_Spinlock_unlock( &init_init );
671 tMPI_Spinlock_unlock( &init_init );
/* Report the level of threading support: always full support on Win32. */
677 enum tMPI_Thread_support tMPI_Thread_support(void)
679 return TMPI_THREAD_SUPPORT_YES;
/* Parameter bundle handed to the Win32 thread entry point, carrying the
   user's start routine, its argument, and the owning tMPI thread. */
682 struct tMPI_Thread_starter_param
684 void *(*start_routine)(void*); /* the function */
685 void *param; /* its parameter */
686 struct tMPI_Thread *thread;
/* Win32-signature trampoline: unpacks the starter param and invokes the
   user's start routine. */
689 static DWORD WINAPI tMPI_Win32_thread_starter( LPVOID lpParam )
691 struct tMPI_Thread_starter_param *prm =
692 (struct tMPI_Thread_starter_param*)lpParam;
694 (prm->start_routine)(prm->param);
/* Return the number of processors reported by GetSystemInfo(). */
699 int tMPI_Thread_get_hw_number(void)
704 GetSystemInfo( &sysinfo );
706 ret = sysinfo.dwNumberOfProcessors;
/* Create a new thread running start_routine(arg).

   Runs the one-time initializers first, allocates the starter parameter
   (deliberately leaked, see comment below) and the tMPI_Thread struct,
   then creates the Win32 thread while holding thread_id_list_lock so
   the new thread cannot race us registering its ID. Returns 0 on
   success. */
713 int tMPI_Thread_create(tMPI_Thread_t *thread,
714 void *(*start_routine)(void *), void *arg)
717 struct tMPI_Thread_starter_param *prm;
720 ret = tMPI_Init_initers();
731 /* a small memory leak to be sure that it doesn't get deallocated
732 once this function ends, before the newly created thread uses it. */
733 prm = (struct tMPI_Thread_starter_param*)
734 malloc(sizeof(struct tMPI_Thread_starter_param));
739 prm->start_routine = start_routine;
742 *thread = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
749 /* this must be locked before the thread is created to prevent a race
750 condition if the thread immediately wants to create its own entry */
751 EnterCriticalSection( &thread_id_list_lock );
752 /* just create a plain thread. */
753 (*thread)->started_by_tmpi = 1;
754 (*thread)->th = CreateThread(NULL,
756 tMPI_Win32_thread_starter,
760 if ((*thread)->th == NULL)
765 (*thread)->id = thread_id;
767 if ((*thread)->th == NULL)
772 ret = tMPI_Thread_id_list_add_locked(thread_id, (*thread));
777 LeaveCriticalSection( &thread_id_list_lock );
780 /* inherit the thread priority from the parent thread. */
781 /* TODO: is there value in setting this, vs. just allowing it to default
782 from the process? currently, this limits the effectiveness of changing
783 the priority in eg: TaskManager. */
784 SetThreadPriority(((*thread)->th), GetThreadPriority(GetCurrentThread()));
/* error path: unlock before returning */
791 LeaveCriticalSection( &thread_id_list_lock );
/* Wait for a thread to finish, fetch its exit code, close its handle
   and deregister it from the thread-id list. */
801 int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
805 ret = WaitForSingleObject(thread->th, INFINITE);
/* NOTE(review): the surrounding calls use thread->th as the HANDLE,
   but this one passes the struct pointer `thread` itself — looks like
   it should be thread->th; confirm against the upstream source. */
813 if (!GetExitCodeThread(thread, &retval))
818 CloseHandle(thread->th);
819 tMPI_Thread_id_list_remove(thread->id);
/* Exit the calling thread. */
826 void tMPI_Thread_exit(void *value_ptr)
828 /* TODO: call destructors for thread-local storage */
/* Forcibly terminate a thread and deregister it. TerminateThread() is
   inherently unsafe (no stack unwinding / cleanup on the target). */
835 int tMPI_Thread_cancel(tMPI_Thread_t thread)
/* NOTE(review): as in tMPI_Thread_join, `thread` is passed where other
   code uses thread->th as the HANDLE — verify. */
837 if (!TerminateThread( thread, -1) )
841 tMPI_Thread_id_list_remove(thread->id);
/* Return the tMPI_Thread for the calling thread, adopting it into the
   thread-id list if it was not created through tMPI_Thread_create. */
846 tMPI_Thread_t tMPI_Thread_self(void)
851 ret = tMPI_Init_initers();
857 th = tMPI_Thread_id_list_add_self();
/* Compare two thread handles for identity. */
862 int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2)
864 /* because the tMPI thread IDs are unique, we can compare them directly */
868 enum tMPI_Thread_setaffinity_support tMPI_Thread_setaffinity_support(void)
870 /* Windows supports setting of thread affinities */
871 return TMPI_SETAFFINITY_SUPPORT_YES;
/* Pin a thread to (logical) processor nr.

   If the NUMA/processor-group info array was built, the thread's ideal
   processor is set from entry nr and its affinity is restricted to that
   entry's NUMA node. Otherwise falls back to SetThreadIdealProcessor()
   when available. */
874 int tMPI_Thread_setaffinity_single(tMPI_Thread_t thread, unsigned int nr)
876 GROUP_AFFINITY GroupAffinity;
877 PROCESSOR_NUMBER IdealProcessorNumber;
878 /* thread NUMA node */
879 USHORT NumaNodeNumber;
881 /* check for a processor info array. This exists if NUMA
882 style calls have been successfully initialized. */
883 if (g_MPI_ProcessorInfo != NULL)
886 /*func_GetCurrentProcessorNumberEx(&CurrentProcessorNumber);*/
888 memcpy(&GroupAffinity,
889 &(g_MPI_ProcessorInfo[nr].GroupAffinity),
890 sizeof(GROUP_AFFINITY));
892 /* group, processor number */
894 memcpy(&IdealProcessorNumber,
895 &(g_MPI_ProcessorInfo[nr].ProcessorNumber),
896 sizeof(PROCESSOR_NUMBER));
899 /* set the NUMA node affinity for the current thread
900 failures to set the current thread affinity are ignored,
901 as a fringe case can arise on >32 processor systems with a 32bit
904 func_SetThreadIdealProcessorEx(thread->th,
905 &IdealProcessorNumber,
908 if (func_GetNumaProcessorNodeEx(&IdealProcessorNumber,
911 /* for the NUMA node number associated with the current processor
912 number, get the group affinity mask */
913 if (func_GetNumaNodeProcessorMaskEx(NumaNodeNumber,
916 /* set the current thread affinity to prevent it from running
917 on other NUMA nodes */
918 func_SetThreadGroupAffinity(thread->th,
928 /* No NUMA-style calls. We just do a simpler thing. */
929 if ( (func_SetThreadIdealProcessor != NULL) )
/* SetThreadIdealProcessor returns -1 on failure; map that to non-zero */
931 return (func_SetThreadIdealProcessor(thread->th, nr) == -1);
/* Allocate a tMPI mutex and initialize its embedded CRITICAL_SECTION.
   Returns 0 on success, non-zero if the allocation fails. */
939 int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx)
946 mtx->mutex = (struct tMPI_Mutex*)malloc(sizeof(struct tMPI_Mutex)*1);
947 if (mtx->mutex == NULL)
951 InitializeCriticalSection(&(mtx->mutex->cs));
/* Destroy a tMPI mutex and its CRITICAL_SECTION. */
957 int tMPI_Thread_mutex_destroy(tMPI_Thread_mutex_t *mtx)
964 DeleteCriticalSection(&(mtx->mutex->cs));
/* Lazily initialize a statically-declared mutex exactly once, guarded
   by the global mutex_init critical section. */
973 static int tMPI_Thread_mutex_init_once(tMPI_Thread_mutex_t *mtx)
977 /* This is essentially a copy of the code from the one-time
978 * initialization, but with a call to the mutex init routine instead.
979 * It might seem like overkill, but it will only be executed the first
980 * time you call a static mutex, and it is important to get all the
981 * memory barriers right. Trust me, you don't want a deadlock here...
984 /* initialize the initializers */
985 ret = tMPI_Init_initers();
990 /* Lock the common one-time init mutex so we can check carefully */
991 EnterCriticalSection( &mutex_init );
993 /* Do the actual (locked) check - system mutex is locked if we get here */
994 if (mtx->mutex == NULL)
996 /* No need to keep the lock during execution -
997 * Only one thread can do it anyway.
999 ret = tMPI_Thread_mutex_init(mtx);
1001 LeaveCriticalSection( &mutex_init );
/* Lock a mutex, lazily initializing it first if it is a static mutex
   that has never been used. */
1008 int tMPI_Thread_mutex_lock(tMPI_Thread_mutex_t *mtx)
1010 /* check whether the mutex is initialized */
1011 if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
1013 tMPI_Thread_mutex_init_once(mtx);
1016 /* The mutex is now guaranteed to be valid. */
1017 EnterCriticalSection( &(mtx->mutex->cs) );
/* Non-blocking lock attempt; same lazy-init dance as above. */
1025 int tMPI_Thread_mutex_trylock(tMPI_Thread_mutex_t *mtx)
1029 /* check whether the mutex is initialized */
1030 if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
1032 tMPI_Thread_mutex_init_once(mtx);
1035 /* The mutex is now guaranteed to be valid. */
1036 ret = TryEnterCriticalSection( &(mtx->mutex->cs) );
/* Unlock a mutex. No lazy-init check: a locked mutex must already have
   been initialized. */
1043 int tMPI_Thread_mutex_unlock(tMPI_Thread_mutex_t *mtx)
1045 /* we should have initialized our critical section anyway */
1046 LeaveCriticalSection( &(mtx->mutex->cs) );
/* Create a thread-local-storage key backed by Win32 TLS. The destructor
   is currently not honored (see TODO). */
1053 int tMPI_Thread_key_create(tMPI_Thread_key_t *key, void (*destructor)(void *))
1061 /* TODO: make list of destructors for thread-local storage */
1062 key->key = (struct tMPI_Thread_key*)malloc(sizeof(struct tMPI_Thread_key));
1063 if (key->key == NULL)
1068 (key)->key->wkey = TlsAlloc();
1070 if ( (key)->key->wkey == TLS_OUT_OF_INDEXES)
/* Release a TLS key. */
1079 int tMPI_Thread_key_delete(tMPI_Thread_key_t key)
1081 TlsFree(key.key->wkey);
/* Fetch the calling thread's value for a TLS key. */
1089 void * tMPI_Thread_getspecific(tMPI_Thread_key_t key)
1093 p = TlsGetValue(key.key->wkey);
/* Store the calling thread's value for a TLS key. */
1099 int tMPI_Thread_setspecific(tMPI_Thread_key_t key, void *value)
1103 ret = TlsSetValue(key.key->wkey, value);
1109 /* use once Vista is minimum required version */
/* Adapter matching the InitOnceExecuteOnce callback signature; just
   invokes the plain init function passed in Parameter. */
1110 static BOOL CALLBACK InitHandleWrapperFunction(PINIT_ONCE InitOnce,
1114 void (*fn)(void) = (void (*)(void))Parameter;
/* Globals for the pre-Vista fallback implementation of "once". */
1121 CRITICAL_SECTION tMPI_Once_cs;
1122 tMPI_Spinlock_t tMPI_Once_cs_lock = TMPI_SPINLOCK_INITIALIZER;
1123 volatile int tMPI_Once_init = 0;
/* Run init_routine exactly once across all threads. Two paths appear
   below: the Vista+ InitOnceExecuteOnce path and a fallback that
   serializes through the global once_init critical section — these are
   presumably alternative #if branches (preprocessor lines not visible
   in this view). */
1126 int tMPI_Thread_once(tMPI_Thread_once_t *once_control,
1127 void (*init_routine)(void))
1130 /* use once Vista is minimum required version */
1132 bStatus = InitOnceExecuteOnce(once_control, InitHandleWrapperFunction,
1133 init_routine, NULL);
1142 /* really ugly hack - and it's slow... */
1143 ret = tMPI_Init_initers();
1149 EnterCriticalSection(&once_init);
1150 if (tMPI_Atomic_get(&(once_control->once)) == 0)
1153 tMPI_Atomic_set(&(once_control->once), 1);
1155 LeaveCriticalSection(&once_init);
/* Allocate and initialize a condition variable. Both the Vista+
   CONDITION_VARIABLE path and the pre-Vista manual-reset-event
   emulation appear below; they are presumably alternative #if branches
   (preprocessor lines not visible in this view). */
1164 int tMPI_Thread_cond_init(tMPI_Thread_cond_t *cond)
1171 cond->condp = (struct tMPI_Thread_cond*)
1172 malloc(sizeof(struct tMPI_Thread_cond));
1173 if (cond->condp == NULL)
1178 /* use this code once Vista is the minimum version required */
1179 InitializeConditionVariable( &(cond->cv) );
/* pre-Vista emulation state: waiter/release counters, a cycle counter
   to detect stale wakeups, and a manual-reset event */
1181 cond->condp->Nwaiters = 0;
1182 InitializeCriticalSection(&(cond->condp->wtr_lock));
1183 cond->condp->Nrelease = 0;
1184 cond->condp->cycle = 0;
1185 /* a manual reset, unsignalled event */
1186 cond->condp->ev = CreateEvent(NULL, TRUE, FALSE, NULL);
/* Destroy a condition variable (nothing to delete for the Vista-native
   CONDITION_VARIABLE; the emulation frees its critical section). */
1192 int tMPI_Thread_cond_destroy(tMPI_Thread_cond_t *cond)
1195 /* use this code once Vista is the minimum version required */
1196 /* windows doesn't have a CONDITION_VARIABLE delete function */
1198 DeleteCriticalSection(&(cond->condp->wtr_lock));
1206 /*! \brief Static init routine for pthread barrier
1210 * This is only used as a wrapper to enable static initialization
1211 * of posix thread types together with our abstraction layer for tMPI_Thread.h
1213 * \param cond Condition variable, must be statically initialized
1215 * \return status - 0 on success, or a standard error code.
1217 static int tMPI_Thread_cond_init_once(tMPI_Thread_cond_t *cond)
1221 /* This is essentially a copy of the code from the one-time
1222 * initialization, but with a call to the cond init routine instead.
1223 * It might seem like overkill, but it will only be executed the first
1224 * time you call a static condition variable, and it is important to get
1225 * the memory barriers right. Trust me, you don't want a deadlock here...
1228 /* initialize the initializers */
1229 ret = tMPI_Init_initers();
1234 /* Lock the common one-time init mutex so we can check carefully */
1235 EnterCriticalSection( &cond_init );
1237 /* Do the actual (locked) check - system mutex is locked if we get here */
1238 if (cond->condp == NULL)
1240 /* No need to keep the lock during execution -
1241 * Only one thread can do it anyway. */
1242 ret = tMPI_Thread_cond_init(cond);
1244 LeaveCriticalSection( &cond_init );
/* Wait on a condition variable with mtx held.

   Vista+ path: a single SleepConditionVariableCS call. Pre-Vista
   emulation path: register as a waiter (under wtr_lock), release the
   caller's mutex, loop on the manual-reset event until a release of a
   *newer* cycle than ours is observed (guards against stale wakeups),
   then reacquire the mutex and deregister; the last released waiter
   resets the event. The two paths are presumably alternative #if
   branches (preprocessor lines not visible in this view). */
1252 int tMPI_Thread_cond_wait(tMPI_Thread_cond_t *cond, tMPI_Thread_mutex_t *mtx)
1254 BOOL wait_done = FALSE;
1255 BOOL last_waiter = FALSE;
1259 /* check whether the condition is initialized */
1260 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
1262 ret = tMPI_Thread_cond_init_once(cond);
1268 /* the mutex must have been initialized because it should be locked here */
1271 /* use this code once Vista is the minimum version required */
1272 ret = SleepConditionVariableCS (&(cond->cv), &(mtx->cs), INFINITE);
1279 /* serially increase waiter count */
1280 EnterCriticalSection(&(cond->condp->wtr_lock));
1281 cond->condp->Nwaiters++;
1282 my_cycle = cond->condp->cycle;
1283 LeaveCriticalSection(&(cond->condp->wtr_lock));
1285 /* now it's safe to release the mutex from the fn call */
1286 LeaveCriticalSection(&(mtx->mutex->cs));
1288 /* Loop a wait until we found out we've waited for the right event.
1289 Note that this loop is potentially a busy-wait loop in bad
1290 circumstances (higher priority threads, for example). */
1293 /* do the actual waiting */
1294 if (WaitForSingleObject( cond->condp->ev, INFINITE ) == WAIT_FAILED)
1299 /* serially check whether we got the right event. */
1300 EnterCriticalSection(&(cond->condp->wtr_lock));
1301 wait_done = (cond->condp->Nrelease > 0) &&
1302 (cond->condp->cycle != my_cycle);
1303 LeaveCriticalSection(&(cond->condp->wtr_lock));
1307 /* We obtain the mutex from the function call */
1308 EnterCriticalSection(&(mtx->mutex->cs));
1310 /* we serially decrease the waiter count and release count */
1311 EnterCriticalSection(&(cond->condp->wtr_lock));
1312 cond->condp->Nwaiters--;
1313 cond->condp->Nrelease--;
1314 last_waiter = (cond->condp->Nrelease == 0);
1315 LeaveCriticalSection(&(cond->condp->wtr_lock));
1317 /* manually release the event if everybody's done with it */
1320 if (!ResetEvent( cond->condp->ev ))
/*
 * Wake (at most) one thread waiting on the condition variable.
 *
 * Two implementations are visible: the native Vista+ CONDITION_VARIABLE
 * path (WakeConditionVariable) and the pre-Vista emulation built from a
 * Win32 event plus waiter/release counters protected by wtr_lock.
 * NOTE(review): presumably the two paths are selected by preprocessor
 * conditionals elided from this excerpt — confirm against the full file.
 *
 * Returns 0 on success, or an error code from the lazy initializer.
 */
1333 int tMPI_Thread_cond_signal(tMPI_Thread_cond_t *cond)
1336 /* check whether the condition is initialized */
1337 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
1339 ret = tMPI_Thread_cond_init_once(cond);
1345 /* The condition variable is now guaranteed to be valid. */
1347 /* use this code once Vista is the minimum version required */
1348 WakeConditionVariable( &(cond->cv) );
/* Pre-Vista emulation: all counter updates happen under wtr_lock so that
   waiters observe a consistent (Nwaiters, Nrelease, cycle) triple. */
1350 EnterCriticalSection(&(cond->condp->wtr_lock));
1351 /* check if we're not still busy with a release. If we are, do nothing. */
1352 if (cond->condp->Nwaiters > cond->condp->Nrelease)
/* Grant one release; bumping 'cycle' lets waiters distinguish this
   wakeup from an earlier signal/broadcast round. */
1354 cond->condp->Nrelease++;
1355 cond->condp->cycle++;
1356 if (!SetEvent(cond->condp->ev)) /* actually release the
1362 LeaveCriticalSection(&(cond->condp->wtr_lock));
/*
 * Wake all threads currently waiting on the condition variable.
 *
 * Mirrors tMPI_Thread_cond_signal: a native Vista+ path
 * (WakeAllConditionVariable) and a pre-Vista emulation that releases
 * every waiter by setting Nrelease to the full waiter count.
 * NOTE(review): path selection is presumably via preprocessor
 * conditionals elided from this excerpt — confirm against the full file.
 *
 * Returns 0 on success, or an error code from the lazy initializer.
 */
1370 int tMPI_Thread_cond_broadcast(tMPI_Thread_cond_t *cond)
1373 /* check whether the condition is initialized */
1374 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
1376 ret = tMPI_Thread_cond_init_once(cond);
1383 /* The condition variable is now guaranteed to be valid. */
1385 /* use this code once Vista is the minimum version required */
1386 WakeAllConditionVariable( &(cond->cv) );
/* Pre-Vista emulation, guarded by wtr_lock as in cond_signal. */
1388 EnterCriticalSection(&(cond->condp->wtr_lock));
1389 /* check whether there are any waiters */
1390 if (cond->condp->Nwaiters > 0)
/* Release every current waiter; 'cycle' bump marks a new wakeup round
   so waiters can tell this broadcast apart from a previous one. */
1392 cond->condp->Nrelease = cond->condp->Nwaiters;
1393 cond->condp->cycle++;
1394 if (!SetEvent(cond->condp->ev)) /* actually release the
1400 LeaveCriticalSection(&(cond->condp->wtr_lock));
/*
 * Initialize a barrier for 'n' participating threads.
 *
 * Allocates the hidden implementation struct and initializes its lock
 * and condition variable. Two initialization paths are visible: native
 * Vista+ objects (CRITICAL_SECTION + CONDITION_VARIABLE) and the
 * tMPI mutex/cond wrappers. NOTE(review): presumably only one path is
 * compiled in via elided preprocessor conditionals — confirm; as shown,
 * both appear to operate on the same 'cs'/'cv' fields.
 *
 * Returns 0 on success; error handling for NULL/allocation failure is
 * in lines elided from this excerpt.
 */
1408 int tMPI_Thread_barrier_init(tMPI_Thread_barrier_t *barrier, int n)
1412 if (barrier == NULL)
1417 barrier->barrierp = (struct tMPI_Thread_barrier*)
1418 malloc(sizeof(struct tMPI_Thread_barrier)*1);
1419 if (barrier->barrierp == NULL)
1425 /* use this once Vista is the oldest supported windows version: */
1426 InitializeCriticalSection(&(barrier->barrierp->cs));
1427 InitializeConditionVariable(&(barrier->barrierp->cv));
/* Portable fallback path: wrap the lock/condvar in tMPI primitives. */
1429 ret = tMPI_Thread_mutex_init(&(barrier->barrierp->cs));
1434 ret = tMPI_Thread_cond_init(&(barrier->barrierp->cv));
/* 'threshold' is the number of threads that must arrive per cycle. */
1441 barrier->threshold = n;
/*
 * Destroy a barrier and free its implementation struct.
 *
 * Tears down the lock and condition variable (native Vista+ path vs.
 * tMPI wrapper path — presumably chosen by elided preprocessor
 * conditionals, matching tMPI_Thread_barrier_init) and releases the
 * heap allocation made in tMPI_Thread_barrier_init.
 */
1450 int tMPI_Thread_barrier_destroy(tMPI_Thread_barrier_t *barrier)
1454 if (barrier == NULL)
1460 DeleteCriticalSection(&(barrier->barrierp->cs));
1462 ret = tMPI_Thread_mutex_destroy(&(barrier->barrierp->cs));
1469 ret = tMPI_Thread_cond_destroy(&(barrier->barrierp->cv));
1475 free(barrier->barrierp);
1482 /*! \brief Static init routine for pthread barrier
1486 * This is only used as a wrapper to enable static initialization
1487 * of posix thread types together with our abstraction layer for tMPI_Thread.h
1489 * \param barrier Statically initialized barrier type
1490 * \param n Number of members in barrier
1492 * \return status - 0 on success, or a standard error code.
/*
 * One-time initializer for statically-declared barriers.
 *
 * Performs a locked (double-checked) initialization: the global
 * 'barrier_init' critical section serializes concurrent first uses,
 * and the barrierp == NULL re-check under the lock ensures the real
 * init runs exactly once.
 */
1494 static int tMPI_Thread_barrier_init_once(tMPI_Thread_barrier_t *barrier, int n)
1498 /* This is essentially a copy of the code from the one-time
1499 * initialization, but with a call to the cond init routine instead.
1500 * It might seem like overkill, but it will only be executed the first
1501 * time you call a static condition variable, and it is important to get
1502 * the memory barriers right. Trust me, you don't want a deadlock here...
1506 /* initialize the initializers */
1507 ret = tMPI_Init_initers();
1513 /* Lock the common one-time init mutex so we can check carefully */
1514 EnterCriticalSection( &barrier_init );
1516 /* Do the actual (locked) check - system mutex is locked if we get here */
1517 if (barrier->barrierp == NULL)
1519 /* No need to keep the lock during execution -
1520 * Only one thread can do it anyway. */
1521 ret = tMPI_Thread_barrier_init(barrier, n);
1523 LeaveCriticalSection( &barrier_init );
/*
 * Block until 'threshold' threads have reached the barrier.
 *
 * Classic cycle-flip barrier: each arriving thread decrements 'count'
 * under the barrier lock; the last arrival resets the count, flips
 * 'cycle', and broadcasts. Earlier arrivals sleep until they observe
 * 'cycle' change, which makes the barrier safe for immediate reuse.
 * Two lock/wait paths are visible (native Vista+ CS/CV vs. tMPI
 * mutex/cond wrappers) — NOTE(review): presumably selected by elided
 * preprocessor conditionals, matching the other barrier functions.
 */
1530 int tMPI_Thread_barrier_wait(tMPI_Thread_barrier_t *barrier)
1535 /*tMPI_Thread_pthread_barrier_t *p;*/
1537 /* check whether the barrier is initialized */
1538 if (tMPI_Atomic_get( &(barrier->initialized) ) == 0)
1540 ret = tMPI_Thread_barrier_init_once(barrier, barrier->threshold);
1547 EnterCriticalSection( &(barrier->barrierp->cs) );
1549 ret = tMPI_Thread_mutex_lock( &(barrier->barrierp->cs) );
/* Remember which cycle we arrived in; a flipped value means the
   barrier we are waiting for has completed. */
1558 cycle = barrier->cycle;
1560 /* Decrement the count atomically and check if it is zero.
1561 * This will only be true for the last thread calling us.
1563 if (--(barrier->count) <= 0)
/* Last arrival: start a new cycle and wake everyone. */
1565 barrier->cycle = !barrier->cycle;
1566 barrier->count = barrier->threshold;
1568 WakeAllConditionVariable( &(barrier->barrierp->cv) );
1570 ret = tMPI_Thread_cond_broadcast( &(barrier->barrierp->cv) );
/* Non-last arrivals: wait (re-checking on spurious wakeups) until the
   last thread flips 'cycle'. */
1579 while (cycle == barrier->cycle)
1582 rc = SleepConditionVariableCS (&(barrier->barrierp->cv),
1583 &(barrier->barrierp->cs),
1591 rc = tMPI_Thread_cond_wait(&barrier->barrierp->cv,
1592 &barrier->barrierp->cs);
1601 LeaveCriticalSection( &(barrier->barrierp->cs) );
1603 tMPI_Thread_mutex_unlock( &(barrier->barrierp->cs) );
1610 /* just to have some symbols */
/* Dummy global so this translation unit is never empty (some toolchains
   warn on object files with no symbols). Not used for synchronization. */
1611 int tMPI_Thread_winthreads = 0;
1613 #endif /* THREAD_WINDOWS */