7d8df77ad30fdd262b35ffee95979b9b7ae3a2f1
[alexxy/gromacs.git] / src / gromacs / domdec / domdec.cpp
1 /*
2  * This file is part of the GROMACS molecular simulation package.
3  *
4  * Copyright (c) 2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2018, by the GROMACS development team, led by
5  * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6  * and including many others, as listed in the AUTHORS file in the
7  * top-level source directory and at http://www.gromacs.org.
8  *
9  * GROMACS is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public License
11  * as published by the Free Software Foundation; either version 2.1
12  * of the License, or (at your option) any later version.
13  *
14  * GROMACS is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Lesser General Public License for more details.
18  *
19  * You should have received a copy of the GNU Lesser General Public
20  * License along with GROMACS; if not, see
21  * http://www.gnu.org/licenses, or write to the Free Software Foundation,
22  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23  *
24  * If you want to redistribute modifications to GROMACS, please
25  * consider that scientific software is very special. Version
26  * control is crucial - bugs must be traceable. We will be happy to
27  * consider code for inclusion in the official distribution, but
28  * derived work must not be called official GROMACS. Details are found
29  * in the README & COPYING files - if they are missing, get the
30  * official version at http://www.gromacs.org.
31  *
32  * To help us fund GROMACS development, we humbly ask that you cite
33  * the research papers on the package. Check out http://www.gromacs.org.
34  */
35
36 #include "gmxpre.h"
37
38 #include "domdec.h"
39
40 #include "config.h"
41
42 #include <assert.h>
43 #include <limits.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <string.h>
47
48 #include <cmath>
49
50 #include <algorithm>
51
52 #include "gromacs/compat/make_unique.h"
53 #include "gromacs/domdec/collect.h"
54 #include "gromacs/domdec/dlbtiming.h"
55 #include "gromacs/domdec/domdec_network.h"
56 #include "gromacs/domdec/ga2la.h"
57 #include "gromacs/ewald/pme.h"
58 #include "gromacs/fileio/gmxfio.h"
59 #include "gromacs/fileio/pdbio.h"
60 #include "gromacs/gmxlib/chargegroup.h"
61 #include "gromacs/gmxlib/network.h"
62 #include "gromacs/gmxlib/nrnb.h"
63 #include "gromacs/gpu_utils/gpu_utils.h"
64 #include "gromacs/hardware/hw_info.h"
65 #include "gromacs/imd/imd.h"
66 #include "gromacs/listed-forces/manage-threading.h"
67 #include "gromacs/math/functions.h"
68 #include "gromacs/math/vec.h"
69 #include "gromacs/math/vectypes.h"
70 #include "gromacs/mdlib/constr.h"
71 #include "gromacs/mdlib/constraintrange.h"
72 #include "gromacs/mdlib/forcerec.h"
73 #include "gromacs/mdlib/gmx_omp_nthreads.h"
74 #include "gromacs/mdlib/lincs.h"
75 #include "gromacs/mdlib/mdatoms.h"
76 #include "gromacs/mdlib/mdrun.h"
77 #include "gromacs/mdlib/mdsetup.h"
78 #include "gromacs/mdlib/nb_verlet.h"
79 #include "gromacs/mdlib/nbnxn_grid.h"
80 #include "gromacs/mdlib/nsgrid.h"
81 #include "gromacs/mdlib/vsite.h"
82 #include "gromacs/mdtypes/commrec.h"
83 #include "gromacs/mdtypes/df_history.h"
84 #include "gromacs/mdtypes/forcerec.h"
85 #include "gromacs/mdtypes/inputrec.h"
86 #include "gromacs/mdtypes/md_enums.h"
87 #include "gromacs/mdtypes/mdatom.h"
88 #include "gromacs/mdtypes/nblist.h"
89 #include "gromacs/mdtypes/state.h"
90 #include "gromacs/pbcutil/ishift.h"
91 #include "gromacs/pbcutil/pbc.h"
92 #include "gromacs/pulling/pull.h"
93 #include "gromacs/pulling/pull_rotation.h"
94 #include "gromacs/swap/swapcoords.h"
95 #include "gromacs/timing/wallcycle.h"
96 #include "gromacs/topology/block.h"
97 #include "gromacs/topology/idef.h"
98 #include "gromacs/topology/ifunc.h"
99 #include "gromacs/topology/mtop_lookup.h"
100 #include "gromacs/topology/mtop_util.h"
101 #include "gromacs/topology/topology.h"
102 #include "gromacs/utility/basedefinitions.h"
103 #include "gromacs/utility/basenetwork.h"
104 #include "gromacs/utility/cstringutil.h"
105 #include "gromacs/utility/exceptions.h"
106 #include "gromacs/utility/fatalerror.h"
107 #include "gromacs/utility/gmxmpi.h"
108 #include "gromacs/utility/qsort_threadsafe.h"
109 #include "gromacs/utility/real.h"
110 #include "gromacs/utility/smalloc.h"
111 #include "gromacs/utility/stringutil.h"
112
113 #include "atomdistribution.h"
114 #include "cellsizes.h"
115 #include "distribute.h"
116 #include "domdec_constraints.h"
117 #include "domdec_internal.h"
118 #include "domdec_vsite.h"
119 #include "redistribute.h"
120 #include "utility.h"
121
122 #define DD_NLOAD_MAX 9
123
124 const char *edlbs_names[edlbsNR] = { "off", "auto", "locked", "on", "on" };
125
126 /* The size per charge group of the cggl_flag buffer in gmx_domdec_comm_t */
127 #define DD_CGIBS 2
128
129 /* The flags for the cggl_flag buffer in gmx_domdec_comm_t */
130 #define DD_FLAG_NRCG  65535
131 #define DD_FLAG_FW(d) (1<<(16+(d)*2))
132 #define DD_FLAG_BW(d) (1<<(16+(d)*2+1))
133
134 /* The DD zone order */
135 static const ivec dd_zo[DD_MAXZONE] =
136 {{0, 0, 0}, {1, 0, 0}, {1, 1, 0}, {0, 1, 0}, {0, 1, 1}, {0, 0, 1}, {1, 0, 1}, {1, 1, 1}};
137
138 /* The non-bonded zone-pair setup for domain decomposition
139  * The first number is the i-zone, the second number the first j-zone seen by
140  * this i-zone, the third number the last+1 j-zone seen by this i-zone.
141  * As is, this is for 3D decomposition, where there are 4 i-zones.
142  * With 2D decomposition use only the first 2 i-zones and a last+1 j-zone of 4.
143  * With 1D decomposition use only the first i-zone and a last+1 j-zone of 2.
144  */
145 static const int
146     ddNonbondedZonePairRanges[DD_MAXIZONE][3] = {{0, 0, 8},
147                                                  {1, 3, 6},
148                                                  {2, 5, 6},
149                                                  {3, 5, 7}};
150
151 /* Turn on DLB when the load imbalance causes this amount of total loss.
152  * There is a bit of overhead with DLB and it's difficult to achieve
153  * a load imbalance of less than 2% with DLB.
154  */
155 #define DD_PERF_LOSS_DLB_ON  0.02
156
157 /* Warn about imbalance due to PP or PP/PME load imbalance at this loss */
158 #define DD_PERF_LOSS_WARN    0.05
159
160
161 /* We check if to turn on DLB at the first and every 100 DD partitionings.
162  * With large imbalance DLB will turn on at the first step, so we can
163  * make the interval so large that the MPI overhead of the check is negligible.
164  */
165 static const int c_checkTurnDlbOnInterval  = 100;
166 /* We need to check if DLB results in worse performance and then turn it off.
167  * We check this more often then for turning DLB on, because the DLB can scale
168  * the domains very rapidly, so if unlucky the load imbalance can go up quickly
169  * and furthermore, we are already synchronizing often with DLB, so
170  * the overhead of the MPI Bcast is not that high.
171  */
172 static const int c_checkTurnDlbOffInterval =  20;
173
174 /* Forward declaration */
175 static void dd_dlb_set_should_check_whether_to_turn_dlb_on(gmx_domdec_t *dd, gmx_bool bValue);
176
177
178 /*
179    #define dd_index(n,i) ((((i)[ZZ]*(n)[YY] + (i)[YY])*(n)[XX]) + (i)[XX])
180
181    static void index2xyz(ivec nc,int ind,ivec xyz)
182    {
183    xyz[XX] = ind % nc[XX];
184    xyz[YY] = (ind / nc[XX]) % nc[YY];
185    xyz[ZZ] = ind / (nc[YY]*nc[XX]);
186    }
187  */
188
189 static void ddindex2xyz(ivec nc, int ind, ivec xyz)
190 {
191     xyz[XX] = ind / (nc[YY]*nc[ZZ]);
192     xyz[YY] = (ind / nc[ZZ]) % nc[YY];
193     xyz[ZZ] = ind % nc[ZZ];
194 }
195
196 static int ddcoord2ddnodeid(gmx_domdec_t *dd, ivec c)
197 {
198     int ddindex;
199     int ddnodeid = -1;
200
201     ddindex = dd_index(dd->nc, c);
202     if (dd->comm->bCartesianPP_PME)
203     {
204         ddnodeid = dd->comm->ddindex2ddnodeid[ddindex];
205     }
206     else if (dd->comm->bCartesianPP)
207     {
208 #if GMX_MPI
209         MPI_Cart_rank(dd->mpi_comm_all, c, &ddnodeid);
210 #endif
211     }
212     else
213     {
214         ddnodeid = ddindex;
215     }
216
217     return ddnodeid;
218 }
219
220 static gmx_bool dynamic_dd_box(const gmx_ddbox_t *ddbox, const t_inputrec *ir)
221 {
222     return (ddbox->nboundeddim < DIM || inputrecDynamicBox(ir));
223 }
224
225 int ddglatnr(const gmx_domdec_t *dd, int i)
226 {
227     int atnr;
228
229     if (dd == nullptr)
230     {
231         atnr = i + 1;
232     }
233     else
234     {
235         if (i >= dd->comm->atomRanges.numAtomsTotal())
236         {
237             gmx_fatal(FARGS, "glatnr called with %d, which is larger than the local number of atoms (%d)", i, dd->comm->atomRanges.numAtomsTotal());
238         }
239         atnr = dd->globalAtomIndices[i] + 1;
240     }
241
242     return atnr;
243 }
244
245 t_block *dd_charge_groups_global(gmx_domdec_t *dd)
246 {
247     return &dd->comm->cgs_gl;
248 }
249
250 void dd_store_state(gmx_domdec_t *dd, t_state *state)
251 {
252     int i;
253
254     if (state->ddp_count != dd->ddp_count)
255     {
256         gmx_incons("The MD state does not match the domain decomposition state");
257     }
258
259     state->cg_gl.resize(dd->ncg_home);
260     for (i = 0; i < dd->ncg_home; i++)
261     {
262         state->cg_gl[i] = dd->globalAtomGroupIndices[i];
263     }
264
265     state->ddp_count_cg_gl = dd->ddp_count;
266 }
267
268 gmx_domdec_zones_t *domdec_zones(gmx_domdec_t *dd)
269 {
270     return &dd->comm->zones;
271 }
272
273 void dd_get_ns_ranges(const gmx_domdec_t *dd, int icg,
274                       int *jcg0, int *jcg1, ivec shift0, ivec shift1)
275 {
276     gmx_domdec_zones_t *zones;
277     int                 izone, d, dim;
278
279     zones = &dd->comm->zones;
280
281     izone = 0;
282     while (icg >= zones->izone[izone].cg1)
283     {
284         izone++;
285     }
286
287     if (izone == 0)
288     {
289         *jcg0 = icg;
290     }
291     else if (izone < zones->nizone)
292     {
293         *jcg0 = zones->izone[izone].jcg0;
294     }
295     else
296     {
297         gmx_fatal(FARGS, "DD icg %d out of range: izone (%d) >= nizone (%d)",
298                   icg, izone, zones->nizone);
299     }
300
301     *jcg1 = zones->izone[izone].jcg1;
302
303     for (d = 0; d < dd->ndim; d++)
304     {
305         dim         = dd->dim[d];
306         shift0[dim] = zones->izone[izone].shift0[dim];
307         shift1[dim] = zones->izone[izone].shift1[dim];
308         if (dd->comm->tric_dir[dim] || (isDlbOn(dd->comm) && d > 0))
309         {
310             /* A conservative approach, this can be optimized */
311             shift0[dim] -= 1;
312             shift1[dim] += 1;
313         }
314     }
315 }
316
317 int dd_numHomeAtoms(const gmx_domdec_t &dd)
318 {
319     return dd.comm->atomRanges.numHomeAtoms();
320 }
321
322 int dd_natoms_mdatoms(const gmx_domdec_t *dd)
323 {
324     /* We currently set mdatoms entries for all atoms:
325      * local + non-local + communicated for vsite + constraints
326      */
327
328     return dd->comm->atomRanges.numAtomsTotal();
329 }
330
331 int dd_natoms_vsite(const gmx_domdec_t *dd)
332 {
333     return dd->comm->atomRanges.end(DDAtomRanges::Type::Vsites);
334 }
335
336 void dd_get_constraint_range(const gmx_domdec_t *dd, int *at_start, int *at_end)
337 {
338     *at_start = dd->comm->atomRanges.start(DDAtomRanges::Type::Constraints);
339     *at_end   = dd->comm->atomRanges.end(DDAtomRanges::Type::Constraints);
340 }
341
342 void dd_move_x(gmx_domdec_t             *dd,
343                matrix                    box,
344                gmx::ArrayRef<gmx::RVec>  x,
345                gmx_wallcycle            *wcycle)
346 {
347     wallcycle_start(wcycle, ewcMOVEX);
348
349     int                    nzone, nat_tot;
350     gmx_domdec_comm_t     *comm;
351     gmx_domdec_comm_dim_t *cd;
352     rvec                   shift = {0, 0, 0};
353     gmx_bool               bPBC, bScrew;
354
355     comm = dd->comm;
356
357     const BlockRanges &atomGroups = dd->atomGroups();
358
359     nzone   = 1;
360     nat_tot = comm->atomRanges.numHomeAtoms();
361     for (int d = 0; d < dd->ndim; d++)
362     {
363         bPBC   = (dd->ci[dd->dim[d]] == 0);
364         bScrew = (bPBC && dd->bScrewPBC && dd->dim[d] == XX);
365         if (bPBC)
366         {
367             copy_rvec(box[dd->dim[d]], shift);
368         }
369         cd = &comm->cd[d];
370         for (const gmx_domdec_ind_t &ind : cd->ind)
371         {
372             gmx::ArrayRef<const int>   index = ind.index;
373             DDBufferAccess<gmx::RVec>  sendBufferAccess(comm->rvecBuffer, ind.nsend[nzone + 1]);
374             gmx::ArrayRef<gmx::RVec>  &sendBuffer = sendBufferAccess.buffer;
375             int                        n          = 0;
376             if (!bPBC)
377             {
378                 for (int i = 0; i < ind.nsend[nzone]; i++)
379                 {
380                     int at0 = atomGroups.index[index[i]];
381                     int at1 = atomGroups.index[index[i] + 1];
382                     for (int j = at0; j < at1; j++)
383                     {
384                         sendBuffer[n] = x[j];
385                         n++;
386                     }
387                 }
388             }
389             else if (!bScrew)
390             {
391                 for (int i = 0; i < ind.nsend[nzone]; i++)
392                 {
393                     int at0 = atomGroups.index[index[i]];
394                     int at1 = atomGroups.index[index[i] + 1];
395                     for (int j = at0; j < at1; j++)
396                     {
397                         /* We need to shift the coordinates */
398                         for (int d = 0; d < DIM; d++)
399                         {
400                             sendBuffer[n][d] = x[j][d] + shift[d];
401                         }
402                         n++;
403                     }
404                 }
405             }
406             else
407             {
408                 for (int i = 0; i < ind.nsend[nzone]; i++)
409                 {
410                     int at0 = atomGroups.index[index[i]];
411                     int at1 = atomGroups.index[index[i]+1];
412                     for (int j = at0; j < at1; j++)
413                     {
414                         /* Shift x */
415                         sendBuffer[n][XX] = x[j][XX] + shift[XX];
416                         /* Rotate y and z.
417                          * This operation requires a special shift force
418                          * treatment, which is performed in calc_vir.
419                          */
420                         sendBuffer[n][YY] = box[YY][YY] - x[j][YY];
421                         sendBuffer[n][ZZ] = box[ZZ][ZZ] - x[j][ZZ];
422                         n++;
423                     }
424                 }
425             }
426
427             DDBufferAccess<gmx::RVec>  receiveBufferAccess(comm->rvecBuffer2, cd->receiveInPlace ? 0 : ind.nrecv[nzone + 1]);
428
429             gmx::ArrayRef<gmx::RVec>   receiveBuffer;
430             if (cd->receiveInPlace)
431             {
432                 receiveBuffer = gmx::arrayRefFromArray(x.data() + nat_tot, ind.nrecv[nzone + 1]);
433             }
434             else
435             {
436                 receiveBuffer = receiveBufferAccess.buffer;
437             }
438             /* Send and receive the coordinates */
439             ddSendrecv(dd, d, dddirBackward,
440                        sendBuffer, receiveBuffer);
441
442             if (!cd->receiveInPlace)
443             {
444                 int j = 0;
445                 for (int zone = 0; zone < nzone; zone++)
446                 {
447                     for (int i = ind.cell2at0[zone]; i < ind.cell2at1[zone]; i++)
448                     {
449                         x[i] = receiveBuffer[j++];
450                     }
451                 }
452             }
453             nat_tot += ind.nrecv[nzone+1];
454         }
455         nzone += nzone;
456     }
457
458     wallcycle_stop(wcycle, ewcMOVEX);
459 }
460
461 void dd_move_f(gmx_domdec_t             *dd,
462                gmx::ArrayRef<gmx::RVec>  f,
463                rvec                     *fshift,
464                gmx_wallcycle            *wcycle)
465 {
466     wallcycle_start(wcycle, ewcMOVEF);
467
468     int                    nzone, nat_tot;
469     gmx_domdec_comm_t     *comm;
470     gmx_domdec_comm_dim_t *cd;
471     ivec                   vis;
472     int                    is;
473     gmx_bool               bShiftForcesNeedPbc, bScrew;
474
475     comm = dd->comm;
476
477     const BlockRanges &atomGroups = dd->atomGroups();
478
479     nzone   = comm->zones.n/2;
480     nat_tot = comm->atomRanges.end(DDAtomRanges::Type::Zones);
481     for (int d = dd->ndim-1; d >= 0; d--)
482     {
483         /* Only forces in domains near the PBC boundaries need to
484            consider PBC in the treatment of fshift */
485         bShiftForcesNeedPbc   = (dd->ci[dd->dim[d]] == 0);
486         bScrew                = (bShiftForcesNeedPbc && dd->bScrewPBC && dd->dim[d] == XX);
487         if (fshift == nullptr && !bScrew)
488         {
489             bShiftForcesNeedPbc = FALSE;
490         }
491         /* Determine which shift vector we need */
492         clear_ivec(vis);
493         vis[dd->dim[d]] = 1;
494         is              = IVEC2IS(vis);
495
496         cd = &comm->cd[d];
497         for (int p = cd->numPulses() - 1; p >= 0; p--)
498         {
499             const gmx_domdec_ind_t    &ind  = cd->ind[p];
500             DDBufferAccess<gmx::RVec>  receiveBufferAccess(comm->rvecBuffer, ind.nsend[nzone + 1]);
501             gmx::ArrayRef<gmx::RVec>  &receiveBuffer = receiveBufferAccess.buffer;
502
503             nat_tot                        -= ind.nrecv[nzone+1];
504
505             DDBufferAccess<gmx::RVec>  sendBufferAccess(comm->rvecBuffer2, cd->receiveInPlace ? 0 : ind.nrecv[nzone + 1]);
506
507             gmx::ArrayRef<gmx::RVec>   sendBuffer;
508             if (cd->receiveInPlace)
509             {
510                 sendBuffer = gmx::arrayRefFromArray(f.data() + nat_tot, ind.nrecv[nzone + 1]);
511             }
512             else
513             {
514                 sendBuffer = sendBufferAccess.buffer;
515                 int j = 0;
516                 for (int zone = 0; zone < nzone; zone++)
517                 {
518                     for (int i = ind.cell2at0[zone]; i < ind.cell2at1[zone]; i++)
519                     {
520                         sendBuffer[j++] = f[i];
521                     }
522                 }
523             }
524             /* Communicate the forces */
525             ddSendrecv(dd, d, dddirForward,
526                        sendBuffer, receiveBuffer);
527             gmx::ArrayRef<const int> index = ind.index;
528             /* Add the received forces */
529             int                      n = 0;
530             if (!bShiftForcesNeedPbc)
531             {
532                 for (int i = 0; i < ind.nsend[nzone]; i++)
533                 {
534                     int at0 = atomGroups.index[index[i]];
535                     int at1 = atomGroups.index[index[i] + 1];
536                     for (int j = at0; j < at1; j++)
537                     {
538                         for (int d = 0; d < DIM; d++)
539                         {
540                             f[j][d] += receiveBuffer[n][d];
541                         }
542                         n++;
543                     }
544                 }
545             }
546             else if (!bScrew)
547             {
548                 /* fshift should always be defined if this function is
549                  * called when bShiftForcesNeedPbc is true */
550                 assert(NULL != fshift);
551                 for (int i = 0; i < ind.nsend[nzone]; i++)
552                 {
553                     int at0 = atomGroups.index[index[i]];
554                     int at1 = atomGroups.index[index[i] + 1];
555                     for (int j = at0; j < at1; j++)
556                     {
557                         for (int d = 0; d < DIM; d++)
558                         {
559                             f[j][d] += receiveBuffer[n][d];
560                         }
561                         /* Add this force to the shift force */
562                         for (int d = 0; d < DIM; d++)
563                         {
564                             fshift[is][d] += receiveBuffer[n][d];
565                         }
566                         n++;
567                     }
568                 }
569             }
570             else
571             {
572                 for (int i = 0; i < ind.nsend[nzone]; i++)
573                 {
574                     int at0 = atomGroups.index[index[i]];
575                     int at1 = atomGroups.index[index[i] + 1];
576                     for (int j = at0; j < at1; j++)
577                     {
578                         /* Rotate the force */
579                         f[j][XX] += receiveBuffer[n][XX];
580                         f[j][YY] -= receiveBuffer[n][YY];
581                         f[j][ZZ] -= receiveBuffer[n][ZZ];
582                         if (fshift)
583                         {
584                             /* Add this force to the shift force */
585                             for (int d = 0; d < DIM; d++)
586                             {
587                                 fshift[is][d] += receiveBuffer[n][d];
588                             }
589                         }
590                         n++;
591                     }
592                 }
593             }
594         }
595         nzone /= 2;
596     }
597     wallcycle_stop(wcycle, ewcMOVEF);
598 }
599
600 /* Convenience function for extracting a real buffer from an rvec buffer
601  *
602  * To reduce the number of temporary communication buffers and avoid
603  * cache polution, we reuse gmx::RVec buffers for storing reals.
604  * This functions return a real buffer reference with the same number
605  * of elements as the gmx::RVec buffer (so 1/3 of the size in bytes).
606  */
607 static gmx::ArrayRef<real>
608 realArrayRefFromRvecArrayRef(gmx::ArrayRef<gmx::RVec> arrayRef)
609 {
610     return gmx::arrayRefFromArray(as_rvec_array(arrayRef.data())[0],
611                                   arrayRef.size());
612 }
613
614 void dd_atom_spread_real(gmx_domdec_t *dd, real v[])
615 {
616     int                    nzone, nat_tot;
617     gmx_domdec_comm_t     *comm;
618     gmx_domdec_comm_dim_t *cd;
619
620     comm = dd->comm;
621
622     const BlockRanges &atomGroups = dd->atomGroups();
623
624     nzone   = 1;
625     nat_tot = comm->atomRanges.numHomeAtoms();
626     for (int d = 0; d < dd->ndim; d++)
627     {
628         cd = &comm->cd[d];
629         for (const gmx_domdec_ind_t &ind : cd->ind)
630         {
631             gmx::ArrayRef<const int>  index = ind.index;
632
633             /* Note: We provision for RVec instead of real, so a factor of 3
634              * more than needed. The buffer actually already has this size
635              * and we pass a plain pointer below, so this does not matter.
636              */
637             DDBufferAccess<gmx::RVec> sendBufferAccess(comm->rvecBuffer, ind.nsend[nzone + 1]);
638             gmx::ArrayRef<real>       sendBuffer = realArrayRefFromRvecArrayRef(sendBufferAccess.buffer);
639             int                       n          = 0;
640             for (int i = 0; i < ind.nsend[nzone]; i++)
641             {
642                 int at0 = atomGroups.index[index[i]];
643                 int at1 = atomGroups.index[index[i] + 1];
644                 for (int j = at0; j < at1; j++)
645                 {
646                     sendBuffer[n++] = v[j];
647                 }
648             }
649
650             DDBufferAccess<gmx::RVec> receiveBufferAccess(comm->rvecBuffer2, cd->receiveInPlace ? 0 : ind.nrecv[nzone + 1]);
651
652             gmx::ArrayRef<real>       receiveBuffer;
653             if (cd->receiveInPlace)
654             {
655                 receiveBuffer = gmx::arrayRefFromArray(v + nat_tot, ind.nrecv[nzone + 1]);
656             }
657             else
658             {
659                 receiveBuffer = realArrayRefFromRvecArrayRef(receiveBufferAccess.buffer);
660             }
661             /* Send and receive the data */
662             ddSendrecv(dd, d, dddirBackward,
663                        sendBuffer, receiveBuffer);
664             if (!cd->receiveInPlace)
665             {
666                 int j = 0;
667                 for (int zone = 0; zone < nzone; zone++)
668                 {
669                     for (int i = ind.cell2at0[zone]; i < ind.cell2at1[zone]; i++)
670                     {
671                         v[i] = receiveBuffer[j++];
672                     }
673                 }
674             }
675             nat_tot += ind.nrecv[nzone+1];
676         }
677         nzone += nzone;
678     }
679 }
680
681 void dd_atom_sum_real(gmx_domdec_t *dd, real v[])
682 {
683     int                    nzone, nat_tot;
684     gmx_domdec_comm_t     *comm;
685     gmx_domdec_comm_dim_t *cd;
686
687     comm = dd->comm;
688
689     const gmx::BlockRanges &atomGroups = dd->atomGroups();
690
691     nzone   = comm->zones.n/2;
692     nat_tot = comm->atomRanges.end(DDAtomRanges::Type::Zones);
693     for (int d = dd->ndim-1; d >= 0; d--)
694     {
695         cd = &comm->cd[d];
696         for (int p = cd->numPulses() - 1; p >= 0; p--)
697         {
698             const gmx_domdec_ind_t &ind = cd->ind[p];
699
700             /* Note: We provision for RVec instead of real, so a factor of 3
701              * more than needed. The buffer actually already has this size
702              * and we typecast, so this works as intended.
703              */
704             DDBufferAccess<gmx::RVec> receiveBufferAccess(comm->rvecBuffer, ind.nsend[nzone + 1]);
705             gmx::ArrayRef<real>       receiveBuffer = realArrayRefFromRvecArrayRef(receiveBufferAccess.buffer);
706             nat_tot -= ind.nrecv[nzone + 1];
707
708             DDBufferAccess<gmx::RVec> sendBufferAccess(comm->rvecBuffer2, cd->receiveInPlace ? 0 : ind.nrecv[nzone + 1]);
709
710             gmx::ArrayRef<real>       sendBuffer;
711             if (cd->receiveInPlace)
712             {
713                 sendBuffer = gmx::arrayRefFromArray(v + nat_tot, ind.nrecv[nzone + 1]);
714             }
715             else
716             {
717                 sendBuffer = realArrayRefFromRvecArrayRef(sendBufferAccess.buffer);
718                 int j = 0;
719                 for (int zone = 0; zone < nzone; zone++)
720                 {
721                     for (int i = ind.cell2at0[zone]; i < ind.cell2at1[zone]; i++)
722                     {
723                         sendBuffer[j++] = v[i];
724                     }
725                 }
726             }
727             /* Communicate the forces */
728             ddSendrecv(dd, d, dddirForward,
729                        sendBuffer, receiveBuffer);
730             gmx::ArrayRef<const int> index = ind.index;
731             /* Add the received forces */
732             int                      n = 0;
733             for (int i = 0; i < ind.nsend[nzone]; i++)
734             {
735                 int at0 = atomGroups.index[index[i]];
736                 int at1 = atomGroups.index[index[i] + 1];
737                 for (int j = at0; j < at1; j++)
738                 {
739                     v[j] += receiveBuffer[n];
740                     n++;
741                 }
742             }
743         }
744         nzone /= 2;
745     }
746 }
747
748 static void print_ddzone(FILE *fp, int d, int i, int j, gmx_ddzone_t *zone)
749 {
750     fprintf(fp, "zone d0 %d d1 %d d2 %d  min0 %6.3f max1 %6.3f mch0 %6.3f mch1 %6.3f p1_0 %6.3f p1_1 %6.3f\n",
751             d, i, j,
752             zone->min0, zone->max1,
753             zone->mch0, zone->mch0,
754             zone->p1_0, zone->p1_1);
755 }
756
757
758 #define DDZONECOMM_MAXZONE  5
759 #define DDZONECOMM_BUFSIZE  3
760
761 static void dd_sendrecv_ddzone(const gmx_domdec_t *dd,
762                                int ddimind, int direction,
763                                gmx_ddzone_t *buf_s, int n_s,
764                                gmx_ddzone_t *buf_r, int n_r)
765 {
766 #define ZBS  DDZONECOMM_BUFSIZE
767     rvec vbuf_s[DDZONECOMM_MAXZONE*ZBS];
768     rvec vbuf_r[DDZONECOMM_MAXZONE*ZBS];
769     int  i;
770
771     for (i = 0; i < n_s; i++)
772     {
773         vbuf_s[i*ZBS  ][0] = buf_s[i].min0;
774         vbuf_s[i*ZBS  ][1] = buf_s[i].max1;
775         vbuf_s[i*ZBS  ][2] = buf_s[i].min1;
776         vbuf_s[i*ZBS+1][0] = buf_s[i].mch0;
777         vbuf_s[i*ZBS+1][1] = buf_s[i].mch1;
778         vbuf_s[i*ZBS+1][2] = 0;
779         vbuf_s[i*ZBS+2][0] = buf_s[i].p1_0;
780         vbuf_s[i*ZBS+2][1] = buf_s[i].p1_1;
781         vbuf_s[i*ZBS+2][2] = 0;
782     }
783
784     ddSendrecv(dd, ddimind, direction,
785                vbuf_s, n_s*ZBS,
786                vbuf_r, n_r*ZBS);
787
788     for (i = 0; i < n_r; i++)
789     {
790         buf_r[i].min0 = vbuf_r[i*ZBS  ][0];
791         buf_r[i].max1 = vbuf_r[i*ZBS  ][1];
792         buf_r[i].min1 = vbuf_r[i*ZBS  ][2];
793         buf_r[i].mch0 = vbuf_r[i*ZBS+1][0];
794         buf_r[i].mch1 = vbuf_r[i*ZBS+1][1];
795         buf_r[i].p1_0 = vbuf_r[i*ZBS+2][0];
796         buf_r[i].p1_1 = vbuf_r[i*ZBS+2][1];
797     }
798
799 #undef ZBS
800 }
801
802 static void dd_move_cellx(gmx_domdec_t *dd, gmx_ddbox_t *ddbox,
803                           rvec cell_ns_x0, rvec cell_ns_x1)
804 {
805     int                d, d1, dim, pos, buf_size, i, j, p, npulse, npulse_min;
806     gmx_ddzone_t      *zp;
807     gmx_ddzone_t       buf_s[DDZONECOMM_MAXZONE];
808     gmx_ddzone_t       buf_r[DDZONECOMM_MAXZONE];
809     gmx_ddzone_t       buf_e[DDZONECOMM_MAXZONE];
810     rvec               extr_s[2], extr_r[2];
811     rvec               dh;
812     real               dist_d, c = 0, det;
813     gmx_domdec_comm_t *comm;
814     gmx_bool           bPBC, bUse;
815
816     comm = dd->comm;
817
818     for (d = 1; d < dd->ndim; d++)
819     {
820         dim      = dd->dim[d];
821         zp       = (d == 1) ? &comm->zone_d1[0] : &comm->zone_d2[0][0];
822         zp->min0 = cell_ns_x0[dim];
823         zp->max1 = cell_ns_x1[dim];
824         zp->min1 = cell_ns_x1[dim];
825         zp->mch0 = cell_ns_x0[dim];
826         zp->mch1 = cell_ns_x1[dim];
827         zp->p1_0 = cell_ns_x0[dim];
828         zp->p1_1 = cell_ns_x1[dim];
829     }
830
831     for (d = dd->ndim-2; d >= 0; d--)
832     {
833         dim  = dd->dim[d];
834         bPBC = (dim < ddbox->npbcdim);
835
836         /* Use an rvec to store two reals */
837         extr_s[d][0] = comm->cell_f0[d+1];
838         extr_s[d][1] = comm->cell_f1[d+1];
839         extr_s[d][2] = comm->cell_f1[d+1];
840
841         pos = 0;
842         /* Store the extremes in the backward sending buffer,
843          * so the get updated separately from the forward communication.
844          */
845         for (d1 = d; d1 < dd->ndim-1; d1++)
846         {
847             /* We invert the order to be able to use the same loop for buf_e */
848             buf_s[pos].min0 = extr_s[d1][1];
849             buf_s[pos].max1 = extr_s[d1][0];
850             buf_s[pos].min1 = extr_s[d1][2];
851             buf_s[pos].mch0 = 0;
852             buf_s[pos].mch1 = 0;
853             /* Store the cell corner of the dimension we communicate along */
854             buf_s[pos].p1_0 = comm->cell_x0[dim];
855             buf_s[pos].p1_1 = 0;
856             pos++;
857         }
858
859         buf_s[pos] = (dd->ndim == 2) ? comm->zone_d1[0] : comm->zone_d2[0][0];
860         pos++;
861
862         if (dd->ndim == 3 && d == 0)
863         {
864             buf_s[pos] = comm->zone_d2[0][1];
865             pos++;
866             buf_s[pos] = comm->zone_d1[0];
867             pos++;
868         }
869
870         /* We only need to communicate the extremes
871          * in the forward direction
872          */
873         npulse = comm->cd[d].numPulses();
874         if (bPBC)
875         {
876             /* Take the minimum to avoid double communication */
877             npulse_min = std::min(npulse, dd->nc[dim]-1-npulse);
878         }
879         else
880         {
881             /* Without PBC we should really not communicate over
882              * the boundaries, but implementing that complicates
883              * the communication setup and therefore we simply
884              * do all communication, but ignore some data.
885              */
886             npulse_min = npulse;
887         }
888         for (p = 0; p < npulse_min; p++)
889         {
890             /* Communicate the extremes forward */
891             bUse = (bPBC || dd->ci[dim] > 0);
892
893             ddSendrecv(dd, d, dddirForward,
894                        extr_s+d, dd->ndim-d-1,
895                        extr_r+d, dd->ndim-d-1);
896
897             if (bUse)
898             {
899                 for (d1 = d; d1 < dd->ndim-1; d1++)
900                 {
901                     extr_s[d1][0] = std::max(extr_s[d1][0], extr_r[d1][0]);
902                     extr_s[d1][1] = std::min(extr_s[d1][1], extr_r[d1][1]);
903                     extr_s[d1][2] = std::min(extr_s[d1][2], extr_r[d1][2]);
904                 }
905             }
906         }
907
908         buf_size = pos;
909         for (p = 0; p < npulse; p++)
910         {
911             /* Communicate all the zone information backward */
912             bUse = (bPBC || dd->ci[dim] < dd->nc[dim] - 1);
913
914             dd_sendrecv_ddzone(dd, d, dddirBackward,
915                                buf_s, buf_size,
916                                buf_r, buf_size);
917
918             clear_rvec(dh);
919             if (p > 0)
920             {
921                 for (d1 = d+1; d1 < dd->ndim; d1++)
922                 {
923                     /* Determine the decrease of maximum required
924                      * communication height along d1 due to the distance along d,
925                      * this avoids a lot of useless atom communication.
926                      */
927                     dist_d = comm->cell_x1[dim] - buf_r[0].p1_0;
928
929                     if (ddbox->tric_dir[dim])
930                     {
931                         /* c is the off-diagonal coupling between the cell planes
932                          * along directions d and d1.
933                          */
934                         c = ddbox->v[dim][dd->dim[d1]][dim];
935                     }
936                     else
937                     {
938                         c = 0;
939                     }
940                     det = (1 + c*c)*comm->cutoff*comm->cutoff - dist_d*dist_d;
941                     if (det > 0)
942                     {
943                         dh[d1] = comm->cutoff - (c*dist_d + std::sqrt(det))/(1 + c*c);
944                     }
945                     else
946                     {
947                         /* A negative value signals out of range */
948                         dh[d1] = -1;
949                     }
950                 }
951             }
952
953             /* Accumulate the extremes over all pulses */
954             for (i = 0; i < buf_size; i++)
955             {
956                 if (p == 0)
957                 {
958                     buf_e[i] = buf_r[i];
959                 }
960                 else
961                 {
962                     if (bUse)
963                     {
964                         buf_e[i].min0 = std::min(buf_e[i].min0, buf_r[i].min0);
965                         buf_e[i].max1 = std::max(buf_e[i].max1, buf_r[i].max1);
966                         buf_e[i].min1 = std::min(buf_e[i].min1, buf_r[i].min1);
967                     }
968
969                     if (dd->ndim == 3 && d == 0 && i == buf_size - 1)
970                     {
971                         d1 = 1;
972                     }
973                     else
974                     {
975                         d1 = d + 1;
976                     }
977                     if (bUse && dh[d1] >= 0)
978                     {
979                         buf_e[i].mch0 = std::max(buf_e[i].mch0, buf_r[i].mch0-dh[d1]);
980                         buf_e[i].mch1 = std::max(buf_e[i].mch1, buf_r[i].mch1-dh[d1]);
981                     }
982                 }
983                 /* Copy the received buffer to the send buffer,
984                  * to pass the data through with the next pulse.
985                  */
986                 buf_s[i] = buf_r[i];
987             }
988             if (((bPBC || dd->ci[dim]+npulse < dd->nc[dim]) && p == npulse-1) ||
989                 (!bPBC && dd->ci[dim]+1+p == dd->nc[dim]-1))
990             {
991                 /* Store the extremes */
992                 pos = 0;
993
994                 for (d1 = d; d1 < dd->ndim-1; d1++)
995                 {
996                     extr_s[d1][1] = std::min(extr_s[d1][1], buf_e[pos].min0);
997                     extr_s[d1][0] = std::max(extr_s[d1][0], buf_e[pos].max1);
998                     extr_s[d1][2] = std::min(extr_s[d1][2], buf_e[pos].min1);
999                     pos++;
1000                 }
1001
1002                 if (d == 1 || (d == 0 && dd->ndim == 3))
1003                 {
1004                     for (i = d; i < 2; i++)
1005                     {
1006                         comm->zone_d2[1-d][i] = buf_e[pos];
1007                         pos++;
1008                     }
1009                 }
1010                 if (d == 0)
1011                 {
1012                     comm->zone_d1[1] = buf_e[pos];
1013                     pos++;
1014                 }
1015             }
1016         }
1017     }
1018
1019     if (dd->ndim >= 2)
1020     {
1021         dim = dd->dim[1];
1022         for (i = 0; i < 2; i++)
1023         {
1024             if (debug)
1025             {
1026                 print_ddzone(debug, 1, i, 0, &comm->zone_d1[i]);
1027             }
1028             cell_ns_x0[dim] = std::min(cell_ns_x0[dim], comm->zone_d1[i].min0);
1029             cell_ns_x1[dim] = std::max(cell_ns_x1[dim], comm->zone_d1[i].max1);
1030         }
1031     }
1032     if (dd->ndim >= 3)
1033     {
1034         dim = dd->dim[2];
1035         for (i = 0; i < 2; i++)
1036         {
1037             for (j = 0; j < 2; j++)
1038             {
1039                 if (debug)
1040                 {
1041                     print_ddzone(debug, 2, i, j, &comm->zone_d2[i][j]);
1042                 }
1043                 cell_ns_x0[dim] = std::min(cell_ns_x0[dim], comm->zone_d2[i][j].min0);
1044                 cell_ns_x1[dim] = std::max(cell_ns_x1[dim], comm->zone_d2[i][j].max1);
1045             }
1046         }
1047     }
1048     for (d = 1; d < dd->ndim; d++)
1049     {
1050         comm->cell_f_max0[d] = extr_s[d-1][0];
1051         comm->cell_f_min1[d] = extr_s[d-1][1];
1052         if (debug)
1053         {
1054             fprintf(debug, "Cell fraction d %d, max0 %f, min1 %f\n",
1055                     d, comm->cell_f_max0[d], comm->cell_f_min1[d]);
1056         }
1057     }
1058 }
1059
1060 static void write_dd_grid_pdb(const char *fn, gmx_int64_t step,
1061                               gmx_domdec_t *dd, matrix box, gmx_ddbox_t *ddbox)
1062 {
1063     rvec   grid_s[2], *grid_r = nullptr, cx, r;
1064     char   fname[STRLEN], buf[22];
1065     FILE  *out;
1066     int    a, i, d, z, y, x;
1067     matrix tric;
1068     real   vol;
1069
1070     copy_rvec(dd->comm->cell_x0, grid_s[0]);
1071     copy_rvec(dd->comm->cell_x1, grid_s[1]);
1072
1073     if (DDMASTER(dd))
1074     {
1075         snew(grid_r, 2*dd->nnodes);
1076     }
1077
1078     dd_gather(dd, 2*sizeof(rvec), grid_s, DDMASTER(dd) ? grid_r : nullptr);
1079
1080     if (DDMASTER(dd))
1081     {
1082         for (d = 0; d < DIM; d++)
1083         {
1084             for (i = 0; i < DIM; i++)
1085             {
1086                 if (d == i)
1087                 {
1088                     tric[d][i] = 1;
1089                 }
1090                 else
1091                 {
1092                     if (d < ddbox->npbcdim && dd->nc[d] > 1)
1093                     {
1094                         tric[d][i] = box[i][d]/box[i][i];
1095                     }
1096                     else
1097                     {
1098                         tric[d][i] = 0;
1099                     }
1100                 }
1101             }
1102         }
1103         sprintf(fname, "%s_%s.pdb", fn, gmx_step_str(step, buf));
1104         out = gmx_fio_fopen(fname, "w");
1105         gmx_write_pdb_box(out, dd->bScrewPBC ? epbcSCREW : epbcXYZ, box);
1106         a = 1;
1107         for (i = 0; i < dd->nnodes; i++)
1108         {
1109             vol = dd->nnodes/(box[XX][XX]*box[YY][YY]*box[ZZ][ZZ]);
1110             for (d = 0; d < DIM; d++)
1111             {
1112                 vol *= grid_r[i*2+1][d] - grid_r[i*2][d];
1113             }
1114             for (z = 0; z < 2; z++)
1115             {
1116                 for (y = 0; y < 2; y++)
1117                 {
1118                     for (x = 0; x < 2; x++)
1119                     {
1120                         cx[XX] = grid_r[i*2+x][XX];
1121                         cx[YY] = grid_r[i*2+y][YY];
1122                         cx[ZZ] = grid_r[i*2+z][ZZ];
1123                         mvmul(tric, cx, r);
1124                         gmx_fprintf_pdb_atomline(out, epdbATOM, a++, "CA", ' ', "GLY", ' ', i+1, ' ',
1125                                                  10*r[XX], 10*r[YY], 10*r[ZZ], 1.0, vol, "");
1126                     }
1127                 }
1128             }
1129             for (d = 0; d < DIM; d++)
1130             {
1131                 for (x = 0; x < 4; x++)
1132                 {
1133                     switch (d)
1134                     {
1135                         case 0: y = 1 + i*8 + 2*x; break;
1136                         case 1: y = 1 + i*8 + 2*x - (x % 2); break;
1137                         case 2: y = 1 + i*8 + x; break;
1138                     }
1139                     fprintf(out, "%6s%5d%5d\n", "CONECT", y, y+(1<<d));
1140                 }
1141             }
1142         }
1143         gmx_fio_fclose(out);
1144         sfree(grid_r);
1145     }
1146 }
1147
1148 void write_dd_pdb(const char *fn, gmx_int64_t step, const char *title,
1149                   const gmx_mtop_t *mtop, const t_commrec *cr,
1150                   int natoms, const rvec x[], const matrix box)
1151 {
1152     char          fname[STRLEN], buf[22];
1153     FILE         *out;
1154     int           resnr;
1155     const char   *atomname, *resname;
1156     gmx_domdec_t *dd;
1157
1158     dd = cr->dd;
1159     if (natoms == -1)
1160     {
1161         natoms = dd->comm->atomRanges.end(DDAtomRanges::Type::Vsites);
1162     }
1163
1164     sprintf(fname, "%s_%s_n%d.pdb", fn, gmx_step_str(step, buf), cr->sim_nodeid);
1165
1166     out = gmx_fio_fopen(fname, "w");
1167
1168     fprintf(out, "TITLE     %s\n", title);
1169     gmx_write_pdb_box(out, dd->bScrewPBC ? epbcSCREW : epbcXYZ, box);
1170     int molb = 0;
1171     for (int i = 0; i < natoms; i++)
1172     {
1173         int  ii = dd->globalAtomIndices[i];
1174         mtopGetAtomAndResidueName(mtop, ii, &molb, &atomname, &resnr, &resname, nullptr);
1175         int  c;
1176         real b;
1177         if (i < dd->comm->atomRanges.end(DDAtomRanges::Type::Zones))
1178         {
1179             c = 0;
1180             while (i >= dd->atomGroups().index[dd->comm->zones.cg_range[c+1]])
1181             {
1182                 c++;
1183             }
1184             b = c;
1185         }
1186         else if (i < dd->comm->atomRanges.end(DDAtomRanges::Type::Vsites))
1187         {
1188             b = dd->comm->zones.n;
1189         }
1190         else
1191         {
1192             b = dd->comm->zones.n + 1;
1193         }
1194         gmx_fprintf_pdb_atomline(out, epdbATOM, ii+1, atomname, ' ', resname, ' ', resnr, ' ',
1195                                  10*x[i][XX], 10*x[i][YY], 10*x[i][ZZ], 1.0, b, "");
1196     }
1197     fprintf(out, "TER\n");
1198
1199     gmx_fio_fclose(out);
1200 }
1201
1202 real dd_cutoff_multibody(const gmx_domdec_t *dd)
1203 {
1204     gmx_domdec_comm_t *comm;
1205     int                di;
1206     real               r;
1207
1208     comm = dd->comm;
1209
1210     r = -1;
1211     if (comm->bInterCGBondeds)
1212     {
1213         if (comm->cutoff_mbody > 0)
1214         {
1215             r = comm->cutoff_mbody;
1216         }
1217         else
1218         {
1219             /* cutoff_mbody=0 means we do not have DLB */
1220             r = comm->cellsize_min[dd->dim[0]];
1221             for (di = 1; di < dd->ndim; di++)
1222             {
1223                 r = std::min(r, comm->cellsize_min[dd->dim[di]]);
1224             }
1225             if (comm->bBondComm)
1226             {
1227                 r = std::max(r, comm->cutoff_mbody);
1228             }
1229             else
1230             {
1231                 r = std::min(r, comm->cutoff);
1232             }
1233         }
1234     }
1235
1236     return r;
1237 }
1238
1239 real dd_cutoff_twobody(const gmx_domdec_t *dd)
1240 {
1241     real r_mb;
1242
1243     r_mb = dd_cutoff_multibody(dd);
1244
1245     return std::max(dd->comm->cutoff, r_mb);
1246 }
1247
1248
1249 static void dd_cart_coord2pmecoord(const gmx_domdec_t *dd, const ivec coord,
1250                                    ivec coord_pme)
1251 {
1252     int nc, ntot;
1253
1254     nc   = dd->nc[dd->comm->cartpmedim];
1255     ntot = dd->comm->ntot[dd->comm->cartpmedim];
1256     copy_ivec(coord, coord_pme);
1257     coord_pme[dd->comm->cartpmedim] =
1258         nc + (coord[dd->comm->cartpmedim]*(ntot - nc) + (ntot - nc)/2)/nc;
1259 }
1260
1261 static int ddindex2pmeindex(const gmx_domdec_t *dd, int ddindex)
1262 {
1263     int npp, npme;
1264
1265     npp  = dd->nnodes;
1266     npme = dd->comm->npmenodes;
1267
1268     /* Here we assign a PME node to communicate with this DD node
1269      * by assuming that the major index of both is x.
1270      * We add cr->npmenodes/2 to obtain an even distribution.
1271      */
1272     return (ddindex*npme + npme/2)/npp;
1273 }
1274
1275 static int *dd_interleaved_pme_ranks(const gmx_domdec_t *dd)
1276 {
1277     int *pme_rank;
1278     int  n, i, p0, p1;
1279
1280     snew(pme_rank, dd->comm->npmenodes);
1281     n = 0;
1282     for (i = 0; i < dd->nnodes; i++)
1283     {
1284         p0 = ddindex2pmeindex(dd, i);
1285         p1 = ddindex2pmeindex(dd, i+1);
1286         if (i+1 == dd->nnodes || p1 > p0)
1287         {
1288             if (debug)
1289             {
1290                 fprintf(debug, "pme_rank[%d] = %d\n", n, i+1+n);
1291             }
1292             pme_rank[n] = i + 1 + n;
1293             n++;
1294         }
1295     }
1296
1297     return pme_rank;
1298 }
1299
1300 static int gmx_ddcoord2pmeindex(const t_commrec *cr, int x, int y, int z)
1301 {
1302     gmx_domdec_t *dd;
1303     ivec          coords;
1304     int           slab;
1305
1306     dd = cr->dd;
1307     /*
1308        if (dd->comm->bCartesian) {
1309        gmx_ddindex2xyz(dd->nc,ddindex,coords);
1310        dd_coords2pmecoords(dd,coords,coords_pme);
1311        copy_ivec(dd->ntot,nc);
1312        nc[dd->cartpmedim]         -= dd->nc[dd->cartpmedim];
1313        coords_pme[dd->cartpmedim] -= dd->nc[dd->cartpmedim];
1314
1315        slab = (coords_pme[XX]*nc[YY] + coords_pme[YY])*nc[ZZ] + coords_pme[ZZ];
1316        } else {
1317        slab = (ddindex*cr->npmenodes + cr->npmenodes/2)/dd->nnodes;
1318        }
1319      */
1320     coords[XX] = x;
1321     coords[YY] = y;
1322     coords[ZZ] = z;
1323     slab       = ddindex2pmeindex(dd, dd_index(dd->nc, coords));
1324
1325     return slab;
1326 }
1327
1328 static int ddcoord2simnodeid(const t_commrec *cr, int x, int y, int z)
1329 {
1330     gmx_domdec_comm_t *comm;
1331     ivec               coords;
1332     int                ddindex, nodeid = -1;
1333
1334     comm = cr->dd->comm;
1335
1336     coords[XX] = x;
1337     coords[YY] = y;
1338     coords[ZZ] = z;
1339     if (comm->bCartesianPP_PME)
1340     {
1341 #if GMX_MPI
1342         MPI_Cart_rank(cr->mpi_comm_mysim, coords, &nodeid);
1343 #endif
1344     }
1345     else
1346     {
1347         ddindex = dd_index(cr->dd->nc, coords);
1348         if (comm->bCartesianPP)
1349         {
1350             nodeid = comm->ddindex2simnodeid[ddindex];
1351         }
1352         else
1353         {
1354             if (comm->pmenodes)
1355             {
1356                 nodeid = ddindex + gmx_ddcoord2pmeindex(cr, x, y, z);
1357             }
1358             else
1359             {
1360                 nodeid = ddindex;
1361             }
1362         }
1363     }
1364
1365     return nodeid;
1366 }
1367
1368 static int dd_simnode2pmenode(const gmx_domdec_t         *dd,
1369                               const t_commrec gmx_unused *cr,
1370                               int                         sim_nodeid)
1371 {
1372     int pmenode = -1;
1373
1374     const gmx_domdec_comm_t *comm = dd->comm;
1375
1376     /* This assumes a uniform x domain decomposition grid cell size */
1377     if (comm->bCartesianPP_PME)
1378     {
1379 #if GMX_MPI
1380         ivec coord, coord_pme;
1381         MPI_Cart_coords(cr->mpi_comm_mysim, sim_nodeid, DIM, coord);
1382         if (coord[comm->cartpmedim] < dd->nc[comm->cartpmedim])
1383         {
1384             /* This is a PP node */
1385             dd_cart_coord2pmecoord(dd, coord, coord_pme);
1386             MPI_Cart_rank(cr->mpi_comm_mysim, coord_pme, &pmenode);
1387         }
1388 #endif
1389     }
1390     else if (comm->bCartesianPP)
1391     {
1392         if (sim_nodeid < dd->nnodes)
1393         {
1394             pmenode = dd->nnodes + ddindex2pmeindex(dd, sim_nodeid);
1395         }
1396     }
1397     else
1398     {
1399         /* This assumes DD cells with identical x coordinates
1400          * are numbered sequentially.
1401          */
1402         if (dd->comm->pmenodes == nullptr)
1403         {
1404             if (sim_nodeid < dd->nnodes)
1405             {
1406                 /* The DD index equals the nodeid */
1407                 pmenode = dd->nnodes + ddindex2pmeindex(dd, sim_nodeid);
1408             }
1409         }
1410         else
1411         {
1412             int i = 0;
1413             while (sim_nodeid > dd->comm->pmenodes[i])
1414             {
1415                 i++;
1416             }
1417             if (sim_nodeid < dd->comm->pmenodes[i])
1418             {
1419                 pmenode = dd->comm->pmenodes[i];
1420             }
1421         }
1422     }
1423
1424     return pmenode;
1425 }
1426
1427 NumPmeDomains getNumPmeDomains(const gmx_domdec_t *dd)
1428 {
1429     if (dd != nullptr)
1430     {
1431         return { dd->comm->npmenodes_x, dd->comm->npmenodes_y };
1432     }
1433     else
1434     {
1435         return { 1, 1 };
1436     }
1437 }
1438
1439 std::vector<int> get_pme_ddranks(const t_commrec *cr, int pmenodeid)
1440 {
1441     gmx_domdec_t *dd;
1442     int           x, y, z;
1443     ivec          coord, coord_pme;
1444
1445     dd = cr->dd;
1446
1447     std::vector<int> ddranks;
1448     ddranks.reserve((dd->nnodes+cr->npmenodes-1)/cr->npmenodes);
1449
1450     for (x = 0; x < dd->nc[XX]; x++)
1451     {
1452         for (y = 0; y < dd->nc[YY]; y++)
1453         {
1454             for (z = 0; z < dd->nc[ZZ]; z++)
1455             {
1456                 if (dd->comm->bCartesianPP_PME)
1457                 {
1458                     coord[XX] = x;
1459                     coord[YY] = y;
1460                     coord[ZZ] = z;
1461                     dd_cart_coord2pmecoord(dd, coord, coord_pme);
1462                     if (dd->ci[XX] == coord_pme[XX] &&
1463                         dd->ci[YY] == coord_pme[YY] &&
1464                         dd->ci[ZZ] == coord_pme[ZZ])
1465                     {
1466                         ddranks.push_back(ddcoord2simnodeid(cr, x, y, z));
1467                     }
1468                 }
1469                 else
1470                 {
1471                     /* The slab corresponds to the nodeid in the PME group */
1472                     if (gmx_ddcoord2pmeindex(cr, x, y, z) == pmenodeid)
1473                     {
1474                         ddranks.push_back(ddcoord2simnodeid(cr, x, y, z));
1475                     }
1476                 }
1477             }
1478         }
1479     }
1480     return ddranks;
1481 }
1482
1483 static gmx_bool receive_vir_ener(const gmx_domdec_t *dd, const t_commrec *cr)
1484 {
1485     gmx_bool bReceive = TRUE;
1486
1487     if (cr->npmenodes < dd->nnodes)
1488     {
1489         gmx_domdec_comm_t *comm = dd->comm;
1490         if (comm->bCartesianPP_PME)
1491         {
1492 #if GMX_MPI
1493             int  pmenode = dd_simnode2pmenode(dd, cr, cr->sim_nodeid);
1494             ivec coords;
1495             MPI_Cart_coords(cr->mpi_comm_mysim, cr->sim_nodeid, DIM, coords);
1496             coords[comm->cartpmedim]++;
1497             if (coords[comm->cartpmedim] < dd->nc[comm->cartpmedim])
1498             {
1499                 int rank;
1500                 MPI_Cart_rank(cr->mpi_comm_mysim, coords, &rank);
1501                 if (dd_simnode2pmenode(dd, cr, rank) == pmenode)
1502                 {
1503                     /* This is not the last PP node for pmenode */
1504                     bReceive = FALSE;
1505                 }
1506             }
1507 #else
1508             GMX_RELEASE_ASSERT(false, "Without MPI we should not have Cartesian PP-PME with #PMEnodes < #DDnodes");
1509 #endif
1510         }
1511         else
1512         {
1513             int pmenode = dd_simnode2pmenode(dd, cr, cr->sim_nodeid);
1514             if (cr->sim_nodeid+1 < cr->nnodes &&
1515                 dd_simnode2pmenode(dd, cr, cr->sim_nodeid+1) == pmenode)
1516             {
1517                 /* This is not the last PP node for pmenode */
1518                 bReceive = FALSE;
1519             }
1520         }
1521     }
1522
1523     return bReceive;
1524 }
1525
1526 static void set_zones_ncg_home(gmx_domdec_t *dd)
1527 {
1528     gmx_domdec_zones_t *zones;
1529     int                 i;
1530
1531     zones = &dd->comm->zones;
1532
1533     zones->cg_range[0] = 0;
1534     for (i = 1; i < zones->n+1; i++)
1535     {
1536         zones->cg_range[i] = dd->ncg_home;
1537     }
1538     /* zone_ncg1[0] should always be equal to ncg_home */
1539     dd->comm->zone_ncg1[0] = dd->ncg_home;
1540 }
1541
1542 static void restoreAtomGroups(gmx_domdec_t *dd,
1543                               const int *gcgs_index, const t_state *state)
1544 {
1545     gmx::ArrayRef<const int>  atomGroupsState        = state->cg_gl;
1546
1547     std::vector<int>         &globalAtomGroupIndices = dd->globalAtomGroupIndices;
1548     gmx::BlockRanges         &atomGroups             = dd->atomGroups_;
1549
1550     globalAtomGroupIndices.resize(atomGroupsState.size());
1551     atomGroups.index.resize(atomGroupsState.size() + 1);
1552
1553     /* Copy back the global charge group indices from state
1554      * and rebuild the local charge group to atom index.
1555      */
1556     int atomIndex = 0;
1557     for (unsigned int i = 0; i < atomGroupsState.size(); i++)
1558     {
1559         atomGroups.index[i]        = atomIndex;
1560         const int atomGroupGlobal  = atomGroupsState[i];
1561         globalAtomGroupIndices[i]  = atomGroupGlobal;
1562         atomIndex                 += gcgs_index[atomGroupGlobal + 1] - gcgs_index[atomGroupGlobal];
1563     }
1564     atomGroups.index[atomGroupsState.size()] = atomIndex;
1565
1566     dd->ncg_home = atomGroupsState.size();
1567     dd->comm->atomRanges.setEnd(DDAtomRanges::Type::Home, atomIndex);
1568
1569     set_zones_ncg_home(dd);
1570 }
1571
1572 static void dd_set_cginfo(gmx::ArrayRef<const int> index_gl, int cg0, int cg1,
1573                           t_forcerec *fr, char *bLocalCG)
1574 {
1575     cginfo_mb_t *cginfo_mb;
1576     int         *cginfo;
1577     int          cg;
1578
1579     if (fr != nullptr)
1580     {
1581         cginfo_mb = fr->cginfo_mb;
1582         cginfo    = fr->cginfo;
1583
1584         for (cg = cg0; cg < cg1; cg++)
1585         {
1586             cginfo[cg] = ddcginfo(cginfo_mb, index_gl[cg]);
1587         }
1588     }
1589
1590     if (bLocalCG != nullptr)
1591     {
1592         for (cg = cg0; cg < cg1; cg++)
1593         {
1594             bLocalCG[index_gl[cg]] = TRUE;
1595         }
1596     }
1597 }
1598
1599 static void make_dd_indices(gmx_domdec_t *dd,
1600                             const int *gcgs_index, int cg_start)
1601 {
1602     const int                 numZones               = dd->comm->zones.n;
1603     const int                *zone2cg                = dd->comm->zones.cg_range;
1604     const int                *zone_ncg1              = dd->comm->zone_ncg1;
1605     gmx::ArrayRef<const int>  globalAtomGroupIndices = dd->globalAtomGroupIndices;
1606     const gmx_bool            bCGs                   = dd->comm->bCGs;
1607
1608     std::vector<int>         &globalAtomIndices      = dd->globalAtomIndices;
1609
1610     if (zone2cg[1] != dd->ncg_home)
1611     {
1612         gmx_incons("dd->ncg_zone is not up to date");
1613     }
1614
1615     /* Make the local to global and global to local atom index */
1616     int a = dd->atomGroups().index[cg_start];
1617     globalAtomIndices.resize(a);
1618     for (int zone = 0; zone < numZones; zone++)
1619     {
1620         int cg0;
1621         if (zone == 0)
1622         {
1623             cg0 = cg_start;
1624         }
1625         else
1626         {
1627             cg0 = zone2cg[zone];
1628         }
1629         int cg1    = zone2cg[zone+1];
1630         int cg1_p1 = cg0 + zone_ncg1[zone];
1631
1632         for (int cg = cg0; cg < cg1; cg++)
1633         {
1634             int zone1 = zone;
1635             if (cg >= cg1_p1)
1636             {
1637                 /* Signal that this cg is from more than one pulse away */
1638                 zone1 += numZones;
1639             }
1640             int cg_gl = globalAtomGroupIndices[cg];
1641             if (bCGs)
1642             {
1643                 for (int a_gl = gcgs_index[cg_gl]; a_gl < gcgs_index[cg_gl+1]; a_gl++)
1644                 {
1645                     globalAtomIndices.push_back(a_gl);
1646                     ga2la_set(dd->ga2la, a_gl, a, zone1);
1647                     a++;
1648                 }
1649             }
1650             else
1651             {
1652                 globalAtomIndices.push_back(cg_gl);
1653                 ga2la_set(dd->ga2la, cg_gl, a, zone1);
1654                 a++;
1655             }
1656         }
1657     }
1658 }
1659
1660 static int check_bLocalCG(gmx_domdec_t *dd, int ncg_sys, const char *bLocalCG,
1661                           const char *where)
1662 {
1663     int nerr = 0;
1664     if (bLocalCG == nullptr)
1665     {
1666         return nerr;
1667     }
1668     for (size_t i = 0; i < dd->globalAtomGroupIndices.size(); i++)
1669     {
1670         if (!bLocalCG[dd->globalAtomGroupIndices[i]])
1671         {
1672             fprintf(stderr,
1673                     "DD rank %d, %s: atom group %zu, global atom group %d is not marked in bLocalCG (ncg_home %d)\n", dd->rank, where, i + 1, dd->globalAtomGroupIndices[i] + 1, dd->ncg_home);
1674             nerr++;
1675         }
1676     }
1677     size_t ngl = 0;
1678     for (int i = 0; i < ncg_sys; i++)
1679     {
1680         if (bLocalCG[i])
1681         {
1682             ngl++;
1683         }
1684     }
1685     if (ngl != dd->globalAtomGroupIndices.size())
1686     {
1687         fprintf(stderr, "DD rank %d, %s: In bLocalCG %zu atom groups are marked as local, whereas there are %zu\n", dd->rank, where, ngl, dd->globalAtomGroupIndices.size());
1688         nerr++;
1689     }
1690
1691     return nerr;
1692 }
1693
1694 static void check_index_consistency(gmx_domdec_t *dd,
1695                                     int natoms_sys, int ncg_sys,
1696                                     const char *where)
1697 {
1698     int       nerr = 0;
1699
1700     const int numAtomsInZones = dd->comm->atomRanges.end(DDAtomRanges::Type::Zones);
1701
1702     if (dd->comm->DD_debug > 1)
1703     {
1704         std::vector<int> have(natoms_sys);
1705         for (int a = 0; a < numAtomsInZones; a++)
1706         {
1707             int globalAtomIndex = dd->globalAtomIndices[a];
1708             if (have[globalAtomIndex] > 0)
1709             {
1710                 fprintf(stderr, "DD rank %d: global atom %d occurs twice: index %d and %d\n", dd->rank, globalAtomIndex + 1, have[globalAtomIndex], a+1);
1711             }
1712             else
1713             {
1714                 have[globalAtomIndex] = a + 1;
1715             }
1716         }
1717     }
1718
1719     std::vector<int> have(numAtomsInZones);
1720
1721     int              ngl = 0;
1722     for (int i = 0; i < natoms_sys; i++)
1723     {
1724         int a;
1725         int cell;
1726         if (ga2la_get(dd->ga2la, i, &a, &cell))
1727         {
1728             if (a >= numAtomsInZones)
1729             {
1730                 fprintf(stderr, "DD rank %d: global atom %d marked as local atom %d, which is larger than nat_tot (%d)\n", dd->rank, i+1, a+1, numAtomsInZones);
1731                 nerr++;
1732             }
1733             else
1734             {
1735                 have[a] = 1;
1736                 if (dd->globalAtomIndices[a] != i)
1737                 {
1738                     fprintf(stderr, "DD rank %d: global atom %d marked as local atom %d, which has global atom index %d\n", dd->rank, i+1, a+1, dd->globalAtomIndices[a]+1);
1739                     nerr++;
1740                 }
1741             }
1742             ngl++;
1743         }
1744     }
1745     if (ngl != numAtomsInZones)
1746     {
1747         fprintf(stderr,
1748                 "DD rank %d, %s: %d global atom indices, %d local atoms\n",
1749                 dd->rank, where, ngl, numAtomsInZones);
1750     }
1751     for (int a = 0; a < numAtomsInZones; a++)
1752     {
1753         if (have[a] == 0)
1754         {
1755             fprintf(stderr,
1756                     "DD rank %d, %s: local atom %d, global %d has no global index\n",
1757                     dd->rank, where, a + 1, dd->globalAtomIndices[a] + 1);
1758         }
1759     }
1760
1761     nerr += check_bLocalCG(dd, ncg_sys, dd->comm->bLocalCG, where);
1762
1763     if (nerr > 0)
1764     {
1765         gmx_fatal(FARGS, "DD rank %d, %s: %d atom(group) index inconsistencies",
1766                   dd->rank, where, nerr);
1767     }
1768 }
1769
1770 /* Clear all DD global state indices, starting from \p atomGroupStart and \p atomStart */
1771 static void clearDDStateIndices(gmx_domdec_t *dd,
1772                                 int           atomGroupStart,
1773                                 int           atomStart)
1774 {
1775     if (atomStart == 0)
1776     {
1777         /* Clear the whole list without searching */
1778         ga2la_clear(dd->ga2la);
1779     }
1780     else
1781     {
1782         const int numAtomsInZones = dd->comm->atomRanges.end(DDAtomRanges::Type::Zones);
1783         for (int i = 0; i < numAtomsInZones; i++)
1784         {
1785             ga2la_del(dd->ga2la, dd->globalAtomIndices[i]);
1786         }
1787     }
1788
1789     char *bLocalCG = dd->comm->bLocalCG;
1790     if (bLocalCG)
1791     {
1792         for (size_t atomGroup = atomGroupStart; atomGroup < dd->globalAtomGroupIndices.size(); atomGroup++)
1793         {
1794             bLocalCG[dd->globalAtomGroupIndices[atomGroup]] = FALSE;
1795         }
1796     }
1797
1798     dd_clear_local_vsite_indices(dd);
1799
1800     if (dd->constraints)
1801     {
1802         dd_clear_local_constraint_indices(dd);
1803     }
1804 }
1805
1806 static gmx_bool check_grid_jump(gmx_int64_t     step,
1807                                 gmx_domdec_t   *dd,
1808                                 real            cutoff,
1809                                 gmx_ddbox_t    *ddbox,
1810                                 gmx_bool        bFatal)
1811 {
1812     gmx_domdec_comm_t *comm;
1813     int                d, dim;
1814     real               limit, bfac;
1815     gmx_bool           bInvalid;
1816
1817     bInvalid = FALSE;
1818
1819     comm = dd->comm;
1820
1821     for (d = 1; d < dd->ndim; d++)
1822     {
1823         dim   = dd->dim[d];
1824         limit = grid_jump_limit(comm, cutoff, d);
1825         bfac  = ddbox->box_size[dim];
1826         if (ddbox->tric_dir[dim])
1827         {
1828             bfac *= ddbox->skew_fac[dim];
1829         }
1830         if ((comm->cell_f1[d] - comm->cell_f_max0[d])*bfac <  limit ||
1831                                                               (comm->cell_f0[d] - comm->cell_f_min1[d])*bfac > -limit)
1832         {
1833             bInvalid = TRUE;
1834
1835             if (bFatal)
1836             {
1837                 char buf[22];
1838
1839                 /* This error should never be triggered under normal
1840                  * circumstances, but you never know ...
1841                  */
1842                 gmx_fatal(FARGS, "step %s: The domain decomposition grid has shifted too much in the %c-direction around cell %d %d %d. This should not have happened. Running with fewer ranks might avoid this issue.",
1843                           gmx_step_str(step, buf),
1844                           dim2char(dim), dd->ci[XX], dd->ci[YY], dd->ci[ZZ]);
1845             }
1846         }
1847     }
1848
1849     return bInvalid;
1850 }
1851
1852 static float dd_force_load(gmx_domdec_comm_t *comm)
1853 {
1854     float load;
1855
1856     if (comm->eFlop)
1857     {
1858         load = comm->flop;
1859         if (comm->eFlop > 1)
1860         {
1861             load *= 1.0 + (comm->eFlop - 1)*(0.1*rand()/RAND_MAX - 0.05);
1862         }
1863     }
1864     else
1865     {
1866         load = comm->cycl[ddCyclF];
1867         if (comm->cycl_n[ddCyclF] > 1)
1868         {
1869             /* Subtract the maximum of the last n cycle counts
1870              * to get rid of possible high counts due to other sources,
1871              * for instance system activity, that would otherwise
1872              * affect the dynamic load balancing.
1873              */
1874             load -= comm->cycl_max[ddCyclF];
1875         }
1876
1877 #if GMX_MPI
1878         if (comm->cycl_n[ddCyclWaitGPU] && comm->nrank_gpu_shared > 1)
1879         {
1880             float gpu_wait, gpu_wait_sum;
1881
1882             gpu_wait = comm->cycl[ddCyclWaitGPU];
1883             if (comm->cycl_n[ddCyclF] > 1)
1884             {
1885                 /* We should remove the WaitGPU time of the same MD step
1886                  * as the one with the maximum F time, since the F time
1887                  * and the wait time are not independent.
1888                  * Furthermore, the step for the max F time should be chosen
1889                  * the same on all ranks that share the same GPU.
1890                  * But to keep the code simple, we remove the average instead.
1891                  * The main reason for artificially long times at some steps
1892                  * is spurious CPU activity or MPI time, so we don't expect
1893                  * that changes in the GPU wait time matter a lot here.
1894                  */
1895                 gpu_wait *= (comm->cycl_n[ddCyclF] - 1)/(float)comm->cycl_n[ddCyclF];
1896             }
1897             /* Sum the wait times over the ranks that share the same GPU */
1898             MPI_Allreduce(&gpu_wait, &gpu_wait_sum, 1, MPI_FLOAT, MPI_SUM,
1899                           comm->mpi_comm_gpu_shared);
1900             /* Replace the wait time by the average over the ranks */
1901             load += -gpu_wait + gpu_wait_sum/comm->nrank_gpu_shared;
1902         }
1903 #endif
1904     }
1905
1906     return load;
1907 }
1908
1909 static void set_slb_pme_dim_f(gmx_domdec_t *dd, int dim, real **dim_f)
1910 {
1911     gmx_domdec_comm_t *comm;
1912     int                i;
1913
1914     comm = dd->comm;
1915
1916     snew(*dim_f, dd->nc[dim]+1);
1917     (*dim_f)[0] = 0;
1918     for (i = 1; i < dd->nc[dim]; i++)
1919     {
1920         if (comm->slb_frac[dim])
1921         {
1922             (*dim_f)[i] = (*dim_f)[i-1] + comm->slb_frac[dim][i-1];
1923         }
1924         else
1925         {
1926             (*dim_f)[i] = (real)i/(real)dd->nc[dim];
1927         }
1928     }
1929     (*dim_f)[dd->nc[dim]] = 1;
1930 }
1931
1932 static void init_ddpme(gmx_domdec_t *dd, gmx_ddpme_t *ddpme, int dimind)
1933 {
1934     int  pmeindex, slab, nso, i;
1935     ivec xyz;
1936
1937     if (dimind == 0 && dd->dim[0] == YY && dd->comm->npmenodes_x == 1)
1938     {
1939         ddpme->dim = YY;
1940     }
1941     else
1942     {
1943         ddpme->dim = dimind;
1944     }
1945     ddpme->dim_match = (ddpme->dim == dd->dim[dimind]);
1946
1947     ddpme->nslab = (ddpme->dim == 0 ?
1948                     dd->comm->npmenodes_x :
1949                     dd->comm->npmenodes_y);
1950
1951     if (ddpme->nslab <= 1)
1952     {
1953         return;
1954     }
1955
1956     nso = dd->comm->npmenodes/ddpme->nslab;
1957     /* Determine for each PME slab the PP location range for dimension dim */
1958     snew(ddpme->pp_min, ddpme->nslab);
1959     snew(ddpme->pp_max, ddpme->nslab);
1960     for (slab = 0; slab < ddpme->nslab; slab++)
1961     {
1962         ddpme->pp_min[slab] = dd->nc[dd->dim[dimind]] - 1;
1963         ddpme->pp_max[slab] = 0;
1964     }
1965     for (i = 0; i < dd->nnodes; i++)
1966     {
1967         ddindex2xyz(dd->nc, i, xyz);
1968         /* For y only use our y/z slab.
1969          * This assumes that the PME x grid size matches the DD grid size.
1970          */
1971         if (dimind == 0 || xyz[XX] == dd->ci[XX])
1972         {
1973             pmeindex = ddindex2pmeindex(dd, i);
1974             if (dimind == 0)
1975             {
1976                 slab = pmeindex/nso;
1977             }
1978             else
1979             {
1980                 slab = pmeindex % ddpme->nslab;
1981             }
1982             ddpme->pp_min[slab] = std::min(ddpme->pp_min[slab], xyz[dimind]);
1983             ddpme->pp_max[slab] = std::max(ddpme->pp_max[slab], xyz[dimind]);
1984         }
1985     }
1986
1987     set_slb_pme_dim_f(dd, ddpme->dim, &ddpme->slb_dim_f);
1988 }
1989
1990 int dd_pme_maxshift_x(const gmx_domdec_t *dd)
1991 {
1992     if (dd->comm->ddpme[0].dim == XX)
1993     {
1994         return dd->comm->ddpme[0].maxshift;
1995     }
1996     else
1997     {
1998         return 0;
1999     }
2000 }
2001
2002 int dd_pme_maxshift_y(const gmx_domdec_t *dd)
2003 {
2004     if (dd->comm->ddpme[0].dim == YY)
2005     {
2006         return dd->comm->ddpme[0].maxshift;
2007     }
2008     else if (dd->comm->npmedecompdim >= 2 && dd->comm->ddpme[1].dim == YY)
2009     {
2010         return dd->comm->ddpme[1].maxshift;
2011     }
2012     else
2013     {
2014         return 0;
2015     }
2016 }
2017
2018 static void comm_dd_ns_cell_sizes(gmx_domdec_t *dd,
2019                                   gmx_ddbox_t *ddbox,
2020                                   rvec cell_ns_x0, rvec cell_ns_x1,
2021                                   gmx_int64_t step)
2022 {
2023     gmx_domdec_comm_t *comm;
2024     int                dim_ind, dim;
2025
2026     comm = dd->comm;
2027
2028     for (dim_ind = 0; dim_ind < dd->ndim; dim_ind++)
2029     {
2030         dim = dd->dim[dim_ind];
2031
2032         /* Without PBC we don't have restrictions on the outer cells */
2033         if (!(dim >= ddbox->npbcdim &&
2034               (dd->ci[dim] == 0 || dd->ci[dim] == dd->nc[dim] - 1)) &&
2035             isDlbOn(comm) &&
2036             (comm->cell_x1[dim] - comm->cell_x0[dim])*ddbox->skew_fac[dim] <
2037             comm->cellsize_min[dim])
2038         {
2039             char buf[22];
2040             gmx_fatal(FARGS, "step %s: The %c-size (%f) times the triclinic skew factor (%f) is smaller than the smallest allowed cell size (%f) for domain decomposition grid cell %d %d %d",
2041                       gmx_step_str(step, buf), dim2char(dim),
2042                       comm->cell_x1[dim] - comm->cell_x0[dim],
2043                       ddbox->skew_fac[dim],
2044                       dd->comm->cellsize_min[dim],
2045                       dd->ci[XX], dd->ci[YY], dd->ci[ZZ]);
2046         }
2047     }
2048
2049     if ((isDlbOn(dd->comm) && dd->ndim > 1) || ddbox->nboundeddim < DIM)
2050     {
2051         /* Communicate the boundaries and update cell_ns_x0/1 */
2052         dd_move_cellx(dd, ddbox, cell_ns_x0, cell_ns_x1);
2053         if (isDlbOn(dd->comm) && dd->ndim > 1)
2054         {
2055             check_grid_jump(step, dd, dd->comm->cutoff, ddbox, TRUE);
2056         }
2057     }
2058 }
2059
2060 void dd_cycles_add(const gmx_domdec_t *dd, float cycles, int ddCycl)
2061 {
2062     /* Note that the cycles value can be incorrect, either 0 or some
2063      * extremely large value, when our thread migrated to another core
2064      * with an unsynchronized cycle counter. If this happens less often
2065      * that once per nstlist steps, this will not cause issues, since
2066      * we later subtract the maximum value from the sum over nstlist steps.
2067      * A zero count will slightly lower the total, but that's a small effect.
2068      * Note that the main purpose of the subtraction of the maximum value
2069      * is to avoid throwing off the load balancing when stalls occur due
2070      * e.g. system activity or network congestion.
2071      */
2072     dd->comm->cycl[ddCycl] += cycles;
2073     dd->comm->cycl_n[ddCycl]++;
2074     if (cycles > dd->comm->cycl_max[ddCycl])
2075     {
2076         dd->comm->cycl_max[ddCycl] = cycles;
2077     }
2078 }
2079
2080 static double force_flop_count(t_nrnb *nrnb)
2081 {
2082     int         i;
2083     double      sum;
2084     const char *name;
2085
2086     sum = 0;
2087     for (i = 0; i < eNR_NBKERNEL_FREE_ENERGY; i++)
2088     {
2089         /* To get closer to the real timings, we half the count
2090          * for the normal loops and again half it for water loops.
2091          */
2092         name = nrnb_str(i);
2093         if (strstr(name, "W3") != nullptr || strstr(name, "W4") != nullptr)
2094         {
2095             sum += nrnb->n[i]*0.25*cost_nrnb(i);
2096         }
2097         else
2098         {
2099             sum += nrnb->n[i]*0.50*cost_nrnb(i);
2100         }
2101     }
2102     for (i = eNR_NBKERNEL_FREE_ENERGY; i <= eNR_NB14; i++)
2103     {
2104         name = nrnb_str(i);
2105         if (strstr(name, "W3") != nullptr || strstr(name, "W4") != nullptr)
2106         {
2107             sum += nrnb->n[i]*cost_nrnb(i);
2108         }
2109     }
2110     for (i = eNR_BONDS; i <= eNR_WALLS; i++)
2111     {
2112         sum += nrnb->n[i]*cost_nrnb(i);
2113     }
2114
2115     return sum;
2116 }
2117
2118 void dd_force_flop_start(gmx_domdec_t *dd, t_nrnb *nrnb)
2119 {
2120     if (dd->comm->eFlop)
2121     {
2122         dd->comm->flop -= force_flop_count(nrnb);
2123     }
2124 }
2125 void dd_force_flop_stop(gmx_domdec_t *dd, t_nrnb *nrnb)
2126 {
2127     if (dd->comm->eFlop)
2128     {
2129         dd->comm->flop += force_flop_count(nrnb);
2130         dd->comm->flop_n++;
2131     }
2132 }
2133
2134 static void clear_dd_cycle_counts(gmx_domdec_t *dd)
2135 {
2136     int i;
2137
2138     for (i = 0; i < ddCyclNr; i++)
2139     {
2140         dd->comm->cycl[i]     = 0;
2141         dd->comm->cycl_n[i]   = 0;
2142         dd->comm->cycl_max[i] = 0;
2143     }
2144     dd->comm->flop   = 0;
2145     dd->comm->flop_n = 0;
2146 }
2147
2148 static void get_load_distribution(gmx_domdec_t *dd, gmx_wallcycle_t wcycle)
2149 {
2150     gmx_domdec_comm_t *comm;
2151     domdec_load_t     *load;
2152     domdec_root_t     *root = nullptr;
2153     int                d, dim, i, pos;
2154     float              cell_frac = 0, sbuf[DD_NLOAD_MAX];
2155     gmx_bool           bSepPME;
2156
2157     if (debug)
2158     {
2159         fprintf(debug, "get_load_distribution start\n");
2160     }
2161
2162     wallcycle_start(wcycle, ewcDDCOMMLOAD);
2163
2164     comm = dd->comm;
2165
2166     bSepPME = (dd->pme_nodeid >= 0);
2167
2168     if (dd->ndim == 0 && bSepPME)
2169     {
2170         /* Without decomposition, but with PME nodes, we need the load */
2171         comm->load[0].mdf = comm->cycl[ddCyclPPduringPME];
2172         comm->load[0].pme = comm->cycl[ddCyclPME];
2173     }
2174
2175     for (d = dd->ndim-1; d >= 0; d--)
2176     {
2177         dim = dd->dim[d];
2178         /* Check if we participate in the communication in this dimension */
2179         if (d == dd->ndim-1 ||
2180             (dd->ci[dd->dim[d+1]] == 0 && dd->ci[dd->dim[dd->ndim-1]] == 0))
2181         {
2182             load = &comm->load[d];
2183             if (isDlbOn(dd->comm))
2184             {
2185                 cell_frac = comm->cell_f1[d] - comm->cell_f0[d];
2186             }
2187             pos = 0;
2188             if (d == dd->ndim-1)
2189             {
2190                 sbuf[pos++] = dd_force_load(comm);
2191                 sbuf[pos++] = sbuf[0];
2192                 if (isDlbOn(dd->comm))
2193                 {
2194                     sbuf[pos++] = sbuf[0];
2195                     sbuf[pos++] = cell_frac;
2196                     if (d > 0)
2197                     {
2198                         sbuf[pos++] = comm->cell_f_max0[d];
2199                         sbuf[pos++] = comm->cell_f_min1[d];
2200                     }
2201                 }
2202                 if (bSepPME)
2203                 {
2204                     sbuf[pos++] = comm->cycl[ddCyclPPduringPME];
2205                     sbuf[pos++] = comm->cycl[ddCyclPME];
2206                 }
2207             }
2208             else
2209             {
2210                 sbuf[pos++] = comm->load[d+1].sum;
2211                 sbuf[pos++] = comm->load[d+1].max;
2212                 if (isDlbOn(dd->comm))
2213                 {
2214                     sbuf[pos++] = comm->load[d+1].sum_m;
2215                     sbuf[pos++] = comm->load[d+1].cvol_min*cell_frac;
2216                     sbuf[pos++] = comm->load[d+1].flags;
2217                     if (d > 0)
2218                     {
2219                         sbuf[pos++] = comm->cell_f_max0[d];
2220                         sbuf[pos++] = comm->cell_f_min1[d];
2221                     }
2222                 }
2223                 if (bSepPME)
2224                 {
2225                     sbuf[pos++] = comm->load[d+1].mdf;
2226                     sbuf[pos++] = comm->load[d+1].pme;
2227                 }
2228             }
2229             load->nload = pos;
2230             /* Communicate a row in DD direction d.
2231              * The communicators are setup such that the root always has rank 0.
2232              */
2233 #if GMX_MPI
2234             MPI_Gather(sbuf, load->nload*sizeof(float), MPI_BYTE,
2235                        load->load, load->nload*sizeof(float), MPI_BYTE,
2236                        0, comm->mpi_comm_load[d]);
2237 #endif
2238             if (dd->ci[dim] == dd->master_ci[dim])
2239             {
2240                 /* We are the root, process this row */
2241                 if (isDlbOn(comm))
2242                 {
2243                     root = comm->root[d];
2244                 }
2245                 load->sum      = 0;
2246                 load->max      = 0;
2247                 load->sum_m    = 0;
2248                 load->cvol_min = 1;
2249                 load->flags    = 0;
2250                 load->mdf      = 0;
2251                 load->pme      = 0;
2252                 pos            = 0;
2253                 for (i = 0; i < dd->nc[dim]; i++)
2254                 {
2255                     load->sum += load->load[pos++];
2256                     load->max  = std::max(load->max, load->load[pos]);
2257                     pos++;
2258                     if (isDlbOn(dd->comm))
2259                     {
2260                         if (root->bLimited)
2261                         {
2262                             /* This direction could not be load balanced properly,
2263                              * therefore we need to use the maximum iso the average load.
2264                              */
2265                             load->sum_m = std::max(load->sum_m, load->load[pos]);
2266                         }
2267                         else
2268                         {
2269                             load->sum_m += load->load[pos];
2270                         }
2271                         pos++;
2272                         load->cvol_min = std::min(load->cvol_min, load->load[pos]);
2273                         pos++;
2274                         if (d < dd->ndim-1)
2275                         {
2276                             load->flags = (int)(load->load[pos++] + 0.5);
2277                         }
2278                         if (d > 0)
2279                         {
2280                             root->cell_f_max0[i] = load->load[pos++];
2281                             root->cell_f_min1[i] = load->load[pos++];
2282                         }
2283                     }
2284                     if (bSepPME)
2285                     {
2286                         load->mdf = std::max(load->mdf, load->load[pos]);
2287                         pos++;
2288                         load->pme = std::max(load->pme, load->load[pos]);
2289                         pos++;
2290                     }
2291                 }
2292                 if (isDlbOn(comm) && root->bLimited)
2293                 {
2294                     load->sum_m *= dd->nc[dim];
2295                     load->flags |= (1<<d);
2296                 }
2297             }
2298         }
2299     }
2300
2301     if (DDMASTER(dd))
2302     {
2303         comm->nload      += dd_load_count(comm);
2304         comm->load_step  += comm->cycl[ddCyclStep];
2305         comm->load_sum   += comm->load[0].sum;
2306         comm->load_max   += comm->load[0].max;
2307         if (isDlbOn(comm))
2308         {
2309             for (d = 0; d < dd->ndim; d++)
2310             {
2311                 if (comm->load[0].flags & (1<<d))
2312                 {
2313                     comm->load_lim[d]++;
2314                 }
2315             }
2316         }
2317         if (bSepPME)
2318         {
2319             comm->load_mdf += comm->load[0].mdf;
2320             comm->load_pme += comm->load[0].pme;
2321         }
2322     }
2323
2324     wallcycle_stop(wcycle, ewcDDCOMMLOAD);
2325
2326     if (debug)
2327     {
2328         fprintf(debug, "get_load_distribution finished\n");
2329     }
2330 }
2331
2332 static float dd_force_load_fraction(gmx_domdec_t *dd)
2333 {
2334     /* Return the relative performance loss on the total run time
2335      * due to the force calculation load imbalance.
2336      */
2337     if (dd->comm->nload > 0 && dd->comm->load_step > 0)
2338     {
2339         return dd->comm->load_sum/(dd->comm->load_step*dd->nnodes);
2340     }
2341     else
2342     {
2343         return 0;
2344     }
2345 }
2346
2347 static float dd_force_imb_perf_loss(gmx_domdec_t *dd)
2348 {
2349     /* Return the relative performance loss on the total run time
2350      * due to the force calculation load imbalance.
2351      */
2352     if (dd->comm->nload > 0 && dd->comm->load_step > 0)
2353     {
2354         return
2355             (dd->comm->load_max*dd->nnodes - dd->comm->load_sum)/
2356             (dd->comm->load_step*dd->nnodes);
2357     }
2358     else
2359     {
2360         return 0;
2361     }
2362 }
2363
2364 static void print_dd_load_av(FILE *fplog, gmx_domdec_t *dd)
2365 {
2366     gmx_domdec_comm_t *comm = dd->comm;
2367
2368     /* Only the master rank prints loads and only if we measured loads */
2369     if (!DDMASTER(dd) || comm->nload == 0)
2370     {
2371         return;
2372     }
2373
2374     char  buf[STRLEN];
2375     int   numPpRanks   = dd->nnodes;
2376     int   numPmeRanks  = (dd->pme_nodeid >= 0) ? comm->npmenodes : 0;
2377     int   numRanks     = numPpRanks + numPmeRanks;
2378     float lossFraction = 0;
2379
2380     /* Print the average load imbalance and performance loss */
2381     if (dd->nnodes > 1 && comm->load_sum > 0)
2382     {
2383         float imbalance = comm->load_max*numPpRanks/comm->load_sum - 1;
2384         lossFraction    = dd_force_imb_perf_loss(dd);
2385
2386         std::string msg         = "\n Dynamic load balancing report:\n";
2387         std::string dlbStateStr = "";
2388
2389         switch (dd->comm->dlbState)
2390         {
2391             case edlbsOffUser:
2392                 dlbStateStr = "DLB was off during the run per user request.";
2393                 break;
2394             case edlbsOffForever:
2395                 /* Currectly this can happen due to performance loss observed, cell size
2396                  * limitations or incompatibility with other settings observed during
2397                  * determineInitialDlbState(). */
2398                 dlbStateStr = "DLB got disabled because it was unsuitable to use.";
2399                 break;
2400             case edlbsOffCanTurnOn:
2401                 dlbStateStr = "DLB was off during the run due to low measured imbalance.";
2402                 break;
2403             case edlbsOffTemporarilyLocked:
2404                 dlbStateStr = "DLB was locked at the end of the run due to unfinished PP-PME balancing.";
2405                 break;
2406             case edlbsOnCanTurnOff:
2407                 dlbStateStr = "DLB was turned on during the run due to measured imbalance.";
2408                 break;
2409             case edlbsOnUser:
2410                 dlbStateStr = "DLB was permanently on during the run per user request.";
2411                 break;
2412             default:
2413                 GMX_ASSERT(false, "Undocumented DLB state");
2414         }
2415
2416         msg += " " + dlbStateStr + "\n";
2417         msg += gmx::formatString(" Average load imbalance: %.1f%%.\n", imbalance*100);
2418         msg += gmx::formatString(" The balanceable part of the MD step is %d%%, load imbalance is computed from this.\n",
2419                                  static_cast<int>(dd_force_load_fraction(dd)*100 + 0.5));
2420         msg += gmx::formatString(" Part of the total run time spent waiting due to load imbalance: %.1f%%.\n",
2421                                  lossFraction*100);
2422         fprintf(fplog, "%s", msg.c_str());
2423         fprintf(stderr, "%s", msg.c_str());
2424     }
2425
2426     /* Print during what percentage of steps the  load balancing was limited */
2427     bool dlbWasLimited = false;
2428     if (isDlbOn(comm))
2429     {
2430         sprintf(buf, " Steps where the load balancing was limited by -rdd, -rcon and/or -dds:");
2431         for (int d = 0; d < dd->ndim; d++)
2432         {
2433             int limitPercentage = (200*comm->load_lim[d] + 1)/(2*comm->nload);
2434             sprintf(buf+strlen(buf), " %c %d %%",
2435                     dim2char(dd->dim[d]), limitPercentage);
2436             if (limitPercentage >= 50)
2437             {
2438                 dlbWasLimited = true;
2439             }
2440         }
2441         sprintf(buf + strlen(buf), "\n");
2442         fprintf(fplog, "%s", buf);
2443         fprintf(stderr, "%s", buf);
2444     }
2445
2446     /* Print the performance loss due to separate PME - PP rank imbalance */
2447     float lossFractionPme = 0;
2448     if (numPmeRanks > 0 && comm->load_mdf > 0 && comm->load_step > 0)
2449     {
2450         float pmeForceRatio = comm->load_pme/comm->load_mdf;
2451         lossFractionPme     = (comm->load_pme - comm->load_mdf)/comm->load_step;
2452         if (lossFractionPme <= 0)
2453         {
2454             lossFractionPme *= numPmeRanks/static_cast<float>(numRanks);
2455         }
2456         else
2457         {
2458             lossFractionPme *= numPpRanks/static_cast<float>(numRanks);
2459         }
2460         sprintf(buf, " Average PME mesh/force load: %5.3f\n", pmeForceRatio);
2461         fprintf(fplog, "%s", buf);
2462         fprintf(stderr, "%s", buf);
2463         sprintf(buf, " Part of the total run time spent waiting due to PP/PME imbalance: %.1f %%\n", fabs(lossFractionPme)*100);
2464         fprintf(fplog, "%s", buf);
2465         fprintf(stderr, "%s", buf);
2466     }
2467     fprintf(fplog, "\n");
2468     fprintf(stderr, "\n");
2469
2470     if (lossFraction >= DD_PERF_LOSS_WARN)
2471     {
2472         sprintf(buf,
2473                 "NOTE: %.1f %% of the available CPU time was lost due to load imbalance\n"
2474                 "      in the domain decomposition.\n", lossFraction*100);
2475         if (!isDlbOn(comm))
2476         {
2477             sprintf(buf+strlen(buf), "      You might want to use dynamic load balancing (option -dlb.)\n");
2478         }
2479         else if (dlbWasLimited)
2480         {
2481             sprintf(buf+strlen(buf), "      You might want to decrease the cell size limit (options -rdd, -rcon and/or -dds).\n");
2482         }
2483         fprintf(fplog, "%s\n", buf);
2484         fprintf(stderr, "%s\n", buf);
2485     }
2486     if (numPmeRanks > 0 && fabs(lossFractionPme) >= DD_PERF_LOSS_WARN)
2487     {
2488         sprintf(buf,
2489                 "NOTE: %.1f %% performance was lost because the PME ranks\n"
2490                 "      had %s work to do than the PP ranks.\n"
2491                 "      You might want to %s the number of PME ranks\n"
2492                 "      or %s the cut-off and the grid spacing.\n",
2493                 fabs(lossFractionPme*100),
2494                 (lossFractionPme < 0) ? "less"     : "more",
2495                 (lossFractionPme < 0) ? "decrease" : "increase",
2496                 (lossFractionPme < 0) ? "decrease" : "increase");
2497         fprintf(fplog, "%s\n", buf);
2498         fprintf(stderr, "%s\n", buf);
2499     }
2500 }
2501
2502 static float dd_vol_min(gmx_domdec_t *dd)
2503 {
2504     return dd->comm->load[0].cvol_min*dd->nnodes;
2505 }
2506
2507 static gmx_bool dd_load_flags(gmx_domdec_t *dd)
2508 {
2509     return dd->comm->load[0].flags;
2510 }
2511
2512 static float dd_f_imbal(gmx_domdec_t *dd)
2513 {
2514     if (dd->comm->load[0].sum > 0)
2515     {
2516         return dd->comm->load[0].max*dd->nnodes/dd->comm->load[0].sum - 1.0f;
2517     }
2518     else
2519     {
2520         /* Something is wrong in the cycle counting, report no load imbalance */
2521         return 0.0f;
2522     }
2523 }
2524
2525 float dd_pme_f_ratio(gmx_domdec_t *dd)
2526 {
2527     /* Should only be called on the DD master rank */
2528     assert(DDMASTER(dd));
2529
2530     if (dd->comm->load[0].mdf > 0 && dd->comm->cycl_n[ddCyclPME] > 0)
2531     {
2532         return dd->comm->load[0].pme/dd->comm->load[0].mdf;
2533     }
2534     else
2535     {
2536         return -1.0;
2537     }
2538 }
2539
2540 static void dd_print_load(FILE *fplog, gmx_domdec_t *dd, gmx_int64_t step)
2541 {
2542     int  flags, d;
2543     char buf[22];
2544
2545     flags = dd_load_flags(dd);
2546     if (flags)
2547     {
2548         fprintf(fplog,
2549                 "DD  load balancing is limited by minimum cell size in dimension");
2550         for (d = 0; d < dd->ndim; d++)
2551         {
2552             if (flags & (1<<d))
2553             {
2554                 fprintf(fplog, " %c", dim2char(dd->dim[d]));
2555             }
2556         }
2557         fprintf(fplog, "\n");
2558     }
2559     fprintf(fplog, "DD  step %s", gmx_step_str(step, buf));
2560     if (isDlbOn(dd->comm))
2561     {
2562         fprintf(fplog, "  vol min/aver %5.3f%c",
2563                 dd_vol_min(dd), flags ? '!' : ' ');
2564     }
2565     if (dd->nnodes > 1)
2566     {
2567         fprintf(fplog, " load imb.: force %4.1f%%", dd_f_imbal(dd)*100);
2568     }
2569     if (dd->comm->cycl_n[ddCyclPME])
2570     {
2571         fprintf(fplog, "  pme mesh/force %5.3f", dd_pme_f_ratio(dd));
2572     }
2573     fprintf(fplog, "\n\n");
2574 }
2575
2576 static void dd_print_load_verbose(gmx_domdec_t *dd)
2577 {
2578     if (isDlbOn(dd->comm))
2579     {
2580         fprintf(stderr, "vol %4.2f%c ",
2581                 dd_vol_min(dd), dd_load_flags(dd) ? '!' : ' ');
2582     }
2583     if (dd->nnodes > 1)
2584     {
2585         fprintf(stderr, "imb F %2d%% ", (int)(dd_f_imbal(dd)*100+0.5));
2586     }
2587     if (dd->comm->cycl_n[ddCyclPME])
2588     {
2589         fprintf(stderr, "pme/F %4.2f ", dd_pme_f_ratio(dd));
2590     }
2591 }
2592
2593 #if GMX_MPI
2594 static void make_load_communicator(gmx_domdec_t *dd, int dim_ind, ivec loc)
2595 {
2596     MPI_Comm           c_row;
2597     int                dim, i, rank;
2598     ivec               loc_c;
2599     domdec_root_t     *root;
2600     gmx_bool           bPartOfGroup = FALSE;
2601
2602     dim = dd->dim[dim_ind];
2603     copy_ivec(loc, loc_c);
2604     for (i = 0; i < dd->nc[dim]; i++)
2605     {
2606         loc_c[dim] = i;
2607         rank       = dd_index(dd->nc, loc_c);
2608         if (rank == dd->rank)
2609         {
2610             /* This process is part of the group */
2611             bPartOfGroup = TRUE;
2612         }
2613     }
2614     MPI_Comm_split(dd->mpi_comm_all, bPartOfGroup ? 0 : MPI_UNDEFINED, dd->rank,
2615                    &c_row);
2616     if (bPartOfGroup)
2617     {
2618         dd->comm->mpi_comm_load[dim_ind] = c_row;
2619         if (!isDlbDisabled(dd->comm))
2620         {
2621             if (dd->ci[dim] == dd->master_ci[dim])
2622             {
2623                 /* This is the root process of this row */
2624                 snew(dd->comm->root[dim_ind], 1);
2625                 root = dd->comm->root[dim_ind];
2626                 snew(root->cell_f, ddCellFractionBufferSize(dd, dim_ind));
2627                 snew(root->old_cell_f, dd->nc[dim]+1);
2628                 snew(root->bCellMin, dd->nc[dim]);
2629                 if (dim_ind > 0)
2630                 {
2631                     snew(root->cell_f_max0, dd->nc[dim]);
2632                     snew(root->cell_f_min1, dd->nc[dim]);
2633                     snew(root->bound_min, dd->nc[dim]);
2634                     snew(root->bound_max, dd->nc[dim]);
2635                 }
2636                 snew(root->buf_ncd, dd->nc[dim]);
2637             }
2638             else
2639             {
2640                 /* This is not a root process, we only need to receive cell_f */
2641                 snew(dd->comm->cell_f_row, ddCellFractionBufferSize(dd, dim_ind));
2642             }
2643         }
2644         if (dd->ci[dim] == dd->master_ci[dim])
2645         {
2646             snew(dd->comm->load[dim_ind].load, dd->nc[dim]*DD_NLOAD_MAX);
2647         }
2648     }
2649 }
2650 #endif
2651
2652 void dd_setup_dlb_resource_sharing(t_commrec            *cr,
2653                                    int                   gpu_id)
2654 {
2655 #if GMX_MPI
2656     int           physicalnode_id_hash;
2657     gmx_domdec_t *dd;
2658     MPI_Comm      mpi_comm_pp_physicalnode;
2659
2660     if (!thisRankHasDuty(cr, DUTY_PP) || gpu_id < 0)
2661     {
2662         /* Only ranks with short-ranged tasks (currently) use GPUs.
2663          * If we don't have GPUs assigned, there are no resources to share.
2664          */
2665         return;
2666     }
2667
2668     physicalnode_id_hash = gmx_physicalnode_id_hash();
2669
2670     dd = cr->dd;
2671
2672     if (debug)
2673     {
2674         fprintf(debug, "dd_setup_dd_dlb_gpu_sharing:\n");
2675         fprintf(debug, "DD PP rank %d physical node hash %d gpu_id %d\n",
2676                 dd->rank, physicalnode_id_hash, gpu_id);
2677     }
2678     /* Split the PP communicator over the physical nodes */
2679     /* TODO: See if we should store this (before), as it's also used for
2680      * for the nodecomm summation.
2681      */
2682     // TODO PhysicalNodeCommunicator could be extended/used to handle
2683     // the need for per-node per-group communicators.
2684     MPI_Comm_split(dd->mpi_comm_all, physicalnode_id_hash, dd->rank,
2685                    &mpi_comm_pp_physicalnode);
2686     MPI_Comm_split(mpi_comm_pp_physicalnode, gpu_id, dd->rank,
2687                    &dd->comm->mpi_comm_gpu_shared);
2688     MPI_Comm_free(&mpi_comm_pp_physicalnode);
2689     MPI_Comm_size(dd->comm->mpi_comm_gpu_shared, &dd->comm->nrank_gpu_shared);
2690
2691     if (debug)
2692     {
2693         fprintf(debug, "nrank_gpu_shared %d\n", dd->comm->nrank_gpu_shared);
2694     }
2695
2696     /* Note that some ranks could share a GPU, while others don't */
2697
2698     if (dd->comm->nrank_gpu_shared == 1)
2699     {
2700         MPI_Comm_free(&dd->comm->mpi_comm_gpu_shared);
2701     }
2702 #else
2703     GMX_UNUSED_VALUE(cr);
2704     GMX_UNUSED_VALUE(gpu_id);
2705 #endif
2706 }
2707
2708 static void make_load_communicators(gmx_domdec_t gmx_unused *dd)
2709 {
2710 #if GMX_MPI
2711     int  dim0, dim1, i, j;
2712     ivec loc;
2713
2714     if (debug)
2715     {
2716         fprintf(debug, "Making load communicators\n");
2717     }
2718
2719     snew(dd->comm->load,          std::max(dd->ndim, 1));
2720     snew(dd->comm->mpi_comm_load, std::max(dd->ndim, 1));
2721
2722     if (dd->ndim == 0)
2723     {
2724         return;
2725     }
2726
2727     clear_ivec(loc);
2728     make_load_communicator(dd, 0, loc);
2729     if (dd->ndim > 1)
2730     {
2731         dim0 = dd->dim[0];
2732         for (i = 0; i < dd->nc[dim0]; i++)
2733         {
2734             loc[dim0] = i;
2735             make_load_communicator(dd, 1, loc);
2736         }
2737     }
2738     if (dd->ndim > 2)
2739     {
2740         dim0 = dd->dim[0];
2741         for (i = 0; i < dd->nc[dim0]; i++)
2742         {
2743             loc[dim0] = i;
2744             dim1      = dd->dim[1];
2745             for (j = 0; j < dd->nc[dim1]; j++)
2746             {
2747                 loc[dim1] = j;
2748                 make_load_communicator(dd, 2, loc);
2749             }
2750         }
2751     }
2752
2753     if (debug)
2754     {
2755         fprintf(debug, "Finished making load communicators\n");
2756     }
2757 #endif
2758 }
2759
2760 /*! \brief Sets up the relation between neighboring domains and zones */
2761 static void setup_neighbor_relations(gmx_domdec_t *dd)
2762 {
2763     int                     d, dim, i, j, m;
2764     ivec                    tmp, s;
2765     gmx_domdec_zones_t     *zones;
2766     gmx_domdec_ns_ranges_t *izone;
2767
2768     for (d = 0; d < dd->ndim; d++)
2769     {
2770         dim = dd->dim[d];
2771         copy_ivec(dd->ci, tmp);
2772         tmp[dim]           = (tmp[dim] + 1) % dd->nc[dim];
2773         dd->neighbor[d][0] = ddcoord2ddnodeid(dd, tmp);
2774         copy_ivec(dd->ci, tmp);
2775         tmp[dim]           = (tmp[dim] - 1 + dd->nc[dim]) % dd->nc[dim];
2776         dd->neighbor[d][1] = ddcoord2ddnodeid(dd, tmp);
2777         if (debug)
2778         {
2779             fprintf(debug, "DD rank %d neighbor ranks in dir %d are + %d - %d\n",
2780                     dd->rank, dim,
2781                     dd->neighbor[d][0],
2782                     dd->neighbor[d][1]);
2783         }
2784     }
2785
2786     int nzone  = (1 << dd->ndim);
2787     int nizone = (1 << std::max(dd->ndim - 1, 0));
2788     assert(nizone >= 1 && nizone <= DD_MAXIZONE);
2789
2790     zones = &dd->comm->zones;
2791
2792     for (i = 0; i < nzone; i++)
2793     {
2794         m = 0;
2795         clear_ivec(zones->shift[i]);
2796         for (d = 0; d < dd->ndim; d++)
2797         {
2798             zones->shift[i][dd->dim[d]] = dd_zo[i][m++];
2799         }
2800     }
2801
2802     zones->n = nzone;
2803     for (i = 0; i < nzone; i++)
2804     {
2805         for (d = 0; d < DIM; d++)
2806         {
2807             s[d] = dd->ci[d] - zones->shift[i][d];
2808             if (s[d] < 0)
2809             {
2810                 s[d] += dd->nc[d];
2811             }
2812             else if (s[d] >= dd->nc[d])
2813             {
2814                 s[d] -= dd->nc[d];
2815             }
2816         }
2817     }
2818     zones->nizone = nizone;
2819     for (i = 0; i < zones->nizone; i++)
2820     {
2821         assert(ddNonbondedZonePairRanges[i][0] == i);
2822
2823         izone     = &zones->izone[i];
2824         /* dd_zp3 is for 3D decomposition, for fewer dimensions use only
2825          * j-zones up to nzone.
2826          */
2827         izone->j0 = std::min(ddNonbondedZonePairRanges[i][1], nzone);
2828         izone->j1 = std::min(ddNonbondedZonePairRanges[i][2], nzone);
2829         for (dim = 0; dim < DIM; dim++)
2830         {
2831             if (dd->nc[dim] == 1)
2832             {
2833                 /* All shifts should be allowed */
2834                 izone->shift0[dim] = -1;
2835                 izone->shift1[dim] = 1;
2836             }
2837             else
2838             {
2839                 /* Determine the min/max j-zone shift wrt the i-zone */
2840                 izone->shift0[dim] = 1;
2841                 izone->shift1[dim] = -1;
2842                 for (j = izone->j0; j < izone->j1; j++)
2843                 {
2844                     int shift_diff = zones->shift[j][dim] - zones->shift[i][dim];
2845                     if (shift_diff < izone->shift0[dim])
2846                     {
2847                         izone->shift0[dim] = shift_diff;
2848                     }
2849                     if (shift_diff > izone->shift1[dim])
2850                     {
2851                         izone->shift1[dim] = shift_diff;
2852                     }
2853                 }
2854             }
2855         }
2856     }
2857
2858     if (!isDlbDisabled(dd->comm))
2859     {
2860         snew(dd->comm->root, dd->ndim);
2861     }
2862
2863     if (dd->comm->bRecordLoad)
2864     {
2865         make_load_communicators(dd);
2866     }
2867 }
2868
2869 static void make_pp_communicator(FILE                 *fplog,
2870                                  gmx_domdec_t         *dd,
2871                                  t_commrec gmx_unused *cr,
2872                                  int gmx_unused        reorder)
2873 {
2874 #if GMX_MPI
2875     gmx_domdec_comm_t *comm;
2876     int                rank, *buf;
2877     ivec               periods;
2878     MPI_Comm           comm_cart;
2879
2880     comm = dd->comm;
2881
2882     if (comm->bCartesianPP)
2883     {
2884         /* Set up cartesian communication for the particle-particle part */
2885         if (fplog)
2886         {
2887             fprintf(fplog, "Will use a Cartesian communicator: %d x %d x %d\n",
2888                     dd->nc[XX], dd->nc[YY], dd->nc[ZZ]);
2889         }
2890
2891         for (int i = 0; i < DIM; i++)
2892         {
2893             periods[i] = TRUE;
2894         }
2895         MPI_Cart_create(cr->mpi_comm_mygroup, DIM, dd->nc, periods, reorder,
2896                         &comm_cart);
2897         /* We overwrite the old communicator with the new cartesian one */
2898         cr->mpi_comm_mygroup = comm_cart;
2899     }
2900
2901     dd->mpi_comm_all = cr->mpi_comm_mygroup;
2902     MPI_Comm_rank(dd->mpi_comm_all, &dd->rank);
2903
2904     if (comm->bCartesianPP_PME)
2905     {
2906         /* Since we want to use the original cartesian setup for sim,
2907          * and not the one after split, we need to make an index.
2908          */
2909         snew(comm->ddindex2ddnodeid, dd->nnodes);
2910         comm->ddindex2ddnodeid[dd_index(dd->nc, dd->ci)] = dd->rank;
2911         gmx_sumi(dd->nnodes, comm->ddindex2ddnodeid, cr);
2912         /* Get the rank of the DD master,
2913          * above we made sure that the master node is a PP node.
2914          */
2915         if (MASTER(cr))
2916         {
2917             rank = dd->rank;
2918         }
2919         else
2920         {
2921             rank = 0;
2922         }
2923         MPI_Allreduce(&rank, &dd->masterrank, 1, MPI_INT, MPI_SUM, dd->mpi_comm_all);
2924     }
2925     else if (comm->bCartesianPP)
2926     {
2927         if (cr->npmenodes == 0)
2928         {
2929             /* The PP communicator is also
2930              * the communicator for this simulation
2931              */
2932             cr->mpi_comm_mysim = cr->mpi_comm_mygroup;
2933         }
2934         cr->nodeid = dd->rank;
2935
2936         MPI_Cart_coords(dd->mpi_comm_all, dd->rank, DIM, dd->ci);
2937
2938         /* We need to make an index to go from the coordinates
2939          * to the nodeid of this simulation.
2940          */
2941         snew(comm->ddindex2simnodeid, dd->nnodes);
2942         snew(buf, dd->nnodes);
2943         if (thisRankHasDuty(cr, DUTY_PP))
2944         {
2945             buf[dd_index(dd->nc, dd->ci)] = cr->sim_nodeid;
2946         }
2947         /* Communicate the ddindex to simulation nodeid index */
2948         MPI_Allreduce(buf, comm->ddindex2simnodeid, dd->nnodes, MPI_INT, MPI_SUM,
2949                       cr->mpi_comm_mysim);
2950         sfree(buf);
2951
2952         /* Determine the master coordinates and rank.
2953          * The DD master should be the same node as the master of this sim.
2954          */
2955         for (int i = 0; i < dd->nnodes; i++)
2956         {
2957             if (comm->ddindex2simnodeid[i] == 0)
2958             {
2959                 ddindex2xyz(dd->nc, i, dd->master_ci);
2960                 MPI_Cart_rank(dd->mpi_comm_all, dd->master_ci, &dd->masterrank);
2961             }
2962         }
2963         if (debug)
2964         {
2965             fprintf(debug, "The master rank is %d\n", dd->masterrank);
2966         }
2967     }
2968     else
2969     {
2970         /* No Cartesian communicators */
2971         /* We use the rank in dd->comm->all as DD index */
2972         ddindex2xyz(dd->nc, dd->rank, dd->ci);
2973         /* The simulation master nodeid is 0, so the DD master rank is also 0 */
2974         dd->masterrank = 0;
2975         clear_ivec(dd->master_ci);
2976     }
2977 #endif
2978
2979     if (fplog)
2980     {
2981         fprintf(fplog,
2982                 "Domain decomposition rank %d, coordinates %d %d %d\n\n",
2983                 dd->rank, dd->ci[XX], dd->ci[YY], dd->ci[ZZ]);
2984     }
2985     if (debug)
2986     {
2987         fprintf(debug,
2988                 "Domain decomposition rank %d, coordinates %d %d %d\n\n",
2989                 dd->rank, dd->ci[XX], dd->ci[YY], dd->ci[ZZ]);
2990     }
2991 }
2992
2993 static void receive_ddindex2simnodeid(gmx_domdec_t         *dd,
2994                                       t_commrec            *cr)
2995 {
2996 #if GMX_MPI
2997     gmx_domdec_comm_t *comm = dd->comm;
2998
2999     if (!comm->bCartesianPP_PME && comm->bCartesianPP)
3000     {
3001         int *buf;
3002         snew(comm->ddindex2simnodeid, dd->nnodes);
3003         snew(buf, dd->nnodes);
3004         if (thisRankHasDuty(cr, DUTY_PP))
3005         {
3006             buf[dd_index(dd->nc, dd->ci)] = cr->sim_nodeid;
3007         }
3008         /* Communicate the ddindex to simulation nodeid index */
3009         MPI_Allreduce(buf, comm->ddindex2simnodeid, dd->nnodes, MPI_INT, MPI_SUM,
3010                       cr->mpi_comm_mysim);
3011         sfree(buf);
3012     }
3013 #else
3014     GMX_UNUSED_VALUE(dd);
3015     GMX_UNUSED_VALUE(cr);
3016 #endif
3017 }
3018
3019 static void split_communicator(FILE *fplog, t_commrec *cr, gmx_domdec_t *dd,
3020                                DdRankOrder gmx_unused rankOrder,
3021                                int gmx_unused reorder)
3022 {
3023     gmx_domdec_comm_t *comm;
3024     int                i;
3025     gmx_bool           bDiv[DIM];
3026 #if GMX_MPI
3027     MPI_Comm           comm_cart;
3028 #endif
3029
3030     comm = dd->comm;
3031
3032     if (comm->bCartesianPP)
3033     {
3034         for (i = 1; i < DIM; i++)
3035         {
3036             bDiv[i] = ((cr->npmenodes*dd->nc[i]) % (dd->nnodes) == 0);
3037         }
3038         if (bDiv[YY] || bDiv[ZZ])
3039         {
3040             comm->bCartesianPP_PME = TRUE;
3041             /* If we have 2D PME decomposition, which is always in x+y,
3042              * we stack the PME only nodes in z.
3043              * Otherwise we choose the direction that provides the thinnest slab
3044              * of PME only nodes as this will have the least effect
3045              * on the PP communication.
3046              * But for the PME communication the opposite might be better.
3047              */
3048             if (bDiv[ZZ] && (comm->npmenodes_y > 1 ||
3049                              !bDiv[YY] ||
3050                              dd->nc[YY] > dd->nc[ZZ]))
3051             {
3052                 comm->cartpmedim = ZZ;
3053             }
3054             else
3055             {
3056                 comm->cartpmedim = YY;
3057             }
3058             comm->ntot[comm->cartpmedim]
3059                 += (cr->npmenodes*dd->nc[comm->cartpmedim])/dd->nnodes;
3060         }
3061         else if (fplog)
3062         {
3063             fprintf(fplog, "Number of PME-only ranks (%d) is not a multiple of nx*ny (%d*%d) or nx*nz (%d*%d)\n", cr->npmenodes, dd->nc[XX], dd->nc[YY], dd->nc[XX], dd->nc[ZZ]);
3064             fprintf(fplog,
3065                     "Will not use a Cartesian communicator for PP <-> PME\n\n");
3066         }
3067     }
3068
3069     if (comm->bCartesianPP_PME)
3070     {
3071 #if GMX_MPI
3072         int  rank;
3073         ivec periods;
3074
3075         if (fplog)
3076         {
3077             fprintf(fplog, "Will use a Cartesian communicator for PP <-> PME: %d x %d x %d\n", comm->ntot[XX], comm->ntot[YY], comm->ntot[ZZ]);
3078         }
3079
3080         for (i = 0; i < DIM; i++)
3081         {
3082             periods[i] = TRUE;
3083         }
3084         MPI_Cart_create(cr->mpi_comm_mysim, DIM, comm->ntot, periods, reorder,
3085                         &comm_cart);
3086         MPI_Comm_rank(comm_cart, &rank);
3087         if (MASTER(cr) && rank != 0)
3088         {
3089             gmx_fatal(FARGS, "MPI rank 0 was renumbered by MPI_Cart_create, we do not allow this");
3090         }
3091
3092         /* With this assigment we loose the link to the original communicator
3093          * which will usually be MPI_COMM_WORLD, unless have multisim.
3094          */
3095         cr->mpi_comm_mysim = comm_cart;
3096         cr->sim_nodeid     = rank;
3097
3098         MPI_Cart_coords(cr->mpi_comm_mysim, cr->sim_nodeid, DIM, dd->ci);
3099
3100         if (fplog)
3101         {
3102             fprintf(fplog, "Cartesian rank %d, coordinates %d %d %d\n\n",
3103                     cr->sim_nodeid, dd->ci[XX], dd->ci[YY], dd->ci[ZZ]);
3104         }
3105
3106         if (dd->ci[comm->cartpmedim] < dd->nc[comm->cartpmedim])
3107         {
3108             cr->duty = DUTY_PP;
3109         }
3110         if (cr->npmenodes == 0 ||
3111             dd->ci[comm->cartpmedim] >= dd->nc[comm->cartpmedim])
3112         {
3113             cr->duty = DUTY_PME;
3114         }
3115
3116         /* Split the sim communicator into PP and PME only nodes */
3117         MPI_Comm_split(cr->mpi_comm_mysim,
3118                        getThisRankDuties(cr),
3119                        dd_index(comm->ntot, dd->ci),
3120                        &cr->mpi_comm_mygroup);
3121 #endif
3122     }
3123     else
3124     {
3125         switch (rankOrder)
3126         {
3127             case DdRankOrder::pp_pme:
3128                 if (fplog)
3129                 {
3130                     fprintf(fplog, "Order of the ranks: PP first, PME last\n");
3131                 }
3132                 break;
3133             case DdRankOrder::interleave:
3134                 /* Interleave the PP-only and PME-only ranks */
3135                 if (fplog)
3136                 {
3137                     fprintf(fplog, "Interleaving PP and PME ranks\n");
3138                 }
3139                 comm->pmenodes = dd_interleaved_pme_ranks(dd);
3140                 break;
3141             case DdRankOrder::cartesian:
3142                 break;
3143             default:
3144                 gmx_fatal(FARGS, "Invalid ddRankOrder=%d", static_cast<int>(rankOrder));
3145         }
3146
3147         if (dd_simnode2pmenode(dd, cr, cr->sim_nodeid) == -1)
3148         {
3149             cr->duty = DUTY_PME;
3150         }
3151         else
3152         {
3153             cr->duty = DUTY_PP;
3154         }
3155 #if GMX_MPI
3156         /* Split the sim communicator into PP and PME only nodes */
3157         MPI_Comm_split(cr->mpi_comm_mysim,
3158                        getThisRankDuties(cr),
3159                        cr->nodeid,
3160                        &cr->mpi_comm_mygroup);
3161         MPI_Comm_rank(cr->mpi_comm_mygroup, &cr->nodeid);
3162 #endif
3163     }
3164
3165     if (fplog)
3166     {
3167         fprintf(fplog, "This rank does only %s work.\n\n",
3168                 thisRankHasDuty(cr, DUTY_PP) ? "particle-particle" : "PME-mesh");
3169     }
3170 }
3171
3172 /*! \brief Generates the MPI communicators for domain decomposition */
3173 static void make_dd_communicators(FILE *fplog, t_commrec *cr,
3174                                   gmx_domdec_t *dd, DdRankOrder ddRankOrder)
3175 {
3176     gmx_domdec_comm_t *comm;
3177     int                CartReorder;
3178
3179     comm = dd->comm;
3180
3181     copy_ivec(dd->nc, comm->ntot);
3182
3183     comm->bCartesianPP     = (ddRankOrder == DdRankOrder::cartesian);
3184     comm->bCartesianPP_PME = FALSE;
3185
3186     /* Reorder the nodes by default. This might change the MPI ranks.
3187      * Real reordering is only supported on very few architectures,
3188      * Blue Gene is one of them.
3189      */
3190     CartReorder = (getenv("GMX_NO_CART_REORDER") == nullptr);
3191
3192     if (cr->npmenodes > 0)
3193     {
3194         /* Split the communicator into a PP and PME part */
3195         split_communicator(fplog, cr, dd, ddRankOrder, CartReorder);
3196         if (comm->bCartesianPP_PME)
3197         {
3198             /* We (possibly) reordered the nodes in split_communicator,
3199              * so it is no longer required in make_pp_communicator.
3200              */
3201             CartReorder = FALSE;
3202         }
3203     }
3204     else
3205     {
3206         /* All nodes do PP and PME */
3207 #if GMX_MPI
3208         /* We do not require separate communicators */
3209         cr->mpi_comm_mygroup = cr->mpi_comm_mysim;
3210 #endif
3211     }
3212
3213     if (thisRankHasDuty(cr, DUTY_PP))
3214     {
3215         /* Copy or make a new PP communicator */
3216         make_pp_communicator(fplog, dd, cr, CartReorder);
3217     }
3218     else
3219     {
3220         receive_ddindex2simnodeid(dd, cr);
3221     }
3222
3223     if (!thisRankHasDuty(cr, DUTY_PME))
3224     {
3225         /* Set up the commnuication to our PME node */
3226         dd->pme_nodeid           = dd_simnode2pmenode(dd, cr, cr->sim_nodeid);
3227         dd->pme_receive_vir_ener = receive_vir_ener(dd, cr);
3228         if (debug)
3229         {
3230             fprintf(debug, "My pme_nodeid %d receive ener %d\n",
3231                     dd->pme_nodeid, dd->pme_receive_vir_ener);
3232         }
3233     }
3234     else
3235     {
3236         dd->pme_nodeid = -1;
3237     }
3238
3239     if (DDMASTER(dd))
3240     {
3241         dd->ma = gmx::compat::make_unique<AtomDistribution>(dd->nc,
3242                                                             comm->cgs_gl.nr,
3243                                                             comm->cgs_gl.index[comm->cgs_gl.nr]);
3244     }
3245 }
3246
3247 static real *get_slb_frac(FILE *fplog, const char *dir, int nc, const char *size_string)
3248 {
3249     real  *slb_frac, tot;
3250     int    i, n;
3251     double dbl;
3252
3253     slb_frac = nullptr;
3254     if (nc > 1 && size_string != nullptr)
3255     {
3256         if (fplog)
3257         {
3258             fprintf(fplog, "Using static load balancing for the %s direction\n",
3259                     dir);
3260         }
3261         snew(slb_frac, nc);
3262         tot = 0;
3263         for (i = 0; i < nc; i++)
3264         {
3265             dbl = 0;
3266             sscanf(size_string, "%20lf%n", &dbl, &n);
3267             if (dbl == 0)
3268             {
3269                 gmx_fatal(FARGS, "Incorrect or not enough DD cell size entries for direction %s: '%s'", dir, size_string);
3270             }
3271             slb_frac[i]  = dbl;
3272             size_string += n;
3273             tot         += slb_frac[i];
3274         }
3275         /* Normalize */
3276         if (fplog)
3277         {
3278             fprintf(fplog, "Relative cell sizes:");
3279         }
3280         for (i = 0; i < nc; i++)
3281         {
3282             slb_frac[i] /= tot;
3283             if (fplog)
3284             {
3285                 fprintf(fplog, " %5.3f", slb_frac[i]);
3286             }
3287         }
3288         if (fplog)
3289         {
3290             fprintf(fplog, "\n");
3291         }
3292     }
3293
3294     return slb_frac;
3295 }
3296
3297 static int multi_body_bondeds_count(const gmx_mtop_t *mtop)
3298 {
3299     int                  n, nmol, ftype;
3300     gmx_mtop_ilistloop_t iloop;
3301     const t_ilist       *il;
3302
3303     n     = 0;
3304     iloop = gmx_mtop_ilistloop_init(mtop);
3305     while (gmx_mtop_ilistloop_next(iloop, &il, &nmol))
3306     {
3307         for (ftype = 0; ftype < F_NRE; ftype++)
3308         {
3309             if ((interaction_function[ftype].flags & IF_BOND) &&
3310                 NRAL(ftype) >  2)
3311             {
3312                 n += nmol*il[ftype].nr/(1 + NRAL(ftype));
3313             }
3314         }
3315     }
3316
3317     return n;
3318 }
3319
3320 static int dd_getenv(FILE *fplog, const char *env_var, int def)
3321 {
3322     char *val;
3323     int   nst;
3324
3325     nst = def;
3326     val = getenv(env_var);
3327     if (val)
3328     {
3329         if (sscanf(val, "%20d", &nst) <= 0)
3330         {
3331             nst = 1;
3332         }
3333         if (fplog)
3334         {
3335             fprintf(fplog, "Found env.var. %s = %s, using value %d\n",
3336                     env_var, val, nst);
3337         }
3338     }
3339
3340     return nst;
3341 }
3342
3343 static void dd_warning(const t_commrec *cr, FILE *fplog, const char *warn_string)
3344 {
3345     if (MASTER(cr))
3346     {
3347         fprintf(stderr, "\n%s\n", warn_string);
3348     }
3349     if (fplog)
3350     {
3351         fprintf(fplog, "\n%s\n", warn_string);
3352     }
3353 }
3354
3355 static void check_dd_restrictions(t_commrec *cr, const gmx_domdec_t *dd,
3356                                   const t_inputrec *ir, FILE *fplog)
3357 {
3358     if (ir->ePBC == epbcSCREW &&
3359         (dd->nc[XX] == 1 || dd->nc[YY] > 1 || dd->nc[ZZ] > 1))
3360     {
3361         gmx_fatal(FARGS, "With pbc=%s can only do domain decomposition in the x-direction", epbc_names[ir->ePBC]);
3362     }
3363
3364     if (ir->ns_type == ensSIMPLE)
3365     {
3366         gmx_fatal(FARGS, "Domain decomposition does not support simple neighbor searching, use grid searching or run with one MPI rank");
3367     }
3368
3369     if (ir->nstlist == 0)
3370     {
3371         gmx_fatal(FARGS, "Domain decomposition does not work with nstlist=0");
3372     }
3373
3374     if (ir->comm_mode == ecmANGULAR && ir->ePBC != epbcNONE)
3375     {
3376         dd_warning(cr, fplog, "comm-mode angular will give incorrect results when the comm group partially crosses a periodic boundary");
3377     }
3378 }
3379
3380 static real average_cellsize_min(gmx_domdec_t *dd, gmx_ddbox_t *ddbox)
3381 {
3382     int  di, d;
3383     real r;
3384
3385     r = ddbox->box_size[XX];
3386     for (di = 0; di < dd->ndim; di++)
3387     {
3388         d = dd->dim[di];
3389         /* Check using the initial average cell size */
3390         r = std::min(r, ddbox->box_size[d]*ddbox->skew_fac[d]/dd->nc[d]);
3391     }
3392
3393     return r;
3394 }
3395
3396 /*! \brief Depending on the DLB initial value return the DLB switched off state or issue an error.
3397  */
3398 static int forceDlbOffOrBail(int                cmdlineDlbState,
3399                              const std::string &reasonStr,
3400                              t_commrec         *cr,
3401                              FILE              *fplog)
3402 {
3403     std::string dlbNotSupportedErr  = "Dynamic load balancing requested, but ";
3404     std::string dlbDisableNote      = "NOTE: disabling dynamic load balancing as ";
3405
3406     if (cmdlineDlbState == edlbsOnUser)
3407     {
3408         gmx_fatal(FARGS, (dlbNotSupportedErr + reasonStr).c_str());
3409     }
3410     else if (cmdlineDlbState == edlbsOffCanTurnOn)
3411     {
3412         dd_warning(cr, fplog, (dlbDisableNote + reasonStr + "\n").c_str());
3413     }
3414     return edlbsOffForever;
3415 }
3416
3417 /*! \brief Return the dynamic load balancer's initial state based on initial conditions and user inputs.
3418  *
3419  * This function parses the parameters of "-dlb" command line option setting
3420  * corresponding state values. Then it checks the consistency of the determined
3421  * state with other run parameters and settings. As a result, the initial state
3422  * may be altered or an error may be thrown if incompatibility of options is detected.
3423  *
3424  * \param [in] fplog       Pointer to mdrun log file.
3425  * \param [in] cr          Pointer to MPI communication object.
3426  * \param [in] dlbOption   Enum value for the DLB option.
3427  * \param [in] bRecordLoad True if the load balancer is recording load information.
3428  * \param [in] mdrunOptions  Options for mdrun.
3429  * \param [in] ir          Pointer mdrun to input parameters.
3430  * \returns                DLB initial/startup state.
3431  */
3432 static int determineInitialDlbState(FILE *fplog, t_commrec *cr,
3433                                     DlbOption dlbOption, gmx_bool bRecordLoad,
3434                                     const MdrunOptions &mdrunOptions,
3435                                     const t_inputrec *ir)
3436 {
3437     int dlbState = edlbsOffCanTurnOn;
3438
3439     switch (dlbOption)
3440     {
3441         case DlbOption::turnOnWhenUseful: dlbState = edlbsOffCanTurnOn; break;
3442         case DlbOption::no:               dlbState = edlbsOffUser;      break;
3443         case DlbOption::yes:              dlbState = edlbsOnUser;       break;
3444         default: gmx_incons("Invalid dlbOption enum value");
3445     }
3446
3447     /* Reruns don't support DLB: bail or override auto mode */
3448     if (mdrunOptions.rerun)
3449     {
3450         std::string reasonStr = "it is not supported in reruns.";
3451         return forceDlbOffOrBail(dlbState, reasonStr, cr, fplog);
3452     }
3453
3454     /* Unsupported integrators */
3455     if (!EI_DYNAMICS(ir->eI))
3456     {
3457         auto reasonStr = gmx::formatString("it is only supported with dynamics, not with integrator '%s'.", EI(ir->eI));
3458         return forceDlbOffOrBail(dlbState, reasonStr, cr, fplog);
3459     }
3460
3461     /* Without cycle counters we can't time work to balance on */
3462     if (!bRecordLoad)
3463     {
3464         std::string reasonStr = "cycle counters unsupported or not enabled in the operating system kernel.";
3465         return forceDlbOffOrBail(dlbState, reasonStr, cr, fplog);
3466     }
3467
3468     if (mdrunOptions.reproducible)
3469     {
3470         std::string reasonStr = "you started a reproducible run.";
3471         switch (dlbState)
3472         {
3473             case edlbsOffUser:
3474                 break;
3475             case edlbsOffForever:
3476                 GMX_RELEASE_ASSERT(false, "edlbsOffForever is not a valid initial state");
3477                 break;
3478             case edlbsOffCanTurnOn:
3479                 return forceDlbOffOrBail(dlbState, reasonStr, cr, fplog);
3480                 break;
3481             case edlbsOnCanTurnOff:
3482                 GMX_RELEASE_ASSERT(false, "edlbsOffCanTurnOff is not a valid initial state");
3483                 break;
3484             case edlbsOnUser:
3485                 return forceDlbOffOrBail(dlbState, reasonStr + " In load balanced runs binary reproducibility cannot be ensured.", cr, fplog);
3486                 break;
3487             default:
3488                 gmx_fatal(FARGS, "Death horror: undefined case (%d) for load balancing choice", dlbState);
3489                 break;
3490         }
3491     }
3492
3493     return dlbState;
3494 }
3495
3496 static void set_dd_dim(FILE *fplog, gmx_domdec_t *dd)
3497 {
3498     dd->ndim = 0;
3499     if (getenv("GMX_DD_ORDER_ZYX") != nullptr)
3500     {
3501         /* Decomposition order z,y,x */
3502         if (fplog)
3503         {
3504             fprintf(fplog, "Using domain decomposition order z, y, x\n");
3505         }
3506         for (int dim = DIM-1; dim >= 0; dim--)
3507         {
3508             if (dd->nc[dim] > 1)
3509             {
3510                 dd->dim[dd->ndim++] = dim;
3511             }
3512         }
3513     }
3514     else
3515     {
3516         /* Decomposition order x,y,z */
3517         for (int dim = 0; dim < DIM; dim++)
3518         {
3519             if (dd->nc[dim] > 1)
3520             {
3521                 dd->dim[dd->ndim++] = dim;
3522             }
3523         }
3524     }
3525
3526     if (dd->ndim == 0)
3527     {
3528         /* Set dim[0] to avoid extra checks on ndim in several places */
3529         dd->dim[0] = XX;
3530     }
3531 }
3532
3533 static gmx_domdec_comm_t *init_dd_comm()
3534 {
3535     gmx_domdec_comm_t *comm = new gmx_domdec_comm_t;
3536
3537     comm->n_load_have    = 0;
3538     comm->n_load_collect = 0;
3539
3540     for (int i = 0; i < static_cast<int>(DDAtomRanges::Type::Number); i++)
3541     {
3542         comm->sum_nat[i] = 0;
3543     }
3544     comm->ndecomp   = 0;
3545     comm->nload     = 0;
3546     comm->load_step = 0;
3547     comm->load_sum  = 0;
3548     comm->load_max  = 0;
3549     clear_ivec(comm->load_lim);
3550     comm->load_mdf  = 0;
3551     comm->load_pme  = 0;
3552
3553     /* This should be replaced by a unique pointer */
3554     comm->balanceRegion = ddBalanceRegionAllocate();
3555
3556     return comm;
3557 }
3558
3559 /*! \brief Set the cell size and interaction limits, as well as the DD grid */
3560 static void set_dd_limits_and_grid(FILE *fplog, t_commrec *cr, gmx_domdec_t *dd,
3561                                    const DomdecOptions &options,
3562                                    const MdrunOptions &mdrunOptions,
3563                                    const gmx_mtop_t *mtop,
3564                                    const t_inputrec *ir,
3565                                    const matrix box,
3566                                    gmx::ArrayRef<const gmx::RVec> xGlobal,
3567                                    gmx_ddbox_t *ddbox)
3568 {
3569     real               r_bonded         = -1;
3570     real               r_bonded_limit   = -1;
3571     const real         tenPercentMargin = 1.1;
3572     gmx_domdec_comm_t *comm             = dd->comm;
3573
3574     dd->npbcdim   = ePBC2npbcdim(ir->ePBC);
3575     dd->bScrewPBC = (ir->ePBC == epbcSCREW);
3576
3577     dd->pme_recv_f_alloc = 0;
3578     dd->pme_recv_f_buf   = nullptr;
3579
3580     /* Initialize to GPU share count to 0, might change later */
3581     comm->nrank_gpu_shared = 0;
3582
3583     comm->dlbState         = determineInitialDlbState(fplog, cr, options.dlbOption, comm->bRecordLoad, mdrunOptions, ir);
3584     dd_dlb_set_should_check_whether_to_turn_dlb_on(dd, TRUE);
3585     /* To consider turning DLB on after 2*nstlist steps we need to check
3586      * at partitioning count 3. Thus we need to increase the first count by 2.
3587      */
3588     comm->ddPartioningCountFirstDlbOff += 2;
3589
3590     if (fplog)
3591     {
3592         fprintf(fplog, "Dynamic load balancing: %s\n",
3593                 edlbs_names[comm->dlbState]);
3594     }
3595     comm->bPMELoadBalDLBLimits = FALSE;
3596
3597     /* Allocate the charge group/atom sorting struct */
3598     snew(comm->sort, 1);
3599
3600     comm->bCGs = (ncg_mtop(mtop) < mtop->natoms);
3601
3602     comm->bInterCGBondeds = ((ncg_mtop(mtop) > gmx_mtop_num_molecules(*mtop)) ||
3603                              mtop->bIntermolecularInteractions);
3604     if (comm->bInterCGBondeds)
3605     {
3606         comm->bInterCGMultiBody = (multi_body_bondeds_count(mtop) > 0);
3607     }
3608     else
3609     {
3610         comm->bInterCGMultiBody = FALSE;
3611     }
3612
3613     dd->bInterCGcons    = inter_charge_group_constraints(*mtop);
3614     dd->bInterCGsettles = inter_charge_group_settles(*mtop);
3615
3616     if (ir->rlist == 0)
3617     {
3618         /* Set the cut-off to some very large value,
3619          * so we don't need if statements everywhere in the code.
3620          * We use sqrt, since the cut-off is squared in some places.
3621          */
3622         comm->cutoff   = GMX_CUTOFF_INF;
3623     }
3624     else
3625     {
3626         comm->cutoff   = ir->rlist;
3627     }
3628     comm->cutoff_mbody = 0;
3629
3630     comm->cellsize_limit = 0;
3631     comm->bBondComm      = FALSE;
3632
3633     /* Atoms should be able to move by up to half the list buffer size (if > 0)
3634      * within nstlist steps. Since boundaries are allowed to displace by half
3635      * a cell size, DD cells should be at least the size of the list buffer.
3636      */
3637     comm->cellsize_limit = std::max(comm->cellsize_limit,
3638                                     ir->rlist - std::max(ir->rvdw, ir->rcoulomb));
3639
3640     if (comm->bInterCGBondeds)
3641     {
3642         if (options.minimumCommunicationRange > 0)
3643         {
3644             comm->cutoff_mbody = options.minimumCommunicationRange;
3645             if (options.useBondedCommunication)
3646             {
3647                 comm->bBondComm = (comm->cutoff_mbody > comm->cutoff);
3648             }
3649             else
3650             {
3651                 comm->cutoff = std::max(comm->cutoff, comm->cutoff_mbody);
3652             }
3653             r_bonded_limit = comm->cutoff_mbody;
3654         }
3655         else if (ir->bPeriodicMols)
3656         {
3657             /* Can not easily determine the required cut-off */
3658             dd_warning(cr, fplog, "NOTE: Periodic molecules are present in this system. Because of this, the domain decomposition algorithm cannot easily determine the minimum cell size that it requires for treating bonded interactions. Instead, domain decomposition will assume that half the non-bonded cut-off will be a suitable lower bound.\n");
3659             comm->cutoff_mbody = comm->cutoff/2;
3660             r_bonded_limit     = comm->cutoff_mbody;
3661         }
3662         else
3663         {
3664             real r_2b, r_mb;
3665
3666             if (MASTER(cr))
3667             {
3668                 dd_bonded_cg_distance(fplog, mtop, ir, as_rvec_array(xGlobal.data()), box,
3669                                       options.checkBondedInteractions,
3670                                       &r_2b, &r_mb);
3671             }
3672             gmx_bcast(sizeof(r_2b), &r_2b, cr);
3673             gmx_bcast(sizeof(r_mb), &r_mb, cr);
3674
3675             /* We use an initial margin of 10% for the minimum cell size,
3676              * except when we are just below the non-bonded cut-off.
3677              */
3678             if (options.useBondedCommunication)
3679             {
3680                 if (std::max(r_2b, r_mb) > comm->cutoff)
3681                 {
3682                     r_bonded        = std::max(r_2b, r_mb);
3683                     r_bonded_limit  = tenPercentMargin*r_bonded;
3684                     comm->bBondComm = TRUE;
3685                 }
3686                 else
3687                 {
3688                     r_bonded       = r_mb;
3689                     r_bonded_limit = std::min(tenPercentMargin*r_bonded, comm->cutoff);
3690                 }
3691                 /* We determine cutoff_mbody later */
3692             }
3693             else
3694             {
3695                 /* No special bonded communication,
3696                  * simply increase the DD cut-off.
3697                  */
3698                 r_bonded_limit     = tenPercentMargin*std::max(r_2b, r_mb);
3699                 comm->cutoff_mbody = r_bonded_limit;
3700                 comm->cutoff       = std::max(comm->cutoff, comm->cutoff_mbody);
3701             }
3702         }
3703         if (fplog)
3704         {
3705             fprintf(fplog,
3706                     "Minimum cell size due to bonded interactions: %.3f nm\n",
3707                     r_bonded_limit);
3708         }
3709         comm->cellsize_limit = std::max(comm->cellsize_limit, r_bonded_limit);
3710     }
3711
3712     real rconstr = 0;
3713     if (dd->bInterCGcons && options.constraintCommunicationRange <= 0)
3714     {
3715         /* There is a cell size limit due to the constraints (P-LINCS) */
3716         rconstr = constr_r_max(fplog, mtop, ir);
3717         if (fplog)
3718         {
3719             fprintf(fplog,
3720                     "Estimated maximum distance required for P-LINCS: %.3f nm\n",
3721                     rconstr);
3722             if (rconstr > comm->cellsize_limit)
3723             {
3724                 fprintf(fplog, "This distance will limit the DD cell size, you can override this with -rcon\n");
3725             }
3726         }
3727     }
3728     else if (options.constraintCommunicationRange > 0 && fplog)
3729     {
3730         /* Here we do not check for dd->bInterCGcons,
3731          * because one can also set a cell size limit for virtual sites only
3732          * and at this point we don't know yet if there are intercg v-sites.
3733          */
3734         fprintf(fplog,
3735                 "User supplied maximum distance required for P-LINCS: %.3f nm\n",
3736                 options.constraintCommunicationRange);
3737         rconstr = options.constraintCommunicationRange;
3738     }
3739     comm->cellsize_limit = std::max(comm->cellsize_limit, rconstr);
3740
3741     comm->cgs_gl = gmx_mtop_global_cgs(mtop);
3742
3743     if (options.numCells[XX] > 0)
3744     {
3745         copy_ivec(options.numCells, dd->nc);
3746         set_dd_dim(fplog, dd);
3747         set_ddbox_cr(cr, &dd->nc, ir, box, xGlobal, ddbox);
3748
3749         if (options.numPmeRanks >= 0)
3750         {
3751             cr->npmenodes = options.numPmeRanks;
3752         }
3753         else
3754         {
3755             /* When the DD grid is set explicitly and -npme is set to auto,
3756              * don't use PME ranks. We check later if the DD grid is
3757              * compatible with the total number of ranks.
3758              */
3759             cr->npmenodes = 0;
3760         }
3761
3762         real acs = average_cellsize_min(dd, ddbox);
3763         if (acs < comm->cellsize_limit)
3764         {
3765             if (fplog)
3766             {
3767                 fprintf(fplog, "ERROR: The initial cell size (%f) is smaller than the cell size limit (%f)\n", acs, comm->cellsize_limit);
3768             }
3769             gmx_fatal_collective(FARGS, cr->mpi_comm_mysim, MASTER(cr),
3770                                  "The initial cell size (%f) is smaller than the cell size limit (%f), change options -dd, -rdd or -rcon, see the log file for details",
3771                                  acs, comm->cellsize_limit);
3772         }
3773     }
3774     else
3775     {
3776         set_ddbox_cr(cr, nullptr, ir, box, xGlobal, ddbox);
3777
3778         /* We need to choose the optimal DD grid and possibly PME nodes */
3779         real limit =
3780             dd_choose_grid(fplog, cr, dd, ir, mtop, box, ddbox,
3781                            options.numPmeRanks,
3782                            !isDlbDisabled(comm),
3783                            options.dlbScaling,
3784                            comm->cellsize_limit, comm->cutoff,
3785                            comm->bInterCGBondeds);
3786
3787         if (dd->nc[XX] == 0)
3788         {
3789             char     buf[STRLEN];
3790             gmx_bool bC = (dd->bInterCGcons && rconstr > r_bonded_limit);
3791             sprintf(buf, "Change the number of ranks or mdrun option %s%s%s",
3792                     !bC ? "-rdd" : "-rcon",
3793                     comm->dlbState != edlbsOffUser ? " or -dds" : "",
3794                     bC ? " or your LINCS settings" : "");
3795
3796             gmx_fatal_collective(FARGS, cr->mpi_comm_mysim, MASTER(cr),
3797                                  "There is no domain decomposition for %d ranks that is compatible with the given box and a minimum cell size of %g nm\n"
3798                                  "%s\n"
3799                                  "Look in the log file for details on the domain decomposition",
3800                                  cr->nnodes-cr->npmenodes, limit, buf);
3801         }
3802         set_dd_dim(fplog, dd);
3803     }
3804
3805     if (fplog)
3806     {
3807         fprintf(fplog,
3808                 "Domain decomposition grid %d x %d x %d, separate PME ranks %d\n",
3809                 dd->nc[XX], dd->nc[YY], dd->nc[ZZ], cr->npmenodes);
3810     }
3811
3812     dd->nnodes = dd->nc[XX]*dd->nc[YY]*dd->nc[ZZ];
3813     if (cr->nnodes - dd->nnodes != cr->npmenodes)
3814     {
3815         gmx_fatal_collective(FARGS, cr->mpi_comm_mysim, MASTER(cr),
3816                              "The size of the domain decomposition grid (%d) does not match the number of ranks (%d). The total number of ranks is %d",
3817                              dd->nnodes, cr->nnodes - cr->npmenodes, cr->nnodes);
3818     }
3819     if (cr->npmenodes > dd->nnodes)
3820     {
3821         gmx_fatal_collective(FARGS, cr->mpi_comm_mysim, MASTER(cr),
3822                              "The number of separate PME ranks (%d) is larger than the number of PP ranks (%d), this is not supported.", cr->npmenodes, dd->nnodes);
3823     }
3824     if (cr->npmenodes > 0)
3825     {
3826         comm->npmenodes = cr->npmenodes;
3827     }
3828     else
3829     {
3830         comm->npmenodes = dd->nnodes;
3831     }
3832
3833     if (EEL_PME(ir->coulombtype) || EVDW_PME(ir->vdwtype))
3834     {
3835         /* The following choices should match those
3836          * in comm_cost_est in domdec_setup.c.
3837          * Note that here the checks have to take into account
3838          * that the decomposition might occur in a different order than xyz
3839          * (for instance through the env.var. GMX_DD_ORDER_ZYX),
3840          * in which case they will not match those in comm_cost_est,
3841          * but since that is mainly for testing purposes that's fine.
3842          */
3843         if (dd->ndim >= 2 && dd->dim[0] == XX && dd->dim[1] == YY &&
3844             comm->npmenodes > dd->nc[XX] && comm->npmenodes % dd->nc[XX] == 0 &&
3845             getenv("GMX_PMEONEDD") == nullptr)
3846         {
3847             comm->npmedecompdim = 2;
3848             comm->npmenodes_x   = dd->nc[XX];
3849             comm->npmenodes_y   = comm->npmenodes/comm->npmenodes_x;
3850         }
3851         else
3852         {
3853             /* In case nc is 1 in both x and y we could still choose to
3854              * decompose pme in y instead of x, but we use x for simplicity.
3855              */
3856             comm->npmedecompdim = 1;
3857             if (dd->dim[0] == YY)
3858             {
3859                 comm->npmenodes_x = 1;
3860                 comm->npmenodes_y = comm->npmenodes;
3861             }
3862             else
3863             {
3864                 comm->npmenodes_x = comm->npmenodes;
3865                 comm->npmenodes_y = 1;
3866             }
3867         }
3868         if (fplog)
3869         {
3870             fprintf(fplog, "PME domain decomposition: %d x %d x %d\n",
3871                     comm->npmenodes_x, comm->npmenodes_y, 1);
3872         }
3873     }
3874     else
3875     {
3876         comm->npmedecompdim = 0;
3877         comm->npmenodes_x   = 0;
3878         comm->npmenodes_y   = 0;
3879     }
3880
3881     snew(comm->slb_frac, DIM);
3882     if (isDlbDisabled(comm))
3883     {
3884         comm->slb_frac[XX] = get_slb_frac(fplog, "x", dd->nc[XX], options.cellSizeX);
3885         comm->slb_frac[YY] = get_slb_frac(fplog, "y", dd->nc[YY], options.cellSizeY);
3886         comm->slb_frac[ZZ] = get_slb_frac(fplog, "z", dd->nc[ZZ], options.cellSizeZ);
3887     }
3888
3889     if (comm->bInterCGBondeds && comm->cutoff_mbody == 0)
3890     {
3891         if (comm->bBondComm || !isDlbDisabled(comm))
3892         {
3893             /* Set the bonded communication distance to halfway
3894              * the minimum and the maximum,
3895              * since the extra communication cost is nearly zero.
3896              */
3897             real acs           = average_cellsize_min(dd, ddbox);
3898             comm->cutoff_mbody = 0.5*(r_bonded + acs);
3899             if (!isDlbDisabled(comm))
3900             {
3901                 /* Check if this does not limit the scaling */
3902                 comm->cutoff_mbody = std::min(comm->cutoff_mbody,
3903                                               options.dlbScaling*acs);
3904             }
3905             if (!comm->bBondComm)
3906             {
3907                 /* Without bBondComm do not go beyond the n.b. cut-off */
3908                 comm->cutoff_mbody = std::min(comm->cutoff_mbody, comm->cutoff);
3909                 if (comm->cellsize_limit >= comm->cutoff)
3910                 {
3911                     /* We don't loose a lot of efficieny
3912                      * when increasing it to the n.b. cut-off.
3913                      * It can even be slightly faster, because we need
3914                      * less checks for the communication setup.
3915                      */
3916                     comm->cutoff_mbody = comm->cutoff;
3917                 }
3918             }
3919             /* Check if we did not end up below our original limit */
3920             comm->cutoff_mbody = std::max(comm->cutoff_mbody, r_bonded_limit);
3921
3922             if (comm->cutoff_mbody > comm->cellsize_limit)
3923             {
3924                 comm->cellsize_limit = comm->cutoff_mbody;
3925             }
3926         }
3927         /* Without DLB and cutoff_mbody<cutoff, cutoff_mbody is dynamic */
3928     }
3929
3930     if (debug)
3931     {
3932         fprintf(debug, "Bonded atom communication beyond the cut-off: %d\n"
3933                 "cellsize limit %f\n",
3934                 comm->bBondComm, comm->cellsize_limit);
3935     }
3936
3937     if (MASTER(cr))
3938     {
3939         check_dd_restrictions(cr, dd, ir, fplog);
3940     }
3941 }
3942
3943 static void set_dlb_limits(gmx_domdec_t *dd)
3944
3945 {
3946     for (int d = 0; d < dd->ndim; d++)
3947     {
3948         /* Set the number of pulses to the value for DLB */
3949         dd->comm->cd[d].ind.resize(dd->comm->cd[d].np_dlb);
3950
3951         dd->comm->cellsize_min[dd->dim[d]] =
3952             dd->comm->cellsize_min_dlb[dd->dim[d]];
3953     }
3954 }
3955
3956
3957 static void turn_on_dlb(FILE *fplog, const t_commrec *cr, gmx_int64_t step)
3958 {
3959     gmx_domdec_t      *dd;
3960     gmx_domdec_comm_t *comm;
3961     real               cellsize_min;
3962     int                d, nc, i;
3963
3964     dd   = cr->dd;
3965     comm = dd->comm;
3966
3967     cellsize_min = comm->cellsize_min[dd->dim[0]];
3968     for (d = 1; d < dd->ndim; d++)
3969     {
3970         cellsize_min = std::min(cellsize_min, comm->cellsize_min[dd->dim[d]]);
3971     }
3972
3973     /* Turn off DLB if we're too close to the cell size limit. */
3974     if (cellsize_min < comm->cellsize_limit*1.05)
3975     {
3976         auto str = gmx::formatString("step %" GMX_PRId64 " Measured %.1f %% performance loss due to load imbalance, "
3977                                      "but the minimum cell size is smaller than 1.05 times the cell size limit."
3978                                      "Will no longer try dynamic load balancing.\n", step, dd_force_imb_perf_loss(dd)*100);
3979         dd_warning(cr, fplog, str.c_str());
3980
3981         comm->dlbState = edlbsOffForever;
3982         return;
3983     }
3984
3985     char buf[STRLEN];
3986     sprintf(buf, "step %" GMX_PRId64 " Turning on dynamic load balancing, because the performance loss due to load imbalance is %.1f %%.\n", step, dd_force_imb_perf_loss(dd)*100);
3987     dd_warning(cr, fplog, buf);
3988     comm->dlbState = edlbsOnCanTurnOff;
3989
3990     /* Store the non-DLB performance, so we can check if DLB actually
3991      * improves performance.
3992      */
3993     GMX_RELEASE_ASSERT(comm->cycl_n[ddCyclStep] > 0, "When we turned on DLB, we should have measured cycles");
3994     comm->cyclesPerStepBeforeDLB = comm->cycl[ddCyclStep]/comm->cycl_n[ddCyclStep];
3995
3996     set_dlb_limits(dd);
3997
3998     /* We can set the required cell size info here,
3999      * so we do not need to communicate this.
4000      * The grid is completely uniform.
4001      */
4002     for (d = 0; d < dd->ndim; d++)
4003     {
4004         if (comm->root[d])
4005         {
4006             comm->load[d].sum_m = comm->load[d].sum;
4007
4008             nc = dd->nc[dd->dim[d]];
4009             for (i = 0; i < nc; i++)
4010             {
4011                 comm->root[d]->cell_f[i]    = i/(real)nc;
4012                 if (d > 0)
4013                 {
4014                     comm->root[d]->cell_f_max0[i] =  i   /(real)nc;
4015                     comm->root[d]->cell_f_min1[i] = (i+1)/(real)nc;
4016                 }
4017             }
4018             comm->root[d]->cell_f[nc] = 1.0;
4019         }
4020     }
4021 }
4022
4023 static void turn_off_dlb(FILE *fplog, const t_commrec *cr, gmx_int64_t step)
4024 {
4025     gmx_domdec_t *dd = cr->dd;
4026
4027     char          buf[STRLEN];
4028     sprintf(buf, "step %" GMX_PRId64 " Turning off dynamic load balancing, because it is degrading performance.\n", step);
4029     dd_warning(cr, fplog, buf);
4030     dd->comm->dlbState                     = edlbsOffCanTurnOn;
4031     dd->comm->haveTurnedOffDlb             = true;
4032     dd->comm->ddPartioningCountFirstDlbOff = dd->ddp_count;
4033 }
4034
4035 static void turn_off_dlb_forever(FILE *fplog, const t_commrec *cr, gmx_int64_t step)
4036 {
4037     GMX_RELEASE_ASSERT(cr->dd->comm->dlbState == edlbsOffCanTurnOn, "Can only turn off DLB forever when it was in the can-turn-on state");
4038     char buf[STRLEN];
4039     sprintf(buf, "step %" GMX_PRId64 " Will no longer try dynamic load balancing, as it degraded performance.\n", step);
4040     dd_warning(cr, fplog, buf);
4041     cr->dd->comm->dlbState = edlbsOffForever;
4042 }
4043
4044 static char *init_bLocalCG(const gmx_mtop_t *mtop)
4045 {
4046     int   ncg, cg;
4047     char *bLocalCG;
4048
4049     ncg = ncg_mtop(mtop);
4050     snew(bLocalCG, ncg);
4051     for (cg = 0; cg < ncg; cg++)
4052     {
4053         bLocalCG[cg] = FALSE;
4054     }
4055
4056     return bLocalCG;
4057 }
4058
4059 void dd_init_bondeds(FILE *fplog,
4060                      gmx_domdec_t *dd,
4061                      const gmx_mtop_t *mtop,
4062                      const gmx_vsite_t *vsite,
4063                      const t_inputrec *ir,
4064                      gmx_bool bBCheck, cginfo_mb_t *cginfo_mb)
4065 {
4066     gmx_domdec_comm_t *comm;
4067
4068     dd_make_reverse_top(fplog, dd, mtop, vsite, ir, bBCheck);
4069
4070     comm = dd->comm;
4071
4072     if (comm->bBondComm)
4073     {
4074         /* Communicate atoms beyond the cut-off for bonded interactions */
4075         comm = dd->comm;
4076
4077         comm->cglink = make_charge_group_links(mtop, dd, cginfo_mb);
4078
4079         comm->bLocalCG = init_bLocalCG(mtop);
4080     }
4081     else
4082     {
4083         /* Only communicate atoms based on cut-off */
4084         comm->cglink   = nullptr;
4085         comm->bLocalCG = nullptr;
4086     }
4087 }
4088
4089 static void print_dd_settings(FILE *fplog, gmx_domdec_t *dd,
4090                               const gmx_mtop_t *mtop, const t_inputrec *ir,
4091                               gmx_bool bDynLoadBal, real dlb_scale,
4092                               const gmx_ddbox_t *ddbox)
4093 {
4094     gmx_domdec_comm_t *comm;
4095     int                d;
4096     ivec               np;
4097     real               limit, shrink;
4098     char               buf[64];
4099
4100     if (fplog == nullptr)
4101     {
4102         return;
4103     }
4104
4105     comm = dd->comm;
4106
4107     if (bDynLoadBal)
4108     {
4109         fprintf(fplog, "The maximum number of communication pulses is:");
4110         for (d = 0; d < dd->ndim; d++)
4111         {
4112             fprintf(fplog, " %c %d", dim2char(dd->dim[d]), comm->cd[d].np_dlb);
4113         }
4114         fprintf(fplog, "\n");
4115         fprintf(fplog, "The minimum size for domain decomposition cells is %.3f nm\n", comm->cellsize_limit);
4116         fprintf(fplog, "The requested allowed shrink of DD cells (option -dds) is: %.2f\n", dlb_scale);
4117         fprintf(fplog, "The allowed shrink of domain decomposition cells is:");
4118         for (d = 0; d < DIM; d++)
4119         {
4120             if (dd->nc[d] > 1)
4121             {
4122                 if (d >= ddbox->npbcdim && dd->nc[d] == 2)
4123                 {
4124                     shrink = 0;
4125                 }
4126                 else
4127                 {
4128                     shrink =
4129                         comm->cellsize_min_dlb[d]/
4130                         (ddbox->box_size[d]*ddbox->skew_fac[d]/dd->nc[d]);
4131                 }
4132                 fprintf(fplog, " %c %.2f", dim2char(d), shrink);
4133             }
4134         }
4135         fprintf(fplog, "\n");
4136     }
4137     else
4138     {
4139         set_dd_cell_sizes_slb(dd, ddbox, setcellsizeslbPULSE_ONLY, np);
4140         fprintf(fplog, "The initial number of communication pulses is:");
4141         for (d = 0; d < dd->ndim; d++)
4142         {
4143             fprintf(fplog, " %c %d", dim2char(dd->dim[d]), np[dd->dim[d]]);
4144         }
4145         fprintf(fplog, "\n");
4146         fprintf(fplog, "The initial domain decomposition cell size is:");
4147         for (d = 0; d < DIM; d++)
4148         {
4149             if (dd->nc[d] > 1)
4150             {
4151                 fprintf(fplog, " %c %.2f nm",
4152                         dim2char(d), dd->comm->cellsize_min[d]);
4153             }
4154         }
4155         fprintf(fplog, "\n\n");
4156     }
4157
4158     gmx_bool bInterCGVsites = count_intercg_vsites(mtop);
4159
4160     if (comm->bInterCGBondeds ||
4161         bInterCGVsites ||
4162         dd->bInterCGcons || dd->bInterCGsettles)
4163     {
4164         fprintf(fplog, "The maximum allowed distance for charge groups involved in interactions is:\n");
4165         fprintf(fplog, "%40s  %-7s %6.3f nm\n",
4166                 "non-bonded interactions", "", comm->cutoff);
4167
4168         if (bDynLoadBal)
4169         {
4170             limit = dd->comm->cellsize_limit;
4171         }
4172         else
4173         {
4174             if (dynamic_dd_box(ddbox, ir))
4175             {
4176                 fprintf(fplog, "(the following are initial values, they could change due to box deformation)\n");
4177             }
4178             limit = dd->comm->cellsize_min[XX];
4179             for (d = 1; d < DIM; d++)
4180             {
4181                 limit = std::min(limit, dd->comm->cellsize_min[d]);
4182             }
4183         }
4184
4185         if (comm->bInterCGBondeds)
4186         {
4187             fprintf(fplog, "%40s  %-7s %6.3f nm\n",
4188                     "two-body bonded interactions", "(-rdd)",
4189                     std::max(comm->cutoff, comm->cutoff_mbody));
4190             fprintf(fplog, "%40s  %-7s %6.3f nm\n",
4191                     "multi-body bonded interactions", "(-rdd)",
4192                     (comm->bBondComm || isDlbOn(dd->comm)) ? comm->cutoff_mbody : std::min(comm->cutoff, limit));
4193         }
4194         if (bInterCGVsites)
4195         {
4196             fprintf(fplog, "%40s  %-7s %6.3f nm\n",
4197                     "virtual site constructions", "(-rcon)", limit);
4198         }
4199         if (dd->bInterCGcons || dd->bInterCGsettles)
4200         {
4201             sprintf(buf, "atoms separated by up to %d constraints",
4202                     1+ir->nProjOrder);
4203             fprintf(fplog, "%40s  %-7s %6.3f nm\n",
4204                     buf, "(-rcon)", limit);
4205         }
4206         fprintf(fplog, "\n");
4207     }
4208
4209     fflush(fplog);
4210 }
4211
4212 static void set_cell_limits_dlb(gmx_domdec_t      *dd,
4213                                 real               dlb_scale,
4214                                 const t_inputrec  *ir,
4215                                 const gmx_ddbox_t *ddbox)
4216 {
4217     gmx_domdec_comm_t *comm;
4218     int                d, dim, npulse, npulse_d_max, npulse_d;
4219     gmx_bool           bNoCutOff;
4220
4221     comm = dd->comm;
4222
4223     bNoCutOff = (ir->rvdw == 0 || ir->rcoulomb == 0);
4224
4225     /* Determine the maximum number of comm. pulses in one dimension */
4226
4227     comm->cellsize_limit = std::max(comm->cellsize_limit, comm->cutoff_mbody);
4228
4229     /* Determine the maximum required number of grid pulses */
4230     if (comm->cellsize_limit >= comm->cutoff)
4231     {
4232         /* Only a single pulse is required */
4233         npulse = 1;
4234     }
4235     else if (!bNoCutOff && comm->cellsize_limit > 0)
4236     {
4237         /* We round down slightly here to avoid overhead due to the latency
4238          * of extra communication calls when the cut-off
4239          * would be only slightly longer than the cell size.
4240          * Later cellsize_limit is redetermined,
4241          * so we can not miss interactions due to this rounding.
4242          */
4243         npulse = (int)(0.96 + comm->cutoff/comm->cellsize_limit);
4244     }
4245     else
4246     {
4247         /* There is no cell size limit */
4248         npulse = std::max(dd->nc[XX]-1, std::max(dd->nc[YY]-1, dd->nc[ZZ]-1));
4249     }
4250
4251     if (!bNoCutOff && npulse > 1)
4252     {
4253         /* See if we can do with less pulses, based on dlb_scale */
4254         npulse_d_max = 0;
4255         for (d = 0; d < dd->ndim; d++)
4256         {
4257             dim      = dd->dim[d];
4258             npulse_d = (int)(1 + dd->nc[dim]*comm->cutoff
4259                              /(ddbox->box_size[dim]*ddbox->skew_fac[dim]*dlb_scale));
4260             npulse_d_max = std::max(npulse_d_max, npulse_d);
4261         }
4262         npulse = std::min(npulse, npulse_d_max);
4263     }
4264
4265     /* This env var can override npulse */
4266     d = dd_getenv(debug, "GMX_DD_NPULSE", 0);
4267     if (d > 0)
4268     {
4269         npulse = d;
4270     }
4271
4272     comm->maxpulse       = 1;
4273     comm->bVacDLBNoLimit = (ir->ePBC == epbcNONE);
4274     for (d = 0; d < dd->ndim; d++)
4275     {
4276         comm->cd[d].np_dlb    = std::min(npulse, dd->nc[dd->dim[d]]-1);
4277         comm->maxpulse        = std::max(comm->maxpulse, comm->cd[d].np_dlb);
4278         if (comm->cd[d].np_dlb < dd->nc[dd->dim[d]]-1)
4279         {
4280             comm->bVacDLBNoLimit = FALSE;
4281         }
4282     }
4283
4284     /* cellsize_limit is set for LINCS in init_domain_decomposition */
4285     if (!comm->bVacDLBNoLimit)
4286     {
4287         comm->cellsize_limit = std::max(comm->cellsize_limit,
4288                                         comm->cutoff/comm->maxpulse);
4289     }
4290     comm->cellsize_limit = std::max(comm->cellsize_limit, comm->cutoff_mbody);
4291     /* Set the minimum cell size for each DD dimension */
4292     for (d = 0; d < dd->ndim; d++)
4293     {
4294         if (comm->bVacDLBNoLimit ||
4295             comm->cd[d].np_dlb*comm->cellsize_limit >= comm->cutoff)
4296         {
4297             comm->cellsize_min_dlb[dd->dim[d]] = comm->cellsize_limit;
4298         }
4299         else
4300         {
4301             comm->cellsize_min_dlb[dd->dim[d]] =
4302                 comm->cutoff/comm->cd[d].np_dlb;
4303         }
4304     }
4305     if (comm->cutoff_mbody <= 0)
4306     {
4307         comm->cutoff_mbody = std::min(comm->cutoff, comm->cellsize_limit);
4308     }
4309     if (isDlbOn(comm))
4310     {
4311         set_dlb_limits(dd);
4312     }
4313 }
4314
4315 gmx_bool dd_bonded_molpbc(const gmx_domdec_t *dd, int ePBC)
4316 {
4317     /* If each molecule is a single charge group
4318      * or we use domain decomposition for each periodic dimension,
4319      * we do not need to take pbc into account for the bonded interactions.
4320      */
4321     return (ePBC != epbcNONE && dd->comm->bInterCGBondeds &&
4322             !(dd->nc[XX] > 1 &&
4323               dd->nc[YY] > 1 &&
4324               (dd->nc[ZZ] > 1 || ePBC == epbcXY)));
4325 }
4326
4327 /*! \brief Sets grid size limits and PP-PME setup, prints settings to log */
4328 static void set_ddgrid_parameters(FILE *fplog, gmx_domdec_t *dd, real dlb_scale,
4329                                   const gmx_mtop_t *mtop, const t_inputrec *ir,
4330                                   const gmx_ddbox_t *ddbox)
4331 {
4332     gmx_domdec_comm_t *comm;
4333     int                natoms_tot;
4334     real               vol_frac;
4335
4336     comm = dd->comm;
4337
4338     if (EEL_PME(ir->coulombtype) || EVDW_PME(ir->vdwtype))
4339     {
4340         init_ddpme(dd, &comm->ddpme[0], 0);
4341         if (comm->npmedecompdim >= 2)
4342         {
4343             init_ddpme(dd, &comm->ddpme[1], 1);
4344         }
4345     }
4346     else
4347     {
4348         comm->npmenodes = 0;
4349         if (dd->pme_nodeid >= 0)
4350         {
4351             gmx_fatal_collective(FARGS, dd->mpi_comm_all, DDMASTER(dd),
4352                                  "Can not have separate PME ranks without PME electrostatics");
4353         }
4354     }
4355
4356     if (debug)
4357     {
4358         fprintf(debug, "The DD cut-off is %f\n", comm->cutoff);
4359     }
4360     if (!isDlbDisabled(comm))
4361     {
4362         set_cell_limits_dlb(dd, dlb_scale, ir, ddbox);
4363     }
4364
4365     print_dd_settings(fplog, dd, mtop, ir, isDlbOn(comm), dlb_scale, ddbox);
4366     if (comm->dlbState == edlbsOffCanTurnOn)
4367     {
4368         if (fplog)
4369         {
4370             fprintf(fplog, "When dynamic load balancing gets turned on, these settings will change to:\n");
4371         }
4372         print_dd_settings(fplog, dd, mtop, ir, TRUE, dlb_scale, ddbox);
4373     }
4374
4375     if (ir->ePBC == epbcNONE)
4376     {
4377         vol_frac = 1 - 1/(double)dd->nnodes;
4378     }
4379     else
4380     {
4381         vol_frac =
4382             (1 + comm_box_frac(dd->nc, comm->cutoff, ddbox))/(double)dd->nnodes;
4383     }
4384     if (debug)
4385     {
4386         fprintf(debug, "Volume fraction for all DD zones: %f\n", vol_frac);
4387     }
4388     natoms_tot = comm->cgs_gl.index[comm->cgs_gl.nr];
4389
4390     dd->ga2la = ga2la_init(natoms_tot, static_cast<int>(vol_frac*natoms_tot));
4391 }
4392
4393 /*! \brief Set some important DD parameters that can be modified by env.vars */
4394 static void set_dd_envvar_options(FILE *fplog, gmx_domdec_t *dd, int rank_mysim)
4395 {
4396     gmx_domdec_comm_t *comm = dd->comm;
4397
4398     dd->bSendRecv2      = dd_getenv(fplog, "GMX_DD_USE_SENDRECV2", 0);
4399     comm->dlb_scale_lim = dd_getenv(fplog, "GMX_DLB_MAX_BOX_SCALING", 10);
4400     comm->eFlop         = dd_getenv(fplog, "GMX_DLB_BASED_ON_FLOPS", 0);
4401     int recload         = dd_getenv(fplog, "GMX_DD_RECORD_LOAD", 1);
4402     comm->nstDDDump     = dd_getenv(fplog, "GMX_DD_NST_DUMP", 0);
4403     comm->nstDDDumpGrid = dd_getenv(fplog, "GMX_DD_NST_DUMP_GRID", 0);
4404     comm->DD_debug      = dd_getenv(fplog, "GMX_DD_DEBUG", 0);
4405
4406     if (dd->bSendRecv2 && fplog)
4407     {
4408         fprintf(fplog, "Will use two sequential MPI_Sendrecv calls instead of two simultaneous non-blocking MPI_Irecv and MPI_Isend pairs for constraint and vsite communication\n");
4409     }
4410
4411     if (comm->eFlop)
4412     {
4413         if (fplog)
4414         {
4415             fprintf(fplog, "Will load balance based on FLOP count\n");
4416         }
4417         if (comm->eFlop > 1)
4418         {
4419             srand(1 + rank_mysim);
4420         }
4421         comm->bRecordLoad = TRUE;
4422     }
4423     else
4424     {
4425         comm->bRecordLoad = (wallcycle_have_counter() && recload > 0);
4426     }
4427 }
4428
4429 DomdecOptions::DomdecOptions() :
4430     checkBondedInteractions(TRUE),
4431     useBondedCommunication(TRUE),
4432     numPmeRanks(-1),
4433     rankOrder(DdRankOrder::pp_pme),
4434     minimumCommunicationRange(0),
4435     constraintCommunicationRange(0),
4436     dlbOption(DlbOption::turnOnWhenUseful),
4437     dlbScaling(0.8),
4438     cellSizeX(nullptr),
4439     cellSizeY(nullptr),
4440     cellSizeZ(nullptr)
4441 {
4442     clear_ivec(numCells);
4443 }
4444
4445 gmx_domdec_t *init_domain_decomposition(FILE *fplog, t_commrec *cr,
4446                                         const DomdecOptions &options,
4447                                         const MdrunOptions &mdrunOptions,
4448                                         const gmx_mtop_t *mtop,
4449                                         const t_inputrec *ir,
4450                                         const matrix box,
4451                                         gmx::ArrayRef<const gmx::RVec> xGlobal)
4452 {
4453     gmx_domdec_t      *dd;
4454
4455     if (fplog)
4456     {
4457         fprintf(fplog,
4458                 "\nInitializing Domain Decomposition on %d ranks\n", cr->nnodes);
4459     }
4460
4461     dd = new gmx_domdec_t;
4462
4463     dd->comm = init_dd_comm();
4464
4465     /* Initialize DD paritioning counters */
4466     dd->comm->partition_step = INT_MIN;
4467     dd->ddp_count            = 0;
4468
4469     set_dd_envvar_options(fplog, dd, cr->nodeid);
4470
4471     gmx_ddbox_t ddbox = {0};
4472     set_dd_limits_and_grid(fplog, cr, dd, options, mdrunOptions,
4473                            mtop, ir,
4474                            box, xGlobal,
4475                            &ddbox);
4476
4477     make_dd_communicators(fplog, cr, dd, options.rankOrder);
4478
4479     if (thisRankHasDuty(cr, DUTY_PP))
4480     {
4481         set_ddgrid_parameters(fplog, dd, options.dlbScaling, mtop, ir, &ddbox);
4482
4483         setup_neighbor_relations(dd);
4484     }
4485
4486     /* Set overallocation to avoid frequent reallocation of arrays */
4487     set_over_alloc_dd(TRUE);
4488
4489     clear_dd_cycle_counts(dd);
4490
4491     return dd;
4492 }
4493
4494 static gmx_bool test_dd_cutoff(t_commrec *cr,
4495                                t_state *state, const t_inputrec *ir,
4496                                real cutoff_req)
4497 {
4498     gmx_domdec_t *dd;
4499     gmx_ddbox_t   ddbox;
4500     int           d, dim, np;
4501     real          inv_cell_size;
4502     int           LocallyLimited;
4503
4504     dd = cr->dd;
4505
4506     set_ddbox(dd, false, ir, state->box, true, state->x, &ddbox);
4507
4508     LocallyLimited = 0;
4509
4510     for (d = 0; d < dd->ndim; d++)
4511     {
4512         dim = dd->dim[d];
4513
4514         inv_cell_size = DD_CELL_MARGIN*dd->nc[dim]/ddbox.box_size[dim];
4515         if (dynamic_dd_box(&ddbox, ir))
4516         {
4517             inv_cell_size *= DD_PRES_SCALE_MARGIN;
4518         }
4519
4520         np = 1 + (int)(cutoff_req*inv_cell_size*ddbox.skew_fac[dim]);
4521
4522         if (!isDlbDisabled(dd->comm) && (dim < ddbox.npbcdim) && (dd->comm->cd[d].np_dlb > 0))
4523         {
4524             if (np > dd->comm->cd[d].np_dlb)
4525             {
4526                 return FALSE;
4527             }
4528
4529             /* If a current local cell size is smaller than the requested
4530              * cut-off, we could still fix it, but this gets very complicated.
4531              * Without fixing here, we might actually need more checks.
4532              */
4533             if ((dd->comm->cell_x1[dim] - dd->comm->cell_x0[dim])*ddbox.skew_fac[dim]*dd->comm->cd[d].np_dlb < cutoff_req)
4534             {
4535                 LocallyLimited = 1;
4536             }
4537         }
4538     }
4539
4540     if (!isDlbDisabled(dd->comm))
4541     {
4542         /* If DLB is not active yet, we don't need to check the grid jumps.
4543          * Actually we shouldn't, because then the grid jump data is not set.
4544          */
4545         if (isDlbOn(dd->comm) &&
4546             check_grid_jump(0, dd, cutoff_req, &ddbox, FALSE))
4547         {
4548             LocallyLimited = 1;
4549         }
4550
4551         gmx_sumi(1, &LocallyLimited, cr);
4552
4553         if (LocallyLimited > 0)
4554         {
4555             return FALSE;
4556         }
4557     }
4558
4559     return TRUE;
4560 }
4561
4562 gmx_bool change_dd_cutoff(t_commrec *cr, t_state *state, const t_inputrec *ir,
4563                           real cutoff_req)
4564 {
4565     gmx_bool bCutoffAllowed;
4566
4567     bCutoffAllowed = test_dd_cutoff(cr, state, ir, cutoff_req);
4568
4569     if (bCutoffAllowed)
4570     {
4571         cr->dd->comm->cutoff = cutoff_req;
4572     }
4573
4574     return bCutoffAllowed;
4575 }
4576
4577 void set_dd_dlb_max_cutoff(t_commrec *cr, real cutoff)
4578 {
4579     gmx_domdec_comm_t *comm;
4580
4581     comm = cr->dd->comm;
4582
4583     /* Turn on the DLB limiting (might have been on already) */
4584     comm->bPMELoadBalDLBLimits = TRUE;
4585
4586     /* Change the cut-off limit */
4587     comm->PMELoadBal_max_cutoff = cutoff;
4588
4589     if (debug)
4590     {
4591         fprintf(debug, "PME load balancing set a limit to the DLB staggering such that a %f cut-off will continue to fit\n",
4592                 comm->PMELoadBal_max_cutoff);
4593     }
4594 }
4595
4596 /* Sets whether we should later check the load imbalance data, so that
4597  * we can trigger dynamic load balancing if enough imbalance has
4598  * arisen.
4599  *
4600  * Used after PME load balancing unlocks DLB, so that the check
4601  * whether DLB will be useful can happen immediately.
4602  */
4603 static void dd_dlb_set_should_check_whether_to_turn_dlb_on(gmx_domdec_t *dd, gmx_bool bValue)
4604 {
4605     if (dd->comm->dlbState == edlbsOffCanTurnOn)
4606     {
4607         dd->comm->bCheckWhetherToTurnDlbOn = bValue;
4608
4609         if (bValue == TRUE)
4610         {
4611             /* Store the DD partitioning count, so we can ignore cycle counts
4612              * over the next nstlist steps, which are often slower.
4613              */
4614             dd->comm->ddPartioningCountFirstDlbOff = dd->ddp_count;
4615         }
4616     }
4617 }
4618
4619 /* Returns if we should check whether there has been enough load
4620  * imbalance to trigger dynamic load balancing.
4621  */
4622 static gmx_bool dd_dlb_get_should_check_whether_to_turn_dlb_on(gmx_domdec_t *dd)
4623 {
4624     if (dd->comm->dlbState != edlbsOffCanTurnOn)
4625     {
4626         return FALSE;
4627     }
4628
4629     if (dd->ddp_count <= dd->comm->ddPartioningCountFirstDlbOff)
4630     {
4631         /* We ignore the first nstlist steps at the start of the run
4632          * or after PME load balancing or after turning DLB off, since
4633          * these often have extra allocation or cache miss overhead.
4634          */
4635         return FALSE;
4636     }
4637
4638     if (dd->comm->cycl_n[ddCyclStep] == 0)
4639     {
4640         /* We can have zero timed steps when dd_partition_system is called
4641          * more than once at the same step, e.g. with replica exchange.
4642          * Turning on DLB would trigger an assertion failure later, but is
4643          * also useless right after exchanging replicas.
4644          */
4645         return FALSE;
4646     }
4647
4648     /* We should check whether we should use DLB directly after
4649      * unlocking DLB. */
4650     if (dd->comm->bCheckWhetherToTurnDlbOn)
4651     {
4652         /* This flag was set when the PME load-balancing routines
4653            unlocked DLB, and should now be cleared. */
4654         dd_dlb_set_should_check_whether_to_turn_dlb_on(dd, FALSE);
4655         return TRUE;
4656     }
4657     /* We check whether we should use DLB every c_checkTurnDlbOnInterval
4658      * partitionings (we do not do this every partioning, so that we
4659      * avoid excessive communication). */
4660     if (dd->comm->n_load_have % c_checkTurnDlbOnInterval == c_checkTurnDlbOnInterval - 1)
4661     {
4662         return TRUE;
4663     }
4664
4665     return FALSE;
4666 }
4667
4668 gmx_bool dd_dlb_is_on(const gmx_domdec_t *dd)
4669 {
4670     return isDlbOn(dd->comm);
4671 }
4672
4673 gmx_bool dd_dlb_is_locked(const gmx_domdec_t *dd)
4674 {
4675     return (dd->comm->dlbState == edlbsOffTemporarilyLocked);
4676 }
4677
4678 void dd_dlb_lock(gmx_domdec_t *dd)
4679 {
4680     /* We can only lock the DLB when it is set to auto, otherwise don't do anything */
4681     if (dd->comm->dlbState == edlbsOffCanTurnOn)
4682     {
4683         dd->comm->dlbState = edlbsOffTemporarilyLocked;
4684     }
4685 }
4686
4687 void dd_dlb_unlock(gmx_domdec_t *dd)
4688 {
4689     /* We can only lock the DLB when it is set to auto, otherwise don't do anything */
4690     if (dd->comm->dlbState == edlbsOffTemporarilyLocked)
4691     {
4692         dd->comm->dlbState = edlbsOffCanTurnOn;
4693         dd_dlb_set_should_check_whether_to_turn_dlb_on(dd, TRUE);
4694     }
4695 }
4696
4697 static void merge_cg_buffers(int ncell,
4698                              gmx_domdec_comm_dim_t *cd, int pulse,
4699                              int  *ncg_cell,
4700                              gmx::ArrayRef<int> index_gl,
4701                              int  *recv_i,
4702                              rvec *cg_cm,    rvec *recv_vr,
4703                              gmx::ArrayRef<int> cgindex,
4704                              cginfo_mb_t *cginfo_mb, int *cginfo)
4705 {
4706     gmx_domdec_ind_t *ind, *ind_p;
4707     int               p, cell, c, cg, cg0, cg1, cg_gl, nat;
4708     int               shift, shift_at;
4709
4710     ind = &cd->ind[pulse];
4711
4712     /* First correct the already stored data */
4713     shift = ind->nrecv[ncell];
4714     for (cell = ncell-1; cell >= 0; cell--)
4715     {
4716         shift -= ind->nrecv[cell];
4717         if (shift > 0)
4718         {
4719             /* Move the cg's present from previous grid pulses */
4720             cg0                = ncg_cell[ncell+cell];
4721             cg1                = ncg_cell[ncell+cell+1];
4722             cgindex[cg1+shift] = cgindex[cg1];
4723             for (cg = cg1-1; cg >= cg0; cg--)
4724             {
4725                 index_gl[cg+shift] = index_gl[cg];
4726                 copy_rvec(cg_cm[cg], cg_cm[cg+shift]);
4727                 cgindex[cg+shift] = cgindex[cg];
4728                 cginfo[cg+shift]  = cginfo[cg];
4729             }
4730             /* Correct the already stored send indices for the shift */
4731             for (p = 1; p <= pulse; p++)
4732             {
4733                 ind_p = &cd->ind[p];
4734                 cg0   = 0;
4735                 for (c = 0; c < cell; c++)
4736                 {
4737                     cg0 += ind_p->nsend[c];
4738                 }
4739                 cg1 = cg0 + ind_p->nsend[cell];
4740                 for (cg = cg0; cg < cg1; cg++)
4741                 {
4742                     ind_p->index[cg] += shift;
4743                 }
4744             }
4745         }
4746     }
4747
4748     /* Merge in the communicated buffers */
4749     shift    = 0;
4750     shift_at = 0;
4751     cg0      = 0;
4752     for (cell = 0; cell < ncell; cell++)
4753     {
4754         cg1 = ncg_cell[ncell+cell+1] + shift;
4755         if (shift_at > 0)
4756         {
4757             /* Correct the old cg indices */
4758             for (cg = ncg_cell[ncell+cell]; cg < cg1; cg++)
4759             {
4760                 cgindex[cg+1] += shift_at;
4761             }
4762         }
4763         for (cg = 0; cg < ind->nrecv[cell]; cg++)
4764         {
4765             /* Copy this charge group from the buffer */
4766             index_gl[cg1] = recv_i[cg0];
4767             copy_rvec(recv_vr[cg0], cg_cm[cg1]);
4768             /* Add it to the cgindex */
4769             cg_gl          = index_gl[cg1];
4770             cginfo[cg1]    = ddcginfo(cginfo_mb, cg_gl);
4771             nat            = GET_CGINFO_NATOMS(cginfo[cg1]);
4772             cgindex[cg1+1] = cgindex[cg1] + nat;
4773             cg0++;
4774             cg1++;
4775             shift_at += nat;
4776         }
4777         shift                 += ind->nrecv[cell];
4778         ncg_cell[ncell+cell+1] = cg1;
4779     }
4780 }
4781
4782 static void make_cell2at_index(gmx_domdec_comm_dim_t *cd,
4783                                int                    nzone,
4784                                int                    cg0,
4785                                const BlockRanges     &atomGroups)
4786 {
4787     /* Store the atom block boundaries for easy copying of communication buffers
4788      */
4789     int cg = cg0;
4790     for (int zone = 0; zone < nzone; zone++)
4791     {
4792         for (gmx_domdec_ind_t &ind : cd->ind)
4793         {
4794             ind.cell2at0[zone]  = atomGroups.index[cg];
4795             cg                 += ind.nrecv[zone];
4796             ind.cell2at1[zone]  = atomGroups.index[cg];
4797         }
4798     }
4799 }
4800
4801 static gmx_bool missing_link(t_blocka *link, int cg_gl, char *bLocalCG)
4802 {
4803     int      i;
4804     gmx_bool bMiss;
4805
4806     bMiss = FALSE;
4807     for (i = link->index[cg_gl]; i < link->index[cg_gl+1]; i++)
4808     {
4809         if (!bLocalCG[link->a[i]])
4810         {
4811             bMiss = TRUE;
4812         }
4813     }
4814
4815     return bMiss;
4816 }
4817
4818 /* Domain corners for communication, a maximum of 4 i-zones see a j domain */
4819 typedef struct {
4820     real c[DIM][4]; /* the corners for the non-bonded communication */
4821     real cr0;       /* corner for rounding */
4822     real cr1[4];    /* corners for rounding */
4823     real bc[DIM];   /* corners for bounded communication */
4824     real bcr1;      /* corner for rounding for bonded communication */
4825 } dd_corners_t;
4826
4827 /* Determine the corners of the domain(s) we are communicating with */
4828 static void
4829 set_dd_corners(const gmx_domdec_t *dd,
4830                int dim0, int dim1, int dim2,
4831                gmx_bool bDistMB,
4832                dd_corners_t *c)
4833 {
4834     const gmx_domdec_comm_t  *comm;
4835     const gmx_domdec_zones_t *zones;
4836     int i, j;
4837
4838     comm = dd->comm;
4839
4840     zones = &comm->zones;
4841
4842     /* Keep the compiler happy */
4843     c->cr0  = 0;
4844     c->bcr1 = 0;
4845
4846     /* The first dimension is equal for all cells */
4847     c->c[0][0] = comm->cell_x0[dim0];
4848     if (bDistMB)
4849     {
4850         c->bc[0] = c->c[0][0];
4851     }
4852     if (dd->ndim >= 2)
4853     {
4854         dim1 = dd->dim[1];
4855         /* This cell row is only seen from the first row */
4856         c->c[1][0] = comm->cell_x0[dim1];
4857         /* All rows can see this row */
4858         c->c[1][1] = comm->cell_x0[dim1];
4859         if (isDlbOn(dd->comm))
4860         {
4861             c->c[1][1] = std::max(comm->cell_x0[dim1], comm->zone_d1[1].mch0);
4862             if (bDistMB)
4863             {
4864                 /* For the multi-body distance we need the maximum */
4865                 c->bc[1] = std::max(comm->cell_x0[dim1], comm->zone_d1[1].p1_0);
4866             }
4867         }
4868         /* Set the upper-right corner for rounding */
4869         c->cr0 = comm->cell_x1[dim0];
4870
4871         if (dd->ndim >= 3)
4872         {
4873             dim2 = dd->dim[2];
4874             for (j = 0; j < 4; j++)
4875             {
4876                 c->c[2][j] = comm->cell_x0[dim2];
4877             }
4878             if (isDlbOn(dd->comm))
4879             {
4880                 /* Use the maximum of the i-cells that see a j-cell */
4881                 for (i = 0; i < zones->nizone; i++)
4882                 {
4883                     for (j = zones->izone[i].j0; j < zones->izone[i].j1; j++)
4884                     {
4885                         if (j >= 4)
4886                         {
4887                             c->c[2][j-4] =
4888                                 std::max(c->c[2][j-4],
4889                                          comm->zone_d2[zones->shift[i][dim0]][zones->shift[i][dim1]].mch0);
4890                         }
4891                     }
4892                 }
4893                 if (bDistMB)
4894                 {
4895                     /* For the multi-body distance we need the maximum */
4896                     c->bc[2] = comm->cell_x0[dim2];
4897                     for (i = 0; i < 2; i++)
4898                     {
4899                         for (j = 0; j < 2; j++)
4900                         {
4901                             c->bc[2] = std::max(c->bc[2], comm->zone_d2[i][j].p1_0);
4902                         }
4903                     }
4904                 }
4905             }
4906
4907             /* Set the upper-right corner for rounding */
4908             /* Cell (0,0,0) and cell (1,0,0) can see cell 4 (0,1,1)
4909              * Only cell (0,0,0) can see cell 7 (1,1,1)
4910              */
4911             c->cr1[0] = comm->cell_x1[dim1];
4912             c->cr1[3] = comm->cell_x1[dim1];
4913             if (isDlbOn(dd->comm))
4914             {
4915                 c->cr1[0] = std::max(comm->cell_x1[dim1], comm->zone_d1[1].mch1);
4916                 if (bDistMB)
4917                 {
4918                     /* For the multi-body distance we need the maximum */
4919                     c->bcr1 = std::max(comm->cell_x1[dim1], comm->zone_d1[1].p1_1);
4920                 }
4921             }
4922         }
4923     }
4924 }
4925
4926 /* Add the atom groups we need to send in this pulse from this zone to
4927  * \p localAtomGroups and \p work
4928  */
4929 static void
4930 get_zone_pulse_cgs(gmx_domdec_t *dd,
4931                    int zonei, int zone,
4932                    int cg0, int cg1,
4933                    gmx::ArrayRef<const int> globalAtomGroupIndices,
4934                    const gmx::BlockRanges &atomGroups,
4935                    int dim, int dim_ind,
4936                    int dim0, int dim1, int dim2,
4937                    real r_comm2, real r_bcomm2,
4938                    matrix box,
4939                    bool distanceIsTriclinic,
4940                    rvec *normal,
4941                    real skew_fac2_d, real skew_fac_01,
4942                    rvec *v_d, rvec *v_0, rvec *v_1,
4943                    const dd_corners_t *c,
4944                    rvec sf2_round,
4945                    gmx_bool bDistBonded,
4946                    gmx_bool bBondComm,
4947                    gmx_bool bDist2B,
4948                    gmx_bool bDistMB,
4949                    rvec *cg_cm,
4950                    int *cginfo,
4951                    std::vector<int>     *localAtomGroups,
4952                    dd_comm_setup_work_t *work)
4953 {
4954     gmx_domdec_comm_t *comm;
4955     gmx_bool           bScrew;
4956     gmx_bool           bDistMB_pulse;
4957     int                cg, i;
4958     real               r2, rb2, r, tric_sh;
4959     rvec               rn, rb;
4960     int                dimd;
4961     int                nsend_z, nat;
4962
4963     comm = dd->comm;
4964
4965     bScrew = (dd->bScrewPBC && dim == XX);
4966
4967     bDistMB_pulse = (bDistMB && bDistBonded);
4968
4969     /* Unpack the work data */
4970     std::vector<int>       &ibuf = work->atomGroupBuffer;
4971     std::vector<gmx::RVec> &vbuf = work->positionBuffer;
4972     nsend_z                      = 0;
4973     nat                          = work->nat;
4974
4975     for (cg = cg0; cg < cg1; cg++)
4976     {
4977         r2  = 0;
4978         rb2 = 0;
4979         if (!distanceIsTriclinic)
4980         {
4981             /* Rectangular direction, easy */
4982             r = cg_cm[cg][dim] - c->c[dim_ind][zone];
4983             if (r > 0)
4984             {
4985                 r2 += r*r;
4986             }
4987             if (bDistMB_pulse)
4988             {
4989                 r = cg_cm[cg][dim] - c->bc[dim_ind];
4990                 if (r > 0)
4991                 {
4992                     rb2 += r*r;
4993                 }
4994             }
4995             /* Rounding gives at most a 16% reduction
4996              * in communicated atoms
4997              */
4998             if (dim_ind >= 1 && (zonei == 1 || zonei == 2))
4999             {
5000                 r = cg_cm[cg][dim0] - c->cr0;
5001                 /* This is the first dimension, so always r >= 0 */
5002                 r2 += r*r;
5003                 if (bDistMB_pulse)
5004                 {
5005                     rb2 += r*r;
5006                 }
5007             }
5008             if (dim_ind == 2 && (zonei == 2 || zonei == 3))
5009             {
5010                 r = cg_cm[cg][dim1] - c->cr1[zone];
5011                 if (r > 0)
5012                 {
5013                     r2 += r*r;
5014                 }
5015                 if (bDistMB_pulse)
5016                 {
5017                     r = cg_cm[cg][dim1] - c->bcr1;
5018                     if (r > 0)
5019                     {
5020                         rb2 += r*r;
5021                     }
5022                 }
5023             }
5024         }
5025         else
5026         {
5027             /* Triclinic direction, more complicated */
5028             clear_rvec(rn);
5029             clear_rvec(rb);
5030             /* Rounding, conservative as the skew_fac multiplication
5031              * will slightly underestimate the distance.
5032              */
5033             if (dim_ind >= 1 && (zonei == 1 || zonei == 2))
5034             {
5035                 rn[dim0] = cg_cm[cg][dim0] - c->cr0;
5036                 for (i = dim0+1; i < DIM; i++)
5037                 {
5038                     rn[dim0] -= cg_cm[cg][i]*v_0[i][dim0];
5039                 }
5040                 r2 = rn[dim0]*rn[dim0]*sf2_round[dim0];
5041                 if (bDistMB_pulse)
5042                 {
5043                     rb[dim0] = rn[dim0];
5044                     rb2      = r2;
5045                 }
5046                 /* Take care that the cell planes along dim0 might not
5047                  * be orthogonal to those along dim1 and dim2.
5048                  */
5049                 for (i = 1; i <= dim_ind; i++)
5050                 {
5051                     dimd = dd->dim[i];
5052                     if (normal[dim0][dimd] > 0)
5053                     {
5054                         rn[dimd] -= rn[dim0]*normal[dim0][dimd];
5055                         if (bDistMB_pulse)
5056                         {
5057                             rb[dimd] -= rb[dim0]*normal[dim0][dimd];
5058                         }
5059                     }
5060                 }
5061             }
5062             if (dim_ind == 2 && (zonei == 2 || zonei == 3))
5063             {
5064                 rn[dim1] += cg_cm[cg][dim1] - c->cr1[zone];
5065                 tric_sh   = 0;
5066                 for (i = dim1+1; i < DIM; i++)
5067                 {
5068                     tric_sh -= cg_cm[cg][i]*v_1[i][dim1];
5069                 }
5070                 rn[dim1] += tric_sh;
5071                 if (rn[dim1] > 0)
5072                 {
5073                     r2 += rn[dim1]*rn[dim1]*sf2_round[dim1];
5074                     /* Take care of coupling of the distances
5075                      * to the planes along dim0 and dim1 through dim2.
5076                      */
5077                     r2 -= rn[dim0]*rn[dim1]*skew_fac_01;
5078                     /* Take care that the cell planes along dim1
5079                      * might not be orthogonal to that along dim2.
5080                      */
5081                     if (normal[dim1][dim2] > 0)
5082                     {
5083                         rn[dim2] -= rn[dim1]*normal[dim1][dim2];
5084                     }
5085                 }
5086                 if (bDistMB_pulse)
5087                 {
5088                     rb[dim1] +=
5089                         cg_cm[cg][dim1] - c->bcr1 + tric_sh;
5090                     if (rb[dim1] > 0)
5091                     {
5092                         rb2 += rb[dim1]*rb[dim1]*sf2_round[dim1];
5093                         /* Take care of coupling of the distances
5094                          * to the planes along dim0 and dim1 through dim2.
5095                          */
5096                         rb2 -= rb[dim0]*rb[dim1]*skew_fac_01;
5097                         /* Take care that the cell planes along dim1
5098                          * might not be orthogonal to that along dim2.
5099                          */
5100                         if (normal[dim1][dim2] > 0)
5101                         {
5102                             rb[dim2] -= rb[dim1]*normal[dim1][dim2];
5103                         }
5104                     }
5105                 }
5106             }
5107             /* The distance along the communication direction */
5108             rn[dim] += cg_cm[cg][dim] - c->c[dim_ind][zone];
5109             tric_sh  = 0;
5110             for (i = dim+1; i < DIM; i++)
5111             {
5112                 tric_sh -= cg_cm[cg][i]*v_d[i][dim];
5113             }
5114             rn[dim] += tric_sh;
5115             if (rn[dim] > 0)
5116             {
5117                 r2 += rn[dim]*rn[dim]*skew_fac2_d;
5118                 /* Take care of coupling of the distances
5119                  * to the planes along dim0 and dim1 through dim2.
5120                  */
5121                 if (dim_ind == 1 && zonei == 1)
5122                 {
5123                     r2 -= rn[dim0]*rn[dim]*skew_fac_01;
5124                 }
5125             }
5126             if (bDistMB_pulse)
5127             {
5128                 clear_rvec(rb);
5129                 rb[dim] += cg_cm[cg][dim] - c->bc[dim_ind] + tric_sh;
5130                 if (rb[dim] > 0)
5131                 {
5132                     rb2 += rb[dim]*rb[dim]*skew_fac2_d;
5133                     /* Take care of coupling of the distances
5134                      * to the planes along dim0 and dim1 through dim2.
5135                      */
5136                     if (dim_ind == 1 && zonei == 1)
5137                     {
5138                         rb2 -= rb[dim0]*rb[dim]*skew_fac_01;
5139                     }
5140                 }
5141             }
5142         }
5143
5144         if (r2 < r_comm2 ||
5145             (bDistBonded &&
5146              ((bDistMB && rb2 < r_bcomm2) ||
5147               (bDist2B && r2  < r_bcomm2)) &&
5148              (!bBondComm ||
5149               (GET_CGINFO_BOND_INTER(cginfo[cg]) &&
5150                missing_link(comm->cglink, globalAtomGroupIndices[cg],
5151                             comm->bLocalCG)))))
5152         {
5153             /* Store the local and global atom group indices and position */
5154             localAtomGroups->push_back(cg);
5155             ibuf.push_back(globalAtomGroupIndices[cg]);
5156             nsend_z++;
5157
5158             rvec posPbc;
5159             if (dd->ci[dim] == 0)
5160             {
5161                 /* Correct cg_cm for pbc */
5162                 rvec_add(cg_cm[cg], box[dim], posPbc);
5163                 if (bScrew)
5164                 {
5165                     posPbc[YY] = box[YY][YY] - posPbc[YY];
5166                     posPbc[ZZ] = box[ZZ][ZZ] - posPbc[ZZ];
5167                 }
5168             }
5169             else
5170             {
5171                 copy_rvec(cg_cm[cg], posPbc);
5172             }
5173             vbuf.emplace_back(posPbc[XX], posPbc[YY], posPbc[ZZ]);
5174
5175             nat += atomGroups.index[cg+1] - atomGroups.index[cg];
5176         }
5177     }
5178
5179     work->nat        = nat;
5180     work->nsend_zone = nsend_z;
5181 }
5182
5183 static void clearCommSetupData(dd_comm_setup_work_t *work)
5184 {
5185     work->localAtomGroupBuffer.clear();
5186     work->atomGroupBuffer.clear();
5187     work->positionBuffer.clear();
5188     work->nat        = 0;
5189     work->nsend_zone = 0;
5190 }
5191
5192 static void setup_dd_communication(gmx_domdec_t *dd,
5193                                    matrix box, gmx_ddbox_t *ddbox,
5194                                    t_forcerec *fr,
5195                                    t_state *state, PaddedRVecVector *f)
5196 {
5197     int                    dim_ind, dim, dim0, dim1, dim2, dimd, nat_tot;
5198     int                    nzone, nzone_send, zone, zonei, cg0, cg1;
5199     int                    c, i, cg, cg_gl, nrcg;
5200     int                   *zone_cg_range, pos_cg;
5201     gmx_domdec_comm_t     *comm;
5202     gmx_domdec_zones_t    *zones;
5203     gmx_domdec_comm_dim_t *cd;
5204     cginfo_mb_t           *cginfo_mb;
5205     gmx_bool               bBondComm, bDist2B, bDistMB, bDistBonded;
5206     real                   r_comm2, r_bcomm2;
5207     dd_corners_t           corners;
5208     rvec                  *cg_cm, *normal, *v_d, *v_0 = nullptr, *v_1 = nullptr;
5209     real                   skew_fac2_d, skew_fac_01;
5210     rvec                   sf2_round;
5211
5212     if (debug)
5213     {
5214         fprintf(debug, "Setting up DD communication\n");
5215     }
5216
5217     comm  = dd->comm;
5218
5219     if (comm->dth.empty())
5220     {
5221         /* Initialize the thread data.
5222          * This can not be done in init_domain_decomposition,
5223          * as the numbers of threads is determined later.
5224          */
5225         int numThreads = gmx_omp_nthreads_get(emntDomdec);
5226         comm->dth.resize(numThreads);
5227     }
5228
5229     switch (fr->cutoff_scheme)
5230     {
5231         case ecutsGROUP:
5232             cg_cm = fr->cg_cm;
5233             break;
5234         case ecutsVERLET:
5235             cg_cm = as_rvec_array(state->x.data());
5236             break;
5237         default:
5238             gmx_incons("unimplemented");
5239             cg_cm = nullptr;
5240     }
5241
5242     bBondComm = comm->bBondComm;
5243
5244     /* Do we need to determine extra distances for multi-body bondeds? */
5245     bDistMB = (comm->bInterCGMultiBody && isDlbOn(dd->comm) && dd->ndim > 1);
5246
5247     /* Do we need to determine extra distances for only two-body bondeds? */
5248     bDist2B = (bBondComm && !bDistMB);
5249
5250     r_comm2  = gmx::square(comm->cutoff);
5251     r_bcomm2 = gmx::square(comm->cutoff_mbody);
5252
5253     if (debug)
5254     {
5255         fprintf(debug, "bBondComm %d, r_bc %f\n", bBondComm, std::sqrt(r_bcomm2));
5256     }
5257
5258     zones = &comm->zones;
5259
5260     dim0 = dd->dim[0];
5261     dim1 = (dd->ndim >= 2 ? dd->dim[1] : -1);
5262     dim2 = (dd->ndim >= 3 ? dd->dim[2] : -1);
5263
5264     set_dd_corners(dd, dim0, dim1, dim2, bDistMB, &corners);
5265
5266     /* Triclinic stuff */
5267     normal      = ddbox->normal;
5268     skew_fac_01 = 0;
5269     if (dd->ndim >= 2)
5270     {
5271         v_0 = ddbox->v[dim0];
5272         if (ddbox->tric_dir[dim0] && ddbox->tric_dir[dim1])
5273         {
5274             /* Determine the coupling coefficient for the distances
5275              * to the cell planes along dim0 and dim1 through dim2.
5276              * This is required for correct rounding.
5277              */
5278             skew_fac_01 =
5279                 ddbox->v[dim0][dim1+1][dim0]*ddbox->v[dim1][dim1+1][dim1];
5280             if (debug)
5281             {
5282                 fprintf(debug, "\nskew_fac_01 %f\n", skew_fac_01);
5283             }
5284         }
5285     }
5286     if (dd->ndim >= 3)
5287     {
5288         v_1 = ddbox->v[dim1];
5289     }
5290
5291     zone_cg_range = zones->cg_range;
5292     cginfo_mb     = fr->cginfo_mb;
5293
5294     zone_cg_range[0]   = 0;
5295     zone_cg_range[1]   = dd->ncg_home;
5296     comm->zone_ncg1[0] = dd->ncg_home;
5297     pos_cg             = dd->ncg_home;
5298
5299     nat_tot = comm->atomRanges.numHomeAtoms();
5300     nzone   = 1;
5301     for (dim_ind = 0; dim_ind < dd->ndim; dim_ind++)
5302     {
5303         dim = dd->dim[dim_ind];
5304         cd  = &comm->cd[dim_ind];
5305
5306         /* Check if we need to compute triclinic distances along this dim */
5307         bool distanceIsTriclinic = false;
5308         for (i = 0; i <= dim_ind; i++)
5309         {
5310             if (ddbox->tric_dir[dd->dim[i]])
5311             {
5312                 distanceIsTriclinic = true;
5313             }
5314         }
5315
5316         if (dim >= ddbox->npbcdim && dd->ci[dim] == 0)
5317         {
5318             /* No pbc in this dimension, the first node should not comm. */
5319             nzone_send = 0;
5320         }
5321         else
5322         {
5323             nzone_send = nzone;
5324         }
5325
5326         v_d                = ddbox->v[dim];
5327         skew_fac2_d        = gmx::square(ddbox->skew_fac[dim]);
5328
5329         cd->receiveInPlace = true;
5330         for (int p = 0; p < cd->numPulses(); p++)
5331         {
5332             /* Only atoms communicated in the first pulse are used
5333              * for multi-body bonded interactions or for bBondComm.
5334              */
5335             bDistBonded = ((bDistMB || bDist2B) && p == 0);
5336
5337             gmx_domdec_ind_t *ind = &cd->ind[p];
5338
5339             /* Thread 0 writes in the global index array */
5340             ind->index.clear();
5341             clearCommSetupData(&comm->dth[0]);
5342
5343             for (zone = 0; zone < nzone_send; zone++)
5344             {
5345                 if (dim_ind > 0 && distanceIsTriclinic)
5346                 {
5347                     /* Determine slightly more optimized skew_fac's
5348                      * for rounding.
5349                      * This reduces the number of communicated atoms
5350                      * by about 10% for 3D DD of rhombic dodecahedra.
5351                      */
5352                     for (dimd = 0; dimd < dim; dimd++)
5353                     {
5354                         sf2_round[dimd] = 1;
5355                         if (ddbox->tric_dir[dimd])
5356                         {
5357                             for (i = dd->dim[dimd]+1; i < DIM; i++)
5358                             {
5359                                 /* If we are shifted in dimension i
5360                                  * and the cell plane is tilted forward
5361                                  * in dimension i, skip this coupling.
5362                                  */
5363                                 if (!(zones->shift[nzone+zone][i] &&
5364                                       ddbox->v[dimd][i][dimd] >= 0))
5365                                 {
5366                                     sf2_round[dimd] +=
5367                                         gmx::square(ddbox->v[dimd][i][dimd]);
5368                                 }
5369                             }
5370                             sf2_round[dimd] = 1/sf2_round[dimd];
5371                         }
5372                     }
5373                 }
5374
5375                 zonei = zone_perm[dim_ind][zone];
5376                 if (p == 0)
5377                 {
5378                     /* Here we permutate the zones to obtain a convenient order
5379                      * for neighbor searching
5380                      */
5381                     cg0 = zone_cg_range[zonei];
5382                     cg1 = zone_cg_range[zonei+1];
5383                 }
5384                 else
5385                 {
5386                     /* Look only at the cg's received in the previous grid pulse
5387                      */
5388                     cg1 = zone_cg_range[nzone+zone+1];
5389                     cg0 = cg1 - cd->ind[p-1].nrecv[zone];
5390                 }
5391
5392                 const int numThreads = static_cast<int>(comm->dth.size());
5393 #pragma omp parallel for num_threads(numThreads) schedule(static)
5394                 for (int th = 0; th < numThreads; th++)
5395                 {
5396                     try
5397                     {
5398                         dd_comm_setup_work_t &work = comm->dth[th];
5399
5400                         /* Retain data accumulated into buffers of thread 0 */
5401                         if (th > 0)
5402                         {
5403                             clearCommSetupData(&work);
5404                         }
5405
5406                         int cg0_th = cg0 + ((cg1 - cg0)* th   )/numThreads;
5407                         int cg1_th = cg0 + ((cg1 - cg0)*(th+1))/numThreads;
5408
5409                         /* Get the cg's for this pulse in this zone */
5410                         get_zone_pulse_cgs(dd, zonei, zone, cg0_th, cg1_th,
5411                                            dd->globalAtomGroupIndices,
5412                                            dd->atomGroups(),
5413                                            dim, dim_ind, dim0, dim1, dim2,
5414                                            r_comm2, r_bcomm2,
5415                                            box, distanceIsTriclinic,
5416                                            normal, skew_fac2_d, skew_fac_01,
5417                                            v_d, v_0, v_1, &corners, sf2_round,
5418                                            bDistBonded, bBondComm,
5419                                            bDist2B, bDistMB,
5420                                            cg_cm, fr->cginfo,
5421                                            th == 0 ? &ind->index : &work.localAtomGroupBuffer,
5422                                            &work);
5423                     }
5424                     GMX_CATCH_ALL_AND_EXIT_WITH_FATAL_ERROR;
5425                 } // END
5426
5427                 std::vector<int>       &atomGroups = comm->dth[0].atomGroupBuffer;
5428                 std::vector<gmx::RVec> &positions  = comm->dth[0].positionBuffer;
5429                 ind->nsend[zone]  = comm->dth[0].nsend_zone;
5430                 /* Append data of threads>=1 to the communication buffers */
5431                 for (int th = 1; th < numThreads; th++)
5432                 {
5433                     const dd_comm_setup_work_t &dth = comm->dth[th];
5434
5435                     ind->index.insert(ind->index.end(), dth.localAtomGroupBuffer.begin(), dth.localAtomGroupBuffer.end());
5436                     atomGroups.insert(atomGroups.end(), dth.atomGroupBuffer.begin(), dth.atomGroupBuffer.end());
5437                     positions.insert(positions.end(), dth.positionBuffer.begin(), dth.positionBuffer.end());
5438                     comm->dth[0].nat += dth.nat;
5439                     ind->nsend[zone] += dth.nsend_zone;
5440                 }
5441             }
5442             /* Clear the counts in case we do not have pbc */
5443             for (zone = nzone_send; zone < nzone; zone++)
5444             {
5445                 ind->nsend[zone] = 0;
5446             }
5447             ind->nsend[nzone]     = ind->index.size();
5448             ind->nsend[nzone + 1] = comm->dth[0].nat;
5449             /* Communicate the number of cg's and atoms to receive */
5450             ddSendrecv(dd, dim_ind, dddirBackward,
5451                        ind->nsend, nzone+2,
5452                        ind->nrecv, nzone+2);
5453
5454             if (p > 0)
5455             {
5456                 /* We can receive in place if only the last zone is not empty */
5457                 for (zone = 0; zone < nzone-1; zone++)
5458                 {
5459                     if (ind->nrecv[zone] > 0)
5460                     {
5461                         cd->receiveInPlace = false;
5462                     }
5463                 }
5464             }
5465
5466             int receiveBufferSize = 0;
5467             if (!cd->receiveInPlace)
5468             {
5469                 receiveBufferSize = ind->nrecv[nzone];
5470             }
5471             /* These buffer are actually only needed with in-place */
5472             DDBufferAccess<int>       globalAtomGroupBuffer(comm->intBuffer, receiveBufferSize);
5473             DDBufferAccess<gmx::RVec> rvecBuffer(comm->rvecBuffer, receiveBufferSize);
5474
5475             dd_comm_setup_work_t     &work = comm->dth[0];
5476
5477             /* Make space for the global cg indices */
5478             int numAtomGroupsNew = pos_cg + ind->nrecv[nzone];
5479             dd->globalAtomGroupIndices.resize(numAtomGroupsNew);
5480             dd->atomGroups_.index.resize(numAtomGroupsNew + 1);
5481             /* Communicate the global cg indices */
5482             gmx::ArrayRef<int> integerBufferRef;
5483             if (cd->receiveInPlace)
5484             {
5485                 integerBufferRef = gmx::arrayRefFromArray(dd->globalAtomGroupIndices.data() + pos_cg, ind->nrecv[nzone]);
5486             }
5487             else
5488             {
5489                 integerBufferRef = globalAtomGroupBuffer.buffer;
5490             }
5491             ddSendrecv<int>(dd, dim_ind, dddirBackward,
5492                             work.atomGroupBuffer, integerBufferRef);
5493
5494             /* Make space for cg_cm */
5495             dd_check_alloc_ncg(fr, state, f, pos_cg + ind->nrecv[nzone]);
5496             if (fr->cutoff_scheme == ecutsGROUP)
5497             {
5498                 cg_cm = fr->cg_cm;
5499             }
5500             else
5501             {
5502                 cg_cm = as_rvec_array(state->x.data());
5503             }
5504             /* Communicate cg_cm */
5505             gmx::ArrayRef<gmx::RVec> rvecBufferRef;
5506             if (cd->receiveInPlace)
5507             {
5508                 rvecBufferRef = gmx::arrayRefFromArray(reinterpret_cast<gmx::RVec *>(cg_cm + pos_cg), ind->nrecv[nzone]);
5509             }
5510             else
5511             {
5512                 rvecBufferRef = rvecBuffer.buffer;
5513             }
5514             ddSendrecv<gmx::RVec>(dd, dim_ind, dddirBackward,
5515                                   work.positionBuffer, rvecBufferRef);
5516
5517             /* Make the charge group index */
5518             if (cd->receiveInPlace)
5519             {
5520                 zone = (p == 0 ? 0 : nzone - 1);
5521                 while (zone < nzone)
5522                 {
5523                     for (cg = 0; cg < ind->nrecv[zone]; cg++)
5524                     {
5525                         cg_gl                              = dd->globalAtomGroupIndices[pos_cg];
5526                         fr->cginfo[pos_cg]                 = ddcginfo(cginfo_mb, cg_gl);
5527                         nrcg                               = GET_CGINFO_NATOMS(fr->cginfo[pos_cg]);
5528                         dd->atomGroups_.index[pos_cg + 1]  = dd->atomGroups_.index[pos_cg] + nrcg;
5529                         if (bBondComm)
5530                         {
5531                             /* Update the charge group presence,
5532                              * so we can use it in the next pass of the loop.
5533                              */
5534                             comm->bLocalCG[cg_gl] = TRUE;
5535                         }
5536                         pos_cg++;
5537                     }
5538                     if (p == 0)
5539                     {
5540                         comm->zone_ncg1[nzone+zone] = ind->nrecv[zone];
5541                     }
5542                     zone++;
5543                     zone_cg_range[nzone+zone] = pos_cg;
5544                 }
5545             }
5546             else
5547             {
5548                 /* This part of the code is never executed with bBondComm. */
5549                 merge_cg_buffers(nzone, cd, p, zone_cg_range,
5550                                  dd->globalAtomGroupIndices, integerBufferRef.data(),
5551                                  cg_cm, as_rvec_array(rvecBufferRef.data()),
5552                                  dd->atomGroups_.index,
5553                                  fr->cginfo_mb, fr->cginfo);
5554                 pos_cg += ind->nrecv[nzone];
5555             }
5556             nat_tot += ind->nrecv[nzone+1];
5557         }
5558         if (!cd->receiveInPlace)
5559         {
5560             /* Store the atom block for easy copying of communication buffers */
5561             make_cell2at_index(cd, nzone, zone_cg_range[nzone], dd->atomGroups());
5562         }
5563         nzone += nzone;
5564     }
5565
5566     comm->atomRanges.setEnd(DDAtomRanges::Type::Zones, nat_tot);
5567
5568     if (!bBondComm)
5569     {
5570         /* We don't need to update cginfo, since that was alrady done above.
5571          * So we pass NULL for the forcerec.
5572          */
5573         dd_set_cginfo(dd->globalAtomGroupIndices,
5574                       dd->ncg_home, dd->globalAtomGroupIndices.size(),
5575                       nullptr, comm->bLocalCG);
5576     }
5577
5578     if (debug)
5579     {
5580         fprintf(debug, "Finished setting up DD communication, zones:");
5581         for (c = 0; c < zones->n; c++)
5582         {
5583             fprintf(debug, " %d", zones->cg_range[c+1]-zones->cg_range[c]);
5584         }
5585         fprintf(debug, "\n");
5586     }
5587 }
5588
5589 static void set_cg_boundaries(gmx_domdec_zones_t *zones)
5590 {
5591     int c;
5592
5593     for (c = 0; c < zones->nizone; c++)
5594     {
5595         zones->izone[c].cg1  = zones->cg_range[c+1];
5596         zones->izone[c].jcg0 = zones->cg_range[zones->izone[c].j0];
5597         zones->izone[c].jcg1 = zones->cg_range[zones->izone[c].j1];
5598     }
5599 }
5600
5601 /* \brief Set zone dimensions for zones \p zone_start to \p zone_end-1
5602  *
5603  * Also sets the atom density for the home zone when \p zone_start=0.
5604  * For this \p numMovedChargeGroupsInHomeZone needs to be passed to tell
5605  * how many charge groups will move but are still part of the current range.
5606  * \todo When converting domdec to use proper classes, all these variables
5607  *       should be private and a method should return the correct count
5608  *       depending on an internal state.
5609  *
5610  * \param[in,out] dd          The domain decomposition struct
5611  * \param[in]     box         The box
5612  * \param[in]     ddbox       The domain decomposition box struct
5613  * \param[in]     zone_start  The start of the zone range to set sizes for
5614  * \param[in]     zone_end    The end of the zone range to set sizes for
5615  * \param[in]     numMovedChargeGroupsInHomeZone  The number of charge groups in the home zone that should moved but are still present in dd->comm->zones.cg_range
5616  */
5617 static void set_zones_size(gmx_domdec_t *dd,
5618                            matrix box, const gmx_ddbox_t *ddbox,
5619                            int zone_start, int zone_end,
5620                            int numMovedChargeGroupsInHomeZone)
5621 {
5622     gmx_domdec_comm_t  *comm;
5623     gmx_domdec_zones_t *zones;
5624     gmx_bool            bDistMB;
5625     int                 z, zi, d, dim;
5626     real                rcs, rcmbs;
5627     int                 i, j;
5628     real                vol;
5629
5630     comm = dd->comm;
5631
5632     zones = &comm->zones;
5633
5634     /* Do we need to determine extra distances for multi-body bondeds? */
5635     bDistMB = (comm->bInterCGMultiBody && isDlbOn(dd->comm) && dd->ndim > 1);
5636
5637     for (z = zone_start; z < zone_end; z++)
5638     {
5639         /* Copy cell limits to zone limits.
5640          * Valid for non-DD dims and non-shifted dims.
5641          */
5642         copy_rvec(comm->cell_x0, zones->size[z].x0);
5643         copy_rvec(comm->cell_x1, zones->size[z].x1);
5644     }
5645
5646     for (d = 0; d < dd->ndim; d++)
5647     {
5648         dim = dd->dim[d];
5649
5650         for (z = 0; z < zones->n; z++)
5651         {
5652             /* With a staggered grid we have different sizes
5653              * for non-shifted dimensions.
5654              */
5655             if (isDlbOn(dd->comm) && zones->shift[z][dim] == 0)
5656             {
5657                 if (d == 1)
5658                 {
5659                     zones->size[z].x0[dim] = comm->zone_d1[zones->shift[z][dd->dim[d-1]]].min0;
5660                     zones->size[z].x1[dim] = comm->zone_d1[zones->shift[z][dd->dim[d-1]]].max1;
5661                 }
5662                 else if (d == 2)
5663                 {
5664                     zones->size[z].x0[dim] = comm->zone_d2[zones->shift[z][dd->dim[d-2]]][zones->shift[z][dd->dim[d-1]]].min0;
5665                     zones->size[z].x1[dim] = comm->zone_d2[zones->shift[z][dd->dim[d-2]]][zones->shift[z][dd->dim[d-1]]].max1;
5666                 }
5667             }
5668         }
5669
5670         rcs   = comm->cutoff;
5671         rcmbs = comm->cutoff_mbody;
5672         if (ddbox->tric_dir[dim])
5673         {
5674             rcs   /= ddbox->skew_fac[dim];
5675             rcmbs /= ddbox->skew_fac[dim];
5676         }
5677
5678         /* Set the lower limit for the shifted zone dimensions */
5679         for (z = zone_start; z < zone_end; z++)
5680         {
5681             if (zones->shift[z][dim] > 0)
5682             {
5683                 dim = dd->dim[d];
5684                 if (!isDlbOn(dd->comm) || d == 0)
5685                 {
5686                     zones->size[z].x0[dim] = comm->cell_x1[dim];
5687                     zones->size[z].x1[dim] = comm->cell_x1[dim] + rcs;
5688                 }
5689                 else
5690                 {
5691                     /* Here we take the lower limit of the zone from
5692                      * the lowest domain of the zone below.
5693                      */
5694                     if (z < 4)
5695                     {
5696                         zones->size[z].x0[dim] =
5697                             comm->zone_d1[zones->shift[z][dd->dim[d-1]]].min1;
5698                     }
5699                     else
5700                     {
5701                         if (d == 1)
5702                         {
5703                             zones->size[z].x0[dim] =
5704                                 zones->size[zone_perm[2][z-4]].x0[dim];
5705                         }
5706                         else
5707                         {
5708                             zones->size[z].x0[dim] =
5709                                 comm->zone_d2[zones->shift[z][dd->dim[d-2]]][zones->shift[z][dd->dim[d-1]]].min1;
5710                         }
5711                     }
5712                     /* A temporary limit, is updated below */
5713                     zones->size[z].x1[dim] = zones->size[z].x0[dim];
5714
5715                     if (bDistMB)
5716                     {
5717                         for (zi = 0; zi < zones->nizone; zi++)
5718                         {
5719                             if (zones->shift[zi][dim] == 0)
5720                             {
5721                                 /* This takes the whole zone into account.
5722                                  * With multiple pulses this will lead
5723                                  * to a larger zone then strictly necessary.
5724                                  */
5725                                 zones->size[z].x1[dim] = std::max(zones->size[z].x1[dim],
5726                                                                   zones->size[zi].x1[dim]+rcmbs);
5727                             }
5728                         }
5729                     }
5730                 }
5731             }
5732         }
5733
5734         /* Loop over the i-zones to set the upper limit of each
5735          * j-zone they see.
5736          */
5737         for (zi = 0; zi < zones->nizone; zi++)
5738         {
5739             if (zones->shift[zi][dim] == 0)
5740             {
5741                 /* We should only use zones up to zone_end */
5742                 int jZoneEnd = std::min(zones->izone[zi].j1, zone_end);
5743                 for (z = zones->izone[zi].j0; z < jZoneEnd; z++)
5744                 {
5745                     if (zones->shift[z][dim] > 0)
5746                     {
5747                         zones->size[z].x1[dim] = std::max(zones->size[z].x1[dim],
5748                                                           zones->size[zi].x1[dim]+rcs);
5749                     }
5750                 }
5751             }
5752         }
5753     }
5754
5755     for (z = zone_start; z < zone_end; z++)
5756     {
5757         /* Initialization only required to keep the compiler happy */
5758         rvec corner_min = {0, 0, 0}, corner_max = {0, 0, 0}, corner;
5759         int  nc, c;
5760
5761         /* To determine the bounding box for a zone we need to find
5762          * the extreme corners of 4, 2 or 1 corners.
5763          */
5764         nc = 1 << (ddbox->nboundeddim - 1);
5765
5766         for (c = 0; c < nc; c++)
5767         {
5768             /* Set up a zone corner at x=0, ignoring trilinic couplings */
5769             corner[XX] = 0;
5770             if ((c & 1) == 0)
5771             {
5772                 corner[YY] = zones->size[z].x0[YY];
5773             }
5774             else
5775             {
5776                 corner[YY] = zones->size[z].x1[YY];
5777             }
5778             if ((c & 2) == 0)
5779             {
5780                 corner[ZZ] = zones->size[z].x0[ZZ];
5781             }
5782             else
5783             {
5784                 corner[ZZ] = zones->size[z].x1[ZZ];
5785             }
5786             if (dd->ndim == 1 && dd->dim[0] < ZZ && ZZ < dd->npbcdim &&
5787                 box[ZZ][1 - dd->dim[0]] != 0)
5788             {
5789                 /* With 1D domain decomposition the cg's are not in
5790                  * the triclinic box, but triclinic x-y and rectangular y/x-z.
5791                  * Shift the corner of the z-vector back to along the box
5792                  * vector of dimension d, so it will later end up at 0 along d.
5793                  * This can affect the location of this corner along dd->dim[0]
5794                  * through the matrix operation below if box[d][dd->dim[0]]!=0.
5795                  */
5796                 int d = 1 - dd->dim[0];
5797
5798                 corner[d] -= corner[ZZ]*box[ZZ][d]/box[ZZ][ZZ];
5799             }
5800             /* Apply the triclinic couplings */
5801             assert(ddbox->npbcdim <= DIM);
5802             for (i = YY; i < ddbox->npbcdim; i++)
5803             {
5804                 for (j = XX; j < i; j++)
5805                 {
5806                     corner[j] += corner[i]*box[i][j]/box[i][i];
5807                 }
5808             }
5809             if (c == 0)
5810             {
5811                 copy_rvec(corner, corner_min);
5812                 copy_rvec(corner, corner_max);
5813             }
5814             else
5815             {
5816                 for (i = 0; i < DIM; i++)
5817                 {
5818                     corner_min[i] = std::min(corner_min[i], corner[i]);
5819                     corner_max[i] = std::max(corner_max[i], corner[i]);
5820                 }
5821             }
5822         }
5823         /* Copy the extreme cornes without offset along x */
5824         for (i = 0; i < DIM; i++)
5825         {
5826             zones->size[z].bb_x0[i] = corner_min[i];
5827             zones->size[z].bb_x1[i] = corner_max[i];
5828         }
5829         /* Add the offset along x */
5830         zones->size[z].bb_x0[XX] += zones->size[z].x0[XX];
5831         zones->size[z].bb_x1[XX] += zones->size[z].x1[XX];
5832     }
5833
5834     if (zone_start == 0)
5835     {
5836         vol = 1;
5837         for (dim = 0; dim < DIM; dim++)
5838         {
5839             vol *= zones->size[0].x1[dim] - zones->size[0].x0[dim];
5840         }
5841         zones->dens_zone0 = (zones->cg_range[1] - zones->cg_range[0] - numMovedChargeGroupsInHomeZone)/vol;
5842     }
5843
5844     if (debug)
5845     {
5846         for (z = zone_start; z < zone_end; z++)
5847         {
5848             fprintf(debug, "zone %d    %6.3f - %6.3f  %6.3f - %6.3f  %6.3f - %6.3f\n",
5849                     z,
5850                     zones->size[z].x0[XX], zones->size[z].x1[XX],
5851                     zones->size[z].x0[YY], zones->size[z].x1[YY],
5852                     zones->size[z].x0[ZZ], zones->size[z].x1[ZZ]);
5853             fprintf(debug, "zone %d bb %6.3f - %6.3f  %6.3f - %6.3f  %6.3f - %6.3f\n",
5854                     z,
5855                     zones->size[z].bb_x0[XX], zones->size[z].bb_x1[XX],
5856                     zones->size[z].bb_x0[YY], zones->size[z].bb_x1[YY],
5857                     zones->size[z].bb_x0[ZZ], zones->size[z].bb_x1[ZZ]);
5858         }
5859     }
5860 }
5861
5862 static int comp_cgsort(const void *a, const void *b)
5863 {
5864     int           comp;
5865
5866     gmx_cgsort_t *cga, *cgb;
5867     cga = (gmx_cgsort_t *)a;
5868     cgb = (gmx_cgsort_t *)b;
5869
5870     comp = cga->nsc - cgb->nsc;
5871     if (comp == 0)
5872     {
5873         comp = cga->ind_gl - cgb->ind_gl;
5874     }
5875
5876     return comp;
5877 }
5878
5879 static void order_int_cg(int n, const gmx_cgsort_t *sort,
5880                          int *a, int *buf)
5881 {
5882     int i;
5883
5884     /* Order the data */
5885     for (i = 0; i < n; i++)
5886     {
5887         buf[i] = a[sort[i].ind];
5888     }
5889
5890     /* Copy back to the original array */
5891     for (i = 0; i < n; i++)
5892     {
5893         a[i] = buf[i];
5894     }
5895 }
5896
5897 static void order_vec_cg(int                      n,
5898                          const gmx_cgsort_t      *sort,
5899                          rvec                    *v,
5900                          gmx::ArrayRef<gmx::RVec> sortBuffer)
5901 {
5902     GMX_ASSERT(sortBuffer.size() >= static_cast<size_t>(n), "Need a sufficiently large sorting buffer");
5903
5904     rvec *buf = as_rvec_array(sortBuffer.data());
5905
5906     /* Order the data */
5907     for (int i = 0; i < n; i++)
5908     {
5909         copy_rvec(v[sort[i].ind], buf[i]);
5910     }
5911
5912     /* Copy back to the original array */
5913     for (int i = 0; i < n; i++)
5914     {
5915         copy_rvec(buf[i], v[i]);
5916     }
5917 }
5918
5919 static void order_vec_atom(int                       ncg,
5920                            const gmx::BlockRanges   *atomGroups,
5921                            const gmx_cgsort_t       *sort,
5922                            gmx::ArrayRef<gmx::RVec>  v,
5923                            gmx::ArrayRef<gmx::RVec>  buf)
5924 {
5925     int a, atot, cg, cg0, cg1, i;
5926
5927     if (atomGroups == nullptr)
5928     {
5929         /* Avoid the useless loop of the atoms within a cg */
5930         order_vec_cg(ncg, sort, as_rvec_array(v.data()), buf);
5931
5932         return;
5933     }
5934
5935     /* Order the data */
5936     a = 0;
5937     for (cg = 0; cg < ncg; cg++)
5938     {
5939         cg0 = atomGroups->index[sort[cg].ind];
5940         cg1 = atomGroups->index[sort[cg].ind+1];
5941         for (i = cg0; i < cg1; i++)
5942         {
5943             copy_rvec(v[i], buf[a]);
5944             a++;
5945         }
5946     }
5947     atot = a;
5948
5949     /* Copy back to the original array */
5950     for (a = 0; a < atot; a++)
5951     {
5952         copy_rvec(buf[a], v[a]);
5953     }
5954 }
5955
5956 static void ordered_sort(int nsort2, gmx_cgsort_t *sort2,
5957                          int nsort_new, gmx_cgsort_t *sort_new,
5958                          gmx_cgsort_t *sort1)
5959 {
5960     int i1, i2, i_new;
5961
5962     /* The new indices are not very ordered, so we qsort them */
5963     gmx_qsort_threadsafe(sort_new, nsort_new, sizeof(sort_new[0]), comp_cgsort);
5964
5965     /* sort2 is already ordered, so now we can merge the two arrays */
5966     i1    = 0;
5967     i2    = 0;
5968     i_new = 0;
5969     while (i2 < nsort2 || i_new < nsort_new)
5970     {
5971         if (i2 == nsort2)
5972         {
5973             sort1[i1++] = sort_new[i_new++];
5974         }
5975         else if (i_new == nsort_new)
5976         {
5977             sort1[i1++] = sort2[i2++];
5978         }
5979         else if (sort2[i2].nsc < sort_new[i_new].nsc ||
5980                  (sort2[i2].nsc == sort_new[i_new].nsc &&
5981                   sort2[i2].ind_gl < sort_new[i_new].ind_gl))
5982         {
5983             sort1[i1++] = sort2[i2++];
5984         }
5985         else
5986         {
5987             sort1[i1++] = sort_new[i_new++];
5988         }
5989     }
5990 }
5991
5992 static int dd_sort_order(gmx_domdec_t *dd, t_forcerec *fr, int ncg_home_old)
5993 {
5994     gmx_domdec_sort_t *sort;
5995     gmx_cgsort_t      *cgsort, *sort_i;
5996     int                ncg_new, nsort2, nsort_new, i, *a, moved;
5997
5998     sort = dd->comm->sort;
5999
6000     a = fr->ns->grid->cell_index;
6001
6002     moved = NSGRID_SIGNAL_MOVED_FAC*fr->ns->grid->ncells;
6003
6004     if (ncg_home_old >= 0)
6005     {
6006         /* The charge groups that remained in the same ns grid cell
6007          * are completely ordered. So we can sort efficiently by sorting
6008          * the charge groups that did move into the stationary list.
6009          */
6010         ncg_new   = 0;
6011         nsort2    = 0;
6012         nsort_new = 0;
6013         for (i = 0; i < dd->ncg_home; i++)
6014         {
6015             /* Check if this cg did not move to another node */
6016             if (a[i] < moved)
6017             {
6018                 if (i >= ncg_home_old || a[i] != sort->sort[i].nsc)
6019                 {
6020                     /* This cg is new on this node or moved ns grid cell */
6021                     if (nsort_new >= sort->sort_new_nalloc)
6022                     {
6023                         sort->sort_new_nalloc = over_alloc_dd(nsort_new+1);
6024                         srenew(sort->sort_new, sort->sort_new_nalloc);
6025                     }
6026                     sort_i = &(sort->sort_new[nsort_new++]);
6027                 }
6028                 else
6029                 {
6030                     /* This cg did not move */
6031                     sort_i = &(sort->sort2[nsort2++]);
6032                 }
6033                 /* Sort on the ns grid cell indices
6034                  * and the global topology index.
6035                  * index_gl is irrelevant with cell ns,
6036                  * but we set it here anyhow to avoid a conditional.
6037                  */
6038                 sort_i->nsc    = a[i];
6039                 sort_i->ind_gl = dd->globalAtomGroupIndices[i];
6040                 sort_i->ind    = i;
6041                 ncg_new++;
6042             }
6043         }
6044         if (debug)
6045         {
6046             fprintf(debug, "ordered sort cgs: stationary %d moved %d\n",
6047                     nsort2, nsort_new);
6048         }
6049         /* Sort efficiently */
6050         ordered_sort(nsort2, sort->sort2, nsort_new, sort->sort_new,
6051                      sort->sort);
6052     }
6053     else
6054     {
6055         cgsort  = sort->sort;
6056         ncg_new = 0;
6057         for (i = 0; i < dd->ncg_home; i++)
6058         {
6059             /* Sort on the ns grid cell indices
6060              * and the global topology index
6061              */
6062             cgsort[i].nsc    = a[i];
6063             cgsort[i].ind_gl = dd->globalAtomGroupIndices[i];
6064             cgsort[i].ind    = i;
6065             if (cgsort[i].nsc < moved)
6066             {
6067                 ncg_new++;
6068             }
6069         }
6070         if (debug)
6071         {
6072             fprintf(debug, "qsort cgs: %d new home %d\n", dd->ncg_home, ncg_new);
6073         }
6074         /* Determine the order of the charge groups using qsort */
6075         gmx_qsort_threadsafe(cgsort, dd->ncg_home, sizeof(cgsort[0]), comp_cgsort);
6076     }
6077
6078     return ncg_new;
6079 }
6080
6081 static int dd_sort_order_nbnxn(gmx_domdec_t *dd, t_forcerec *fr)
6082 {
6083     gmx_cgsort_t *sort;
6084     int           ncg_new, i, na;
6085     const int    *a;
6086
6087     sort = dd->comm->sort->sort;
6088
6089     nbnxn_get_atomorder(fr->nbv->nbs.get(), &a, &na);
6090
6091     ncg_new = 0;
6092     for (i = 0; i < na; i++)
6093     {
6094         if (a[i] >= 0)
6095         {
6096             sort[ncg_new].ind = a[i];
6097             ncg_new++;
6098         }
6099     }
6100
6101     return ncg_new;
6102 }
6103
6104 static void dd_sort_state(gmx_domdec_t *dd, rvec *cgcm, t_forcerec *fr, t_state *state,
6105                           int ncg_home_old)
6106 {
6107     gmx_domdec_sort_t *sort;
6108     gmx_cgsort_t      *cgsort;
6109     int                ncg_new, i, *ibuf, cgsize;
6110
6111     sort = dd->comm->sort;
6112
6113     if (dd->ncg_home > sort->sort_nalloc)
6114     {
6115         sort->sort_nalloc = over_alloc_dd(dd->ncg_home);
6116         srenew(sort->sort, sort->sort_nalloc);
6117         srenew(sort->sort2, sort->sort_nalloc);
6118     }
6119     cgsort = sort->sort;
6120
6121     switch (fr->cutoff_scheme)
6122     {
6123         case ecutsGROUP:
6124             ncg_new = dd_sort_order(dd, fr, ncg_home_old);
6125             break;
6126         case ecutsVERLET:
6127             ncg_new = dd_sort_order_nbnxn(dd, fr);
6128             break;
6129         default:
6130             gmx_incons("unimplemented");
6131             ncg_new = 0;
6132     }
6133
6134     const gmx::BlockRanges &atomGroups = dd->atomGroups();
6135
6136     /* We alloc with the old size, since cgindex is still old */
6137     DDBufferAccess<gmx::RVec>  rvecBuffer(dd->comm->rvecBuffer, atomGroups.index[dd->ncg_home]);
6138
6139     const gmx::BlockRanges    *atomGroupsPtr = (dd->comm->bCGs ? &atomGroups : nullptr);
6140
6141     /* Remove the charge groups which are no longer at home here */
6142     dd->ncg_home = ncg_new;
6143     if (debug)
6144     {
6145         fprintf(debug, "Set the new home charge group count to %d\n",
6146                 dd->ncg_home);
6147     }
6148
6149     /* Reorder the state */
6150     if (state->flags & (1 << estX))
6151     {
6152         order_vec_atom(dd->ncg_home, atomGroupsPtr, cgsort, state->x, rvecBuffer.buffer);
6153     }
6154     if (state->flags & (1 << estV))
6155     {
6156         order_vec_atom(dd->ncg_home, atomGroupsPtr, cgsort, state->v, rvecBuffer.buffer);
6157     }
6158     if (state->flags & (1 << estCGP))
6159     {
6160         order_vec_atom(dd->ncg_home, atomGroupsPtr, cgsort, state->cg_p, rvecBuffer.buffer);
6161     }
6162
6163     if (fr->cutoff_scheme == ecutsGROUP)
6164     {
6165         /* Reorder cgcm */
6166         order_vec_cg(dd->ncg_home, cgsort, cgcm, rvecBuffer.buffer);
6167     }
6168
6169     if (dd->ncg_home+1 > sort->ibuf_nalloc)
6170     {
6171         sort->ibuf_nalloc = over_alloc_dd(dd->ncg_home+1);
6172         srenew(sort->ibuf, sort->ibuf_nalloc);
6173     }
6174     ibuf = sort->ibuf;
6175     /* Reorder the global cg index */
6176     order_int_cg(dd->ncg_home, cgsort, dd->globalAtomGroupIndices.data(), ibuf);
6177     /* Reorder the cginfo */
6178     order_int_cg(dd->ncg_home, cgsort, fr->cginfo, ibuf);
6179     /* Rebuild the local cg index */
6180     if (dd->comm->bCGs)
6181     {
6182         ibuf[0] = 0;
6183         for (i = 0; i < dd->ncg_home; i++)
6184         {
6185             cgsize    = atomGroups.index[cgsort[i].ind+1] - atomGroups.index[cgsort[i].ind];
6186             ibuf[i+1] = ibuf[i] + cgsize;
6187         }
6188         for (i = 0; i < dd->ncg_home+1; i++)
6189         {
6190             dd->atomGroups_.index[i] = ibuf[i];
6191         }
6192     }
6193     else
6194     {
6195         for (i = 0; i < dd->ncg_home+1; i++)
6196         {
6197             dd->atomGroups_.index[i] = i;
6198         }
6199     }
6200     /* Set the home atom number */
6201     dd->comm->atomRanges.setEnd(DDAtomRanges::Type::Home, dd->atomGroups().index[dd->ncg_home]);
6202
6203     if (fr->cutoff_scheme == ecutsVERLET)
6204     {
6205         /* The atoms are now exactly in grid order, update the grid order */
6206         nbnxn_set_atomorder(fr->nbv->nbs.get());
6207     }
6208     else
6209     {
6210         /* Copy the sorted ns cell indices back to the ns grid struct */
6211         for (i = 0; i < dd->ncg_home; i++)
6212         {
6213             fr->ns->grid->cell_index[i] = cgsort[i].nsc;
6214         }
6215         fr->ns->grid->nr = dd->ncg_home;
6216     }
6217 }
6218
6219 static void add_dd_statistics(gmx_domdec_t *dd)
6220 {
6221     gmx_domdec_comm_t *comm = dd->comm;
6222
6223     for (int i = 0; i < static_cast<int>(DDAtomRanges::Type::Number); i++)
6224     {
6225         auto range = static_cast<DDAtomRanges::Type>(i);
6226         comm->sum_nat[i] +=
6227             comm->atomRanges.end(range) - comm->atomRanges.start(range);
6228     }
6229     comm->ndecomp++;
6230 }
6231
6232 void reset_dd_statistics_counters(gmx_domdec_t *dd)
6233 {
6234     gmx_domdec_comm_t *comm = dd->comm;
6235
6236     /* Reset all the statistics and counters for total run counting */
6237     for (int i = 0; i < static_cast<int>(DDAtomRanges::Type::Number); i++)
6238     {
6239         comm->sum_nat[i] = 0;
6240     }
6241     comm->ndecomp   = 0;
6242     comm->nload     = 0;
6243     comm->load_step = 0;
6244     comm->load_sum  = 0;
6245     comm->load_max  = 0;
6246     clear_ivec(comm->load_lim);
6247     comm->load_mdf = 0;
6248     comm->load_pme = 0;
6249 }
6250
6251 void print_dd_statistics(const t_commrec *cr, const t_inputrec *ir, FILE *fplog)
6252 {
6253     gmx_domdec_comm_t *comm      = cr->dd->comm;
6254
6255     const int          numRanges = static_cast<int>(DDAtomRanges::Type::Number);
6256     gmx_sumd(numRanges, comm->sum_nat, cr);
6257
6258     if (fplog == nullptr)
6259     {
6260         return;
6261     }
6262
6263     fprintf(fplog, "\n    D O M A I N   D E C O M P O S I T I O N   S T A T I S T I C S\n\n");
6264
6265     for (int i = static_cast<int>(DDAtomRanges::Type::Zones); i < numRanges; i++)
6266     {
6267         auto   range = static_cast<DDAtomRanges::Type>(i);
6268         double av    = comm->sum_nat[i]/comm->ndecomp;
6269         switch (range)
6270         {
6271             case DDAtomRanges::Type::Zones:
6272                 fprintf(fplog,
6273                         " av. #atoms communicated per step for force:  %d x %.1f\n",
6274                         2, av);
6275                 break;
6276             case DDAtomRanges::Type::Vsites:
6277                 if (cr->dd->vsite_comm)
6278                 {
6279                     fprintf(fplog,
6280                             " av. #atoms communicated per step for vsites: %d x %.1f\n",
6281                             (EEL_PME(ir->coulombtype) || ir->coulombtype == eelEWALD) ? 3 : 2,
6282                             av);
6283                 }
6284                 break;
6285             case DDAtomRanges::Type::Constraints:
6286                 if (cr->dd->constraint_comm)
6287                 {
6288                     fprintf(fplog,
6289                             " av. #atoms communicated per step for LINCS:  %d x %.1f\n",
6290                             1 + ir->nLincsIter, av);
6291                 }
6292                 break;
6293             default:
6294                 gmx_incons(" Unknown type for DD statistics");
6295         }
6296     }
6297     fprintf(fplog, "\n");
6298
6299     if (comm->bRecordLoad && EI_DYNAMICS(ir->eI))
6300     {
6301         print_dd_load_av(fplog, cr->dd);
6302     }
6303 }
6304
6305 void dd_partition_system(FILE                *fplog,
6306                          gmx_int64_t          step,
6307                          const t_commrec     *cr,
6308                          gmx_bool             bMasterState,
6309                          int                  nstglobalcomm,
6310                          t_state             *state_global,
6311                          const gmx_mtop_t    *top_global,
6312                          const t_inputrec    *ir,
6313                          t_state             *state_local,
6314                          PaddedRVecVector    *f,
6315                          gmx::MDAtoms        *mdAtoms,
6316                          gmx_localtop_t      *top_local,
6317                          t_forcerec          *fr,
6318                          gmx_vsite_t         *vsite,
6319                          gmx::Constraints    *constr,
6320                          t_nrnb              *nrnb,
6321                          gmx_wallcycle       *wcycle,
6322                          gmx_bool             bVerbose)
6323 {
6324     gmx_domdec_t      *dd;
6325     gmx_domdec_comm_t *comm;
6326     gmx_ddbox_t        ddbox = {0};
6327     t_block           *cgs_gl;
6328     gmx_int64_t        step_pcoupl;
6329     rvec               cell_ns_x0, cell_ns_x1;
6330     int                ncgindex_set, ncg_home_old = -1, ncg_moved, nat_f_novirsum;
6331     gmx_bool           bBoxChanged, bNStGlobalComm, bDoDLB, bCheckWhetherToTurnDlbOn, bLogLoad;
6332     gmx_bool           bRedist, bSortCG, bResortAll;
6333     ivec               ncells_old = {0, 0, 0}, ncells_new = {0, 0, 0}, np;
6334     real               grid_density;
6335     char               sbuf[22];
6336
6337     wallcycle_start(wcycle, ewcDOMDEC);
6338
6339     dd   = cr->dd;
6340     comm = dd->comm;
6341
6342     // TODO if the update code becomes accessible here, use
6343     // upd->deform for this logic.
6344     bBoxChanged = (bMasterState || inputrecDeform(ir));
6345     if (ir->epc != epcNO)
6346     {
6347         /* With nstpcouple > 1 pressure coupling happens.
6348          * one step after calculating the pressure.
6349          * Box scaling happens at the end of the MD step,
6350          * after the DD partitioning.
6351          * We therefore have to do DLB in the first partitioning
6352          * after an MD step where P-coupling occurred.
6353          * We need to determine the last step in which p-coupling occurred.
6354          * MRS -- need to validate this for vv?
6355          */
6356         int n = ir->nstpcouple;
6357         if (n == 1)
6358         {
6359             step_pcoupl = step - 1;
6360         }
6361         else
6362         {
6363             step_pcoupl = ((step - 1)/n)*n + 1;
6364         }
6365         if (step_pcoupl >= comm->partition_step)
6366         {
6367             bBoxChanged = TRUE;
6368         }
6369     }
6370
6371     bNStGlobalComm = (step % nstglobalcomm == 0);
6372
6373     if (!isDlbOn(comm))
6374     {
6375         bDoDLB = FALSE;
6376     }
6377     else
6378     {
6379         /* Should we do dynamic load balacing this step?
6380          * Since it requires (possibly expensive) global communication,
6381          * we might want to do DLB less frequently.
6382          */
6383         if (bBoxChanged || ir->epc != epcNO)
6384         {
6385             bDoDLB = bBoxChanged;
6386         }
6387         else
6388         {
6389             bDoDLB = bNStGlobalComm;
6390         }
6391     }
6392
6393     /* Check if we have recorded loads on the nodes */
6394     if (comm->bRecordLoad && dd_load_count(comm) > 0)
6395     {
6396         bCheckWhetherToTurnDlbOn = dd_dlb_get_should_check_whether_to_turn_dlb_on(dd);
6397
6398         /* Print load every nstlog, first and last step to the log file */
6399         bLogLoad = ((ir->nstlog > 0 && step % ir->nstlog == 0) ||
6400                     comm->n_load_collect == 0 ||
6401                     (ir->nsteps >= 0 &&
6402                      (step + ir->nstlist > ir->init_step + ir->nsteps)));
6403
6404         /* Avoid extra communication due to verbose screen output
6405          * when nstglobalcomm is set.
6406          */
6407         if (bDoDLB || bLogLoad || bCheckWhetherToTurnDlbOn ||
6408             (bVerbose && (ir->nstlist == 0 || nstglobalcomm <= ir->nstlist)))
6409         {
6410             get_load_distribution(dd, wcycle);
6411             if (DDMASTER(dd))
6412             {
6413                 if (bLogLoad)
6414                 {
6415                     dd_print_load(fplog, dd, step-1);
6416                 }
6417                 if (bVerbose)
6418                 {
6419                     dd_print_load_verbose(dd);
6420                 }
6421             }
6422             comm->n_load_collect++;
6423
6424             if (isDlbOn(comm))
6425             {
6426                 if (DDMASTER(dd))
6427                 {
6428                     /* Add the measured cycles to the running average */
6429                     const float averageFactor        = 0.1f;
6430                     comm->cyclesPerStepDlbExpAverage =
6431                         (1 - averageFactor)*comm->cyclesPerStepDlbExpAverage +
6432                         averageFactor*comm->cycl[ddCyclStep]/comm->cycl_n[ddCyclStep];
6433                 }
6434                 if (comm->dlbState == edlbsOnCanTurnOff &&
6435                     dd->comm->n_load_have % c_checkTurnDlbOffInterval == c_checkTurnDlbOffInterval - 1)
6436                 {
6437                     gmx_bool turnOffDlb;
6438                     if (DDMASTER(dd))
6439                     {
6440                         /* If the running averaged cycles with DLB are more
6441                          * than before we turned on DLB, turn off DLB.
6442                          * We will again run and check the cycles without DLB
6443                          * and we can then decide if to turn off DLB forever.
6444                          */
6445                         turnOffDlb = (comm->cyclesPerStepDlbExpAverage >
6446                                       comm->cyclesPerStepBeforeDLB);
6447                     }
6448                     dd_bcast(dd, sizeof(turnOffDlb), &turnOffDlb);
6449                     if (turnOffDlb)
6450                     {
6451                         /* To turn off DLB, we need to redistribute the atoms */
6452                         dd_collect_state(dd, state_local, state_global);
6453                         bMasterState = TRUE;
6454                         turn_off_dlb(fplog, cr, step);
6455                     }
6456                 }
6457             }
6458             else if (bCheckWhetherToTurnDlbOn)
6459             {
6460                 gmx_bool turnOffDlbForever = FALSE;
6461                 gmx_bool turnOnDlb         = FALSE;
6462
6463                 /* Since the timings are node dependent, the master decides */
6464                 if (DDMASTER(dd))
6465                 {
6466                     /* If we recently turned off DLB, we want to check if
6467                      * performance is better without DLB. We want to do this
6468                      * ASAP to minimize the chance that external factors
6469                      * slowed down the DLB step are gone here and we
6470                      * incorrectly conclude that DLB was causing the slowdown.
6471                      * So we measure one nstlist block, no running average.
6472                      */
6473                     if (comm->haveTurnedOffDlb &&
6474                         comm->cycl[ddCyclStep]/comm->cycl_n[ddCyclStep] <
6475                         comm->cyclesPerStepDlbExpAverage)
6476                     {
6477                         /* After turning off DLB we ran nstlist steps in fewer
6478                          * cycles than with DLB. This likely means that DLB
6479                          * in not benefical, but this could be due to a one
6480                          * time unlucky fluctuation, so we require two such
6481                          * observations in close succession to turn off DLB
6482                          * forever.
6483                          */
6484                         if (comm->dlbSlowerPartitioningCount > 0 &&
6485                             dd->ddp_count < comm->dlbSlowerPartitioningCount + 10*c_checkTurnDlbOnInterval)
6486                         {
6487                             turnOffDlbForever = TRUE;
6488                         }
6489                         comm->haveTurnedOffDlb           = false;
6490                         /* Register when we last measured DLB slowdown */
6491                         comm->dlbSlowerPartitioningCount = dd->ddp_count;
6492                     }
6493                     else
6494                     {
6495                         /* Here we check if the max PME rank load is more than 0.98
6496                          * the max PP force load. If so, PP DLB will not help,
6497                          * since we are (almost) limited by PME. Furthermore,
6498                          * DLB will cause a significant extra x/f redistribution
6499                          * cost on the PME ranks, which will then surely result
6500                          * in lower total performance.
6501                          */
6502                         if (cr->npmenodes > 0 &&
6503                             dd_pme_f_ratio(dd) > 1 - DD_PERF_LOSS_DLB_ON)
6504                         {
6505                             turnOnDlb = FALSE;
6506                         }
6507                         else
6508                         {
6509                             turnOnDlb = (dd_force_imb_perf_loss(dd) >= DD_PERF_LOSS_DLB_ON);
6510                         }
6511                     }
6512                 }
6513                 struct
6514                 {
6515                     gmx_bool turnOffDlbForever;
6516                     gmx_bool turnOnDlb;
6517                 }
6518                 bools {
6519                     turnOffDlbForever, turnOnDlb
6520                 };
6521                 dd_bcast(dd, sizeof(bools), &bools);
6522                 if (bools.turnOffDlbForever)
6523                 {
6524                     turn_off_dlb_forever(fplog, cr, step);
6525                 }
6526                 else if (bools.turnOnDlb)
6527                 {
6528                     turn_on_dlb(fplog, cr, step);
6529                     bDoDLB = TRUE;
6530                 }
6531             }
6532         }
6533         comm->n_load_have++;
6534     }
6535
6536     cgs_gl = &comm->cgs_gl;
6537
6538     bRedist = FALSE;
6539     if (bMasterState)
6540     {
6541         /* Clear the old state */
6542         clearDDStateIndices(dd, 0, 0);
6543         ncgindex_set = 0;
6544
6545         auto xGlobal = positionsFromStatePointer(state_global);
6546
6547         set_ddbox(dd, true, ir,
6548                   DDMASTER(dd) ? state_global->box : nullptr,
6549                   true, xGlobal,
6550                   &ddbox);
6551
6552         distributeState(fplog, dd, state_global, *cgs_gl, ddbox, state_local, f);
6553
6554         dd_make_local_cgs(dd, &top_local->cgs);
6555
6556         /* Ensure that we have space for the new distribution */
6557         dd_check_alloc_ncg(fr, state_local, f, dd->ncg_home);
6558
6559         if (fr->cutoff_scheme == ecutsGROUP)
6560         {
6561             calc_cgcm(fplog, 0, dd->ncg_home,
6562                       &top_local->cgs, as_rvec_array(state_local->x.data()), fr->cg_cm);
6563         }
6564
6565         inc_nrnb(nrnb, eNR_CGCM, comm->atomRanges.numHomeAtoms());
6566
6567         dd_set_cginfo(dd->globalAtomGroupIndices, 0, dd->ncg_home, fr, comm->bLocalCG);
6568     }
6569     else if (state_local->ddp_count != dd->ddp_count)
6570     {
6571         if (state_local->ddp_count > dd->ddp_count)
6572         {
6573             gmx_fatal(FARGS, "Internal inconsistency state_local->ddp_count (%d) > dd->ddp_count (%d)", state_local->ddp_count, dd->ddp_count);
6574         }
6575
6576         if (state_local->ddp_count_cg_gl != state_local->ddp_count)
6577         {
6578             gmx_fatal(FARGS, "Internal inconsistency state_local->ddp_count_cg_gl (%d) != state_local->ddp_count (%d)", state_local->ddp_count_cg_gl, state_local->ddp_count);
6579         }
6580
6581         /* Clear the old state */
6582         clearDDStateIndices(dd, 0, 0);
6583
6584         /* Restore the atom group indices from state_local */
6585         restoreAtomGroups(dd, cgs_gl->index, state_local);
6586         make_dd_indices(dd, cgs_gl->index, 0);
6587         ncgindex_set = dd->ncg_home;
6588
6589         if (fr->cutoff_scheme == ecutsGROUP)
6590         {
6591             /* Redetermine the cg COMs */
6592             calc_cgcm(fplog, 0, dd->ncg_home,
6593                       &top_local->cgs, as_rvec_array(state_local->x.data()), fr->cg_cm);
6594         }
6595
6596         inc_nrnb(nrnb, eNR_CGCM, comm->atomRanges.numHomeAtoms());
6597
6598         dd_set_cginfo(dd->globalAtomGroupIndices, 0, dd->ncg_home, fr, comm->bLocalCG);
6599
6600         set_ddbox(dd, bMasterState, ir, state_local->box,
6601                   true, state_local->x, &ddbox);
6602
6603         bRedist = isDlbOn(comm);
6604     }
6605     else
6606     {
6607         /* We have the full state, only redistribute the cgs */
6608
6609         /* Clear the non-home indices */
6610         clearDDStateIndices(dd, dd->ncg_home, comm->atomRanges.numHomeAtoms());
6611         ncgindex_set = 0;
6612
6613         /* Avoid global communication for dim's without pbc and -gcom */
6614         if (!bNStGlobalComm)
6615         {
6616             copy_rvec(comm->box0, ddbox.box0    );
6617             copy_rvec(comm->box_size, ddbox.box_size);
6618         }
6619         set_ddbox(dd, bMasterState, ir, state_local->box,
6620                   bNStGlobalComm, state_local->x, &ddbox);
6621
6622         bBoxChanged = TRUE;
6623         bRedist     = TRUE;
6624     }
6625     /* For dim's without pbc and -gcom */
6626     copy_rvec(ddbox.box0, comm->box0    );
6627     copy_rvec(ddbox.box_size, comm->box_size);
6628
6629     set_dd_cell_sizes(dd, &ddbox, dynamic_dd_box(&ddbox, ir), bMasterState, bDoDLB,
6630                       step, wcycle);
6631
6632     if (comm->nstDDDumpGrid > 0 && step % comm->nstDDDumpGrid == 0)
6633     {
6634         write_dd_grid_pdb("dd_grid", step, dd, state_local->box, &ddbox);
6635     }
6636
6637     /* Check if we should sort the charge groups */
6638     bSortCG = (bMasterState || bRedist);
6639
6640     ncg_home_old = dd->ncg_home;
6641
6642     /* When repartitioning we mark charge groups that will move to neighboring
6643      * DD cells, but we do not move them right away for performance reasons.
6644      * Thus we need to keep track of how many charge groups will move for
6645      * obtaining correct local charge group / atom counts.
6646      */
6647     ncg_moved = 0;
6648     if (bRedist)
6649     {
6650         wallcycle_sub_start(wcycle, ewcsDD_REDIST);
6651
6652         dd_redistribute_cg(fplog, step, dd, ddbox.tric_dir,
6653                            state_local, f, fr,
6654                            !bSortCG, nrnb, &ncgindex_set, &ncg_moved);
6655
6656         wallcycle_sub_stop(wcycle, ewcsDD_REDIST);
6657     }
6658
6659     get_nsgrid_boundaries(ddbox.nboundeddim, state_local->box,
6660                           dd, &ddbox,
6661                           &comm->cell_x0, &comm->cell_x1,
6662                           dd->ncg_home, fr->cg_cm,
6663                           cell_ns_x0, cell_ns_x1, &grid_density);
6664
6665     if (bBoxChanged)
6666     {
6667         comm_dd_ns_cell_sizes(dd, &ddbox, cell_ns_x0, cell_ns_x1, step);
6668     }
6669
6670     switch (fr->cutoff_scheme)
6671     {
6672         case ecutsGROUP:
6673             copy_ivec(fr->ns->grid->n, ncells_old);
6674             grid_first(fplog, fr->ns->grid, dd, &ddbox,
6675                        state_local->box, cell_ns_x0, cell_ns_x1,
6676                        fr->rlist, grid_density);
6677             break;
6678         case ecutsVERLET:
6679             nbnxn_get_ncells(fr->nbv->nbs.get(), &ncells_old[XX], &ncells_old[YY]);
6680             break;
6681         default:
6682             gmx_incons("unimplemented");
6683     }
6684     /* We need to store tric_dir for dd_get_ns_ranges called from ns.c */
6685     copy_ivec(ddbox.tric_dir, comm->tric_dir);
6686
6687     if (bSortCG)
6688     {
6689         wallcycle_sub_start(wcycle, ewcsDD_GRID);
6690
6691         /* Sort the state on charge group position.
6692          * This enables exact restarts from this step.
6693          * It also improves performance by about 15% with larger numbers
6694          * of atoms per node.
6695          */
6696
6697         /* Fill the ns grid with the home cell,
6698          * so we can sort with the indices.
6699          */
6700         set_zones_ncg_home(dd);
6701
6702         switch (fr->cutoff_scheme)
6703         {
6704             case ecutsVERLET:
6705                 set_zones_size(dd, state_local->box, &ddbox, 0, 1, ncg_moved);
6706
6707                 nbnxn_put_on_grid(fr->nbv->nbs.get(), fr->ePBC, state_local->box,
6708                                   0,
6709                                   comm->zones.size[0].bb_x0,
6710                                   comm->zones.size[0].bb_x1,
6711                                   0, dd->ncg_home,
6712                                   comm->zones.dens_zone0,
6713                                   fr->cginfo,
6714                                   as_rvec_array(state_local->x.data()),
6715                                   ncg_moved, bRedist ? comm->movedBuffer.data() : nullptr,
6716                                   fr->nbv->grp[eintLocal].kernel_type,
6717                                   fr->nbv->nbat);
6718
6719                 nbnxn_get_ncells(fr->nbv->nbs.get(), &ncells_new[XX], &ncells_new[YY]);
6720                 break;
6721             case ecutsGROUP:
6722                 fill_grid(&comm->zones, fr->ns->grid, dd->ncg_home,
6723                           0, dd->ncg_home, fr->cg_cm);
6724
6725                 copy_ivec(fr->ns->grid->n, ncells_new);
6726                 break;
6727             default:
6728                 gmx_incons("unimplemented");
6729         }
6730
6731         bResortAll = bMasterState;
6732
6733         /* Check if we can user the old order and ns grid cell indices
6734          * of the charge groups to sort the charge groups efficiently.
6735          */
6736         if (ncells_new[XX] != ncells_old[XX] ||
6737             ncells_new[YY] != ncells_old[YY] ||
6738             ncells_new[ZZ] != ncells_old[ZZ])
6739         {
6740             bResortAll = TRUE;
6741         }
6742
6743         if (debug)
6744         {
6745             fprintf(debug, "Step %s, sorting the %d home charge groups\n",
6746                     gmx_step_str(step, sbuf), dd->ncg_home);
6747         }
6748         dd_sort_state(dd, fr->cg_cm, fr, state_local,
6749                       bResortAll ? -1 : ncg_home_old);
6750
6751         /* After sorting and compacting we set the correct size */
6752         dd_resize_state(state_local, f, comm->atomRanges.numHomeAtoms());
6753
6754         /* Rebuild all the indices */
6755         ga2la_clear(dd->ga2la);
6756         ncgindex_set = 0;
6757
6758         wallcycle_sub_stop(wcycle, ewcsDD_GRID);
6759     }
6760
6761     wallcycle_sub_start(wcycle, ewcsDD_SETUPCOMM);
6762
6763     /* Setup up the communication and communicate the coordinates */
6764     setup_dd_communication(dd, state_local->box, &ddbox, fr, state_local, f);
6765
6766     /* Set the indices */
6767     make_dd_indices(dd, cgs_gl->index, ncgindex_set);
6768
6769     /* Set the charge group boundaries for neighbor searching */
6770     set_cg_boundaries(&comm->zones);
6771
6772     if (fr->cutoff_scheme == ecutsVERLET)
6773     {
6774         /* When bSortCG=true, we have already set the size for zone 0 */
6775         set_zones_size(dd, state_local->box, &ddbox,
6776                        bSortCG ? 1 : 0, comm->zones.n,
6777                        0);
6778     }
6779
6780     wallcycle_sub_stop(wcycle, ewcsDD_SETUPCOMM);
6781
6782     /*
6783        write_dd_pdb("dd_home",step,"dump",top_global,cr,
6784                  -1,as_rvec_array(state_local->x.data()),state_local->box);
6785      */
6786
6787     wallcycle_sub_start(wcycle, ewcsDD_MAKETOP);
6788
6789     /* Extract a local topology from the global topology */
6790     for (int i = 0; i < dd->ndim; i++)
6791     {
6792         np[dd->dim[i]] = comm->cd[i].numPulses();
6793     }
6794     dd_make_local_top(dd, &comm->zones, dd->npbcdim, state_local->box,
6795                       comm->cellsize_min, np,
6796                       fr,
6797                       fr->cutoff_scheme == ecutsGROUP ? fr->cg_cm : as_rvec_array(state_local->x.data()),
6798                       vsite, top_global, top_local);
6799
6800     wallcycle_sub_stop(wcycle, ewcsDD_MAKETOP);
6801
6802     wallcycle_sub_start(wcycle, ewcsDD_MAKECONSTR);
6803
6804     /* Set up the special atom communication */
6805     int n = comm->atomRanges.end(DDAtomRanges::Type::Zones);
6806     for (int i = static_cast<int>(DDAtomRanges::Type::Zones) + 1; i < static_cast<int>(DDAtomRanges::Type::Number); i++)
6807     {
6808         auto range = static_cast<DDAtomRanges::Type>(i);
6809         switch (range)
6810         {
6811             case DDAtomRanges::Type::Vsites:
6812                 if (vsite && vsite->n_intercg_vsite)
6813                 {
6814                     n = dd_make_local_vsites(dd, n, top_local->idef.il);
6815                 }
6816                 break;
6817             case DDAtomRanges::Type::Constraints:
6818                 if (dd->bInterCGcons || dd->bInterCGsettles)
6819                 {
6820                     /* Only for inter-cg constraints we need special code */
6821                     n = dd_make_local_constraints(dd, n, top_global, fr->cginfo,
6822                                                   constr, ir->nProjOrder,
6823                                                   top_local->idef.il);
6824                 }
6825                 break;
6826             default:
6827                 gmx_incons("Unknown special atom type setup");
6828         }
6829         comm->atomRanges.setEnd(range, n);
6830     }
6831
6832     wallcycle_sub_stop(wcycle, ewcsDD_MAKECONSTR);
6833
6834     wallcycle_sub_start(wcycle, ewcsDD_TOPOTHER);
6835
6836     /* Make space for the extra coordinates for virtual site
6837      * or constraint communication.
6838      */
6839     state_local->natoms = comm->atomRanges.numAtomsTotal();
6840
6841     dd_resize_state(state_local, f, state_local->natoms);
6842
6843     if (fr->haveDirectVirialContributions)
6844     {
6845         if (vsite && vsite->n_intercg_vsite)
6846         {
6847             nat_f_novirsum = comm->atomRanges.end(DDAtomRanges::Type::Vsites);
6848         }
6849         else
6850         {
6851             if (EEL_FULL(ir->coulombtype) && dd->n_intercg_excl > 0)
6852             {
6853                 nat_f_novirsum = comm->atomRanges.end(DDAtomRanges::Type::Zones);
6854             }
6855             else
6856             {
6857                 nat_f_novirsum = comm->atomRanges.numHomeAtoms();
6858             }
6859         }
6860     }
6861     else
6862     {
6863         nat_f_novirsum = 0;
6864     }
6865
6866     /* Set the number of atoms required for the force calculation.
6867      * Forces need to be constrained when doing energy
6868      * minimization. For simple simulations we could avoid some
6869      * allocation, zeroing and copying, but this is probably not worth
6870      * the complications and checking.
6871      */
6872     forcerec_set_ranges(fr, dd->ncg_home, dd->globalAtomGroupIndices.size(),
6873                         comm->atomRanges.end(DDAtomRanges::Type::Zones),
6874                         comm->atomRanges.end(DDAtomRanges::Type::Constraints),
6875                         nat_f_novirsum);
6876
6877     /* Update atom data for mdatoms and several algorithms */
6878     mdAlgorithmsSetupAtomData(cr, ir, top_global, top_local, fr,
6879                               nullptr, mdAtoms, constr, vsite, nullptr);
6880
6881     auto mdatoms = mdAtoms->mdatoms();
6882     if (!thisRankHasDuty(cr, DUTY_PME))
6883     {
6884         /* Send the charges and/or c6/sigmas to our PME only node */
6885         gmx_pme_send_parameters(cr,
6886                                 fr->ic,
6887                                 mdatoms->nChargePerturbed, mdatoms->nTypePerturbed,
6888                                 mdatoms->chargeA, mdatoms->chargeB,
6889                                 mdatoms->sqrt_c6A, mdatoms->sqrt_c6B,
6890                                 mdatoms->sigmaA, mdatoms->sigmaB,
6891                                 dd_pme_maxshift_x(dd), dd_pme_maxshift_y(dd));
6892     }
6893
6894     if (ir->bPull)
6895     {
6896         /* Update the local pull groups */
6897         dd_make_local_pull_groups(cr, ir->pull_work, mdatoms);
6898     }
6899
6900     if (ir->bRot)
6901     {
6902         /* Update the local rotation groups */
6903         dd_make_local_rotation_groups(dd, ir->rot);
6904     }
6905
6906     if (ir->eSwapCoords != eswapNO)
6907     {
6908         /* Update the local groups needed for ion swapping */
6909         dd_make_local_swap_groups(dd, ir->swap);
6910     }
6911
6912     /* Update the local atoms to be communicated via the IMD protocol if bIMD is TRUE. */
6913     dd_make_local_IMD_atoms(ir->bIMD, dd, ir->imd);
6914
6915     add_dd_statistics(dd);
6916
6917     /* Make sure we only count the cycles for this DD partitioning */
6918     clear_dd_cycle_counts(dd);
6919
6920     /* Because the order of the atoms might have changed since
6921      * the last vsite construction, we need to communicate the constructing
6922      * atom coordinates again (for spreading the forces this MD step).
6923      */
6924     dd_move_x_vsites(dd, state_local->box, as_rvec_array(state_local->x.data()));
6925
6926     wallcycle_sub_stop(wcycle, ewcsDD_TOPOTHER);
6927
6928     if (comm->nstDDDump > 0 && step % comm->nstDDDump == 0)
6929     {
6930         dd_move_x(dd, state_local->box, state_local->x, nullWallcycle);
6931         write_dd_pdb("dd_dump", step, "dump", top_global, cr,
6932                      -1, as_rvec_array(state_local->x.data()), state_local->box);
6933     }
6934
6935     /* Store the partitioning step */
6936     comm->partition_step = step;
6937
6938     /* Increase the DD partitioning counter */
6939     dd->ddp_count++;
6940     /* The state currently matches this DD partitioning count, store it */
6941     state_local->ddp_count = dd->ddp_count;
6942     if (bMasterState)
6943     {
6944         /* The DD master node knows the complete cg distribution,
6945          * store the count so we can possibly skip the cg info communication.
6946          */
6947         comm->master_cg_ddp_count = (bSortCG ? 0 : dd->ddp_count);
6948     }
6949
6950     if (comm->DD_debug > 0)
6951     {
6952         /* Set the env var GMX_DD_DEBUG if you suspect corrupted indices */
6953         check_index_consistency(dd, top_global->natoms, ncg_mtop(top_global),
6954                                 "after partitioning");
6955     }
6956
6957     wallcycle_stop(wcycle, ewcDOMDEC);
6958 }
6959
6960 /*! \brief Check whether bonded interactions are missing, if appropriate */
6961 void checkNumberOfBondedInteractions(FILE                 *fplog,
6962                                      t_commrec            *cr,
6963                                      int                   totalNumberOfBondedInteractions,
6964                                      const gmx_mtop_t     *top_global,
6965                                      const gmx_localtop_t *top_local,
6966                                      const t_state        *state,
6967                                      bool                 *shouldCheckNumberOfBondedInteractions)
6968 {
6969     if (*shouldCheckNumberOfBondedInteractions)
6970     {
6971         if (totalNumberOfBondedInteractions != cr->dd->nbonded_global)
6972         {
6973             dd_print_missing_interactions(fplog, cr, totalNumberOfBondedInteractions, top_global, top_local, state); // Does not return
6974         }
6975         *shouldCheckNumberOfBondedInteractions = false;
6976     }
6977 }