Extract DDRankSetup from gmx_domdec_comm_t
[alexxy/gromacs.git] / src / gromacs / domdec / cellsizes.cpp
1 /*
2  * This file is part of the GROMACS molecular simulation package.
3  *
4  * Copyright (c) 2018,2019, by the GROMACS development team, led by
5  * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6  * and including many others, as listed in the AUTHORS file in the
7  * top-level source directory and at http://www.gromacs.org.
8  *
9  * GROMACS is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public License
11  * as published by the Free Software Foundation; either version 2.1
12  * of the License, or (at your option) any later version.
13  *
14  * GROMACS is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Lesser General Public License for more details.
18  *
19  * You should have received a copy of the GNU Lesser General Public
20  * License along with GROMACS; if not, see
21  * http://www.gnu.org/licenses, or write to the Free Software Foundation,
22  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23  *
24  * If you want to redistribute modifications to GROMACS, please
25  * consider that scientific software is very special. Version
26  * control is crucial - bugs must be traceable. We will be happy to
27  * consider code for inclusion in the official distribution, but
28  * derived work must not be called official GROMACS. Details are found
29  * in the README & COPYING files - if they are missing, get the
30  * official version at http://www.gromacs.org.
31  *
32  * To help us fund GROMACS development, we humbly ask that you cite
33  * the research papers on the package. Check out http://www.gromacs.org.
34  */
35 /* \internal \file
36  *
37  * \brief Implements DD cell-size related functions.
38  *
39  * \author Berk Hess <hess@kth.se>
40  * \author Roland Schulz <roland.schulz@intel.com>
41  * \ingroup module_domdec
42  */
43
44 #include "gmxpre.h"
45
46 #include "cellsizes.h"
47
48 #include "config.h"
49
50 #include "gromacs/gmxlib/network.h"
51 #include "gromacs/math/vec.h"
52 #include "gromacs/mdtypes/commrec.h"
53 #include "gromacs/utility/cstringutil.h"
54 #include "gromacs/utility/fatalerror.h"
55
56 #include "atomdistribution.h"
57 #include "domdec_internal.h"
58 #include "utility.h"
59
60 static void set_pme_maxshift(gmx_domdec_t *dd, gmx_ddpme_t *ddpme,
61                              gmx_bool bUniform, const gmx_ddbox_t *ddbox,
62                              const real *cellFrac)
63 {
64     gmx_domdec_comm_t *comm;
65     int                nc, ns, s;
66     int               *xmin, *xmax;
67     real               range, pme_boundary;
68     int                sh;
69
70     comm = dd->comm;
71     nc   = dd->nc[ddpme->dim];
72     ns   = ddpme->nslab;
73
74     if (!ddpme->dim_match)
75     {
76         /* PP decomposition is not along dim: the worst situation */
77         sh = ns/2;
78     }
79     else if (ns <= 3 || (bUniform && ns == nc))
80     {
81         /* The optimal situation */
82         sh = 1;
83     }
84     else
85     {
86         /* We need to check for all pme nodes which nodes they
87          * could possibly need to communicate with.
88          */
89         xmin = ddpme->pp_min;
90         xmax = ddpme->pp_max;
91         /* Allow for atoms to be maximally 2/3 times the cut-off
92          * out of their DD cell. This is a reasonable balance between
93          * between performance and support for most charge-group/cut-off
94          * combinations.
95          */
96         range  = 2.0/3.0*comm->systemInfo.cutoff/ddbox->box_size[ddpme->dim];
97         /* Avoid extra communication when we are exactly at a boundary */
98         range *= 0.999;
99
100         sh = 1;
101         for (s = 0; s < ns; s++)
102         {
103             /* PME slab s spreads atoms between box frac. s/ns and (s+1)/ns */
104             pme_boundary = static_cast<real>(s)/ns;
105             while (sh + 1 < ns &&
106                    ((s - (sh + 1) >= 0 &&
107                      cellFrac[xmax[s - (sh + 1)     ] + 1]     + range > pme_boundary) ||
108                     (s - (sh + 1) <  0 &&
109                      cellFrac[xmax[s - (sh + 1) + ns] + 1] - 1 + range > pme_boundary)))
110             {
111                 sh++;
112             }
113             pme_boundary = static_cast<real>(s+1)/ns;
114             while (sh + 1 < ns &&
115                    ((s + (sh + 1) <  ns &&
116                      cellFrac[xmin[s + (sh + 1)    ]  ]     - range < pme_boundary) ||
117                     (s + (sh + 1) >= ns &&
118                      cellFrac[xmin[s +(sh + 1) - ns]  ] + 1 - range < pme_boundary)))
119             {
120                 sh++;
121             }
122         }
123     }
124
125     ddpme->maxshift = sh;
126
127     if (debug)
128     {
129         fprintf(debug, "PME slab communication range for dim %d is %d\n",
130                 ddpme->dim, ddpme->maxshift);
131     }
132 }
133
134 static void check_box_size(const gmx_domdec_t *dd,
135                            const gmx_ddbox_t  *ddbox)
136 {
137     int d, dim;
138
139     for (d = 0; d < dd->ndim; d++)
140     {
141         dim = dd->dim[d];
142         if (dim < ddbox->nboundeddim &&
143             ddbox->box_size[dim]*ddbox->skew_fac[dim] <
144             dd->nc[dim]*dd->comm->cellsize_limit*DD_CELL_MARGIN)
145         {
146             gmx_fatal(FARGS, "The %c-size of the box (%f) times the triclinic skew factor (%f) is smaller than the number of DD cells (%d) times the smallest allowed cell size (%f)\n",
147                       dim2char(dim), ddbox->box_size[dim], ddbox->skew_fac[dim],
148                       dd->nc[dim], dd->comm->cellsize_limit);
149         }
150     }
151 }
152
153 real grid_jump_limit(const gmx_domdec_comm_t *comm,
154                      real                     cutoff,
155                      int                      dim_ind)
156 {
157     real grid_jump_limit;
158
159     /* The distance between the boundaries of cells at distance
160      * x+-1,y+-1 or y+-1,z+-1 is limited by the cut-off restrictions
161      * and by the fact that cells should not be shifted by more than
162      * half their size, such that cg's only shift by one cell
163      * at redecomposition.
164      */
165     grid_jump_limit = comm->cellsize_limit;
166     if (!comm->bVacDLBNoLimit)
167     {
168         if (comm->bPMELoadBalDLBLimits)
169         {
170             cutoff = std::max(cutoff, comm->PMELoadBal_max_cutoff);
171         }
172         grid_jump_limit = std::max(grid_jump_limit,
173                                    cutoff/comm->cd[dim_ind].numPulses());
174     }
175
176     return grid_jump_limit;
177 }
178
179 /* This function should be used for moving the domain boudaries during DLB,
180  * for obtaining the minimum cell size. It checks the initially set limit
181  * comm->cellsize_min, for bonded and initial non-bonded cut-offs,
182  * and, possibly, a longer cut-off limit set for PME load balancing.
183  */
184 static real cellsize_min_dlb(gmx_domdec_comm_t *comm, int dim_ind, int dim)
185 {
186     real cellsize_min;
187
188     cellsize_min = comm->cellsize_min[dim];
189
190     if (!comm->bVacDLBNoLimit)
191     {
192         /* The cut-off might have changed, e.g. by PME load balacning,
193          * from the value used to set comm->cellsize_min, so check it.
194          */
195         cellsize_min = std::max(cellsize_min, comm->systemInfo.cutoff/comm->cd[dim_ind].np_dlb);
196
197         if (comm->bPMELoadBalDLBLimits)
198         {
199             /* Check for the cut-off limit set by the PME load balancing */
200             cellsize_min = std::max(cellsize_min, comm->PMELoadBal_max_cutoff/comm->cd[dim_ind].np_dlb);
201         }
202     }
203
204     return cellsize_min;
205 }
206
207 /* Set the domain boundaries. Use for static (or no) load balancing,
208  * and also for the starting state for dynamic load balancing.
209  * setmode determine if and where the boundaries are stored, use enum above.
210  * Returns the number communication pulses in npulse.
211  */
212 gmx::ArrayRef < const std::vector < real>>
213 set_dd_cell_sizes_slb(gmx_domdec_t *dd, const gmx_ddbox_t *ddbox,
214                       int setmode, ivec npulse)
215 {
216     gmx_domdec_comm_t *comm = dd->comm;
217
218     gmx::ArrayRef < std::vector < real>> cell_x_master;
219     if (setmode == setcellsizeslbMASTER)
220     {
221         cell_x_master = dd->ma->cellSizesBuffer;
222     }
223
224     rvec cellsize_min;
225     for (int d = 0; d < DIM; d++)
226     {
227         cellsize_min[d] = ddbox->box_size[d]*ddbox->skew_fac[d];
228         npulse[d]       = 1;
229         if (dd->nc[d] == 1 || comm->slb_frac[d] == nullptr)
230         {
231             /* Uniform grid */
232             real cell_dx = ddbox->box_size[d]/dd->nc[d];
233             switch (setmode)
234             {
235                 case setcellsizeslbMASTER:
236                     for (int j = 0; j < dd->nc[d]+1; j++)
237                     {
238                         cell_x_master[d][j] = ddbox->box0[d] + j*cell_dx;
239                     }
240                     break;
241                 case setcellsizeslbLOCAL:
242                     comm->cell_x0[d] = ddbox->box0[d] + (dd->ci[d]  )*cell_dx;
243                     comm->cell_x1[d] = ddbox->box0[d] + (dd->ci[d]+1)*cell_dx;
244                     break;
245                 default:
246                     break;
247             }
248             real cellsize = cell_dx*ddbox->skew_fac[d];
249             while (cellsize*npulse[d] < comm->systemInfo.cutoff)
250             {
251                 npulse[d]++;
252             }
253             cellsize_min[d] = cellsize;
254         }
255         else
256         {
257             /* Statically load balanced grid */
258             /* Also when we are not doing a master distribution we determine
259              * all cell borders in a loop to obtain identical values
260              * to the master distribution case and to determine npulse.
261              */
262             gmx::ArrayRef<real> cell_x;
263             std::vector<real>   cell_x_buffer;
264             if (setmode == setcellsizeslbMASTER)
265             {
266                 cell_x = cell_x_master[d];
267             }
268             else
269             {
270                 cell_x_buffer.resize(dd->nc[d] + 1);
271                 cell_x = cell_x_buffer;
272             }
273             cell_x[0] = ddbox->box0[d];
274             for (int j = 0; j < dd->nc[d]; j++)
275             {
276                 real cell_dx  = ddbox->box_size[d]*comm->slb_frac[d][j];
277                 cell_x[j+1]   = cell_x[j] + cell_dx;
278                 real cellsize = cell_dx*ddbox->skew_fac[d];
279                 while (cellsize*npulse[d] < comm->systemInfo.cutoff &&
280                        npulse[d] < dd->nc[d]-1)
281                 {
282                     npulse[d]++;
283                 }
284                 cellsize_min[d] = std::min(cellsize_min[d], cellsize);
285             }
286             if (setmode == setcellsizeslbLOCAL)
287             {
288                 comm->cell_x0[d] = cell_x[dd->ci[d]];
289                 comm->cell_x1[d] = cell_x[dd->ci[d]+1];
290             }
291         }
292         /* The following limitation is to avoid that a cell would receive
293          * some of its own home charge groups back over the periodic boundary.
294          * Double charge groups cause trouble with the global indices.
295          */
296         if (d < ddbox->npbcdim &&
297             dd->nc[d] > 1 && npulse[d] >= dd->nc[d])
298         {
299             char error_string[STRLEN];
300
301             sprintf(error_string,
302                     "The box size in direction %c (%f) times the triclinic skew factor (%f) is too small for a cut-off of %f with %d domain decomposition cells, use 1 or more than %d %s or increase the box size in this direction",
303                     dim2char(d), ddbox->box_size[d], ddbox->skew_fac[d],
304                     comm->systemInfo.cutoff,
305                     dd->nc[d], dd->nc[d],
306                     dd->nnodes > dd->nc[d] ? "cells" : "ranks");
307
308             if (setmode == setcellsizeslbLOCAL)
309             {
310                 gmx_fatal_collective(FARGS, dd->mpi_comm_all, DDMASTER(dd),
311                                      "%s", error_string);
312             }
313             else
314             {
315                 gmx_fatal(FARGS, "%s", error_string);
316             }
317         }
318     }
319
320     if (!isDlbOn(comm))
321     {
322         copy_rvec(cellsize_min, comm->cellsize_min);
323     }
324
325     DDRankSetup &ddRankSetup = comm->ddRankSetup;
326     for (int d = 0; d < ddRankSetup.npmedecompdim; d++)
327     {
328         set_pme_maxshift(dd, &ddRankSetup.ddpme[d],
329                          comm->slb_frac[dd->dim[d]] == nullptr, ddbox,
330                          ddRankSetup.ddpme[d].slb_dim_f);
331     }
332
333     return cell_x_master;
334 }
335
336
337 static void dd_cell_sizes_dlb_root_enforce_limits(gmx_domdec_t *dd,
338                                                   int d, int dim, RowMaster *rowMaster,
339                                                   const gmx_ddbox_t *ddbox,
340                                                   gmx_bool bUniform, int64_t step, real cellsize_limit_f, int range[])
341 {
342     gmx_domdec_comm_t *comm;
343     real               halfway, cellsize_limit_f_i, region_size;
344     gmx_bool           bLastHi  = FALSE;
345     int                nrange[] = {range[0], range[1]};
346
347     region_size = rowMaster->cellFrac[range[1]] - rowMaster->cellFrac[range[0]];
348
349     GMX_ASSERT(region_size >= (range[1] - range[0])*cellsize_limit_f,
350                "The region should fit all cells at minimum size");
351
352     comm = dd->comm;
353
354     const int           ncd       = dd->nc[dim];
355
356     const bool          dimHasPbc = (dim < ddbox->npbcdim);
357
358     gmx::ArrayRef<real> cell_size = rowMaster->buf_ncd;
359
360     if (debug)
361     {
362         fprintf(debug, "enforce_limits: %d %d\n", range[0], range[1]);
363     }
364
365     /* First we need to check if the scaling does not make cells
366      * smaller than the smallest allowed size.
367      * We need to do this iteratively, since if a cell is too small,
368      * it needs to be enlarged, which makes all the other cells smaller,
369      * which could in turn make another cell smaller than allowed.
370      */
371     for (int i = range[0]; i < range[1]; i++)
372     {
373         rowMaster->isCellMin[i] = false;
374     }
375     int nmin = 0;
376     int nmin_old;
377     do
378     {
379         nmin_old = nmin;
380         /* We need the total for normalization */
381         real fac = 0;
382         for (int i = range[0]; i < range[1]; i++)
383         {
384             if (!rowMaster->isCellMin[i])
385             {
386                 fac += cell_size[i];
387             }
388         }
389         /* This condition is ensured by the assertion at the end of the loop */
390         GMX_ASSERT(fac > 0, "We cannot have 0 size to work with");
391         fac = (region_size - nmin*cellsize_limit_f)/fac; /* substracting cells already set to cellsize_limit_f */
392         /* Determine the cell boundaries */
393         for (int i = range[0]; i < range[1]; i++)
394         {
395             if (!rowMaster->isCellMin[i])
396             {
397                 cell_size[i] *= fac;
398                 if (!dimHasPbc && (i == 0 || i == dd->nc[dim] - 1))
399                 {
400                     cellsize_limit_f_i = 0;
401                 }
402                 else
403                 {
404                     cellsize_limit_f_i = cellsize_limit_f;
405                 }
406                 if (cell_size[i] < cellsize_limit_f_i)
407                 {
408                     rowMaster->isCellMin[i] = true;
409                     cell_size[i]            = cellsize_limit_f_i;
410                     nmin++;
411                 }
412             }
413             rowMaster->cellFrac[i + 1] = rowMaster->cellFrac[i] + cell_size[i];
414         }
415
416         /* This is ensured by the assertion at the beginning of this function */
417         GMX_ASSERT(nmin < range[1] - range[0], "We can not have all cells limited");
418     }
419     while (nmin > nmin_old);
420
421     const int i  = range[1] - 1;
422     cell_size[i] = rowMaster->cellFrac[i + 1] - rowMaster->cellFrac[i];
423     /* For this check we should not use DD_CELL_MARGIN,
424      * but a slightly smaller factor,
425      * since rounding could get use below the limit.
426      */
427     if (dimHasPbc &&
428         cell_size[i] < cellsize_limit_f*DD_CELL_MARGIN2/DD_CELL_MARGIN)
429     {
430         char buf[22];
431         gmx_fatal(FARGS, "step %s: the dynamic load balancing could not balance dimension %c: box size %f, triclinic skew factor %f, #cells %d, minimum cell size %f\n",
432                   gmx_step_str(step, buf),
433                   dim2char(dim), ddbox->box_size[dim], ddbox->skew_fac[dim],
434                   ncd, comm->cellsize_min[dim]);
435     }
436
437     rowMaster->dlbIsLimited = (nmin > 0) || (range[0] > 0) || (range[1] < ncd);
438
439     if (!bUniform)
440     {
441         /* Check if the boundary did not displace more than halfway
442          * each of the cells it bounds, as this could cause problems,
443          * especially when the differences between cell sizes are large.
444          * If changes are applied, they will not make cells smaller
445          * than the cut-off, as we check all the boundaries which
446          * might be affected by a change and if the old state was ok,
447          * the cells will at most be shrunk back to their old size.
448          */
449         for (int i = range[0]+1; i < range[1]; i++)
450         {
451             halfway = 0.5*(rowMaster->oldCellFrac[i] + rowMaster->oldCellFrac[i - 1]);
452             if (rowMaster->cellFrac[i] < halfway)
453             {
454                 rowMaster->cellFrac[i] = halfway;
455                 /* Check if the change also causes shifts of the next boundaries */
456                 for (int j = i + 1; j < range[1]; j++)
457                 {
458                     if (rowMaster->cellFrac[j] < rowMaster->cellFrac[j - 1] + cellsize_limit_f)
459                     {
460                         rowMaster->cellFrac[j] =  rowMaster->cellFrac[j - 1] + cellsize_limit_f;
461                     }
462                 }
463             }
464             halfway = 0.5*(rowMaster->oldCellFrac[i] + rowMaster->oldCellFrac[i + 1]);
465             if (rowMaster->cellFrac[i] > halfway)
466             {
467                 rowMaster->cellFrac[i] = halfway;
468                 /* Check if the change also causes shifts of the next boundaries */
469                 for (int j = i - 1; j >= range[0] + 1; j--)
470                 {
471                     if (rowMaster->cellFrac[j] > rowMaster->cellFrac[j + 1] - cellsize_limit_f)
472                     {
473                         rowMaster->cellFrac[j] = rowMaster->cellFrac[j + 1] - cellsize_limit_f;
474                     }
475                 }
476             }
477         }
478     }
479
480     /* nrange is defined as [lower, upper) range for new call to enforce_limits */
481     /* find highest violation of LimLo (a) and the following violation of LimHi (thus the lowest following) (b)
482      * then call enforce_limits for (oldb,a), (a,b). In the next step: (b,nexta). oldb and nexta can be the boundaries.
483      * for a and b nrange is used */
484     if (d > 0)
485     {
486         /* Take care of the staggering of the cell boundaries */
487         if (bUniform)
488         {
489             for (int i = range[0]; i < range[1]; i++)
490             {
491                 rowMaster->bounds[i].cellFracLowerMax = rowMaster->cellFrac[i];
492                 rowMaster->bounds[i].cellFracUpperMin = rowMaster->cellFrac[i + 1];
493             }
494         }
495         else
496         {
497             for (int i = range[0] + 1; i < range[1]; i++)
498             {
499                 const RowMaster::Bounds &bounds = rowMaster->bounds[i];
500
501                 bool                     bLimLo = (rowMaster->cellFrac[i] < bounds.boundMin);
502                 bool                     bLimHi = (rowMaster->cellFrac[i] > bounds.boundMax);
503                 if (bLimLo && bLimHi)
504                 {
505                     /* Both limits violated, try the best we can */
506                     /* For this case we split the original range (range) in two parts and care about the other limitiations in the next iteration. */
507                     rowMaster->cellFrac[i] = 0.5*(bounds.boundMin + bounds.boundMax);
508                     nrange[0]              = range[0];
509                     nrange[1]              = i;
510                     dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform, step, cellsize_limit_f, nrange);
511
512                     nrange[0] = i;
513                     nrange[1] = range[1];
514                     dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform, step, cellsize_limit_f, nrange);
515
516                     return;
517                 }
518                 else if (bLimLo)
519                 {
520                     /* rowMaster->cellFrac[i] = rowMaster->boundMin[i]; */
521                     nrange[1] = i;  /* only store violation location. There could be a LimLo violation following with an higher index */
522                     bLastHi   = FALSE;
523                 }
524                 else if (bLimHi && !bLastHi)
525                 {
526                     bLastHi = TRUE;
527                     if (nrange[1] < range[1])   /* found a LimLo before */
528                     {
529                         rowMaster->cellFrac[nrange[1]] = rowMaster->bounds[nrange[1]].boundMin;
530                         dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform, step, cellsize_limit_f, nrange);
531                         nrange[0] = nrange[1];
532                     }
533                     rowMaster->cellFrac[i] = rowMaster->bounds[i].boundMax;
534                     nrange[1]              = i;
535                     dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform, step, cellsize_limit_f, nrange);
536                     nrange[0] = i;
537                     nrange[1] = range[1];
538                 }
539             }
540             if (nrange[1] < range[1])   /* found last a LimLo */
541             {
542                 rowMaster->cellFrac[nrange[1]] = rowMaster->bounds[nrange[1]].boundMin;
543                 dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform, step, cellsize_limit_f, nrange);
544                 nrange[0] = nrange[1];
545                 nrange[1] = range[1];
546                 dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform, step, cellsize_limit_f, nrange);
547             }
548             else if (nrange[0] > range[0]) /* found at least one LimHi */
549             {
550                 dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform, step, cellsize_limit_f, nrange);
551             }
552         }
553     }
554 }
555
556
557 static void set_dd_cell_sizes_dlb_root(gmx_domdec_t *dd,
558                                        int d, int dim, RowMaster *rowMaster,
559                                        const gmx_ddbox_t *ddbox,
560                                        gmx_bool bDynamicBox,
561                                        gmx_bool bUniform, int64_t step)
562 {
563     gmx_domdec_comm_t *comm    = dd->comm;
564     constexpr real     c_relax = 0.5;
565     int                range[] = { 0, 0 };
566
567     /* Convert the maximum change from the input percentage to a fraction */
568     const real          change_limit = comm->ddSettings.dlb_scale_lim*0.01;
569
570     const int           ncd          = dd->nc[dim];
571
572     const bool          bPBC = (dim < ddbox->npbcdim);
573
574     gmx::ArrayRef<real> cell_size = rowMaster->buf_ncd;
575
576     /* Store the original boundaries */
577     for (int i = 0; i < ncd + 1; i++)
578     {
579         rowMaster->oldCellFrac[i] = rowMaster->cellFrac[i];
580     }
581     if (bUniform)
582     {
583         for (int i = 0; i < ncd; i++)
584         {
585             cell_size[i] = 1.0/ncd;
586         }
587     }
588     else if (dd_load_count(comm) > 0)
589     {
590         real load_aver  = comm->load[d].sum_m/ncd;
591         real change_max = 0;
592         real load_i;
593         real change;
594         for (int i = 0; i < ncd; i++)
595         {
596             /* Determine the relative imbalance of cell i */
597             load_i         = comm->load[d].load[i*comm->load[d].nload+2];
598             real imbalance = (load_i - load_aver)/(load_aver > 0 ? load_aver : 1);
599             /* Determine the change of the cell size using underrelaxation */
600             change     = -c_relax*imbalance;
601             change_max = std::max(change_max, std::max(change, -change));
602         }
603         /* Limit the amount of scaling.
604          * We need to use the same rescaling for all cells in one row,
605          * otherwise the load balancing might not converge.
606          */
607         real sc = c_relax;
608         if (change_max > change_limit)
609         {
610             sc *= change_limit/change_max;
611         }
612         for (int i = 0; i < ncd; i++)
613         {
614             /* Determine the relative imbalance of cell i */
615             load_i         = comm->load[d].load[i*comm->load[d].nload+2];
616             real imbalance = (load_i - load_aver)/(load_aver > 0 ? load_aver : 1);
617             /* Determine the change of the cell size using underrelaxation */
618             change       = -sc*imbalance;
619             cell_size[i] = (rowMaster->cellFrac[i + 1] - rowMaster->cellFrac[i])*(1 + change);
620         }
621     }
622
623     real cellsize_limit_f  = cellsize_min_dlb(comm, d, dim)/ddbox->box_size[dim];
624     cellsize_limit_f      *= DD_CELL_MARGIN;
625     real dist_min_f_hard   = grid_jump_limit(comm, comm->systemInfo.cutoff, d)/ddbox->box_size[dim];
626     real dist_min_f        = dist_min_f_hard * DD_CELL_MARGIN;
627     if (ddbox->tric_dir[dim])
628     {
629         cellsize_limit_f /= ddbox->skew_fac[dim];
630         dist_min_f       /= ddbox->skew_fac[dim];
631     }
632     if (bDynamicBox && d > 0)
633     {
634         dist_min_f *= DD_PRES_SCALE_MARGIN;
635     }
636     if (d > 0 && !bUniform)
637     {
638         /* Make sure that the grid is not shifted too much */
639         for (int i = 1; i < ncd; i++)
640         {
641             const RowMaster::Bounds &boundsNeighbor = rowMaster->bounds[i - 1];
642             RowMaster::Bounds       &bounds         = rowMaster->bounds[i];
643
644             if (bounds.cellFracUpperMin - boundsNeighbor.cellFracLowerMax < 2 * dist_min_f_hard)
645             {
646                 gmx_incons("Inconsistent DD boundary staggering limits!");
647             }
648             bounds.boundMin = boundsNeighbor.cellFracLowerMax + dist_min_f;
649             real space       = rowMaster->cellFrac[i] - (boundsNeighbor.cellFracLowerMax + dist_min_f);
650             if (space > 0)
651             {
652                 bounds.boundMin += 0.5*space;
653             }
654             bounds.boundMax  = bounds.cellFracUpperMin - dist_min_f;
655             space            = rowMaster->cellFrac[i] - (bounds.cellFracUpperMin - dist_min_f);
656             if (space < 0)
657             {
658                 rowMaster->bounds[i].boundMax += 0.5*space;
659             }
660             if (debug)
661             {
662                 fprintf(debug,
663                         "dim %d boundary %d %.3f < %.3f < %.3f < %.3f < %.3f\n",
664                         d, i,
665                         boundsNeighbor.cellFracLowerMax + dist_min_f,
666                         bounds.boundMin, rowMaster->cellFrac[i], bounds.boundMax,
667                         bounds.cellFracUpperMin - dist_min_f);
668             }
669         }
670     }
671     range[1]                 = ncd;
672     rowMaster->cellFrac[0]   = 0;
673     rowMaster->cellFrac[ncd] = 1;
674     dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, rowMaster, ddbox, bUniform, step, cellsize_limit_f, range);
675
676
677     /* After the checks above, the cells should obey the cut-off
678      * restrictions, but it does not hurt to check.
679      */
680     for (int i = 0; i < ncd; i++)
681     {
682         if (debug)
683         {
684             fprintf(debug, "Relative bounds dim %d  cell %d: %f %f\n",
685                     dim, i, rowMaster->cellFrac[i], rowMaster->cellFrac[i + 1]);
686         }
687
688         if ((bPBC || (i != 0 && i != dd->nc[dim]-1)) &&
689             rowMaster->cellFrac[i + 1] - rowMaster->cellFrac[i] <
690             cellsize_limit_f/DD_CELL_MARGIN)
691         {
692             char buf[22];
693             fprintf(stderr,
694                     "\nWARNING step %s: direction %c, cell %d too small: %f\n",
695                     gmx_step_str(step, buf), dim2char(dim), i,
696                     (rowMaster->cellFrac[i + 1] - rowMaster->cellFrac[i])
697                     *ddbox->box_size[dim]*ddbox->skew_fac[dim]);
698         }
699     }
700
701     int pos = ncd + 1;
702     /* Store the cell boundaries of the lower dimensions at the end */
703     for (int d1 = 0; d1 < d; d1++)
704     {
705         rowMaster->cellFrac[pos++] = comm->cellsizesWithDlb[d1].fracLower;
706         rowMaster->cellFrac[pos++] = comm->cellsizesWithDlb[d1].fracUpper;
707     }
708
709     DDRankSetup &ddRankSetup = comm->ddRankSetup;
710     if (d < ddRankSetup.npmedecompdim)
711     {
712         /* The master determines the maximum shift for
713          * the coordinate communication between separate PME nodes.
714          */
715         set_pme_maxshift(dd, &ddRankSetup.ddpme[d], bUniform, ddbox, rowMaster->cellFrac.data());
716     }
717     rowMaster->cellFrac[pos++] = ddRankSetup.ddpme[0].maxshift;
718     if (d >= 1)
719     {
720         rowMaster->cellFrac[pos++] = ddRankSetup.ddpme[1].maxshift;
721     }
722 }
723
724 static void relative_to_absolute_cell_bounds(gmx_domdec_t      *dd,
725                                              const gmx_ddbox_t *ddbox,
726                                              int                dimind)
727 {
728     gmx_domdec_comm_t        *comm      = dd->comm;
729     const DDCellsizesWithDlb &cellsizes = comm->cellsizesWithDlb[dimind];
730
731     /* Set the cell dimensions */
732     int dim            = dd->dim[dimind];
733     comm->cell_x0[dim] = cellsizes.fracLower*ddbox->box_size[dim];
734     comm->cell_x1[dim] = cellsizes.fracUpper*ddbox->box_size[dim];
735     if (dim >= ddbox->nboundeddim)
736     {
737         comm->cell_x0[dim] += ddbox->box0[dim];
738         comm->cell_x1[dim] += ddbox->box0[dim];
739     }
740 }
741
742 static void distribute_dd_cell_sizes_dlb(gmx_domdec_t *dd,
743                                          int d, int dim,
744                                          gmx::ArrayRef<real> cellFracRow,
745                                          const gmx_ddbox_t *ddbox)
746 {
747     gmx_domdec_comm_t &comm = *dd->comm;
748
749 #if GMX_MPI
750     /* Each node would only need to know two fractions,
751      * but it is probably cheaper to broadcast the whole array.
752      */
753     MPI_Bcast(cellFracRow.data(), ddCellFractionBufferSize(dd, d)*sizeof(real), MPI_BYTE,
754               0, comm.mpi_comm_load[d]);
755 #endif
756     /* Copy the fractions for this dimension from the buffer */
757     comm.cellsizesWithDlb[d].fracLower = cellFracRow[dd->ci[dim]    ];
758     comm.cellsizesWithDlb[d].fracUpper = cellFracRow[dd->ci[dim] + 1];
759     /* The whole array was communicated, so set the buffer position */
760     int pos = dd->nc[dim] + 1;
761     for (int d1 = 0; d1 <= d; d1++)
762     {
763         if (d1 < d)
764         {
765             /* Copy the cell fractions of the lower dimensions */
766             comm.cellsizesWithDlb[d1].fracLower = cellFracRow[pos++];
767             comm.cellsizesWithDlb[d1].fracUpper = cellFracRow[pos++];
768         }
769         relative_to_absolute_cell_bounds(dd, ddbox, d1);
770     }
771     /* Convert the communicated shift from float to int */
772     comm.ddRankSetup.ddpme[0].maxshift = gmx::roundToInt(cellFracRow[pos++]);
773     if (d >= 1)
774     {
775         comm.ddRankSetup.ddpme[1].maxshift = gmx::roundToInt(cellFracRow[pos++]);
776     }
777 }
778
779 static void set_dd_cell_sizes_dlb_change(gmx_domdec_t *dd,
780                                          const gmx_ddbox_t *ddbox,
781                                          gmx_bool bDynamicBox,
782                                          gmx_bool bUniform, int64_t step)
783 {
784     for (int d = 0; d < dd->ndim; d++)
785     {
786         const int dim         = dd->dim[d];
787         bool      isRowMember = true;
788         bool      isRowRoot   = true;
789         for (int d1 = d; d1 < dd->ndim; d1++)
790         {
791             if (dd->ci[dd->dim[d1]] > 0)
792             {
793                 if (d1 != d)
794                 {
795                     isRowMember = false;
796                 }
797                 isRowRoot = false;
798             }
799         }
800         if (isRowMember)
801         {
802             DDCellsizesWithDlb  &cellsizes = dd->comm->cellsizesWithDlb[d];
803             gmx::ArrayRef<real>  cellFracRow;
804
805             if (isRowRoot)
806             {
807                 RowMaster *rowMaster = cellsizes.rowMaster.get();
808                 set_dd_cell_sizes_dlb_root(dd, d, dim, rowMaster,
809                                            ddbox, bDynamicBox, bUniform, step);
810                 cellFracRow = rowMaster->cellFrac;
811             }
812             else
813             {
814                 cellFracRow = cellsizes.fracRow;
815             }
816             distribute_dd_cell_sizes_dlb(dd, d, dim, cellFracRow, ddbox);
817         }
818     }
819 }
820
821 static void set_dd_cell_sizes_dlb_nochange(gmx_domdec_t      *dd,
822                                            const gmx_ddbox_t *ddbox)
823 {
824     int d;
825
826     /* This function assumes the box is static and should therefore
827      * not be called when the box has changed since the last
828      * call to dd_partition_system.
829      */
830     for (d = 0; d < dd->ndim; d++)
831     {
832         relative_to_absolute_cell_bounds(dd, ddbox, d);
833     }
834 }
835
836
837
838 static void set_dd_cell_sizes_dlb(gmx_domdec_t *dd,
839                                   const gmx_ddbox_t *ddbox, gmx_bool bDynamicBox,
840                                   gmx_bool bUniform, gmx_bool bDoDLB, int64_t step,
841                                   gmx_wallcycle_t wcycle)
842 {
843     gmx_domdec_comm_t *comm;
844     int                dim;
845
846     comm = dd->comm;
847
848     if (bDoDLB)
849     {
850         wallcycle_start(wcycle, ewcDDCOMMBOUND);
851         set_dd_cell_sizes_dlb_change(dd, ddbox, bDynamicBox, bUniform, step);
852         wallcycle_stop(wcycle, ewcDDCOMMBOUND);
853     }
854     else if (bDynamicBox)
855     {
856         set_dd_cell_sizes_dlb_nochange(dd, ddbox);
857     }
858
859     /* Set the dimensions for which no DD is used */
860     for (dim = 0; dim < DIM; dim++)
861     {
862         if (dd->nc[dim] == 1)
863         {
864             comm->cell_x0[dim] = 0;
865             comm->cell_x1[dim] = ddbox->box_size[dim];
866             if (dim >= ddbox->nboundeddim)
867             {
868                 comm->cell_x0[dim] += ddbox->box0[dim];
869                 comm->cell_x1[dim] += ddbox->box0[dim];
870             }
871         }
872     }
873 }
874
875 void set_dd_cell_sizes(gmx_domdec_t *dd,
876                        const gmx_ddbox_t *ddbox, gmx_bool bDynamicBox,
877                        gmx_bool bUniform, gmx_bool bDoDLB, int64_t step,
878                        gmx_wallcycle_t wcycle)
879 {
880     gmx_domdec_comm_t *comm = dd->comm;
881
882     /* Copy the old cell boundaries for the cg displacement check */
883     copy_rvec(comm->cell_x0, comm->old_cell_x0);
884     copy_rvec(comm->cell_x1, comm->old_cell_x1);
885
886     if (isDlbOn(comm))
887     {
888         if (DDMASTER(dd))
889         {
890             check_box_size(dd, ddbox);
891         }
892         set_dd_cell_sizes_dlb(dd, ddbox, bDynamicBox, bUniform, bDoDLB, step, wcycle);
893     }
894     else
895     {
896         ivec numPulses;
897         set_dd_cell_sizes_slb(dd, ddbox, setcellsizeslbLOCAL, numPulses);
898
899         /* Check if the change in cell size requires a different number
900          * of communication pulses and if so change the number.
901          */
902         for (int d = 0; d < dd->ndim; d++)
903         {
904             gmx_domdec_comm_dim_t &cd           = comm->cd[d];
905             int                    numPulsesDim = numPulses[dd->dim[d]];
906             if (cd.numPulses() != numPulsesDim)
907             {
908                 if (debug)
909                 {
910                     fprintf(debug, "Changing the number of halo communication pulses along dim %c from %d to %d\n",
911                             dim2char(dd->dim[d]), cd.numPulses(), numPulsesDim);
912                 }
913                 cd.ind.resize(numPulsesDim);
914             }
915         }
916     }
917
918     if (debug)
919     {
920         for (int d = 0; d < DIM; d++)
921         {
922             fprintf(debug, "cell_x[%d] %f - %f skew_fac %f\n",
923                     d, comm->cell_x0[d], comm->cell_x1[d], ddbox->skew_fac[d]);
924         }
925     }
926 }