2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
39 /* this is the header file for the implementation side of the thread_mpi
40 library. It contains the definitions for all the internal data structures
41 and the prototypes for all the internal functions that aren't static. */
51 #ifdef HAVE_SYS_TIME_H
62 #include "thread_mpi/atomic.h"
63 #include "thread_mpi/threads.h"
64 #include "thread_mpi/event.h"
65 #include "thread_mpi/tmpi.h"
66 #include "thread_mpi/collective.h"
67 #include "thread_mpi/barrier.h"
68 #include "thread_mpi/lock.h"
75 /**************************************************************************
79 **************************************************************************/
82 typedef int tmpi_bool;
88 #ifdef USE_COLLECTIVE_COPY_BUFFER
89 /**************************************************************************
91 PRE-ALLOCATED COMMUNICATION BUFFERS
93 **************************************************************************/
96 /* Buffer structure for collective communications. Every thread structure
97 has several of these ready to be used when the collective data
98 transmission is small enough for double copying to occur (i.e. the size
99 of the transmission is less than N*MAX_COPY_BUFFER_SIZE, where N is the
100 number of receiving threads). */
103 void *buf; /* the actual buffer */
104 struct copy_buffer *next; /* pointer to next free buffer in buffer_list */
105 size_t size; /* allocated size of buffer */
108 /* a list of copy_buffers of a specific size. */
109 struct copy_buffer_list
111 struct copy_buffer *cb; /* pointer to the first copy_buffer */
112 size_t size; /* allocated size of buffers in this list */
113 struct copy_buffer *cb_alloc; /* list as allocated */
114 int Nbufs; /* number of allocated buffers */
130 /**************************************************************************
132 POINT-TO-POINT COMMUNICATION DATA STRUCTURES
134 **************************************************************************/
136 /* the message envelopes (as described in the MPI standard).
137 These fully describe the message, and make each message unique (enough).
139 Transmitting data works by having the sender put a pointer to an envelope
140 onto the receiver's new envelope list corresponding to the originating
142 The sender then waits until the receiver finishes the transmission, while
143 matching all incoming new envelopes against its own list of receive
146 The receiver either directly matches its receiving envelope against
147 all previously un-matched sending envelopes, or, if no suitable envelope
148 is found, it puts the receive envelope on a receive list.
149 Once waiting for completion, the receiver matches against all incoming
152 /* the state of an individual point-to-point transmission */
155 env_unmatched = 0, /* the envelope has not had a match yet */
156 env_copying = 1, /* busy copying (only used for send envelope
157 by receiver if using_cb is true,
158 but cb was still NULL). */
159 env_cb_available = 2, /* the copy buffer is available. Set by
160 the sender on a send_buffer. */
161 env_finished = 3 /* the transmission has finished */
165 /* the envelope. Held in tmpi_thread->evs[src_thread] for send envelopes,
166 or in tmpi_thread->evr for receive envelopes */
169 int tag; /* the tag */
170 tMPI_Comm comm; /* this is a structure shared across threads, so we
171 can test easily whether two threads are talking
172 about the same comm. */
174 struct tmpi_thread *src, *dest; /* these are pretty obvious */
176 void *buf; /* buffer to be sent */
177 size_t bufsize; /* the size of the data to be transmitted */
178 tMPI_Datatype datatype; /* the data type */
180 tmpi_bool nonblock; /* whether the receiver is non-blocking */
182 /* state, values from enum_envelope_state .
183 (there's a few busy-waits relying on this flag).
184 status=env_unmatched is the initial state.*/
187 /* the error condition */
190 /* the message status */
191 /*tMPI_Status *status;*/
193 /* prev and next envelopes in the send/recv_envelope_list linked list */
194 struct envelope *prev, *next;
196 tmpi_bool send; /* whether this is a send envelope (if TRUE), or a receive
197 envelope (if FALSE) */
198 #ifdef USE_SEND_RECV_COPY_BUFFER
199 tmpi_bool using_cb; /* whether a copy buffer is (going to be) used */
200 void * cb; /* the allocated copy buffer pointer */
202 /* the next and previous envelopes in the request list */
203 struct envelope *prev_req, *next_req;
205 /* the list I'm in */
206 struct recv_envelope_list *rlist;
207 struct send_envelope_list *slist;
212 /* singly linked lists of free send & receive envelopes belonging to a
214 struct free_envelope_list
216 struct envelope *head_recv; /* the first element in the linked list */
217 struct envelope *recv_alloc_head; /* the allocated recv list */
220 /* collection of send envelopes to a specific thread */
221 struct send_envelope_list
223 struct envelope *head_free; /* singly linked list with free send
224 envelopes. A single-thread LIFO.*/
225 #ifdef TMPI_LOCK_FREE_LISTS
226 tMPI_Atomic_ptr_t head_new; /* singly linked list with the new send
227 envelopes (i.e. those that are put there by
228 the sending thread, but not yet checked by
229 the receiving thread). This is a lock-free
230 shared detachable list.*/
231 tMPI_Atomic_ptr_t head_rts; /* singly linked list with free send
232 envelopes returned by the other thread.
233 This is a lock-free shared LIFO.*/
235 struct envelope *head_new; /* singly linked list with the new send
236 envelopes (i.e. those that are put there by
237 the sending thread, but not yet checked by
238 the receiving thread). */
239 struct envelope *head_rts; /* singly linked list with free send envelopes */
240 tMPI_Spinlock_t lock_new; /* this locks head_new */
241 tMPI_Spinlock_t lock_rts; /* this locks head_rts */
243 struct envelope *head_old; /* the old send envelopes, in a circular doubly
244 linked list. These have been checked by the
245 receiving thread against the existing
246 recv_envelope_list. */
248 struct envelope *alloc_head; /* the allocated send list */
249 size_t Nalloc; /* number of allocated sends */
252 struct recv_envelope_list
254 struct envelope *head; /* first envelope in this list */
255 struct envelope dummy; /* the dummy element for the list */
259 /* the request object for asynchronous operations. */
262 tmpi_bool finished; /* whether it's finished */
263 struct envelope *ev; /* the envelope */
265 struct tmpi_thread *source; /* the message source (for receives) */
266 tMPI_Comm comm; /* the comm */
267 int tag; /* the tag */
268 int error; /* error code */
269 size_t transferred; /* the number of transferred bytes */
270 tmpi_bool cancelled; /* whether the transmission was canceled */
272 struct tmpi_req_ *next, *prev; /* next,prev request in linked list,
273 used in the req_list, but also in
277 /* pre-allocated request object list */
280 struct tmpi_req_ *head; /* pre-allocated singly linked list of requests.
281 (i.e. reqs->prev is undefined). */
282 struct tmpi_req_ *alloc_head; /* the allocated block */
300 /**************************************************************************
302 MULTICAST COMMUNICATION DATA STRUCTURES
304 **************************************************************************/
306 /* these are data structures meant for keeping track of multicast operations
307 (tMPI_Bcast, tMPI_Gather, etc.). Because these operations are all collective
308 across the comm, and are always blocking, the protocol can be much simpler
309 than that for point-to-point communication through tMPI_Send/Recv, etc. */
311 /* unique tags for multicast & collective operations */
312 #define TMPI_BCAST_TAG 1
313 #define TMPI_GATHER_TAG 2
314 #define TMPI_GATHERV_TAG 3
315 #define TMPI_SCATTER_TAG 4
316 #define TMPI_SCATTERV_TAG 5
317 #define TMPI_REDUCE_TAG 6
318 #define TMPI_ALLTOALL_TAG 7
319 #define TMPI_ALLTOALLV_TAG 8
322 /* thread-specific part of the coll_env */
323 struct coll_env_thread
325 tMPI_Atomic_t current_sync; /* sync counter value for the current
327 tMPI_Atomic_t n_remaining; /* remaining threads count for each thread */
329 int tag; /* collective communication type */
330 tMPI_Datatype datatype; /* datatype */
332 void **buf; /* array of send/recv buffer values */
333 size_t *bufsize; /* array of number of bytes to send/recv */
335 #ifdef USE_COLLECTIVE_COPY_BUFFER
336 tmpi_bool using_cb; /* whether a copy buffer is (going to be) used */
337 tMPI_Atomic_t buf_readcount; /* Number of threads reading from buf
338 while using_cb is true, but cpbuf
340 tMPI_Atomic_ptr_t *cpbuf; /* copy_buffer pointers. */
341 struct copy_buffer *cb; /* the copy buffer cpbuf points to */
344 tMPI_Event send_ev; /* event associated with being the sending thread.
345 Triggered when last receiving thread is ready,
346 and the coll_env_thread is ready for re-use. */
347 tMPI_Event recv_ev; /* event associated with being a receiving thread. */
349 tmpi_bool *read_data; /* whether we read data from a specific thread. */
352 /* Collective communications once sync. These run in parallel with
353 the collection of coll_env_threads*/
356 /* collective sync data */
357 tMPI_Atomic_t current_sync; /* sync counter value for the current
359 tMPI_Atomic_t n_remaining; /* remaining threads count */
361 void *res; /* result data for once calls. */
364 /* the collective communication envelope. There's a few of these per
365 comm, and each one stands for one collective communication call. */
368 struct coll_env_thread *met; /* thread-specific collective envelope data.*/
370 struct coll_env_coll coll;
374 /* multicast synchronization data structure. There's one of these for
375 each thread in each tMPI_Comm structure */
378 int synct; /* sync counter for coll_env_thread. */
379 int syncs; /* sync counter for coll_env_coll. */
381 tMPI_Event *events; /* One event for each other thread */
382 int N; /* the number of threads */
395 /**************************************************************************
397 THREAD DATA STRUCTURES
399 **************************************************************************/
401 /* information about a running thread. This structure is put in a
402 globally available array; the envelope exchange, etc. are all done through
403 the elements of this array.*/
406 tMPI_Thread_t thread_id; /* this thread's id */
408 /* p2p communication structures: */
410 /* the receive envelopes posted for other threads to check */
411 struct recv_envelope_list evr;
412 /* the send envelopes posted by other threads */
413 struct send_envelope_list *evs;
414 /* free send and receive envelopes */
415 struct free_envelope_list envelopes;
416 /* number of finished send envelopes */
417 tMPI_Atomic_t ev_outgoing_received;
418 /* the p2p communication events (incoming envelopes + finished send
419 envelopes generate events) */
420 tMPI_Event p2p_event;
421 TMPI_YIELD_WAIT_DATA /* data associated with waiting */
422 struct req_list rql; /* list of pre-allocated requests */
424 /* collective communication structures: */
425 #ifdef USE_COLLECTIVE_COPY_BUFFER
426 /* copy buffer list for multicast communications */
427 struct copy_buffer_list cbl_multi;
430 /* miscellaneous data: */
432 tMPI_Comm self_comm; /* comms for MPI_COMM_SELF */
434 /* the per-thread profile structure that keeps call counts & wait times. */
435 struct tmpi_profile profile;
437 /* The start function (or NULL, if a main()-style start function is to
439 void (*start_fn)(void*);
440 /* The main()-style start function */
441 int (*start_fn_main)(int, char**);
442 /* the argument to the start function, if it's not main()*/
445 /* we copy these for each thread (providing these to main() is not
446 required by the MPI standard, but it's convenient). Note that we copy,
447 because some programs (like Gromacs) like to manipulate these. */
457 /**************************************************************************
459 ERROR HANDLER DATA STRUCTURES
461 **************************************************************************/
464 /* the error handler */
465 struct tmpi_errhandler_
468 tMPI_Errhandler_fn fn;
471 /* standard error handler functions */
472 void tmpi_errors_are_fatal_fn(tMPI_Comm *comm, int *err);
473 void tmpi_errors_return_fn(tMPI_Comm *comm, int *err);
479 /**************************************************************************
481 GLOBAL DATA STRUCTURE
483 **************************************************************************/
485 /* global MPI information */
488 /* list of pointers to all user-defined types */
489 struct tmpi_datatype_ **usertypes;
491 int Nalloc_usertypes;
493 /* spinlock/mutex for manipulating tmpi_user_types */
494 tMPI_Spinlock_t datatype_lock;
496 /* Lock to prevent multiple threads manipulating the linked list of comm
498 tMPI_Thread_mutex_t comm_link_lock;
500 /* barrier for tMPI_Finalize(), etc. */
501 tMPI_Thread_barrier_t barrier;
503 /* the timer for tMPI_Wtime() */
504 tMPI_Thread_mutex_t timer_mutex;
505 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
506 /* the time at initialization. */
507 struct timeval timer_init;
509 /* the time at initialization. */
528 /**************************************************************************
530 COMMUNICATOR DATA STRUCTURES
532 **************************************************************************/
537 int N; /* the number of threads */
538 struct tmpi_thread **peers; /* the list of peers to communicate with */
540 int Nrefs; /* the number of references to this structure */
545 /* the communicator objects are globally shared. */
548 struct tmpi_group_ grp; /* the communicator group */
550 /* the barrier for tMPI_Barrier() */
551 tMPI_Barrier_t barrier;
554 /* List of barriers for reduce operations.
555 reduce_barrier[0] contains a list of N/2 barriers for N threads
556 reduce_barrier[1] contains a list of N/4 barriers for N/2 threads
557 reduce_barrier[2] contains a list of N/8 barriers for N/4 threads
558 and so on. (until N/x reaches 1)
559 This is to facilitate tree-based algorithms for tMPI_Reduce, etc. */
560 tMPI_Barrier_t **reduce_barrier;
561 int *N_reduce; /* the number of barriers in each iteration */
562 int N_reduce_iter; /* the number of iterations */
565 struct coll_env *cev; /* list of multicast envelope objects */
566 struct coll_sync *csync; /* list of multicast sync objects */
568 /* lists of globally shared send/receive buffers for tMPI_Reduce. */
569 tMPI_Atomic_ptr_t *reduce_sendbuf, *reduce_recvbuf;
571 /* mutex for communication object creation. Traditional mutexes are
572 better here because communicator creation should not be done in
573 time-critical sections of code. */
574 tMPI_Thread_mutex_t comm_create_lock;
575 tMPI_Thread_cond_t comm_create_prep;
576 tMPI_Thread_cond_t comm_create_finish;
578 tMPI_Comm *new_comm; /* newly created communicators */
580 /* the split structure is shared among the comm threads and is
581 allocated & deallocated during tMPI_Comm_split */
582 struct tmpi_split *split;
584 /* the topologies (only cartesian topology is currently implemented) */
585 struct cart_topol *cart;
586 /*struct tmpi_graph_topol_ *graph;*/
590 /* links for a global circular list of all comms that starts at
591 TMPI_COMM_WORLD. Used to de-allocate the comm structures after
593 struct tmpi_comm_ *next, *prev;
595 /* A counter that counts up to N before the comm is freed. */
596 tMPI_Atomic_t destroy_counter;
601 /* specific for tMPI_Split: */
604 volatile int Ncol_init;
605 volatile int Ncol_destroy;
606 volatile tmpi_bool can_finish;
607 volatile int *colors;
611 /* cartesian topology */
614 int ndims; /* number of dimensions */
615 int *dims; /* procs per coordinate */
616 int *periods; /* whether the grid is periodic, per dimension */
621 struct tmpi_graph_topol_
630 /**************************************************************************
632 DATA TYPE DATA STRUCTURES
634 **************************************************************************/
636 /* tMPI_Reduce Op functions */
637 typedef void (*tMPI_Op_fn)(void*, void*, void*, int);
640 struct tmpi_datatype_component
642 struct tmpi_datatype_ *type;
646 /* we don't support datatypes with holes (yet) */
647 struct tmpi_datatype_
649 size_t size; /* full extent of type. */
650 tMPI_Op_fn *op_functions; /* array of op functions for this datatype */
651 int N_comp; /* number of components */
652 struct tmpi_datatype_component *comps; /* the components */
653 tmpi_bool committed; /* whether the data type is committed */
655 /* just as a shorthand: */
656 typedef struct tmpi_datatype_ tmpi_dt;
665 /**************************************************************************
669 **************************************************************************/
672 /* the threads themselves (tmpi_comm only contains lists of pointers to this
674 extern struct tmpi_thread *threads;
678 extern tMPI_Thread_key_t id_key; /* the key to get the thread id */
680 /* misc. global information about MPI */
681 extern struct tmpi_global *tmpi_global;
690 /**************************************************************************
692 FUNCTION PROTOTYPES & MACROS
694 **************************************************************************/
697 void tMPI_Trace_print(const char *fmt, ...);
700 /* error-checking malloc/realloc: */
701 void *tMPI_Malloc(size_t size);
702 void *tMPI_Realloc(void *p, size_t size);
703 void tMPI_Free(void *p);
/* get the current thread structure pointer, as stored under the
   thread-specific key id_key */
#define tMPI_Get_current() ((struct tmpi_thread*) \
                            tMPI_Thread_getspecific(id_key))

/* get the number of this thread */
/*#define tMPI_This_threadnr() (tMPI_Get_current() - threads)*/

/* get the number of a specific thread, i.e. its offset from the start of
   the global 'threads' array. We convert the resulting ptrdiff_t to int,
   which is unlikely to cause problems in the foreseeable future.
   Both the argument and the full expansion are parenthesized, so the macro
   can take compound arguments (e.g. a conditional expression) and can be
   used safely inside larger expressions. */
#define tMPI_Threadnr(th) ((int)((th) - threads))

/* get thread associated with rank. 'comm' is parenthesized so that
   arguments such as &obj or *ptr bind correctly to the '->' operator. */
#define tMPI_Get_thread(comm, rank) ((comm)->grp.peers[(rank)])
722 /* get the current thread structure pointer */
723 struct tmpi_thread *tMPI_Get_current(void);
724 /* get the thread belonging to comm with rank rank */
725 struct tmpi_thread *tMPI_Get_thread(tMPI_Comm comm, int rank);
729 /* handle an error, returning the errorcode */
730 int tMPI_Error(tMPI_Comm comm, int tmpi_errno);
734 /* check whether we're the main thread */
735 tmpi_bool tMPI_Is_master(void);
736 /* check whether the current process is in a group */
737 tmpi_bool tMPI_In_group(tMPI_Group group);
739 /* find the rank of a thread in a comm */
740 int tMPI_Comm_seek_rank(tMPI_Comm comm, struct tmpi_thread *th);
741 /* find the size of a comm */
742 int tMPI_Comm_N(tMPI_Comm comm);
744 /* allocate a comm object, making space for N threads */
745 int tMPI_Comm_alloc(tMPI_Comm *newcomm, tMPI_Comm parent, int N);
746 /* de-allocate a comm object */
747 int tMPI_Comm_destroy(tMPI_Comm comm, tmpi_bool do_link_lock);
748 /* allocate a group object */
749 tMPI_Group tMPI_Group_alloc(void);
751 /* topology functions */
752 /* de-allocate a cartesian topology structure. (it is allocated with
753 the internal function tMPI_Cart_init()) */
754 void tMPI_Cart_destroy(struct cart_topol *top);
761 /* initialize a free envelope list with N envelopes */
762 int tMPI_Free_env_list_init(struct free_envelope_list *evl, int N);
763 /* destroy a free envelope list */
764 void tMPI_Free_env_list_destroy(struct free_envelope_list *evl);
767 /* initialize a send envelope list */
768 int tMPI_Send_env_list_init(struct send_envelope_list *evl, int N);
769 /* destroy a send envelope list */
770 void tMPI_Send_env_list_destroy(struct send_envelope_list *evl);
777 /* initialize a recv envelope list */
778 int tMPI_Recv_env_list_init(struct recv_envelope_list *evl);
779 /* destroy a recv envelope list */
780 void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl);
785 /* initialize request list */
786 int tMPI_Req_list_init(struct req_list *rl, int N_reqs);
787 /* destroy request list */
788 void tMPI_Req_list_destroy(struct req_list *rl);
792 /* collective data structure ops */
795 /* initialize a coll env structure */
796 int tMPI_Coll_env_init(struct coll_env *mev, int N);
797 /* destroy a coll env structure */
798 void tMPI_Coll_env_destroy(struct coll_env *mev);
800 /* initialize a coll sync structure */
801 int tMPI_Coll_sync_init(struct coll_sync *msc, int N);
802 /* destroy a coll sync structure */
803 void tMPI_Coll_sync_destroy(struct coll_sync *msc);
805 #ifdef USE_COLLECTIVE_COPY_BUFFER
806 /* initialize a copy_buffer_list */
807 int tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
809 /* destroy a copy_buffer_list */
810 void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl);
811 /* get a copy buffer from a list */
812 struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl);
813 /* return a copy buffer to a list */
814 void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
815 struct copy_buffer *cb);
816 /* initialize a copy buffer */
817 int tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size);
818 void tMPI_Copy_buffer_destroy(struct copy_buffer *cb);
822 /* reduce ops: run a single iteration of a reduce operation on a, b -> dest */
823 int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
824 tMPI_Datatype datatype, int count, tMPI_Op op,
828 /* and we need this prototype */
829 int main(int argc, char **argv);