Merge release-4-6 into master
[alexxy/gromacs.git] / src / gromacs / mdlib / domdec_con.c
1 /* -*- mode: c; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; c-file-style: "stroustrup"; -*-
2  *
3  * 
4  * This file is part of Gromacs        Copyright (c) 1991-2008
5  * David van der Spoel, Erik Lindahl, Berk Hess, University of Groningen.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License
9  * as published by the Free Software Foundation; either version 2
10  * of the License, or (at your option) any later version.
11  *
12  * To help us fund GROMACS development, we humbly ask that you cite
13  * the research papers on the package. Check out http://www.gromacs.org
14  * 
15  * And Hey:
16  * Gnomes, ROck Monsters And Chili Sauce
17  */
18
19 #ifdef HAVE_CONFIG_H
20 #include <config.h>
21 #endif
22 #include <assert.h>
23
24 #include "smalloc.h"
25 #include "vec.h"
26 #include "constr.h"
27 #include "domdec.h"
28 #include "domdec_network.h"
29 #include "mtop_util.h"
30 #include "gmx_ga2la.h"
31 #include "gmx_hash.h"
32 #include "gmx_omp_nthreads.h"
33 #include "macros.h"
34
35 typedef struct {
36     int nsend;
37     int *a;
38     int a_nalloc;
39     int nrecv;
40 } gmx_specatsend_t;
41
42 typedef struct {
43     int *ind;
44     int nalloc;
45     int n;
46 } ind_req_t;
47
48 typedef struct gmx_domdec_specat_comm {
49     /* The number of indices to receive during the setup */
50     int  nreq[DIM][2][2];
51     /* The atoms to send */
52     gmx_specatsend_t spas[DIM][2];
53     gmx_bool *bSendAtom;
54     int   bSendAtom_nalloc;
55     /* Send buffers */
56     int  *ibuf;
57     int  ibuf_nalloc;
58     rvec *vbuf;
59     int  vbuf_nalloc;
60     rvec *vbuf2;
61     int  vbuf2_nalloc;
62     /* The range in the local buffer(s) for received atoms */
63     int  at_start;
64     int  at_end;
65
66     /* The atom indices we need from the surrounding cells.
67      * We can gather the indices over nthread threads.
68      */
69     int nthread;
70     ind_req_t *ireq;
71 } gmx_domdec_specat_comm_t;
72
73 typedef struct gmx_domdec_constraints {
74     int  *molb_con_offset;
75     int  *molb_ncon_mol;
76     /* The fully local and connected constraints */
77     int  ncon;
78     /* The global constraint number, only required for clearing gc_req */
79     int  *con_gl;
80     int  *con_nlocat;
81     int  con_nalloc;
82     /* Boolean that tells if a global constraint index has been requested */
83     char *gc_req;
84     /* Global to local communicated constraint atom only index */
85     gmx_hash_t ga2la;
86
87     /* Multi-threading stuff */
88     int nthread;
89     t_ilist *ils;
90 } gmx_domdec_constraints_t;
91
92
93 static void dd_move_f_specat(gmx_domdec_t *dd,gmx_domdec_specat_comm_t *spac,
94                              rvec *f,rvec *fshift)
95 {
96     gmx_specatsend_t *spas;
97     rvec *vbuf;
98     int  n,n0,n1,d,dim,dir,i;
99     ivec vis;
100     int  is;
101     gmx_bool bPBC,bScrew;
102     
103     n = spac->at_end;
104     for(d=dd->ndim-1; d>=0; d--)
105     {
106         dim = dd->dim[d];
107         if (dd->nc[dim] > 2)
108         {
109             /* Pulse the grid forward and backward */
110             spas = spac->spas[d];
111             n0 = spas[0].nrecv;
112             n1 = spas[1].nrecv;
113             n -= n1 + n0;
114             vbuf = spac->vbuf;
115             /* Send and receive the coordinates */
116             dd_sendrecv2_rvec(dd,d,
117                               f+n+n1,n0,vbuf              ,spas[0].nsend,
118                               f+n   ,n1,vbuf+spas[0].nsend,spas[1].nsend);
119             for(dir=0; dir<2; dir++)
120             {
121                 bPBC   = ((dir == 0 && dd->ci[dim] == 0) || 
122                           (dir == 1 && dd->ci[dim] == dd->nc[dim]-1));
123                 bScrew = (bPBC && dd->bScrewPBC && dim == XX);
124                 
125                 spas = &spac->spas[d][dir];
126                 /* Sum the buffer into the required forces */
127                 if (!bPBC || (!bScrew && fshift == NULL))
128                 {
129                     for(i=0; i<spas->nsend; i++)
130                     {
131                         rvec_inc(f[spas->a[i]],*vbuf);
132                         vbuf++;
133                     }
134                 }
135                 else
136                 {
137                     clear_ivec(vis);
138                     vis[dim] = (dir==0 ? 1 : -1);
139                     is = IVEC2IS(vis);
140                     if (!bScrew)
141                     {
142                         /* Sum and add to shift forces */
143                         for(i=0; i<spas->nsend; i++)
144                         {
145                             rvec_inc(f[spas->a[i]],*vbuf);
146                             rvec_inc(fshift[is],*vbuf);
147                             vbuf++;
148                         }
149                     }
150                     else
151                     {       
152                         /* Rotate the forces */
153                         for(i=0; i<spas->nsend; i++)
154                         {
155                             f[spas->a[i]][XX] += (*vbuf)[XX];
156                             f[spas->a[i]][YY] -= (*vbuf)[YY];
157                             f[spas->a[i]][ZZ] -= (*vbuf)[ZZ];
158                             if (fshift)
159                             {
160                                 rvec_inc(fshift[is],*vbuf);
161                             }
162                             vbuf++;
163                         }
164                     }
165                 }
166             }
167         }
168         else
169         {
170             /* Two cells, so we only need to communicate one way */
171             spas = &spac->spas[d][0];
172             n -= spas->nrecv;
173             /* Send and receive the coordinates */
174             dd_sendrecv_rvec(dd,d,dddirForward,
175                              f+n,spas->nrecv,spac->vbuf,spas->nsend);
176             /* Sum the buffer into the required forces */
177             if (dd->bScrewPBC && dim == XX &&
178                 (dd->ci[dim] == 0 ||
179                  dd->ci[dim] == dd->nc[dim]-1))
180             {
181                 for(i=0; i<spas->nsend; i++)
182                 {
183                     /* Rotate the force */
184                     f[spas->a[i]][XX] += spac->vbuf[i][XX];
185                     f[spas->a[i]][YY] -= spac->vbuf[i][YY];
186                     f[spas->a[i]][ZZ] -= spac->vbuf[i][ZZ];
187                 }
188             }
189             else
190             {
191                 for(i=0; i<spas->nsend; i++)
192                 {
193                     rvec_inc(f[spas->a[i]],spac->vbuf[i]);
194                 }
195             }
196         }
197     }
198 }
199
200 void dd_move_f_vsites(gmx_domdec_t *dd,rvec *f,rvec *fshift)
201 {
202     if (dd->vsite_comm)
203     {
204         dd_move_f_specat(dd,dd->vsite_comm,f,fshift);
205     }
206 }
207
208 void dd_clear_f_vsites(gmx_domdec_t *dd,rvec *f)
209 {
210     int i;
211     
212     if (dd->vsite_comm)
213     {
214         for(i=dd->vsite_comm->at_start; i<dd->vsite_comm->at_end; i++)
215         {
216             clear_rvec(f[i]);
217         }
218     }
219 }
220
221 static void dd_move_x_specat(gmx_domdec_t *dd,gmx_domdec_specat_comm_t *spac,
222                              matrix box,rvec *x0,rvec *x1)
223 {
224     gmx_specatsend_t *spas;
225     rvec *x,*vbuf,*rbuf;
226     int  nvec,v,n,nn,ns0,ns1,nr0,nr1,nr,d,dim,dir,i;
227     gmx_bool bPBC,bScrew=FALSE;
228     rvec shift={0,0,0};
229     
230     nvec = 1;
231     if (x1)
232     {
233         nvec++;
234     }
235     
236     n = spac->at_start;
237     for(d=0; d<dd->ndim; d++)
238     {
239         dim = dd->dim[d];
240         if (dd->nc[dim] > 2)
241         {
242             /* Pulse the grid forward and backward */
243             vbuf = spac->vbuf;
244             for(dir=0; dir<2; dir++)
245             {
246                 if (dir == 0 && dd->ci[dim] == 0)
247                 {
248                     bPBC   = TRUE;
249                     bScrew = (dd->bScrewPBC && dim == XX);
250                     copy_rvec(box[dim],shift);
251                 }
252                 else if (dir == 1 && dd->ci[dim] == dd->nc[dim]-1)
253                 {
254                     bPBC = TRUE;
255                     bScrew = (dd->bScrewPBC && dim == XX);
256                     for(i=0; i<DIM; i++)
257                     {
258                         shift[i] = -box[dim][i];
259                     }
260                 }
261                 else
262                 {
263                     bPBC = FALSE;
264                     bScrew = FALSE;
265                 }
266                 spas = &spac->spas[d][dir];
267                 for(v=0; v<nvec; v++)
268                 {
269                     x = (v == 0 ? x0 : x1);
270                     /* Copy the required coordinates to the send buffer */
271                     if (!bPBC)
272                     {
273                         /* Only copy */
274                         for(i=0; i<spas->nsend; i++)
275                         {
276                             copy_rvec(x[spas->a[i]],*vbuf);
277                             vbuf++;
278                         }
279                     }
280                     else if (!bScrew)
281                     {
282                         /* Shift coordinates */
283                         for(i=0; i<spas->nsend; i++)
284                         {
285                             rvec_add(x[spas->a[i]],shift,*vbuf);
286                             vbuf++;
287                         }
288                     }
289                     else
290                     {
291                         /* Shift and rotate coordinates */
292                         for(i=0; i<spas->nsend; i++)
293                         {
294                             (*vbuf)[XX] =               x[spas->a[i]][XX] + shift[XX];
295                             (*vbuf)[YY] = box[YY][YY] - x[spas->a[i]][YY] + shift[YY];
296                             (*vbuf)[ZZ] = box[ZZ][ZZ] - x[spas->a[i]][ZZ] + shift[ZZ];
297                             vbuf++;
298                         }
299                     }
300                 }
301             }
302             /* Send and receive the coordinates */
303             spas = spac->spas[d];
304             ns0 = spas[0].nsend;
305             nr0 = spas[0].nrecv;
306             ns1 = spas[1].nsend;
307             nr1 = spas[1].nrecv;
308             if (nvec == 1)
309             {
310                 dd_sendrecv2_rvec(dd,d,
311                                   spac->vbuf+ns0,ns1,x0+n    ,nr1,
312                                   spac->vbuf    ,ns0,x0+n+nr1,nr0);
313             }
314             else
315             {
316                 /* Communicate both vectors in one buffer */
317                 rbuf = spac->vbuf2;
318                 dd_sendrecv2_rvec(dd,d,
319                                   spac->vbuf+2*ns0,2*ns1,rbuf      ,2*nr1,
320                                   spac->vbuf      ,2*ns0,rbuf+2*nr1,2*nr0);
321                 /* Split the buffer into the two vectors */
322                 nn = n;
323                 for(dir=1; dir>=0; dir--)
324                 {
325                     nr = spas[dir].nrecv;
326                     for(v=0; v<2; v++)
327                     {
328                         x = (v == 0 ? x0 : x1);
329                         for(i=0; i<nr; i++)
330                         {
331                             copy_rvec(*rbuf,x[nn+i]);
332                             rbuf++;
333                         }
334                     }
335                     nn += nr;
336                 }
337             }
338             n += nr0 + nr1;
339         }
340         else
341         {
342             spas = &spac->spas[d][0];
343             /* Copy the required coordinates to the send buffer */
344             vbuf = spac->vbuf;
345             for(v=0; v<nvec; v++)
346             {
347                 x = (v == 0 ? x0 : x1);
348                 if (dd->bScrewPBC && dim == XX &&
349                     (dd->ci[XX] == 0 || dd->ci[XX] == dd->nc[XX]-1))
350                 {
351                     /* Here we only perform the rotation, the rest of the pbc
352                      * is handled in the constraint or viste routines.
353                      */
354                     for(i=0; i<spas->nsend; i++)
355                     {
356                         (*vbuf)[XX] =               x[spas->a[i]][XX];
357                         (*vbuf)[YY] = box[YY][YY] - x[spas->a[i]][YY];
358                         (*vbuf)[ZZ] = box[ZZ][ZZ] - x[spas->a[i]][ZZ];
359                         vbuf++;
360                     }     
361                 }
362                 else
363                 {
364                     for(i=0; i<spas->nsend; i++)
365                     {
366                         copy_rvec(x[spas->a[i]],*vbuf);
367                         vbuf++;
368                     }
369                 }
370             }
371             /* Send and receive the coordinates */
372             if (nvec == 1)
373             {
374                 dd_sendrecv_rvec(dd,d,dddirBackward,
375                                  spac->vbuf,spas->nsend,x0+n,spas->nrecv);
376             }
377             else
378             {
379                 /* Communicate both vectors in one buffer */
380                 rbuf = spac->vbuf2;
381                 dd_sendrecv_rvec(dd,d,dddirBackward,
382                                  spac->vbuf,2*spas->nsend,rbuf,2*spas->nrecv);
383                 /* Split the buffer into the two vectors */
384                 nr = spas[0].nrecv;
385                 for(v=0; v<2; v++)
386                 {
387                     x = (v == 0 ? x0 : x1);
388                     for(i=0; i<nr; i++)
389                     {
390                         copy_rvec(*rbuf,x[n+i]);
391                         rbuf++;
392                     }
393                 }
394             }
395             n += spas->nrecv;
396         }
397     }
398 }
399
400 void dd_move_x_constraints(gmx_domdec_t *dd,matrix box,rvec *x0,rvec *x1)
401 {
402     if (dd->constraint_comm)
403     {
404         dd_move_x_specat(dd,dd->constraint_comm,box,x0,x1);
405     }
406 }
407
408 void dd_move_x_vsites(gmx_domdec_t *dd,matrix box,rvec *x)
409 {
410     if (dd->vsite_comm)
411     {
412         dd_move_x_specat(dd,dd->vsite_comm,box,x,NULL);
413     }
414 }
415
416 int *dd_constraints_nlocalatoms(gmx_domdec_t *dd)
417 {
418     if (dd->constraints)
419     {
420         return dd->constraints->con_nlocat;
421     }
422     else
423     {
424         return NULL;
425     }
426 }
427
428 void dd_clear_local_constraint_indices(gmx_domdec_t *dd)
429 {
430     gmx_domdec_constraints_t *dc;
431     int i;
432     
433     dc = dd->constraints;
434     
435     for(i=0; i<dc->ncon; i++)
436     {
437         dc->gc_req[dc->con_gl[i]] = 0;
438     }
439   
440     if (dd->constraint_comm)
441     {
442         gmx_hash_clear_and_optimize(dc->ga2la);
443     }
444 }
445
446 void dd_clear_local_vsite_indices(gmx_domdec_t *dd)
447 {
448     int i;
449     
450     if (dd->vsite_comm)
451     {
452         gmx_hash_clear_and_optimize(dd->ga2la_vsite);
453     }
454 }
455
456 static int setup_specat_communication(gmx_domdec_t *dd,
457                                       ind_req_t *ireq,
458                                       gmx_domdec_specat_comm_t *spac,
459                                       gmx_hash_t ga2la_specat,
460                                       int at_start,
461                                       int vbuf_fac,
462                                       const char *specat_type,
463                                       const char *add_err)
464 {
465     int  nsend[2],nlast,nsend_zero[2]={0,0},*nsend_ptr;
466     int  d,dim,ndir,dir,nr,ns,i,nrecv_local,n0,start,indr,ind,buf[2];
467     int  nat_tot_specat,nat_tot_prev,nalloc_old;
468     gmx_bool bPBC,bFirst;
469     gmx_specatsend_t *spas;
470     
471     if (debug)
472     {
473         fprintf(debug,"Begin setup_specat_communication for %s\n",specat_type);
474     }
475     
476     /* nsend[0]: the number of atoms requested by this node only,
477      *           we communicate this for more efficients checks
478      * nsend[1]: the total number of requested atoms
479      */
480     nsend[0] = ireq->n;
481     nsend[1] = nsend[0];
482     nlast    = nsend[1];
483     for(d=dd->ndim-1; d>=0; d--)
484     {
485         /* Pulse the grid forward and backward */
486         dim = dd->dim[d];
487         bPBC = (dim < dd->npbcdim);
488         if (dd->nc[dim] == 2)
489         {
490             /* Only 2 cells, so we only need to communicate once */
491             ndir = 1;
492         }
493         else
494         {
495             ndir = 2;
496         }
497         for(dir=0; dir<ndir; dir++)
498         {
499             if (!bPBC && 
500                 dd->nc[dim] > 2 &&
501                 ((dir == 0 && dd->ci[dim] == dd->nc[dim] - 1) ||
502                  (dir == 1 && dd->ci[dim] == 0)))
503             {
504                 /* No pbc: the fist/last cell should not request atoms */
505                 nsend_ptr = nsend_zero;
506             }
507             else
508             {
509                 nsend_ptr = nsend;
510             }
511             /* Communicate the number of indices */
512             dd_sendrecv_int(dd,d,dir==0 ? dddirForward : dddirBackward,
513                             nsend_ptr,2,spac->nreq[d][dir],2);
514             nr = spac->nreq[d][dir][1];
515             if (nlast+nr > ireq->nalloc)
516             {
517                 ireq->nalloc = over_alloc_dd(nlast+nr);
518                 srenew(ireq->ind,ireq->nalloc);
519             }
520             /* Communicate the indices */
521             dd_sendrecv_int(dd,d,dir==0 ? dddirForward : dddirBackward,
522                             ireq->ind,nsend_ptr[1],ireq->ind+nlast,nr);
523             nlast += nr;
524         }
525         nsend[1] = nlast;
526     }
527     if (debug)
528     {
529         fprintf(debug,"Communicated the counts\n");
530     }
531     
532     /* Search for the requested atoms and communicate the indices we have */
533     nat_tot_specat = at_start;
534     nrecv_local = 0;
535     for(d=0; d<dd->ndim; d++)
536     {
537         bFirst = (d == 0);
538         /* Pulse the grid forward and backward */
539         if (dd->dim[d] >= dd->npbcdim || dd->nc[dd->dim[d]] > 2)
540         {
541             ndir = 2;
542         }
543         else
544         {
545             ndir = 1;
546         }
547         nat_tot_prev = nat_tot_specat;
548         for(dir=ndir-1; dir>=0; dir--)
549         {
550             if (nat_tot_specat > spac->bSendAtom_nalloc)
551             {
552                 nalloc_old = spac->bSendAtom_nalloc;
553                 spac->bSendAtom_nalloc = over_alloc_dd(nat_tot_specat);
554                 srenew(spac->bSendAtom,spac->bSendAtom_nalloc);
555                 for(i=nalloc_old; i<spac->bSendAtom_nalloc; i++)
556                 {
557                     spac->bSendAtom[i] = FALSE;
558                 }
559             }
560             spas = &spac->spas[d][dir];
561             n0 = spac->nreq[d][dir][0];
562             nr = spac->nreq[d][dir][1];
563             if (debug)
564             {
565                 fprintf(debug,"dim=%d, dir=%d, searching for %d atoms\n",
566                         d,dir,nr);
567             }
568             start = nlast - nr;
569             spas->nsend = 0;
570             nsend[0] = 0;
571             for(i=0; i<nr; i++)
572             {
573                 indr = ireq->ind[start+i];
574                 ind = -1;
575                 /* Check if this is a home atom and if so ind will be set */
576                 if (!ga2la_get_home(dd->ga2la,indr,&ind))
577                 {
578                     /* Search in the communicated atoms */
579                     ind = gmx_hash_get_minone(ga2la_specat,indr);
580                 }
581                 if (ind >= 0)
582                 {
583                     if (i < n0 || !spac->bSendAtom[ind])
584                     {
585                         if (spas->nsend+1 > spas->a_nalloc)
586                         {
587                             spas->a_nalloc = over_alloc_large(spas->nsend+1);
588                             srenew(spas->a,spas->a_nalloc);
589                         }
590                         /* Store the local index so we know which coordinates
591                          * to send out later.
592                          */
593                         spas->a[spas->nsend] = ind;
594                         spac->bSendAtom[ind] = TRUE;
595                         if (spas->nsend+1 > spac->ibuf_nalloc)
596                         {
597                             spac->ibuf_nalloc = over_alloc_large(spas->nsend+1);
598                             srenew(spac->ibuf,spac->ibuf_nalloc);
599                         }
600                         /* Store the global index so we can send it now */
601                         spac->ibuf[spas->nsend] = indr;
602                         if (i < n0)
603                         {
604                             nsend[0]++;
605                         }
606                         spas->nsend++;
607                     }
608                 }
609             }
610             nlast = start;
611             /* Clear the local flags */
612             for(i=0; i<spas->nsend; i++)
613             {
614                 spac->bSendAtom[spas->a[i]] = FALSE;
615             }
616             /* Send and receive the number of indices to communicate */
617             nsend[1] = spas->nsend;
618             dd_sendrecv_int(dd,d,dir==0 ? dddirBackward : dddirForward,
619                             nsend,2,buf,2);
620             if (debug)
621             {
622                 fprintf(debug,"Send to node %d, %d (%d) indices, "
623                         "receive from node %d, %d (%d) indices\n",
624                         dd->neighbor[d][1-dir],nsend[1],nsend[0],
625                         dd->neighbor[d][dir],buf[1],buf[0]);
626                 if (gmx_debug_at)
627                 {
628                     for(i=0; i<spas->nsend; i++)
629                     {
630                         fprintf(debug," %d",spac->ibuf[i]+1);
631                     }
632                     fprintf(debug,"\n");
633                 }
634             }
635             nrecv_local += buf[0];
636             spas->nrecv  = buf[1];
637             if (nat_tot_specat + spas->nrecv > dd->gatindex_nalloc)
638             {
639                 dd->gatindex_nalloc =
640                     over_alloc_dd(nat_tot_specat + spas->nrecv);
641                 srenew(dd->gatindex,dd->gatindex_nalloc);
642             }
643             /* Send and receive the indices */
644             dd_sendrecv_int(dd,d,dir==0 ? dddirBackward : dddirForward,
645                             spac->ibuf,spas->nsend,
646                             dd->gatindex+nat_tot_specat,spas->nrecv);
647             nat_tot_specat += spas->nrecv;
648         }
649         
650         /* Allocate the x/f communication buffers */
651         ns = spac->spas[d][0].nsend;
652         nr = spac->spas[d][0].nrecv;
653         if (ndir == 2)
654         {
655             ns += spac->spas[d][1].nsend;
656             nr += spac->spas[d][1].nrecv;
657         }
658         if (vbuf_fac*ns > spac->vbuf_nalloc)
659         {
660             spac->vbuf_nalloc = over_alloc_dd(vbuf_fac*ns);
661             srenew(spac->vbuf,spac->vbuf_nalloc);
662         }
663         if (vbuf_fac == 2 && vbuf_fac*nr > spac->vbuf2_nalloc)
664         {
665             spac->vbuf2_nalloc = over_alloc_dd(vbuf_fac*nr);
666             srenew(spac->vbuf2,spac->vbuf2_nalloc);
667         }
668         
669         /* Make a global to local index for the communication atoms */
670         for(i=nat_tot_prev; i<nat_tot_specat; i++)
671         {
672             gmx_hash_change_or_set(ga2la_specat,dd->gatindex[i],i);
673         }
674     }
675     
676     /* Check that in the end we got the number of atoms we asked for */
677     if (nrecv_local != ireq->n)
678     {
679         if (debug)
680         {
681             fprintf(debug,"Requested %d, received %d (tot recv %d)\n",
682                     ireq->n,nrecv_local,nat_tot_specat-at_start);
683             if (gmx_debug_at)
684             {
685                 for(i=0; i<ireq->n; i++)
686                 {
687                     ind = gmx_hash_get_minone(ga2la_specat,ireq->ind[i]);
688                     fprintf(debug," %s%d",
689                             (ind >= 0) ? "" : "!",
690                             ireq->ind[i]+1);
691                 }
692                 fprintf(debug,"\n");
693             }
694         }
695         fprintf(stderr,"\nDD cell %d %d %d: Neighboring cells do not have atoms:",
696                 dd->ci[XX],dd->ci[YY],dd->ci[ZZ]);
697         for(i=0; i<ireq->n; i++)
698         {
699             if (gmx_hash_get_minone(ga2la_specat,ireq->ind[i]) < 0)
700             {
701                 fprintf(stderr," %d",ireq->ind[i]+1);
702             }
703         }
704         fprintf(stderr,"\n");
705         gmx_fatal(FARGS,"DD cell %d %d %d could only obtain %d of the %d atoms that are connected via %ss from the neighboring cells. This probably means your %s lengths are too long compared to the domain decomposition cell size. Decrease the number of domain decomposition grid cells%s%s.",
706                   dd->ci[XX],dd->ci[YY],dd->ci[ZZ],
707                   nrecv_local,ireq->n,specat_type,
708                   specat_type,add_err,
709                   dd->bGridJump ? " or use the -rcon option of mdrun" : "");
710     }
711     
712     spac->at_start = at_start;
713     spac->at_end   = nat_tot_specat;
714     
715     if (debug)
716     {
717         fprintf(debug,"Done setup_specat_communication\n");
718     }
719     
720     return nat_tot_specat;
721 }
722
723 static void walk_out(int con,int con_offset,int a,int offset,int nrec,
724                      int ncon1,const t_iatom *ia1,const t_iatom *ia2,
725                      const t_blocka *at2con,
726                      const gmx_ga2la_t ga2la,gmx_bool bHomeConnect,
727                      gmx_domdec_constraints_t *dc,
728                      gmx_domdec_specat_comm_t *dcc,
729                      t_ilist *il_local,
730                      ind_req_t *ireq)
731 {
732     int a1_gl,a2_gl,a_loc,i,coni,b;
733     const t_iatom *iap;
734   
735     if (dc->gc_req[con_offset+con] == 0)
736     {
737         /* Add this non-home constraint to the list */
738         if (dc->ncon+1 > dc->con_nalloc)
739         {
740             dc->con_nalloc = over_alloc_large(dc->ncon+1);
741             srenew(dc->con_gl,dc->con_nalloc);
742             srenew(dc->con_nlocat,dc->con_nalloc);
743         }
744         dc->con_gl[dc->ncon] = con_offset + con;
745         dc->con_nlocat[dc->ncon] = (bHomeConnect ? 1 : 0);
746         dc->gc_req[con_offset+con] = 1;
747         if (il_local->nr + 3 > il_local->nalloc)
748         {
749             il_local->nalloc = over_alloc_dd(il_local->nr+3);
750             srenew(il_local->iatoms,il_local->nalloc);
751         }
752         iap = constr_iatomptr(ncon1,ia1,ia2,con);
753         il_local->iatoms[il_local->nr++] = iap[0];
754         a1_gl = offset + iap[1];
755         a2_gl = offset + iap[2];
756         /* The following indexing code can probably be optizimed */
757         if (ga2la_get_home(ga2la,a1_gl,&a_loc))
758         {
759             il_local->iatoms[il_local->nr++] = a_loc;
760         }
761         else
762         {
763             /* We set this index later */
764             il_local->iatoms[il_local->nr++] = -a1_gl - 1;
765         }
766         if (ga2la_get_home(ga2la,a2_gl,&a_loc))
767         {
768             il_local->iatoms[il_local->nr++] = a_loc;
769         }
770         else
771         {
772             /* We set this index later */
773             il_local->iatoms[il_local->nr++] = -a2_gl - 1;
774         }
775         dc->ncon++;
776     }
777     /* Check to not ask for the same atom more than once */
778     if (gmx_hash_get_minone(dc->ga2la,offset+a) == -1)
779     {
780         assert(dcc);
781         /* Add this non-home atom to the list */
782         if (ireq->n+1 > ireq->nalloc)
783         {
784             ireq->nalloc = over_alloc_large(ireq->n+1);
785             srenew(ireq->ind,ireq->nalloc);
786         }
787         ireq->ind[ireq->n++] = offset + a;
788         /* Temporarily mark with -2, we get the index later */
789         gmx_hash_set(dc->ga2la,offset+a,-2);
790     }
791     
792     if (nrec > 0)
793     {
794         for(i=at2con->index[a]; i<at2con->index[a+1]; i++)
795         {
796             coni = at2con->a[i];
797             if (coni != con)
798             {
799                 /* Walk further */
800                 iap = constr_iatomptr(ncon1,ia1,ia2,coni);
801                 if (a == iap[1])
802                 {
803                     b = iap[2];
804                 }
805                 else
806                 {
807                     b = iap[1];
808                 }
809                 if (!ga2la_get_home(ga2la,offset+b,&a_loc))
810                 {
811                     walk_out(coni,con_offset,b,offset,nrec-1,
812                              ncon1,ia1,ia2,at2con,
813                              ga2la,FALSE,dc,dcc,il_local,ireq);
814                 }
815             }
816         }
817     }
818 }
819
820 static void atoms_to_settles(gmx_domdec_t *dd,
821                              const gmx_mtop_t *mtop,
822                              const int *cginfo,
823                              const int **at2settle_mt,
824                              int cg_start,int cg_end,
825                              t_ilist *ils_local,
826                              ind_req_t *ireq)
827 {
828     gmx_ga2la_t ga2la;
829     gmx_mtop_atomlookup_t alook;
830     int settle;
831     int nral,sa;
832     int cg,a,a_gl,a_glsa,a_gls[3],a_locs[3];
833     int mb,molnr,a_mol,offset;
834     const gmx_molblock_t *molb;
835     const t_iatom *ia1;
836     gmx_bool a_home[3];
837     int nlocal;
838     gmx_bool bAssign;
839
840     ga2la  = dd->ga2la;
841
842     alook = gmx_mtop_atomlookup_settle_init(mtop);
843
844     nral = NRAL(F_SETTLE);
845
846     for(cg=cg_start; cg<cg_end; cg++)
847     {
848         if (GET_CGINFO_SETTLE(cginfo[cg]))
849         {
850             for(a=dd->cgindex[cg]; a<dd->cgindex[cg+1]; a++)
851             {
852                 a_gl = dd->gatindex[a];
853                 
854                 gmx_mtop_atomnr_to_molblock_ind(alook,a_gl,&mb,&molnr,&a_mol);
855                 molb = &mtop->molblock[mb];
856
857                 settle = at2settle_mt[molb->type][a_mol];
858
859                 if (settle >= 0)
860                 {
861                     offset = a_gl - a_mol;
862
863                     ia1 = mtop->moltype[molb->type].ilist[F_SETTLE].iatoms;
864
865                     bAssign = FALSE;
866                     nlocal = 0;
867                     for(sa=0; sa<nral; sa++)
868                     {
869                         a_glsa = offset + ia1[settle*(1+nral)+1+sa];
870                         a_gls[sa] = a_glsa;
871                         a_home[sa] = ga2la_get_home(ga2la,a_glsa,&a_locs[sa]);
872                         if (a_home[sa])
873                         {
874                             if (nlocal == 0 && a_gl == a_glsa)
875                             {
876                                 bAssign = TRUE;
877                             }
878                             nlocal++;
879                         }
880                     }
881
882                     if (bAssign)
883                     {
884                         if (ils_local->nr+1+nral > ils_local->nalloc)
885                         {
886                             ils_local->nalloc = over_alloc_dd(ils_local->nr+1+nral);
887                             srenew(ils_local->iatoms,ils_local->nalloc);
888                         }
889
890                         ils_local->iatoms[ils_local->nr++] = ia1[settle*4];
891
892                         for(sa=0; sa<nral; sa++)
893                         {
894                             if (ga2la_get_home(ga2la,a_gls[sa],&a_locs[sa]))
895                             {
896                                 ils_local->iatoms[ils_local->nr++] = a_locs[sa];
897                             }
898                             else
899                             {
900                                 ils_local->iatoms[ils_local->nr++] = -a_gls[sa] - 1;
901                                 /* Add this non-home atom to the list */
902                                 if (ireq->n+1 > ireq->nalloc)
903                                 {
904                                     ireq->nalloc = over_alloc_large(ireq->n+1);
905                                     srenew(ireq->ind,ireq->nalloc);
906                                 }
907                                 ireq->ind[ireq->n++] = a_gls[sa];
908                                 /* A check on double atom requests is
909                                  * not required for settle.
910                                  */
911                             }
912                         }
913                     }
914                 }
915             }
916         }
917     }
918
919     gmx_mtop_atomlookup_destroy(alook);
920 }
921
922 static void atoms_to_constraints(gmx_domdec_t *dd,
923                                  const gmx_mtop_t *mtop,
924                                  const int *cginfo,
925                                  const t_blocka *at2con_mt,int nrec,
926                                  t_ilist *ilc_local,
927                                  ind_req_t *ireq)
928 {
929     const t_blocka *at2con;
930     gmx_ga2la_t ga2la;
931     gmx_mtop_atomlookup_t alook;
932     int ncon1;
933     gmx_molblock_t *molb;
934     t_iatom *ia1,*ia2,*iap;
935     int nhome,cg,a,a_gl,a_mol,a_loc,b_lo,offset,mb,molnr,b_mol,i,con,con_offset;
936     gmx_domdec_constraints_t *dc;
937     gmx_domdec_specat_comm_t *dcc;
938     
939     dc  = dd->constraints;
940     dcc = dd->constraint_comm;
941     
942     ga2la  = dd->ga2la;
943
944     alook = gmx_mtop_atomlookup_init(mtop);
945
946     nhome = 0;
947     for(cg=0; cg<dd->ncg_home; cg++)
948     {
949         if (GET_CGINFO_CONSTR(cginfo[cg]))
950         {
951             for(a=dd->cgindex[cg]; a<dd->cgindex[cg+1]; a++)
952             {
953                 a_gl = dd->gatindex[a];
954         
955                 gmx_mtop_atomnr_to_molblock_ind(alook,a_gl,&mb,&molnr,&a_mol);
956                 molb = &mtop->molblock[mb];
957         
958                 ncon1 = mtop->moltype[molb->type].ilist[F_CONSTR].nr/NRAL(F_SETTLE);
959
960                 ia1 = mtop->moltype[molb->type].ilist[F_CONSTR].iatoms;
961                 ia2 = mtop->moltype[molb->type].ilist[F_CONSTRNC].iatoms;
962
963                 /* Calculate the global constraint number offset for the molecule.
964                  * This is only required for the global index to make sure
965                  * that we use each constraint only once.
966                  */
967                 con_offset =
968                     dc->molb_con_offset[mb] + molnr*dc->molb_ncon_mol[mb];
969             
970                 /* The global atom number offset for this molecule */
971                 offset = a_gl - a_mol;
972                 at2con = &at2con_mt[molb->type];
973                 for(i=at2con->index[a_mol]; i<at2con->index[a_mol+1]; i++)
974                 {
975                     con = at2con->a[i];
976                     iap = constr_iatomptr(ncon1,ia1,ia2,con);
977                     if (a_mol == iap[1])
978                     {
979                         b_mol = iap[2];
980                     }
981                     else
982                     {
983                         b_mol = iap[1];
984                     }
985                     if (ga2la_get_home(ga2la,offset+b_mol,&a_loc))
986                     {
987                         /* Add this fully home constraint at the first atom */
988                         if (a_mol < b_mol)
989                         {
990                             if (dc->ncon+1 > dc->con_nalloc)
991                             {
992                                 dc->con_nalloc = over_alloc_large(dc->ncon+1);
993                                 srenew(dc->con_gl,dc->con_nalloc);
994                                 srenew(dc->con_nlocat,dc->con_nalloc);
995                             }
996                             dc->con_gl[dc->ncon] = con_offset + con;
997                             dc->con_nlocat[dc->ncon] = 2;
998                             if (ilc_local->nr + 3 > ilc_local->nalloc)
999                             {
1000                                 ilc_local->nalloc = over_alloc_dd(ilc_local->nr + 3);
1001                                 srenew(ilc_local->iatoms,ilc_local->nalloc);
1002                             }
1003                             b_lo = a_loc;
1004                             ilc_local->iatoms[ilc_local->nr++] = iap[0];
1005                             ilc_local->iatoms[ilc_local->nr++] = (a_gl == iap[1] ? a    : b_lo);
1006                             ilc_local->iatoms[ilc_local->nr++] = (a_gl == iap[1] ? b_lo : a   );
1007                             dc->ncon++;
1008                             nhome++;
1009                         }
1010                     }
1011                     else
1012                     {
1013                         /* We need the nrec constraints coupled to this constraint,
1014                          * so we need to walk out of the home cell by nrec+1 atoms,
1015                          * since already atom bg is not locally present.
1016                          * Therefore we call walk_out with nrec recursions to go
1017                          * after this first call.
1018                          */
1019                         walk_out(con,con_offset,b_mol,offset,nrec,
1020                                  ncon1,ia1,ia2,at2con,
1021                                  dd->ga2la,TRUE,dc,dcc,ilc_local,ireq);
1022                     }
1023                 }
1024             }
1025         }
1026     }
1027
1028     gmx_mtop_atomlookup_destroy(alook);
1029
1030     if (debug)
1031     {
1032         fprintf(debug,
1033                 "Constraints: home %3d border %3d atoms: %3d\n",
1034                 nhome,dc->ncon-nhome,
1035                 dd->constraint_comm ? ireq->n : 0);
1036     }
1037 }
1038
1039 int dd_make_local_constraints(gmx_domdec_t *dd,int at_start,
1040                               const gmx_mtop_t *mtop,
1041                               const int *cginfo,
1042                               gmx_constr_t constr,int nrec,
1043                               t_ilist *il_local)
1044 {
1045     gmx_domdec_constraints_t *dc;
1046     t_ilist *ilc_local,*ils_local;
1047     ind_req_t *ireq;
1048     const t_blocka *at2con_mt;
1049     const int **at2settle_mt;
1050     gmx_hash_t ga2la_specat;
1051     int at_end,i,j;
1052     t_iatom *iap;
1053     
1054     dc = dd->constraints;
1055     
1056     ilc_local = &il_local[F_CONSTR];
1057     ils_local = &il_local[F_SETTLE];
1058
1059     dc->ncon      = 0;
1060     ilc_local->nr = 0;
1061     if (dd->constraint_comm)
1062     {
1063         at2con_mt = atom2constraints_moltype(constr);
1064         ireq = &dd->constraint_comm->ireq[0];
1065         ireq->n = 0;
1066     }
1067     else
1068     {
1069         at2con_mt = NULL;
1070         ireq = NULL;
1071     }
1072
1073     if (dd->bInterCGsettles)
1074     {
1075         at2settle_mt = atom2settle_moltype(constr);
1076         ils_local->nr = 0;
1077     }
1078     else
1079     {
1080         /* Settle works inside charge groups, we assigned them already */
1081         at2settle_mt = NULL;
1082     }
1083
1084     if (at2settle_mt == NULL)
1085     {
1086         atoms_to_constraints(dd,mtop,cginfo,at2con_mt,nrec,
1087                              ilc_local,ireq);
1088     }
1089     else
1090     {
1091         int t0_set;
1092         int thread;
1093
1094         /* Do the constraints, if present, on the first thread.
1095          * Do the settles on all other threads.
1096          */
1097         t0_set = ((at2con_mt != NULL && dc->nthread > 1) ? 1 : 0);
1098
1099 #pragma omp parallel for num_threads(dc->nthread) schedule(static)
1100         for(thread=0; thread<dc->nthread; thread++)
1101         {
1102             if (at2con_mt && thread == 0)
1103             {
1104                 atoms_to_constraints(dd,mtop,cginfo,at2con_mt,nrec,
1105                                      ilc_local,ireq);
1106             }
1107
1108             if (thread >= t0_set)
1109             {
1110                 int cg0,cg1;
1111                 t_ilist *ilst;
1112                 ind_req_t *ireqt;
1113
1114                 /* Distribute the settle check+assignments over
1115                  * dc->nthread or dc->nthread-1 threads.
1116                  */
1117                 cg0 = (dd->ncg_home*(thread-t0_set  ))/(dc->nthread-t0_set);
1118                 cg1 = (dd->ncg_home*(thread-t0_set+1))/(dc->nthread-t0_set);
1119
1120                 if (thread == t0_set)
1121                 {
1122                     ilst = ils_local;
1123                 }
1124                 else
1125                 {
1126                     ilst = &dc->ils[thread];
1127                 }
1128                 ilst->nr = 0;
1129
1130                 ireqt = &dd->constraint_comm->ireq[thread];
1131                 if (thread > 0)
1132                 {
1133                     ireqt->n = 0;
1134                 }
1135
1136                 atoms_to_settles(dd,mtop,cginfo,at2settle_mt,
1137                                  cg0,cg1,
1138                                  ilst,ireqt);
1139             }
1140         }
1141
1142         /* Combine the generate settles and requested indices */
1143         for(thread=1; thread<dc->nthread; thread++)
1144         {
1145             t_ilist *ilst;
1146             ind_req_t *ireqt;
1147             int ia;
1148
1149             if (thread > t0_set)
1150             {
1151                 ilst = &dc->ils[thread];
1152                 if (ils_local->nr + ilst->nr > ils_local->nalloc)
1153                 {
1154                     ils_local->nalloc = over_alloc_large(ils_local->nr + ilst->nr);
1155                     srenew(ils_local->iatoms,ils_local->nalloc);
1156                 }
1157                 for(ia=0; ia<ilst->nr; ia++)
1158                 {
1159                     ils_local->iatoms[ils_local->nr+ia] = ilst->iatoms[ia];
1160                 }
1161                 ils_local->nr += ilst->nr;
1162             }
1163
1164             ireqt = &dd->constraint_comm->ireq[thread];
1165             if (ireq->n+ireqt->n > ireq->nalloc)
1166             {
1167                 ireq->nalloc = over_alloc_large(ireq->n+ireqt->n);
1168                 srenew(ireq->ind,ireq->nalloc);
1169             }
1170             for(ia=0; ia<ireqt->n; ia++)
1171             {
1172                 ireq->ind[ireq->n+ia] = ireqt->ind[ia];
1173             }
1174             ireq->n += ireqt->n;
1175         }
1176
1177         if (debug)
1178         {
1179             fprintf(debug,"Settles: total %3d\n",ils_local->nr/4);
1180         }
1181     }
1182
1183     if (dd->constraint_comm) {
1184         int nral1;
1185
1186         at_end =
1187             setup_specat_communication(dd,ireq,dd->constraint_comm,
1188                                        dd->constraints->ga2la,
1189                                        at_start,2,
1190                                        "constraint"," or lincs-order");
1191         
1192         /* Fill in the missing indices */
1193         ga2la_specat = dd->constraints->ga2la;
1194
1195         nral1 = 1 + NRAL(F_CONSTR);
1196         for(i=0; i<ilc_local->nr; i+=nral1)
1197         {
1198             iap = ilc_local->iatoms + i;
1199             for(j=1; j<nral1; j++)
1200             {
1201                 if (iap[j] < 0)
1202                 {
1203                     iap[j] = gmx_hash_get_minone(ga2la_specat,-iap[j]-1);
1204                 }
1205             }
1206         }
1207
1208         nral1 = 1 + NRAL(F_SETTLE);
1209         for(i=0; i<ils_local->nr; i+=nral1)
1210         {
1211             iap = ils_local->iatoms + i;
1212             for(j=1; j<nral1; j++)
1213             {
1214                 if (iap[j] < 0)
1215                 {
1216                     iap[j] = gmx_hash_get_minone(ga2la_specat,-iap[j]-1);
1217                 }
1218             }
1219         }
1220     }
1221     else
1222     {
1223         at_end = at_start;
1224     }
1225     
1226     return at_end;
1227 }
1228
1229 int dd_make_local_vsites(gmx_domdec_t *dd,int at_start,t_ilist *lil)
1230 {
1231     gmx_domdec_specat_comm_t *spac;
1232     ind_req_t *ireq;
1233     gmx_hash_t ga2la_specat;
1234     int  ftype,nral,i,j,gat,a;
1235     t_ilist *lilf;
1236     t_iatom *iatoms;
1237     int  at_end;
1238     
1239     spac         = dd->vsite_comm;
1240     ireq         = &spac->ireq[0];
1241     ga2la_specat = dd->ga2la_vsite;
1242     
1243     ireq->n = 0;
1244     /* Loop over all the home vsites */
1245     for(ftype=0; ftype<F_NRE; ftype++)
1246     {
1247         if (interaction_function[ftype].flags & IF_VSITE)
1248         {
1249             nral = NRAL(ftype);
1250             lilf = &lil[ftype];
1251             for(i=0; i<lilf->nr; i+=1+nral)
1252             {
1253                 iatoms = lilf->iatoms + i;
1254                 /* Check if we have the other atoms */
1255                 for(j=1; j<1+nral; j++)
1256                 {
1257                     if (iatoms[j] < 0) {
1258                         /* This is not a home atom,
1259                          * we need to ask our neighbors.
1260                          */
1261                         a = -iatoms[j] - 1;
1262                         /* Check to not ask for the same atom more than once */
1263                         if (gmx_hash_get_minone(dd->ga2la_vsite,a) == -1)
1264                         {
1265                             /* Add this non-home atom to the list */
1266                             if (ireq->n+1 > ireq->nalloc)
1267                             {
1268                                 ireq->nalloc = over_alloc_large(ireq->n+1);
1269                                 srenew(ireq->ind,ireq->nalloc);
1270                             }
1271                             ireq->ind[ireq->n++] = a;
1272                             /* Temporarily mark with -2,
1273                              * we get the index later.
1274                              */
1275                             gmx_hash_set(ga2la_specat,a,-2);
1276                         }
1277                     }
1278                 }
1279             }
1280         }
1281     }
1282     
1283     at_end = setup_specat_communication(dd,ireq,dd->vsite_comm,ga2la_specat,
1284                                         at_start,1,"vsite","");
1285     
1286     /* Fill in the missing indices */
1287     for(ftype=0; ftype<F_NRE; ftype++)
1288     {
1289         if (interaction_function[ftype].flags & IF_VSITE)
1290         {
1291             nral = NRAL(ftype);
1292             lilf = &lil[ftype];
1293             for(i=0; i<lilf->nr; i+=1+nral)
1294             {
1295                 iatoms = lilf->iatoms + i;
1296                 for(j=1; j<1+nral; j++)
1297                 {
1298                     if (iatoms[j] < 0)
1299                     {
1300                         iatoms[j] = gmx_hash_get_minone(ga2la_specat,-iatoms[j]-1);
1301                     }
1302                 }
1303             }
1304         }
1305     }
1306     
1307     return at_end;
1308 }
1309
1310 static gmx_domdec_specat_comm_t *specat_comm_init(int nthread)
1311 {
1312     gmx_domdec_specat_comm_t *spac;
1313
1314     snew(spac,1);
1315     spac->nthread = nthread;
1316     snew(spac->ireq,spac->nthread);
1317
1318     return spac;
1319 }
1320
1321 void init_domdec_constraints(gmx_domdec_t *dd,
1322                              gmx_mtop_t *mtop,
1323                              gmx_constr_t constr)
1324 {
1325     gmx_domdec_constraints_t *dc;
1326     gmx_molblock_t *molb;
1327     int mb,ncon,c,a;
1328     
1329     if (debug)
1330     {
1331         fprintf(debug,"Begin init_domdec_constraints\n");
1332     }
1333     
1334     snew(dd->constraints,1);
1335     dc = dd->constraints;
1336     
1337     snew(dc->molb_con_offset,mtop->nmolblock);
1338     snew(dc->molb_ncon_mol,mtop->nmolblock);
1339     
1340     ncon = 0;
1341     for(mb=0; mb<mtop->nmolblock; mb++)
1342     {
1343         molb = &mtop->molblock[mb];
1344         dc->molb_con_offset[mb] = ncon;
1345         dc->molb_ncon_mol[mb] =
1346             mtop->moltype[molb->type].ilist[F_CONSTR].nr/3 +
1347             mtop->moltype[molb->type].ilist[F_CONSTRNC].nr/3;
1348         ncon += molb->nmol*dc->molb_ncon_mol[mb];
1349     }
1350     
1351     if (ncon > 0)
1352     {
1353         snew(dc->gc_req,ncon);
1354         for(c=0; c<ncon; c++)
1355         {
1356             dc->gc_req[c] = 0;
1357         }
1358     }
1359
1360     /* Use a hash table for the global to local index.
1361      * The number of keys is a rough estimate, it will be optimized later.
1362      */
1363     dc->ga2la = gmx_hash_init(min(mtop->natoms/20,
1364                                   mtop->natoms/(2*dd->nnodes)));
1365
1366     dc->nthread = gmx_omp_nthreads_get(emntDomdec);
1367     snew(dc->ils,dc->nthread);
1368
1369     dd->constraint_comm = specat_comm_init(dc->nthread);
1370 }
1371
1372 void init_domdec_vsites(gmx_domdec_t *dd,int n_intercg_vsite)
1373 {
1374     int i;
1375     gmx_domdec_constraints_t *dc;
1376     
1377     if (debug)
1378     {
1379         fprintf(debug,"Begin init_domdec_vsites\n");
1380     }
1381     
1382     /* Use a hash table for the global to local index.
1383      * The number of keys is a rough estimate, it will be optimized later.
1384      */
1385     dd->ga2la_vsite = gmx_hash_init(min(n_intercg_vsite/20,
1386                                         n_intercg_vsite/(2*dd->nnodes)));
1387     
1388     dd->vsite_comm = specat_comm_init(1);
1389 }