2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
39 /* this is the header file for the implementation side of the thread_mpi
40 library. It contains the definitions for all the internal data structures
41 and the prototypes for all the internal functions that aren't static. */
51 #if defined( _WIN32 ) || defined( _WIN64 )
55 #ifdef HAVE_SYS_TIME_H
66 #include "thread_mpi/atomic.h"
67 #include "thread_mpi/threads.h"
68 #include "thread_mpi/event.h"
69 #include "thread_mpi/tmpi.h"
70 #include "thread_mpi/collective.h"
71 #include "thread_mpi/barrier.h"
72 #include "thread_mpi/lock.h"
79 /**************************************************************************
83 **************************************************************************/
86 typedef int tmpi_bool;
92 #ifdef USE_COLLECTIVE_COPY_BUFFER
93 /**************************************************************************
95 PRE-ALLOCATED COMMUNICATION BUFFERS
97 **************************************************************************/
100 /* Buffer structure for collective communications. Every thread structure
101 has several of these ready to be used when the collective data
102 transmission is small enough for double copying to occur (i.e. the size
103 of the transmission is less than N*MAX_COPY_BUFFER_SIZE, where N is the
104 number of receiving threads). */
107 void *buf; /* the actual buffer */
108 struct copy_buffer *next; /* pointer to next free buffer in buffer_list */
109 size_t size; /* allocated size of buffer */
112 /* a list of copy_buffers of a specific size. */
113 struct copy_buffer_list
115 struct copy_buffer *cb; /* pointer to the first copy_buffer */
116 size_t size; /* allocated size of buffers in this list */
117 struct copy_buffer *cb_alloc; /* list as allocated */
118 int Nbufs; /* number of allocated buffers */
134 /**************************************************************************
136 POINT-TO-POINT COMMUNICATION DATA STRUCTURES
138 **************************************************************************/
140 /* the message envelopes (as described in the MPI standard).
141 These fully describes the message, and make each message unique (enough).
143 Transmitting data works by having the sender put a pointer to an envelope
144 onto the receiver's new envelope list corresponding to the originating
146 The sender then waits until the receiver finishes the transmission, while
147 matching all incoming new envelopes against its own list of receive
150 The receiver either directly matches its receiving envelope against
151 all previously un-matched sending envelopes, or, if no suitable envelope
152 is found, it puts the receive envelope on a receive list.
153 Once waiting for completion, the receiver matches against all incoming
156 /* the state of an individual point-to-point transmission */
159 env_unmatched = 0, /* the envelope has not had a match yet */
160 env_copying = 1, /* busy copying (only used for send envelope
161 by receiver if using_cpbuf is true,
162 but cb was still NULL). */
163 env_cb_available = 2, /* the copy buffer is available. Set by
164 the sender on a send_buffer. */
165 env_finished = 3 /* the transmission has finished */
169 /* the envelope. Held in tmpi_thread->evs[src_thread] for send envelopes,
170 or in tmpi_thread->evl for receive envelopes */
173 int tag; /* the tag */
174 tMPI_Comm comm; /* this is a structure shared across threads, so we
175 can test easily whether two threads are talking
176 about the same comm. */
178 struct tmpi_thread *src, *dest; /* these are pretty obvious */
180 void *buf; /* buffer to be sent */
181 size_t bufsize; /* the size of the data to be transmitted */
182 tMPI_Datatype datatype; /* the data type */
184 tmpi_bool nonblock; /* whether the receiver is non-blocking */
186 /* state, values from enum_envelope_state .
187 (there's a few busy-waits relying on this flag).
188 status=env_unmatched is the initial state.*/
191 /* the error condition */
194 /* the message status */
195 /*tMPI_Status *status;*/
197 /* prev and next envelopes in the send/recv_envelope_list linked list */
198 struct envelope *prev, *next;
200 tmpi_bool send; /* whether this is a send envelope (if TRUE), or a receive
201 envelope (if FALSE) */
202 #ifdef USE_SEND_RECV_COPY_BUFFER
203 tmpi_bool using_cb; /* whether a copy buffer is (going to be) used */
204 void * cb; /* the allocated copy buffer pointer */
206 /* the next and previous envelopes in the request list */
207 struct envelope *prev_req, *next_req;
209 /* the list I'm in */
210 struct recv_envelope_list *rlist;
211 struct send_envelope_list *slist;
216 /* singly linked lists of free send & receive envelopes belonging to a
218 struct free_envelope_list
220 struct envelope *head_recv; /* the first element in the linked list */
221 struct envelope *recv_alloc_head; /* the allocated recv list */
224 /* collection of send envelopes to a specific thread */
225 struct send_envelope_list
227 struct envelope *head_free; /* singly linked list with free send
228 envelopes. A single-thread LIFO.*/
229 #ifdef TMPI_LOCK_FREE_LISTS
230 tMPI_Atomic_ptr_t head_new; /* singly linked list with the new send
231 envelopes (i.e. those that are put there by
232 the sending thread, but not yet checked by
233 the receiving thread). This is a lock-free
234 shared detachable list.*/
235 tMPI_Atomic_ptr_t head_rts; /* singly linked list with free send
236 envelopes returned by the other thread.
237 This is a lock-free shared LIFO.*/
239 struct envelope *head_new; /* singly linked list with the new send
240 envelopes (i.e. those that are put there by
241 the sending thread, but not yet checked by
242 the receiving thread). */
243 struct envelope *head_rts; /* singly linked list with free send envelopes */
244 tMPI_Spinlock_t lock_new; /* this locks head_new */
245 tMPI_Spinlock_t lock_rts; /* this locks head_rts */
247 struct envelope *head_old; /* the old send envelopes, in a circular doubly
248 linked list. These have been checked by the
249 receiving thread against the existing
250 recv_envelope_list. */
252 struct envelope *alloc_head; /* the allocated send list */
253 size_t Nalloc; /* number of allocted sends */
256 struct recv_envelope_list
258 struct envelope *head; /* first envelope in this list */
259 struct envelope dummy; /* the dummy element for the list */
263 /* the request object for asynchronious operations. */
266 tmpi_bool finished; /* whether it's finished */
267 struct envelope *ev; /* the envelope */
269 struct tmpi_thread *source; /* the message source (for receives) */
270 tMPI_Comm comm; /* the comm */
271 int tag; /* the tag */
272 int error; /* error code */
273 size_t transferred; /* the number of transferred bytes */
274 tmpi_bool cancelled; /* whether the transmission was canceled */
276 struct tmpi_req_ *next, *prev; /* next,prev request in linked list,
277 used in the req_list, but also in
281 /* pre-allocated request object list */
284 struct tmpi_req_ *head; /* pre-allocated singly linked list of requests.
285 (i.e. reqs->prev is undefined). */
286 struct tmpi_req_ *alloc_head; /* the allocated block */
304 /**************************************************************************
306 MULTICAST COMMUNICATION DATA STRUCTURES
308 **************************************************************************/
310 /* these are data structures meant for keeping track of multicast operations
311 (tMPI_Bcast, tMPI_Gather, etc.). Because these operations are all collective
312 across the comm, and are always blocking, the protocol can be much simpler
313 than that for point-to-point communication through tMPI_Send/Recv, etc. */
315 /* unique tags for multicast & collective operations */
316 #define TMPI_BCAST_TAG 1
317 #define TMPI_GATHER_TAG 2
318 #define TMPI_GATHERV_TAG 3
319 #define TMPI_SCATTER_TAG 4
320 #define TMPI_SCATTERV_TAG 5
321 #define TMPI_REDUCE_TAG 6
322 #define TMPI_ALLTOALL_TAG 7
323 #define TMPI_ALLTOALLV_TAG 8
326 /* thread-specific part of the coll_env */
327 struct coll_env_thread
329 tMPI_Atomic_t current_sync; /* sync counter value for the current
331 tMPI_Atomic_t n_remaining; /* remaining threads count for each thread */
333 int tag; /* collective communication type */
334 tMPI_Datatype datatype; /* datatype */
336 void **buf; /* array of send/recv buffer values */
337 size_t *bufsize; /* array of number of bytes to send/recv */
339 #ifdef USE_COLLECTIVE_COPY_BUFFER
340 tmpi_bool using_cb; /* whether a copy buffer is (going to be) used */
341 tMPI_Atomic_t buf_readcount; /* Number of threads reading from buf
342 while using_cpbuf is true, but cpbuf
344 tMPI_Atomic_ptr_t *cpbuf; /* copy_buffer pointers. */
345 struct copy_buffer *cb; /* the copy buffer cpbuf points to */
348 tMPI_Event send_ev; /* event associated with being the sending thread.
349 Triggered when last receiving thread is ready,
350 and the coll_env_thread is ready for re-use. */
351 tMPI_Event recv_ev; /* event associated with being a receiving thread. */
353 tmpi_bool *read_data; /* whether we read data from a specific thread. */
356 /* Collective communications once sync. These run in parallel with
357 the collection of coll_env_threads*/
360 /* collective sync data */
361 tMPI_Atomic_t current_sync; /* sync counter value for the current
363 tMPI_Atomic_t n_remaining; /* remaining threads count */
365 void *res; /* result data for once calls. */
368 /* the collective communication envelope. There's a few of these per
369 comm, and each one stands for one collective communication call. */
372 struct coll_env_thread *met; /* thread-specific collective envelope data.*/
374 struct coll_env_coll coll;
378 /* multicast synchronization data structure. There's one of these for
379 each thread in each tMPI_Comm structure */
382 int synct; /* sync counter for coll_env_thread. */
383 int syncs; /* sync counter for coll_env_coll. */
385 tMPI_Event *events; /* One event for each other thread */
386 int N; /* the number of threads */
399 /**************************************************************************
401 THREAD DATA STRUCTURES
403 **************************************************************************/
405 /* information about a running thread. This structure is put in a
406 globally available array; the envelope exchange, etc. are all done through
407 the elements of this array.*/
410 tMPI_Thread_t thread_id; /* this thread's id */
412 /* p2p communication structures: */
414 /* the receive envelopes posted for other threads to check */
415 struct recv_envelope_list evr;
416 /* the send envelopes posted by other threadas */
417 struct send_envelope_list *evs;
418 /* free send and receive envelopes */
419 struct free_envelope_list envelopes;
420 /* number of finished send envelopes */
421 tMPI_Atomic_t ev_outgoing_received;
422 /* the p2p communication events (incoming envelopes + finished send
423 envelopes generate events) */
424 tMPI_Event p2p_event;
425 TMPI_YIELD_WAIT_DATA /* data associated with waiting */
426 struct req_list rql; /* list of pre-allocated requests */
428 /* collective communication structures: */
429 #ifdef USE_COLLECTIVE_COPY_BUFFER
430 /* copy buffer list for multicast communications */
431 struct copy_buffer_list cbl_multi;
434 /* miscellaneous data: */
436 tMPI_Comm self_comm; /* comms for MPI_COMM_SELF */
438 /* the per-thread profile structure that keeps call counts & wait times. */
439 struct tmpi_profile profile;
441 /* The start function (or NULL, if a main()-style start function is to
443 void (*start_fn)(void*);
444 /* The main()-style start function */
445 int (*start_fn_main)(int, char**);
446 /* the argument to the start function, if it's not main()*/
449 /* we copy these for each thread (providing these to main() is not
450 required by the MPI standard, but it's convenient). Note that we copy,
451 because some programs (like Gromacs) like to manipulate these. */
461 /**************************************************************************
463 ERROR HANDLER DATA STRUCTURES
465 **************************************************************************/
468 /* the error handler */
469 struct tmpi_errhandler_
472 tMPI_Errhandler_fn fn;
475 /* standard error handler functions */
476 void tmpi_errors_are_fatal_fn(tMPI_Comm *comm, int *err);
477 void tmpi_errors_return_fn(tMPI_Comm *comm, int *err);
483 /**************************************************************************
485 GLOBAL DATA STRUCTURE
487 **************************************************************************/
489 /* global MPI information */
492 /* list of pointers to all user-defined types */
493 struct tmpi_datatype_ **usertypes;
495 int Nalloc_usertypes;
497 /* spinlock/mutex for manipulating tmpi_user_types */
498 tMPI_Spinlock_t datatype_lock;
500 /* Lock to prevent multiple threads manipulating the linked list of comm
502 tMPI_Thread_mutex_t comm_link_lock;
504 /* barrier for tMPI_Finalize(), etc. */
505 tMPI_Thread_barrier_t barrier;
507 /* the timer for tMPI_Wtime() */
508 tMPI_Thread_mutex_t timer_mutex;
509 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
510 /* the time at initialization. */
511 struct timeval timer_init;
513 /* the time at initialization. */
532 /**************************************************************************
534 COMMUNICATOR DATA STRUCTURES
536 **************************************************************************/
541 int N; /* the number of threads */
542 struct tmpi_thread **peers; /* the list of peers to communicate with */
544 int Nrefs; /* the number of references to this structure */
549 /* the communicator objects are globally shared. */
552 struct tmpi_group_ grp; /* the communicator group */
554 /* the barrier for tMPI_Barrier() */
555 tMPI_Barrier_t barrier;
558 /* List of barriers for reduce operations.
559 reduce_barrier[0] contains a list of N/2 barriers for N threads
560 reduce_barrier[1] contains a list of N/4 barriers for N/2 threads
561 reduce_barrier[2] contains a list of N/8 barriers for N/4 threads
562 and so on. (until N/x reaches 1)
563 This is to facilitate tree-based algorithms for tMPI_Reduce, etc. */
564 tMPI_Barrier_t **reduce_barrier;
565 int *N_reduce; /* the number of barriers in each iteration */
566 int N_reduce_iter; /* the number of iterations */
569 struct coll_env *cev; /* list of multicast envelope objecs */
570 struct coll_sync *csync; /* list of multicast sync objecs */
572 /* lists of globally shared send/receive buffers for tMPI_Reduce. */
573 tMPI_Atomic_ptr_t *reduce_sendbuf, *reduce_recvbuf;
575 /* mutex for communication object creation. Traditional mutexes are
576 better here because communicator creation should not be done in
577 time-critical sections of code. */
578 tMPI_Thread_mutex_t comm_create_lock;
579 tMPI_Thread_cond_t comm_create_prep;
580 tMPI_Thread_cond_t comm_create_finish;
582 tMPI_Comm *new_comm; /* newly created communicators */
584 /* the split structure is shared among the comm threads and is
585 allocated & deallocated during tMPI_Comm_split */
586 struct tmpi_split *split;
588 /* the topologies (only cartesian topology is currently implemented */
589 struct cart_topol *cart;
590 /*struct tmpi_graph_topol_ *graph;*/
594 /* links for a global circular list of all comms that starts at
595 TMPI_COMM_WORLD. Used to de-allocate the comm structures after
597 struct tmpi_comm_ *next, *prev;
599 /* A counter that counts up to N before the comm is freed. */
600 tMPI_Atomic_t destroy_counter;
605 /* specific for tMPI_Split: */
608 volatile int Ncol_init;
609 volatile int Ncol_destroy;
610 volatile tmpi_bool can_finish;
611 volatile int *colors;
615 /* cartesian topology */
618 int ndims; /* number of dimensions */
619 int *dims; /* procs per coordinate */
620 int *periods; /* whether the grid is periodic, per dimension */
625 struct tmpi_graph_topol_
634 /**************************************************************************
636 DATA TYPE DATA STRUCTURES
638 **************************************************************************/
640 /* tMPI_Reduce Op functions */
641 typedef void (*tMPI_Op_fn)(void*, void*, void*, int);
644 struct tmpi_datatype_component
646 struct tmpi_datatype_ *type;
650 /* we don't support datatypes with holes (yet) */
651 struct tmpi_datatype_
653 size_t size; /* full extent of type. */
654 tMPI_Op_fn *op_functions; /* array of op functions for this datatype */
655 int N_comp; /* number of components */
656 struct tmpi_datatype_component *comps; /* the components */
657 tmpi_bool committed; /* whether the data type is committed */
659 /* just as a shorthand: */
660 typedef struct tmpi_datatype_ tmpi_dt;
669 /**************************************************************************
673 **************************************************************************/
676 /* the threads themselves (tmpi_comm only contains lists of pointers to this
678 extern struct tmpi_thread *threads;
682 extern tMPI_Thread_key_t id_key; /* the key to get the thread id */
684 /* misc. global information about MPI */
685 extern struct tmpi_global *tmpi_global;
694 /**************************************************************************
696 FUNCTION PROTOTYPES & MACROS
698 **************************************************************************/
701 void tMPI_Trace_print(const char *fmt, ...);
704 /* error-checking malloc/realloc: */
705 void *tMPI_Malloc(size_t size);
706 void *tMPI_Realloc(void *p, size_t size);
707 void tMPI_Free(void *p);
710 /* get the current thread structure pointer */
711 #define tMPI_Get_current() ((struct tmpi_thread*) \
712 tMPI_Thread_getspecific(id_key))
714 /* get the number of this thread */
715 /*#define tMPI_This_threadnr() (tMPI_Get_current() - threads)*/
717 /* get the number of a specific thread. We convert to the resulting size_t to
718 int, which is unlikely to cause problems in the foreseeable future. */
719 #define tMPI_Threadnr(th) (int)(th - threads)
721 /* get thread associated with rank */
722 #define tMPI_Get_thread(comm, rank) (comm->grp.peers[rank])
726 /* get the current thread structure pointer */
727 struct tmpi_thread *tMPI_Get_current(void);
728 /* get the thread belonging to comm with rank rank */
729 struct tmpi_thread *tMPI_Get_thread(tMPI_Comm comm, int rank);
733 /* handle an error, returning the errorcode */
734 int tMPI_Error(tMPI_Comm comm, int tmpi_errno);
738 /* check whether we're the main thread */
739 tmpi_bool tMPI_Is_master(void);
740 /* check whether the current process is in a group */
741 tmpi_bool tMPI_In_group(tMPI_Group group);
743 /* find the rank of a thread in a comm */
744 int tMPI_Comm_seek_rank(tMPI_Comm comm, struct tmpi_thread *th);
745 /* find the size of a comm */
746 int tMPI_Comm_N(tMPI_Comm comm);
748 /* allocate a comm object, making space for N threads */
749 int tMPI_Comm_alloc(tMPI_Comm *newcomm, tMPI_Comm parent, int N);
750 /* de-allocate a comm object */
751 int tMPI_Comm_destroy(tMPI_Comm comm, tmpi_bool do_link_lock);
752 /* allocate a group object */
753 tMPI_Group tMPI_Group_alloc(void);
755 /* topology functions */
756 /* de-allocate a cartesian topology structure. (it is allocated with
757 the internal function tMPI_Cart_init()) */
758 void tMPI_Cart_destroy(struct cart_topol *top);
765 /* initialize a free envelope list with N envelopes */
766 int tMPI_Free_env_list_init(struct free_envelope_list *evl, int N);
767 /* destroy a free envelope list */
768 void tMPI_Free_env_list_destroy(struct free_envelope_list *evl);
771 /* initialize a send envelope list */
772 int tMPI_Send_env_list_init(struct send_envelope_list *evl, int N);
773 /* destroy a send envelope list */
774 void tMPI_Send_env_list_destroy(struct send_envelope_list *evl);
781 /* initialize a recv envelope list */
782 int tMPI_Recv_env_list_init(struct recv_envelope_list *evl);
783 /* destroy a recv envelope list */
784 void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl);
789 /* initialize request list */
790 int tMPI_Req_list_init(struct req_list *rl, int N_reqs);
791 /* destroy request list */
792 void tMPI_Req_list_destroy(struct req_list *rl);
796 /* collective data structure ops */
799 /* initialize a coll env structure */
800 int tMPI_Coll_env_init(struct coll_env *mev, int N);
801 /* destroy a coll env structure */
802 void tMPI_Coll_env_destroy(struct coll_env *mev);
804 /* initialize a coll sync structure */
805 int tMPI_Coll_sync_init(struct coll_sync *msc, int N);
806 /* destroy a coll sync structure */
807 void tMPI_Coll_sync_destroy(struct coll_sync *msc);
809 #ifdef USE_COLLECTIVE_COPY_BUFFER
810 /* initialize a copy_buffer_list */
811 int tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
813 /* initialize a copy_buffer_list */
814 void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl);
815 /* get a copy buffer from a list */
816 struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl);
817 /* return a copy buffer to a list */
818 void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
819 struct copy_buffer *cb);
820 /* initialize a copy buffer */
821 int tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size);
822 void tMPI_Copy_buffer_destroy(struct copy_buffer *cb);
826 /* reduce ops: run a single iteration of a reduce operation on a, b -> dest */
827 int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
828 tMPI_Datatype datatype, int count, tMPI_Op op,
832 /* and we need this prototype */
833 int main(int argc, char **argv);