Redefine the default boolean type to gmx_bool.
[alexxy/gromacs.git] / src / gmxlib / thread_mpi / comm.c
1 /*
2 This source code file is part of thread_mpi.  
3 Written by Sander Pronk, Erik Lindahl, and possibly others. 
4
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6 All rights reserved.
7
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
35 files.
36 */
37
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
40 #endif
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46 #ifdef HAVE_UNISTD_H
47 #include <unistd.h>
48 #endif
49
50 #include <errno.h>
51 #include <stdlib.h>
52 #include <stdio.h>
53 #include <stdarg.h>
54 #include <string.h>
55
56
57 #include "impl.h"
58
59 /* helper function for tMPI_Comm_split. Splits N entities with color and key
60    out so that the output contains Ngroups groups each with elements
61    of the same color. The group array contains the entities in each group. */
62 static void tMPI_Split_colors(int N, const int *color, const int *key,
63                               int *Ngroups, int *grp_N, int *grp_color,
64                               int *group);
65
66
67
68
69
70
71 /* communicator query&manipulation functions */
72 int tMPI_Comm_N(tMPI_Comm comm)
73 {
74 #ifdef TMPI_TRACE
75     tMPI_Trace_print("tMPI_Comm_N(%p)", comm);
76 #endif
77     if (!comm)
78         return 0;
79     return comm->grp.N;
80 }
81
82 int tMPI_Comm_size(tMPI_Comm comm, int *size)
83 {
84 #ifdef TMPI_TRACE
85     tMPI_Trace_print("tMPI_Comm_size(%p, %p)", comm, size);
86 #endif
87     return tMPI_Group_size(&(comm->grp), size);
88 }
89
90 int tMPI_Comm_rank(tMPI_Comm comm, int *rank)
91 {
92 #ifdef TMPI_TRACE
93     tMPI_Trace_print("tMPI_Comm_rank(%p, %p)", comm, rank);
94 #endif
95     return tMPI_Group_rank(&(comm->grp), rank);
96 }
97
98
99 int tMPI_Comm_compare(tMPI_Comm comm1, tMPI_Comm comm2, int *result)
100 {
101     int i,j;
102 #ifdef TMPI_TRACE
103     tMPI_Trace_print("tMPI_Comm_compare(%p, %p, %p)", comm1, comm2, result);
104 #endif
105     if (comm1 == comm2)
106     {
107         *result=TMPI_IDENT;
108         return TMPI_SUCCESS;
109     }
110
111     if ( (!comm1) || (!comm2) )
112     {
113         *result=TMPI_UNEQUAL;
114         return TMPI_SUCCESS;
115     }
116
117     if (comm1->grp.N != comm2->grp.N)
118     {
119         *result=TMPI_UNEQUAL;
120         return TMPI_SUCCESS;
121     }
122
123     *result=TMPI_CONGRUENT;
124     /* we assume that there are two identical comm members within a comm */
125     for(i=0;i<comm1->grp.N;i++)
126     {
127         if (comm1->grp.peers[i] != comm2->grp.peers[i])
128         {
129             gmx_bool found=FALSE;
130
131             *result=TMPI_SIMILAR;
132             for(j=0;j<comm2->grp.N;j++)
133             {
134                 if (comm1->grp.peers[i] == comm2->grp.peers[j])
135                 {
136                     found=TRUE;
137                     break;
138                 }
139             }
140             if (!found)
141             {
142                 *result=TMPI_UNEQUAL;
143                 return TMPI_SUCCESS;
144             }
145         }
146     }
147     return TMPI_SUCCESS;
148 }
149
150
151 tMPI_Comm tMPI_Comm_alloc(tMPI_Comm parent, int N)
152 {
153     struct tmpi_comm_ *ret;
154     int i;
155
156     ret=(struct tmpi_comm_*)tMPI_Malloc(sizeof(struct tmpi_comm_));
157     ret->grp.peers=(struct tmpi_thread**)tMPI_Malloc(
158                                 sizeof(struct tmpi_thread*)*Nthreads);
159     ret->grp.N=N;
160
161     tMPI_Thread_mutex_init( &(ret->comm_create_lock) );
162     tMPI_Thread_cond_init( &(ret->comm_create_prep) );
163     tMPI_Thread_cond_init( &(ret->comm_create_finish) );
164
165     ret->split = NULL;
166     ret->new_comm = NULL;
167     /* we have no topology to start out with */
168     ret->cart=NULL;
169     /*ret->graph=NULL;*/
170
171
172     /* initialize the main barrier */
173     tMPI_Barrier_init(&(ret->barrier), N);
174
175 #if 0
176     {
177         /* calculate the number of reduce barriers */
178         int Nbarriers=0;
179         int Nred=N;
180         while(Nred>1) {
181             Nbarriers+=1;
182             Nred = Nred/2 + Nred%2;
183         } 
184
185         ret->Nreduce_barriers=Nbarriers;
186         ret->reduce_barrier=(tMPI_Barrier_t*)
187                   tMPI_Malloc(sizeof(tMPI_Barrier_t)*(Nbarriers+1));
188         ret->N_reduce_barrier=(int*)tMPI_Malloc(sizeof(int)*(Nbarriers+1));
189         Nred=N;
190         for(i=0;i<Nbarriers;i++)
191         {
192             tMPI_Barrier_init( &(ret->reduce_barrier[i]), Nred);
193             ret->N_reduce_barrier[i]=Nred;
194             /* Nred is now Nred/2 + a rest term because solitary 
195                process at the end of the list must still be accounter for */
196             Nred = Nred/2 + Nred%2;
197         }
198     }
199 #endif
200
201     /* the reduce barriers */
202     {
203         /* First calculate the number of reduce barriers */
204         int Niter=0; /* the iteration number */
205         int Nred=N; /* the number of reduce barriers for this iteration */
206         while(Nred>1) {
207             /* Nred is now Nred/2 + a rest term because solitary 
208                process at the end of the list must still be accounter for */
209             Nred = Nred/2 + Nred%2;
210             Niter+=1;
211         } 
212
213         ret->N_reduce_iter=Niter;
214         /* allocate the list */
215         ret->reduce_barrier=(tMPI_Barrier_t**)
216                   tMPI_Malloc(sizeof(tMPI_Barrier_t*)*(Niter+1));
217         ret->N_reduce=(int*)tMPI_Malloc(sizeof(int)*(Niter+1));
218
219         /* we re-set Nred to N */
220         Nred=N;
221         for(i=0;i<Niter;i++)
222         {
223             int j;
224
225             Nred = Nred/2 + Nred%2;
226             ret->N_reduce[i] = Nred;
227             /* allocate the sub-list */
228             ret->reduce_barrier[i]=(tMPI_Barrier_t*)
229                       tMPI_Malloc(sizeof(tMPI_Barrier_t)*(Nred));
230             for(j=0;j<Nred;j++)
231             {
232                 tMPI_Barrier_init(&(ret->reduce_barrier[i][j]),2);
233             }
234         }
235     }
236
237     /* the reduce buffers */
238 #if 0
239     ret->sendbuf=(volatile void**)tMPI_Malloc(sizeof(void*)*Nthreads);
240     ret->recvbuf=(volatile void**)tMPI_Malloc(sizeof(void*)*Nthreads);
241 #else
242     ret->reduce_sendbuf=(tMPI_Atomic_ptr_t*)
243               tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*Nthreads);
244     ret->reduce_recvbuf=(tMPI_Atomic_ptr_t*)
245               tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*Nthreads);
246 #endif
247
248
249     if (parent)
250     {
251         ret->erh=parent->erh;
252     }
253     else
254     {
255         ret->erh=TMPI_ERRORS_ARE_FATAL;
256     }
257
258     /* coll_env objects */
259     ret->cev=(struct coll_env*)tMPI_Malloc(sizeof(struct coll_env)*N_COLL_ENV);
260     for(i=0;i<N_COLL_ENV;i++)
261         tMPI_Coll_env_init( &(ret->cev[i]), N);
262     /* multi_sync objects */
263     ret->csync=(struct coll_sync*)tMPI_Malloc(sizeof(struct coll_sync)*N);
264     for(i=0;i<N;i++)
265         tMPI_Coll_sync_init( &(ret->csync[i]), N);
266
267     /* we insert ourselves in the circular list, after TMPI_COMM_WORLD */
268     if (TMPI_COMM_WORLD)
269     {
270         ret->next=TMPI_COMM_WORLD;
271         ret->prev=TMPI_COMM_WORLD->prev;
272
273         TMPI_COMM_WORLD->prev->next = ret;
274         TMPI_COMM_WORLD->prev = ret;
275     }
276     else
277     {
278         ret->prev=ret->next=ret;
279     }
280
281     return ret;
282 }
283
284 void tMPI_Comm_destroy(tMPI_Comm comm)
285 {
286     int i;
287
288     free(comm->grp.peers);
289 #if 0
290     free(comm->reduce_barrier);
291     free(comm->N_reduce_barrier);
292 #endif
293     for(i=0;i<comm->N_reduce_iter;i++)
294         free(comm->reduce_barrier[i]);
295     free(comm->reduce_barrier);
296     free(comm->N_reduce);
297
298     for(i=0;i<N_COLL_ENV;i++)
299         tMPI_Coll_env_destroy( &(comm->cev[i]) );
300     for(i=0;i<comm->grp.N;i++)
301         tMPI_Coll_sync_destroy( &(comm->csync[i]) );
302     free(comm->cev);
303     free(comm->csync);
304
305     tMPI_Thread_mutex_destroy( &(comm->comm_create_lock) );
306     tMPI_Thread_cond_destroy( &(comm->comm_create_prep) );
307     tMPI_Thread_cond_destroy( &(comm->comm_create_finish) );
308
309     free((void*)comm->reduce_sendbuf);
310     free((void*)comm->reduce_recvbuf);
311
312     if ( comm->cart )
313     {
314         tMPI_Cart_destroy( comm->cart );
315         free(comm->cart);
316     }
317
318     /* remove ourselves from the circular list */
319     if (comm->next)
320         comm->next->prev=comm->prev;
321     if (comm->prev)
322         comm->prev->next=comm->next;
323
324     free(comm);
325 }
326
327 int tMPI_Comm_free(tMPI_Comm *comm)
328 {
329     int myrank=tMPI_Comm_seek_rank(*comm, tMPI_Get_current());
330 #ifdef TMPI_TRACE
331     tMPI_Trace_print("tMPI_Comm_free(%p)", comm);
332 #endif
333 #ifndef TMPI_STRICT
334     if (! *comm)
335         return TMPI_SUCCESS;
336
337     if ((*comm)->grp.N > 1)
338     {
339         /* we remove ourselves from the comm. */
340         tMPI_Thread_mutex_lock(&((*comm)->comm_create_lock));
341         (*comm)->grp.peers[myrank] = (*comm)->grp.peers[(*comm)->grp.N-1];
342         (*comm)->grp.N--;
343         tMPI_Thread_mutex_unlock(&((*comm)->comm_create_lock));
344     }
345     else
346     {
347         /* we're the last one so we can safely destroy it */
348         tMPI_Comm_destroy(*comm);
349     }
350 #else
351     /* This is correct if programs actually treat Comm_free as a 
352        collective call */
353     /* we need to barrier because the comm is a shared structure and
354        we have to be sure that nobody else is using it 
355        (for example, to get its rank, like above) before destroying it*/
356     tMPI_Barrier(*comm);
357     /* this is a collective call on a shared data structure, so only 
358        one process (rank[0] in this case) should do anything */
359     if (myrank==0)
360     {
361         tMPI_Comm_destroy(*comm);
362     }
363 #endif
364     return TMPI_SUCCESS;
365 }
366
367 int tMPI_Comm_dup(tMPI_Comm comm, tMPI_Comm *newcomm)
368 {
369 #ifdef TMPI_TRACE
370     tMPI_Trace_print("tMPI_Comm_dup(%p, %p)", comm, newcomm);
371 #endif
372     /* we just call Comm_split because it already contains all the
373        neccesary synchronization constructs. */
374     return tMPI_Comm_split(comm, 0, tMPI_Comm_seek_rank(comm, 
375                             tMPI_Get_current()), newcomm);
376 }
377
378
379 int tMPI_Comm_create(tMPI_Comm comm, tMPI_Group group, tMPI_Comm *newcomm)
380 {
381     int color=TMPI_UNDEFINED;
382     int key=tMPI_Comm_seek_rank(comm, tMPI_Get_current());
383
384 #ifdef TMPI_TRACE
385     tMPI_Trace_print("tMPI_Comm_create(%p, %p, %p)", comm, group, newcomm);
386 #endif
387     if (tMPI_In_group(group))
388     {
389         color=1;
390     }
391     /* the MPI specs specifically say that this is equivalent */
392     return tMPI_Comm_split(comm, color, key, newcomm);
393 }
394
395 static void tMPI_Split_colors(int N, const int *color, const int *key, 
396                               int *Ngroups, int *grp_N, int *grp_color, 
397                               int *group)
398 {
399     int i,j;
400     gmx_bool found;
401
402     /* reset groups */
403     for(i=0;i<N;i++)
404         grp_N[i]=0;
405     for(i=0;i<N;i++)
406     {
407         if (color[i] != TMPI_UNDEFINED)
408         {
409             found=FALSE;
410             for(j=0;j<(*Ngroups);j++)
411             {
412                 if (grp_color[j] == color[i])
413                 {
414                     /* we insert where we need to, by counting back */
415                     int k=grp_N[j];
416
417                     while (k>0 && ( key[group[N*j + k-1]]>key[i]) )
418                     {
419                         /* shift up */
420                         group[N*j + k]=group[N*j + k-1];
421                         k--;
422                     }
423                     group[N*j+k]=i;
424                     grp_N[j]++;
425                     found=TRUE;
426                 }
427             }
428             if (!found)
429             {
430                 /* not found. just add a new color */
431                 grp_N[(*Ngroups)]=1;
432                 grp_color[(*Ngroups)]=color[i];
433                 group[N*(*Ngroups) + 0]=i;
434                 (*Ngroups)++;
435             }
436         }
437     }
438 }
439
440 /* this is the main comm creation function. All other functions that create
441     comms use this*/
442 int tMPI_Comm_split(tMPI_Comm comm, int color, int key, tMPI_Comm *newcomm)
443 {
444     int i,j;
445     int N=tMPI_Comm_N(comm);
446     volatile tMPI_Comm *newcomm_list;
447     volatile int colors[MAX_PREALLOC_THREADS]; /* array with the colors 
448                                                   of each thread */
449     volatile int keys[MAX_PREALLOC_THREADS]; /* same for keys (only one of 
450                                                 the threads actually suplies 
451                                                 these arrays to the comm 
452                                                 structure) */
453     gmx_bool i_am_first=FALSE;
454     int myrank=tMPI_Comm_seek_rank(comm, tMPI_Get_current());
455     struct tmpi_split *spl;
456
457 #ifdef TMPI_TRACE
458     tMPI_Trace_print("tMPI_Comm_split(%p, %d, %d, %p)", comm, color, key, 
459                        newcomm);
460 #endif
461     if (!comm)
462     {
463         *newcomm=NULL;
464         return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
465     }
466
467     tMPI_Thread_mutex_lock(&(comm->comm_create_lock));    
468     /* first get the colors */
469     if (!comm->new_comm)
470     {
471         /* i am apparently  first */
472         comm->split=(struct tmpi_split*)tMPI_Malloc(sizeof(struct tmpi_split));
473         comm->new_comm=(tMPI_Comm*)tMPI_Malloc(N*sizeof(tMPI_Comm));
474         if (N<=MAX_PREALLOC_THREADS)
475         {
476             comm->split->colors=colors;
477             comm->split->keys=keys;
478         }
479         else
480         {
481             comm->split->colors=(int*)tMPI_Malloc(N*sizeof(int));
482             comm->split->keys=(int*)tMPI_Malloc(N*sizeof(int));
483         }
484         comm->split->Ncol_init=tMPI_Comm_N(comm); 
485         comm->split->can_finish=FALSE;
486         i_am_first=TRUE;
487         /* the main communicator contains a list the size of grp.N */
488     }
489     newcomm_list=comm->new_comm; /* we copy it to the local stacks because
490                                     we can later erase comm->new_comm safely */
491     spl=comm->split; /* we do the same for spl */
492     spl->colors[myrank] = color;
493     spl->keys[myrank] = key;
494     spl->Ncol_init--;
495
496     if (spl->Ncol_init == 0)
497         tMPI_Thread_cond_signal(&(comm->comm_create_prep));
498
499     if (!i_am_first)
500     {
501         /* all other threads can just wait until the creator thread is 
502            finished */
503         while(! spl->can_finish )
504         {
505             tMPI_Thread_cond_wait(&(comm->comm_create_finish) ,
506                                   &(comm->comm_create_lock) );
507         }
508     }
509     else
510     {
511         int Ncomms=0;
512         int comm_color_[MAX_PREALLOC_THREADS]; 
513         int comm_N_[MAX_PREALLOC_THREADS]; 
514         int *comm_color=comm_color_; /* there can't be more comms than N*/
515         int *comm_N=comm_N_; /* the number of procs in a group */
516
517         int *comm_groups; /* the groups */
518         tMPI_Comm *comms; /* the communicators */
519
520         /* wait for the colors to be done */
521         /*if (N>1)*/
522         while(spl->Ncol_init > 0)
523         {
524             tMPI_Thread_cond_wait(&(comm->comm_create_prep), 
525                                   &(comm->comm_create_lock));
526         }
527
528         /* reset the state so that a new comm creating function can run */
529         spl->Ncol_destroy=N;
530         comm->new_comm=0;
531         comm->split=0;
532
533         comm_groups=(int*)tMPI_Malloc(N*N*sizeof(int));
534         if (N>MAX_PREALLOC_THREADS)
535         {
536             comm_color=(int*)tMPI_Malloc(N*sizeof(int));
537             comm_N=(int*)tMPI_Malloc(N*sizeof(int));
538         }
539
540         /* count colors, allocate and split up communicators */
541         tMPI_Split_colors(N, (int*)spl->colors, 
542                              (int*)spl->keys, 
543                              &Ncomms, 
544                              comm_N, comm_color, comm_groups);
545
546
547         /* allocate a bunch of communicators */
548         comms=(tMPI_Comm*)tMPI_Malloc(Ncomms*sizeof(tMPI_Comm));
549         for(i=0;i<Ncomms;i++)
550             comms[i]=tMPI_Comm_alloc(comm, comm_N[i]);
551
552         /* now distribute the comms */
553         for(i=0;i<Ncomms;i++)
554         {
555             comms[i]->grp.N=comm_N[i];
556             for(j=0;j<comm_N[i];j++)
557                 comms[i]->grp.peers[j]=
558                     comm->grp.peers[comm_groups[i*comm->grp.N + j]];
559         }
560         /* and put them into the newcomm_list */
561         for(i=0;i<N;i++)
562         {
563             newcomm_list[i]=TMPI_COMM_NULL;
564             for(j=0;j<Ncomms;j++)
565             {
566                 if (spl->colors[i] == comm_color[j])
567                 {
568                     newcomm_list[i] = comms[j];
569                     break;
570                 }
571             }
572         }
573
574 #ifdef TMPI_DEBUG
575         /* output */
576         for(i=0;i<Ncomms;i++)
577         {
578             printf("Group %d (color %d) has %d members: ",
579                     i, comm_color[i], comm_N[i]);
580             for(j=0;j<comm_N[i];j++)
581                 printf(" %d ",comm_groups[comm->grp.N*i + j]);
582
583             printf(" rank: ");
584             for(j=0;j<comm_N[i];j++)
585                 printf(" %d ",spl->keys[comm_groups[N*i + j]]);
586             printf(" color: ");
587             for(j=0;j<comm_N[i];j++)
588                 printf(" %d ",spl->colors[comm_groups[N*i + j]]);
589             printf("\n");
590         }
591 #endif
592         if (N>MAX_PREALLOC_THREADS)
593         {
594             free((int*)spl->colors);
595             free((int*)spl->keys);
596             free(comm_color);
597             free(comm_N);
598         }
599         free(comm_groups);
600         free(comms);
601         spl->can_finish=TRUE;
602
603         /* tell the waiting threads that there's a comm ready */
604         tMPI_Thread_cond_broadcast(&(comm->comm_create_finish));
605     }
606     /* here the individual threads get their comm object */
607     *newcomm=newcomm_list[myrank];
608
609     /* free when we have assigned them all, so we can reuse the object*/
610     spl->Ncol_destroy--;
611     if (spl->Ncol_destroy==0)
612     {
613         free((void*)newcomm_list);
614         free(spl);
615     }
616
617     tMPI_Thread_mutex_unlock(&(comm->comm_create_lock));
618
619     return TMPI_SUCCESS;    
620 }
621
622 int tMPI_Comm_seek_rank(tMPI_Comm comm, struct tmpi_thread *th)
623 {
624     int i;
625     if (!comm)
626         return -1;
627
628     for(i=0;i<comm->grp.N;i++)
629     {
630         if (comm->grp.peers[i] == th)
631             return i;
632     }
633     return -1;
634 }
635
636
637