/* This source code file is part of thread_mpi.
   Written by Sander Pronk, Erik Lindahl, and possibly others.

   Copyright (c) 2009, Sander Pronk, Erik Lindahl.

   Redistribution and use in source and binary forms, with or without
   modification, are permitted provided that the following conditions are met:
   1) Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
   2) Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in the
      documentation and/or other materials provided with the distribution.
   3) Neither the name of the copyright holders nor the
      names of its contributors may be used to endorse or promote products
      derived from this software without specific prior written permission.

   THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
   EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
   WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
   DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
   SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

   If you want to redistribute modifications, please consider that
   scientific software is very special. Version control is crucial -
   bugs must be traceable. We will be happy to consider code for
   inclusion in the official distribution, but derived work should not
   be called official thread_mpi. Details are found in the README & COPYING
*/
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
/* helper function for tMPI_Comm_split. Splits N entities with color and key
   out so that the output contains Ngroups groups each with elements
   of the same color. The group array contains the entities in each group. */
/* NOTE(review): this declaration was truncated in the garbled listing
   (fused line numbers, final parameter elided); the trailing 'int *group'
   parameter is restored to match the definition below — verify against VCS. */
static void tMPI_Split_colors(int N, const int *color, const int *key,
                              int *Ngroups, int *grp_N, int *grp_color,
                              int *group);
71 /* communicator query&manipulation functions */
72 int tMPI_Comm_N(tMPI_Comm comm)
75 tMPI_Trace_print("tMPI_Comm_N(%p)", comm);
84 int tMPI_Comm_size(tMPI_Comm comm, int *size)
87 tMPI_Trace_print("tMPI_Comm_size(%p, %p)", comm, size);
89 return tMPI_Group_size(&(comm->grp), size);
92 int tMPI_Comm_rank(tMPI_Comm comm, int *rank)
95 tMPI_Trace_print("tMPI_Comm_rank(%p, %p)", comm, rank);
97 return tMPI_Group_rank(&(comm->grp), rank);
101 int tMPI_Comm_compare(tMPI_Comm comm1, tMPI_Comm comm2, int *result)
105 tMPI_Trace_print("tMPI_Comm_compare(%p, %p, %p)", comm1, comm2, result);
109 *result = TMPI_IDENT;
113 if ( (!comm1) || (!comm2) )
115 *result = TMPI_UNEQUAL;
119 if (comm1->grp.N != comm2->grp.N)
121 *result = TMPI_UNEQUAL;
125 *result = TMPI_CONGRUENT;
126 /* we assume that there are two identical comm members within a comm */
127 for (i = 0; i < comm1->grp.N; i++)
129 if (comm1->grp.peers[i] != comm2->grp.peers[i])
131 tmpi_bool found = FALSE;
133 *result = TMPI_SIMILAR;
134 for (j = 0; j < comm2->grp.N; j++)
136 if (comm1->grp.peers[i] == comm2->grp.peers[j])
144 *result = TMPI_UNEQUAL;
/*
 * tMPI_Comm_alloc: allocate and initialize the internal state of a new
 * communicator for N threads — the peer list, comm-creation mutex and
 * condition variables, main and reduce barriers, reduce buffers,
 * coll_env/coll_sync objects — inheriting the error handler from
 * 'parent' (when one is given, presumably), and linking the new comm
 * into the global circular list under tmpi_global->comm_link_lock.
 *
 * NOTE(review): this listing is garbled — every line carries a fused
 * original line number, and gaps in those numbers show that many lines
 * (local declarations of 'i'/'j'/'ret', braces, and the 'if' checks that
 * presumably guard the visible bare 'return' statements) were elided by
 * extraction. Restore from version control before building; comments
 * below describe only what is visible.
 */
153 int tMPI_Comm_alloc(tMPI_Comm *newcomm, tMPI_Comm parent, int N)
155     struct tmpi_comm_ *retc;
/* allocate the comm structure itself; the elided check presumably
   guards the NO_MEM return below — confirm against VCS */
159     retc = (struct tmpi_comm_*)tMPI_Malloc(sizeof(struct tmpi_comm_));
162         return TMPI_ERR_NO_MEM;
/* peer list sized to Nthreads (global thread count, defined elsewhere) */
165     retc->grp.peers = (struct tmpi_thread**)tMPI_Malloc(
166         sizeof(struct tmpi_thread*)*Nthreads);
167     if (retc->grp.peers == NULL)
169         return TMPI_ERR_NO_MEM;
/* synchronization primitives used by tMPI_Comm_split's rendezvous;
   each visible ERR_IO return is presumably guarded by an elided
   'if (ret != 0)' — confirm */
173     ret = tMPI_Thread_mutex_init( &(retc->comm_create_lock) );
176         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
178     ret = tMPI_Thread_cond_init( &(retc->comm_create_prep) );
181         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
183     ret = tMPI_Thread_cond_init( &(retc->comm_create_finish) );
186         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
190     retc->new_comm = NULL;
191 /* we have no topology to start out with */
193 /*retc->graph=NULL;*/
195 /* we start counting at 0 */
196 tMPI_Atomic_set( &(retc->destroy_counter), 0);
198 /* initialize the main barrier */
199 tMPI_Barrier_init(&(retc->barrier), N);
201 /* the reduce barriers */
203 /* First calculate the number of reduce barriers */
204 int Niter = 0; /* the iteration number */
205 int Nred = N; /* the number of reduce barriers for this iteration */
/* (elided loop: halves Nred each iteration, counting Niter, i.e. the
   depth of a pairwise reduction tree over N threads — confirm) */
208 /* Nred is now Nred/2 + a rest term because solitary
209 process at the end of the list must still be accounted for */
210 Nred = Nred/2 + Nred%2;
214     retc->N_reduce_iter = Niter;
215 /* allocate the list */
216 retc->reduce_barrier = (tMPI_Barrier_t**)
217 tMPI_Malloc(sizeof(tMPI_Barrier_t*)*(Niter+1));
218 if (retc->reduce_barrier == NULL)
220         return TMPI_ERR_NO_MEM;
222     retc->N_reduce = (int*)tMPI_Malloc(sizeof(int)*(Niter+1));
223     if (retc->N_reduce == NULL)
225         return TMPI_ERR_NO_MEM;
228 /* we re-set Nred to N */
/* per-iteration sub-lists of 2-thread barriers for the reduction tree */
230     for (i = 0; i < Niter; i++)
234 Nred = Nred/2 + Nred%2;
235 retc->N_reduce[i] = Nred;
236 /* allocate the sub-list */
237 retc->reduce_barrier[i] = (tMPI_Barrier_t*)
238 tMPI_Malloc(sizeof(tMPI_Barrier_t)*(Nred));
239 if (retc->reduce_barrier[i] == NULL)
241             return TMPI_ERR_NO_MEM;
243         for (j = 0; j < Nred; j++)
245             tMPI_Barrier_init(&(retc->reduce_barrier[i][j]), 2);
250 /* the reduce buffers */
251 retc->reduce_sendbuf = (tMPI_Atomic_ptr_t*)
252 tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*Nthreads);
253 if (retc->reduce_sendbuf == NULL)
255         return TMPI_ERR_NO_MEM;
257     retc->reduce_recvbuf = (tMPI_Atomic_ptr_t*)
258 tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*Nthreads);
259 if (retc->reduce_recvbuf == NULL)
261         return TMPI_ERR_NO_MEM;
/* error handler: inherit from parent vs. default to fatal; the elided
   'if (parent)'/'else' presumably selects between these two — confirm */
266         retc->erh = parent->erh;
270         retc->erh = TMPI_ERRORS_ARE_FATAL;
273 /* coll_env objects */
274 retc->cev = (struct coll_env*)tMPI_Malloc(sizeof(struct coll_env)*
276     if (retc->cev == NULL)
278         return TMPI_ERR_NO_MEM;
281     for (i = 0; i < N_COLL_ENV; i++)
283         ret = tMPI_Coll_env_init( &(retc->cev[i]), N);
284         if (ret != TMPI_SUCCESS)
289 /* multi_sync objects */
290 retc->csync = (struct coll_sync*)tMPI_Malloc(sizeof(struct coll_sync)*N);
291 if (retc->csync == NULL)
293         return TMPI_ERR_NO_MEM;
296     for (i = 0; i < N; i++)
298         ret = tMPI_Coll_sync_init( &(retc->csync[i]), N);
299         if (ret != TMPI_SUCCESS)
/* link the new comm into the global doubly-linked circular comm list,
   protected by comm_link_lock */
305     ret = tMPI_Thread_mutex_lock( &(tmpi_global->comm_link_lock) );
308         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
310 /* we insert ourselves in the circular list, after TMPI_COMM_WORLD */
313 retc->next = TMPI_COMM_WORLD;
314 retc->prev = TMPI_COMM_WORLD->prev;
316 TMPI_COMM_WORLD->prev->next = retc;
317 TMPI_COMM_WORLD->prev = retc;
/* no TMPI_COMM_WORLD yet: the comm links only to itself (the branch
   condition was elided — confirm) */
321 retc->prev = retc->next = retc;
323 ret = tMPI_Thread_mutex_unlock( &(tmpi_global->comm_link_lock) );
326         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
/*
 * tMPI_Comm_destroy: free all resources owned by 'comm' — peer list,
 * reduce barrier tree and counters, coll_env/coll_sync objects, the
 * comm-creation mutex/condvars, reduce buffers, and the Cartesian
 * topology — then unlink the comm from the global circular list under
 * tmpi_global->comm_link_lock (presumably only when do_link_lock is
 * set; the guarding branch is not visible — confirm).
 *
 * NOTE(review): garbled numbered listing. Gaps in the fused line numbers
 * show elided lines: declarations of 'i'/'ret', braces, the 'if' guards
 * around the visible ERR_IO returns, a NULL/validity check on comm->cart,
 * and the final free of the comm itself. Restore from version control.
 */
332 int tMPI_Comm_destroy(tMPI_Comm comm, tmpi_bool do_link_lock)
337     free(comm->grp.peers);
/* free the per-iteration reduce barrier sub-lists, then the list itself */
338     for (i = 0; i < comm->N_reduce_iter; i++)
340         free(comm->reduce_barrier[i]);
342     free(comm->reduce_barrier);
343     free(comm->N_reduce);
345     for (i = 0; i < N_COLL_ENV; i++)
347         tMPI_Coll_env_destroy( &(comm->cev[i]) );
349     for (i = 0; i < comm->grp.N; i++)
351         tMPI_Coll_sync_destroy( &(comm->csync[i]) );
/* tear down the comm-creation synchronization primitives; each visible
   ERR_IO return is presumably guarded by an elided 'if (ret != 0)' */
356     ret = tMPI_Thread_mutex_destroy( &(comm->comm_create_lock) );
359         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
361     ret = tMPI_Thread_cond_destroy( &(comm->comm_create_prep) );
364         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
366     ret = tMPI_Thread_cond_destroy( &(comm->comm_create_finish) );
369         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
372     free((void*)comm->reduce_sendbuf);
373     free((void*)comm->reduce_recvbuf);
/* an elided check presumably skips this when no Cartesian topology
   was attached — confirm */
377         tMPI_Cart_destroy( comm->cart );
381 /* remove ourselves from the circular list */
384         ret = tMPI_Thread_mutex_lock( &(tmpi_global->comm_link_lock) );
387             return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
/* splice this comm out of the doubly-linked circular list; the elided
   conditions presumably guard against self-links — confirm */
392         comm->next->prev = comm->prev;
396         comm->prev->next = comm->next;
401         ret = tMPI_Thread_mutex_unlock( &(tmpi_global->comm_link_lock) );
404             return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
/*
 * tMPI_Comm_free: release the calling thread's reference to *comm.
 * For a multi-thread comm, the caller removes itself from the peer list
 * under comm_create_lock; destruction of the shared structure happens
 * once (the visible destroy_counter fetch-add suggests the last caller
 * does it — the deciding condition is elided, confirm).
 *
 * NOTE(review): garbled numbered listing. Elided lines include the
 * declarations of 'ret'/'myrank'/'size'/'sum', the myrank computation,
 * the decrement of grp.N, the branch structure, and the 'if' guards
 * around the visible ERR_IO returns. Restore from version control.
 */
410 int tMPI_Comm_free(tMPI_Comm *comm)
416     tMPI_Trace_print("tMPI_Comm_free(%p)", comm);
424     if ((*comm)->grp.N > 1)
426 /* we remove ourselves from the comm. */
427 ret = tMPI_Thread_mutex_lock(&((*comm)->comm_create_lock));
430             return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
/* overwrite our own peer slot with the last peer (swap-remove;
   'myrank' is computed in an elided line — confirm) */
432         (*comm)->grp.peers[myrank] = (*comm)->grp.peers[(*comm)->grp.N-1];
434         ret = tMPI_Thread_mutex_unlock(&((*comm)->comm_create_lock));
437             return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
442 /* we're the last one so we can safely destroy it */
443 ret = tMPI_Comm_destroy(*comm, TRUE);
450 /* This is correct if programs actually treat Comm_free as a collective
457 size = (*comm)->grp.N;
459 /* we add 1 to the destroy counter and actually deallocate if the counter
461 sum = tMPI_Atomic_fetch_add( &((*comm)->destroy_counter), 1) + 1;
462 /* this is a collective call on a shared data structure, so only
463 one process (the last one in this case) should do anything */
/* presumably executed only when sum == size (condition elided) */
466         ret = tMPI_Comm_destroy(*comm, TRUE);
476 int tMPI_Comm_dup(tMPI_Comm comm, tMPI_Comm *newcomm)
479 tMPI_Trace_print("tMPI_Comm_dup(%p, %p)", comm, newcomm);
481 /* we just call Comm_split because it already contains all the
482 neccesary synchronization constructs. */
483 return tMPI_Comm_split(comm, 0, tMPI_Comm_seek_rank(comm,
484 tMPI_Get_current()), newcomm);
488 int tMPI_Comm_create(tMPI_Comm comm, tMPI_Group group, tMPI_Comm *newcomm)
490 int color = TMPI_UNDEFINED;
491 int key = tMPI_Comm_seek_rank(comm, tMPI_Get_current());
494 tMPI_Trace_print("tMPI_Comm_create(%p, %p, %p)", comm, group, newcomm);
496 if (tMPI_In_group(group))
500 /* the MPI specs specifically say that this is equivalent */
501 return tMPI_Comm_split(comm, color, key, newcomm);
504 static void tMPI_Split_colors(int N, const int *color, const int *key,
505 int *Ngroups, int *grp_N, int *grp_color,
512 for (i = 0; i < N; i++)
516 for (i = 0; i < N; i++)
518 if (color[i] != TMPI_UNDEFINED)
521 for (j = 0; j < (*Ngroups); j++)
523 if (grp_color[j] == color[i])
525 /* we insert where we need to, by counting back */
528 while (k > 0 && ( key[group[N*j + k-1]] > key[i]) )
531 group[N*j + k] = group[N*j + k-1];
541 /* not found. just add a new color */
542 grp_N[(*Ngroups)] = 1;
543 grp_color[(*Ngroups)] = color[i];
544 group[N*(*Ngroups) + 0] = i;
/*
 * tMPI_Comm_split: the central collective communicator-creation routine.
 * All N threads of 'comm' call in; under comm_create_lock the first
 * arriver allocates a shared 'split' structure and a new_comm list,
 * every thread deposits its (color, key), the last arriver groups the
 * threads by color (via tMPI_Split_colors), allocates one communicator
 * per color, fills the per-thread newcomm_list, and broadcasts
 * comm_create_finish; each thread then picks up its own entry.
 *
 * NOTE(review): garbled numbered listing. Gaps in the fused line numbers
 * show elided lines: declarations of 'i'/'j'/'ret'/'Ncomms', braces,
 * the branch selecting the first arriver, the 'if' guards around the
 * visible error returns, Ncol_init/Ncol_destroy decrements, and the
 * debug-print guards. Restore from version control before building;
 * comments below describe only what is visible.
 */
551 /* this is the main comm creation function. All other functions that create
553 int tMPI_Comm_split(tMPI_Comm comm, int color, int key, tMPI_Comm *newcomm)
556     int N = tMPI_Comm_N(comm);
557     volatile tMPI_Comm *newcomm_list;
558     volatile int colors[MAX_PREALLOC_THREADS]; /* array with the colors
560 volatile int keys[MAX_PREALLOC_THREADS]; /* same for keys (only one of
561 the threads actually supplies
562 these arrays to the comm
564 tmpi_bool i_am_first = FALSE;
565 int myrank = tMPI_Comm_seek_rank(comm, tMPI_Get_current());
566 struct tmpi_split *spl;
570 tMPI_Trace_print("tMPI_Comm_split(%p, %d, %d, %p)", comm, color, key,
/* presumably guarded by an elided NULL-comm check — confirm */
576         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
/* everything below runs under the comm-creation mutex */
579     ret = tMPI_Thread_mutex_lock(&(comm->comm_create_lock));
582         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
584 /* first get the colors */
587 /* i am apparently first */
588 comm->split = (struct tmpi_split*)tMPI_Malloc(sizeof(struct tmpi_split));
589 comm->new_comm = (tMPI_Comm*)tMPI_Malloc(N*sizeof(tMPI_Comm));
/* small-N fast path: use the first thread's stack arrays instead of
   heap-allocating the color/key arrays */
590     if (N <= MAX_PREALLOC_THREADS)
592         comm->split->colors = colors;
593         comm->split->keys = keys;
597         comm->split->colors = (int*)tMPI_Malloc(N*sizeof(int));
598         comm->split->keys = (int*)tMPI_Malloc(N*sizeof(int));
600     comm->split->Ncol_init = tMPI_Comm_N(comm);
601     comm->split->can_finish = FALSE;
603 /* the main communicator contains a list the size of grp.N */
605 newcomm_list = comm->new_comm; /* we copy it to the local stacks because
606 we can later erase comm->new_comm safely */
607 spl = comm->split; /* we do the same for spl */
/* every thread deposits its own color/key (Ncol_init is presumably
   decremented in an elided line — confirm) */
608     spl->colors[myrank] = color;
609     spl->keys[myrank] = key;
612     if (spl->Ncol_init == 0)
/* all colors are in: wake the creator thread waiting on comm_create_prep */
614         ret = tMPI_Thread_cond_signal(&(comm->comm_create_prep));
617             return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
623 /* all other threads can just wait until the creator thread is
625 while (!spl->can_finish)
627         ret = tMPI_Thread_cond_wait(&(comm->comm_create_finish),
628 &(comm->comm_create_lock) );
631             return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
/* ---- creator-thread path (branch selection elided) ---- */
638         int comm_color_[MAX_PREALLOC_THREADS];
639         int comm_N_[MAX_PREALLOC_THREADS];
640         int *comm_color = comm_color_; /* there can't be more comms than N*/
641         int *comm_N = comm_N_; /* the number of procs in a group */
643         int *comm_groups; /* the groups */
644         tMPI_Comm *comms; /* the communicators */
646 /* wait for the colors to be done */
648 while (spl->Ncol_init > 0)
650             ret = tMPI_Thread_cond_wait(&(comm->comm_create_prep),
651 &(comm->comm_create_lock));
654                 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
658 /* reset the state so that a new comm creating function can run */
659 spl->Ncol_destroy = N;
663         comm_groups = (int*)tMPI_Malloc(N*N*sizeof(int));
664         if (N > MAX_PREALLOC_THREADS)
666             comm_color = (int*)tMPI_Malloc(N*sizeof(int));
667             comm_N = (int*)tMPI_Malloc(N*sizeof(int));
670 /* count colors, allocate and split up communicators */
671 tMPI_Split_colors(N, (int*)spl->colors,
674 comm_N, comm_color, comm_groups);
677 /* allocate a bunch of communicators */
678 comms = (tMPI_Comm*)tMPI_Malloc(Ncomms*sizeof(tMPI_Comm));
679 for (i = 0; i < Ncomms; i++)
681             ret = tMPI_Comm_alloc(&(comms[i]), comm, comm_N[i]);
682             if (ret != TMPI_SUCCESS)
688 /* now distribute the comms */
689 for (i = 0; i < Ncomms; i++)
691             comms[i]->grp.N = comm_N[i];
692             for (j = 0; j < comm_N[i]; j++)
694                 comms[i]->grp.peers[j] =
695 comm->grp.peers[comm_groups[i*comm->grp.N + j]];
698 /* and put them into the newcomm_list */
699 for (i = 0; i < N; i++)
701             newcomm_list[i] = TMPI_COMM_NULL;
702             for (j = 0; j < Ncomms; j++)
704                 if (spl->colors[i] == comm_color[j])
706                     newcomm_list[i] = comms[j];
/* ---- debug dump of the computed groups (guard elided, presumably
   a TMPI_DEBUG-style #ifdef — confirm) ---- */
714         for (i = 0; i < Ncomms; i++)
716             printf("Group %d (color %d) has %d members: ",
717 i, comm_color[i], comm_N[i]);
718             for (j = 0; j < comm_N[i]; j++)
720                 printf(" %d ", comm_groups[comm->grp.N*i + j]);
724             for (j = 0; j < comm_N[i]; j++)
726                 printf(" %d ", spl->keys[comm_groups[N*i + j]]);
729             for (j = 0; j < comm_N[i]; j++)
731                 printf(" %d ", spl->colors[comm_groups[N*i + j]]);
/* heap color/key arrays only exist when N exceeded the preallocation */
736         if (N > MAX_PREALLOC_THREADS)
738             free((int*)spl->colors);
739             free((int*)spl->keys);
745         spl->can_finish = TRUE;
747 /* tell the waiting threads that there's a comm ready */
748 ret = tMPI_Thread_cond_broadcast(&(comm->comm_create_finish));
751             return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
754 /* here the individual threads get their comm object */
755 *newcomm = newcomm_list[myrank];
757 /* free when we have assigned them all, so we can reuse the object*/
/* Ncol_destroy is presumably decremented in an elided line — confirm */
759     if (spl->Ncol_destroy == 0)
761         free((void*)newcomm_list);
765     ret = tMPI_Thread_mutex_unlock(&(comm->comm_create_lock));
768         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_IO);
774 int tMPI_Comm_seek_rank(tMPI_Comm comm, struct tmpi_thread *th)
782 for (i = 0; i < comm->grp.N; i++)
784 if (comm->grp.peers[i] == th)