2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
39 /* this is the header file for the implementation side of the thread_mpi
40 library. It contains the definitions for all the internal data structures
41 and the prototypes for all the internal functions that aren't static. */
48 #ifdef HAVE_SYS_TIME_H
59 #include "thread_mpi/atomic.h"
60 #include "thread_mpi/threads.h"
61 #include "thread_mpi/event.h"
62 #include "thread_mpi/tmpi.h"
63 #include "thread_mpi/collective.h"
64 #include "thread_mpi/barrier.h"
65 #include "thread_mpi/lock.h"
72 /**************************************************************************
76 **************************************************************************/
79 typedef int tmpi_bool;
85 #ifdef USE_COLLECTIVE_COPY_BUFFER
86 /**************************************************************************
88 PRE-ALLOCATED COMMUNICATION BUFFERS
90 **************************************************************************/
93 /* Buffer structure for collective communications. Every thread structure
94 has several of these ready to be used when the collective data
95 transmission is small enough for double copying to occur (i.e. the size
96 of the transmission is less than N*MAX_COPY_BUFFER_SIZE, where N is the
97 number of receiving threads). */
100 void *buf; /* the actual buffer */
101 struct copy_buffer *next; /* pointer to next free buffer in buffer_list */
102 size_t size; /* allocated size of buffer */
105 /* a list of copy_buffers of a specific size. */
106 struct copy_buffer_list
108 struct copy_buffer *cb; /* pointer to the first copy_buffer */
109 size_t size; /* allocated size of buffers in this list */
110 struct copy_buffer *cb_alloc; /* list as allocated */
111 int Nbufs; /* number of allocated buffers */
127 /**************************************************************************
129 POINT-TO-POINT COMMUNICATION DATA STRUCTURES
131 **************************************************************************/
133 /* the message envelopes (as described in the MPI standard).
134 These fully describes the message, and make each message unique (enough).
136 Transmitting data works by having the sender put a pointer to an envelope
137 onto the receiver's new envelope list corresponding to the originating
139 The sender then waits until the receiver finishes the transmission, while
140 matching all incoming new envelopes against its own list of receive
143 The receiver either directly matches its receiving envelope against
144 all previously un-matched sending envelopes, or, if no suitable envelope
145 is found, it puts the receive envelope on a receive list.
146 Once waiting for completion, the receiver matches against all incoming
149 /* the state of an individual point-to-point transmission */
152 env_unmatched = 0, /* the envelope has not had a match yet */
153 env_copying = 1, /* busy copying (only used for send envelope
154 by receiver if using_cpbuf is true,
155 but cb was still NULL). */
156 env_cb_available = 2, /* the copy buffer is available. Set by
157 the sender on a send_buffer. */
158 env_finished = 3 /* the transmission has finished */
162 /* the envelope. Held in tmpi_thread->evs[src_thread] for send envelopes,
163 or in tmpi_thread->evl for receive envelopes */
166 int tag; /* the tag */
167 tMPI_Comm comm; /* this is a structure shared across threads, so we
168 can test easily whether two threads are talking
169 about the same comm. */
171 struct tmpi_thread *src, *dest; /* these are pretty obvious */
173 void *buf; /* buffer to be sent */
174 size_t bufsize; /* the size of the data to be transmitted */
175 tMPI_Datatype datatype; /* the data type */
177 tmpi_bool nonblock; /* whether the receiver is non-blocking */
179 /* state, values from enum_envelope_state .
180 (there's a few busy-waits relying on this flag).
181 status=env_unmatched is the initial state.*/
184 /* the error condition */
187 /* the message status */
188 /*tMPI_Status *status;*/
190 /* prev and next envelopes in the send/recv_envelope_list linked list */
191 struct envelope *prev,*next;
193 tmpi_bool send; /* whether this is a send envelope (if TRUE), or a receive
194 envelope (if FALSE) */
195 #ifdef USE_SEND_RECV_COPY_BUFFER
196 tmpi_bool using_cb; /* whether a copy buffer is (going to be) used */
197 void* cb;/* the allocated copy buffer pointer */
199 /* the next and previous envelopes in the request list */
200 struct envelope *prev_req, *next_req;
202 /* the list I'm in */
203 struct recv_envelope_list *rlist;
204 struct send_envelope_list *slist;
209 /* singly linked lists of free send & receive envelopes belonging to a
211 struct free_envelope_list
213 struct envelope *head_recv; /* the first element in the linked list */
214 struct envelope *recv_alloc_head; /* the allocated recv list */
217 /* collection of send envelopes to a specific thread */
218 struct send_envelope_list
220 struct envelope *head_free; /* singly linked list with free send
221 envelopes. A single-thread LIFO.*/
222 #ifdef TMPI_LOCK_FREE_LISTS
223 tMPI_Atomic_ptr_t head_new; /* singly linked list with the new send
224 envelopes (i.e. those that are put there by
225 the sending thread, but not yet checked by
226 the receiving thread). This is a lock-free
227 shared detachable list.*/
228 tMPI_Atomic_ptr_t head_rts; /* singly linked list with free send
229 envelopes returned by the other thread.
230 This is a lock-free shared LIFO.*/
232 struct envelope *head_new; /* singly linked list with the new send
233 envelopes (i.e. those that are put there by
234 the sending thread, but not yet checked by
235 the receiving thread). */
236 struct envelope *head_rts; /* singly linked list with free send envelopes */
237 tMPI_Spinlock_t lock_new; /* this locks head_new */
238 tMPI_Spinlock_t lock_rts; /* this locks head_rts */
240 struct envelope *head_old; /* the old send envelopes, in a circular doubly
241 linked list. These have been checked by the
242 receiving thread against the existing
243 recv_envelope_list. */
245 struct envelope *alloc_head; /* the allocated send list */
246 size_t Nalloc; /* number of allocted sends */
249 struct recv_envelope_list
251 struct envelope *head; /* first envelope in this list */
252 struct envelope dummy; /* the dummy element for the list */
256 /* the request object for asynchronious operations. */
259 tmpi_bool finished; /* whether it's finished */
260 struct envelope *ev; /* the envelope */
262 struct tmpi_thread *source; /* the message source (for receives) */
263 tMPI_Comm comm; /* the comm */
264 int tag; /* the tag */
265 int error; /* error code */
266 size_t transferred; /* the number of transferred bytes */
267 tmpi_bool cancelled; /* whether the transmission was canceled */
269 struct tmpi_req_ *next,*prev; /* next,prev request in linked list,
270 used in the req_list, but also in
274 /* pre-allocated request object list */
277 struct tmpi_req_ *head; /* pre-allocated singly linked list of requests.
278 (i.e. reqs->prev is undefined). */
279 struct tmpi_req_ *alloc_head; /* the allocated block */
297 /**************************************************************************
299 MULTICAST COMMUNICATION DATA STRUCTURES
301 **************************************************************************/
303 /* these are data structures meant for keeping track of multicast operations
304 (tMPI_Bcast, tMPI_Gather, etc.). Because these operations are all collective
305 across the comm, and are always blocking, the protocol can be much simpler
306 than that for point-to-point communication through tMPI_Send/Recv, etc. */
308 /* unique tags for multicast & collective operations */
309 #define TMPI_BCAST_TAG 1
310 #define TMPI_GATHER_TAG 2
311 #define TMPI_GATHERV_TAG 3
312 #define TMPI_SCATTER_TAG 4
313 #define TMPI_SCATTERV_TAG 5
314 #define TMPI_REDUCE_TAG 6
315 #define TMPI_ALLTOALL_TAG 7
316 #define TMPI_ALLTOALLV_TAG 8
319 /* thread-specific part of the coll_env */
320 struct coll_env_thread
322 tMPI_Atomic_t current_sync; /* sync counter value for the current
324 tMPI_Atomic_t n_remaining; /* remaining threads count for each thread */
326 int tag; /* collective communication type */
327 tMPI_Datatype datatype; /* datatype */
329 void **buf; /* array of send/recv buffer values */
330 size_t *bufsize; /* array of number of bytes to send/recv */
332 #ifdef USE_COLLECTIVE_COPY_BUFFER
333 tmpi_bool using_cb; /* whether a copy buffer is (going to be) used */
334 tMPI_Atomic_t buf_readcount; /* Number of threads reading from buf
335 while using_cpbuf is true, but cpbuf
337 tMPI_Atomic_ptr_t *cpbuf; /* copy_buffer pointers. */
338 struct copy_buffer *cb; /* the copy buffer cpbuf points to */
341 tMPI_Event send_ev; /* event associated with being the sending thread.
342 Triggered when last receiving thread is ready,
343 and the coll_env_thread is ready for re-use. */
344 tMPI_Event recv_ev; /* event associated with being a receiving thread. */
346 tmpi_bool *read_data; /* whether we read data from a specific thread. */
349 /* Collective communications once sync. These run in parallel with
350 the collection of coll_env_threads*/
353 /* collective sync data */
354 tMPI_Atomic_t current_sync; /* sync counter value for the current
356 tMPI_Atomic_t n_remaining; /* remaining threads count */
358 void *res; /* result data for once calls. */
361 /* the collective communication envelope. There's a few of these per
362 comm, and each one stands for one collective communication call. */
365 struct coll_env_thread *met; /* thread-specific collective envelope data.*/
367 struct coll_env_coll coll;
371 /* multicast synchronization data structure. There's one of these for
372 each thread in each tMPI_Comm structure */
375 int synct; /* sync counter for coll_env_thread. */
376 int syncs; /* sync counter for coll_env_coll. */
378 tMPI_Event *events; /* One event for each other thread */
379 int N; /* the number of threads */
392 /**************************************************************************
394 THREAD DATA STRUCTURES
396 **************************************************************************/
398 /* information about a running thread. This structure is put in a
399 globally available array; the envelope exchange, etc. are all done through
400 the elements of this array.*/
403 tMPI_Thread_t thread_id; /* this thread's id */
405 /* p2p communication structures: */
407 /* the receive envelopes posted for other threads to check */
408 struct recv_envelope_list evr;
409 /* the send envelopes posted by other threadas */
410 struct send_envelope_list *evs;
411 /* free send and receive envelopes */
412 struct free_envelope_list envelopes;
413 /* number of finished send envelopes */
414 tMPI_Atomic_t ev_outgoing_received;
415 /* the p2p communication events (incoming envelopes + finished send
416 envelopes generate events) */
417 tMPI_Event p2p_event;
418 TMPI_YIELD_WAIT_DATA /* data associated with waiting */
419 struct req_list rql; /* list of pre-allocated requests */
421 /* collective communication structures: */
422 #ifdef USE_COLLECTIVE_COPY_BUFFER
423 /* copy buffer list for multicast communications */
424 struct copy_buffer_list cbl_multi;
427 /* miscellaneous data: */
429 tMPI_Comm self_comm; /* comms for MPI_COMM_SELF */
431 /* the per-thread profile structure that keeps call counts & wait times. */
432 struct tmpi_profile profile;
434 /* The start function (or NULL, if a main()-style start function is to
436 void (*start_fn)(void*);
437 /* The main()-style start function */
438 int (*start_fn_main)(int, char**);
439 /* the argument to the start function, if it's not main()*/
442 /* we copy these for each thread (providing these to main() is not
443 required by the MPI standard, but it's convenient). Note that we copy,
444 because some programs (like Gromacs) like to manipulate these. */
454 /**************************************************************************
456 ERROR HANDLER DATA STRUCTURES
458 **************************************************************************/
461 /* the error handler */
462 struct tmpi_errhandler_
465 tMPI_Errhandler_fn fn;
468 /* standard error handler functions */
469 void tmpi_errors_are_fatal_fn(tMPI_Comm *comm, int *err);
470 void tmpi_errors_return_fn(tMPI_Comm *comm, int *err);
476 /**************************************************************************
478 GLOBAL DATA STRUCTURE
480 **************************************************************************/
482 /* global MPI information */
485 /* list of pointers to all user-defined types */
486 struct tmpi_datatype_ **usertypes;
488 int Nalloc_usertypes;
490 /* spinlock/mutex for manipulating tmpi_user_types */
491 tMPI_Spinlock_t datatype_lock;
493 /* barrier for tMPI_Finalize(), etc. */
494 tMPI_Thread_barrier_t barrier;
496 /* the timer for tMPI_Wtime() */
497 tMPI_Thread_mutex_t timer_mutex;
498 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
499 /* the time at initialization. */
500 struct timeval timer_init;
502 /* the time at initialization. */
521 /**************************************************************************
523 COMMUNICATOR DATA STRUCTURES
525 **************************************************************************/
530 int N; /* the number of threads */
531 struct tmpi_thread **peers; /* the list of peers to communicate with */
533 int Nrefs; /* the number of references to this structure */
538 /* the communicator objects are globally shared. */
541 struct tmpi_group_ grp; /* the communicator group */
543 /* the barrier for tMPI_Barrier() */
544 tMPI_Barrier_t barrier;
547 /* List of barriers for reduce operations.
548 reduce_barrier[0] contains a list of N/2 barriers for N threads
549 reduce_barrier[1] contains a list of N/4 barriers for N/2 threads
550 reduce_barrier[2] contains a list of N/8 barriers for N/4 threads
551 and so on. (until N/x reaches 1)
552 This is to facilitate tree-based algorithms for tMPI_Reduce, etc. */
553 tMPI_Barrier_t **reduce_barrier;
554 int *N_reduce; /* the number of barriers in each iteration */
555 int N_reduce_iter; /* the number of iterations */
558 struct coll_env *cev; /* list of multicast envelope objecs */
559 struct coll_sync *csync; /* list of multicast sync objecs */
561 /* lists of globally shared send/receive buffers for tMPI_Reduce. */
562 tMPI_Atomic_ptr_t *reduce_sendbuf, *reduce_recvbuf;
564 /* mutex for communication object creation. Traditional mutexes are
565 better here because communicator creation should not be done in
566 time-critical sections of code. */
567 tMPI_Thread_mutex_t comm_create_lock;
568 tMPI_Thread_cond_t comm_create_prep;
569 tMPI_Thread_cond_t comm_create_finish;
571 tMPI_Comm *new_comm; /* newly created communicators */
573 /* the split structure is shared among the comm threads and is
574 allocated & deallocated during tMPI_Comm_split */
575 struct tmpi_split *split;
577 /* the topologies (only cartesian topology is currently implemented */
578 struct cart_topol *cart;
579 /*struct tmpi_graph_topol_ *graph;*/
583 /* links for a global circular list of all comms that starts at
584 TMPI_COMM_WORLD. Used to de-allocate the comm structures after
586 struct tmpi_comm_ *next,*prev;
590 /* specific for tMPI_Split: */
593 volatile int Ncol_init;
594 volatile int Ncol_destroy;
595 volatile tmpi_bool can_finish;
596 volatile int *colors;
600 /* cartesian topology */
603 int ndims; /* number of dimensions */
604 int *dims; /* procs per coordinate */
605 int *periods; /* whether the grid is periodic, per dimension */
610 struct tmpi_graph_topol_
619 /**************************************************************************
621 DATA TYPE DATA STRUCTURES
623 **************************************************************************/
625 /* tMPI_Reduce Op functions */
626 typedef void (*tMPI_Op_fn)(void*, void*, void*, int);
629 struct tmpi_datatype_component
631 struct tmpi_datatype_ *type;
635 /* we don't support datatypes with holes (yet) */
636 struct tmpi_datatype_
638 size_t size; /* full extent of type. */
639 tMPI_Op_fn *op_functions; /* array of op functions for this datatype */
640 int N_comp; /* number of components */
641 struct tmpi_datatype_component *comps; /* the components */
642 tmpi_bool committed; /* whether the data type is committed */
644 /* just as a shorthand: */
645 typedef struct tmpi_datatype_ tmpi_dt;
654 /**************************************************************************
658 **************************************************************************/
661 /* the threads themselves (tmpi_comm only contains lists of pointers to this
663 extern struct tmpi_thread *threads;
667 extern tMPI_Thread_key_t id_key; /* the key to get the thread id */
669 /* misc. global information about MPI */
670 extern struct tmpi_global *tmpi_global;
679 /**************************************************************************
681 FUNCTION PROTOTYPES & MACROS
683 **************************************************************************/
686 void tMPI_Trace_print(const char *fmt, ...);
689 /* error-checking malloc/realloc: */
690 void *tMPI_Malloc(size_t size);
691 void *tMPI_Realloc(void *p, size_t size);
692 void tMPI_Free(void *p);
695 /* get the current thread structure pointer */
696 #define tMPI_Get_current() ((struct tmpi_thread*) \
697 tMPI_Thread_getspecific(id_key))
699 /* get the number of this thread */
700 /*#define tMPI_This_threadnr() (tMPI_Get_current() - threads)*/
702 /* get the number of a specific thread. We convert to the resulting size_t to
703 int, which is unlikely to cause problems in the foreseeable future. */
704 #define tMPI_Threadnr(th) (int)(th - threads)
706 /* get thread associated with rank */
707 #define tMPI_Get_thread(comm, rank) (comm->grp.peers[rank])
711 /* get the current thread structure pointer */
712 struct tmpi_thread *tMPI_Get_current(void);
713 /* get the thread belonging to comm with rank rank */
714 struct tmpi_thread *tMPI_Get_thread(tMPI_Comm comm, int rank);
718 /* handle an error, returning the errorcode */
719 int tMPI_Error(tMPI_Comm comm, int tmpi_errno);
723 /* check whether we're the main thread */
724 tmpi_bool tMPI_Is_master(void);
725 /* check whether the current process is in a group */
726 tmpi_bool tMPI_In_group(tMPI_Group group);
728 /* find the rank of a thread in a comm */
729 int tMPI_Comm_seek_rank(tMPI_Comm comm, struct tmpi_thread *th);
730 /* find the size of a comm */
731 int tMPI_Comm_N(tMPI_Comm comm);
733 /* allocate a comm object, making space for N threads */
734 tMPI_Comm tMPI_Comm_alloc(tMPI_Comm parent, int N);
735 /* de-allocate a comm object */
736 void tMPI_Comm_destroy(tMPI_Comm comm);
737 /* allocate a group object */
738 tMPI_Group tMPI_Group_alloc(void);
740 /* topology functions */
741 /* de-allocate a cartesian topology structure. (it is allocated with
742 the internal function tMPI_Cart_init()) */
743 void tMPI_Cart_destroy(struct cart_topol *top);
750 /* initialize a free envelope list with N envelopes */
751 void tMPI_Free_env_list_init(struct free_envelope_list *evl, int N);
752 /* destroy a free envelope list */
753 void tMPI_Free_env_list_destroy(struct free_envelope_list *evl);
756 /* initialize a send envelope list */
757 void tMPI_Send_env_list_init(struct send_envelope_list *evl, int N);
758 /* destroy a send envelope list */
759 void tMPI_Send_env_list_destroy(struct send_envelope_list *evl);
766 /* initialize a recv envelope list */
767 void tMPI_Recv_env_list_init(struct recv_envelope_list *evl);
768 /* destroy a recv envelope list */
769 void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl);
774 /* initialize request list */
775 void tMPI_Req_list_init(struct req_list *rl, int N_reqs);
776 /* destroy request list */
777 void tMPI_Req_list_destroy(struct req_list *rl);
781 /* collective data structure ops */
784 /* initialize a coll env structure */
785 void tMPI_Coll_env_init(struct coll_env *mev, int N);
786 /* destroy a coll env structure */
787 void tMPI_Coll_env_destroy(struct coll_env *mev);
789 /* initialize a coll sync structure */
790 void tMPI_Coll_sync_init(struct coll_sync *msc, int N);
791 /* destroy a coll sync structure */
792 void tMPI_Coll_sync_destroy(struct coll_sync *msc);
794 #ifdef USE_COLLECTIVE_COPY_BUFFER
795 /* initialize a copy_buffer_list */
796 void tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
798 /* initialize a copy_buffer_list */
799 void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl);
800 /* get a copy buffer from a list */
801 struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl);
802 /* return a copy buffer to a list */
803 void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
804 struct copy_buffer *cb);
805 /* initialize a copy buffer */
806 void tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size);
807 void tMPI_Copy_buffer_destroy(struct copy_buffer *cb);
811 /* reduce ops: run a single iteration of a reduce operation on a, b -> dest */
812 int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
813 tMPI_Datatype datatype, int count, tMPI_Op op,
817 /* and we need this prototype */
818 int main(int argc, char **argv);