2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
40 /* Include the defines that determine which thread library to use.
41 * We do not use HAVE_PTHREAD_H directly, since we might want to
42 * turn off thread support explicitly (e.g. for debugging).
45 #ifdef HAVE_TMPI_CONFIG_H
46 #include "tmpi_config.h"
56 /* the win32 header */
58 /* Couple of types (e.g. PROCESSOR_NUMBER) are only available since
59 * WinServer2008 (0x600) and Windows7 (0x601). MingW doesn't have
60 * it defined for 0x600 in the headers */
61 #define _WIN32_WINNT 0x0601
72 #include "thread_mpi/atomic.h"
73 #include "thread_mpi/threads.h"
77 #include "winthreads.h"
79 /*! \brief System mutex for all one-time initialization
81 * This static variable is necessary in order to make the header file
82 * independent of the thread library implementation. Anyway, it
83 * will only be locked a handful of times at the start of program execution.
86 static CRITICAL_SECTION mutex_init; /* mutex for initializing mutexes */
87 static CRITICAL_SECTION once_init; /* mutex for initializing barriers */
88 static CRITICAL_SECTION cond_init; /* mutex for initializing thread_conds */
89 static CRITICAL_SECTION barrier_init; /* mutex for initializing barriers */
92 /* spinlock for initializing the above mutexes */
93 static tMPI_Spinlock_t init_init = TMPI_SPINLOCK_INITIALIZER;
95 /* whether tMPI_Thread_create has initialized these mutexes */
96 static tMPI_Atomic_t init_inited = { 0 };
98 /* whether the main thread affinity has been set */
99 static tMPI_Spinlock_t main_thread_aff_lock = TMPI_SPINLOCK_INITIALIZER;
100 static tMPI_Atomic_t main_thread_aff_set = { 0 };
102 /* mutex for managing thread IDs */
103 static CRITICAL_SECTION thread_id_list_lock;
106 DWORD thread_id; /* the thread ID as returned by GetCurrentTreadID() */
107 struct tMPI_Thread* th; /* the associated tMPI thread structure */
109 /* the size of the thrread id list */
110 static int Nalloc_thread_id_list = 0;
111 /* the number of elements in the thread id list */
112 static int N_thread_id_list = 0;
113 /* the thread ID list */
114 static thread_id_list_t *thread_id_list;
118 /* data structure to keep track of thread key destructors. */
121 void (*destructor)(void*);
123 } thread_key_destructors;
125 static thread_key_destructors *destructors = NULL;
130 NUMA and Processor Group awareness support.
132 NUMA support is implemented to maximize the chance that memory access
133 patterns remain local to the NUMA node.
134 NUMA node processor affinity is utilized to prevent scheduler associated
135 drift across NUMA nodes.
136 Process Group support is implemented to enable > 64 processors to be
137 utilized. This is only supported when building 64bit.
139 The high level approach is:
140 1. Build a description of CPU topology, including processor numbers, NUMA
141 node numbers, and affinity masks.
142 2. For processor intensive worker threads, create threads such that
143 the processor affinity and thread stack is kept local within a NUMA node.
144 3. Employ simple round-robin affinity and node assignment approach when
146 4. Use GetProcAddress() to obtain function pointers to functions that
147 are operating system version dependent, to allow maximum binary
150 Scott Field (sfield@microsoft.com) Jan-2011
/* Per-processor description: processor number (group + index), its group
   affinity mask, and the NUMA node it belongs to. Filled by tMPI_Init_NUMA(). */
155 PROCESSOR_NUMBER ProcessorNumber;
156 GROUP_AFFINITY GroupAffinity;
157 USHORT NumaNodeNumber;
158 } MPI_NUMA_PROCESSOR_INFO;
161 /* thread/processor index, to allow setting round-robin affinity. */
162 volatile ULONG g_ulThreadIndex;
163 /* a value of zero implies the system is not NUMA */
164 ULONG g_ulHighestNumaNodeNumber = 0;
165 /* total number of processors in g_MPI_ProcessorInfo array */
166 ULONG g_ulTotalProcessors;
167 /* array describing available processors, affinity masks, and NUMA node */
168 MPI_NUMA_PROCESSOR_INFO *g_MPI_ProcessorInfo = NULL;
170 /* function prototypes and variables to support obtaining function addresses
171 dynamically -- supports down-level operating systems */
173 typedef BOOL (WINAPI *func_GetNumaHighestNodeNumber_t)( PULONG
175 typedef DWORD (WINAPI *func_SetThreadIdealProcessor_t)( HANDLE hThread,
176 DWORD dwIdealProcessor );
177 typedef BOOL (WINAPI *func_SetThreadGroupAffinity_t)( HANDLE hThread,
178 const GROUP_AFFINITY *GroupAffinity,
179 PGROUP_AFFINITY PreviousGroupAffinity );
180 typedef BOOL (WINAPI *func_SetThreadIdealProcessorEx_t)( HANDLE hThread,
181 PPROCESSOR_NUMBER lpIdealProcessor,
182 PPROCESSOR_NUMBER lpPreviousIdealProcessor );
183 typedef BOOL (WINAPI *func_GetNumaNodeProcessorMaskEx_t)( USHORT Node,
184 PGROUP_AFFINITY ProcessorMask );
185 typedef BOOL (WINAPI *func_GetNumaProcessorNodeEx_t)(
186 PPROCESSOR_NUMBER Processor,
187 PUSHORT NodeNumber );
188 typedef VOID (WINAPI *func_GetCurrentProcessorNumberEx_t)(
189 PPROCESSOR_NUMBER ProcNumber );
191 typedef HANDLE (WINAPI *func_CreateRemoteThreadEx_t)(
193 LPSECURITY_ATTRIBUTES lpThreadAttributes,
195 LPTHREAD_START_ROUTINE lpStartAddress,
197 DWORD dwCreationFlags,
198 LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList,
201 typedef BOOL (WINAPI *func_InitializeProcThreadAttributeList_t)(
202 LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList,
203 DWORD dwAttributeCount,
206 typedef BOOL (WINAPI *func_UpdateProcThreadAttribute_t)(
207 LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList,
212 PVOID lpPreviousValue,
213 PSIZE_T lpReturnSize);
214 typedef VOID (WINAPI *func_DeleteProcThreadAttributeList_t)(
215 LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList);
216 typedef DWORD (WINAPI *func_GetActiveProcessorCount_t)(WORD GroupNumber);
217 typedef WORD (WINAPI *func_GetActiveProcessorGroupCount_t)(void);
/* Function pointers resolved at runtime via GetProcAddress() in
   tMPI_Init_NUMA(); NULL when the running OS does not export the symbol. */
220 /* WinXP SP2, WinXP64, WinSrv 2003 */
221 func_GetNumaHighestNodeNumber_t func_GetNumaHighestNodeNumber;
222 func_SetThreadIdealProcessor_t func_SetThreadIdealProcessor;
223 /* Windows 7, WinSrv 2008R2 */
224 func_SetThreadGroupAffinity_t func_SetThreadGroupAffinity;
225 func_SetThreadIdealProcessorEx_t func_SetThreadIdealProcessorEx;
226 func_GetNumaNodeProcessorMaskEx_t func_GetNumaNodeProcessorMaskEx;
227 func_GetNumaProcessorNodeEx_t func_GetNumaProcessorNodeEx;
228 func_GetCurrentProcessorNumberEx_t func_GetCurrentProcessorNumberEx;
229 func_GetActiveProcessorCount_t func_GetActiveProcessorCount;
230 func_GetActiveProcessorGroupCount_t func_GetActiveProcessorGroupCount;
231 func_CreateRemoteThreadEx_t func_CreateRemoteThreadEx;
232 /* Windows Vista, WinSrv 2008 */
233 func_InitializeProcThreadAttributeList_t func_InitializeProcThreadAttributeList;
234 func_UpdateProcThreadAttribute_t func_UpdateProcThreadAttribute;
235 func_DeleteProcThreadAttributeList_t func_DeleteProcThreadAttributeList;
239 /* returns 0 on success.
240 Success is returned if the system is non-NUMA, OR the system doesn't
241 support appropriate NUMA APIs, OR the system is NUMA and we successfully
245 This can happen if an API returned an error, a memory allocation failed, or
246 we failed to initialize affinity mapping information.
/* Probe kernel32.dll for NUMA/processor-group APIs and, on a NUMA system,
   build the global g_MPI_ProcessorInfo table used for thread placement. */
248 int tMPI_Init_NUMA(void)
250 /* module handle to kernel32.dll -- we already reference it, so it's already loaded */
251 HMODULE hModKernel32 = NULL;
252 /* 0-based NUMA node count -- does not imply all nodes have available (eg: hot-plug) processors */
253 ULONG ulHighestNumaNodeNumber;
254 /* total number of processors available per affinity masks */
255 DWORD dwTotalProcessors = 0;
258 /* calling thread PROCESSOR_NUMBER */
259 PROCESSOR_NUMBER CurrentProcessorNumber;
260 /* calling thread GROUP_AFFINITY */
261 /*GROUP_AFFINITY CurrentThreadGroupAffinity; */
262 /* calling thread NUMA node */
263 /*USHORT CurrentNumaNodeNumber;*/
265 WORD wActiveGroupCount;
268 /* array of processor information structures */
269 MPI_NUMA_PROCESSOR_INFO *pMPI_ProcessorInfo = NULL;
271 /* assume an error condition */
274 hModKernel32 = GetModuleHandleA("kernel32.dll");
276 if (hModKernel32 == NULL)
281 /* obtain addresses of relevant NUMA functions, most of which are
282 Windows 7 / Windows Server 2008R2 only functions
283 this is done using GetProcAddress to enable the binary to run on older
287 func_GetNumaHighestNodeNumber = (func_GetNumaHighestNodeNumber_t) GetProcAddress( hModKernel32, "GetNumaHighestNodeNumber" );
288 func_SetThreadIdealProcessor = (func_SetThreadIdealProcessor_t) GetProcAddress( hModKernel32, "SetThreadIdealProcessor" );
290 if (func_GetNumaHighestNodeNumber == NULL)
295 /* determine if we're on a NUMA system and if so, determine the number of
298 if (!func_GetNumaHighestNodeNumber( &ulHighestNumaNodeNumber ))
/* resolve the Windows 7 / Server 2008R2 group-aware APIs; any that are
   missing cause the NUMA-aware path to be disabled below */
305 func_SetThreadGroupAffinity = (func_SetThreadGroupAffinity_t)GetProcAddress( hModKernel32, "SetThreadGroupAffinity" );
306 func_SetThreadIdealProcessorEx = (func_SetThreadIdealProcessorEx_t)GetProcAddress( hModKernel32, "SetThreadIdealProcessorEx" );
307 func_CreateRemoteThreadEx = (func_CreateRemoteThreadEx_t)GetProcAddress( hModKernel32, "CreateRemoteThreadEx" );
308 func_GetNumaNodeProcessorMaskEx = (func_GetNumaNodeProcessorMaskEx_t)GetProcAddress( hModKernel32, "GetNumaNodeProcessorMaskEx" );
309 func_GetNumaProcessorNodeEx = (func_GetNumaProcessorNodeEx_t)GetProcAddress( hModKernel32, "GetNumaProcessorNodeEx" );
310 func_GetCurrentProcessorNumberEx = (func_GetCurrentProcessorNumberEx_t)GetProcAddress( hModKernel32, "GetCurrentProcessorNumberEx" );
311 func_GetActiveProcessorCount = (func_GetActiveProcessorCount_t)GetProcAddress( hModKernel32, "GetActiveProcessorCount" );
312 func_GetActiveProcessorGroupCount = (func_GetActiveProcessorGroupCount_t)GetProcAddress( hModKernel32, "GetActiveProcessorGroupCount" );
313 func_InitializeProcThreadAttributeList = (func_InitializeProcThreadAttributeList_t)GetProcAddress( hModKernel32, "InitializeProcThreadAttributeList" );
314 func_UpdateProcThreadAttribute = (func_UpdateProcThreadAttribute_t)GetProcAddress( hModKernel32, "UpdateProcThreadAttribute" );
315 func_DeleteProcThreadAttributeList = (func_DeleteProcThreadAttributeList_t)GetProcAddress( hModKernel32, "DeleteProcThreadAttributeList" );
317 if ( (func_SetThreadGroupAffinity == NULL) ||
318 (func_SetThreadIdealProcessorEx == NULL) ||
319 (func_CreateRemoteThreadEx == NULL) ||
320 (func_GetNumaNodeProcessorMaskEx == NULL) ||
321 (func_GetNumaProcessorNodeEx == NULL) ||
322 (func_GetCurrentProcessorNumberEx == NULL) ||
323 (func_GetActiveProcessorCount == NULL) ||
324 (func_GetActiveProcessorGroupCount == NULL) ||
325 (func_InitializeProcThreadAttributeList == NULL) ||
326 (func_UpdateProcThreadAttribute == NULL) ||
327 (func_DeleteProcThreadAttributeList == NULL) )
329 /* if any addresses couldn't be located, assume NUMA functionality
334 if (ulHighestNumaNodeNumber == 0)
336 /* system is not NUMA */
341 /* count the active processors across the groups */
343 func_GetCurrentProcessorNumberEx(&CurrentProcessorNumber);
345 wActiveGroupCount = func_GetActiveProcessorGroupCount();
347 dwTotalProcessors = func_GetActiveProcessorCount( ALL_PROCESSOR_GROUPS );
349 #if !((defined WIN64 || defined _WIN64))
350 /* WOW64 doesn't allow setting the affinity correctly beyond 32
351 processors -- the KAFFINITY mask is only 32 bits wide
352 This check is only here for completeness -- large systems should be
353 running 64bit Gromacs code, where the processor quantity is not
355 By failing here, the WOW64 32bit client will use normal CreateThread(),
356 which can schedule up to 64 un-affinitized threads
359 if (dwTotalProcessors > 32)
365 /* allocate array of processor info blocks */
367 pMPI_ProcessorInfo = malloc( sizeof(MPI_NUMA_PROCESSOR_INFO) *
369 if (pMPI_ProcessorInfo == NULL)
374 /* zero fill to cover reserved must-be-zero fields */
375 memset(pMPI_ProcessorInfo, 0, sizeof(MPI_NUMA_PROCESSOR_INFO) * dwTotalProcessors);
377 /* loop through each processor group, and for each group, capture the
378 processor numbers and NUMA node information. */
380 for (GroupIndex = 0; GroupIndex < wActiveGroupCount; GroupIndex++)
382 DWORD dwGroupProcessorCount;
385 dwGroupProcessorCount = func_GetActiveProcessorCount( GroupIndex );
387 for (ProcessorIndex = 0; ProcessorIndex < dwGroupProcessorCount;
390 PROCESSOR_NUMBER *pProcessorNumber = &(pMPI_ProcessorInfo[i].ProcessorNumber);
391 GROUP_AFFINITY *pGroupAffinity = &(pMPI_ProcessorInfo[i].GroupAffinity);
392 USHORT *pNodeNumber = &(pMPI_ProcessorInfo[i].NumaNodeNumber);
394 pProcessorNumber->Group = GroupIndex;
395 pProcessorNumber->Number = ProcessorIndex;
397 /* save an index to the processor array entry for the current processor
398 this is used to enable subsequent threads to be created in a round
399 robin fashion starting at the next array entry
402 if ( (CurrentProcessorNumber.Group == pProcessorNumber->Group ) &&
403 (CurrentProcessorNumber.Number == pProcessorNumber->Number) )
405 /* set global: current thread index into processor array */
409 /* capture the node number and group affinity associated with processor entry
410 any failures here are assumed to be catastrophic and disable
411 the group & NUMA aware thread support
414 if (!func_GetNumaProcessorNodeEx(pProcessorNumber, pNodeNumber))
419 if (!func_GetNumaNodeProcessorMaskEx(*pNodeNumber, pGroupAffinity))
424 /* future enhancement: construct GroupAffinity (single) processor
425 mask within NUMA node for this processor entry */
427 /* increment processor array index */
430 /* sanity check, should never happen */
432 if (i > dwTotalProcessors)
440 /* capture number of processors, highest NUMA node number, and processor
442 g_ulTotalProcessors = dwTotalProcessors;
443 g_ulHighestNumaNodeNumber = ulHighestNumaNodeNumber;
444 g_MPI_ProcessorInfo = pMPI_ProcessorInfo;
/* error path: release the partially-built processor info array */
452 if (pMPI_ProcessorInfo)
454 tMPI_Free( pMPI_ProcessorInfo );
/* Allocate the initial thread-ID list (capacity 4) under
   thread_id_list_lock. Presumably returns 0 on success and an error code
   when malloc fails -- TODO confirm (return lines not visible here). */
461 static int tMPI_Thread_id_list_init(void)
465 EnterCriticalSection( &thread_id_list_lock );
467 N_thread_id_list = 0;
468 Nalloc_thread_id_list = 4; /* initial allocation size */
469 thread_id_list = (thread_id_list_t*)malloc(sizeof(thread_id_list_t)*
470 Nalloc_thread_id_list);
471 if (thread_id_list == NULL)
476 LeaveCriticalSection( &thread_id_list_lock );
481 /* add an entry to the thread ID list, assuming it's locked */
482 static int tMPI_Thread_id_list_add_locked(DWORD thread_id,
483 struct tMPI_Thread *th)
/* grow the list if there is no room for one more entry */
485 if (Nalloc_thread_id_list < N_thread_id_list + 1)
487 thread_id_list_t* new_list;
490 /* double the size */
491 Nalloc_thread_id_list *= 2;
492 /* and allocate the new list */
493 new_list = (thread_id_list_t*)malloc(sizeof(thread_id_list_t)*
494 Nalloc_thread_id_list);
495 if (new_list == NULL)
499 /* and copy over all elements */
500 for (i = 0; i < N_thread_id_list; i++)
502 new_list[i] = thread_id_list[i];
504 /* free the old list */
505 tMPI_Free(thread_id_list);
506 thread_id_list = new_list;
/* append the new (thread_id -> th) mapping at the end of the list */
508 thread_id_list[ N_thread_id_list ].thread_id = thread_id;
509 thread_id_list[ N_thread_id_list ].th = th;
516 /* add an entry to the thread ID list */
/* Locking wrapper around tMPI_Thread_id_list_add_locked(). */
517 static int tMPI_Thread_id_list_add(DWORD thread_id, struct tMPI_Thread *th)
520 EnterCriticalSection( &thread_id_list_lock );
521 ret = tMPI_Thread_id_list_add_locked(thread_id, th);
522 LeaveCriticalSection( &thread_id_list_lock );
526 /* Remove an entry from the thread_id list, assuming it's locked.
527 Does nothing if an entry is not found.*/
528 static void tMPI_Thread_id_list_remove_locked(DWORD thread_id)
531 tmpi_bool found = FALSE;
533 /* move the last thread_id_list item to the one we want to remove */
534 for (i = 0; i < N_thread_id_list; i++)
536 if (thread_id_list[i].thread_id == thread_id)
/* swap-with-last removal; order of the list is not preserved */
538 thread_id_list[i] = thread_id_list[N_thread_id_list - 1];
551 /* Remove an entry from the thread_id list */
/* Locking wrapper around tMPI_Thread_id_list_remove_locked(). */
552 static void tMPI_Thread_id_list_remove(DWORD thread_id)
555 EnterCriticalSection( &thread_id_list_lock );
556 tMPI_Thread_id_list_remove_locked(thread_id);
557 LeaveCriticalSection( &thread_id_list_lock );
562 /* try to find a thread id in the thread id list. Return NULL when there is no
563 such thread id in the list. Assumes the list is locked.*/
564 static struct tMPI_Thread *tMPI_Thread_id_list_find_locked(DWORD thread_id)
567 struct tMPI_Thread *ret = NULL;
569 /* this is a linear search but it's only O(Nthreads). */
570 for (i = 0; i < N_thread_id_list; i++)
572 if (thread_id_list[i].thread_id == thread_id)
574 ret = thread_id_list[i].th;
582 /* try to find a thread id in the thread id list. Return NULL when there is no
583 such thread id in the list.*/
/* Locking wrapper around tMPI_Thread_id_list_find_locked(). */
584 static struct tMPI_Thread *tMPI_Thread_id_list_find(DWORD thread_id)
586 struct tMPI_Thread *ret = NULL;
588 EnterCriticalSection( &thread_id_list_lock );
589 ret = tMPI_Thread_id_list_find_locked(thread_id);
590 LeaveCriticalSection( &thread_id_list_lock );
594 /* try to add the running thread to the list. Returns the tMPI_Thread struct
595 associated with this thread, or NULL in case of an error.*/
596 static struct tMPI_Thread *tMPI_Thread_id_list_add_self(void)
599 struct tMPI_Thread *th = NULL;
602 EnterCriticalSection( &thread_id_list_lock );
/* first check whether this thread already has an entry */
604 thread_id = GetCurrentThreadId();
605 th = tMPI_Thread_id_list_find_locked(thread_id);
608 /* if not, create an ID, set it and return it */
609 th = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
611 /* to create a handle that can be used outside of the current
612 thread, the handle from GetCurrentThread() must first
614 DuplicateHandle(GetCurrentProcess(),
620 DUPLICATE_SAME_ACCESS);
622 /* This causes a small memory leak that is hard to fix. */
623 th->started_by_tmpi = 0;
624 ret = tMPI_Thread_id_list_add_locked(thread_id, th);
631 LeaveCriticalSection( &thread_id_list_lock );
/* One-time, thread-safe initialization of the static critical sections,
   the NUMA support, and the thread-ID list. Safe to call repeatedly; the
   fast path is a single atomic read of init_inited. */
636 static int tMPI_Init_initers(void)
641 /* we can pre-check because it's atomic */
642 if (tMPI_Atomic_get(&init_inited) == 0)
644 /* this can be a spinlock because the chances of collision are low. */
645 tMPI_Spinlock_lock( &init_init );
/* re-check under the lock (double-checked initialization) */
647 state = tMPI_Atomic_get(&init_inited);
648 tMPI_Atomic_memory_barrier_acq();
651 InitializeCriticalSection(&mutex_init);
652 InitializeCriticalSection(&once_init);
653 InitializeCriticalSection(&cond_init);
654 InitializeCriticalSection(&barrier_init);
655 InitializeCriticalSection(&thread_id_list_lock);
657 ret = tMPI_Init_NUMA();
664 ret = tMPI_Thread_id_list_init();
/* publish all the initialization before flagging completion */
670 tMPI_Atomic_memory_barrier_rel();
671 tMPI_Atomic_set(&init_inited, 1);
674 tMPI_Spinlock_unlock( &init_init );
678 tMPI_Spinlock_unlock( &init_init );
/* This build always provides thread support. */
684 enum tMPI_Thread_support tMPI_Thread_support(void)
686 return TMPI_THREAD_SUPPORT_YES;
/* Argument bundle handed to tMPI_Win32_thread_starter() via CreateThread. */
689 struct tMPI_Thread_starter_param
691 void *(*start_routine)(void*); /* the function */
692 void *param; /* its parameter */
693 struct tMPI_Thread *thread;
/* Trampoline matching the WINAPI thread signature: unpacks the starter
   param and invokes the user's start routine. The force_align_arg_pointer
   attribute realigns the stack for SSE code (gcc/mingw builds). */
697 __attribute__((force_align_arg_pointer))
699 static DWORD WINAPI tMPI_Win32_thread_starter( LPVOID lpParam )
701 struct tMPI_Thread_starter_param *prm =
702 (struct tMPI_Thread_starter_param*)lpParam;
704 (prm->start_routine)(prm->param);
/* Return the number of processors reported by GetSystemInfo(). */
709 int tMPI_Thread_get_hw_number(void)
714 GetSystemInfo( &sysinfo );
716 ret = sysinfo.dwNumberOfProcessors;
/* Create a new thread running start_routine(arg) and register it in the
   thread-ID list. *thread receives the new tMPI thread structure. */
723 int tMPI_Thread_create(tMPI_Thread_t *thread,
724 void *(*start_routine)(void *), void *arg)
727 struct tMPI_Thread_starter_param *prm;
730 ret = tMPI_Init_initers();
741 /* a small memory leak to be sure that it doesn't get deallocated
742 once this function ends, before the newly created thread uses it. */
743 prm = (struct tMPI_Thread_starter_param*)
744 malloc(sizeof(struct tMPI_Thread_starter_param));
749 prm->start_routine = start_routine;
752 *thread = (struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
759 /* this must be locked before the thread is created to prevent a race
760 condition if the thread immediately wants to create its own entry */
761 EnterCriticalSection( &thread_id_list_lock );
762 /* just create a plain thread. */
763 (*thread)->started_by_tmpi = 1;
764 (*thread)->th = CreateThread(NULL,
766 tMPI_Win32_thread_starter,
770 if ((*thread)->th == NULL)
775 (*thread)->id = thread_id;
777 if ((*thread)->th == NULL)
782 ret = tMPI_Thread_id_list_add_locked(thread_id, (*thread));
787 LeaveCriticalSection( &thread_id_list_lock );
790 /* inherit the thread priority from the parent thread. */
791 /* TODO: is there value in setting this, vs. just allowing it to default
792 from the process? currently, this limits the effectiveness of changing
793 the priority in eg: TaskManager. */
794 SetThreadPriority(((*thread)->th), GetThreadPriority(GetCurrentThread()));
/* error path: release the lock before returning */
801 LeaveCriticalSection( &thread_id_list_lock );
/* Wait for 'thread' to finish, optionally fetch its exit code into
   *value_ptr, then release the handle and drop its thread-ID entry. */
811 int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
815 ret = WaitForSingleObject(thread->th, INFINITE);
/* NOTE(review): GetExitCodeThread is passed 'thread' here while the other
   calls use 'thread->th' -- looks like it should be thread->th; confirm
   against the upstream source. */
823 if (!GetExitCodeThread(thread, &retval))
828 CloseHandle(thread->th);
829 tMPI_Thread_id_list_remove(thread->id);
/* Terminate the calling thread. value_ptr is currently unused. */
836 void tMPI_Thread_exit(void tmpi_unused *value_ptr)
838 /* TODO: call destructors for thread-local storage */
/* Forcibly terminate 'thread' and remove it from the thread-ID list.
   NOTE(review): TerminateThread is passed 'thread' rather than
   'thread->th' -- verify against the upstream source. */
845 int tMPI_Thread_cancel(tMPI_Thread_t thread)
847 if (!TerminateThread( thread, -1) )
851 tMPI_Thread_id_list_remove(thread->id);
/* Return the tMPI thread structure for the calling thread, creating and
   registering one on first use (via tMPI_Thread_id_list_add_self). */
856 tMPI_Thread_t tMPI_Thread_self(void)
861 ret = tMPI_Init_initers();
867 th = tMPI_Thread_id_list_add_self();
/* Compare two tMPI thread handles for identity. */
872 int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2)
874 /* because the tMPI thread IDs are unique, we can compare them directly */
878 enum tMPI_Thread_setaffinity_support tMPI_Thread_setaffinity_support(void)
880 /* Windows supports setting of thread affinities */
881 return TMPI_SETAFFINITY_SUPPORT_YES;
/* Pin 'thread' to logical processor index 'nr'. Uses the NUMA/processor-
   group aware path when g_MPI_ProcessorInfo was built by tMPI_Init_NUMA(),
   otherwise falls back to SetThreadIdealProcessor. */
884 int tMPI_Thread_setaffinity_single(tMPI_Thread_t thread, unsigned int nr)
886 GROUP_AFFINITY GroupAffinity;
887 PROCESSOR_NUMBER IdealProcessorNumber;
888 /* thread NUMA node */
889 USHORT NumaNodeNumber;
891 /* check for a processor info array. This exists if NUMA
892 style calls have been successfully initialized. */
893 if (g_MPI_ProcessorInfo != NULL)
896 /*func_GetCurrentProcessorNumberEx(&CurrentProcessorNumber);*/
/* copy the cached group affinity for processor entry 'nr' */
898 memcpy(&GroupAffinity,
899 &(g_MPI_ProcessorInfo[nr].GroupAffinity),
900 sizeof(GROUP_AFFINITY));
902 /* group, processor number */
904 memcpy(&IdealProcessorNumber,
905 &(g_MPI_ProcessorInfo[nr].ProcessorNumber),
906 sizeof(PROCESSOR_NUMBER));
909 /* set the NUMA node affinity for the current thread
910 failures to set the current thread affinity are ignored,
911 as a fringe case can arise on >32 processor systems with a 32bit
914 func_SetThreadIdealProcessorEx(thread->th,
915 &IdealProcessorNumber,
918 if (func_GetNumaProcessorNodeEx(&IdealProcessorNumber,
921 /* for the NUMA node number associated with the current processor
922 number, get the group affinity mask */
923 if (func_GetNumaNodeProcessorMaskEx(NumaNodeNumber,
926 /* set the current thread affinity to prevent it from running
927 on other NUMA nodes */
928 func_SetThreadGroupAffinity(thread->th,
938 /* No NUMA-style calls. We just do a simpler thing. */
939 if ( (func_SetThreadIdealProcessor != NULL) )
941 return (func_SetThreadIdealProcessor(thread->th, nr) == -1);
/* Allocate and initialize the CRITICAL_SECTION backing a tMPI mutex. */
949 int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx)
956 mtx->mutex = (struct tMPI_Mutex*)malloc(sizeof(struct tMPI_Mutex)*1);
957 if (mtx->mutex == NULL)
961 InitializeCriticalSection(&(mtx->mutex->cs));
/* Destroy the CRITICAL_SECTION backing a tMPI mutex. */
967 int tMPI_Thread_mutex_destroy(tMPI_Thread_mutex_t *mtx)
974 DeleteCriticalSection(&(mtx->mutex->cs));
/* Lazily initialize a statically-declared mutex exactly once, guarded by
   the global mutex_init critical section. */
983 static int tMPI_Thread_mutex_init_once(tMPI_Thread_mutex_t *mtx)
987 /* This is essentially a copy of the code from the one-time
988 * initialization, but with a call to the mutex init routine instead.
989 * It might seem like overkill, but it will only be executed the first
990 * time you call a static mutex, and it is important to get all the
991 * memory barriers right. Trust me, you don't want a deadlock here...
994 /* initialize the initializers */
995 ret = tMPI_Init_initers();
1000 /* Lock the common one-time init mutex so we can check carefully */
1001 EnterCriticalSection( &mutex_init );
1003 /* Do the actual (locked) check - system mutex is locked if we get here */
1004 if (mtx->mutex == NULL)
1006 /* No need to keep the lock during execution -
1007 * Only one thread can do it anyway.
1009 ret = tMPI_Thread_mutex_init(mtx);
1011 LeaveCriticalSection( &mutex_init );
/* Lock a tMPI mutex, lazily initializing it on first use. */
1018 int tMPI_Thread_mutex_lock(tMPI_Thread_mutex_t *mtx)
1020 /* check whether the mutex is initialized */
1021 if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
1023 tMPI_Thread_mutex_init_once(mtx);
1026 /* The mutex is now guaranteed to be valid. */
1027 EnterCriticalSection( &(mtx->mutex->cs) );
/* Try to lock a tMPI mutex without blocking, lazily initializing it. */
1035 int tMPI_Thread_mutex_trylock(tMPI_Thread_mutex_t *mtx)
1039 /* check whether the mutex is initialized */
1040 if (tMPI_Atomic_get( &(mtx->initialized) ) == 0)
1042 tMPI_Thread_mutex_init_once(mtx);
1045 /* The mutex is now guaranteed to be valid. */
1046 ret = TryEnterCriticalSection( &(mtx->mutex->cs) );
/* Unlock a tMPI mutex. */
1053 int tMPI_Thread_mutex_unlock(tMPI_Thread_mutex_t *mtx)
1055 /* we should have initialized our critical section anyway */
1056 LeaveCriticalSection( &(mtx->mutex->cs) );
/* Create a thread-local-storage key via TlsAlloc(). The destructor is
   currently accepted but not invoked (see TODO below). */
1063 int tMPI_Thread_key_create(tMPI_Thread_key_t *key, void (*destructor)(void *) tmpi_unused)
1071 /* TODO: make list of destructors for thread-local storage */
1072 key->key = (struct tMPI_Thread_key*)malloc(sizeof(struct tMPI_Thread_key));
1073 if (key->key == NULL)
1078 (key)->key->wkey = TlsAlloc();
1080 if ( (key)->key->wkey == TLS_OUT_OF_INDEXES)
/* Release a thread-local-storage key. */
1089 int tMPI_Thread_key_delete(tMPI_Thread_key_t key)
1091 TlsFree(key.key->wkey);
/* Fetch the calling thread's value for a TLS key. */
1099 void * tMPI_Thread_getspecific(tMPI_Thread_key_t key)
1103 p = TlsGetValue(key.key->wkey);
/* Store the calling thread's value for a TLS key. */
1109 int tMPI_Thread_setspecific(tMPI_Thread_key_t key, void *value)
1113 ret = TlsSetValue(key.key->wkey, value);
1119 /* use once Vista is minimum required version */
/* Adapter giving init_routine the PINIT_ONCE_FN signature expected by
   InitOnceExecuteOnce(). */
1120 static BOOL CALLBACK InitHandleWrapperFunction(PINIT_ONCE InitOnce,
1124 void (*fn)(void) = (void (*)(void))Parameter;
/* fallback state for the pre-Vista tMPI_Thread_once() path below */
1131 CRITICAL_SECTION tMPI_Once_cs;
1132 tMPI_Spinlock_t tMPI_Once_cs_lock = TMPI_SPINLOCK_INITIALIZER;
1133 volatile int tMPI_Once_init = 0;
/* Run init_routine exactly once across all threads, serialized by the
   global once_init critical section (pre-Vista fallback path). */
1136 int tMPI_Thread_once(tMPI_Thread_once_t *once_control,
1137 void (*init_routine)(void))
1140 /* use once Vista is minimum required version */
1142 bStatus = InitOnceExecuteOnce(once_control, InitHandleWrapperFunction,
1143 init_routine, NULL);
1152 /* really ugly hack - and it's slow... */
1153 ret = tMPI_Init_initers();
1159 EnterCriticalSection(&once_init);
1160 if (tMPI_Atomic_get(&(once_control->once)) == 0)
1163 tMPI_Atomic_set(&(once_control->once), 1);
1165 LeaveCriticalSection(&once_init);
/* Initialize a condition variable. The active implementation emulates one
   with a waiter count, a cycle counter, and a manual-reset event; the
   InitializeConditionVariable call is the future Vista-only code path. */
1174 int tMPI_Thread_cond_init(tMPI_Thread_cond_t *cond)
1181 cond->condp = (struct tMPI_Thread_cond*)
1182 malloc(sizeof(struct tMPI_Thread_cond));
1183 if (cond->condp == NULL)
1188 /* use this code once Vista is the minimum version required */
1189 InitializeConditionVariable( &(cond->cv) );
1191 cond->condp->Nwaiters = 0;
1192 InitializeCriticalSection(&(cond->condp->wtr_lock));
1193 cond->condp->Nrelease = 0;
1194 cond->condp->cycle = 0;
1195 /* a manual reset, unsignalled event */
1196 cond->condp->ev = CreateEvent(NULL, TRUE, FALSE, NULL);
/* Destroy a condition variable's waiter lock. */
1202 int tMPI_Thread_cond_destroy(tMPI_Thread_cond_t *cond)
1205 /* use this code once Vista is the minimum version required */
1206 /* windows doesn't have a condition-variable destroy function */
1208 DeleteCriticalSection(&(cond->condp->wtr_lock));
1216 /*! \brief Static init routine for pthread barrier
1220 * This is only used as a wrapper to enable static initialization
1221 * of posix thread types together with our abstraction layer for tMPI_Thread.h
1223 * \param cond Condition variable, must be statically initialized
1225 * \return status - 0 on success, or a standard error code.
1227 static int tMPI_Thread_cond_init_once(tMPI_Thread_cond_t *cond)
1231 /* This is essentially a copy of the code from the one-time
1232 * initialization, but with a call to the cond init routine instead.
1233 * It might seem like overkill, but it will only be executed the first
1234 * time you call a static condition variable, and it is important to get
1235 * the memory barriers right. Trust me, you don't want a deadlock here...
1238 /* initialize the initializers */
1239 ret = tMPI_Init_initers();
1244 /* Lock the common one-time init mutex so we can check carefully */
1245 EnterCriticalSection( &cond_init );
1247 /* Do the actual (locked) check - system mutex is locked if we get here */
1248 if (cond->condp == NULL)
1250 /* No need to keep the lock during execution -
1251 * Only one thread can do it anyway. */
1252 ret = tMPI_Thread_cond_init(cond);
1254 LeaveCriticalSection( &cond_init );
/* Wait on 'cond' with 'mtx' held on entry; the mutex is released while
   waiting and re-acquired before returning. The emulated path uses a
   cycle counter so waiters only accept wakeups issued after they queued. */
1262 int tMPI_Thread_cond_wait(tMPI_Thread_cond_t *cond, tMPI_Thread_mutex_t *mtx)
1264 BOOL wait_done = FALSE;
1265 BOOL last_waiter = FALSE;
1269 /* check whether the condition is initialized */
1270 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
1272 ret = tMPI_Thread_cond_init_once(cond);
1278 /* the mutex must have been initialized because it should be locked here */
1281 /* use this code once Vista is the minimum version required */
1282 ret = SleepConditionVariableCS (&(cond->cv), &(mtx->cs), INFINITE);
1289 /* serially increase waiter count */
1290 EnterCriticalSection(&(cond->condp->wtr_lock));
1291 cond->condp->Nwaiters++;
1292 my_cycle = cond->condp->cycle;
1293 LeaveCriticalSection(&(cond->condp->wtr_lock));
1295 /* now it's safe to release the mutex from the fn call */
1296 LeaveCriticalSection(&(mtx->mutex->cs));
1298 /* Loop a wait until we found out we've waited for the right event.
1299 Note that this loop is potentially a busy-wait loop in bad
1300 circumstances (higher priority threads, for example). */
1303 /* do the actual waiting */
1304 if (WaitForSingleObject( cond->condp->ev, INFINITE ) == WAIT_FAILED)
1309 /* serially check whether we got the right event. */
1310 EnterCriticalSection(&(cond->condp->wtr_lock));
1311 wait_done = (cond->condp->Nrelease > 0) &&
1312 (cond->condp->cycle != my_cycle);
1313 LeaveCriticalSection(&(cond->condp->wtr_lock));
1317 /* We obtain the mutex from the function call */
1318 EnterCriticalSection(&(mtx->mutex->cs));
1320 /* we serially decrease the waiter count and release count */
1321 EnterCriticalSection(&(cond->condp->wtr_lock));
1322 cond->condp->Nwaiters--;
1323 cond->condp->Nrelease--;
1324 last_waiter = (cond->condp->Nrelease == 0);
1325 LeaveCriticalSection(&(cond->condp->wtr_lock));
1327 /* manually release the event if everybody's done with it */
1330 if (!ResetEvent( cond->condp->ev ))
/* Wake (at least) one thread waiting on 'cond'. In the emulated path this
   bumps Nrelease/cycle and signals the shared manual-reset event. */
1343 int tMPI_Thread_cond_signal(tMPI_Thread_cond_t *cond)
1346 /* check whether the condition is initialized */
1347 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
1349 ret = tMPI_Thread_cond_init_once(cond);
1355 /* The condition variable is now guaranteed to be valid. */
1357 /* use this code once Vista is the minimum version required */
1358 WakeConditionVariable( &(cond->cv) );
1360 EnterCriticalSection(&(cond->condp->wtr_lock));
1361 /* check if we're not still busy with a release. If we are, do nothing. */
1362 if (cond->condp->Nwaiters > cond->condp->Nrelease)
1364 cond->condp->Nrelease++;
1365 cond->condp->cycle++;
1366 if (!SetEvent(cond->condp->ev)) /* actually release the
1372 LeaveCriticalSection(&(cond->condp->wtr_lock));
/* Broadcast on 'cond', waking all currently waiting threads.
 * Mirrors tMPI_Thread_cond_signal, but releases every waiter at once.
 * NOTE(review): preprocessor branch selection, error handling and the
 * return are elided from this listing; comments cover visible code only. */
1380 int tMPI_Thread_cond_broadcast(tMPI_Thread_cond_t *cond)
1383 /* check whether the condition is initialized */
1384 if (tMPI_Atomic_get( &(cond->initialized) ) == 0)
/* first use of a statically initialized cond: perform the one-time init */
1386 ret = tMPI_Thread_cond_init_once(cond);
1393 /* The condition variable is now guaranteed to be valid. */
/* --- native path (Vista+) --- */
1395 /* use this code once Vista is the minimum version required */
1396 WakeAllConditionVariable( &(cond->cv) );
/* --- emulated path: release every waiter in one new generation --- */
1398 EnterCriticalSection(&(cond->condp->wtr_lock));
1399 /* check whether there are any waiters */
1400 if (cond->condp->Nwaiters > 0)
/* let all current waiters through: pending releases = waiter count */
1402 cond->condp->Nrelease = cond->condp->Nwaiters;
1403 cond->condp->cycle++;
1404 if (!SetEvent(cond->condp->ev)) /* actually release the
1410 LeaveCriticalSection(&(cond->condp->wtr_lock));
/* Initialize 'barrier' for 'n' participating threads.
 * NOTE(review): two init variants are visible — native
 * CRITICAL_SECTION/CONDITION_VARIABLE and the tMPI mutex/cond wrappers —
 * which in the original are alternative preprocessor branches; the #if
 * lines choosing between them are elided from this listing, as are the
 * NULL/alloc error returns. */
1418 int tMPI_Thread_barrier_init(tMPI_Thread_barrier_t *barrier, int n)
1422 if (barrier == NULL)
/* allocate the OS-specific part of the barrier */
1427 barrier->barrierp = (struct tMPI_Thread_barrier*)
1428 malloc(sizeof(struct tMPI_Thread_barrier)*1);
1429 if (barrier->barrierp == NULL)
/* --- native variant --- */
1435 /* use this once Vista is the oldest supported windows version: */
1436 InitializeCriticalSection(&(barrier->barrierp->cs));
1437 InitializeConditionVariable(&(barrier->barrierp->cv));
/* --- fallback variant: wrapper mutex/cond instead of native CS/CV --- */
1439 ret = tMPI_Thread_mutex_init(&(barrier->barrierp->cs));
1444 ret = tMPI_Thread_cond_init(&(barrier->barrierp->cv));
/* number of threads that must arrive before the barrier opens */
1451 barrier->threshold = n;
/* Destroy 'barrier' and free its OS-specific part.
 * NOTE(review): as in barrier_init, the native (DeleteCriticalSection)
 * and wrapper (mutex/cond destroy) teardown paths are alternative
 * preprocessor branches whose #if lines are elided from this listing. */
1460 int tMPI_Thread_barrier_destroy(tMPI_Thread_barrier_t *barrier)
1464 if (barrier == NULL)
/* --- native variant --- */
1470 DeleteCriticalSection(&(barrier->barrierp->cs));
/* --- fallback variant --- */
1472 ret = tMPI_Thread_mutex_destroy(&(barrier->barrierp->cs));
1479 ret = tMPI_Thread_cond_destroy(&(barrier->barrierp->cv));
/* release the storage allocated in tMPI_Thread_barrier_init */
1485 free(barrier->barrierp);
1492 /*! \brief Static init routine for pthread barrier
1496 * This is only used as a wrapper to enable static initialization
1497 * of posix thread types together with our abstraction layer for tMPI_Thread.h
1499 * \param barrier Statically initialized barrier type
1500 * \param n Number of members in barrier
1502 * \return status - 0 on success, or a standard error code.
/* One-time initialization for a statically declared barrier: classic
 * lock-protected double check on barrier->barrierp under the global
 * 'barrier_init' critical section.
 * NOTE(review): error-check lines after tMPI_Init_initers() and the
 * return are elided from this listing. */
1504 static int tMPI_Thread_barrier_init_once(tMPI_Thread_barrier_t *barrier, int n)
1508 /* This is essentially a copy of the code from the one-time
1509 * initialization, but with a call to the cond init routine instead.
1510 * It might seem like overkill, but it will only be executed the first
1511 * time you call a static condition variable, and it is important to get
1512 * the memory barriers right. Trust me, you don't want a deadlock here...
1516 /* initialize the initializers */
1517 ret = tMPI_Init_initers();
1523 /* Lock the common one-time init mutex so we can check carefully */
1524 EnterCriticalSection( &barrier_init );
1526 /* Do the actual (locked) check - system mutex is locked if we get here */
1527 if (barrier->barrierp == NULL)
1529 /* No need to keep the lock during execution -
1530 * Only one thread can do it anyway. */
1531 ret = tMPI_Thread_barrier_init(barrier, n);
1533 LeaveCriticalSection( &barrier_init );
/* Block until 'threshold' threads have called this function, then release
 * them all. Uses a flip-flopping 'cycle' flag so consecutive barrier
 * rounds cannot be confused (a thread only leaves when the cycle it
 * captured on entry has flipped).
 * NOTE(review): native (CRITICAL_SECTION + SleepConditionVariableCS) and
 * wrapper (mutex/cond) lock/wait calls appear interleaved below because
 * the original's preprocessor branch lines are elided from this listing,
 * as are error handling and the return value. */
1540 int tMPI_Thread_barrier_wait(tMPI_Thread_barrier_t *barrier)
1545 /*tMPI_Thread_pthread_barrier_t *p;*/
1547 /* check whether the barrier is initialized */
1548 if (tMPI_Atomic_get( &(barrier->initialized) ) == 0)
/* first use of a statically initialized barrier: one-time init */
1550 ret = tMPI_Thread_barrier_init_once(barrier, barrier->threshold);
/* --- native lock --- */
1557 EnterCriticalSection( &(barrier->barrierp->cs) );
/* --- wrapper lock --- */
1559 ret = tMPI_Thread_mutex_lock( &(barrier->barrierp->cs) );
/* remember which round we entered in */
1568 cycle = barrier->cycle;
1570 /* Decrement the count atomically and check if it is zero.
1571 * This will only be true for the last thread calling us.
1573 if (--(barrier->count) <= 0)
/* last arrival: start a new round, reset the count, wake everyone */
1575 barrier->cycle = !barrier->cycle;
1576 barrier->count = barrier->threshold;
1578 WakeAllConditionVariable( &(barrier->barrierp->cv) );
1580 ret = tMPI_Thread_cond_broadcast( &(barrier->barrierp->cv) );
/* non-last arrivals: sleep until the round we entered in has ended */
1589 while (cycle == barrier->cycle)
1592 rc = SleepConditionVariableCS (&(barrier->barrierp->cv),
1593 &(barrier->barrierp->cs),
1601 rc = tMPI_Thread_cond_wait(&barrier->barrierp->cv,
1602 &barrier->barrierp->cs);
/* --- native unlock --- */
1611 LeaveCriticalSection( &(barrier->barrierp->cs) );
/* --- wrapper unlock --- */
1613 tMPI_Thread_mutex_unlock( &(barrier->barrierp->cs) );
1620 /* just to have some symbols */
/* NOTE(review): presumably keeps this translation unit from being empty
 * when the Windows backend is compiled out — confirm against build setup. */
1621 int tMPI_Thread_winthreads = 0;
1623 #endif /* THREAD_WINDOWS */