added Verlet scheme and NxN non-bonded functionality
[alexxy/gromacs.git] / src / mdlib / domdec_con.c
1 /* -*- mode: c; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; c-file-style: "stroustrup"; -*-
2  *
3  * 
4  * This file is part of Gromacs        Copyright (c) 1991-2008
5  * David van der Spoel, Erik Lindahl, Berk Hess, University of Groningen.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License
9  * as published by the Free Software Foundation; either version 2
10  * of the License, or (at your option) any later version.
11  *
12  * To help us fund GROMACS development, we humbly ask that you cite
13  * the research papers on the package. Check out http://www.gromacs.org
14  * 
15  * And Hey:
16  * Gnomes, ROck Monsters And Chili Sauce
17  */
18
19 #ifdef HAVE_CONFIG_H
20 #include <config.h>
21 #endif
22 #include <assert.h>
23
24 #include "smalloc.h"
25 #include "vec.h"
26 #include "constr.h"
27 #include "domdec.h"
28 #include "domdec_network.h"
29 #include "mtop_util.h"
30 #include "gmx_ga2la.h"
31 #include "gmx_hash.h"
32 #include "gmx_omp_nthreads.h"
33
34 typedef struct {
35     int nsend;
36     int *a;
37     int a_nalloc;
38     int nrecv;
39 } gmx_specatsend_t;
40
41 typedef struct {
42     int *ind;
43     int nalloc;
44     int n;
45 } ind_req_t;
46
47 typedef struct gmx_domdec_specat_comm {
48     /* The number of indices to receive during the setup */
49     int  nreq[DIM][2][2];
50     /* The atoms to send */
51     gmx_specatsend_t spas[DIM][2];
52     gmx_bool *bSendAtom;
53     int   bSendAtom_nalloc;
54     /* Send buffers */
55     int  *ibuf;
56     int  ibuf_nalloc;
57     rvec *vbuf;
58     int  vbuf_nalloc;
59     rvec *vbuf2;
60     int  vbuf2_nalloc;
61     /* The range in the local buffer(s) for received atoms */
62     int  at_start;
63     int  at_end;
64
65     /* The atom indices we need from the surrounding cells.
66      * We can gather the indices over nthread threads.
67      */
68     int nthread;
69     ind_req_t *ireq;
70 } gmx_domdec_specat_comm_t;
71
72 typedef struct gmx_domdec_constraints {
73     int  *molb_con_offset;
74     int  *molb_ncon_mol;
75     /* The fully local and connected constraints */
76     int  ncon;
77     /* The global constraint number, only required for clearing gc_req */
78     int  *con_gl;
79     int  *con_nlocat;
80     int  con_nalloc;
81     /* Boolean that tells if a global constraint index has been requested */
82     char *gc_req;
83     /* Global to local communicated constraint atom only index */
84     gmx_hash_t ga2la;
85
86     /* Multi-threading stuff */
87     int nthread;
88     t_ilist *ils;
89 } gmx_domdec_constraints_t;
90
91
92 static void dd_move_f_specat(gmx_domdec_t *dd,gmx_domdec_specat_comm_t *spac,
93                              rvec *f,rvec *fshift)
94 {
95     gmx_specatsend_t *spas;
96     rvec *vbuf;
97     int  n,n0,n1,d,dim,dir,i;
98     ivec vis;
99     int  is;
100     gmx_bool bPBC,bScrew;
101     
102     n = spac->at_end;
103     for(d=dd->ndim-1; d>=0; d--)
104     {
105         dim = dd->dim[d];
106         if (dd->nc[dim] > 2)
107         {
108             /* Pulse the grid forward and backward */
109             spas = spac->spas[d];
110             n0 = spas[0].nrecv;
111             n1 = spas[1].nrecv;
112             n -= n1 + n0;
113             vbuf = spac->vbuf;
114             /* Send and receive the coordinates */
115             dd_sendrecv2_rvec(dd,d,
116                               f+n+n1,n0,vbuf              ,spas[0].nsend,
117                               f+n   ,n1,vbuf+spas[0].nsend,spas[1].nsend);
118             for(dir=0; dir<2; dir++)
119             {
120                 bPBC   = ((dir == 0 && dd->ci[dim] == 0) || 
121                           (dir == 1 && dd->ci[dim] == dd->nc[dim]-1));
122                 bScrew = (bPBC && dd->bScrewPBC && dim == XX);
123                 
124                 spas = &spac->spas[d][dir];
125                 /* Sum the buffer into the required forces */
126                 if (!bPBC || (!bScrew && fshift == NULL))
127                 {
128                     for(i=0; i<spas->nsend; i++)
129                     {
130                         rvec_inc(f[spas->a[i]],*vbuf);
131                         vbuf++;
132                     }
133                 }
134                 else
135                 {
136                     clear_ivec(vis);
137                     vis[dim] = (dir==0 ? 1 : -1);
138                     is = IVEC2IS(vis);
139                     if (!bScrew)
140                     {
141                         /* Sum and add to shift forces */
142                         for(i=0; i<spas->nsend; i++)
143                         {
144                             rvec_inc(f[spas->a[i]],*vbuf);
145                             rvec_inc(fshift[is],*vbuf);
146                             vbuf++;
147                         }
148                     }
149                     else
150                     {       
151                         /* Rotate the forces */
152                         for(i=0; i<spas->nsend; i++)
153                         {
154                             f[spas->a[i]][XX] += (*vbuf)[XX];
155                             f[spas->a[i]][YY] -= (*vbuf)[YY];
156                             f[spas->a[i]][ZZ] -= (*vbuf)[ZZ];
157                             if (fshift)
158                             {
159                                 rvec_inc(fshift[is],*vbuf);
160                             }
161                             vbuf++;
162                         }
163                     }
164                 }
165             }
166         }
167         else
168         {
169             /* Two cells, so we only need to communicate one way */
170             spas = &spac->spas[d][0];
171             n -= spas->nrecv;
172             /* Send and receive the coordinates */
173             dd_sendrecv_rvec(dd,d,dddirForward,
174                              f+n,spas->nrecv,spac->vbuf,spas->nsend);
175             /* Sum the buffer into the required forces */
176             if (dd->bScrewPBC && dim == XX &&
177                 (dd->ci[dim] == 0 ||
178                  dd->ci[dim] == dd->nc[dim]-1))
179             {
180                 for(i=0; i<spas->nsend; i++)
181                 {
182                     /* Rotate the force */
183                     f[spas->a[i]][XX] += spac->vbuf[i][XX];
184                     f[spas->a[i]][YY] -= spac->vbuf[i][YY];
185                     f[spas->a[i]][ZZ] -= spac->vbuf[i][ZZ];
186                 }
187             }
188             else
189             {
190                 for(i=0; i<spas->nsend; i++)
191                 {
192                     rvec_inc(f[spas->a[i]],spac->vbuf[i]);
193                 }
194             }
195         }
196     }
197 }
198
199 void dd_move_f_vsites(gmx_domdec_t *dd,rvec *f,rvec *fshift)
200 {
201     if (dd->vsite_comm)
202     {
203         dd_move_f_specat(dd,dd->vsite_comm,f,fshift);
204     }
205 }
206
207 void dd_clear_f_vsites(gmx_domdec_t *dd,rvec *f)
208 {
209     int i;
210     
211     if (dd->vsite_comm)
212     {
213         for(i=dd->vsite_comm->at_start; i<dd->vsite_comm->at_end; i++)
214         {
215             clear_rvec(f[i]);
216         }
217     }
218 }
219
220 static void dd_move_x_specat(gmx_domdec_t *dd,gmx_domdec_specat_comm_t *spac,
221                              matrix box,rvec *x0,rvec *x1)
222 {
223     gmx_specatsend_t *spas;
224     rvec *x,*vbuf,*rbuf;
225     int  nvec,v,n,nn,ns0,ns1,nr0,nr1,nr,d,dim,dir,i;
226     gmx_bool bPBC,bScrew=FALSE;
227     rvec shift={0,0,0};
228     
229     nvec = 1;
230     if (x1)
231     {
232         nvec++;
233     }
234     
235     n = spac->at_start;
236     for(d=0; d<dd->ndim; d++)
237     {
238         dim = dd->dim[d];
239         if (dd->nc[dim] > 2)
240         {
241             /* Pulse the grid forward and backward */
242             vbuf = spac->vbuf;
243             for(dir=0; dir<2; dir++)
244             {
245                 if (dir == 0 && dd->ci[dim] == 0)
246                 {
247                     bPBC   = TRUE;
248                     bScrew = (dd->bScrewPBC && dim == XX);
249                     copy_rvec(box[dim],shift);
250                 }
251                 else if (dir == 1 && dd->ci[dim] == dd->nc[dim]-1)
252                 {
253                     bPBC = TRUE;
254                     bScrew = (dd->bScrewPBC && dim == XX);
255                     for(i=0; i<DIM; i++)
256                     {
257                         shift[i] = -box[dim][i];
258                     }
259                 }
260                 else
261                 {
262                     bPBC = FALSE;
263                     bScrew = FALSE;
264                 }
265                 spas = &spac->spas[d][dir];
266                 for(v=0; v<nvec; v++)
267                 {
268                     x = (v == 0 ? x0 : x1);
269                     /* Copy the required coordinates to the send buffer */
270                     if (!bPBC)
271                     {
272                         /* Only copy */
273                         for(i=0; i<spas->nsend; i++)
274                         {
275                             copy_rvec(x[spas->a[i]],*vbuf);
276                             vbuf++;
277                         }
278                     }
279                     else if (!bScrew)
280                     {
281                         /* Shift coordinates */
282                         for(i=0; i<spas->nsend; i++)
283                         {
284                             rvec_add(x[spas->a[i]],shift,*vbuf);
285                             vbuf++;
286                         }
287                     }
288                     else
289                     {
290                         /* Shift and rotate coordinates */
291                         for(i=0; i<spas->nsend; i++)
292                         {
293                             (*vbuf)[XX] =               x[spas->a[i]][XX] + shift[XX];
294                             (*vbuf)[YY] = box[YY][YY] - x[spas->a[i]][YY] + shift[YY];
295                             (*vbuf)[ZZ] = box[ZZ][ZZ] - x[spas->a[i]][ZZ] + shift[ZZ];
296                             vbuf++;
297                         }
298                     }
299                 }
300             }
301             /* Send and receive the coordinates */
302             spas = spac->spas[d];
303             ns0 = spas[0].nsend;
304             nr0 = spas[0].nrecv;
305             ns1 = spas[1].nsend;
306             nr1 = spas[1].nrecv;
307             if (nvec == 1)
308             {
309                 dd_sendrecv2_rvec(dd,d,
310                                   spac->vbuf+ns0,ns1,x0+n    ,nr1,
311                                   spac->vbuf    ,ns0,x0+n+nr1,nr0);
312             }
313             else
314             {
315                 /* Communicate both vectors in one buffer */
316                 rbuf = spac->vbuf2;
317                 dd_sendrecv2_rvec(dd,d,
318                                   spac->vbuf+2*ns0,2*ns1,rbuf      ,2*nr1,
319                                   spac->vbuf      ,2*ns0,rbuf+2*nr1,2*nr0);
320                 /* Split the buffer into the two vectors */
321                 nn = n;
322                 for(dir=1; dir>=0; dir--)
323                 {
324                     nr = spas[dir].nrecv;
325                     for(v=0; v<2; v++)
326                     {
327                         x = (v == 0 ? x0 : x1);
328                         for(i=0; i<nr; i++)
329                         {
330                             copy_rvec(*rbuf,x[nn+i]);
331                             rbuf++;
332                         }
333                     }
334                     nn += nr;
335                 }
336             }
337             n += nr0 + nr1;
338         }
339         else
340         {
341             spas = &spac->spas[d][0];
342             /* Copy the required coordinates to the send buffer */
343             vbuf = spac->vbuf;
344             for(v=0; v<nvec; v++)
345             {
346                 x = (v == 0 ? x0 : x1);
347                 if (dd->bScrewPBC && dim == XX &&
348                     (dd->ci[XX] == 0 || dd->ci[XX] == dd->nc[XX]-1))
349                 {
350                     /* Here we only perform the rotation, the rest of the pbc
351                      * is handled in the constraint or viste routines.
352                      */
353                     for(i=0; i<spas->nsend; i++)
354                     {
355                         (*vbuf)[XX] =               x[spas->a[i]][XX];
356                         (*vbuf)[YY] = box[YY][YY] - x[spas->a[i]][YY];
357                         (*vbuf)[ZZ] = box[ZZ][ZZ] - x[spas->a[i]][ZZ];
358                         vbuf++;
359                     }     
360                 }
361                 else
362                 {
363                     for(i=0; i<spas->nsend; i++)
364                     {
365                         copy_rvec(x[spas->a[i]],*vbuf);
366                         vbuf++;
367                     }
368                 }
369             }
370             /* Send and receive the coordinates */
371             if (nvec == 1)
372             {
373                 dd_sendrecv_rvec(dd,d,dddirBackward,
374                                  spac->vbuf,spas->nsend,x0+n,spas->nrecv);
375             }
376             else
377             {
378                 /* Communicate both vectors in one buffer */
379                 rbuf = spac->vbuf2;
380                 dd_sendrecv_rvec(dd,d,dddirBackward,
381                                  spac->vbuf,2*spas->nsend,rbuf,2*spas->nrecv);
382                 /* Split the buffer into the two vectors */
383                 nr = spas[0].nrecv;
384                 for(v=0; v<2; v++)
385                 {
386                     x = (v == 0 ? x0 : x1);
387                     for(i=0; i<nr; i++)
388                     {
389                         copy_rvec(*rbuf,x[n+i]);
390                         rbuf++;
391                     }
392                 }
393             }
394             n += spas->nrecv;
395         }
396     }
397 }
398
399 void dd_move_x_constraints(gmx_domdec_t *dd,matrix box,rvec *x0,rvec *x1)
400 {
401     if (dd->constraint_comm)
402     {
403         dd_move_x_specat(dd,dd->constraint_comm,box,x0,x1);
404     }
405 }
406
407 void dd_move_x_vsites(gmx_domdec_t *dd,matrix box,rvec *x)
408 {
409     if (dd->vsite_comm)
410     {
411         dd_move_x_specat(dd,dd->vsite_comm,box,x,NULL);
412     }
413 }
414
415 int *dd_constraints_nlocalatoms(gmx_domdec_t *dd)
416 {
417     if (dd->constraints)
418     {
419         return dd->constraints->con_nlocat;
420     }
421     else
422     {
423         return NULL;
424     }
425 }
426
427 void dd_clear_local_constraint_indices(gmx_domdec_t *dd)
428 {
429     gmx_domdec_constraints_t *dc;
430     int i;
431     
432     dc = dd->constraints;
433     
434     for(i=0; i<dc->ncon; i++)
435     {
436         dc->gc_req[dc->con_gl[i]] = 0;
437     }
438   
439     if (dd->constraint_comm)
440     {
441         gmx_hash_clear_and_optimize(dc->ga2la);
442     }
443 }
444
445 void dd_clear_local_vsite_indices(gmx_domdec_t *dd)
446 {
447     int i;
448     
449     if (dd->vsite_comm)
450     {
451         gmx_hash_clear_and_optimize(dd->ga2la_vsite);
452     }
453 }
454
455 static int setup_specat_communication(gmx_domdec_t *dd,
456                                       ind_req_t *ireq,
457                                       gmx_domdec_specat_comm_t *spac,
458                                       gmx_hash_t ga2la_specat,
459                                       int at_start,
460                                       int vbuf_fac,
461                                       const char *specat_type,
462                                       const char *add_err)
463 {
464     int  nsend[2],nlast,nsend_zero[2]={0,0},*nsend_ptr;
465     int  d,dim,ndir,dir,nr,ns,i,nrecv_local,n0,start,indr,ind,buf[2];
466     int  nat_tot_specat,nat_tot_prev,nalloc_old;
467     gmx_bool bPBC,bFirst;
468     gmx_specatsend_t *spas;
469     
470     if (debug)
471     {
472         fprintf(debug,"Begin setup_specat_communication for %s\n",specat_type);
473     }
474     
475     /* nsend[0]: the number of atoms requested by this node only,
476      *           we communicate this for more efficients checks
477      * nsend[1]: the total number of requested atoms
478      */
479     nsend[0] = ireq->n;
480     nsend[1] = nsend[0];
481     nlast    = nsend[1];
482     for(d=dd->ndim-1; d>=0; d--)
483     {
484         /* Pulse the grid forward and backward */
485         dim = dd->dim[d];
486         bPBC = (dim < dd->npbcdim);
487         if (dd->nc[dim] == 2)
488         {
489             /* Only 2 cells, so we only need to communicate once */
490             ndir = 1;
491         }
492         else
493         {
494             ndir = 2;
495         }
496         for(dir=0; dir<ndir; dir++)
497         {
498             if (!bPBC && 
499                 dd->nc[dim] > 2 &&
500                 ((dir == 0 && dd->ci[dim] == dd->nc[dim] - 1) ||
501                  (dir == 1 && dd->ci[dim] == 0)))
502             {
503                 /* No pbc: the fist/last cell should not request atoms */
504                 nsend_ptr = nsend_zero;
505             }
506             else
507             {
508                 nsend_ptr = nsend;
509             }
510             /* Communicate the number of indices */
511             dd_sendrecv_int(dd,d,dir==0 ? dddirForward : dddirBackward,
512                             nsend_ptr,2,spac->nreq[d][dir],2);
513             nr = spac->nreq[d][dir][1];
514             if (nlast+nr > ireq->nalloc)
515             {
516                 ireq->nalloc = over_alloc_dd(nlast+nr);
517                 srenew(ireq->ind,ireq->nalloc);
518             }
519             /* Communicate the indices */
520             dd_sendrecv_int(dd,d,dir==0 ? dddirForward : dddirBackward,
521                             ireq->ind,nsend_ptr[1],ireq->ind+nlast,nr);
522             nlast += nr;
523         }
524         nsend[1] = nlast;
525     }
526     if (debug)
527     {
528         fprintf(debug,"Communicated the counts\n");
529     }
530     
531     /* Search for the requested atoms and communicate the indices we have */
532     nat_tot_specat = at_start;
533     nrecv_local = 0;
534     for(d=0; d<dd->ndim; d++)
535     {
536         bFirst = (d == 0);
537         /* Pulse the grid forward and backward */
538         if (dd->dim[d] >= dd->npbcdim || dd->nc[dd->dim[d]] > 2)
539         {
540             ndir = 2;
541         }
542         else
543         {
544             ndir = 1;
545         }
546         nat_tot_prev = nat_tot_specat;
547         for(dir=ndir-1; dir>=0; dir--)
548         {
549             if (nat_tot_specat > spac->bSendAtom_nalloc)
550             {
551                 nalloc_old = spac->bSendAtom_nalloc;
552                 spac->bSendAtom_nalloc = over_alloc_dd(nat_tot_specat);
553                 srenew(spac->bSendAtom,spac->bSendAtom_nalloc);
554                 for(i=nalloc_old; i<spac->bSendAtom_nalloc; i++)
555                 {
556                     spac->bSendAtom[i] = FALSE;
557                 }
558             }
559             spas = &spac->spas[d][dir];
560             n0 = spac->nreq[d][dir][0];
561             nr = spac->nreq[d][dir][1];
562             if (debug)
563             {
564                 fprintf(debug,"dim=%d, dir=%d, searching for %d atoms\n",
565                         d,dir,nr);
566             }
567             start = nlast - nr;
568             spas->nsend = 0;
569             nsend[0] = 0;
570             for(i=0; i<nr; i++)
571             {
572                 indr = ireq->ind[start+i];
573                 ind = -1;
574                 /* Check if this is a home atom and if so ind will be set */
575                 if (!ga2la_get_home(dd->ga2la,indr,&ind))
576                 {
577                     /* Search in the communicated atoms */
578                     ind = gmx_hash_get_minone(ga2la_specat,indr);
579                 }
580                 if (ind >= 0)
581                 {
582                     if (i < n0 || !spac->bSendAtom[ind])
583                     {
584                         if (spas->nsend+1 > spas->a_nalloc)
585                         {
586                             spas->a_nalloc = over_alloc_large(spas->nsend+1);
587                             srenew(spas->a,spas->a_nalloc);
588                         }
589                         /* Store the local index so we know which coordinates
590                          * to send out later.
591                          */
592                         spas->a[spas->nsend] = ind;
593                         spac->bSendAtom[ind] = TRUE;
594                         if (spas->nsend+1 > spac->ibuf_nalloc)
595                         {
596                             spac->ibuf_nalloc = over_alloc_large(spas->nsend+1);
597                             srenew(spac->ibuf,spac->ibuf_nalloc);
598                         }
599                         /* Store the global index so we can send it now */
600                         spac->ibuf[spas->nsend] = indr;
601                         if (i < n0)
602                         {
603                             nsend[0]++;
604                         }
605                         spas->nsend++;
606                     }
607                 }
608             }
609             nlast = start;
610             /* Clear the local flags */
611             for(i=0; i<spas->nsend; i++)
612             {
613                 spac->bSendAtom[spas->a[i]] = FALSE;
614             }
615             /* Send and receive the number of indices to communicate */
616             nsend[1] = spas->nsend;
617             dd_sendrecv_int(dd,d,dir==0 ? dddirBackward : dddirForward,
618                             nsend,2,buf,2);
619             if (debug)
620             {
621                 fprintf(debug,"Send to node %d, %d (%d) indices, "
622                         "receive from node %d, %d (%d) indices\n",
623                         dd->neighbor[d][1-dir],nsend[1],nsend[0],
624                         dd->neighbor[d][dir],buf[1],buf[0]);
625                 if (gmx_debug_at)
626                 {
627                     for(i=0; i<spas->nsend; i++)
628                     {
629                         fprintf(debug," %d",spac->ibuf[i]+1);
630                     }
631                     fprintf(debug,"\n");
632                 }
633             }
634             nrecv_local += buf[0];
635             spas->nrecv  = buf[1];
636             if (nat_tot_specat + spas->nrecv > dd->gatindex_nalloc)
637             {
638                 dd->gatindex_nalloc =
639                     over_alloc_dd(nat_tot_specat + spas->nrecv);
640                 srenew(dd->gatindex,dd->gatindex_nalloc);
641             }
642             /* Send and receive the indices */
643             dd_sendrecv_int(dd,d,dir==0 ? dddirBackward : dddirForward,
644                             spac->ibuf,spas->nsend,
645                             dd->gatindex+nat_tot_specat,spas->nrecv);
646             nat_tot_specat += spas->nrecv;
647         }
648         
649         /* Allocate the x/f communication buffers */
650         ns = spac->spas[d][0].nsend;
651         nr = spac->spas[d][0].nrecv;
652         if (ndir == 2)
653         {
654             ns += spac->spas[d][1].nsend;
655             nr += spac->spas[d][1].nrecv;
656         }
657         if (vbuf_fac*ns > spac->vbuf_nalloc)
658         {
659             spac->vbuf_nalloc = over_alloc_dd(vbuf_fac*ns);
660             srenew(spac->vbuf,spac->vbuf_nalloc);
661         }
662         if (vbuf_fac == 2 && vbuf_fac*nr > spac->vbuf2_nalloc)
663         {
664             spac->vbuf2_nalloc = over_alloc_dd(vbuf_fac*nr);
665             srenew(spac->vbuf2,spac->vbuf2_nalloc);
666         }
667         
668         /* Make a global to local index for the communication atoms */
669         for(i=nat_tot_prev; i<nat_tot_specat; i++)
670         {
671             gmx_hash_change_or_set(ga2la_specat,dd->gatindex[i],i);
672         }
673     }
674     
675     /* Check that in the end we got the number of atoms we asked for */
676     if (nrecv_local != ireq->n)
677     {
678         if (debug)
679         {
680             fprintf(debug,"Requested %d, received %d (tot recv %d)\n",
681                     ireq->n,nrecv_local,nat_tot_specat-at_start);
682             if (gmx_debug_at)
683             {
684                 for(i=0; i<ireq->n; i++)
685                 {
686                     ind = gmx_hash_get_minone(ga2la_specat,ireq->ind[i]);
687                     fprintf(debug," %s%d",
688                             (ind >= 0) ? "" : "!",
689                             ireq->ind[i]+1);
690                 }
691                 fprintf(debug,"\n");
692             }
693         }
694         fprintf(stderr,"\nDD cell %d %d %d: Neighboring cells do not have atoms:",
695                 dd->ci[XX],dd->ci[YY],dd->ci[ZZ]);
696         for(i=0; i<ireq->n; i++)
697         {
698             if (gmx_hash_get_minone(ga2la_specat,ireq->ind[i]) < 0)
699             {
700                 fprintf(stderr," %d",ireq->ind[i]+1);
701             }
702         }
703         fprintf(stderr,"\n");
704         gmx_fatal(FARGS,"DD cell %d %d %d could only obtain %d of the %d atoms that are connected via %ss from the neighboring cells. This probably means your %s lengths are too long compared to the domain decomposition cell size. Decrease the number of domain decomposition grid cells%s%s.",
705                   dd->ci[XX],dd->ci[YY],dd->ci[ZZ],
706                   nrecv_local,ireq->n,specat_type,
707                   specat_type,add_err,
708                   dd->bGridJump ? " or use the -rcon option of mdrun" : "");
709     }
710     
711     spac->at_start = at_start;
712     spac->at_end   = nat_tot_specat;
713     
714     if (debug)
715     {
716         fprintf(debug,"Done setup_specat_communication\n");
717     }
718     
719     return nat_tot_specat;
720 }
721
722 static void walk_out(int con,int con_offset,int a,int offset,int nrec,
723                      int ncon1,const t_iatom *ia1,const t_iatom *ia2,
724                      const t_blocka *at2con,
725                      const gmx_ga2la_t ga2la,gmx_bool bHomeConnect,
726                      gmx_domdec_constraints_t *dc,
727                      gmx_domdec_specat_comm_t *dcc,
728                      t_ilist *il_local,
729                      ind_req_t *ireq)
730 {
731     int a1_gl,a2_gl,a_loc,i,coni,b;
732     const t_iatom *iap;
733   
734     if (dc->gc_req[con_offset+con] == 0)
735     {
736         /* Add this non-home constraint to the list */
737         if (dc->ncon+1 > dc->con_nalloc)
738         {
739             dc->con_nalloc = over_alloc_large(dc->ncon+1);
740             srenew(dc->con_gl,dc->con_nalloc);
741             srenew(dc->con_nlocat,dc->con_nalloc);
742         }
743         dc->con_gl[dc->ncon] = con_offset + con;
744         dc->con_nlocat[dc->ncon] = (bHomeConnect ? 1 : 0);
745         dc->gc_req[con_offset+con] = 1;
746         if (il_local->nr + 3 > il_local->nalloc)
747         {
748             il_local->nalloc = over_alloc_dd(il_local->nr+3);
749             srenew(il_local->iatoms,il_local->nalloc);
750         }
751         iap = constr_iatomptr(ncon1,ia1,ia2,con);
752         il_local->iatoms[il_local->nr++] = iap[0];
753         a1_gl = offset + iap[1];
754         a2_gl = offset + iap[2];
755         /* The following indexing code can probably be optizimed */
756         if (ga2la_get_home(ga2la,a1_gl,&a_loc))
757         {
758             il_local->iatoms[il_local->nr++] = a_loc;
759         }
760         else
761         {
762             /* We set this index later */
763             il_local->iatoms[il_local->nr++] = -a1_gl - 1;
764         }
765         if (ga2la_get_home(ga2la,a2_gl,&a_loc))
766         {
767             il_local->iatoms[il_local->nr++] = a_loc;
768         }
769         else
770         {
771             /* We set this index later */
772             il_local->iatoms[il_local->nr++] = -a2_gl - 1;
773         }
774         dc->ncon++;
775     }
776     /* Check to not ask for the same atom more than once */
777     if (gmx_hash_get_minone(dc->ga2la,offset+a) == -1)
778     {
779         assert(dcc);
780         /* Add this non-home atom to the list */
781         if (ireq->n+1 > ireq->nalloc)
782         {
783             ireq->nalloc = over_alloc_large(ireq->n+1);
784             srenew(ireq->ind,ireq->nalloc);
785         }
786         ireq->ind[ireq->n++] = offset + a;
787         /* Temporarily mark with -2, we get the index later */
788         gmx_hash_set(dc->ga2la,offset+a,-2);
789     }
790     
791     if (nrec > 0)
792     {
793         for(i=at2con->index[a]; i<at2con->index[a+1]; i++)
794         {
795             coni = at2con->a[i];
796             if (coni != con)
797             {
798                 /* Walk further */
799                 iap = constr_iatomptr(ncon1,ia1,ia2,coni);
800                 if (a == iap[1])
801                 {
802                     b = iap[2];
803                 }
804                 else
805                 {
806                     b = iap[1];
807                 }
808                 if (!ga2la_get_home(ga2la,offset+b,&a_loc))
809                 {
810                     walk_out(coni,con_offset,b,offset,nrec-1,
811                              ncon1,ia1,ia2,at2con,
812                              ga2la,FALSE,dc,dcc,il_local,ireq);
813                 }
814             }
815         }
816     }
817 }
818
819 static void atoms_to_settles(gmx_domdec_t *dd,
820                              const gmx_mtop_t *mtop,
821                              const int *cginfo,
822                              const int **at2settle_mt,
823                              int cg_start,int cg_end,
824                              t_ilist *ils_local,
825                              ind_req_t *ireq)
826 {
827     gmx_ga2la_t ga2la;
828     gmx_mtop_atomlookup_t alook;
829     int settle;
830     int nral,sa;
831     int cg,a,a_gl,a_glsa,a_gls[3],a_locs[3];
832     int mb,molnr,a_mol,offset;
833     const gmx_molblock_t *molb;
834     const t_iatom *ia1;
835     gmx_bool a_home[3];
836     int nlocal;
837     gmx_bool bAssign;
838
839     ga2la  = dd->ga2la;
840
841     alook = gmx_mtop_atomlookup_settle_init(mtop);
842
843     nral = NRAL(F_SETTLE);
844
845     for(cg=cg_start; cg<cg_end; cg++)
846     {
847         if (GET_CGINFO_SETTLE(cginfo[cg]))
848         {
849             for(a=dd->cgindex[cg]; a<dd->cgindex[cg+1]; a++)
850             {
851                 a_gl = dd->gatindex[a];
852                 
853                 gmx_mtop_atomnr_to_molblock_ind(alook,a_gl,&mb,&molnr,&a_mol);
854                 molb = &mtop->molblock[mb];
855
856                 settle = at2settle_mt[molb->type][a_mol];
857
858                 if (settle >= 0)
859                 {
860                     offset = a_gl - a_mol;
861
862                     ia1 = mtop->moltype[molb->type].ilist[F_SETTLE].iatoms;
863
864                     bAssign = FALSE;
865                     nlocal = 0;
866                     for(sa=0; sa<nral; sa++)
867                     {
868                         a_glsa = offset + ia1[settle*(1+nral)+1+sa];
869                         a_gls[sa] = a_glsa;
870                         a_home[sa] = ga2la_get_home(ga2la,a_glsa,&a_locs[sa]);
871                         if (a_home[sa])
872                         {
873                             if (nlocal == 0 && a_gl == a_glsa)
874                             {
875                                 bAssign = TRUE;
876                             }
877                             nlocal++;
878                         }
879                     }
880
881                     if (bAssign)
882                     {
883                         if (ils_local->nr+1+nral > ils_local->nalloc)
884                         {
885                             ils_local->nalloc = over_alloc_dd(ils_local->nr+1+nral);
886                             srenew(ils_local->iatoms,ils_local->nalloc);
887                         }
888
889                         ils_local->iatoms[ils_local->nr++] = ia1[settle*4];
890
891                         for(sa=0; sa<nral; sa++)
892                         {
893                             if (ga2la_get_home(ga2la,a_gls[sa],&a_locs[sa]))
894                             {
895                                 ils_local->iatoms[ils_local->nr++] = a_locs[sa];
896                             }
897                             else
898                             {
899                                 ils_local->iatoms[ils_local->nr++] = -a_gls[sa] - 1;
900                                 /* Add this non-home atom to the list */
901                                 if (ireq->n+1 > ireq->nalloc)
902                                 {
903                                     ireq->nalloc = over_alloc_large(ireq->n+1);
904                                     srenew(ireq->ind,ireq->nalloc);
905                                 }
906                                 ireq->ind[ireq->n++] = a_gls[sa];
907                                 /* A check on double atom requests is
908                                  * not required for settle.
909                                  */
910                             }
911                         }
912                     }
913                 }
914             }
915         }
916     }
917
918     gmx_mtop_atomlookup_destroy(alook);
919 }
920
921 static void atoms_to_constraints(gmx_domdec_t *dd,
922                                  const gmx_mtop_t *mtop,
923                                  const int *cginfo,
924                                  const t_blocka *at2con_mt,int nrec,
925                                  t_ilist *ilc_local,
926                                  ind_req_t *ireq)
927 {
928     const t_blocka *at2con;
929     gmx_ga2la_t ga2la;
930     gmx_mtop_atomlookup_t alook;
931     int ncon1;
932     gmx_molblock_t *molb;
933     t_iatom *ia1,*ia2,*iap;
934     int nhome,cg,a,a_gl,a_mol,a_loc,b_lo,offset,mb,molnr,b_mol,i,con,con_offset;
935     gmx_domdec_constraints_t *dc;
936     gmx_domdec_specat_comm_t *dcc;
937     
938     dc  = dd->constraints;
939     dcc = dd->constraint_comm;
940     
941     ga2la  = dd->ga2la;
942
943     alook = gmx_mtop_atomlookup_init(mtop);
944
945     nhome = 0;
946     for(cg=0; cg<dd->ncg_home; cg++)
947     {
948         if (GET_CGINFO_CONSTR(cginfo[cg]))
949         {
950             for(a=dd->cgindex[cg]; a<dd->cgindex[cg+1]; a++)
951             {
952                 a_gl = dd->gatindex[a];
953         
954                 gmx_mtop_atomnr_to_molblock_ind(alook,a_gl,&mb,&molnr,&a_mol);
955                 molb = &mtop->molblock[mb];
956         
957                 ncon1 = mtop->moltype[molb->type].ilist[F_CONSTR].nr/NRAL(F_SETTLE);
958
959                 ia1 = mtop->moltype[molb->type].ilist[F_CONSTR].iatoms;
960                 ia2 = mtop->moltype[molb->type].ilist[F_CONSTRNC].iatoms;
961
962                 /* Calculate the global constraint number offset for the molecule.
963                  * This is only required for the global index to make sure
964                  * that we use each constraint only once.
965                  */
966                 con_offset =
967                     dc->molb_con_offset[mb] + molnr*dc->molb_ncon_mol[mb];
968             
969                 /* The global atom number offset for this molecule */
970                 offset = a_gl - a_mol;
971                 at2con = &at2con_mt[molb->type];
972                 for(i=at2con->index[a_mol]; i<at2con->index[a_mol+1]; i++)
973                 {
974                     con = at2con->a[i];
975                     iap = constr_iatomptr(ncon1,ia1,ia2,con);
976                     if (a_mol == iap[1])
977                     {
978                         b_mol = iap[2];
979                     }
980                     else
981                     {
982                         b_mol = iap[1];
983                     }
984                     if (ga2la_get_home(ga2la,offset+b_mol,&a_loc))
985                     {
986                         /* Add this fully home constraint at the first atom */
987                         if (a_mol < b_mol)
988                         {
989                             if (dc->ncon+1 > dc->con_nalloc)
990                             {
991                                 dc->con_nalloc = over_alloc_large(dc->ncon+1);
992                                 srenew(dc->con_gl,dc->con_nalloc);
993                                 srenew(dc->con_nlocat,dc->con_nalloc);
994                             }
995                             dc->con_gl[dc->ncon] = con_offset + con;
996                             dc->con_nlocat[dc->ncon] = 2;
997                             if (ilc_local->nr + 3 > ilc_local->nalloc)
998                             {
999                                 ilc_local->nalloc = over_alloc_dd(ilc_local->nr + 3);
1000                                 srenew(ilc_local->iatoms,ilc_local->nalloc);
1001                             }
1002                             b_lo = a_loc;
1003                             ilc_local->iatoms[ilc_local->nr++] = iap[0];
1004                             ilc_local->iatoms[ilc_local->nr++] = (a_gl == iap[1] ? a    : b_lo);
1005                             ilc_local->iatoms[ilc_local->nr++] = (a_gl == iap[1] ? b_lo : a   );
1006                             dc->ncon++;
1007                             nhome++;
1008                         }
1009                     }
1010                     else
1011                     {
1012                         /* We need the nrec constraints coupled to this constraint,
1013                          * so we need to walk out of the home cell by nrec+1 atoms,
1014                          * since already atom bg is not locally present.
1015                          * Therefore we call walk_out with nrec recursions to go
1016                          * after this first call.
1017                          */
1018                         walk_out(con,con_offset,b_mol,offset,nrec,
1019                                  ncon1,ia1,ia2,at2con,
1020                                  dd->ga2la,TRUE,dc,dcc,ilc_local,ireq);
1021                     }
1022                 }
1023             }
1024         }
1025     }
1026
1027     gmx_mtop_atomlookup_destroy(alook);
1028
1029     if (debug)
1030     {
1031         fprintf(debug,
1032                 "Constraints: home %3d border %3d atoms: %3d\n",
1033                 nhome,dc->ncon-nhome,
1034                 dd->constraint_comm ? ireq->n : 0);
1035     }
1036 }
1037
1038 int dd_make_local_constraints(gmx_domdec_t *dd,int at_start,
1039                               const gmx_mtop_t *mtop,
1040                               const int *cginfo,
1041                               gmx_constr_t constr,int nrec,
1042                               t_ilist *il_local)
1043 {
1044     gmx_domdec_constraints_t *dc;
1045     t_ilist *ilc_local,*ils_local;
1046     ind_req_t *ireq;
1047     const t_blocka *at2con_mt;
1048     const int **at2settle_mt;
1049     gmx_hash_t ga2la_specat;
1050     int at_end,i,j;
1051     t_iatom *iap;
1052     
1053     dc = dd->constraints;
1054     
1055     ilc_local = &il_local[F_CONSTR];
1056     ils_local = &il_local[F_SETTLE];
1057
1058     dc->ncon      = 0;
1059     ilc_local->nr = 0;
1060     if (dd->constraint_comm)
1061     {
1062         at2con_mt = atom2constraints_moltype(constr);
1063         ireq = &dd->constraint_comm->ireq[0];
1064         ireq->n = 0;
1065     }
1066     else
1067     {
1068         at2con_mt = NULL;
1069         ireq = NULL;
1070     }
1071
1072     if (dd->bInterCGsettles)
1073     {
1074         at2settle_mt = atom2settle_moltype(constr);
1075         ils_local->nr = 0;
1076     }
1077     else
1078     {
1079         /* Settle works inside charge groups, we assigned them already */
1080         at2settle_mt = NULL;
1081     }
1082
1083     if (at2settle_mt == NULL)
1084     {
1085         atoms_to_constraints(dd,mtop,cginfo,at2con_mt,nrec,
1086                              ilc_local,ireq);
1087     }
1088     else
1089     {
1090         int t0_set;
1091         int thread;
1092
1093         /* Do the constraints, if present, on the first thread.
1094          * Do the settles on all other threads.
1095          */
1096         t0_set = ((at2con_mt != NULL && dc->nthread > 1) ? 1 : 0);
1097
1098 #pragma omp parallel for num_threads(dc->nthread) schedule(static)
1099         for(thread=0; thread<dc->nthread; thread++)
1100         {
1101             if (at2con_mt && thread == 0)
1102             {
1103                 atoms_to_constraints(dd,mtop,cginfo,at2con_mt,nrec,
1104                                      ilc_local,ireq);
1105             }
1106
1107             if (thread >= t0_set)
1108             {
1109                 int cg0,cg1;
1110                 t_ilist *ilst;
1111                 ind_req_t *ireqt;
1112
1113                 /* Distribute the settle check+assignments over
1114                  * dc->nthread or dc->nthread-1 threads.
1115                  */
1116                 cg0 = (dd->ncg_home*(thread-t0_set  ))/(dc->nthread-t0_set);
1117                 cg1 = (dd->ncg_home*(thread-t0_set+1))/(dc->nthread-t0_set);
1118
1119                 if (thread == t0_set)
1120                 {
1121                     ilst = ils_local;
1122                 }
1123                 else
1124                 {
1125                     ilst = &dc->ils[thread];
1126                 }
1127                 ilst->nr = 0;
1128
1129                 ireqt = &dd->constraint_comm->ireq[thread];
1130                 if (thread > 0)
1131                 {
1132                     ireqt->n = 0;
1133                 }
1134
1135                 atoms_to_settles(dd,mtop,cginfo,at2settle_mt,
1136                                  cg0,cg1,
1137                                  ilst,ireqt);
1138             }
1139         }
1140
1141         /* Combine the generate settles and requested indices */
1142         for(thread=1; thread<dc->nthread; thread++)
1143         {
1144             t_ilist *ilst;
1145             ind_req_t *ireqt;
1146             int ia;
1147
1148             if (thread > t0_set)
1149             {
1150                 ilst = &dc->ils[thread];
1151                 if (ils_local->nr + ilst->nr > ils_local->nalloc)
1152                 {
1153                     ils_local->nalloc = over_alloc_large(ils_local->nr + ilst->nr);
1154                     srenew(ils_local->iatoms,ils_local->nalloc);
1155                 }
1156                 for(ia=0; ia<ilst->nr; ia++)
1157                 {
1158                     ils_local->iatoms[ils_local->nr+ia] = ilst->iatoms[ia];
1159                 }
1160                 ils_local->nr += ilst->nr;
1161             }
1162
1163             ireqt = &dd->constraint_comm->ireq[thread];
1164             if (ireq->n+ireqt->n > ireq->nalloc)
1165             {
1166                 ireq->nalloc = over_alloc_large(ireq->n+ireqt->n);
1167                 srenew(ireq->ind,ireq->nalloc);
1168             }
1169             for(ia=0; ia<ireqt->n; ia++)
1170             {
1171                 ireq->ind[ireq->n+ia] = ireqt->ind[ia];
1172             }
1173             ireq->n += ireqt->n;
1174         }
1175
1176         if (debug)
1177         {
1178             fprintf(debug,"Settles: total %3d\n",ils_local->nr/4);
1179         }
1180     }
1181
1182     if (dd->constraint_comm) {
1183         int nral1;
1184
1185         at_end =
1186             setup_specat_communication(dd,ireq,dd->constraint_comm,
1187                                        dd->constraints->ga2la,
1188                                        at_start,2,
1189                                        "constraint"," or lincs-order");
1190         
1191         /* Fill in the missing indices */
1192         ga2la_specat = dd->constraints->ga2la;
1193
1194         nral1 = 1 + NRAL(F_CONSTR);
1195         for(i=0; i<ilc_local->nr; i+=nral1)
1196         {
1197             iap = ilc_local->iatoms + i;
1198             for(j=1; j<nral1; j++)
1199             {
1200                 if (iap[j] < 0)
1201                 {
1202                     iap[j] = gmx_hash_get_minone(ga2la_specat,-iap[j]-1);
1203                 }
1204             }
1205         }
1206
1207         nral1 = 1 + NRAL(F_SETTLE);
1208         for(i=0; i<ils_local->nr; i+=nral1)
1209         {
1210             iap = ils_local->iatoms + i;
1211             for(j=1; j<nral1; j++)
1212             {
1213                 if (iap[j] < 0)
1214                 {
1215                     iap[j] = gmx_hash_get_minone(ga2la_specat,-iap[j]-1);
1216                 }
1217             }
1218         }
1219     }
1220     else
1221     {
1222         at_end = at_start;
1223     }
1224     
1225     return at_end;
1226 }
1227
1228 int dd_make_local_vsites(gmx_domdec_t *dd,int at_start,t_ilist *lil)
1229 {
1230     gmx_domdec_specat_comm_t *spac;
1231     ind_req_t *ireq;
1232     gmx_hash_t ga2la_specat;
1233     int  ftype,nral,i,j,gat,a;
1234     t_ilist *lilf;
1235     t_iatom *iatoms;
1236     int  at_end;
1237     
1238     spac         = dd->vsite_comm;
1239     ireq         = &spac->ireq[0];
1240     ga2la_specat = dd->ga2la_vsite;
1241     
1242     ireq->n = 0;
1243     /* Loop over all the home vsites */
1244     for(ftype=0; ftype<F_NRE; ftype++)
1245     {
1246         if (interaction_function[ftype].flags & IF_VSITE)
1247         {
1248             nral = NRAL(ftype);
1249             lilf = &lil[ftype];
1250             for(i=0; i<lilf->nr; i+=1+nral)
1251             {
1252                 iatoms = lilf->iatoms + i;
1253                 /* Check if we have the other atoms */
1254                 for(j=1; j<1+nral; j++)
1255                 {
1256                     if (iatoms[j] < 0) {
1257                         /* This is not a home atom,
1258                          * we need to ask our neighbors.
1259                          */
1260                         a = -iatoms[j] - 1;
1261                         /* Check to not ask for the same atom more than once */
1262                         if (gmx_hash_get_minone(dd->ga2la_vsite,a) == -1)
1263                         {
1264                             /* Add this non-home atom to the list */
1265                             if (ireq->n+1 > ireq->nalloc)
1266                             {
1267                                 ireq->nalloc = over_alloc_large(ireq->n+1);
1268                                 srenew(ireq->ind,ireq->nalloc);
1269                             }
1270                             ireq->ind[ireq->n++] = a;
1271                             /* Temporarily mark with -2,
1272                              * we get the index later.
1273                              */
1274                             gmx_hash_set(ga2la_specat,a,-2);
1275                         }
1276                     }
1277                 }
1278             }
1279         }
1280     }
1281     
1282     at_end = setup_specat_communication(dd,ireq,dd->vsite_comm,ga2la_specat,
1283                                         at_start,1,"vsite","");
1284     
1285     /* Fill in the missing indices */
1286     for(ftype=0; ftype<F_NRE; ftype++)
1287     {
1288         if (interaction_function[ftype].flags & IF_VSITE)
1289         {
1290             nral = NRAL(ftype);
1291             lilf = &lil[ftype];
1292             for(i=0; i<lilf->nr; i+=1+nral)
1293             {
1294                 iatoms = lilf->iatoms + i;
1295                 for(j=1; j<1+nral; j++)
1296                 {
1297                     if (iatoms[j] < 0)
1298                     {
1299                         iatoms[j] = gmx_hash_get_minone(ga2la_specat,-iatoms[j]-1);
1300                     }
1301                 }
1302             }
1303         }
1304     }
1305     
1306     return at_end;
1307 }
1308
1309 static gmx_domdec_specat_comm_t *specat_comm_init(int nthread)
1310 {
1311     gmx_domdec_specat_comm_t *spac;
1312
1313     snew(spac,1);
1314     spac->nthread = nthread;
1315     snew(spac->ireq,spac->nthread);
1316
1317     return spac;
1318 }
1319
1320 void init_domdec_constraints(gmx_domdec_t *dd,
1321                              gmx_mtop_t *mtop,
1322                              gmx_constr_t constr)
1323 {
1324     gmx_domdec_constraints_t *dc;
1325     gmx_molblock_t *molb;
1326     int mb,ncon,c,a;
1327     
1328     if (debug)
1329     {
1330         fprintf(debug,"Begin init_domdec_constraints\n");
1331     }
1332     
1333     snew(dd->constraints,1);
1334     dc = dd->constraints;
1335     
1336     snew(dc->molb_con_offset,mtop->nmolblock);
1337     snew(dc->molb_ncon_mol,mtop->nmolblock);
1338     
1339     ncon = 0;
1340     for(mb=0; mb<mtop->nmolblock; mb++)
1341     {
1342         molb = &mtop->molblock[mb];
1343         dc->molb_con_offset[mb] = ncon;
1344         dc->molb_ncon_mol[mb] =
1345             mtop->moltype[molb->type].ilist[F_CONSTR].nr/3 +
1346             mtop->moltype[molb->type].ilist[F_CONSTRNC].nr/3;
1347         ncon += molb->nmol*dc->molb_ncon_mol[mb];
1348     }
1349     
1350     if (ncon > 0)
1351     {
1352         snew(dc->gc_req,ncon);
1353         for(c=0; c<ncon; c++)
1354         {
1355             dc->gc_req[c] = 0;
1356         }
1357     }
1358
1359     /* Use a hash table for the global to local index.
1360      * The number of keys is a rough estimate, it will be optimized later.
1361      */
1362     dc->ga2la = gmx_hash_init(min(mtop->natoms/20,
1363                                   mtop->natoms/(2*dd->nnodes)));
1364
1365     dc->nthread = gmx_omp_nthreads_get(emntDomdec);
1366     snew(dc->ils,dc->nthread);
1367
1368     dd->constraint_comm = specat_comm_init(dc->nthread);
1369 }
1370
1371 void init_domdec_vsites(gmx_domdec_t *dd,int n_intercg_vsite)
1372 {
1373     int i;
1374     gmx_domdec_constraints_t *dc;
1375     
1376     if (debug)
1377     {
1378         fprintf(debug,"Begin init_domdec_vsites\n");
1379     }
1380     
1381     /* Use a hash table for the global to local index.
1382      * The number of keys is a rough estimate, it will be optimized later.
1383      */
1384     dd->ga2la_vsite = gmx_hash_init(min(n_intercg_vsite/20,
1385                                         n_intercg_vsite/(2*dd->nnodes)));
1386     
1387     dd->vsite_comm = specat_comm_init(1);
1388 }