Clang-tidy: enable further tests
[alexxy/gromacs.git] / src / gromacs / domdec / domdec.cpp
1 /*
2  * This file is part of the GROMACS molecular simulation package.
3  *
4  * Copyright (c) 2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2018, by the GROMACS development team, led by
5  * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6  * and including many others, as listed in the AUTHORS file in the
7  * top-level source directory and at http://www.gromacs.org.
8  *
9  * GROMACS is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public License
11  * as published by the Free Software Foundation; either version 2.1
12  * of the License, or (at your option) any later version.
13  *
14  * GROMACS is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Lesser General Public License for more details.
18  *
19  * You should have received a copy of the GNU Lesser General Public
20  * License along with GROMACS; if not, see
21  * http://www.gnu.org/licenses, or write to the Free Software Foundation,
22  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23  *
24  * If you want to redistribute modifications to GROMACS, please
25  * consider that scientific software is very special. Version
26  * control is crucial - bugs must be traceable. We will be happy to
27  * consider code for inclusion in the official distribution, but
28  * derived work must not be called official GROMACS. Details are found
29  * in the README & COPYING files - if they are missing, get the
30  * official version at http://www.gromacs.org.
31  *
32  * To help us fund GROMACS development, we humbly ask that you cite
33  * the research papers on the package. Check out http://www.gromacs.org.
34  */
35
36 #include "gmxpre.h"
37
38 #include "domdec.h"
39
40 #include "config.h"
41
42 #include <assert.h>
43 #include <limits.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <string.h>
47
48 #include <cmath>
49
50 #include <algorithm>
51
52 #include "gromacs/compat/make_unique.h"
53 #include "gromacs/domdec/collect.h"
54 #include "gromacs/domdec/dlbtiming.h"
55 #include "gromacs/domdec/domdec_network.h"
56 #include "gromacs/domdec/ga2la.h"
57 #include "gromacs/ewald/pme.h"
58 #include "gromacs/fileio/gmxfio.h"
59 #include "gromacs/fileio/pdbio.h"
60 #include "gromacs/gmxlib/chargegroup.h"
61 #include "gromacs/gmxlib/network.h"
62 #include "gromacs/gmxlib/nrnb.h"
63 #include "gromacs/gpu_utils/gpu_utils.h"
64 #include "gromacs/hardware/hw_info.h"
65 #include "gromacs/imd/imd.h"
66 #include "gromacs/listed-forces/manage-threading.h"
67 #include "gromacs/math/functions.h"
68 #include "gromacs/math/vec.h"
69 #include "gromacs/math/vectypes.h"
70 #include "gromacs/mdlib/constr.h"
71 #include "gromacs/mdlib/constraintrange.h"
72 #include "gromacs/mdlib/forcerec.h"
73 #include "gromacs/mdlib/gmx_omp_nthreads.h"
74 #include "gromacs/mdlib/lincs.h"
75 #include "gromacs/mdlib/mdatoms.h"
76 #include "gromacs/mdlib/mdrun.h"
77 #include "gromacs/mdlib/mdsetup.h"
78 #include "gromacs/mdlib/nb_verlet.h"
79 #include "gromacs/mdlib/nbnxn_grid.h"
80 #include "gromacs/mdlib/nsgrid.h"
81 #include "gromacs/mdlib/vsite.h"
82 #include "gromacs/mdtypes/commrec.h"
83 #include "gromacs/mdtypes/df_history.h"
84 #include "gromacs/mdtypes/forcerec.h"
85 #include "gromacs/mdtypes/inputrec.h"
86 #include "gromacs/mdtypes/md_enums.h"
87 #include "gromacs/mdtypes/mdatom.h"
88 #include "gromacs/mdtypes/nblist.h"
89 #include "gromacs/mdtypes/state.h"
90 #include "gromacs/pbcutil/ishift.h"
91 #include "gromacs/pbcutil/pbc.h"
92 #include "gromacs/pulling/pull.h"
93 #include "gromacs/pulling/pull_rotation.h"
94 #include "gromacs/swap/swapcoords.h"
95 #include "gromacs/timing/wallcycle.h"
96 #include "gromacs/topology/block.h"
97 #include "gromacs/topology/idef.h"
98 #include "gromacs/topology/ifunc.h"
99 #include "gromacs/topology/mtop_lookup.h"
100 #include "gromacs/topology/mtop_util.h"
101 #include "gromacs/topology/topology.h"
102 #include "gromacs/utility/basedefinitions.h"
103 #include "gromacs/utility/basenetwork.h"
104 #include "gromacs/utility/cstringutil.h"
105 #include "gromacs/utility/exceptions.h"
106 #include "gromacs/utility/fatalerror.h"
107 #include "gromacs/utility/gmxmpi.h"
108 #include "gromacs/utility/qsort_threadsafe.h"
109 #include "gromacs/utility/real.h"
110 #include "gromacs/utility/smalloc.h"
111 #include "gromacs/utility/stringutil.h"
112
113 #include "atomdistribution.h"
114 #include "cellsizes.h"
115 #include "distribute.h"
116 #include "domdec_constraints.h"
117 #include "domdec_internal.h"
118 #include "domdec_vsite.h"
119 #include "redistribute.h"
120 #include "utility.h"
121
122 #define DD_NLOAD_MAX 9
123
124 const char *edlbs_names[edlbsNR] = { "off", "auto", "locked", "on", "on" };
125
126 /* The size per charge group of the cggl_flag buffer in gmx_domdec_comm_t */
127 #define DD_CGIBS 2
128
129 /* The flags for the cggl_flag buffer in gmx_domdec_comm_t */
130 #define DD_FLAG_NRCG  65535
131 #define DD_FLAG_FW(d) (1<<(16+(d)*2))
132 #define DD_FLAG_BW(d) (1<<(16+(d)*2+1))
133
134 /* The DD zone order */
135 static const ivec dd_zo[DD_MAXZONE] =
136 {{0, 0, 0}, {1, 0, 0}, {1, 1, 0}, {0, 1, 0}, {0, 1, 1}, {0, 0, 1}, {1, 0, 1}, {1, 1, 1}};
137
138 /* The non-bonded zone-pair setup for domain decomposition
139  * The first number is the i-zone, the second number the first j-zone seen by
140  * this i-zone, the third number the last+1 j-zone seen by this i-zone.
141  * As is, this is for 3D decomposition, where there are 4 i-zones.
142  * With 2D decomposition use only the first 2 i-zones and a last+1 j-zone of 4.
143  * With 1D decomposition use only the first i-zone and a last+1 j-zone of 2.
144  */
145 static const int
146     ddNonbondedZonePairRanges[DD_MAXIZONE][3] = {{0, 0, 8},
147                                                  {1, 3, 6},
148                                                  {2, 5, 6},
149                                                  {3, 5, 7}};
150
151 /* Turn on DLB when the load imbalance causes this amount of total loss.
152  * There is a bit of overhead with DLB and it's difficult to achieve
153  * a load imbalance of less than 2% with DLB.
154  */
155 #define DD_PERF_LOSS_DLB_ON  0.02
156
157 /* Warn about imbalance due to PP or PP/PME load imbalance at this loss */
158 #define DD_PERF_LOSS_WARN    0.05
159
160
161 /* We check if to turn on DLB at the first and every 100 DD partitionings.
162  * With large imbalance DLB will turn on at the first step, so we can
163  * make the interval so large that the MPI overhead of the check is negligible.
164  */
165 static const int c_checkTurnDlbOnInterval  = 100;
166 /* We need to check if DLB results in worse performance and then turn it off.
167  * We check this more often then for turning DLB on, because the DLB can scale
168  * the domains very rapidly, so if unlucky the load imbalance can go up quickly
169  * and furthermore, we are already synchronizing often with DLB, so
170  * the overhead of the MPI Bcast is not that high.
171  */
172 static const int c_checkTurnDlbOffInterval =  20;
173
174 /* Forward declaration */
175 static void dd_dlb_set_should_check_whether_to_turn_dlb_on(gmx_domdec_t *dd, gmx_bool bValue);
176
177
178 /*
179    #define dd_index(n,i) ((((i)[ZZ]*(n)[YY] + (i)[YY])*(n)[XX]) + (i)[XX])
180
181    static void index2xyz(ivec nc,int ind,ivec xyz)
182    {
183    xyz[XX] = ind % nc[XX];
184    xyz[YY] = (ind / nc[XX]) % nc[YY];
185    xyz[ZZ] = ind / (nc[YY]*nc[XX]);
186    }
187  */
188
189 static void ddindex2xyz(ivec nc, int ind, ivec xyz)
190 {
191     xyz[XX] = ind / (nc[YY]*nc[ZZ]);
192     xyz[YY] = (ind / nc[ZZ]) % nc[YY];
193     xyz[ZZ] = ind % nc[ZZ];
194 }
195
196 static int ddcoord2ddnodeid(gmx_domdec_t *dd, ivec c)
197 {
198     int ddindex;
199     int ddnodeid = -1;
200
201     ddindex = dd_index(dd->nc, c);
202     if (dd->comm->bCartesianPP_PME)
203     {
204         ddnodeid = dd->comm->ddindex2ddnodeid[ddindex];
205     }
206     else if (dd->comm->bCartesianPP)
207     {
208 #if GMX_MPI
209         MPI_Cart_rank(dd->mpi_comm_all, c, &ddnodeid);
210 #endif
211     }
212     else
213     {
214         ddnodeid = ddindex;
215     }
216
217     return ddnodeid;
218 }
219
220 static gmx_bool dynamic_dd_box(const gmx_ddbox_t *ddbox, const t_inputrec *ir)
221 {
222     return (ddbox->nboundeddim < DIM || inputrecDynamicBox(ir));
223 }
224
225 int ddglatnr(const gmx_domdec_t *dd, int i)
226 {
227     int atnr;
228
229     if (dd == nullptr)
230     {
231         atnr = i + 1;
232     }
233     else
234     {
235         if (i >= dd->comm->atomRanges.numAtomsTotal())
236         {
237             gmx_fatal(FARGS, "glatnr called with %d, which is larger than the local number of atoms (%d)", i, dd->comm->atomRanges.numAtomsTotal());
238         }
239         atnr = dd->globalAtomIndices[i] + 1;
240     }
241
242     return atnr;
243 }
244
245 t_block *dd_charge_groups_global(gmx_domdec_t *dd)
246 {
247     return &dd->comm->cgs_gl;
248 }
249
250 void dd_store_state(gmx_domdec_t *dd, t_state *state)
251 {
252     int i;
253
254     if (state->ddp_count != dd->ddp_count)
255     {
256         gmx_incons("The MD state does not match the domain decomposition state");
257     }
258
259     state->cg_gl.resize(dd->ncg_home);
260     for (i = 0; i < dd->ncg_home; i++)
261     {
262         state->cg_gl[i] = dd->globalAtomGroupIndices[i];
263     }
264
265     state->ddp_count_cg_gl = dd->ddp_count;
266 }
267
268 gmx_domdec_zones_t *domdec_zones(gmx_domdec_t *dd)
269 {
270     return &dd->comm->zones;
271 }
272
273 void dd_get_ns_ranges(const gmx_domdec_t *dd, int icg,
274                       int *jcg0, int *jcg1, ivec shift0, ivec shift1)
275 {
276     gmx_domdec_zones_t *zones;
277     int                 izone, d, dim;
278
279     zones = &dd->comm->zones;
280
281     izone = 0;
282     while (icg >= zones->izone[izone].cg1)
283     {
284         izone++;
285     }
286
287     if (izone == 0)
288     {
289         *jcg0 = icg;
290     }
291     else if (izone < zones->nizone)
292     {
293         *jcg0 = zones->izone[izone].jcg0;
294     }
295     else
296     {
297         gmx_fatal(FARGS, "DD icg %d out of range: izone (%d) >= nizone (%d)",
298                   icg, izone, zones->nizone);
299     }
300
301     *jcg1 = zones->izone[izone].jcg1;
302
303     for (d = 0; d < dd->ndim; d++)
304     {
305         dim         = dd->dim[d];
306         shift0[dim] = zones->izone[izone].shift0[dim];
307         shift1[dim] = zones->izone[izone].shift1[dim];
308         if (dd->comm->tric_dir[dim] || (isDlbOn(dd->comm) && d > 0))
309         {
310             /* A conservative approach, this can be optimized */
311             shift0[dim] -= 1;
312             shift1[dim] += 1;
313         }
314     }
315 }
316
317 int dd_numHomeAtoms(const gmx_domdec_t &dd)
318 {
319     return dd.comm->atomRanges.numHomeAtoms();
320 }
321
322 int dd_natoms_mdatoms(const gmx_domdec_t *dd)
323 {
324     /* We currently set mdatoms entries for all atoms:
325      * local + non-local + communicated for vsite + constraints
326      */
327
328     return dd->comm->atomRanges.numAtomsTotal();
329 }
330
331 int dd_natoms_vsite(const gmx_domdec_t *dd)
332 {
333     return dd->comm->atomRanges.end(DDAtomRanges::Type::Vsites);
334 }
335
336 void dd_get_constraint_range(const gmx_domdec_t *dd, int *at_start, int *at_end)
337 {
338     *at_start = dd->comm->atomRanges.start(DDAtomRanges::Type::Constraints);
339     *at_end   = dd->comm->atomRanges.end(DDAtomRanges::Type::Constraints);
340 }
341
342 void dd_move_x(gmx_domdec_t             *dd,
343                matrix                    box,
344                gmx::ArrayRef<gmx::RVec>  x,
345                gmx_wallcycle            *wcycle)
346 {
347     wallcycle_start(wcycle, ewcMOVEX);
348
349     int                    nzone, nat_tot;
350     gmx_domdec_comm_t     *comm;
351     gmx_domdec_comm_dim_t *cd;
352     rvec                   shift = {0, 0, 0};
353     gmx_bool               bPBC, bScrew;
354
355     comm = dd->comm;
356
357     const BlockRanges &atomGroups = dd->atomGroups();
358
359     nzone   = 1;
360     nat_tot = comm->atomRanges.numHomeAtoms();
361     for (int d = 0; d < dd->ndim; d++)
362     {
363         bPBC   = (dd->ci[dd->dim[d]] == 0);
364         bScrew = (bPBC && dd->bScrewPBC && dd->dim[d] == XX);
365         if (bPBC)
366         {
367             copy_rvec(box[dd->dim[d]], shift);
368         }
369         cd = &comm->cd[d];
370         for (const gmx_domdec_ind_t &ind : cd->ind)
371         {
372             gmx::ArrayRef<const int>   index = ind.index;
373             DDBufferAccess<gmx::RVec>  sendBufferAccess(comm->rvecBuffer, ind.nsend[nzone + 1]);
374             gmx::ArrayRef<gmx::RVec>  &sendBuffer = sendBufferAccess.buffer;
375             int                        n          = 0;
376             if (!bPBC)
377             {
378                 for (int i = 0; i < ind.nsend[nzone]; i++)
379                 {
380                     int at0 = atomGroups.index[index[i]];
381                     int at1 = atomGroups.index[index[i] + 1];
382                     for (int j = at0; j < at1; j++)
383                     {
384                         sendBuffer[n] = x[j];
385                         n++;
386                     }
387                 }
388             }
389             else if (!bScrew)
390             {
391                 for (int i = 0; i < ind.nsend[nzone]; i++)
392                 {
393                     int at0 = atomGroups.index[index[i]];
394                     int at1 = atomGroups.index[index[i] + 1];
395                     for (int j = at0; j < at1; j++)
396                     {
397                         /* We need to shift the coordinates */
398                         for (int d = 0; d < DIM; d++)
399                         {
400                             sendBuffer[n][d] = x[j][d] + shift[d];
401                         }
402                         n++;
403                     }
404                 }
405             }
406             else
407             {
408                 for (int i = 0; i < ind.nsend[nzone]; i++)
409                 {
410                     int at0 = atomGroups.index[index[i]];
411                     int at1 = atomGroups.index[index[i]+1];
412                     for (int j = at0; j < at1; j++)
413                     {
414                         /* Shift x */
415                         sendBuffer[n][XX] = x[j][XX] + shift[XX];
416                         /* Rotate y and z.
417                          * This operation requires a special shift force
418                          * treatment, which is performed in calc_vir.
419                          */
420                         sendBuffer[n][YY] = box[YY][YY] - x[j][YY];
421                         sendBuffer[n][ZZ] = box[ZZ][ZZ] - x[j][ZZ];
422                         n++;
423                     }
424                 }
425             }
426
427             DDBufferAccess<gmx::RVec>  receiveBufferAccess(comm->rvecBuffer2, cd->receiveInPlace ? 0 : ind.nrecv[nzone + 1]);
428
429             gmx::ArrayRef<gmx::RVec>   receiveBuffer;
430             if (cd->receiveInPlace)
431             {
432                 receiveBuffer = gmx::arrayRefFromArray(x.data() + nat_tot, ind.nrecv[nzone + 1]);
433             }
434             else
435             {
436                 receiveBuffer = receiveBufferAccess.buffer;
437             }
438             /* Send and receive the coordinates */
439             ddSendrecv(dd, d, dddirBackward,
440                        sendBuffer, receiveBuffer);
441
442             if (!cd->receiveInPlace)
443             {
444                 int j = 0;
445                 for (int zone = 0; zone < nzone; zone++)
446                 {
447                     for (int i = ind.cell2at0[zone]; i < ind.cell2at1[zone]; i++)
448                     {
449                         x[i] = receiveBuffer[j++];
450                     }
451                 }
452             }
453             nat_tot += ind.nrecv[nzone+1];
454         }
455         nzone += nzone;
456     }
457
458     wallcycle_stop(wcycle, ewcMOVEX);
459 }
460
461 void dd_move_f(gmx_domdec_t             *dd,
462                gmx::ArrayRef<gmx::RVec>  f,
463                rvec                     *fshift,
464                gmx_wallcycle            *wcycle)
465 {
466     wallcycle_start(wcycle, ewcMOVEF);
467
468     int                    nzone, nat_tot;
469     gmx_domdec_comm_t     *comm;
470     gmx_domdec_comm_dim_t *cd;
471     ivec                   vis;
472     int                    is;
473     gmx_bool               bShiftForcesNeedPbc, bScrew;
474
475     comm = dd->comm;
476
477     const BlockRanges &atomGroups = dd->atomGroups();
478
479     nzone   = comm->zones.n/2;
480     nat_tot = comm->atomRanges.end(DDAtomRanges::Type::Zones);
481     for (int d = dd->ndim-1; d >= 0; d--)
482     {
483         /* Only forces in domains near the PBC boundaries need to
484            consider PBC in the treatment of fshift */
485         bShiftForcesNeedPbc   = (dd->ci[dd->dim[d]] == 0);
486         bScrew                = (bShiftForcesNeedPbc && dd->bScrewPBC && dd->dim[d] == XX);
487         if (fshift == nullptr && !bScrew)
488         {
489             bShiftForcesNeedPbc = FALSE;
490         }
491         /* Determine which shift vector we need */
492         clear_ivec(vis);
493         vis[dd->dim[d]] = 1;
494         is              = IVEC2IS(vis);
495
496         cd = &comm->cd[d];
497         for (int p = cd->numPulses() - 1; p >= 0; p--)
498         {
499             const gmx_domdec_ind_t    &ind  = cd->ind[p];
500             DDBufferAccess<gmx::RVec>  receiveBufferAccess(comm->rvecBuffer, ind.nsend[nzone + 1]);
501             gmx::ArrayRef<gmx::RVec>  &receiveBuffer = receiveBufferAccess.buffer;
502
503             nat_tot                        -= ind.nrecv[nzone+1];
504
505             DDBufferAccess<gmx::RVec>  sendBufferAccess(comm->rvecBuffer2, cd->receiveInPlace ? 0 : ind.nrecv[nzone + 1]);
506
507             gmx::ArrayRef<gmx::RVec>   sendBuffer;
508             if (cd->receiveInPlace)
509             {
510                 sendBuffer = gmx::arrayRefFromArray(f.data() + nat_tot, ind.nrecv[nzone + 1]);
511             }
512             else
513             {
514                 sendBuffer = sendBufferAccess.buffer;
515                 int j = 0;
516                 for (int zone = 0; zone < nzone; zone++)
517                 {
518                     for (int i = ind.cell2at0[zone]; i < ind.cell2at1[zone]; i++)
519                     {
520                         sendBuffer[j++] = f[i];
521                     }
522                 }
523             }
524             /* Communicate the forces */
525             ddSendrecv(dd, d, dddirForward,
526                        sendBuffer, receiveBuffer);
527             gmx::ArrayRef<const int> index = ind.index;
528             /* Add the received forces */
529             int                      n = 0;
530             if (!bShiftForcesNeedPbc)
531             {
532                 for (int i = 0; i < ind.nsend[nzone]; i++)
533                 {
534                     int at0 = atomGroups.index[index[i]];
535                     int at1 = atomGroups.index[index[i] + 1];
536                     for (int j = at0; j < at1; j++)
537                     {
538                         for (int d = 0; d < DIM; d++)
539                         {
540                             f[j][d] += receiveBuffer[n][d];
541                         }
542                         n++;
543                     }
544                 }
545             }
546             else if (!bScrew)
547             {
548                 /* fshift should always be defined if this function is
549                  * called when bShiftForcesNeedPbc is true */
550                 assert(NULL != fshift);
551                 for (int i = 0; i < ind.nsend[nzone]; i++)
552                 {
553                     int at0 = atomGroups.index[index[i]];
554                     int at1 = atomGroups.index[index[i] + 1];
555                     for (int j = at0; j < at1; j++)
556                     {
557                         for (int d = 0; d < DIM; d++)
558                         {
559                             f[j][d] += receiveBuffer[n][d];
560                         }
561                         /* Add this force to the shift force */
562                         for (int d = 0; d < DIM; d++)
563                         {
564                             fshift[is][d] += receiveBuffer[n][d];
565                         }
566                         n++;
567                     }
568                 }
569             }
570             else
571             {
572                 for (int i = 0; i < ind.nsend[nzone]; i++)
573                 {
574                     int at0 = atomGroups.index[index[i]];
575                     int at1 = atomGroups.index[index[i] + 1];
576                     for (int j = at0; j < at1; j++)
577                     {
578                         /* Rotate the force */
579                         f[j][XX] += receiveBuffer[n][XX];
580                         f[j][YY] -= receiveBuffer[n][YY];
581                         f[j][ZZ] -= receiveBuffer[n][ZZ];
582                         if (fshift)
583                         {
584                             /* Add this force to the shift force */
585                             for (int d = 0; d < DIM; d++)
586                             {
587                                 fshift[is][d] += receiveBuffer[n][d];
588                             }
589                         }
590                         n++;
591                     }
592                 }
593             }
594         }
595         nzone /= 2;
596     }
597     wallcycle_stop(wcycle, ewcMOVEF);
598 }
599
600 /* Convenience function for extracting a real buffer from an rvec buffer
601  *
602  * To reduce the number of temporary communication buffers and avoid
603  * cache polution, we reuse gmx::RVec buffers for storing reals.
604  * This functions return a real buffer reference with the same number
605  * of elements as the gmx::RVec buffer (so 1/3 of the size in bytes).
606  */
607 static gmx::ArrayRef<real>
608 realArrayRefFromRvecArrayRef(gmx::ArrayRef<gmx::RVec> arrayRef)
609 {
610     return gmx::arrayRefFromArray(as_rvec_array(arrayRef.data())[0],
611                                   arrayRef.size());
612 }
613
614 void dd_atom_spread_real(gmx_domdec_t *dd, real v[])
615 {
616     int                    nzone, nat_tot;
617     gmx_domdec_comm_t     *comm;
618     gmx_domdec_comm_dim_t *cd;
619
620     comm = dd->comm;
621
622     const BlockRanges &atomGroups = dd->atomGroups();
623
624     nzone   = 1;
625     nat_tot = comm->atomRanges.numHomeAtoms();
626     for (int d = 0; d < dd->ndim; d++)
627     {
628         cd = &comm->cd[d];
629         for (const gmx_domdec_ind_t &ind : cd->ind)
630         {
631             gmx::ArrayRef<const int>  index = ind.index;
632
633             /* Note: We provision for RVec instead of real, so a factor of 3
634              * more than needed. The buffer actually already has this size
635              * and we pass a plain pointer below, so this does not matter.
636              */
637             DDBufferAccess<gmx::RVec> sendBufferAccess(comm->rvecBuffer, ind.nsend[nzone + 1]);
638             gmx::ArrayRef<real>       sendBuffer = realArrayRefFromRvecArrayRef(sendBufferAccess.buffer);
639             int                       n          = 0;
640             for (int i = 0; i < ind.nsend[nzone]; i++)
641             {
642                 int at0 = atomGroups.index[index[i]];
643                 int at1 = atomGroups.index[index[i] + 1];
644                 for (int j = at0; j < at1; j++)
645                 {
646                     sendBuffer[n++] = v[j];
647                 }
648             }
649
650             DDBufferAccess<gmx::RVec> receiveBufferAccess(comm->rvecBuffer2, cd->receiveInPlace ? 0 : ind.nrecv[nzone + 1]);
651
652             gmx::ArrayRef<real>       receiveBuffer;
653             if (cd->receiveInPlace)
654             {
655                 receiveBuffer = gmx::arrayRefFromArray(v + nat_tot, ind.nrecv[nzone + 1]);
656             }
657             else
658             {
659                 receiveBuffer = realArrayRefFromRvecArrayRef(receiveBufferAccess.buffer);
660             }
661             /* Send and receive the data */
662             ddSendrecv(dd, d, dddirBackward,
663                        sendBuffer, receiveBuffer);
664             if (!cd->receiveInPlace)
665             {
666                 int j = 0;
667                 for (int zone = 0; zone < nzone; zone++)
668                 {
669                     for (int i = ind.cell2at0[zone]; i < ind.cell2at1[zone]; i++)
670                     {
671                         v[i] = receiveBuffer[j++];
672                     }
673                 }
674             }
675             nat_tot += ind.nrecv[nzone+1];
676         }
677         nzone += nzone;
678     }
679 }
680
681 void dd_atom_sum_real(gmx_domdec_t *dd, real v[])
682 {
683     int                    nzone, nat_tot;
684     gmx_domdec_comm_t     *comm;
685     gmx_domdec_comm_dim_t *cd;
686
687     comm = dd->comm;
688
689     const gmx::BlockRanges &atomGroups = dd->atomGroups();
690
691     nzone   = comm->zones.n/2;
692     nat_tot = comm->atomRanges.end(DDAtomRanges::Type::Zones);
693     for (int d = dd->ndim-1; d >= 0; d--)
694     {
695         cd = &comm->cd[d];
696         for (int p = cd->numPulses() - 1; p >= 0; p--)
697         {
698             const gmx_domdec_ind_t &ind = cd->ind[p];
699
700             /* Note: We provision for RVec instead of real, so a factor of 3
701              * more than needed. The buffer actually already has this size
702              * and we typecast, so this works as intended.
703              */
704             DDBufferAccess<gmx::RVec> receiveBufferAccess(comm->rvecBuffer, ind.nsend[nzone + 1]);
705             gmx::ArrayRef<real>       receiveBuffer = realArrayRefFromRvecArrayRef(receiveBufferAccess.buffer);
706             nat_tot -= ind.nrecv[nzone + 1];
707
708             DDBufferAccess<gmx::RVec> sendBufferAccess(comm->rvecBuffer2, cd->receiveInPlace ? 0 : ind.nrecv[nzone + 1]);
709
710             gmx::ArrayRef<real>       sendBuffer;
711             if (cd->receiveInPlace)
712             {
713                 sendBuffer = gmx::arrayRefFromArray(v + nat_tot, ind.nrecv[nzone + 1]);
714             }
715             else
716             {
717                 sendBuffer = realArrayRefFromRvecArrayRef(sendBufferAccess.buffer);
718                 int j = 0;
719                 for (int zone = 0; zone < nzone; zone++)
720                 {
721                     for (int i = ind.cell2at0[zone]; i < ind.cell2at1[zone]; i++)
722                     {
723                         sendBuffer[j++] = v[i];
724                     }
725                 }
726             }
727             /* Communicate the forces */
728             ddSendrecv(dd, d, dddirForward,
729                        sendBuffer, receiveBuffer);
730             gmx::ArrayRef<const int> index = ind.index;
731             /* Add the received forces */
732             int                      n = 0;
733             for (int i = 0; i < ind.nsend[nzone]; i++)
734             {
735                 int at0 = atomGroups.index[index[i]];
736                 int at1 = atomGroups.index[index[i] + 1];
737                 for (int j = at0; j < at1; j++)
738                 {
739                     v[j] += receiveBuffer[n];
740                     n++;
741                 }
742             }
743         }
744         nzone /= 2;
745     }
746 }
747
748 static void print_ddzone(FILE *fp, int d, int i, int j, gmx_ddzone_t *zone)
749 {
750     fprintf(fp, "zone d0 %d d1 %d d2 %d  min0 %6.3f max1 %6.3f mch0 %6.3f mch1 %6.3f p1_0 %6.3f p1_1 %6.3f\n",
751             d, i, j,
752             zone->min0, zone->max1,
753             zone->mch0, zone->mch0,
754             zone->p1_0, zone->p1_1);
755 }
756
757
758 #define DDZONECOMM_MAXZONE  5
759 #define DDZONECOMM_BUFSIZE  3
760
761 static void dd_sendrecv_ddzone(const gmx_domdec_t *dd,
762                                int ddimind, int direction,
763                                gmx_ddzone_t *buf_s, int n_s,
764                                gmx_ddzone_t *buf_r, int n_r)
765 {
766 #define ZBS  DDZONECOMM_BUFSIZE
767     rvec vbuf_s[DDZONECOMM_MAXZONE*ZBS];
768     rvec vbuf_r[DDZONECOMM_MAXZONE*ZBS];
769     int  i;
770
771     for (i = 0; i < n_s; i++)
772     {
773         vbuf_s[i*ZBS  ][0] = buf_s[i].min0;
774         vbuf_s[i*ZBS  ][1] = buf_s[i].max1;
775         vbuf_s[i*ZBS  ][2] = buf_s[i].min1;
776         vbuf_s[i*ZBS+1][0] = buf_s[i].mch0;
777         vbuf_s[i*ZBS+1][1] = buf_s[i].mch1;
778         vbuf_s[i*ZBS+1][2] = 0;
779         vbuf_s[i*ZBS+2][0] = buf_s[i].p1_0;
780         vbuf_s[i*ZBS+2][1] = buf_s[i].p1_1;
781         vbuf_s[i*ZBS+2][2] = 0;
782     }
783
784     ddSendrecv(dd, ddimind, direction,
785                vbuf_s, n_s*ZBS,
786                vbuf_r, n_r*ZBS);
787
788     for (i = 0; i < n_r; i++)
789     {
790         buf_r[i].min0 = vbuf_r[i*ZBS  ][0];
791         buf_r[i].max1 = vbuf_r[i*ZBS  ][1];
792         buf_r[i].min1 = vbuf_r[i*ZBS  ][2];
793         buf_r[i].mch0 = vbuf_r[i*ZBS+1][0];
794         buf_r[i].mch1 = vbuf_r[i*ZBS+1][1];
795         buf_r[i].p1_0 = vbuf_r[i*ZBS+2][0];
796         buf_r[i].p1_1 = vbuf_r[i*ZBS+2][1];
797     }
798
799 #undef ZBS
800 }
801
802 static void dd_move_cellx(gmx_domdec_t *dd, gmx_ddbox_t *ddbox,
803                           rvec cell_ns_x0, rvec cell_ns_x1)
804 {
805     int                d, d1, dim, pos, buf_size, i, j, p, npulse, npulse_min;
806     gmx_ddzone_t      *zp;
807     gmx_ddzone_t       buf_s[DDZONECOMM_MAXZONE];
808     gmx_ddzone_t       buf_r[DDZONECOMM_MAXZONE];
809     gmx_ddzone_t       buf_e[DDZONECOMM_MAXZONE];
810     rvec               extr_s[2], extr_r[2];
811     rvec               dh;
812     real               dist_d, c = 0, det;
813     gmx_domdec_comm_t *comm;
814     gmx_bool           bPBC, bUse;
815
816     comm = dd->comm;
817
818     for (d = 1; d < dd->ndim; d++)
819     {
820         dim      = dd->dim[d];
821         zp       = (d == 1) ? &comm->zone_d1[0] : &comm->zone_d2[0][0];
822         zp->min0 = cell_ns_x0[dim];
823         zp->max1 = cell_ns_x1[dim];
824         zp->min1 = cell_ns_x1[dim];
825         zp->mch0 = cell_ns_x0[dim];
826         zp->mch1 = cell_ns_x1[dim];
827         zp->p1_0 = cell_ns_x0[dim];
828         zp->p1_1 = cell_ns_x1[dim];
829     }
830
831     for (d = dd->ndim-2; d >= 0; d--)
832     {
833         dim  = dd->dim[d];
834         bPBC = (dim < ddbox->npbcdim);
835
836         /* Use an rvec to store two reals */
837         extr_s[d][0] = comm->cell_f0[d+1];
838         extr_s[d][1] = comm->cell_f1[d+1];
839         extr_s[d][2] = comm->cell_f1[d+1];
840
841         pos = 0;
842         /* Store the extremes in the backward sending buffer,
843          * so the get updated separately from the forward communication.
844          */
845         for (d1 = d; d1 < dd->ndim-1; d1++)
846         {
847             /* We invert the order to be able to use the same loop for buf_e */
848             buf_s[pos].min0 = extr_s[d1][1];
849             buf_s[pos].max1 = extr_s[d1][0];
850             buf_s[pos].min1 = extr_s[d1][2];
851             buf_s[pos].mch0 = 0;
852             buf_s[pos].mch1 = 0;
853             /* Store the cell corner of the dimension we communicate along */
854             buf_s[pos].p1_0 = comm->cell_x0[dim];
855             buf_s[pos].p1_1 = 0;
856             pos++;
857         }
858
859         buf_s[pos] = (dd->ndim == 2) ? comm->zone_d1[0] : comm->zone_d2[0][0];
860         pos++;
861
862         if (dd->ndim == 3 && d == 0)
863         {
864             buf_s[pos] = comm->zone_d2[0][1];
865             pos++;
866             buf_s[pos] = comm->zone_d1[0];
867             pos++;
868         }
869
870         /* We only need to communicate the extremes
871          * in the forward direction
872          */
873         npulse = comm->cd[d].numPulses();
874         if (bPBC)
875         {
876             /* Take the minimum to avoid double communication */
877             npulse_min = std::min(npulse, dd->nc[dim]-1-npulse);
878         }
879         else
880         {
881             /* Without PBC we should really not communicate over
882              * the boundaries, but implementing that complicates
883              * the communication setup and therefore we simply
884              * do all communication, but ignore some data.
885              */
886             npulse_min = npulse;
887         }
888         for (p = 0; p < npulse_min; p++)
889         {
890             /* Communicate the extremes forward */
891             bUse = (bPBC || dd->ci[dim] > 0);
892
893             ddSendrecv(dd, d, dddirForward,
894                        extr_s+d, dd->ndim-d-1,
895                        extr_r+d, dd->ndim-d-1);
896
897             if (bUse)
898             {
899                 for (d1 = d; d1 < dd->ndim-1; d1++)
900                 {
901                     extr_s[d1][0] = std::max(extr_s[d1][0], extr_r[d1][0]);
902                     extr_s[d1][1] = std::min(extr_s[d1][1], extr_r[d1][1]);
903                     extr_s[d1][2] = std::min(extr_s[d1][2], extr_r[d1][2]);
904                 }
905             }
906         }
907
908         buf_size = pos;
909         for (p = 0; p < npulse; p++)
910         {
911             /* Communicate all the zone information backward */
912             bUse = (bPBC || dd->ci[dim] < dd->nc[dim] - 1);
913
914             dd_sendrecv_ddzone(dd, d, dddirBackward,
915                                buf_s, buf_size,
916                                buf_r, buf_size);
917
918             clear_rvec(dh);
919             if (p > 0)
920             {
921                 for (d1 = d+1; d1 < dd->ndim; d1++)
922                 {
923                     /* Determine the decrease of maximum required
924                      * communication height along d1 due to the distance along d,
925                      * this avoids a lot of useless atom communication.
926                      */
927                     dist_d = comm->cell_x1[dim] - buf_r[0].p1_0;
928
929                     if (ddbox->tric_dir[dim])
930                     {
931                         /* c is the off-diagonal coupling between the cell planes
932                          * along directions d and d1.
933                          */
934                         c = ddbox->v[dim][dd->dim[d1]][dim];
935                     }
936                     else
937                     {
938                         c = 0;
939                     }
940                     det = (1 + c*c)*comm->cutoff*comm->cutoff - dist_d*dist_d;
941                     if (det > 0)
942                     {
943                         dh[d1] = comm->cutoff - (c*dist_d + std::sqrt(det))/(1 + c*c);
944                     }
945                     else
946                     {
947                         /* A negative value signals out of range */
948                         dh[d1] = -1;
949                     }
950                 }
951             }
952
953             /* Accumulate the extremes over all pulses */
954             for (i = 0; i < buf_size; i++)
955             {
956                 if (p == 0)
957                 {
958                     buf_e[i] = buf_r[i];
959                 }
960                 else
961                 {
962                     if (bUse)
963                     {
964                         buf_e[i].min0 = std::min(buf_e[i].min0, buf_r[i].min0);
965                         buf_e[i].max1 = std::max(buf_e[i].max1, buf_r[i].max1);
966                         buf_e[i].min1 = std::min(buf_e[i].min1, buf_r[i].min1);
967                     }
968
969                     if (dd->ndim == 3 && d == 0 && i == buf_size - 1)
970                     {
971                         d1 = 1;
972                     }
973                     else
974                     {
975                         d1 = d + 1;
976                     }
977                     if (bUse && dh[d1] >= 0)
978                     {
979                         buf_e[i].mch0 = std::max(buf_e[i].mch0, buf_r[i].mch0-dh[d1]);
980                         buf_e[i].mch1 = std::max(buf_e[i].mch1, buf_r[i].mch1-dh[d1]);
981                     }
982                 }
983                 /* Copy the received buffer to the send buffer,
984                  * to pass the data through with the next pulse.
985                  */
986                 buf_s[i] = buf_r[i];
987             }
988             if (((bPBC || dd->ci[dim]+npulse < dd->nc[dim]) && p == npulse-1) ||
989                 (!bPBC && dd->ci[dim]+1+p == dd->nc[dim]-1))
990             {
991                 /* Store the extremes */
992                 pos = 0;
993
994                 for (d1 = d; d1 < dd->ndim-1; d1++)
995                 {
996                     extr_s[d1][1] = std::min(extr_s[d1][1], buf_e[pos].min0);
997                     extr_s[d1][0] = std::max(extr_s[d1][0], buf_e[pos].max1);
998                     extr_s[d1][2] = std::min(extr_s[d1][2], buf_e[pos].min1);
999                     pos++;
1000                 }
1001
1002                 if (d == 1 || (d == 0 && dd->ndim == 3))
1003                 {
1004                     for (i = d; i < 2; i++)
1005                     {
1006                         comm->zone_d2[1-d][i] = buf_e[pos];
1007                         pos++;
1008                     }
1009                 }
1010                 if (d == 0)
1011                 {
1012                     comm->zone_d1[1] = buf_e[pos];
1013                     pos++;
1014                 }
1015             }
1016         }
1017     }
1018
1019     if (dd->ndim >= 2)
1020     {
1021         dim = dd->dim[1];
1022         for (i = 0; i < 2; i++)
1023         {
1024             if (debug)
1025             {
1026                 print_ddzone(debug, 1, i, 0, &comm->zone_d1[i]);
1027             }
1028             cell_ns_x0[dim] = std::min(cell_ns_x0[dim], comm->zone_d1[i].min0);
1029             cell_ns_x1[dim] = std::max(cell_ns_x1[dim], comm->zone_d1[i].max1);
1030         }
1031     }
1032     if (dd->ndim >= 3)
1033     {
1034         dim = dd->dim[2];
1035         for (i = 0; i < 2; i++)
1036         {
1037             for (j = 0; j < 2; j++)
1038             {
1039                 if (debug)
1040                 {
1041                     print_ddzone(debug, 2, i, j, &comm->zone_d2[i][j]);
1042                 }
1043                 cell_ns_x0[dim] = std::min(cell_ns_x0[dim], comm->zone_d2[i][j].min0);
1044                 cell_ns_x1[dim] = std::max(cell_ns_x1[dim], comm->zone_d2[i][j].max1);
1045             }
1046         }
1047     }
1048     for (d = 1; d < dd->ndim; d++)
1049     {
1050         comm->cell_f_max0[d] = extr_s[d-1][0];
1051         comm->cell_f_min1[d] = extr_s[d-1][1];
1052         if (debug)
1053         {
1054             fprintf(debug, "Cell fraction d %d, max0 %f, min1 %f\n",
1055                     d, comm->cell_f_max0[d], comm->cell_f_min1[d]);
1056         }
1057     }
1058 }
1059
1060 static void write_dd_grid_pdb(const char *fn, gmx_int64_t step,
1061                               gmx_domdec_t *dd, matrix box, gmx_ddbox_t *ddbox)
1062 {
1063     rvec   grid_s[2], *grid_r = nullptr, cx, r;
1064     char   fname[STRLEN], buf[22];
1065     FILE  *out;
1066     int    a, i, d, z, y, x;
1067     matrix tric;
1068     real   vol;
1069
1070     copy_rvec(dd->comm->cell_x0, grid_s[0]);
1071     copy_rvec(dd->comm->cell_x1, grid_s[1]);
1072
1073     if (DDMASTER(dd))
1074     {
1075         snew(grid_r, 2*dd->nnodes);
1076     }
1077
1078     dd_gather(dd, 2*sizeof(rvec), grid_s, DDMASTER(dd) ? grid_r : nullptr);
1079
1080     if (DDMASTER(dd))
1081     {
1082         for (d = 0; d < DIM; d++)
1083         {
1084             for (i = 0; i < DIM; i++)
1085             {
1086                 if (d == i)
1087                 {
1088                     tric[d][i] = 1;
1089                 }
1090                 else
1091                 {
1092                     if (d < ddbox->npbcdim && dd->nc[d] > 1)
1093                     {
1094                         tric[d][i] = box[i][d]/box[i][i];
1095                     }
1096                     else
1097                     {
1098                         tric[d][i] = 0;
1099                     }
1100                 }
1101             }
1102         }
1103         sprintf(fname, "%s_%s.pdb", fn, gmx_step_str(step, buf));
1104         out = gmx_fio_fopen(fname, "w");
1105         gmx_write_pdb_box(out, dd->bScrewPBC ? epbcSCREW : epbcXYZ, box);
1106         a = 1;
1107         for (i = 0; i < dd->nnodes; i++)
1108         {
1109             vol = dd->nnodes/(box[XX][XX]*box[YY][YY]*box[ZZ][ZZ]);
1110             for (d = 0; d < DIM; d++)
1111             {
1112                 vol *= grid_r[i*2+1][d] - grid_r[i*2][d];
1113             }
1114             for (z = 0; z < 2; z++)
1115             {
1116                 for (y = 0; y < 2; y++)
1117                 {
1118                     for (x = 0; x < 2; x++)
1119                     {
1120                         cx[XX] = grid_r[i*2+x][XX];
1121                         cx[YY] = grid_r[i*2+y][YY];
1122                         cx[ZZ] = grid_r[i*2+z][ZZ];
1123                         mvmul(tric, cx, r);
1124                         gmx_fprintf_pdb_atomline(out, epdbATOM, a++, "CA", ' ', "GLY", ' ', i+1, ' ',
1125                                                  10*r[XX], 10*r[YY], 10*r[ZZ], 1.0, vol, "");
1126                     }
1127                 }
1128             }
1129             for (d = 0; d < DIM; d++)
1130             {
1131                 for (x = 0; x < 4; x++)
1132                 {
1133                     switch (d)
1134                     {
1135                         case 0: y = 1 + i*8 + 2*x; break;
1136                         case 1: y = 1 + i*8 + 2*x - (x % 2); break;
1137                         case 2: y = 1 + i*8 + x; break;
1138                     }
1139                     fprintf(out, "%6s%5d%5d\n", "CONECT", y, y+(1<<d));
1140                 }
1141             }
1142         }
1143         gmx_fio_fclose(out);
1144         sfree(grid_r);
1145     }
1146 }
1147
1148 void write_dd_pdb(const char *fn, gmx_int64_t step, const char *title,
1149                   const gmx_mtop_t *mtop, const t_commrec *cr,
1150                   int natoms, const rvec x[], const matrix box)
1151 {
1152     char          fname[STRLEN], buf[22];
1153     FILE         *out;
1154     int           resnr;
1155     const char   *atomname, *resname;
1156     gmx_domdec_t *dd;
1157
1158     dd = cr->dd;
1159     if (natoms == -1)
1160     {
1161         natoms = dd->comm->atomRanges.end(DDAtomRanges::Type::Vsites);
1162     }
1163
1164     sprintf(fname, "%s_%s_n%d.pdb", fn, gmx_step_str(step, buf), cr->sim_nodeid);
1165
1166     out = gmx_fio_fopen(fname, "w");
1167
1168     fprintf(out, "TITLE     %s\n", title);
1169     gmx_write_pdb_box(out, dd->bScrewPBC ? epbcSCREW : epbcXYZ, box);
1170     int molb = 0;
1171     for (int i = 0; i < natoms; i++)
1172     {
1173         int  ii = dd->globalAtomIndices[i];
1174         mtopGetAtomAndResidueName(mtop, ii, &molb, &atomname, &resnr, &resname, nullptr);
1175         int  c;
1176         real b;
1177         if (i < dd->comm->atomRanges.end(DDAtomRanges::Type::Zones))
1178         {
1179             c = 0;
1180             while (i >= dd->atomGroups().index[dd->comm->zones.cg_range[c+1]])
1181             {
1182                 c++;
1183             }
1184             b = c;
1185         }
1186         else if (i < dd->comm->atomRanges.end(DDAtomRanges::Type::Vsites))
1187         {
1188             b = dd->comm->zones.n;
1189         }
1190         else
1191         {
1192             b = dd->comm->zones.n + 1;
1193         }
1194         gmx_fprintf_pdb_atomline(out, epdbATOM, ii+1, atomname, ' ', resname, ' ', resnr, ' ',
1195                                  10*x[i][XX], 10*x[i][YY], 10*x[i][ZZ], 1.0, b, "");
1196     }
1197     fprintf(out, "TER\n");
1198
1199     gmx_fio_fclose(out);
1200 }
1201
1202 real dd_cutoff_multibody(const gmx_domdec_t *dd)
1203 {
1204     gmx_domdec_comm_t *comm;
1205     int                di;
1206     real               r;
1207
1208     comm = dd->comm;
1209
1210     r = -1;
1211     if (comm->bInterCGBondeds)
1212     {
1213         if (comm->cutoff_mbody > 0)
1214         {
1215             r = comm->cutoff_mbody;
1216         }
1217         else
1218         {
1219             /* cutoff_mbody=0 means we do not have DLB */
1220             r = comm->cellsize_min[dd->dim[0]];
1221             for (di = 1; di < dd->ndim; di++)
1222             {
1223                 r = std::min(r, comm->cellsize_min[dd->dim[di]]);
1224             }
1225             if (comm->bBondComm)
1226             {
1227                 r = std::max(r, comm->cutoff_mbody);
1228             }
1229             else
1230             {
1231                 r = std::min(r, comm->cutoff);
1232             }
1233         }
1234     }
1235
1236     return r;
1237 }
1238
1239 real dd_cutoff_twobody(const gmx_domdec_t *dd)
1240 {
1241     real r_mb;
1242
1243     r_mb = dd_cutoff_multibody(dd);
1244
1245     return std::max(dd->comm->cutoff, r_mb);
1246 }
1247
1248
1249 static void dd_cart_coord2pmecoord(const gmx_domdec_t *dd, const ivec coord,
1250                                    ivec coord_pme)
1251 {
1252     int nc, ntot;
1253
1254     nc   = dd->nc[dd->comm->cartpmedim];
1255     ntot = dd->comm->ntot[dd->comm->cartpmedim];
1256     copy_ivec(coord, coord_pme);
1257     coord_pme[dd->comm->cartpmedim] =
1258         nc + (coord[dd->comm->cartpmedim]*(ntot - nc) + (ntot - nc)/2)/nc;
1259 }
1260
1261 static int ddindex2pmeindex(const gmx_domdec_t *dd, int ddindex)
1262 {
1263     int npp, npme;
1264
1265     npp  = dd->nnodes;
1266     npme = dd->comm->npmenodes;
1267
1268     /* Here we assign a PME node to communicate with this DD node
1269      * by assuming that the major index of both is x.
1270      * We add cr->npmenodes/2 to obtain an even distribution.
1271      */
1272     return (ddindex*npme + npme/2)/npp;
1273 }
1274
1275 static int *dd_interleaved_pme_ranks(const gmx_domdec_t *dd)
1276 {
1277     int *pme_rank;
1278     int  n, i, p0, p1;
1279
1280     snew(pme_rank, dd->comm->npmenodes);
1281     n = 0;
1282     for (i = 0; i < dd->nnodes; i++)
1283     {
1284         p0 = ddindex2pmeindex(dd, i);
1285         p1 = ddindex2pmeindex(dd, i+1);
1286         if (i+1 == dd->nnodes || p1 > p0)
1287         {
1288             if (debug)
1289             {
1290                 fprintf(debug, "pme_rank[%d] = %d\n", n, i+1+n);
1291             }
1292             pme_rank[n] = i + 1 + n;
1293             n++;
1294         }
1295     }
1296
1297     return pme_rank;
1298 }
1299
1300 static int gmx_ddcoord2pmeindex(const t_commrec *cr, int x, int y, int z)
1301 {
1302     gmx_domdec_t *dd;
1303     ivec          coords;
1304     int           slab;
1305
1306     dd = cr->dd;
1307     /*
1308        if (dd->comm->bCartesian) {
1309        gmx_ddindex2xyz(dd->nc,ddindex,coords);
1310        dd_coords2pmecoords(dd,coords,coords_pme);
1311        copy_ivec(dd->ntot,nc);
1312        nc[dd->cartpmedim]         -= dd->nc[dd->cartpmedim];
1313        coords_pme[dd->cartpmedim] -= dd->nc[dd->cartpmedim];
1314
1315        slab = (coords_pme[XX]*nc[YY] + coords_pme[YY])*nc[ZZ] + coords_pme[ZZ];
1316        } else {
1317        slab = (ddindex*cr->npmenodes + cr->npmenodes/2)/dd->nnodes;
1318        }
1319      */
1320     coords[XX] = x;
1321     coords[YY] = y;
1322     coords[ZZ] = z;
1323     slab       = ddindex2pmeindex(dd, dd_index(dd->nc, coords));
1324
1325     return slab;
1326 }
1327
1328 static int ddcoord2simnodeid(const t_commrec *cr, int x, int y, int z)
1329 {
1330     gmx_domdec_comm_t *comm;
1331     ivec               coords;
1332     int                ddindex, nodeid = -1;
1333
1334     comm = cr->dd->comm;
1335
1336     coords[XX] = x;
1337     coords[YY] = y;
1338     coords[ZZ] = z;
1339     if (comm->bCartesianPP_PME)
1340     {
1341 #if GMX_MPI
1342         MPI_Cart_rank(cr->mpi_comm_mysim, coords, &nodeid);
1343 #endif
1344     }
1345     else
1346     {
1347         ddindex = dd_index(cr->dd->nc, coords);
1348         if (comm->bCartesianPP)
1349         {
1350             nodeid = comm->ddindex2simnodeid[ddindex];
1351         }
1352         else
1353         {
1354             if (comm->pmenodes)
1355             {
1356                 nodeid = ddindex + gmx_ddcoord2pmeindex(cr, x, y, z);
1357             }
1358             else
1359             {
1360                 nodeid = ddindex;
1361             }
1362         }
1363     }
1364
1365     return nodeid;
1366 }
1367
1368 static int dd_simnode2pmenode(const gmx_domdec_t         *dd,
1369                               const t_commrec gmx_unused *cr,
1370                               int                         sim_nodeid)
1371 {
1372     int pmenode = -1;
1373
1374     const gmx_domdec_comm_t *comm = dd->comm;
1375
1376     /* This assumes a uniform x domain decomposition grid cell size */
1377     if (comm->bCartesianPP_PME)
1378     {
1379 #if GMX_MPI
1380         ivec coord, coord_pme;
1381         MPI_Cart_coords(cr->mpi_comm_mysim, sim_nodeid, DIM, coord);
1382         if (coord[comm->cartpmedim] < dd->nc[comm->cartpmedim])
1383         {
1384             /* This is a PP node */
1385             dd_cart_coord2pmecoord(dd, coord, coord_pme);
1386             MPI_Cart_rank(cr->mpi_comm_mysim, coord_pme, &pmenode);
1387         }
1388 #endif
1389     }
1390     else if (comm->bCartesianPP)
1391     {
1392         if (sim_nodeid < dd->nnodes)
1393         {
1394             pmenode = dd->nnodes + ddindex2pmeindex(dd, sim_nodeid);
1395         }
1396     }
1397     else
1398     {
1399         /* This assumes DD cells with identical x coordinates
1400          * are numbered sequentially.
1401          */
1402         if (dd->comm->pmenodes == nullptr)
1403         {
1404             if (sim_nodeid < dd->nnodes)
1405             {
1406                 /* The DD index equals the nodeid */
1407                 pmenode = dd->nnodes + ddindex2pmeindex(dd, sim_nodeid);
1408             }
1409         }
1410         else
1411         {
1412             int i = 0;
1413             while (sim_nodeid > dd->comm->pmenodes[i])
1414             {
1415                 i++;
1416             }
1417             if (sim_nodeid < dd->comm->pmenodes[i])
1418             {
1419                 pmenode = dd->comm->pmenodes[i];
1420             }
1421         }
1422     }
1423
1424     return pmenode;
1425 }
1426
1427 NumPmeDomains getNumPmeDomains(const gmx_domdec_t *dd)
1428 {
1429     if (dd != nullptr)
1430     {
1431         return {
1432                    dd->comm->npmenodes_x, dd->comm->npmenodes_y
1433         };
1434     }
1435     else
1436     {
1437         return {
1438                    1, 1
1439         };
1440     }
1441 }
1442
1443 std::vector<int> get_pme_ddranks(const t_commrec *cr, int pmenodeid)
1444 {
1445     gmx_domdec_t *dd;
1446     int           x, y, z;
1447     ivec          coord, coord_pme;
1448
1449     dd = cr->dd;
1450
1451     std::vector<int> ddranks;
1452     ddranks.reserve((dd->nnodes+cr->npmenodes-1)/cr->npmenodes);
1453
1454     for (x = 0; x < dd->nc[XX]; x++)
1455     {
1456         for (y = 0; y < dd->nc[YY]; y++)
1457         {
1458             for (z = 0; z < dd->nc[ZZ]; z++)
1459             {
1460                 if (dd->comm->bCartesianPP_PME)
1461                 {
1462                     coord[XX] = x;
1463                     coord[YY] = y;
1464                     coord[ZZ] = z;
1465                     dd_cart_coord2pmecoord(dd, coord, coord_pme);
1466                     if (dd->ci[XX] == coord_pme[XX] &&
1467                         dd->ci[YY] == coord_pme[YY] &&
1468                         dd->ci[ZZ] == coord_pme[ZZ])
1469                     {
1470                         ddranks.push_back(ddcoord2simnodeid(cr, x, y, z));
1471                     }
1472                 }
1473                 else
1474                 {
1475                     /* The slab corresponds to the nodeid in the PME group */
1476                     if (gmx_ddcoord2pmeindex(cr, x, y, z) == pmenodeid)
1477                     {
1478                         ddranks.push_back(ddcoord2simnodeid(cr, x, y, z));
1479                     }
1480                 }
1481             }
1482         }
1483     }
1484     return ddranks;
1485 }
1486
1487 static gmx_bool receive_vir_ener(const gmx_domdec_t *dd, const t_commrec *cr)
1488 {
1489     gmx_bool bReceive = TRUE;
1490
1491     if (cr->npmenodes < dd->nnodes)
1492     {
1493         gmx_domdec_comm_t *comm = dd->comm;
1494         if (comm->bCartesianPP_PME)
1495         {
1496 #if GMX_MPI
1497             int  pmenode = dd_simnode2pmenode(dd, cr, cr->sim_nodeid);
1498             ivec coords;
1499             MPI_Cart_coords(cr->mpi_comm_mysim, cr->sim_nodeid, DIM, coords);
1500             coords[comm->cartpmedim]++;
1501             if (coords[comm->cartpmedim] < dd->nc[comm->cartpmedim])
1502             {
1503                 int rank;
1504                 MPI_Cart_rank(cr->mpi_comm_mysim, coords, &rank);
1505                 if (dd_simnode2pmenode(dd, cr, rank) == pmenode)
1506                 {
1507                     /* This is not the last PP node for pmenode */
1508                     bReceive = FALSE;
1509                 }
1510             }
1511 #else
1512             GMX_RELEASE_ASSERT(false, "Without MPI we should not have Cartesian PP-PME with #PMEnodes < #DDnodes");
1513 #endif
1514         }
1515         else
1516         {
1517             int pmenode = dd_simnode2pmenode(dd, cr, cr->sim_nodeid);
1518             if (cr->sim_nodeid+1 < cr->nnodes &&
1519                 dd_simnode2pmenode(dd, cr, cr->sim_nodeid+1) == pmenode)
1520             {
1521                 /* This is not the last PP node for pmenode */
1522                 bReceive = FALSE;
1523             }
1524         }
1525     }
1526
1527     return bReceive;
1528 }
1529
1530 static void set_zones_ncg_home(gmx_domdec_t *dd)
1531 {
1532     gmx_domdec_zones_t *zones;
1533     int                 i;
1534
1535     zones = &dd->comm->zones;
1536
1537     zones->cg_range[0] = 0;
1538     for (i = 1; i < zones->n+1; i++)
1539     {
1540         zones->cg_range[i] = dd->ncg_home;
1541     }
1542     /* zone_ncg1[0] should always be equal to ncg_home */
1543     dd->comm->zone_ncg1[0] = dd->ncg_home;
1544 }
1545
1546 static void restoreAtomGroups(gmx_domdec_t *dd,
1547                               const int *gcgs_index, const t_state *state)
1548 {
1549     gmx::ArrayRef<const int>  atomGroupsState        = state->cg_gl;
1550
1551     std::vector<int>         &globalAtomGroupIndices = dd->globalAtomGroupIndices;
1552     gmx::BlockRanges         &atomGroups             = dd->atomGroups_;
1553
1554     globalAtomGroupIndices.resize(atomGroupsState.size());
1555     atomGroups.index.resize(atomGroupsState.size() + 1);
1556
1557     /* Copy back the global charge group indices from state
1558      * and rebuild the local charge group to atom index.
1559      */
1560     int atomIndex = 0;
1561     for (unsigned int i = 0; i < atomGroupsState.size(); i++)
1562     {
1563         atomGroups.index[i]        = atomIndex;
1564         const int atomGroupGlobal  = atomGroupsState[i];
1565         globalAtomGroupIndices[i]  = atomGroupGlobal;
1566         atomIndex                 += gcgs_index[atomGroupGlobal + 1] - gcgs_index[atomGroupGlobal];
1567     }
1568     atomGroups.index[atomGroupsState.size()] = atomIndex;
1569
1570     dd->ncg_home = atomGroupsState.size();
1571     dd->comm->atomRanges.setEnd(DDAtomRanges::Type::Home, atomIndex);
1572
1573     set_zones_ncg_home(dd);
1574 }
1575
1576 static void dd_set_cginfo(gmx::ArrayRef<const int> index_gl, int cg0, int cg1,
1577                           t_forcerec *fr, char *bLocalCG)
1578 {
1579     cginfo_mb_t *cginfo_mb;
1580     int         *cginfo;
1581     int          cg;
1582
1583     if (fr != nullptr)
1584     {
1585         cginfo_mb = fr->cginfo_mb;
1586         cginfo    = fr->cginfo;
1587
1588         for (cg = cg0; cg < cg1; cg++)
1589         {
1590             cginfo[cg] = ddcginfo(cginfo_mb, index_gl[cg]);
1591         }
1592     }
1593
1594     if (bLocalCG != nullptr)
1595     {
1596         for (cg = cg0; cg < cg1; cg++)
1597         {
1598             bLocalCG[index_gl[cg]] = TRUE;
1599         }
1600     }
1601 }
1602
1603 static void make_dd_indices(gmx_domdec_t *dd,
1604                             const int *gcgs_index, int cg_start)
1605 {
1606     const int                 numZones               = dd->comm->zones.n;
1607     const int                *zone2cg                = dd->comm->zones.cg_range;
1608     const int                *zone_ncg1              = dd->comm->zone_ncg1;
1609     gmx::ArrayRef<const int>  globalAtomGroupIndices = dd->globalAtomGroupIndices;
1610     const gmx_bool            bCGs                   = dd->comm->bCGs;
1611
1612     std::vector<int>         &globalAtomIndices      = dd->globalAtomIndices;
1613
1614     if (zone2cg[1] != dd->ncg_home)
1615     {
1616         gmx_incons("dd->ncg_zone is not up to date");
1617     }
1618
1619     /* Make the local to global and global to local atom index */
1620     int a = dd->atomGroups().index[cg_start];
1621     globalAtomIndices.resize(a);
1622     for (int zone = 0; zone < numZones; zone++)
1623     {
1624         int cg0;
1625         if (zone == 0)
1626         {
1627             cg0 = cg_start;
1628         }
1629         else
1630         {
1631             cg0 = zone2cg[zone];
1632         }
1633         int cg1    = zone2cg[zone+1];
1634         int cg1_p1 = cg0 + zone_ncg1[zone];
1635
1636         for (int cg = cg0; cg < cg1; cg++)
1637         {
1638             int zone1 = zone;
1639             if (cg >= cg1_p1)
1640             {
1641                 /* Signal that this cg is from more than one pulse away */
1642                 zone1 += numZones;
1643             }
1644             int cg_gl = globalAtomGroupIndices[cg];
1645             if (bCGs)
1646             {
1647                 for (int a_gl = gcgs_index[cg_gl]; a_gl < gcgs_index[cg_gl+1]; a_gl++)
1648                 {
1649                     globalAtomIndices.push_back(a_gl);
1650                     ga2la_set(dd->ga2la, a_gl, a, zone1);
1651                     a++;
1652                 }
1653             }
1654             else
1655             {
1656                 globalAtomIndices.push_back(cg_gl);
1657                 ga2la_set(dd->ga2la, cg_gl, a, zone1);
1658                 a++;
1659             }
1660         }
1661     }
1662 }
1663
1664 static int check_bLocalCG(gmx_domdec_t *dd, int ncg_sys, const char *bLocalCG,
1665                           const char *where)
1666 {
1667     int nerr = 0;
1668     if (bLocalCG == nullptr)
1669     {
1670         return nerr;
1671     }
1672     for (size_t i = 0; i < dd->globalAtomGroupIndices.size(); i++)
1673     {
1674         if (!bLocalCG[dd->globalAtomGroupIndices[i]])
1675         {
1676             fprintf(stderr,
1677                     "DD rank %d, %s: atom group %zu, global atom group %d is not marked in bLocalCG (ncg_home %d)\n", dd->rank, where, i + 1, dd->globalAtomGroupIndices[i] + 1, dd->ncg_home);
1678             nerr++;
1679         }
1680     }
1681     size_t ngl = 0;
1682     for (int i = 0; i < ncg_sys; i++)
1683     {
1684         if (bLocalCG[i])
1685         {
1686             ngl++;
1687         }
1688     }
1689     if (ngl != dd->globalAtomGroupIndices.size())
1690     {
1691         fprintf(stderr, "DD rank %d, %s: In bLocalCG %zu atom groups are marked as local, whereas there are %zu\n", dd->rank, where, ngl, dd->globalAtomGroupIndices.size());
1692         nerr++;
1693     }
1694
1695     return nerr;
1696 }
1697
1698 static void check_index_consistency(gmx_domdec_t *dd,
1699                                     int natoms_sys, int ncg_sys,
1700                                     const char *where)
1701 {
1702     int       nerr = 0;
1703
1704     const int numAtomsInZones = dd->comm->atomRanges.end(DDAtomRanges::Type::Zones);
1705
1706     if (dd->comm->DD_debug > 1)
1707     {
1708         std::vector<int> have(natoms_sys);
1709         for (int a = 0; a < numAtomsInZones; a++)
1710         {
1711             int globalAtomIndex = dd->globalAtomIndices[a];
1712             if (have[globalAtomIndex] > 0)
1713             {
1714                 fprintf(stderr, "DD rank %d: global atom %d occurs twice: index %d and %d\n", dd->rank, globalAtomIndex + 1, have[globalAtomIndex], a+1);
1715             }
1716             else
1717             {
1718                 have[globalAtomIndex] = a + 1;
1719             }
1720         }
1721     }
1722
1723     std::vector<int> have(numAtomsInZones);
1724
1725     int              ngl = 0;
1726     for (int i = 0; i < natoms_sys; i++)
1727     {
1728         int a;
1729         int cell;
1730         if (ga2la_get(dd->ga2la, i, &a, &cell))
1731         {
1732             if (a >= numAtomsInZones)
1733             {
1734                 fprintf(stderr, "DD rank %d: global atom %d marked as local atom %d, which is larger than nat_tot (%d)\n", dd->rank, i+1, a+1, numAtomsInZones);
1735                 nerr++;
1736             }
1737             else
1738             {
1739                 have[a] = 1;
1740                 if (dd->globalAtomIndices[a] != i)
1741                 {
1742                     fprintf(stderr, "DD rank %d: global atom %d marked as local atom %d, which has global atom index %d\n", dd->rank, i+1, a+1, dd->globalAtomIndices[a]+1);
1743                     nerr++;
1744                 }
1745             }
1746             ngl++;
1747         }
1748     }
1749     if (ngl != numAtomsInZones)
1750     {
1751         fprintf(stderr,
1752                 "DD rank %d, %s: %d global atom indices, %d local atoms\n",
1753                 dd->rank, where, ngl, numAtomsInZones);
1754     }
1755     for (int a = 0; a < numAtomsInZones; a++)
1756     {
1757         if (have[a] == 0)
1758         {
1759             fprintf(stderr,
1760                     "DD rank %d, %s: local atom %d, global %d has no global index\n",
1761                     dd->rank, where, a + 1, dd->globalAtomIndices[a] + 1);
1762         }
1763     }
1764
1765     nerr += check_bLocalCG(dd, ncg_sys, dd->comm->bLocalCG, where);
1766
1767     if (nerr > 0)
1768     {
1769         gmx_fatal(FARGS, "DD rank %d, %s: %d atom(group) index inconsistencies",
1770                   dd->rank, where, nerr);
1771     }
1772 }
1773
1774 /* Clear all DD global state indices, starting from \p atomGroupStart and \p atomStart */
1775 static void clearDDStateIndices(gmx_domdec_t *dd,
1776                                 int           atomGroupStart,
1777                                 int           atomStart)
1778 {
1779     if (atomStart == 0)
1780     {
1781         /* Clear the whole list without searching */
1782         ga2la_clear(dd->ga2la);
1783     }
1784     else
1785     {
1786         const int numAtomsInZones = dd->comm->atomRanges.end(DDAtomRanges::Type::Zones);
1787         for (int i = 0; i < numAtomsInZones; i++)
1788         {
1789             ga2la_del(dd->ga2la, dd->globalAtomIndices[i]);
1790         }
1791     }
1792
1793     char *bLocalCG = dd->comm->bLocalCG;
1794     if (bLocalCG)
1795     {
1796         for (size_t atomGroup = atomGroupStart; atomGroup < dd->globalAtomGroupIndices.size(); atomGroup++)
1797         {
1798             bLocalCG[dd->globalAtomGroupIndices[atomGroup]] = FALSE;
1799         }
1800     }
1801
1802     dd_clear_local_vsite_indices(dd);
1803
1804     if (dd->constraints)
1805     {
1806         dd_clear_local_constraint_indices(dd);
1807     }
1808 }
1809
1810 static gmx_bool check_grid_jump(gmx_int64_t     step,
1811                                 gmx_domdec_t   *dd,
1812                                 real            cutoff,
1813                                 gmx_ddbox_t    *ddbox,
1814                                 gmx_bool        bFatal)
1815 {
1816     gmx_domdec_comm_t *comm;
1817     int                d, dim;
1818     real               limit, bfac;
1819     gmx_bool           bInvalid;
1820
1821     bInvalid = FALSE;
1822
1823     comm = dd->comm;
1824
1825     for (d = 1; d < dd->ndim; d++)
1826     {
1827         dim   = dd->dim[d];
1828         limit = grid_jump_limit(comm, cutoff, d);
1829         bfac  = ddbox->box_size[dim];
1830         if (ddbox->tric_dir[dim])
1831         {
1832             bfac *= ddbox->skew_fac[dim];
1833         }
1834         if ((comm->cell_f1[d] - comm->cell_f_max0[d])*bfac <  limit ||
1835                                                               (comm->cell_f0[d] - comm->cell_f_min1[d])*bfac > -limit)
1836         {
1837             bInvalid = TRUE;
1838
1839             if (bFatal)
1840             {
1841                 char buf[22];
1842
1843                 /* This error should never be triggered under normal
1844                  * circumstances, but you never know ...
1845                  */
1846                 gmx_fatal(FARGS, "step %s: The domain decomposition grid has shifted too much in the %c-direction around cell %d %d %d. This should not have happened. Running with fewer ranks might avoid this issue.",
1847                           gmx_step_str(step, buf),
1848                           dim2char(dim), dd->ci[XX], dd->ci[YY], dd->ci[ZZ]);
1849             }
1850         }
1851     }
1852
1853     return bInvalid;
1854 }
1855
1856 static float dd_force_load(gmx_domdec_comm_t *comm)
1857 {
1858     float load;
1859
1860     if (comm->eFlop)
1861     {
1862         load = comm->flop;
1863         if (comm->eFlop > 1)
1864         {
1865             load *= 1.0 + (comm->eFlop - 1)*(0.1*rand()/RAND_MAX - 0.05);
1866         }
1867     }
1868     else
1869     {
1870         load = comm->cycl[ddCyclF];
1871         if (comm->cycl_n[ddCyclF] > 1)
1872         {
1873             /* Subtract the maximum of the last n cycle counts
1874              * to get rid of possible high counts due to other sources,
1875              * for instance system activity, that would otherwise
1876              * affect the dynamic load balancing.
1877              */
1878             load -= comm->cycl_max[ddCyclF];
1879         }
1880
1881 #if GMX_MPI
1882         if (comm->cycl_n[ddCyclWaitGPU] && comm->nrank_gpu_shared > 1)
1883         {
1884             float gpu_wait, gpu_wait_sum;
1885
1886             gpu_wait = comm->cycl[ddCyclWaitGPU];
1887             if (comm->cycl_n[ddCyclF] > 1)
1888             {
1889                 /* We should remove the WaitGPU time of the same MD step
1890                  * as the one with the maximum F time, since the F time
1891                  * and the wait time are not independent.
1892                  * Furthermore, the step for the max F time should be chosen
1893                  * the same on all ranks that share the same GPU.
1894                  * But to keep the code simple, we remove the average instead.
1895                  * The main reason for artificially long times at some steps
1896                  * is spurious CPU activity or MPI time, so we don't expect
1897                  * that changes in the GPU wait time matter a lot here.
1898                  */
1899                 gpu_wait *= (comm->cycl_n[ddCyclF] - 1)/(float)comm->cycl_n[ddCyclF];
1900             }
1901             /* Sum the wait times over the ranks that share the same GPU */
1902             MPI_Allreduce(&gpu_wait, &gpu_wait_sum, 1, MPI_FLOAT, MPI_SUM,
1903                           comm->mpi_comm_gpu_shared);
1904             /* Replace the wait time by the average over the ranks */
1905             load += -gpu_wait + gpu_wait_sum/comm->nrank_gpu_shared;
1906         }
1907 #endif
1908     }
1909
1910     return load;
1911 }
1912
1913 static void set_slb_pme_dim_f(gmx_domdec_t *dd, int dim, real **dim_f)
1914 {
1915     gmx_domdec_comm_t *comm;
1916     int                i;
1917
1918     comm = dd->comm;
1919
1920     snew(*dim_f, dd->nc[dim]+1);
1921     (*dim_f)[0] = 0;
1922     for (i = 1; i < dd->nc[dim]; i++)
1923     {
1924         if (comm->slb_frac[dim])
1925         {
1926             (*dim_f)[i] = (*dim_f)[i-1] + comm->slb_frac[dim][i-1];
1927         }
1928         else
1929         {
1930             (*dim_f)[i] = (real)i/(real)dd->nc[dim];
1931         }
1932     }
1933     (*dim_f)[dd->nc[dim]] = 1;
1934 }
1935
1936 static void init_ddpme(gmx_domdec_t *dd, gmx_ddpme_t *ddpme, int dimind)
1937 {
1938     int  pmeindex, slab, nso, i;
1939     ivec xyz;
1940
1941     if (dimind == 0 && dd->dim[0] == YY && dd->comm->npmenodes_x == 1)
1942     {
1943         ddpme->dim = YY;
1944     }
1945     else
1946     {
1947         ddpme->dim = dimind;
1948     }
1949     ddpme->dim_match = (ddpme->dim == dd->dim[dimind]);
1950
1951     ddpme->nslab = (ddpme->dim == 0 ?
1952                     dd->comm->npmenodes_x :
1953                     dd->comm->npmenodes_y);
1954
1955     if (ddpme->nslab <= 1)
1956     {
1957         return;
1958     }
1959
1960     nso = dd->comm->npmenodes/ddpme->nslab;
1961     /* Determine for each PME slab the PP location range for dimension dim */
1962     snew(ddpme->pp_min, ddpme->nslab);
1963     snew(ddpme->pp_max, ddpme->nslab);
1964     for (slab = 0; slab < ddpme->nslab; slab++)
1965     {
1966         ddpme->pp_min[slab] = dd->nc[dd->dim[dimind]] - 1;
1967         ddpme->pp_max[slab] = 0;
1968     }
1969     for (i = 0; i < dd->nnodes; i++)
1970     {
1971         ddindex2xyz(dd->nc, i, xyz);
1972         /* For y only use our y/z slab.
1973          * This assumes that the PME x grid size matches the DD grid size.
1974          */
1975         if (dimind == 0 || xyz[XX] == dd->ci[XX])
1976         {
1977             pmeindex = ddindex2pmeindex(dd, i);
1978             if (dimind == 0)
1979             {
1980                 slab = pmeindex/nso;
1981             }
1982             else
1983             {
1984                 slab = pmeindex % ddpme->nslab;
1985             }
1986             ddpme->pp_min[slab] = std::min(ddpme->pp_min[slab], xyz[dimind]);
1987             ddpme->pp_max[slab] = std::max(ddpme->pp_max[slab], xyz[dimind]);
1988         }
1989     }
1990
1991     set_slb_pme_dim_f(dd, ddpme->dim, &ddpme->slb_dim_f);
1992 }
1993
1994 int dd_pme_maxshift_x(const gmx_domdec_t *dd)
1995 {
1996     if (dd->comm->ddpme[0].dim == XX)
1997     {
1998         return dd->comm->ddpme[0].maxshift;
1999     }
2000     else
2001     {
2002         return 0;
2003     }
2004 }
2005
2006 int dd_pme_maxshift_y(const gmx_domdec_t *dd)
2007 {
2008     if (dd->comm->ddpme[0].dim == YY)
2009     {
2010         return dd->comm->ddpme[0].maxshift;
2011     }
2012     else if (dd->comm->npmedecompdim >= 2 && dd->comm->ddpme[1].dim == YY)
2013     {
2014         return dd->comm->ddpme[1].maxshift;
2015     }
2016     else
2017     {
2018         return 0;
2019     }
2020 }
2021
2022 static void comm_dd_ns_cell_sizes(gmx_domdec_t *dd,
2023                                   gmx_ddbox_t *ddbox,
2024                                   rvec cell_ns_x0, rvec cell_ns_x1,
2025                                   gmx_int64_t step)
2026 {
2027     gmx_domdec_comm_t *comm;
2028     int                dim_ind, dim;
2029
2030     comm = dd->comm;
2031
2032     for (dim_ind = 0; dim_ind < dd->ndim; dim_ind++)
2033     {
2034         dim = dd->dim[dim_ind];
2035
2036         /* Without PBC we don't have restrictions on the outer cells */
2037         if (!(dim >= ddbox->npbcdim &&
2038               (dd->ci[dim] == 0 || dd->ci[dim] == dd->nc[dim] - 1)) &&
2039             isDlbOn(comm) &&
2040             (comm->cell_x1[dim] - comm->cell_x0[dim])*ddbox->skew_fac[dim] <
2041             comm->cellsize_min[dim])
2042         {
2043             char buf[22];
2044             gmx_fatal(FARGS, "step %s: The %c-size (%f) times the triclinic skew factor (%f) is smaller than the smallest allowed cell size (%f) for domain decomposition grid cell %d %d %d",
2045                       gmx_step_str(step, buf), dim2char(dim),
2046                       comm->cell_x1[dim] - comm->cell_x0[dim],
2047                       ddbox->skew_fac[dim],
2048                       dd->comm->cellsize_min[dim],
2049                       dd->ci[XX], dd->ci[YY], dd->ci[ZZ]);
2050         }
2051     }
2052
2053     if ((isDlbOn(dd->comm) && dd->ndim > 1) || ddbox->nboundeddim < DIM)
2054     {
2055         /* Communicate the boundaries and update cell_ns_x0/1 */
2056         dd_move_cellx(dd, ddbox, cell_ns_x0, cell_ns_x1);
2057         if (isDlbOn(dd->comm) && dd->ndim > 1)
2058         {
2059             check_grid_jump(step, dd, dd->comm->cutoff, ddbox, TRUE);
2060         }
2061     }
2062 }
2063
2064 void dd_cycles_add(const gmx_domdec_t *dd, float cycles, int ddCycl)
2065 {
2066     /* Note that the cycles value can be incorrect, either 0 or some
2067      * extremely large value, when our thread migrated to another core
2068      * with an unsynchronized cycle counter. If this happens less often
2069      * that once per nstlist steps, this will not cause issues, since
2070      * we later subtract the maximum value from the sum over nstlist steps.
2071      * A zero count will slightly lower the total, but that's a small effect.
2072      * Note that the main purpose of the subtraction of the maximum value
2073      * is to avoid throwing off the load balancing when stalls occur due
2074      * e.g. system activity or network congestion.
2075      */
2076     dd->comm->cycl[ddCycl] += cycles;
2077     dd->comm->cycl_n[ddCycl]++;
2078     if (cycles > dd->comm->cycl_max[ddCycl])
2079     {
2080         dd->comm->cycl_max[ddCycl] = cycles;
2081     }
2082 }
2083
2084 static double force_flop_count(t_nrnb *nrnb)
2085 {
2086     int         i;
2087     double      sum;
2088     const char *name;
2089
2090     sum = 0;
2091     for (i = 0; i < eNR_NBKERNEL_FREE_ENERGY; i++)
2092     {
2093         /* To get closer to the real timings, we half the count
2094          * for the normal loops and again half it for water loops.
2095          */
2096         name = nrnb_str(i);
2097         if (strstr(name, "W3") != nullptr || strstr(name, "W4") != nullptr)
2098         {
2099             sum += nrnb->n[i]*0.25*cost_nrnb(i);
2100         }
2101         else
2102         {
2103             sum += nrnb->n[i]*0.50*cost_nrnb(i);
2104         }
2105     }
2106     for (i = eNR_NBKERNEL_FREE_ENERGY; i <= eNR_NB14; i++)
2107     {
2108         name = nrnb_str(i);
2109         if (strstr(name, "W3") != nullptr || strstr(name, "W4") != nullptr)
2110         {
2111             sum += nrnb->n[i]*cost_nrnb(i);
2112         }
2113     }
2114     for (i = eNR_BONDS; i <= eNR_WALLS; i++)
2115     {
2116         sum += nrnb->n[i]*cost_nrnb(i);
2117     }
2118
2119     return sum;
2120 }
2121
2122 void dd_force_flop_start(gmx_domdec_t *dd, t_nrnb *nrnb)
2123 {
2124     if (dd->comm->eFlop)
2125     {
2126         dd->comm->flop -= force_flop_count(nrnb);
2127     }
2128 }
2129 void dd_force_flop_stop(gmx_domdec_t *dd, t_nrnb *nrnb)
2130 {
2131     if (dd->comm->eFlop)
2132     {
2133         dd->comm->flop += force_flop_count(nrnb);
2134         dd->comm->flop_n++;
2135     }
2136 }
2137
2138 static void clear_dd_cycle_counts(gmx_domdec_t *dd)
2139 {
2140     int i;
2141
2142     for (i = 0; i < ddCyclNr; i++)
2143     {
2144         dd->comm->cycl[i]     = 0;
2145         dd->comm->cycl_n[i]   = 0;
2146         dd->comm->cycl_max[i] = 0;
2147     }
2148     dd->comm->flop   = 0;
2149     dd->comm->flop_n = 0;
2150 }
2151
2152 static void get_load_distribution(gmx_domdec_t *dd, gmx_wallcycle_t wcycle)
2153 {
2154     gmx_domdec_comm_t *comm;
2155     domdec_load_t     *load;
2156     domdec_root_t     *root = nullptr;
2157     int                d, dim, i, pos;
2158     float              cell_frac = 0, sbuf[DD_NLOAD_MAX];
2159     gmx_bool           bSepPME;
2160
2161     if (debug)
2162     {
2163         fprintf(debug, "get_load_distribution start\n");
2164     }
2165
2166     wallcycle_start(wcycle, ewcDDCOMMLOAD);
2167
2168     comm = dd->comm;
2169
2170     bSepPME = (dd->pme_nodeid >= 0);
2171
2172     if (dd->ndim == 0 && bSepPME)
2173     {
2174         /* Without decomposition, but with PME nodes, we need the load */
2175         comm->load[0].mdf = comm->cycl[ddCyclPPduringPME];
2176         comm->load[0].pme = comm->cycl[ddCyclPME];
2177     }
2178
2179     for (d = dd->ndim-1; d >= 0; d--)
2180     {
2181         dim = dd->dim[d];
2182         /* Check if we participate in the communication in this dimension */
2183         if (d == dd->ndim-1 ||
2184             (dd->ci[dd->dim[d+1]] == 0 && dd->ci[dd->dim[dd->ndim-1]] == 0))
2185         {
2186             load = &comm->load[d];
2187             if (isDlbOn(dd->comm))
2188             {
2189                 cell_frac = comm->cell_f1[d] - comm->cell_f0[d];
2190             }
2191             pos = 0;
2192             if (d == dd->ndim-1)
2193             {
2194                 sbuf[pos++] = dd_force_load(comm);
2195                 sbuf[pos++] = sbuf[0];
2196                 if (isDlbOn(dd->comm))
2197                 {
2198                     sbuf[pos++] = sbuf[0];
2199                     sbuf[pos++] = cell_frac;
2200                     if (d > 0)
2201                     {
2202                         sbuf[pos++] = comm->cell_f_max0[d];
2203                         sbuf[pos++] = comm->cell_f_min1[d];
2204                     }
2205                 }
2206                 if (bSepPME)
2207                 {
2208                     sbuf[pos++] = comm->cycl[ddCyclPPduringPME];
2209                     sbuf[pos++] = comm->cycl[ddCyclPME];
2210                 }
2211             }
2212             else
2213             {
2214                 sbuf[pos++] = comm->load[d+1].sum;
2215                 sbuf[pos++] = comm->load[d+1].max;
2216                 if (isDlbOn(dd->comm))
2217                 {
2218                     sbuf[pos++] = comm->load[d+1].sum_m;
2219                     sbuf[pos++] = comm->load[d+1].cvol_min*cell_frac;
2220                     sbuf[pos++] = comm->load[d+1].flags;
2221                     if (d > 0)
2222                     {
2223                         sbuf[pos++] = comm->cell_f_max0[d];
2224                         sbuf[pos++] = comm->cell_f_min1[d];
2225                     }
2226                 }
2227                 if (bSepPME)
2228                 {
2229                     sbuf[pos++] = comm->load[d+1].mdf;
2230                     sbuf[pos++] = comm->load[d+1].pme;
2231                 }
2232             }
2233             load->nload = pos;
2234             /* Communicate a row in DD direction d.
2235              * The communicators are setup such that the root always has rank 0.
2236              */
2237 #if GMX_MPI
2238             MPI_Gather(sbuf, load->nload*sizeof(float), MPI_BYTE,
2239                        load->load, load->nload*sizeof(float), MPI_BYTE,
2240                        0, comm->mpi_comm_load[d]);
2241 #endif
2242             if (dd->ci[dim] == dd->master_ci[dim])
2243             {
2244                 /* We are the root, process this row */
2245                 if (isDlbOn(comm))
2246                 {
2247                     root = comm->root[d];
2248                 }
2249                 load->sum      = 0;
2250                 load->max      = 0;
2251                 load->sum_m    = 0;
2252                 load->cvol_min = 1;
2253                 load->flags    = 0;
2254                 load->mdf      = 0;
2255                 load->pme      = 0;
2256                 pos            = 0;
2257                 for (i = 0; i < dd->nc[dim]; i++)
2258                 {
2259                     load->sum += load->load[pos++];
2260                     load->max  = std::max(load->max, load->load[pos]);
2261                     pos++;
2262                     if (isDlbOn(dd->comm))
2263                     {
2264                         if (root->bLimited)
2265                         {
2266                             /* This direction could not be load balanced properly,
2267                              * therefore we need to use the maximum iso the average load.
2268                              */
2269                             load->sum_m = std::max(load->sum_m, load->load[pos]);
2270                         }
2271                         else
2272                         {
2273                             load->sum_m += load->load[pos];
2274                         }
2275                         pos++;
2276                         load->cvol_min = std::min(load->cvol_min, load->load[pos]);
2277                         pos++;
2278                         if (d < dd->ndim-1)
2279                         {
2280                             load->flags = (int)(load->load[pos++] + 0.5);
2281                         }
2282                         if (d > 0)
2283                         {
2284                             root->cell_f_max0[i] = load->load[pos++];
2285                             root->cell_f_min1[i] = load->load[pos++];
2286                         }
2287                     }
2288                     if (bSepPME)
2289                     {
2290                         load->mdf = std::max(load->mdf, load->load[pos]);
2291                         pos++;
2292                         load->pme = std::max(load->pme, load->load[pos]);
2293                         pos++;
2294                     }
2295                 }
2296                 if (isDlbOn(comm) && root->bLimited)
2297                 {
2298                     load->sum_m *= dd->nc[dim];
2299                     load->flags |= (1<<d);
2300                 }
2301             }
2302         }
2303     }
2304
2305     if (DDMASTER(dd))
2306     {
2307         comm->nload      += dd_load_count(comm);
2308         comm->load_step  += comm->cycl[ddCyclStep];
2309         comm->load_sum   += comm->load[0].sum;
2310         comm->load_max   += comm->load[0].max;
2311         if (isDlbOn(comm))
2312         {
2313             for (d = 0; d < dd->ndim; d++)
2314             {
2315                 if (comm->load[0].flags & (1<<d))
2316                 {
2317                     comm->load_lim[d]++;
2318                 }
2319             }
2320         }
2321         if (bSepPME)
2322         {
2323             comm->load_mdf += comm->load[0].mdf;
2324             comm->load_pme += comm->load[0].pme;
2325         }
2326     }
2327
2328     wallcycle_stop(wcycle, ewcDDCOMMLOAD);
2329
2330     if (debug)
2331     {
2332         fprintf(debug, "get_load_distribution finished\n");
2333     }
2334 }
2335
2336 static float dd_force_load_fraction(gmx_domdec_t *dd)
2337 {
2338     /* Return the relative performance loss on the total run time
2339      * due to the force calculation load imbalance.
2340      */
2341     if (dd->comm->nload > 0 && dd->comm->load_step > 0)
2342     {
2343         return dd->comm->load_sum/(dd->comm->load_step*dd->nnodes);
2344     }
2345     else
2346     {
2347         return 0;
2348     }
2349 }
2350
2351 static float dd_force_imb_perf_loss(gmx_domdec_t *dd)
2352 {
2353     /* Return the relative performance loss on the total run time
2354      * due to the force calculation load imbalance.
2355      */
2356     if (dd->comm->nload > 0 && dd->comm->load_step > 0)
2357     {
2358         return
2359             (dd->comm->load_max*dd->nnodes - dd->comm->load_sum)/
2360             (dd->comm->load_step*dd->nnodes);
2361     }
2362     else
2363     {
2364         return 0;
2365     }
2366 }
2367
2368 static void print_dd_load_av(FILE *fplog, gmx_domdec_t *dd)
2369 {
2370     gmx_domdec_comm_t *comm = dd->comm;
2371
2372     /* Only the master rank prints loads and only if we measured loads */
2373     if (!DDMASTER(dd) || comm->nload == 0)
2374     {
2375         return;
2376     }
2377
2378     char  buf[STRLEN];
2379     int   numPpRanks   = dd->nnodes;
2380     int   numPmeRanks  = (dd->pme_nodeid >= 0) ? comm->npmenodes : 0;
2381     int   numRanks     = numPpRanks + numPmeRanks;
2382     float lossFraction = 0;
2383
2384     /* Print the average load imbalance and performance loss */
2385     if (dd->nnodes > 1 && comm->load_sum > 0)
2386     {
2387         float imbalance = comm->load_max*numPpRanks/comm->load_sum - 1;
2388         lossFraction    = dd_force_imb_perf_loss(dd);
2389
2390         std::string msg         = "\n Dynamic load balancing report:\n";
2391         std::string dlbStateStr;
2392
2393         switch (dd->comm->dlbState)
2394         {
2395             case edlbsOffUser:
2396                 dlbStateStr = "DLB was off during the run per user request.";
2397                 break;
2398             case edlbsOffForever:
2399                 /* Currectly this can happen due to performance loss observed, cell size
2400                  * limitations or incompatibility with other settings observed during
2401                  * determineInitialDlbState(). */
2402                 dlbStateStr = "DLB got disabled because it was unsuitable to use.";
2403                 break;
2404             case edlbsOffCanTurnOn:
2405                 dlbStateStr = "DLB was off during the run due to low measured imbalance.";
2406                 break;
2407             case edlbsOffTemporarilyLocked:
2408                 dlbStateStr = "DLB was locked at the end of the run due to unfinished PP-PME balancing.";
2409                 break;
2410             case edlbsOnCanTurnOff:
2411                 dlbStateStr = "DLB was turned on during the run due to measured imbalance.";
2412                 break;
2413             case edlbsOnUser:
2414                 dlbStateStr = "DLB was permanently on during the run per user request.";
2415                 break;
2416             default:
2417                 GMX_ASSERT(false, "Undocumented DLB state");
2418         }
2419
2420         msg += " " + dlbStateStr + "\n";
2421         msg += gmx::formatString(" Average load imbalance: %.1f%%.\n", imbalance*100);
2422         msg += gmx::formatString(" The balanceable part of the MD step is %d%%, load imbalance is computed from this.\n",
2423                                  static_cast<int>(dd_force_load_fraction(dd)*100 + 0.5));
2424         msg += gmx::formatString(" Part of the total run time spent waiting due to load imbalance: %.1f%%.\n",
2425                                  lossFraction*100);
2426         fprintf(fplog, "%s", msg.c_str());
2427         fprintf(stderr, "%s", msg.c_str());
2428     }
2429
2430     /* Print during what percentage of steps the  load balancing was limited */
2431     bool dlbWasLimited = false;
2432     if (isDlbOn(comm))
2433     {
2434         sprintf(buf, " Steps where the load balancing was limited by -rdd, -rcon and/or -dds:");
2435         for (int d = 0; d < dd->ndim; d++)
2436         {
2437             int limitPercentage = (200*comm->load_lim[d] + 1)/(2*comm->nload);
2438             sprintf(buf+strlen(buf), " %c %d %%",
2439                     dim2char(dd->dim[d]), limitPercentage);
2440             if (limitPercentage >= 50)
2441             {
2442                 dlbWasLimited = true;
2443             }
2444         }
2445         sprintf(buf + strlen(buf), "\n");
2446         fprintf(fplog, "%s", buf);
2447         fprintf(stderr, "%s", buf);
2448     }
2449
2450     /* Print the performance loss due to separate PME - PP rank imbalance */
2451     float lossFractionPme = 0;
2452     if (numPmeRanks > 0 && comm->load_mdf > 0 && comm->load_step > 0)
2453     {
2454         float pmeForceRatio = comm->load_pme/comm->load_mdf;
2455         lossFractionPme     = (comm->load_pme - comm->load_mdf)/comm->load_step;
2456         if (lossFractionPme <= 0)
2457         {
2458             lossFractionPme *= numPmeRanks/static_cast<float>(numRanks);
2459         }
2460         else
2461         {
2462             lossFractionPme *= numPpRanks/static_cast<float>(numRanks);
2463         }
2464         sprintf(buf, " Average PME mesh/force load: %5.3f\n", pmeForceRatio);
2465         fprintf(fplog, "%s", buf);
2466         fprintf(stderr, "%s", buf);
2467         sprintf(buf, " Part of the total run time spent waiting due to PP/PME imbalance: %.1f %%\n", fabs(lossFractionPme)*100);
2468         fprintf(fplog, "%s", buf);
2469         fprintf(stderr, "%s", buf);
2470     }
2471     fprintf(fplog, "\n");
2472     fprintf(stderr, "\n");
2473
2474     if (lossFraction >= DD_PERF_LOSS_WARN)
2475     {
2476         sprintf(buf,
2477                 "NOTE: %.1f %% of the available CPU time was lost due to load imbalance\n"
2478                 "      in the domain decomposition.\n", lossFraction*100);
2479         if (!isDlbOn(comm))
2480         {
2481             sprintf(buf+strlen(buf), "      You might want to use dynamic load balancing (option -dlb.)\n");
2482         }
2483         else if (dlbWasLimited)
2484         {
2485             sprintf(buf+strlen(buf), "      You might want to decrease the cell size limit (options -rdd, -rcon and/or -dds).\n");
2486         }
2487         fprintf(fplog, "%s\n", buf);
2488         fprintf(stderr, "%s\n", buf);
2489     }
2490     if (numPmeRanks > 0 && fabs(lossFractionPme) >= DD_PERF_LOSS_WARN)
2491     {
2492         sprintf(buf,
2493                 "NOTE: %.1f %% performance was lost because the PME ranks\n"
2494                 "      had %s work to do than the PP ranks.\n"
2495                 "      You might want to %s the number of PME ranks\n"
2496                 "      or %s the cut-off and the grid spacing.\n",
2497                 fabs(lossFractionPme*100),
2498                 (lossFractionPme < 0) ? "less"     : "more",
2499                 (lossFractionPme < 0) ? "decrease" : "increase",
2500                 (lossFractionPme < 0) ? "decrease" : "increase");
2501         fprintf(fplog, "%s\n", buf);
2502         fprintf(stderr, "%s\n", buf);
2503     }
2504 }
2505
2506 static float dd_vol_min(gmx_domdec_t *dd)
2507 {
2508     return dd->comm->load[0].cvol_min*dd->nnodes;
2509 }
2510
2511 static gmx_bool dd_load_flags(gmx_domdec_t *dd)
2512 {
2513     return dd->comm->load[0].flags;
2514 }
2515
2516 static float dd_f_imbal(gmx_domdec_t *dd)
2517 {
2518     if (dd->comm->load[0].sum > 0)
2519     {
2520         return dd->comm->load[0].max*dd->nnodes/dd->comm->load[0].sum - 1.0f;
2521     }
2522     else
2523     {
2524         /* Something is wrong in the cycle counting, report no load imbalance */
2525         return 0.0f;
2526     }
2527 }
2528
2529 float dd_pme_f_ratio(gmx_domdec_t *dd)
2530 {
2531     /* Should only be called on the DD master rank */
2532     assert(DDMASTER(dd));
2533
2534     if (dd->comm->load[0].mdf > 0 && dd->comm->cycl_n[ddCyclPME] > 0)
2535     {
2536         return dd->comm->load[0].pme/dd->comm->load[0].mdf;
2537     }
2538     else
2539     {
2540         return -1.0;
2541     }
2542 }
2543
2544 static void dd_print_load(FILE *fplog, gmx_domdec_t *dd, gmx_int64_t step)
2545 {
2546     int  flags, d;
2547     char buf[22];
2548
2549     flags = dd_load_flags(dd);
2550     if (flags)
2551     {
2552         fprintf(fplog,
2553                 "DD  load balancing is limited by minimum cell size in dimension");
2554         for (d = 0; d < dd->ndim; d++)
2555         {
2556             if (flags & (1<<d))
2557             {
2558                 fprintf(fplog, " %c", dim2char(dd->dim[d]));
2559             }
2560         }
2561         fprintf(fplog, "\n");
2562     }
2563     fprintf(fplog, "DD  step %s", gmx_step_str(step, buf));
2564     if (isDlbOn(dd->comm))
2565     {
2566         fprintf(fplog, "  vol min/aver %5.3f%c",
2567                 dd_vol_min(dd), flags ? '!' : ' ');
2568     }
2569     if (dd->nnodes > 1)
2570     {
2571         fprintf(fplog, " load imb.: force %4.1f%%", dd_f_imbal(dd)*100);
2572     }
2573     if (dd->comm->cycl_n[ddCyclPME])
2574     {
2575         fprintf(fplog, "  pme mesh/force %5.3f", dd_pme_f_ratio(dd));
2576     }
2577     fprintf(fplog, "\n\n");
2578 }
2579
2580 static void dd_print_load_verbose(gmx_domdec_t *dd)
2581 {
2582     if (isDlbOn(dd->comm))
2583     {
2584         fprintf(stderr, "vol %4.2f%c ",
2585                 dd_vol_min(dd), dd_load_flags(dd) ? '!' : ' ');
2586     }
2587     if (dd->nnodes > 1)
2588     {
2589         fprintf(stderr, "imb F %2d%% ", (int)(dd_f_imbal(dd)*100+0.5));
2590     }
2591     if (dd->comm->cycl_n[ddCyclPME])
2592     {
2593         fprintf(stderr, "pme/F %4.2f ", dd_pme_f_ratio(dd));
2594     }
2595 }
2596
2597 #if GMX_MPI
2598 static void make_load_communicator(gmx_domdec_t *dd, int dim_ind, ivec loc)
2599 {
2600     MPI_Comm           c_row;
2601     int                dim, i, rank;
2602     ivec               loc_c;
2603     domdec_root_t     *root;
2604     gmx_bool           bPartOfGroup = FALSE;
2605
2606     dim = dd->dim[dim_ind];
2607     copy_ivec(loc, loc_c);
2608     for (i = 0; i < dd->nc[dim]; i++)
2609     {
2610         loc_c[dim] = i;
2611         rank       = dd_index(dd->nc, loc_c);
2612         if (rank == dd->rank)
2613         {
2614             /* This process is part of the group */
2615             bPartOfGroup = TRUE;
2616         }
2617     }
2618     MPI_Comm_split(dd->mpi_comm_all, bPartOfGroup ? 0 : MPI_UNDEFINED, dd->rank,
2619                    &c_row);
2620     if (bPartOfGroup)
2621     {
2622         dd->comm->mpi_comm_load[dim_ind] = c_row;
2623         if (!isDlbDisabled(dd->comm))
2624         {
2625             if (dd->ci[dim] == dd->master_ci[dim])
2626             {
2627                 /* This is the root process of this row */
2628                 snew(dd->comm->root[dim_ind], 1);
2629                 root = dd->comm->root[dim_ind];
2630                 snew(root->cell_f, ddCellFractionBufferSize(dd, dim_ind));
2631                 snew(root->old_cell_f, dd->nc[dim]+1);
2632                 snew(root->bCellMin, dd->nc[dim]);
2633                 if (dim_ind > 0)
2634                 {
2635                     snew(root->cell_f_max0, dd->nc[dim]);
2636                     snew(root->cell_f_min1, dd->nc[dim]);
2637                     snew(root->bound_min, dd->nc[dim]);
2638                     snew(root->bound_max, dd->nc[dim]);
2639                 }
2640                 snew(root->buf_ncd, dd->nc[dim]);
2641             }
2642             else
2643             {
2644                 /* This is not a root process, we only need to receive cell_f */
2645                 snew(dd->comm->cell_f_row, ddCellFractionBufferSize(dd, dim_ind));
2646             }
2647         }
2648         if (dd->ci[dim] == dd->master_ci[dim])
2649         {
2650             snew(dd->comm->load[dim_ind].load, dd->nc[dim]*DD_NLOAD_MAX);
2651         }
2652     }
2653 }
2654 #endif
2655
2656 void dd_setup_dlb_resource_sharing(t_commrec            *cr,
2657                                    int                   gpu_id)
2658 {
2659 #if GMX_MPI
2660     int           physicalnode_id_hash;
2661     gmx_domdec_t *dd;
2662     MPI_Comm      mpi_comm_pp_physicalnode;
2663
2664     if (!thisRankHasDuty(cr, DUTY_PP) || gpu_id < 0)
2665     {
2666         /* Only ranks with short-ranged tasks (currently) use GPUs.
2667          * If we don't have GPUs assigned, there are no resources to share.
2668          */
2669         return;
2670     }
2671
2672     physicalnode_id_hash = gmx_physicalnode_id_hash();
2673
2674     dd = cr->dd;
2675
2676     if (debug)
2677     {
2678         fprintf(debug, "dd_setup_dd_dlb_gpu_sharing:\n");
2679         fprintf(debug, "DD PP rank %d physical node hash %d gpu_id %d\n",
2680                 dd->rank, physicalnode_id_hash, gpu_id);
2681     }
2682     /* Split the PP communicator over the physical nodes */
2683     /* TODO: See if we should store this (before), as it's also used for
2684      * for the nodecomm summation.
2685      */
2686     // TODO PhysicalNodeCommunicator could be extended/used to handle
2687     // the need for per-node per-group communicators.
2688     MPI_Comm_split(dd->mpi_comm_all, physicalnode_id_hash, dd->rank,
2689                    &mpi_comm_pp_physicalnode);
2690     MPI_Comm_split(mpi_comm_pp_physicalnode, gpu_id, dd->rank,
2691                    &dd->comm->mpi_comm_gpu_shared);
2692     MPI_Comm_free(&mpi_comm_pp_physicalnode);
2693     MPI_Comm_size(dd->comm->mpi_comm_gpu_shared, &dd->comm->nrank_gpu_shared);
2694
2695     if (debug)
2696     {
2697         fprintf(debug, "nrank_gpu_shared %d\n", dd->comm->nrank_gpu_shared);
2698     }
2699
2700     /* Note that some ranks could share a GPU, while others don't */
2701
2702     if (dd->comm->nrank_gpu_shared == 1)
2703     {
2704         MPI_Comm_free(&dd->comm->mpi_comm_gpu_shared);
2705     }
2706 #else
2707     GMX_UNUSED_VALUE(cr);
2708     GMX_UNUSED_VALUE(gpu_id);
2709 #endif
2710 }
2711
2712 static void make_load_communicators(gmx_domdec_t gmx_unused *dd)
2713 {
2714 #if GMX_MPI
2715     int  dim0, dim1, i, j;
2716     ivec loc;
2717
2718     if (debug)
2719     {
2720         fprintf(debug, "Making load communicators\n");
2721     }
2722
2723     snew(dd->comm->load,          std::max(dd->ndim, 1));
2724     snew(dd->comm->mpi_comm_load, std::max(dd->ndim, 1));
2725
2726     if (dd->ndim == 0)
2727     {
2728         return;
2729     }
2730
2731     clear_ivec(loc);
2732     make_load_communicator(dd, 0, loc);
2733     if (dd->ndim > 1)
2734     {
2735         dim0 = dd->dim[0];
2736         for (i = 0; i < dd->nc[dim0]; i++)
2737         {
2738             loc[dim0] = i;
2739             make_load_communicator(dd, 1, loc);
2740         }
2741     }
2742     if (dd->ndim > 2)
2743     {
2744         dim0 = dd->dim[0];
2745         for (i = 0; i < dd->nc[dim0]; i++)
2746         {
2747             loc[dim0] = i;
2748             dim1      = dd->dim[1];
2749             for (j = 0; j < dd->nc[dim1]; j++)
2750             {
2751                 loc[dim1] = j;
2752                 make_load_communicator(dd, 2, loc);
2753             }
2754         }
2755     }
2756
2757     if (debug)
2758     {
2759         fprintf(debug, "Finished making load communicators\n");
2760     }
2761 #endif
2762 }
2763
2764 /*! \brief Sets up the relation between neighboring domains and zones */
2765 static void setup_neighbor_relations(gmx_domdec_t *dd)
2766 {
2767     int                     d, dim, i, j, m;
2768     ivec                    tmp, s;
2769     gmx_domdec_zones_t     *zones;
2770     gmx_domdec_ns_ranges_t *izone;
2771
2772     for (d = 0; d < dd->ndim; d++)
2773     {
2774         dim = dd->dim[d];
2775         copy_ivec(dd->ci, tmp);
2776         tmp[dim]           = (tmp[dim] + 1) % dd->nc[dim];
2777         dd->neighbor[d][0] = ddcoord2ddnodeid(dd, tmp);
2778         copy_ivec(dd->ci, tmp);
2779         tmp[dim]           = (tmp[dim] - 1 + dd->nc[dim]) % dd->nc[dim];
2780         dd->neighbor[d][1] = ddcoord2ddnodeid(dd, tmp);
2781         if (debug)
2782         {
2783             fprintf(debug, "DD rank %d neighbor ranks in dir %d are + %d - %d\n",
2784                     dd->rank, dim,
2785                     dd->neighbor[d][0],
2786                     dd->neighbor[d][1]);
2787         }
2788     }
2789
2790     int nzone  = (1 << dd->ndim);
2791     int nizone = (1 << std::max(dd->ndim - 1, 0));
2792     assert(nizone >= 1 && nizone <= DD_MAXIZONE);
2793
2794     zones = &dd->comm->zones;
2795
2796     for (i = 0; i < nzone; i++)
2797     {
2798         m = 0;
2799         clear_ivec(zones->shift[i]);
2800         for (d = 0; d < dd->ndim; d++)
2801         {
2802             zones->shift[i][dd->dim[d]] = dd_zo[i][m++];
2803         }
2804     }
2805
2806     zones->n = nzone;
2807     for (i = 0; i < nzone; i++)
2808     {
2809         for (d = 0; d < DIM; d++)
2810         {
2811             s[d] = dd->ci[d] - zones->shift[i][d];
2812             if (s[d] < 0)
2813             {
2814                 s[d] += dd->nc[d];
2815             }
2816             else if (s[d] >= dd->nc[d])
2817             {
2818                 s[d] -= dd->nc[d];
2819             }
2820         }
2821     }
2822     zones->nizone = nizone;
2823     for (i = 0; i < zones->nizone; i++)
2824     {
2825         assert(ddNonbondedZonePairRanges[i][0] == i);
2826
2827         izone     = &zones->izone[i];
2828         /* dd_zp3 is for 3D decomposition, for fewer dimensions use only
2829          * j-zones up to nzone.
2830          */
2831         izone->j0 = std::min(ddNonbondedZonePairRanges[i][1], nzone);
2832         izone->j1 = std::min(ddNonbondedZonePairRanges[i][2], nzone);
2833         for (dim = 0; dim < DIM; dim++)
2834         {
2835             if (dd->nc[dim] == 1)
2836             {
2837                 /* All shifts should be allowed */
2838                 izone->shift0[dim] = -1;
2839                 izone->shift1[dim] = 1;
2840             }
2841             else
2842             {
2843                 /* Determine the min/max j-zone shift wrt the i-zone */
2844                 izone->shift0[dim] = 1;
2845                 izone->shift1[dim] = -1;
2846                 for (j = izone->j0; j < izone->j1; j++)
2847                 {
2848                     int shift_diff = zones->shift[j][dim] - zones->shift[i][dim];
2849                     if (shift_diff < izone->shift0[dim])
2850                     {
2851                         izone->shift0[dim] = shift_diff;
2852                     }
2853                     if (shift_diff > izone->shift1[dim])
2854                     {
2855                         izone->shift1[dim] = shift_diff;
2856                     }
2857                 }
2858             }
2859         }
2860     }
2861
2862     if (!isDlbDisabled(dd->comm))
2863     {
2864         snew(dd->comm->root, dd->ndim);
2865     }
2866
2867     if (dd->comm->bRecordLoad)
2868     {
2869         make_load_communicators(dd);
2870     }
2871 }
2872
2873 static void make_pp_communicator(FILE                 *fplog,
2874                                  gmx_domdec_t         *dd,
2875                                  t_commrec gmx_unused *cr,
2876                                  int gmx_unused        reorder)
2877 {
2878 #if GMX_MPI
2879     gmx_domdec_comm_t *comm;
2880     int                rank, *buf;
2881     ivec               periods;
2882     MPI_Comm           comm_cart;
2883
2884     comm = dd->comm;
2885
2886     if (comm->bCartesianPP)
2887     {
2888         /* Set up cartesian communication for the particle-particle part */
2889         if (fplog)
2890         {
2891             fprintf(fplog, "Will use a Cartesian communicator: %d x %d x %d\n",
2892                     dd->nc[XX], dd->nc[YY], dd->nc[ZZ]);
2893         }
2894
2895         for (int i = 0; i < DIM; i++)
2896         {
2897             periods[i] = TRUE;
2898         }
2899         MPI_Cart_create(cr->mpi_comm_mygroup, DIM, dd->nc, periods, reorder,
2900                         &comm_cart);
2901         /* We overwrite the old communicator with the new cartesian one */
2902         cr->mpi_comm_mygroup = comm_cart;
2903     }
2904
2905     dd->mpi_comm_all = cr->mpi_comm_mygroup;
2906     MPI_Comm_rank(dd->mpi_comm_all, &dd->rank);
2907
2908     if (comm->bCartesianPP_PME)
2909     {
2910         /* Since we want to use the original cartesian setup for sim,
2911          * and not the one after split, we need to make an index.
2912          */
2913         snew(comm->ddindex2ddnodeid, dd->nnodes);
2914         comm->ddindex2ddnodeid[dd_index(dd->nc, dd->ci)] = dd->rank;
2915         gmx_sumi(dd->nnodes, comm->ddindex2ddnodeid, cr);
2916         /* Get the rank of the DD master,
2917          * above we made sure that the master node is a PP node.
2918          */
2919         if (MASTER(cr))
2920         {
2921             rank = dd->rank;
2922         }
2923         else
2924         {
2925             rank = 0;
2926         }
2927         MPI_Allreduce(&rank, &dd->masterrank, 1, MPI_INT, MPI_SUM, dd->mpi_comm_all);
2928     }
2929     else if (comm->bCartesianPP)
2930     {
2931         if (cr->npmenodes == 0)
2932         {
2933             /* The PP communicator is also
2934              * the communicator for this simulation
2935              */
2936             cr->mpi_comm_mysim = cr->mpi_comm_mygroup;
2937         }
2938         cr->nodeid = dd->rank;
2939
2940         MPI_Cart_coords(dd->mpi_comm_all, dd->rank, DIM, dd->ci);
2941
2942         /* We need to make an index to go from the coordinates
2943          * to the nodeid of this simulation.
2944          */
2945         snew(comm->ddindex2simnodeid, dd->nnodes);
2946         snew(buf, dd->nnodes);
2947         if (thisRankHasDuty(cr, DUTY_PP))
2948         {
2949             buf[dd_index(dd->nc, dd->ci)] = cr->sim_nodeid;
2950         }
2951         /* Communicate the ddindex to simulation nodeid index */
2952         MPI_Allreduce(buf, comm->ddindex2simnodeid, dd->nnodes, MPI_INT, MPI_SUM,
2953                       cr->mpi_comm_mysim);
2954         sfree(buf);
2955
2956         /* Determine the master coordinates and rank.
2957          * The DD master should be the same node as the master of this sim.
2958          */
2959         for (int i = 0; i < dd->nnodes; i++)
2960         {
2961             if (comm->ddindex2simnodeid[i] == 0)
2962             {
2963                 ddindex2xyz(dd->nc, i, dd->master_ci);
2964                 MPI_Cart_rank(dd->mpi_comm_all, dd->master_ci, &dd->masterrank);
2965             }
2966         }
2967         if (debug)
2968         {
2969             fprintf(debug, "The master rank is %d\n", dd->masterrank);
2970         }
2971     }
2972     else
2973     {
2974         /* No Cartesian communicators */
2975         /* We use the rank in dd->comm->all as DD index */
2976         ddindex2xyz(dd->nc, dd->rank, dd->ci);
2977         /* The simulation master nodeid is 0, so the DD master rank is also 0 */
2978         dd->masterrank = 0;
2979         clear_ivec(dd->master_ci);
2980     }
2981 #endif
2982
2983     if (fplog)
2984     {
2985         fprintf(fplog,
2986                 "Domain decomposition rank %d, coordinates %d %d %d\n\n",
2987                 dd->rank, dd->ci[XX], dd->ci[YY], dd->ci[ZZ]);
2988     }
2989     if (debug)
2990     {
2991         fprintf(debug,
2992                 "Domain decomposition rank %d, coordinates %d %d %d\n\n",
2993                 dd->rank, dd->ci[XX], dd->ci[YY], dd->ci[ZZ]);
2994     }
2995 }
2996
2997 static void receive_ddindex2simnodeid(gmx_domdec_t         *dd,
2998                                       t_commrec            *cr)
2999 {
3000 #if GMX_MPI
3001     gmx_domdec_comm_t *comm = dd->comm;
3002
3003     if (!comm->bCartesianPP_PME && comm->bCartesianPP)
3004     {
3005         int *buf;
3006         snew(comm->ddindex2simnodeid, dd->nnodes);
3007         snew(buf, dd->nnodes);
3008         if (thisRankHasDuty(cr, DUTY_PP))
3009         {
3010             buf[dd_index(dd->nc, dd->ci)] = cr->sim_nodeid;
3011         }
3012         /* Communicate the ddindex to simulation nodeid index */
3013         MPI_Allreduce(buf, comm->ddindex2simnodeid, dd->nnodes, MPI_INT, MPI_SUM,
3014                       cr->mpi_comm_mysim);
3015         sfree(buf);
3016     }
3017 #else
3018     GMX_UNUSED_VALUE(dd);
3019     GMX_UNUSED_VALUE(cr);
3020 #endif
3021 }
3022
3023 static void split_communicator(FILE *fplog, t_commrec *cr, gmx_domdec_t *dd,
3024                                DdRankOrder gmx_unused rankOrder,
3025                                int gmx_unused reorder)
3026 {
3027     gmx_domdec_comm_t *comm;
3028     int                i;
3029     gmx_bool           bDiv[DIM];
3030 #if GMX_MPI
3031     MPI_Comm           comm_cart;
3032 #endif
3033
3034     comm = dd->comm;
3035
3036     if (comm->bCartesianPP)
3037     {
3038         for (i = 1; i < DIM; i++)
3039         {
3040             bDiv[i] = ((cr->npmenodes*dd->nc[i]) % (dd->nnodes) == 0);
3041         }
3042         if (bDiv[YY] || bDiv[ZZ])
3043         {
3044             comm->bCartesianPP_PME = TRUE;
3045             /* If we have 2D PME decomposition, which is always in x+y,
3046              * we stack the PME only nodes in z.
3047              * Otherwise we choose the direction that provides the thinnest slab
3048              * of PME only nodes as this will have the least effect
3049              * on the PP communication.
3050              * But for the PME communication the opposite might be better.
3051              */
3052             if (bDiv[ZZ] && (comm->npmenodes_y > 1 ||
3053                              !bDiv[YY] ||
3054                              dd->nc[YY] > dd->nc[ZZ]))
3055             {
3056                 comm->cartpmedim = ZZ;
3057             }
3058             else
3059             {
3060                 comm->cartpmedim = YY;
3061             }
3062             comm->ntot[comm->cartpmedim]
3063                 += (cr->npmenodes*dd->nc[comm->cartpmedim])/dd->nnodes;
3064         }
3065         else if (fplog)
3066         {
3067             fprintf(fplog, "Number of PME-only ranks (%d) is not a multiple of nx*ny (%d*%d) or nx*nz (%d*%d)\n", cr->npmenodes, dd->nc[XX], dd->nc[YY], dd->nc[XX], dd->nc[ZZ]);
3068             fprintf(fplog,
3069                     "Will not use a Cartesian communicator for PP <-> PME\n\n");
3070         }
3071     }
3072
3073     if (comm->bCartesianPP_PME)
3074     {
3075 #if GMX_MPI
3076         int  rank;
3077         ivec periods;
3078
3079         if (fplog)
3080         {
3081             fprintf(fplog, "Will use a Cartesian communicator for PP <-> PME: %d x %d x %d\n", comm->ntot[XX], comm->ntot[YY], comm->ntot[ZZ]);
3082         }
3083
3084         for (i = 0; i < DIM; i++)
3085         {
3086             periods[i] = TRUE;
3087         }
3088         MPI_Cart_create(cr->mpi_comm_mysim, DIM, comm->ntot, periods, reorder,
3089                         &comm_cart);
3090         MPI_Comm_rank(comm_cart, &rank);
3091         if (MASTER(cr) && rank != 0)
3092         {
3093             gmx_fatal(FARGS, "MPI rank 0 was renumbered by MPI_Cart_create, we do not allow this");
3094         }
3095
3096         /* With this assigment we loose the link to the original communicator
3097          * which will usually be MPI_COMM_WORLD, unless have multisim.
3098          */
3099         cr->mpi_comm_mysim = comm_cart;
3100         cr->sim_nodeid     = rank;
3101
3102         MPI_Cart_coords(cr->mpi_comm_mysim, cr->sim_nodeid, DIM, dd->ci);
3103
3104         if (fplog)
3105         {
3106             fprintf(fplog, "Cartesian rank %d, coordinates %d %d %d\n\n",
3107                     cr->sim_nodeid, dd->ci[XX], dd->ci[YY], dd->ci[ZZ]);
3108         }
3109
3110         if (dd->ci[comm->cartpmedim] < dd->nc[comm->cartpmedim])
3111         {
3112             cr->duty = DUTY_PP;
3113         }
3114         if (cr->npmenodes == 0 ||
3115             dd->ci[comm->cartpmedim] >= dd->nc[comm->cartpmedim])
3116         {
3117             cr->duty = DUTY_PME;
3118         }
3119
3120         /* Split the sim communicator into PP and PME only nodes */
3121         MPI_Comm_split(cr->mpi_comm_mysim,
3122                        getThisRankDuties(cr),
3123                        dd_index(comm->ntot, dd->ci),
3124                        &cr->mpi_comm_mygroup);
3125 #endif
3126     }
3127     else
3128     {
3129         switch (rankOrder)
3130         {
3131             case DdRankOrder::pp_pme:
3132                 if (fplog)
3133                 {
3134                     fprintf(fplog, "Order of the ranks: PP first, PME last\n");
3135                 }
3136                 break;
3137             case DdRankOrder::interleave:
3138                 /* Interleave the PP-only and PME-only ranks */
3139                 if (fplog)
3140                 {
3141                     fprintf(fplog, "Interleaving PP and PME ranks\n");
3142                 }
3143                 comm->pmenodes = dd_interleaved_pme_ranks(dd);
3144                 break;
3145             case DdRankOrder::cartesian:
3146                 break;
3147             default:
3148                 gmx_fatal(FARGS, "Invalid ddRankOrder=%d", static_cast<int>(rankOrder));
3149         }
3150
3151         if (dd_simnode2pmenode(dd, cr, cr->sim_nodeid) == -1)
3152         {
3153             cr->duty = DUTY_PME;
3154         }
3155         else
3156         {
3157             cr->duty = DUTY_PP;
3158         }
3159 #if GMX_MPI
3160         /* Split the sim communicator into PP and PME only nodes */
3161         MPI_Comm_split(cr->mpi_comm_mysim,
3162                        getThisRankDuties(cr),
3163                        cr->nodeid,
3164                        &cr->mpi_comm_mygroup);
3165         MPI_Comm_rank(cr->mpi_comm_mygroup, &cr->nodeid);
3166 #endif
3167     }
3168
3169     if (fplog)
3170     {
3171         fprintf(fplog, "This rank does only %s work.\n\n",
3172                 thisRankHasDuty(cr, DUTY_PP) ? "particle-particle" : "PME-mesh");
3173     }
3174 }
3175
3176 /*! \brief Generates the MPI communicators for domain decomposition */
3177 static void make_dd_communicators(FILE *fplog, t_commrec *cr,
3178                                   gmx_domdec_t *dd, DdRankOrder ddRankOrder)
3179 {
3180     gmx_domdec_comm_t *comm;
3181     int                CartReorder;
3182
3183     comm = dd->comm;
3184
3185     copy_ivec(dd->nc, comm->ntot);
3186
3187     comm->bCartesianPP     = (ddRankOrder == DdRankOrder::cartesian);
3188     comm->bCartesianPP_PME = FALSE;
3189
3190     /* Reorder the nodes by default. This might change the MPI ranks.
3191      * Real reordering is only supported on very few architectures,
3192      * Blue Gene is one of them.
3193      */
3194     CartReorder = (getenv("GMX_NO_CART_REORDER") == nullptr);
3195
3196     if (cr->npmenodes > 0)
3197     {
3198         /* Split the communicator into a PP and PME part */
3199         split_communicator(fplog, cr, dd, ddRankOrder, CartReorder);
3200         if (comm->bCartesianPP_PME)
3201         {
3202             /* We (possibly) reordered the nodes in split_communicator,
3203              * so it is no longer required in make_pp_communicator.
3204              */
3205             CartReorder = FALSE;
3206         }
3207     }
3208     else
3209     {
3210         /* All nodes do PP and PME */
3211 #if GMX_MPI
3212         /* We do not require separate communicators */
3213         cr->mpi_comm_mygroup = cr->mpi_comm_mysim;
3214 #endif
3215     }
3216
3217     if (thisRankHasDuty(cr, DUTY_PP))
3218     {
3219         /* Copy or make a new PP communicator */
3220         make_pp_communicator(fplog, dd, cr, CartReorder);
3221     }
3222     else
3223     {
3224         receive_ddindex2simnodeid(dd, cr);
3225     }
3226
3227     if (!thisRankHasDuty(cr, DUTY_PME))
3228     {
3229         /* Set up the commnuication to our PME node */
3230         dd->pme_nodeid           = dd_simnode2pmenode(dd, cr, cr->sim_nodeid);
3231         dd->pme_receive_vir_ener = receive_vir_ener(dd, cr);
3232         if (debug)
3233         {
3234             fprintf(debug, "My pme_nodeid %d receive ener %d\n",
3235                     dd->pme_nodeid, dd->pme_receive_vir_ener);
3236         }
3237     }
3238     else
3239     {
3240         dd->pme_nodeid = -1;
3241     }
3242
3243     if (DDMASTER(dd))
3244     {
3245         dd->ma = gmx::compat::make_unique<AtomDistribution>(dd->nc,
3246                                                             comm->cgs_gl.nr,
3247                                                             comm->cgs_gl.index[comm->cgs_gl.nr]);
3248     }
3249 }
3250
3251 static real *get_slb_frac(FILE *fplog, const char *dir, int nc, const char *size_string)
3252 {
3253     real  *slb_frac, tot;
3254     int    i, n;
3255     double dbl;
3256
3257     slb_frac = nullptr;
3258     if (nc > 1 && size_string != nullptr)
3259     {
3260         if (fplog)
3261         {
3262             fprintf(fplog, "Using static load balancing for the %s direction\n",
3263                     dir);
3264         }
3265         snew(slb_frac, nc);
3266         tot = 0;
3267         for (i = 0; i < nc; i++)
3268         {
3269             dbl = 0;
3270             sscanf(size_string, "%20lf%n", &dbl, &n);
3271             if (dbl == 0)
3272             {
3273                 gmx_fatal(FARGS, "Incorrect or not enough DD cell size entries for direction %s: '%s'", dir, size_string);
3274             }
3275             slb_frac[i]  = dbl;
3276             size_string += n;
3277             tot         += slb_frac[i];
3278         }
3279         /* Normalize */
3280         if (fplog)
3281         {
3282             fprintf(fplog, "Relative cell sizes:");
3283         }
3284         for (i = 0; i < nc; i++)
3285         {
3286             slb_frac[i] /= tot;
3287             if (fplog)
3288             {
3289                 fprintf(fplog, " %5.3f", slb_frac[i]);
3290             }
3291         }
3292         if (fplog)
3293         {
3294             fprintf(fplog, "\n");
3295         }
3296     }
3297
3298     return slb_frac;
3299 }
3300
3301 static int multi_body_bondeds_count(const gmx_mtop_t *mtop)
3302 {
3303     int                  n, nmol, ftype;
3304     gmx_mtop_ilistloop_t iloop;
3305     const t_ilist       *il;
3306
3307     n     = 0;
3308     iloop = gmx_mtop_ilistloop_init(mtop);
3309     while (gmx_mtop_ilistloop_next(iloop, &il, &nmol))
3310     {
3311         for (ftype = 0; ftype < F_NRE; ftype++)
3312         {
3313             if ((interaction_function[ftype].flags & IF_BOND) &&
3314                 NRAL(ftype) >  2)
3315             {
3316                 n += nmol*il[ftype].nr/(1 + NRAL(ftype));
3317             }
3318         }
3319     }
3320
3321     return n;
3322 }
3323
3324 static int dd_getenv(FILE *fplog, const char *env_var, int def)
3325 {
3326     char *val;
3327     int   nst;
3328
3329     nst = def;
3330     val = getenv(env_var);
3331     if (val)
3332     {
3333         if (sscanf(val, "%20d", &nst) <= 0)
3334         {
3335             nst = 1;
3336         }
3337         if (fplog)
3338         {
3339             fprintf(fplog, "Found env.var. %s = %s, using value %d\n",
3340                     env_var, val, nst);
3341         }
3342     }
3343
3344     return nst;
3345 }
3346
3347 static void dd_warning(const t_commrec *cr, FILE *fplog, const char *warn_string)
3348 {
3349     if (MASTER(cr))
3350     {
3351         fprintf(stderr, "\n%s\n", warn_string);
3352     }
3353     if (fplog)
3354     {
3355         fprintf(fplog, "\n%s\n", warn_string);
3356     }
3357 }
3358
3359 static void check_dd_restrictions(t_commrec *cr, const gmx_domdec_t *dd,
3360                                   const t_inputrec *ir, FILE *fplog)
3361 {
3362     if (ir->ePBC == epbcSCREW &&
3363         (dd->nc[XX] == 1 || dd->nc[YY] > 1 || dd->nc[ZZ] > 1))
3364     {
3365         gmx_fatal(FARGS, "With pbc=%s can only do domain decomposition in the x-direction", epbc_names[ir->ePBC]);
3366     }
3367
3368     if (ir->ns_type == ensSIMPLE)
3369     {
3370         gmx_fatal(FARGS, "Domain decomposition does not support simple neighbor searching, use grid searching or run with one MPI rank");
3371     }
3372
3373     if (ir->nstlist == 0)
3374     {
3375         gmx_fatal(FARGS, "Domain decomposition does not work with nstlist=0");
3376     }
3377
3378     if (ir->comm_mode == ecmANGULAR && ir->ePBC != epbcNONE)
3379     {
3380         dd_warning(cr, fplog, "comm-mode angular will give incorrect results when the comm group partially crosses a periodic boundary");
3381     }
3382 }
3383
3384 static real average_cellsize_min(gmx_domdec_t *dd, gmx_ddbox_t *ddbox)
3385 {
3386     int  di, d;
3387     real r;
3388
3389     r = ddbox->box_size[XX];
3390     for (di = 0; di < dd->ndim; di++)
3391     {
3392         d = dd->dim[di];
3393         /* Check using the initial average cell size */
3394         r = std::min(r, ddbox->box_size[d]*ddbox->skew_fac[d]/dd->nc[d]);
3395     }
3396
3397     return r;
3398 }
3399
3400 /*! \brief Depending on the DLB initial value return the DLB switched off state or issue an error.
3401  */
3402 static int forceDlbOffOrBail(int                cmdlineDlbState,
3403                              const std::string &reasonStr,
3404                              t_commrec         *cr,
3405                              FILE              *fplog)
3406 {
3407     std::string dlbNotSupportedErr  = "Dynamic load balancing requested, but ";
3408     std::string dlbDisableNote      = "NOTE: disabling dynamic load balancing as ";
3409
3410     if (cmdlineDlbState == edlbsOnUser)
3411     {
3412         gmx_fatal(FARGS, (dlbNotSupportedErr + reasonStr).c_str());
3413     }
3414     else if (cmdlineDlbState == edlbsOffCanTurnOn)
3415     {
3416         dd_warning(cr, fplog, (dlbDisableNote + reasonStr + "\n").c_str());
3417     }
3418     return edlbsOffForever;
3419 }
3420
3421 /*! \brief Return the dynamic load balancer's initial state based on initial conditions and user inputs.
3422  *
3423  * This function parses the parameters of "-dlb" command line option setting
3424  * corresponding state values. Then it checks the consistency of the determined
3425  * state with other run parameters and settings. As a result, the initial state
3426  * may be altered or an error may be thrown if incompatibility of options is detected.
3427  *
3428  * \param [in] fplog       Pointer to mdrun log file.
3429  * \param [in] cr          Pointer to MPI communication object.
3430  * \param [in] dlbOption   Enum value for the DLB option.
3431  * \param [in] bRecordLoad True if the load balancer is recording load information.
3432  * \param [in] mdrunOptions  Options for mdrun.
3433  * \param [in] ir          Pointer mdrun to input parameters.
3434  * \returns                DLB initial/startup state.
3435  */
3436 static int determineInitialDlbState(FILE *fplog, t_commrec *cr,
3437                                     DlbOption dlbOption, gmx_bool bRecordLoad,
3438                                     const MdrunOptions &mdrunOptions,
3439                                     const t_inputrec *ir)
3440 {
3441     int dlbState = edlbsOffCanTurnOn;
3442
3443     switch (dlbOption)
3444     {
3445         case DlbOption::turnOnWhenUseful: dlbState = edlbsOffCanTurnOn; break;
3446         case DlbOption::no:               dlbState = edlbsOffUser;      break;
3447         case DlbOption::yes:              dlbState = edlbsOnUser;       break;
3448         default: gmx_incons("Invalid dlbOption enum value");
3449     }
3450
3451     /* Reruns don't support DLB: bail or override auto mode */
3452     if (mdrunOptions.rerun)
3453     {
3454         std::string reasonStr = "it is not supported in reruns.";
3455         return forceDlbOffOrBail(dlbState, reasonStr, cr, fplog);
3456     }
3457
3458     /* Unsupported integrators */
3459     if (!EI_DYNAMICS(ir->eI))
3460     {
3461         auto reasonStr = gmx::formatString("it is only supported with dynamics, not with integrator '%s'.", EI(ir->eI));
3462         return forceDlbOffOrBail(dlbState, reasonStr, cr, fplog);
3463     }
3464
3465     /* Without cycle counters we can't time work to balance on */
3466     if (!bRecordLoad)
3467     {
3468         std::string reasonStr = "cycle counters unsupported or not enabled in the operating system kernel.";
3469         return forceDlbOffOrBail(dlbState, reasonStr, cr, fplog);
3470     }
3471
3472     if (mdrunOptions.reproducible)
3473     {
3474         std::string reasonStr = "you started a reproducible run.";
3475         switch (dlbState)
3476         {
3477             case edlbsOffUser:
3478                 break;
3479             case edlbsOffForever:
3480                 GMX_RELEASE_ASSERT(false, "edlbsOffForever is not a valid initial state");
3481                 break;
3482             case edlbsOffCanTurnOn:
3483                 return forceDlbOffOrBail(dlbState, reasonStr, cr, fplog);
3484                 break;
3485             case edlbsOnCanTurnOff:
3486                 GMX_RELEASE_ASSERT(false, "edlbsOffCanTurnOff is not a valid initial state");
3487                 break;
3488             case edlbsOnUser:
3489                 return forceDlbOffOrBail(dlbState, reasonStr + " In load balanced runs binary reproducibility cannot be ensured.", cr, fplog);
3490                 break;
3491             default:
3492                 gmx_fatal(FARGS, "Death horror: undefined case (%d) for load balancing choice", dlbState);
3493                 break;
3494         }
3495     }
3496
3497     return dlbState;
3498 }
3499
3500 static void set_dd_dim(FILE *fplog, gmx_domdec_t *dd)
3501 {
3502     dd->ndim = 0;
3503     if (getenv("GMX_DD_ORDER_ZYX") != nullptr)
3504     {
3505         /* Decomposition order z,y,x */
3506         if (fplog)
3507         {
3508             fprintf(fplog, "Using domain decomposition order z, y, x\n");
3509         }
3510         for (int dim = DIM-1; dim >= 0; dim--)
3511         {
3512             if (dd->nc[dim] > 1)
3513             {
3514                 dd->dim[dd->ndim++] = dim;
3515             }
3516         }
3517     }
3518     else
3519     {
3520         /* Decomposition order x,y,z */
3521         for (int dim = 0; dim < DIM; dim++)
3522         {
3523             if (dd->nc[dim] > 1)
3524             {
3525                 dd->dim[dd->ndim++] = dim;
3526             }
3527         }
3528     }
3529
3530     if (dd->ndim == 0)
3531     {
3532         /* Set dim[0] to avoid extra checks on ndim in several places */
3533         dd->dim[0] = XX;
3534     }
3535 }
3536
3537 static gmx_domdec_comm_t *init_dd_comm()
3538 {
3539     gmx_domdec_comm_t *comm = new gmx_domdec_comm_t;
3540
3541     comm->n_load_have    = 0;
3542     comm->n_load_collect = 0;
3543
3544     for (int i = 0; i < static_cast<int>(DDAtomRanges::Type::Number); i++)
3545     {
3546         comm->sum_nat[i] = 0;
3547     }
3548     comm->ndecomp   = 0;
3549     comm->nload     = 0;
3550     comm->load_step = 0;
3551     comm->load_sum  = 0;
3552     comm->load_max  = 0;
3553     clear_ivec(comm->load_lim);
3554     comm->load_mdf  = 0;
3555     comm->load_pme  = 0;
3556
3557     /* This should be replaced by a unique pointer */
3558     comm->balanceRegion = ddBalanceRegionAllocate();
3559
3560     return comm;
3561 }
3562
3563 /*! \brief Set the cell size and interaction limits, as well as the DD grid */
3564 static void set_dd_limits_and_grid(FILE *fplog, t_commrec *cr, gmx_domdec_t *dd,
3565                                    const DomdecOptions &options,
3566                                    const MdrunOptions &mdrunOptions,
3567                                    const gmx_mtop_t *mtop,
3568                                    const t_inputrec *ir,
3569                                    const matrix box,
3570                                    gmx::ArrayRef<const gmx::RVec> xGlobal,
3571                                    gmx_ddbox_t *ddbox)
3572 {
3573     real               r_bonded         = -1;
3574     real               r_bonded_limit   = -1;
3575     const real         tenPercentMargin = 1.1;
3576     gmx_domdec_comm_t *comm             = dd->comm;
3577
3578     dd->npbcdim   = ePBC2npbcdim(ir->ePBC);
3579     dd->bScrewPBC = (ir->ePBC == epbcSCREW);
3580
3581     dd->pme_recv_f_alloc = 0;
3582     dd->pme_recv_f_buf   = nullptr;
3583
3584     /* Initialize to GPU share count to 0, might change later */
3585     comm->nrank_gpu_shared = 0;
3586
3587     comm->dlbState         = determineInitialDlbState(fplog, cr, options.dlbOption, comm->bRecordLoad, mdrunOptions, ir);
3588     dd_dlb_set_should_check_whether_to_turn_dlb_on(dd, TRUE);
3589     /* To consider turning DLB on after 2*nstlist steps we need to check
3590      * at partitioning count 3. Thus we need to increase the first count by 2.
3591      */
3592     comm->ddPartioningCountFirstDlbOff += 2;
3593
3594     if (fplog)
3595     {
3596         fprintf(fplog, "Dynamic load balancing: %s\n",
3597                 edlbs_names[comm->dlbState]);
3598     }
3599     comm->bPMELoadBalDLBLimits = FALSE;
3600
3601     /* Allocate the charge group/atom sorting struct */
3602     snew(comm->sort, 1);
3603
3604     comm->bCGs = (ncg_mtop(mtop) < mtop->natoms);
3605
3606     comm->bInterCGBondeds = ((ncg_mtop(mtop) > gmx_mtop_num_molecules(*mtop)) ||
3607                              mtop->bIntermolecularInteractions);
3608     if (comm->bInterCGBondeds)
3609     {
3610         comm->bInterCGMultiBody = (multi_body_bondeds_count(mtop) > 0);
3611     }
3612     else
3613     {
3614         comm->bInterCGMultiBody = FALSE;
3615     }
3616
3617     dd->bInterCGcons    = inter_charge_group_constraints(*mtop);
3618     dd->bInterCGsettles = inter_charge_group_settles(*mtop);
3619
3620     if (ir->rlist == 0)
3621     {
3622         /* Set the cut-off to some very large value,
3623          * so we don't need if statements everywhere in the code.
3624          * We use sqrt, since the cut-off is squared in some places.
3625          */
3626         comm->cutoff   = GMX_CUTOFF_INF;
3627     }
3628     else
3629     {
3630         comm->cutoff   = ir->rlist;
3631     }
3632     comm->cutoff_mbody = 0;
3633
3634     comm->cellsize_limit = 0;
3635     comm->bBondComm      = FALSE;
3636
3637     /* Atoms should be able to move by up to half the list buffer size (if > 0)
3638      * within nstlist steps. Since boundaries are allowed to displace by half
3639      * a cell size, DD cells should be at least the size of the list buffer.
3640      */
3641     comm->cellsize_limit = std::max(comm->cellsize_limit,
3642                                     ir->rlist - std::max(ir->rvdw, ir->rcoulomb));
3643
3644     if (comm->bInterCGBondeds)
3645     {
3646         if (options.minimumCommunicationRange > 0)
3647         {
3648             comm->cutoff_mbody = options.minimumCommunicationRange;
3649             if (options.useBondedCommunication)
3650             {
3651                 comm->bBondComm = (comm->cutoff_mbody > comm->cutoff);
3652             }
3653             else
3654             {
3655                 comm->cutoff = std::max(comm->cutoff, comm->cutoff_mbody);
3656             }
3657             r_bonded_limit = comm->cutoff_mbody;
3658         }
3659         else if (ir->bPeriodicMols)
3660         {
3661             /* Can not easily determine the required cut-off */
3662             dd_warning(cr, fplog, "NOTE: Periodic molecules are present in this system. Because of this, the domain decomposition algorithm cannot easily determine the minimum cell size that it requires for treating bonded interactions. Instead, domain decomposition will assume that half the non-bonded cut-off will be a suitable lower bound.\n");
3663             comm->cutoff_mbody = comm->cutoff/2;
3664             r_bonded_limit     = comm->cutoff_mbody;
3665         }
3666         else
3667         {
3668             real r_2b, r_mb;
3669
3670             if (MASTER(cr))
3671             {
3672                 dd_bonded_cg_distance(fplog, mtop, ir, as_rvec_array(xGlobal.data()), box,
3673                                       options.checkBondedInteractions,
3674                                       &r_2b, &r_mb);
3675             }
3676             gmx_bcast(sizeof(r_2b), &r_2b, cr);
3677             gmx_bcast(sizeof(r_mb), &r_mb, cr);
3678
3679             /* We use an initial margin of 10% for the minimum cell size,
3680              * except when we are just below the non-bonded cut-off.
3681              */
3682             if (options.useBondedCommunication)
3683             {
3684                 if (std::max(r_2b, r_mb) > comm->cutoff)
3685                 {
3686                     r_bonded        = std::max(r_2b, r_mb);
3687                     r_bonded_limit  = tenPercentMargin*r_bonded;
3688                     comm->bBondComm = TRUE;
3689                 }
3690                 else
3691                 {
3692                     r_bonded       = r_mb;
3693                     r_bonded_limit = std::min(tenPercentMargin*r_bonded, comm->cutoff);
3694                 }
3695                 /* We determine cutoff_mbody later */
3696             }
3697             else
3698             {
3699                 /* No special bonded communication,
3700                  * simply increase the DD cut-off.
3701                  */
3702                 r_bonded_limit     = tenPercentMargin*std::max(r_2b, r_mb);
3703                 comm->cutoff_mbody = r_bonded_limit;
3704                 comm->cutoff       = std::max(comm->cutoff, comm->cutoff_mbody);
3705             }
3706         }
3707         if (fplog)
3708         {
3709             fprintf(fplog,
3710                     "Minimum cell size due to bonded interactions: %.3f nm\n",
3711                     r_bonded_limit);
3712         }
3713         comm->cellsize_limit = std::max(comm->cellsize_limit, r_bonded_limit);
3714     }
3715
3716     real rconstr = 0;
3717     if (dd->bInterCGcons && options.constraintCommunicationRange <= 0)
3718     {
3719         /* There is a cell size limit due to the constraints (P-LINCS) */
3720         rconstr = constr_r_max(fplog, mtop, ir);
3721         if (fplog)
3722         {
3723             fprintf(fplog,
3724                     "Estimated maximum distance required for P-LINCS: %.3f nm\n",
3725                     rconstr);
3726             if (rconstr > comm->cellsize_limit)
3727             {
3728                 fprintf(fplog, "This distance will limit the DD cell size, you can override this with -rcon\n");
3729             }
3730         }
3731     }
3732     else if (options.constraintCommunicationRange > 0 && fplog)
3733     {
3734         /* Here we do not check for dd->bInterCGcons,
3735          * because one can also set a cell size limit for virtual sites only
3736          * and at this point we don't know yet if there are intercg v-sites.
3737          */
3738         fprintf(fplog,
3739                 "User supplied maximum distance required for P-LINCS: %.3f nm\n",
3740                 options.constraintCommunicationRange);
3741         rconstr = options.constraintCommunicationRange;
3742     }
3743     comm->cellsize_limit = std::max(comm->cellsize_limit, rconstr);
3744
3745     comm->cgs_gl = gmx_mtop_global_cgs(mtop);
3746
3747     if (options.numCells[XX] > 0)
3748     {
3749         copy_ivec(options.numCells, dd->nc);
3750         set_dd_dim(fplog, dd);
3751         set_ddbox_cr(cr, &dd->nc, ir, box, xGlobal, ddbox);
3752
3753         if (options.numPmeRanks >= 0)
3754         {
3755             cr->npmenodes = options.numPmeRanks;
3756         }
3757         else
3758         {
3759             /* When the DD grid is set explicitly and -npme is set to auto,
3760              * don't use PME ranks. We check later if the DD grid is
3761              * compatible with the total number of ranks.
3762              */
3763             cr->npmenodes = 0;
3764         }
3765
3766         real acs = average_cellsize_min(dd, ddbox);
3767         if (acs < comm->cellsize_limit)
3768         {
3769             if (fplog)
3770             {
3771                 fprintf(fplog, "ERROR: The initial cell size (%f) is smaller than the cell size limit (%f)\n", acs, comm->cellsize_limit);
3772             }
3773             gmx_fatal_collective(FARGS, cr->mpi_comm_mysim, MASTER(cr),
3774                                  "The initial cell size (%f) is smaller than the cell size limit (%f), change options -dd, -rdd or -rcon, see the log file for details",
3775                                  acs, comm->cellsize_limit);
3776         }
3777     }
3778     else
3779     {
3780         set_ddbox_cr(cr, nullptr, ir, box, xGlobal, ddbox);
3781
3782         /* We need to choose the optimal DD grid and possibly PME nodes */
3783         real limit =
3784             dd_choose_grid(fplog, cr, dd, ir, mtop, box, ddbox,
3785                            options.numPmeRanks,
3786                            !isDlbDisabled(comm),
3787                            options.dlbScaling,
3788                            comm->cellsize_limit, comm->cutoff,
3789                            comm->bInterCGBondeds);
3790
3791         if (dd->nc[XX] == 0)
3792         {
3793             char     buf[STRLEN];
3794             gmx_bool bC = (dd->bInterCGcons && rconstr > r_bonded_limit);
3795             sprintf(buf, "Change the number of ranks or mdrun option %s%s%s",
3796                     !bC ? "-rdd" : "-rcon",
3797                     comm->dlbState != edlbsOffUser ? " or -dds" : "",
3798                     bC ? " or your LINCS settings" : "");
3799
3800             gmx_fatal_collective(FARGS, cr->mpi_comm_mysim, MASTER(cr),
3801                                  "There is no domain decomposition for %d ranks that is compatible with the given box and a minimum cell size of %g nm\n"
3802                                  "%s\n"
3803                                  "Look in the log file for details on the domain decomposition",
3804                                  cr->nnodes-cr->npmenodes, limit, buf);
3805         }
3806         set_dd_dim(fplog, dd);
3807     }
3808
3809     if (fplog)
3810     {
3811         fprintf(fplog,
3812                 "Domain decomposition grid %d x %d x %d, separate PME ranks %d\n",
3813                 dd->nc[XX], dd->nc[YY], dd->nc[ZZ], cr->npmenodes);
3814     }
3815
3816     dd->nnodes = dd->nc[XX]*dd->nc[YY]*dd->nc[ZZ];
3817     if (cr->nnodes - dd->nnodes != cr->npmenodes)
3818     {
3819         gmx_fatal_collective(FARGS, cr->mpi_comm_mysim, MASTER(cr),
3820                              "The size of the domain decomposition grid (%d) does not match the number of ranks (%d). The total number of ranks is %d",
3821                              dd->nnodes, cr->nnodes - cr->npmenodes, cr->nnodes);
3822     }
3823     if (cr->npmenodes > dd->nnodes)
3824     {
3825         gmx_fatal_collective(FARGS, cr->mpi_comm_mysim, MASTER(cr),
3826                              "The number of separate PME ranks (%d) is larger than the number of PP ranks (%d), this is not supported.", cr->npmenodes, dd->nnodes);
3827     }
3828     if (cr->npmenodes > 0)
3829     {
3830         comm->npmenodes = cr->npmenodes;
3831     }
3832     else
3833     {
3834         comm->npmenodes = dd->nnodes;
3835     }
3836
3837     if (EEL_PME(ir->coulombtype) || EVDW_PME(ir->vdwtype))
3838     {
3839         /* The following choices should match those
3840          * in comm_cost_est in domdec_setup.c.
3841          * Note that here the checks have to take into account
3842          * that the decomposition might occur in a different order than xyz
3843          * (for instance through the env.var. GMX_DD_ORDER_ZYX),
3844          * in which case they will not match those in comm_cost_est,
3845          * but since that is mainly for testing purposes that's fine.
3846          */
3847         if (dd->ndim >= 2 && dd->dim[0] == XX && dd->dim[1] == YY &&
3848             comm->npmenodes > dd->nc[XX] && comm->npmenodes % dd->nc[XX] == 0 &&
3849             getenv("GMX_PMEONEDD") == nullptr)
3850         {
3851             comm->npmedecompdim = 2;
3852             comm->npmenodes_x   = dd->nc[XX];
3853             comm->npmenodes_y   = comm->npmenodes/comm->npmenodes_x;
3854         }
3855         else
3856         {
3857             /* In case nc is 1 in both x and y we could still choose to
3858              * decompose pme in y instead of x, but we use x for simplicity.
3859              */
3860             comm->npmedecompdim = 1;
3861             if (dd->dim[0] == YY)
3862             {
3863                 comm->npmenodes_x = 1;
3864                 comm->npmenodes_y = comm->npmenodes;
3865             }
3866             else
3867             {
3868                 comm->npmenodes_x = comm->npmenodes;
3869                 comm->npmenodes_y = 1;
3870             }
3871         }
3872         if (fplog)
3873         {
3874             fprintf(fplog, "PME domain decomposition: %d x %d x %d\n",
3875                     comm->npmenodes_x, comm->npmenodes_y, 1);
3876         }
3877     }
3878     else
3879     {
3880         comm->npmedecompdim = 0;
3881         comm->npmenodes_x   = 0;
3882         comm->npmenodes_y   = 0;
3883     }
3884
3885     snew(comm->slb_frac, DIM);
3886     if (isDlbDisabled(comm))
3887     {
3888         comm->slb_frac[XX] = get_slb_frac(fplog, "x", dd->nc[XX], options.cellSizeX);
3889         comm->slb_frac[YY] = get_slb_frac(fplog, "y", dd->nc[YY], options.cellSizeY);
3890         comm->slb_frac[ZZ] = get_slb_frac(fplog, "z", dd->nc[ZZ], options.cellSizeZ);
3891     }
3892
3893     if (comm->bInterCGBondeds && comm->cutoff_mbody == 0)
3894     {
3895         if (comm->bBondComm || !isDlbDisabled(comm))
3896         {
3897             /* Set the bonded communication distance to halfway
3898              * the minimum and the maximum,
3899              * since the extra communication cost is nearly zero.
3900              */
3901             real acs           = average_cellsize_min(dd, ddbox);
3902             comm->cutoff_mbody = 0.5*(r_bonded + acs);
3903             if (!isDlbDisabled(comm))
3904             {
3905                 /* Check if this does not limit the scaling */
3906                 comm->cutoff_mbody = std::min(comm->cutoff_mbody,
3907                                               options.dlbScaling*acs);
3908             }
3909             if (!comm->bBondComm)
3910             {
3911                 /* Without bBondComm do not go beyond the n.b. cut-off */
3912                 comm->cutoff_mbody = std::min(comm->cutoff_mbody, comm->cutoff);
3913                 if (comm->cellsize_limit >= comm->cutoff)
3914                 {
3915                     /* We don't loose a lot of efficieny
3916                      * when increasing it to the n.b. cut-off.
3917                      * It can even be slightly faster, because we need
3918                      * less checks for the communication setup.
3919                      */
3920                     comm->cutoff_mbody = comm->cutoff;
3921                 }
3922             }
3923             /* Check if we did not end up below our original limit */
3924             comm->cutoff_mbody = std::max(comm->cutoff_mbody, r_bonded_limit);
3925
3926             if (comm->cutoff_mbody > comm->cellsize_limit)
3927             {
3928                 comm->cellsize_limit = comm->cutoff_mbody;
3929             }
3930         }
3931         /* Without DLB and cutoff_mbody<cutoff, cutoff_mbody is dynamic */
3932     }
3933
3934     if (debug)
3935     {
3936         fprintf(debug, "Bonded atom communication beyond the cut-off: %d\n"
3937                 "cellsize limit %f\n",
3938                 comm->bBondComm, comm->cellsize_limit);
3939     }
3940
3941     if (MASTER(cr))
3942     {
3943         check_dd_restrictions(cr, dd, ir, fplog);
3944     }
3945 }
3946
3947 static void set_dlb_limits(gmx_domdec_t *dd)
3948
3949 {
3950     for (int d = 0; d < dd->ndim; d++)
3951     {
3952         /* Set the number of pulses to the value for DLB */
3953         dd->comm->cd[d].ind.resize(dd->comm->cd[d].np_dlb);
3954
3955         dd->comm->cellsize_min[dd->dim[d]] =
3956             dd->comm->cellsize_min_dlb[dd->dim[d]];
3957     }
3958 }
3959
3960
3961 static void turn_on_dlb(FILE *fplog, const t_commrec *cr, gmx_int64_t step)
3962 {
3963     gmx_domdec_t      *dd;
3964     gmx_domdec_comm_t *comm;
3965     real               cellsize_min;
3966     int                d, nc, i;
3967
3968     dd   = cr->dd;
3969     comm = dd->comm;
3970
3971     cellsize_min = comm->cellsize_min[dd->dim[0]];
3972     for (d = 1; d < dd->ndim; d++)
3973     {
3974         cellsize_min = std::min(cellsize_min, comm->cellsize_min[dd->dim[d]]);
3975     }
3976
3977     /* Turn off DLB if we're too close to the cell size limit. */
3978     if (cellsize_min < comm->cellsize_limit*1.05)
3979     {
3980         auto str = gmx::formatString("step %" GMX_PRId64 " Measured %.1f %% performance loss due to load imbalance, "
3981                                      "but the minimum cell size is smaller than 1.05 times the cell size limit."
3982                                      "Will no longer try dynamic load balancing.\n", step, dd_force_imb_perf_loss(dd)*100);
3983         dd_warning(cr, fplog, str.c_str());
3984
3985         comm->dlbState = edlbsOffForever;
3986         return;
3987     }
3988
3989     char buf[STRLEN];
3990     sprintf(buf, "step %" GMX_PRId64 " Turning on dynamic load balancing, because the performance loss due to load imbalance is %.1f %%.\n", step, dd_force_imb_perf_loss(dd)*100);
3991     dd_warning(cr, fplog, buf);
3992     comm->dlbState = edlbsOnCanTurnOff;
3993
3994     /* Store the non-DLB performance, so we can check if DLB actually
3995      * improves performance.
3996      */
3997     GMX_RELEASE_ASSERT(comm->cycl_n[ddCyclStep] > 0, "When we turned on DLB, we should have measured cycles");
3998     comm->cyclesPerStepBeforeDLB = comm->cycl[ddCyclStep]/comm->cycl_n[ddCyclStep];
3999
4000     set_dlb_limits(dd);
4001
4002     /* We can set the required cell size info here,
4003      * so we do not need to communicate this.
4004      * The grid is completely uniform.
4005      */
4006     for (d = 0; d < dd->ndim; d++)
4007     {
4008         if (comm->root[d])
4009         {
4010             comm->load[d].sum_m = comm->load[d].sum;
4011
4012             nc = dd->nc[dd->dim[d]];
4013             for (i = 0; i < nc; i++)
4014             {
4015                 comm->root[d]->cell_f[i]    = i/(real)nc;
4016                 if (d > 0)
4017                 {
4018                     comm->root[d]->cell_f_max0[i] =  i   /(real)nc;
4019                     comm->root[d]->cell_f_min1[i] = (i+1)/(real)nc;
4020                 }
4021             }
4022             comm->root[d]->cell_f[nc] = 1.0;
4023         }
4024     }
4025 }
4026
4027 static void turn_off_dlb(FILE *fplog, const t_commrec *cr, gmx_int64_t step)
4028 {
4029     gmx_domdec_t *dd = cr->dd;
4030
4031     char          buf[STRLEN];
4032     sprintf(buf, "step %" GMX_PRId64 " Turning off dynamic load balancing, because it is degrading performance.\n", step);
4033     dd_warning(cr, fplog, buf);
4034     dd->comm->dlbState                     = edlbsOffCanTurnOn;
4035     dd->comm->haveTurnedOffDlb             = true;
4036     dd->comm->ddPartioningCountFirstDlbOff = dd->ddp_count;
4037 }
4038
4039 static void turn_off_dlb_forever(FILE *fplog, const t_commrec *cr, gmx_int64_t step)
4040 {
4041     GMX_RELEASE_ASSERT(cr->dd->comm->dlbState == edlbsOffCanTurnOn, "Can only turn off DLB forever when it was in the can-turn-on state");
4042     char buf[STRLEN];
4043     sprintf(buf, "step %" GMX_PRId64 " Will no longer try dynamic load balancing, as it degraded performance.\n", step);
4044     dd_warning(cr, fplog, buf);
4045     cr->dd->comm->dlbState = edlbsOffForever;
4046 }
4047
4048 static char *init_bLocalCG(const gmx_mtop_t *mtop)
4049 {
4050     int   ncg, cg;
4051     char *bLocalCG;
4052
4053     ncg = ncg_mtop(mtop);
4054     snew(bLocalCG, ncg);
4055     for (cg = 0; cg < ncg; cg++)
4056     {
4057         bLocalCG[cg] = FALSE;
4058     }
4059
4060     return bLocalCG;
4061 }
4062
4063 void dd_init_bondeds(FILE *fplog,
4064                      gmx_domdec_t *dd,
4065                      const gmx_mtop_t *mtop,
4066                      const gmx_vsite_t *vsite,
4067                      const t_inputrec *ir,
4068                      gmx_bool bBCheck, cginfo_mb_t *cginfo_mb)
4069 {
4070     gmx_domdec_comm_t *comm;
4071
4072     dd_make_reverse_top(fplog, dd, mtop, vsite, ir, bBCheck);
4073
4074     comm = dd->comm;
4075
4076     if (comm->bBondComm)
4077     {
4078         /* Communicate atoms beyond the cut-off for bonded interactions */
4079         comm = dd->comm;
4080
4081         comm->cglink = make_charge_group_links(mtop, dd, cginfo_mb);
4082
4083         comm->bLocalCG = init_bLocalCG(mtop);
4084     }
4085     else
4086     {
4087         /* Only communicate atoms based on cut-off */
4088         comm->cglink   = nullptr;
4089         comm->bLocalCG = nullptr;
4090     }
4091 }
4092
4093 static void print_dd_settings(FILE *fplog, gmx_domdec_t *dd,
4094                               const gmx_mtop_t *mtop, const t_inputrec *ir,
4095                               gmx_bool bDynLoadBal, real dlb_scale,
4096                               const gmx_ddbox_t *ddbox)
4097 {
4098     gmx_domdec_comm_t *comm;
4099     int                d;
4100     ivec               np;
4101     real               limit, shrink;
4102     char               buf[64];
4103
4104     if (fplog == nullptr)
4105     {
4106         return;
4107     }
4108
4109     comm = dd->comm;
4110
4111     if (bDynLoadBal)
4112     {
4113         fprintf(fplog, "The maximum number of communication pulses is:");
4114         for (d = 0; d < dd->ndim; d++)
4115         {
4116             fprintf(fplog, " %c %d", dim2char(dd->dim[d]), comm->cd[d].np_dlb);
4117         }
4118         fprintf(fplog, "\n");
4119         fprintf(fplog, "The minimum size for domain decomposition cells is %.3f nm\n", comm->cellsize_limit);
4120         fprintf(fplog, "The requested allowed shrink of DD cells (option -dds) is: %.2f\n", dlb_scale);
4121         fprintf(fplog, "The allowed shrink of domain decomposition cells is:");
4122         for (d = 0; d < DIM; d++)
4123         {
4124             if (dd->nc[d] > 1)
4125             {
4126                 if (d >= ddbox->npbcdim && dd->nc[d] == 2)
4127                 {
4128                     shrink = 0;
4129                 }
4130                 else
4131                 {
4132                     shrink =
4133                         comm->cellsize_min_dlb[d]/
4134                         (ddbox->box_size[d]*ddbox->skew_fac[d]/dd->nc[d]);
4135                 }
4136                 fprintf(fplog, " %c %.2f", dim2char(d), shrink);
4137             }
4138         }
4139         fprintf(fplog, "\n");
4140     }
4141     else
4142     {
4143         set_dd_cell_sizes_slb(dd, ddbox, setcellsizeslbPULSE_ONLY, np);
4144         fprintf(fplog, "The initial number of communication pulses is:");
4145         for (d = 0; d < dd->ndim; d++)
4146         {
4147             fprintf(fplog, " %c %d", dim2char(dd->dim[d]), np[dd->dim[d]]);
4148         }
4149         fprintf(fplog, "\n");
4150         fprintf(fplog, "The initial domain decomposition cell size is:");
4151         for (d = 0; d < DIM; d++)
4152         {
4153             if (dd->nc[d] > 1)
4154             {
4155                 fprintf(fplog, " %c %.2f nm",
4156                         dim2char(d), dd->comm->cellsize_min[d]);
4157             }
4158         }
4159         fprintf(fplog, "\n\n");
4160     }
4161
4162     gmx_bool bInterCGVsites = count_intercg_vsites(mtop);
4163
4164     if (comm->bInterCGBondeds ||
4165         bInterCGVsites ||
4166         dd->bInterCGcons || dd->bInterCGsettles)
4167     {
4168         fprintf(fplog, "The maximum allowed distance for charge groups involved in interactions is:\n");
4169         fprintf(fplog, "%40s  %-7s %6.3f nm\n",
4170                 "non-bonded interactions", "", comm->cutoff);
4171
4172         if (bDynLoadBal)
4173         {
4174             limit = dd->comm->cellsize_limit;
4175         }
4176         else
4177         {
4178             if (dynamic_dd_box(ddbox, ir))
4179             {
4180                 fprintf(fplog, "(the following are initial values, they could change due to box deformation)\n");
4181             }
4182             limit = dd->comm->cellsize_min[XX];
4183             for (d = 1; d < DIM; d++)
4184             {
4185                 limit = std::min(limit, dd->comm->cellsize_min[d]);
4186             }
4187         }
4188
4189         if (comm->bInterCGBondeds)
4190         {
4191             fprintf(fplog, "%40s  %-7s %6.3f nm\n",
4192                     "two-body bonded interactions", "(-rdd)",
4193                     std::max(comm->cutoff, comm->cutoff_mbody));
4194             fprintf(fplog, "%40s  %-7s %6.3f nm\n",
4195                     "multi-body bonded interactions", "(-rdd)",
4196                     (comm->bBondComm || isDlbOn(dd->comm)) ? comm->cutoff_mbody : std::min(comm->cutoff, limit));
4197         }
4198         if (bInterCGVsites)
4199         {
4200             fprintf(fplog, "%40s  %-7s %6.3f nm\n",
4201                     "virtual site constructions", "(-rcon)", limit);
4202         }
4203         if (dd->bInterCGcons || dd->bInterCGsettles)
4204         {
4205             sprintf(buf, "atoms separated by up to %d constraints",
4206                     1+ir->nProjOrder);
4207             fprintf(fplog, "%40s  %-7s %6.3f nm\n",
4208                     buf, "(-rcon)", limit);
4209         }
4210         fprintf(fplog, "\n");
4211     }
4212
4213     fflush(fplog);
4214 }
4215
4216 static void set_cell_limits_dlb(gmx_domdec_t      *dd,
4217                                 real               dlb_scale,
4218                                 const t_inputrec  *ir,
4219                                 const gmx_ddbox_t *ddbox)
4220 {
4221     gmx_domdec_comm_t *comm;
4222     int                d, dim, npulse, npulse_d_max, npulse_d;
4223     gmx_bool           bNoCutOff;
4224
4225     comm = dd->comm;
4226
4227     bNoCutOff = (ir->rvdw == 0 || ir->rcoulomb == 0);
4228
4229     /* Determine the maximum number of comm. pulses in one dimension */
4230
4231     comm->cellsize_limit = std::max(comm->cellsize_limit, comm->cutoff_mbody);
4232
4233     /* Determine the maximum required number of grid pulses */
4234     if (comm->cellsize_limit >= comm->cutoff)
4235     {
4236         /* Only a single pulse is required */
4237         npulse = 1;
4238     }
4239     else if (!bNoCutOff && comm->cellsize_limit > 0)
4240     {
4241         /* We round down slightly here to avoid overhead due to the latency
4242          * of extra communication calls when the cut-off
4243          * would be only slightly longer than the cell size.
4244          * Later cellsize_limit is redetermined,
4245          * so we can not miss interactions due to this rounding.
4246          */
4247         npulse = (int)(0.96 + comm->cutoff/comm->cellsize_limit);
4248     }
4249     else
4250     {
4251         /* There is no cell size limit */
4252         npulse = std::max(dd->nc[XX]-1, std::max(dd->nc[YY]-1, dd->nc[ZZ]-1));
4253     }
4254
4255     if (!bNoCutOff && npulse > 1)
4256     {
4257         /* See if we can do with less pulses, based on dlb_scale */
4258         npulse_d_max = 0;
4259         for (d = 0; d < dd->ndim; d++)
4260         {
4261             dim      = dd->dim[d];
4262             npulse_d = (int)(1 + dd->nc[dim]*comm->cutoff
4263                              /(ddbox->box_size[dim]*ddbox->skew_fac[dim]*dlb_scale));
4264             npulse_d_max = std::max(npulse_d_max, npulse_d);
4265         }
4266         npulse = std::min(npulse, npulse_d_max);
4267     }
4268
4269     /* This env var can override npulse */
4270     d = dd_getenv(debug, "GMX_DD_NPULSE", 0);
4271     if (d > 0)
4272     {
4273         npulse = d;
4274     }
4275
4276     comm->maxpulse       = 1;
4277     comm->bVacDLBNoLimit = (ir->ePBC == epbcNONE);
4278     for (d = 0; d < dd->ndim; d++)
4279     {
4280         comm->cd[d].np_dlb    = std::min(npulse, dd->nc[dd->dim[d]]-1);
4281         comm->maxpulse        = std::max(comm->maxpulse, comm->cd[d].np_dlb);
4282         if (comm->cd[d].np_dlb < dd->nc[dd->dim[d]]-1)
4283         {
4284             comm->bVacDLBNoLimit = FALSE;
4285         }
4286     }
4287
4288     /* cellsize_limit is set for LINCS in init_domain_decomposition */
4289     if (!comm->bVacDLBNoLimit)
4290     {
4291         comm->cellsize_limit = std::max(comm->cellsize_limit,
4292                                         comm->cutoff/comm->maxpulse);
4293     }
4294     comm->cellsize_limit = std::max(comm->cellsize_limit, comm->cutoff_mbody);
4295     /* Set the minimum cell size for each DD dimension */
4296     for (d = 0; d < dd->ndim; d++)
4297     {
4298         if (comm->bVacDLBNoLimit ||
4299             comm->cd[d].np_dlb*comm->cellsize_limit >= comm->cutoff)
4300         {
4301             comm->cellsize_min_dlb[dd->dim[d]] = comm->cellsize_limit;
4302         }
4303         else
4304         {
4305             comm->cellsize_min_dlb[dd->dim[d]] =
4306                 comm->cutoff/comm->cd[d].np_dlb;
4307         }
4308     }
4309     if (comm->cutoff_mbody <= 0)
4310     {
4311         comm->cutoff_mbody = std::min(comm->cutoff, comm->cellsize_limit);
4312     }
4313     if (isDlbOn(comm))
4314     {
4315         set_dlb_limits(dd);
4316     }
4317 }
4318
4319 gmx_bool dd_bonded_molpbc(const gmx_domdec_t *dd, int ePBC)
4320 {
4321     /* If each molecule is a single charge group
4322      * or we use domain decomposition for each periodic dimension,
4323      * we do not need to take pbc into account for the bonded interactions.
4324      */
4325     return (ePBC != epbcNONE && dd->comm->bInterCGBondeds &&
4326             !(dd->nc[XX] > 1 &&
4327               dd->nc[YY] > 1 &&
4328               (dd->nc[ZZ] > 1 || ePBC == epbcXY)));
4329 }
4330
4331 /*! \brief Sets grid size limits and PP-PME setup, prints settings to log */
4332 static void set_ddgrid_parameters(FILE *fplog, gmx_domdec_t *dd, real dlb_scale,
4333                                   const gmx_mtop_t *mtop, const t_inputrec *ir,
4334                                   const gmx_ddbox_t *ddbox)
4335 {
4336     gmx_domdec_comm_t *comm;
4337     int                natoms_tot;
4338     real               vol_frac;
4339
4340     comm = dd->comm;
4341
4342     if (EEL_PME(ir->coulombtype) || EVDW_PME(ir->vdwtype))
4343     {
4344         init_ddpme(dd, &comm->ddpme[0], 0);
4345         if (comm->npmedecompdim >= 2)
4346         {
4347             init_ddpme(dd, &comm->ddpme[1], 1);
4348         }
4349     }
4350     else
4351     {
4352         comm->npmenodes = 0;
4353         if (dd->pme_nodeid >= 0)
4354         {
4355             gmx_fatal_collective(FARGS, dd->mpi_comm_all, DDMASTER(dd),
4356                                  "Can not have separate PME ranks without PME electrostatics");
4357         }
4358     }
4359
4360     if (debug)
4361     {
4362         fprintf(debug, "The DD cut-off is %f\n", comm->cutoff);
4363     }
4364     if (!isDlbDisabled(comm))
4365     {
4366         set_cell_limits_dlb(dd, dlb_scale, ir, ddbox);
4367     }
4368
4369     print_dd_settings(fplog, dd, mtop, ir, isDlbOn(comm), dlb_scale, ddbox);
4370     if (comm->dlbState == edlbsOffCanTurnOn)
4371     {
4372         if (fplog)
4373         {
4374             fprintf(fplog, "When dynamic load balancing gets turned on, these settings will change to:\n");
4375         }
4376         print_dd_settings(fplog, dd, mtop, ir, TRUE, dlb_scale, ddbox);
4377     }
4378
4379     if (ir->ePBC == epbcNONE)
4380     {
4381         vol_frac = 1 - 1/(double)dd->nnodes;
4382     }
4383     else
4384     {
4385         vol_frac =
4386             (1 + comm_box_frac(dd->nc, comm->cutoff, ddbox))/(double)dd->nnodes;
4387     }
4388     if (debug)
4389     {
4390         fprintf(debug, "Volume fraction for all DD zones: %f\n", vol_frac);
4391     }
4392     natoms_tot = comm->cgs_gl.index[comm->cgs_gl.nr];
4393
4394     dd->ga2la = ga2la_init(natoms_tot, static_cast<int>(vol_frac*natoms_tot));
4395 }
4396
4397 /*! \brief Set some important DD parameters that can be modified by env.vars */
4398 static void set_dd_envvar_options(FILE *fplog, gmx_domdec_t *dd, int rank_mysim)
4399 {
4400     gmx_domdec_comm_t *comm = dd->comm;
4401
4402     dd->bSendRecv2      = dd_getenv(fplog, "GMX_DD_USE_SENDRECV2", 0);
4403     comm->dlb_scale_lim = dd_getenv(fplog, "GMX_DLB_MAX_BOX_SCALING", 10);
4404     comm->eFlop         = dd_getenv(fplog, "GMX_DLB_BASED_ON_FLOPS", 0);
4405     int recload         = dd_getenv(fplog, "GMX_DD_RECORD_LOAD", 1);
4406     comm->nstDDDump     = dd_getenv(fplog, "GMX_DD_NST_DUMP", 0);
4407     comm->nstDDDumpGrid = dd_getenv(fplog, "GMX_DD_NST_DUMP_GRID", 0);
4408     comm->DD_debug      = dd_getenv(fplog, "GMX_DD_DEBUG", 0);
4409
4410     if (dd->bSendRecv2 && fplog)
4411     {
4412         fprintf(fplog, "Will use two sequential MPI_Sendrecv calls instead of two simultaneous non-blocking MPI_Irecv and MPI_Isend pairs for constraint and vsite communication\n");
4413     }
4414
4415     if (comm->eFlop)
4416     {
4417         if (fplog)
4418         {
4419             fprintf(fplog, "Will load balance based on FLOP count\n");
4420         }
4421         if (comm->eFlop > 1)
4422         {
4423             srand(1 + rank_mysim);
4424         }
4425         comm->bRecordLoad = TRUE;
4426     }
4427     else
4428     {
4429         comm->bRecordLoad = (wallcycle_have_counter() && recload > 0);
4430     }
4431 }
4432
4433 DomdecOptions::DomdecOptions() :
4434     checkBondedInteractions(TRUE),
4435     useBondedCommunication(TRUE),
4436     numPmeRanks(-1),
4437     rankOrder(DdRankOrder::pp_pme),
4438     minimumCommunicationRange(0),
4439     constraintCommunicationRange(0),
4440     dlbOption(DlbOption::turnOnWhenUseful),
4441     dlbScaling(0.8),
4442     cellSizeX(nullptr),
4443     cellSizeY(nullptr),
4444     cellSizeZ(nullptr)
4445 {
4446     clear_ivec(numCells);
4447 }
4448
4449 gmx_domdec_t *init_domain_decomposition(FILE *fplog, t_commrec *cr,
4450                                         const DomdecOptions &options,
4451                                         const MdrunOptions &mdrunOptions,
4452                                         const gmx_mtop_t *mtop,
4453                                         const t_inputrec *ir,
4454                                         const matrix box,
4455                                         gmx::ArrayRef<const gmx::RVec> xGlobal)
4456 {
4457     gmx_domdec_t      *dd;
4458
4459     if (fplog)
4460     {
4461         fprintf(fplog,
4462                 "\nInitializing Domain Decomposition on %d ranks\n", cr->nnodes);
4463     }
4464
4465     dd = new gmx_domdec_t;
4466
4467     dd->comm = init_dd_comm();
4468
4469     /* Initialize DD paritioning counters */
4470     dd->comm->partition_step = INT_MIN;
4471     dd->ddp_count            = 0;
4472
4473     set_dd_envvar_options(fplog, dd, cr->nodeid);
4474
4475     gmx_ddbox_t ddbox = {0};
4476     set_dd_limits_and_grid(fplog, cr, dd, options, mdrunOptions,
4477                            mtop, ir,
4478                            box, xGlobal,
4479                            &ddbox);
4480
4481     make_dd_communicators(fplog, cr, dd, options.rankOrder);
4482
4483     if (thisRankHasDuty(cr, DUTY_PP))
4484     {
4485         set_ddgrid_parameters(fplog, dd, options.dlbScaling, mtop, ir, &ddbox);
4486
4487         setup_neighbor_relations(dd);
4488     }
4489
4490     /* Set overallocation to avoid frequent reallocation of arrays */
4491     set_over_alloc_dd(TRUE);
4492
4493     clear_dd_cycle_counts(dd);
4494
4495     return dd;
4496 }
4497
4498 static gmx_bool test_dd_cutoff(t_commrec *cr,
4499                                t_state *state, const t_inputrec *ir,
4500                                real cutoff_req)
4501 {
4502     gmx_domdec_t *dd;
4503     gmx_ddbox_t   ddbox;
4504     int           d, dim, np;
4505     real          inv_cell_size;
4506     int           LocallyLimited;
4507
4508     dd = cr->dd;
4509
4510     set_ddbox(dd, false, ir, state->box, true, state->x, &ddbox);
4511
4512     LocallyLimited = 0;
4513
4514     for (d = 0; d < dd->ndim; d++)
4515     {
4516         dim = dd->dim[d];
4517
4518         inv_cell_size = DD_CELL_MARGIN*dd->nc[dim]/ddbox.box_size[dim];
4519         if (dynamic_dd_box(&ddbox, ir))
4520         {
4521             inv_cell_size *= DD_PRES_SCALE_MARGIN;
4522         }
4523
4524         np = 1 + (int)(cutoff_req*inv_cell_size*ddbox.skew_fac[dim]);
4525
4526         if (!isDlbDisabled(dd->comm) && (dim < ddbox.npbcdim) && (dd->comm->cd[d].np_dlb > 0))
4527         {
4528             if (np > dd->comm->cd[d].np_dlb)
4529             {
4530                 return FALSE;
4531             }
4532
4533             /* If a current local cell size is smaller than the requested
4534              * cut-off, we could still fix it, but this gets very complicated.
4535              * Without fixing here, we might actually need more checks.
4536              */
4537             if ((dd->comm->cell_x1[dim] - dd->comm->cell_x0[dim])*ddbox.skew_fac[dim]*dd->comm->cd[d].np_dlb < cutoff_req)
4538             {
4539                 LocallyLimited = 1;
4540             }
4541         }
4542     }
4543
4544     if (!isDlbDisabled(dd->comm))
4545     {
4546         /* If DLB is not active yet, we don't need to check the grid jumps.
4547          * Actually we shouldn't, because then the grid jump data is not set.
4548          */
4549         if (isDlbOn(dd->comm) &&
4550             check_grid_jump(0, dd, cutoff_req, &ddbox, FALSE))
4551         {
4552             LocallyLimited = 1;
4553         }
4554
4555         gmx_sumi(1, &LocallyLimited, cr);
4556
4557         if (LocallyLimited > 0)
4558         {
4559             return FALSE;
4560         }
4561     }
4562
4563     return TRUE;
4564 }
4565
4566 gmx_bool change_dd_cutoff(t_commrec *cr, t_state *state, const t_inputrec *ir,
4567                           real cutoff_req)
4568 {
4569     gmx_bool bCutoffAllowed;
4570
4571     bCutoffAllowed = test_dd_cutoff(cr, state, ir, cutoff_req);
4572
4573     if (bCutoffAllowed)
4574     {
4575         cr->dd->comm->cutoff = cutoff_req;
4576     }
4577
4578     return bCutoffAllowed;
4579 }
4580
4581 void set_dd_dlb_max_cutoff(t_commrec *cr, real cutoff)
4582 {
4583     gmx_domdec_comm_t *comm;
4584
4585     comm = cr->dd->comm;
4586
4587     /* Turn on the DLB limiting (might have been on already) */
4588     comm->bPMELoadBalDLBLimits = TRUE;
4589
4590     /* Change the cut-off limit */
4591     comm->PMELoadBal_max_cutoff = cutoff;
4592
4593     if (debug)
4594     {
4595         fprintf(debug, "PME load balancing set a limit to the DLB staggering such that a %f cut-off will continue to fit\n",
4596                 comm->PMELoadBal_max_cutoff);
4597     }
4598 }
4599
4600 /* Sets whether we should later check the load imbalance data, so that
4601  * we can trigger dynamic load balancing if enough imbalance has
4602  * arisen.
4603  *
4604  * Used after PME load balancing unlocks DLB, so that the check
4605  * whether DLB will be useful can happen immediately.
4606  */
4607 static void dd_dlb_set_should_check_whether_to_turn_dlb_on(gmx_domdec_t *dd, gmx_bool bValue)
4608 {
4609     if (dd->comm->dlbState == edlbsOffCanTurnOn)
4610     {
4611         dd->comm->bCheckWhetherToTurnDlbOn = bValue;
4612
4613         if (bValue == TRUE)
4614         {
4615             /* Store the DD partitioning count, so we can ignore cycle counts
4616              * over the next nstlist steps, which are often slower.
4617              */
4618             dd->comm->ddPartioningCountFirstDlbOff = dd->ddp_count;
4619         }
4620     }
4621 }
4622
4623 /* Returns if we should check whether there has been enough load
4624  * imbalance to trigger dynamic load balancing.
4625  */
4626 static gmx_bool dd_dlb_get_should_check_whether_to_turn_dlb_on(gmx_domdec_t *dd)
4627 {
4628     if (dd->comm->dlbState != edlbsOffCanTurnOn)
4629     {
4630         return FALSE;
4631     }
4632
4633     if (dd->ddp_count <= dd->comm->ddPartioningCountFirstDlbOff)
4634     {
4635         /* We ignore the first nstlist steps at the start of the run
4636          * or after PME load balancing or after turning DLB off, since
4637          * these often have extra allocation or cache miss overhead.
4638          */
4639         return FALSE;
4640     }
4641
4642     if (dd->comm->cycl_n[ddCyclStep] == 0)
4643     {
4644         /* We can have zero timed steps when dd_partition_system is called
4645          * more than once at the same step, e.g. with replica exchange.
4646          * Turning on DLB would trigger an assertion failure later, but is
4647          * also useless right after exchanging replicas.
4648          */
4649         return FALSE;
4650     }
4651
4652     /* We should check whether we should use DLB directly after
4653      * unlocking DLB. */
4654     if (dd->comm->bCheckWhetherToTurnDlbOn)
4655     {
4656         /* This flag was set when the PME load-balancing routines
4657            unlocked DLB, and should now be cleared. */
4658         dd_dlb_set_should_check_whether_to_turn_dlb_on(dd, FALSE);
4659         return TRUE;
4660     }
4661     /* We check whether we should use DLB every c_checkTurnDlbOnInterval
4662      * partitionings (we do not do this every partioning, so that we
4663      * avoid excessive communication). */
4664     if (dd->comm->n_load_have % c_checkTurnDlbOnInterval == c_checkTurnDlbOnInterval - 1)
4665     {
4666         return TRUE;
4667     }
4668
4669     return FALSE;
4670 }
4671
4672 gmx_bool dd_dlb_is_on(const gmx_domdec_t *dd)
4673 {
4674     return isDlbOn(dd->comm);
4675 }
4676
4677 gmx_bool dd_dlb_is_locked(const gmx_domdec_t *dd)
4678 {
4679     return (dd->comm->dlbState == edlbsOffTemporarilyLocked);
4680 }
4681
4682 void dd_dlb_lock(gmx_domdec_t *dd)
4683 {
4684     /* We can only lock the DLB when it is set to auto, otherwise don't do anything */
4685     if (dd->comm->dlbState == edlbsOffCanTurnOn)
4686     {
4687         dd->comm->dlbState = edlbsOffTemporarilyLocked;
4688     }
4689 }
4690
4691 void dd_dlb_unlock(gmx_domdec_t *dd)
4692 {
4693     /* We can only lock the DLB when it is set to auto, otherwise don't do anything */
4694     if (dd->comm->dlbState == edlbsOffTemporarilyLocked)
4695     {
4696         dd->comm->dlbState = edlbsOffCanTurnOn;
4697         dd_dlb_set_should_check_whether_to_turn_dlb_on(dd, TRUE);
4698     }
4699 }
4700
4701 static void merge_cg_buffers(int ncell,
4702                              gmx_domdec_comm_dim_t *cd, int pulse,
4703                              int  *ncg_cell,
4704                              gmx::ArrayRef<int> index_gl,
4705                              int  *recv_i,
4706                              rvec *cg_cm,    rvec *recv_vr,
4707                              gmx::ArrayRef<int> cgindex,
4708                              cginfo_mb_t *cginfo_mb, int *cginfo)
4709 {
4710     gmx_domdec_ind_t *ind, *ind_p;
4711     int               p, cell, c, cg, cg0, cg1, cg_gl, nat;
4712     int               shift, shift_at;
4713
4714     ind = &cd->ind[pulse];
4715
4716     /* First correct the already stored data */
4717     shift = ind->nrecv[ncell];
4718     for (cell = ncell-1; cell >= 0; cell--)
4719     {
4720         shift -= ind->nrecv[cell];
4721         if (shift > 0)
4722         {
4723             /* Move the cg's present from previous grid pulses */
4724             cg0                = ncg_cell[ncell+cell];
4725             cg1                = ncg_cell[ncell+cell+1];
4726             cgindex[cg1+shift] = cgindex[cg1];
4727             for (cg = cg1-1; cg >= cg0; cg--)
4728             {
4729                 index_gl[cg+shift] = index_gl[cg];
4730                 copy_rvec(cg_cm[cg], cg_cm[cg+shift]);
4731                 cgindex[cg+shift] = cgindex[cg];
4732                 cginfo[cg+shift]  = cginfo[cg];
4733             }
4734             /* Correct the already stored send indices for the shift */
4735             for (p = 1; p <= pulse; p++)
4736             {
4737                 ind_p = &cd->ind[p];
4738                 cg0   = 0;
4739                 for (c = 0; c < cell; c++)
4740                 {
4741                     cg0 += ind_p->nsend[c];
4742                 }
4743                 cg1 = cg0 + ind_p->nsend[cell];
4744                 for (cg = cg0; cg < cg1; cg++)
4745                 {
4746                     ind_p->index[cg] += shift;
4747                 }
4748             }
4749         }
4750     }
4751
4752     /* Merge in the communicated buffers */
4753     shift    = 0;
4754     shift_at = 0;
4755     cg0      = 0;
4756     for (cell = 0; cell < ncell; cell++)
4757     {
4758         cg1 = ncg_cell[ncell+cell+1] + shift;
4759         if (shift_at > 0)
4760         {
4761             /* Correct the old cg indices */
4762             for (cg = ncg_cell[ncell+cell]; cg < cg1; cg++)
4763             {
4764                 cgindex[cg+1] += shift_at;
4765             }
4766         }
4767         for (cg = 0; cg < ind->nrecv[cell]; cg++)
4768         {
4769             /* Copy this charge group from the buffer */
4770             index_gl[cg1] = recv_i[cg0];
4771             copy_rvec(recv_vr[cg0], cg_cm[cg1]);
4772             /* Add it to the cgindex */
4773             cg_gl          = index_gl[cg1];
4774             cginfo[cg1]    = ddcginfo(cginfo_mb, cg_gl);
4775             nat            = GET_CGINFO_NATOMS(cginfo[cg1]);
4776             cgindex[cg1+1] = cgindex[cg1] + nat;
4777             cg0++;
4778             cg1++;
4779             shift_at += nat;
4780         }
4781         shift                 += ind->nrecv[cell];
4782         ncg_cell[ncell+cell+1] = cg1;
4783     }
4784 }
4785
4786 static void make_cell2at_index(gmx_domdec_comm_dim_t *cd,
4787                                int                    nzone,
4788                                int                    cg0,
4789                                const BlockRanges     &atomGroups)
4790 {
4791     /* Store the atom block boundaries for easy copying of communication buffers
4792      */
4793     int cg = cg0;
4794     for (int zone = 0; zone < nzone; zone++)
4795     {
4796         for (gmx_domdec_ind_t &ind : cd->ind)
4797         {
4798             ind.cell2at0[zone]  = atomGroups.index[cg];
4799             cg                 += ind.nrecv[zone];
4800             ind.cell2at1[zone]  = atomGroups.index[cg];
4801         }
4802     }
4803 }
4804
4805 static gmx_bool missing_link(t_blocka *link, int cg_gl, char *bLocalCG)
4806 {
4807     int      i;
4808     gmx_bool bMiss;
4809
4810     bMiss = FALSE;
4811     for (i = link->index[cg_gl]; i < link->index[cg_gl+1]; i++)
4812     {
4813         if (!bLocalCG[link->a[i]])
4814         {
4815             bMiss = TRUE;
4816         }
4817     }
4818
4819     return bMiss;
4820 }
4821
4822 /* Domain corners for communication, a maximum of 4 i-zones see a j domain */
4823 typedef struct {
4824     real c[DIM][4]; /* the corners for the non-bonded communication */
4825     real cr0;       /* corner for rounding */
4826     real cr1[4];    /* corners for rounding */
4827     real bc[DIM];   /* corners for bounded communication */
4828     real bcr1;      /* corner for rounding for bonded communication */
4829 } dd_corners_t;
4830
4831 /* Determine the corners of the domain(s) we are communicating with */
4832 static void
4833 set_dd_corners(const gmx_domdec_t *dd,
4834                int dim0, int dim1, int dim2,
4835                gmx_bool bDistMB,
4836                dd_corners_t *c)
4837 {
4838     const gmx_domdec_comm_t  *comm;
4839     const gmx_domdec_zones_t *zones;
4840     int i, j;
4841
4842     comm = dd->comm;
4843
4844     zones = &comm->zones;
4845
4846     /* Keep the compiler happy */
4847     c->cr0  = 0;
4848     c->bcr1 = 0;
4849
4850     /* The first dimension is equal for all cells */
4851     c->c[0][0] = comm->cell_x0[dim0];
4852     if (bDistMB)
4853     {
4854         c->bc[0] = c->c[0][0];
4855     }
4856     if (dd->ndim >= 2)
4857     {
4858         dim1 = dd->dim[1];
4859         /* This cell row is only seen from the first row */
4860         c->c[1][0] = comm->cell_x0[dim1];
4861         /* All rows can see this row */
4862         c->c[1][1] = comm->cell_x0[dim1];
4863         if (isDlbOn(dd->comm))
4864         {
4865             c->c[1][1] = std::max(comm->cell_x0[dim1], comm->zone_d1[1].mch0);
4866             if (bDistMB)
4867             {
4868                 /* For the multi-body distance we need the maximum */
4869                 c->bc[1] = std::max(comm->cell_x0[dim1], comm->zone_d1[1].p1_0);
4870             }
4871         }
4872         /* Set the upper-right corner for rounding */
4873         c->cr0 = comm->cell_x1[dim0];
4874
4875         if (dd->ndim >= 3)
4876         {
4877             dim2 = dd->dim[2];
4878             for (j = 0; j < 4; j++)
4879             {
4880                 c->c[2][j] = comm->cell_x0[dim2];
4881             }
4882             if (isDlbOn(dd->comm))
4883             {
4884                 /* Use the maximum of the i-cells that see a j-cell */
4885                 for (i = 0; i < zones->nizone; i++)
4886                 {
4887                     for (j = zones->izone[i].j0; j < zones->izone[i].j1; j++)
4888                     {
4889                         if (j >= 4)
4890                         {
4891                             c->c[2][j-4] =
4892                                 std::max(c->c[2][j-4],
4893                                          comm->zone_d2[zones->shift[i][dim0]][zones->shift[i][dim1]].mch0);
4894                         }
4895                     }
4896                 }
4897                 if (bDistMB)
4898                 {
4899                     /* For the multi-body distance we need the maximum */
4900                     c->bc[2] = comm->cell_x0[dim2];
4901                     for (i = 0; i < 2; i++)
4902                     {
4903                         for (j = 0; j < 2; j++)
4904                         {
4905                             c->bc[2] = std::max(c->bc[2], comm->zone_d2[i][j].p1_0);
4906                         }
4907                     }
4908                 }
4909             }
4910
4911             /* Set the upper-right corner for rounding */
4912             /* Cell (0,0,0) and cell (1,0,0) can see cell 4 (0,1,1)
4913              * Only cell (0,0,0) can see cell 7 (1,1,1)
4914              */
4915             c->cr1[0] = comm->cell_x1[dim1];
4916             c->cr1[3] = comm->cell_x1[dim1];
4917             if (isDlbOn(dd->comm))
4918             {
4919                 c->cr1[0] = std::max(comm->cell_x1[dim1], comm->zone_d1[1].mch1);
4920                 if (bDistMB)
4921                 {
4922                     /* For the multi-body distance we need the maximum */
4923                     c->bcr1 = std::max(comm->cell_x1[dim1], comm->zone_d1[1].p1_1);
4924                 }
4925             }
4926         }
4927     }
4928 }
4929
4930 /* Add the atom groups we need to send in this pulse from this zone to
4931  * \p localAtomGroups and \p work
4932  */
4933 static void
4934 get_zone_pulse_cgs(gmx_domdec_t *dd,
4935                    int zonei, int zone,
4936                    int cg0, int cg1,
4937                    gmx::ArrayRef<const int> globalAtomGroupIndices,
4938                    const gmx::BlockRanges &atomGroups,
4939                    int dim, int dim_ind,
4940                    int dim0, int dim1, int dim2,
4941                    real r_comm2, real r_bcomm2,
4942                    matrix box,
4943                    bool distanceIsTriclinic,
4944                    rvec *normal,
4945                    real skew_fac2_d, real skew_fac_01,
4946                    rvec *v_d, rvec *v_0, rvec *v_1,
4947                    const dd_corners_t *c,
4948                    rvec sf2_round,
4949                    gmx_bool bDistBonded,
4950                    gmx_bool bBondComm,
4951                    gmx_bool bDist2B,
4952                    gmx_bool bDistMB,
4953                    rvec *cg_cm,
4954                    int *cginfo,
4955                    std::vector<int>     *localAtomGroups,
4956                    dd_comm_setup_work_t *work)
4957 {
4958     gmx_domdec_comm_t *comm;
4959     gmx_bool           bScrew;
4960     gmx_bool           bDistMB_pulse;
4961     int                cg, i;
4962     real               r2, rb2, r, tric_sh;
4963     rvec               rn, rb;
4964     int                dimd;
4965     int                nsend_z, nat;
4966
4967     comm = dd->comm;
4968
4969     bScrew = (dd->bScrewPBC && dim == XX);
4970
4971     bDistMB_pulse = (bDistMB && bDistBonded);
4972
4973     /* Unpack the work data */
4974     std::vector<int>       &ibuf = work->atomGroupBuffer;
4975     std::vector<gmx::RVec> &vbuf = work->positionBuffer;
4976     nsend_z                      = 0;
4977     nat                          = work->nat;
4978
4979     for (cg = cg0; cg < cg1; cg++)
4980     {
4981         r2  = 0;
4982         rb2 = 0;
4983         if (!distanceIsTriclinic)
4984         {
4985             /* Rectangular direction, easy */
4986             r = cg_cm[cg][dim] - c->c[dim_ind][zone];
4987             if (r > 0)
4988             {
4989                 r2 += r*r;
4990             }
4991             if (bDistMB_pulse)
4992             {
4993                 r = cg_cm[cg][dim] - c->bc[dim_ind];
4994                 if (r > 0)
4995                 {
4996                     rb2 += r*r;
4997                 }
4998             }
4999             /* Rounding gives at most a 16% reduction
5000              * in communicated atoms
5001              */
5002             if (dim_ind >= 1 && (zonei == 1 || zonei == 2))
5003             {
5004                 r = cg_cm[cg][dim0] - c->cr0;
5005                 /* This is the first dimension, so always r >= 0 */
5006                 r2 += r*r;
5007                 if (bDistMB_pulse)
5008                 {
5009                     rb2 += r*r;
5010                 }
5011             }
5012             if (dim_ind == 2 && (zonei == 2 || zonei == 3))
5013             {
5014                 r = cg_cm[cg][dim1] - c->cr1[zone];
5015                 if (r > 0)
5016                 {
5017                     r2 += r*r;
5018                 }
5019                 if (bDistMB_pulse)
5020                 {
5021                     r = cg_cm[cg][dim1] - c->bcr1;
5022                     if (r > 0)
5023                     {
5024                         rb2 += r*r;
5025                     }
5026                 }
5027             }
5028         }
5029         else
5030         {
5031             /* Triclinic direction, more complicated */
5032             clear_rvec(rn);
5033             clear_rvec(rb);
5034             /* Rounding, conservative as the skew_fac multiplication
5035              * will slightly underestimate the distance.
5036              */
5037             if (dim_ind >= 1 && (zonei == 1 || zonei == 2))
5038             {
5039                 rn[dim0] = cg_cm[cg][dim0] - c->cr0;
5040                 for (i = dim0+1; i < DIM; i++)
5041                 {
5042                     rn[dim0] -= cg_cm[cg][i]*v_0[i][dim0];
5043                 }
5044                 r2 = rn[dim0]*rn[dim0]*sf2_round[dim0];
5045                 if (bDistMB_pulse)
5046                 {
5047                     rb[dim0] = rn[dim0];
5048                     rb2      = r2;
5049                 }
5050                 /* Take care that the cell planes along dim0 might not
5051                  * be orthogonal to those along dim1 and dim2.
5052                  */
5053                 for (i = 1; i <= dim_ind; i++)
5054                 {
5055                     dimd = dd->dim[i];
5056                     if (normal[dim0][dimd] > 0)
5057                     {
5058                         rn[dimd] -= rn[dim0]*normal[dim0][dimd];
5059                         if (bDistMB_pulse)
5060                         {
5061                             rb[dimd] -= rb[dim0]*normal[dim0][dimd];
5062                         }
5063                     }
5064                 }
5065             }
5066             if (dim_ind == 2 && (zonei == 2 || zonei == 3))
5067             {
5068                 rn[dim1] += cg_cm[cg][dim1] - c->cr1[zone];
5069                 tric_sh   = 0;
5070                 for (i = dim1+1; i < DIM; i++)
5071                 {
5072                     tric_sh -= cg_cm[cg][i]*v_1[i][dim1];
5073                 }
5074                 rn[dim1] += tric_sh;
5075                 if (rn[dim1] > 0)
5076                 {
5077                     r2 += rn[dim1]*rn[dim1]*sf2_round[dim1];
5078                     /* Take care of coupling of the distances
5079                      * to the planes along dim0 and dim1 through dim2.
5080                      */
5081                     r2 -= rn[dim0]*rn[dim1]*skew_fac_01;
5082                     /* Take care that the cell planes along dim1
5083                      * might not be orthogonal to that along dim2.
5084                      */
5085                     if (normal[dim1][dim2] > 0)
5086                     {
5087                         rn[dim2] -= rn[dim1]*normal[dim1][dim2];
5088                     }
5089                 }
5090                 if (bDistMB_pulse)
5091                 {
5092                     rb[dim1] +=
5093                         cg_cm[cg][dim1] - c->bcr1 + tric_sh;
5094                     if (rb[dim1] > 0)
5095                     {
5096                         rb2 += rb[dim1]*rb[dim1]*sf2_round[dim1];
5097                         /* Take care of coupling of the distances
5098                          * to the planes along dim0 and dim1 through dim2.
5099                          */
5100                         rb2 -= rb[dim0]*rb[dim1]*skew_fac_01;
5101                         /* Take care that the cell planes along dim1
5102                          * might not be orthogonal to that along dim2.
5103                          */
5104                         if (normal[dim1][dim2] > 0)
5105                         {
5106                             rb[dim2] -= rb[dim1]*normal[dim1][dim2];
5107                         }
5108                     }
5109                 }
5110             }
5111             /* The distance along the communication direction */
5112             rn[dim] += cg_cm[cg][dim] - c->c[dim_ind][zone];
5113             tric_sh  = 0;
5114             for (i = dim+1; i < DIM; i++)
5115             {
5116                 tric_sh -= cg_cm[cg][i]*v_d[i][dim];
5117             }
5118             rn[dim] += tric_sh;
5119             if (rn[dim] > 0)
5120             {
5121                 r2 += rn[dim]*rn[dim]*skew_fac2_d;
5122                 /* Take care of coupling of the distances
5123                  * to the planes along dim0 and dim1 through dim2.
5124                  */
5125                 if (dim_ind == 1 && zonei == 1)
5126                 {
5127                     r2 -= rn[dim0]*rn[dim]*skew_fac_01;
5128                 }
5129             }
5130             if (bDistMB_pulse)
5131             {
5132                 clear_rvec(rb);
5133                 rb[dim] += cg_cm[cg][dim] - c->bc[dim_ind] + tric_sh;
5134                 if (rb[dim] > 0)
5135                 {
5136                     rb2 += rb[dim]*rb[dim]*skew_fac2_d;
5137                     /* Take care of coupling of the distances
5138                      * to the planes along dim0 and dim1 through dim2.
5139                      */
5140                     if (dim_ind == 1 && zonei == 1)
5141                     {
5142                         rb2 -= rb[dim0]*rb[dim]*skew_fac_01;
5143                     }
5144                 }
5145             }
5146         }
5147
5148         if (r2 < r_comm2 ||
5149             (bDistBonded &&
5150              ((bDistMB && rb2 < r_bcomm2) ||
5151               (bDist2B && r2  < r_bcomm2)) &&
5152              (!bBondComm ||
5153               (GET_CGINFO_BOND_INTER(cginfo[cg]) &&
5154                missing_link(comm->cglink, globalAtomGroupIndices[cg],
5155                             comm->bLocalCG)))))
5156         {
5157             /* Store the local and global atom group indices and position */
5158             localAtomGroups->push_back(cg);
5159             ibuf.push_back(globalAtomGroupIndices[cg]);
5160             nsend_z++;
5161
5162             rvec posPbc;
5163             if (dd->ci[dim] == 0)
5164             {
5165                 /* Correct cg_cm for pbc */
5166                 rvec_add(cg_cm[cg], box[dim], posPbc);
5167                 if (bScrew)
5168                 {
5169                     posPbc[YY] = box[YY][YY] - posPbc[YY];
5170                     posPbc[ZZ] = box[ZZ][ZZ] - posPbc[ZZ];
5171                 }
5172             }
5173             else
5174             {
5175                 copy_rvec(cg_cm[cg], posPbc);
5176             }
5177             vbuf.emplace_back(posPbc[XX], posPbc[YY], posPbc[ZZ]);
5178
5179             nat += atomGroups.index[cg+1] - atomGroups.index[cg];
5180         }
5181     }
5182
5183     work->nat        = nat;
5184     work->nsend_zone = nsend_z;
5185 }
5186
5187 static void clearCommSetupData(dd_comm_setup_work_t *work)
5188 {
5189     work->localAtomGroupBuffer.clear();
5190     work->atomGroupBuffer.clear();
5191     work->positionBuffer.clear();
5192     work->nat        = 0;
5193     work->nsend_zone = 0;
5194 }
5195
5196 static void setup_dd_communication(gmx_domdec_t *dd,
5197                                    matrix box, gmx_ddbox_t *ddbox,
5198                                    t_forcerec *fr,
5199                                    t_state *state, PaddedRVecVector *f)
5200 {
5201     int                    dim_ind, dim, dim0, dim1, dim2, dimd, nat_tot;
5202     int                    nzone, nzone_send, zone, zonei, cg0, cg1;
5203     int                    c, i, cg, cg_gl, nrcg;
5204     int                   *zone_cg_range, pos_cg;
5205     gmx_domdec_comm_t     *comm;
5206     gmx_domdec_zones_t    *zones;
5207     gmx_domdec_comm_dim_t *cd;
5208     cginfo_mb_t           *cginfo_mb;
5209     gmx_bool               bBondComm, bDist2B, bDistMB, bDistBonded;
5210     real                   r_comm2, r_bcomm2;
5211     dd_corners_t           corners;
5212     rvec                  *cg_cm, *normal, *v_d, *v_0 = nullptr, *v_1 = nullptr;
5213     real                   skew_fac2_d, skew_fac_01;
5214     rvec                   sf2_round;
5215
5216     if (debug)
5217     {
5218         fprintf(debug, "Setting up DD communication\n");
5219     }
5220
5221     comm  = dd->comm;
5222
5223     if (comm->dth.empty())
5224     {
5225         /* Initialize the thread data.
5226          * This can not be done in init_domain_decomposition,
5227          * as the numbers of threads is determined later.
5228          */
5229         int numThreads = gmx_omp_nthreads_get(emntDomdec);
5230         comm->dth.resize(numThreads);
5231     }
5232
5233     switch (fr->cutoff_scheme)
5234     {
5235         case ecutsGROUP:
5236             cg_cm = fr->cg_cm;
5237             break;
5238         case ecutsVERLET:
5239             cg_cm = as_rvec_array(state->x.data());
5240             break;
5241         default:
5242             gmx_incons("unimplemented");
5243             cg_cm = nullptr;
5244     }
5245
5246     bBondComm = comm->bBondComm;
5247
5248     /* Do we need to determine extra distances for multi-body bondeds? */
5249     bDistMB = (comm->bInterCGMultiBody && isDlbOn(dd->comm) && dd->ndim > 1);
5250
5251     /* Do we need to determine extra distances for only two-body bondeds? */
5252     bDist2B = (bBondComm && !bDistMB);
5253
5254     r_comm2  = gmx::square(comm->cutoff);
5255     r_bcomm2 = gmx::square(comm->cutoff_mbody);
5256
5257     if (debug)
5258     {
5259         fprintf(debug, "bBondComm %d, r_bc %f\n", bBondComm, std::sqrt(r_bcomm2));
5260     }
5261
5262     zones = &comm->zones;
5263
5264     dim0 = dd->dim[0];
5265     dim1 = (dd->ndim >= 2 ? dd->dim[1] : -1);
5266     dim2 = (dd->ndim >= 3 ? dd->dim[2] : -1);
5267
5268     set_dd_corners(dd, dim0, dim1, dim2, bDistMB, &corners);
5269
5270     /* Triclinic stuff */
5271     normal      = ddbox->normal;
5272     skew_fac_01 = 0;
5273     if (dd->ndim >= 2)
5274     {
5275         v_0 = ddbox->v[dim0];
5276         if (ddbox->tric_dir[dim0] && ddbox->tric_dir[dim1])
5277         {
5278             /* Determine the coupling coefficient for the distances
5279              * to the cell planes along dim0 and dim1 through dim2.
5280              * This is required for correct rounding.
5281              */
5282             skew_fac_01 =
5283                 ddbox->v[dim0][dim1+1][dim0]*ddbox->v[dim1][dim1+1][dim1];
5284             if (debug)
5285             {
5286                 fprintf(debug, "\nskew_fac_01 %f\n", skew_fac_01);
5287             }
5288         }
5289     }
5290     if (dd->ndim >= 3)
5291     {
5292         v_1 = ddbox->v[dim1];
5293     }
5294
5295     zone_cg_range = zones->cg_range;
5296     cginfo_mb     = fr->cginfo_mb;
5297
5298     zone_cg_range[0]   = 0;
5299     zone_cg_range[1]   = dd->ncg_home;
5300     comm->zone_ncg1[0] = dd->ncg_home;
5301     pos_cg             = dd->ncg_home;
5302
5303     nat_tot = comm->atomRanges.numHomeAtoms();
5304     nzone   = 1;
5305     for (dim_ind = 0; dim_ind < dd->ndim; dim_ind++)
5306     {
5307         dim = dd->dim[dim_ind];
5308         cd  = &comm->cd[dim_ind];
5309
5310         /* Check if we need to compute triclinic distances along this dim */
5311         bool distanceIsTriclinic = false;
5312         for (i = 0; i <= dim_ind; i++)
5313         {
5314             if (ddbox->tric_dir[dd->dim[i]])
5315             {
5316                 distanceIsTriclinic = true;
5317             }
5318         }
5319
5320         if (dim >= ddbox->npbcdim && dd->ci[dim] == 0)
5321         {
5322             /* No pbc in this dimension, the first node should not comm. */
5323             nzone_send = 0;
5324         }
5325         else
5326         {
5327             nzone_send = nzone;
5328         }
5329
5330         v_d                = ddbox->v[dim];
5331         skew_fac2_d        = gmx::square(ddbox->skew_fac[dim]);
5332
5333         cd->receiveInPlace = true;
5334         for (int p = 0; p < cd->numPulses(); p++)
5335         {
5336             /* Only atoms communicated in the first pulse are used
5337              * for multi-body bonded interactions or for bBondComm.
5338              */
5339             bDistBonded = ((bDistMB || bDist2B) && p == 0);
5340
5341             gmx_domdec_ind_t *ind = &cd->ind[p];
5342
5343             /* Thread 0 writes in the global index array */
5344             ind->index.clear();
5345             clearCommSetupData(&comm->dth[0]);
5346
5347             for (zone = 0; zone < nzone_send; zone++)
5348             {
5349                 if (dim_ind > 0 && distanceIsTriclinic)
5350                 {
5351                     /* Determine slightly more optimized skew_fac's
5352                      * for rounding.
5353                      * This reduces the number of communicated atoms
5354                      * by about 10% for 3D DD of rhombic dodecahedra.
5355                      */
5356                     for (dimd = 0; dimd < dim; dimd++)
5357                     {
5358                         sf2_round[dimd] = 1;
5359                         if (ddbox->tric_dir[dimd])
5360                         {
5361                             for (i = dd->dim[dimd]+1; i < DIM; i++)
5362                             {
5363                                 /* If we are shifted in dimension i
5364                                  * and the cell plane is tilted forward
5365                                  * in dimension i, skip this coupling.
5366                                  */
5367                                 if (!(zones->shift[nzone+zone][i] &&
5368                                       ddbox->v[dimd][i][dimd] >= 0))
5369                                 {
5370                                     sf2_round[dimd] +=
5371                                         gmx::square(ddbox->v[dimd][i][dimd]);
5372                                 }
5373                             }
5374                             sf2_round[dimd] = 1/sf2_round[dimd];
5375                         }
5376                     }
5377                 }
5378
5379                 zonei = zone_perm[dim_ind][zone];
5380                 if (p == 0)
5381                 {
5382                     /* Here we permutate the zones to obtain a convenient order
5383                      * for neighbor searching
5384                      */
5385                     cg0 = zone_cg_range[zonei];
5386                     cg1 = zone_cg_range[zonei+1];
5387                 }
5388                 else
5389                 {
5390                     /* Look only at the cg's received in the previous grid pulse
5391                      */
5392                     cg1 = zone_cg_range[nzone+zone+1];
5393                     cg0 = cg1 - cd->ind[p-1].nrecv[zone];
5394                 }
5395
5396                 const int numThreads = static_cast<int>(comm->dth.size());
5397 #pragma omp parallel for num_threads(numThreads) schedule(static)
5398                 for (int th = 0; th < numThreads; th++)
5399                 {
5400                     try
5401                     {
5402                         dd_comm_setup_work_t &work = comm->dth[th];
5403
5404                         /* Retain data accumulated into buffers of thread 0 */
5405                         if (th > 0)
5406                         {
5407                             clearCommSetupData(&work);
5408                         }
5409
5410                         int cg0_th = cg0 + ((cg1 - cg0)* th   )/numThreads;
5411                         int cg1_th = cg0 + ((cg1 - cg0)*(th+1))/numThreads;
5412
5413                         /* Get the cg's for this pulse in this zone */
5414                         get_zone_pulse_cgs(dd, zonei, zone, cg0_th, cg1_th,
5415                                            dd->globalAtomGroupIndices,
5416                                            dd->atomGroups(),
5417                                            dim, dim_ind, dim0, dim1, dim2,
5418                                            r_comm2, r_bcomm2,
5419                                            box, distanceIsTriclinic,
5420                                            normal, skew_fac2_d, skew_fac_01,
5421                                            v_d, v_0, v_1, &corners, sf2_round,
5422                                            bDistBonded, bBondComm,
5423                                            bDist2B, bDistMB,
5424                                            cg_cm, fr->cginfo,
5425                                            th == 0 ? &ind->index : &work.localAtomGroupBuffer,
5426                                            &work);
5427                     }
5428                     GMX_CATCH_ALL_AND_EXIT_WITH_FATAL_ERROR;
5429                 } // END
5430
5431                 std::vector<int>       &atomGroups = comm->dth[0].atomGroupBuffer;
5432                 std::vector<gmx::RVec> &positions  = comm->dth[0].positionBuffer;
5433                 ind->nsend[zone]  = comm->dth[0].nsend_zone;
5434                 /* Append data of threads>=1 to the communication buffers */
5435                 for (int th = 1; th < numThreads; th++)
5436                 {
5437                     const dd_comm_setup_work_t &dth = comm->dth[th];
5438
5439                     ind->index.insert(ind->index.end(), dth.localAtomGroupBuffer.begin(), dth.localAtomGroupBuffer.end());
5440                     atomGroups.insert(atomGroups.end(), dth.atomGroupBuffer.begin(), dth.atomGroupBuffer.end());
5441                     positions.insert(positions.end(), dth.positionBuffer.begin(), dth.positionBuffer.end());
5442                     comm->dth[0].nat += dth.nat;
5443                     ind->nsend[zone] += dth.nsend_zone;
5444                 }
5445             }
5446             /* Clear the counts in case we do not have pbc */
5447             for (zone = nzone_send; zone < nzone; zone++)
5448             {
5449                 ind->nsend[zone] = 0;
5450             }
5451             ind->nsend[nzone]     = ind->index.size();
5452             ind->nsend[nzone + 1] = comm->dth[0].nat;
5453             /* Communicate the number of cg's and atoms to receive */
5454             ddSendrecv(dd, dim_ind, dddirBackward,
5455                        ind->nsend, nzone+2,
5456                        ind->nrecv, nzone+2);
5457
5458             if (p > 0)
5459             {
5460                 /* We can receive in place if only the last zone is not empty */
5461                 for (zone = 0; zone < nzone-1; zone++)
5462                 {
5463                     if (ind->nrecv[zone] > 0)
5464                     {
5465                         cd->receiveInPlace = false;
5466                     }
5467                 }
5468             }
5469
5470             int receiveBufferSize = 0;
5471             if (!cd->receiveInPlace)
5472             {
5473                 receiveBufferSize = ind->nrecv[nzone];
5474             }
5475             /* These buffer are actually only needed with in-place */
5476             DDBufferAccess<int>       globalAtomGroupBuffer(comm->intBuffer, receiveBufferSize);
5477             DDBufferAccess<gmx::RVec> rvecBuffer(comm->rvecBuffer, receiveBufferSize);
5478
5479             dd_comm_setup_work_t     &work = comm->dth[0];
5480
5481             /* Make space for the global cg indices */
5482             int numAtomGroupsNew = pos_cg + ind->nrecv[nzone];
5483             dd->globalAtomGroupIndices.resize(numAtomGroupsNew);
5484             dd->atomGroups_.index.resize(numAtomGroupsNew + 1);
5485             /* Communicate the global cg indices */
5486             gmx::ArrayRef<int> integerBufferRef;
5487             if (cd->receiveInPlace)
5488             {
5489                 integerBufferRef = gmx::arrayRefFromArray(dd->globalAtomGroupIndices.data() + pos_cg, ind->nrecv[nzone]);
5490             }
5491             else
5492             {
5493                 integerBufferRef = globalAtomGroupBuffer.buffer;
5494             }
5495             ddSendrecv<int>(dd, dim_ind, dddirBackward,
5496                             work.atomGroupBuffer, integerBufferRef);
5497
5498             /* Make space for cg_cm */
5499             dd_check_alloc_ncg(fr, state, f, pos_cg + ind->nrecv[nzone]);
5500             if (fr->cutoff_scheme == ecutsGROUP)
5501             {
5502                 cg_cm = fr->cg_cm;
5503             }
5504             else
5505             {
5506                 cg_cm = as_rvec_array(state->x.data());
5507             }
5508             /* Communicate cg_cm */
5509             gmx::ArrayRef<gmx::RVec> rvecBufferRef;
5510             if (cd->receiveInPlace)
5511             {
5512                 rvecBufferRef = gmx::arrayRefFromArray(reinterpret_cast<gmx::RVec *>(cg_cm + pos_cg), ind->nrecv[nzone]);
5513             }
5514             else
5515             {
5516                 rvecBufferRef = rvecBuffer.buffer;
5517             }
5518             ddSendrecv<gmx::RVec>(dd, dim_ind, dddirBackward,
5519                                   work.positionBuffer, rvecBufferRef);
5520
5521             /* Make the charge group index */
5522             if (cd->receiveInPlace)
5523             {
5524                 zone = (p == 0 ? 0 : nzone - 1);
5525                 while (zone < nzone)
5526                 {
5527                     for (cg = 0; cg < ind->nrecv[zone]; cg++)
5528                     {
5529                         cg_gl                              = dd->globalAtomGroupIndices[pos_cg];
5530                         fr->cginfo[pos_cg]                 = ddcginfo(cginfo_mb, cg_gl);
5531                         nrcg                               = GET_CGINFO_NATOMS(fr->cginfo[pos_cg]);
5532                         dd->atomGroups_.index[pos_cg + 1]  = dd->atomGroups_.index[pos_cg] + nrcg;
5533                         if (bBondComm)
5534                         {
5535                             /* Update the charge group presence,
5536                              * so we can use it in the next pass of the loop.
5537                              */
5538                             comm->bLocalCG[cg_gl] = TRUE;
5539                         }
5540                         pos_cg++;
5541                     }
5542                     if (p == 0)
5543                     {
5544                         comm->zone_ncg1[nzone+zone] = ind->nrecv[zone];
5545                     }
5546                     zone++;
5547                     zone_cg_range[nzone+zone] = pos_cg;
5548                 }
5549             }
5550             else
5551             {
5552                 /* This part of the code is never executed with bBondComm. */
5553                 merge_cg_buffers(nzone, cd, p, zone_cg_range,
5554                                  dd->globalAtomGroupIndices, integerBufferRef.data(),
5555                                  cg_cm, as_rvec_array(rvecBufferRef.data()),
5556                                  dd->atomGroups_.index,
5557                                  fr->cginfo_mb, fr->cginfo);
5558                 pos_cg += ind->nrecv[nzone];
5559             }
5560             nat_tot += ind->nrecv[nzone+1];
5561         }
5562         if (!cd->receiveInPlace)
5563         {
5564             /* Store the atom block for easy copying of communication buffers */
5565             make_cell2at_index(cd, nzone, zone_cg_range[nzone], dd->atomGroups());
5566         }
5567         nzone += nzone;
5568     }
5569
5570     comm->atomRanges.setEnd(DDAtomRanges::Type::Zones, nat_tot);
5571
5572     if (!bBondComm)
5573     {
5574         /* We don't need to update cginfo, since that was alrady done above.
5575          * So we pass NULL for the forcerec.
5576          */
5577         dd_set_cginfo(dd->globalAtomGroupIndices,
5578                       dd->ncg_home, dd->globalAtomGroupIndices.size(),
5579                       nullptr, comm->bLocalCG);
5580     }
5581
5582     if (debug)
5583     {
5584         fprintf(debug, "Finished setting up DD communication, zones:");
5585         for (c = 0; c < zones->n; c++)
5586         {
5587             fprintf(debug, " %d", zones->cg_range[c+1]-zones->cg_range[c]);
5588         }
5589         fprintf(debug, "\n");
5590     }
5591 }
5592
5593 static void set_cg_boundaries(gmx_domdec_zones_t *zones)
5594 {
5595     int c;
5596
5597     for (c = 0; c < zones->nizone; c++)
5598     {
5599         zones->izone[c].cg1  = zones->cg_range[c+1];
5600         zones->izone[c].jcg0 = zones->cg_range[zones->izone[c].j0];
5601         zones->izone[c].jcg1 = zones->cg_range[zones->izone[c].j1];
5602     }
5603 }
5604
5605 /* \brief Set zone dimensions for zones \p zone_start to \p zone_end-1
5606  *
5607  * Also sets the atom density for the home zone when \p zone_start=0.
5608  * For this \p numMovedChargeGroupsInHomeZone needs to be passed to tell
5609  * how many charge groups will move but are still part of the current range.
5610  * \todo When converting domdec to use proper classes, all these variables
5611  *       should be private and a method should return the correct count
5612  *       depending on an internal state.
5613  *
5614  * \param[in,out] dd          The domain decomposition struct
5615  * \param[in]     box         The box
5616  * \param[in]     ddbox       The domain decomposition box struct
5617  * \param[in]     zone_start  The start of the zone range to set sizes for
5618  * \param[in]     zone_end    The end of the zone range to set sizes for
5619  * \param[in]     numMovedChargeGroupsInHomeZone  The number of charge groups in the home zone that should moved but are still present in dd->comm->zones.cg_range
5620  */
5621 static void set_zones_size(gmx_domdec_t *dd,
5622                            matrix box, const gmx_ddbox_t *ddbox,
5623                            int zone_start, int zone_end,
5624                            int numMovedChargeGroupsInHomeZone)
5625 {
5626     gmx_domdec_comm_t  *comm;
5627     gmx_domdec_zones_t *zones;
5628     gmx_bool            bDistMB;
5629     int                 z, zi, d, dim;
5630     real                rcs, rcmbs;
5631     int                 i, j;
5632     real                vol;
5633
5634     comm = dd->comm;
5635
5636     zones = &comm->zones;
5637
5638     /* Do we need to determine extra distances for multi-body bondeds? */
5639     bDistMB = (comm->bInterCGMultiBody && isDlbOn(dd->comm) && dd->ndim > 1);
5640
5641     for (z = zone_start; z < zone_end; z++)
5642     {
5643         /* Copy cell limits to zone limits.
5644          * Valid for non-DD dims and non-shifted dims.
5645          */
5646         copy_rvec(comm->cell_x0, zones->size[z].x0);
5647         copy_rvec(comm->cell_x1, zones->size[z].x1);
5648     }
5649
5650     for (d = 0; d < dd->ndim; d++)
5651     {
5652         dim = dd->dim[d];
5653
5654         for (z = 0; z < zones->n; z++)
5655         {
5656             /* With a staggered grid we have different sizes
5657              * for non-shifted dimensions.
5658              */
5659             if (isDlbOn(dd->comm) && zones->shift[z][dim] == 0)
5660             {
5661                 if (d == 1)
5662                 {
5663                     zones->size[z].x0[dim] = comm->zone_d1[zones->shift[z][dd->dim[d-1]]].min0;
5664                     zones->size[z].x1[dim] = comm->zone_d1[zones->shift[z][dd->dim[d-1]]].max1;
5665                 }
5666                 else if (d == 2)
5667                 {
5668                     zones->size[z].x0[dim] = comm->zone_d2[zones->shift[z][dd->dim[d-2]]][zones->shift[z][dd->dim[d-1]]].min0;
5669                     zones->size[z].x1[dim] = comm->zone_d2[zones->shift[z][dd->dim[d-2]]][zones->shift[z][dd->dim[d-1]]].max1;
5670                 }
5671             }
5672         }
5673
5674         rcs   = comm->cutoff;
5675         rcmbs = comm->cutoff_mbody;
5676         if (ddbox->tric_dir[dim])
5677         {
5678             rcs   /= ddbox->skew_fac[dim];
5679             rcmbs /= ddbox->skew_fac[dim];
5680         }
5681
5682         /* Set the lower limit for the shifted zone dimensions */
5683         for (z = zone_start; z < zone_end; z++)
5684         {
5685             if (zones->shift[z][dim] > 0)
5686             {
5687                 dim = dd->dim[d];
5688                 if (!isDlbOn(dd->comm) || d == 0)
5689                 {
5690                     zones->size[z].x0[dim] = comm->cell_x1[dim];
5691                     zones->size[z].x1[dim] = comm->cell_x1[dim] + rcs;
5692                 }
5693                 else
5694                 {
5695                     /* Here we take the lower limit of the zone from
5696                      * the lowest domain of the zone below.
5697                      */
5698                     if (z < 4)
5699                     {
5700                         zones->size[z].x0[dim] =
5701                             comm->zone_d1[zones->shift[z][dd->dim[d-1]]].min1;
5702                     }
5703                     else
5704                     {
5705                         if (d == 1)
5706                         {
5707                             zones->size[z].x0[dim] =
5708                                 zones->size[zone_perm[2][z-4]].x0[dim];
5709                         }
5710                         else
5711                         {
5712                             zones->size[z].x0[dim] =
5713                                 comm->zone_d2[zones->shift[z][dd->dim[d-2]]][zones->shift[z][dd->dim[d-1]]].min1;
5714                         }
5715                     }
5716                     /* A temporary limit, is updated below */
5717                     zones->size[z].x1[dim] = zones->size[z].x0[dim];
5718
5719                     if (bDistMB)
5720                     {
5721                         for (zi = 0; zi < zones->nizone; zi++)
5722                         {
5723                             if (zones->shift[zi][dim] == 0)
5724                             {
5725                                 /* This takes the whole zone into account.
5726                                  * With multiple pulses this will lead
5727                                  * to a larger zone then strictly necessary.
5728                                  */
5729                                 zones->size[z].x1[dim] = std::max(zones->size[z].x1[dim],
5730                                                                   zones->size[zi].x1[dim]+rcmbs);
5731                             }
5732                         }
5733                     }
5734                 }
5735             }
5736         }
5737
5738         /* Loop over the i-zones to set the upper limit of each
5739          * j-zone they see.
5740          */
5741         for (zi = 0; zi < zones->nizone; zi++)
5742         {
5743             if (zones->shift[zi][dim] == 0)
5744             {
5745                 /* We should only use zones up to zone_end */
5746                 int jZoneEnd = std::min(zones->izone[zi].j1, zone_end);
5747                 for (z = zones->izone[zi].j0; z < jZoneEnd; z++)
5748                 {
5749                     if (zones->shift[z][dim] > 0)
5750                     {
5751                         zones->size[z].x1[dim] = std::max(zones->size[z].x1[dim],
5752                                                           zones->size[zi].x1[dim]+rcs);
5753                     }
5754                 }
5755             }
5756         }
5757     }
5758
5759     for (z = zone_start; z < zone_end; z++)
5760     {
5761         /* Initialization only required to keep the compiler happy */
5762         rvec corner_min = {0, 0, 0}, corner_max = {0, 0, 0}, corner;
5763         int  nc, c;
5764
5765         /* To determine the bounding box for a zone we need to find
5766          * the extreme corners of 4, 2 or 1 corners.
5767          */
5768         nc = 1 << (ddbox->nboundeddim - 1);
5769
5770         for (c = 0; c < nc; c++)
5771         {
5772             /* Set up a zone corner at x=0, ignoring trilinic couplings */
5773             corner[XX] = 0;
5774             if ((c & 1) == 0)
5775             {
5776                 corner[YY] = zones->size[z].x0[YY];
5777             }
5778             else
5779             {
5780                 corner[YY] = zones->size[z].x1[YY];
5781             }
5782             if ((c & 2) == 0)
5783             {
5784                 corner[ZZ] = zones->size[z].x0[ZZ];
5785             }
5786             else
5787             {
5788                 corner[ZZ] = zones->size[z].x1[ZZ];
5789             }
5790             if (dd->ndim == 1 && dd->dim[0] < ZZ && ZZ < dd->npbcdim &&
5791                 box[ZZ][1 - dd->dim[0]] != 0)
5792             {
5793                 /* With 1D domain decomposition the cg's are not in
5794                  * the triclinic box, but triclinic x-y and rectangular y/x-z.
5795                  * Shift the corner of the z-vector back to along the box
5796                  * vector of dimension d, so it will later end up at 0 along d.
5797                  * This can affect the location of this corner along dd->dim[0]
5798                  * through the matrix operation below if box[d][dd->dim[0]]!=0.
5799                  */
5800                 int d = 1 - dd->dim[0];
5801
5802                 corner[d] -= corner[ZZ]*box[ZZ][d]/box[ZZ][ZZ];
5803             }
5804             /* Apply the triclinic couplings */
5805             assert(ddbox->npbcdim <= DIM);
5806             for (i = YY; i < ddbox->npbcdim; i++)
5807             {
5808                 for (j = XX; j < i; j++)
5809                 {
5810                     corner[j] += corner[i]*box[i][j]/box[i][i];
5811                 }
5812             }
5813             if (c == 0)
5814             {
5815                 copy_rvec(corner, corner_min);
5816                 copy_rvec(corner, corner_max);
5817             }
5818             else
5819             {
5820                 for (i = 0; i < DIM; i++)
5821                 {
5822                     corner_min[i] = std::min(corner_min[i], corner[i]);
5823                     corner_max[i] = std::max(corner_max[i], corner[i]);
5824                 }
5825             }
5826         }
5827         /* Copy the extreme cornes without offset along x */
5828         for (i = 0; i < DIM; i++)
5829         {
5830             zones->size[z].bb_x0[i] = corner_min[i];
5831             zones->size[z].bb_x1[i] = corner_max[i];
5832         }
5833         /* Add the offset along x */
5834         zones->size[z].bb_x0[XX] += zones->size[z].x0[XX];
5835         zones->size[z].bb_x1[XX] += zones->size[z].x1[XX];
5836     }
5837
5838     if (zone_start == 0)
5839     {
5840         vol = 1;
5841         for (dim = 0; dim < DIM; dim++)
5842         {
5843             vol *= zones->size[0].x1[dim] - zones->size[0].x0[dim];
5844         }
5845         zones->dens_zone0 = (zones->cg_range[1] - zones->cg_range[0] - numMovedChargeGroupsInHomeZone)/vol;
5846     }
5847
5848     if (debug)
5849     {
5850         for (z = zone_start; z < zone_end; z++)
5851         {
5852             fprintf(debug, "zone %d    %6.3f - %6.3f  %6.3f - %6.3f  %6.3f - %6.3f\n",
5853                     z,
5854                     zones->size[z].x0[XX], zones->size[z].x1[XX],
5855                     zones->size[z].x0[YY], zones->size[z].x1[YY],
5856                     zones->size[z].x0[ZZ], zones->size[z].x1[ZZ]);
5857             fprintf(debug, "zone %d bb %6.3f - %6.3f  %6.3f - %6.3f  %6.3f - %6.3f\n",
5858                     z,
5859                     zones->size[z].bb_x0[XX], zones->size[z].bb_x1[XX],
5860                     zones->size[z].bb_x0[YY], zones->size[z].bb_x1[YY],
5861                     zones->size[z].bb_x0[ZZ], zones->size[z].bb_x1[ZZ]);
5862         }
5863     }
5864 }
5865
5866 static int comp_cgsort(const void *a, const void *b)
5867 {
5868     int           comp;
5869
5870     gmx_cgsort_t *cga, *cgb;
5871     cga = (gmx_cgsort_t *)a;
5872     cgb = (gmx_cgsort_t *)b;
5873
5874     comp = cga->nsc - cgb->nsc;
5875     if (comp == 0)
5876     {
5877         comp = cga->ind_gl - cgb->ind_gl;
5878     }
5879
5880     return comp;
5881 }
5882
5883 static void order_int_cg(int n, const gmx_cgsort_t *sort,
5884                          int *a, int *buf)
5885 {
5886     int i;
5887
5888     /* Order the data */
5889     for (i = 0; i < n; i++)
5890     {
5891         buf[i] = a[sort[i].ind];
5892     }
5893
5894     /* Copy back to the original array */
5895     for (i = 0; i < n; i++)
5896     {
5897         a[i] = buf[i];
5898     }
5899 }
5900
5901 static void order_vec_cg(int                      n,
5902                          const gmx_cgsort_t      *sort,
5903                          rvec                    *v,
5904                          gmx::ArrayRef<gmx::RVec> sortBuffer)
5905 {
5906     GMX_ASSERT(sortBuffer.size() >= static_cast<size_t>(n), "Need a sufficiently large sorting buffer");
5907
5908     rvec *buf = as_rvec_array(sortBuffer.data());
5909
5910     /* Order the data */
5911     for (int i = 0; i < n; i++)
5912     {
5913         copy_rvec(v[sort[i].ind], buf[i]);
5914     }
5915
5916     /* Copy back to the original array */
5917     for (int i = 0; i < n; i++)
5918     {
5919         copy_rvec(buf[i], v[i]);
5920     }
5921 }
5922
5923 static void order_vec_atom(int                       ncg,
5924                            const gmx::BlockRanges   *atomGroups,
5925                            const gmx_cgsort_t       *sort,
5926                            gmx::ArrayRef<gmx::RVec>  v,
5927                            gmx::ArrayRef<gmx::RVec>  buf)
5928 {
5929     int a, atot, cg, cg0, cg1, i;
5930
5931     if (atomGroups == nullptr)
5932     {
5933         /* Avoid the useless loop of the atoms within a cg */
5934         order_vec_cg(ncg, sort, as_rvec_array(v.data()), buf);
5935
5936         return;
5937     }
5938
5939     /* Order the data */
5940     a = 0;
5941     for (cg = 0; cg < ncg; cg++)
5942     {
5943         cg0 = atomGroups->index[sort[cg].ind];
5944         cg1 = atomGroups->index[sort[cg].ind+1];
5945         for (i = cg0; i < cg1; i++)
5946         {
5947             copy_rvec(v[i], buf[a]);
5948             a++;
5949         }
5950     }
5951     atot = a;
5952
5953     /* Copy back to the original array */
5954     for (a = 0; a < atot; a++)
5955     {
5956         copy_rvec(buf[a], v[a]);
5957     }
5958 }
5959
5960 static void ordered_sort(int nsort2, gmx_cgsort_t *sort2,
5961                          int nsort_new, gmx_cgsort_t *sort_new,
5962                          gmx_cgsort_t *sort1)
5963 {
5964     int i1, i2, i_new;
5965
5966     /* The new indices are not very ordered, so we qsort them */
5967     gmx_qsort_threadsafe(sort_new, nsort_new, sizeof(sort_new[0]), comp_cgsort);
5968
5969     /* sort2 is already ordered, so now we can merge the two arrays */
5970     i1    = 0;
5971     i2    = 0;
5972     i_new = 0;
5973     while (i2 < nsort2 || i_new < nsort_new)
5974     {
5975         if (i2 == nsort2)
5976         {
5977             sort1[i1++] = sort_new[i_new++];
5978         }
5979         else if (i_new == nsort_new)
5980         {
5981             sort1[i1++] = sort2[i2++];
5982         }
5983         else if (sort2[i2].nsc < sort_new[i_new].nsc ||
5984                  (sort2[i2].nsc == sort_new[i_new].nsc &&
5985                   sort2[i2].ind_gl < sort_new[i_new].ind_gl))
5986         {
5987             sort1[i1++] = sort2[i2++];
5988         }
5989         else
5990         {
5991             sort1[i1++] = sort_new[i_new++];
5992         }
5993     }
5994 }
5995
5996 static int dd_sort_order(gmx_domdec_t *dd, t_forcerec *fr, int ncg_home_old)
5997 {
5998     gmx_domdec_sort_t *sort;
5999     gmx_cgsort_t      *cgsort, *sort_i;
6000     int                ncg_new, nsort2, nsort_new, i, *a, moved;
6001
6002     sort = dd->comm->sort;
6003
6004     a = fr->ns->grid->cell_index;
6005
6006     moved = NSGRID_SIGNAL_MOVED_FAC*fr->ns->grid->ncells;
6007
6008     if (ncg_home_old >= 0)
6009     {
6010         /* The charge groups that remained in the same ns grid cell
6011          * are completely ordered. So we can sort efficiently by sorting
6012          * the charge groups that did move into the stationary list.
6013          */
6014         ncg_new   = 0;
6015         nsort2    = 0;
6016         nsort_new = 0;
6017         for (i = 0; i < dd->ncg_home; i++)
6018         {
6019             /* Check if this cg did not move to another node */
6020             if (a[i] < moved)
6021             {
6022                 if (i >= ncg_home_old || a[i] != sort->sort[i].nsc)
6023                 {
6024                     /* This cg is new on this node or moved ns grid cell */
6025                     if (nsort_new >= sort->sort_new_nalloc)
6026                     {
6027                         sort->sort_new_nalloc = over_alloc_dd(nsort_new+1);
6028                         srenew(sort->sort_new, sort->sort_new_nalloc);
6029                     }
6030                     sort_i = &(sort->sort_new[nsort_new++]);
6031                 }
6032                 else
6033                 {
6034                     /* This cg did not move */
6035                     sort_i = &(sort->sort2[nsort2++]);
6036                 }
6037                 /* Sort on the ns grid cell indices
6038                  * and the global topology index.
6039                  * index_gl is irrelevant with cell ns,
6040                  * but we set it here anyhow to avoid a conditional.
6041                  */
6042                 sort_i->nsc    = a[i];
6043                 sort_i->ind_gl = dd->globalAtomGroupIndices[i];
6044                 sort_i->ind    = i;
6045                 ncg_new++;
6046             }
6047         }
6048         if (debug)
6049         {
6050             fprintf(debug, "ordered sort cgs: stationary %d moved %d\n",
6051                     nsort2, nsort_new);
6052         }
6053         /* Sort efficiently */
6054         ordered_sort(nsort2, sort->sort2, nsort_new, sort->sort_new,
6055                      sort->sort);
6056     }
6057     else
6058     {
6059         cgsort  = sort->sort;
6060         ncg_new = 0;
6061         for (i = 0; i < dd->ncg_home; i++)
6062         {
6063             /* Sort on the ns grid cell indices
6064              * and the global topology index
6065              */
6066             cgsort[i].nsc    = a[i];
6067             cgsort[i].ind_gl = dd->globalAtomGroupIndices[i];
6068             cgsort[i].ind    = i;
6069             if (cgsort[i].nsc < moved)
6070             {
6071                 ncg_new++;
6072             }
6073         }
6074         if (debug)
6075         {
6076             fprintf(debug, "qsort cgs: %d new home %d\n", dd->ncg_home, ncg_new);
6077         }
6078         /* Determine the order of the charge groups using qsort */
6079         gmx_qsort_threadsafe(cgsort, dd->ncg_home, sizeof(cgsort[0]), comp_cgsort);
6080     }
6081
6082     return ncg_new;
6083 }
6084
6085 static int dd_sort_order_nbnxn(gmx_domdec_t *dd, t_forcerec *fr)
6086 {
6087     gmx_cgsort_t *sort;
6088     int           ncg_new, i, na;
6089     const int    *a;
6090
6091     sort = dd->comm->sort->sort;
6092
6093     nbnxn_get_atomorder(fr->nbv->nbs.get(), &a, &na);
6094
6095     ncg_new = 0;
6096     for (i = 0; i < na; i++)
6097     {
6098         if (a[i] >= 0)
6099         {
6100             sort[ncg_new].ind = a[i];
6101             ncg_new++;
6102         }
6103     }
6104
6105     return ncg_new;
6106 }
6107
6108 static void dd_sort_state(gmx_domdec_t *dd, rvec *cgcm, t_forcerec *fr, t_state *state,
6109                           int ncg_home_old)
6110 {
6111     gmx_domdec_sort_t *sort;
6112     gmx_cgsort_t      *cgsort;
6113     int                ncg_new, i, *ibuf, cgsize;
6114
6115     sort = dd->comm->sort;
6116
6117     if (dd->ncg_home > sort->sort_nalloc)
6118     {
6119         sort->sort_nalloc = over_alloc_dd(dd->ncg_home);
6120         srenew(sort->sort, sort->sort_nalloc);
6121         srenew(sort->sort2, sort->sort_nalloc);
6122     }
6123     cgsort = sort->sort;
6124
6125     switch (fr->cutoff_scheme)
6126     {
6127         case ecutsGROUP:
6128             ncg_new = dd_sort_order(dd, fr, ncg_home_old);
6129             break;
6130         case ecutsVERLET:
6131             ncg_new = dd_sort_order_nbnxn(dd, fr);
6132             break;
6133         default:
6134             gmx_incons("unimplemented");
6135             ncg_new = 0;
6136     }
6137
6138     const gmx::BlockRanges &atomGroups = dd->atomGroups();
6139
6140     /* We alloc with the old size, since cgindex is still old */
6141     DDBufferAccess<gmx::RVec>  rvecBuffer(dd->comm->rvecBuffer, atomGroups.index[dd->ncg_home]);
6142
6143     const gmx::BlockRanges    *atomGroupsPtr = (dd->comm->bCGs ? &atomGroups : nullptr);
6144
6145     /* Remove the charge groups which are no longer at home here */
6146     dd->ncg_home = ncg_new;
6147     if (debug)
6148     {
6149         fprintf(debug, "Set the new home charge group count to %d\n",
6150                 dd->ncg_home);
6151     }
6152
6153     /* Reorder the state */
6154     if (state->flags & (1 << estX))
6155     {
6156         order_vec_atom(dd->ncg_home, atomGroupsPtr, cgsort, state->x, rvecBuffer.buffer);
6157     }
6158     if (state->flags & (1 << estV))
6159     {
6160         order_vec_atom(dd->ncg_home, atomGroupsPtr, cgsort, state->v, rvecBuffer.buffer);
6161     }
6162     if (state->flags & (1 << estCGP))
6163     {
6164         order_vec_atom(dd->ncg_home, atomGroupsPtr, cgsort, state->cg_p, rvecBuffer.buffer);
6165     }
6166
6167     if (fr->cutoff_scheme == ecutsGROUP)
6168     {
6169         /* Reorder cgcm */
6170         order_vec_cg(dd->ncg_home, cgsort, cgcm, rvecBuffer.buffer);
6171     }
6172
6173     if (dd->ncg_home+1 > sort->ibuf_nalloc)
6174     {
6175         sort->ibuf_nalloc = over_alloc_dd(dd->ncg_home+1);
6176         srenew(sort->ibuf, sort->ibuf_nalloc);
6177     }
6178     ibuf = sort->ibuf;
6179     /* Reorder the global cg index */
6180     order_int_cg(dd->ncg_home, cgsort, dd->globalAtomGroupIndices.data(), ibuf);
6181     /* Reorder the cginfo */
6182     order_int_cg(dd->ncg_home, cgsort, fr->cginfo, ibuf);
6183     /* Rebuild the local cg index */
6184     if (dd->comm->bCGs)
6185     {
6186         ibuf[0] = 0;
6187         for (i = 0; i < dd->ncg_home; i++)
6188         {
6189             cgsize    = atomGroups.index[cgsort[i].ind+1] - atomGroups.index[cgsort[i].ind];
6190             ibuf[i+1] = ibuf[i] + cgsize;
6191         }
6192         for (i = 0; i < dd->ncg_home+1; i++)
6193         {
6194             dd->atomGroups_.index[i] = ibuf[i];
6195         }
6196     }
6197     else
6198     {
6199         for (i = 0; i < dd->ncg_home+1; i++)
6200         {
6201             dd->atomGroups_.index[i] = i;
6202         }
6203     }
6204     /* Set the home atom number */
6205     dd->comm->atomRanges.setEnd(DDAtomRanges::Type::Home, dd->atomGroups().index[dd->ncg_home]);
6206
6207     if (fr->cutoff_scheme == ecutsVERLET)
6208     {
6209         /* The atoms are now exactly in grid order, update the grid order */
6210         nbnxn_set_atomorder(fr->nbv->nbs.get());
6211     }
6212     else
6213     {
6214         /* Copy the sorted ns cell indices back to the ns grid struct */
6215         for (i = 0; i < dd->ncg_home; i++)
6216         {
6217             fr->ns->grid->cell_index[i] = cgsort[i].nsc;
6218         }
6219         fr->ns->grid->nr = dd->ncg_home;
6220     }
6221 }
6222
6223 static void add_dd_statistics(gmx_domdec_t *dd)
6224 {
6225     gmx_domdec_comm_t *comm = dd->comm;
6226
6227     for (int i = 0; i < static_cast<int>(DDAtomRanges::Type::Number); i++)
6228     {
6229         auto range = static_cast<DDAtomRanges::Type>(i);
6230         comm->sum_nat[i] +=
6231             comm->atomRanges.end(range) - comm->atomRanges.start(range);
6232     }
6233     comm->ndecomp++;
6234 }
6235
6236 void reset_dd_statistics_counters(gmx_domdec_t *dd)
6237 {
6238     gmx_domdec_comm_t *comm = dd->comm;
6239
6240     /* Reset all the statistics and counters for total run counting */
6241     for (int i = 0; i < static_cast<int>(DDAtomRanges::Type::Number); i++)
6242     {
6243         comm->sum_nat[i] = 0;
6244     }
6245     comm->ndecomp   = 0;
6246     comm->nload     = 0;
6247     comm->load_step = 0;
6248     comm->load_sum  = 0;
6249     comm->load_max  = 0;
6250     clear_ivec(comm->load_lim);
6251     comm->load_mdf = 0;
6252     comm->load_pme = 0;
6253 }
6254
6255 void print_dd_statistics(const t_commrec *cr, const t_inputrec *ir, FILE *fplog)
6256 {
6257     gmx_domdec_comm_t *comm      = cr->dd->comm;
6258
6259     const int          numRanges = static_cast<int>(DDAtomRanges::Type::Number);
6260     gmx_sumd(numRanges, comm->sum_nat, cr);
6261
6262     if (fplog == nullptr)
6263     {
6264         return;
6265     }
6266
6267     fprintf(fplog, "\n    D O M A I N   D E C O M P O S I T I O N   S T A T I S T I C S\n\n");
6268
6269     for (int i = static_cast<int>(DDAtomRanges::Type::Zones); i < numRanges; i++)
6270     {
6271         auto   range = static_cast<DDAtomRanges::Type>(i);
6272         double av    = comm->sum_nat[i]/comm->ndecomp;
6273         switch (range)
6274         {
6275             case DDAtomRanges::Type::Zones:
6276                 fprintf(fplog,
6277                         " av. #atoms communicated per step for force:  %d x %.1f\n",
6278                         2, av);
6279                 break;
6280             case DDAtomRanges::Type::Vsites:
6281                 if (cr->dd->vsite_comm)
6282                 {
6283                     fprintf(fplog,
6284                             " av. #atoms communicated per step for vsites: %d x %.1f\n",
6285                             (EEL_PME(ir->coulombtype) || ir->coulombtype == eelEWALD) ? 3 : 2,
6286                             av);
6287                 }
6288                 break;
6289             case DDAtomRanges::Type::Constraints:
6290                 if (cr->dd->constraint_comm)
6291                 {
6292                     fprintf(fplog,
6293                             " av. #atoms communicated per step for LINCS:  %d x %.1f\n",
6294                             1 + ir->nLincsIter, av);
6295                 }
6296                 break;
6297             default:
6298                 gmx_incons(" Unknown type for DD statistics");
6299         }
6300     }
6301     fprintf(fplog, "\n");
6302
6303     if (comm->bRecordLoad && EI_DYNAMICS(ir->eI))
6304     {
6305         print_dd_load_av(fplog, cr->dd);
6306     }
6307 }
6308
6309 void dd_partition_system(FILE                *fplog,
6310                          gmx_int64_t          step,
6311                          const t_commrec     *cr,
6312                          gmx_bool             bMasterState,
6313                          int                  nstglobalcomm,
6314                          t_state             *state_global,
6315                          const gmx_mtop_t    *top_global,
6316                          const t_inputrec    *ir,
6317                          t_state             *state_local,
6318                          PaddedRVecVector    *f,
6319                          gmx::MDAtoms        *mdAtoms,
6320                          gmx_localtop_t      *top_local,
6321                          t_forcerec          *fr,
6322                          gmx_vsite_t         *vsite,
6323                          gmx::Constraints    *constr,
6324                          t_nrnb              *nrnb,
6325                          gmx_wallcycle       *wcycle,
6326                          gmx_bool             bVerbose)
6327 {
6328     gmx_domdec_t      *dd;
6329     gmx_domdec_comm_t *comm;
6330     gmx_ddbox_t        ddbox = {0};
6331     t_block           *cgs_gl;
6332     gmx_int64_t        step_pcoupl;
6333     rvec               cell_ns_x0, cell_ns_x1;
6334     int                ncgindex_set, ncg_home_old = -1, ncg_moved, nat_f_novirsum;
6335     gmx_bool           bBoxChanged, bNStGlobalComm, bDoDLB, bCheckWhetherToTurnDlbOn, bLogLoad;
6336     gmx_bool           bRedist, bSortCG, bResortAll;
6337     ivec               ncells_old = {0, 0, 0}, ncells_new = {0, 0, 0}, np;
6338     real               grid_density;
6339     char               sbuf[22];
6340
6341     wallcycle_start(wcycle, ewcDOMDEC);
6342
6343     dd   = cr->dd;
6344     comm = dd->comm;
6345
6346     // TODO if the update code becomes accessible here, use
6347     // upd->deform for this logic.
6348     bBoxChanged = (bMasterState || inputrecDeform(ir));
6349     if (ir->epc != epcNO)
6350     {
6351         /* With nstpcouple > 1 pressure coupling happens.
6352          * one step after calculating the pressure.
6353          * Box scaling happens at the end of the MD step,
6354          * after the DD partitioning.
6355          * We therefore have to do DLB in the first partitioning
6356          * after an MD step where P-coupling occurred.
6357          * We need to determine the last step in which p-coupling occurred.
6358          * MRS -- need to validate this for vv?
6359          */
6360         int n = ir->nstpcouple;
6361         if (n == 1)
6362         {
6363             step_pcoupl = step - 1;
6364         }
6365         else
6366         {
6367             step_pcoupl = ((step - 1)/n)*n + 1;
6368         }
6369         if (step_pcoupl >= comm->partition_step)
6370         {
6371             bBoxChanged = TRUE;
6372         }
6373     }
6374
6375     bNStGlobalComm = (step % nstglobalcomm == 0);
6376
6377     if (!isDlbOn(comm))
6378     {
6379         bDoDLB = FALSE;
6380     }
6381     else
6382     {
6383         /* Should we do dynamic load balacing this step?
6384          * Since it requires (possibly expensive) global communication,
6385          * we might want to do DLB less frequently.
6386          */
6387         if (bBoxChanged || ir->epc != epcNO)
6388         {
6389             bDoDLB = bBoxChanged;
6390         }
6391         else
6392         {
6393             bDoDLB = bNStGlobalComm;
6394         }
6395     }
6396
6397     /* Check if we have recorded loads on the nodes */
6398     if (comm->bRecordLoad && dd_load_count(comm) > 0)
6399     {
6400         bCheckWhetherToTurnDlbOn = dd_dlb_get_should_check_whether_to_turn_dlb_on(dd);
6401
6402         /* Print load every nstlog, first and last step to the log file */
6403         bLogLoad = ((ir->nstlog > 0 && step % ir->nstlog == 0) ||
6404                     comm->n_load_collect == 0 ||
6405                     (ir->nsteps >= 0 &&
6406                      (step + ir->nstlist > ir->init_step + ir->nsteps)));
6407
6408         /* Avoid extra communication due to verbose screen output
6409          * when nstglobalcomm is set.
6410          */
6411         if (bDoDLB || bLogLoad || bCheckWhetherToTurnDlbOn ||
6412             (bVerbose && (ir->nstlist == 0 || nstglobalcomm <= ir->nstlist)))
6413         {
6414             get_load_distribution(dd, wcycle);
6415             if (DDMASTER(dd))
6416             {
6417                 if (bLogLoad)
6418                 {
6419                     dd_print_load(fplog, dd, step-1);
6420                 }
6421                 if (bVerbose)
6422                 {
6423                     dd_print_load_verbose(dd);
6424                 }
6425             }
6426             comm->n_load_collect++;
6427
6428             if (isDlbOn(comm))
6429             {
6430                 if (DDMASTER(dd))
6431                 {
6432                     /* Add the measured cycles to the running average */
6433                     const float averageFactor        = 0.1f;
6434                     comm->cyclesPerStepDlbExpAverage =
6435                         (1 - averageFactor)*comm->cyclesPerStepDlbExpAverage +
6436                         averageFactor*comm->cycl[ddCyclStep]/comm->cycl_n[ddCyclStep];
6437                 }
6438                 if (comm->dlbState == edlbsOnCanTurnOff &&
6439                     dd->comm->n_load_have % c_checkTurnDlbOffInterval == c_checkTurnDlbOffInterval - 1)
6440                 {
6441                     gmx_bool turnOffDlb;
6442                     if (DDMASTER(dd))
6443                     {
6444                         /* If the running averaged cycles with DLB are more
6445                          * than before we turned on DLB, turn off DLB.
6446                          * We will again run and check the cycles without DLB
6447                          * and we can then decide if to turn off DLB forever.
6448                          */
6449                         turnOffDlb = (comm->cyclesPerStepDlbExpAverage >
6450                                       comm->cyclesPerStepBeforeDLB);
6451                     }
6452                     dd_bcast(dd, sizeof(turnOffDlb), &turnOffDlb);
6453                     if (turnOffDlb)
6454                     {
6455                         /* To turn off DLB, we need to redistribute the atoms */
6456                         dd_collect_state(dd, state_local, state_global);
6457                         bMasterState = TRUE;
6458                         turn_off_dlb(fplog, cr, step);
6459                     }
6460                 }
6461             }
6462             else if (bCheckWhetherToTurnDlbOn)
6463             {
6464                 gmx_bool turnOffDlbForever = FALSE;
6465                 gmx_bool turnOnDlb         = FALSE;
6466
6467                 /* Since the timings are node dependent, the master decides */
6468                 if (DDMASTER(dd))
6469                 {
6470                     /* If we recently turned off DLB, we want to check if
6471                      * performance is better without DLB. We want to do this
6472                      * ASAP to minimize the chance that external factors
6473                      * slowed down the DLB step are gone here and we
6474                      * incorrectly conclude that DLB was causing the slowdown.
6475                      * So we measure one nstlist block, no running average.
6476                      */
6477                     if (comm->haveTurnedOffDlb &&
6478                         comm->cycl[ddCyclStep]/comm->cycl_n[ddCyclStep] <
6479                         comm->cyclesPerStepDlbExpAverage)
6480                     {
6481                         /* After turning off DLB we ran nstlist steps in fewer
6482                          * cycles than with DLB. This likely means that DLB
6483                          * in not benefical, but this could be due to a one
6484                          * time unlucky fluctuation, so we require two such
6485                          * observations in close succession to turn off DLB
6486                          * forever.
6487                          */
6488                         if (comm->dlbSlowerPartitioningCount > 0 &&
6489                             dd->ddp_count < comm->dlbSlowerPartitioningCount + 10*c_checkTurnDlbOnInterval)
6490                         {
6491                             turnOffDlbForever = TRUE;
6492                         }
6493                         comm->haveTurnedOffDlb           = false;
6494                         /* Register when we last measured DLB slowdown */
6495                         comm->dlbSlowerPartitioningCount = dd->ddp_count;
6496                     }
6497                     else
6498                     {
6499                         /* Here we check if the max PME rank load is more than 0.98
6500                          * the max PP force load. If so, PP DLB will not help,
6501                          * since we are (almost) limited by PME. Furthermore,
6502                          * DLB will cause a significant extra x/f redistribution
6503                          * cost on the PME ranks, which will then surely result
6504                          * in lower total performance.
6505                          */
6506                         if (cr->npmenodes > 0 &&
6507                             dd_pme_f_ratio(dd) > 1 - DD_PERF_LOSS_DLB_ON)
6508                         {
6509                             turnOnDlb = FALSE;
6510                         }
6511                         else
6512                         {
6513                             turnOnDlb = (dd_force_imb_perf_loss(dd) >= DD_PERF_LOSS_DLB_ON);
6514                         }
6515                     }
6516                 }
6517                 struct
6518                 {
6519                     gmx_bool turnOffDlbForever;
6520                     gmx_bool turnOnDlb;
6521                 }
6522                 bools {
6523                     turnOffDlbForever, turnOnDlb
6524                 };
6525                 dd_bcast(dd, sizeof(bools), &bools);
6526                 if (bools.turnOffDlbForever)
6527                 {
6528                     turn_off_dlb_forever(fplog, cr, step);
6529                 }
6530                 else if (bools.turnOnDlb)
6531                 {
6532                     turn_on_dlb(fplog, cr, step);
6533                     bDoDLB = TRUE;
6534                 }
6535             }
6536         }
6537         comm->n_load_have++;
6538     }
6539
6540     cgs_gl = &comm->cgs_gl;
6541
6542     bRedist = FALSE;
6543     if (bMasterState)
6544     {
6545         /* Clear the old state */
6546         clearDDStateIndices(dd, 0, 0);
6547         ncgindex_set = 0;
6548
6549         auto xGlobal = positionsFromStatePointer(state_global);
6550
6551         set_ddbox(dd, true, ir,
6552                   DDMASTER(dd) ? state_global->box : nullptr,
6553                   true, xGlobal,
6554                   &ddbox);
6555
6556         distributeState(fplog, dd, state_global, *cgs_gl, ddbox, state_local, f);
6557
6558         dd_make_local_cgs(dd, &top_local->cgs);
6559
6560         /* Ensure that we have space for the new distribution */
6561         dd_check_alloc_ncg(fr, state_local, f, dd->ncg_home);
6562
6563         if (fr->cutoff_scheme == ecutsGROUP)
6564         {
6565             calc_cgcm(fplog, 0, dd->ncg_home,
6566                       &top_local->cgs, as_rvec_array(state_local->x.data()), fr->cg_cm);
6567         }
6568
6569         inc_nrnb(nrnb, eNR_CGCM, comm->atomRanges.numHomeAtoms());
6570
6571         dd_set_cginfo(dd->globalAtomGroupIndices, 0, dd->ncg_home, fr, comm->bLocalCG);
6572     }
6573     else if (state_local->ddp_count != dd->ddp_count)
6574     {
6575         if (state_local->ddp_count > dd->ddp_count)
6576         {
6577             gmx_fatal(FARGS, "Internal inconsistency state_local->ddp_count (%d) > dd->ddp_count (%d)", state_local->ddp_count, dd->ddp_count);
6578         }
6579
6580         if (state_local->ddp_count_cg_gl != state_local->ddp_count)
6581         {
6582             gmx_fatal(FARGS, "Internal inconsistency state_local->ddp_count_cg_gl (%d) != state_local->ddp_count (%d)", state_local->ddp_count_cg_gl, state_local->ddp_count);
6583         }
6584
6585         /* Clear the old state */
6586         clearDDStateIndices(dd, 0, 0);
6587
6588         /* Restore the atom group indices from state_local */
6589         restoreAtomGroups(dd, cgs_gl->index, state_local);
6590         make_dd_indices(dd, cgs_gl->index, 0);
6591         ncgindex_set = dd->ncg_home;
6592
6593         if (fr->cutoff_scheme == ecutsGROUP)
6594         {
6595             /* Redetermine the cg COMs */
6596             calc_cgcm(fplog, 0, dd->ncg_home,
6597                       &top_local->cgs, as_rvec_array(state_local->x.data()), fr->cg_cm);
6598         }
6599
6600         inc_nrnb(nrnb, eNR_CGCM, comm->atomRanges.numHomeAtoms());
6601
6602         dd_set_cginfo(dd->globalAtomGroupIndices, 0, dd->ncg_home, fr, comm->bLocalCG);
6603
6604         set_ddbox(dd, bMasterState, ir, state_local->box,
6605                   true, state_local->x, &ddbox);
6606
6607         bRedist = isDlbOn(comm);
6608     }
6609     else
6610     {
6611         /* We have the full state, only redistribute the cgs */
6612
6613         /* Clear the non-home indices */
6614         clearDDStateIndices(dd, dd->ncg_home, comm->atomRanges.numHomeAtoms());
6615         ncgindex_set = 0;
6616
6617         /* Avoid global communication for dim's without pbc and -gcom */
6618         if (!bNStGlobalComm)
6619         {
6620             copy_rvec(comm->box0, ddbox.box0    );
6621             copy_rvec(comm->box_size, ddbox.box_size);
6622         }
6623         set_ddbox(dd, bMasterState, ir, state_local->box,
6624                   bNStGlobalComm, state_local->x, &ddbox);
6625
6626         bBoxChanged = TRUE;
6627         bRedist     = TRUE;
6628     }
6629     /* For dim's without pbc and -gcom */
6630     copy_rvec(ddbox.box0, comm->box0    );
6631     copy_rvec(ddbox.box_size, comm->box_size);
6632
6633     set_dd_cell_sizes(dd, &ddbox, dynamic_dd_box(&ddbox, ir), bMasterState, bDoDLB,
6634                       step, wcycle);
6635
6636     if (comm->nstDDDumpGrid > 0 && step % comm->nstDDDumpGrid == 0)
6637     {
6638         write_dd_grid_pdb("dd_grid", step, dd, state_local->box, &ddbox);
6639     }
6640
6641     /* Check if we should sort the charge groups */
6642     bSortCG = (bMasterState || bRedist);
6643
6644     ncg_home_old = dd->ncg_home;
6645
6646     /* When repartitioning we mark charge groups that will move to neighboring
6647      * DD cells, but we do not move them right away for performance reasons.
6648      * Thus we need to keep track of how many charge groups will move for
6649      * obtaining correct local charge group / atom counts.
6650      */
6651     ncg_moved = 0;
6652     if (bRedist)
6653     {
6654         wallcycle_sub_start(wcycle, ewcsDD_REDIST);
6655
6656         dd_redistribute_cg(fplog, step, dd, ddbox.tric_dir,
6657                            state_local, f, fr,
6658                            !bSortCG, nrnb, &ncgindex_set, &ncg_moved);
6659
6660         wallcycle_sub_stop(wcycle, ewcsDD_REDIST);
6661     }
6662
6663     get_nsgrid_boundaries(ddbox.nboundeddim, state_local->box,
6664                           dd, &ddbox,
6665                           &comm->cell_x0, &comm->cell_x1,
6666                           dd->ncg_home, fr->cg_cm,
6667                           cell_ns_x0, cell_ns_x1, &grid_density);
6668
6669     if (bBoxChanged)
6670     {
6671         comm_dd_ns_cell_sizes(dd, &ddbox, cell_ns_x0, cell_ns_x1, step);
6672     }
6673
6674     switch (fr->cutoff_scheme)
6675     {
6676         case ecutsGROUP:
6677             copy_ivec(fr->ns->grid->n, ncells_old);
6678             grid_first(fplog, fr->ns->grid, dd, &ddbox,
6679                        state_local->box, cell_ns_x0, cell_ns_x1,
6680                        fr->rlist, grid_density);
6681             break;
6682         case ecutsVERLET:
6683             nbnxn_get_ncells(fr->nbv->nbs.get(), &ncells_old[XX], &ncells_old[YY]);
6684             break;
6685         default:
6686             gmx_incons("unimplemented");
6687     }
6688     /* We need to store tric_dir for dd_get_ns_ranges called from ns.c */
6689     copy_ivec(ddbox.tric_dir, comm->tric_dir);
6690
6691     if (bSortCG)
6692     {
6693         wallcycle_sub_start(wcycle, ewcsDD_GRID);
6694
6695         /* Sort the state on charge group position.
6696          * This enables exact restarts from this step.
6697          * It also improves performance by about 15% with larger numbers
6698          * of atoms per node.
6699          */
6700
6701         /* Fill the ns grid with the home cell,
6702          * so we can sort with the indices.
6703          */
6704         set_zones_ncg_home(dd);
6705
6706         switch (fr->cutoff_scheme)
6707         {
6708             case ecutsVERLET:
6709                 set_zones_size(dd, state_local->box, &ddbox, 0, 1, ncg_moved);
6710
6711                 nbnxn_put_on_grid(fr->nbv->nbs.get(), fr->ePBC, state_local->box,
6712                                   0,
6713                                   comm->zones.size[0].bb_x0,
6714                                   comm->zones.size[0].bb_x1,
6715                                   0, dd->ncg_home,
6716                                   comm->zones.dens_zone0,
6717                                   fr->cginfo,
6718                                   as_rvec_array(state_local->x.data()),
6719                                   ncg_moved, bRedist ? comm->movedBuffer.data() : nullptr,
6720                                   fr->nbv->grp[eintLocal].kernel_type,
6721                                   fr->nbv->nbat);
6722
6723                 nbnxn_get_ncells(fr->nbv->nbs.get(), &ncells_new[XX], &ncells_new[YY]);
6724                 break;
6725             case ecutsGROUP:
6726                 fill_grid(&comm->zones, fr->ns->grid, dd->ncg_home,
6727                           0, dd->ncg_home, fr->cg_cm);
6728
6729                 copy_ivec(fr->ns->grid->n, ncells_new);
6730                 break;
6731             default:
6732                 gmx_incons("unimplemented");
6733         }
6734
6735         bResortAll = bMasterState;
6736
6737         /* Check if we can user the old order and ns grid cell indices
6738          * of the charge groups to sort the charge groups efficiently.
6739          */
6740         if (ncells_new[XX] != ncells_old[XX] ||
6741             ncells_new[YY] != ncells_old[YY] ||
6742             ncells_new[ZZ] != ncells_old[ZZ])
6743         {
6744             bResortAll = TRUE;
6745         }
6746
6747         if (debug)
6748         {
6749             fprintf(debug, "Step %s, sorting the %d home charge groups\n",
6750                     gmx_step_str(step, sbuf), dd->ncg_home);
6751         }
6752         dd_sort_state(dd, fr->cg_cm, fr, state_local,
6753                       bResortAll ? -1 : ncg_home_old);
6754
6755         /* After sorting and compacting we set the correct size */
6756         dd_resize_state(state_local, f, comm->atomRanges.numHomeAtoms());
6757
6758         /* Rebuild all the indices */
6759         ga2la_clear(dd->ga2la);
6760         ncgindex_set = 0;
6761
6762         wallcycle_sub_stop(wcycle, ewcsDD_GRID);
6763     }
6764
6765     wallcycle_sub_start(wcycle, ewcsDD_SETUPCOMM);
6766
6767     /* Setup up the communication and communicate the coordinates */
6768     setup_dd_communication(dd, state_local->box, &ddbox, fr, state_local, f);
6769
6770     /* Set the indices */
6771     make_dd_indices(dd, cgs_gl->index, ncgindex_set);
6772
6773     /* Set the charge group boundaries for neighbor searching */
6774     set_cg_boundaries(&comm->zones);
6775
6776     if (fr->cutoff_scheme == ecutsVERLET)
6777     {
6778         /* When bSortCG=true, we have already set the size for zone 0 */
6779         set_zones_size(dd, state_local->box, &ddbox,
6780                        bSortCG ? 1 : 0, comm->zones.n,
6781                        0);
6782     }
6783
6784     wallcycle_sub_stop(wcycle, ewcsDD_SETUPCOMM);
6785
6786     /*
6787        write_dd_pdb("dd_home",step,"dump",top_global,cr,
6788                  -1,as_rvec_array(state_local->x.data()),state_local->box);
6789      */
6790
6791     wallcycle_sub_start(wcycle, ewcsDD_MAKETOP);
6792
6793     /* Extract a local topology from the global topology */
6794     for (int i = 0; i < dd->ndim; i++)
6795     {
6796         np[dd->dim[i]] = comm->cd[i].numPulses();
6797     }
6798     dd_make_local_top(dd, &comm->zones, dd->npbcdim, state_local->box,
6799                       comm->cellsize_min, np,
6800                       fr,
6801                       fr->cutoff_scheme == ecutsGROUP ? fr->cg_cm : as_rvec_array(state_local->x.data()),
6802                       vsite, top_global, top_local);
6803
6804     wallcycle_sub_stop(wcycle, ewcsDD_MAKETOP);
6805
6806     wallcycle_sub_start(wcycle, ewcsDD_MAKECONSTR);
6807
6808     /* Set up the special atom communication */
6809     int n = comm->atomRanges.end(DDAtomRanges::Type::Zones);
6810     for (int i = static_cast<int>(DDAtomRanges::Type::Zones) + 1; i < static_cast<int>(DDAtomRanges::Type::Number); i++)
6811     {
6812         auto range = static_cast<DDAtomRanges::Type>(i);
6813         switch (range)
6814         {
6815             case DDAtomRanges::Type::Vsites:
6816                 if (vsite && vsite->n_intercg_vsite)
6817                 {
6818                     n = dd_make_local_vsites(dd, n, top_local->idef.il);
6819                 }
6820                 break;
6821             case DDAtomRanges::Type::Constraints:
6822                 if (dd->bInterCGcons || dd->bInterCGsettles)
6823                 {
6824                     /* Only for inter-cg constraints we need special code */
6825                     n = dd_make_local_constraints(dd, n, top_global, fr->cginfo,
6826                                                   constr, ir->nProjOrder,
6827                                                   top_local->idef.il);
6828                 }
6829                 break;
6830             default:
6831                 gmx_incons("Unknown special atom type setup");
6832         }
6833         comm->atomRanges.setEnd(range, n);
6834     }
6835
6836     wallcycle_sub_stop(wcycle, ewcsDD_MAKECONSTR);
6837
6838     wallcycle_sub_start(wcycle, ewcsDD_TOPOTHER);
6839
6840     /* Make space for the extra coordinates for virtual site
6841      * or constraint communication.
6842      */
6843     state_local->natoms = comm->atomRanges.numAtomsTotal();
6844
6845     dd_resize_state(state_local, f, state_local->natoms);
6846
6847     if (fr->haveDirectVirialContributions)
6848     {
6849         if (vsite && vsite->n_intercg_vsite)
6850         {
6851             nat_f_novirsum = comm->atomRanges.end(DDAtomRanges::Type::Vsites);
6852         }
6853         else
6854         {
6855             if (EEL_FULL(ir->coulombtype) && dd->n_intercg_excl > 0)
6856             {
6857                 nat_f_novirsum = comm->atomRanges.end(DDAtomRanges::Type::Zones);
6858             }
6859             else
6860             {
6861                 nat_f_novirsum = comm->atomRanges.numHomeAtoms();
6862             }
6863         }
6864     }
6865     else
6866     {
6867         nat_f_novirsum = 0;
6868     }
6869
6870     /* Set the number of atoms required for the force calculation.
6871      * Forces need to be constrained when doing energy
6872      * minimization. For simple simulations we could avoid some
6873      * allocation, zeroing and copying, but this is probably not worth
6874      * the complications and checking.
6875      */
6876     forcerec_set_ranges(fr, dd->ncg_home, dd->globalAtomGroupIndices.size(),
6877                         comm->atomRanges.end(DDAtomRanges::Type::Zones),
6878                         comm->atomRanges.end(DDAtomRanges::Type::Constraints),
6879                         nat_f_novirsum);
6880
6881     /* Update atom data for mdatoms and several algorithms */
6882     mdAlgorithmsSetupAtomData(cr, ir, top_global, top_local, fr,
6883                               nullptr, mdAtoms, constr, vsite, nullptr);
6884
6885     auto mdatoms = mdAtoms->mdatoms();
6886     if (!thisRankHasDuty(cr, DUTY_PME))
6887     {
6888         /* Send the charges and/or c6/sigmas to our PME only node */
6889         gmx_pme_send_parameters(cr,
6890                                 fr->ic,
6891                                 mdatoms->nChargePerturbed, mdatoms->nTypePerturbed,
6892                                 mdatoms->chargeA, mdatoms->chargeB,
6893                                 mdatoms->sqrt_c6A, mdatoms->sqrt_c6B,
6894                                 mdatoms->sigmaA, mdatoms->sigmaB,
6895                                 dd_pme_maxshift_x(dd), dd_pme_maxshift_y(dd));
6896     }
6897
6898     if (ir->bPull)
6899     {
6900         /* Update the local pull groups */
6901         dd_make_local_pull_groups(cr, ir->pull_work, mdatoms);
6902     }
6903
6904     if (ir->bRot)
6905     {
6906         /* Update the local rotation groups */
6907         dd_make_local_rotation_groups(dd, ir->rot);
6908     }
6909
6910     if (ir->eSwapCoords != eswapNO)
6911     {
6912         /* Update the local groups needed for ion swapping */
6913         dd_make_local_swap_groups(dd, ir->swap);
6914     }
6915
6916     /* Update the local atoms to be communicated via the IMD protocol if bIMD is TRUE. */
6917     dd_make_local_IMD_atoms(ir->bIMD, dd, ir->imd);
6918
6919     add_dd_statistics(dd);
6920
6921     /* Make sure we only count the cycles for this DD partitioning */
6922     clear_dd_cycle_counts(dd);
6923
6924     /* Because the order of the atoms might have changed since
6925      * the last vsite construction, we need to communicate the constructing
6926      * atom coordinates again (for spreading the forces this MD step).
6927      */
6928     dd_move_x_vsites(dd, state_local->box, as_rvec_array(state_local->x.data()));
6929
6930     wallcycle_sub_stop(wcycle, ewcsDD_TOPOTHER);
6931
6932     if (comm->nstDDDump > 0 && step % comm->nstDDDump == 0)
6933     {
6934         dd_move_x(dd, state_local->box, state_local->x, nullWallcycle);
6935         write_dd_pdb("dd_dump", step, "dump", top_global, cr,
6936                      -1, as_rvec_array(state_local->x.data()), state_local->box);
6937     }
6938
6939     /* Store the partitioning step */
6940     comm->partition_step = step;
6941
6942     /* Increase the DD partitioning counter */
6943     dd->ddp_count++;
6944     /* The state currently matches this DD partitioning count, store it */
6945     state_local->ddp_count = dd->ddp_count;
6946     if (bMasterState)
6947     {
6948         /* The DD master node knows the complete cg distribution,
6949          * store the count so we can possibly skip the cg info communication.
6950          */
6951         comm->master_cg_ddp_count = (bSortCG ? 0 : dd->ddp_count);
6952     }
6953
6954     if (comm->DD_debug > 0)
6955     {
6956         /* Set the env var GMX_DD_DEBUG if you suspect corrupted indices */
6957         check_index_consistency(dd, top_global->natoms, ncg_mtop(top_global),
6958                                 "after partitioning");
6959     }
6960
6961     wallcycle_stop(wcycle, ewcDOMDEC);
6962 }
6963
6964 /*! \brief Check whether bonded interactions are missing, if appropriate */
6965 void checkNumberOfBondedInteractions(FILE                 *fplog,
6966                                      t_commrec            *cr,
6967                                      int                   totalNumberOfBondedInteractions,
6968                                      const gmx_mtop_t     *top_global,
6969                                      const gmx_localtop_t *top_local,
6970                                      const t_state        *state,
6971                                      bool                 *shouldCheckNumberOfBondedInteractions)
6972 {
6973     if (*shouldCheckNumberOfBondedInteractions)
6974     {
6975         if (totalNumberOfBondedInteractions != cr->dd->nbonded_global)
6976         {
6977             dd_print_missing_interactions(fplog, cr, totalNumberOfBondedInteractions, top_global, top_local, state); // Does not return
6978         }
6979         *shouldCheckNumberOfBondedInteractions = false;
6980     }
6981 }