-/*! \brief Pthread implementation of the abstract tMPI_Thread type
+/*! \brief Thread ID: abstract tMPI_Thread type
*
* The contents of this structure depends on the actual threads
* implementation used.
+/*! \name Thread creation, destruction, and inspection
+ \{ */
/** Check if threads are supported
*
* This routine provides a cleaner way to check if threads are supported
*
* Please be careful not to change arg after calling this function.
*
- * \param thread Pointer to opaque thread datatype
- * \param start_routine The function to call in the new thread
- * \param arg Argument to call with
+ * \param[out] thread Pointer to thread ID
+ * \param[in] start_routine The function to call in the new thread
+ * \param[in] arg Argument to call with
*
* \return Status - 0 on success, or an error code.
*/
*
* Please be careful not to change arg after calling this function.
*
- * \param thread Pointer to opaque thread datatype
- * \param start_routine The function to call in the new thread
- * \param arg Argument to call with
+ * \param[out] thread Pointer to thread ID
+ * \param[in] start_routine The function to call in the new thread
+ * \param[in] arg Argument to call with
*
* \return Status - 0 on success, or an error code.
*/
*
* If the thread has already finished the routine returns immediately.
*
- * \param thread Opaque thread datatype to wait for.
- * \param value_ptr Pointer to location where to store pointer to exit value
- * from threads that called tMPI_Thread_exit().
+ * \param[in] thread Pointer to thread ID
+ * \param[out] value_ptr Pointer to location where to store pointer to
+ * exit value from threads that called
+ * tMPI_Thread_exit().
*
* \return 0 if the join went ok, or a non-zero error code.
*/
int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr);
+/** Terminate calling thread
+ *
+ * Die voluntarily.
+ *
+ * \param value_ptr Pointer to a return value. Threads waiting for us to
+ * join them can read this value if they try.
+ * \return
+ */
+void tMPI_Thread_exit(void *value_ptr);
+
+
+
+/** Ask a thread to exit
+ *
+ * This routine tries to end the execution of another thread, but there are
+ * no guarantees it will succeed.
+ *
+ * \param thread Handle to thread we want to see dead.
+ * \return 0 or a non-zero error message.
+ */
+int tMPI_Thread_cancel(tMPI_Thread_t thread);
+
+
+
+
+/** Get a thread ID of the calling thread.
+ *
+ * This function also works on threads not started with tMPI_Thread_create,
+ * or any other function in thread_mpi. This makes it possible to, for
+ * example assign thread affinities to any thread.
+ *
+ * \return A thread ID of the calling thread */
+tMPI_Thread_t tMPI_Thread_self(void);
+
+
+
+/** Check whether two thread pointers point to the same thread
+ *
+ * \param[in] t1 Thread ID 1
+ * \param[in] t2 Thread ID 2
+ * \return non-zero if the thread structs refer to the same thread,
+ 0 if the threads are different*/
+int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2);
+
+
+/** Set thread affinity to a single core
+ *
+ * This function sets the thread affinity of a thread to a a specific
+ * numbered processor. This only works if the underlying operating system
+ * supports it. The processor number must be between 0 and the number returned
+ * by tMPI_Thread_get_hw_number().
+ *
+ * \param[in] thread Thread ID of the thread to set affinity for
+ * \param[in] nr Processor number to set affinity to */
+int tMPI_Thread_setaffinity_single(tMPI_Thread_t thread, unsigned int nr);
+
+
+/*! \} */
+/*! \name Mutexes
+ \{ */
+
/** Initialize a new mutex
*
+/*! \} */
+/*! \name Thread-specific storage
+ \{ */
+
/** Initialize thread-specific-storage handle
*
int tMPI_Thread_setspecific(tMPI_Thread_key_t key, void *value);
+/*! \} */
+/*! \name Run-once
+ \{ */
+
/** Run the provided routine exactly once
*
void (*init_routine)(void));
+/*! \} */
+/*! \name Condition variables
+ \{ */
/** Initialize condition variable
*
-
-/** Terminate calling thread
- *
- * Die voluntarily.
- *
- * \param value_ptr Pointer to a return value. Threads waiting for us to
- * join them can read this value if they try.
- * \return
- */
-void tMPI_Thread_exit(void *value_ptr);
-
-
-
-/** Ask a thread to exit
- *
- * This routine tries to end the execution of another thread, but there are
- * no guarantees it will succeed.
- *
- * \param thread Handle to thread we want to see dead.
- * \return 0 or a non-zero error message.
- */
-int tMPI_Thread_cancel(tMPI_Thread_t thread);
-
-
-
-
+/*! \} */
+/*! \name Barriers
+ \{ */
/** Initialize a synchronization barrier type
*/
int tMPI_Thread_barrier_wait(tMPI_Thread_barrier_t *barrier);
-
+/*! \} */
#ifdef __cplusplus
*/
+/* for size_t, include stddef.h - which is in C89. This is done
+ regardless of whether we're compiling C++ or C code because the base
+ library for this is in C. */
+#include <stddef.h>
+
+
#ifdef __cplusplus
extern "C"
{
} /* Avoids screwing up auto-indentation */
#endif
+
+
/** tMPI definition.
Use this to check for thread_mpi with the preprocessor. */
#define TMPI
+/** tMPI initialization thread affinity strategy.
+
+ Used in the tMPI_Init_affinity() and tMPI_Init_fn_affinity() functions,
+ to control how affinity is set. The default tMPI_Init() and tMPI_Init_fn()
+ functions use the TMPI_AFFINITY_ALL_CORES strategy.
+
+ These strategies are fairly basic. For more flexibility, use the
+ tMPI_Set_affinity() function.*/
+typedef enum
+{
+ TMPI_AFFINITY_NONE=0, /**< Do not set any thread affinity */
+ TMPI_AFFINITY_ALL_CORES, /**< Only set affinity if the number of threads
+ is equal to the number of hardware threads
+ (cores + hyperthreads). This is the only
+ safe way to set thread affinity,
+ without clashes between multiple
+ instances of the same program. */
+} tMPI_Affinity_strategy;
+
+
/** tMPI Communicator
TMPI_ERR_ENVELOPES,
TMPI_ERR_REQUESTS,
TMPI_ERR_IN_STATUS,
+ TMPI_ERR_PROCNR, /*!< Hardware processor number (such as for
+ thread affinity) error */
TMPI_FAILURE,
TMPI_ERR_UNKNOWN,
N_TMPI_ERR /* this must be the last one */
/*! \name Initialization and exit functions
\{ */
-/** Traditional MPI initializer; spawns threads that start at
- the given function.
-
+/** Traditional MPI initializer; spawns threads that start at the given
+ function.
+
Seeks the argument '-nt n', where n is the number of
threads that will be created. If n==0, the number of threads will
be the recommended number of threads for this platform as obtained
argc and argv. This function could be main(), or any other function;
calling this function again - whether from the started threads or from
the main thread - has no effect.
-
- \param[in] argc argc of original main() invocation, or NULL
- \param[in] argv argv of original main() invocation, or NULL.
- \param[in] start_function Starting function of type
- int start_function(int argc, char *argv[]);
+
+ On platforms that support thread affinity setting, this function will
+ use the 'all-cores' affinity strategy: it will only set thread affinity
+ if the number of threads is equal to the number of hardware threads
+ (cores + hyperthreads).
+
+ \param[in] argc argc of original main() invocation, or NULL
+ \param[in] argv argv of original main() invocation, or NULL.
+ \param[in] start_function Starting function of type
+ int start_function(int argc, char *argv[]);
\return TMPI_SUCCESS on success, TMPI_FAILURE on failure. */
-int tMPI_Init(int *argc, char ***argv, int (*start_function)(int, char**));
+int tMPI_Init(int *argc, char ***argv,
+ int (*start_function)(int, char**));
-/** Alternate thread MPI intializer and thread spawner.
+/** Generic init function thread MPI intializer and thread spawner.
Creates N threads (including main thread)
that run the function start_function, which takes a void* argument,
If N==0, the number of threads will be the recommended number of
threads for this platform as obtained from tMPI_Get_recommended_ntreads().
+ Note that thread affinity strategy only has an effect when this is
+ supported by the underlying platform. As of yet (2012), this is not the
+ case for Mac OS X, for example.
+
\param[in] main_thread_returns whether the control in the main thread
should return immediately (if true), or
the start_function() should be called
from the main thread, too (if false).
\param[in] N The number of threads to start (or 0 to
automatically determine this).
+ \param[in] aff_strategy The thread affinity strategy to use.
\param[in] start_function The function to start threads at
(including main thread if
main_thread_returns).
\return TMPI_FAILURE on failure, TMPI_SUCCESS on succes (after all
threads have finished if main_thread_returns=true). */
-int tMPI_Init_fn(int main_thread_returns, int N,
+int tMPI_Init_fn(int main_thread_returns, int N,
+ tMPI_Affinity_strategy aff_strategy,
void (*start_function)(void*), void *arg);
+
+
+
+
/** get the number of threads from the command line
can be called before tMPI_Init()
/* mutex for initializing barriers */
static pthread_mutex_t barrier_init=PTHREAD_MUTEX_INITIALIZER;
-/* mutex for initializing barriers */
-static pthread_mutex_t aff_init=PTHREAD_MUTEX_INITIALIZER;
-static int aff_thread_number=0;
+/* mutex for managing thread IDs */
+static pthread_mutex_t thread_id_mutex=PTHREAD_MUTEX_INITIALIZER;
+static pthread_key_t thread_id_key;
+static int thread_id_key_initialized=0;
+
/* TODO: this needs to go away! (there's another one in winthreads.c)
return ret;
}
-int tMPI_Thread_create(tMPI_Thread_t *thread, void *(*start_routine)(void *),
- void *arg)
+/* destructor for thread ids */
+static void tMPI_Destroy_thread_id(void* thread_id)
{
- int ret;
-
- if(thread==NULL)
+ struct tMPI_Thread *thread=(struct tMPI_Thread*)thread_id;
+ if (!thread->started_by_tmpi)
{
- tMPI_Fatal_error(TMPI_FARGS,"Invalid thread pointer.");
- return EINVAL;
+ /* if the thread is started by tMPI, it must be freed in the join()
+ call. */
+ free(thread_id);
}
+}
- *thread=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
- ret=pthread_create(&((*thread)->th),NULL,start_routine,arg);
-
- if(ret!=0)
+/* initialize the thread id vars if not already initialized */
+static void tMPI_Init_thread_ids(void)
+{
+ pthread_mutex_lock( &thread_id_mutex );
+ if (!thread_id_key_initialized)
{
- /* Cannot use tMPI_error() since messages use threads for locking */
- tMPI_Fatal_error(TMPI_FARGS,"Failed to create POSIX thread:%s, rc=%d",
- strerror(errno), ret);
- /* Use system memory allocation routines */
- return -1;
- }
+ /* initialize and set the thread id thread-specific variable */
+ struct tMPI_Thread *main_thread;
- return 0;
+ thread_id_key_initialized=1;
+ pthread_key_create(&thread_id_key, tMPI_Destroy_thread_id);
+ main_thread=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
+ main_thread->th=pthread_self();
+ main_thread->started_by_tmpi=0;
+ pthread_setspecific(thread_id_key, main_thread);
+ }
+ pthread_mutex_unlock( &thread_id_mutex );
}
+/* structure to hold the arguments for the thread_starter function */
+struct tMPI_Thread_starter
+{
+ struct tMPI_Thread *thread;
+ void *(*start_routine)(void*);
+ void *arg;
+};
-/* set thread's own affinity to a processor number n */
-static int tMPI_Set_affinity(int n)
+/* the thread_starter function that sets the thread id */
+static void *tMPI_Thread_starter(void *arg)
{
-#ifdef HAVE_PTHREAD_SETAFFINITY
- cpu_set_t set;
+ struct tMPI_Thread_starter *starter=(struct tMPI_Thread_starter *)arg;
+ void *(*start_routine)(void*);
+ void *parg;
- CPU_ZERO(&set);
- CPU_SET(n, &set);
- return pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
-#endif
- return 0;
+ pthread_setspecific(thread_id_key, starter->thread);
+ start_routine=starter->start_routine;
+ parg=starter->arg;
+
+ free(starter);
+ return (*start_routine)(parg);
}
-int tMPI_Thread_create_aff(tMPI_Thread_t *thread,
- void *(*start_routine)(void *),
- void *arg)
+int tMPI_Thread_create(tMPI_Thread_t *thread, void *(*start_routine)(void *),
+ void *arg)
{
int ret;
-
-#ifdef TMPI_SET_AFFINITY
- /* set the calling thread's affinity mask */
- pthread_mutex_lock( &(aff_init) );
- if (aff_thread_number==0)
- {
- tMPI_Set_affinity(aff_thread_number++);
- }
- pthread_mutex_unlock( &(aff_init) );
-#endif
+ struct tMPI_Thread_starter *starter;
if(thread==NULL)
{
tMPI_Fatal_error(TMPI_FARGS,"Invalid thread pointer.");
return EINVAL;
}
+ tMPI_Init_thread_ids();
*thread=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
- ret=pthread_create(&((*thread)->th),NULL,start_routine,arg);
+ (*thread)->started_by_tmpi = 1;
+ starter=(struct tMPI_Thread_starter*)
+ malloc(sizeof(struct tMPI_Thread_starter)*1);
+ /* fill the starter structure */
+ starter->thread=*thread;
+ starter->start_routine=start_routine;
+ starter->arg=arg;
+
+ /*ret=pthread_create(&((*thread)->th),NULL,start_routine,arg);*/
+ ret=pthread_create(&((*thread)->th),NULL,tMPI_Thread_starter,
+ (void*)starter);
if(ret!=0)
{
/* Use system memory allocation routines */
return -1;
}
- else
- {
-#ifdef TMPI_SET_AFFINITY
- /* now set the affinity of the new thread */
- pthread_mutex_lock( &(aff_init) );
- ret=tMPI_Set_affinity(aff_thread_number++);
- pthread_mutex_unlock( &(aff_init) );
- /* failure is non-fatal, so we won't check the result */
- return 0;
-#else
- return 0;
-#endif
- }
-}
-
+ return 0;
+}
int ret;
pthread_t th=thread->th;
- free(thread);
ret = pthread_join( th, value_ptr );
+ free(thread);
if(ret != 0 )
{
tMPI_Fatal_error(TMPI_FARGS,"Failed to join POSIX thread. rc=%d",ret);
}
+tMPI_Thread_t tMPI_Thread_self(void)
+{
+ tMPI_Thread_t th;
+ /* make sure the key var is set */
+ tMPI_Init_thread_ids();
+
+ th=pthread_getspecific(thread_id_key);
+
+ /* check if it is already in our list */
+ if (th == NULL)
+ {
+ /* if not, create an ID, set it and return it */
+ th=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
+ th->started_by_tmpi=0;
+ th->th=pthread_self();
+ pthread_setspecific(thread_id_key, th);
+ }
+ return th;
+}
+
+int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2)
+{
+ return pthread_equal(t1->th, t2->th);
+}
+
+/* set thread's own affinity to a processor number n */
+int tMPI_Thread_setaffinity_single(tMPI_Thread_t thread, unsigned int nr)
+{
+#ifdef HAVE_PTHREAD_SETAFFINITY
+ int nt=tMPI_Thread_get_hw_number();
+ cpu_set_t set;
+
+ if (nt < nr)
+ {
+ return TMPI_ERR_PROCNR;
+ }
+
+ CPU_ZERO(&set);
+ CPU_SET(nr, &set);
+ return pthread_setaffinity_np(thread->th, sizeof(set), &set);
+#endif
+ return 0;
+}
+
+
+
int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx)
{
-void tMPI_Thread_exit(void * value_ptr)
+void tMPI_Thread_exit(void *value_ptr)
{
pthread_exit(value_ptr);
}
-int tMPI_Thread_cancel(tMPI_Thread_t thread)
+int tMPI_Thread_cancel(tMPI_Thread_t thread)
{
return pthread_cancel(thread->th);
}
struct tMPI_Thread
{
- pthread_t th;
+ pthread_t th; /*!< The POSIX thread ID */
+ int started_by_tmpi; /*!< whether the thread is started by tMPI */
};
struct tMPI_Thread_key
/* start N threads with argc, argv (used by tMPI_Init)*/
-void tMPI_Start_threads(tmpi_bool main_returns, int N, int *argc, char ***argv,
+void tMPI_Start_threads(tmpi_bool main_returns, int N,
+ tMPI_Affinity_strategy aff_strategy,
+ int *argc, char ***argv,
void (*start_fn)(void*), void *start_arg,
int (*start_fn_main)(int, char**));
void tMPI_Trace_print(const char *fmt, ...)
{
va_list argp;
- struct tmpi_thread* th=tMPI_Get_current();
+ struct tmpi_thread* th=NULL;
static tMPI_Thread_mutex_t mtx=TMPI_THREAD_MUTEX_INITIALIZER;
tMPI_Thread_mutex_lock(&mtx);
if (threads)
+ {
+ th=tMPI_Get_current();
printf("THREAD %02d: ", (int)(th-threads));
+ }
else
+ {
printf("THREAD main: ");
+ }
va_start(argp, fmt);
vprintf(fmt, argp);
printf("\n");
static void tMPI_Global_destroy(struct tmpi_global *g)
{
+ tMPI_Thread_barrier_destroy(&(g->barrier));
tMPI_Thread_mutex_destroy(&(g->timer_mutex));
tMPI_Thread_mutex_destroy(&(g->comm_link_lock));
}
}
-void tMPI_Start_threads(tmpi_bool main_returns, int N, int *argc, char ***argv,
+void tMPI_Start_threads(tmpi_bool main_returns, int N,
+ tMPI_Affinity_strategy aff_strategy,
+ int *argc, char ***argv,
void (*start_fn)(void*), void *start_arg,
int (*start_fn_main)(int, char**))
{
#ifdef TMPI_TRACE
- tMPI_Trace_print("tMPI_Start_threads(%d, %p, %p, %p, %p)", N, argc,
- argv, start_fn, start_arg);
+ tMPI_Trace_print("tMPI_Start_threads(%d, %d, %d, %d, %d, %p, %p, %p, %p)",
+ main_returns, N, aff_strategy, argc, argv, start_fn,
+ start_arg);
#endif
if (N>0)
{
}
/* now check whether to set affinity */
-#ifdef TMPI_THREAD_AFFINITY
+ if (aff_strategy == TMPI_AFFINITY_ALL_CORES)
{
int nhw=tMPI_Thread_get_hw_number();
if ((nhw > 1) && (nhw == N))
set_affinity=TRUE;
}
}
-#endif
+
+ /* set thread 0's properties */
+ threads[0].thread_id=tMPI_Thread_self();
+ if (set_affinity)
+ {
+ /* set the main thread's affinity */
+ tMPI_Thread_setaffinity_single(threads[0].thread_id, 0);
+ }
for(i=1;i<N;i++) /* zero is the main thread */
{
int ret;
+ ret=tMPI_Thread_create(&(threads[i].thread_id),
+ tMPI_Thread_starter,
+ (void*)&(threads[i]) ) ;
if (set_affinity)
{
- ret=tMPI_Thread_create_aff(&(threads[i].thread_id),
- tMPI_Thread_starter,
- (void*)&(threads[i]) ) ;
- }
- else
- {
- ret=tMPI_Thread_create(&(threads[i].thread_id),
- tMPI_Thread_starter,
- (void*)&(threads[i]) ) ;
+ tMPI_Thread_setaffinity_single(threads[i].thread_id, i);
}
-
if(ret)
{
tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
}
-int tMPI_Init(int *argc, char ***argv, int (*start_function)(int, char**))
+int tMPI_Init(int *argc, char ***argv,
+ int (*start_function)(int, char**))
{
#ifdef TMPI_TRACE
tMPI_Trace_print("tMPI_Init(%p, %p, %p)", argc, argv, start_function);
{
int N=0;
tMPI_Get_N(argc, argv, "-nt", &N);
- tMPI_Start_threads(FALSE, N, argc, argv, NULL, NULL, start_function);
+ tMPI_Start_threads(FALSE, N, TMPI_AFFINITY_ALL_CORES, argc, argv,
+ NULL, NULL, start_function);
}
else
{
return TMPI_SUCCESS;
}
+
+
+
int tMPI_Init_fn(int main_thread_returns, int N,
+ tMPI_Affinity_strategy aff_strategy,
void (*start_function)(void*), void *arg)
{
#ifdef TMPI_TRACE
if (TMPI_COMM_WORLD==0 && N>=1) /* we're the main process */
{
- tMPI_Start_threads(main_thread_returns, N, 0, 0, start_function, arg,
- NULL);
+ tMPI_Start_threads(main_thread_returns, N, aff_strategy,
+ 0, 0, start_function, arg, NULL);
}
return TMPI_SUCCESS;
}
static tMPI_Spinlock_t main_thread_aff_lock=TMPI_SPINLOCK_INITIALIZER;
static tMPI_Atomic_t main_thread_aff_set={ 0 };
+/* mutex for managing thread IDs */
+/*static tMPI_Spinlock_t thread_id_lock=TMPI_SPINLOCK_INITIALIZER;*/
+static DWORD thread_id_key;
+
+
+
+
+/* data structure to keep track of thread key destructors. */
+typedef struct
+{
+ void (*destructor) (void*);
+ DWORD key;
+} thread_key_destructors;
+
+static thread_key_destructors *destructors=NULL;
+
+
+
/*
NUMA and Processor Group awareness support.
/* total number of processors in g_MPI_ProcessInfo array */
ULONG g_ulTotalProcessors;
/* array describing available processors, affinity masks, and NUMA node */
-MPI_NUMA_PROCESSOR_INFO *g_MPI_ProcessorInfo;
+MPI_NUMA_PROCESSOR_INFO *g_MPI_ProcessorInfo=NULL;
/* function prototypes and variables to support obtaining function addresses
dynamically -- supports down-level operating systems */
typedef BOOL (WINAPI *func_GetNumaHighestNodeNumber_t)( PULONG
HighestNodeNumber );
+typedef DWORD (WINAPI *func_SetThreadIdealProcessor_t)( HANDLE hThread,
+ DWORD dwIdealProcessor );
typedef BOOL (WINAPI *func_SetThreadGroupAffinity_t)( HANDLE hThread,
const GROUP_AFFINITY *GroupAffinity,
PGROUP_AFFINITY PreviousGroupAffinity );
/* WinXP SP2, WinXP64, WinSrv 2003 */
-func_GetNumaHighestNodeNumber_t func_GetNumaHighestNodeNumber;
+func_GetNumaHighestNodeNumber_t func_GetNumaHighestNodeNumber;
+func_SetThreadIdealProcessor_t func_SetThreadIdealProcessor;
/* Windows 7, WinSrv 2008R2 */
func_SetThreadGroupAffinity_t func_SetThreadGroupAffinity;
func_SetThreadIdealProcessorEx_t func_SetThreadIdealProcessorEx;
func_DeleteProcThreadAttributeList_t func_DeleteProcThreadAttributeList;
-/* Set the main thread's affinity */
-static int tMPI_Set_main_thread_affinity(void)
-{
- /* calling thread PROCESSOR_NUMBER */
- PROCESSOR_NUMBER CurrentProcessorNumber;
- /* calling thread GROUP_AFFINITY */
- GROUP_AFFINITY CurrentThreadGroupAffinity;
- /* calling thread NUMA node */
- USHORT CurrentNumaNodeNumber;
-
-
- /* we can pre-check because it's atomic */
- if (tMPI_Atomic_get(&main_thread_aff_set) == 0)
- {
- /* this can be a spinlock because the chances of collision are low. */
- tMPI_Spinlock_lock( &main_thread_aff_lock );
- if( g_ulHighestNumaNodeNumber != 0 )
- {
- func_GetCurrentProcessorNumberEx(&CurrentProcessorNumber);
-
-
- /* set the NUMA node affinity for the current thread
- failures to set the current thread affinity are ignored,
- as a fringe case can arise on >32 processor systems with a 32bit
- build/code.
- */
- func_SetThreadIdealProcessorEx(GetCurrentThread(),
- &CurrentProcessorNumber,
- NULL);
-
- if(func_GetNumaProcessorNodeEx(&CurrentProcessorNumber,
- &CurrentNumaNodeNumber))
- {
- /* for the NUMA node number associated with the current processor
- number, get the group affinity mask */
- if(func_GetNumaNodeProcessorMaskEx(CurrentNumaNodeNumber,
- &CurrentThreadGroupAffinity))
- {
- /* set the current thread affinity to prevent it from running on
- other NUMA nodes */
- func_SetThreadGroupAffinity(GetCurrentThread(),
- &CurrentThreadGroupAffinity,
- NULL);
- }
- }
- }
- else
- {
- /* No NUMA. For now, we just do a similar thing. */
- if ( (func_GetCurrentProcessorNumberEx != NULL) &&
- (func_SetThreadIdealProcessorEx))
- {
- func_GetCurrentProcessorNumberEx(&CurrentProcessorNumber);
- func_SetThreadIdealProcessorEx(GetCurrentThread(),
- &CurrentProcessorNumber,
- NULL);
- }
- }
- tMPI_Atomic_set( &main_thread_aff_set, 1);
- tMPI_Spinlock_unlock( &main_thread_aff_lock );
- }
- return 0;
-}
/* returns 0 on success.
Success is returned if the system is non-NUMA, OR the system doesn't
/* calling thread PROCESSOR_NUMBER */
PROCESSOR_NUMBER CurrentProcessorNumber;
/* calling thread GROUP_AFFINITY */
- GROUP_AFFINITY CurrentThreadGroupAffinity;
+ /*GROUP_AFFINITY CurrentThreadGroupAffinity; */
/* calling thread NUMA node */
- USHORT CurrentNumaNodeNumber;
+ /*USHORT CurrentNumaNodeNumber;*/
WORD wActiveGroupCount;
WORD GroupIndex;
*/
func_GetNumaHighestNodeNumber = (func_GetNumaHighestNodeNumber_t) GetProcAddress( hModKernel32, "GetNumaHighestNodeNumber" );
+ func_SetThreadIdealProcessor = (func_SetThreadIdealProcessor_t) GetProcAddress( hModKernel32, "SetThreadIdealProcessor" );
if( func_GetNumaHighestNodeNumber == NULL )
{
return -1;
}
- if( ulHighestNumaNodeNumber == 0 )
- {
- /* system is not NUMA */
- return 0;
- }
+
func_SetThreadGroupAffinity = (func_SetThreadGroupAffinity_t)GetProcAddress( hModKernel32, "SetThreadGroupAffinity" );
func_SetThreadIdealProcessorEx = (func_SetThreadIdealProcessorEx_t)GetProcAddress( hModKernel32, "SetThreadIdealProcessorEx" );
isn't supported */
return 0;
}
+#if 0
+ if( ulHighestNumaNodeNumber == 0 )
+ {
+ /* system is not NUMA */
+ return 0;
+ }
+#endif
/* count the active processors across the groups */
}
}
-#if 0
- /* set the NUMA node affinity for the current thread
- failures to set the current thread affinity are ignored,
- as a fringe case can arise on >32 processor systems with a 32bit
- build/code.
- */
- func_SetThreadIdealProcessorEx(GetCurrentThread(),
- &CurrentProcessorNumber,
- NULL);
- if(func_GetNumaProcessorNodeEx(&CurrentProcessorNumber,
- &CurrentNumaNodeNumber))
- {
- /* for the NUMA node number associated with the current processor
- number, get the group affinity mask */
- if(func_GetNumaNodeProcessorMaskEx(CurrentNumaNodeNumber,
- &CurrentThreadGroupAffinity))
- {
- /* set the current thread affinity to prevent it from running on
- other NUMA nodes */
- func_SetThreadGroupAffinity(GetCurrentThread(),
- &CurrentThreadGroupAffinity,
- NULL);
- }
- }
-#endif
-
/* capture number of processors, highest NUMA node number, and processor
array */
g_ulTotalProcessors = dwTotalProcessors;
InitializeCriticalSection(&once_init);
InitializeCriticalSection(&cond_init);
InitializeCriticalSection(&barrier_init);
+ thread_id_key=TlsAlloc();
/* fatal errors are handled by the routine by calling tMPI_Fatal_error() */
tMPI_Init_NUMA();
{
void *(*start_routine)(void*); /* the function */
void *param; /* its parameter */
+ struct tMPI_Thread *thread;
};
static DWORD WINAPI tMPI_Win32_thread_starter( LPVOID lpParam )
struct tMPI_Thread_starter_param *prm=
(struct tMPI_Thread_starter_param*)lpParam;
+ TlsSetValue(thread_id_key, prm->thread);
(prm->start_routine)(prm->param);
return 0;
}
-HANDLE tMPI_Thread_create_NUMA(LPSECURITY_ATTRIBUTES lpThreadAttributes,
- SIZE_T dwStackSize,
- LPTHREAD_START_ROUTINE lpStartAddress,
- LPVOID lpParameter,
- DWORD dwCreationFlags,
- LPDWORD lpThreadId)
-{
- LPPROC_THREAD_ATTRIBUTE_LIST pAttributeList = NULL;
- HANDLE hThread = NULL;
- SIZE_T cbAttributeList = 0;
- GROUP_AFFINITY GroupAffinity;
- PROCESSOR_NUMBER IdealProcessorNumber;
- ULONG CurrentProcessorIndex;
-
- /* for each thread created, round-robin through the set of valid
- processors and affinity masks.
- the assumption is that callers of tMPI_Thread_create_NUMA are creating
- threads that saturate a given processor.
- for cases where threads are being created that rarely do work, standard
- thread creation (eg: CreateThread) should be invoked instead.
- */
-
- CurrentProcessorIndex = (ULONG)InterlockedIncrement((volatile LONG *)&g_ulThreadIndex);
- CurrentProcessorIndex = CurrentProcessorIndex % g_ulTotalProcessors;
-
- /* group, mask. */
-
- memcpy(&GroupAffinity,
- &(g_MPI_ProcessorInfo[CurrentProcessorIndex].GroupAffinity),
- sizeof(GROUP_AFFINITY));
-
- /* group, processor number */
-
- memcpy(&IdealProcessorNumber,
- &(g_MPI_ProcessorInfo[CurrentProcessorIndex].ProcessorNumber),
- sizeof(PROCESSOR_NUMBER));
-
- /* determine size of allocation for AttributeList */
-
- if(!func_InitializeProcThreadAttributeList(pAttributeList,
- 2,
- 0,
- &cbAttributeList))
- {
- DWORD dwLastError = GetLastError();
- if( dwLastError != ERROR_INSUFFICIENT_BUFFER )
- {
- tMPI_Fatal_error(TMPI_FARGS,
- "InitializeProcThreadAttributeList, error code=%d",
- dwLastError);
- goto cleanup;
- }
- }
-
- pAttributeList = (LPPROC_THREAD_ATTRIBUTE_LIST)tMPI_Malloc( cbAttributeList );
- if( pAttributeList == NULL )
- {
- tMPI_Fatal_error(TMPI_FARGS,"Failed to allocate pAttributeList");
- goto cleanup;
- }
-
- memset( pAttributeList, 0, cbAttributeList );
-
- if(!func_InitializeProcThreadAttributeList(pAttributeList,
- 2,
- 0,
- &cbAttributeList))
- {
- tMPI_Fatal_error(TMPI_FARGS,
- "InitializeProcThreadAttributeList, error code=%d",
- GetLastError());
- goto cleanup;
- }
-
- if(!func_UpdateProcThreadAttribute(pAttributeList,
- 0,
- PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY,
- &GroupAffinity,
- sizeof(GroupAffinity),
- NULL,
- NULL))
- {
- tMPI_Fatal_error(TMPI_FARGS,"UpdateProcThreadAttribute, error code=%d",
- GetLastError());
- goto cleanup;
- }
-
- if(!func_UpdateProcThreadAttribute(pAttributeList,
- 0,
- PROC_THREAD_ATTRIBUTE_IDEAL_PROCESSOR,
- &IdealProcessorNumber,
- sizeof(IdealProcessorNumber),
- NULL,
- NULL))
- {
- tMPI_Fatal_error(TMPI_FARGS,"UpdateProcThreadAttribute, error code=%d",
- GetLastError());
- goto cleanup;
- }
-
-
- hThread = func_CreateRemoteThreadEx( GetCurrentProcess(),
- lpThreadAttributes,
- dwStackSize,
- lpStartAddress,
- lpParameter,
- dwCreationFlags,
- pAttributeList,
- lpThreadId);
-
- func_DeleteProcThreadAttributeList( pAttributeList );
-
-#if 0
- // TODO: debug only or DISCARD
- if( hThread )
- {
- PROCESSOR_NUMBER ProcNumber;
- USHORT NodeNumber;
-
- GetThreadIdealProcessorEx(hThread, &ProcNumber);
- GetNumaProcessorNodeEx(&ProcNumber, &NodeNumber);
-
- printf("started thread tid=%lu group=%lu mask=0x%I64x number=%lu numanode=%lu\n",
- *lpThreadId,
- GroupAffinity.Group,
- (ULONGLONG)GroupAffinity.Mask,
- ProcNumber.Number,
- NodeNumber
- );
- }
-#endif
-
-cleanup:
-
- if( pAttributeList )
- {
- tMPI_Free( pAttributeList );
- }
-
- return hThread;
-}
int tMPI_Thread_get_hw_number(void)
{
}
+
+
int tMPI_Thread_create(tMPI_Thread_t *thread,
void *(*start_routine)(void *), void *arg)
{
return EINVAL;
}
/* just create a plain thread. */
+ (*thread)->started_by_tmpi=1;
(*thread)->th = CreateThread(NULL,
0,
tMPI_Win32_thread_starter,
-int tMPI_Thread_create_aff(tMPI_Thread_t *thread,
- void *(*start_routine)(void *), void *arg)
-{
- DWORD thread_id;
- struct tMPI_Thread_starter_param *prm;
-
- tMPI_Init_initers();
- tMPI_Set_main_thread_affinity();
-
- /* a small memory leak to be sure that it doesn't get deallocated
- once this function ends, before the newly created thread uses it. */
- prm=(struct tMPI_Thread_starter_param*)
- tMPI_Malloc(sizeof(struct tMPI_Thread_starter_param));
- prm->start_routine= start_routine;
- prm->param=arg;
-
- *thread=(struct tMPI_Thread*)tMPI_Malloc(sizeof(struct tMPI_Thread)*1);
-
- if(thread==NULL)
- {
- tMPI_Fatal_error(TMPI_FARGS,"Invalid thread pointer.");
- return EINVAL;
- }
-
- if( g_ulHighestNumaNodeNumber != 0 )
- {
- /* if running on a NUMA system, use the group and NUMA aware thread
- creation logic */
- (*thread)->th = tMPI_Thread_create_NUMA(NULL,
- 0,
- tMPI_Win32_thread_starter,
- prm,
- 0,
- &thread_id);
- } else {
- /* TODO: for now, non-NUMA systems don't set thread affinity. */
- (*thread)->th = CreateThread(NULL,
- 0,
- tMPI_Win32_thread_starter,
- prm,
- 0,
- &thread_id);
- }
-
- if((*thread)->th==NULL)
- {
- tMPI_Free(thread);
- tMPI_Fatal_error(TMPI_FARGS,"Failed to create thread, error code=%d",
- GetLastError());
- return -1;
- }
-
- /* inherit the thread priority from the parent thread. */
- /* TODO: is there value in setting this, vs. just allowing it to default
- from the process? currently, this limits the effectivenes of changing
- the priority in eg: TaskManager. */
- SetThreadPriority(((*thread)->th), GetThreadPriority(GetCurrentThread()));
-
- return 0;
-}
-
int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
}
+tMPI_Thread_t tMPI_Thread_self(void)
+{
+ tMPI_Thread_t th;
+ tMPI_Init_initers();
+
+ th=(struct tMPI_Thread*)TlsGetValue(thread_id_key);
+ /* check if it is already in our list */
+ if (th == NULL)
+ {
+ if (GetLastError() != ERROR_SUCCESS )
+ {
+ return NULL;
+ }
+ /* if not, create an ID, set it and return it */
+ th=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
+
+ /* to create a handle that can be used outside of the current
+ thread, the handle from GetCurrentThread() must first
+ be duplicated.. */
+ DuplicateHandle(GetCurrentProcess(),
+ GetCurrentThread(),
+ GetCurrentProcess(),
+ &th->th,
+ 0,
+ FALSE,
+ DUPLICATE_SAME_ACCESS);
+
+ /* This causes a small memory leak that is hard to fix. */
+ th->started_by_tmpi=0;
+ TlsSetValue(thread_id_key, th);
+ }
+ return th;
+}
+
+
+int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2)
+{
+ /* because the thread IDs are unique, we can compare them directly */
+ return (t1 == t2);
+}
+
+int tMPI_Thread_setaffinity_single(tMPI_Thread_t thread, unsigned int nr)
+{
+ GROUP_AFFINITY GroupAffinity;
+ PROCESSOR_NUMBER IdealProcessorNumber;
+ /* thread NUMA node */
+ USHORT NumaNodeNumber;
+
+ /* check for a processor info array. This exists if NUMA
+ style calls have been succesfully initialized. */
+ if( g_MPI_ProcessorInfo != NULL )
+ {
+
+ /*func_GetCurrentProcessorNumberEx(&CurrentProcessorNumber);*/
+ /* group, mask. */
+ memcpy(&GroupAffinity,
+ &(g_MPI_ProcessorInfo[nr].GroupAffinity),
+ sizeof(GROUP_AFFINITY));
+
+ /* group, processor number */
+
+ memcpy(&IdealProcessorNumber,
+ &(g_MPI_ProcessorInfo[nr].ProcessorNumber),
+ sizeof(PROCESSOR_NUMBER));
+
+
+ /* set the NUMA node affinity for the current thread
+ failures to set the current thread affinity are ignored,
+ as a fringe case can arise on >32 processor systems with a 32bit
+ build/code.
+ */
+ func_SetThreadIdealProcessorEx(thread->th,
+ &IdealProcessorNumber,
+ NULL);
+
+ if(func_GetNumaProcessorNodeEx(&IdealProcessorNumber,
+ &NumaNodeNumber))
+ {
+ /* for the NUMA node number associated with the current processor
+ number, get the group affinity mask */
+ if(func_GetNumaNodeProcessorMaskEx(NumaNodeNumber,
+ &GroupAffinity))
+ {
+ /* set the current thread affinity to prevent it from running on
+ other NUMA nodes */
+ func_SetThreadGroupAffinity(thread->th,
+ &GroupAffinity,
+ NULL);
+ }
+ }
+ }
+ else
+ {
+ /* No NUMA-style calls. We just do a simpler thing. */
+ if ( (func_SetThreadIdealProcessor != NULL) )
+ {
+ return func_SetThreadIdealProcessor(thread->th, nr);
+ }
+ }
+ return 0;
+}
+
int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx)
struct tMPI_Thread
{
- HANDLE th;
+ HANDLE th; /* the thread handle */
+ int started_by_tmpi; /* whether this thread was started by tmpi */
};
struct tMPI_Thread_key
fflush(stderr);
/* now spawn new threads that start mdrunner_start_fn(), while
the main thread returns */
- ret=tMPI_Init_fn(TRUE, hw_opt->nthreads_tmpi,
+ ret=tMPI_Init_fn(TRUE, hw_opt->nthreads_tmpi, TMPI_AFFINITY_ALL_CORES,
mdrunner_start_fn, (void*)(mda) );
if (ret!=TMPI_SUCCESS)
return NULL;