Apply clang-format to source tree
[alexxy/gromacs.git] / src / gromacs / domdec / cellsizes.cpp
1 /*
2  * This file is part of the GROMACS molecular simulation package.
3  *
4  * Copyright (c) 2018,2019, by the GROMACS development team, led by
5  * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6  * and including many others, as listed in the AUTHORS file in the
7  * top-level source directory and at http://www.gromacs.org.
8  *
9  * GROMACS is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public License
11  * as published by the Free Software Foundation; either version 2.1
12  * of the License, or (at your option) any later version.
13  *
14  * GROMACS is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Lesser General Public License for more details.
18  *
19  * You should have received a copy of the GNU Lesser General Public
20  * License along with GROMACS; if not, see
21  * http://www.gnu.org/licenses, or write to the Free Software Foundation,
22  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23  *
24  * If you want to redistribute modifications to GROMACS, please
25  * consider that scientific software is very special. Version
26  * control is crucial - bugs must be traceable. We will be happy to
27  * consider code for inclusion in the official distribution, but
28  * derived work must not be called official GROMACS. Details are found
29  * in the README & COPYING files - if they are missing, get the
30  * official version at http://www.gromacs.org.
31  *
32  * To help us fund GROMACS development, we humbly ask that you cite
33  * the research papers on the package. Check out http://www.gromacs.org.
34  */
35 /* \internal \file
36  *
37  * \brief Implements DD cell-size related functions.
38  *
39  * \author Berk Hess <hess@kth.se>
40  * \author Roland Schulz <roland.schulz@intel.com>
41  * \ingroup module_domdec
42  */
43
44 #include "gmxpre.h"
45
46 #include "cellsizes.h"
47
48 #include "config.h"
49
50 #include "gromacs/gmxlib/network.h"
51 #include "gromacs/math/vec.h"
52 #include "gromacs/mdtypes/commrec.h"
53 #include "gromacs/utility/cstringutil.h"
54 #include "gromacs/utility/fatalerror.h"
55
56 #include "atomdistribution.h"
57 #include "domdec_internal.h"
58 #include "utility.h"
59
60 static void set_pme_maxshift(gmx_domdec_t*      dd,
61                              gmx_ddpme_t*       ddpme,
62                              gmx_bool           bUniform,
63                              const gmx_ddbox_t* ddbox,
64                              const real*        cellFrac)
65 {
66     gmx_domdec_comm_t* comm;
67     int                nc, ns, s;
68     int *              xmin, *xmax;
69     real               range, pme_boundary;
70     int                sh;
71
72     comm = dd->comm;
73     nc   = dd->nc[ddpme->dim];
74     ns   = ddpme->nslab;
75
76     if (!ddpme->dim_match)
77     {
78         /* PP decomposition is not along dim: the worst situation */
79         sh = ns / 2;
80     }
81     else if (ns <= 3 || (bUniform && ns == nc))
82     {
83         /* The optimal situation */
84         sh = 1;
85     }
86     else
87     {
88         /* We need to check for all pme nodes which nodes they
89          * could possibly need to communicate with.
90          */
91         xmin = ddpme->pp_min;
92         xmax = ddpme->pp_max;
93         /* Allow for atoms to be maximally 2/3 times the cut-off
94          * out of their DD cell. This is a reasonable balance between
95          * between performance and support for most charge-group/cut-off
96          * combinations.
97          */
98         range = 2.0 / 3.0 * comm->systemInfo.cutoff / ddbox->box_size[ddpme->dim];
99         /* Avoid extra communication when we are exactly at a boundary */
100         range *= 0.999;
101
102         sh = 1;
103         for (s = 0; s < ns; s++)
104         {
105             /* PME slab s spreads atoms between box frac. s/ns and (s+1)/ns */
106             pme_boundary = static_cast<real>(s) / ns;
107             while (sh + 1 < ns
108                    && ((s - (sh + 1) >= 0 && cellFrac[xmax[s - (sh + 1)] + 1] + range > pme_boundary)
109                        || (s - (sh + 1) < 0 && cellFrac[xmax[s - (sh + 1) + ns] + 1] - 1 + range > pme_boundary)))
110             {
111                 sh++;
112             }
113             pme_boundary = static_cast<real>(s + 1) / ns;
114             while (sh + 1 < ns
115                    && ((s + (sh + 1) < ns && cellFrac[xmin[s + (sh + 1)]] - range < pme_boundary)
116                        || (s + (sh + 1) >= ns && cellFrac[xmin[s + (sh + 1) - ns]] + 1 - range < pme_boundary)))
117             {
118                 sh++;
119             }
120         }
121     }
122
123     ddpme->maxshift = sh;
124
125     if (debug)
126     {
127         fprintf(debug, "PME slab communication range for dim %d is %d\n", ddpme->dim, ddpme->maxshift);
128     }
129 }
130
131 static void check_box_size(const gmx_domdec_t* dd, const gmx_ddbox_t* ddbox)
132 {
133     int d, dim;
134
135     for (d = 0; d < dd->ndim; d++)
136     {
137         dim = dd->dim[d];
138         if (dim < ddbox->nboundeddim
139             && ddbox->box_size[dim] * ddbox->skew_fac[dim]
140                        < dd->nc[dim] * dd->comm->cellsize_limit * DD_CELL_MARGIN)
141         {
142             gmx_fatal(
143                     FARGS,
144                     "The %c-size of the box (%f) times the triclinic skew factor (%f) is smaller "
145                     "than the number of DD cells (%d) times the smallest allowed cell size (%f)\n",
146                     dim2char(dim), ddbox->box_size[dim], ddbox->skew_fac[dim], dd->nc[dim],
147                     dd->comm->cellsize_limit);
148         }
149     }
150 }
151
152 real grid_jump_limit(const gmx_domdec_comm_t* comm, real cutoff, int dim_ind)
153 {
154     real grid_jump_limit;
155
156     /* The distance between the boundaries of cells at distance
157      * x+-1,y+-1 or y+-1,z+-1 is limited by the cut-off restrictions
158      * and by the fact that cells should not be shifted by more than
159      * half their size, such that cg's only shift by one cell
160      * at redecomposition.
161      */
162     grid_jump_limit = comm->cellsize_limit;
163     if (!comm->bVacDLBNoLimit)
164     {
165         if (comm->bPMELoadBalDLBLimits)
166         {
167             cutoff = std::max(cutoff, comm->PMELoadBal_max_cutoff);
168         }
169         grid_jump_limit = std::max(grid_jump_limit, cutoff / comm->cd[dim_ind].numPulses());
170     }
171
172     return grid_jump_limit;
173 }
174
175 /* This function should be used for moving the domain boudaries during DLB,
176  * for obtaining the minimum cell size. It checks the initially set limit
177  * comm->cellsize_min, for bonded and initial non-bonded cut-offs,
178  * and, possibly, a longer cut-off limit set for PME load balancing.
179  */
180 static real cellsize_min_dlb(gmx_domdec_comm_t* comm, int dim_ind, int dim)
181 {
182     real cellsize_min;
183
184     cellsize_min = comm->cellsize_min[dim];
185
186     if (!comm->bVacDLBNoLimit)
187     {
188         /* The cut-off might have changed, e.g. by PME load balacning,
189          * from the value used to set comm->cellsize_min, so check it.
190          */
191         cellsize_min = std::max(cellsize_min, comm->systemInfo.cutoff / comm->cd[dim_ind].np_dlb);
192
193         if (comm->bPMELoadBalDLBLimits)
194         {
195             /* Check for the cut-off limit set by the PME load balancing */
196             cellsize_min = std::max(cellsize_min, comm->PMELoadBal_max_cutoff / comm->cd[dim_ind].np_dlb);
197         }
198     }
199
200     return cellsize_min;
201 }
202
203 /* Set the domain boundaries. Use for static (or no) load balancing,
204  * and also for the starting state for dynamic load balancing.
205  * setmode determine if and where the boundaries are stored, use enum above.
206  * Returns the number communication pulses in npulse.
207  */
208 gmx::ArrayRef<const std::vector<real>>
209 set_dd_cell_sizes_slb(gmx_domdec_t* dd, const gmx_ddbox_t* ddbox, int setmode, ivec npulse)
210 {
211     gmx_domdec_comm_t* comm = dd->comm;
212
213     gmx::ArrayRef<std::vector<real>> cell_x_master;
214     if (setmode == setcellsizeslbMASTER)
215     {
216         cell_x_master = dd->ma->cellSizesBuffer;
217     }
218
219     rvec cellsize_min;
220     for (int d = 0; d < DIM; d++)
221     {
222         cellsize_min[d] = ddbox->box_size[d] * ddbox->skew_fac[d];
223         npulse[d]       = 1;
224         if (dd->nc[d] == 1 || comm->slb_frac[d] == nullptr)
225         {
226             /* Uniform grid */
227             real cell_dx = ddbox->box_size[d] / dd->nc[d];
228             switch (setmode)
229             {
230                 case setcellsizeslbMASTER:
231                     for (int j = 0; j < dd->nc[d] + 1; j++)
232                     {
233                         cell_x_master[d][j] = ddbox->box0[d] + j * cell_dx;
234                     }
235                     break;
236                 case setcellsizeslbLOCAL:
237                     comm->cell_x0[d] = ddbox->box0[d] + (dd->ci[d]) * cell_dx;
238                     comm->cell_x1[d] = ddbox->box0[d] + (dd->ci[d] + 1) * cell_dx;
239                     break;
240                 default: break;
241             }
242             real cellsize = cell_dx * ddbox->skew_fac[d];
243             while (cellsize * npulse[d] < comm->systemInfo.cutoff)
244             {
245                 npulse[d]++;
246             }
247             cellsize_min[d] = cellsize;
248         }
249         else
250         {
251             /* Statically load balanced grid */
252             /* Also when we are not doing a master distribution we determine
253              * all cell borders in a loop to obtain identical values
254              * to the master distribution case and to determine npulse.
255              */
256             gmx::ArrayRef<real> cell_x;
257             std::vector<real>   cell_x_buffer;
258             if (setmode == setcellsizeslbMASTER)
259             {
260                 cell_x = cell_x_master[d];
261             }
262             else
263             {
264                 cell_x_buffer.resize(dd->nc[d] + 1);
265                 cell_x = cell_x_buffer;
266             }
267             cell_x[0] = ddbox->box0[d];
268             for (int j = 0; j < dd->nc[d]; j++)
269             {
270                 real cell_dx  = ddbox->box_size[d] * comm->slb_frac[d][j];
271                 cell_x[j + 1] = cell_x[j] + cell_dx;
272                 real cellsize = cell_dx * ddbox->skew_fac[d];
273                 while (cellsize * npulse[d] < comm->systemInfo.cutoff && npulse[d] < dd->nc[d] - 1)
274                 {
275                     npulse[d]++;
276                 }
277                 cellsize_min[d] = std::min(cellsize_min[d], cellsize);
278             }
279             if (setmode == setcellsizeslbLOCAL)
280             {
281                 comm->cell_x0[d] = cell_x[dd->ci[d]];
282                 comm->cell_x1[d] = cell_x[dd->ci[d] + 1];
283             }
284         }
285         /* The following limitation is to avoid that a cell would receive
286          * some of its own home charge groups back over the periodic boundary.
287          * Double charge groups cause trouble with the global indices.
288          */
289         if (d < ddbox->npbcdim && dd->nc[d] > 1 && npulse[d] >= dd->nc[d])
290         {
291             char error_string[STRLEN];
292
293             sprintf(error_string,
294                     "The box size in direction %c (%f) times the triclinic skew factor (%f) is too "
295                     "small for a cut-off of %f with %d domain decomposition cells, use 1 or more "
296                     "than %d %s or increase the box size in this direction",
297                     dim2char(d), ddbox->box_size[d], ddbox->skew_fac[d], comm->systemInfo.cutoff,
298                     dd->nc[d], dd->nc[d], dd->nnodes > dd->nc[d] ? "cells" : "ranks");
299
300             if (setmode == setcellsizeslbLOCAL)
301             {
302                 gmx_fatal_collective(FARGS, dd->mpi_comm_all, DDMASTER(dd), "%s", error_string);
303             }
304             else
305             {
306                 gmx_fatal(FARGS, "%s", error_string);
307             }
308         }
309     }
310
311     if (!isDlbOn(comm))
312     {
313         copy_rvec(cellsize_min, comm->cellsize_min);
314     }
315
316     DDRankSetup& ddRankSetup = comm->ddRankSetup;
317     for (int d = 0; d < ddRankSetup.npmedecompdim; d++)
318     {
319         set_pme_maxshift(dd, &ddRankSetup.ddpme[d], comm->slb_frac[dd->dim[d]] == nullptr, ddbox,
320                          ddRankSetup.ddpme[d].slb_dim_f);
321     }
322
323     return cell_x_master;
324 }
325
326
327 static void dd_cell_sizes_dlb_root_enforce_limits(gmx_domdec_t*      dd,
328                                                   int                d,
329                                                   int                dim,
330                                                   RowMaster*         rowMaster,
331                                                   const gmx_ddbox_t* ddbox,
332                                                   gmx_bool           bUniform,
333                                                   int64_t            step,
334                                                   real               cellsize_limit_f,
335                                                   int                range[])
336 {
337     gmx_domdec_comm_t* comm;
338     real               halfway, cellsize_limit_f_i, region_size;
339     gmx_bool           bLastHi  = FALSE;
340     int                nrange[] = { range[0], range[1] };
341
342     region_size = rowMaster->cellFrac[range[1]] - rowMaster->cellFrac[range[0]];
343
344     GMX_ASSERT(region_size >= (range[1] - range[0]) * cellsize_limit_f,
345                "The region should fit all cells at minimum size");
346
347     comm = dd->comm;
348
349     const int ncd = dd->nc[dim];
350
351     const bool dimHasPbc = (dim < ddbox->npbcdim);
352
353     gmx::ArrayRef<real> cell_size = rowMaster->buf_ncd;
354
355     if (debug)
356     {
357         fprintf(debug, "enforce_limits: %d %d\n", range[0], range[1]);
358     }
359
360     /* First we need to check if the scaling does not make cells
361      * smaller than the smallest allowed size.
362      * We need to do this iteratively, since if a cell is too small,
363      * it needs to be enlarged, which makes all the other cells smaller,
364      * which could in turn make another cell smaller than allowed.
365      */
366     for (int i = range[0]; i < range[1]; i++)
367     {
368         rowMaster->isCellMin[i] = false;
369     }
370     int nmin = 0;
371     int nmin_old;
372     do
373     {
374         nmin_old = nmin;
375         /* We need the total for normalization */
376         real fac = 0;
377         for (int i = range[0]; i < range[1]; i++)
378         {
379             if (!rowMaster->isCellMin[i])
380             {
381                 fac += cell_size[i];
382             }
383         }
384         /* This condition is ensured by the assertion at the end of the loop */
385         GMX_ASSERT(fac > 0, "We cannot have 0 size to work with");
386         fac = (region_size - nmin * cellsize_limit_f)
387               / fac; /* substracting cells already set to cellsize_limit_f */
388         /* Determine the cell boundaries */
389         for (int i = range[0]; i < range[1]; i++)
390         {
391             if (!rowMaster->isCellMin[i])
392             {
393                 cell_size[i] *= fac;
394                 if (!dimHasPbc && (i == 0 || i == dd->nc[dim] - 1))
395                 {
396                     cellsize_limit_f_i = 0;
397                 }
398                 else
399                 {
400                     cellsize_limit_f_i = cellsize_limit_f;
401                 }
402                 if (cell_size[i] < cellsize_limit_f_i)
403                 {
404                     rowMaster->isCellMin[i] = true;
405                     cell_size[i]            = cellsize_limit_f_i;
406                     nmin++;
407                 }
408             }
409             rowMaster->cellFrac[i + 1] = rowMaster->cellFrac[i] + cell_size[i];
410         }
411
412         /* This is ensured by the assertion at the beginning of this function */
413         GMX_ASSERT(nmin < range[1] - range[0], "We can not have all cells limited");
414     } while (nmin > nmin_old);
415
416     const int i  = range[1] - 1;
417     cell_size[i] = rowMaster->cellFrac[i + 1] - rowMaster->cellFrac[i];
418     /* For this check we should not use DD_CELL_MARGIN,
419      * but a slightly smaller factor,
420      * since rounding could get use below the limit.
421      */
422     if (dimHasPbc && cell_size[i] < cellsize_limit_f * DD_CELL_MARGIN2 / DD_CELL_MARGIN)
423     {
424         char buf[22];
425         gmx_fatal(FARGS,
426                   "step %s: the dynamic load balancing could not balance dimension %c: box size "
427                   "%f, triclinic skew factor %f, #cells %d, minimum cell size %f\n",
428                   gmx_step_str(step, buf), dim2char(dim), ddbox->box_size[dim],
429                   ddbox->skew_fac[dim], ncd, comm->cellsize_min[dim]);
430     }
431
432     rowMaster->dlbIsLimited = (nmin > 0) || (range[0] > 0) || (range[1] < ncd);
433
434     if (!bUniform)
435     {
436         /* Check if the boundary did not displace more than halfway
437          * each of the cells it bounds, as this could cause problems,
438          * especially when the differences between cell sizes are large.
439          * If changes are applied, they will not make cells smaller
440          * than the cut-off, as we check all the boundaries which
441          * might be affected by a change and if the old state was ok,
442          * the cells will at most be shrunk back to their old size.
443          */
444         for (int i = range[0] + 1; i < range[1]; i++)
445         {
446             halfway = 0.5 * (rowMaster->oldCellFrac[i] + rowMaster->oldCellFrac[i - 1]);
447             if (rowMaster->cellFrac[i] < halfway)
448             {
449                 rowMaster->cellFrac[i] = halfway;
450                 /* Check if the change also causes shifts of the next boundaries */
451                 for (int j = i + 1; j < range[1]; j++)
452                 {
453                     if (rowMaster->cellFrac[j] < rowMaster->cellFrac[j - 1] + cellsize_limit_f)
454                     {
455                         rowMaster->cellFrac[j] = rowMaster->cellFrac[j - 1] + cellsize_limit_f;
456                     }
457                 }
458             }
459             halfway = 0.5 * (rowMaster->oldCellFrac[i] + rowMaster->oldCellFrac[i + 1]);
460             if (rowMaster->cellFrac[i] > halfway)
461             {
462                 rowMaster->cellFrac[i] = halfway;
463                 /* Check if the change also causes shifts of the next boundaries */
464                 for (int j = i - 1; j >= range[0] + 1; j--)
465                 {
466                     if (rowMaster->cellFrac[j] > rowMaster->cellFrac[j + 1] - cellsize_limit_f)
467                     {
468                         rowMaster->cellFrac[j] = rowMaster->cellFrac[j + 1] - cellsize_limit_f;
469                     }
470                 }
471             }
472         }
473     }
474
475     /* nrange is defined as [lower, upper) range for new call to enforce_limits */
476     /* find highest violation of LimLo (a) and the following violation of LimHi (thus the lowest
477      * following) (b) then call enforce_limits for (oldb,a), (a,b). In the next step: (b,nexta).
478      * oldb and nexta can be the boundaries. for a and b nrange is used */
479     if (d > 0)
480     {
481         /* Take care of the staggering of the cell boundaries */
482         if (bUniform)
483         {
484             for (int i = range[0]; i < range[1]; i++)
485             {
486                 rowMaster->bounds[i].cellFracLowerMax = rowMaster->cellFrac[i];
487                 rowMaster->bounds[i].cellFracUpperMin = rowMaster->cellFrac[i + 1];
488             }
489         }
490         else
491         {
492             for (int i = range[0] + 1; i < range[1]; i++)
493             {
494                 const RowMaster::Bounds& bounds = rowMaster->bounds[i];
495
496                 bool bLimLo = (rowMaster->cellFrac[i] < bounds.boundMin);
497                 bool bLimHi = (rowMaster->cellFrac[i] > bounds.boundMax);
498                 if (bLimLo && bLimHi)
499                 {
500                     /* Both limits violated, try the best we can */
501                     /* For this case we split the original range (range) in two parts and care about the other limitiations in the next iteration. */
502                     rowMaster->cellFrac[i] = 0.5 * (bounds.boundMin + bounds.boundMax);
503                     nrange[0]              = range[0];
504                     nrange[1]              = i;
505                     dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform,
506                                                           step, cellsize_limit_f, nrange);
507
508                     nrange[0] = i;
509                     nrange[1] = range[1];
510                     dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform,
511                                                           step, cellsize_limit_f, nrange);
512
513                     return;
514                 }
515                 else if (bLimLo)
516                 {
517                     /* rowMaster->cellFrac[i] = rowMaster->boundMin[i]; */
518                     nrange[1] = i; /* only store violation location. There could be a LimLo violation following with an higher index */
519                     bLastHi   = FALSE;
520                 }
521                 else if (bLimHi && !bLastHi)
522                 {
523                     bLastHi = TRUE;
524                     if (nrange[1] < range[1]) /* found a LimLo before */
525                     {
526                         rowMaster->cellFrac[nrange[1]] = rowMaster->bounds[nrange[1]].boundMin;
527                         dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform,
528                                                               step, cellsize_limit_f, nrange);
529                         nrange[0] = nrange[1];
530                     }
531                     rowMaster->cellFrac[i] = rowMaster->bounds[i].boundMax;
532                     nrange[1]              = i;
533                     dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform,
534                                                           step, cellsize_limit_f, nrange);
535                     nrange[0] = i;
536                     nrange[1] = range[1];
537                 }
538             }
539             if (nrange[1] < range[1]) /* found last a LimLo */
540             {
541                 rowMaster->cellFrac[nrange[1]] = rowMaster->bounds[nrange[1]].boundMin;
542                 dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform, step,
543                                                       cellsize_limit_f, nrange);
544                 nrange[0] = nrange[1];
545                 nrange[1] = range[1];
546                 dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform, step,
547                                                       cellsize_limit_f, nrange);
548             }
549             else if (nrange[0] > range[0]) /* found at least one LimHi */
550             {
551                 dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform, step,
552                                                       cellsize_limit_f, nrange);
553             }
554         }
555     }
556 }
557
558
559 static void set_dd_cell_sizes_dlb_root(gmx_domdec_t*      dd,
560                                        int                d,
561                                        int                dim,
562                                        RowMaster*         rowMaster,
563                                        const gmx_ddbox_t* ddbox,
564                                        gmx_bool           bDynamicBox,
565                                        gmx_bool           bUniform,
566                                        int64_t            step)
567 {
568     gmx_domdec_comm_t* comm    = dd->comm;
569     constexpr real     c_relax = 0.5;
570     int                range[] = { 0, 0 };
571
572     /* Convert the maximum change from the input percentage to a fraction */
573     const real change_limit = comm->ddSettings.dlb_scale_lim * 0.01;
574
575     const int ncd = dd->nc[dim];
576
577     const bool bPBC = (dim < ddbox->npbcdim);
578
579     gmx::ArrayRef<real> cell_size = rowMaster->buf_ncd;
580
581     /* Store the original boundaries */
582     for (int i = 0; i < ncd + 1; i++)
583     {
584         rowMaster->oldCellFrac[i] = rowMaster->cellFrac[i];
585     }
586     if (bUniform)
587     {
588         for (int i = 0; i < ncd; i++)
589         {
590             cell_size[i] = 1.0 / ncd;
591         }
592     }
593     else if (dd_load_count(comm) > 0)
594     {
595         real load_aver  = comm->load[d].sum_m / ncd;
596         real change_max = 0;
597         real load_i;
598         real change;
599         for (int i = 0; i < ncd; i++)
600         {
601             /* Determine the relative imbalance of cell i */
602             load_i         = comm->load[d].load[i * comm->load[d].nload + 2];
603             real imbalance = (load_i - load_aver) / (load_aver > 0 ? load_aver : 1);
604             /* Determine the change of the cell size using underrelaxation */
605             change     = -c_relax * imbalance;
606             change_max = std::max(change_max, std::max(change, -change));
607         }
608         /* Limit the amount of scaling.
609          * We need to use the same rescaling for all cells in one row,
610          * otherwise the load balancing might not converge.
611          */
612         real sc = c_relax;
613         if (change_max > change_limit)
614         {
615             sc *= change_limit / change_max;
616         }
617         for (int i = 0; i < ncd; i++)
618         {
619             /* Determine the relative imbalance of cell i */
620             load_i         = comm->load[d].load[i * comm->load[d].nload + 2];
621             real imbalance = (load_i - load_aver) / (load_aver > 0 ? load_aver : 1);
622             /* Determine the change of the cell size using underrelaxation */
623             change       = -sc * imbalance;
624             cell_size[i] = (rowMaster->cellFrac[i + 1] - rowMaster->cellFrac[i]) * (1 + change);
625         }
626     }
627
628     real cellsize_limit_f = cellsize_min_dlb(comm, d, dim) / ddbox->box_size[dim];
629     cellsize_limit_f *= DD_CELL_MARGIN;
630     real dist_min_f_hard = grid_jump_limit(comm, comm->systemInfo.cutoff, d) / ddbox->box_size[dim];
631     real dist_min_f      = dist_min_f_hard * DD_CELL_MARGIN;
632     if (ddbox->tric_dir[dim])
633     {
634         cellsize_limit_f /= ddbox->skew_fac[dim];
635         dist_min_f /= ddbox->skew_fac[dim];
636     }
637     if (bDynamicBox && d > 0)
638     {
639         dist_min_f *= DD_PRES_SCALE_MARGIN;
640     }
641     if (d > 0 && !bUniform)
642     {
643         /* Make sure that the grid is not shifted too much */
644         for (int i = 1; i < ncd; i++)
645         {
646             const RowMaster::Bounds& boundsNeighbor = rowMaster->bounds[i - 1];
647             RowMaster::Bounds&       bounds         = rowMaster->bounds[i];
648
649             if (bounds.cellFracUpperMin - boundsNeighbor.cellFracLowerMax < 2 * dist_min_f_hard)
650             {
651                 gmx_incons("Inconsistent DD boundary staggering limits!");
652             }
653             bounds.boundMin = boundsNeighbor.cellFracLowerMax + dist_min_f;
654             real space = rowMaster->cellFrac[i] - (boundsNeighbor.cellFracLowerMax + dist_min_f);
655             if (space > 0)
656             {
657                 bounds.boundMin += 0.5 * space;
658             }
659             bounds.boundMax = bounds.cellFracUpperMin - dist_min_f;
660             space           = rowMaster->cellFrac[i] - (bounds.cellFracUpperMin - dist_min_f);
661             if (space < 0)
662             {
663                 rowMaster->bounds[i].boundMax += 0.5 * space;
664             }
665             if (debug)
666             {
667                 fprintf(debug, "dim %d boundary %d %.3f < %.3f < %.3f < %.3f < %.3f\n", d, i,
668                         boundsNeighbor.cellFracLowerMax + dist_min_f, bounds.boundMin,
669                         rowMaster->cellFrac[i], bounds.boundMax, bounds.cellFracUpperMin - dist_min_f);
670             }
671         }
672     }
673     range[1]                 = ncd;
674     rowMaster->cellFrac[0]   = 0;
675     rowMaster->cellFrac[ncd] = 1;
676     dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform, step,
677                                           cellsize_limit_f, range);
678
679
680     /* After the checks above, the cells should obey the cut-off
681      * restrictions, but it does not hurt to check.
682      */
683     for (int i = 0; i < ncd; i++)
684     {
685         if (debug)
686         {
687             fprintf(debug, "Relative bounds dim %d  cell %d: %f %f\n", dim, i,
688                     rowMaster->cellFrac[i], rowMaster->cellFrac[i + 1]);
689         }
690
691         if ((bPBC || (i != 0 && i != dd->nc[dim] - 1))
692             && rowMaster->cellFrac[i + 1] - rowMaster->cellFrac[i] < cellsize_limit_f / DD_CELL_MARGIN)
693         {
694             char buf[22];
695             fprintf(stderr, "\nWARNING step %s: direction %c, cell %d too small: %f\n",
696                     gmx_step_str(step, buf), dim2char(dim), i,
697                     (rowMaster->cellFrac[i + 1] - rowMaster->cellFrac[i]) * ddbox->box_size[dim]
698                             * ddbox->skew_fac[dim]);
699         }
700     }
701
702     int pos = ncd + 1;
703     /* Store the cell boundaries of the lower dimensions at the end */
704     for (int d1 = 0; d1 < d; d1++)
705     {
706         rowMaster->cellFrac[pos++] = comm->cellsizesWithDlb[d1].fracLower;
707         rowMaster->cellFrac[pos++] = comm->cellsizesWithDlb[d1].fracUpper;
708     }
709
710     DDRankSetup& ddRankSetup = comm->ddRankSetup;
711     if (d < ddRankSetup.npmedecompdim)
712     {
713         /* The master determines the maximum shift for
714          * the coordinate communication between separate PME nodes.
715          */
716         set_pme_maxshift(dd, &ddRankSetup.ddpme[d], bUniform, ddbox, rowMaster->cellFrac.data());
717     }
718     rowMaster->cellFrac[pos++] = ddRankSetup.ddpme[0].maxshift;
719     if (d >= 1)
720     {
721         rowMaster->cellFrac[pos++] = ddRankSetup.ddpme[1].maxshift;
722     }
723 }
724
725 static void relative_to_absolute_cell_bounds(gmx_domdec_t* dd, const gmx_ddbox_t* ddbox, int dimind)
726 {
727     gmx_domdec_comm_t*        comm      = dd->comm;
728     const DDCellsizesWithDlb& cellsizes = comm->cellsizesWithDlb[dimind];
729
730     /* Set the cell dimensions */
731     int dim            = dd->dim[dimind];
732     comm->cell_x0[dim] = cellsizes.fracLower * ddbox->box_size[dim];
733     comm->cell_x1[dim] = cellsizes.fracUpper * ddbox->box_size[dim];
734     if (dim >= ddbox->nboundeddim)
735     {
736         comm->cell_x0[dim] += ddbox->box0[dim];
737         comm->cell_x1[dim] += ddbox->box0[dim];
738     }
739 }
740
741 static void distribute_dd_cell_sizes_dlb(gmx_domdec_t*       dd,
742                                          int                 d,
743                                          int                 dim,
744                                          gmx::ArrayRef<real> cellFracRow,
745                                          const gmx_ddbox_t*  ddbox)
746 {
747     gmx_domdec_comm_t& comm = *dd->comm;
748
749 #if GMX_MPI
750     /* Each node would only need to know two fractions,
751      * but it is probably cheaper to broadcast the whole array.
752      */
753     MPI_Bcast(cellFracRow.data(), ddCellFractionBufferSize(dd, d) * sizeof(real), MPI_BYTE, 0,
754               comm.mpi_comm_load[d]);
755 #endif
756     /* Copy the fractions for this dimension from the buffer */
757     comm.cellsizesWithDlb[d].fracLower = cellFracRow[dd->ci[dim]];
758     comm.cellsizesWithDlb[d].fracUpper = cellFracRow[dd->ci[dim] + 1];
759     /* The whole array was communicated, so set the buffer position */
760     int pos = dd->nc[dim] + 1;
761     for (int d1 = 0; d1 <= d; d1++)
762     {
763         if (d1 < d)
764         {
765             /* Copy the cell fractions of the lower dimensions */
766             comm.cellsizesWithDlb[d1].fracLower = cellFracRow[pos++];
767             comm.cellsizesWithDlb[d1].fracUpper = cellFracRow[pos++];
768         }
769         relative_to_absolute_cell_bounds(dd, ddbox, d1);
770     }
771     /* Convert the communicated shift from float to int */
772     comm.ddRankSetup.ddpme[0].maxshift = gmx::roundToInt(cellFracRow[pos++]);
773     if (d >= 1)
774     {
775         comm.ddRankSetup.ddpme[1].maxshift = gmx::roundToInt(cellFracRow[pos++]);
776     }
777 }
778
779 static void set_dd_cell_sizes_dlb_change(gmx_domdec_t*      dd,
780                                          const gmx_ddbox_t* ddbox,
781                                          gmx_bool           bDynamicBox,
782                                          gmx_bool           bUniform,
783                                          int64_t            step)
784 {
785     for (int d = 0; d < dd->ndim; d++)
786     {
787         const int dim         = dd->dim[d];
788         bool      isRowMember = true;
789         bool      isRowRoot   = true;
790         for (int d1 = d; d1 < dd->ndim; d1++)
791         {
792             if (dd->ci[dd->dim[d1]] > 0)
793             {
794                 if (d1 != d)
795                 {
796                     isRowMember = false;
797                 }
798                 isRowRoot = false;
799             }
800         }
801         if (isRowMember)
802         {
803             DDCellsizesWithDlb& cellsizes = dd->comm->cellsizesWithDlb[d];
804             gmx::ArrayRef<real> cellFracRow;
805
806             if (isRowRoot)
807             {
808                 RowMaster* rowMaster = cellsizes.rowMaster.get();
809                 set_dd_cell_sizes_dlb_root(dd, d, dim, rowMaster, ddbox, bDynamicBox, bUniform, step);
810                 cellFracRow = rowMaster->cellFrac;
811             }
812             else
813             {
814                 cellFracRow = cellsizes.fracRow;
815             }
816             distribute_dd_cell_sizes_dlb(dd, d, dim, cellFracRow, ddbox);
817         }
818     }
819 }
820
821 static void set_dd_cell_sizes_dlb_nochange(gmx_domdec_t* dd, const gmx_ddbox_t* ddbox)
822 {
823     int d;
824
825     /* This function assumes the box is static and should therefore
826      * not be called when the box has changed since the last
827      * call to dd_partition_system.
828      */
829     for (d = 0; d < dd->ndim; d++)
830     {
831         relative_to_absolute_cell_bounds(dd, ddbox, d);
832     }
833 }
834
835
836 static void set_dd_cell_sizes_dlb(gmx_domdec_t*      dd,
837                                   const gmx_ddbox_t* ddbox,
838                                   gmx_bool           bDynamicBox,
839                                   gmx_bool           bUniform,
840                                   gmx_bool           bDoDLB,
841                                   int64_t            step,
842                                   gmx_wallcycle_t    wcycle)
843 {
844     gmx_domdec_comm_t* comm;
845     int                dim;
846
847     comm = dd->comm;
848
849     if (bDoDLB)
850     {
851         wallcycle_start(wcycle, ewcDDCOMMBOUND);
852         set_dd_cell_sizes_dlb_change(dd, ddbox, bDynamicBox, bUniform, step);
853         wallcycle_stop(wcycle, ewcDDCOMMBOUND);
854     }
855     else if (bDynamicBox)
856     {
857         set_dd_cell_sizes_dlb_nochange(dd, ddbox);
858     }
859
860     /* Set the dimensions for which no DD is used */
861     for (dim = 0; dim < DIM; dim++)
862     {
863         if (dd->nc[dim] == 1)
864         {
865             comm->cell_x0[dim] = 0;
866             comm->cell_x1[dim] = ddbox->box_size[dim];
867             if (dim >= ddbox->nboundeddim)
868             {
869                 comm->cell_x0[dim] += ddbox->box0[dim];
870                 comm->cell_x1[dim] += ddbox->box0[dim];
871             }
872         }
873     }
874 }
875
876 void set_dd_cell_sizes(gmx_domdec_t*      dd,
877                        const gmx_ddbox_t* ddbox,
878                        gmx_bool           bDynamicBox,
879                        gmx_bool           bUniform,
880                        gmx_bool           bDoDLB,
881                        int64_t            step,
882                        gmx_wallcycle_t    wcycle)
883 {
884     gmx_domdec_comm_t* comm = dd->comm;
885
886     /* Copy the old cell boundaries for the cg displacement check */
887     copy_rvec(comm->cell_x0, comm->old_cell_x0);
888     copy_rvec(comm->cell_x1, comm->old_cell_x1);
889
890     if (isDlbOn(comm))
891     {
892         if (DDMASTER(dd))
893         {
894             check_box_size(dd, ddbox);
895         }
896         set_dd_cell_sizes_dlb(dd, ddbox, bDynamicBox, bUniform, bDoDLB, step, wcycle);
897     }
898     else
899     {
900         ivec numPulses;
901         set_dd_cell_sizes_slb(dd, ddbox, setcellsizeslbLOCAL, numPulses);
902
903         /* Check if the change in cell size requires a different number
904          * of communication pulses and if so change the number.
905          */
906         for (int d = 0; d < dd->ndim; d++)
907         {
908             gmx_domdec_comm_dim_t& cd           = comm->cd[d];
909             int                    numPulsesDim = numPulses[dd->dim[d]];
910             if (cd.numPulses() != numPulsesDim)
911             {
912                 if (debug)
913                 {
914                     fprintf(debug,
915                             "Changing the number of halo communication pulses along dim %c from %d "
916                             "to %d\n",
917                             dim2char(dd->dim[d]), cd.numPulses(), numPulsesDim);
918                 }
919                 cd.ind.resize(numPulsesDim);
920             }
921         }
922     }
923
924     if (debug)
925     {
926         for (int d = 0; d < DIM; d++)
927         {
928             fprintf(debug, "cell_x[%d] %f - %f skew_fac %f\n", d, comm->cell_x0[d],
929                     comm->cell_x1[d], ddbox->skew_fac[d]);
930         }
931     }
932 }