src/gromacs/mdlib/pme.c
1 /* -*- mode: c; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; c-file-style: "stroustrup"; -*-
2  *
3  * 
4  *                This source code is part of
5  * 
6  *                 G   R   O   M   A   C   S
7  * 
8  *          GROningen MAchine for Chemical Simulations
9  * 
10  *                        VERSION 3.2.0
11  * Written by David van der Spoel, Erik Lindahl, Berk Hess, and others.
12  * Copyright (c) 1991-2000, University of Groningen, The Netherlands.
13  * Copyright (c) 2001-2004, The GROMACS development team,
14  * check out http://www.gromacs.org for more information.
15
16  * This program is free software; you can redistribute it and/or
17  * modify it under the terms of the GNU General Public License
18  * as published by the Free Software Foundation; either version 2
19  * of the License, or (at your option) any later version.
20  * 
21  * If you want to redistribute modifications, please consider that
22  * scientific software is very special. Version control is crucial -
23  * bugs must be traceable. We will be happy to consider code for
24  * inclusion in the official distribution, but derived work must not
25  * be called official GROMACS. Details are found in the README & COPYING
26  * files - if they are missing, get the official version at www.gromacs.org.
27  * 
28  * To help us fund GROMACS development, we humbly ask that you cite
29  * the papers on the package - you can find them in the top README file.
30  * 
31  * For more info, check our website at http://www.gromacs.org
32  * 
33  * And Hey:
34  * GROwing Monsters And Cloning Shrimps
35  */
36 /* IMPORTANT FOR DEVELOPERS:
37  *
38  * Triclinic pme stuff isn't entirely trivial, and we've experienced
39  * some bugs during development (many of them due to me). To avoid
40  * this in the future, please check the following things if you make
41  * changes in this file:
42  *
43  * 1. You should obtain identical (at least to the PME precision)
44  *    energies, forces, and virial for
45  *    a rectangular box and a triclinic one where the z (or y) axis is
46  *    tilted a whole box side. For instance you could use these boxes:
47  *
48  *    rectangular       triclinic
49  *     2  0  0           2  0  0
50  *     0  2  0           0  2  0
51  *     0  0  6           2  2  6
52  *
53  * 2. You should check the energy conservation in a triclinic box.
54  *
55  * It might seem like overkill, but better safe than sorry.
56  * /Erik 001109
57  */ 
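/* The two example boxes above describe the same periodic system: the
 * triclinic box vectors are obtained from the rectangular ones by adding
 * whole a and b box vectors to c, which merely relabels the periodic images.
 * A correct PME implementation must therefore give identical energies,
 * forces and virial for both boxes, up to the requested PME accuracy.
 */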
58
59 #ifdef HAVE_CONFIG_H
60 #include <config.h>
61 #endif
62
63 #ifdef GMX_LIB_MPI
64 #include <mpi.h>
65 #endif
66 #ifdef GMX_THREADS
67 #include "tmpi.h"
68 #endif
69
70
71 #include <stdio.h>
72 #include <string.h>
73 #include <math.h>
74 #include "typedefs.h"
75 #include "txtdump.h"
76 #include "vec.h"
77 #include "gmxcomplex.h"
78 #include "smalloc.h"
79 #include "futil.h"
80 #include "coulomb.h"
81 #include "gmx_fatal.h"
82 #include "pme.h"
83 #include "network.h"
84 #include "physics.h"
85 #include "nrnb.h"
86 #include "copyrite.h"
87 #include "gmx_wallcycle.h"
88 #include "gmx_parallel_3dfft.h"
89 #include "pdbio.h"
90
91 #if ( !defined(GMX_DOUBLE) && ( defined(GMX_IA32_SSE) || defined(GMX_X86_64_SSE) || defined(GMX_X86_64_SSE2) ) )
92 #include "gmx_sse2_single.h"
93 #endif
94
95 #define DFT_TOL 1e-7
96 /* #define PRT_FORCE */
97 /* conditions for on-the-fly time measurement */
98 /* #define TAKETIME (step > 1 && timesteps < 10) */
99 #define TAKETIME FALSE
100
101 #ifdef GMX_DOUBLE
102 #define mpi_type MPI_DOUBLE
103 #else
104 #define mpi_type MPI_FLOAT
105 #endif
106
107 /* Internal datastructures */
108 typedef struct {
109     int send_index0;
110     int send_nindex;
111     int recv_index0;
112     int recv_nindex;
113 } pme_grid_comm_t;
114
115 typedef struct {
116 #ifdef GMX_MPI
117     MPI_Comm mpi_comm;
118 #endif
119     int  nnodes,nodeid;
120     int  *s2g0;
121     int  *s2g1;
122     int  noverlap_nodes;
123     int  *send_id,*recv_id;
124     pme_grid_comm_t *comm_data;
125 } pme_overlap_t;
126
127 typedef struct {
128     int  dimind;            /* The index of the dimension, 0=x, 1=y */
129     int  nslab;
130     int  nodeid;
131 #ifdef GMX_MPI
132     MPI_Comm mpi_comm;
133 #endif
134
135     int  *node_dest;        /* The nodes to send x and q to with DD */
136     int  *node_src;         /* The nodes to receive x and q from with DD */
137     int  *buf_index;        /* Index for commnode into the buffers */
138
139     int  maxshift;
140
141     int  npd;
142     int  pd_nalloc;
143     int  *pd;
144     int  *count;            /* The number of atoms to send to each node */
145     int  *rcount;           /* The number of atoms to receive */
146
147     int  n;
148     int  nalloc;
149     rvec *x;
150     real *q;
151     rvec *f;
152     gmx_bool bSpread;           /* These coordinates are used for spreading */
153     int  pme_order;
154     splinevec theta,dtheta;
155     ivec *idx;
156     rvec *fractx;            /* Fractional coordinate relative to the
157                               * lower cell boundary 
158                               */
159 } pme_atomcomm_t;
160
161 typedef struct gmx_pme {
162     int  ndecompdim;         /* The number of decomposition dimensions */
163     int  nodeid;             /* Our nodeid in mpi->mpi_comm */
164     int  nodeid_major;
165     int  nodeid_minor;
166     int  nnodes;             /* The number of nodes doing PME */
167     int  nnodes_major;
168     int  nnodes_minor;
169
170     MPI_Comm mpi_comm;
171     MPI_Comm mpi_comm_d[2];  /* Indexed on dimension, 0=x, 1=y */
172 #ifdef GMX_MPI
173     MPI_Datatype  rvec_mpi;  /* the pme vector's MPI type */
174 #endif
175
176     gmx_bool bPPnode;            /* Node also does particle-particle forces */
177     gmx_bool bFEP;               /* Compute Free energy contribution */
178     int nkx,nky,nkz;         /* Grid dimensions */
179     int pme_order;
180     real epsilon_r;           
181     
182     real *  pmegridA;  /* Grids on which we do spreading/interpolation, includes overlap */
183     real *  pmegridB;
184     int     pmegrid_nx,pmegrid_ny,pmegrid_nz;
185     int     pmegrid_start_ix,pmegrid_start_iy,pmegrid_start_iz;    
186     
187     real *  pmegrid_sendbuf;
188     real *  pmegrid_recvbuf;
189     
190     real *fftgridA;             /* Grids for FFT. With 1D FFT decomposition this can be a pointer */
191     real *fftgridB;             /* inside the interpolation grid, but separate for 2D PME decomp. */
192     int   fftgrid_nx,fftgrid_ny,fftgrid_nz;
193     
194     t_complex *cfftgridA;             /* Grids for complex FFT data */
195     t_complex *cfftgridB;            
196     int   cfftgrid_nx,cfftgrid_ny,cfftgrid_nz;
197     
198     gmx_parallel_3dfft_t  pfft_setupA;
199     gmx_parallel_3dfft_t  pfft_setupB;
200     
201     int  *nnx,*nny,*nnz;
202     real *fshx,*fshy,*fshz;
203     
204     pme_atomcomm_t atc[2];  /* Indexed on decomposition index */
205     matrix    recipbox;
206     splinevec bsp_mod;
207     
208     pme_overlap_t overlap[2]; /* Indexed on dimension, 0=x, 1=y */
209
210
211     pme_atomcomm_t atc_energy; /* Only for gmx_pme_calc_energy */
212     
213     rvec *bufv;             /* Communication buffer */
214     real *bufr;             /* Communication buffer */
215     int  buf_nalloc;        /* The communication buffer size */
216
217     /* work data for solve_pme */
218     int      work_nalloc;
219     real *   work_mhx;
220     real *   work_mhy;
221     real *   work_mhz;
222     real *   work_m2;
223     real *   work_denom;
224     real *   work_tmp1_alloc;
225     real *   work_tmp1;
226     real *   work_m2inv;
227
228     /* Work data for PME_redist */
229     gmx_bool     redist_init;
230     int *    scounts; 
231     int *    rcounts;
232     int *    sdispls;
233     int *    rdispls;
234     int *    sidx;
235     int *    idxa;    
236     real *   redist_buf;
237     int      redist_buf_nalloc;
238     
239     /* Work data for sum_qgrid */
240     real *   sum_qgrid_tmp;
241     real *   sum_qgrid_dd_tmp;
242 } t_gmx_pme;
243
244
245 static void calc_interpolation_idx(gmx_pme_t pme,pme_atomcomm_t *atc)
246 {
247     int  i;
248     int  *idxptr,tix,tiy,tiz;
249     real *xptr,*fptr,tx,ty,tz;
250     real rxx,ryx,ryy,rzx,rzy,rzz;
251     int  nx,ny,nz;
252     int  start_ix,start_iy,start_iz;
253     
254     nx  = pme->nkx;
255     ny  = pme->nky;
256     nz  = pme->nkz;
257     
258     start_ix = pme->pmegrid_start_ix;
259     start_iy = pme->pmegrid_start_iy;
260     start_iz = pme->pmegrid_start_iz;
261     
262     rxx = pme->recipbox[XX][XX];
263     ryx = pme->recipbox[YY][XX];
264     ryy = pme->recipbox[YY][YY];
265     rzx = pme->recipbox[ZZ][XX];
266     rzy = pme->recipbox[ZZ][YY];
267     rzz = pme->recipbox[ZZ][ZZ];
268     
269     for(i=0; (i<atc->n); i++) {
270         xptr   = atc->x[i];
271         idxptr = atc->idx[i];
272         fptr   = atc->fractx[i];
273         
274         /* Fractional coordinates along box vectors, add 2.0 to make 100% sure we are positive for triclinic boxes */
275         tx = nx * ( xptr[XX] * rxx + xptr[YY] * ryx + xptr[ZZ] * rzx + 2.0 );
276         ty = ny * (                  xptr[YY] * ryy + xptr[ZZ] * rzy + 2.0 );
277         tz = nz * (                                   xptr[ZZ] * rzz + 2.0 );
278         
279         tix = (int)(tx);
280         tiy = (int)(ty);
281         tiz = (int)(tz);
282         
283         /* Because decomposition only occurs in x and y,
284          * we never have a fraction correction in z.
285          */
286         fptr[XX] = tx - tix + pme->fshx[tix];
287         fptr[YY] = ty - tiy + pme->fshy[tiy];
288         fptr[ZZ] = tz - tiz;   
289
290         idxptr[XX] = pme->nnx[tix];
291         idxptr[YY] = pme->nny[tiy];
292         idxptr[ZZ] = pme->nnz[tiz];
293
294 #ifdef DEBUG
295         range_check(idxptr[XX],0,pme->pmegrid_nx);
296         range_check(idxptr[YY],0,pme->pmegrid_ny);
297         range_check(idxptr[ZZ],0,pme->pmegrid_nz);
298 #endif
299   }  
300 }
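/* For reference: recipbox, used above through the rxx/ryx/ryy/rzx/rzy/rzz
 * shorthand, is the matrix inverse of the box matrix whose rows are the box
 * vectors, and the fractional coordinate along dimension d of a position x is
 * sum_e x[e]*recipbox[e][d]. The production code fills recipbox elsewhere in
 * GROMACS (calc_recipbox()); the helper below is only an illustrative sketch
 * of that inverse for the lower-triangular GROMACS box convention and is not
 * compiled.
 */
#if 0
static void sketch_calc_recipbox(matrix box, matrix recip)
{
    /* Assumes the GROMACS box convention:
     * box[XX][YY] = box[XX][ZZ] = box[YY][ZZ] = 0.
     */
    real vol = box[XX][XX]*box[YY][YY]*box[ZZ][ZZ];

    recip[XX][XX] =  box[YY][YY]*box[ZZ][ZZ]/vol;
    recip[XX][YY] = 0;
    recip[XX][ZZ] = 0;
    recip[YY][XX] = -box[YY][XX]*box[ZZ][ZZ]/vol;
    recip[YY][YY] =  box[XX][XX]*box[ZZ][ZZ]/vol;
    recip[YY][ZZ] = 0;
    recip[ZZ][XX] = (box[YY][XX]*box[ZZ][YY] - box[YY][YY]*box[ZZ][XX])/vol;
    recip[ZZ][YY] = -box[XX][XX]*box[ZZ][YY]/vol;
    recip[ZZ][ZZ] =  box[XX][XX]*box[YY][YY]/vol;
}
#endif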
301
302 static void pme_calc_pidx(int natoms, matrix recipbox, rvec x[],
303                           pme_atomcomm_t *atc)
304 {
305     int  nslab,i;
306     int  si;
307     real *xptr,s;
308     real rxx,ryx,rzx,ryy,rzy;
309     int *pd,*count;
310
311     /* Calculate the PME task index (pidx) for each atom.
312      * Here we always assign equally sized slabs to each node
313      * for load balancing reasons (the PME grid spacing is not used).
314      */
315     
316     nslab = atc->nslab;
317     pd    = atc->pd;
318     count = atc->count;
319
320     /* Reset the count */
321     for(i=0; i<nslab; i++)
322     {
323         count[i] = 0;
324     }
325     
326     if (atc->dimind == 0)
327     {
328         rxx = recipbox[XX][XX];
329         ryx = recipbox[YY][XX];
330         rzx = recipbox[ZZ][XX];
331         /* Calculate the node index in x-dimension */
332         for(i=0; (i<natoms); i++)
333         {
334             xptr   = x[i];
335             /* Fractional coordinates along box vectors */
336             s = nslab*(xptr[XX]*rxx + xptr[YY]*ryx + xptr[ZZ]*rzx);
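            /* Adding 2*nslab keeps the argument of the (int) cast positive
             * even for atoms slightly outside the box (triclinic cases);
             * the modulo then folds the result back into [0,nslab).
             */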
337             si = (int)(s + 2*nslab) % nslab;
338             pd[i] = si;
339             count[si]++;
340         }
341     }
342     else
343     {
344         ryy = recipbox[YY][YY];
345         rzy = recipbox[ZZ][YY];
346         /* Calculate the node index in y-dimension */
347         for(i=0; (i<natoms); i++)
348         {
349             xptr   = x[i];
350             /* Fractional coordinates along box vectors */
351             s = nslab*(xptr[YY]*ryy + xptr[ZZ]*rzy);
352             si = (int)(s + 2*nslab) % nslab;
353             pd[i] = si;
354             count[si]++;
355         }
356     }
357 }
358
359 static void pme_realloc_atomcomm_things(pme_atomcomm_t *atc)
360 {
361     int nalloc_old,i;
362     
363     /* We have to avoid a NULL pointer for atc->x to avoid
364      * possible fatal errors in MPI routines.
365      */
366     if (atc->n > atc->nalloc || atc->nalloc == 0)
367     {
368         nalloc_old = atc->nalloc;
369         atc->nalloc = over_alloc_dd(max(atc->n,1));
370         
371         if (atc->nslab > 1) {
372             srenew(atc->x,atc->nalloc);
373             srenew(atc->q,atc->nalloc);
374             srenew(atc->f,atc->nalloc);
375             for(i=nalloc_old; i<atc->nalloc; i++)
376             {
377                 clear_rvec(atc->f[i]);
378             }
379         }
380         if (atc->bSpread) {
381             for(i=0;i<DIM;i++) {
382                 srenew(atc->theta[i] ,atc->pme_order*atc->nalloc); 
383                 srenew(atc->dtheta[i],atc->pme_order*atc->nalloc);
384             }
385             srenew(atc->fractx,atc->nalloc); 
386             srenew(atc->idx   ,atc->nalloc);
387         }
388     }
389 }
390
391 static void pmeredist_pd(gmx_pme_t pme, gmx_bool forw,
392                          int n, gmx_bool bXF, rvec *x_f, real *charge,
393                          pme_atomcomm_t *atc)
394 /* Redistribute particle data for PME calculation */
395 /* domain decomposition by x coordinate           */
396 {
397     int *idxa;
398     int i, ii;
399     
400     if(FALSE == pme->redist_init) {
401         snew(pme->scounts,atc->nslab);
402         snew(pme->rcounts,atc->nslab);
403         snew(pme->sdispls,atc->nslab);
404         snew(pme->rdispls,atc->nslab);
405         snew(pme->sidx,atc->nslab);
406         pme->redist_init = TRUE;
407     }
408     if (n > pme->redist_buf_nalloc) {
409         pme->redist_buf_nalloc = over_alloc_dd(n);
410         srenew(pme->redist_buf,pme->redist_buf_nalloc*DIM);
411     }
412     
413     pme->idxa = atc->pd;
414
415 #ifdef GMX_MPI
416     if (forw && bXF) {
417         /* forward, redistribution from pp to pme */ 
418         
419         /* Calculate send counts and exchange them with other nodes */
420         for(i=0; (i<atc->nslab); i++) pme->scounts[i]=0;
421         for(i=0; (i<n); i++) pme->scounts[pme->idxa[i]]++;
422         MPI_Alltoall( pme->scounts, 1, MPI_INT, pme->rcounts, 1, MPI_INT, atc->mpi_comm);
423         
424         /* Calculate send and receive displacements and index into send 
425            buffer */
426         pme->sdispls[0]=0;
427         pme->rdispls[0]=0;
428         pme->sidx[0]=0;
429         for(i=1; i<atc->nslab; i++) {
430             pme->sdispls[i]=pme->sdispls[i-1]+pme->scounts[i-1];
431             pme->rdispls[i]=pme->rdispls[i-1]+pme->rcounts[i-1];
432             pme->sidx[i]=pme->sdispls[i];
433         }
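        /* Example: with scounts = {3,1,2} the exclusive prefix sum gives
         * sdispls = {0,3,4}, so particles destined for slab k are packed
         * contiguously into the send buffer starting at element sdispls[k].
         */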
434         /* Total # of particles to be received */
435         atc->n = pme->rdispls[atc->nslab-1] + pme->rcounts[atc->nslab-1];
436         
437         pme_realloc_atomcomm_things(atc);
438         
439         /* Copy particle coordinates into send buffer and exchange */
440         for(i=0; (i<n); i++) {
441             ii=DIM*pme->sidx[pme->idxa[i]];
442             pme->sidx[pme->idxa[i]]++;
443             pme->redist_buf[ii+XX]=x_f[i][XX];
444             pme->redist_buf[ii+YY]=x_f[i][YY];
445             pme->redist_buf[ii+ZZ]=x_f[i][ZZ];
446         }
447         MPI_Alltoallv(pme->redist_buf, pme->scounts, pme->sdispls, 
448                       pme->rvec_mpi, atc->x, pme->rcounts, pme->rdispls, 
449                       pme->rvec_mpi, atc->mpi_comm);
450     }
451     if (forw) {
452         /* Copy charge into send buffer and exchange */
453         for(i=0; i<atc->nslab; i++) pme->sidx[i]=pme->sdispls[i];
454         for(i=0; (i<n); i++) {
455             ii=pme->sidx[pme->idxa[i]];
456             pme->sidx[pme->idxa[i]]++;
457             pme->redist_buf[ii]=charge[i];
458         }
459         MPI_Alltoallv(pme->redist_buf, pme->scounts, pme->sdispls, mpi_type,
460                       atc->q, pme->rcounts, pme->rdispls, mpi_type,
461                       atc->mpi_comm);
462     }
463     else { /* backward, redistribution from pme to pp */ 
464         MPI_Alltoallv(atc->f, pme->rcounts, pme->rdispls, pme->rvec_mpi,
465                       pme->redist_buf, pme->scounts, pme->sdispls, 
466                       pme->rvec_mpi, atc->mpi_comm);
467         
468         /* Copy data from receive buffer */
469         for(i=0; i<atc->nslab; i++)
470             pme->sidx[i] = pme->sdispls[i];
471         for(i=0; (i<n); i++) {
472             ii = DIM*pme->sidx[pme->idxa[i]];
473             x_f[i][XX] += pme->redist_buf[ii+XX];
474             x_f[i][YY] += pme->redist_buf[ii+YY];
475             x_f[i][ZZ] += pme->redist_buf[ii+ZZ];
476             pme->sidx[pme->idxa[i]]++;
477         }
478     }
479 #endif 
480 }
481
482 static void pme_dd_sendrecv(pme_atomcomm_t *atc,
483                             gmx_bool bBackward,int shift,
484                             void *buf_s,int nbyte_s,
485                             void *buf_r,int nbyte_r)
486 {
487 #ifdef GMX_MPI
488     int dest,src;
489     MPI_Status stat;
490     
491     if (bBackward == FALSE) {
492         dest = atc->node_dest[shift];
493         src  = atc->node_src[shift];
494     } else {
495         dest = atc->node_src[shift];
496         src  = atc->node_dest[shift];
497     }
498     
499     if (nbyte_s > 0 && nbyte_r > 0) {
500         MPI_Sendrecv(buf_s,nbyte_s,MPI_BYTE,
501                      dest,shift,
502                      buf_r,nbyte_r,MPI_BYTE,
503                      src,shift,
504                      atc->mpi_comm,&stat);
505     } else if (nbyte_s > 0) {
506         MPI_Send(buf_s,nbyte_s,MPI_BYTE,
507                  dest,shift,
508                  atc->mpi_comm);
509     } else if (nbyte_r > 0) {
510         MPI_Recv(buf_r,nbyte_r,MPI_BYTE,
511                  src,shift,
512                  atc->mpi_comm,&stat);
513     }
514 #endif
515 }
516
517 static void dd_pmeredist_x_q(gmx_pme_t pme, 
518                              int n, gmx_bool bX, rvec *x, real *charge,
519                              pme_atomcomm_t *atc)
520 {
521     int *commnode,*buf_index;
522     int nnodes_comm,i,nsend,local_pos,buf_pos,node,scount,rcount;
523     
524     commnode  = atc->node_dest;
525     buf_index = atc->buf_index;
526     
527     nnodes_comm = min(2*atc->maxshift,atc->nslab-1);
528     
529     nsend = 0;
530     for(i=0; i<nnodes_comm; i++) {
531         buf_index[commnode[i]] = nsend;
532         nsend += atc->count[commnode[i]];
533     }
534     if (bX) {
535         if (atc->count[atc->nodeid] + nsend != n)
536             gmx_fatal(FARGS,"%d particles communicated to PME node %d are more than 2/3 times the cut-off out of the domain decomposition cell of their charge group in dimension %c.\n"
537                       "This usually means that your system is not well equilibrated.",
538                       n - (atc->count[atc->nodeid] + nsend),
539                       pme->nodeid,'x'+atc->dimind);
540         
541         if (nsend > pme->buf_nalloc) {
542             pme->buf_nalloc = over_alloc_dd(nsend);
543             srenew(pme->bufv,pme->buf_nalloc);
544             srenew(pme->bufr,pme->buf_nalloc);
545         }
546         
547         atc->n = atc->count[atc->nodeid];
548         for(i=0; i<nnodes_comm; i++) {
549             scount = atc->count[commnode[i]];
550             /* Communicate the count */
551             if (debug)
552                 fprintf(debug,"dimind %d PME node %d send to node %d: %d\n",
553                         atc->dimind,atc->nodeid,commnode[i],scount);
554             pme_dd_sendrecv(atc,FALSE,i,
555                             &scount,sizeof(int),
556                             &atc->rcount[i],sizeof(int));
557             atc->n += atc->rcount[i];
558         }
559         
560         pme_realloc_atomcomm_things(atc);
561     }
562     
563     local_pos = 0;
564     for(i=0; i<n; i++) {
565         node = atc->pd[i];
566         if (node == atc->nodeid) {
567             /* Copy direct to the receive buffer */
568             if (bX) {
569                 copy_rvec(x[i],atc->x[local_pos]);
570             }
571             atc->q[local_pos] = charge[i];
572             local_pos++;
573         } else {
574             /* Copy to the send buffer */
575             if (bX) {
576                 copy_rvec(x[i],pme->bufv[buf_index[node]]);
577             }
578             pme->bufr[buf_index[node]] = charge[i];
579             buf_index[node]++;
580         }
581     }
582     
583     buf_pos = 0;
584     for(i=0; i<nnodes_comm; i++) {
585         scount = atc->count[commnode[i]];
586         rcount = atc->rcount[i];
587         if (scount > 0 || rcount > 0) {
588             if (bX) {
589                 /* Communicate the coordinates */
590                 pme_dd_sendrecv(atc,FALSE,i,
591                                 pme->bufv[buf_pos],scount*sizeof(rvec),
592                                 atc->x[local_pos],rcount*sizeof(rvec));
593             }
594             /* Communicate the charges */
595             pme_dd_sendrecv(atc,FALSE,i,
596                             pme->bufr+buf_pos,scount*sizeof(real),
597                             atc->q+local_pos,rcount*sizeof(real));
598             buf_pos   += scount;
599             local_pos += atc->rcount[i];
600         }
601     }
602 }
603
604 static void dd_pmeredist_f(gmx_pme_t pme, pme_atomcomm_t *atc,
605                            int n, rvec *f,
606                            gmx_bool bAddF)
607 {
608     int *commnode,*buf_index;
609     int nnodes_comm,local_pos,buf_pos,i,scount,rcount,node;
610
611     commnode  = atc->node_dest;
612     buf_index = atc->buf_index;
613
614     nnodes_comm = min(2*atc->maxshift,atc->nslab-1);
615
616     local_pos = atc->count[atc->nodeid];
617     buf_pos = 0;
618     for(i=0; i<nnodes_comm; i++) {
619         scount = atc->rcount[i];
620         rcount = atc->count[commnode[i]];
621         if (scount > 0 || rcount > 0) {
622             /* Communicate the forces */
623             pme_dd_sendrecv(atc,TRUE,i,
624                             atc->f[local_pos],scount*sizeof(rvec),
625                             pme->bufv[buf_pos],rcount*sizeof(rvec));
626             local_pos += scount;
627         }
628         buf_index[commnode[i]] = buf_pos;
629         buf_pos   += rcount;
630     }
631
632     local_pos = 0;
633     if (bAddF)
634     {
635         for(i=0; i<n; i++)
636         {
637             node = atc->pd[i];
638             if (node == atc->nodeid)
639             {
640                 /* Add from the local force array */
641                 rvec_inc(f[i],atc->f[local_pos]);
642                 local_pos++;
643             }
644             else
645             {
646                 /* Add from the receive buffer */
647                 rvec_inc(f[i],pme->bufv[buf_index[node]]);
648                 buf_index[node]++;
649             }
650         }
651     }
652     else
653     {
654         for(i=0; i<n; i++)
655         {
656             node = atc->pd[i];
657             if (node == atc->nodeid)
658             {
659                 /* Copy from the local force array */
660                 copy_rvec(atc->f[local_pos],f[i]);
661                 local_pos++;
662             }
663             else
664             {
665                 /* Copy from the receive buffer */
666                 copy_rvec(pme->bufv[buf_index[node]],f[i]);
667                 buf_index[node]++;
668             }
669         }
670     }
671 }
672
673 #ifdef GMX_MPI
674 static void 
675 gmx_sum_qgrid_dd(gmx_pme_t pme, real *grid, int direction)
676 {
677     pme_overlap_t *overlap;
678     int send_index0,send_nindex;
679     int recv_index0,recv_nindex;
680     MPI_Status stat;
681     int i,j,k,ix,iy,iz,icnt;
682     int ipulse,send_id,recv_id,datasize;
683     real *p;
684     real *sendptr,*recvptr;
685     
686     /* Start with minor-rank communication. This is a bit of a pain since it is not contiguous */
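    /* Non-contiguous because the y-slices to be exchanged are interleaved in
     * the x-major grid layout (stride pmegrid_ny*pmegrid_nz between
     * consecutive x-planes, and only nkz of the pmegrid_nz z-entries are
     * needed), so they are packed into pmegrid_sendbuf before sending and
     * unpacked from pmegrid_recvbuf afterwards.
     */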
687     overlap = &pme->overlap[1];
688     
689     for(ipulse=0;ipulse<overlap->noverlap_nodes;ipulse++)
690     {
691         /* Since we have already (un)wrapped the overlap in the z-dimension,
692          * we only have to communicate 0 to nkz (not pmegrid_nz).
693          */
694         if (direction==GMX_SUM_QGRID_FORWARD)
695         {
696             send_id = overlap->send_id[ipulse];
697             recv_id = overlap->recv_id[ipulse];
698             send_index0   = overlap->comm_data[ipulse].send_index0;
699             send_nindex   = overlap->comm_data[ipulse].send_nindex;
700             recv_index0   = overlap->comm_data[ipulse].recv_index0;
701             recv_nindex   = overlap->comm_data[ipulse].recv_nindex;
702         }
703         else
704         {
705             send_id = overlap->recv_id[ipulse];
706             recv_id = overlap->send_id[ipulse];
707             send_index0   = overlap->comm_data[ipulse].recv_index0;
708             send_nindex   = overlap->comm_data[ipulse].recv_nindex;            
709             recv_index0   = overlap->comm_data[ipulse].send_index0;
710             recv_nindex   = overlap->comm_data[ipulse].send_nindex;
711         }
712
713         /* Copy data to contiguous send buffer */
714         if (debug)
715         {
716             fprintf(debug,"PME send node %d %d -> %d grid start %d Communicating %d to %d\n",
717                     pme->nodeid,overlap->nodeid,send_id,
718                     pme->pmegrid_start_iy,
719                     send_index0-pme->pmegrid_start_iy,
720                     send_index0-pme->pmegrid_start_iy+send_nindex);
721         }
722         icnt = 0;
723         for(i=0;i<pme->pmegrid_nx;i++)
724         {
725             ix = i;
726             for(j=0;j<send_nindex;j++)
727             {
728                 iy = j + send_index0 - pme->pmegrid_start_iy;
729                 for(k=0;k<pme->nkz;k++)
730                 {
731                     iz = k;
732                     pme->pmegrid_sendbuf[icnt++] = grid[ix*(pme->pmegrid_ny*pme->pmegrid_nz)+iy*(pme->pmegrid_nz)+iz];
733                 }
734             }
735         }
736             
737         datasize      = pme->pmegrid_nx * pme->nkz;
738         
739         MPI_Sendrecv(pme->pmegrid_sendbuf,send_nindex*datasize,GMX_MPI_REAL,
740                      send_id,ipulse,
741                      pme->pmegrid_recvbuf,recv_nindex*datasize,GMX_MPI_REAL,
742                      recv_id,ipulse,
743                      overlap->mpi_comm,&stat);
744         
745         /* Get data from contiguous recv buffer */
746         if (debug)
747         {
748             fprintf(debug,"PME recv node %d %d <- %d grid start %d Communicating %d to %d\n",
749                     pme->nodeid,overlap->nodeid,recv_id,
750                     pme->pmegrid_start_iy,
751                     recv_index0-pme->pmegrid_start_iy,
752                     recv_index0-pme->pmegrid_start_iy+recv_nindex);
753         }
754         icnt = 0;
755         for(i=0;i<pme->pmegrid_nx;i++)
756         {
757             ix = i;
758             for(j=0;j<recv_nindex;j++)
759             {
760                 iy = j + recv_index0 - pme->pmegrid_start_iy;
761                 for(k=0;k<pme->nkz;k++)
762                 {
763                     iz = k;
764                     if(direction==GMX_SUM_QGRID_FORWARD)
765                     {
766                         grid[ix*(pme->pmegrid_ny*pme->pmegrid_nz)+iy*(pme->pmegrid_nz)+iz] += pme->pmegrid_recvbuf[icnt++];
767                     }
768                     else
769                     {
770                         grid[ix*(pme->pmegrid_ny*pme->pmegrid_nz)+iy*(pme->pmegrid_nz)+iz]  = pme->pmegrid_recvbuf[icnt++];
771                     }
772                 }
773             }
774         }
775     }
776     
777     /* The major dimension is easier: no copying is required,
778      * but we might have to sum into a separate array.
779      * Since we don't copy, we have to communicate up to pmegrid_nz,
780      * not nkz as for the minor direction.
781      */
782     overlap = &pme->overlap[0];
783     
784     for(ipulse=0;ipulse<overlap->noverlap_nodes;ipulse++)
785     {
786         if(direction==GMX_SUM_QGRID_FORWARD)
787         {
788             send_id = overlap->send_id[ipulse];
789             recv_id = overlap->recv_id[ipulse];
790             send_index0   = overlap->comm_data[ipulse].send_index0;
791             send_nindex   = overlap->comm_data[ipulse].send_nindex;
792             recv_index0   = overlap->comm_data[ipulse].recv_index0;
793             recv_nindex   = overlap->comm_data[ipulse].recv_nindex;
794             recvptr   = pme->pmegrid_recvbuf;
795         }
796         else
797         {
798             send_id = overlap->recv_id[ipulse];
799             recv_id = overlap->send_id[ipulse];
800             send_index0   = overlap->comm_data[ipulse].recv_index0;
801             send_nindex   = overlap->comm_data[ipulse].recv_nindex;            
802             recv_index0   = overlap->comm_data[ipulse].send_index0;
803             recv_nindex   = overlap->comm_data[ipulse].send_nindex;
804             recvptr   = grid + (recv_index0-pme->pmegrid_start_ix)*(pme->pmegrid_ny*pme->pmegrid_nz);
805         }
806                 
807         sendptr       = grid + (send_index0-pme->pmegrid_start_ix)*(pme->pmegrid_ny*pme->pmegrid_nz);
808         datasize      = pme->pmegrid_ny * pme->pmegrid_nz;
809
810         if (debug)
811         {
812             fprintf(debug,"PME send node %d %d -> %d grid start %d Communicating %d to %d\n",
813                     pme->nodeid,overlap->nodeid,send_id,
814                     pme->pmegrid_start_ix,
815                     send_index0-pme->pmegrid_start_ix,
816                     send_index0-pme->pmegrid_start_ix+send_nindex);
817             fprintf(debug,"PME recv node %d %d <- %d grid start %d Communicating %d to %d\n",
818                     pme->nodeid,overlap->nodeid,recv_id,
819                     pme->pmegrid_start_ix,
820                     recv_index0-pme->pmegrid_start_ix,
821                     recv_index0-pme->pmegrid_start_ix+recv_nindex);
822         }
823
824         MPI_Sendrecv(sendptr,send_nindex*datasize,GMX_MPI_REAL,
825                      send_id,ipulse,
826                      recvptr,recv_nindex*datasize,GMX_MPI_REAL,
827                      recv_id,ipulse,
828                      overlap->mpi_comm,&stat);
829         
830         /* ADD data from contiguous recv buffer */
831         if(direction==GMX_SUM_QGRID_FORWARD)
832         {        
833             p = grid + (recv_index0-pme->pmegrid_start_ix)*(pme->pmegrid_ny*pme->pmegrid_nz);
834             for(i=0;i<recv_nindex*datasize;i++)
835             {
836                 p[i] += pme->pmegrid_recvbuf[i];
837             }
838         }
839     }
840 }
841 #endif
842
843
844 static int
845 copy_pmegrid_to_fftgrid(gmx_pme_t pme, real *pmegrid, real *fftgrid)
846 {
847     ivec    local_fft_ndata,local_fft_offset,local_fft_size;
848     ivec    local_pme_size;
849     int     i,ix,iy,iz;
850     int     pmeidx,fftidx;
851
852     /* Dimensions should be identical for A/B grid, so we just use A here */
853     gmx_parallel_3dfft_real_limits(pme->pfft_setupA,
854                                    local_fft_ndata,
855                                    local_fft_offset,
856                                    local_fft_size);
857     
858     local_pme_size[0] = pme->pmegrid_nx;
859     local_pme_size[1] = pme->pmegrid_ny;
860     local_pme_size[2] = pme->pmegrid_nz;
861     
862     /* The fftgrid is always 'justified' to the lower-left corner of the PME grid, 
863      the offset is identical, and the PME grid always has more data (due to overlap)
864      */
865     {
866 #ifdef DEBUG_PME
867         FILE *fp,*fp2;
868         char fn[STRLEN],format[STRLEN];
869         real val;
870         sprintf(fn,"pmegrid%d.pdb",pme->nodeid);
871         fp = ffopen(fn,"w");
872         sprintf(fn,"pmegrid%d.txt",pme->nodeid);
873         fp2 = ffopen(fn,"w");
874         sprintf(format,"%s%s\n",pdbformat,"%6.2f%6.2f");
875 #endif
876     for(ix=0;ix<local_fft_ndata[XX];ix++)
877     {
878         for(iy=0;iy<local_fft_ndata[YY];iy++)
879         {
880             for(iz=0;iz<local_fft_ndata[ZZ];iz++)
881             {
882                 pmeidx = ix*(local_pme_size[YY]*local_pme_size[ZZ])+iy*(local_pme_size[ZZ])+iz;
883                 fftidx = ix*(local_fft_size[YY]*local_fft_size[ZZ])+iy*(local_fft_size[ZZ])+iz;
884                 fftgrid[fftidx] = pmegrid[pmeidx];
885 #ifdef DEBUG_PME
886                 val = 100*pmegrid[pmeidx];
887                 if (pmegrid[pmeidx] != 0)
888                     fprintf(fp,format,"ATOM",pmeidx,"CA","GLY",' ',pmeidx,' ',
889                             5.0*ix,5.0*iy,5.0*iz,1.0,val);
890                 if (pmegrid[pmeidx] != 0)
891                     fprintf(fp2,"%-12s  %5d  %5d  %5d  %12.5e\n",
892                             "qgrid",
893                             pme->pmegrid_start_ix + ix,
894                             pme->pmegrid_start_iy + iy,
895                             pme->pmegrid_start_iz + iz,
896                             pmegrid[pmeidx]);
897 #endif
898             }
899         }
900     }
901 #ifdef DEBUG_PME
902     fclose(fp);
903     fclose(fp2);
904 #endif
905     }
906     return 0;
907 }
908
909
910 static int
911 copy_fftgrid_to_pmegrid(gmx_pme_t pme, real *fftgrid, real *pmegrid)
912 {
913     ivec    local_fft_ndata,local_fft_offset,local_fft_size;
914     ivec    local_pme_size;
915     int     i,ix,iy,iz;
916     int     pmeidx,fftidx;
917     
918     /* Dimensions should be identical for A/B grid, so we just use A here */
919     gmx_parallel_3dfft_real_limits(pme->pfft_setupA,
920                                    local_fft_ndata,
921                                    local_fft_offset,
922                                    local_fft_size);
923
924     local_pme_size[0] = pme->pmegrid_nx;
925     local_pme_size[1] = pme->pmegrid_ny;
926     local_pme_size[2] = pme->pmegrid_nz;
927     
928     /* The fftgrid is always 'justified' to the lower-left corner of the PME grid, 
929      the offset is identical, and the PME grid always has more data (due to overlap)
930      */
931     for(ix=0;ix<local_fft_ndata[XX];ix++)
932     {
933         for(iy=0;iy<local_fft_ndata[YY];iy++)
934         {
935             for(iz=0;iz<local_fft_ndata[ZZ];iz++)
936             {
937                 pmeidx = ix*(local_pme_size[YY]*local_pme_size[ZZ])+iy*(local_pme_size[ZZ])+iz;
938                 fftidx = ix*(local_fft_size[YY]*local_fft_size[ZZ])+iy*(local_fft_size[ZZ])+iz;
939                 pmegrid[pmeidx] = fftgrid[fftidx];
940             }
941         }
942     }   
943     return 0;
944 }
945
946
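/* The spreading and gathering routines work on a local grid that has
 * pme_order-1 extra cells in each dimension (the overlap region). Before the
 * forward FFT, wrap_periodic_pmegrid() folds charge that was spread into this
 * overlap back onto the periodic nkx*nky*nkz grid; after the backward FFT,
 * unwrap_periodic_pmegrid() copies the periodic images back out so that force
 * interpolation can read past the upper grid edge without extra index
 * wrapping. For dimensions that are decomposed over multiple PME nodes, the
 * equivalent folding/unfolding is done by communication in gmx_sum_qgrid_dd().
 */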
947 static void
948 wrap_periodic_pmegrid(gmx_pme_t pme, real *pmegrid)
949 {
950     int     nx,ny,nz,pnx,pny,pnz,ny_x,overlap,ix,iy,iz;
951
952     nx = pme->nkx;
953     ny = pme->nky;
954     nz = pme->nkz;
955
956     pnx = pme->pmegrid_nx;
957     pny = pme->pmegrid_ny;
958     pnz = pme->pmegrid_nz;
959
960     overlap = pme->pme_order - 1;
961
962     /* Add periodic overlap in z */
963     for(ix=0; ix<pnx; ix++)
964     {
965         for(iy=0; iy<pny; iy++)
966         {
967             for(iz=0; iz<overlap; iz++)
968             {
969                 pmegrid[(ix*pny+iy)*pnz+iz] +=
970                     pmegrid[(ix*pny+iy)*pnz+nz+iz];
971             }
972         }
973     }
974
975     if (pme->nnodes_minor == 1)
976     {
977        for(ix=0; ix<pnx; ix++)
978        {
979            for(iy=0; iy<overlap; iy++)
980            {
981                for(iz=0; iz<nz; iz++)
982                {
983                    pmegrid[(ix*pny+iy)*pnz+iz] +=
984                        pmegrid[(ix*pny+ny+iy)*pnz+iz];
985                }
986            }
987        }
988     }
989      
990     if (pme->nnodes_major == 1)
991     {
992         ny_x = (pme->nnodes_minor == 1 ? ny : pny);
993
994         for(ix=0; ix<overlap; ix++)
995         {
996             for(iy=0; iy<ny_x; iy++)
997             {
998                 for(iz=0; iz<nz; iz++)
999                 {
1000                     pmegrid[(ix*pny+iy)*pnz+iz] +=
1001                         pmegrid[((nx+ix)*pny+iy)*pnz+iz];
1002                 }
1003             }
1004         }
1005     }
1006 }
1007
1008
1009 static void
1010 unwrap_periodic_pmegrid(gmx_pme_t pme, real *pmegrid)
1011 {
1012     int     nx,ny,nz,pnx,pny,pnz,ny_x,overlap,ix,iy,iz;
1013
1014     nx = pme->nkx;
1015     ny = pme->nky;
1016     nz = pme->nkz;
1017
1018     pnx = pme->pmegrid_nx;
1019     pny = pme->pmegrid_ny;
1020     pnz = pme->pmegrid_nz;
1021
1022     overlap = pme->pme_order - 1;
1023
1024     if (pme->nnodes_major == 1)
1025     {
1026         ny_x = (pme->nnodes_minor == 1 ? ny : pny);
1027
1028         for(ix=0; ix<overlap; ix++)
1029         {
1030             for(iy=0; iy<ny_x; iy++)
1031             {
1032                 for(iz=0; iz<nz; iz++)
1033                 {
1034                     pmegrid[((nx+ix)*pny+iy)*pnz+iz] =
1035                         pmegrid[(ix*pny+iy)*pnz+iz];
1036                 }
1037             }
1038         }
1039     }
1040
1041     if (pme->nnodes_minor == 1)
1042     {
1043        for(ix=0; ix<pnx; ix++)
1044        {
1045            for(iy=0; iy<overlap; iy++)
1046            {
1047                for(iz=0; iz<nz; iz++)
1048                {
1049                    pmegrid[(ix*pny+ny+iy)*pnz+iz] =
1050                        pmegrid[(ix*pny+iy)*pnz+iz];
1051                }
1052            }
1053        }
1054     }
1055
1056     /* Copy periodic overlap in z */
1057     for(ix=0; ix<pnx; ix++)
1058     {
1059         for(iy=0; iy<pny; iy++)
1060         {
1061             for(iz=0; iz<overlap; iz++)
1062             {
1063                 pmegrid[(ix*pny+iy)*pnz+nz+iz] =
1064                     pmegrid[(ix*pny+iy)*pnz+iz];
1065             }
1066         }
1067     }
1068 }
1069
1070
1071 /* This has to be a macro to enable full compiler optimization with xlC (and probably others too) */
1072 #define DO_BSPLINE(order)                            \
1073 for(ithx=0; (ithx<order); ithx++)                    \
1074 {                                                    \
1075     index_x = (i0+ithx)*pny*pnz;                     \
1076     valx    = qn*thx[ithx];                          \
1077                                                      \
1078     for(ithy=0; (ithy<order); ithy++)                \
1079     {                                                \
1080         valxy    = valx*thy[ithy];                   \
1081         index_xy = index_x+(j0+ithy)*pnz;            \
1082                                                      \
1083         for(ithz=0; (ithz<order); ithz++)            \
1084         {                                            \
1085             index_xyz        = index_xy+(k0+ithz);   \
1086             grid[index_xyz] += valxy*thz[ithz];      \
1087         }                                            \
1088     }                                                \
1089 }
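/* DO_BSPLINE adds the charge qn of one atom to an order x order x order block
 * of grid points, weighting each point by the tensor product
 * thx[ithx]*thy[ithy]*thz[ithz] of the 1D B-spline coefficients computed in
 * make_bsplines(). i0/j0/k0 are the lowest grid indices touched by the atom.
 */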
1090
1091
1092 static void spread_q_bsplines(gmx_pme_t pme, pme_atomcomm_t *atc, 
1093                               real *grid)
1094 {
1095
1096     /* spread charges from home atoms to local grid */
1097     pme_overlap_t *ol;
1098     int      b,i,nn,n,ithx,ithy,ithz,i0,j0,k0;
1099     int *    idxptr;
1100     int      order,norder,index_x,index_xy,index_xyz;
1101     real     valx,valxy,qn;
1102     real     *thx,*thy,*thz;
1103     int      localsize, bndsize;
1104   
1105     int      pnx,pny,pnz,ndatatot;
1106   
1107     pnx = pme->pmegrid_nx;
1108     pny = pme->pmegrid_ny;
1109     pnz = pme->pmegrid_nz;
1110     ndatatot = pnx*pny*pnz;
1111     
1112     for(i=0;i<ndatatot;i++)
1113     {
1114         grid[i] = 0;
1115     }
1116
1117     order = pme->pme_order;
1118
1119     for(nn=0; (nn<atc->n);nn++) 
1120     {
1121         n      = nn;
1122         qn     = atc->q[n];
1123
1124         if (qn != 0) 
1125         {
1126             idxptr = atc->idx[n];
1127             norder = n*order;
1128             
1129             i0   = idxptr[XX]; 
1130             j0   = idxptr[YY];
1131             k0   = idxptr[ZZ];
1132             thx = atc->theta[XX] + norder;
1133             thy = atc->theta[YY] + norder;
1134             thz = atc->theta[ZZ] + norder;
1135             
1136             switch (order) {
1137             case 4:  DO_BSPLINE(4);     break;
1138             case 5:  DO_BSPLINE(5);     break;
1139             default: DO_BSPLINE(order); break;
1140             }
1141         }
1142     }   
1143 }
1144
1145
1146 #if ( !defined(GMX_DOUBLE) && ( defined(GMX_IA32_SSE) || defined(GMX_X86_64_SSE) || defined(GMX_X86_64_SSE2) ) )
1147     /* Calculate exponentials through SSE in float precision */
1148 #define CALC_EXPONENTIALS(start,end,r_aligned)      \
1149     {                                               \
1150         __m128 tmp_sse;                             \
1151         for(kx=0; kx<end; kx+=4)                    \
1152         {                                           \
1153             tmp_sse = _mm_load_ps(r_aligned+kx);    \
1154             tmp_sse = gmx_mm_exp_ps(tmp_sse);       \
1155             _mm_store_ps(r_aligned+kx,tmp_sse);     \
1156         }                                           \
1157     }
1158 #else
1159 #define CALC_EXPONENTIALS(start,end,r)          \
1160     for(kx=start; kx<end; kx++)                 \
1161     {                                           \
1162         r[kx] = exp(r[kx]);                     \
1163     }
1164 #endif
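/* Note that the SSE variant of CALC_EXPONENTIALS ignores its start argument
 * and processes the array from element 0 in blocks of four; it therefore
 * requires the work arrays to be 16-byte aligned (_mm_load_ps/_mm_store_ps)
 * and their allocated length rounded up to a multiple of four elements.
 */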
1165
1166
1167 static int solve_pme_yzx(gmx_pme_t pme,t_complex *grid,
1168                          real ewaldcoeff,real vol,
1169                          gmx_bool bEnerVir,real *mesh_energy,matrix vir)
1170 {
1171     /* do recip sum over local cells in grid */
1172     /* y major, z middle, x minor or continuous */
1173     t_complex *p0;
1174     int     kx,ky,kz,maxkx,maxky,maxkz;
1175     int     nx,ny,nz,iy,iz,kxstart,kxend;
1176     real    mx,my,mz;
1177     real    factor=M_PI*M_PI/(ewaldcoeff*ewaldcoeff);
1178     real    ets2,struct2,vfactor,ets2vf;
1179     real    eterm,d1,d2,energy=0;
1180     real    by,bz;
1181     real    virxx=0,virxy=0,virxz=0,viryy=0,viryz=0,virzz=0;
1182     real    rxx,ryx,ryy,rzx,rzy,rzz;
1183     real    *mhx,*mhy,*mhz,*m2,*denom,*tmp1,*m2inv;
1184     real    mhxk,mhyk,mhzk,m2k;
1185     real    corner_fac;
1186     ivec    complex_order;
1187     ivec    local_ndata,local_offset,local_size;
1188     
1189     nx = pme->nkx;
1190     ny = pme->nky;
1191     nz = pme->nkz;
1192     
1193     /* Dimensions should be identical for A/B grid, so we just use A here */
1194     gmx_parallel_3dfft_complex_limits(pme->pfft_setupA,
1195                                       complex_order,
1196                                       local_ndata,
1197                                       local_offset,
1198                                       local_size);
1199     
1200     rxx = pme->recipbox[XX][XX];
1201     ryx = pme->recipbox[YY][XX];
1202     ryy = pme->recipbox[YY][YY];
1203     rzx = pme->recipbox[ZZ][XX];
1204     rzy = pme->recipbox[ZZ][YY];
1205     rzz = pme->recipbox[ZZ][ZZ];
1206     
1207     maxkx = (nx+1)/2;
1208     maxky = (ny+1)/2;
1209     maxkz = nz/2+1;
1210
1211     mhx   = pme->work_mhx;
1212     mhy   = pme->work_mhy;
1213     mhz   = pme->work_mhz;
1214     m2    = pme->work_m2;
1215     denom = pme->work_denom;
1216     tmp1  = pme->work_tmp1;
1217     m2inv = pme->work_m2inv;
1218
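    /* What follows implements the smooth-PME reciprocal-space sum of
     * Essmann et al. (1995),
     *   E_rec = 1/(2 pi V) sum_{m != 0} exp(-pi^2 m^2/beta^2) / (m^2 B(m)) |S(m)|^2,
     * times the Coulomb prefactor ONE_4PI_EPS0/epsilon_r, where B(m) is the
     * product of the B-spline moduli bsp_mod and S(m) the FFT of the charge
     * grid. Each complex grid element is scaled in place by its eterm, so the
     * backward FFT directly yields the convolved potential used for the
     * forces. Because the real-to-complex FFT stores only half of k-space,
     * corner_fac and the factors 2.0/0.5 in the energy accumulation account
     * for the missing conjugate half.
     */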
1219     for(iy=0;iy<local_ndata[YY];iy++)
1220     {
1221         ky = iy + local_offset[YY];
1222         
1223         if (ky < maxky) 
1224         {
1225             my = ky;
1226         }
1227         else 
1228         {
1229             my = (ky - ny);
1230         }
1231         
1232         by = M_PI*vol*pme->bsp_mod[YY][ky];
1233
1234         for(iz=0;iz<local_ndata[ZZ];iz++)
1235         {
1236             kz = iz + local_offset[ZZ];
1237             
1238             mz = kz;
1239
1240             bz = pme->bsp_mod[ZZ][kz];
1241             
1242             /* 0.5 correction for corner points */
1243             corner_fac = 1;
1244             if (kz == 0)
1245                 corner_fac = 0.5;
1246             if (kz == (nz+1)/2)
1247                 corner_fac = 0.5;
1248                       
1249             p0 = grid + iy*local_size[ZZ]*local_size[XX] + iz*local_size[XX];
1250             
1251             /* We should skip the k-space point (0,0,0) */
1252             if (local_offset[XX] > 0 ||
1253                 local_offset[YY] > 0 || ky > 0 ||
1254                 kz > 0)
1255             {
1256                 kxstart = local_offset[XX];
1257             }
1258             else
1259             {
1260                 kxstart = local_offset[XX] + 1;
1261                 p0++;
1262             }
1263             kxend = local_offset[XX] + local_ndata[XX];
1264                         
1265             if (bEnerVir)
1266             {
1267                 /* More expensive inner loop, especially because of the storage
1268                  * of the mh elements in arrays.
1269                  * Because x is the minor grid index, all mh elements
1270                  * depend on kx for triclinic unit cells.
1271                  */
1272
1273                 /* Two explicit loops to avoid a conditional inside the loop */
1274                 for(kx=kxstart; kx<maxkx; kx++)
1275                 {
1276                     mx = kx;
1277                     
1278                     mhxk      = mx * rxx;
1279                     mhyk      = mx * ryx + my * ryy;
1280                     mhzk      = mx * rzx + my * rzy + mz * rzz;
1281                     m2k       = mhxk*mhxk + mhyk*mhyk + mhzk*mhzk;
1282                     mhx[kx]   = mhxk;
1283                     mhy[kx]   = mhyk;
1284                     mhz[kx]   = mhzk;
1285                     m2[kx]    = m2k;
1286                     denom[kx] = m2k*bz*by*pme->bsp_mod[XX][kx];
1287                     tmp1[kx]  = -factor*m2k;
1288                 }
1289                 
1290                 for(kx=maxkx; kx<kxend; kx++)
1291                 {
1292                     mx = (kx - nx);
1293
1294                     mhxk      = mx * rxx;
1295                     mhyk      = mx * ryx + my * ryy;
1296                     mhzk      = mx * rzx + my * rzy + mz * rzz;
1297                     m2k       = mhxk*mhxk + mhyk*mhyk + mhzk*mhzk;
1298                     mhx[kx]   = mhxk;
1299                     mhy[kx]   = mhyk;
1300                     mhz[kx]   = mhzk;
1301                     m2[kx]    = m2k;
1302                     denom[kx] = m2k*bz*by*pme->bsp_mod[XX][kx];
1303                     tmp1[kx]  = -factor*m2k;
1304                 }
1305                 
1306                 for(kx=kxstart; kx<kxend; kx++)
1307                 {
1308                     m2inv[kx] = 1.0/m2[kx];
1309                 }
1310                 for(kx=kxstart; kx<kxend; kx++)
1311                 {
1312                     denom[kx] = 1.0/denom[kx];
1313                 }
1314
1315                 CALC_EXPONENTIALS(kxstart,kxend,tmp1);
1316
1317                 for(kx=kxstart; kx<kxend; kx++,p0++)
1318                 {
1319                     d1      = p0->re;
1320                     d2      = p0->im;
1321                     
1322                     eterm    = ONE_4PI_EPS0/pme->epsilon_r*tmp1[kx]*denom[kx];
1323                     
1324                     p0->re  = d1*eterm;
1325                     p0->im  = d2*eterm;
1326                     
1327                     struct2 = 2.0*(d1*d1+d2*d2);
1328                     
1329                     tmp1[kx] = eterm*struct2;
1330                 }
1331                 
1332                 for(kx=kxstart; kx<kxend; kx++)
1333                 {
1334                     ets2     = corner_fac*tmp1[kx];
1335                     vfactor  = (factor*m2[kx] + 1.0)*2.0*m2inv[kx];
1336                     energy  += ets2;
1337                     
1338                     ets2vf   = ets2*vfactor;
1339                     virxx   += ets2vf*mhx[kx]*mhx[kx] - ets2;
1340                     virxy   += ets2vf*mhx[kx]*mhy[kx];
1341                     virxz   += ets2vf*mhx[kx]*mhz[kx];
1342                     viryy   += ets2vf*mhy[kx]*mhy[kx] - ets2;
1343                     viryz   += ets2vf*mhy[kx]*mhz[kx];
1344                     virzz   += ets2vf*mhz[kx]*mhz[kx] - ets2;
1345                 }
1346             }
1347             else
1348             {
1349                 /* We don't need to calculate the energy and the virial.
1350                  * In this case the triclinic overhead is small.
1351                  */
1352
1353                 /* Two explicit loops to avoid a conditional inside the loop */
1354
1355                 for(kx=kxstart; kx<maxkx; kx++)
1356                 {
1357                     mx = kx;
1358                     
1359                     mhxk      = mx * rxx;
1360                     mhyk      = mx * ryx + my * ryy;
1361                     mhzk      = mx * rzx + my * rzy + mz * rzz;
1362                     m2k       = mhxk*mhxk + mhyk*mhyk + mhzk*mhzk;
1363                     denom[kx] = m2k*bz*by*pme->bsp_mod[XX][kx];
1364                     tmp1[kx]  = -factor*m2k;
1365                 }
1366                 
1367                 for(kx=maxkx; kx<kxend; kx++)
1368                 {
1369                     mx = (kx - nx);
1370                     
1371                     mhxk      = mx * rxx;
1372                     mhyk      = mx * ryx + my * ryy;
1373                     mhzk      = mx * rzx + my * rzy + mz * rzz;
1374                     m2k       = mhxk*mhxk + mhyk*mhyk + mhzk*mhzk;
1375                     denom[kx] = m2k*bz*by*pme->bsp_mod[XX][kx];
1376                     tmp1[kx]  = -factor*m2k;
1377                 }
1378                 
1379                 for(kx=kxstart; kx<kxend; kx++)
1380                 {
1381                     denom[kx] = 1.0/denom[kx];
1382                 }
1383
1384                 CALC_EXPONENTIALS(kxstart,kxend,tmp1);
1385                
1386                 for(kx=kxstart; kx<kxend; kx++,p0++)
1387                 {
1388                     d1      = p0->re;
1389                     d2      = p0->im;
1390                     
1391                     eterm    = ONE_4PI_EPS0/pme->epsilon_r*tmp1[kx]*denom[kx];
1392                     
1393                     p0->re  = d1*eterm;
1394                     p0->im  = d2*eterm;
1395                 }
1396             }
1397         }
1398     }
1399     
1400     if (bEnerVir)
1401     {
1402         /* Update virial with local values.
1403          * The virial is symmetric by definition.
1404          * this virial seems ok for isotropic scaling, but I'm
1405          * experiencing problems on semiisotropic membranes.
1406          * IS THAT COMMENT STILL VALID??? (DvdS, 2001/02/07).
1407          */
1408         vir[XX][XX] = 0.25*virxx;
1409         vir[YY][YY] = 0.25*viryy;
1410         vir[ZZ][ZZ] = 0.25*virzz;
1411         vir[XX][YY] = vir[YY][XX] = 0.25*virxy;
1412         vir[XX][ZZ] = vir[ZZ][XX] = 0.25*virxz;
1413         vir[YY][ZZ] = vir[ZZ][YY] = 0.25*viryz;
1414         
1415         /* This energy should be corrected for a charged system */
1416         *mesh_energy = 0.5*energy;
1417     }
1418
1419     /* Return the loop count */
1420     return local_ndata[YY]*local_ndata[ZZ]*local_ndata[XX];
1421 }
1422
1423
1424 #define DO_FSPLINE(order)                      \
1425 for(ithx=0; (ithx<order); ithx++)              \
1426 {                                                                          \
1427     index_x = (i0+ithx)*pny*pnz;               \
1428     tx      = thx[ithx];                       \
1429     dx      = dthx[ithx];                      \
1430                                                \
1431     for(ithy=0; (ithy<order); ithy++)          \
1432     {                                                                              \
1433         index_xy = index_x+(j0+ithy)*pnz;      \
1434         ty       = thy[ithy];                  \
1435         dy       = dthy[ithy];                 \
1436         fxy1     = fz1 = 0;                    \
1437                                                \
1438         for(ithz=0; (ithz<order); ithz++)      \
1439         {                                                                  \
1440             gval  = grid[index_xy+(k0+ithz)];  \
1441             fxy1 += thz[ithz]*gval;            \
1442             fz1  += dthz[ithz]*gval;           \
1443         }                                      \
1444         fx += dx*ty*fxy1;                      \
1445         fy += tx*dy*fxy1;                      \
1446         fz += tx*ty*fz1;                       \
1447     }                                          \
1448 }
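/* DO_FSPLINE accumulates, for one atom, the derivatives of the interpolated
 * potential with respect to the atom's fractional coordinates: fx, fy and fz
 * collect grid values weighted by dtheta in one dimension and theta in the
 * other two. gather_f_bsplines() below converts these fractional-space
 * derivatives to Cartesian forces using the recipbox elements and the grid
 * dimensions, with a minus sign since the force is -dE/dr.
 */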
1449
1450
1451 void gather_f_bsplines(gmx_pme_t pme,real *grid,
1452                        gmx_bool bClearF,pme_atomcomm_t *atc,real scale)
1453 {
1454     /* sum forces for local particles */  
1455     int     nn,n,ithx,ithy,ithz,i0,j0,k0;
1456     int     index_x,index_xy;
1457     int     nx,ny,nz,pnx,pny,pnz;
1458     int *   idxptr;
1459     real    tx,ty,dx,dy,qn;
1460     real    fx,fy,fz,gval;
1461     real    fxy1,fz1;
1462     real    *thx,*thy,*thz,*dthx,*dthy,*dthz;
1463     int     norder;
1464     real    rxx,ryx,ryy,rzx,rzy,rzz;
1465     int     order;
1466     
1467     order = pme->pme_order;
1468     thx   = atc->theta[XX];
1469     thy   = atc->theta[YY];
1470     thz   = atc->theta[ZZ];
1471     dthx  = atc->dtheta[XX];
1472     dthy  = atc->dtheta[YY];
1473     dthz  = atc->dtheta[ZZ];
1474     nx    = pme->nkx;
1475     ny    = pme->nky;
1476     nz    = pme->nkz;
1477     pnx   = pme->pmegrid_nx;
1478     pny   = pme->pmegrid_ny;
1479     pnz   = pme->pmegrid_nz;
1480     
1481     rxx   = pme->recipbox[XX][XX];
1482     ryx   = pme->recipbox[YY][XX];
1483     ryy   = pme->recipbox[YY][YY];
1484     rzx   = pme->recipbox[ZZ][XX];
1485     rzy   = pme->recipbox[ZZ][YY];
1486     rzz   = pme->recipbox[ZZ][ZZ];
1487
1488     for(nn=0; (nn<atc->n); nn++) 
1489     {
1490         n = nn;
1491         qn      = scale*atc->q[n];
1492         
1493         if (bClearF) 
1494         {
1495             atc->f[n][XX] = 0;
1496             atc->f[n][YY] = 0;
1497             atc->f[n][ZZ] = 0;
1498         }
1499         if (qn != 0) 
1500         {
1501             fx     = 0;
1502             fy     = 0;
1503             fz     = 0;
1504             idxptr = atc->idx[n];
1505             norder = n*order;
1506             
1507             i0   = idxptr[XX]; 
1508             j0   = idxptr[YY];
1509             k0   = idxptr[ZZ];
1510             
1511             /* Pointer arithmetic alert, next six statements */
1512             thx  = atc->theta[XX] + norder;
1513             thy  = atc->theta[YY] + norder;
1514             thz  = atc->theta[ZZ] + norder;
1515             dthx = atc->dtheta[XX] + norder;
1516             dthy = atc->dtheta[YY] + norder;
1517             dthz = atc->dtheta[ZZ] + norder;
1518             
1519             switch (order) {
1520             case 4:  DO_FSPLINE(4);     break;
1521             case 5:  DO_FSPLINE(5);     break;
1522             default: DO_FSPLINE(order); break;
1523             }
1524
1525             atc->f[n][XX] += -qn*( fx*nx*rxx );
1526             atc->f[n][YY] += -qn*( fx*nx*ryx + fy*ny*ryy );
1527             atc->f[n][ZZ] += -qn*( fx*nx*rzx + fy*ny*rzy + fz*nz*rzz );
1528         }
1529     }
1530     /* Since the energy and not forces are interpolated
1531      * the net force might not be exactly zero.
1532      * This can be solved by also interpolating F, but
1533      * that comes at a cost.
1534      * A better hack is to remove the net force every
1535      * step, but that must be done at a higher level
1536      * since this routine doesn't see all atoms if running
1537      * in parallel. Don't know how important it is?  EL 990726
1538      */
1539 }
1540
1541 static real gather_energy_bsplines(gmx_pme_t pme,real *grid,
1542                                    pme_atomcomm_t *atc)
1543 {
1544     int     n,ithx,ithy,ithz,i0,j0,k0;
1545     int     index_x,index_xy;
1546     int *   idxptr;
1547     real    energy,pot,tx,ty,qn,gval;
1548     real    *thx,*thy,*thz;
1549     int     norder;
1550     int     order;
1551     
1552     
1553     order = pme->pme_order;
1554     
1555     energy = 0;
1556     for(n=0; (n<atc->n); n++) {
1557         qn      = atc->q[n];
1558         
1559         if (qn != 0) {
1560             idxptr = atc->idx[n];
1561             norder = n*order;
1562             
1563             i0   = idxptr[XX]; 
1564             j0   = idxptr[YY];
1565             k0   = idxptr[ZZ];
1566             
1567             /* Pointer arithmetic alert, next three statements */
1568             thx  = atc->theta[XX] + norder;
1569             thy  = atc->theta[YY] + norder;
1570             thz  = atc->theta[ZZ] + norder;
1571
1572             pot = 0;
1573             for(ithx=0; (ithx<order); ithx++)
1574             {
1575                 index_x = (i0+ithx)*pme->pmegrid_ny*pme->pmegrid_nz;
1576                 tx      = thx[ithx];
1577
1578                 for(ithy=0; (ithy<order); ithy++)
1579                 {
1580                     index_xy = index_x+(j0+ithy)*pme->pmegrid_nz;
1581                     ty       = thy[ithy];
1582
1583                     for(ithz=0; (ithz<order); ithz++)
1584                     {
1585                         gval  = grid[index_xy+(k0+ithz)];
1586                         pot  += tx*ty*thz[ithz]*gval;
1587                     }
1588
1589                 }
1590             }
1591
1592             energy += pot*qn;
1593         }
1594     }
1595
1596     return energy;
1597 }
1598
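/* Fill theta with the order-"order" B-spline weights of each charged atom
 * for each dimension, and dtheta with their derivatives, both evaluated at
 * the fractional offset dr within the atom's grid cell. The weights are
 * built up in place with the usual recursion for cardinal B-splines
 * (cf. the smooth PME paper of Essmann et al., 1995):
 *
 *   M_k(u)  = ( u*M_{k-1}(u) + (k-u)*M_{k-1}(u-1) ) / (k-1),  u = dr + l
 *   M_k'(u) = M_{k-1}(u) - M_{k-1}(u-1)
 *
 * Note that the last recursion step (k = order) is performed after the
 * differentiation, since the derivative of an order-k spline is a
 * difference of two order-(k-1) splines.
 */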
1599 void make_bsplines(splinevec theta,splinevec dtheta,int order,
1600                    rvec fractx[],int nr,real charge[],
1601                    gmx_bool bFreeEnergy)
1602 {
1603     /* construct splines for local atoms */
1604     int  i,j,k,l;
1605     real dr,div;
1606     real *data,*ddata,*xptr;
1607     
1608     for(i=0; (i<nr); i++) {
1609         /* With free energy we do not use the charge check.
1610          * In most cases this will be more efficient than calling make_bsplines
1611          * twice, since usually more than half the particles have charges.
1612          */
1613         if (bFreeEnergy || charge[i] != 0.0) {
1614             xptr = fractx[i];
1615             for(j=0; (j<DIM); j++) {
1616                 dr  = xptr[j];
1617                 
1618                 /* dr is relative offset from lower cell limit */
1619                 data=&(theta[j][i*order]);
1620                 data[order-1]=0;
1621                 data[1]=dr;
1622                 data[0]=1-dr;
1623                 
1624                 for(k=3; (k<order); k++) {
1625                     div=1.0/(k-1.0);    
1626                     data[k-1]=div*dr*data[k-2];
1627                     for(l=1; (l<(k-1)); l++) {
1628                         data[k-l-1]=div*((dr+l)*data[k-l-2]+(k-l-dr)*
1629                                          data[k-l-1]);
1630                     }
1631                     data[0]=div*(1-dr)*data[0];
1632                 }
1633                 /* differentiate */
1634                 ddata    = &(dtheta[j][i*order]);
1635                 ddata[0] = -data[0];
1636                 for(k=1; (k<order); k++) {
1637                     ddata[k]=data[k-1]-data[k];
1638                 }
1639                 
1640                 div=1.0/(order-1);
1641                 data[order-1]=div*dr*data[order-2];
1642                 for(l=1; (l<(order-1)); l++) {
1643                     data[order-l-1]=div*((dr+l)*data[order-l-2]+
1644                                          (order-l-dr)*data[order-l-1]);
1645                 }
1646                 data[0]=div*(1-dr)*data[0]; 
1647             }
1648         }
1649     }
1650 }
1651
1652
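/* Compute by direct summation the squared modulus of the discrete Fourier
 * transform of data:
 *
 *   mod[i] = |sum_j data[j] * exp(2*pi*I*i*j/ndata)|^2
 *
 * ndata is of the order of a grid dimension and this is only done at setup,
 * so the O(ndata^2) cost does not matter. Near-zero moduli are replaced by
 * the average of their neighbours to avoid divisions by zero later on.
 */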
1653 void make_dft_mod(real *mod,real *data,int ndata)
1654 {
1655   int i,j;
1656   real sc,ss,arg;
1657     
1658   for(i=0;i<ndata;i++) {
1659     sc=ss=0;
1660     for(j=0;j<ndata;j++) {
1661       arg=(2.0*M_PI*i*j)/ndata;
1662       sc+=data[j]*cos(arg);
1663       ss+=data[j]*sin(arg);
1664     }
1665     mod[i]=sc*sc+ss*ss;
1666   }
1667   for(i=0;i<ndata;i++)
1668     if(mod[i]<1e-7)
1669       mod[i]=((i>0 ? mod[i-1] : mod[i+1])+(i<ndata-1 ? mod[i+1] : mod[i-1]))*0.5;
1670 }
1671
1672
1673
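/* Tabulate the B-spline moduli for the three grid dimensions: evaluate the
 * order-"order" cardinal B-spline at the integer points (dr = 0 in the
 * recursion of make_bsplines), zero-pad to the longest grid dimension and
 * take the squared DFT moduli with make_dft_mod. bsp_mod ends up in the
 * denominator of the reciprocal-space influence function in solve_pme_yzx.
 */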
1674 void make_bspline_moduli(splinevec bsp_mod,int nx,int ny,int nz,int order)
1675 {
1676   int nmax=max(nx,max(ny,nz));
1677   real *data,*ddata,*bsp_data;
1678   int i,k,l;
1679   real div;
1680     
1681   snew(data,order);
1682   snew(ddata,order);
1683   snew(bsp_data,nmax);
1684
1685   data[order-1]=0;
1686   data[1]=0;
1687   data[0]=1;
1688             
1689   for(k=3;k<order;k++) {
1690     div=1.0/(k-1.0);
1691     data[k-1]=0;
1692     for(l=1;l<(k-1);l++)
1693       data[k-l-1]=div*(l*data[k-l-2]+(k-l)*data[k-l-1]);
1694     data[0]=div*data[0];
1695   }
1696   /* differentiate */
1697   ddata[0]=-data[0];
1698   for(k=1;k<order;k++)
1699     ddata[k]=data[k-1]-data[k];
1700   div=1.0/(order-1);
1701   data[order-1]=0;
1702   for(l=1;l<(order-1);l++)
1703     data[order-l-1]=div*(l*data[order-l-2]+(order-l)*data[order-l-1]);
1704   data[0]=div*data[0]; 
1705
1706   for(i=0;i<nmax;i++)
1707     bsp_data[i]=0;
1708   for(i=1;i<=order;i++)
1709     bsp_data[i]=data[i-1];
1710     
1711   make_dft_mod(bsp_mod[XX],bsp_data,nx);
1712   make_dft_mod(bsp_mod[YY],bsp_data,ny);
1713   make_dft_mod(bsp_mod[ZZ],bsp_data,nz);
1714
1715   sfree(data);
1716   sfree(ddata);
1717   sfree(bsp_data);
1718 }
1719
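/* Set up the order in which this slab exchanges coordinates and forces with
 * the other slabs: communication partners are taken pairwise at increasing
 * distance around the ring, so node_dest/node_src run +1,-1,+2,-2,...
 * relative to nodeid (modulo nslab). The n < nslab-1 checks make sure the
 * "opposite" slab is not stored twice when nslab is even.
 */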
1720 static void setup_coordinate_communication(pme_atomcomm_t *atc)
1721 {
1722   int nslab,n,i;
1723   int fw,bw;
1724
1725   nslab = atc->nslab;
1726
1727   n = 0;
1728   for(i=1; i<=nslab/2; i++) {
1729     fw = (atc->nodeid + i) % nslab;
1730     bw = (atc->nodeid - i + nslab) % nslab;
1731     if (n < nslab - 1) {
1732       atc->node_dest[n] = fw;
1733       atc->node_src[n]  = bw;
1734       n++;
1735     } 
1736     if (n < nslab - 1) {
1737       atc->node_dest[n] = bw;
1738       atc->node_src[n]  = fw;
1739       n++;
1740     }
1741   }
1742 }
1743
1744 int gmx_pme_destroy(FILE *log,gmx_pme_t *pmedata)
1745 {
1746     if(NULL != log)
1747     {
1748         fprintf(log,"Destroying PME data structures.\n");
1749     }
1750
1751     sfree((*pmedata)->nnx);
1752     sfree((*pmedata)->nny);
1753     sfree((*pmedata)->nnz);
1754         
1755     sfree((*pmedata)->pmegridA);
1756     sfree((*pmedata)->fftgridA);
1757     sfree((*pmedata)->cfftgridA);
1758     gmx_parallel_3dfft_destroy((*pmedata)->pfft_setupA);
1759     
1760     if((*pmedata)->pmegridB)
1761     {
1762         sfree((*pmedata)->pmegridB);
1763         sfree((*pmedata)->fftgridB);
1764         sfree((*pmedata)->cfftgridB);
1765         gmx_parallel_3dfft_destroy((*pmedata)->pfft_setupB);
1766     }
         sfree((*pmedata)->work_mhx);
         sfree((*pmedata)->work_mhy);
1767     sfree((*pmedata)->work_mhz);
1768     sfree((*pmedata)->work_m2);
1769     sfree((*pmedata)->work_denom);
1770     sfree((*pmedata)->work_tmp1_alloc);
1771     sfree((*pmedata)->work_m2inv);
1772         
1773     sfree(*pmedata);
1774     *pmedata = NULL;
1775   
1776   return 0;
1777 }
1778
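/* Round n up to the nearest multiple of f, e.g. mult_up(25,4) = 28 and
 * mult_up(24,4) = 24. Used below to estimate the padded grid sizes that the
 * parallel FFT effectively works with when a dimension is not divisible by
 * the node count.
 */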
1779 static int mult_up(int n,int f)
1780 {
1781     return ((n + f - 1)/f)*f;
1782 }
1783
1784
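/* Estimate the load imbalance of PME FFT + solve as the ratio of the work
 * on the padded (divisibility-rounded) grids in the three parallel layouts
 * n1, n2, n3 to the work on an ideally divisible grid. The n3 term, which
 * corresponds to the layout used during the k-space solve, gets a higher
 * weight because pme_solve is more expensive than a single FFT (see the
 * comment below).
 */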
1785 static double pme_load_imbalance(gmx_pme_t pme)
1786 {
1787     int    nma,nmi;
1788     double n1,n2,n3;
1789
1790     nma = pme->nnodes_major;
1791     nmi = pme->nnodes_minor;
1792
1793     n1 = mult_up(pme->nkx,nma)*mult_up(pme->nky,nmi)*pme->nkz;
1794     n2 = mult_up(pme->nkx,nma)*mult_up(pme->nkz,nmi)*pme->nky;
1795     n3 = mult_up(pme->nky,nma)*mult_up(pme->nkz,nmi)*pme->nkx;
1796
1797     /* pme_solve is roughly double the cost of an fft */
1798
1799     return (n1 + n2 + 3*n3)/(double)(6*pme->nkx*pme->nky*pme->nkz);
1800 }
1801
1802 static void init_atomcomm(gmx_pme_t pme,pme_atomcomm_t *atc, t_commrec *cr,
1803                           int dimind,gmx_bool bSpread)
1804 {
1805     int nk,k,s;
1806
1807     atc->dimind = dimind;
1808     atc->nslab  = 1;
1809     atc->nodeid = 0;
1810     atc->pd_nalloc = 0;
1811 #ifdef GMX_MPI
1812     if (pme->nnodes > 1)
1813     {
1814         atc->mpi_comm = pme->mpi_comm_d[dimind];
1815         MPI_Comm_size(atc->mpi_comm,&atc->nslab);
1816         MPI_Comm_rank(atc->mpi_comm,&atc->nodeid);
1817     }
1818     if (debug)
1819     {
1820         fprintf(debug,"For PME atom communication in dimind %d: nslab %d rank %d\n",atc->dimind,atc->nslab,atc->nodeid);
1821     }
1822 #endif
1823
1824     atc->bSpread   = bSpread;
1825     atc->pme_order = pme->pme_order;
1826
1827     if (atc->nslab > 1)
1828     {
1829         /* These three allocations are not required for particle decomp. */
1830         snew(atc->node_dest,atc->nslab);
1831         snew(atc->node_src,atc->nslab);
1832         setup_coordinate_communication(atc);
1833         
1834         snew(atc->count,atc->nslab);
1835         snew(atc->rcount,atc->nslab);
1836         snew(atc->buf_index,atc->nslab);
1837     }
1838 }
1839
1840 static void 
1841 init_overlap_comm(pme_overlap_t *  ol,
1842                   int              norder,
1843 #ifdef GMX_MPI
1844                   MPI_Comm         comm,  
1845 #endif
1846                   int              nnodes, 
1847                   int              nodeid,
1848                   int              ndata)
1849 {
1850     int lbnd,rbnd,maxlr,b,i;
1851     int exten;
1852     int nn,nk;
1853     pme_grid_comm_t *pgc;
1854     gmx_bool bCont;
1855     int fft_start,fft_end,send_index1,recv_index1;
1856     
1857 #ifdef GMX_MPI
1858     ol->mpi_comm = comm;
1859 #endif
1860     
1861     ol->nnodes = nnodes;
1862     ol->nodeid = nodeid;
1863
1864     /* Linear translation of the PME grid won't affect reciprocal space
1865      * calculations, so to optimize we only interpolate "upwards",
1866      * which also means we only have to consider overlap in one direction.
1867      * I.e., particles on this node might also be spread to grid indices
1868      * that belong to higher nodes (modulo nnodes).
1869      */
1870
1871     snew(ol->s2g0,ol->nnodes+1);
1872     snew(ol->s2g1,ol->nnodes);
1873     if (debug) { fprintf(debug,"PME slab boundaries:"); }
1874     for(i=0; i<nnodes; i++) 
1875     {
1876         /* s2g0 is the local interpolation grid start,
1877          * s2g1 is the local interpolation grid end.
1878          * Because grid overlap communication only goes forward,
1879          * the grid slabs for the FFTs should be rounded down.
1880          */
1881         ol->s2g0[i] = ( i   *ndata + 0       )/nnodes;
1882         ol->s2g1[i] = ((i+1)*ndata + nnodes-1)/nnodes + norder - 1;
1883
1884         if (debug)
1885         {
1886             fprintf(debug,"  %3d %3d",ol->s2g0[i],ol->s2g1[i]);
1887         }
1888     }
1889     ol->s2g0[nnodes] = ndata;
1890     if (debug) { fprintf(debug,"\n"); }
1891
1892     /* Determine with how many nodes we need to communicate the grid overlap */
1893     b = 0;
1894     do
1895     {
1896         b++;
1897         bCont = FALSE;
1898         for(i=0; i<nnodes; i++)
1899         {
1900             if ((i+b <  nnodes && ol->s2g1[i] > ol->s2g0[i+b]) ||
1901                 (i+b >= nnodes && ol->s2g1[i] > ol->s2g0[i+b-nnodes] + ndata))
1902             {
1903                 bCont = TRUE;
1904             }
1905         }
1906     }
1907     while (bCont && b < nnodes);
1908     ol->noverlap_nodes = b - 1;
1909
1910     snew(ol->send_id,ol->noverlap_nodes);
1911     snew(ol->recv_id,ol->noverlap_nodes);
1912     for(b=0; b<ol->noverlap_nodes; b++)
1913     {
1914         ol->send_id[b] = (ol->nodeid + (b + 1)) % ol->nnodes;
1915         ol->recv_id[b] = (ol->nodeid - (b + 1) + ol->nnodes) % ol->nnodes;
1916     }
1917     snew(ol->comm_data, ol->noverlap_nodes);
1918
1919     for(b=0; b<ol->noverlap_nodes; b++)
1920     {
1921         pgc = &ol->comm_data[b];
1922         /* Send */
1923         fft_start        = ol->s2g0[ol->send_id[b]];
1924         fft_end          = ol->s2g0[ol->send_id[b]+1];
1925         if (ol->send_id[b] < nodeid)
1926         {
1927             fft_start += ndata;
1928             fft_end   += ndata;
1929         }
1930         send_index1      = ol->s2g1[nodeid];
1931         send_index1      = min(send_index1,fft_end);
1932         pgc->send_index0 = fft_start;
1933         pgc->send_nindex = max(0,send_index1 - pgc->send_index0);
1934
1935         /* We always start receiving into the first index of our slab */
1936         fft_start        = ol->s2g0[ol->nodeid];
1937         fft_end          = ol->s2g0[ol->nodeid+1];
1938         recv_index1      = ol->s2g1[ol->recv_id[b]];
1939         if (ol->recv_id[b] > nodeid)
1940         {
1941             recv_index1 -= ndata;
1942         }
1943         recv_index1      = min(recv_index1,fft_end);
1944         pgc->recv_index0 = fft_start;
1945         pgc->recv_nindex = max(0,recv_index1 - pgc->recv_index0);
1946     }
1947 }
1948
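/* Build lookup tables of length 5*n that map the (possibly periodically
 * shifted) global grid indices produced in calc_interpolation_idx directly
 * onto local grid indices, without extra modulo operations in the inner
 * loops. fraction_shift holds the compensating shift of the interpolation
 * fraction for the few indices that have to be moved back across the local
 * grid boundary; see the comments inside the loop below.
 */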
1949 static void
1950 make_gridindex5_to_localindex(int n,int local_start,int local_range,
1951                               int **global_to_local,
1952                               real **fraction_shift)
1953 {
1954     int i;
1955     int * gtl;
1956     real * fsh;
1957
1958     snew(gtl,5*n);
1959     snew(fsh,5*n);
1960     for(i=0; (i<5*n); i++)
1961     {
1962         /* Determine the global to local grid index */
1963         gtl[i] = (i - local_start + n) % n;
1964         /* For coordinates that fall within the local grid the fraction
1965          * is correct, we don't need to shift it.
1966          */
1967         fsh[i] = 0;
1968         if (local_range < n)
1969         {
1970             /* Due to rounding issues i could be 1 beyond the lower or
1971              * upper boundary of the local grid. Correct the index for this.
1972              * If we shift the index, we need to shift the fraction by
1973              * the same amount in the other direction to not affect
1974              * the weights.
1975              * Note that due to this shifting the weights at the end of
1976              * the spline might change, but that will only involve values
1977              * between zero and values close to the precision of a real,
1978              * which is anyhow the accuracy of the whole mesh calculation.
1979              */
1980             /* With local_range=0 we should not change i=local_start */
1981             if (i % n != local_start)
1982             {
1983                 if (gtl[i] == n-1)
1984                 {
1985                     gtl[i] = 0;
1986                     fsh[i] = -1; 
1987                 }
1988                 else if (gtl[i] == local_range)
1989                 {
1990                     gtl[i] = local_range - 1;
1991                     fsh[i] = 1;
1992                 }
1993             }
1994         }
1995     }
1996
1997     *global_to_local = gtl;
1998     *fraction_shift  = fsh;
1999 }
2000
2001 static void
2002 gmx_pme_check_grid_restrictions(FILE *fplog,char dim,int nnodes,int *nk)
2003 {
2004     int nk_new;
2005
2006     if (*nk % nnodes != 0)
2007     {
2008         nk_new = nnodes*(*nk/nnodes + 1);
2009
2010         if (2*nk_new >= 3*(*nk))
2011         {
2012             gmx_fatal(FARGS,"The PME grid size in dim %c (%d) is not divisible by the number of nodes doing PME in dim %c (%d). The grid size would have to be increased by more than 50%% to make the grid divisible. Change the total number of nodes or the number of domain decomposition cells in x or the PME grid %c dimension (and the cut-off).",
2013                       dim,*nk,dim,nnodes,dim);
2014         }
2015         
2016         if (fplog != NULL)
2017         {
2018             fprintf(fplog,"\nNOTE: The PME grid size in dim %c (%d) is not divisible by the number of nodes doing PME in dim %c (%d). Increasing the PME grid size in dim %c to %d. This will increase the accuracy and will not decrease the performance significantly on this number of nodes. For optimal performance change the total number of nodes or the number of domain decomposition cells in x or the PME grid %c dimension (and the cut-off).\n\n",
2019                     dim,*nk,dim,nnodes,dim,nk_new,dim);
2020         }
2021             
2022         *nk = nk_new;
2023     }
2024 }
2025
2026 int gmx_pme_init(gmx_pme_t *         pmedata,
2027                  t_commrec *         cr,
2028                  int                 nnodes_major,
2029                  int                 nnodes_minor,
2030                  t_inputrec *        ir,
2031                  int                 homenr,
2032                  gmx_bool                bFreeEnergy,
2033                  gmx_bool                bReproducible)
2034 {
2035     gmx_pme_t pme=NULL;
2036     
2037     pme_atomcomm_t *atc;
2038     int bufsizex,bufsizey,bufsize;
2039     ivec ndata;
2040     
2041     if (debug)
2042         fprintf(debug,"Creating PME data structures.\n");
2043     snew(pme,1);
2044         
2045     pme->redist_init         = FALSE;
2046     pme->sum_qgrid_tmp       = NULL;
2047     pme->sum_qgrid_dd_tmp    = NULL;
2048     pme->buf_nalloc          = 0;
2049     pme->redist_buf_nalloc   = 0;
2050     
2051     pme->nnodes              = 1;
2052     pme->bPPnode             = TRUE;
2053     
2054     pme->nnodes_major        = nnodes_major;
2055     pme->nnodes_minor        = nnodes_minor;
2056
2057 #ifdef GMX_MPI
2058     if (nnodes_major*nnodes_minor > 1 && PAR(cr)) 
2059     {
2060         pme->mpi_comm        = cr->mpi_comm_mygroup;
2061         
2062         MPI_Comm_rank(pme->mpi_comm,&pme->nodeid);
2063         MPI_Comm_size(pme->mpi_comm,&pme->nnodes);
2064         if (pme->nnodes != nnodes_major*nnodes_minor)
2065         {
2066             gmx_incons("PME node count mismatch");
2067         }
2068     }
2069 #endif
2070
2071     if (pme->nnodes == 1)
2072     {
2073         pme->ndecompdim = 0;
2074         pme->nodeid_major = 0;
2075         pme->nodeid_minor = 0;
2076     }
2077     else
2078     {
2079         if (nnodes_minor == 1)
2080         {
2081 #ifdef GMX_MPI
2082             pme->mpi_comm_d[0] = pme->mpi_comm;
2083             pme->mpi_comm_d[1] = NULL;
2084 #endif
2085             pme->ndecompdim = 1;
2086             pme->nodeid_major = pme->nodeid;
2087             pme->nodeid_minor = 0;
2088             
2089         }
2090         else if (nnodes_major == 1)
2091         {
2092 #ifdef GMX_MPI
2093             pme->mpi_comm_d[0] = NULL;
2094             pme->mpi_comm_d[1] = pme->mpi_comm;
2095 #endif
2096             pme->ndecompdim = 1;
2097             pme->nodeid_major = 0;
2098             pme->nodeid_minor = pme->nodeid;
2099         }
2100         else 
2101         {
2102             if (pme->nnodes % nnodes_major != 0)
2103             {
2104                 gmx_incons("For 2D PME decomposition, #PME nodes must be divisible by the number of nodes in the major dimension");
2105             }
2106             pme->ndecompdim = 2;
2107             
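            /* Split the PME communicator twice: ranks sharing the same
             * nodeid % nnodes_minor form the communicator along the major
             * dimension, ranks sharing the same nodeid / nnodes_minor the
             * one along the minor dimension. With nodeid as the key, this
             * node's indices become nodeid/nnodes_minor (major) and
             * nodeid % nnodes_minor (minor).
             */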
2108 #ifdef GMX_MPI
2109             MPI_Comm_split(pme->mpi_comm,pme->nodeid % nnodes_minor,
2110                            pme->nodeid,&pme->mpi_comm_d[0]);  /* My communicator along major dimension */
2111             MPI_Comm_split(pme->mpi_comm,pme->nodeid/nnodes_minor,
2112                            pme->nodeid,&pme->mpi_comm_d[1]);  /* My communicator along minor dimension */
2113             
2114             MPI_Comm_rank(pme->mpi_comm_d[0],&pme->nodeid_major);
2115             MPI_Comm_size(pme->mpi_comm_d[0],&pme->nnodes_major);
2116             MPI_Comm_rank(pme->mpi_comm_d[1],&pme->nodeid_minor);
2117             MPI_Comm_size(pme->mpi_comm_d[1],&pme->nnodes_minor);
2118 #endif
2119         }
2120         pme->bPPnode = (cr->duty & DUTY_PP);
2121     }
2122     
2123     if (ir->ePBC == epbcSCREW)
2124     {
2125         gmx_fatal(FARGS,"pme does not (yet) work with pbc = screw");
2126     }
2127     
2128     pme->bFEP        = ((ir->efep != efepNO) && bFreeEnergy);
2129     pme->nkx         = ir->nkx;
2130     pme->nky         = ir->nky;
2131     pme->nkz         = ir->nkz;
2132     pme->pme_order   = ir->pme_order;
2133     pme->epsilon_r   = ir->epsilon_r;
2134     
2135     /* Currently pme.c supports only the fft5d FFT code.
2136      * Therefore the grid always needs to be divisible by nnodes.
2137      * When the old 1D code is also supported again, change this check.
2138      *
2139      * This check should be done before calling gmx_pme_init
2140      * and fplog should be passed instead of stderr.
2141      *
2142     if (pme->ndecompdim >= 2)
2143     */
2144     if (pme->ndecompdim >= 1)
2145     {
2146         /*
2147         gmx_pme_check_grid_restrictions(pme->nodeid==0 ? stderr : NULL,
2148                                         'x',nnodes_major,&pme->nkx);
2149         gmx_pme_check_grid_restrictions(pme->nodeid==0 ? stderr : NULL,
2150                                         'y',nnodes_minor,&pme->nky);
2151         */
2152     }
2153
2154     if (pme->nkx <= pme->pme_order*(pme->nnodes_major > 1 ? 2 : 1) ||
2155         pme->nky <= pme->pme_order*(pme->nnodes_minor > 1 ? 2 : 1) ||
2156         pme->nkz <= pme->pme_order)
2157     {
2158         gmx_fatal(FARGS,"The pme grid dimensions need to be larger than pme_order (%d) and in parallel larger than 2*pme_order for x and/or y",pme->pme_order);
2159     }
2160
2161     if (pme->nnodes > 1) {
2162         double imbal;
2163
2164 #ifdef GMX_MPI
2165         MPI_Type_contiguous(DIM, mpi_type, &(pme->rvec_mpi));
2166         MPI_Type_commit(&(pme->rvec_mpi));
2167 #endif
2168         
2169         /* Note that the charge spreading and force gathering, which usually
2170          * take about the same amount of time as FFT+solve_pme,
2171          * are always fully load balanced
2172          * (unless the charge distribution is inhomogeneous).
2173          */
2174         
2175         imbal = pme_load_imbalance(pme);
2176         if (imbal >= 1.2 && pme->nodeid_major == 0 && pme->nodeid_minor == 0)
2177         {
2178             fprintf(stderr,
2179                     "\n"
2180                     "NOTE: The load imbalance in PME FFT and solve is %d%%.\n"
2181                     "      For optimal PME load balancing\n"
2182                     "      PME grid_x (%d) and grid_y (%d) should be divisible by #PME_nodes_x (%d)\n"
2183                     "      and PME grid_y (%d) and grid_z (%d) should be divisible by #PME_nodes_y (%d)\n"
2184                     "\n",
2185                     (int)((imbal-1)*100 + 0.5),
2186                     pme->nkx,pme->nky,pme->nnodes_major,
2187                     pme->nky,pme->nkz,pme->nnodes_minor);
2188         }
2189     }
2190
2191     init_overlap_comm(&pme->overlap[0],pme->pme_order,
2192 #ifdef GMX_MPI
2193                       pme->mpi_comm_d[0],
2194 #endif
2195                       pme->nnodes_major,pme->nodeid_major,pme->nkx);
2196     
2197     init_overlap_comm(&pme->overlap[1],pme->pme_order,
2198 #ifdef GMX_MPI
2199                       pme->mpi_comm_d[1],
2200 #endif
2201                       pme->nnodes_minor,pme->nodeid_minor,pme->nky);
2202     
2203     snew(pme->bsp_mod[XX],pme->nkx);
2204     snew(pme->bsp_mod[YY],pme->nky);
2205     snew(pme->bsp_mod[ZZ],pme->nkz);
2206     
2207     /* Allocate data for the interpolation grid, including overlap */
2208     pme->pmegrid_nx = pme->overlap[0].s2g1[pme->nodeid_major] -
2209                       pme->overlap[0].s2g0[pme->nodeid_major];
2210     pme->pmegrid_ny = pme->overlap[1].s2g1[pme->nodeid_minor] - 
2211                       pme->overlap[1].s2g0[pme->nodeid_minor];
2212     pme->pmegrid_nz = pme->nkz + pme->pme_order - 1;
2213     
2214     pme->pmegrid_start_ix = pme->overlap[0].s2g0[pme->nodeid_major];
2215     pme->pmegrid_start_iy = pme->overlap[1].s2g0[pme->nodeid_minor];
2216     pme->pmegrid_start_iz = 0;
2217     
2218     make_gridindex5_to_localindex(pme->nkx,
2219                                   pme->pmegrid_start_ix,
2220                                   pme->pmegrid_nx - (pme->pme_order-1),
2221                                   &pme->nnx,&pme->fshx);
2222     make_gridindex5_to_localindex(pme->nky,
2223                                   pme->pmegrid_start_iy,
2224                                   pme->pmegrid_ny - (pme->pme_order-1),
2225                                   &pme->nny,&pme->fshy);
2226     make_gridindex5_to_localindex(pme->nkz,
2227                                   pme->pmegrid_start_iz,
2228                                   pme->pmegrid_nz - (pme->pme_order-1),
2229                                   &pme->nnz,&pme->fshz);
2230     
2231     snew(pme->pmegridA,pme->pmegrid_nx*pme->pmegrid_ny*pme->pmegrid_nz);
2232     
2233     /* For a non-divisible grid we need pme_order instead of pme_order-1 */
2234     /* x overlap is copied in place: take padding into account.
2235      * y is always copied through a buffer: we don't need padding in z,
2236      * but we do need the overlap in x because of the communication order.
2237      */
2238     bufsizex = pme->pme_order*pme->pmegrid_ny*pme->pmegrid_nz;
2239     bufsizey = pme->pme_order*pme->pmegrid_nx*pme->nkz;
2240     bufsize  = (bufsizex>bufsizey) ? bufsizex : bufsizey;
2241     
2242     snew(pme->pmegrid_sendbuf,bufsize);
2243     snew(pme->pmegrid_recvbuf,bufsize);
2244     
2245     ndata[0] = pme->nkx;
2246     ndata[1] = pme->nky;
2247     ndata[2] = pme->nkz;
2248     
2249     /* This routine will allocate the grid data to fit the FFTs */
2250     gmx_parallel_3dfft_init(&pme->pfft_setupA,ndata,
2251                             &pme->fftgridA,&pme->cfftgridA,
2252                             pme->mpi_comm_d,
2253                             pme->overlap[0].s2g0,pme->overlap[1].s2g0,
2254                             bReproducible);
2255     
2256     if (bFreeEnergy)
2257     {
2258         snew(pme->pmegridB,pme->pmegrid_nx*pme->pmegrid_ny*pme->pmegrid_nz);    
2259         gmx_parallel_3dfft_init(&pme->pfft_setupB,ndata,
2260                                 &pme->fftgridB,&pme->cfftgridB,
2261                                 pme->mpi_comm_d,
2262                                 pme->overlap[0].s2g0,pme->overlap[1].s2g0,
2263                                 bReproducible);
2264     } else 
2265     {
2266         pme->pmegridB    = NULL;
2267         pme->fftgridB    = NULL;
2268         pme->cfftgridB   = NULL;
2269     }
2270     
2271     make_bspline_moduli(pme->bsp_mod,pme->nkx,pme->nky,pme->nkz,pme->pme_order);
2272     
2273     /* Use atc[0] for spreading */
2274     init_atomcomm(pme,&pme->atc[0],cr,nnodes_major > 1 ? 0 : 1,TRUE);
2275     if (pme->ndecompdim >= 2)
2276     {
2277         init_atomcomm(pme,&pme->atc[1],cr,1,FALSE);
2278     }
2279     
2280     if (pme->nnodes == 1) {
2281         pme->atc[0].n = homenr;
2282         pme_realloc_atomcomm_things(&pme->atc[0]);
2283     }
2284     
2285     /* Use fft5d, order after FFT is y major, z, x minor */
2286     pme->work_nalloc = pme->nkx;
2287     snew(pme->work_mhx,pme->work_nalloc);
2288     snew(pme->work_mhy,pme->work_nalloc);
2289     snew(pme->work_mhz,pme->work_nalloc);
2290     snew(pme->work_m2,pme->work_nalloc);
2291     snew(pme->work_denom,pme->work_nalloc);
2292     /* Allocate work_tmp1 with 8 extra elements and align the pointer to a
2293      * 16-byte boundary: the SSE code operates on 4 elements at a time and
2294      * needs aligned, slightly padded storage. */
2295     snew(pme->work_tmp1_alloc,pme->work_nalloc+8);
2296     pme->work_tmp1 = (real *) (((size_t) pme->work_tmp1_alloc + 16) & (~((size_t) 15)));
2297     snew(pme->work_m2inv,pme->work_nalloc);
2298
2299     *pmedata = pme;
2300     
2301     return 0;
2302 }
2303
2304 static void spread_on_grid(gmx_pme_t pme,
2305                            pme_atomcomm_t *atc,real *grid,
2306                            gmx_bool bCalcSplines,gmx_bool bSpread)
2307 {    
2308     if (bCalcSplines)
2309     {
2310     
2311         /* Compute fftgrid index for all atoms,
2312          * with help of some extra variables.
2313          */
2314         calc_interpolation_idx(pme,atc);
2315         
2316         /* make local bsplines  */
2317         make_bsplines(atc->theta,atc->dtheta,pme->pme_order,
2318                       atc->fractx,atc->n,atc->q,pme->bFEP);
2319     }    
2320     
2321     if (bSpread)
2322     {
2323         /* put local atoms on grid. */
2324         spread_q_bsplines(pme,atc,grid);
2325     }
2326 }
2327
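/* Interpolate the potential that is already present on the A charge grid at
 * the positions of the n given charges and return the corresponding energy
 * in V. Nothing is spread or solved here and no forces are produced; this
 * only works serially and is used for things like test particle insertion.
 */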
2328 void gmx_pme_calc_energy(gmx_pme_t pme,int n,rvec *x,real *q,real *V)
2329 {
2330     pme_atomcomm_t *atc;
2331     real *grid;
2332
2333     if (pme->nnodes > 1)
2334     {
2335         gmx_incons("gmx_pme_calc_energy called in parallel");
2336     }
2337     if (pme->bFEP)
2338     {
2339         gmx_incons("gmx_pme_calc_energy with free energy");
2340     }
2341
2342     atc = &pme->atc_energy;
2343     atc->nslab     = 1;
2344     atc->bSpread   = TRUE;
2345     atc->pme_order = pme->pme_order;
2346     atc->n         = n;
2347     pme_realloc_atomcomm_things(atc);
2348     atc->x         = x;
2349     atc->q         = q;
2350     
2351     /* We only use the A-charges grid */
2352     grid = pme->pmegridA;
2353
2354     spread_on_grid(pme,atc,NULL,TRUE,FALSE);
2355
2356     *V = gather_energy_bsplines(pme,grid,atc);
2357 }
2358
2359
2360 static void reset_pmeonly_counters(t_commrec *cr,gmx_wallcycle_t wcycle,
2361         t_nrnb *nrnb,t_inputrec *ir, gmx_large_int_t step_rel)
2362 {
2363     /* Reset all the counters related to performance over the run */
2364     wallcycle_stop(wcycle,ewcRUN);
2365     wallcycle_reset_all(wcycle);
2366     init_nrnb(nrnb);
2367     ir->init_step += step_rel;
2368     ir->nsteps    -= step_rel;
2369     wallcycle_start(wcycle,ewcRUN);
2370 }
2371
2372
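/* Main loop of a PME-only node: receive charges, coordinates and the box
 * from the PP nodes, run gmx_pme_do for this step and send the forces,
 * energy and virial back, until gmx_pme_recv_q_x returns natoms == -1 as
 * the signal to quit. Performance counters can be reset once during the
 * run at the step given by wcycle_get_reset_counters.
 */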
2373 int gmx_pmeonly(gmx_pme_t pme,
2374                 t_commrec *cr,    t_nrnb *nrnb,
2375                 gmx_wallcycle_t wcycle,
2376                 real ewaldcoeff,  gmx_bool bGatherOnly,
2377                 t_inputrec *ir)
2378 {
2379     gmx_pme_pp_t pme_pp;
2380     int  natoms;
2381     matrix box;
2382     rvec *x_pp=NULL,*f_pp=NULL;
2383     real *chargeA=NULL,*chargeB=NULL;
2384     real lambda=0;
2385     int  maxshift_x=0,maxshift_y=0;
2386     real energy,dvdlambda;
2387     matrix vir;
2388     float cycles;
2389     int  count;
2390     gmx_bool bEnerVir;
2391     gmx_large_int_t step,step_rel;
2392     
2393     
2394     pme_pp = gmx_pme_pp_init(cr);
2395     
2396     init_nrnb(nrnb);
2397     
2398     count = 0;
2399     do /****** this is a quasi-loop over time steps! */
2400     {
2401         /* Domain decomposition */
2402         natoms = gmx_pme_recv_q_x(pme_pp,
2403                                   &chargeA,&chargeB,box,&x_pp,&f_pp,
2404                                   &maxshift_x,&maxshift_y,
2405                                   &pme->bFEP,&lambda,
2406                                   &bEnerVir,
2407                                   &step);
2408         
2409         if (natoms == -1) {
2410             /* We should stop: break out of the loop */
2411             break;
2412         }
2413         
2414         step_rel = step - ir->init_step;
2415         
2416         if (count == 0)
2417             wallcycle_start(wcycle,ewcRUN);
2418         
2419         wallcycle_start(wcycle,ewcPMEMESH);
2420         
2421         dvdlambda = 0;
2422         clear_mat(vir);
2423         gmx_pme_do(pme,0,natoms,x_pp,f_pp,chargeA,chargeB,box,
2424                    cr,maxshift_x,maxshift_y,nrnb,wcycle,vir,ewaldcoeff,
2425                    &energy,lambda,&dvdlambda,
2426                    GMX_PME_DO_ALL_F | (bEnerVir ? GMX_PME_CALC_ENER_VIR : 0));
2427         
2428         cycles = wallcycle_stop(wcycle,ewcPMEMESH);
2429         
2430         gmx_pme_send_force_vir_ener(pme_pp,
2431                                     f_pp,vir,energy,dvdlambda,
2432                                     cycles);
2433         
2434         count++;
2435
2436         if (step_rel == wcycle_get_reset_counters(wcycle))
2437         {
2438             /* Reset all the counters related to performance over the run */
2439             reset_pmeonly_counters(cr,wcycle,nrnb,ir,step_rel);
2440             wcycle_set_reset_counters(wcycle, 0);
2441         }
2442         
2443     } /***** end of quasi-loop, we stop with the break above */
2444     while (TRUE);
2445     
2446     return 0;
2447 }
2448
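/* Do the full PME mesh part for one MD step: redistribute x and q over the
 * PME slabs, spread the charges with B-splines, 3D-FFT, solve in k-space,
 * 3D-FFT back, gather the forces and redistribute them to their home nodes.
 * Which of these stages run is controlled by the GMX_PME_* bits in flags;
 * gmx_pmeonly above, for instance, passes
 * GMX_PME_DO_ALL_F | (bEnerVir ? GMX_PME_CALC_ENER_VIR : 0).
 */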
2449 int gmx_pme_do(gmx_pme_t pme,
2450                int start,       int homenr,
2451                rvec x[],        rvec f[],
2452                real *chargeA,   real *chargeB,
2453                matrix box,      t_commrec *cr,
2454                int  maxshift_x, int maxshift_y,
2455                t_nrnb *nrnb,    gmx_wallcycle_t wcycle,
2456                matrix vir,      real ewaldcoeff,
2457                real *energy,    real lambda, 
2458                real *dvdlambda, int flags)
2459 {
2460     int     q,d,i,j,ntot,npme;
2461     int     nx,ny,nz;
2462     int     n_d,local_ny;
2463     int     loop_count;
2464     pme_atomcomm_t *atc=NULL;
2465     real *  grid=NULL;
2466     real    *ptr;
2467     rvec    *x_d,*f_d;
2468     real    *charge=NULL,*q_d,vol;
2469     real    energy_AB[2];
2470     matrix  vir_AB[2];
2471     gmx_bool    bClearF;
2472     gmx_parallel_3dfft_t pfft_setup;
2473     real *  fftgrid;
2474     t_complex * cfftgrid;
2475
2476     if (pme->nnodes > 1) {
2477         atc = &pme->atc[0];
2478         atc->npd = homenr;
2479         if (atc->npd > atc->pd_nalloc) {
2480             atc->pd_nalloc = over_alloc_dd(atc->npd);
2481             srenew(atc->pd,atc->pd_nalloc);
2482         }
2483         atc->maxshift = (atc->dimind==0 ? maxshift_x : maxshift_y);
2484     }
2485     else
2486     {
2487         /* This could be necessary for TPI */
2488         pme->atc[0].n = homenr;
2489     }
2490     
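    /* With free energy this loop makes two passes: q==0 for the A charges
     * and q==1 for the B charges. The energies and virials of the two
     * passes are combined with the lambda weights at the end of this
     * routine.
     */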
2491     for(q=0; q<(pme->bFEP ? 2 : 1); q++) {
2492         if (q == 0) {
2493             grid = pme->pmegridA;
2494             fftgrid = pme->fftgridA;
2495             cfftgrid = pme->cfftgridA;
2496             pfft_setup = pme->pfft_setupA;
2497             charge = chargeA+start;
2498         } else {
2499             grid = pme->pmegridB;
2500             fftgrid = pme->fftgridB;
2501             cfftgrid = pme->cfftgridB;
2502             pfft_setup = pme->pfft_setupB;
2503             charge = chargeB+start;
2504         }
2505         /* Unpack structure */
2506         if (debug) {
2507             fprintf(debug,"PME: nnodes = %d, nodeid = %d\n",
2508                     cr->nnodes,cr->nodeid);
2509             fprintf(debug,"Grid = %p\n",(void*)grid);
2510             if (grid == NULL)
2511                 gmx_fatal(FARGS,"No grid!");
2512         }
2513         where();
2514         
2515         m_inv_ur0(box,pme->recipbox); 
2516
2517         if (pme->nnodes == 1) {
2518             atc = &pme->atc[0];
2519             if (DOMAINDECOMP(cr)) {
2520                 atc->n = homenr;
2521                 pme_realloc_atomcomm_things(atc);
2522             }
2523             atc->x = x;
2524             atc->q = charge;
2525             atc->f = f;
2526         } else {
2527             wallcycle_start(wcycle,ewcPME_REDISTXF);
2528             for(d=pme->ndecompdim-1; d>=0; d--)
2529             {
2530                 if (d == pme->ndecompdim-1)
2531                 {
2532                     n_d = homenr;
2533                     x_d = x + start;
2534                     q_d = charge;
2535                 }
2536                 else
2537                 {
2538                     n_d = pme->atc[d+1].n;
2539                     x_d = atc->x;
2540                     q_d = atc->q;
2541                 }
2542                 atc = &pme->atc[d];
2543                 atc->npd = n_d;
2544                 if (atc->npd > atc->pd_nalloc) {
2545                     atc->pd_nalloc = over_alloc_dd(atc->npd);
2546                     srenew(atc->pd,atc->pd_nalloc);
2547                 }
2548                 atc->maxshift = (atc->dimind==0 ? maxshift_x : maxshift_y);
2549                 pme_calc_pidx(n_d,pme->recipbox,x_d,atc);
2550                 where();
2551                 
2552                 /* Redistribute x (only once) and qA or qB */
2553                 if (DOMAINDECOMP(cr)) {
2554                     dd_pmeredist_x_q(pme, n_d, q==0, x_d, q_d, atc);
2555                 } else {
2556                     pmeredist_pd(pme, TRUE, n_d, q==0, x_d, q_d, atc);
2557                 }
2558             }
2559             where();
2560
2561             wallcycle_stop(wcycle,ewcPME_REDISTXF);
2562         }
2563         
2564         if (debug)
2565             fprintf(debug,"Node= %6d, pme local particles=%6d\n",
2566                     cr->nodeid,atc->n);
2567
2568         if (flags & GMX_PME_SPREAD_Q)
2569         {
2570             wallcycle_start(wcycle,ewcPME_SPREADGATHER);
2571
2572             /* Spread the charges on a grid */
2573             spread_on_grid(pme,&pme->atc[0],grid,q==0,TRUE);
2574
2575             if (q == 0)
2576             {
2577                 inc_nrnb(nrnb,eNR_WEIGHTS,DIM*atc->n);
2578             }
2579             inc_nrnb(nrnb,eNR_SPREADQBSP,
2580                      pme->pme_order*pme->pme_order*pme->pme_order*atc->n);
2581
2582             wrap_periodic_pmegrid(pme,grid);
2583
2584             /* sum contributions to local grid from other nodes */
2585 #ifdef GMX_MPI
2586             if (pme->nnodes > 1) {
2587                 gmx_sum_qgrid_dd(pme,grid,GMX_SUM_QGRID_FORWARD);
2588                 where();
2589             }
2590 #endif
2591             where();
2592
2593             copy_pmegrid_to_fftgrid(pme,grid,fftgrid);
2594
2595             wallcycle_stop(wcycle,ewcPME_SPREADGATHER);
2596         }
2597          
2598         if (flags & GMX_PME_SOLVE)
2599         {
2600             /* do 3d-fft */ 
2601             wallcycle_start(wcycle,ewcPME_FFT);
2602             gmx_parallel_3dfft_execute(pfft_setup,GMX_FFT_REAL_TO_COMPLEX,fftgrid,cfftgrid);
2603             wallcycle_stop(wcycle,ewcPME_FFT);
2604             where();
2605             
2606             /* solve in k-space for our local cells */
2607             vol = det(box);
2608             wallcycle_start(wcycle,ewcPME_SOLVE);
2609             loop_count =
2610                 solve_pme_yzx(pme,cfftgrid,ewaldcoeff,vol,
2611                               flags & GMX_PME_CALC_ENER_VIR,
2612                               &energy_AB[q],vir_AB[q]);
2613             wallcycle_stop(wcycle,ewcPME_SOLVE);
2614             where();
2615             inc_nrnb(nrnb,eNR_SOLVEPME,loop_count);
2616         }
2617
2618         if ((flags & GMX_PME_CALC_F) ||
2619             (flags & GMX_PME_CALC_POT))
2620         {
2621             
2622             /* do 3d-invfft */
2623             where();
2624             wallcycle_start(wcycle,ewcPME_FFT);
2625             gmx_parallel_3dfft_execute(pfft_setup,GMX_FFT_COMPLEX_TO_REAL,cfftgrid,fftgrid);
2626             wallcycle_stop(wcycle,ewcPME_FFT);
2627
2628             where();
2629
2630             if (pme->nodeid == 0)
2631             {
2632                 ntot = pme->nkx*pme->nky*pme->nkz;
2633                 npme  = ntot*log((real)ntot)/log(2.0);
2634                 inc_nrnb(nrnb,eNR_FFT,2*npme);
2635             }
2636
2637             wallcycle_start(wcycle,ewcPME_SPREADGATHER);
2638
2639             copy_fftgrid_to_pmegrid(pme,fftgrid,grid);
2640
2641             /* distribute local grid to all nodes */
2642 #ifdef GMX_MPI
2643             if (pme->nnodes > 1) {
2644                 gmx_sum_qgrid_dd(pme,grid,GMX_SUM_QGRID_BACKWARD);
2645             }
2646 #endif
2647             where();
2648
2649             unwrap_periodic_pmegrid(pme,grid);
2650         }
2651
2652         if (flags & GMX_PME_CALC_F)
2653         {
2654             /* interpolate forces for our local atoms */
2655
2656             where();
2657             
2658             /* If we are running without parallelization,
2659              * atc->f is the actual force array, not a buffer,
2660              * therefore we should not clear it.
2661              */
2662             bClearF = (q == 0 && PAR(cr));
2663             gather_f_bsplines(pme,grid,bClearF,&pme->atc[0],
2664                               pme->bFEP ? (q==0 ? 1.0-lambda : lambda) : 1.0);
2665             where();
2666
2667             inc_nrnb(nrnb,eNR_GATHERFBSP,
2668                      pme->pme_order*pme->pme_order*pme->pme_order*pme->atc[0].n);
2669             wallcycle_stop(wcycle,ewcPME_SPREADGATHER);
2670        }
2671     } /* of q-loop */
2672     
2673     if ((flags & GMX_PME_CALC_F) && pme->nnodes > 1) {
2674         wallcycle_start(wcycle,ewcPME_REDISTXF);
2675         for(d=0; d<pme->ndecompdim; d++)
2676         {
2677             atc = &pme->atc[d];
2678             if (d == pme->ndecompdim - 1)
2679             {
2680                 n_d = homenr;
2681                 f_d = f + start;
2682             }
2683             else
2684             {
2685                 n_d = pme->atc[d+1].n;
2686                 f_d = pme->atc[d+1].f;
2687             }
2688             if (DOMAINDECOMP(cr)) {
2689                 dd_pmeredist_f(pme,atc,n_d,f_d,
2690                                d==pme->ndecompdim-1 && pme->bPPnode);
2691             } else {
2692                 pmeredist_pd(pme, FALSE, n_d, TRUE, f_d, NULL, atc);
2693             }
2694         }
2695
2696         wallcycle_stop(wcycle,ewcPME_REDISTXF);
2697     }
2698     where();
2699     
2700     if (!pme->bFEP) {
2701         *energy = energy_AB[0];
2702         m_add(vir,vir_AB[0],vir);
2703     } else {
2704         *energy = (1.0-lambda)*energy_AB[0] + lambda*energy_AB[1];
2705         *dvdlambda += energy_AB[1] - energy_AB[0];
2706         for(i=0; i<DIM; i++)
2707             for(j=0; j<DIM; j++)
2708                 vir[i][j] += (1.0-lambda)*vir_AB[0][i][j] + lambda*vir_AB[1][i][j];
2709     }
2710
2711     if (debug)
2712         fprintf(debug,"PME mesh energy: %g\n",*energy);
2713     
2714     return 0;
2715 }