Added thread affinity API to thread_mpi.
authorSander Pronk <pronk@cbr.su.se>
Fri, 12 Oct 2012 16:29:54 +0000 (18:29 +0200)
committerSander Pronk <pronk@cbr.su.se>
Mon, 15 Oct 2012 07:47:59 +0000 (09:47 +0200)
thread_mpi can now set thread affinity on Linux and Windows. Currently
that is limited to setting the affinity of any thread (including those
not started with thread_mpi) to any single core. The OS's core numbering
is maintained.

Change-Id: If0b7aa443f7da206b320a2bde5bd771a441fb2cb

include/thread_mpi/threads.h
include/thread_mpi/tmpi.h
src/gmxlib/thread_mpi/pthreads.c
src/gmxlib/thread_mpi/pthreads.h
src/gmxlib/thread_mpi/tmpi_init.c
src/gmxlib/thread_mpi/winthreads.c
src/gmxlib/thread_mpi/winthreads.h
src/kernel/runner.c

index 32d18af4499ac6d1388f33c90c095766d060caf2..7d5da2c6e309ff7b7cb64b1692163a02ed1aa9fb 100644 (file)
@@ -84,7 +84,7 @@ extern "C"
 
 
 
-/*! \brief Pthread implementation of the abstract tMPI_Thread type
+/*! \brief Thread ID: abstract tMPI_Thread type
  *
  *  The contents of this structure depends on the actual threads 
  *  implementation used.
@@ -243,6 +243,8 @@ void tMPI_Fatal_error(const char *file, int line, const char *message, ...);
 
 
 
+/*! \name Thread creation, destruction, and inspection
+    \{ */
 /** Check if threads are supported
  *
  *  This routine provides a cleaner way to check if threads are supported
@@ -274,9 +276,9 @@ int tMPI_Thread_get_hw_number(void);
  *
  *  Please be careful not to change arg after calling this function.
  * 
- *  \param thread          Pointer to opaque thread datatype
- *  \param start_routine   The function to call in the new thread
- *  \param arg             Argument to call with
+ *  \param[out] thread          Pointer to thread ID
+ *  \param[in] start_routine    The function to call in the new thread
+ *  \param[in] arg              Argument to call with
  *  
  *  \return Status - 0 on success, or an error code.
  */
@@ -298,9 +300,9 @@ int tMPI_Thread_create(tMPI_Thread_t *thread,
  *
  *  Please be careful not to change arg after calling this function.
  * 
- *  \param thread          Pointer to opaque thread datatype
- *  \param start_routine   The function to call in the new thread
- *  \param arg             Argument to call with
+ *  \param[out] thread         Pointer to thread ID
+ *  \param[in] start_routine   The function to call in the new thread
+ *  \param[in] arg             Argument to call with
  *  
  *  \return Status - 0 on success, or an error code.
  */
@@ -316,15 +318,77 @@ int tMPI_Thread_create_aff(tMPI_Thread_t *thread,
  *
  *  If the thread has already finished the routine returns immediately.
  *
- *  \param thread      Opaque thread datatype to wait for.
- *  \param value_ptr   Pointer to location where to store pointer to exit value
- *                     from threads that called tMPI_Thread_exit().
+ *  \param[in] thread       Pointer to thread ID
+ *  \param[out] value_ptr   Pointer to location where to store pointer to
+ *                          exit value from threads that called
+ *                          tMPI_Thread_exit().
  *  
  *  \return 0 if the join went ok, or a non-zero error code.
  */
 int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr);
 
 
+/** Terminate calling thread
+ *
+ *  Die voluntarily.
+ *
+ *  \param value_ptr   Pointer to a return value. Threads waiting for us to
+ *                     join them can read this value if they try.
+ *  \return 
+ */
+void tMPI_Thread_exit(void *value_ptr);
+
+
+
+/** Ask a thread to exit
+ *
+ *  This routine tries to end the execution of another thread, but there are
+ *  no guarantees it will succeed.
+ *
+ *  \param thread     Handle to thread we want to see dead.
+ *  \return 0 or a non-zero error message.
+ */
+int tMPI_Thread_cancel(tMPI_Thread_t thread);
+
+
+
+
+/** Get a thread ID of the calling thread. 
+  *
+  * This function also works on threads not started with tMPI_Thread_create,
+  * or any other function in thread_mpi. This makes it possible to, for 
+  * example assign thread affinities to any thread. 
+  *
+  * \return A thread ID of the calling thread */
+tMPI_Thread_t tMPI_Thread_self(void);
+
+
+
+/** Check whether two thread pointers point to the same thread 
+  *
+  * \param[in]  t1  Thread ID 1
+  * \param[in]  t2  Thread ID 2
+  * \return non-zero if the thread structs refer to the same thread, 
+            0 if the threads are different*/
+int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2);
+
+
+/** Set thread affinity to a single core
+  *
+  * This function sets the thread affinity of a thread to a a specific 
+  * numbered processor. This only works if the underlying operating system
+  * supports it. The processor number must be between 0 and the number returned
+  * by tMPI_Thread_get_hw_number().
+  *
+  * \param[in] thread   Thread ID of the thread to set affinity for
+  * \param[in] nr       Processor number to set affinity to */
+int tMPI_Thread_setaffinity_single(tMPI_Thread_t thread, unsigned int nr);
+
+
+/*! \} */
+/*! \name Mutexes 
+    \{ */
+
 
 /** Initialize a new mutex
  *
@@ -388,6 +452,10 @@ int tMPI_Thread_mutex_unlock(tMPI_Thread_mutex_t *mtx);
 
 
 
+/*! \} */
+/*! \name Thread-specific storage 
+    \{ */
+
 
 /** Initialize thread-specific-storage handle
  *
@@ -440,6 +508,10 @@ void * tMPI_Thread_getspecific(tMPI_Thread_key_t key);
 int tMPI_Thread_setspecific(tMPI_Thread_key_t key, void *value);
 
 
+/*! \} */
+/*! \name Run-once
+    \{ */
+
 
 /** Run the provided routine exactly once
  *
@@ -457,6 +529,9 @@ int tMPI_Thread_once(tMPI_Thread_once_t *once_data,
                      void (*init_routine)(void));    
 
 
+/*! \} */
+/*! \name Condition variables
+    \{ */
 
 /** Initialize condition variable
  *
@@ -531,32 +606,9 @@ int tMPI_Thread_cond_broadcast(tMPI_Thread_cond_t *cond);
 
 
 
-
-/** Terminate calling thread
- *
- *  Die voluntarily.
- *
- *  \param value_ptr   Pointer to a return value. Threads waiting for us to
- *                     join them can read this value if they try.
- *  \return 
- */
-void tMPI_Thread_exit(void *value_ptr);
-
-
-
-/** Ask a thread to exit
- *
- *  This routine tries to end the execution of another thread, but there are
- *  no guarantees it will succeed.
- *
- *  \param thread     Handle to thread we want to see dead.
- *  \return 0 or a non-zero error message.
- */
-int tMPI_Thread_cancel(tMPI_Thread_t thread);
-
-
-
-
+/*! \} */
+/*! \name Barriers
+    \{ */
 
 
 /** Initialize a synchronization barrier type
@@ -595,7 +647,7 @@ int tMPI_Thread_barrier_destroy(tMPI_Thread_barrier_t *barrier);
  */
 int tMPI_Thread_barrier_wait(tMPI_Thread_barrier_t *barrier);
 
-
+/*! \} */
 
 
 #ifdef __cplusplus
index 53fe4ae2bf8b35c1ba35abb40a12f9e6864ed2ff..d8d3fc7528a8d4243b6405262539e78a0f7d7a96 100644 (file)
@@ -55,6 +55,12 @@ files.
 */
 
 
+/* for size_t, include stddef.h - which is in C89. This is done  
+   regardless of whether we're compiling C++ or C code because the base
+   library for this is in C. */
+#include <stddef.h>
+
+
 #ifdef __cplusplus
 extern "C" 
 {  
@@ -63,12 +69,34 @@ extern "C"
 } /* Avoids screwing up auto-indentation */
 #endif
 
+
+
 /** tMPI definition. 
 
 Use this to check for thread_mpi with the preprocessor. */
 #define TMPI 
 
 
+/** tMPI initialization thread affinity strategy.
+
+    Used in the tMPI_Init_affinity() and  tMPI_Init_fn_affinity() functions,
+    to control how affinity is set. The default tMPI_Init() and tMPI_Init_fn()
+    functions use the TMPI_AFFINITY_ALL_CORES strategy.
+
+    These strategies are fairly basic. For more flexibility, use the 
+    tMPI_Set_affinity() function.*/
+typedef enum
+{
+    TMPI_AFFINITY_NONE=0,       /**< Do not set any thread affinity */
+    TMPI_AFFINITY_ALL_CORES,    /**< Only set affinity if the number of threads 
+                                     is equal to the number of hardware threads
+                                     (cores + hyperthreads). This is the only
+                                     safe way to set thread affinity, 
+                                     without clashes between multiple
+                                     instances of the same program. */
+} tMPI_Affinity_strategy;
+
+
 
 /** tMPI Communicator
   
@@ -150,6 +178,8 @@ enum
     TMPI_ERR_ENVELOPES,
     TMPI_ERR_REQUESTS,
     TMPI_ERR_IN_STATUS,
+    TMPI_ERR_PROCNR,                /*!< Hardware processor number (such as for 
+                                         thread affinity) error */
     TMPI_FAILURE,
     TMPI_ERR_UNKNOWN,
     N_TMPI_ERR  /* this must be the last one */
@@ -285,9 +315,9 @@ tMPI_Comm tMPI_Get_comm_self(void);
 
 /*! \name Initialization and exit functions 
     \{ */
-/** Traditional MPI initializer; spawns threads that start at 
-    the given function. 
-  
+/** Traditional MPI initializer; spawns threads that start at the given
+    function.
+
     Seeks the argument '-nt n', where n is the number of 
     threads that will be created. If n==0, the number of threads will
     be the recommended number of threads for this platform as obtained
@@ -297,17 +327,23 @@ tMPI_Comm tMPI_Get_comm_self(void);
     argc and argv. This function could be main(), or any other function; 
     calling this function again - whether from the started threads or from 
     the main thread - has no effect.
-    
-    \param[in] argc     argc of original main() invocation, or NULL
-    \param[in] argv     argv of original main() invocation, or NULL. 
-    \param[in] start_function Starting function of type 
-                        int start_function(int argc, char *argv[]);
+
+    On platforms that support thread affinity setting, this function will
+    use the 'all-cores' affinity strategy: it will only set thread affinity
+    if the number of threads is equal to the number of hardware threads 
+    (cores + hyperthreads). 
+
+    \param[in] argc             argc of original main() invocation, or NULL
+    \param[in] argv             argv of original main() invocation, or NULL. 
+    \param[in] start_function   Starting function of type 
+                                int start_function(int argc, char *argv[]);
 
     \return  TMPI_SUCCESS on success, TMPI_FAILURE on failure.  */
-int tMPI_Init(int *argc, char ***argv, int (*start_function)(int, char**));
+int tMPI_Init(int *argc, char ***argv, 
+              int (*start_function)(int, char**));
 
 
-/** Alternate thread MPI intializer and thread spawner.
+/** Generic init function thread MPI intializer and thread spawner.
   
     Creates N threads (including main thread) 
     that run the function start_function, which takes a void* argument, 
@@ -319,12 +355,17 @@ int tMPI_Init(int *argc, char ***argv, int (*start_function)(int, char**));
     If N==0, the number of threads will be the recommended number of 
     threads for this platform as obtained from tMPI_Get_recommended_ntreads(). 
 
+    Note that thread affinity strategy only has an effect when this is 
+    supported by the underlying platform. As of yet (2012), this is not the 
+    case for Mac OS X, for example.
+
     \param[in]  main_thread_returns   whether the control in the main thread 
                                       should return immediately (if true), or 
                                       the start_function() should be called 
                                       from the main thread, too (if false).
     \param[in] N                      The number of threads to start (or 0 to
                                       automatically determine this).
+    \param[in] aff_strategy           The thread affinity strategy to use.
     \param[in] start_function         The function to start threads at 
                                       (including main thread if 
                                       main_thread_returns). 
@@ -332,9 +373,14 @@ int tMPI_Init(int *argc, char ***argv, int (*start_function)(int, char**));
 
     \return  TMPI_FAILURE on failure, TMPI_SUCCESS on succes (after all
              threads have finished if main_thread_returns=true).  */
-int tMPI_Init_fn(int main_thread_returns, int N, 
+int tMPI_Init_fn(int main_thread_returns, int N,
+                 tMPI_Affinity_strategy aff_strategy,
                  void (*start_function)(void*), void *arg);
 
+
+
+
+
 /** get the number of threads from the command line
   
     can be called before tMPI_Init() 
index ebdf5c418223c45945e9196e169c0da13e943df6..c04413b2f134e0d7e3fbe027bab26a7670a9dcce 100644 (file)
@@ -87,9 +87,11 @@ static pthread_mutex_t cond_init=PTHREAD_MUTEX_INITIALIZER;
 /* mutex for initializing barriers */
 static pthread_mutex_t barrier_init=PTHREAD_MUTEX_INITIALIZER; 
 
-/* mutex for initializing barriers */
-static pthread_mutex_t aff_init=PTHREAD_MUTEX_INITIALIZER; 
-static int aff_thread_number=0;
+/* mutex for managing  thread IDs */
+static pthread_mutex_t thread_id_mutex=PTHREAD_MUTEX_INITIALIZER;
+static pthread_key_t thread_id_key;
+static int thread_id_key_initialized=0;
+
 
 
 /* TODO: this needs to go away!  (there's another one in winthreads.c)
@@ -131,70 +133,85 @@ int tMPI_Thread_get_hw_number(void)
     return ret;
 }
 
-int tMPI_Thread_create(tMPI_Thread_t *thread, void *(*start_routine)(void *),
-                       void *arg)
+/* destructor for thread ids */
+static void tMPI_Destroy_thread_id(void* thread_id)
 {
-    int ret;
-
-    if(thread==NULL)
+    struct tMPI_Thread *thread=(struct tMPI_Thread*)thread_id;
+    if (!thread->started_by_tmpi)
     {
-        tMPI_Fatal_error(TMPI_FARGS,"Invalid thread pointer.");
-        return EINVAL;
+        /* if the thread is started by tMPI, it must be freed in the join() 
+           call. */
+        free(thread_id);
     }
+}
 
-    *thread=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
-    ret=pthread_create(&((*thread)->th),NULL,start_routine,arg);
-
-    if(ret!=0)
+/* initialize the thread id vars if not already initialized */
+static void tMPI_Init_thread_ids(void)
+{
+    pthread_mutex_lock( &thread_id_mutex );
+    if (!thread_id_key_initialized)
     {
-        /* Cannot use tMPI_error() since messages use threads for locking */
-        tMPI_Fatal_error(TMPI_FARGS,"Failed to create POSIX thread:%s, rc=%d",
-                         strerror(errno), ret);
-        /* Use system memory allocation routines */
-        return -1;
-    }
+        /* initialize and set the thread id thread-specific variable */
+        struct tMPI_Thread *main_thread;
 
-    return 0;
+        thread_id_key_initialized=1;
+        pthread_key_create(&thread_id_key, tMPI_Destroy_thread_id);
+        main_thread=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
+        main_thread->th=pthread_self();
+        main_thread->started_by_tmpi=0;
+        pthread_setspecific(thread_id_key, main_thread);
+    }
+    pthread_mutex_unlock( &thread_id_mutex );
 }
 
+/* structure to hold the arguments for the thread_starter function */
+struct tMPI_Thread_starter
+{
+    struct tMPI_Thread *thread;
+    void *(*start_routine)(void*);
+    void *arg;
+};
 
-/* set thread's own affinity to a processor number n */
-static int tMPI_Set_affinity(int n)
+/* the thread_starter function that sets the thread id */
+static void *tMPI_Thread_starter(void *arg)
 {
-#ifdef HAVE_PTHREAD_SETAFFINITY
-    cpu_set_t set;
+    struct tMPI_Thread_starter *starter=(struct tMPI_Thread_starter *)arg;
+    void *(*start_routine)(void*);
+    void *parg;
 
-    CPU_ZERO(&set);
-    CPU_SET(n, &set);
-    return pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
-#endif
-    return 0;
+    pthread_setspecific(thread_id_key, starter->thread);
+    start_routine=starter->start_routine;
+    parg=starter->arg;
+
+    free(starter);
+    return (*start_routine)(parg);
 }
 
-int tMPI_Thread_create_aff(tMPI_Thread_t *thread, 
-                           void *(*start_routine)(void *),
-                           void *arg)
+int tMPI_Thread_create(tMPI_Thread_t *thread, void *(*start_routine)(void *),
+                       void *arg)
 {
     int ret;
-
-#ifdef TMPI_SET_AFFINITY
-    /* set the calling thread's affinity mask */
-    pthread_mutex_lock( &(aff_init) );
-    if (aff_thread_number==0)
-    {
-        tMPI_Set_affinity(aff_thread_number++);
-    }
-    pthread_mutex_unlock( &(aff_init) );
-#endif
+    struct tMPI_Thread_starter *starter;
 
     if(thread==NULL)
     {
         tMPI_Fatal_error(TMPI_FARGS,"Invalid thread pointer.");
         return EINVAL;
     }
+    tMPI_Init_thread_ids();
 
     *thread=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
-    ret=pthread_create(&((*thread)->th),NULL,start_routine,arg);
+    (*thread)->started_by_tmpi = 1;
+    starter=(struct tMPI_Thread_starter*)
+              malloc(sizeof(struct tMPI_Thread_starter)*1);
+    /* fill the starter structure */
+    starter->thread=*thread;
+    starter->start_routine=start_routine;
+    starter->arg=arg;
+
+    /*ret=pthread_create(&((*thread)->th),NULL,start_routine,arg);*/
+    ret=pthread_create(&((*thread)->th),NULL,tMPI_Thread_starter,
+                       (void*)starter);
 
     if(ret!=0)
     {
@@ -204,22 +221,9 @@ int tMPI_Thread_create_aff(tMPI_Thread_t *thread,
         /* Use system memory allocation routines */
         return -1;
     }
-    else
-    {
-#ifdef TMPI_SET_AFFINITY
-        /* now set the affinity of the new thread */
-        pthread_mutex_lock( &(aff_init) );
-        ret=tMPI_Set_affinity(aff_thread_number++);
-        pthread_mutex_unlock( &(aff_init) );
-        /* failure is non-fatal, so we won't check the result */
-        return 0;
-#else
-        return 0;
-#endif
-    }
-}
-
 
+    return 0;
+}
 
 
 
@@ -228,10 +232,10 @@ int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
     int ret;
     pthread_t th=thread->th;
 
-    free(thread);
     
     ret = pthread_join( th, value_ptr );
 
+    free(thread);
     if(ret != 0 )
     {
         tMPI_Fatal_error(TMPI_FARGS,"Failed to join POSIX thread. rc=%d",ret);
@@ -240,6 +244,52 @@ int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
 }
 
 
+tMPI_Thread_t tMPI_Thread_self(void)
+{
+    tMPI_Thread_t th;
+    /* make sure the key var is set */
+    tMPI_Init_thread_ids();
+
+    th=pthread_getspecific(thread_id_key);
+
+    /* check if it is already in our list */
+    if (th == NULL)
+    {
+        /* if not, create an ID, set it and return it */
+        th=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
+        th->started_by_tmpi=0;
+        th->th=pthread_self();
+        pthread_setspecific(thread_id_key, th);
+    }
+    return th;
+}
+
+int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2)
+{
+    return pthread_equal(t1->th, t2->th);
+}
+
+/* set thread's own affinity to a processor number n */
+int tMPI_Thread_setaffinity_single(tMPI_Thread_t thread, unsigned int nr)
+{
+#ifdef HAVE_PTHREAD_SETAFFINITY
+    int nt=tMPI_Thread_get_hw_number();
+    cpu_set_t set;
+
+    if (nt < nr)
+    {
+        return TMPI_ERR_PROCNR;
+    }
+
+    CPU_ZERO(&set);
+    CPU_SET(nr, &set);
+    return pthread_setaffinity_np(thread->th, sizeof(set), &set);
+#endif
+    return 0;
+}
+
+
+
 
 int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx) 
 {
@@ -569,13 +619,13 @@ int tMPI_Thread_cond_broadcast(tMPI_Thread_cond_t *cond)
 
 
 
-void tMPI_Thread_exit(void *      value_ptr)
+void tMPI_Thread_exit(void *value_ptr)
 {
     pthread_exit(value_ptr);
 }
 
 
-int tMPI_Thread_cancel(tMPI_Thread_t     thread)
+int tMPI_Thread_cancel(tMPI_Thread_t thread)
 {
     return pthread_cancel(thread->th);
 }
index 674b4cacecfe152479d4898f3038e7d132dd0e44..ec31a342a20554279fa765151762f749d1ef0857 100644 (file)
@@ -39,7 +39,8 @@ files.
 
 struct tMPI_Thread
 {
-    pthread_t th;
+    pthread_t th;        /*!< The POSIX thread ID */
+    int started_by_tmpi; /*!< whether the thread is started by tMPI */
 };
 
 struct tMPI_Thread_key
index a577842b58a065c3cda7b0fa4c4b6ff256b9e9ee..7e59a300baabf3d351e7f6e09d9416b838a86433 100644 (file)
@@ -98,7 +98,9 @@ struct tmpi_global *tmpi_global=NULL;
 
 
 /* start N threads with argc, argv (used by tMPI_Init)*/
-void tMPI_Start_threads(tmpi_bool main_returns, int N, int *argc, char ***argv, 
+void tMPI_Start_threads(tmpi_bool main_returns, int N, 
+                        tMPI_Affinity_strategy aff_strategy,
+                        int *argc, char ***argv, 
                         void (*start_fn)(void*), void *start_arg,
                         int (*start_fn_main)(int, char**));
 
@@ -118,14 +120,19 @@ static void tMPI_Thread_destroy(struct tmpi_thread *th);
 void tMPI_Trace_print(const char *fmt, ...)
 {
     va_list argp;
-    struct tmpi_thread* th=tMPI_Get_current();
+    struct tmpi_thread* th=NULL;
     static tMPI_Thread_mutex_t mtx=TMPI_THREAD_MUTEX_INITIALIZER;
 
     tMPI_Thread_mutex_lock(&mtx);
     if (threads)
+    {
+        th=tMPI_Get_current();
         printf("THREAD %02d: ", (int)(th-threads));
+    }
     else
+    {
         printf("THREAD main: ");
+    }
     va_start(argp, fmt);
     vprintf(fmt, argp);
     printf("\n");
@@ -325,6 +332,7 @@ static void tMPI_Global_init(struct tmpi_global *g, int Nthreads)
 
 static void tMPI_Global_destroy(struct tmpi_global *g)
 {
+    tMPI_Thread_barrier_destroy(&(g->barrier));
     tMPI_Thread_mutex_destroy(&(g->timer_mutex));
     tMPI_Thread_mutex_destroy(&(g->comm_link_lock));
 }
@@ -358,13 +366,16 @@ static void* tMPI_Thread_starter(void *arg)
 }
 
 
-void tMPI_Start_threads(tmpi_bool main_returns, int N, int *argc, char ***argv, 
+void tMPI_Start_threads(tmpi_bool main_returns, int N, 
+                        tMPI_Affinity_strategy aff_strategy,
+                        int *argc, char ***argv, 
                         void (*start_fn)(void*), void *start_arg,
                         int (*start_fn_main)(int, char**))
 {
 #ifdef TMPI_TRACE
-    tMPI_Trace_print("tMPI_Start_threads(%d, %p, %p, %p, %p)", N, argc,
-                       argv, start_fn, start_arg);
+    tMPI_Trace_print("tMPI_Start_threads(%d, %d, %d, %d, %d, %p, %p, %p, %p)", 
+                      main_returns, N, aff_strategy, argc, argv, start_fn, 
+                      start_arg);
 #endif
     if (N>0) 
     {
@@ -419,7 +430,7 @@ void tMPI_Start_threads(tmpi_bool main_returns, int N, int *argc, char ***argv,
         }
 
         /* now check whether to set affinity */
-#ifdef TMPI_THREAD_AFFINITY
+        if (aff_strategy == TMPI_AFFINITY_ALL_CORES) 
         {
             int nhw=tMPI_Thread_get_hw_number();
             if ((nhw > 1) && (nhw == N))
@@ -427,25 +438,26 @@ void tMPI_Start_threads(tmpi_bool main_returns, int N, int *argc, char ***argv,
                 set_affinity=TRUE;
             }
         }
-#endif
+
+        /* set thread 0's properties */
+        threads[0].thread_id=tMPI_Thread_self();
+        if (set_affinity)
+        {
+            /* set the main thread's affinity */
+            tMPI_Thread_setaffinity_single(threads[0].thread_id, 0);
+        }
 
         for(i=1;i<N;i++) /* zero is the main thread */
         {
             int ret;
+            ret=tMPI_Thread_create(&(threads[i].thread_id), 
+                                   tMPI_Thread_starter,
+                                   (void*)&(threads[i]) ) ;
 
             if (set_affinity)
             {
-                ret=tMPI_Thread_create_aff(&(threads[i].thread_id), 
-                                           tMPI_Thread_starter,
-                                           (void*)&(threads[i]) ) ;
-            }
-            else
-            {
-                ret=tMPI_Thread_create(&(threads[i].thread_id), 
-                                       tMPI_Thread_starter,
-                                       (void*)&(threads[i]) ) ;
+                tMPI_Thread_setaffinity_single(threads[i].thread_id, i);
             }
-
             if(ret)
             {
                 tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT);
@@ -461,7 +473,8 @@ void tMPI_Start_threads(tmpi_bool main_returns, int N, int *argc, char ***argv,
 }
 
 
-int tMPI_Init(int *argc, char ***argv, int (*start_function)(int, char**))
+int tMPI_Init(int *argc, char ***argv, 
+              int (*start_function)(int, char**))
 {
 #ifdef TMPI_TRACE
     tMPI_Trace_print("tMPI_Init(%p, %p, %p)", argc, argv, start_function);
@@ -472,7 +485,8 @@ int tMPI_Init(int *argc, char ***argv, int (*start_function)(int, char**))
     {
         int N=0;
         tMPI_Get_N(argc, argv, "-nt", &N);
-        tMPI_Start_threads(FALSE, N, argc, argv, NULL, NULL, start_function);
+        tMPI_Start_threads(FALSE, N, TMPI_AFFINITY_ALL_CORES, argc, argv, 
+                           NULL, NULL, start_function);
     }
     else
     {
@@ -483,7 +497,11 @@ int tMPI_Init(int *argc, char ***argv, int (*start_function)(int, char**))
     return TMPI_SUCCESS;
 }
 
+
+
+
 int tMPI_Init_fn(int main_thread_returns, int N, 
+                 tMPI_Affinity_strategy aff_strategy,
                  void (*start_function)(void*), void *arg)
 {
 #ifdef TMPI_TRACE
@@ -498,8 +516,8 @@ int tMPI_Init_fn(int main_thread_returns, int N,
 
     if (TMPI_COMM_WORLD==0 && N>=1) /* we're the main process */
     {
-        tMPI_Start_threads(main_thread_returns, N, 0, 0, start_function, arg
-                           NULL);
+        tMPI_Start_threads(main_thread_returns, N, aff_strategy
+                           0, 0, start_function, arg, NULL);
     }
     return TMPI_SUCCESS;
 }
index 98b89a92ad5fce261b747f1fafad1d4e05bccf97..c0dd8b71e68bd1446668273fc04227b137e9362e 100644 (file)
@@ -92,6 +92,24 @@ static tMPI_Atomic_t init_inited={ 0 };
 static tMPI_Spinlock_t main_thread_aff_lock=TMPI_SPINLOCK_INITIALIZER;
 static tMPI_Atomic_t main_thread_aff_set={ 0 };
 
+/* mutex for managing  thread IDs */
+/*static tMPI_Spinlock_t thread_id_lock=TMPI_SPINLOCK_INITIALIZER;*/
+static DWORD thread_id_key;
+
+
+
+
+/* data structure to keep track of thread key destructors. */
+typedef struct 
+{
+    void (*destructor) (void*);
+    DWORD key;
+} thread_key_destructors;
+
+static thread_key_destructors *destructors=NULL;
+
+
+
 /*
     NUMA and Processor Group awareness support.
 
@@ -131,13 +149,15 @@ ULONG g_ulHighestNumaNodeNumber=0;
 /* total number of processors in g_MPI_ProcessInfo array */
 ULONG g_ulTotalProcessors;
 /* array describing available processors, affinity masks, and NUMA node */
-MPI_NUMA_PROCESSOR_INFO *g_MPI_ProcessorInfo;   
+MPI_NUMA_PROCESSOR_INFO *g_MPI_ProcessorInfo=NULL;   
 
 /* function prototypes and variables to support obtaining function addresses 
    dynamically -- supports down-level operating systems */
 
 typedef BOOL (WINAPI *func_GetNumaHighestNodeNumber_t)( PULONG 
                                                         HighestNodeNumber );
+typedef DWORD (WINAPI *func_SetThreadIdealProcessor_t)( HANDLE hThread,
+                                                       DWORD dwIdealProcessor );
 typedef BOOL (WINAPI *func_SetThreadGroupAffinity_t)( HANDLE hThread, 
                             const GROUP_AFFINITY *GroupAffinity, 
                             PGROUP_AFFINITY PreviousGroupAffinity );
@@ -182,7 +202,8 @@ typedef WORD (WINAPI *func_GetActiveProcessorGroupCount_t)(void);
 
 
 /* WinXP SP2, WinXP64, WinSrv 2003 */
-func_GetNumaHighestNodeNumber_t             func_GetNumaHighestNodeNumber;              
+func_GetNumaHighestNodeNumber_t             func_GetNumaHighestNodeNumber;
+func_SetThreadIdealProcessor_t              func_SetThreadIdealProcessor;
 /* Windows 7, WinSrv 2008R2 */
 func_SetThreadGroupAffinity_t               func_SetThreadGroupAffinity;                
 func_SetThreadIdealProcessorEx_t            func_SetThreadIdealProcessorEx; 
@@ -198,69 +219,6 @@ func_UpdateProcThreadAttribute_t            func_UpdateProcThreadAttribute;
 func_DeleteProcThreadAttributeList_t        func_DeleteProcThreadAttributeList;         
 
 
-/* Set the main thread's affinity */
-static int tMPI_Set_main_thread_affinity(void)
-{
-    /* calling thread PROCESSOR_NUMBER */
-    PROCESSOR_NUMBER CurrentProcessorNumber;      
-    /* calling thread GROUP_AFFINITY */
-    GROUP_AFFINITY CurrentThreadGroupAffinity; 
-    /* calling thread NUMA node */
-    USHORT CurrentNumaNodeNumber;
-
-
-    /* we can pre-check because it's atomic */
-    if (tMPI_Atomic_get(&main_thread_aff_set) == 0)
-    {
-        /* this can be a spinlock because the chances of collision are low. */
-        tMPI_Spinlock_lock( &main_thread_aff_lock );
-        if( g_ulHighestNumaNodeNumber != 0 )
-        {
-            func_GetCurrentProcessorNumberEx(&CurrentProcessorNumber);
-
-
-            /* set the NUMA node affinity for the current thread
-               failures to set the current thread affinity are ignored, 
-               as a fringe case can arise on >32 processor systems with a 32bit 
-               build/code.
-               */
-            func_SetThreadIdealProcessorEx(GetCurrentThread(), 
-                                           &CurrentProcessorNumber, 
-                                           NULL);
-
-            if(func_GetNumaProcessorNodeEx(&CurrentProcessorNumber, 
-                                           &CurrentNumaNodeNumber))
-            {
-                /* for the NUMA node number associated with the current processor 
-                   number, get the group affinity mask */
-                if(func_GetNumaNodeProcessorMaskEx(CurrentNumaNodeNumber, 
-                                                   &CurrentThreadGroupAffinity))
-                {
-                    /* set the current thread affinity to prevent it from running on 
-                       other NUMA nodes */
-                    func_SetThreadGroupAffinity(GetCurrentThread(), 
-                                                &CurrentThreadGroupAffinity, 
-                                                NULL);
-                }
-            }
-        }
-        else
-        {
-            /* No NUMA. For now, we just do a similar thing. */
-            if ( (func_GetCurrentProcessorNumberEx != NULL)  &&
-                 (func_SetThreadIdealProcessorEx))
-            {
-                func_GetCurrentProcessorNumberEx(&CurrentProcessorNumber);
-                func_SetThreadIdealProcessorEx(GetCurrentThread(), 
-                                               &CurrentProcessorNumber, 
-                                               NULL);
-            }
-        }
-        tMPI_Atomic_set( &main_thread_aff_set, 1);
-        tMPI_Spinlock_unlock( &main_thread_aff_lock );
-    }
-    return 0;
-}
 
 /*  returns 0 on success.
     Success is returned if the system is non-NUMA, OR the system doesn't 
@@ -284,9 +242,9 @@ int tMPI_Init_NUMA(void)
     /* calling thread PROCESSOR_NUMBER */
     PROCESSOR_NUMBER CurrentProcessorNumber;      
     /* calling thread GROUP_AFFINITY */
-    GROUP_AFFINITY CurrentThreadGroupAffinity; 
+    /*GROUP_AFFINITY CurrentThreadGroupAffinity; */
     /* calling thread NUMA node */
-    USHORT CurrentNumaNodeNumber;
+    /*USHORT CurrentNumaNodeNumber;*/
 
     WORD wActiveGroupCount;
     WORD GroupIndex;
@@ -311,6 +269,7 @@ int tMPI_Init_NUMA(void)
     */
 
     func_GetNumaHighestNodeNumber = (func_GetNumaHighestNodeNumber_t) GetProcAddress( hModKernel32, "GetNumaHighestNodeNumber" );
+    func_SetThreadIdealProcessor = (func_SetThreadIdealProcessor_t) GetProcAddress( hModKernel32, "SetThreadIdealProcessor" );
 
     if( func_GetNumaHighestNodeNumber == NULL )
     {
@@ -325,11 +284,7 @@ int tMPI_Init_NUMA(void)
         return -1;
     }
 
-    if( ulHighestNumaNodeNumber == 0 )
-    {
-        /* system is not NUMA */
-        return 0;
-    }
+
 
     func_SetThreadGroupAffinity = (func_SetThreadGroupAffinity_t)GetProcAddress( hModKernel32, "SetThreadGroupAffinity" );
     func_SetThreadIdealProcessorEx = (func_SetThreadIdealProcessorEx_t)GetProcAddress( hModKernel32, "SetThreadIdealProcessorEx" );
@@ -359,6 +314,13 @@ int tMPI_Init_NUMA(void)
            isn't supported */
         return 0;
     }
+#if 0
+       if( ulHighestNumaNodeNumber == 0 )
+    {
+        /* system is not NUMA */
+        return 0;
+    }
+#endif
 
     /* count the active processors across the groups */
 
@@ -466,33 +428,7 @@ int tMPI_Init_NUMA(void)
         }
     }
 
-#if 0
-    /* set the NUMA node affinity for the current thread
-       failures to set the current thread affinity are ignored, 
-       as a fringe case can arise on >32 processor systems with a 32bit 
-       build/code.
-    */
-    func_SetThreadIdealProcessorEx(GetCurrentThread(), 
-                                   &CurrentProcessorNumber, 
-                                   NULL);
 
-    if(func_GetNumaProcessorNodeEx(&CurrentProcessorNumber, 
-                                   &CurrentNumaNodeNumber))
-    {
-        /* for the NUMA node number associated with the current processor 
-           number, get the group affinity mask */
-        if(func_GetNumaNodeProcessorMaskEx(CurrentNumaNodeNumber, 
-                                           &CurrentThreadGroupAffinity))
-        {
-            /* set the current thread affinity to prevent it from running on 
-               other NUMA nodes */
-            func_SetThreadGroupAffinity(GetCurrentThread(), 
-                                        &CurrentThreadGroupAffinity, 
-                                        NULL);
-        }
-    }
-#endif
     /* capture number of processors, highest NUMA node number, and processor 
        array */
     g_ulTotalProcessors = dwTotalProcessors;
@@ -542,6 +478,7 @@ static void tMPI_Init_initers(void)
             InitializeCriticalSection(&once_init);
             InitializeCriticalSection(&cond_init);
             InitializeCriticalSection(&barrier_init);
+            thread_id_key=TlsAlloc();
 
             /* fatal errors are handled by the routine by calling tMPI_Fatal_error() */
             tMPI_Init_NUMA();  
@@ -579,6 +516,7 @@ struct tMPI_Thread_starter_param
 {
     void *(*start_routine)(void*); /* the function */
     void *param; /* its parameter */
+    struct tMPI_Thread *thread;
 };
 
 static DWORD WINAPI tMPI_Win32_thread_starter( LPVOID lpParam ) 
@@ -586,151 +524,11 @@ static DWORD WINAPI tMPI_Win32_thread_starter( LPVOID lpParam )
     struct tMPI_Thread_starter_param *prm=
               (struct tMPI_Thread_starter_param*)lpParam;
 
+    TlsSetValue(thread_id_key, prm->thread);
     (prm->start_routine)(prm->param);
     return 0;
 }
 
-HANDLE tMPI_Thread_create_NUMA(LPSECURITY_ATTRIBUTES lpThreadAttributes,
-                               SIZE_T dwStackSize,
-                               LPTHREAD_START_ROUTINE lpStartAddress,
-                               LPVOID lpParameter,
-                               DWORD dwCreationFlags,
-                               LPDWORD lpThreadId)
-{
-    LPPROC_THREAD_ATTRIBUTE_LIST pAttributeList = NULL;
-    HANDLE hThread = NULL;
-    SIZE_T cbAttributeList = 0;
-    GROUP_AFFINITY GroupAffinity;
-    PROCESSOR_NUMBER IdealProcessorNumber;
-    ULONG CurrentProcessorIndex;
-
-    /* for each thread created, round-robin through the set of valid 
-       processors and affinity masks.
-       the assumption is that callers of tMPI_Thread_create_NUMA are creating 
-       threads that saturate a given processor.
-       for cases where threads are being created that rarely do work, standard 
-       thread creation (eg: CreateThread) should be invoked instead.
-    */
-
-    CurrentProcessorIndex = (ULONG)InterlockedIncrement((volatile LONG *)&g_ulThreadIndex);
-    CurrentProcessorIndex = CurrentProcessorIndex % g_ulTotalProcessors;
-
-    /* group, mask. */
-
-    memcpy(&GroupAffinity, 
-           &(g_MPI_ProcessorInfo[CurrentProcessorIndex].GroupAffinity), 
-           sizeof(GROUP_AFFINITY));
-
-    /* group, processor number */
-    
-    memcpy(&IdealProcessorNumber, 
-           &(g_MPI_ProcessorInfo[CurrentProcessorIndex].ProcessorNumber), 
-           sizeof(PROCESSOR_NUMBER)); 
-
-    /* determine size of allocation for AttributeList */
-
-    if(!func_InitializeProcThreadAttributeList(pAttributeList,
-                                               2,
-                                               0,
-                                               &cbAttributeList))
-    {
-        DWORD dwLastError = GetLastError();
-        if( dwLastError != ERROR_INSUFFICIENT_BUFFER )
-        {
-            tMPI_Fatal_error(TMPI_FARGS,
-                             "InitializeProcThreadAttributeList, error code=%d",
-                             dwLastError);
-            goto cleanup;
-        }
-    }
-
-    pAttributeList = (LPPROC_THREAD_ATTRIBUTE_LIST)tMPI_Malloc( cbAttributeList );
-    if( pAttributeList == NULL )
-    {
-        tMPI_Fatal_error(TMPI_FARGS,"Failed to allocate pAttributeList");
-        goto cleanup;
-    }
-
-    memset( pAttributeList, 0, cbAttributeList );
-
-    if(!func_InitializeProcThreadAttributeList(pAttributeList,
-                                               2,
-                                               0,
-                                               &cbAttributeList))
-    {
-        tMPI_Fatal_error(TMPI_FARGS,
-                         "InitializeProcThreadAttributeList, error code=%d",
-                         GetLastError());
-        goto cleanup;
-    }
-
-    if(!func_UpdateProcThreadAttribute(pAttributeList,
-                                       0,
-                                       PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY,
-                                       &GroupAffinity,
-                                       sizeof(GroupAffinity),
-                                       NULL,
-                                       NULL))
-    {
-        tMPI_Fatal_error(TMPI_FARGS,"UpdateProcThreadAttribute, error code=%d",
-                         GetLastError());
-        goto cleanup;
-    }
-
-    if(!func_UpdateProcThreadAttribute(pAttributeList,
-                                       0,
-                                       PROC_THREAD_ATTRIBUTE_IDEAL_PROCESSOR,
-                                       &IdealProcessorNumber,
-                                       sizeof(IdealProcessorNumber),
-                                       NULL,
-                                       NULL))
-    {
-        tMPI_Fatal_error(TMPI_FARGS,"UpdateProcThreadAttribute, error code=%d",
-                         GetLastError());
-        goto cleanup;
-    }
-
-
-    hThread = func_CreateRemoteThreadEx( GetCurrentProcess(),
-                                         lpThreadAttributes,
-                                         dwStackSize,
-                                         lpStartAddress,
-                                         lpParameter,
-                                         dwCreationFlags,
-                                         pAttributeList,
-                                         lpThreadId);
-            
-    func_DeleteProcThreadAttributeList( pAttributeList );
-
-#if 0   
-       // TODO: debug only or DISCARD
-    if( hThread )
-    {
-        PROCESSOR_NUMBER ProcNumber;
-        USHORT NodeNumber;
-
-        GetThreadIdealProcessorEx(hThread, &ProcNumber);
-        GetNumaProcessorNodeEx(&ProcNumber, &NodeNumber);
-
-        printf("started thread tid=%lu group=%lu mask=0x%I64x number=%lu numanode=%lu\n",
-            *lpThreadId,
-            GroupAffinity.Group,
-            (ULONGLONG)GroupAffinity.Mask,
-            ProcNumber.Number,
-            NodeNumber
-            );
-    }
-#endif
-
-cleanup:
-    
-    if( pAttributeList )
-    {
-        tMPI_Free( pAttributeList );
-    }
-
-    return hThread;
-}
 
 int tMPI_Thread_get_hw_number(void)
 {
@@ -744,6 +542,8 @@ int tMPI_Thread_get_hw_number(void)
 }
 
 
+
+
 int tMPI_Thread_create(tMPI_Thread_t *thread,
                        void *(*start_routine)(void *), void *arg)
 {
@@ -767,6 +567,7 @@ int tMPI_Thread_create(tMPI_Thread_t *thread,
         return EINVAL;
     }
     /* just create a plain thread. */
+    (*thread)->started_by_tmpi=1;
     (*thread)->th = CreateThread(NULL,
                                  0,
                                  tMPI_Win32_thread_starter,
@@ -795,67 +596,6 @@ int tMPI_Thread_create(tMPI_Thread_t *thread,
 
 
 
-int tMPI_Thread_create_aff(tMPI_Thread_t *thread,
-                           void *(*start_routine)(void *), void *arg)
-{
-    DWORD thread_id;
-    struct tMPI_Thread_starter_param *prm;
-
-    tMPI_Init_initers();
-    tMPI_Set_main_thread_affinity();
-
-    /* a small memory leak to be sure that it doesn't get deallocated 
-       once this function ends, before the newly created thread uses it. */
-    prm=(struct tMPI_Thread_starter_param*)
-              tMPI_Malloc(sizeof(struct tMPI_Thread_starter_param));
-    prm->start_routine= start_routine;
-    prm->param=arg;
-
-    *thread=(struct tMPI_Thread*)tMPI_Malloc(sizeof(struct tMPI_Thread)*1);
-
-    if(thread==NULL)
-    {
-        tMPI_Fatal_error(TMPI_FARGS,"Invalid thread pointer.");
-        return EINVAL;
-    }
-
-    if( g_ulHighestNumaNodeNumber != 0 )
-    {
-        /* if running on a NUMA system, use the group and NUMA aware thread 
-           creation logic */
-        (*thread)->th = tMPI_Thread_create_NUMA(NULL,
-                                                0,
-                                                tMPI_Win32_thread_starter,
-                                                prm,
-                                                0, 
-                                                &thread_id);
-    } else {
-        /* TODO: for now, non-NUMA systems don't set thread affinity. */
-        (*thread)->th = CreateThread(NULL,
-                                     0,
-                                     tMPI_Win32_thread_starter,
-                                     prm,
-                                     0, 
-                                     &thread_id);
-    }
-
-    if((*thread)->th==NULL)
-    {
-        tMPI_Free(thread);
-        tMPI_Fatal_error(TMPI_FARGS,"Failed to create thread, error code=%d",
-                         GetLastError());
-        return -1;
-    }
-
-    /* inherit the thread priority from the parent thread. */
-    /* TODO: is there value in setting this, vs. just allowing it to default 
-       from the process?  currently, this limits the effectivenes of changing 
-       the priority in eg: TaskManager. */
-    SetThreadPriority(((*thread)->th), GetThreadPriority(GetCurrentThread()));
-
-    return 0;
-}
-
 
 
 int tMPI_Thread_join(tMPI_Thread_t thread, void **value_ptr)
@@ -911,6 +651,108 @@ int tMPI_Thread_cancel(tMPI_Thread_t thread)
 }
 
 
+tMPI_Thread_t tMPI_Thread_self(void)
+{
+    tMPI_Thread_t th;
+    tMPI_Init_initers();
+
+    th=(struct tMPI_Thread*)TlsGetValue(thread_id_key);
+    /* check if it is already in our list */
+    if (th == NULL) 
+    {
+        if (GetLastError() != ERROR_SUCCESS ) 
+        {
+            return NULL;
+        }
+        /* if not, create an ID, set it and return it */
+        th=(struct tMPI_Thread*)malloc(sizeof(struct tMPI_Thread)*1);
+
+        /* to create a handle that can be used outside of the current
+           thread, the handle from GetCurrentThread() must first
+           be duplicated.. */
+        DuplicateHandle(GetCurrentProcess(), 
+                        GetCurrentThread(), 
+                        GetCurrentProcess(),
+                        &th->th, 
+                        0,
+                        FALSE,
+                        DUPLICATE_SAME_ACCESS);
+
+        /* This causes a small memory leak that is hard to fix. */ 
+        th->started_by_tmpi=0;
+        TlsSetValue(thread_id_key, th);
+    }
+    return th;
+}
+
+
+int tMPI_Thread_equal(tMPI_Thread_t t1, tMPI_Thread_t t2)
+{
+    /* because the thread IDs are unique, we can compare them directly */
+    return (t1 == t2);
+}
+
+int tMPI_Thread_setaffinity_single(tMPI_Thread_t thread, unsigned int nr)
+{
+    GROUP_AFFINITY GroupAffinity;
+    PROCESSOR_NUMBER IdealProcessorNumber;
+    /* thread NUMA node */
+    USHORT NumaNodeNumber;
+
+    /* check for a processor info array. This exists if NUMA 
+       style calls have been succesfully initialized. */
+    if( g_MPI_ProcessorInfo != NULL )
+    {
+
+        /*func_GetCurrentProcessorNumberEx(&CurrentProcessorNumber);*/
+        /* group, mask. */
+        memcpy(&GroupAffinity, 
+               &(g_MPI_ProcessorInfo[nr].GroupAffinity), 
+               sizeof(GROUP_AFFINITY));
+
+        /* group, processor number */
+
+        memcpy(&IdealProcessorNumber, 
+               &(g_MPI_ProcessorInfo[nr].ProcessorNumber), 
+               sizeof(PROCESSOR_NUMBER)); 
+
+
+        /* set the NUMA node affinity for the current thread
+           failures to set the current thread affinity are ignored, 
+           as a fringe case can arise on >32 processor systems with a 32bit 
+           build/code.
+           */
+        func_SetThreadIdealProcessorEx(thread->th,
+                                       &IdealProcessorNumber,
+                                       NULL);
+
+        if(func_GetNumaProcessorNodeEx(&IdealProcessorNumber,
+                                       &NumaNodeNumber))
+        {
+            /* for the NUMA node number associated with the current processor 
+               number, get the group affinity mask */
+            if(func_GetNumaNodeProcessorMaskEx(NumaNodeNumber,
+                                               &GroupAffinity))
+            {
+                /* set the current thread affinity to prevent it from running on 
+                   other NUMA nodes */
+                func_SetThreadGroupAffinity(thread->th,
+                                            &GroupAffinity,
+                                            NULL);
+            }
+        }
+    }
+    else
+    {
+        /* No NUMA-style calls. We just do a simpler thing. */
+        if ( (func_SetThreadIdealProcessor != NULL) )
+        {
+            return func_SetThreadIdealProcessor(thread->th, nr);
+        }
+    }
+    return 0;
+}
+
 
 
 int tMPI_Thread_mutex_init(tMPI_Thread_mutex_t *mtx) 
index 3abe5373956bb482bffc145a82e85d2cb4ab527b..feb115d7ac5bbe48c2a7c4e4a9417d6b4ec0c14b 100644 (file)
@@ -40,7 +40,8 @@ files.
 
 struct tMPI_Thread
 {
-    HANDLE th;
+    HANDLE th;            /* the thread handle */
+    int started_by_tmpi;  /* whether this thread was started by tmpi */
 };
 
 struct tMPI_Thread_key
index 9cab66fb00c6fa933087cf797ad29e71f44815e0..654fcbccb05fba28094e22d6d8798417e5a52141 100644 (file)
@@ -263,7 +263,7 @@ static t_commrec *mdrunner_start_threads(gmx_hw_opt_t *hw_opt,
     fflush(stderr);
     /* now spawn new threads that start mdrunner_start_fn(), while 
        the main thread returns */
-    ret=tMPI_Init_fn(TRUE, hw_opt->nthreads_tmpi,
+    ret=tMPI_Init_fn(TRUE, hw_opt->nthreads_tmpi, TMPI_AFFINITY_ALL_CORES,
                      mdrunner_start_fn, (void*)(mda) );
     if (ret!=TMPI_SUCCESS)
         return NULL;