622d15f5444458c08a2d7d3747b8165b89d5d9b9
[alexxy/gromacs.git] / src / gromacs / mdlib / pme.c
1 /* -*- mode: c; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; c-file-style: "stroustrup"; -*-
2  *
3  * 
4  *                This source code is part of
5  * 
6  *                 G   R   O   M   A   C   S
7  * 
8  *          GROningen MAchine for Chemical Simulations
9  * 
10  *                        VERSION 3.2.0
11  * Written by David van der Spoel, Erik Lindahl, Berk Hess, and others.
12  * Copyright (c) 1991-2000, University of Groningen, The Netherlands.
13  * Copyright (c) 2001-2004, The GROMACS development team,
14  * check out http://www.gromacs.org for more information.
15
16  * This program is free software; you can redistribute it and/or
17  * modify it under the terms of the GNU General Public License
18  * as published by the Free Software Foundation; either version 2
19  * of the License, or (at your option) any later version.
20  * 
21  * If you want to redistribute modifications, please consider that
22  * scientific software is very special. Version control is crucial -
23  * bugs must be traceable. We will be happy to consider code for
24  * inclusion in the official distribution, but derived work must not
25  * be called official GROMACS. Details are found in the README & COPYING
26  * files - if they are missing, get the official version at www.gromacs.org.
27  * 
28  * To help us fund GROMACS development, we humbly ask that you cite
29  * the papers on the package - you can find them in the top README file.
30  * 
31  * For more info, check our website at http://www.gromacs.org
32  * 
33  * And Hey:
34  * GROwing Monsters And Cloning Shrimps
35  */
36 /* IMPORTANT FOR DEVELOPERS:
37  *
38  * Triclinic PME isn't entirely trivial, and we've experienced
39  * some bugs during development (many of them due to me). To avoid
40  * this in the future, please check the following things if you make
41  * changes in this file:
42  *
43  * 1. You should obtain identical (at least to the PME precision)
44  *    energies, forces, and virial for
45  *    a rectangular box and a triclinic one where the z (or y) axis is
46  *    tilted by a whole box side. For instance you could use these boxes:
47  *
48  *    rectangular       triclinic
49  *     2  0  0           2  0  0
50  *     0  2  0           0  2  0
51  *     0  0  6           2  2  6
52  *
53  * 2. You should check the energy conservation in a triclinic box.
54  *
55  * It might seem like overkill, but better safe than sorry.
56  * /Erik 001109
57  */ 
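
/* A worked example for the triclinic test box above (added note; the numbers
 * are easy to re-derive and are not part of the original comment): GROMACS
 * stores boxes as lower-triangular matrices, and the reciprocal box used as
 * pme->recipbox below is computed elsewhere (calc_recipbox() in pbc.c).  For
 *     2 0 0
 *     0 2 0
 *     2 2 6
 * it works out to
 *     (  1/2    0    0 )
 *     (    0  1/2    0 )
 *     ( -1/6 -1/6  1/6 )
 * so the fractional coordinates are s_x = x/2 - z/6, s_y = y/2 - z/6 and
 * s_z = z/6, which is what calc_interpolation_idx() below evaluates.
 */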
58
59 #ifdef HAVE_CONFIG_H
60 #include <config.h>
61 #endif
62
63 #ifdef GMX_LIB_MPI
64 #include <mpi.h>
65 #endif
66 #ifdef GMX_THREADS
67 #include "tmpi.h"
68 #endif
69
70
71 #include <stdio.h>
72 #include <string.h>
73 #include <math.h>
74 #include "typedefs.h"
75 #include "txtdump.h"
76 #include "vec.h"
77 #include "gmxcomplex.h"
78 #include "smalloc.h"
79 #include "futil.h"
80 #include "coulomb.h"
81 #include "gmx_fatal.h"
82 #include "pme.h"
83 #include "network.h"
84 #include "physics.h"
85 #include "nrnb.h"
86 #include "copyrite.h"
87 #include "gmx_wallcycle.h"
88 #include "gmx_parallel_3dfft.h"
89 #include "pdbio.h"
90
91 #if ( !defined(GMX_DOUBLE) && ( defined(GMX_IA32_SSE) || defined(GMX_X86_64_SSE) || defined(GMX_X86_64_SSE2) ) )
92 #include "gmx_sse2_single.h"
93 #endif
94
95 #include "mpelogging.h"
96
97 #define DFT_TOL 1e-7
98 /* #define PRT_FORCE */
99 /* conditions for on-the-fly time measurement */
100 /* #define TAKETIME (step > 1 && timesteps < 10) */
101 #define TAKETIME FALSE
102
103 #ifdef GMX_DOUBLE
104 #define mpi_type MPI_DOUBLE
105 #else
106 #define mpi_type MPI_FLOAT
107 #endif
108
109 /* Internal data structures */
110 typedef struct {
111     int send_index0;
112     int send_nindex;
113     int recv_index0;
114     int recv_nindex;
115 } pme_grid_comm_t;
116
117 typedef struct {
118 #ifdef GMX_MPI
119     MPI_Comm mpi_comm;
120 #endif
121     int  nnodes,nodeid;
122     int  *s2g0;
123     int  *s2g1;
124     int  noverlap_nodes;
125     int  *send_id,*recv_id;
126     pme_grid_comm_t *comm_data;
127 } pme_overlap_t;
128
129 typedef struct {
130     int  dimind;            /* The index of the dimension, 0=x, 1=y */
131     int  nslab;
132     int  nodeid;
133 #ifdef GMX_MPI
134     MPI_Comm mpi_comm;
135 #endif
136
137     int  *node_dest;        /* The nodes to send x and q to with DD */
138     int  *node_src;         /* The nodes to receive x and q from with DD */
139     int  *buf_index;        /* Index into the send buffers for each commnode */
140
141     int  maxshift;
142
143     int  npd;
144     int  pd_nalloc;
145     int  *pd;
146     int  *count;            /* The number of atoms to send to each node */
147     int  *rcount;           /* The number of atoms to receive */
148
149     int  n;
150     int  nalloc;
151     rvec *x;
152     real *q;
153     rvec *f;
154     gmx_bool bSpread;           /* These coordinates are used for spreading */
155     int  pme_order;
156     splinevec theta,dtheta;
157     ivec *idx;
158     rvec *fractx;            /* Fractional coordinate relative to the
159                               * lower cell boundary 
160                               */
161 } pme_atomcomm_t;
162
163 typedef struct gmx_pme {
164     int  ndecompdim;         /* The number of decomposition dimensions */
165     int  nodeid;             /* Our nodeid in mpi->mpi_comm */
166     int  nodeid_major;
167     int  nodeid_minor;
168     int  nnodes;             /* The number of nodes doing PME */
169     int  nnodes_major;
170     int  nnodes_minor;
171
172     MPI_Comm mpi_comm;
173     MPI_Comm mpi_comm_d[2];  /* Indexed on dimension, 0=x, 1=y */
174 #ifdef GMX_MPI
175     MPI_Datatype  rvec_mpi;  /* the pme vector's MPI type */
176 #endif
177
178     gmx_bool bPPnode;            /* Node also does particle-particle forces */
179     gmx_bool bFEP;               /* Compute Free energy contribution */
180     int nkx,nky,nkz;         /* Grid dimensions */
181     int pme_order;
182     real epsilon_r;           
183     
184     real *  pmegridA;  /* Grids on which we do spreading/interpolation, includes overlap */
185     real *  pmegridB;
186     int     pmegrid_nx,pmegrid_ny,pmegrid_nz;
187     int     pmegrid_start_ix,pmegrid_start_iy,pmegrid_start_iz;    
188     
189     real *  pmegrid_sendbuf;
190     real *  pmegrid_recvbuf;
191     
192     real *fftgridA;             /* Grids for FFT. With 1D FFT decomposition this can be a pointer */
193     real *fftgridB;             /* inside the interpolation grid, but separate for 2D PME decomp. */
194     int   fftgrid_nx,fftgrid_ny,fftgrid_nz;
195     
196     t_complex *cfftgridA;             /* Grids for complex FFT data */
197     t_complex *cfftgridB;            
198     int   cfftgrid_nx,cfftgrid_ny,cfftgrid_nz;
199     
200     gmx_parallel_3dfft_t  pfft_setupA;
201     gmx_parallel_3dfft_t  pfft_setupB;
202     
203     int  *nnx,*nny,*nnz;
204     real *fshx,*fshy,*fshz;
205     
206     pme_atomcomm_t atc[2];  /* Indexed on decomposition index */
207     matrix    recipbox;
208     splinevec bsp_mod;
209     
210     pme_overlap_t overlap[2]; /* Indexed on dimension, 0=x, 1=y */
211
212
213     pme_atomcomm_t atc_energy; /* Only for gmx_pme_calc_energy */
214     
215     rvec *bufv;             /* Communication buffer */
216     real *bufr;             /* Communication buffer */
217     int  buf_nalloc;        /* The communication buffer size */
218
219     /* work data for solve_pme */
220     int      work_nalloc;
221     real *   work_mhx;
222     real *   work_mhy;
223     real *   work_mhz;
224     real *   work_m2;
225     real *   work_denom;
226     real *   work_tmp1_alloc;
227     real *   work_tmp1;
228     real *   work_m2inv;
229
230     /* Work data for PME_redist */
231     gmx_bool     redist_init;
232     int *    scounts; 
233     int *    rcounts;
234     int *    sdispls;
235     int *    rdispls;
236     int *    sidx;
237     int *    idxa;    
238     real *   redist_buf;
239     int      redist_buf_nalloc;
240     
241     /* Work data for sum_qgrid */
242     real *   sum_qgrid_tmp;
243     real *   sum_qgrid_dd_tmp;
244 } t_gmx_pme;
245
246
247 static void calc_interpolation_idx(gmx_pme_t pme,pme_atomcomm_t *atc)
248 {
249     int  i;
250     int  *idxptr,tix,tiy,tiz;
251     real *xptr,*fptr,tx,ty,tz;
252     real rxx,ryx,ryy,rzx,rzy,rzz;
253     int  nx,ny,nz;
254     int  start_ix,start_iy,start_iz;
255     
256     nx  = pme->nkx;
257     ny  = pme->nky;
258     nz  = pme->nkz;
259     
260     start_ix = pme->pmegrid_start_ix;
261     start_iy = pme->pmegrid_start_iy;
262     start_iz = pme->pmegrid_start_iz;
263     
264     rxx = pme->recipbox[XX][XX];
265     ryx = pme->recipbox[YY][XX];
266     ryy = pme->recipbox[YY][YY];
267     rzx = pme->recipbox[ZZ][XX];
268     rzy = pme->recipbox[ZZ][YY];
269     rzz = pme->recipbox[ZZ][ZZ];
270     
271     for(i=0; (i<atc->n); i++) {
272         xptr   = atc->x[i];
273         idxptr = atc->idx[i];
274         fptr   = atc->fractx[i];
275         
276         /* Fractional coordinates along box vectors, add 2.0 to make 100% sure we are positive for triclinic boxes */
277         tx = nx * ( xptr[XX] * rxx + xptr[YY] * ryx + xptr[ZZ] * rzx + 2.0 );
278         ty = ny * (                  xptr[YY] * ryy + xptr[ZZ] * rzy + 2.0 );
279         tz = nz * (                                   xptr[ZZ] * rzz + 2.0 );
280         
281         tix = (int)(tx);
282         tiy = (int)(ty);
283         tiz = (int)(tz);
284         
285         /* Because decomposition only occurs in x and y,
286          * we never have a fraction correction in z.
287          */
288         fptr[XX] = tx - tix + pme->fshx[tix];
289         fptr[YY] = ty - tiy + pme->fshy[tiy];
290         fptr[ZZ] = tz - tiz;   
291
292         idxptr[XX] = pme->nnx[tix];
293         idxptr[YY] = pme->nny[tiy];
294         idxptr[ZZ] = pme->nnz[tiz];
295
296 #ifdef DEBUG
297         range_check(idxptr[XX],0,pme->pmegrid_nx);
298         range_check(idxptr[YY],0,pme->pmegrid_ny);
299         range_check(idxptr[ZZ],0,pme->pmegrid_nz);
300 #endif
301     }
302 }
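
/* Illustrative numbers for the +2.0 trick above (added note, not original
 * documentation): with the triclinic test box from the header comment,
 * recipbox[ZZ][XX] = -1/6, so an atom at (x,y,z) = (1.0, 1.0, 5.9) has
 * s_x = 0.5 - 5.9/6 ~= -0.483.  With nx = 20 this gives
 * tx = 20*(-0.483 + 2.0) ~= 30.33, hence tix = 30 and a fractional part of
 * ~0.33.  Without the shift, the (int) cast (which truncates towards zero)
 * would not behave like floor() for negative arguments and tix would be off
 * by one.  The fshx/nnx tables then fold the shifted index back onto the
 * local grid.
 */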
303
304 static void pme_calc_pidx(int natoms, matrix recipbox, rvec x[],
305                           pme_atomcomm_t *atc)
306 {
307     int  nslab,i;
308     int  si;
309     real *xptr,s;
310     real rxx,ryx,rzx,ryy,rzy;
311     int *pd,*count;
312
313     /* Calculate the PME task index (pidx) for each atom.
314      * Here we always assign equally sized slabs to each node
315      * for load-balancing reasons (the PME grid spacing is not used).
316      */
317     
318     nslab = atc->nslab;
319     pd    = atc->pd;
320     count = atc->count;
321
322     /* Reset the count */
323     for(i=0; i<nslab; i++)
324     {
325         count[i] = 0;
326     }
327     
328     if (atc->dimind == 0)
329     {
330         rxx = recipbox[XX][XX];
331         ryx = recipbox[YY][XX];
332         rzx = recipbox[ZZ][XX];
333         /* Calculate the node index in x-dimension */
334         for(i=0; (i<natoms); i++)
335         {
336             xptr   = x[i];
337             /* Fractional coordinates along box vectors */
338             s = nslab*(xptr[XX]*rxx + xptr[YY]*ryx + xptr[ZZ]*rzx);
339             si = (int)(s + 2*nslab) % nslab;
340             pd[i] = si;
341             count[si]++;
342         }
343     }
344     else
345     {
346         ryy = recipbox[YY][YY];
347         rzy = recipbox[ZZ][YY];
348         /* Calculate the node index in y-dimension */
349         for(i=0; (i<natoms); i++)
350         {
351             xptr   = x[i];
352             /* Fractional coordinates along box vectors */
353             s = nslab*(xptr[YY]*ryy + xptr[ZZ]*rzy);
354             si = (int)(s + 2*nslab) % nslab;
355             pd[i] = si;
356             count[si]++;
357         }
358     }
359 }
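
/* Illustrative numbers for the slab assignment above (added note): with
 * nslab = 4 and a fractional coordinate of -0.175 along the decomposition
 * dimension, s = 4*(-0.175) = -0.7 and si = (int)(-0.7 + 8) % 4 = 7 % 4 = 3,
 * i.e. the atom wraps onto the last slab, just as an atom at fractional
 * coordinate 0.825 would.  The +2*nslab shift plays the same role as the
 * +2.0 in calc_interpolation_idx(): it keeps the argument of the truncating
 * (int) cast positive.
 */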
360
361 static void pme_realloc_atomcomm_things(pme_atomcomm_t *atc)
362 {
363     int nalloc_old,i;
364     
365     /* We have to avoid a NULL pointer for atc->x to prevent
366      * possible fatal errors in MPI routines.
367      */
368     if (atc->n > atc->nalloc || atc->nalloc == 0)
369     {
370         nalloc_old = atc->nalloc;
371         atc->nalloc = over_alloc_dd(max(atc->n,1));
372         
373         if (atc->nslab > 1) {
374             srenew(atc->x,atc->nalloc);
375             srenew(atc->q,atc->nalloc);
376             srenew(atc->f,atc->nalloc);
377             for(i=nalloc_old; i<atc->nalloc; i++)
378             {
379                 clear_rvec(atc->f[i]);
380             }
381         }
382         if (atc->bSpread) {
383             for(i=0;i<DIM;i++) {
384                 srenew(atc->theta[i] ,atc->pme_order*atc->nalloc); 
385                 srenew(atc->dtheta[i],atc->pme_order*atc->nalloc);
386             }
387             srenew(atc->fractx,atc->nalloc); 
388             srenew(atc->idx   ,atc->nalloc);
389         }
390     }
391 }
392
393 static void pmeredist_pd(gmx_pme_t pme, gmx_bool forw,
394                          int n, gmx_bool bXF, rvec *x_f, real *charge,
395                          pme_atomcomm_t *atc)
396 /* Redistribute particle data for PME calculation */
397 /* domain decomposition by x coordinate           */
398 {
399     int *idxa;
400     int i, ii;
401     
402     if(FALSE == pme->redist_init) {
403         snew(pme->scounts,atc->nslab);
404         snew(pme->rcounts,atc->nslab);
405         snew(pme->sdispls,atc->nslab);
406         snew(pme->rdispls,atc->nslab);
407         snew(pme->sidx,atc->nslab);
408         pme->redist_init = TRUE;
409     }
410     if (n > pme->redist_buf_nalloc) {
411         pme->redist_buf_nalloc = over_alloc_dd(n);
412         srenew(pme->redist_buf,pme->redist_buf_nalloc*DIM);
413     }
414     
415     pme->idxa = atc->pd;
416
417 #ifdef GMX_MPI
418     if (forw && bXF) {
419         /* forward, redistribution from pp to pme */ 
420         
421         /* Calculate send counts and exchange them with other nodes */
422         for(i=0; (i<atc->nslab); i++) pme->scounts[i]=0;
423         for(i=0; (i<n); i++) pme->scounts[pme->idxa[i]]++;
424         MPI_Alltoall( pme->scounts, 1, MPI_INT, pme->rcounts, 1, MPI_INT, atc->mpi_comm);
425         
426         /* Calculate send and receive displacements and index into send 
427            buffer */
428         pme->sdispls[0]=0;
429         pme->rdispls[0]=0;
430         pme->sidx[0]=0;
431         for(i=1; i<atc->nslab; i++) {
432             pme->sdispls[i]=pme->sdispls[i-1]+pme->scounts[i-1];
433             pme->rdispls[i]=pme->rdispls[i-1]+pme->rcounts[i-1];
434             pme->sidx[i]=pme->sdispls[i];
435         }
436         /* Total # of particles to be received */
437         atc->n = pme->rdispls[atc->nslab-1] + pme->rcounts[atc->nslab-1];
438         
439         pme_realloc_atomcomm_things(atc);
440         
441         /* Copy particle coordinates into send buffer and exchange*/
442         for(i=0; (i<n); i++) {
443             ii=DIM*pme->sidx[pme->idxa[i]];
444             pme->sidx[pme->idxa[i]]++;
445             pme->redist_buf[ii+XX]=x_f[i][XX];
446             pme->redist_buf[ii+YY]=x_f[i][YY];
447             pme->redist_buf[ii+ZZ]=x_f[i][ZZ];
448         }
449         MPI_Alltoallv(pme->redist_buf, pme->scounts, pme->sdispls, 
450                       pme->rvec_mpi, atc->x, pme->rcounts, pme->rdispls, 
451                       pme->rvec_mpi, atc->mpi_comm);
452     }
453     if (forw) {
454         /* Copy charge into send buffer and exchange*/
455         for(i=0; i<atc->nslab; i++) pme->sidx[i]=pme->sdispls[i];
456         for(i=0; (i<n); i++) {
457             ii=pme->sidx[pme->idxa[i]];
458             pme->sidx[pme->idxa[i]]++;
459             pme->redist_buf[ii]=charge[i];
460         }
461         MPI_Alltoallv(pme->redist_buf, pme->scounts, pme->sdispls, mpi_type,
462                       atc->q, pme->rcounts, pme->rdispls, mpi_type,
463                       atc->mpi_comm);
464     }
465     else { /* backward, redistribution from pme to pp */ 
466         MPI_Alltoallv(atc->f, pme->rcounts, pme->rdispls, pme->rvec_mpi,
467                       pme->redist_buf, pme->scounts, pme->sdispls, 
468                       pme->rvec_mpi, atc->mpi_comm);
469         
470         /* Copy data from receive buffer */
471         for(i=0; i<atc->nslab; i++)
472             pme->sidx[i] = pme->sdispls[i];
473         for(i=0; (i<n); i++) {
474             ii = DIM*pme->sidx[pme->idxa[i]];
475             x_f[i][XX] += pme->redist_buf[ii+XX];
476             x_f[i][YY] += pme->redist_buf[ii+YY];
477             x_f[i][ZZ] += pme->redist_buf[ii+ZZ];
478             pme->sidx[pme->idxa[i]]++;
479         }
480     }
481 #endif 
482 }
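
/* The forward path above is the standard MPI_Alltoallv pattern: count how much
 * goes to each destination, exchange the counts, turn them into exclusive
 * prefix sums (displacements), pack, and exchange.  A minimal stand-alone
 * sketch of that pattern (illustrative only; 'dest', 'sendbuf' and 'recvbuf'
 * are hypothetical names, not part of this file):
 *
 *     for (i = 0; i < nslab; i++) scounts[i] = 0;
 *     for (i = 0; i < n; i++)     scounts[dest[i]]++;
 *     MPI_Alltoall(scounts, 1, MPI_INT, rcounts, 1, MPI_INT, comm);
 *     sdispls[0] = rdispls[0] = 0;
 *     for (i = 1; i < nslab; i++) {
 *         sdispls[i] = sdispls[i-1] + scounts[i-1];
 *         rdispls[i] = rdispls[i-1] + rcounts[i-1];
 *     }
 *     MPI_Alltoallv(sendbuf, scounts, sdispls, mpi_type,
 *                   recvbuf, rcounts, rdispls, mpi_type, comm);
 *
 * The backward (force) path simply swaps the roles of the send and receive
 * count/displacement arrays, as the else-branch above does.
 */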
483
484 static void pme_dd_sendrecv(pme_atomcomm_t *atc,
485                             gmx_bool bBackward,int shift,
486                             void *buf_s,int nbyte_s,
487                             void *buf_r,int nbyte_r)
488 {
489 #ifdef GMX_MPI
490     int dest,src;
491     MPI_Status stat;
492     
493     if (bBackward == FALSE) {
494         dest = atc->node_dest[shift];
495         src  = atc->node_src[shift];
496     } else {
497         dest = atc->node_src[shift];
498         src  = atc->node_dest[shift];
499     }
500     
501     if (nbyte_s > 0 && nbyte_r > 0) {
502         MPI_Sendrecv(buf_s,nbyte_s,MPI_BYTE,
503                      dest,shift,
504                      buf_r,nbyte_r,MPI_BYTE,
505                      src,shift,
506                      atc->mpi_comm,&stat);
507     } else if (nbyte_s > 0) {
508         MPI_Send(buf_s,nbyte_s,MPI_BYTE,
509                  dest,shift,
510                  atc->mpi_comm);
511     } else if (nbyte_r > 0) {
512         MPI_Recv(buf_r,nbyte_r,MPI_BYTE,
513                  src,shift,
514                  atc->mpi_comm,&stat);
515     }
516 #endif
517 }
518
519 static void dd_pmeredist_x_q(gmx_pme_t pme, 
520                              int n, gmx_bool bX, rvec *x, real *charge,
521                              pme_atomcomm_t *atc)
522 {
523     int *commnode,*buf_index;
524     int nnodes_comm,i,nsend,local_pos,buf_pos,node,scount,rcount;
525     
526     commnode  = atc->node_dest;
527     buf_index = atc->buf_index;
528     
529     nnodes_comm = min(2*atc->maxshift,atc->nslab-1);
530     
531     nsend = 0;
532     for(i=0; i<nnodes_comm; i++) {
533         buf_index[commnode[i]] = nsend;
534         nsend += atc->count[commnode[i]];
535     }
536     if (bX) {
537         if (atc->count[atc->nodeid] + nsend != n)
538             gmx_fatal(FARGS,"%d particles communicated to PME node %d are more than 2/3 times the cut-off out of the domain decomposition cell of their charge group in dimension %c.\n"
539                       "This usually means that your system is not well equilibrated.",
540                       n - (atc->count[atc->nodeid] + nsend),
541                       pme->nodeid,'x'+atc->dimind);
542         
543         if (nsend > pme->buf_nalloc) {
544             pme->buf_nalloc = over_alloc_dd(nsend);
545             srenew(pme->bufv,pme->buf_nalloc);
546             srenew(pme->bufr,pme->buf_nalloc);
547         }
548         
549         atc->n = atc->count[atc->nodeid];
550         for(i=0; i<nnodes_comm; i++) {
551             scount = atc->count[commnode[i]];
552             /* Communicate the count */
553             if (debug)
554                 fprintf(debug,"dimind %d PME node %d send to node %d: %d\n",
555                         atc->dimind,atc->nodeid,commnode[i],scount);
556             pme_dd_sendrecv(atc,FALSE,i,
557                             &scount,sizeof(int),
558                             &atc->rcount[i],sizeof(int));
559             atc->n += atc->rcount[i];
560         }
561         
562         pme_realloc_atomcomm_things(atc);
563     }
564     
565     local_pos = 0;
566     for(i=0; i<n; i++) {
567         node = atc->pd[i];
568         if (node == atc->nodeid) {
569             /* Copy direct to the receive buffer */
570             if (bX) {
571                 copy_rvec(x[i],atc->x[local_pos]);
572             }
573             atc->q[local_pos] = charge[i];
574             local_pos++;
575         } else {
576             /* Copy to the send buffer */
577             if (bX) {
578                 copy_rvec(x[i],pme->bufv[buf_index[node]]);
579             }
580             pme->bufr[buf_index[node]] = charge[i];
581             buf_index[node]++;
582         }
583     }
584     
585     buf_pos = 0;
586     for(i=0; i<nnodes_comm; i++) {
587         scount = atc->count[commnode[i]];
588         rcount = atc->rcount[i];
589         if (scount > 0 || rcount > 0) {
590             if (bX) {
591                 /* Communicate the coordinates */
592                 pme_dd_sendrecv(atc,FALSE,i,
593                                 pme->bufv[buf_pos],scount*sizeof(rvec),
594                                 atc->x[local_pos],rcount*sizeof(rvec));
595             }
596             /* Communicate the charges */
597             pme_dd_sendrecv(atc,FALSE,i,
598                             pme->bufr+buf_pos,scount*sizeof(real),
599                             atc->q+local_pos,rcount*sizeof(real));
600             buf_pos   += scount;
601             local_pos += atc->rcount[i];
602         }
603     }
604 }
605
606 static void dd_pmeredist_f(gmx_pme_t pme, pme_atomcomm_t *atc,
607                            int n, rvec *f,
608                            gmx_bool bAddF)
609 {
610     int *commnode,*buf_index;
611     int nnodes_comm,local_pos,buf_pos,i,scount,rcount,node;
612
613     commnode  = atc->node_dest;
614     buf_index = atc->buf_index;
615
616     nnodes_comm = min(2*atc->maxshift,atc->nslab-1);
617
618     local_pos = atc->count[atc->nodeid];
619     buf_pos = 0;
620     for(i=0; i<nnodes_comm; i++) {
621         scount = atc->rcount[i];
622         rcount = atc->count[commnode[i]];
623         if (scount > 0 || rcount > 0) {
624             /* Communicate the forces */
625             pme_dd_sendrecv(atc,TRUE,i,
626                             atc->f[local_pos],scount*sizeof(rvec),
627                             pme->bufv[buf_pos],rcount*sizeof(rvec));
628             local_pos += scount;
629         }
630         buf_index[commnode[i]] = buf_pos;
631         buf_pos   += rcount;
632     }
633
634     local_pos = 0;
635     if (bAddF)
636     {
637         for(i=0; i<n; i++)
638         {
639             node = atc->pd[i];
640             if (node == atc->nodeid)
641             {
642                 /* Add from the local force array */
643                 rvec_inc(f[i],atc->f[local_pos]);
644                 local_pos++;
645             }
646             else
647             {
648                 /* Add from the receive buffer */
649                 rvec_inc(f[i],pme->bufv[buf_index[node]]);
650                 buf_index[node]++;
651             }
652         }
653     }
654     else
655     {
656         for(i=0; i<n; i++)
657         {
658             node = atc->pd[i];
659             if (node == atc->nodeid)
660             {
661                 /* Copy from the local force array */
662                 copy_rvec(atc->f[local_pos],f[i]);
663                 local_pos++;
664             }
665             else
666             {
667                 /* Copy from the receive buffer */
668                 copy_rvec(pme->bufv[buf_index[node]],f[i]);
669                 buf_index[node]++;
670             }
671         }
672     }
673 }
674
675 #ifdef GMX_MPI
676 static void 
677 gmx_sum_qgrid_dd(gmx_pme_t pme, real *grid, int direction)
678 {
679     pme_overlap_t *overlap;
680     int send_index0,send_nindex;
681     int recv_index0,recv_nindex;
682     MPI_Status stat;
683     int i,j,k,ix,iy,iz,icnt;
684     int ipulse,send_id,recv_id,datasize;
685     real *p;
686     real *sendptr,*recvptr;
687     
688     /* Start with minor-rank communication. This is a bit of a pain since it is not contiguous */
689     overlap = &pme->overlap[1];
690     
691     for(ipulse=0;ipulse<overlap->noverlap_nodes;ipulse++)
692     {
693         /* Since we have already (un)wrapped the overlap in the z-dimension,
694          * we only have to communicate 0 to nkz (not pmegrid_nz).
695          */
696         if (direction==GMX_SUM_QGRID_FORWARD)
697         {
698             send_id = overlap->send_id[ipulse];
699             recv_id = overlap->recv_id[ipulse];
700             send_index0   = overlap->comm_data[ipulse].send_index0;
701             send_nindex   = overlap->comm_data[ipulse].send_nindex;
702             recv_index0   = overlap->comm_data[ipulse].recv_index0;
703             recv_nindex   = overlap->comm_data[ipulse].recv_nindex;
704         }
705         else
706         {
707             send_id = overlap->recv_id[ipulse];
708             recv_id = overlap->send_id[ipulse];
709             send_index0   = overlap->comm_data[ipulse].recv_index0;
710             send_nindex   = overlap->comm_data[ipulse].recv_nindex;            
711             recv_index0   = overlap->comm_data[ipulse].send_index0;
712             recv_nindex   = overlap->comm_data[ipulse].send_nindex;
713         }
714
715         /* Copy data to contiguous send buffer */
716         if (debug)
717         {
718             fprintf(debug,"PME send node %d %d -> %d grid start %d Communicating %d to %d\n",
719                     pme->nodeid,overlap->nodeid,send_id,
720                     pme->pmegrid_start_iy,
721                     send_index0-pme->pmegrid_start_iy,
722                     send_index0-pme->pmegrid_start_iy+send_nindex);
723         }
724         icnt = 0;
725         for(i=0;i<pme->pmegrid_nx;i++)
726         {
727             ix = i;
728             for(j=0;j<send_nindex;j++)
729             {
730                 iy = j + send_index0 - pme->pmegrid_start_iy;
731                 for(k=0;k<pme->nkz;k++)
732                 {
733                     iz = k;
734                     pme->pmegrid_sendbuf[icnt++] = grid[ix*(pme->pmegrid_ny*pme->pmegrid_nz)+iy*(pme->pmegrid_nz)+iz];
735                 }
736             }
737         }
738             
739         datasize      = pme->pmegrid_nx * pme->nkz;
740         
741         MPI_Sendrecv(pme->pmegrid_sendbuf,send_nindex*datasize,GMX_MPI_REAL,
742                      send_id,ipulse,
743                      pme->pmegrid_recvbuf,recv_nindex*datasize,GMX_MPI_REAL,
744                      recv_id,ipulse,
745                      overlap->mpi_comm,&stat);
746         
747         /* Get data from contiguous recv buffer */
748         if (debug)
749         {
750             fprintf(debug,"PME recv node %d %d <- %d grid start %d Communicating %d to %d\n",
751                     pme->nodeid,overlap->nodeid,recv_id,
752                     pme->pmegrid_start_iy,
753                     recv_index0-pme->pmegrid_start_iy,
754                     recv_index0-pme->pmegrid_start_iy+recv_nindex);
755         }
756         icnt = 0;
757         for(i=0;i<pme->pmegrid_nx;i++)
758         {
759             ix = i;
760             for(j=0;j<recv_nindex;j++)
761             {
762                 iy = j + recv_index0 - pme->pmegrid_start_iy;
763                 for(k=0;k<pme->nkz;k++)
764                 {
765                     iz = k;
766                     if(direction==GMX_SUM_QGRID_FORWARD)
767                     {
768                         grid[ix*(pme->pmegrid_ny*pme->pmegrid_nz)+iy*(pme->pmegrid_nz)+iz] += pme->pmegrid_recvbuf[icnt++];
769                     }
770                     else
771                     {
772                         grid[ix*(pme->pmegrid_ny*pme->pmegrid_nz)+iy*(pme->pmegrid_nz)+iz]  = pme->pmegrid_recvbuf[icnt++];
773                     }
774                 }
775             }
776         }
777     }
778     
779     /* Major dimension is easier, no copying required,
780      * but we might have to sum to a separate array.
781      * Since we don't copy, we have to communicate up to pmegrid_nz,
782      * not nkz as for the minor direction.
783      */
784     overlap = &pme->overlap[0];
785     
786     for(ipulse=0;ipulse<overlap->noverlap_nodes;ipulse++)
787     {
788         if(direction==GMX_SUM_QGRID_FORWARD)
789         {
790             send_id = overlap->send_id[ipulse];
791             recv_id = overlap->recv_id[ipulse];
792             send_index0   = overlap->comm_data[ipulse].send_index0;
793             send_nindex   = overlap->comm_data[ipulse].send_nindex;
794             recv_index0   = overlap->comm_data[ipulse].recv_index0;
795             recv_nindex   = overlap->comm_data[ipulse].recv_nindex;
796             recvptr   = pme->pmegrid_recvbuf;
797         }
798         else
799         {
800             send_id = overlap->recv_id[ipulse];
801             recv_id = overlap->send_id[ipulse];
802             send_index0   = overlap->comm_data[ipulse].recv_index0;
803             send_nindex   = overlap->comm_data[ipulse].recv_nindex;            
804             recv_index0   = overlap->comm_data[ipulse].send_index0;
805             recv_nindex   = overlap->comm_data[ipulse].send_nindex;
806             recvptr   = grid + (recv_index0-pme->pmegrid_start_ix)*(pme->pmegrid_ny*pme->pmegrid_nz);
807         }
808                 
809         sendptr       = grid + (send_index0-pme->pmegrid_start_ix)*(pme->pmegrid_ny*pme->pmegrid_nz);
810         datasize      = pme->pmegrid_ny * pme->pmegrid_nz;
811
812         if (debug)
813         {
814             fprintf(debug,"PME send node %d %d -> %d grid start %d Communicating %d to %d\n",
815                     pme->nodeid,overlap->nodeid,send_id,
816                     pme->pmegrid_start_ix,
817                     send_index0-pme->pmegrid_start_ix,
818                     send_index0-pme->pmegrid_start_ix+send_nindex);
819             fprintf(debug,"PME recv node %d %d <- %d grid start %d Communicating %d to %d\n",
820                     pme->nodeid,overlap->nodeid,recv_id,
821                     pme->pmegrid_start_ix,
822                     recv_index0-pme->pmegrid_start_ix,
823                     recv_index0-pme->pmegrid_start_ix+recv_nindex);
824         }
825
826         MPI_Sendrecv(sendptr,send_nindex*datasize,GMX_MPI_REAL,
827                      send_id,ipulse,
828                      recvptr,recv_nindex*datasize,GMX_MPI_REAL,
829                      recv_id,ipulse,
830                      overlap->mpi_comm,&stat);
831         
832         /* ADD data from contiguous recv buffer */
833         if(direction==GMX_SUM_QGRID_FORWARD)
834         {        
835             p = grid + (recv_index0-pme->pmegrid_start_ix)*(pme->pmegrid_ny*pme->pmegrid_nz);
836             for(i=0;i<recv_nindex*datasize;i++)
837             {
838                 p[i] += pme->pmegrid_recvbuf[i];
839             }
840         }
841     }
842 }
843 #endif
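
/* Summary of the two directions handled above (added note): GMX_SUM_QGRID_FORWARD
 * is used after charge spreading and adds the overlap region received from the
 * neighbouring PME nodes into the local grid (the '+=' branches), while the
 * backward direction is used after the reciprocal-space solve and simply
 * overwrites the local overlap region with the values owned by the neighbour
 * (the '=' branch, and recvptr pointing directly into the grid for the major
 * dimension).
 */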
844
845
846 static int
847 copy_pmegrid_to_fftgrid(gmx_pme_t pme, real *pmegrid, real *fftgrid)
848 {
849     ivec    local_fft_ndata,local_fft_offset,local_fft_size;
850     ivec    local_pme_size;
851     int     i,ix,iy,iz;
852     int     pmeidx,fftidx;
853
854     /* Dimensions should be identical for A/B grid, so we just use A here */
855     gmx_parallel_3dfft_real_limits(pme->pfft_setupA,
856                                    local_fft_ndata,
857                                    local_fft_offset,
858                                    local_fft_size);
859     
860     local_pme_size[0] = pme->pmegrid_nx;
861     local_pme_size[1] = pme->pmegrid_ny;
862     local_pme_size[2] = pme->pmegrid_nz;
863     
864     /* The fftgrid is always 'justified' to the lower-left corner of the PME grid,
865      * the offset is identical, and the PME grid always has more data (due to overlap).
866      */
867     {
868 #ifdef DEBUG_PME
869         FILE *fp,*fp2;
870         char fn[STRLEN],format[STRLEN];
871         real val;
872         sprintf(fn,"pmegrid%d.pdb",pme->nodeid);
873         fp = ffopen(fn,"w");
874         sprintf(fn,"pmegrid%d.txt",pme->nodeid);
875         fp2 = ffopen(fn,"w");
876      sprintf(format,"%s%s\n",pdbformat,"%6.2f%6.2f");
877 #endif
878     for(ix=0;ix<local_fft_ndata[XX];ix++)
879     {
880         for(iy=0;iy<local_fft_ndata[YY];iy++)
881         {
882             for(iz=0;iz<local_fft_ndata[ZZ];iz++)
883             {
884                 pmeidx = ix*(local_pme_size[YY]*local_pme_size[ZZ])+iy*(local_pme_size[ZZ])+iz;
885                 fftidx = ix*(local_fft_size[YY]*local_fft_size[ZZ])+iy*(local_fft_size[ZZ])+iz;
886                 fftgrid[fftidx] = pmegrid[pmeidx];
887 #ifdef DEBUG_PME
888                 val = 100*pmegrid[pmeidx];
889                 if (pmegrid[pmeidx] != 0)
890                 fprintf(fp,format,"ATOM",pmeidx,"CA","GLY",' ',pmeidx,' ',
891                         5.0*ix,5.0*iy,5.0*iz,1.0,val);
892                 if (pmegrid[pmeidx] != 0)
893                     fprintf(fp2,"%-12s  %5d  %5d  %5d  %12.5e\n",
894                             "qgrid",
895                             pme->pmegrid_start_ix + ix,
896                             pme->pmegrid_start_iy + iy,
897                             pme->pmegrid_start_iz + iz,
898                             pmegrid[pmeidx]);
899 #endif
900             }
901         }
902     }
903 #ifdef DEBUG_PME
904     fclose(fp);
905     fclose(fp2);
906 #endif
907     }
908     return 0;
909 }
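
/* The index arithmetic above, spelled out with illustrative numbers (added
 * note): for a local grid of size 3 x 4 x 5 (nx x ny x nz), element
 * (ix,iy,iz) = (1,2,3) lives at offset 1*(4*5) + 2*5 + 3 = 33.  Both grids are
 * plain row-major (z fastest) arrays; only the strides differ, which is why
 * pmeidx uses local_pme_size and fftidx uses local_fft_size.
 */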
910
911
912 static int
913 copy_fftgrid_to_pmegrid(gmx_pme_t pme, real *fftgrid, real *pmegrid)
914 {
915     ivec    local_fft_ndata,local_fft_offset,local_fft_size;
916     ivec    local_pme_size;
917     int     i,ix,iy,iz;
918     int     pmeidx,fftidx;
919     
920     /* Dimensions should be identical for A/B grid, so we just use A here */
921     gmx_parallel_3dfft_real_limits(pme->pfft_setupA,
922                                    local_fft_ndata,
923                                    local_fft_offset,
924                                    local_fft_size);
925
926     local_pme_size[0] = pme->pmegrid_nx;
927     local_pme_size[1] = pme->pmegrid_ny;
928     local_pme_size[2] = pme->pmegrid_nz;
929     
930     /* The fftgrid is always 'justified' to the lower-left corner of the PME grid,
931      * the offset is identical, and the PME grid always has more data (due to overlap).
932      */
933     for(ix=0;ix<local_fft_ndata[XX];ix++)
934     {
935         for(iy=0;iy<local_fft_ndata[YY];iy++)
936         {
937             for(iz=0;iz<local_fft_ndata[ZZ];iz++)
938             {
939                 pmeidx = ix*(local_pme_size[YY]*local_pme_size[ZZ])+iy*(local_pme_size[ZZ])+iz;
940                 fftidx = ix*(local_fft_size[YY]*local_fft_size[ZZ])+iy*(local_fft_size[ZZ])+iz;
941                 pmegrid[pmeidx] = fftgrid[fftidx];
942             }
943         }
944     }   
945     return 0;
946 }
947
948
949 static void
950 wrap_periodic_pmegrid(gmx_pme_t pme, real *pmegrid)
951 {
952     int     nx,ny,nz,pnx,pny,pnz,ny_x,overlap,ix,iy,iz;
953
954     nx = pme->nkx;
955     ny = pme->nky;
956     nz = pme->nkz;
957
958     pnx = pme->pmegrid_nx;
959     pny = pme->pmegrid_ny;
960     pnz = pme->pmegrid_nz;
961
962     overlap = pme->pme_order - 1;
963
964     /* Add periodic overlap in z */
965     for(ix=0; ix<pnx; ix++)
966     {
967         for(iy=0; iy<pny; iy++)
968         {
969             for(iz=0; iz<overlap; iz++)
970             {
971                 pmegrid[(ix*pny+iy)*pnz+iz] +=
972                     pmegrid[(ix*pny+iy)*pnz+nz+iz];
973             }
974         }
975     }
976
977     if (pme->nnodes_minor == 1)
978     {
979        for(ix=0; ix<pnx; ix++)
980        {
981            for(iy=0; iy<overlap; iy++)
982            {
983                for(iz=0; iz<nz; iz++)
984                {
985                    pmegrid[(ix*pny+iy)*pnz+iz] +=
986                        pmegrid[(ix*pny+ny+iy)*pnz+iz];
987                }
988            }
989        }
990     }
991      
992     if (pme->nnodes_major == 1)
993     {
994         ny_x = (pme->nnodes_minor == 1 ? ny : pny);
995
996         for(ix=0; ix<overlap; ix++)
997         {
998             for(iy=0; iy<ny_x; iy++)
999             {
1000                 for(iz=0; iz<nz; iz++)
1001                 {
1002                     pmegrid[(ix*pny+iy)*pnz+iz] +=
1003                         pmegrid[((nx+ix)*pny+iy)*pnz+iz];
1004                 }
1005             }
1006         }
1007     }
1008 }
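
/* Illustrative example (added note): with pme_order = 4 the overlap is 3, so
 * for each (x,y) column the three z-planes nz..nz+2 that were filled during
 * spreading are folded back onto planes 0..2 above; unwrap_periodic_pmegrid()
 * below performs the inverse copies before force gathering.
 */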
1009
1010
1011 static void
1012 unwrap_periodic_pmegrid(gmx_pme_t pme, real *pmegrid)
1013 {
1014     int     nx,ny,nz,pnx,pny,pnz,ny_x,overlap,ix,iy,iz;
1015
1016     nx = pme->nkx;
1017     ny = pme->nky;
1018     nz = pme->nkz;
1019
1020     pnx = pme->pmegrid_nx;
1021     pny = pme->pmegrid_ny;
1022     pnz = pme->pmegrid_nz;
1023
1024     overlap = pme->pme_order - 1;
1025
1026     if (pme->nnodes_major == 1)
1027     {
1028         ny_x = (pme->nnodes_minor == 1 ? ny : pny);
1029
1030         for(ix=0; ix<overlap; ix++)
1031         {
1032             for(iy=0; iy<ny_x; iy++)
1033             {
1034                 for(iz=0; iz<nz; iz++)
1035                 {
1036                     pmegrid[((nx+ix)*pny+iy)*pnz+iz] =
1037                         pmegrid[(ix*pny+iy)*pnz+iz];
1038                 }
1039             }
1040         }
1041     }
1042
1043     if (pme->nnodes_minor == 1)
1044     {
1045        for(ix=0; ix<pnx; ix++)
1046        {
1047            for(iy=0; iy<overlap; iy++)
1048            {
1049                for(iz=0; iz<nz; iz++)
1050                {
1051                    pmegrid[(ix*pny+ny+iy)*pnz+iz] =
1052                        pmegrid[(ix*pny+iy)*pnz+iz];
1053                }
1054            }
1055        }
1056     }
1057
1058     /* Copy periodic overlap in z */
1059     for(ix=0; ix<pnx; ix++)
1060     {
1061         for(iy=0; iy<pny; iy++)
1062         {
1063             for(iz=0; iz<overlap; iz++)
1064             {
1065                 pmegrid[(ix*pny+iy)*pnz+nz+iz] =
1066                     pmegrid[(ix*pny+iy)*pnz+iz];
1067             }
1068         }
1069     }
1070 }
1071
1072
1073 /* This has to be a macro to enable full compiler optimization with xlC (and probably others too) */
1074 #define DO_BSPLINE(order)                            \
1075 for(ithx=0; (ithx<order); ithx++)                    \
1076 {                                                    \
1077     index_x = (i0+ithx)*pny*pnz;                     \
1078     valx    = qn*thx[ithx];                          \
1079                                                      \
1080     for(ithy=0; (ithy<order); ithy++)                \
1081     {                                                \
1082         valxy    = valx*thy[ithy];                   \
1083         index_xy = index_x+(j0+ithy)*pnz;            \
1084                                                      \
1085         for(ithz=0; (ithz<order); ithz++)            \
1086         {                                            \
1087             index_xyz        = index_xy+(k0+ithz);   \
1088             grid[index_xyz] += valxy*thz[ithz];      \
1089         }                                            \
1090     }                                                \
1091 }
1092
1093
1094 static void spread_q_bsplines(gmx_pme_t pme, pme_atomcomm_t *atc, 
1095                               real *grid)
1096 {
1097
1098     /* spread charges from home atoms to local grid */
1099     pme_overlap_t *ol;
1100     int      b,i,nn,n,ithx,ithy,ithz,i0,j0,k0;
1101     int *    idxptr;
1102     int      order,norder,index_x,index_xy,index_xyz;
1103     real     valx,valxy,qn;
1104     real     *thx,*thy,*thz;
1105     int      localsize, bndsize;
1106   
1107     int      pnx,pny,pnz,ndatatot;
1108   
1109     pnx = pme->pmegrid_nx;
1110     pny = pme->pmegrid_ny;
1111     pnz = pme->pmegrid_nz;
1112     ndatatot = pnx*pny*pnz;
1113     
1114     for(i=0;i<ndatatot;i++)
1115     {
1116         grid[i] = 0;
1117     }
1118
1119     order = pme->pme_order;
1120
1121     for(nn=0; (nn<atc->n);nn++) 
1122     {
1123         n      = nn;
1124         qn     = atc->q[n];
1125
1126         if (qn != 0) 
1127         {
1128             idxptr = atc->idx[n];
1129             norder = n*order;
1130             
1131             i0   = idxptr[XX]; 
1132             j0   = idxptr[YY];
1133             k0   = idxptr[ZZ];
1134             thx = atc->theta[XX] + norder;
1135             thy = atc->theta[YY] + norder;
1136             thz = atc->theta[ZZ] + norder;
1137             
1138             switch (order) {
1139             case 4:  DO_BSPLINE(4);     break;
1140             case 5:  DO_BSPLINE(5);     break;
1141             default: DO_BSPLINE(order); break;
1142             }
1143         }
1144     }   
1145 }
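
/* Added note: each charge is spread over an order x order x order stencil,
 * i.e. 64 grid points for the default pme_order = 4 and 125 for order 5,
 * which is presumably why the common orders get their own DO_BSPLINE
 * instantiation in the switch above: with a compile-time constant order the
 * compiler can fully unroll the three nested loops.
 */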
1146
1147
1148 #if ( !defined(GMX_DOUBLE) && ( defined(GMX_IA32_SSE) || defined(GMX_X86_64_SSE) || defined(GMX_X86_64_SSE2) ) )
1149     /* Calculate exponentials through SSE in float precision */
1150 #define CALC_EXPONENTIALS(start,end,r_aligned)      \
1151     {                                               \
1152         __m128 tmp_sse;                             \
1153         for(kx=0; kx<end; kx+=4)                    \
1154         {                                           \
1155             tmp_sse = _mm_load_ps(r_aligned+kx);    \
1156             tmp_sse = gmx_mm_exp_ps(tmp_sse);       \
1157             _mm_store_ps(r_aligned+kx,tmp_sse);     \
1158         }                                           \
1159     }
1160 #else
1161 #define CALC_EXPONENTIALS(start,end,r)          \
1162     for(kx=start; kx<end; kx++)                 \
1163     {                                           \
1164         r[kx] = exp(r[kx]);                     \
1165     }
1166 #endif
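
/* Note on the SSE variant above (observation, not original documentation): it
 * ignores 'start', always begins at kx = 0 and processes four packed floats
 * per iteration, so it relies on the work arrays being 16-byte aligned and
 * padded to a multiple of four elements; the scalar fallback honours 'start'
 * and needs no such padding.
 */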
1167
1168
1169 static int solve_pme_yzx(gmx_pme_t pme,t_complex *grid,
1170                          real ewaldcoeff,real vol,
1171                          gmx_bool bEnerVir,real *mesh_energy,matrix vir)
1172 {
1173     /* do recip sum over local cells in grid */
1174     /* y major, z middle, x minor or continuous */
1175     t_complex *p0;
1176     int     kx,ky,kz,maxkx,maxky,maxkz;
1177     int     nx,ny,nz,iy,iz,kxstart,kxend;
1178     real    mx,my,mz;
1179     real    factor=M_PI*M_PI/(ewaldcoeff*ewaldcoeff);
1180     real    ets2,struct2,vfactor,ets2vf;
1181     real    eterm,d1,d2,energy=0;
1182     real    by,bz;
1183     real    virxx=0,virxy=0,virxz=0,viryy=0,viryz=0,virzz=0;
1184     real    rxx,ryx,ryy,rzx,rzy,rzz;
1185     real    *mhx,*mhy,*mhz,*m2,*denom,*tmp1,*m2inv;
1186     real    mhxk,mhyk,mhzk,m2k;
1187     real    corner_fac;
1188     ivec    complex_order;
1189     ivec    local_ndata,local_offset,local_size;
1190     
1191     nx = pme->nkx;
1192     ny = pme->nky;
1193     nz = pme->nkz;
1194     
1195     /* Dimensions should be identical for A/B grid, so we just use A here */
1196     gmx_parallel_3dfft_complex_limits(pme->pfft_setupA,
1197                                       complex_order,
1198                                       local_ndata,
1199                                       local_offset,
1200                                       local_size);
1201     
1202     rxx = pme->recipbox[XX][XX];
1203     ryx = pme->recipbox[YY][XX];
1204     ryy = pme->recipbox[YY][YY];
1205     rzx = pme->recipbox[ZZ][XX];
1206     rzy = pme->recipbox[ZZ][YY];
1207     rzz = pme->recipbox[ZZ][ZZ];
1208     
1209     maxkx = (nx+1)/2;
1210     maxky = (ny+1)/2;
1211     maxkz = nz/2+1;
1212
1213     mhx   = pme->work_mhx;
1214     mhy   = pme->work_mhy;
1215     mhz   = pme->work_mhz;
1216     m2    = pme->work_m2;
1217     denom = pme->work_denom;
1218     tmp1  = pme->work_tmp1;
1219     m2inv = pme->work_m2inv;
1220
1221     for(iy=0;iy<local_ndata[YY];iy++)
1222     {
1223         ky = iy + local_offset[YY];
1224         
1225         if (ky < maxky) 
1226         {
1227             my = ky;
1228         }
1229         else 
1230         {
1231             my = (ky - ny);
1232         }
1233         
1234         by = M_PI*vol*pme->bsp_mod[YY][ky];
1235
1236         for(iz=0;iz<local_ndata[ZZ];iz++)
1237         {
1238             kz = iz + local_offset[ZZ];
1239             
1240             mz = kz;
1241
1242             bz = pme->bsp_mod[ZZ][kz];
1243             
1244             /* 0.5 correction for corner points */
1245             corner_fac = 1;
1246             if (kz == 0)
1247                 corner_fac = 0.5;
1248             if (kz == (nz+1)/2)
1249                 corner_fac = 0.5;
1250                       
1251             p0 = grid + iy*local_size[ZZ]*local_size[XX] + iz*local_size[XX];
1252             
1253             /* We should skip the k-space point (0,0,0) */
1254             if (local_offset[XX] > 0 ||
1255                 local_offset[YY] > 0 || ky > 0 ||
1256                 kz > 0)
1257             {
1258                 kxstart = local_offset[XX];
1259             }
1260             else
1261             {
1262                 kxstart = local_offset[XX] + 1;
1263                 p0++;
1264             }
1265             kxend = local_offset[XX] + local_ndata[XX];
1266                         
1267             if (bEnerVir)
1268             {
1269                 /* More expensive inner loop, especially because of the storage
1270                  * of the mh elements in arrays.
1271                  * Because x is the minor grid index, all mh elements
1272                  * depend on kx for triclinic unit cells.
1273                  */
1274
1275                 /* Two explicit loops to avoid a conditional inside the loop */
1276                 for(kx=kxstart; kx<maxkx; kx++)
1277                 {
1278                     mx = kx;
1279                     
1280                     mhxk      = mx * rxx;
1281                     mhyk      = mx * ryx + my * ryy;
1282                     mhzk      = mx * rzx + my * rzy + mz * rzz;
1283                     m2k       = mhxk*mhxk + mhyk*mhyk + mhzk*mhzk;
1284                     mhx[kx]   = mhxk;
1285                     mhy[kx]   = mhyk;
1286                     mhz[kx]   = mhzk;
1287                     m2[kx]    = m2k;
1288                     denom[kx] = m2k*bz*by*pme->bsp_mod[XX][kx];
1289                     tmp1[kx]  = -factor*m2k;
1290                 }
1291                 
1292                 for(kx=maxkx; kx<kxend; kx++)
1293                 {
1294                     mx = (kx - nx);
1295
1296                     mhxk      = mx * rxx;
1297                     mhyk      = mx * ryx + my * ryy;
1298                     mhzk      = mx * rzx + my * rzy + mz * rzz;
1299                     m2k       = mhxk*mhxk + mhyk*mhyk + mhzk*mhzk;
1300                     mhx[kx]   = mhxk;
1301                     mhy[kx]   = mhyk;
1302                     mhz[kx]   = mhzk;
1303                     m2[kx]    = m2k;
1304                     denom[kx] = m2k*bz*by*pme->bsp_mod[XX][kx];
1305                     tmp1[kx]  = -factor*m2k;
1306                 }
1307                 
1308                 for(kx=kxstart; kx<kxend; kx++)
1309                 {
1310                     m2inv[kx] = 1.0/m2[kx];
1311                 }
1312                 for(kx=kxstart; kx<kxend; kx++)
1313                 {
1314                     denom[kx] = 1.0/denom[kx];
1315                 }
1316
1317                 CALC_EXPONENTIALS(kxstart,kxend,tmp1);
1318
1319                 for(kx=kxstart; kx<kxend; kx++,p0++)
1320                 {
1321                     d1      = p0->re;
1322                     d2      = p0->im;
1323                     
1324                     eterm    = ONE_4PI_EPS0/pme->epsilon_r*tmp1[kx]*denom[kx];
1325                     
1326                     p0->re  = d1*eterm;
1327                     p0->im  = d2*eterm;
1328                     
1329                     struct2 = 2.0*(d1*d1+d2*d2);
1330                     
1331                     tmp1[kx] = eterm*struct2;
1332                 }
1333                 
1334                 for(kx=kxstart; kx<kxend; kx++)
1335                 {
1336                     ets2     = corner_fac*tmp1[kx];
1337                     vfactor  = (factor*m2[kx] + 1.0)*2.0*m2inv[kx];
1338                     energy  += ets2;
1339                     
1340                     ets2vf   = ets2*vfactor;
1341                     virxx   += ets2vf*mhx[kx]*mhx[kx] - ets2;
1342                     virxy   += ets2vf*mhx[kx]*mhy[kx];
1343                     virxz   += ets2vf*mhx[kx]*mhz[kx];
1344                     viryy   += ets2vf*mhy[kx]*mhy[kx] - ets2;
1345                     viryz   += ets2vf*mhy[kx]*mhz[kx];
1346                     virzz   += ets2vf*mhz[kx]*mhz[kx] - ets2;
1347                 }
1348             }
1349             else
1350             {
1351                 /* We don't need to calculate the energy and the virial.
1352                  * In this case the triclinic overhead is small.
1353                  */
1354
1355                 /* Two explicit loops to avoid a conditional inside the loop */
1356
1357                 for(kx=kxstart; kx<maxkx; kx++)
1358                 {
1359                     mx = kx;
1360                     
1361                     mhxk      = mx * rxx;
1362                     mhyk      = mx * ryx + my * ryy;
1363                     mhzk      = mx * rzx + my * rzy + mz * rzz;
1364                     m2k       = mhxk*mhxk + mhyk*mhyk + mhzk*mhzk;
1365                     denom[kx] = m2k*bz*by*pme->bsp_mod[XX][kx];
1366                     tmp1[kx]  = -factor*m2k;
1367                 }
1368                 
1369                 for(kx=maxkx; kx<kxend; kx++)
1370                 {
1371                     mx = (kx - nx);
1372                     
1373                     mhxk      = mx * rxx;
1374                     mhyk      = mx * ryx + my * ryy;
1375                     mhzk      = mx * rzx + my * rzy + mz * rzz;
1376                     m2k       = mhxk*mhxk + mhyk*mhyk + mhzk*mhzk;
1377                     denom[kx] = m2k*bz*by*pme->bsp_mod[XX][kx];
1378                     tmp1[kx]  = -factor*m2k;
1379                 }
1380                 
1381                 for(kx=kxstart; kx<kxend; kx++)
1382                 {
1383                     denom[kx] = 1.0/denom[kx];
1384                 }
1385
1386                 CALC_EXPONENTIALS(kxstart,kxend,tmp1);
1387                
1388                 for(kx=kxstart; kx<kxend; kx++,p0++)
1389                 {
1390                     d1      = p0->re;
1391                     d2      = p0->im;
1392                     
1393                     eterm    = ONE_4PI_EPS0/pme->epsilon_r*tmp1[kx]*denom[kx];
1394                     
1395                     p0->re  = d1*eterm;
1396                     p0->im  = d2*eterm;
1397                 }
1398             }
1399         }
1400     }
1401     
1402     if (bEnerVir)
1403     {
1404         /* Update virial with local values.
1405          * The virial is symmetric by definition.
1406          * This virial seems OK for isotropic scaling, but I'm
1407          * experiencing problems on semi-isotropic membranes.
1408          * IS THAT COMMENT STILL VALID??? (DvdS, 2001/02/07).
1409          */
1410         vir[XX][XX] = 0.25*virxx;
1411         vir[YY][YY] = 0.25*viryy;
1412         vir[ZZ][ZZ] = 0.25*virzz;
1413         vir[XX][YY] = vir[YY][XX] = 0.25*virxy;
1414         vir[XX][ZZ] = vir[ZZ][XX] = 0.25*virxz;
1415         vir[YY][ZZ] = vir[ZZ][YY] = 0.25*viryz;
1416         
1417         /* This energy should be corrected for a charged system */
1418         *mesh_energy = 0.5*energy;
1419     }
1420
1421     /* Return the loop count */
1422     return local_ndata[YY]*local_ndata[ZZ]*local_ndata[XX];
1423 }
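
/* What the loops above evaluate, reconstructed from the code (added note; beta
 * is 'ewaldcoeff', f = ONE_4PI_EPS0/epsilon_r):
 *
 *   E_rec = f/(2 pi V) sum_{m != 0} exp(-pi^2 m^2 / beta^2) / m^2 * B(m) * |S(m)|^2
 *
 * where S(m) is the structure factor held in the complex FFT grid and B(m) the
 * product of the B-spline moduli.  The bsp_mod arrays store 1/|b_d(m_d)|^2,
 * which is why they appear in 'denom' together with pi*V*m^2; 'eterm' is
 * exactly the factor the grid is multiplied by before the backward FFT, and
 * the virial terms follow from differentiating E_rec with respect to the box.
 */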
1424
1425
1426 #define DO_FSPLINE(order)                      \
1427 for(ithx=0; (ithx<order); ithx++)              \
1428 {                                                                          \
1429     index_x = (i0+ithx)*pny*pnz;               \
1430     tx      = thx[ithx];                       \
1431     dx      = dthx[ithx];                      \
1432                                                \
1433     for(ithy=0; (ithy<order); ithy++)          \
1434     {                                                                              \
1435         index_xy = index_x+(j0+ithy)*pnz;      \
1436         ty       = thy[ithy];                  \
1437         dy       = dthy[ithy];                 \
1438         fxy1     = fz1 = 0;                    \
1439                                                \
1440         for(ithz=0; (ithz<order); ithz++)      \
1441         {                                                                  \
1442             gval  = grid[index_xy+(k0+ithz)];  \
1443             fxy1 += thz[ithz]*gval;            \
1444             fz1  += dthz[ithz]*gval;           \
1445         }                                      \
1446         fx += dx*ty*fxy1;                      \
1447         fy += tx*dy*fxy1;                      \
1448         fz += tx*ty*fz1;                       \
1449     }                                          \
1450 }
1451
1452
1453 void gather_f_bsplines(gmx_pme_t pme,real *grid,
1454                        gmx_bool bClearF,pme_atomcomm_t *atc,real scale)
1455 {
1456     /* sum forces for local particles */  
1457     int     nn,n,ithx,ithy,ithz,i0,j0,k0;
1458     int     index_x,index_xy;
1459     int     nx,ny,nz,pnx,pny,pnz;
1460     int *   idxptr;
1461     real    tx,ty,dx,dy,qn;
1462     real    fx,fy,fz,gval;
1463     real    fxy1,fz1;
1464     real    *thx,*thy,*thz,*dthx,*dthy,*dthz;
1465     int     norder;
1466     real    rxx,ryx,ryy,rzx,rzy,rzz;
1467     int     order;
1468     
1469     order = pme->pme_order;
1470     thx   = atc->theta[XX];
1471     thy   = atc->theta[YY];
1472     thz   = atc->theta[ZZ];
1473     dthx  = atc->dtheta[XX];
1474     dthy  = atc->dtheta[YY];
1475     dthz  = atc->dtheta[ZZ];
1476     nx    = pme->nkx;
1477     ny    = pme->nky;
1478     nz    = pme->nkz;
1479     pnx   = pme->pmegrid_nx;
1480     pny   = pme->pmegrid_ny;
1481     pnz   = pme->pmegrid_nz;
1482     
1483     rxx   = pme->recipbox[XX][XX];
1484     ryx   = pme->recipbox[YY][XX];
1485     ryy   = pme->recipbox[YY][YY];
1486     rzx   = pme->recipbox[ZZ][XX];
1487     rzy   = pme->recipbox[ZZ][YY];
1488     rzz   = pme->recipbox[ZZ][ZZ];
1489
1490     for(nn=0; (nn<atc->n); nn++) 
1491     {
1492         n = nn;
1493         qn      = scale*atc->q[n];
1494         
1495         if (bClearF) 
1496         {
1497             atc->f[n][XX] = 0;
1498             atc->f[n][YY] = 0;
1499             atc->f[n][ZZ] = 0;
1500         }
1501         if (qn != 0) 
1502         {
1503             fx     = 0;
1504             fy     = 0;
1505             fz     = 0;
1506             idxptr = atc->idx[n];
1507             norder = n*order;
1508             
1509             i0   = idxptr[XX]; 
1510             j0   = idxptr[YY];
1511             k0   = idxptr[ZZ];
1512             
1513             /* Pointer arithmetic alert, next six statements */
1514             thx  = atc->theta[XX] + norder;
1515             thy  = atc->theta[YY] + norder;
1516             thz  = atc->theta[ZZ] + norder;
1517             dthx = atc->dtheta[XX] + norder;
1518             dthy = atc->dtheta[YY] + norder;
1519             dthz = atc->dtheta[ZZ] + norder;
1520             
1521             switch (order) {
1522             case 4:  DO_FSPLINE(4);     break;
1523             case 5:  DO_FSPLINE(5);     break;
1524             default: DO_FSPLINE(order); break;
1525             }
1526
1527             atc->f[n][XX] += -qn*( fx*nx*rxx );
1528             atc->f[n][YY] += -qn*( fx*nx*ryx + fy*ny*ryy );
1529             atc->f[n][ZZ] += -qn*( fx*nx*rzx + fy*ny*rzy + fz*nz*rzz );
1530         }
1531     }
1532     /* Since the energy, and not the forces, is interpolated,
1533      * the net force might not be exactly zero.
1534      * This could be solved by also interpolating F, but
1535      * that comes at a cost.
1536      * A better hack is to remove the net force every
1537      * step, but that must be done at a higher level
1538      * since this routine doesn't see all atoms if running
1539      * in parallel. It is unclear how important this is.  EL 990726
1540      */
1541 }
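
/* What the gather loop computes, written out (added note, derived from the
 * code): the interpolated potential at atom n is
 *   phi_n = sum_{k,l,m} theta_x(k) theta_y(l) theta_z(m) grid[k,l,m]
 * and the force is its negative gradient, obtained with the chain rule through
 * the fractional coordinates: the dtheta arrays supply d(theta)/du, and du/dr
 * contributes the grid dimension times the corresponding recipbox elements,
 * which is exactly where the nx*rxx, ny*ryy, ... factors in the last three
 * statements of the loop come from.
 */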
1542
1543 static real gather_energy_bsplines(gmx_pme_t pme,real *grid,
1544                                    pme_atomcomm_t *atc)
1545 {
1546     int     n,ithx,ithy,ithz,i0,j0,k0;
1547     int     index_x,index_xy;
1548     int *   idxptr;
1549     real    energy,pot,tx,ty,qn,gval;
1550     real    *thx,*thy,*thz;
1551     int     norder;
1552     int     order;
1553     
1554     
1555     order = pme->pme_order;
1556     
1557     energy = 0;
1558     for(n=0; (n<atc->n); n++) {
1559         qn      = atc->q[n];
1560         
1561         if (qn != 0) {
1562             idxptr = atc->idx[n];
1563             norder = n*order;
1564             
1565             i0   = idxptr[XX]; 
1566             j0   = idxptr[YY];
1567             k0   = idxptr[ZZ];
1568             
1569             /* Pointer arithmetic alert, next three statements */
1570             thx  = atc->theta[XX] + norder;
1571             thy  = atc->theta[YY] + norder;
1572             thz  = atc->theta[ZZ] + norder;
1573
1574             pot = 0;
1575             for(ithx=0; (ithx<order); ithx++)
1576             {
1577                 index_x = (i0+ithx)*pme->pmegrid_ny*pme->pmegrid_nz;
1578                 tx      = thx[ithx];
1579
1580                 for(ithy=0; (ithy<order); ithy++)
1581                 {
1582                     index_xy = index_x+(j0+ithy)*pme->pmegrid_nz;
1583                     ty       = thy[ithy];
1584
1585                     for(ithz=0; (ithz<order); ithz++)
1586                     {
1587                         gval  = grid[index_xy+(k0+ithz)];
1588                         pot  += tx*ty*thz[ithz]*gval;
1589                     }
1590
1591                 }
1592             }
1593
1594             energy += pot*qn;
1595         }
1596     }
1597
1598     return energy;
1599 }
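/* Editorial note: written out, the routine above evaluates
 *   E = sum_n q_n * sum_{a,b,c=0}^{order-1}
 *         theta_x[n*order+a] * theta_y[n*order+b] * theta_z[n*order+c]
 *         * grid[(i0+a)*pmegrid_ny*pmegrid_nz + (j0+b)*pmegrid_nz + (k0+c)]
 * i.e. the charge-weighted B-spline interpolation of the grid values over
 * each atom's order^3 support points.
 */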
1600
1601 void make_bsplines(splinevec theta,splinevec dtheta,int order,
1602                    rvec fractx[],int nr,real charge[],
1603                    gmx_bool bFreeEnergy)
1604 {
1605     /* construct splines for local atoms */
1606     int  i,j,k,l;
1607     real dr,div;
1608     real *data,*ddata,*xptr;
1609     
1610     for(i=0; (i<nr); i++) {
1611         /* With free energy we do not use the charge check.
1612          * In most cases this will be more efficient than calling make_bsplines
1613          * twice, since usually more than half the particles have charges.
1614          */
1615         if (bFreeEnergy || charge[i] != 0.0) {
1616             xptr = fractx[i];
1617             for(j=0; (j<DIM); j++) {
1618                 dr  = xptr[j];
1619                 
1620                 /* dr is relative offset from lower cell limit */
1621                 data=&(theta[j][i*order]);
1622                 data[order-1]=0;
1623                 data[1]=dr;
1624                 data[0]=1-dr;
1625                 
1626                 for(k=3; (k<order); k++) {
1627                     div=1.0/(k-1.0);    
1628                     data[k-1]=div*dr*data[k-2];
1629                     for(l=1; (l<(k-1)); l++) {
1630                         data[k-l-1]=div*((dr+l)*data[k-l-2]+(k-l-dr)*
1631                                          data[k-l-1]);
1632                     }
1633                     data[0]=div*(1-dr)*data[0];
1634                 }
1635                 /* differentiate */
1636                 ddata    = &(dtheta[j][i*order]);
1637                 ddata[0] = -data[0];
1638                 for(k=1; (k<order); k++) {
1639                     ddata[k]=data[k-1]-data[k];
1640                 }
1641                 
1642                 div=1.0/(order-1);
1643                 data[order-1]=div*dr*data[order-2];
1644                 for(l=1; (l<(order-1)); l++) {
1645                     data[order-l-1]=div*((dr+l)*data[order-l-2]+
1646                                          (order-l-dr)*data[order-l-1]);
1647                 }
1648                 data[0]=div*(1-dr)*data[0]; 
1649             }
1650         }
1651     }
1652 }
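/* Editorial worked example (not part of the original source): for the
 * default interpolation order 4 the recursion above reduces to the familiar
 * cubic B-spline weights at fractional offset dr:
 *   data[0] = (1 - dr)^3 / 6
 *   data[1] = (4 - 6*dr^2 + 3*dr^3) / 6
 *   data[2] = (1 + 3*dr + 3*dr^2 - 3*dr^3) / 6
 *   data[3] = dr^3 / 6
 * which sum to 1 for any dr in [0,1). The derivative weights follow from
 * the standard relation dM_n/du = M_{n-1}(u) - M_{n-1}(u-1), which in this
 * indexing is the ddata[k] = data[k-1] - data[k] step (with data[-1] taken
 * as 0) applied to the order-1 weights before the final order promotion.
 */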
1653
1654
1655 void make_dft_mod(real *mod,real *data,int ndata)
1656 {
1657   int i,j;
1658   real sc,ss,arg;
1659     
1660   for(i=0;i<ndata;i++) {
1661     sc=ss=0;
1662     for(j=0;j<ndata;j++) {
1663       arg=(2.0*M_PI*i*j)/ndata;
1664       sc+=data[j]*cos(arg);
1665       ss+=data[j]*sin(arg);
1666     }
1667     mod[i]=sc*sc+ss*ss;
1668   }
1669   for(i=0;i<ndata;i++)
1670     if(mod[i]<1e-7)
1671       mod[i]=(mod[i-1]+mod[i+1])*0.5;
1672 }
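/* Editorial note: mod[k] computed above is the squared modulus of the
 * discrete Fourier transform of the tabulated B-spline values,
 *   mod[k] = | sum_{j=0}^{N-1} data[j] * exp(2*pi*I*j*k/N) |^2 ,
 * which enters the denominator of the influence function used by the
 * reciprocal-space solver (solve_pme_yzx). The final loop patches
 * near-zero moduli with the average of their neighbours so that this
 * denominator never vanishes; note that the endpoints k = 0 and
 * k = ndata-1 would read outside the array if they ever triggered.
 */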
1673
1674
1675
1676 void make_bspline_moduli(splinevec bsp_mod,int nx,int ny,int nz,int order)
1677 {
1678   int nmax=max(nx,max(ny,nz));
1679   real *data,*ddata,*bsp_data;
1680   int i,k,l;
1681   real div;
1682     
1683   snew(data,order);
1684   snew(ddata,order);
1685   snew(bsp_data,nmax);
1686
1687   data[order-1]=0;
1688   data[1]=0;
1689   data[0]=1;
1690             
1691   for(k=3;k<order;k++) {
1692     div=1.0/(k-1.0);
1693     data[k-1]=0;
1694     for(l=1;l<(k-1);l++)
1695       data[k-l-1]=div*(l*data[k-l-2]+(k-l)*data[k-l-1]);
1696     data[0]=div*data[0];
1697   }
1698   /* differentiate */
1699   ddata[0]=-data[0];
1700   for(k=1;k<order;k++)
1701     ddata[k]=data[k-1]-data[k];
1702   div=1.0/(order-1);
1703   data[order-1]=0;
1704   for(l=1;l<(order-1);l++)
1705     data[order-l-1]=div*(l*data[order-l-2]+(order-l)*data[order-l-1]);
1706   data[0]=div*data[0]; 
1707
1708   for(i=0;i<nmax;i++)
1709     bsp_data[i]=0;
1710   for(i=1;i<=order;i++)
1711     bsp_data[i]=data[i-1];
1712     
1713   make_dft_mod(bsp_mod[XX],bsp_data,nx);
1714   make_dft_mod(bsp_mod[YY],bsp_data,ny);
1715   make_dft_mod(bsp_mod[ZZ],bsp_data,nz);
1716
1717   sfree(data);
1718   sfree(ddata);
1719   sfree(bsp_data);
1720 }
1721
1722 static void setup_coordinate_communication(pme_atomcomm_t *atc)
1723 {
1724   int nslab,n,i;
1725   int fw,bw;
1726
1727   nslab = atc->nslab;
1728
1729   n = 0;
1730   for(i=1; i<=nslab/2; i++) {
1731     fw = (atc->nodeid + i) % nslab;
1732     bw = (atc->nodeid - i + nslab) % nslab;
1733     if (n < nslab - 1) {
1734       atc->node_dest[n] = fw;
1735       atc->node_src[n]  = bw;
1736       n++;
1737     } 
1738     if (n < nslab - 1) {
1739       atc->node_dest[n] = bw;
1740       atc->node_src[n]  = fw;
1741       n++;
1742     }
1743   }
1744 }
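/* Editorial worked example (not part of the original source): for
 * nslab = 4 and nodeid = 0 the loop above produces
 *   node_dest = { 1, 3, 2 }    node_src = { 3, 1, 2 }
 * i.e. the nearest neighbours in both directions are exchanged with first,
 * and the opposite node (distance nslab/2) is listed only once because of
 * the n < nslab-1 guard.
 */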
1745
1746 int gmx_pme_destroy(FILE *log,gmx_pme_t *pmedata)
1747 {
1748     if(NULL != log)
1749     {
1750         fprintf(log,"Destroying PME data structures.\n");
1751     }
1752
1753     sfree((*pmedata)->nnx);
1754     sfree((*pmedata)->nny);
1755     sfree((*pmedata)->nnz);
1756         
1757     sfree((*pmedata)->pmegridA);
1758     sfree((*pmedata)->fftgridA);
1759     sfree((*pmedata)->cfftgridA);
1760     gmx_parallel_3dfft_destroy((*pmedata)->pfft_setupA);
1761     
1762     if((*pmedata)->pmegridB)
1763     {
1764         sfree((*pmedata)->pmegridB);
1765         sfree((*pmedata)->fftgridB);
1766         sfree((*pmedata)->cfftgridB);
1767         gmx_parallel_3dfft_destroy((*pmedata)->pfft_setupB);
1768     }
1769     sfree((*pmedata)->work_mhz);
1770     sfree((*pmedata)->work_m2);
1771     sfree((*pmedata)->work_denom);
1772     sfree((*pmedata)->work_tmp1_alloc);
1773     sfree((*pmedata)->work_m2inv);
1774         
1775     sfree(*pmedata);
1776     *pmedata = NULL;
1777   
1778   return 0;
1779 }
1780
1781 static int mult_up(int n,int f)
1782 {
1783     return ((n + f - 1)/f)*f;
1784 }
1785
1786
1787 static double pme_load_imbalance(gmx_pme_t pme)
1788 {
1789     int    nma,nmi;
1790     double n1,n2,n3;
1791
1792     nma = pme->nnodes_major;
1793     nmi = pme->nnodes_minor;
1794
1795     n1 = mult_up(pme->nkx,nma)*mult_up(pme->nky,nmi)*pme->nkz;
1796     n2 = mult_up(pme->nkx,nma)*mult_up(pme->nkz,nmi)*pme->nky;
1797     n3 = mult_up(pme->nky,nma)*mult_up(pme->nkz,nmi)*pme->nkx;
1798
1799     /* pme_solve is roughly double the cost of an fft */
1800
1801     return (n1 + n2 + 3*n3)/(double)(6*pme->nkx*pme->nky*pme->nkz);
1802 }
1803
1804 static void init_atomcomm(gmx_pme_t pme,pme_atomcomm_t *atc, t_commrec *cr,
1805                           int dimind,gmx_bool bSpread)
1806 {
1807     int nk,k,s;
1808
1809     atc->dimind = dimind;
1810     atc->nslab  = 1;
1811     atc->nodeid = 0;
1812     atc->pd_nalloc = 0;
1813 #ifdef GMX_MPI
1814     if (pme->nnodes > 1)
1815     {
1816         atc->mpi_comm = pme->mpi_comm_d[dimind];
1817         MPI_Comm_size(atc->mpi_comm,&atc->nslab);
1818         MPI_Comm_rank(atc->mpi_comm,&atc->nodeid);
1819     }
1820     if (debug)
1821     {
1822         fprintf(debug,"For PME atom communication in dimind %d: nslab %d rank %d\n",atc->dimind,atc->nslab,atc->nodeid);
1823     }
1824 #endif
1825
1826     atc->bSpread   = bSpread;
1827     atc->pme_order = pme->pme_order;
1828
1829     if (atc->nslab > 1)
1830     {
1831         /* The following allocations are not required for particle decomposition. */
1832         snew(atc->node_dest,atc->nslab);
1833         snew(atc->node_src,atc->nslab);
1834         setup_coordinate_communication(atc);
1835         
1836         snew(atc->count,atc->nslab);
1837         snew(atc->rcount,atc->nslab);
1838         snew(atc->buf_index,atc->nslab);
1839     }
1840 }
1841
1842 static void 
1843 init_overlap_comm(pme_overlap_t *  ol,
1844                   int              norder,
1845 #ifdef GMX_MPI
1846                   MPI_Comm         comm,  
1847 #endif
1848                   int              nnodes, 
1849                   int              nodeid,
1850                   int              ndata)
1851 {
1852     int lbnd,rbnd,maxlr,b,i;
1853     int exten;
1854     int nn,nk;
1855     pme_grid_comm_t *pgc;
1856     gmx_bool bCont;
1857     int fft_start,fft_end,send_index1,recv_index1;
1858     
1859 #ifdef GMX_MPI
1860     ol->mpi_comm = comm;
1861 #endif
1862     
1863     ol->nnodes = nnodes;
1864     ol->nodeid = nodeid;
1865
1866     /* Linear translation of the PME grid won't affect reciprocal space
1867      * calculations, so to optimize we only interpolate "upwards",
1868      * which also means we only have to consider overlap in one direction.
1869      * I.e., particles on this node might also be spread to grid indices
1870      * that belong to higher nodes (modulo nnodes)
1871      */
1872
1873     snew(ol->s2g0,ol->nnodes+1);
1874     snew(ol->s2g1,ol->nnodes);
1875     if (debug) { fprintf(debug,"PME slab boundaries:"); }
1876     for(i=0; i<nnodes; i++) 
1877     {
1878         /* s2g0 is the local interpolation grid start.
1879          * s2g1 is the local interpolation grid end.
1880          * Because grid overlap communication only goes forward,
1881          * the grid slabs for the FFTs should be rounded down.
1882          */
1883         ol->s2g0[i] = ( i   *ndata + 0       )/nnodes;
1884         ol->s2g1[i] = ((i+1)*ndata + nnodes-1)/nnodes + norder - 1;
1885
1886         if (debug)
1887         {
1888             fprintf(debug,"  %3d %3d",ol->s2g0[i],ol->s2g1[i]);
1889         }
1890     }
1891     ol->s2g0[nnodes] = ndata;
1892     if (debug) { fprintf(debug,"\n"); }
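    /* Editorial worked example (not part of the original source): with
     * nnodes = 3, ndata = 16 and norder = 4 the loop above gives
     *   s2g0 = { 0, 5, 10, 16 }   s2g1 = { 9, 14, 19 }
     * so node 0 spreads onto grid lines [0,9), node 1 onto [5,14) and
     * node 2 onto [10,19), i.e. [10,16) plus the wrapped lines 0..2.
     * Each slab therefore only overlaps its forward neighbour and the
     * loop below finds noverlap_nodes = 1.
     */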
1893
1894     /* Determine with how many nodes we need to communicate the grid overlap */
1895     b = 0;
1896     do
1897     {
1898         b++;
1899         bCont = FALSE;
1900         for(i=0; i<nnodes; i++)
1901         {
1902             if ((i+b <  nnodes && ol->s2g1[i] > ol->s2g0[i+b]) ||
1903                 (i+b >= nnodes && ol->s2g1[i] > ol->s2g0[i+b-nnodes] + ndata))
1904             {
1905                 bCont = TRUE;
1906             }
1907         }
1908     }
1909     while (bCont && b < nnodes);
1910     ol->noverlap_nodes = b - 1;
1911
1912     snew(ol->send_id,ol->noverlap_nodes);
1913     snew(ol->recv_id,ol->noverlap_nodes);
1914     for(b=0; b<ol->noverlap_nodes; b++)
1915     {
1916         ol->send_id[b] = (ol->nodeid + (b + 1)) % ol->nnodes;
1917         ol->recv_id[b] = (ol->nodeid - (b + 1) + ol->nnodes) % ol->nnodes;
1918     }
1919     snew(ol->comm_data, ol->noverlap_nodes);
1920
1921     for(b=0; b<ol->noverlap_nodes; b++)
1922     {
1923         pgc = &ol->comm_data[b];
1924         /* Send */
1925         fft_start        = ol->s2g0[ol->send_id[b]];
1926         fft_end          = ol->s2g0[ol->send_id[b]+1];
1927         if (ol->send_id[b] < nodeid)
1928         {
1929             fft_start += ndata;
1930             fft_end   += ndata;
1931         }
1932         send_index1      = ol->s2g1[nodeid];
1933         send_index1      = min(send_index1,fft_end);
1934         pgc->send_index0 = fft_start;
1935         pgc->send_nindex = max(0,send_index1 - pgc->send_index0);
1936
1937         /* We always start receiving to the first index of our slab */
1938         fft_start        = ol->s2g0[ol->nodeid];
1939         fft_end          = ol->s2g0[ol->nodeid+1];
1940         recv_index1      = ol->s2g1[ol->recv_id[b]];
1941         if (ol->recv_id[b] > nodeid)
1942         {
1943             recv_index1 -= ndata;
1944         }
1945         recv_index1      = min(recv_index1,fft_end);
1946         pgc->recv_index0 = fft_start;
1947         pgc->recv_nindex = max(0,recv_index1 - pgc->recv_index0);
1948     }
1949 }
1950
1951 static void
1952 make_gridindex5_to_localindex(int n,int local_start,int local_range,
1953                               int **global_to_local,
1954                               real **fraction_shift)
1955 {
1956     int i;
1957     int * gtl;
1958     real * fsh;
1959
1960     snew(gtl,5*n);
1961     snew(fsh,5*n);
1962     for(i=0; (i<5*n); i++)
1963     {
1964         /* Determine the global to local grid index */
1965         gtl[i] = (i - local_start + n) % n;
1966         /* For coordinates that fall within the local grid the fraction
1967          * is correct; we don't need to shift it.
1968          */
1969         fsh[i] = 0;
1970         if (local_range < n)
1971         {
1972             /* Due to rounding issues i could be 1 beyond the lower or
1973              * upper boundary of the local grid. Correct the index for this.
1974              * If we shift the index, we need to shift the fraction by
1975              * the same amount in the other direction to not affect
1976              * the weights.
1977              * Note that due to this shifting the weights at the end of
1978              * the spline might change, but that will only involve values
1979              * between zero and values close to the precision of a real,
1980              * which is anyhow the accuracy of the whole mesh calculation.
1981              */
1982             /* With local_range=0 we should not change i=local_start */
1983             if (i % n != local_start)
1984             {
1985                 if (gtl[i] == n-1)
1986                 {
1987                     gtl[i] = 0;
1988                     fsh[i] = -1; 
1989                 }
1990                 else if (gtl[i] == local_range)
1991                 {
1992                     gtl[i] = local_range - 1;
1993                     fsh[i] = 1;
1994                 }
1995             }
1996         }
1997     }
1998
1999     *global_to_local = gtl;
2000     *fraction_shift  = fsh;
2001 }
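/* Editorial worked example (not part of the original source): with
 * n = 10, local_start = 3 and local_range = 4 (local grid lines 3..6):
 *   i = 3..6  ->  gtl = 0..3, fsh = 0       (inside the local grid)
 *   i = 7     ->  gtl would be 4 == local_range, corrected to 3, fsh = +1
 *   i = 2     ->  gtl would be 9 == n-1,         corrected to 0, fsh = -1
 * The table spans 5*n entries so that indices that have been shifted by
 * whole box lengths elsewhere in the code still fall inside it.
 */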
2002
2003 static void
2004 gmx_pme_check_grid_restrictions(FILE *fplog,char dim,int nnodes,int *nk)
2005 {
2006     int nk_new;
2007
2008     if (*nk % nnodes != 0)
2009     {
2010         nk_new = nnodes*(*nk/nnodes + 1);
2011
2012         if (2*nk_new >= 3*(*nk))
2013         {
2014             gmx_fatal(FARGS,"The PME grid size in dim %c (%d) is not divisible by the number of nodes doing PME in dim %c (%d). The grid size would have to be increased by more than 50%% to make the grid divisible. Change the total number of nodes or the number of domain decomposition cells in x or the PME grid %c dimension (and the cut-off).",
2015                       dim,*nk,dim,nnodes,dim);
2016         }
2017         
2018         if (fplog != NULL)
2019         {
2020             fprintf(fplog,"\nNOTE: The PME grid size in dim %c (%d) is not divisible by the number of nodes doing PME in dim %c (%d). Increasing the PME grid size in dim %c to %d. This will increase the accuracy and will not decrease the performance significantly on this number of nodes. For optimal performance change the total number of nodes or the number of domain decomposition cells in x or the PME grid %c dimension (and the cut-off).\n\n",
2021                     dim,*nk,dim,nnodes,dim,nk_new,dim);
2022         }
2023             
2024         *nk = nk_new;
2025     }
2026 }
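/* Editorial worked example (not part of the original source): for
 * nk = 90 and nnodes = 7, 90 % 7 != 0, so nk_new = 7*(90/7 + 1) = 91;
 * since 2*91 < 3*90 the grid is enlarged to 91 with only a note.
 * For nk = 8 and nnodes = 7, nk_new = 14 and 2*14 >= 3*8, so the
 * required increase exceeds 50% and gmx_fatal is called instead.
 */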
2027
2028 int gmx_pme_init(gmx_pme_t *         pmedata,
2029                  t_commrec *         cr,
2030                  int                 nnodes_major,
2031                  int                 nnodes_minor,
2032                  t_inputrec *        ir,
2033                  int                 homenr,
2034                  gmx_bool                bFreeEnergy,
2035                  gmx_bool                bReproducible)
2036 {
2037     gmx_pme_t pme=NULL;
2038     
2039     pme_atomcomm_t *atc;
2040     int bufsizex,bufsizey,bufsize;
2041     ivec ndata;
2042     
2043     if (debug)
2044         fprintf(debug,"Creating PME data structures.\n");
2045     snew(pme,1);
2046         
2047     pme->redist_init         = FALSE;
2048     pme->sum_qgrid_tmp       = NULL;
2049     pme->sum_qgrid_dd_tmp    = NULL;
2050     pme->buf_nalloc          = 0;
2051     pme->redist_buf_nalloc   = 0;
2052     
2053     pme->nnodes              = 1;
2054     pme->bPPnode             = TRUE;
2055     
2056     pme->nnodes_major        = nnodes_major;
2057     pme->nnodes_minor        = nnodes_minor;
2058
2059 #ifdef GMX_MPI
2060     if (nnodes_major*nnodes_minor > 1 && PAR(cr)) 
2061     {
2062         pme->mpi_comm        = cr->mpi_comm_mygroup;
2063         
2064         MPI_Comm_rank(pme->mpi_comm,&pme->nodeid);
2065         MPI_Comm_size(pme->mpi_comm,&pme->nnodes);
2066         if (pme->nnodes != nnodes_major*nnodes_minor)
2067         {
2068             gmx_incons("PME node count mismatch");
2069         }
2070     }
2071 #endif
2072
2073     if (pme->nnodes == 1)
2074     {
2075         pme->ndecompdim = 0;
2076         pme->nodeid_major = 0;
2077         pme->nodeid_minor = 0;
2078     }
2079     else
2080     {
2081         if (nnodes_minor == 1)
2082         {
2083 #ifdef GMX_MPI
2084             pme->mpi_comm_d[0] = pme->mpi_comm;
2085             pme->mpi_comm_d[1] = NULL;
2086 #endif
2087             pme->ndecompdim = 1;
2088             pme->nodeid_major = pme->nodeid;
2089             pme->nodeid_minor = 0;
2090             
2091         }
2092         else if (nnodes_major == 1)
2093         {
2094 #ifdef GMX_MPI
2095             pme->mpi_comm_d[0] = NULL;
2096             pme->mpi_comm_d[1] = pme->mpi_comm;
2097 #endif
2098             pme->ndecompdim = 1;
2099             pme->nodeid_major = 0;
2100             pme->nodeid_minor = pme->nodeid;
2101         }
2102         else 
2103         {
2104             if (pme->nnodes % nnodes_major != 0)
2105             {
2106                 gmx_incons("For 2D PME decomposition, #PME nodes must be divisible by the number of nodes in the major dimension");
2107             }
2108             pme->ndecompdim = 2;
2109             
2110 #ifdef GMX_MPI
2111             MPI_Comm_split(pme->mpi_comm,pme->nodeid % nnodes_minor,
2112                            pme->nodeid,&pme->mpi_comm_d[0]);  /* My communicator along major dimension */
2113             MPI_Comm_split(pme->mpi_comm,pme->nodeid/nnodes_minor,
2114                            pme->nodeid,&pme->mpi_comm_d[1]);  /* My communicator along minor dimension */
2115             
2116             MPI_Comm_rank(pme->mpi_comm_d[0],&pme->nodeid_major);
2117             MPI_Comm_size(pme->mpi_comm_d[0],&pme->nnodes_major);
2118             MPI_Comm_rank(pme->mpi_comm_d[1],&pme->nodeid_minor);
2119             MPI_Comm_size(pme->mpi_comm_d[1],&pme->nnodes_minor);
2120 #endif
2121         }
2122         pme->bPPnode = (cr->duty & DUTY_PP);
2123     }
2124     
2125     if (ir->ePBC == epbcSCREW)
2126     {
2127         gmx_fatal(FARGS,"pme does not (yet) work with pbc = screw");
2128     }
2129     
2130     pme->bFEP        = ((ir->efep != efepNO) && bFreeEnergy);
2131     pme->nkx         = ir->nkx;
2132     pme->nky         = ir->nky;
2133     pme->nkz         = ir->nkz;
2134     pme->pme_order   = ir->pme_order;
2135     pme->epsilon_r   = ir->epsilon_r;
2136     
2137     /* Currently pme.c supports only the fft5d FFT code.
2138      * Therefore the grid always needs to be divisible by nnodes.
2139      * When the old 1D code is also supported again, change this check.
2140      *
2141      * This check should be done before calling gmx_pme_init
2142      * and fplog should be passed instead of stderr.
2143      *
2144     if (pme->ndecompdim >= 2)
2145     */
2146     if (pme->ndecompdim >= 1)
2147     {
2148         /*
2149         gmx_pme_check_grid_restrictions(pme->nodeid==0 ? stderr : NULL,
2150                                         'x',nnodes_major,&pme->nkx);
2151         gmx_pme_check_grid_restrictions(pme->nodeid==0 ? stderr : NULL,
2152                                         'y',nnodes_minor,&pme->nky);
2153         */
2154     }
2155
2156     if (pme->nkx <= pme->pme_order*(pme->nnodes_major > 1 ? 2 : 1) ||
2157         pme->nky <= pme->pme_order*(pme->nnodes_minor > 1 ? 2 : 1) ||
2158         pme->nkz <= pme->pme_order)
2159     {
2160         gmx_fatal(FARGS,"The pme grid dimensions need to be larger than pme_order (%d) and in parallel larger than 2*pme_order for x and/or y",pme->pme_order);
2161     }
2162
2163     if (pme->nnodes > 1) {
2164         double imbal;
2165
2166 #ifdef GMX_MPI
2167         MPI_Type_contiguous(DIM, mpi_type, &(pme->rvec_mpi));
2168         MPI_Type_commit(&(pme->rvec_mpi));
2169 #endif
2170         
2171         /* Note that the charge spreading and force gathering, which usually
2172          * takes about the same amount of time as FFT+solve_pme,
2173          * is always fully load balanced
2174          * (unless the charge distribution is inhomogeneous).
2175          */
2176         
2177         imbal = pme_load_imbalance(pme);
2178         if (imbal >= 1.2 && pme->nodeid_major == 0 && pme->nodeid_minor == 0)
2179         {
2180             fprintf(stderr,
2181                     "\n"
2182                     "NOTE: The load imbalance in PME FFT and solve is %d%%.\n"
2183                     "      For optimal PME load balancing\n"
2184                     "      PME grid_x (%d) and grid_y (%d) should be divisible by #PME_nodes_x (%d)\n"
2185                     "      and PME grid_y (%d) and grid_z (%d) should be divisible by #PME_nodes_y (%d)\n"
2186                     "\n",
2187                     (int)((imbal-1)*100 + 0.5),
2188                     pme->nkx,pme->nky,pme->nnodes_major,
2189                     pme->nky,pme->nkz,pme->nnodes_minor);
2190         }
2191     }
2192
2193     init_overlap_comm(&pme->overlap[0],pme->pme_order,
2194 #ifdef GMX_MPI
2195                       pme->mpi_comm_d[0],
2196 #endif
2197                       pme->nnodes_major,pme->nodeid_major,pme->nkx);
2198     
2199     init_overlap_comm(&pme->overlap[1],pme->pme_order,
2200 #ifdef GMX_MPI
2201                       pme->mpi_comm_d[1],
2202 #endif
2203                       pme->nnodes_minor,pme->nodeid_minor,pme->nky);
2204     
2205     snew(pme->bsp_mod[XX],pme->nkx);
2206     snew(pme->bsp_mod[YY],pme->nky);
2207     snew(pme->bsp_mod[ZZ],pme->nkz);
2208     
2209     /* Allocate data for the interpolation grid, including overlap */
2210     pme->pmegrid_nx = pme->overlap[0].s2g1[pme->nodeid_major] -
2211                       pme->overlap[0].s2g0[pme->nodeid_major];
2212     pme->pmegrid_ny = pme->overlap[1].s2g1[pme->nodeid_minor] - 
2213                       pme->overlap[1].s2g0[pme->nodeid_minor];
2214     pme->pmegrid_nz = pme->nkz + pme->pme_order - 1;
2215     
2216     pme->pmegrid_start_ix = pme->overlap[0].s2g0[pme->nodeid_major];
2217     pme->pmegrid_start_iy = pme->overlap[1].s2g0[pme->nodeid_minor];
2218     pme->pmegrid_start_iz = 0;
2219     
2220     make_gridindex5_to_localindex(pme->nkx,
2221                                   pme->pmegrid_start_ix,
2222                                   pme->pmegrid_nx - (pme->pme_order-1),
2223                                   &pme->nnx,&pme->fshx);
2224     make_gridindex5_to_localindex(pme->nky,
2225                                   pme->pmegrid_start_iy,
2226                                   pme->pmegrid_ny - (pme->pme_order-1),
2227                                   &pme->nny,&pme->fshy);
2228     make_gridindex5_to_localindex(pme->nkz,
2229                                   pme->pmegrid_start_iz,
2230                                   pme->pmegrid_nz - (pme->pme_order-1),
2231                                   &pme->nnz,&pme->fshz);
2232     
2233     snew(pme->pmegridA,pme->pmegrid_nx*pme->pmegrid_ny*pme->pmegrid_nz);
2234     
2235     /* For a non-divisible grid we need pme_order instead of pme_order-1 */
2236     /* x overlap is copied in place: take padding into account.
2237      * y is always copied through a buffer: we don't need padding in z,
2238      * but we do need the overlap in x because of the communication order.
2239      */
2240     bufsizex = pme->pme_order*pme->pmegrid_ny*pme->pmegrid_nz;
2241     bufsizey = pme->pme_order*pme->pmegrid_nx*pme->nkz;
2242     bufsize  = (bufsizex>bufsizey) ? bufsizex : bufsizey;
2243     
2244     snew(pme->pmegrid_sendbuf,bufsize);
2245     snew(pme->pmegrid_recvbuf,bufsize);
2246     
2247     ndata[0] = pme->nkx;
2248     ndata[1] = pme->nky;
2249     ndata[2] = pme->nkz;
2250     
2251     /* This routine will allocate the grid data to fit the FFTs */
2252     gmx_parallel_3dfft_init(&pme->pfft_setupA,ndata,
2253                             &pme->fftgridA,&pme->cfftgridA,
2254                             pme->mpi_comm_d,
2255                             pme->overlap[0].s2g0,pme->overlap[1].s2g0,
2256                             bReproducible);
2257     
2258     if (bFreeEnergy)
2259     {
2260         snew(pme->pmegridB,pme->pmegrid_nx*pme->pmegrid_ny*pme->pmegrid_nz);    
2261         gmx_parallel_3dfft_init(&pme->pfft_setupB,ndata,
2262                                 &pme->fftgridB,&pme->cfftgridB,
2263                                 pme->mpi_comm_d,
2264                                 pme->overlap[0].s2g0,pme->overlap[1].s2g0,
2265                                 bReproducible);
2266     } else 
2267     {
2268         pme->pmegridB    = NULL;
2269         pme->fftgridB    = NULL;
2270         pme->cfftgridB   = NULL;
2271     }
2272     
2273     make_bspline_moduli(pme->bsp_mod,pme->nkx,pme->nky,pme->nkz,pme->pme_order);
2274     
2275     /* Use atc[0] for spreading */
2276     init_atomcomm(pme,&pme->atc[0],cr,nnodes_major > 1 ? 0 : 1,TRUE);
2277     if (pme->ndecompdim >= 2)
2278     {
2279         init_atomcomm(pme,&pme->atc[1],cr,1,FALSE);
2280     }
2281     
2282     if (pme->nnodes == 1) {
2283         pme->atc[0].n = homenr;
2284         pme_realloc_atomcomm_things(&pme->atc[0]);
2285     }
2286     
2287     /* Use fft5d, order after FFT is y major, z, x minor */
2288     pme->work_nalloc = pme->nkx;
2289     snew(pme->work_mhx,pme->work_nalloc);
2290     snew(pme->work_mhy,pme->work_nalloc);
2291     snew(pme->work_mhz,pme->work_nalloc);
2292     snew(pme->work_m2,pme->work_nalloc);
2293     snew(pme->work_denom,pme->work_nalloc);
2294     /* Allocate extra padding and round the pointer up to a 16-byte
2295      * boundary for SSE operations, which process 4 elements at a time.
2296      */
2297     snew(pme->work_tmp1_alloc,pme->work_nalloc+8);
2298     pme->work_tmp1 = (real *) (((size_t) pme->work_tmp1_alloc + 16) & (~((size_t) 15)));
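    /* Editorial note: the expression above rounds the address up to the next
     * 16-byte boundary, always advancing by at least 1 and at most 16 bytes,
     * e.g. 0x1004 -> 0x1010 and 0x1010 -> 0x1020; the extra elements
     * allocated above provide the room for this shift.
     */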
2299     snew(pme->work_m2inv,pme->work_nalloc);
2300
2301     *pmedata = pme;
2302     
2303     return 0;
2304 }
2305
2306 static void spread_on_grid(gmx_pme_t pme,
2307                            pme_atomcomm_t *atc,real *grid,
2308                            gmx_bool bCalcSplines,gmx_bool bSpread)
2309 {    
2310     if (bCalcSplines)
2311     {
2312     
2313         /* Compute fftgrid index for all atoms,
2314          * with help of some extra variables.
2315          */
2316         calc_interpolation_idx(pme,atc);
2317         
2318         /* make local bsplines  */
2319         make_bsplines(atc->theta,atc->dtheta,pme->pme_order,
2320                       atc->fractx,atc->n,atc->q,pme->bFEP);
2321     }    
2322     
2323     if (bSpread)
2324     {
2325         /* put local atoms on grid. */
2326         spread_q_bsplines(pme,atc,grid);
2327     }
2328 }
2329
2330 void gmx_pme_calc_energy(gmx_pme_t pme,int n,rvec *x,real *q,real *V)
2331 {
2332     pme_atomcomm_t *atc;
2333     real *grid;
2334
2335     if (pme->nnodes > 1)
2336     {
2337         gmx_incons("gmx_pme_calc_energy called in parallel");
2338     }
2339     if (pme->bFEP)
2340     {
2341         gmx_incons("gmx_pme_calc_energy with free energy");
2342     }
2343
2344     atc = &pme->atc_energy;
2345     atc->nslab     = 1;
2346     atc->bSpread   = TRUE;
2347     atc->pme_order = pme->pme_order;
2348     atc->n         = n;
2349     pme_realloc_atomcomm_things(atc);
2350     atc->x         = x;
2351     atc->q         = q;
2352     
2353     /* We only use the A-charges grid */
2354     grid = pme->pmegridA;
2355
2356     spread_on_grid(pme,atc,NULL,TRUE,FALSE);
2357
2358     *V = gather_energy_bsplines(pme,grid,atc);
2359 }
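/* Editorial usage sketch (not part of the original source): how a serial
 * caller might evaluate the reciprocal-space energy of a set of test
 * charges with the routine above. All names here are hypothetical.
 */
#if 0
static void example_calc_test_charge_energy(gmx_pme_t pme, int n_test,
                                            rvec x_test[], real q_test[])
{
    real V_test;

    /* pme must have been created with gmx_pme_init for a single node and
     * the A grid is expected to already hold the interpolated potential
     * (see the GMX_PME_CALC_POT path in gmx_pme_do below).
     */
    gmx_pme_calc_energy(pme, n_test, x_test, q_test, &V_test);

    if (debug)
    {
        fprintf(debug, "PME mesh energy of the test charges: %g\n", V_test);
    }
}
#endif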
2360
2361
2362 static void reset_pmeonly_counters(t_commrec *cr,gmx_wallcycle_t wcycle,
2363         t_nrnb *nrnb,t_inputrec *ir, gmx_large_int_t step_rel)
2364 {
2365     /* Reset all the counters related to performance over the run */
2366     wallcycle_stop(wcycle,ewcRUN);
2367     wallcycle_reset_all(wcycle);
2368     init_nrnb(nrnb);
2369     ir->init_step += step_rel;
2370     ir->nsteps    -= step_rel;
2371     wallcycle_start(wcycle,ewcRUN);
2372 }
2373
2374
2375 int gmx_pmeonly(gmx_pme_t pme,
2376                 t_commrec *cr,    t_nrnb *nrnb,
2377                 gmx_wallcycle_t wcycle,
2378                 real ewaldcoeff,  gmx_bool bGatherOnly,
2379                 t_inputrec *ir)
2380 {
2381     gmx_pme_pp_t pme_pp;
2382     int  natoms;
2383     matrix box;
2384     rvec *x_pp=NULL,*f_pp=NULL;
2385     real *chargeA=NULL,*chargeB=NULL;
2386     real lambda=0;
2387     int  maxshift_x=0,maxshift_y=0;
2388     real energy,dvdlambda;
2389     matrix vir;
2390     float cycles;
2391     int  count;
2392     gmx_bool bEnerVir;
2393     gmx_large_int_t step,step_rel;
2394     
2395     
2396     pme_pp = gmx_pme_pp_init(cr);
2397     
2398     init_nrnb(nrnb);
2399     
2400     count = 0;
2401     do /****** this is a quasi-loop over time steps! */
2402     {
2403         /* Domain decomposition */
2404         natoms = gmx_pme_recv_q_x(pme_pp,
2405                                   &chargeA,&chargeB,box,&x_pp,&f_pp,
2406                                   &maxshift_x,&maxshift_y,
2407                                   &pme->bFEP,&lambda,
2408                                   &bEnerVir,
2409                                   &step);
2410         
2411         if (natoms == -1) {
2412             /* We should stop: break out of the loop */
2413             break;
2414         }
2415         
2416         step_rel = step - ir->init_step;
2417         
2418         if (count == 0)
2419             wallcycle_start(wcycle,ewcRUN);
2420         
2421         wallcycle_start(wcycle,ewcPMEMESH);
2422         
2423         dvdlambda = 0;
2424         clear_mat(vir);
2425         gmx_pme_do(pme,0,natoms,x_pp,f_pp,chargeA,chargeB,box,
2426                    cr,maxshift_x,maxshift_y,nrnb,wcycle,vir,ewaldcoeff,
2427                    &energy,lambda,&dvdlambda,
2428                    GMX_PME_DO_ALL_F | (bEnerVir ? GMX_PME_CALC_ENER_VIR : 0));
2429         
2430         cycles = wallcycle_stop(wcycle,ewcPMEMESH);
2431         
2432         gmx_pme_send_force_vir_ener(pme_pp,
2433                                     f_pp,vir,energy,dvdlambda,
2434                                     cycles);
2435         
2436         count++;
2437
2438         if (step_rel == wcycle_get_reset_counters(wcycle))
2439         {
2440             /* Reset all the counters related to performance over the run */
2441             reset_pmeonly_counters(cr,wcycle,nrnb,ir,step_rel);
2442             wcycle_set_reset_counters(wcycle, 0);
2443         }
2444         
2445     } /***** end of quasi-loop, we stop with the break above */
2446     while (TRUE);
2447     
2448     return 0;
2449 }
2450
2451 int gmx_pme_do(gmx_pme_t pme,
2452                int start,       int homenr,
2453                rvec x[],        rvec f[],
2454                real *chargeA,   real *chargeB,
2455                matrix box,      t_commrec *cr,
2456                int  maxshift_x, int maxshift_y,
2457                t_nrnb *nrnb,    gmx_wallcycle_t wcycle,
2458                matrix vir,      real ewaldcoeff,
2459                real *energy,    real lambda, 
2460                real *dvdlambda, int flags)
2461 {
2462     int     q,d,i,j,ntot,npme;
2463     int     nx,ny,nz;
2464     int     n_d,local_ny;
2465     int     loop_count;
2466     pme_atomcomm_t *atc=NULL;
2467     real *  grid=NULL;
2468     real    *ptr;
2469     rvec    *x_d,*f_d;
2470     real    *charge=NULL,*q_d,vol;
2471     real    energy_AB[2];
2472     matrix  vir_AB[2];
2473     gmx_bool    bClearF;
2474     gmx_parallel_3dfft_t pfft_setup;
2475     real *  fftgrid;
2476     t_complex * cfftgrid;
2477
2478     if (pme->nnodes > 1) {
2479         atc = &pme->atc[0];
2480         atc->npd = homenr;
2481         if (atc->npd > atc->pd_nalloc) {
2482             atc->pd_nalloc = over_alloc_dd(atc->npd);
2483             srenew(atc->pd,atc->pd_nalloc);
2484         }
2485         atc->maxshift = (atc->dimind==0 ? maxshift_x : maxshift_y);
2486     }
2487     else
2488     {
2489         /* This could be necessary for TPI */
2490         pme->atc[0].n = homenr;
2491     }
2492     
2493     for(q=0; q<(pme->bFEP ? 2 : 1); q++) {
2494         if (q == 0) {
2495             grid = pme->pmegridA;
2496             fftgrid = pme->fftgridA;
2497             cfftgrid = pme->cfftgridA;
2498             pfft_setup = pme->pfft_setupA;
2499             charge = chargeA+start;
2500         } else {
2501             grid = pme->pmegridB;
2502             fftgrid = pme->fftgridB;
2503             cfftgrid = pme->cfftgridB;
2504             pfft_setup = pme->pfft_setupB;
2505             charge = chargeB+start;
2506         }
2507         /* Unpack structure */
2508         if (debug) {
2509             fprintf(debug,"PME: nnodes = %d, nodeid = %d\n",
2510                     cr->nnodes,cr->nodeid);
2511             fprintf(debug,"Grid = %p\n",(void*)grid);
2512             if (grid == NULL)
2513                 gmx_fatal(FARGS,"No grid!");
2514         }
2515         where();
2516         
2517         m_inv_ur0(box,pme->recipbox); 
2518
2519         if (pme->nnodes == 1) {
2520             atc = &pme->atc[0];
2521             if (DOMAINDECOMP(cr)) {
2522                 atc->n = homenr;
2523                 pme_realloc_atomcomm_things(atc);
2524             }
2525             atc->x = x;
2526             atc->q = charge;
2527             atc->f = f;
2528         } else {
2529             wallcycle_start(wcycle,ewcPME_REDISTXF);
2530             for(d=pme->ndecompdim-1; d>=0; d--)
2531             {
2532                 if (d == pme->ndecompdim-1)
2533                 {
2534                     n_d = homenr;
2535                     x_d = x + start;
2536                     q_d = charge;
2537                 }
2538                 else
2539                 {
2540                     n_d = pme->atc[d+1].n;
2541                     x_d = atc->x;
2542                     q_d = atc->q;
2543                 }
2544                 atc = &pme->atc[d];
2545                 atc->npd = n_d;
2546                 if (atc->npd > atc->pd_nalloc) {
2547                     atc->pd_nalloc = over_alloc_dd(atc->npd);
2548                     srenew(atc->pd,atc->pd_nalloc);
2549                 }
2550                 atc->maxshift = (atc->dimind==0 ? maxshift_x : maxshift_y);
2551                 pme_calc_pidx(n_d,pme->recipbox,x_d,atc);
2552                 where();
2553                 
2554                 GMX_BARRIER(cr->mpi_comm_mygroup);
2555                 /* Redistribute x (only once) and qA or qB */
2556                 if (DOMAINDECOMP(cr)) {
2557                     dd_pmeredist_x_q(pme, n_d, q==0, x_d, q_d, atc);
2558                 } else {
2559                     pmeredist_pd(pme, TRUE, n_d, q==0, x_d, q_d, atc);
2560                 }
2561             }
2562             where();
2563
2564             wallcycle_stop(wcycle,ewcPME_REDISTXF);
2565         }
2566         
2567         if (debug)
2568             fprintf(debug,"Node= %6d, pme local particles=%6d\n",
2569                     cr->nodeid,atc->n);
2570
2571         if (flags & GMX_PME_SPREAD_Q)
2572         {
2573             wallcycle_start(wcycle,ewcPME_SPREADGATHER);
2574
2575             /* Spread the charges on a grid */
2576             GMX_MPE_LOG(ev_spread_on_grid_start);
2577             
2579             spread_on_grid(pme,&pme->atc[0],grid,q==0,TRUE);
2580             GMX_MPE_LOG(ev_spread_on_grid_finish);
2581
2582             if (q == 0)
2583             {
2584                 inc_nrnb(nrnb,eNR_WEIGHTS,DIM*atc->n);
2585             }
2586             inc_nrnb(nrnb,eNR_SPREADQBSP,
2587                      pme->pme_order*pme->pme_order*pme->pme_order*atc->n);
2588
2589             wrap_periodic_pmegrid(pme,grid);
2590
2591             /* sum contributions to local grid from other nodes */
2592 #ifdef GMX_MPI
2593             if (pme->nnodes > 1) {
2594                 GMX_BARRIER(cr->mpi_comm_mygroup);
2595                 gmx_sum_qgrid_dd(pme,grid,GMX_SUM_QGRID_FORWARD);
2596                 where();
2597             }
2598 #endif
2599             where();
2600
2601             copy_pmegrid_to_fftgrid(pme,grid,fftgrid);
2602
2603             wallcycle_stop(wcycle,ewcPME_SPREADGATHER);
2604         }
2605          
2606         if (flags & GMX_PME_SOLVE)
2607         {
2608             /* do 3d-fft */ 
2609             GMX_BARRIER(cr->mpi_comm_mygroup);
2610             GMX_MPE_LOG(ev_gmxfft3d_start);
2611             wallcycle_start(wcycle,ewcPME_FFT);
2612             gmx_parallel_3dfft_execute(pfft_setup,GMX_FFT_REAL_TO_COMPLEX,fftgrid,cfftgrid);
2613             wallcycle_stop(wcycle,ewcPME_FFT);
2614             GMX_MPE_LOG(ev_gmxfft3d_finish);
2615             where();
2616             
2617             /* solve in k-space for our local cells */
2618             vol = det(box);
2619             GMX_BARRIER(cr->mpi_comm_mygroup);
2620             GMX_MPE_LOG(ev_solve_pme_start);
2621             wallcycle_start(wcycle,ewcPME_SOLVE);
2622             loop_count =
2623                 solve_pme_yzx(pme,cfftgrid,ewaldcoeff,vol,
2624                               flags & GMX_PME_CALC_ENER_VIR,
2625                               &energy_AB[q],vir_AB[q]);
2626             wallcycle_stop(wcycle,ewcPME_SOLVE);
2627             where();
2628             GMX_MPE_LOG(ev_solve_pme_finish);
2629             inc_nrnb(nrnb,eNR_SOLVEPME,loop_count);
2630         }
2631
2632         if ((flags & GMX_PME_CALC_F) ||
2633             (flags & GMX_PME_CALC_POT))
2634         {
2635             
2636             /* do 3d-invfft */
2637             GMX_BARRIER(cr->mpi_comm_mygroup);
2638             GMX_MPE_LOG(ev_gmxfft3d_start);
2639             where();
2640             wallcycle_start(wcycle,ewcPME_FFT);
2641             gmx_parallel_3dfft_execute(pfft_setup,GMX_FFT_COMPLEX_TO_REAL,cfftgrid,fftgrid);
2642             wallcycle_stop(wcycle,ewcPME_FFT);
2643
2644             where();
2645             GMX_MPE_LOG(ev_gmxfft3d_finish);
2646
2647             if (pme->nodeid == 0)
2648             {
2649                 ntot = pme->nkx*pme->nky*pme->nkz;
2650                 npme  = ntot*log((real)ntot)/log(2.0);
2651                 inc_nrnb(nrnb,eNR_FFT,2*npme);
2652             }
2653
2654             wallcycle_start(wcycle,ewcPME_SPREADGATHER);
2655
2656             copy_fftgrid_to_pmegrid(pme,fftgrid,grid);
2657
2658             /* distribute local grid to all nodes */
2659 #ifdef GMX_MPI
2660             if (pme->nnodes > 1) {
2661                 GMX_BARRIER(cr->mpi_comm_mygroup);
2662                 gmx_sum_qgrid_dd(pme,grid,GMX_SUM_QGRID_BACKWARD);
2663             }
2664 #endif
2665             where();
2666
2667             unwrap_periodic_pmegrid(pme,grid);
2668         }
2669
2670         if (flags & GMX_PME_CALC_F)
2671         {
2672             /* interpolate forces for our local atoms */
2673             GMX_BARRIER(cr->mpi_comm_mygroup);
2674             GMX_MPE_LOG(ev_gather_f_bsplines_start);
2675
2676             where();
2677             
2678             /* If we are running without parallelization,
2679              * atc->f is the actual force array, not a buffer,
2680              * therefore we should not clear it.
2681              */
2682             bClearF = (q == 0 && PAR(cr));
2683             gather_f_bsplines(pme,grid,bClearF,&pme->atc[0],
2684                               pme->bFEP ? (q==0 ? 1.0-lambda : lambda) : 1.0);
2685             where();
2686             
2687             GMX_MPE_LOG(ev_gather_f_bsplines_finish);
2688             
2689             inc_nrnb(nrnb,eNR_GATHERFBSP,
2690                      pme->pme_order*pme->pme_order*pme->pme_order*pme->atc[0].n);
2691             wallcycle_stop(wcycle,ewcPME_SPREADGATHER);
2692        }
2693     } /* of q-loop */
2694     
2695     if ((flags & GMX_PME_CALC_F) && pme->nnodes > 1) {
2696         wallcycle_start(wcycle,ewcPME_REDISTXF);
2697         for(d=0; d<pme->ndecompdim; d++)
2698         {
2699             atc = &pme->atc[d];
2700             if (d == pme->ndecompdim - 1)
2701             {
2702                 n_d = homenr;
2703                 f_d = f + start;
2704             }
2705             else
2706             {
2707                 n_d = pme->atc[d+1].n;
2708                 f_d = pme->atc[d+1].f;
2709             }
2710             GMX_BARRIER(cr->mpi_comm_mygroup);
2711             if (DOMAINDECOMP(cr)) {
2712                 dd_pmeredist_f(pme,atc,n_d,f_d,
2713                                d==pme->ndecompdim-1 && pme->bPPnode);
2714             } else {
2715                 pmeredist_pd(pme, FALSE, n_d, TRUE, f_d, NULL, atc);
2716             }
2717         }
2718
2719         wallcycle_stop(wcycle,ewcPME_REDISTXF);
2720     }
2721     where();
2722     
2723     if (!pme->bFEP) {
2724         *energy = energy_AB[0];
2725         m_add(vir,vir_AB[0],vir);
2726     } else {
2727         *energy = (1.0-lambda)*energy_AB[0] + lambda*energy_AB[1];
2728         *dvdlambda += energy_AB[1] - energy_AB[0];
2729         for(i=0; i<DIM; i++)
2730             for(j=0; j<DIM; j++)
2731                 vir[i][j] += (1.0-lambda)*vir_AB[0][i][j] + lambda*vir_AB[1][i][j];
2732     }
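    /* Editorial worked example of the mixing above (numbers illustrative):
     * with lambda = 0.25, energy_AB[0] = -100 and energy_AB[1] = -80,
     *   *energy    = 0.75*(-100) + 0.25*(-80) = -95
     *   *dvdlambda gains energy_AB[1] - energy_AB[0] = 20.
     * The virial is mixed with the same (1-lambda)/lambda weights.
     */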
2733
2734     if (debug)
2735         fprintf(debug,"PME mesh energy: %g\n",*energy);
2736     
2737     return 0;
2738 }