endif(NOT DEFINED TMPI_ATOMICS)
ENDMACRO(TEST_TMPI_ATOMICS VARIABLE)
+MACRO(TMPI_MAKE_CXX_LIB)
+ set(TMPI_CXX_LIB 1)
+ # the C++ library
+ set(THREAD_MPI_CXX_SRC
+ thread_mpi/system_error.cpp )
+ENDMACRO(TMPI_MAKE_CXX_LIB)
include(FindThreads)
if (CMAKE_USE_PTHREADS_INIT)
thread_mpi/group.c thread_mpi/tmpi_init.c
thread_mpi/topology.c thread_mpi/list.c
thread_mpi/type.c thread_mpi/lock.c
- thread_mpi/numa_malloc.c thread_mpi/once.c)
+ thread_mpi/numa_malloc.c thread_mpi/once.c
+ thread_mpi/scan.c thread_mpi/tmpi_malloc.c)
set(THREAD_LIB ${CMAKE_THREAD_LIBS_INIT})
else (CMAKE_USE_PTHREADS_INIT)
if (CMAKE_USE_WIN32_THREADS_INIT)
thread_mpi/topology.c thread_mpi/list.c
thread_mpi/type.c thread_mpi/lock.c
thread_mpi/winthreads.c thread_mpi/once.c
- thread_mpi/numa_malloc.c)
+ thread_mpi/numa_malloc.c
+ thread_mpi/scan.c thread_mpi/tmpi_malloc.c)
set(THREAD_LIBRARY )
endif (CMAKE_USE_WIN32_THREADS_INIT)
endif (CMAKE_USE_PTHREADS_INIT)
+
# the spin-waiting option
option(THREAD_MPI_WAIT_FOR_NO_ONE "Use busy waits without yielding to the OS scheduler. Turning this on might improve performance (very) slightly at the cost of very poor performance if the threads are competing for CPU time." OFF)
mark_as_advanced(THREAD_MPI_WAIT_FOR_NO_ONE)
include(CheckCSourceCompiles)
-# Windows NUMA allocator
-if (THREAD_WINDOWS)
- check_c_source_compiles(
- "#include <windows.h>
- int main(void) { PROCESSOR_NUMBER a; return 0; }"
- HAVE_PROCESSOR_NUMBER)
- if(HAVE_PROCESSOR_NUMBER)
- #add_definitions(-DTMPI_WINDOWS_NUMA_API)
- set(TMPI_WINDOWS_NUMA_API 1)
- endif(HAVE_PROCESSOR_NUMBER)
-endif(THREAD_WINDOWS)
-
# option to set affinity
option(THREAD_MPI_SET_AFFINITY "Set thread affinity to a core if number of threads equal to number of hardware threads." ON)
mark_as_advanced(THREAD_MPI_SET_AFFINITY)
files.
*/
-#ifndef _TMPI_ATOMIC_H_
-#define _TMPI_ATOMIC_H_
+#ifndef TMPI_ATOMIC_H_
+#define TMPI_ATOMIC_H_
/*! \file atomic.h
*
too */
#if ( (defined(__GNUC__) || defined(__PATHSCALE__) || defined(__PGI)) && (!defined(__xlc__)) )
+
+
+
/* now check specifically for several architectures: */
-#if (defined(i386) || defined(__x86_64__))
+#if ((defined(i386) || defined(__x86_64__)) && ! defined(__OPEN64__))
/* first x86: */
#include "atomic/gcc_x86.h"
/*#include "atomic/gcc.h"*/
#error No atomic operations implemented for this cpu/compiler combination.
#endif
+/** Indicates that no support for atomic operations is present. */
#define TMPI_NO_ATOMICS
-/* System mutex used for locking to guarantee atomicity */
+/** System mutex used for locking to guarantee atomicity */
static tMPI_Thread_mutex_t tMPI_Atomic_mutex = TMPI_THREAD_MUTEX_INITIALIZER;
/** Atomic operations datatype
*/
typedef struct tMPI_Atomic
{
- int value;
+ int value; /**< The atomic value. */
}
tMPI_Atomic_t;
*/
typedef struct tMPI_Atomic_ptr
{
- void* value;
+ void* value; /**< The atomic pointer value. */
}
tMPI_Atomic_ptr_t;
*/
static inline int tMPI_Spinlock_islocked(const tMPI_Spinlock_t *x)
{
- int rc;
-
if(tMPI_Spinlock_trylock(x) != 0)
{
/* It was locked */
*/
static inline void tMPI_Spinlock_wait(tMPI_Spinlock_t *x)
{
- int rc;
-
tMPI_Spinlock_lock(x);
/* Got the lock now, so the waiting is over */
tMPI_Spinlock_unlock(x);
#endif
-#endif /* _TMPI_ATOMIC_H_ */
+#endif /* TMPI_ATOMIC_H_ */
files.
*/
-#ifndef _TMPI_BARRIER_H_
-#define _TMPI_BARRIER_H_
+#ifndef TMPI_BARRIER_H_
+#define TMPI_BARRIER_H_
#include "wait.h"
tMPI_Atomic_t count; /*!< Number of threads remaining */
int threshold; /*!< Total number of threads */
volatile int cycle; /*!< Current cycle (alternating 0/1) */
- TMPI_YIELD_WAIT_DATA;
+ TMPI_YIELD_WAIT_DATA
};
#define tMPI_Barrier_N(barrier) ((barrier)->threshold)
#endif
-#endif
+#endif /* TMPI_BARRIER_H_ */
files.
*/
-#ifndef _THREAD_MPI_COLLECTIVE_H_
-#define _THREAD_MPI_COLLECTIVE_H_
+#ifndef TMPI_COLLECTIVE_H_
+#define TMPI_COLLECTIVE_H_
/** \file
*
tMPI_Comm comm;
} tMPI_Reduce_req;
+/** Allocate data structure for asynchronous reduce. */
tMPI_Reduce_req *tMPI_Reduce_req_alloc(tMPI_Comm comm);
#if 0
-/** Execute fast a asynchronious reduce over comm.
+/** Execute fast a asynchronous reduce over comm.
Reduces array input with supplied funtion. This function may return before
the input array is ready to be written to again; to check for its completion,
} /* closing extern "C" */
#endif
-#endif /* _THREAD_MPI_COLLECTIVE_H_ */
+#endif /* TMPI_COLLECTIVE_H_ */
files.
*/
-#ifndef _TMPI_EVENT_H_
-#define _TMPI_EVENT_H_
+#ifndef TMPI_EVENT_H_
+#define TMPI_EVENT_H_
#include "wait.h"
{
tMPI_Atomic_t sync; /* the event sync counter */
int last_sync; /* the last sync event looked at */
- TMPI_YIELD_WAIT_DATA; /* data associated with yielding */
+ TMPI_YIELD_WAIT_DATA /* data associated with yielding */
};
Sets the number of events that had occurred during the wait in N.
\param ev The event structure to wait on.
- \ret The number of events that have occurred at function
+ \returns The number of events that have occurred at function
return time. */
int tMPI_Event_wait(tMPI_Event *ev);
}
#endif
-#endif
+#endif /* TMPI_EVENT_H_ */
Returns the total number of cores and SMT threads that can run.
- \ret The maximum number of threads that can run simulataneously. If this
- number cannot be determined for the current architecture, -1 is
- returned.
+ \returns The maximum number of threads that can run simulataneously.
+ If this number cannot be determined for the current architecture,
+ -1 is returned.
*/
int tMPI_Get_hw_nthreads(void);
equal to the number of hardware threads available, or 1 if the number
can't be determined, or if there are no atomics for this platform.
- \ret The maximum number of threads to run on.
+ \returns The maximum number of threads to run on.
*/
int tMPI_Get_recommended_nthreads(void);
files.
*/
-#ifndef _TMPI_LIST_H_
-#define _TMPI_LIST_H_
+#ifndef TMPI_LIST_H_
+#define TMPI_LIST_H_
#include "atomic.h"
Is a list with push, pop and detach operations */
typedef struct
{
- tMPI_Atomic_ptr_t head;
+ tMPI_Atomic_ptr_t head; /**< Pointer to the top stack element. */
} tMPI_Stack;
/** A single element in stack */
typedef struct tMPI_Stack_element
{
- struct tMPI_Stack_element *next; /*< pointer to the next stack element */
- void *data; /*< pointer to data */
+ struct tMPI_Stack_element *next; /**< Pointer to the next stack element. */
+ void *data; /**< Pointer to data. */
} tMPI_Stack_element;
-
-/** Lock-free double-ended queue (LIFO)
+#if 0
+/** Lock-free double-ended queue (FIFO)
Is a list with enqueue and dequeue operations */
typedef struct
/** A single element in a queue */
typedef struct tMPI_Queue_element
{
- struct tMPI_Queue_element *next,*prev; /*< pointer to the next, prev queue
- element */
- void *data; /*< pointer to data */
+ struct tMPI_Queue_element *next; /**< Pointer to the next queue element. */
+ struct tMPI_Queue_element *prev; /**< Pointer to the prev queue element. */
+ void *data; /**< Pointer to data. */
} tMPI_Queue_element;
/** Initialize a queue */
struct tMPI_List_element *next, *prev;
void *data;
} tMPI_List_element;
-
+
+/** Initialize a list */
void tMPI_List_init(tMPI_List *l);
+/** Deallocates a list */
void tMPI_List_destroy(tMPI_List *l);
tMPI_List_element* tMPI_List_first(tMPI_List *l);
void tMPI_List_insert(tMPI_List *l, tMPI_List_element *after,
tMPI_List_element *le);
void tMPI_List_remove(tMPI_List *l, tMPI_List_element *le);
+#endif
#ifdef __cplusplus
} /* closing extern "C" */
#endif
-#endif /* _TMPI_H_ */
+#endif /* TMPI_LIST_H_ */
files.
*/
-#ifndef _TMPI_FASTLOCK_H_
-#define _TMPI_FASTLOCK_H_
+#ifndef TMPI_FASTLOCK_H_
+#define TMPI_FASTLOCK_H_
#include "wait.h"
#include "atomic.h"
struct tMPI_Lock
{
tMPI_Spinlock_t lock; /*!< The underlying spin lock */
- TMPI_YIELD_WAIT_DATA;
+ TMPI_YIELD_WAIT_DATA
};
-#endif
+#endif /* TMPI_FASTLOCK_H_ */
files.
*/
-#ifndef _MPI_BINDINGS_H_
-#define _MPI_BINDINGS_H_
+#ifndef TMPI_MPI_BINDINGS_H_
+#define TMPI_MPI_BINDINGS_H_
/** \file
\brief MPI bindings for thread_mpi/tmpi.h
typedef struct tmpi_status_ MPI_Status;
/* data types */
typedef struct tmpi_datatype_ *MPI_Datatype;
+/* reduce operations */
+typedef tMPI_Op MPI_Op;
+
#define MPI_CHAR TMPI_CHAR
#define MPI_Reduce tMPI_Reduce
#define MPI_Allreduce tMPI_Allreduce
+#define MPI_Scan tMPI_Scan
#ifdef __cplusplus
} /* closing extern "C" */
#endif
-#endif /* _MPI_BINDINGS_H_ */
+#endif /* TMPI_MPI_BINDINGS_H_ */
--- /dev/null
+/*
+This source code file is part of thread_mpi.
+Written by Sander Pronk, Erik Lindahl, and possibly others.
+
+Copyright (c) 2009, Sander Pronk, Erik Lindahl.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+1) Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+2) Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+3) Neither the name of the copyright holders nor the
+ names of its contributors may be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+If you want to redistribute modifications, please consider that
+scientific software is very special. Version control is crucial -
+bugs must be traceable. We will be happy to consider code for
+inclusion in the official distribution, but derived work should not
+be called official thread_mpi. Details are found in the README & COPYING
+files.
+*/
+
+/** \file
+ *
+ * \brief mutex objects with C++11 API compatibility.
+ *
+ * This header contains classes mutex and lock_guard, used in C++ mutex
+ * implementations safe for exceptions.
+*/
+
+#ifndef TMPI_MUTEX_H_
+#define TMPI_MUTEX_H_
+
+#include "system_error.h"
+#include "threads.h"
+
+#ifdef __cplusplus
+
+
+namespace tMPI
+{
+ /*! \brief A lock guard class that allows for the simple management of
+ mutexes. C++11 compatible.
+
+ In C++, mutexes would normally have to be unlocked with explicit
+ exception handlers and unlock statements. This class automates that
+ by handling the mutex unlock in a destructor. The constructor locks
+ the mutex.
+
+ Usage example:
+ tMPI::mutex mtx;
+ void do_count()
+ {
+ tMPI::lock_guard<tMPI::mutex> lock(mtx);
+ count += 1;
+ }
+ */
+ template <class Mutex> class lock_guard
+ {
+ public:
+ typedef Mutex mutex_type;
+ /*! \brief The constructor, which locks the mutex.
+
+ \param m The exisiting (globally accessible) mutex to lock. */
+ explicit lock_guard(mutex_type &m) : m_(m)
+ {
+ m_.lock();
+ }
+ //lock_guard(mutex_type &m, adopt_lock_t t);
+
+ /*! \brief The destructor, which unlocks the mutex */
+ ~lock_guard()
+ {
+ m_.unlock();
+ }
+ private:
+ // forbid copy constructor & assignment
+ lock_guard(const lock_guard &l);
+ lock_guard& operator=(const lock_guard &l);
+
+ mutex_type &m_;
+ };
+
+ /*! \brief A basic mutex class with C++11 compatibility. */
+ class mutex
+ {
+ public:
+ typedef tMPI_Thread_mutex_t* native_handle_type;
+
+ /*! \brief The constructor.
+
+ Throws a tMPI::system_error exception upon failure. */
+ mutex()
+ {
+ int ret=tMPI_Thread_mutex_init(&handle_);
+ if (ret)
+ throw system_error(ret);
+ }
+
+ /*! \brief The destructor.*/
+ ~mutex()
+ {
+ tMPI_Thread_mutex_destroy(&handle_);
+ }
+
+ /*! \brief The lock function.
+
+ Throws a tMPI::system_error exception upon failure. */
+ void lock()
+ {
+ int ret=tMPI_Thread_mutex_lock(&handle_);
+ if (ret)
+ throw system_error(ret);
+ }
+
+ /*! \brief The try_lock function.
+
+ Throws a tMPI::system_error exception upon failure.
+ \return true if the lock was locked successfully, false if not*/
+ bool try_lock()
+ {
+ if (tMPI_Thread_mutex_trylock(&handle_))
+ return false;
+ return true;
+ }
+
+ /*! \brief The unlock function.
+
+ Throws a tMPI::system_error exception upon failure. */
+ void unlock()
+ {
+ int ret=tMPI_Thread_mutex_unlock(&handle_);
+ if (ret)
+ throw system_error(ret);
+ }
+
+ native_handle_type native_handle() { return &handle_; }
+ private:
+ // forbid copy constructor & assignment
+ mutex(const mutex &m);
+ mutex& operator=(const mutex &m);
+
+ tMPI_Thread_mutex_t handle_;
+ };
+}
+
+#endif /* __cplusplus */
+
+#endif /* TMPI_MUTEX_H_ */
+
*/
-#ifndef _TMPI_NUMA_MALLOC_H_
-#define _TMPI_NUMA_MALLOC_H_
+#ifndef TMPI_NUMA_MALLOC_H_
+#define TMPI_NUMA_MALLOC_H_
-/*! \file numa_alloc.h
+/*! \file
\brief NUMA aware memory allocators.
Currently this is only implemented on Windows. Check for the presence
of these functions with
+ \code
#ifdef TMPI_NUMA_MALLOC
....
#endif
+ \endcode
*/
}
#endif
-#endif /* _TMPI_NUMA_MALLOC_H_ */
+#endif /* TMPI_NUMA_MALLOC_H_ */
--- /dev/null
+/*
+This source code file is part of thread_mpi.
+Written by Sander Pronk, Erik Lindahl, and possibly others.
+
+Copyright (c) 2009, Sander Pronk, Erik Lindahl.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+1) Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+2) Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+3) Neither the name of the copyright holders nor the
+ names of its contributors may be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+If you want to redistribute modifications, please consider that
+scientific software is very special. Version control is crucial -
+bugs must be traceable. We will be happy to consider code for
+inclusion in the official distribution, but derived work should not
+be called official thread_mpi. Details are found in the README & COPYING
+files.
+*/
+
+/** \file
+ * \brief A C++11 compatible system_error class for reporting exceptions
+ *
+ * This header contains class definitions for system_error.
+ */
+
+#ifndef TMPI_SYSTEM_ERROR_H_
+#define TMPI_SYSTEM_ERROR_H_
+
+#include <stdexcept>
+
+#ifdef __cplusplus
+
+
+namespace tMPI
+{
+ /*! \brief Subset of the C++11 system_error class
+
+ Only contains the errno-based constructor. */
+ class system_error : public std::runtime_error
+ {
+ public:
+ typedef int error_code;
+
+ //system_error(error_code ec, const std::string& what_arg);
+ //system_error(error_code ec, const char* what_arg);
+ /*! \brief Constuctor that takes an system error number */
+ system_error(error_code ec) ;
+
+ /*! \brief Returns the error code */
+ const error_code& code() const
+ {
+ return ec_;
+ }
+ private:
+ error_code ec_;
+ };
+}
+
+#endif /* __cplusplus */
+
+#endif /* TMPI_SYSTEM_ERROR_H_ */
*/
-#ifndef _TMPI_THREAD_H_
-#define _TMPI_THREAD_H_
+#ifndef TMPI_THREADS_H_
+#define TMPI_THREADS_H_
/*! \file threads.h
*
*/
typedef struct
{
- tMPI_Atomic_t initialized;
- struct tMPI_Mutex* mutex;
+ tMPI_Atomic_t initialized; /*!< Whether \a mutex has been initialized. */
+ struct tMPI_Mutex* mutex; /*!< Actual mutex data structure. */
} tMPI_Thread_mutex_t;
/*! \brief Static initializer for tMPI_Thread_mutex_t
*
*/
typedef struct
{
- tMPI_Atomic_t initialized;
- struct tMPI_Thread_key *key;
+ tMPI_Atomic_t initialized; /*!< Whether \a key has been initialized. */
+ struct tMPI_Thread_key *key; /*!< Actual key data structure. */
} tMPI_Thread_key_t;
*/
typedef struct
{
- tMPI_Atomic_t once;
+ tMPI_Atomic_t once; /*!< Whether the operation has been performed. */
} tMPI_Thread_once_t;
/*! \brief Static initializer for tMPI_Thread_once_t
*
*/
typedef struct
{
- tMPI_Atomic_t initialized;
- struct tMPI_Thread_cond* condp;
+ tMPI_Atomic_t initialized; /*!< Whether \a condp has been initialized. */
+ struct tMPI_Thread_cond* condp; /*!< Actual condition variable data structure. */
} tMPI_Thread_cond_t;
/*! \brief Static initializer for tMPI_Thread_cond_t
*
*/
typedef struct
{
- tMPI_Atomic_t initialized;
- struct tMPI_Thread_barrier* barrierp;
+ tMPI_Atomic_t initialized; /*!< Whether \a barrierp has been initialized. */
+ struct tMPI_Thread_barrier* barrierp; /*!< Actual barrier data structure. */
volatile int threshold; /*!< Total number of members in barrier */
volatile int count; /*!< Remaining count before completion */
volatile int cycle; /*!< Alternating 0/1 to indicate round */
\param message format string for error message.
*/
void tMPI_Fatal_error(const char *file, int line, const char *message, ...);
+/** Convenience macro for the first two arguments to tMPI_Fatal_error(). */
#define TMPI_FARGS __FILE__,__LINE__
Returns the total number of cores and SMT threads that can run.
- \ret The maximum number of threads that can run simulataneously. If this
- number cannot be determined for the current architecture, 0 is
- returned. */
+ \returns The maximum number of threads that can run simulataneously.
+ If this number cannot be determined for the current architecture,
+ 0 is returned.
+ */
int tMPI_Thread_get_hw_number(void);
*
* This routine always return directly. If the mutex was available and
* we successfully locked it we return 0, otherwise a non-zero
- * error code (usually meaning the mutex was already locked).
+ * return code (usually meaning the mutex was already locked).
*
* \param mtx Pointer to the mutex to try and lock
- * \return 0 or a non-zero error code.
+ * \return 0 or a non-zero return error code.
*/
int tMPI_Thread_mutex_trylock(tMPI_Thread_mutex_t *mtx);
}
#endif
-#endif /* _TMPI_THREAD_H_ */
+#endif /* TMPI_THREADS_H_ */
files.
*/
-#ifndef _TMPI_H_
-#define _TMPI_H_
+#ifndef TMPI_TMPI_H_
+#define TMPI_TMPI_H_
/** \file
*
/** Wait until some of several messages are transferred. Waits until at least
one message is transferred.
- \param[in] count The number of requests
+ \param[in] incount The number of requests
\param[in,out] array_of_requests List of count requests obtained with
tMPI_Isend()/tMPI_Irecv().
\param[out] outcount Number of completed requests
/** Test whether some of several messages are transferred.
- \param[in] count The number of requests
+ \param[in] incount The number of requests
\param[in,out] array_of_requests List of count requests obtained with
tMPI_Isend()/tMPI_Irecv().
\param[out] outcount Number of completed requests
\param[in] sendbuf The operand parameters. Any process may specify
TMPI_IN_PLACE, in which case recvbuf will hold
the operand parameters for that process.
- \param[out] recvbuf The result buffer.
+ \param[in,out] recvbuf The result buffer.
\param[in] count The number of items to do operation on.
\param[in] datatype The data type of the items.
\param[in] op The operation to perform.
int tMPI_Reduce_fast(void* sendbuf, void* recvbuf, int count,
tMPI_Datatype datatype, tMPI_Op op, int root,
tMPI_Comm comm);
+
+/** Do a partial reduce operation, based on rank: the results of the
+ reduction operation of ranks 0 - i will be put in the recvbuf of
+ rank i.
+
+ Collective function.
+
+ \param[in] sendbuf The operand parameters. All ranks may specify
+ TMPI_IN_PLACE, in which case recvbuf will hold
+ the operand parameters.
+ \param[in,out] recvbuf The result buffer.
+ \param[in] count The number of items to do operation on.
+ \param[in] datatype The data type of the items.
+ \param[in] op The operation to perform.
+ \param[in] comm The communicator.
+
+ \return TMPI_SUCCESS on success, TMPI_FAILURE on failure. */
+int tMPI_Scan(void* sendbuf, void* recvbuf, int count,
+ tMPI_Datatype datatype, tMPI_Op op, tMPI_Comm comm);
+
+
/*! \} */
} /* closing extern "C" */
#endif
-#endif /* _TMPI_H_ */
+#endif /* TMPI_TMPI_H_ */
*/
-#ifndef _TMPI_WAIT_H_
-#define _TMPI_WAIT_H_
+#ifndef TMPI_WAIT_H_
+#define TMPI_WAIT_H_
#ifndef TMPI_WAIT_FOR_NO_ONE
lead to starvation. This mixed approach actually gives better real-world
performance in the test program.*/
/* the data associated with waiting. */
-#define TMPI_YIELD_WAIT_DATA int yield_wait_counter
+#define TMPI_YIELD_WAIT_DATA int yield_wait_counter;
/* the initialization associated with waiting. */
#define TMPI_YIELD_WAIT_DATA_INIT(data) { (data)->yield_wait_counter=0; }
#endif /* !TMPI_WAIT_FOR_NO_ONE */
-#endif
+#endif /* TMPI_WAIT_H_ */
# Note that not all .c files are compiled directly: some of them
# are #included (some multiple times) from other source files.
set(THREAD_MPI_LIB_SOURCE
- alltoall.c reduce.c
+ alltoall.c reduce.c scan.c
barrier.c list.c reduce_fast.c
bcast.c lock.c scatter.c
collective.c once.c tmpi_init.c
comm.c p2p_protocol.c topology.c
errhandler.c p2p_send_recv.c type.c
- event.c p2p_wait.c
+ event.c p2p_wait.c tmpi_malloc.c
gather.c profile.c
- group.c numa_malloc.c)
+ group.c numa_malloc.c )
if (THREAD_PTHREADS)
# make it link to the threads library (-lpthreads, for example)
target_link_libraries(thread_mpi ${THREAD_LIB})
+if (TMPI_CXX_LIB)
+ set(THREAD_MPI_CXX_LIB_SOURCE
+ system_error.cpp )
+ add_library(thread_mpi_cxx STATIC ${THREAD_MPI_CXX_LIB_SOURCE})
+ target_link_libraries(thread_mpi_cxx thread_mpi)
+endif (TMPI_CXX_LIB)
#configure_file(tmpi_config.h.cmakein tmpi_config.h)
#add_definitions(-DHAVE_TMPI_CONFIG_H)
include_directories(${CMAKE_CURRENT_BINARY_DIR})
add_definitions(-DHAVE_TMPI_CONFIG_H)
+install(TARGETS thread_mpi DESTINATION lib)
/* the p2p communication events (incoming envelopes + finished send
envelopes generate events) */
tMPI_Event p2p_event;
- TMPI_YIELD_WAIT_DATA; /* data associated with waiting */
+ TMPI_YIELD_WAIT_DATA /* data associated with waiting */
struct req_list rql; /* list of pre-allocated requests */
/* collective communication structures: */
#endif
+/* reduce ops: run a single iteration of a reduce operation on a, b -> dest */
+int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
+ tMPI_Datatype datatype, int count, tMPI_Op op,
+ tMPI_Comm comm);
/* and we need this prototype */
{
return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
}
- send_dst = tMPI_Get_thread(comm, dest);
+ send_dst=tMPI_Get_thread(comm, dest);
if (!send_dst)
{
return tMPI_Error(comm, TMPI_ERR_SEND_DEST);
"Alltoall",
"Alltoallv",
"Reduce",
- "Allreduce"
+ "Allreduce",
+ "Scan"
};
TMPIFN_Reduce,
TMPIFN_Allreduce,
+ TMPIFN_Scan,
TMPIFN_Nfunctions
};
#include "collective.h"
-
-/* run a single binary reduce operation on src_a and src_b, producing dest.
- dest and src_a may be identical */
-static int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
- tMPI_Datatype datatype, int count, tMPI_Op op,
- tMPI_Comm comm);
-
-
-static int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
- tMPI_Datatype datatype, int count, tMPI_Op op,
- tMPI_Comm comm)
+int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
+ tMPI_Datatype datatype, int count, tMPI_Op op,
+ tMPI_Comm comm)
{
tMPI_Op_fn fn=datatype->op_functions[op];
a=recvbuf;
b=(void*)tMPI_Atomic_ptr_get(&(comm->reduce_recvbuf[nbr]));
}
- /* here we check for overlapping buffers */
- if (a==b)
- {
- return tMPI_Error(comm, TMPI_ERR_XFER_BUF_OVERLAP);
- }
-
if ((ret=tMPI_Reduce_run_op(recvbuf, a, b, datatype,
count, op, comm)) != TMPI_SUCCESS)
return ret;
}
+
--- /dev/null
+/*
+This source code file is part of thread_mpi.
+Written by Sander Pronk, Erik Lindahl, and possibly others.
+
+Copyright (c) 2009, Sander Pronk, Erik Lindahl.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+1) Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+2) Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+3) Neither the name of the copyright holders nor the
+ names of its contributors may be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+If you want to redistribute modifications, please consider that
+scientific software is very special. Version control is crucial -
+bugs must be traceable. We will be happy to consider code for
+inclusion in the official distribution, but derived work should not
+be called official thread_mpi. Details are found in the README & COPYING
+files.
+*/
+
+#ifdef HAVE_TMPI_CONFIG_H
+#include "tmpi_config.h"
+#endif
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
+#include <errno.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdarg.h>
+#include <string.h>
+
+#include "impl.h"
+#include "collective.h"
+
+
+
+int tMPI_Scan(void* sendbuf, void* recvbuf, int count,
+ tMPI_Datatype datatype, tMPI_Op op, tMPI_Comm comm)
+{
+ struct tmpi_thread *cur=tMPI_Get_current();
+ int myrank=tMPI_Comm_seek_rank(comm, cur);
+ int N=tMPI_Comm_N(comm);
+ int ret;
+ int prev=myrank - 1; /* my previous neighbor */
+ int next=myrank + 1; /* my next neighbor */
+
+#ifdef TMPI_PROFILE
+ tMPI_Profile_count_start(cur);
+#endif
+#ifdef TMPI_TRACE
+ tMPI_Trace_print("tMPI_Scan(%p, %p, %d, %p, %p, %p)",
+ sendbuf, recvbuf, count, datatype, op, comm);
+#endif
+ if (count==0)
+ return TMPI_SUCCESS;
+ if (!recvbuf)
+ {
+ return tMPI_Error(comm, TMPI_ERR_BUF);
+ }
+ if (sendbuf==TMPI_IN_PLACE)
+ {
+ sendbuf=recvbuf;
+ }
+
+ /* we set our send and recv buffers */
+ tMPI_Atomic_ptr_set(&(comm->reduce_sendbuf[myrank]),sendbuf);
+ tMPI_Atomic_ptr_set(&(comm->reduce_recvbuf[myrank]),recvbuf);
+
+ /* now wait for the previous rank to finish */
+ if (myrank > 0)
+ {
+ void *a, *b;
+#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
+ tMPI_Profile_wait_start(cur);
+#endif
+ /* wait for the previous neighbor's data to be ready */
+ tMPI_Event_wait( &(comm->csync[myrank].events[prev]) );
+ tMPI_Event_process( &(comm->csync[myrank].events[prev]), 1);
+#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
+ tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);
+#endif
+#ifdef TMPI_DEBUG
+ printf("%d: scanning with %d \n", myrank, prev, iteration);
+ fflush(stdout);
+#endif
+ /* now do the reduction */
+ if (prev > 0)
+ {
+ a = (void*)tMPI_Atomic_ptr_get(&(comm->reduce_recvbuf[prev]));
+ }
+ else
+ {
+ a = (void*)tMPI_Atomic_ptr_get(&(comm->reduce_sendbuf[prev]));
+ }
+ b = sendbuf;
+
+ if ((ret=tMPI_Reduce_run_op(recvbuf, a, b, datatype,
+ count, op, comm)) != TMPI_SUCCESS)
+ {
+ return ret;
+ }
+
+ /* signal to my previous neighbor that I'm done with the data */
+ tMPI_Event_signal( &(comm->csync[prev].events[prev]) );
+ }
+ else
+ {
+ if (sendbuf != recvbuf)
+ {
+ /* copy the data if this is rank 0, and not MPI_IN_PLACE */
+ memcpy(recvbuf, sendbuf, count*datatype->size);
+ }
+ }
+
+ if (myrank < N-1)
+ {
+ /* signal to my next neighbor that I have the data */
+ tMPI_Event_signal( &(comm->csync[next].events[myrank]) );
+ /* and wait for my next neighbor to finish */
+ tMPI_Event_wait( &(comm->csync[myrank].events[myrank]) );
+ tMPI_Event_process( &(comm->csync[myrank].events[myrank]), 1);
+ }
+
+
+#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
+ tMPI_Profile_wait_start(cur);
+#endif
+ /*tMPI_Barrier_wait( &(comm->barrier));*/
+#if defined(TMPI_PROFILE)
+ /*tMPI_Profile_wait_stop(cur, TMPIWAIT_Reduce);*/
+ tMPI_Profile_count_stop(cur, TMPIFN_Scan);
+#endif
+ return ret;
+}
+
+
+
--- /dev/null
+/*
+This source code file is part of thread_mpi.
+Written by Sander Pronk, Erik Lindahl, and possibly others.
+
+Copyright (c) 2009, Sander Pronk, Erik Lindahl.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+1) Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+2) Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+3) Neither the name of the copyright holders nor the
+ names of its contributors may be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+If you want to redistribute modifications, please consider that
+scientific software is very special. Version control is crucial -
+bugs must be traceable. We will be happy to consider code for
+inclusion in the official distribution, but derived work should not
+be called official thread_mpi. Details are found in the README & COPYING
+files.
+*/
+
+#ifdef __cplusplus
+
+#include <cerrno>
+#include <cstring>
+#include <cstdlib>
+#include <stdexcept>
+#include "thread_mpi/system_error.h"
+
+tMPI::system_error::system_error(error_code ec)
+ : runtime_error(std::strerror(ec)), ec_(ec)
+{
+}
+
+#endif /* __cplusplus */
+
/* there are a few global variables that maintain information about the
running threads. Some are defined by the MPI standard: */
-tMPI_Comm TMPI_COMM_WORLD=NULL;
+/* TMPI_COMM_WORLD is in tmpi_malloc.c due to technical reasons */
tMPI_Group TMPI_GROUP_EMPTY=NULL;
#endif
-void *tMPI_Malloc(size_t size)
-{
- void *ret=(void*)malloc(size);
-
- if (!ret)
- {
- tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_MALLOC);
- }
- return ret;
-}
-
-void *tMPI_Realloc(void *p, size_t size)
-{
- void *ret=(void*)realloc(p, size);
- if (!ret)
- {
- tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_MALLOC);
- }
- return ret;
-}
-
-void tMPI_Free(void *p)
-{
- free(p);
-}
-
-
#if 0
struct tmpi_thread *tMPI_Get_current(void)
{
--- /dev/null
+/*
+This source code file is part of thread_mpi.
+Written by Sander Pronk, Erik Lindahl, and possibly others.
+
+Copyright (c) 2009, Sander Pronk, Erik Lindahl.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+1) Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+2) Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+3) Neither the name of the copyright holders nor the
+ names of its contributors may be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+If you want to redistribute modifications, please consider that
+scientific software is very special. Version control is crucial -
+bugs must be traceable. We will be happy to consider code for
+inclusion in the official distribution, but derived work should not
+be called official thread_mpi. Details are found in the README & COPYING
+files.
+*/
+
+
+#ifdef HAVE_TMPI_CONFIG_H
+#include "tmpi_config.h"
+#endif
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
+#include <stdlib.h>
+
+
+#include "impl.h"
+
+
+/* there are a few global variables that maintain information about the
+ running threads. Some are defined by the MPI standard: */
+/* This is declared here because it is needed for the error handling */
+tMPI_Comm TMPI_COMM_WORLD=NULL;
+
+
+void *tMPI_Malloc(size_t size)
+{
+ void *ret=(void*)malloc(size);
+
+ if (!ret)
+ {
+ tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_MALLOC);
+ }
+ return ret;
+}
+
+void *tMPI_Realloc(void *p, size_t size)
+{
+ void *ret=(void*)realloc(p, size);
+ if (!ret)
+ {
+ tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_MALLOC);
+ }
+ return ret;
+}
+
+void tMPI_Free(void *p)
+{
+ free(p);
+}