1 /* -*- mode: c; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; c-file-style: "stroustrup"; -*-
2  *
3  * 
4  *                This source code is part of
5  * 
6  *                 G   R   O   M   A   C   S
7  * 
8  *          GROningen MAchine for Chemical Simulations
9  * 
10  *                        VERSION 3.2.0
11  * Written by David van der Spoel, Erik Lindahl, Berk Hess, and others.
12  * Copyright (c) 1991-2000, University of Groningen, The Netherlands.
13  * Copyright (c) 2001-2004, The GROMACS development team,
14  * check out http://www.gromacs.org for more information.
15  *
16  * This program is free software; you can redistribute it and/or
17  * modify it under the terms of the GNU General Public License
18  * as published by the Free Software Foundation; either version 2
19  * of the License, or (at your option) any later version.
20  * 
21  * If you want to redistribute modifications, please consider that
22  * scientific software is very special. Version control is crucial -
23  * bugs must be traceable. We will be happy to consider code for
24  * inclusion in the official distribution, but derived work must not
25  * be called official GROMACS. Details are found in the README & COPYING
26  * files - if they are missing, get the official version at www.gromacs.org.
27  * 
28  * To help us fund GROMACS development, we humbly ask that you cite
29  * the papers on the package - you can find them in the top README file.
30  * 
31  * For more info, check our website at http://www.gromacs.org
32  * 
33  * And Hey:
34  * GROwing Monsters And Cloning Shrimps
35  */
36 /* IMPORTANT FOR DEVELOPERS:
37  *
38  * Triclinic pme stuff isn't entirely trivial, and we've experienced
39  * some bugs during development (many of them due to me). To avoid
40  * this in the future, please check the following things if you make
41  * changes in this file:
42  *
43  * 1. You should obtain identical (at least to the PME precision)
44  *    energies, forces, and virial for
45  *    a rectangular box and a triclinic one where the z (or y) axis is
46  *    tilted by a whole box side. For instance you could use these boxes:
47  *
48  *    rectangular       triclinic
49  *     2  0  0           2  0  0
50  *     0  2  0           0  2  0
51  *     0  0  6           2  2  6
52  *
53  * 2. You should check the energy conservation in a triclinic box.
54  *
55  * It might seem like overkill, but better safe than sorry.
56  * /Erik 001109
57  */ 
58
59 #ifdef HAVE_CONFIG_H
60 #include <config.h>
61 #endif
62
63 #ifdef GMX_LIB_MPI
64 #include <mpi.h>
65 #endif
66 #ifdef GMX_THREADS
67 #include "tmpi.h"
68 #endif
69
70
71 #include <stdio.h>
72 #include <string.h>
73 #include <math.h>
74 #include "typedefs.h"
75 #include "txtdump.h"
76 #include "vec.h"
77 #include "gmxcomplex.h"
78 #include "smalloc.h"
79 #include "futil.h"
80 #include "coulomb.h"
81 #include "gmx_fatal.h"
82 #include "pme.h"
83 #include "network.h"
84 #include "physics.h"
85 #include "nrnb.h"
86 #include "copyrite.h"
87 #include "gmx_wallcycle.h"
88 #include "gmx_parallel_3dfft.h"
89 #include "pdbio.h"
90
91 #if ( !defined(GMX_DOUBLE) && ( defined(GMX_IA32_SSE) || defined(GMX_X86_64_SSE) || defined(GMX_X86_64_SSE2) ) )
92 #include "gmx_sse2_single.h"
93 #endif
94
95 #include "mpelogging.h"
96
97 #define DFT_TOL 1e-7
98 /* #define PRT_FORCE */
99 /* conditions for on-the-fly time measurement */
100 /* #define TAKETIME (step > 1 && timesteps < 10) */
101 #define TAKETIME FALSE
102
103 #ifdef GMX_DOUBLE
104 #define mpi_type MPI_DOUBLE
105 #else
106 #define mpi_type MPI_FLOAT
107 #endif
108
109 /* Internal data structures */
110 typedef struct {
111     int send_index0;
112     int send_nindex;
113     int recv_index0;
114     int recv_nindex;
115 } pme_grid_comm_t;
116
117 typedef struct {
118 #ifdef GMX_MPI
119     MPI_Comm mpi_comm;
120 #endif
121     int  nnodes,nodeid;
122     int  *s2g0;
123     int  *s2g1;
124     int  noverlap_nodes;
125     int  *send_id,*recv_id;
126     pme_grid_comm_t *comm_data;
127 } pme_overlap_t;
128
129 typedef struct {
130     int  dimind;            /* The index of the dimension, 0=x, 1=y */
131     int  nslab;
132     int  nodeid;
133 #ifdef GMX_MPI
134     MPI_Comm mpi_comm;
135 #endif
136
137     int  *node_dest;        /* The nodes to send x and q to with DD */
138     int  *node_src;         /* The nodes to receive x and q from with DD */
139     int  *buf_index;        /* Index for commnode into the buffers */
140
141     int  maxshift;
142
143     int  npd;
144     int  pd_nalloc;
145     int  *pd;
146     int  *count;            /* The number of atoms to send to each node */
147     int  *rcount;           /* The number of atoms to receive */
148
149     int  n;
150     int  nalloc;
151     rvec *x;
152     real *q;
153     rvec *f;
154     bool bSpread;           /* These coordinates are used for spreading */
155     int  pme_order;
156     splinevec theta,dtheta;
157     ivec *idx;
158     rvec *fractx;            /* Fractional coordinate relative to the
159                               * lower cell boundary 
160                               */
161 } pme_atomcomm_t;
162
163 typedef struct gmx_pme {
164     int  ndecompdim;         /* The number of decomposition dimensions */
165     int  nodeid;             /* Our nodeid in mpi->mpi_comm */
166     int  nodeid_major;
167     int  nodeid_minor;
168     int  nnodes;             /* The number of nodes doing PME */
169     int  nnodes_major;
170     int  nnodes_minor;
171
172     MPI_Comm mpi_comm;
173     MPI_Comm mpi_comm_d[2];  /* Indexed on dimension, 0=x, 1=y */
174 #ifdef GMX_MPI
175     MPI_Datatype  rvec_mpi;  /* the pme vector's MPI type */
176 #endif
177
178     bool bPPnode;            /* Node also does particle-particle forces */
179     bool bFEP;               /* Compute Free energy contribution */
180     int nkx,nky,nkz;         /* Grid dimensions */
181     int pme_order;
182     real epsilon_r;           
183     
184     real *  pmegridA;  /* Grids on which we do spreading/interpolation, includes overlap */
185     real *  pmegridB;
186     int     pmegrid_nx,pmegrid_ny,pmegrid_nz;
187     int     pmegrid_start_ix,pmegrid_start_iy,pmegrid_start_iz;    
188     
189     real *  pmegrid_sendbuf;
190     real *  pmegrid_recvbuf;
191     
192     real *fftgridA;             /* Grids for FFT. With 1D FFT decomposition this can be a pointer */
193     real *fftgridB;             /* inside the interpolation grid, but separate for 2D PME decomp. */
194     int   fftgrid_nx,fftgrid_ny,fftgrid_nz;
195     
196     t_complex *cfftgridA;             /* Grids for complex FFT data */
197     t_complex *cfftgridB;            
198     int   cfftgrid_nx,cfftgrid_ny,cfftgrid_nz;
199     
200     gmx_parallel_3dfft_t  pfft_setupA;
201     gmx_parallel_3dfft_t  pfft_setupB;
202     
203     int  *nnx,*nny,*nnz;
204     real *fshx,*fshy,*fshz;
205     
206     pme_atomcomm_t atc[2];  /* Indexed on decomposition index */
207     matrix    recipbox;
208     splinevec bsp_mod;
209     
210     pme_overlap_t overlap[2]; /* Indexed on dimension, 0=x, 1=y */
211
212
213     pme_atomcomm_t atc_energy; /* Only for gmx_pme_calc_energy */
214     
215     rvec *bufv;             /* Communication buffer */
216     real *bufr;             /* Communication buffer */
217     int  buf_nalloc;        /* The communication buffer size */
218
219     /* work data for solve_pme */
220     int      work_nalloc;
221     real *   work_mhx;
222     real *   work_mhy;
223     real *   work_mhz;
224     real *   work_m2;
225     real *   work_denom;
226     real *   work_tmp1_alloc;
227     real *   work_tmp1;
228     real *   work_m2inv;
229
230     /* Work data for PME_redist */
231     bool     redist_init;
232     int *    scounts; 
233     int *    rcounts;
234     int *    sdispls;
235     int *    rdispls;
236     int *    sidx;
237     int *    idxa;    
238     real *   redist_buf;
239     int      redist_buf_nalloc;
240     
241     /* Work data for sum_qgrid */
242     real *   sum_qgrid_tmp;
243     real *   sum_qgrid_dd_tmp;
244 } t_gmx_pme;
245
246
247 static void calc_interpolation_idx(gmx_pme_t pme,pme_atomcomm_t *atc)
248 {
249     int  i;
250     int  *idxptr,tix,tiy,tiz;
251     real *xptr,*fptr,tx,ty,tz;
252     real rxx,ryx,ryy,rzx,rzy,rzz;
253     int  nx,ny,nz;
254     int  start_ix,start_iy,start_iz;
255     
256     nx  = pme->nkx;
257     ny  = pme->nky;
258     nz  = pme->nkz;
259     
260     start_ix = pme->pmegrid_start_ix;
261     start_iy = pme->pmegrid_start_iy;
262     start_iz = pme->pmegrid_start_iz;
263     
264     rxx = pme->recipbox[XX][XX];
265     ryx = pme->recipbox[YY][XX];
266     ryy = pme->recipbox[YY][YY];
267     rzx = pme->recipbox[ZZ][XX];
268     rzy = pme->recipbox[ZZ][YY];
269     rzz = pme->recipbox[ZZ][ZZ];
270     
271     for(i=0; (i<atc->n); i++) {
272         xptr   = atc->x[i];
273         idxptr = atc->idx[i];
274         fptr   = atc->fractx[i];
275         
276         /* Fractional coordinates along box vectors, add 2.0 to make 100% sure we are positive for triclinic boxes */
277         tx = nx * ( xptr[XX] * rxx + xptr[YY] * ryx + xptr[ZZ] * rzx + 2.0 );
278         ty = ny * (                  xptr[YY] * ryy + xptr[ZZ] * rzy + 2.0 );
279         tz = nz * (                                   xptr[ZZ] * rzz + 2.0 );
280         
281         tix = (int)(tx);
282         tiy = (int)(ty);
283         tiz = (int)(tz);
284         
285         /* Because decomposition only occurs in x and y,
286          * we never have a fraction correction in z.
287          */
288         fptr[XX] = tx - tix + pme->fshx[tix];
289         fptr[YY] = ty - tiy + pme->fshy[tiy];
290         fptr[ZZ] = tz - tiz;   
291
292         idxptr[XX] = pme->nnx[tix];
293         idxptr[YY] = pme->nny[tiy];
294         idxptr[ZZ] = pme->nnz[tiz];
295
296 #ifdef DEBUG
297         range_check(idxptr[XX],0,pme->pmegrid_nx);
298         range_check(idxptr[YY],0,pme->pmegrid_ny);
299         range_check(idxptr[ZZ],0,pme->pmegrid_nz);
300 #endif
301     }
302 }
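/* A minimal sketch (not compiled) of what the loop above computes for a
 * plain rectangular box, where recipbox is diagonal.  The names box_x, x
 * and i are local to this sketch and only illustrate the idea: the +2.0
 * shift guarantees a positive argument before truncation, and the
 * nnx/fshx lookup tables map the shifted index back onto the charge grid
 * and supply the decomposition-dependent fractional correction.
 */
#if 0
{
    real tx  = nx*(x[XX]/box_x + 2.0);    /* scaled, shifted coordinate     */
    int  tix = (int)tx;                   /* truncated grid index           */
    real fx  = tx - tix + pme->fshx[tix]; /* fraction + decomposition shift */
    int  i   = pme->nnx[tix];             /* index folded onto the grid     */
}
#endif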
303
304 static void pme_calc_pidx(int natoms, matrix recipbox, rvec x[],
305                           pme_atomcomm_t *atc)
306 {
307     int  nslab,i;
308     int  si;
309     real *xptr,s;
310     real rxx,ryx,rzx,ryy,rzy;
311     int *pd,*count;
312
313     /* Calculate the PME task index (pidx) for each atom.
314      * Here we always assign equally sized slabs to each node
315      * for load balancing reasons (the PME grid spacing is not used).
316      */
317     
318     nslab = atc->nslab;
319     pd    = atc->pd;
320     count = atc->count;
321
322     /* Reset the count */
323     for(i=0; i<nslab; i++)
324     {
325         count[i] = 0;
326     }
327     
328     if (atc->dimind == 0)
329     {
330         rxx = recipbox[XX][XX];
331         ryx = recipbox[YY][XX];
332         rzx = recipbox[ZZ][XX];
333         /* Calculate the node index in x-dimension */
334         for(i=0; (i<natoms); i++)
335         {
336             xptr   = x[i];
337             /* Fractional coordinates along box vectors */
338             s = nslab*(xptr[XX]*rxx + xptr[YY]*ryx + xptr[ZZ]*rzx);
339             si = (int)(s + 2*nslab) % nslab;
340             pd[i] = si;
341             count[si]++;
342         }
343     }
344     else
345     {
346         ryy = recipbox[YY][YY];
347         rzy = recipbox[ZZ][YY];
348         /* Calculate the node index in y-dimension */
349         for(i=0; (i<natoms); i++)
350         {
351             xptr   = x[i];
352             /* Fractional coordinates along box vectors */
353             s = nslab*(xptr[YY]*ryy + xptr[ZZ]*rzy);
354             si = (int)(s + 2*nslab) % nslab;
355             pd[i] = si;
356             count[si]++;
357         }
358     }
359 }
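/* Note on the slab assignment above: s is not guaranteed to lie in
 * [0,nslab) (for triclinic boxes it can be slightly negative), so
 * 2*nslab is added before truncation and the modulo then always yields a
 * valid slab index in [0,nslab).  For example, with nslab = 4 and
 * s = -0.3: (int)(-0.3 + 8) % 4 = 7 % 4 = 3, i.e. the last slab, as
 * expected for a fractional coordinate just below zero.
 */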
360
361 static void pme_realloc_atomcomm_things(pme_atomcomm_t *atc)
362 {
363     int nalloc_old,i;
364     
365     if (atc->n > atc->nalloc) {
366         nalloc_old = atc->nalloc;
367         atc->nalloc = over_alloc_dd(atc->n);
368         
369         if (atc->nslab > 1) {
370             srenew(atc->x,atc->nalloc);
371             srenew(atc->q,atc->nalloc);
372             srenew(atc->f,atc->nalloc);
373             for(i=nalloc_old; i<atc->nalloc; i++)
374             {
375                 clear_rvec(atc->f[i]);
376             }
377         }
378         if (atc->bSpread) {
379             for(i=0;i<DIM;i++) {
380                 srenew(atc->theta[i] ,atc->pme_order*atc->nalloc); 
381                 srenew(atc->dtheta[i],atc->pme_order*atc->nalloc);
382             }
383             srenew(atc->fractx,atc->nalloc); 
384             srenew(atc->idx   ,atc->nalloc);
385         }
386     }
387 }
388
389 static void pmeredist_pd(gmx_pme_t pme, bool forw,
390                          int n, bool bXF, rvec *x_f, real *charge,
391                          pme_atomcomm_t *atc)
392 /* Redistribute particle data for PME calculation */
393 /* domain decomposition by x coordinate           */
394 {
395     int *idxa;
396     int i, ii;
397     
398     if(FALSE == pme->redist_init) {
399         snew(pme->scounts,atc->nslab);
400         snew(pme->rcounts,atc->nslab);
401         snew(pme->sdispls,atc->nslab);
402         snew(pme->rdispls,atc->nslab);
403         snew(pme->sidx,atc->nslab);
404         pme->redist_init = TRUE;
405     }
406     if (n > pme->redist_buf_nalloc) {
407         pme->redist_buf_nalloc = over_alloc_dd(n);
408         srenew(pme->redist_buf,pme->redist_buf_nalloc*DIM);
409     }
410     
411     pme->idxa = atc->pd;
412
413 #ifdef GMX_MPI
414     if (forw && bXF) {
415         /* forward, redistribution from pp to pme */ 
416         
417         /* Calculate send counts and exchange them with other nodes */
418         for(i=0; (i<atc->nslab); i++) pme->scounts[i]=0;
419         for(i=0; (i<n); i++) pme->scounts[pme->idxa[i]]++;
420         MPI_Alltoall( pme->scounts, 1, MPI_INT, pme->rcounts, 1, MPI_INT, atc->mpi_comm);
421         
422         /* Calculate send and receive displacements and index into send 
423            buffer */
424         pme->sdispls[0]=0;
425         pme->rdispls[0]=0;
426         pme->sidx[0]=0;
427         for(i=1; i<atc->nslab; i++) {
428             pme->sdispls[i]=pme->sdispls[i-1]+pme->scounts[i-1];
429             pme->rdispls[i]=pme->rdispls[i-1]+pme->rcounts[i-1];
430             pme->sidx[i]=pme->sdispls[i];
431         }
432         /* Total # of particles to be received */
433         atc->n = pme->rdispls[atc->nslab-1] + pme->rcounts[atc->nslab-1];
434         
435         pme_realloc_atomcomm_things(atc);
436         
437         /* Copy particle coordinates into send buffer and exchange */
438         for(i=0; (i<n); i++) {
439             ii=DIM*pme->sidx[pme->idxa[i]];
440             pme->sidx[pme->idxa[i]]++;
441             pme->redist_buf[ii+XX]=x_f[i][XX];
442             pme->redist_buf[ii+YY]=x_f[i][YY];
443             pme->redist_buf[ii+ZZ]=x_f[i][ZZ];
444         }
445         MPI_Alltoallv(pme->redist_buf, pme->scounts, pme->sdispls, 
446                       pme->rvec_mpi, atc->x, pme->rcounts, pme->rdispls, 
447                       pme->rvec_mpi, atc->mpi_comm);
448     }
449     if (forw) {
450         /* Copy charge into send buffer and exchange */
451         for(i=0; i<atc->nslab; i++) pme->sidx[i]=pme->sdispls[i];
452         for(i=0; (i<n); i++) {
453             ii=pme->sidx[pme->idxa[i]];
454             pme->sidx[pme->idxa[i]]++;
455             pme->redist_buf[ii]=charge[i];
456         }
457         MPI_Alltoallv(pme->redist_buf, pme->scounts, pme->sdispls, mpi_type,
458                       atc->q, pme->rcounts, pme->rdispls, mpi_type,
459                       atc->mpi_comm);
460     }
461     else { /* backward, redistribution from pme to pp */ 
462         MPI_Alltoallv(atc->f, pme->rcounts, pme->rdispls, pme->rvec_mpi,
463                       pme->redist_buf, pme->scounts, pme->sdispls, 
464                       pme->rvec_mpi, atc->mpi_comm);
465         
466         /* Copy data from receive buffer */
467         for(i=0; i<atc->nslab; i++)
468             pme->sidx[i] = pme->sdispls[i];
469         for(i=0; (i<n); i++) {
470             ii = DIM*pme->sidx[pme->idxa[i]];
471             x_f[i][XX] += pme->redist_buf[ii+XX];
472             x_f[i][YY] += pme->redist_buf[ii+YY];
473             x_f[i][ZZ] += pme->redist_buf[ii+ZZ];
474             pme->sidx[pme->idxa[i]]++;
475         }
476     }
477 #endif 
478 }
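/* Bookkeeping sketch for the MPI_Alltoallv calls above: for nslab = 3
 * and send counts scounts = {2,5,1}, the send displacements are the
 * exclusive prefix sums sdispls = {0,2,7}.  sidx starts as a copy of
 * sdispls and is advanced as each atom is packed, so the send buffer
 * ends up grouped by destination slab; the receive side is set up the
 * same way from the counts returned by MPI_Alltoall.
 */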
479
480 static void pme_dd_sendrecv(pme_atomcomm_t *atc,
481                             bool bBackward,int shift,
482                             void *buf_s,int nbyte_s,
483                             void *buf_r,int nbyte_r)
484 {
485 #ifdef GMX_MPI
486     int dest,src;
487     MPI_Status stat;
488     
489     if (bBackward == FALSE) {
490         dest = atc->node_dest[shift];
491         src  = atc->node_src[shift];
492     } else {
493         dest = atc->node_src[shift];
494         src  = atc->node_dest[shift];
495     }
496     
497     if (nbyte_s > 0 && nbyte_r > 0) {
498         MPI_Sendrecv(buf_s,nbyte_s,MPI_BYTE,
499                      dest,shift,
500                      buf_r,nbyte_r,MPI_BYTE,
501                      src,shift,
502                      atc->mpi_comm,&stat);
503     } else if (nbyte_s > 0) {
504         MPI_Send(buf_s,nbyte_s,MPI_BYTE,
505                  dest,shift,
506                  atc->mpi_comm);
507     } else if (nbyte_r > 0) {
508         MPI_Recv(buf_r,nbyte_r,MPI_BYTE,
509                  src,shift,
510                  atc->mpi_comm,&stat);
511     }
512 #endif
513 }
514
515 static void dd_pmeredist_x_q(gmx_pme_t pme, 
516                              int n, bool bX, rvec *x, real *charge,
517                              pme_atomcomm_t *atc)
518 {
519     int *commnode,*buf_index;
520     int nnodes_comm,i,nsend,local_pos,buf_pos,node,scount,rcount;
521     
522     commnode  = atc->node_dest;
523     buf_index = atc->buf_index;
524     
525     nnodes_comm = min(2*atc->maxshift,atc->nslab-1);
526     
527     nsend = 0;
528     for(i=0; i<nnodes_comm; i++) {
529         buf_index[commnode[i]] = nsend;
530         nsend += atc->count[commnode[i]];
531     }
532     if (bX) {
533         if (atc->count[atc->nodeid] + nsend != n)
534             gmx_fatal(FARGS,"%d particles communicated to PME node %d are more than 2/3 times the cut-off out of the domain decomposition cell of their charge group in dimension %c.\n"
535                       "This usually means that your system is not well equilibrated.",
536                       n - (atc->count[atc->nodeid] + nsend),
537                       pme->nodeid,'x'+atc->dimind);
538         
539         if (nsend > pme->buf_nalloc) {
540             pme->buf_nalloc = over_alloc_dd(nsend);
541             srenew(pme->bufv,pme->buf_nalloc);
542             srenew(pme->bufr,pme->buf_nalloc);
543         }
544         
545         atc->n = atc->count[atc->nodeid];
546         for(i=0; i<nnodes_comm; i++) {
547             scount = atc->count[commnode[i]];
548             /* Communicate the count */
549             if (debug)
550                 fprintf(debug,"dimind %d PME node %d send to node %d: %d\n",
551                         atc->dimind,atc->nodeid,commnode[i],scount);
552             pme_dd_sendrecv(atc,FALSE,i,
553                             &scount,sizeof(int),
554                             &atc->rcount[i],sizeof(int));
555             atc->n += atc->rcount[i];
556         }
557         
558         pme_realloc_atomcomm_things(atc);
559     }
560     
561     local_pos = 0;
562     for(i=0; i<n; i++) {
563         node = atc->pd[i];
564         if (node == atc->nodeid) {
565             /* Copy direct to the receive buffer */
566             if (bX) {
567                 copy_rvec(x[i],atc->x[local_pos]);
568             }
569             atc->q[local_pos] = charge[i];
570             local_pos++;
571         } else {
572             /* Copy to the send buffer */
573             if (bX) {
574                 copy_rvec(x[i],pme->bufv[buf_index[node]]);
575             }
576             pme->bufr[buf_index[node]] = charge[i];
577             buf_index[node]++;
578         }
579     }
580     
581     buf_pos = 0;
582     for(i=0; i<nnodes_comm; i++) {
583         scount = atc->count[commnode[i]];
584         rcount = atc->rcount[i];
585         if (scount > 0 || rcount > 0) {
586             if (bX) {
587                 /* Communicate the coordinates */
588                 pme_dd_sendrecv(atc,FALSE,i,
589                                 pme->bufv[buf_pos],scount*sizeof(rvec),
590                                 atc->x[local_pos],rcount*sizeof(rvec));
591             }
592             /* Communicate the charges */
593             pme_dd_sendrecv(atc,FALSE,i,
594                             pme->bufr+buf_pos,scount*sizeof(real),
595                             atc->q+local_pos,rcount*sizeof(real));
596             buf_pos   += scount;
597             local_pos += atc->rcount[i];
598         }
599     }
600 }
601
602 static void dd_pmeredist_f(gmx_pme_t pme, pme_atomcomm_t *atc,
603                            int n, rvec *f,
604                            bool bAddF)
605 {
606     int *commnode,*buf_index;
607     int nnodes_comm,local_pos,buf_pos,i,scount,rcount,node;
608
609     commnode  = atc->node_dest;
610     buf_index = atc->buf_index;
611
612     nnodes_comm = min(2*atc->maxshift,atc->nslab-1);
613
614     local_pos = atc->count[atc->nodeid];
615     buf_pos = 0;
616     for(i=0; i<nnodes_comm; i++) {
617         scount = atc->rcount[i];
618         rcount = atc->count[commnode[i]];
619         if (scount > 0 || rcount > 0) {
620             /* Communicate the forces */
621             pme_dd_sendrecv(atc,TRUE,i,
622                             atc->f[local_pos],scount*sizeof(rvec),
623                             pme->bufv[buf_pos],rcount*sizeof(rvec));
624             local_pos += scount;
625         }
626         buf_index[commnode[i]] = buf_pos;
627         buf_pos   += rcount;
628     }
629
630     local_pos = 0;
631     if (bAddF)
632     {
633         for(i=0; i<n; i++)
634         {
635             node = atc->pd[i];
636             if (node == atc->nodeid)
637             {
638                 /* Add from the local force array */
639                 rvec_inc(f[i],atc->f[local_pos]);
640                 local_pos++;
641             }
642             else
643             {
644                 /* Add from the receive buffer */
645                 rvec_inc(f[i],pme->bufv[buf_index[node]]);
646                 buf_index[node]++;
647             }
648         }
649     }
650     else
651     {
652         for(i=0; i<n; i++)
653         {
654             node = atc->pd[i];
655             if (node == atc->nodeid)
656             {
657                 /* Copy from the local force array */
658                 copy_rvec(atc->f[local_pos],f[i]);
659                 local_pos++;
660             }
661             else
662             {
663                 /* Copy from the receive buffer */
664                 copy_rvec(pme->bufv[buf_index[node]],f[i]);
665                 buf_index[node]++;
666             }
667         }
668     }
669 }
670
671 #ifdef GMX_MPI
672 static void 
673 gmx_sum_qgrid_dd(gmx_pme_t pme, real *grid, int direction)
674 {
675     pme_overlap_t *overlap;
676     int send_index0,send_nindex;
677     int recv_index0,recv_nindex;
678     MPI_Status stat;
679     int i,j,k,ix,iy,iz,icnt;
680     int ipulse,send_id,recv_id,datasize;
681     real *p;
682     real *sendptr,*recvptr;
683     
684     /* Start with minor-rank communication. This is a bit of a pain since it is not contiguous */
685     overlap = &pme->overlap[1];
686     
687     for(ipulse=0;ipulse<overlap->noverlap_nodes;ipulse++)
688     {
689         /* Since we have already (un)wrapped the overlap in the z-dimension,
690          * we only have to communicate 0 to nkz (not pmegrid_nz).
691          */
692         if (direction==GMX_SUM_QGRID_FORWARD)
693         {
694             send_id = overlap->send_id[ipulse];
695             recv_id = overlap->recv_id[ipulse];
696             send_index0   = overlap->comm_data[ipulse].send_index0;
697             send_nindex   = overlap->comm_data[ipulse].send_nindex;
698             recv_index0   = overlap->comm_data[ipulse].recv_index0;
699             recv_nindex   = overlap->comm_data[ipulse].recv_nindex;
700         }
701         else
702         {
703             send_id = overlap->recv_id[ipulse];
704             recv_id = overlap->send_id[ipulse];
705             send_index0   = overlap->comm_data[ipulse].recv_index0;
706             send_nindex   = overlap->comm_data[ipulse].recv_nindex;            
707             recv_index0   = overlap->comm_data[ipulse].send_index0;
708             recv_nindex   = overlap->comm_data[ipulse].send_nindex;
709         }
710
711         /* Copy data to contiguous send buffer */
712         if (debug)
713         {
714             fprintf(debug,"PME send node %d %d -> %d grid start %d Communicating %d to %d\n",
715                     pme->nodeid,overlap->nodeid,send_id,
716                     pme->pmegrid_start_iy,
717                     send_index0-pme->pmegrid_start_iy,
718                     send_index0-pme->pmegrid_start_iy+send_nindex);
719         }
720         icnt = 0;
721         for(i=0;i<pme->pmegrid_nx;i++)
722         {
723             ix = i;
724             for(j=0;j<send_nindex;j++)
725             {
726                 iy = j + send_index0 - pme->pmegrid_start_iy;
727                 for(k=0;k<pme->nkz;k++)
728                 {
729                     iz = k;
730                     pme->pmegrid_sendbuf[icnt++] = grid[ix*(pme->pmegrid_ny*pme->pmegrid_nz)+iy*(pme->pmegrid_nz)+iz];
731                 }
732             }
733         }
734             
735         datasize      = pme->pmegrid_nx * pme->nkz;
736         
737         MPI_Sendrecv(pme->pmegrid_sendbuf,send_nindex*datasize,GMX_MPI_REAL,
738                      send_id,ipulse,
739                      pme->pmegrid_recvbuf,recv_nindex*datasize,GMX_MPI_REAL,
740                      recv_id,ipulse,
741                      overlap->mpi_comm,&stat);
742         
743         /* Get data from contiguous recv buffer */
744         if (debug)
745         {
746             fprintf(debug,"PME recv node %d %d <- %d grid start %d Communicating %d to %d\n",
747                     pme->nodeid,overlap->nodeid,recv_id,
748                     pme->pmegrid_start_iy,
749                     recv_index0-pme->pmegrid_start_iy,
750                     recv_index0-pme->pmegrid_start_iy+recv_nindex);
751         }
752         icnt = 0;
753         for(i=0;i<pme->pmegrid_nx;i++)
754         {
755             ix = i;
756             for(j=0;j<recv_nindex;j++)
757             {
758                 iy = j + recv_index0 - pme->pmegrid_start_iy;
759                 for(k=0;k<pme->nkz;k++)
760                 {
761                     iz = k;
762                     if(direction==GMX_SUM_QGRID_FORWARD)
763                     {
764                         grid[ix*(pme->pmegrid_ny*pme->pmegrid_nz)+iy*(pme->pmegrid_nz)+iz] += pme->pmegrid_recvbuf[icnt++];
765                     }
766                     else
767                     {
768                         grid[ix*(pme->pmegrid_ny*pme->pmegrid_nz)+iy*(pme->pmegrid_nz)+iz]  = pme->pmegrid_recvbuf[icnt++];
769                     }
770                 }
771             }
772         }
773     }
774     
775     /* Major dimension is easier, no copying required,
776      * but we might have to sum into a separate array.
777      * Since we don't copy, we have to communicate up to pmegrid_nz,
778      * not nkz as for the minor direction.
779      */
780     overlap = &pme->overlap[0];
781     
782     for(ipulse=0;ipulse<overlap->noverlap_nodes;ipulse++)
783     {
784         if(direction==GMX_SUM_QGRID_FORWARD)
785         {
786             send_id = overlap->send_id[ipulse];
787             recv_id = overlap->recv_id[ipulse];
788             send_index0   = overlap->comm_data[ipulse].send_index0;
789             send_nindex   = overlap->comm_data[ipulse].send_nindex;
790             recv_index0   = overlap->comm_data[ipulse].recv_index0;
791             recv_nindex   = overlap->comm_data[ipulse].recv_nindex;
792             recvptr   = pme->pmegrid_recvbuf;
793         }
794         else
795         {
796             send_id = overlap->recv_id[ipulse];
797             recv_id = overlap->send_id[ipulse];
798             send_index0   = overlap->comm_data[ipulse].recv_index0;
799             send_nindex   = overlap->comm_data[ipulse].recv_nindex;            
800             recv_index0   = overlap->comm_data[ipulse].send_index0;
801             recv_nindex   = overlap->comm_data[ipulse].send_nindex;
802             recvptr   = grid + (recv_index0-pme->pmegrid_start_ix)*(pme->pmegrid_ny*pme->pmegrid_nz);
803         }
804                 
805         sendptr       = grid + (send_index0-pme->pmegrid_start_ix)*(pme->pmegrid_ny*pme->pmegrid_nz);
806         datasize      = pme->pmegrid_ny * pme->pmegrid_nz;
807
808         if (debug)
809         {
810             fprintf(debug,"PME send node %d %d -> %d grid start %d Communicating %d to %d\n",
811                     pme->nodeid,overlap->nodeid,send_id,
812                     pme->pmegrid_start_ix,
813                     send_index0-pme->pmegrid_start_ix,
814                     send_index0-pme->pmegrid_start_ix+send_nindex);
815             fprintf(debug,"PME recv node %d %d <- %d grid start %d Communicating %d to %d\n",
816                     pme->nodeid,overlap->nodeid,recv_id,
817                     pme->pmegrid_start_ix,
818                     recv_index0-pme->pmegrid_start_ix,
819                     recv_index0-pme->pmegrid_start_ix+recv_nindex);
820         }
821
822         MPI_Sendrecv(sendptr,send_nindex*datasize,GMX_MPI_REAL,
823                      send_id,ipulse,
824                      recvptr,recv_nindex*datasize,GMX_MPI_REAL,
825                      recv_id,ipulse,
826                      overlap->mpi_comm,&stat);
827         
828         /* ADD data from contiguous recv buffer */
829         if(direction==GMX_SUM_QGRID_FORWARD)
830         {        
831             p = grid + (recv_index0-pme->pmegrid_start_ix)*(pme->pmegrid_ny*pme->pmegrid_nz);
832             for(i=0;i<recv_nindex*datasize;i++)
833             {
834                 p[i] += pme->pmegrid_recvbuf[i];
835             }
836         }
837     }
838 }
839 #endif
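/* Direction summary for gmx_sum_qgrid_dd: in the FORWARD direction the
 * overlap regions received from neighbouring nodes are added into the
 * local grid (after charge spreading), while in the BACKWARD direction
 * the send/receive roles are swapped and the received data overwrite the
 * overlap region (before force gathering), so the two calls are each
 * other's mirror image.
 */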
840
841
842 static int
843 copy_pmegrid_to_fftgrid(gmx_pme_t pme, real *pmegrid, real *fftgrid)
844 {
845     ivec    local_fft_ndata,local_fft_offset,local_fft_size;
846     ivec    local_pme_size;
847     int     i,ix,iy,iz;
848     int     pmeidx,fftidx;
849
850     /* Dimensions should be identical for A/B grid, so we just use A here */
851     gmx_parallel_3dfft_real_limits(pme->pfft_setupA,
852                                    local_fft_ndata,
853                                    local_fft_offset,
854                                    local_fft_size);
855     
856     local_pme_size[0] = pme->pmegrid_nx;
857     local_pme_size[1] = pme->pmegrid_ny;
858     local_pme_size[2] = pme->pmegrid_nz;
859     
860     /* The fftgrid is always 'justified' to the lower-left corner of the PME grid, 
861      the offset is identical, and the PME grid always has more data (due to overlap)
862      */
863     {
864 #ifdef DEBUG_PME
865         FILE *fp,*fp2;
866         char fn[STRLEN],format[STRLEN];
867         real val;
868         sprintf(fn,"pmegrid%d.pdb",pme->nodeid);
869         fp = ffopen(fn,"w");
870         sprintf(fn,"pmegrid%d.txt",pme->nodeid);
871         fp2 = ffopen(fn,"w");
872         sprintf(format,"%s%s\n",pdbformat,"%6.2f%6.2f");
873 #endif
874     for(ix=0;ix<local_fft_ndata[XX];ix++)
875     {
876         for(iy=0;iy<local_fft_ndata[YY];iy++)
877         {
878             for(iz=0;iz<local_fft_ndata[ZZ];iz++)
879             {
880                 pmeidx = ix*(local_pme_size[YY]*local_pme_size[ZZ])+iy*(local_pme_size[ZZ])+iz;
881                 fftidx = ix*(local_fft_size[YY]*local_fft_size[ZZ])+iy*(local_fft_size[ZZ])+iz;
882                 fftgrid[fftidx] = pmegrid[pmeidx];
883 #ifdef DEBUG_PME
884                 val = 100*pmegrid[pmeidx];
885                 if (pmegrid[pmeidx] != 0)
886                     fprintf(fp,format,"ATOM",pmeidx,"CA","GLY",' ',pmeidx,' ',
887                             5.0*ix,5.0*iy,5.0*iz,1.0,val);
888                 if (pmegrid[pmeidx] != 0)
889                     fprintf(fp2,"%-12s  %5d  %5d  %5d  %12.5e\n",
890                             "qgrid",
891                             pme->pmegrid_start_ix + ix,
892                             pme->pmegrid_start_iy + iy,
893                             pme->pmegrid_start_iz + iz,
894                             pmegrid[pmeidx]);
895 #endif
896             }
897         }
898     }
899 #ifdef DEBUG_PME
900     fclose(fp);
901     fclose(fp2);
902 #endif
903     }
904     return 0;
905 }
906
907
908 static int
909 copy_fftgrid_to_pmegrid(gmx_pme_t pme, real *fftgrid, real *pmegrid)
910 {
911     ivec    local_fft_ndata,local_fft_offset,local_fft_size;
912     ivec    local_pme_size;
913     int     i,ix,iy,iz;
914     int     pmeidx,fftidx;
915     
916     /* Dimensions should be identical for A/B grid, so we just use A here */
917     gmx_parallel_3dfft_real_limits(pme->pfft_setupA,
918                                    local_fft_ndata,
919                                    local_fft_offset,
920                                    local_fft_size);
921
922     local_pme_size[0] = pme->pmegrid_nx;
923     local_pme_size[1] = pme->pmegrid_ny;
924     local_pme_size[2] = pme->pmegrid_nz;
925     
926     /* The fftgrid is always 'justified' to the lower-left corner of the PME grid, 
927      the offset is identical, and the PME grid always has more data (due to overlap)
928      */
929     for(ix=0;ix<local_fft_ndata[XX];ix++)
930     {
931         for(iy=0;iy<local_fft_ndata[YY];iy++)
932         {
933             for(iz=0;iz<local_fft_ndata[ZZ];iz++)
934             {
935                 pmeidx = ix*(local_pme_size[YY]*local_pme_size[ZZ])+iy*(local_pme_size[ZZ])+iz;
936                 fftidx = ix*(local_fft_size[YY]*local_fft_size[ZZ])+iy*(local_fft_size[ZZ])+iz;
937                 pmegrid[pmeidx] = fftgrid[fftidx];
938             }
939         }
940     }   
941     return 0;
942 }
943
944
945 static void
946 wrap_periodic_pmegrid(gmx_pme_t pme, real *pmegrid)
947 {
948     int     nx,ny,nz,pnx,pny,pnz,ny_x,overlap,ix,iy,iz;
949
950     nx = pme->nkx;
951     ny = pme->nky;
952     nz = pme->nkz;
953
954     pnx = pme->pmegrid_nx;
955     pny = pme->pmegrid_ny;
956     pnz = pme->pmegrid_nz;
957
958     overlap = pme->pme_order - 1;
959
960     /* Add periodic overlap in z */
961     for(ix=0; ix<pnx; ix++)
962     {
963         for(iy=0; iy<pny; iy++)
964         {
965             for(iz=0; iz<overlap; iz++)
966             {
967                 pmegrid[(ix*pny+iy)*pnz+iz] +=
968                     pmegrid[(ix*pny+iy)*pnz+nz+iz];
969             }
970         }
971     }
972
973     if (pme->nnodes_minor == 1)
974     {
975        for(ix=0; ix<pnx; ix++)
976        {
977            for(iy=0; iy<overlap; iy++)
978            {
979                for(iz=0; iz<nz; iz++)
980                {
981                    pmegrid[(ix*pny+iy)*pnz+iz] +=
982                        pmegrid[(ix*pny+ny+iy)*pnz+iz];
983                }
984            }
985        }
986     }
987      
988     if (pme->nnodes_major == 1)
989     {
990         ny_x = (pme->nnodes_minor == 1 ? ny : pny);
991
992         for(ix=0; ix<overlap; ix++)
993         {
994             for(iy=0; iy<ny_x; iy++)
995             {
996                 for(iz=0; iz<nz; iz++)
997                 {
998                     pmegrid[(ix*pny+iy)*pnz+iz] +=
999                         pmegrid[((nx+ix)*pny+iy)*pnz+iz];
1000                 }
1001             }
1002         }
1003     }
1004 }
1005
1006
1007 static void
1008 unwrap_periodic_pmegrid(gmx_pme_t pme, real *pmegrid)
1009 {
1010     int     nx,ny,nz,pnx,pny,pnz,ny_x,overlap,ix,iy,iz;
1011
1012     nx = pme->nkx;
1013     ny = pme->nky;
1014     nz = pme->nkz;
1015
1016     pnx = pme->pmegrid_nx;
1017     pny = pme->pmegrid_ny;
1018     pnz = pme->pmegrid_nz;
1019
1020     overlap = pme->pme_order - 1;
1021
1022     if (pme->nnodes_major == 1)
1023     {
1024         ny_x = (pme->nnodes_minor == 1 ? ny : pny);
1025
1026         for(ix=0; ix<overlap; ix++)
1027         {
1028             for(iy=0; iy<ny_x; iy++)
1029             {
1030                 for(iz=0; iz<nz; iz++)
1031                 {
1032                     pmegrid[((nx+ix)*pny+iy)*pnz+iz] =
1033                         pmegrid[(ix*pny+iy)*pnz+iz];
1034                 }
1035             }
1036         }
1037     }
1038
1039     if (pme->nnodes_minor == 1)
1040     {
1041        for(ix=0; ix<pnx; ix++)
1042        {
1043            for(iy=0; iy<overlap; iy++)
1044            {
1045                for(iz=0; iz<nz; iz++)
1046                {
1047                    pmegrid[(ix*pny+ny+iy)*pnz+iz] =
1048                        pmegrid[(ix*pny+iy)*pnz+iz];
1049                }
1050            }
1051        }
1052     }
1053
1054     /* Copy periodic overlap in z */
1055     for(ix=0; ix<pnx; ix++)
1056     {
1057         for(iy=0; iy<pny; iy++)
1058         {
1059             for(iz=0; iz<overlap; iz++)
1060             {
1061                 pmegrid[(ix*pny+iy)*pnz+nz+iz] =
1062                     pmegrid[(ix*pny+iy)*pnz+iz];
1063             }
1064         }
1065     }
1066 }
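/* wrap_periodic_pmegrid and unwrap_periodic_pmegrid are mirror
 * operations on the order-1 overlap region: after spreading, wrap folds
 * the overlap back into the principal grid with +=, and before
 * gathering, unwrap copies the principal values back out into the
 * overlap with =, so the spline stencils near the upper edges see the
 * periodically correct data in both directions.
 */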
1067
1068
1069 /* This has to be a macro to enable full compiler optimization with xlC (and probably others too) */
1070 #define DO_BSPLINE(order)                            \
1071 for(ithx=0; (ithx<order); ithx++)                    \
1072 {                                                    \
1073     index_x = (i0+ithx)*pny*pnz;                     \
1074     valx    = qn*thx[ithx];                          \
1075                                                      \
1076     for(ithy=0; (ithy<order); ithy++)                \
1077     {                                                \
1078         valxy    = valx*thy[ithy];                   \
1079         index_xy = index_x+(j0+ithy)*pnz;            \
1080                                                      \
1081         for(ithz=0; (ithz<order); ithz++)            \
1082         {                                            \
1083             index_xyz        = index_xy+(k0+ithz);   \
1084             grid[index_xyz] += valxy*thz[ithz];      \
1085         }                                            \
1086     }                                                \
1087 }
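/* In effect DO_BSPLINE accumulates, for one charge qn spread around the
 * grid offset (i0,j0,k0) with spline weights thx/thy/thz:
 *
 *   grid[i0+ithx][j0+ithy][k0+ithz] += qn * thx[ithx]*thy[ithy]*thz[ithz]
 *
 * for all 0 <= ithx,ithy,ithz < order, i.e. a separable order^3 stencil
 * laid down in the (padded) pnx*pny*pnz charge grid.
 */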
1088
1089
1090 static void spread_q_bsplines(gmx_pme_t pme, pme_atomcomm_t *atc, 
1091                               real *grid)
1092 {
1093
1094     /* spread charges from home atoms to local grid */
1095     pme_overlap_t *ol;
1096     int      b,i,nn,n,ithx,ithy,ithz,i0,j0,k0;
1097     int *    idxptr;
1098     int      order,norder,index_x,index_xy,index_xyz;
1099     real     valx,valxy,qn;
1100     real     *thx,*thy,*thz;
1101     int      localsize, bndsize;
1102   
1103     int      pnx,pny,pnz,ndatatot;
1104   
1105     pnx = pme->pmegrid_nx;
1106     pny = pme->pmegrid_ny;
1107     pnz = pme->pmegrid_nz;
1108     ndatatot = pnx*pny*pnz;
1109     
1110     for(i=0;i<ndatatot;i++)
1111     {
1112         grid[i] = 0;
1113     }
1114
1115     order = pme->pme_order;
1116
1117     for(nn=0; (nn<atc->n);nn++) 
1118     {
1119         n      = nn;
1120         qn     = atc->q[n];
1121
1122         if (qn != 0) 
1123         {
1124             idxptr = atc->idx[n];
1125             norder = n*order;
1126             
1127             i0   = idxptr[XX]; 
1128             j0   = idxptr[YY];
1129             k0   = idxptr[ZZ];
1130             thx = atc->theta[XX] + norder;
1131             thy = atc->theta[YY] + norder;
1132             thz = atc->theta[ZZ] + norder;
1133             
1134             switch (order) {
1135             case 4:  DO_BSPLINE(4);     break;
1136             case 5:  DO_BSPLINE(5);     break;
1137             default: DO_BSPLINE(order); break;
1138             }
1139         }
1140     }   
1141 }
1142
1143
1144 #if ( !defined(GMX_DOUBLE) && ( defined(GMX_IA32_SSE) || defined(GMX_X86_64_SSE) || defined(GMX_X86_64_SSE2) ) )
1145     /* Calculate exponentials through SSE in float precision */
1146 #define CALC_EXPONENTIALS(start,end,r_aligned)      \
1147     {                                               \
1148         __m128 tmp_sse;                             \
1149         for(kx=0; kx<end; kx+=4)                    \
1150         {                                           \
1151             tmp_sse = _mm_load_ps(r_aligned+kx);    \
1152             tmp_sse = gmx_mm_exp_ps(tmp_sse);       \
1153             _mm_store_ps(r_aligned+kx,tmp_sse);     \
1154         }                                           \
1155     }
1156 #else
1157 #define CALC_EXPONENTIALS(start,end,r)          \
1158     for(kx=start; kx<end; kx++)                 \
1159     {                                           \
1160         r[kx] = exp(r[kx]);                     \
1161     }
1162 #endif
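/* Note on the SSE variant of CALC_EXPONENTIALS above: it evaluates four
 * single-precision exponentials per iteration and always starts at
 * element 0 (the start argument is ignored), so it relies on the work
 * arrays being 16-byte aligned and padded to a multiple of four floats;
 * the allocation elsewhere in this file is presumably set up to
 * guarantee that.
 */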
1163
1164
1165 static int solve_pme_yzx(gmx_pme_t pme,t_complex *grid,
1166                          real ewaldcoeff,real vol,
1167                          bool bEnerVir,real *mesh_energy,matrix vir)
1168 {
1169     /* do recip sum over local cells in grid */
1170     /* y major, z middle, x minor or continuous */
1171     t_complex *p0;
1172     int     kx,ky,kz,maxkx,maxky,maxkz;
1173     int     nx,ny,nz,iy,iz,kxstart,kxend;
1174     real    mx,my,mz;
1175     real    factor=M_PI*M_PI/(ewaldcoeff*ewaldcoeff);
1176     real    ets2,struct2,vfactor,ets2vf;
1177     real    eterm,d1,d2,energy=0;
1178     real    by,bz;
1179     real    virxx=0,virxy=0,virxz=0,viryy=0,viryz=0,virzz=0;
1180     real    rxx,ryx,ryy,rzx,rzy,rzz;
1181     real    *mhx,*mhy,*mhz,*m2,*denom,*tmp1,*m2inv;
1182     real    mhxk,mhyk,mhzk,m2k;
1183     real    corner_fac;
1184     ivec    complex_order;
1185     ivec    local_ndata,local_offset,local_size;
1186     
1187     nx = pme->nkx;
1188     ny = pme->nky;
1189     nz = pme->nkz;
1190     
1191     /* Dimensions should be identical for A/B grid, so we just use A here */
1192     gmx_parallel_3dfft_complex_limits(pme->pfft_setupA,
1193                                       complex_order,
1194                                       local_ndata,
1195                                       local_offset,
1196                                       local_size);
1197     
1198     rxx = pme->recipbox[XX][XX];
1199     ryx = pme->recipbox[YY][XX];
1200     ryy = pme->recipbox[YY][YY];
1201     rzx = pme->recipbox[ZZ][XX];
1202     rzy = pme->recipbox[ZZ][YY];
1203     rzz = pme->recipbox[ZZ][ZZ];
1204     
1205     maxkx = (nx+1)/2;
1206     maxky = (ny+1)/2;
1207     maxkz = nz/2+1;
1208         
1209     mhx   = pme->work_mhx;
1210     mhy   = pme->work_mhy;
1211     mhz   = pme->work_mhz;
1212     m2    = pme->work_m2;
1213     denom = pme->work_denom;
1214     tmp1  = pme->work_tmp1;
1215     m2inv = pme->work_m2inv;
1216
1217     for(iy=0;iy<local_ndata[YY];iy++)
1218     {
1219         ky = iy + local_offset[YY];
1220         
1221         if (ky < maxky) 
1222         {
1223             my = ky;
1224         }
1225         else 
1226         {
1227             my = (ky - ny);
1228         }
1229         
1230         by = M_PI*vol*pme->bsp_mod[YY][ky];
1231
1232         for(iz=0;iz<local_ndata[ZZ];iz++)
1233         {
1234             kz = iz + local_offset[ZZ];
1235             
1236             mz = kz;
1237
1238             bz = pme->bsp_mod[ZZ][kz];
1239             
1240             /* 0.5 correction for corner points */
1241             corner_fac = 1;
1242             if (kz == 0)
1243                 corner_fac = 0.5;
1244             if (kz == (nz+1)/2)
1245                 corner_fac = 0.5;
1246                       
1247             p0 = grid + iy*local_size[ZZ]*local_size[XX] + iz*local_size[XX];
1248             
1249             /* We should skip the k-space point (0,0,0) */
1250             if (local_offset[XX] > 0 ||
1251                 local_offset[YY] > 0 || ky > 0 ||
1252                 kz > 0)
1253             {
1254                 kxstart = local_offset[XX];
1255             }
1256             else
1257             {
1258                 kxstart = local_offset[XX] + 1;
1259                 p0++;
1260             }
1261             kxend = local_offset[XX] + local_ndata[XX];
1262                         
1263             if (bEnerVir)
1264             {
1265                 /* More expensive inner loop, especially because of the storage
1266                  * of the mh elements in arrays.
1267                  * Because x is the minor grid index, all mh elements
1268                  * depend on kx for triclinic unit cells.
1269                  */
1270
1271                 /* Two explicit loops to avoid a conditional inside the loop */
1272                 for(kx=kxstart; kx<maxkx; kx++)
1273                 {
1274                     mx = kx;
1275                     
1276                     mhxk      = mx * rxx;
1277                     mhyk      = mx * ryx + my * ryy;
1278                     mhzk      = mx * rzx + my * rzy + mz * rzz;
1279                     m2k       = mhxk*mhxk + mhyk*mhyk + mhzk*mhzk;
1280                     mhx[kx]   = mhxk;
1281                     mhy[kx]   = mhyk;
1282                     mhz[kx]   = mhzk;
1283                     m2[kx]    = m2k;
1284                     denom[kx] = m2k*bz*by*pme->bsp_mod[XX][kx];
1285                     tmp1[kx]  = -factor*m2k;
1286                 }
1287                 
1288                 for(kx=maxkx; kx<kxend; kx++)
1289                 {
1290                     mx = (kx - nx);
1291
1292                     mhxk      = mx * rxx;
1293                     mhyk      = mx * ryx + my * ryy;
1294                     mhzk      = mx * rzx + my * rzy + mz * rzz;
1295                     m2k       = mhxk*mhxk + mhyk*mhyk + mhzk*mhzk;
1296                     mhx[kx]   = mhxk;
1297                     mhy[kx]   = mhyk;
1298                     mhz[kx]   = mhzk;
1299                     m2[kx]    = m2k;
1300                     denom[kx] = m2k*bz*by*pme->bsp_mod[XX][kx];
1301                     tmp1[kx]  = -factor*m2k;
1302                 }
1303                 
1304                 for(kx=kxstart; kx<kxend; kx++)
1305                 {
1306                     m2inv[kx] = 1.0/m2[kx];
1307                 }
1308                 for(kx=kxstart; kx<kxend; kx++)
1309                 {
1310                     denom[kx] = 1.0/denom[kx];
1311                 }
1312
1313                 CALC_EXPONENTIALS(kxstart,kxend,tmp1);
1314
1315                 for(kx=kxstart; kx<kxend; kx++,p0++)
1316                 {
1317                     d1      = p0->re;
1318                     d2      = p0->im;
1319                     
1320                     eterm    = ONE_4PI_EPS0/pme->epsilon_r*tmp1[kx]*denom[kx];
1321                     
1322                     p0->re  = d1*eterm;
1323                     p0->im  = d2*eterm;
1324                     
1325                     struct2 = 2.0*(d1*d1+d2*d2);
1326                     
1327                     tmp1[kx] = eterm*struct2;
1328                 }
1329                 
1330                 for(kx=kxstart; kx<kxend; kx++)
1331                 {
1332                     ets2     = corner_fac*tmp1[kx];
1333                     vfactor  = (factor*m2[kx] + 1.0)*2.0*m2inv[kx];
1334                     energy  += ets2;
1335                     
1336                     ets2vf   = ets2*vfactor;
1337                     virxx   += ets2vf*mhx[kx]*mhx[kx] - ets2;
1338                     virxy   += ets2vf*mhx[kx]*mhy[kx];
1339                     virxz   += ets2vf*mhx[kx]*mhz[kx];
1340                     viryy   += ets2vf*mhy[kx]*mhy[kx] - ets2;
1341                     viryz   += ets2vf*mhy[kx]*mhz[kx];
1342                     virzz   += ets2vf*mhz[kx]*mhz[kx] - ets2;
1343                 }
1344             }
1345             else
1346             {
1347                 /* We don't need to calculate the energy and the virial.
1348                  * In this case the triclinic overhead is small.
1349                  */
1350
1351                 /* Two explicit loops to avoid a conditional inside the loop */
1352
1353                 for(kx=kxstart; kx<maxkx; kx++)
1354                 {
1355                     mx = kx;
1356                     
1357                     mhxk      = mx * rxx;
1358                     mhyk      = mx * ryx + my * ryy;
1359                     mhzk      = mx * rzx + my * rzy + mz * rzz;
1360                     m2k       = mhxk*mhxk + mhyk*mhyk + mhzk*mhzk;
1361                     denom[kx] = m2k*bz*by*pme->bsp_mod[XX][kx];
1362                     tmp1[kx]  = -factor*m2k;
1363                 }
1364                 
1365                 for(kx=maxkx; kx<kxend; kx++)
1366                 {
1367                     mx = (kx - nx);
1368                     
1369                     mhxk      = mx * rxx;
1370                     mhyk      = mx * ryx + my * ryy;
1371                     mhzk      = mx * rzx + my * rzy + mz * rzz;
1372                     m2k       = mhxk*mhxk + mhyk*mhyk + mhzk*mhzk;
1373                     denom[kx] = m2k*bz*by*pme->bsp_mod[XX][kx];
1374                     tmp1[kx]  = -factor*m2k;
1375                 }
1376                 
1377                 for(kx=kxstart; kx<kxend; kx++)
1378                 {
1379                     denom[kx] = 1.0/denom[kx];
1380                 }
1381
1382                 CALC_EXPONENTIALS(kxstart,kxend,tmp1);
1383                
1384                 for(kx=kxstart; kx<kxend; kx++,p0++)
1385                 {
1386                     d1      = p0->re;
1387                     d2      = p0->im;
1388                     
1389                     eterm    = ONE_4PI_EPS0/pme->epsilon_r*tmp1[kx]*denom[kx];
1390                     
1391                     p0->re  = d1*eterm;
1392                     p0->im  = d2*eterm;
1393                 }
1394             }
1395         }
1396     }
1397     
1398     if (bEnerVir)
1399     {
1400         /* Update virial with local values.
1401          * The virial is symmetric by definition.
1402          * this virial seems ok for isotropic scaling, but I'm
1403          * experiencing problems on semiisotropic membranes.
1404          * IS THAT COMMENT STILL VALID??? (DvdS, 2001/02/07).
1405          */
1406         vir[XX][XX] = 0.25*virxx;
1407         vir[YY][YY] = 0.25*viryy;
1408         vir[ZZ][ZZ] = 0.25*virzz;
1409         vir[XX][YY] = vir[YY][XX] = 0.25*virxy;
1410         vir[XX][ZZ] = vir[ZZ][XX] = 0.25*virxz;
1411         vir[YY][ZZ] = vir[ZZ][YY] = 0.25*viryz;
1412         
1413         /* This energy should be corrected for a charged system */
1414         *mesh_energy = 0.5*energy;
1415     }
1416
1417     /* Return the loop count */
1418     return local_ndata[YY]*local_ndata[ZZ]*local_ndata[XX];
1419 }
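/* What the loops above implement, in essence, is the smooth-PME
 * reciprocal-space sum (with the ONE_4PI_EPS0/epsilon_r prefactor of
 * GROMACS units):
 *
 *   E_rec ~ 1/(2 pi V) * sum_{m != 0} exp(-pi^2 m^2 / ewaldcoeff^2)
 *                                     / (m^2 B(m)) * |S(m)|^2
 *
 * where S(m) is the structure factor delivered by the forward FFT of the
 * charge grid and B(m) the product of the B-spline moduli in bsp_mod.
 * Each grid value is also scaled by eterm in place, which is what turns
 * the subsequent backward FFT plus force gather into the reciprocal-space
 * forces; the corner_fac = 0.5 factors compensate for the kz planes that
 * are not duplicated in the half-complex representation.
 */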
1420
1421
1422 #define DO_FSPLINE(order)                      \
1423 for(ithx=0; (ithx<order); ithx++)              \
1424 {                                                                          \
1425     index_x = (i0+ithx)*pny*pnz;               \
1426     tx      = thx[ithx];                       \
1427     dx      = dthx[ithx];                      \
1428                                                \
1429     for(ithy=0; (ithy<order); ithy++)          \
1430     {                                                                              \
1431         index_xy = index_x+(j0+ithy)*pnz;      \
1432         ty       = thy[ithy];                  \
1433         dy       = dthy[ithy];                 \
1434         fxy1     = fz1 = 0;                    \
1435                                                \
1436         for(ithz=0; (ithz<order); ithz++)      \
1437         {                                                                  \
1438             gval  = grid[index_xy+(k0+ithz)];  \
1439             fxy1 += thz[ithz]*gval;            \
1440             fz1  += dthz[ithz]*gval;           \
1441         }                                      \
1442         fx += dx*ty*fxy1;                      \
1443         fy += tx*dy*fxy1;                      \
1444         fz += tx*ty*fz1;                       \
1445     }                                          \
1446 }
1447
1448
1449 void gather_f_bsplines(gmx_pme_t pme,real *grid,
1450                        bool bClearF,pme_atomcomm_t *atc,real scale)
1451 {
1452     /* sum forces for local particles */  
1453     int     nn,n,ithx,ithy,ithz,i0,j0,k0;
1454     int     index_x,index_xy;
1455     int     nx,ny,nz,pnx,pny,pnz;
1456     int *   idxptr;
1457     real    tx,ty,dx,dy,qn;
1458     real    fx,fy,fz,gval;
1459     real    fxy1,fz1;
1460     real    *thx,*thy,*thz,*dthx,*dthy,*dthz;
1461     int     norder;
1462     real    rxx,ryx,ryy,rzx,rzy,rzz;
1463     int     order;
1464     
1465     order = pme->pme_order;
1466     thx   = atc->theta[XX];
1467     thy   = atc->theta[YY];
1468     thz   = atc->theta[ZZ];
1469     dthx  = atc->dtheta[XX];
1470     dthy  = atc->dtheta[YY];
1471     dthz  = atc->dtheta[ZZ];
1472     nx    = pme->nkx;
1473     ny    = pme->nky;
1474     nz    = pme->nkz;
1475     pnx   = pme->pmegrid_nx;
1476     pny   = pme->pmegrid_ny;
1477     pnz   = pme->pmegrid_nz;
1478     
1479     rxx   = pme->recipbox[XX][XX];
1480     ryx   = pme->recipbox[YY][XX];
1481     ryy   = pme->recipbox[YY][YY];
1482     rzx   = pme->recipbox[ZZ][XX];
1483     rzy   = pme->recipbox[ZZ][YY];
1484     rzz   = pme->recipbox[ZZ][ZZ];
1485
1486     for(nn=0; (nn<atc->n); nn++) 
1487     {
1488         n = nn;
1489         qn      = scale*atc->q[n];
1490         
1491         if (bClearF) 
1492         {
1493             atc->f[n][XX] = 0;
1494             atc->f[n][YY] = 0;
1495             atc->f[n][ZZ] = 0;
1496         }
1497         if (qn != 0) 
1498         {
1499             fx     = 0;
1500             fy     = 0;
1501             fz     = 0;
1502             idxptr = atc->idx[n];
1503             norder = n*order;
1504             
1505             i0   = idxptr[XX]; 
1506             j0   = idxptr[YY];
1507             k0   = idxptr[ZZ];
1508             
1509             /* Pointer arithmetic alert, next six statements */
1510             thx  = atc->theta[XX] + norder;
1511             thy  = atc->theta[YY] + norder;
1512             thz  = atc->theta[ZZ] + norder;
1513             dthx = atc->dtheta[XX] + norder;
1514             dthy = atc->dtheta[YY] + norder;
1515             dthz = atc->dtheta[ZZ] + norder;
1516             
1517             switch (order) {
1518             case 4:  DO_FSPLINE(4);     break;
1519             case 5:  DO_FSPLINE(5);     break;
1520             default: DO_FSPLINE(order); break;
1521             }
1522
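            /* Convert the accumulated spline-space derivatives (fx,fy,fz
             * are dphi/dt for the scaled fractional coordinates t) to
             * Cartesian forces via the chain rule through the recipbox,
             * F = -q*grad(phi); for a rectangular box only the diagonal
             * rxx,ryy,rzz terms remain.
             */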
1523             atc->f[n][XX] += -qn*( fx*nx*rxx );
1524             atc->f[n][YY] += -qn*( fx*nx*ryx + fy*ny*ryy );
1525             atc->f[n][ZZ] += -qn*( fx*nx*rzx + fy*ny*rzy + fz*nz*rzz );
1526         }
1527     }
1528     /* Since the energy and not the forces are interpolated
1529      * the net force might not be exactly zero.
1530      * This can be solved by also interpolating F, but
1531      * that comes at a cost.
1532      * A better hack is to remove the net force every
1533      * step, but that must be done at a higher level
1534      * since this routine doesn't see all atoms if running
1535      * in parallel. I don't know how important it is.  EL 990726
1536      */
1537 }
1538
1539 static real gather_energy_bsplines(gmx_pme_t pme,real *grid,
1540                                    pme_atomcomm_t *atc)
1541 {
1542     int     n,ithx,ithy,ithz,i0,j0,k0;
1543     int     index_x,index_xy;
1544     int *   idxptr;
1545     real    energy,pot,tx,ty,qn,gval;
1546     real    *thx,*thy,*thz;
1547     int     norder;
1548     int     order;
1549     
1550     
1551     order = pme->pme_order;
1552     thx   = atc->theta[XX];
1553     thy   = atc->theta[YY];
1554     thz   = atc->theta[ZZ];
1555     
1556     energy = 0;
1557     for(n=0; (n<atc->n); n++) {
1558         qn      = atc->q[n];
1559         
1560         if (qn != 0) {
1561             idxptr = atc->idx[n];
1562             norder = n*order;
1563             
1564             i0   = idxptr[XX]; 
1565             j0   = idxptr[YY];
1566             k0   = idxptr[ZZ];
1567             
1568             /* Pointer arithmetic alert, next three statements */
1569             thx  = atc->theta[XX] + norder;
1570             thy  = atc->theta[YY] + norder;
1571             thz  = atc->theta[ZZ] + norder;
1572
1573             pot = 0;
1574             for(ithx=0; (ithx<order); ithx++)
1575             {
1576                 index_x = (i0+ithx)*pme->pmegrid_ny*pme->pmegrid_nz;
1577                 tx      = thx[ithx];
1578
1579                 for(ithy=0; (ithy<order); ithy++)
1580                 {
1581                     index_xy = index_x+(j0+ithy)*pme->pmegrid_nz;
1582                     ty       = thy[ithy];
1583
1584                     for(ithz=0; (ithz<order); ithz++)
1585                     {
1586                         gval  = grid[index_xy+(k0+ithz)];
1587                         pot  += tx*ty*thz[ithz]*gval;
1588                     }
1589
1590                 }
1591             }
1592
1593             energy += pot*qn;
1594         }
1595     }
1596
1597     return energy;
1598 }
1599
1600 void make_bsplines(splinevec theta,splinevec dtheta,int order,
1601                    rvec fractx[],int nr,real charge[],
1602                    bool bFreeEnergy)
1603 {
1604     /* construct splines for local atoms */
1605     int  i,j,k,l;
1606     real dr,div;
1607     real *data,*ddata,*xptr;
1608     
1609     for(i=0; (i<nr); i++) {
1610         /* With free energy we do not use the charge check.
1611          * In most cases this will be more efficient than calling make_bsplines
1612          * twice, since usually more than half the particles have charges.
1613          */
1614         if (bFreeEnergy || charge[i] != 0.0) {
1615             xptr = fractx[i];
1616             for(j=0; (j<DIM); j++) {
1617                 dr  = xptr[j];
1618                 
1619                 /* dr is relative offset from lower cell limit */
1620                 data=&(theta[j][i*order]);
1621                 data[order-1]=0;
1622                 data[1]=dr;
1623                 data[0]=1-dr;
1624                 
1625                 for(k=3; (k<order); k++) {
1626                     div=1.0/(k-1.0);    
1627                     data[k-1]=div*dr*data[k-2];
1628                     for(l=1; (l<(k-1)); l++) {
1629                         data[k-l-1]=div*((dr+l)*data[k-l-2]+(k-l-dr)*
1630                                          data[k-l-1]);
1631                     }
1632                     data[0]=div*(1-dr)*data[0];
1633                 }
1634                 /* differentiate */
1635                 ddata    = &(dtheta[j][i*order]);
1636                 ddata[0] = -data[0];
1637                 for(k=1; (k<order); k++) {
1638                     ddata[k]=data[k-1]-data[k];
1639                 }
1640                 
1641                 div=1.0/(order-1);
1642                 data[order-1]=div*dr*data[order-2];
1643                 for(l=1; (l<(order-1)); l++) {
1644                     data[order-l-1]=div*((dr+l)*data[order-l-2]+
1645                                          (order-l-dr)*data[order-l-1]);
1646                 }
1647                 data[0]=div*(1-dr)*data[0]; 
1648             }
1649         }
1650     }
1651 }
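/* Editor's sketch (not part of the original source): the loops above build
 * the B-spline weights with the standard order-raising recursion, interleaved
 * with the derivative calculation.  Written for a single fractional offset
 * dr in [0,1) and without the derivative step, the same recursion looks like
 * the hypothetical helper below (name and signature are illustrative only).
 */
#if 0   /* illustrative only, not compiled */
static void bspline_weights(real dr,int order,real w[])
{
    int  k,l;
    real div;

    /* Order 2 (linear interpolation) start */
    w[order-1] = 0;
    w[1]       = dr;
    w[0]       = 1 - dr;

    /* Raise the order one step at a time: weights of order k from order k-1 */
    for(k=3; k<=order; k++)
    {
        div    = 1.0/(k - 1.0);
        w[k-1] = div*dr*w[k-2];
        for(l=1; l<k-1; l++)
        {
            w[k-l-1] = div*((dr + l)*w[k-l-2] + (k - l - dr)*w[k-l-1]);
        }
        w[0] = div*(1 - dr)*w[0];
    }
}
#endif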
1652
1653
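/* Compute the squared modulus of the discrete Fourier transform of data[]:
 * mod[k] = (sum_j data[j]*cos(2 pi k j/ndata))^2
 *        + (sum_j data[j]*sin(2 pi k j/ndata))^2.
 * Entries that come out (nearly) zero are replaced by the average of their
 * neighbours.  Used below to construct the B-spline moduli.
 */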
1654 void make_dft_mod(real *mod,real *data,int ndata)
1655 {
1656   int i,j;
1657   real sc,ss,arg;
1658     
1659   for(i=0;i<ndata;i++) {
1660     sc=ss=0;
1661     for(j=0;j<ndata;j++) {
1662       arg=(2.0*M_PI*i*j)/ndata;
1663       sc+=data[j]*cos(arg);
1664       ss+=data[j]*sin(arg);
1665     }
1666     mod[i]=sc*sc+ss*ss;
1667   }
1668   for(i=0;i<ndata;i++)
1669     if(mod[i]<1e-7) /* wrap at the ends: the DFT modulus is periodic */
1670       mod[i]=(mod[(i+ndata-1)%ndata]+mod[(i+1)%ndata])*0.5;
1671 }
1672
1673
1674
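/* Fill bsp_mod[XX/YY/ZZ] with the DFT moduli of the order-point B-spline
 * sampled at the integer grid points (the dr = 0 case of the recursion in
 * make_bsplines above), for grid sizes nx, ny and nz.  These moduli are used
 * when solving the PME equations in reciprocal space.
 */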
1675 void make_bspline_moduli(splinevec bsp_mod,int nx,int ny,int nz,int order)
1676 {
1677   int nmax=max(nx,max(ny,nz));
1678   real *data,*ddata,*bsp_data;
1679   int i,k,l;
1680   real div;
1681     
1682   snew(data,order);
1683   snew(ddata,order);
1684   snew(bsp_data,nmax);
1685
1686   data[order-1]=0;
1687   data[1]=0;
1688   data[0]=1;
1689             
1690   for(k=3;k<order;k++) {
1691     div=1.0/(k-1.0);
1692     data[k-1]=0;
1693     for(l=1;l<(k-1);l++)
1694       data[k-l-1]=div*(l*data[k-l-2]+(k-l)*data[k-l-1]);
1695     data[0]=div*data[0];
1696   }
1697   /* differentiate */
1698   ddata[0]=-data[0];
1699   for(k=1;k<order;k++)
1700     ddata[k]=data[k-1]-data[k];
1701   div=1.0/(order-1);
1702   data[order-1]=0;
1703   for(l=1;l<(order-1);l++)
1704     data[order-l-1]=div*(l*data[order-l-2]+(order-l)*data[order-l-1]);
1705   data[0]=div*data[0]; 
1706
1707   for(i=0;i<nmax;i++)
1708     bsp_data[i]=0;
1709   for(i=1;i<=order;i++)
1710     bsp_data[i]=data[i-1];
1711     
1712   make_dft_mod(bsp_mod[XX],bsp_data,nx);
1713   make_dft_mod(bsp_mod[YY],bsp_data,ny);
1714   make_dft_mod(bsp_mod[ZZ],bsp_data,nz);
1715
1716   sfree(data);
1717   sfree(ddata);
1718   sfree(bsp_data);
1719 }
1720
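/* Set up the send/receive partner lists used to redistribute atom coordinates
 * over the nslab slabs: partners are ordered by increasing distance,
 * alternating forward and backward neighbours, so that nslab-1 pulses reach
 * every other slab.  For example, nslab = 4 and nodeid = 1 gives
 * node_dest = {2, 0, 3} and node_src = {0, 2, 3}.
 */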
1721 static void setup_coordinate_communication(pme_atomcomm_t *atc)
1722 {
1723   int nslab,n,i;
1724   int fw,bw;
1725
1726   nslab = atc->nslab;
1727
1728   n = 0;
1729   for(i=1; i<=nslab/2; i++) {
1730     fw = (atc->nodeid + i) % nslab;
1731     bw = (atc->nodeid - i + nslab) % nslab;
1732     if (n < nslab - 1) {
1733       atc->node_dest[n] = fw;
1734       atc->node_src[n]  = bw;
1735       n++;
1736     } 
1737     if (n < nslab - 1) {
1738       atc->node_dest[n] = bw;
1739       atc->node_src[n]  = fw;
1740       n++;
1741     }
1742   }
1743 }
1744
1745 int gmx_pme_destroy(FILE *log,gmx_pme_t *pmedata)
1746 {
1747     if(NULL != log)
1748     {
1749         fprintf(log,"Destroying PME data structures.\n");
1750     }
1751
1752     sfree((*pmedata)->nnx);
1753     sfree((*pmedata)->nny);
1754     sfree((*pmedata)->nnz);
         sfree((*pmedata)->fshx);
         sfree((*pmedata)->fshy);
         sfree((*pmedata)->fshz);
1755         
1756     sfree((*pmedata)->pmegridA);
1757     sfree((*pmedata)->fftgridA);
1758     sfree((*pmedata)->cfftgridA);
1759     gmx_parallel_3dfft_destroy((*pmedata)->pfft_setupA);
1760     
1761     if((*pmedata)->pmegridB)
1762     {
1763         sfree((*pmedata)->pmegridB);
1764         sfree((*pmedata)->fftgridB);
1765         sfree((*pmedata)->cfftgridB);
1766         gmx_parallel_3dfft_destroy((*pmedata)->pfft_setupB);
1767     }
1768     sfree((*pmedata)->work_mhx);
         sfree((*pmedata)->work_mhy);
         sfree((*pmedata)->work_mhz);
1769     sfree((*pmedata)->work_m2);
1770     sfree((*pmedata)->work_denom);
1771     sfree((*pmedata)->work_tmp1_alloc);
1772     sfree((*pmedata)->work_m2inv);
1773         
1774     sfree(*pmedata);
1775     *pmedata = NULL;
1776
1777     return 0;
1778 }
1779
1780 static int mult_up(int n,int f)
1781 {
1782     return ((n + f - 1)/f)*f;
1783 }
1784
1785
1786 static double pme_load_imbalance(gmx_pme_t pme)
1787 {
1788     int    nma,nmi;
1789     double n1,n2,n3;
1790
1791     nma = pme->nnodes_major;
1792     nmi = pme->nnodes_minor;
1793
1794     n1 = mult_up(pme->nkx,nma)*mult_up(pme->nky,nmi)*pme->nkz;
1795     n2 = mult_up(pme->nkx,nma)*mult_up(pme->nkz,nmi)*pme->nky;
1796     n3 = mult_up(pme->nky,nma)*mult_up(pme->nkz,nmi)*pme->nkx;
1797
1798     /* pme_solve is roughly double the cost of an fft */
1799
1800     return (n1 + n2 + 3*n3)/(double)(6*pme->nkx*pme->nky*pme->nkz);
1801 }
1802
1803 static void init_atomcomm(gmx_pme_t pme,pme_atomcomm_t *atc, t_commrec *cr,
1804                           int dimind,bool bSpread)
1805 {
1806     int nk,k,s;
1807
1808     atc->dimind = dimind;
1809     atc->nslab  = 1;
1810     atc->nodeid = 0;
1811     atc->pd_nalloc = 0;
1812 #ifdef GMX_MPI
1813     if (pme->nnodes > 1)
1814     {
1815         atc->mpi_comm = pme->mpi_comm_d[dimind];
1816         MPI_Comm_size(atc->mpi_comm,&atc->nslab);
1817         MPI_Comm_rank(atc->mpi_comm,&atc->nodeid);
1818     }
1819     if (debug)
1820     {
1821         fprintf(debug,"For PME atom communication in dimind %d: nslab %d rank %d\n",atc->dimind,atc->nslab,atc->nodeid);
1822     }
1823 #endif
1824
1825     atc->bSpread   = bSpread;
1826     atc->pme_order = pme->pme_order;
1827
1828     if (atc->nslab > 1)
1829     {
1830         /* These three allocations are not required for particle decomp. */
1831         snew(atc->node_dest,atc->nslab);
1832         snew(atc->node_src,atc->nslab);
1833         setup_coordinate_communication(atc);
1834         
1835         snew(atc->count,atc->nslab);
1836         snew(atc->rcount,atc->nslab);
1837         snew(atc->buf_index,atc->nslab);
1838     }
1839 }
1840
1841 static void 
1842 init_overlap_comm(pme_overlap_t *  ol,
1843                   int              norder,
1844 #ifdef GMX_MPI
1845                   MPI_Comm         comm,  
1846 #endif
1847                   int              nnodes, 
1848                   int              nodeid,
1849                   int              ndata)
1850 {
1851     int lbnd,rbnd,maxlr,b,i;
1852     int exten;
1853     int nn,nk;
1854     pme_grid_comm_t *pgc;
1855     bool bCont;
1856     int fft_start,fft_end,send_index1,recv_index1;
1857     
1858 #ifdef GMX_MPI
1859     ol->mpi_comm = comm;
1860 #endif
1861     
1862     ol->nnodes = nnodes;
1863     ol->nodeid = nodeid;
1864
1865     /* Linear translation of the PME grid won't affect reciprocal space
1866      * calculations, so to optimize we only interpolate "upwards",
1867      * which also means we only have to consider overlap in one direction.
1868      * I.e., particles on this node might also be spread to grid indices
1869      * that belong to higher nodes (modulo nnodes)
1870      */
1871
1872     snew(ol->s2g0,ol->nnodes+1);
1873     snew(ol->s2g1,ol->nnodes);
1874     if (debug) { fprintf(debug,"PME slab boundaries:"); }
1875     for(i=0; i<nnodes; i++) 
1876     {
1877         /* s2g0 is the local interpolation grid start.
1878          * s2g1 is the local interpolation grid end.
1879          * Because grid overlap communication only goes forward,
1880          * the grid slabs for the FFTs should be rounded down.
1881          */
1882         ol->s2g0[i] = ( i   *ndata + 0       )/nnodes;
1883         ol->s2g1[i] = ((i+1)*ndata + nnodes-1)/nnodes + norder - 1;
1884
1885         if (debug)
1886         {
1887             fprintf(debug,"  %3d %3d",ol->s2g0[i],ol->s2g1[i]);
1888         }
1889     }
1890     ol->s2g0[nnodes] = ndata;
1891     if (debug) { fprintf(debug,"\n"); }
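    /* As a concrete example: ndata = 15, nnodes = 4 and norder = 4 give
     * s2g0 = {0, 3, 7, 11, 15} and s2g1 = {7, 11, 15, 18}, i.e. each slab's
     * interpolation region extends past the start of the next slab's FFT
     * region, which is exactly the overlap communicated below.
     */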
1892
1893     /* Determine with how many nodes we need to communicate the grid overlap */
1894     b = 0;
1895     do
1896     {
1897         b++;
1898         bCont = FALSE;
1899         for(i=0; i<nnodes; i++)
1900         {
1901             if ((i+b <  nnodes && ol->s2g1[i] > ol->s2g0[i+b]) ||
1902                 (i+b >= nnodes && ol->s2g1[i] > ol->s2g0[i+b-nnodes] + ndata))
1903             {
1904                 bCont = TRUE;
1905             }
1906         }
1907     }
1908     while (bCont && b < nnodes);
1909     ol->noverlap_nodes = b - 1;
1910
1911     snew(ol->send_id,ol->noverlap_nodes);
1912     snew(ol->recv_id,ol->noverlap_nodes);
1913     for(b=0; b<ol->noverlap_nodes; b++)
1914     {
1915         ol->send_id[b] = (ol->nodeid + (b + 1)) % ol->nnodes;
1916         ol->recv_id[b] = (ol->nodeid - (b + 1) + ol->nnodes) % ol->nnodes;
1917     }
1918     snew(ol->comm_data, ol->noverlap_nodes);
1919
1920     for(b=0; b<ol->noverlap_nodes; b++)
1921     {
1922         pgc = &ol->comm_data[b];
1923         /* Send */
1924         fft_start        = ol->s2g0[ol->send_id[b]];
1925         fft_end          = ol->s2g0[ol->send_id[b]+1];
1926         if (ol->send_id[b] < nodeid)
1927         {
1928             fft_start += ndata;
1929             fft_end   += ndata;
1930         }
1931         send_index1      = ol->s2g1[nodeid];
1932         send_index1      = min(send_index1,fft_end);
1933         pgc->send_index0 = fft_start;
1934         pgc->send_nindex = max(0,send_index1 - pgc->send_index0);
1935
1936         /* We always start receiving to the first index of our slab */
1937         fft_start        = ol->s2g0[ol->nodeid];
1938         fft_end          = ol->s2g0[ol->nodeid+1];
1939         recv_index1      = ol->s2g1[ol->recv_id[b]];
1940         if (ol->recv_id[b] > nodeid)
1941         {
1942             recv_index1 -= ndata;
1943         }
1944         recv_index1      = min(recv_index1,fft_end);
1945         pgc->recv_index0 = fft_start;
1946         pgc->recv_nindex = max(0,recv_index1 - pgc->recv_index0);
1947     }
1948 }
1949
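/* Build, for a grid dimension of size n, lookup tables of length 5*n that map
 * (possibly shifted) global grid indices onto local grid indices, together
 * with a fraction shift that compensates when rounding pushes an index just
 * outside the local grid.
 */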
1950 static void
1951 make_gridindex5_to_localindex(int n,int local_start,int local_range,
1952                               int **global_to_local,
1953                               real **fraction_shift)
1954 {
1955     int i;
1956     int * gtl;
1957     real * fsh;
1958
1959     snew(gtl,5*n);
1960     snew(fsh,5*n);
1961     for(i=0; (i<5*n); i++)
1962     {
1963         /* Determine the global to local grid index */
1964         gtl[i] = (i - local_start + n) % n;
1965         /* For coordinates that fall within the local grid the fraction
1966          * is correct, we don't need to shift it.
1967          */
1968         fsh[i] = 0;
1969         if (local_range < n)
1970         {
1971             /* Due to rounding issues i could be 1 beyond the lower or
1972              * upper boundary of the local grid. Correct the index for this.
1973              * If we shift the index, we need to shift the fraction by
1974              * the same amount in the other direction to not affect
1975              * the weights.
1976              * Note that due to this shifting the weights at the end of
1977              * the spline might change, but that will only involve values
1978              * between zero and values close to the precision of a real,
1979              * which is anyhow the accuracy of the whole mesh calculation.
1980              */
1981             /* With local_range=0 we should not change i=local_start */
1982             if (i % n != local_start)
1983             {
1984                 if (gtl[i] == n-1)
1985                 {
1986                     gtl[i] = 0;
1987                     fsh[i] = -1; 
1988                 }
1989                 else if (gtl[i] == local_range)
1990                 {
1991                     gtl[i] = local_range - 1;
1992                     fsh[i] = 1;
1993                 }
1994             }
1995         }
1996     }
1997
1998     *global_to_local = gtl;
1999     *fraction_shift  = fsh;
2000 }
2001
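/* If nk is not divisible by nnodes, round it up to the next multiple of
 * nnodes; e.g. nk = 50 with nnodes = 8 becomes 8*(50/8 + 1) = 56.  If the
 * rounding would grow the grid by 50% or more, a fatal error is raised
 * instead of silently changing the grid.
 */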
2002 static void
2003 gmx_pme_check_grid_restrictions(FILE *fplog,char dim,int nnodes,int *nk)
2004 {
2005     int nk_new;
2006
2007     if (*nk % nnodes != 0)
2008     {
2009         nk_new = nnodes*(*nk/nnodes + 1);
2010
2011         if (2*nk_new >= 3*(*nk))
2012         {
2013             gmx_fatal(FARGS,"The PME grid size in dim %c (%d) is not divisible by the number of nodes doing PME in dim %c (%d). The grid size would have to be increased by more than 50%% to make the grid divisible. Change the total number of nodes or the number of domain decomposition cells in x or the PME grid %c dimension (and the cut-off).",
2014                       dim,*nk,dim,nnodes,dim);
2015         }
2016         
2017         if (fplog != NULL)
2018         {
2019             fprintf(fplog,"\nNOTE: The PME grid size in dim %c (%d) is not divisible by the number of nodes doing PME in dim %c (%d). Increasing the PME grid size in dim %c to %d. This will increase the accuracy and will not decrease the performance significantly on this number of nodes. For optimal performance change the total number of nodes or the number of domain decomposition cells in x or the PME grid %c dimension (and the cut-off).\n\n",
2020                     dim,*nk,dim,nnodes,dim,nk_new,dim);
2021         }
2022             
2023         *nk = nk_new;
2024     }
2025 }
2026
2027 int gmx_pme_init(gmx_pme_t *         pmedata,
2028                  t_commrec *         cr,
2029                  int                 nnodes_major,
2030                  int                 nnodes_minor,
2031                  t_inputrec *        ir,
2032                  int                 homenr,
2033                  bool                bFreeEnergy,
2034                  bool                bReproducible)
2035 {
2036     gmx_pme_t pme=NULL;
2037     
2038     pme_atomcomm_t *atc;
2039     int bufsizex,bufsizey,bufsize;
2040     ivec ndata;
2041     
2042     if (debug)
2043         fprintf(debug,"Creating PME data structures.\n");
2044     snew(pme,1);
2045         
2046     pme->redist_init         = FALSE;
2047     pme->sum_qgrid_tmp       = NULL;
2048     pme->sum_qgrid_dd_tmp    = NULL;
2049     pme->buf_nalloc          = 0;
2050     pme->redist_buf_nalloc   = 0;
2051     
2052     pme->nnodes              = 1;
2053     pme->bPPnode             = TRUE;
2054     
2055     pme->nnodes_major        = nnodes_major;
2056     pme->nnodes_minor        = nnodes_minor;
2057
2058 #ifdef GMX_MPI
2059     if (PAR(cr)) 
2060     {
2061         pme->mpi_comm        = cr->mpi_comm_mygroup;
2062         
2063         MPI_Comm_rank(pme->mpi_comm,&pme->nodeid);
2064         MPI_Comm_size(pme->mpi_comm,&pme->nnodes);
2065     }
2066 #endif
2067
2068     if (pme->nnodes == 1)
2069     {
2070         pme->ndecompdim = 0;
2071         pme->nodeid_major = 0;
2072         pme->nodeid_minor = 0;
2073     }
2074     else
2075     {
2076         if (nnodes_minor == 1)
2077         {
2078 #ifdef GMX_MPI
2079             pme->mpi_comm_d[0] = pme->mpi_comm;
2080             pme->mpi_comm_d[1] = NULL;
2081 #endif
2082             pme->ndecompdim = 1;
2083             pme->nodeid_major = pme->nodeid;
2084             pme->nodeid_minor = 0;
2085             
2086         }
2087         else if (nnodes_major == 1)
2088         {
2089 #ifdef GMX_MPI
2090             pme->mpi_comm_d[0] = NULL;
2091             pme->mpi_comm_d[1] = pme->mpi_comm;
2092 #endif
2093             pme->ndecompdim = 1;
2094             pme->nodeid_major = 0;
2095             pme->nodeid_minor = pme->nodeid;
2096         }
2097         else 
2098         {
2099             if (pme->nnodes % nnodes_major != 0)
2100             {
2101                 gmx_incons("For 2D PME decomposition, #PME nodes must be divisible by the number of nodes in the major dimension");
2102             }
2103             pme->ndecompdim = 2;
2104             
2105 #ifdef GMX_MPI
2106             MPI_Comm_split(pme->mpi_comm,pme->nodeid % nnodes_minor,
2107                            pme->nodeid,&pme->mpi_comm_d[0]);  /* My communicator along major dimension */
2108             MPI_Comm_split(pme->mpi_comm,pme->nodeid/nnodes_minor,
2109                            pme->nodeid,&pme->mpi_comm_d[1]);  /* My communicator along minor dimension */
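            /* For example, with 6 PME nodes, nnodes_major = 3 and
             * nnodes_minor = 2, the splits above and the rank calls below
             * give (nodeid_major,nodeid_minor) = (0,0),(0,1),(1,0),(1,1),
             * (2,0),(2,1) for nodeid 0..5.
             */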
2110             
2111             MPI_Comm_rank(pme->mpi_comm_d[0],&pme->nodeid_major);
2112             MPI_Comm_size(pme->mpi_comm_d[0],&pme->nnodes_major);
2113             MPI_Comm_rank(pme->mpi_comm_d[1],&pme->nodeid_minor);
2114             MPI_Comm_size(pme->mpi_comm_d[1],&pme->nnodes_minor);
2115 #endif
2116         }
2117         pme->bPPnode = (cr->duty & DUTY_PP);
2118     }
2119     
2120     if (ir->ePBC == epbcSCREW)
2121     {
2122         gmx_fatal(FARGS,"pme does not (yet) work with pbc = screw");
2123     }
2124     
2125     pme->bFEP        = ((ir->efep != efepNO) && bFreeEnergy);
2126     pme->nkx         = ir->nkx;
2127     pme->nky         = ir->nky;
2128     pme->nkz         = ir->nkz;
2129     pme->pme_order   = ir->pme_order;
2130     pme->epsilon_r   = ir->epsilon_r;
2131     
2132     /* Currently pme.c supports only the fft5d FFT code.
2133      * Therefore the grid always needs to be divisible by nnodes.
2134      * When the old 1D code is also supported again, change this check.
2135      *
2136      * This check should be done before calling gmx_pme_init
2137      * and fplog should be passed instead of stderr.
2138      *
2139     if (pme->ndecompdim >= 2)
2140     */
2141     if (pme->ndecompdim >= 1)
2142     {
2143         /*
2144         gmx_pme_check_grid_restrictions(pme->nodeid==0 ? stderr : NULL,
2145                                         'x',nnodes_major,&pme->nkx);
2146         gmx_pme_check_grid_restrictions(pme->nodeid==0 ? stderr : NULL,
2147                                         'y',nnodes_minor,&pme->nky);
2148         */
2149     }
2150
2151     if (pme->nkx <= pme->pme_order*(pme->nnodes_major > 1 ? 2 : 1) ||
2152         pme->nky <= pme->pme_order*(pme->nnodes_minor > 1 ? 2 : 1) ||
2153         pme->nkz <= pme->pme_order)
2154     {
2155         gmx_fatal(FARGS,"The pme grid dimensions need to be larger than pme_order (%d) and in parallel larger than 2*pme_order for x and/or y",pme->pme_order);
2156     }
2157
2158     if (pme->nnodes > 1) {
2159         double imbal;
2160
2161 #ifdef GMX_MPI
2162         MPI_Type_contiguous(DIM, mpi_type, &(pme->rvec_mpi));
2163         MPI_Type_commit(&(pme->rvec_mpi));
2164 #endif
2165         
2166         /* Note that the charge spreading and force gathering, which usually
2167          * takes about the same amount of time as FFT+solve_pme,
2168          * is always fully load balanced
2169          * (unless the charge distribution is inhomogeneous).
2170          */
2171         
2172         imbal = pme_load_imbalance(pme);
2173         if (imbal >= 1.2 && pme->nodeid_major == 0 && pme->nodeid_minor == 0)
2174         {
2175             fprintf(stderr,
2176                     "\n"
2177                     "NOTE: The load imbalance in PME FFT and solve is %d%%.\n"
2178                     "      For optimal PME load balancing\n"
2179                     "      PME grid_x (%d) and grid_y (%d) should be divisible by #PME_nodes_x (%d)\n"
2180                     "      and PME grid_y (%d) and grid_z (%d) should be divisible by #PME_nodes_y (%d)\n"
2181                     "\n",
2182                     (int)((imbal-1)*100 + 0.5),
2183                     pme->nkx,pme->nky,pme->nnodes_major,
2184                     pme->nky,pme->nkz,pme->nnodes_minor);
2185         }
2186     }
2187
2188     init_overlap_comm(&pme->overlap[0],pme->pme_order,
2189 #ifdef GMX_MPI
2190                       pme->mpi_comm_d[0],
2191 #endif
2192                       pme->nnodes_major,pme->nodeid_major,pme->nkx);
2193     
2194     init_overlap_comm(&pme->overlap[1],pme->pme_order,
2195 #ifdef GMX_MPI
2196                       pme->mpi_comm_d[1],
2197 #endif
2198                       pme->nnodes_minor,pme->nodeid_minor,pme->nky);
2199     
2200     snew(pme->bsp_mod[XX],pme->nkx);
2201     snew(pme->bsp_mod[YY],pme->nky);
2202     snew(pme->bsp_mod[ZZ],pme->nkz);
2203     
2204     /* Allocate data for the interpolation grid, including overlap */
2205     pme->pmegrid_nx = pme->overlap[0].s2g1[pme->nodeid_major] -
2206                       pme->overlap[0].s2g0[pme->nodeid_major];
2207     pme->pmegrid_ny = pme->overlap[1].s2g1[pme->nodeid_minor] - 
2208                       pme->overlap[1].s2g0[pme->nodeid_minor];
2209     pme->pmegrid_nz = pme->nkz + pme->pme_order - 1;
2210     
2211     pme->pmegrid_start_ix = pme->overlap[0].s2g0[pme->nodeid_major];
2212     pme->pmegrid_start_iy = pme->overlap[1].s2g0[pme->nodeid_minor];
2213     pme->pmegrid_start_iz = 0;
2214     
2215     make_gridindex5_to_localindex(pme->nkx,
2216                                   pme->pmegrid_start_ix,
2217                                   pme->pmegrid_nx - (pme->pme_order-1),
2218                                   &pme->nnx,&pme->fshx);
2219     make_gridindex5_to_localindex(pme->nky,
2220                                   pme->pmegrid_start_iy,
2221                                   pme->pmegrid_ny - (pme->pme_order-1),
2222                                   &pme->nny,&pme->fshy);
2223     make_gridindex5_to_localindex(pme->nkz,
2224                                   pme->pmegrid_start_iz,
2225                                   pme->pmegrid_nz - (pme->pme_order-1),
2226                                   &pme->nnz,&pme->fshz);
2227     
2228     snew(pme->pmegridA,pme->pmegrid_nx*pme->pmegrid_ny*pme->pmegrid_nz);
2229     
2230     /* For a non-divisible grid we need pme_order instead of pme_order-1 */
2231     /* x overlap is copied in place: take padding into account.
2232      * y is always copied through a buffer: we don't need padding in z,
2233      * but we do need the overlap in x because of the communication order.
2234      */
2235     bufsizex = pme->pme_order*pme->pmegrid_ny*pme->pmegrid_nz;
2236     bufsizey = pme->pme_order*pme->pmegrid_nx*pme->nkz;
2237     bufsize  = (bufsizex>bufsizey) ? bufsizex : bufsizey;
2238     
2239     snew(pme->pmegrid_sendbuf,bufsize);
2240     snew(pme->pmegrid_recvbuf,bufsize);
2241     
2242     ndata[0] = pme->nkx;
2243     ndata[1] = pme->nky;
2244     ndata[2] = pme->nkz;
2245     
2246     /* This routine will allocate the grid data to fit the FFTs */
2247     gmx_parallel_3dfft_init(&pme->pfft_setupA,ndata,
2248                             &pme->fftgridA,&pme->cfftgridA,
2249                             pme->mpi_comm_d,
2250                             pme->overlap[0].s2g0,pme->overlap[1].s2g0,
2251                             bReproducible);
2252     
2253     if (bFreeEnergy)
2254     {
2255         snew(pme->pmegridB,pme->pmegrid_nx*pme->pmegrid_ny*pme->pmegrid_nz);    
2256         gmx_parallel_3dfft_init(&pme->pfft_setupB,ndata,
2257                                 &pme->fftgridB,&pme->cfftgridB,
2258                                 pme->mpi_comm_d,
2259                                 pme->overlap[0].s2g0,pme->overlap[1].s2g0,
2260                                 bReproducible);
2261     } else 
2262     {
2263         pme->pmegridB    = NULL;
2264         pme->fftgridB    = NULL;
2265         pme->cfftgridB   = NULL;
2266     }
2267     
2268     make_bspline_moduli(pme->bsp_mod,pme->nkx,pme->nky,pme->nkz,pme->pme_order);
2269     
2270     /* Use atc[0] for spreading */
2271     init_atomcomm(pme,&pme->atc[0],cr,nnodes_major > 1 ? 0 : 1,TRUE);
2272     if (pme->ndecompdim >= 2)
2273     {
2274         init_atomcomm(pme,&pme->atc[1],cr,1,FALSE);
2275     }
2276     
2277     if (pme->nnodes == 1) {
2278         pme->atc[0].n = homenr;
2279         pme_realloc_atomcomm_things(&pme->atc[0]);
2280     }
2281     
2282     /* Use fft5d, order after FFT is y major, z, x minor */
2283     pme->work_nalloc = pme->nkx;
2284     snew(pme->work_mhx,pme->work_nalloc);
2285     snew(pme->work_mhy,pme->work_nalloc);
2286     snew(pme->work_mhz,pme->work_nalloc);
2287     snew(pme->work_m2,pme->work_nalloc);
2288     snew(pme->work_denom,pme->work_nalloc);
2289     /* Allocate extra elements and round work_tmp1 up to a 16-byte boundary,
2290      * so that SSE operations on 4 elements at a time stay aligned and
2291      * within the allocation. */
2292     snew(pme->work_tmp1_alloc,pme->work_nalloc+8);
2293     pme->work_tmp1 = (real *) (((size_t) pme->work_tmp1_alloc + 16) & (~((size_t) 15)));
2294     snew(pme->work_m2inv,pme->work_nalloc);
2295
2296     *pmedata = pme;
2297     
2298     return 0;
2299 }
2300
2301 static void spread_on_grid(gmx_pme_t pme,
2302                            pme_atomcomm_t *atc,real *grid,
2303                            bool bCalcSplines,bool bSpread)
2304 {    
2305     if (bCalcSplines)
2306     {
2307     
2308         /* Compute fftgrid index for all atoms,
2309          * with help of some extra variables.
2310          */
2311         calc_interpolation_idx(pme,atc);
2312         
2313         /* make local bsplines  */
2314         make_bsplines(atc->theta,atc->dtheta,pme->pme_order,
2315                       atc->fractx,atc->n,atc->q,pme->bFEP);
2316     }    
2317     
2318     if (bSpread)
2319     {
2320         /* put local atoms on grid. */
2321         spread_q_bsplines(pme,atc,grid);
2322     }
2323 }
2324
2325 void gmx_pme_calc_energy(gmx_pme_t pme,int n,rvec *x,real *q,real *V)
2326 {
2327     pme_atomcomm_t *atc;
2328     real *grid;
2329
2330     if (pme->nnodes > 1)
2331     {
2332         gmx_incons("gmx_pme_calc_energy called in parallel");
2333     }
2334     if (pme->bFEP)
2335     {
2336         gmx_incons("gmx_pme_calc_energy with free energy");
2337     }
2338
2339     atc = &pme->atc_energy;
2340     atc->nslab     = 1;
2341     atc->bSpread   = TRUE;
2342     atc->pme_order = pme->pme_order;
2343     atc->n         = n;
2344     pme_realloc_atomcomm_things(atc);
2345     atc->x         = x;
2346     atc->q         = q;
2347     
2348     /* We only use the A-charges grid */
2349     grid = pme->pmegridA;
2350
2351     spread_on_grid(pme,atc,grid,TRUE,FALSE);
2352
2353     *V = gather_energy_bsplines(pme,grid,atc);
2354 }
2355
2356
2357 static void reset_pmeonly_counters(t_commrec *cr,gmx_wallcycle_t wcycle,
2358         t_nrnb *nrnb,t_inputrec *ir, gmx_large_int_t step_rel)
2359 {
2360     /* Reset all the counters related to performance over the run */
2361     wallcycle_stop(wcycle,ewcRUN);
2362     wallcycle_reset_all(wcycle);
2363     init_nrnb(nrnb);
2364     ir->init_step += step_rel;
2365     ir->nsteps    -= step_rel;
2366     wallcycle_start(wcycle,ewcRUN);
2367 }
2368
2369
2370 int gmx_pmeonly(gmx_pme_t pme,
2371                 t_commrec *cr,    t_nrnb *nrnb,
2372                 gmx_wallcycle_t wcycle,
2373                 real ewaldcoeff,  bool bGatherOnly,
2374                 t_inputrec *ir)
2375 {
2376     gmx_pme_pp_t pme_pp;
2377     int  natoms;
2378     matrix box;
2379     rvec *x_pp=NULL,*f_pp=NULL;
2380     real *chargeA=NULL,*chargeB=NULL;
2381     real lambda=0;
2382     int  maxshift_x=0,maxshift_y=0;
2383     real energy,dvdlambda;
2384     matrix vir;
2385     float cycles;
2386     int  count;
2387     bool bEnerVir;
2388     gmx_large_int_t step,step_rel;
2389     
2390     
2391     pme_pp = gmx_pme_pp_init(cr);
2392     
2393     init_nrnb(nrnb);
2394     
2395     count = 0;
2396     do /****** this is a quasi-loop over time steps! */
2397     {
2398         /* Domain decomposition */
2399         natoms = gmx_pme_recv_q_x(pme_pp,
2400                                   &chargeA,&chargeB,box,&x_pp,&f_pp,
2401                                   &maxshift_x,&maxshift_y,
2402                                   &pme->bFEP,&lambda,
2403                                   &bEnerVir,
2404                                   &step);
2405         
2406         if (natoms == -1) {
2407             /* We should stop: break out of the loop */
2408             break;
2409         }
2410         
2411         step_rel = step - ir->init_step;
2412         
2413         if (count == 0)
2414             wallcycle_start(wcycle,ewcRUN);
2415         
2416         wallcycle_start(wcycle,ewcPMEMESH);
2417         
2418         dvdlambda = 0;
2419         clear_mat(vir);
2420         gmx_pme_do(pme,0,natoms,x_pp,f_pp,chargeA,chargeB,box,
2421                    cr,maxshift_x,maxshift_y,nrnb,wcycle,vir,ewaldcoeff,
2422                    &energy,lambda,&dvdlambda,
2423                    GMX_PME_DO_ALL_F | (bEnerVir ? GMX_PME_CALC_ENER_VIR : 0));
2424         
2425         cycles = wallcycle_stop(wcycle,ewcPMEMESH);
2426         
2427         gmx_pme_send_force_vir_ener(pme_pp,
2428                                     f_pp,vir,energy,dvdlambda,
2429                                     cycles);
2430         
2431         count++;
2432
2433         if (step_rel == wcycle_get_reset_counters(wcycle))
2434         {
2435             /* Reset all the counters related to performance over the run */
2436             reset_pmeonly_counters(cr,wcycle,nrnb,ir,step_rel);
2437             wcycle_set_reset_counters(wcycle, 0);
2438         }
2439         
2440     } /***** end of quasi-loop, we stop with the break above */
2441     while (TRUE);
2442     
2443     return 0;
2444 }
2445
2446 int gmx_pme_do(gmx_pme_t pme,
2447                int start,       int homenr,
2448                rvec x[],        rvec f[],
2449                real *chargeA,   real *chargeB,
2450                matrix box,      t_commrec *cr,
2451                int  maxshift_x, int maxshift_y,
2452                t_nrnb *nrnb,    gmx_wallcycle_t wcycle,
2453                matrix vir,      real ewaldcoeff,
2454                real *energy,    real lambda, 
2455                real *dvdlambda, int flags)
2456 {
2457     int     q,d,i,j,ntot,npme;
2458     int     nx,ny,nz;
2459     int     n_d,local_ny;
2460     int     loop_count;
2461     pme_atomcomm_t *atc=NULL;
2462     real *  grid=NULL;
2463     real    *ptr;
2464     rvec    *x_d,*f_d;
2465     real    *charge=NULL,*q_d,vol;
2466     real    energy_AB[2];
2467     matrix  vir_AB[2];
2468     bool    bClearF;
2469     gmx_parallel_3dfft_t pfft_setup;
2470     real *  fftgrid;
2471     t_complex * cfftgrid;
2472
2473     if (pme->nnodes > 1) {
2474         atc = &pme->atc[0];
2475         atc->npd = homenr;
2476         if (atc->npd > atc->pd_nalloc) {
2477             atc->pd_nalloc = over_alloc_dd(atc->npd);
2478             srenew(atc->pd,atc->pd_nalloc);
2479         }
2480         atc->maxshift = (atc->dimind==0 ? maxshift_x : maxshift_y);
2481     }
2482     
2483     for(q=0; q<(pme->bFEP ? 2 : 1); q++) {
2484         if (q == 0) {
2485             grid = pme->pmegridA;
2486             fftgrid = pme->fftgridA;
2487             cfftgrid = pme->cfftgridA;
2488             pfft_setup = pme->pfft_setupA;
2489             charge = chargeA+start;
2490         } else {
2491             grid = pme->pmegridB;
2492             fftgrid = pme->fftgridB;
2493             cfftgrid = pme->cfftgridB;
2494             pfft_setup = pme->pfft_setupB;
2495             charge = chargeB+start;
2496         }
2497         /* Unpack structure */
2498         if (debug) {
2499             fprintf(debug,"PME: nnodes = %d, nodeid = %d\n",
2500                     cr->nnodes,cr->nodeid);
2501             fprintf(debug,"Grid = %p\n",(void*)grid);
2502             if (grid == NULL)
2503                 gmx_fatal(FARGS,"No grid!");
2504         }
2505         where();
2506         
2507         m_inv_ur0(box,pme->recipbox); 
2508
2509         if (pme->nnodes == 1) {
2510             atc = &pme->atc[0];
2511             if (DOMAINDECOMP(cr)) {
2512                 atc->n = homenr;
2513                 pme_realloc_atomcomm_things(atc);
2514             }
2515             atc->x = x;
2516             atc->q = charge;
2517             atc->f = f;
2518         } else {
2519             wallcycle_start(wcycle,ewcPME_REDISTXF);
2520             for(d=pme->ndecompdim-1; d>=0; d--)
2521             {
2522                 if (d == pme->ndecompdim-1)
2523                 {
2524                     n_d = homenr;
2525                     x_d = x + start;
2526                     q_d = charge;
2527                 }
2528                 else
2529                 {
2530                     n_d = pme->atc[d+1].n;
2531                     x_d = atc->x;
2532                     q_d = atc->q;
2533                 }
2534                 atc = &pme->atc[d];
2535                 atc->npd = n_d;
2536                 if (atc->npd > atc->pd_nalloc) {
2537                     atc->pd_nalloc = over_alloc_dd(atc->npd);
2538                     srenew(atc->pd,atc->pd_nalloc);
2539                 }
2540                 atc->maxshift = (atc->dimind==0 ? maxshift_x : maxshift_y);
2541                 pme_calc_pidx(n_d,pme->recipbox,x_d,atc);
2542                 where();
2543                 
2544                 GMX_BARRIER(cr->mpi_comm_mygroup);
2545                 /* Redistribute x (only once) and qA or qB */
2546                 if (DOMAINDECOMP(cr)) {
2547                     dd_pmeredist_x_q(pme, n_d, q==0, x_d, q_d, atc);
2548                 } else {
2549                     pmeredist_pd(pme, TRUE, n_d, q==0, x_d, q_d, atc);
2550                 }
2551             }
2552             where();
2553
2554             wallcycle_stop(wcycle,ewcPME_REDISTXF);
2555         }
2556         
2557         if (debug)
2558             fprintf(debug,"Node= %6d, pme local particles=%6d\n",
2559                     cr->nodeid,atc->n);
2560
2561         if (flags & GMX_PME_SPREAD_Q)
2562         {
2563             wallcycle_start(wcycle,ewcPME_SPREADGATHER);
2564
2565             /* Spread the charges on a grid */
2566             GMX_MPE_LOG(ev_spread_on_grid_start);
2567             
2569             spread_on_grid(pme,&pme->atc[0],grid,q==0,TRUE);
2570             GMX_MPE_LOG(ev_spread_on_grid_finish);
2571
2572             if (q == 0)
2573             {
2574                 inc_nrnb(nrnb,eNR_WEIGHTS,DIM*atc->n);
2575             }
2576             inc_nrnb(nrnb,eNR_SPREADQBSP,
2577                      pme->pme_order*pme->pme_order*pme->pme_order*atc->n);
2578
2579             wrap_periodic_pmegrid(pme,grid);
2580
2581             /* sum contributions to local grid from other nodes */
2582 #ifdef GMX_MPI
2583             if (pme->nnodes > 1) {
2584                 GMX_BARRIER(cr->mpi_comm_mygroup);
2585                 gmx_sum_qgrid_dd(pme,grid,GMX_SUM_QGRID_FORWARD);
2586                 where();
2587             }
2588 #endif
2589             where();
2590
2591             copy_pmegrid_to_fftgrid(pme,grid,fftgrid);
2592
2593             wallcycle_stop(wcycle,ewcPME_SPREADGATHER);
2594         }
2595          
2596         if (flags & GMX_PME_SOLVE)
2597         {
2598             /* do 3d-fft */ 
2599             GMX_BARRIER(cr->mpi_comm_mygroup);
2600             GMX_MPE_LOG(ev_gmxfft3d_start);
2601             wallcycle_start(wcycle,ewcPME_FFT);
2602             gmx_parallel_3dfft_execute(pfft_setup,GMX_FFT_REAL_TO_COMPLEX,fftgrid,cfftgrid);
2603             wallcycle_stop(wcycle,ewcPME_FFT);
2604             GMX_MPE_LOG(ev_gmxfft3d_finish);
2605             where();
2606             
2607             /* solve in k-space for our local cells */
2608             vol = det(box);
2609             GMX_BARRIER(cr->mpi_comm_mygroup);
2610             GMX_MPE_LOG(ev_solve_pme_start);
2611             wallcycle_start(wcycle,ewcPME_SOLVE);
2612             loop_count =
2613                 solve_pme_yzx(pme,cfftgrid,ewaldcoeff,vol,
2614                               flags & GMX_PME_CALC_ENER_VIR,
2615                               &energy_AB[q],vir_AB[q]);
2616             wallcycle_stop(wcycle,ewcPME_SOLVE);
2617             where();
2618             GMX_MPE_LOG(ev_solve_pme_finish);
2619             inc_nrnb(nrnb,eNR_SOLVEPME,loop_count);
2620         }
2621
2622         if (flags & GMX_PME_CALC_F)
2623         {
2624             
2625             /* do 3d-invfft */
2626             GMX_BARRIER(cr->mpi_comm_mygroup);
2627             GMX_MPE_LOG(ev_gmxfft3d_start);
2628             where();
2629             wallcycle_start(wcycle,ewcPME_FFT);
2630             gmx_parallel_3dfft_execute(pfft_setup,GMX_FFT_COMPLEX_TO_REAL,cfftgrid,fftgrid);
2631             wallcycle_stop(wcycle,ewcPME_FFT);
2632
2633             where();
2634             GMX_MPE_LOG(ev_gmxfft3d_finish);
2635
2636             if (pme->nodeid == 0)
2637             {
2638                 ntot = pme->nkx*pme->nky*pme->nkz;
2639                 npme  = ntot*log((real)ntot)/log(2.0);
2640                 inc_nrnb(nrnb,eNR_FFT,2*npme);
2641             }
2642
2643             wallcycle_start(wcycle,ewcPME_SPREADGATHER);
2644
2645             copy_fftgrid_to_pmegrid(pme,fftgrid,grid);
2646
2647             /* distribute local grid to all nodes */
2648 #ifdef GMX_MPI
2649             if (pme->nnodes > 1) {
2650                 GMX_BARRIER(cr->mpi_comm_mygroup);
2651                 gmx_sum_qgrid_dd(pme,grid,GMX_SUM_QGRID_BACKWARD);
2652             }
2653 #endif
2654             where();
2655
2656             unwrap_periodic_pmegrid(pme,grid);
2657             
2658             /* interpolate forces for our local atoms */
2659             GMX_BARRIER(cr->mpi_comm_mygroup);
2660             GMX_MPE_LOG(ev_gather_f_bsplines_start);
2661
2662             where();
2663             
2664             /* If we are running without parallelization,
2665              * atc->f is the actual force array, not a buffer,
2666              * therefore we should not clear it.
2667              */
2668             bClearF = (q == 0 && PAR(cr));
2669             gather_f_bsplines(pme,grid,bClearF,&pme->atc[0],
2670                               pme->bFEP ? (q==0 ? 1.0-lambda : lambda) : 1.0);
2671             where();
2672             
2673             GMX_MPE_LOG(ev_gather_f_bsplines_finish);
2674             
2675             inc_nrnb(nrnb,eNR_GATHERFBSP,
2676                      pme->pme_order*pme->pme_order*pme->pme_order*pme->atc[0].n);
2677             wallcycle_stop(wcycle,ewcPME_SPREADGATHER);
2678        }
2679     } /* of q-loop */
2680     
2681     if ((flags & GMX_PME_CALC_F) && pme->nnodes > 1) {
2682         wallcycle_start(wcycle,ewcPME_REDISTXF);
2683         for(d=0; d<pme->ndecompdim; d++)
2684         {
2685             atc = &pme->atc[d];
2686             if (d == pme->ndecompdim - 1)
2687             {
2688                 n_d = homenr;
2689                 f_d = f + start;
2690             }
2691             else
2692             {
2693                 n_d = pme->atc[d+1].n;
2694                 f_d = pme->atc[d+1].f;
2695             }
2696             GMX_BARRIER(cr->mpi_comm_mygroup);
2697             if (DOMAINDECOMP(cr)) {
2698                 dd_pmeredist_f(pme,atc,n_d,f_d,
2699                                d==pme->ndecompdim-1 && pme->bPPnode);
2700             } else {
2701                 pmeredist_pd(pme, FALSE, n_d, TRUE, f_d, NULL, atc);
2702             }
2703         }
2704
2705         wallcycle_stop(wcycle,ewcPME_REDISTXF);
2706     }
2707     where();
2708     
2709     if (!pme->bFEP) {
2710         *energy = energy_AB[0];
2711         m_add(vir,vir_AB[0],vir);
2712     } else {
2713         *energy = (1.0-lambda)*energy_AB[0] + lambda*energy_AB[1];
2714         *dvdlambda += energy_AB[1] - energy_AB[0];
2715         for(i=0; i<DIM; i++)
2716             for(j=0; j<DIM; j++)
2717                 vir[i][j] += (1.0-lambda)*vir_AB[0][i][j] + lambda*vir_AB[1][i][j];
2718     }
2719
2720     if (debug)
2721         fprintf(debug,"PME mesh energy: %g\n",*energy);
2722     
2723     return 0;
2724 }