2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
39 /* this is the header file for the implementation side of the thread_mpi
40 library. It contains the definitions for all the internal data structures
41 and the prototypes for all the internal functions that aren't static. */
48 #ifdef HAVE_SYS_TIME_H
59 #include "thread_mpi/atomic.h"
60 #include "thread_mpi/threads.h"
61 #include "thread_mpi/event.h"
62 #include "thread_mpi/tmpi.h"
63 #include "thread_mpi/collective.h"
64 #include "thread_mpi/barrier.h"
65 #include "thread_mpi/hwinfo.h"
66 #include "thread_mpi/lock.h"
73 /**************************************************************************
77 **************************************************************************/
98 #ifdef USE_COLLECTIVE_COPY_BUFFER
99 /**************************************************************************
101 PRE-ALLOCATED COMMUNICATION BUFFERS
103 **************************************************************************/
106 /* Buffer structure. Every thread structure has several of these ready to
107 be used when the data transmission is small enough for double copying to
108 occur (i.e. the size of the transmission is less than N*MAX_COPY_BUFFER_SIZE,
109 where N is the number of receiving threads).
111 These buffers come in two sizes: one pre-allocated to MAX_COPY_BUFFER_SIZE
112 (for point-to-point transmissions, and one pre-allocated to
113 Nthreads*MAX_COPY_BUFFE_SIZE). */
116 void *buf; /* the actual buffer */
117 struct copy_buffer *next; /* pointer to next free buffer in buffer_list */
118 size_t size; /* allocated size of buffer */
121 /* a list of copy_buffers of a specific size. */
122 struct copy_buffer_list
124 struct copy_buffer *cb; /* pointer to the first copy_buffer */
125 size_t size; /* allocated size of buffers in this list */
126 struct copy_buffer *cb_alloc; /* list as allocated */
127 int Nbufs; /* number of allocated buffers */
143 /**************************************************************************
145 POINT-TO-POINT COMMUNICATION DATA STRUCTURES
147 **************************************************************************/
149 /* the message envelopes (as described in the MPI standard).
150 These fully describes the message, and make each message unique (enough).
152 Transmitting data works by having the sender put a pointer to an envelope
153 onto the receiver's new envelope list corresponding to the originating
155 The sender then waits until the receiver finishes the transmission, while
156 matching all incoming new envelopes against its own list of receive
159 The receiver either directly matches its receiving envelope against
160 all previously un-matched sending envelopes, or, if no suitable envelope
161 is found, it puts the receive envelope on a receive list.
162 Once waiting for completion, the receiver matches against all incoming
165 /* the state of an individual point-to-point transmission */
168 env_unmatched = 0, /* the envelope has not had a match yet */
169 env_copying = 1, /* busy copying (only used for send envelope
170 by receiver if using_cpbuf is true,
171 but cb was still NULL). */
172 env_cb_available = 2, /* the copy buffer is available. Set by
173 the sender on a send_buffer. */
174 env_finished = 3 /* the transmission has finished */
178 /* the envelope. Held in tmpi_thread->evs[src_thread] for send envelopes,
179 or in tmpi_thread->evl for receive envelopes */
182 int tag; /* the tag */
183 tMPI_Comm comm; /* this is a structure shared across threads, so we
184 can test easily whether two threads are talking
185 about the same comm. */
187 struct tmpi_thread *src, *dest; /* these are pretty obvious */
189 void *buf; /* buffer to be sent */
190 size_t bufsize; /* the size of the data to be transmitted */
191 tMPI_Datatype datatype; /* the data type */
193 bool nonblock; /* whether the receiver is non-blocking */
195 /* state, values from enum_envelope_state .
196 (there's a few busy-waits relying on this flag).
197 status=env_unmatched is the initial state.*/
200 /* the error condition */
203 /* the message status */
204 /*tMPI_Status *status;*/
206 /* prev and next envelopes in the send/recv_envelope_list linked list */
207 struct envelope *prev,*next;
209 bool send; /* whether this is a send envelope (if TRUE), or a receive
210 envelope (if FALSE) */
211 #ifdef USE_SEND_RECV_COPY_BUFFER
212 bool using_cb; /* whether a copy buffer is (going to be) used */
213 void* cb;/* the allocated copy buffer pointer */
215 /* the next and previous envelopes in the request list */
216 struct envelope *prev_req, *next_req;
218 /* the list I'm in */
219 struct recv_envelope_list *rlist;
220 struct send_envelope_list *slist;
225 /* singly linked lists of free send & receive envelopes belonging to a
227 struct free_envelope_list
229 struct envelope *head_recv; /* the first element in the linked list */
230 struct envelope *recv_alloc_head; /* the allocated recv list */
233 /* collection of send envelopes to a specific thread */
234 struct send_envelope_list
236 struct envelope *head_free; /* singly linked list with free send
237 envelopes. A single-thread LIFO.*/
238 #ifdef TMPI_LOCK_FREE_LISTS
239 tMPI_Atomic_ptr_t head_new; /* singly linked list with the new send
240 envelopes (i.e. those that are put there by
241 the sending thread, but not yet checked by
242 the receiving thread). This is a lock-free
243 shared detachable list.*/
244 tMPI_Atomic_ptr_t head_rts; /* singly linked list with free send
245 envelopes returned by the other thread.
246 This is a lock-free shared LIFO.*/
248 struct envelope *head_new; /* singly linked list with the new send
249 envelopes (i.e. those that are put there by
250 the sending thread, but not yet checked by
251 the receiving thread). */
252 struct envelope *head_rts; /* singly linked list with free send envelopes */
253 tMPI_Spinlock_t lock_new; /* this locks head_new */
254 tMPI_Spinlock_t lock_rts; /* this locks head_rts */
256 struct envelope *head_old; /* the old send envelopes, in a circular doubly
257 linked list. These have been checked by the
258 receiving thread against the existing
259 recv_envelope_list. */
261 struct envelope *alloc_head; /* the allocated send list */
262 size_t Nalloc; /* number of allocted sends */
265 struct recv_envelope_list
267 struct envelope *head; /* first envelope in this list */
268 struct envelope dummy; /* the dummy element for the list */
272 /* the request object for asynchronious operations. */
275 bool finished; /* whether it's finished */
276 struct envelope *ev; /* the envelope */
278 struct tmpi_thread *source; /* the message source (for receives) */
279 tMPI_Comm comm; /* the comm */
280 int tag; /* the tag */
281 int error; /* error code */
282 size_t transferred; /* the number of transferred bytes */
283 bool cancelled; /* whether the transmission was canceled */
285 struct tmpi_req_ *next,*prev; /* next,prev request in linked list,
286 used in the req_list, but also in
290 /* pre-allocated request object list */
293 struct tmpi_req_ *head; /* pre-allocated singly linked list of requests.
294 (i.e. reqs->prev is undefined). */
295 struct tmpi_req_ *alloc_head; /* the allocated block */
313 /**************************************************************************
315 MULTICAST COMMUNICATION DATA STRUCTURES
317 **************************************************************************/
319 /* these are data structures meant for keeping track of multicast operations
320 (tMPI_Bcast, tMPI_Gather, etc.). Because these operations are all collective
321 across the comm, and are always blocking, the protocol can be much simpler
322 than that for point-to-point communication through tMPI_Send/Recv, etc. */
324 /* unique tags for multicast & collective operations */
325 #define TMPI_BCAST_TAG 1
326 #define TMPI_GATHER_TAG 2
327 #define TMPI_GATHERV_TAG 3
328 #define TMPI_SCATTER_TAG 4
329 #define TMPI_SCATTERV_TAG 5
330 #define TMPI_REDUCE_TAG 6
331 #define TMPI_ALLTOALL_TAG 7
332 #define TMPI_ALLTOALLV_TAG 8
335 /* thread-specific part of the coll_env */
336 struct coll_env_thread
338 tMPI_Atomic_t current_sync; /* sync counter value for the current
340 tMPI_Atomic_t n_remaining; /* remaining threads count for each thread */
342 int tag; /* collective communication type */
343 tMPI_Datatype datatype; /* datatype */
345 void **buf; /* array of send/recv buffer values */
346 size_t *bufsize; /* array of number of bytes to send/recv */
348 #ifdef USE_COLLECTIVE_COPY_BUFFER
349 bool using_cb; /* whether a copy buffer is (going to be) used */
350 tMPI_Atomic_t buf_readcount; /* Number of threads reading from buf
351 while using_cpbuf is true, but cpbuf
353 tMPI_Atomic_ptr_t *cpbuf; /* copy_buffer pointers. */
354 struct copy_buffer *cb; /* the copy buffer cpbuf points to */
357 tMPI_Event send_ev; /* event associated with being the sending thread.
358 Triggered when last receiving thread is ready,
359 and the coll_env_thread is ready for re-use. */
360 tMPI_Event recv_ev; /* event associated with being a receiving thread. */
362 bool *read_data; /* whether we read data from a specific thread. */
365 /* Collective communications once sync. These run in parallel with
366 the collection of coll_env_threads*/
369 /* collective sync data */
370 tMPI_Atomic_t current_sync; /* sync counter value for the current
372 tMPI_Atomic_t n_remaining; /* remaining threads count */
374 void *res; /* result data for once calls. */
377 /* the collective communication envelope. There's a few of these per
378 comm, and each one stands for one collective communication call. */
381 struct coll_env_thread *met; /* thread-specific collective envelope data.*/
383 struct coll_env_coll coll;
387 /* multicast synchronization data structure. There's one of these for
388 each thread in each tMPI_Comm structure */
391 int synct; /* sync counter for coll_env_thread. */
392 int syncs; /* sync counter for coll_env_coll. */
394 tMPI_Event *events; /* One event for each other thread */
395 int N; /* the number of threads */
408 /**************************************************************************
410 THREAD DATA STRUCTURES
412 **************************************************************************/
414 /* information about a running thread. This structure is put in a
415 globally available array; the envelope exchange, etc. are all done through
416 the elements of this array.*/
419 tMPI_Thread_t thread_id; /* this thread's id */
421 /* p2p communication structures: */
423 /* the receive envelopes posted for other threads to check */
424 struct recv_envelope_list evr;
425 /* the send envelopes posted by other threadas */
426 struct send_envelope_list *evs;
427 /* free send and receive envelopes */
428 struct free_envelope_list envelopes;
429 /* number of finished send envelopes */
430 tMPI_Atomic_t ev_outgoing_received;
431 /* the p2p communication events (incoming envelopes + finished send
432 envelopes generate events) */
433 tMPI_Event p2p_event;
434 TMPI_YIELD_WAIT_DATA /* data associated with waiting */
435 struct req_list rql; /* list of pre-allocated requests */
437 /* collective communication structures: */
438 #ifdef USE_COLLECTIVE_COPY_BUFFER
439 /* copy buffer list for multicast communications */
440 struct copy_buffer_list cbl_multi;
443 /* miscellaneous data: */
445 tMPI_Comm self_comm; /* comms for MPI_COMM_SELF */
447 /* the per-thread profile structure that keeps call counts & wait times. */
448 struct tmpi_profile profile;
450 /* The start function (or NULL, if a main()-style start function is to
452 void (*start_fn)(void*);
453 /* The main()-style start function */
454 int (*start_fn_main)(int, char**);
455 /* the argument to the start function, if it's not main()*/
458 /* we copy these for each thread (providing these to main() is not
459 required by the MPI standard, but it's convenient). Note that we copy,
460 because some programs (like Gromacs) like to manipulate these. */
470 /**************************************************************************
472 ERROR HANDLER DATA STRUCTURES
474 **************************************************************************/
477 /* the error handler */
478 struct tmpi_errhandler_
481 tMPI_Errhandler_fn fn;
484 /* standard error handler functions */
485 void tmpi_errors_are_fatal_fn(tMPI_Comm *comm, int *err);
486 void tmpi_errors_return_fn(tMPI_Comm *comm, int *err);
492 /**************************************************************************
494 GLOBAL DATA STRUCTURE
496 **************************************************************************/
498 /* global MPI information */
501 /* list of pointers to all user-defined types */
502 struct tmpi_datatype_ **usertypes;
504 int Nalloc_usertypes;
506 /* spinlock/mutex for manipulating tmpi_user_types */
507 tMPI_Spinlock_t datatype_lock;
509 /* barrier for tMPI_Finalize(), etc. */
510 tMPI_Thread_barrier_t barrier;
512 /* the timer for tMPI_Wtime() */
513 tMPI_Thread_mutex_t timer_mutex;
514 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
515 /* the time at initialization. */
516 struct timeval timer_init;
518 /* the time at initialization. */
537 /**************************************************************************
539 COMMUNICATOR DATA STRUCTURES
541 **************************************************************************/
546 int N; /* the number of threads */
547 struct tmpi_thread **peers; /* the list of peers to communicate with */
549 int Nrefs; /* the number of references to this structure */
554 /* the communicator objects are globally shared. */
557 struct tmpi_group_ grp; /* the communicator group */
559 /* the barrier for tMPI_Barrier() */
560 tMPI_Barrier_t barrier;
563 /* List of barriers for reduce operations.
564 reduce_barrier[0] contains a list of N/2 barriers for N threads
565 reduce_barrier[1] contains a list of N/4 barriers for N/2 threads
566 reduce_barrier[2] contains a list of N/8 barriers for N/4 threads
567 and so on. (until N/x reaches 1)
568 This is to facilitate tree-based algorithms for tMPI_Reduce, etc. */
569 tMPI_Barrier_t **reduce_barrier;
570 int *N_reduce; /* the number of barriers in each iteration */
571 int N_reduce_iter; /* the number of iterations */
574 struct coll_env *cev; /* list of multicast envelope objecs */
575 struct coll_sync *csync; /* list of multicast sync objecs */
577 /* lists of globally shared send/receive buffers for tMPI_Reduce. */
578 tMPI_Atomic_ptr_t *reduce_sendbuf, *reduce_recvbuf;
580 /* mutex for communication object creation. Traditional mutexes are
581 better here because communicator creation should not be done in
582 time-critical sections of code. */
583 tMPI_Thread_mutex_t comm_create_lock;
584 tMPI_Thread_cond_t comm_create_prep;
585 tMPI_Thread_cond_t comm_create_finish;
587 tMPI_Comm *new_comm; /* newly created communicators */
589 /* the split structure is shared among the comm threads and is
590 allocated & deallocated during tMPI_Comm_split */
591 struct tmpi_split *split;
593 /* the topologies (only cartesian topology is currently implemented */
594 struct cart_topol *cart;
595 /*struct tmpi_graph_topol_ *graph;*/
599 /* links for a global circular list of all comms that starts at
600 TMPI_COMM_WORLD. Used to de-allocate the comm structures after
602 struct tmpi_comm_ *next,*prev;
606 /* specific for tMPI_Split: */
609 volatile int Ncol_init;
610 volatile int Ncol_destroy;
611 volatile bool can_finish;
612 volatile int *colors;
616 /* cartesian topology */
619 int ndims; /* number of dimensions */
620 int *dims; /* procs per coordinate */
621 int *periods; /* whether the grid is periodic, per dimension */
626 struct tmpi_graph_topol_
635 /**************************************************************************
637 DATA TYPE DATA STRUCTURES
639 **************************************************************************/
641 /* tMPI_Reduce Op functions */
642 typedef void (*tMPI_Op_fn)(void*, void*, void*, int);
645 struct tmpi_datatype_component
647 struct tmpi_datatype_ *type;
651 /* we don't support datatypes with holes (yet) */
652 struct tmpi_datatype_
654 size_t size; /* full extent of type. */
655 tMPI_Op_fn *op_functions; /* array of op functions for this datatype */
656 int N_comp; /* number of components */
657 struct tmpi_datatype_component *comps; /* the components */
658 bool committed; /* whether the data type is committed */
660 /* just as a shorthand: */
661 typedef struct tmpi_datatype_ tmpi_dt;
670 /**************************************************************************
674 **************************************************************************/
677 /* the threads themselves (tmpi_comm only contains lists of pointers to this
679 extern struct tmpi_thread *threads;
683 extern tMPI_Thread_key_t id_key; /* the key to get the thread id */
685 /* misc. global information about MPI */
686 extern struct tmpi_global *tmpi_global;
695 /**************************************************************************
697 FUNCTION PROTOTYPES & MACROS
699 **************************************************************************/
702 void tMPI_Trace_print(const char *fmt, ...);
705 /* error-checking malloc/realloc: */
706 void *tMPI_Malloc(size_t size);
707 void *tMPI_Realloc(void *p, size_t size);
710 /* get the current thread structure pointer */
711 #define tMPI_Get_current() ((struct tmpi_thread*) \
712 tMPI_Thread_getspecific(id_key))
714 /* get the number of this thread */
715 /*#define tMPI_This_threadnr() (tMPI_Get_current() - threads)*/
717 /* get the number of a specific thread. We convert to the resulting size_t to
718 int, which is unlikely to cause problems in the foreseeable future. */
719 #define tMPI_Threadnr(th) (int)(th - threads)
721 /* get thread associated with rank */
722 #define tMPI_Get_thread(comm, rank) (comm->grp.peers[rank])
726 /* get the current thread structure pointer */
727 struct tmpi_thread *tMPI_Get_current(void);
728 /* get the thread belonging to comm with rank rank */
729 struct tmpi_thread *tMPI_Get_thread(tMPI_Comm comm, int rank);
733 /* handle an error, returning the errorcode */
734 int tMPI_Error(tMPI_Comm comm, int tmpi_errno);
738 /* check whether we're the main thread */
739 bool tMPI_Is_master(void);
740 /* check whether the current process is in a group */
741 bool tMPI_In_group(tMPI_Group group);
743 /* find the rank of a thread in a comm */
744 int tMPI_Comm_seek_rank(tMPI_Comm comm, struct tmpi_thread *th);
745 /* find the size of a comm */
746 int tMPI_Comm_N(tMPI_Comm comm);
748 /* allocate a comm object, making space for N threads */
749 tMPI_Comm tMPI_Comm_alloc(tMPI_Comm parent, int N);
750 /* de-allocate a comm object */
751 void tMPI_Comm_destroy(tMPI_Comm comm);
752 /* allocate a group object */
753 tMPI_Group tMPI_Group_alloc(void);
755 /* topology functions */
756 /* de-allocate a cartesian topology structure. (it is allocated with
757 the internal function tMPI_Cart_init()) */
758 void tMPI_Cart_destroy(struct cart_topol *top);
761 #ifdef USE_COLLECTIVE_COPY_BUFFER
762 /* initialize a copy_buffer_list */
763 void tMPI_Copy_buffer_list_init(struct copy_buffer_list *cbl, int Nbufs,
765 /* initialize a copy_buffer_list */
766 void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl);
767 /* get a copy buffer from a list */
768 struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl);
769 /* return a copy buffer to a list */
770 void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
771 struct copy_buffer *cb);
772 /* initialize a copy buffer */
773 void tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size);
774 void tMPI_Copy_buffer_destroy(struct copy_buffer *cb);
781 /* initialize a free envelope list with N envelopes */
782 void tMPI_Free_env_list_init(struct free_envelope_list *evl, int N);
783 /* destroy a free envelope list */
784 void tMPI_Free_env_list_destroy(struct free_envelope_list *evl);
787 /* initialize a send envelope list */
788 void tMPI_Send_env_list_init(struct send_envelope_list *evl, int N);
789 /* destroy a send envelope list */
790 void tMPI_Send_env_list_destroy(struct send_envelope_list *evl);
797 /* initialize a recv envelope list */
798 void tMPI_Recv_env_list_init(struct recv_envelope_list *evl);
799 /* destroy a recv envelope list */
800 void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl);
805 /* initialize request list */
806 void tMPI_Req_list_init(struct req_list *rl, int N_reqs);
807 /* destroy request list */
808 void tMPI_Req_list_destroy(struct req_list *rl);
813 /* multicast functions */
814 /* initialize a coll env structure */
815 void tMPI_Coll_env_init(struct coll_env *mev, int N);
816 /* destroy a coll env structure */
817 void tMPI_Coll_env_destroy(struct coll_env *mev);
819 /* initialize a coll sync structure */
820 void tMPI_Coll_sync_init(struct coll_sync *msc, int N);
821 /* destroy a coll sync structure */
822 void tMPI_Coll_sync_destroy(struct coll_sync *msc);
827 /* and we need this prototype */
828 int main(int argc, char **argv);