Split off domdec distribution and cellsizes
authorBerk Hess <hess@kth.se>
Mon, 7 May 2018 15:35:15 +0000 (17:35 +0200)
committerMark Abraham <mark.j.abraham@gmail.com>
Thu, 17 May 2018 13:24:28 +0000 (15:24 +0200)
Move domain decomposition distribution and cell size related functions
to separate files.

This change is only code motion, apart from changing some macros to
constexpr or inline functions and adding some const qualifiers.

Change-Id: Ib943944b090958921428f69ecaf9e53f7e2ab224

src/gromacs/domdec/cellsizes.cpp [new file with mode: 0644]
src/gromacs/domdec/cellsizes.h [new file with mode: 0644]
src/gromacs/domdec/distribute.cpp [new file with mode: 0644]
src/gromacs/domdec/distribute.h [new file with mode: 0644]
src/gromacs/domdec/domdec.cpp
src/gromacs/domdec/domdec_internal.h
src/gromacs/domdec/utility.h

diff --git a/src/gromacs/domdec/cellsizes.cpp b/src/gromacs/domdec/cellsizes.cpp
new file mode 100644 (file)
index 0000000..2df8233
--- /dev/null
@@ -0,0 +1,938 @@
+/*
+ * This file is part of the GROMACS molecular simulation package.
+ *
+ * Copyright (c) 2018, by the GROMACS development team, led by
+ * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
+ * and including many others, as listed in the AUTHORS file in the
+ * top-level source directory and at http://www.gromacs.org.
+ *
+ * GROMACS is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1
+ * of the License, or (at your option) any later version.
+ *
+ * GROMACS is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GROMACS; if not, see
+ * http://www.gnu.org/licenses, or write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
+ *
+ * If you want to redistribute modifications to GROMACS, please
+ * consider that scientific software is very special. Version
+ * control is crucial - bugs must be traceable. We will be happy to
+ * consider code for inclusion in the official distribution, but
+ * derived work must not be called official GROMACS. Details are found
+ * in the README & COPYING files - if they are missing, get the
+ * official version at http://www.gromacs.org.
+ *
+ * To help us fund GROMACS development, we humbly ask that you cite
+ * the research papers on the package. Check out http://www.gromacs.org.
+ */
+/* \internal \file
+ *
+ * \brief Implements DD cell-size related functions.
+ *
+ * \author Berk Hess <hess@kth.se>
+ * \author Roland Schulz <roland.schulz@intel.com>
+ * \ingroup module_domdec
+ */
+
+#include "gmxpre.h"
+
+#include "cellsizes.h"
+
+#include "config.h"
+
+#include "gromacs/gmxlib/network.h"
+#include "gromacs/math/vec.h"
+#include "gromacs/mdtypes/commrec.h"
+#include "gromacs/utility/cstringutil.h"
+#include "gromacs/utility/fatalerror.h"
+#include "gromacs/utility/smalloc.h"
+
+#include "domdec_internal.h"
+#include "utility.h"
+
+static void set_pme_maxshift(gmx_domdec_t *dd, gmx_ddpme_t *ddpme,
+                             gmx_bool bUniform, const gmx_ddbox_t *ddbox,
+                             const real *cell_f)
+{
+    gmx_domdec_comm_t *comm;
+    int                nc, ns, s;
+    int               *xmin, *xmax;
+    real               range, pme_boundary;
+    int                sh;
+
+    comm = dd->comm;
+    nc   = dd->nc[ddpme->dim];
+    ns   = ddpme->nslab;
+
+    if (!ddpme->dim_match)
+    {
+        /* PP decomposition is not along dim: the worst situation */
+        sh = ns/2;
+    }
+    else if (ns <= 3 || (bUniform && ns == nc))
+    {
+        /* The optimal situation */
+        sh = 1;
+    }
+    else
+    {
+        /* We need to check for all pme nodes which nodes they
+         * could possibly need to communicate with.
+         */
+        xmin = ddpme->pp_min;
+        xmax = ddpme->pp_max;
+        /* Allow for atoms to be maximally 2/3 times the cut-off
+         * out of their DD cell. This is a reasonable balance between
+         * between performance and support for most charge-group/cut-off
+         * combinations.
+         */
+        range  = 2.0/3.0*comm->cutoff/ddbox->box_size[ddpme->dim];
+        /* Avoid extra communication when we are exactly at a boundary */
+        range *= 0.999;
+
+        sh = 1;
+        for (s = 0; s < ns; s++)
+        {
+            /* PME slab s spreads atoms between box frac. s/ns and (s+1)/ns */
+            pme_boundary = (real)s/ns;
+            while (sh+1 < ns &&
+                   ((s-(sh+1) >= 0 &&
+                     cell_f[xmax[s-(sh+1)   ]+1]     + range > pme_boundary) ||
+                    (s-(sh+1) <  0 &&
+                     cell_f[xmax[s-(sh+1)+ns]+1] - 1 + range > pme_boundary)))
+            {
+                sh++;
+            }
+            pme_boundary = (real)(s+1)/ns;
+            while (sh+1 < ns &&
+                   ((s+(sh+1) <  ns &&
+                     cell_f[xmin[s+(sh+1)   ]  ]     - range < pme_boundary) ||
+                    (s+(sh+1) >= ns &&
+                     cell_f[xmin[s+(sh+1)-ns]  ] + 1 - range < pme_boundary)))
+            {
+                sh++;
+            }
+        }
+    }
+
+    ddpme->maxshift = sh;
+
+    if (debug)
+    {
+        fprintf(debug, "PME slab communication range for dim %d is %d\n",
+                ddpme->dim, ddpme->maxshift);
+    }
+}
+
+static void check_box_size(gmx_domdec_t *dd, gmx_ddbox_t *ddbox)
+{
+    int d, dim;
+
+    for (d = 0; d < dd->ndim; d++)
+    {
+        dim = dd->dim[d];
+        if (dim < ddbox->nboundeddim &&
+            ddbox->box_size[dim]*ddbox->skew_fac[dim] <
+            dd->nc[dim]*dd->comm->cellsize_limit*DD_CELL_MARGIN)
+        {
+            gmx_fatal(FARGS, "The %c-size of the box (%f) times the triclinic skew factor (%f) is smaller than the number of DD cells (%d) times the smallest allowed cell size (%f)\n",
+                      dim2char(dim), ddbox->box_size[dim], ddbox->skew_fac[dim],
+                      dd->nc[dim], dd->comm->cellsize_limit);
+        }
+    }
+}
+
+real grid_jump_limit(const gmx_domdec_comm_t *comm,
+                     real                     cutoff,
+                     int                      dim_ind)
+{
+    real grid_jump_limit;
+
+    /* The distance between the boundaries of cells at distance
+     * x+-1,y+-1 or y+-1,z+-1 is limited by the cut-off restrictions
+     * and by the fact that cells should not be shifted by more than
+     * half their size, such that cg's only shift by one cell
+     * at redecomposition.
+     */
+    grid_jump_limit = comm->cellsize_limit;
+    if (!comm->bVacDLBNoLimit)
+    {
+        if (comm->bPMELoadBalDLBLimits)
+        {
+            cutoff = std::max(cutoff, comm->PMELoadBal_max_cutoff);
+        }
+        grid_jump_limit = std::max(grid_jump_limit,
+                                   cutoff/comm->cd[dim_ind].np);
+    }
+
+    return grid_jump_limit;
+}
+
+/* This function should be used for moving the domain boudaries during DLB,
+ * for obtaining the minimum cell size. It checks the initially set limit
+ * comm->cellsize_min, for bonded and initial non-bonded cut-offs,
+ * and, possibly, a longer cut-off limit set for PME load balancing.
+ */
+static real cellsize_min_dlb(gmx_domdec_comm_t *comm, int dim_ind, int dim)
+{
+    real cellsize_min;
+
+    cellsize_min = comm->cellsize_min[dim];
+
+    if (!comm->bVacDLBNoLimit)
+    {
+        /* The cut-off might have changed, e.g. by PME load balacning,
+         * from the value used to set comm->cellsize_min, so check it.
+         */
+        cellsize_min = std::max(cellsize_min, comm->cutoff/comm->cd[dim_ind].np_dlb);
+
+        if (comm->bPMELoadBalDLBLimits)
+        {
+            /* Check for the cut-off limit set by the PME load balancing */
+            cellsize_min = std::max(cellsize_min, comm->PMELoadBal_max_cutoff/comm->cd[dim_ind].np_dlb);
+        }
+    }
+
+    return cellsize_min;
+}
+
+/* Set the domain boundaries. Use for static (or no) load balancing,
+ * and also for the starting state for dynamic load balancing.
+ * setmode determine if and where the boundaries are stored, use enum above.
+ * Returns the number communication pulses in npulse.
+ */
+void set_dd_cell_sizes_slb(gmx_domdec_t *dd, const gmx_ddbox_t *ddbox,
+                           int setmode, ivec npulse)
+{
+    gmx_domdec_comm_t *comm;
+    int                d, j;
+    rvec               cellsize_min;
+    real              *cell_x, cell_dx, cellsize;
+
+    comm = dd->comm;
+
+    for (d = 0; d < DIM; d++)
+    {
+        cellsize_min[d] = ddbox->box_size[d]*ddbox->skew_fac[d];
+        npulse[d]       = 1;
+        if (dd->nc[d] == 1 || comm->slb_frac[d] == nullptr)
+        {
+            /* Uniform grid */
+            cell_dx = ddbox->box_size[d]/dd->nc[d];
+            switch (setmode)
+            {
+                case setcellsizeslbMASTER:
+                    for (j = 0; j < dd->nc[d]+1; j++)
+                    {
+                        dd->ma->cell_x[d][j] = ddbox->box0[d] + j*cell_dx;
+                    }
+                    break;
+                case setcellsizeslbLOCAL:
+                    comm->cell_x0[d] = ddbox->box0[d] + (dd->ci[d]  )*cell_dx;
+                    comm->cell_x1[d] = ddbox->box0[d] + (dd->ci[d]+1)*cell_dx;
+                    break;
+                default:
+                    break;
+            }
+            cellsize = cell_dx*ddbox->skew_fac[d];
+            while (cellsize*npulse[d] < comm->cutoff)
+            {
+                npulse[d]++;
+            }
+            cellsize_min[d] = cellsize;
+        }
+        else
+        {
+            /* Statically load balanced grid */
+            /* Also when we are not doing a master distribution we determine
+             * all cell borders in a loop to obtain identical values
+             * to the master distribution case and to determine npulse.
+             */
+            if (setmode == setcellsizeslbMASTER)
+            {
+                cell_x = dd->ma->cell_x[d];
+            }
+            else
+            {
+                snew(cell_x, dd->nc[d]+1);
+            }
+            cell_x[0] = ddbox->box0[d];
+            for (j = 0; j < dd->nc[d]; j++)
+            {
+                cell_dx     = ddbox->box_size[d]*comm->slb_frac[d][j];
+                cell_x[j+1] = cell_x[j] + cell_dx;
+                cellsize    = cell_dx*ddbox->skew_fac[d];
+                while (cellsize*npulse[d] < comm->cutoff &&
+                       npulse[d] < dd->nc[d]-1)
+                {
+                    npulse[d]++;
+                }
+                cellsize_min[d] = std::min(cellsize_min[d], cellsize);
+            }
+            if (setmode == setcellsizeslbLOCAL)
+            {
+                comm->cell_x0[d] = cell_x[dd->ci[d]];
+                comm->cell_x1[d] = cell_x[dd->ci[d]+1];
+            }
+            if (setmode != setcellsizeslbMASTER)
+            {
+                sfree(cell_x);
+            }
+        }
+        /* The following limitation is to avoid that a cell would receive
+         * some of its own home charge groups back over the periodic boundary.
+         * Double charge groups cause trouble with the global indices.
+         */
+        if (d < ddbox->npbcdim &&
+            dd->nc[d] > 1 && npulse[d] >= dd->nc[d])
+        {
+            char error_string[STRLEN];
+
+            sprintf(error_string,
+                    "The box size in direction %c (%f) times the triclinic skew factor (%f) is too small for a cut-off of %f with %d domain decomposition cells, use 1 or more than %d %s or increase the box size in this direction",
+                    dim2char(d), ddbox->box_size[d], ddbox->skew_fac[d],
+                    comm->cutoff,
+                    dd->nc[d], dd->nc[d],
+                    dd->nnodes > dd->nc[d] ? "cells" : "ranks");
+
+            if (setmode == setcellsizeslbLOCAL)
+            {
+                gmx_fatal_collective(FARGS, dd->mpi_comm_all, DDMASTER(dd),
+                                     error_string);
+            }
+            else
+            {
+                gmx_fatal(FARGS, error_string);
+            }
+        }
+    }
+
+    if (!isDlbOn(comm))
+    {
+        copy_rvec(cellsize_min, comm->cellsize_min);
+    }
+
+    for (d = 0; d < comm->npmedecompdim; d++)
+    {
+        set_pme_maxshift(dd, &comm->ddpme[d],
+                         comm->slb_frac[dd->dim[d]] == nullptr, ddbox,
+                         comm->ddpme[d].slb_dim_f);
+    }
+}
+
+
+static void dd_cell_sizes_dlb_root_enforce_limits(gmx_domdec_t *dd,
+                                                  int d, int dim, domdec_root_t *root,
+                                                  const gmx_ddbox_t *ddbox,
+                                                  gmx_bool bUniform, gmx_int64_t step, real cellsize_limit_f, int range[])
+{
+    gmx_domdec_comm_t *comm;
+    int                ncd, i, j, nmin, nmin_old;
+    gmx_bool           bLimLo, bLimHi;
+    real              *cell_size;
+    real               fac, halfway, cellsize_limit_f_i, region_size;
+    gmx_bool           bPBC, bLastHi = FALSE;
+    int                nrange[] = {range[0], range[1]};
+
+    region_size = root->cell_f[range[1]]-root->cell_f[range[0]];
+
+    comm = dd->comm;
+
+    ncd = dd->nc[dim];
+
+    bPBC = (dim < ddbox->npbcdim);
+
+    cell_size = root->buf_ncd;
+
+    if (debug)
+    {
+        fprintf(debug, "enforce_limits: %d %d\n", range[0], range[1]);
+    }
+
+    /* First we need to check if the scaling does not make cells
+     * smaller than the smallest allowed size.
+     * We need to do this iteratively, since if a cell is too small,
+     * it needs to be enlarged, which makes all the other cells smaller,
+     * which could in turn make another cell smaller than allowed.
+     */
+    for (i = range[0]; i < range[1]; i++)
+    {
+        root->bCellMin[i] = FALSE;
+    }
+    nmin = 0;
+    do
+    {
+        nmin_old = nmin;
+        /* We need the total for normalization */
+        fac = 0;
+        for (i = range[0]; i < range[1]; i++)
+        {
+            if (root->bCellMin[i] == FALSE)
+            {
+                fac += cell_size[i];
+            }
+        }
+        fac = ( region_size - nmin*cellsize_limit_f)/fac; /* substracting cells already set to cellsize_limit_f */
+        /* Determine the cell boundaries */
+        for (i = range[0]; i < range[1]; i++)
+        {
+            if (root->bCellMin[i] == FALSE)
+            {
+                cell_size[i] *= fac;
+                if (!bPBC && (i == 0 || i == dd->nc[dim] -1))
+                {
+                    cellsize_limit_f_i = 0;
+                }
+                else
+                {
+                    cellsize_limit_f_i = cellsize_limit_f;
+                }
+                if (cell_size[i] < cellsize_limit_f_i)
+                {
+                    root->bCellMin[i] = TRUE;
+                    cell_size[i]      = cellsize_limit_f_i;
+                    nmin++;
+                }
+            }
+            root->cell_f[i+1] = root->cell_f[i] + cell_size[i];
+        }
+    }
+    while (nmin > nmin_old);
+
+    i            = range[1]-1;
+    cell_size[i] = root->cell_f[i+1] - root->cell_f[i];
+    /* For this check we should not use DD_CELL_MARGIN,
+     * but a slightly smaller factor,
+     * since rounding could get use below the limit.
+     */
+    if (bPBC && cell_size[i] < cellsize_limit_f*DD_CELL_MARGIN2/DD_CELL_MARGIN)
+    {
+        char buf[22];
+        gmx_fatal(FARGS, "step %s: the dynamic load balancing could not balance dimension %c: box size %f, triclinic skew factor %f, #cells %d, minimum cell size %f\n",
+                  gmx_step_str(step, buf),
+                  dim2char(dim), ddbox->box_size[dim], ddbox->skew_fac[dim],
+                  ncd, comm->cellsize_min[dim]);
+    }
+
+    root->bLimited = (nmin > 0) || (range[0] > 0) || (range[1] < ncd);
+
+    if (!bUniform)
+    {
+        /* Check if the boundary did not displace more than halfway
+         * each of the cells it bounds, as this could cause problems,
+         * especially when the differences between cell sizes are large.
+         * If changes are applied, they will not make cells smaller
+         * than the cut-off, as we check all the boundaries which
+         * might be affected by a change and if the old state was ok,
+         * the cells will at most be shrunk back to their old size.
+         */
+        for (i = range[0]+1; i < range[1]; i++)
+        {
+            halfway = 0.5*(root->old_cell_f[i] + root->old_cell_f[i-1]);
+            if (root->cell_f[i] < halfway)
+            {
+                root->cell_f[i] = halfway;
+                /* Check if the change also causes shifts of the next boundaries */
+                for (j = i+1; j < range[1]; j++)
+                {
+                    if (root->cell_f[j] < root->cell_f[j-1] + cellsize_limit_f)
+                    {
+                        root->cell_f[j] =  root->cell_f[j-1] + cellsize_limit_f;
+                    }
+                }
+            }
+            halfway = 0.5*(root->old_cell_f[i] + root->old_cell_f[i+1]);
+            if (root->cell_f[i] > halfway)
+            {
+                root->cell_f[i] = halfway;
+                /* Check if the change also causes shifts of the next boundaries */
+                for (j = i-1; j >= range[0]+1; j--)
+                {
+                    if (root->cell_f[j] > root->cell_f[j+1] - cellsize_limit_f)
+                    {
+                        root->cell_f[j] = root->cell_f[j+1] - cellsize_limit_f;
+                    }
+                }
+            }
+        }
+    }
+
+    /* nrange is defined as [lower, upper) range for new call to enforce_limits */
+    /* find highest violation of LimLo (a) and the following violation of LimHi (thus the lowest following) (b)
+     * then call enforce_limits for (oldb,a), (a,b). In the next step: (b,nexta). oldb and nexta can be the boundaries.
+     * for a and b nrange is used */
+    if (d > 0)
+    {
+        /* Take care of the staggering of the cell boundaries */
+        if (bUniform)
+        {
+            for (i = range[0]; i < range[1]; i++)
+            {
+                root->cell_f_max0[i] = root->cell_f[i];
+                root->cell_f_min1[i] = root->cell_f[i+1];
+            }
+        }
+        else
+        {
+            for (i = range[0]+1; i < range[1]; i++)
+            {
+                bLimLo = (root->cell_f[i] < root->bound_min[i]);
+                bLimHi = (root->cell_f[i] > root->bound_max[i]);
+                if (bLimLo && bLimHi)
+                {
+                    /* Both limits violated, try the best we can */
+                    /* For this case we split the original range (range) in two parts and care about the other limitiations in the next iteration. */
+                    root->cell_f[i] = 0.5*(root->bound_min[i] + root->bound_max[i]);
+                    nrange[0]       = range[0];
+                    nrange[1]       = i;
+                    dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
+
+                    nrange[0] = i;
+                    nrange[1] = range[1];
+                    dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
+
+                    return;
+                }
+                else if (bLimLo)
+                {
+                    /* root->cell_f[i] = root->bound_min[i]; */
+                    nrange[1] = i;  /* only store violation location. There could be a LimLo violation following with an higher index */
+                    bLastHi   = FALSE;
+                }
+                else if (bLimHi && !bLastHi)
+                {
+                    bLastHi = TRUE;
+                    if (nrange[1] < range[1])   /* found a LimLo before */
+                    {
+                        root->cell_f[nrange[1]] = root->bound_min[nrange[1]];
+                        dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
+                        nrange[0] = nrange[1];
+                    }
+                    root->cell_f[i] = root->bound_max[i];
+                    nrange[1]       = i;
+                    dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
+                    nrange[0] = i;
+                    nrange[1] = range[1];
+                }
+            }
+            if (nrange[1] < range[1])   /* found last a LimLo */
+            {
+                root->cell_f[nrange[1]] = root->bound_min[nrange[1]];
+                dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
+                nrange[0] = nrange[1];
+                nrange[1] = range[1];
+                dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
+            }
+            else if (nrange[0] > range[0]) /* found at least one LimHi */
+            {
+                dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
+            }
+        }
+    }
+}
+
+
+static void set_dd_cell_sizes_dlb_root(gmx_domdec_t *dd,
+                                       int d, int dim, domdec_root_t *root,
+                                       const gmx_ddbox_t *ddbox,
+                                       gmx_bool bDynamicBox,
+                                       gmx_bool bUniform, gmx_int64_t step)
+{
+    gmx_domdec_comm_t *comm;
+    int                ncd, d1, i, pos;
+    real              *cell_size;
+    real               load_aver, load_i, imbalance, change, change_max, sc;
+    real               cellsize_limit_f, dist_min_f, dist_min_f_hard, space;
+    real               change_limit;
+    real               relax = 0.5;
+    gmx_bool           bPBC;
+    int                range[] = { 0, 0 };
+
+    comm = dd->comm;
+
+    /* Convert the maximum change from the input percentage to a fraction */
+    change_limit = comm->dlb_scale_lim*0.01;
+
+    ncd = dd->nc[dim];
+
+    bPBC = (dim < ddbox->npbcdim);
+
+    cell_size = root->buf_ncd;
+
+    /* Store the original boundaries */
+    for (i = 0; i < ncd+1; i++)
+    {
+        root->old_cell_f[i] = root->cell_f[i];
+    }
+    if (bUniform)
+    {
+        for (i = 0; i < ncd; i++)
+        {
+            cell_size[i] = 1.0/ncd;
+        }
+    }
+    else if (dd_load_count(comm) > 0)
+    {
+        load_aver  = comm->load[d].sum_m/ncd;
+        change_max = 0;
+        for (i = 0; i < ncd; i++)
+        {
+            /* Determine the relative imbalance of cell i */
+            load_i    = comm->load[d].load[i*comm->load[d].nload+2];
+            imbalance = (load_i - load_aver)/(load_aver > 0 ? load_aver : 1);
+            /* Determine the change of the cell size using underrelaxation */
+            change     = -relax*imbalance;
+            change_max = std::max(change_max, std::max(change, -change));
+        }
+        /* Limit the amount of scaling.
+         * We need to use the same rescaling for all cells in one row,
+         * otherwise the load balancing might not converge.
+         */
+        sc = relax;
+        if (change_max > change_limit)
+        {
+            sc *= change_limit/change_max;
+        }
+        for (i = 0; i < ncd; i++)
+        {
+            /* Determine the relative imbalance of cell i */
+            load_i    = comm->load[d].load[i*comm->load[d].nload+2];
+            imbalance = (load_i - load_aver)/(load_aver > 0 ? load_aver : 1);
+            /* Determine the change of the cell size using underrelaxation */
+            change       = -sc*imbalance;
+            cell_size[i] = (root->cell_f[i+1]-root->cell_f[i])*(1 + change);
+        }
+    }
+
+    cellsize_limit_f  = cellsize_min_dlb(comm, d, dim)/ddbox->box_size[dim];
+    cellsize_limit_f *= DD_CELL_MARGIN;
+    dist_min_f_hard   = grid_jump_limit(comm, comm->cutoff, d)/ddbox->box_size[dim];
+    dist_min_f        = dist_min_f_hard * DD_CELL_MARGIN;
+    if (ddbox->tric_dir[dim])
+    {
+        cellsize_limit_f /= ddbox->skew_fac[dim];
+        dist_min_f       /= ddbox->skew_fac[dim];
+    }
+    if (bDynamicBox && d > 0)
+    {
+        dist_min_f *= DD_PRES_SCALE_MARGIN;
+    }
+    if (d > 0 && !bUniform)
+    {
+        /* Make sure that the grid is not shifted too much */
+        for (i = 1; i < ncd; i++)
+        {
+            if (root->cell_f_min1[i] - root->cell_f_max0[i-1] < 2 * dist_min_f_hard)
+            {
+                gmx_incons("Inconsistent DD boundary staggering limits!");
+            }
+            root->bound_min[i] = root->cell_f_max0[i-1] + dist_min_f;
+            space              = root->cell_f[i] - (root->cell_f_max0[i-1] + dist_min_f);
+            if (space > 0)
+            {
+                root->bound_min[i] += 0.5*space;
+            }
+            root->bound_max[i] = root->cell_f_min1[i] - dist_min_f;
+            space              = root->cell_f[i] - (root->cell_f_min1[i] - dist_min_f);
+            if (space < 0)
+            {
+                root->bound_max[i] += 0.5*space;
+            }
+            if (debug)
+            {
+                fprintf(debug,
+                        "dim %d boundary %d %.3f < %.3f < %.3f < %.3f < %.3f\n",
+                        d, i,
+                        root->cell_f_max0[i-1] + dist_min_f,
+                        root->bound_min[i], root->cell_f[i], root->bound_max[i],
+                        root->cell_f_min1[i] - dist_min_f);
+            }
+        }
+    }
+    range[1]          = ncd;
+    root->cell_f[0]   = 0;
+    root->cell_f[ncd] = 1;
+    dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, range);
+
+
+    /* After the checks above, the cells should obey the cut-off
+     * restrictions, but it does not hurt to check.
+     */
+    for (i = 0; i < ncd; i++)
+    {
+        if (debug)
+        {
+            fprintf(debug, "Relative bounds dim %d  cell %d: %f %f\n",
+                    dim, i, root->cell_f[i], root->cell_f[i+1]);
+        }
+
+        if ((bPBC || (i != 0 && i != dd->nc[dim]-1)) &&
+            root->cell_f[i+1] - root->cell_f[i] <
+            cellsize_limit_f/DD_CELL_MARGIN)
+        {
+            char buf[22];
+            fprintf(stderr,
+                    "\nWARNING step %s: direction %c, cell %d too small: %f\n",
+                    gmx_step_str(step, buf), dim2char(dim), i,
+                    (root->cell_f[i+1] - root->cell_f[i])
+                    *ddbox->box_size[dim]*ddbox->skew_fac[dim]);
+        }
+    }
+
+    pos = ncd + 1;
+    /* Store the cell boundaries of the lower dimensions at the end */
+    for (d1 = 0; d1 < d; d1++)
+    {
+        root->cell_f[pos++] = comm->cell_f0[d1];
+        root->cell_f[pos++] = comm->cell_f1[d1];
+    }
+
+    if (d < comm->npmedecompdim)
+    {
+        /* The master determines the maximum shift for
+         * the coordinate communication between separate PME nodes.
+         */
+        set_pme_maxshift(dd, &comm->ddpme[d], bUniform, ddbox, root->cell_f);
+    }
+    root->cell_f[pos++] = comm->ddpme[0].maxshift;
+    if (d >= 1)
+    {
+        root->cell_f[pos++] = comm->ddpme[1].maxshift;
+    }
+}
+
+static void relative_to_absolute_cell_bounds(gmx_domdec_t      *dd,
+                                             const gmx_ddbox_t *ddbox,
+                                             int                dimind)
+{
+    gmx_domdec_comm_t *comm;
+    int                dim;
+
+    comm = dd->comm;
+
+    /* Set the cell dimensions */
+    dim                = dd->dim[dimind];
+    comm->cell_x0[dim] = comm->cell_f0[dimind]*ddbox->box_size[dim];
+    comm->cell_x1[dim] = comm->cell_f1[dimind]*ddbox->box_size[dim];
+    if (dim >= ddbox->nboundeddim)
+    {
+        comm->cell_x0[dim] += ddbox->box0[dim];
+        comm->cell_x1[dim] += ddbox->box0[dim];
+    }
+}
+
+static void distribute_dd_cell_sizes_dlb(gmx_domdec_t *dd,
+                                         int d, int dim, real *cell_f_row,
+                                         const gmx_ddbox_t *ddbox)
+{
+    gmx_domdec_comm_t *comm;
+    int                d1, pos;
+
+    comm = dd->comm;
+
+#if GMX_MPI
+    /* Each node would only need to know two fractions,
+     * but it is probably cheaper to broadcast the whole array.
+     */
+    MPI_Bcast(cell_f_row, ddCellFractionBufferSize(dd, d)*sizeof(real), MPI_BYTE,
+              0, comm->mpi_comm_load[d]);
+#endif
+    /* Copy the fractions for this dimension from the buffer */
+    comm->cell_f0[d] = cell_f_row[dd->ci[dim]  ];
+    comm->cell_f1[d] = cell_f_row[dd->ci[dim]+1];
+    /* The whole array was communicated, so set the buffer position */
+    pos = dd->nc[dim] + 1;
+    for (d1 = 0; d1 <= d; d1++)
+    {
+        if (d1 < d)
+        {
+            /* Copy the cell fractions of the lower dimensions */
+            comm->cell_f0[d1] = cell_f_row[pos++];
+            comm->cell_f1[d1] = cell_f_row[pos++];
+        }
+        relative_to_absolute_cell_bounds(dd, ddbox, d1);
+    }
+    /* Convert the communicated shift from float to int */
+    comm->ddpme[0].maxshift = (int)(cell_f_row[pos++] + 0.5);
+    if (d >= 1)
+    {
+        comm->ddpme[1].maxshift = (int)(cell_f_row[pos++] + 0.5);
+    }
+}
+
+static void set_dd_cell_sizes_dlb_change(gmx_domdec_t *dd,
+                                         const gmx_ddbox_t *ddbox,
+                                         gmx_bool bDynamicBox,
+                                         gmx_bool bUniform, gmx_int64_t step)
+{
+    gmx_domdec_comm_t *comm;
+    int                d, dim, d1;
+    gmx_bool           bRowMember, bRowRoot;
+    real              *cell_f_row;
+
+    comm = dd->comm;
+
+    for (d = 0; d < dd->ndim; d++)
+    {
+        dim        = dd->dim[d];
+        bRowMember = TRUE;
+        bRowRoot   = TRUE;
+        for (d1 = d; d1 < dd->ndim; d1++)
+        {
+            if (dd->ci[dd->dim[d1]] > 0)
+            {
+                if (d1 != d)
+                {
+                    bRowMember = FALSE;
+                }
+                bRowRoot = FALSE;
+            }
+        }
+        if (bRowMember)
+        {
+            if (bRowRoot)
+            {
+                set_dd_cell_sizes_dlb_root(dd, d, dim, comm->root[d],
+                                           ddbox, bDynamicBox, bUniform, step);
+                cell_f_row = comm->root[d]->cell_f;
+            }
+            else
+            {
+                cell_f_row = comm->cell_f_row;
+            }
+            distribute_dd_cell_sizes_dlb(dd, d, dim, cell_f_row, ddbox);
+        }
+    }
+}
+
+static void set_dd_cell_sizes_dlb_nochange(gmx_domdec_t      *dd,
+                                           const gmx_ddbox_t *ddbox)
+{
+    int d;
+
+    /* This function assumes the box is static and should therefore
+     * not be called when the box has changed since the last
+     * call to dd_partition_system.
+     */
+    for (d = 0; d < dd->ndim; d++)
+    {
+        relative_to_absolute_cell_bounds(dd, ddbox, d);
+    }
+}
+
+
+
+static void set_dd_cell_sizes_dlb(gmx_domdec_t *dd,
+                                  const gmx_ddbox_t *ddbox, gmx_bool bDynamicBox,
+                                  gmx_bool bUniform, gmx_bool bDoDLB, gmx_int64_t step,
+                                  gmx_wallcycle_t wcycle)
+{
+    gmx_domdec_comm_t *comm;
+    int                dim;
+
+    comm = dd->comm;
+
+    if (bDoDLB)
+    {
+        wallcycle_start(wcycle, ewcDDCOMMBOUND);
+        set_dd_cell_sizes_dlb_change(dd, ddbox, bDynamicBox, bUniform, step);
+        wallcycle_stop(wcycle, ewcDDCOMMBOUND);
+    }
+    else if (bDynamicBox)
+    {
+        set_dd_cell_sizes_dlb_nochange(dd, ddbox);
+    }
+
+    /* Set the dimensions for which no DD is used */
+    for (dim = 0; dim < DIM; dim++)
+    {
+        if (dd->nc[dim] == 1)
+        {
+            comm->cell_x0[dim] = 0;
+            comm->cell_x1[dim] = ddbox->box_size[dim];
+            if (dim >= ddbox->nboundeddim)
+            {
+                comm->cell_x0[dim] += ddbox->box0[dim];
+                comm->cell_x1[dim] += ddbox->box0[dim];
+            }
+        }
+    }
+}
+
+static void realloc_comm_ind(gmx_domdec_t *dd, ivec npulse)
+{
+    int                    d, np, i;
+    gmx_domdec_comm_dim_t *cd;
+
+    for (d = 0; d < dd->ndim; d++)
+    {
+        cd = &dd->comm->cd[d];
+        np = npulse[dd->dim[d]];
+        if (np > cd->np_nalloc)
+        {
+            if (debug)
+            {
+                fprintf(debug, "(Re)allocing cd for %c to %d pulses\n",
+                        dim2char(dd->dim[d]), np);
+            }
+            if (DDMASTER(dd) && cd->np_nalloc > 0)
+            {
+                fprintf(stderr, "\nIncreasing the number of cell to communicate in dimension %c to %d for the first time\n", dim2char(dd->dim[d]), np);
+            }
+            srenew(cd->ind, np);
+            for (i = cd->np_nalloc; i < np; i++)
+            {
+                cd->ind[i].index  = nullptr;
+                cd->ind[i].nalloc = 0;
+            }
+            cd->np_nalloc = np;
+        }
+        cd->np = np;
+    }
+}
+
+void set_dd_cell_sizes(gmx_domdec_t *dd,
+                       gmx_ddbox_t *ddbox, gmx_bool bDynamicBox,
+                       gmx_bool bUniform, gmx_bool bDoDLB, gmx_int64_t step,
+                       gmx_wallcycle_t wcycle)
+{
+    gmx_domdec_comm_t *comm;
+    int                d;
+    ivec               npulse;
+
+    comm = dd->comm;
+
+    /* Copy the old cell boundaries for the cg displacement check */
+    copy_rvec(comm->cell_x0, comm->old_cell_x0);
+    copy_rvec(comm->cell_x1, comm->old_cell_x1);
+
+    if (isDlbOn(comm))
+    {
+        if (DDMASTER(dd))
+        {
+            check_box_size(dd, ddbox);
+        }
+        set_dd_cell_sizes_dlb(dd, ddbox, bDynamicBox, bUniform, bDoDLB, step, wcycle);
+    }
+    else
+    {
+        set_dd_cell_sizes_slb(dd, ddbox, setcellsizeslbLOCAL, npulse);
+        realloc_comm_ind(dd, npulse);
+    }
+
+    if (debug)
+    {
+        for (d = 0; d < DIM; d++)
+        {
+            fprintf(debug, "cell_x[%d] %f - %f skew_fac %f\n",
+                    d, comm->cell_x0[d], comm->cell_x1[d], ddbox->skew_fac[d]);
+        }
+    }
+}
diff --git a/src/gromacs/domdec/cellsizes.h b/src/gromacs/domdec/cellsizes.h
new file mode 100644 (file)
index 0000000..0ec3d86
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * This file is part of the GROMACS molecular simulation package.
+ *
+ * Copyright (c) 2018, by the GROMACS development team, led by
+ * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
+ * and including many others, as listed in the AUTHORS file in the
+ * top-level source directory and at http://www.gromacs.org.
+ *
+ * GROMACS is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1
+ * of the License, or (at your option) any later version.
+ *
+ * GROMACS is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GROMACS; if not, see
+ * http://www.gnu.org/licenses, or write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
+ *
+ * If you want to redistribute modifications to GROMACS, please
+ * consider that scientific software is very special. Version
+ * control is crucial - bugs must be traceable. We will be happy to
+ * consider code for inclusion in the official distribution, but
+ * derived work must not be called official GROMACS. Details are found
+ * in the README & COPYING files - if they are missing, get the
+ * official version at http://www.gromacs.org.
+ *
+ * To help us fund GROMACS development, we humbly ask that you cite
+ * the research papers on the package. Check out http://www.gromacs.org.
+ */
+/*! \internal \file
+ *
+ * \brief Declares DD cell-size related functions.
+ *
+ * \author Berk Hess <hess@kth.se>
+ * \ingroup module_domdec
+ */
+
+#ifndef GMX_DOMDEC_DOMDEC_CELLSIZES_H
+#define GMX_DOMDEC_DOMDEC_CELLSIZES_H
+
+#include "gromacs/math/vectypes.h"
+#include "gromacs/timing/wallcycle.h"
+
+struct gmx_ddbox_t;
+struct gmx_domdec_comm_t;
+struct gmx_domdec_t;
+
+/*! \brief Options for setting up a regular, possibly static load balanced, cell grid geometry */
+enum
+{
+    setcellsizeslbLOCAL,      //!< Set cell sizes locally on each rank
+    setcellsizeslbMASTER,     //!< Set cell sizes on master rank only
+    setcellsizeslbPULSE_ONLY  //!< Only set the communication pulses, not the cell sizes
+};
+
+/*! \brief Returns the minimum allowed distance between lower and upper bounds of zones along dimension dim_ind */
+real grid_jump_limit(const gmx_domdec_comm_t *comm,
+                     real                     cutoff,
+                     int                      dim_ind);
+
+/*! \brief Sets up an initial, non-staggered grid geometry, possibly using static load balancing */
+void set_dd_cell_sizes_slb(gmx_domdec_t *dd, const gmx_ddbox_t *ddbox,
+                           int setmode, ivec npulse);
+
+/*! \brief General cell size adjustment, possibly applying dynamic load balancing */
+void set_dd_cell_sizes(gmx_domdec_t *dd,
+                       gmx_ddbox_t *ddbox, gmx_bool bDynamicBox,
+                       gmx_bool bUniform, gmx_bool bDoDLB, gmx_int64_t step,
+                       gmx_wallcycle_t wcycle);
+
+#endif
diff --git a/src/gromacs/domdec/distribute.cpp b/src/gromacs/domdec/distribute.cpp
new file mode 100644 (file)
index 0000000..68bbee2
--- /dev/null
@@ -0,0 +1,581 @@
+/*
+ * This file is part of the GROMACS molecular simulation package.
+ *
+ * Copyright (c) 2018, by the GROMACS development team, led by
+ * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
+ * and including many others, as listed in the AUTHORS file in the
+ * top-level source directory and at http://www.gromacs.org.
+ *
+ * GROMACS is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1
+ * of the License, or (at your option) any later version.
+ *
+ * GROMACS is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GROMACS; if not, see
+ * http://www.gnu.org/licenses, or write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
+ *
+ * If you want to redistribute modifications to GROMACS, please
+ * consider that scientific software is very special. Version
+ * control is crucial - bugs must be traceable. We will be happy to
+ * consider code for inclusion in the official distribution, but
+ * derived work must not be called official GROMACS. Details are found
+ * in the README & COPYING files - if they are missing, get the
+ * official version at http://www.gromacs.org.
+ *
+ * To help us fund GROMACS development, we humbly ask that you cite
+ * the research papers on the package. Check out http://www.gromacs.org.
+ */
+/* \internal \file
+ *
+ * \brief Implements atom distribution functions.
+ *
+ * \author Berk Hess <hess@kth.se>
+ * \ingroup module_domdec
+ */
+
+#include "gmxpre.h"
+
+#include "distribute.h"
+
+#include "config.h"
+
+#include "gromacs/domdec/domdec_network.h"
+#include "gromacs/math/vec.h"
+#include "gromacs/mdtypes/commrec.h"
+#include "gromacs/mdtypes/df_history.h"
+#include "gromacs/mdtypes/state.h"
+#include "gromacs/utility/fatalerror.h"
+#include "gromacs/utility/smalloc.h"
+
+#include "cellsizes.h"
+#include "domdec_internal.h"
+#include "utility.h"
+
+void get_commbuffer_counts(gmx_domdec_t  *dd,
+                           int          **counts,
+                           int          **disps)
+{
+    gmx_domdec_master_t *ma;
+    int                  n;
+
+    ma = dd->ma;
+
+    /* Make the rvec count and displacment arrays */
+    *counts  = ma->ibuf;
+    *disps   = ma->ibuf + dd->nnodes;
+    for (n = 0; n < dd->nnodes; n++)
+    {
+        (*counts)[n] = ma->nat[n]*sizeof(rvec);
+        (*disps)[n]  = (n == 0 ? 0 : (*disps)[n-1] + (*counts)[n-1]);
+    }
+}
+
+static void dd_distribute_vec_sendrecv(gmx_domdec_t *dd, const t_block *cgs,
+                                       const rvec *v, rvec *lv)
+{
+    gmx_domdec_master_t *ma;
+    int                  n, i, c, a, nalloc = 0;
+    rvec                *buf = nullptr;
+
+    if (DDMASTER(dd))
+    {
+        ma  = dd->ma;
+
+        for (n = 0; n < dd->nnodes; n++)
+        {
+            if (n != dd->rank)
+            {
+                if (ma->nat[n] > nalloc)
+                {
+                    nalloc = over_alloc_dd(ma->nat[n]);
+                    srenew(buf, nalloc);
+                }
+                /* Use lv as a temporary buffer */
+                a = 0;
+                for (i = ma->index[n]; i < ma->index[n+1]; i++)
+                {
+                    for (c = cgs->index[ma->cg[i]]; c < cgs->index[ma->cg[i]+1]; c++)
+                    {
+                        copy_rvec(v[c], buf[a++]);
+                    }
+                }
+                if (a != ma->nat[n])
+                {
+                    gmx_fatal(FARGS, "Internal error a (%d) != nat (%d)",
+                              a, ma->nat[n]);
+                }
+
+#if GMX_MPI
+                MPI_Send(buf, ma->nat[n]*sizeof(rvec), MPI_BYTE,
+                         n, n, dd->mpi_comm_all);
+#endif
+            }
+        }
+        sfree(buf);
+        n = dd->masterrank;
+        a = 0;
+        for (i = ma->index[n]; i < ma->index[n+1]; i++)
+        {
+            for (c = cgs->index[ma->cg[i]]; c < cgs->index[ma->cg[i]+1]; c++)
+            {
+                copy_rvec(v[c], lv[a++]);
+            }
+        }
+    }
+    else
+    {
+#if GMX_MPI
+        MPI_Recv(lv, dd->nat_home*sizeof(rvec), MPI_BYTE, dd->masterrank,
+                 MPI_ANY_TAG, dd->mpi_comm_all, MPI_STATUS_IGNORE);
+#endif
+    }
+}
+
+static void dd_distribute_vec_scatterv(gmx_domdec_t *dd, const t_block *cgs,
+                                       const rvec *v, rvec *lv)
+{
+    gmx_domdec_master_t *ma;
+    int                 *scounts = nullptr, *disps = nullptr;
+    int                  n, i, c, a;
+    rvec                *buf = nullptr;
+
+    if (DDMASTER(dd))
+    {
+        ma  = dd->ma;
+
+        get_commbuffer_counts(dd, &scounts, &disps);
+
+        buf = ma->vbuf;
+        a   = 0;
+        for (n = 0; n < dd->nnodes; n++)
+        {
+            for (i = ma->index[n]; i < ma->index[n+1]; i++)
+            {
+                for (c = cgs->index[ma->cg[i]]; c < cgs->index[ma->cg[i]+1]; c++)
+                {
+                    copy_rvec(v[c], buf[a++]);
+                }
+            }
+        }
+    }
+
+    dd_scatterv(dd, scounts, disps, buf, dd->nat_home*sizeof(rvec), lv);
+}
+
+static void dd_distribute_vec(gmx_domdec_t *dd, const t_block *cgs,
+                              const rvec *v, rvec *lv)
+{
+    if (dd->nnodes <= c_maxNumRanksUseSendRecvForScatterAndGather)
+    {
+        dd_distribute_vec_sendrecv(dd, cgs, v, lv);
+    }
+    else
+    {
+        dd_distribute_vec_scatterv(dd, cgs, v, lv);
+    }
+}
+
+static void dd_distribute_dfhist(gmx_domdec_t *dd, df_history_t *dfhist)
+{
+    if (dfhist == nullptr)
+    {
+        return;
+    }
+
+    dd_bcast(dd, sizeof(int), &dfhist->bEquil);
+    dd_bcast(dd, sizeof(int), &dfhist->nlambda);
+    dd_bcast(dd, sizeof(real), &dfhist->wl_delta);
+
+    if (dfhist->nlambda > 0)
+    {
+        int nlam = dfhist->nlambda;
+        dd_bcast(dd, sizeof(int)*nlam, dfhist->n_at_lam);
+        dd_bcast(dd, sizeof(real)*nlam, dfhist->wl_histo);
+        dd_bcast(dd, sizeof(real)*nlam, dfhist->sum_weights);
+        dd_bcast(dd, sizeof(real)*nlam, dfhist->sum_dg);
+        dd_bcast(dd, sizeof(real)*nlam, dfhist->sum_minvar);
+        dd_bcast(dd, sizeof(real)*nlam, dfhist->sum_variance);
+
+        for (int i = 0; i < nlam; i++)
+        {
+            dd_bcast(dd, sizeof(real)*nlam, dfhist->accum_p[i]);
+            dd_bcast(dd, sizeof(real)*nlam, dfhist->accum_m[i]);
+            dd_bcast(dd, sizeof(real)*nlam, dfhist->accum_p2[i]);
+            dd_bcast(dd, sizeof(real)*nlam, dfhist->accum_m2[i]);
+            dd_bcast(dd, sizeof(real)*nlam, dfhist->Tij[i]);
+            dd_bcast(dd, sizeof(real)*nlam, dfhist->Tij_empirical[i]);
+        }
+    }
+}
+
+static void dd_distribute_state(gmx_domdec_t *dd, const t_block *cgs,
+                                const t_state *state, t_state *state_local,
+                                PaddedRVecVector *f)
+{
+    int nh = state_local->nhchainlength;
+
+    if (DDMASTER(dd))
+    {
+        GMX_RELEASE_ASSERT(state->nhchainlength == nh, "The global and local Nose-Hoover chain lengths should match");
+
+        for (int i = 0; i < efptNR; i++)
+        {
+            state_local->lambda[i] = state->lambda[i];
+        }
+        state_local->fep_state = state->fep_state;
+        state_local->veta      = state->veta;
+        state_local->vol0      = state->vol0;
+        copy_mat(state->box, state_local->box);
+        copy_mat(state->box_rel, state_local->box_rel);
+        copy_mat(state->boxv, state_local->boxv);
+        copy_mat(state->svir_prev, state_local->svir_prev);
+        copy_mat(state->fvir_prev, state_local->fvir_prev);
+        if (state->dfhist != nullptr)
+        {
+            copy_df_history(state_local->dfhist, state->dfhist);
+        }
+        for (int i = 0; i < state_local->ngtc; i++)
+        {
+            for (int j = 0; j < nh; j++)
+            {
+                state_local->nosehoover_xi[i*nh+j]        = state->nosehoover_xi[i*nh+j];
+                state_local->nosehoover_vxi[i*nh+j]       = state->nosehoover_vxi[i*nh+j];
+            }
+            state_local->therm_integral[i] = state->therm_integral[i];
+        }
+        for (int i = 0; i < state_local->nnhpres; i++)
+        {
+            for (int j = 0; j < nh; j++)
+            {
+                state_local->nhpres_xi[i*nh+j]        = state->nhpres_xi[i*nh+j];
+                state_local->nhpres_vxi[i*nh+j]       = state->nhpres_vxi[i*nh+j];
+            }
+        }
+        state_local->baros_integral = state->baros_integral;
+    }
+    dd_bcast(dd, ((efptNR)*sizeof(real)), state_local->lambda.data());
+    dd_bcast(dd, sizeof(int), &state_local->fep_state);
+    dd_bcast(dd, sizeof(real), &state_local->veta);
+    dd_bcast(dd, sizeof(real), &state_local->vol0);
+    dd_bcast(dd, sizeof(state_local->box), state_local->box);
+    dd_bcast(dd, sizeof(state_local->box_rel), state_local->box_rel);
+    dd_bcast(dd, sizeof(state_local->boxv), state_local->boxv);
+    dd_bcast(dd, sizeof(state_local->svir_prev), state_local->svir_prev);
+    dd_bcast(dd, sizeof(state_local->fvir_prev), state_local->fvir_prev);
+    dd_bcast(dd, ((state_local->ngtc*nh)*sizeof(double)), state_local->nosehoover_xi.data());
+    dd_bcast(dd, ((state_local->ngtc*nh)*sizeof(double)), state_local->nosehoover_vxi.data());
+    dd_bcast(dd, state_local->ngtc*sizeof(double), state_local->therm_integral.data());
+    dd_bcast(dd, ((state_local->nnhpres*nh)*sizeof(double)), state_local->nhpres_xi.data());
+    dd_bcast(dd, ((state_local->nnhpres*nh)*sizeof(double)), state_local->nhpres_vxi.data());
+
+    /* communicate df_history -- required for restarting from checkpoint */
+    dd_distribute_dfhist(dd, state_local->dfhist);
+
+    dd_resize_state(state_local, f, dd->nat_home);
+
+    if (state_local->flags & (1 << estX))
+    {
+        const rvec *xGlobal = (DDMASTER(dd) ? as_rvec_array(state->x.data()) : nullptr);
+        dd_distribute_vec(dd, cgs, xGlobal, as_rvec_array(state_local->x.data()));
+    }
+    if (state_local->flags & (1 << estV))
+    {
+        const rvec *vGlobal = (DDMASTER(dd) ? as_rvec_array(state->v.data()) : nullptr);
+        dd_distribute_vec(dd, cgs, vGlobal, as_rvec_array(state_local->v.data()));
+    }
+    if (state_local->flags & (1 << estCGP))
+    {
+        const rvec *cgpGlobal = (DDMASTER(dd) ? as_rvec_array(state->cg_p.data()) : nullptr);
+        dd_distribute_vec(dd, cgs, cgpGlobal, as_rvec_array(state_local->cg_p.data()));
+    }
+}
+
+static void distribute_cg(FILE *fplog,
+                          const matrix box, const ivec tric_dir,
+                          const t_block *cgs, rvec pos[],
+                          gmx_domdec_t *dd)
+{
+    gmx_domdec_master_t *ma;
+    int                **tmp_ind = nullptr, *tmp_nalloc = nullptr;
+    int                  i, icg, j, k, k0, k1, d;
+    matrix               tcm;
+    rvec                 cg_cm;
+    ivec                 ind;
+    real                 nrcg, inv_ncg, pos_d;
+    int                 *cgindex;
+    gmx_bool             bScrew;
+
+    ma = dd->ma;
+
+    snew(tmp_nalloc, dd->nnodes);
+    snew(tmp_ind, dd->nnodes);
+    for (i = 0; i < dd->nnodes; i++)
+    {
+        tmp_nalloc[i] = over_alloc_large(cgs->nr/dd->nnodes+1);
+        snew(tmp_ind[i], tmp_nalloc[i]);
+    }
+
+    /* Clear the count */
+    for (i = 0; i < dd->nnodes; i++)
+    {
+        ma->ncg[i] = 0;
+        ma->nat[i] = 0;
+    }
+
+    make_tric_corr_matrix(dd->npbcdim, box, tcm);
+
+    cgindex = cgs->index;
+
+    /* Compute the center of geometry for all charge groups */
+    for (icg = 0; icg < cgs->nr; icg++)
+    {
+        k0      = cgindex[icg];
+        k1      = cgindex[icg+1];
+        nrcg    = k1 - k0;
+        if (nrcg == 1)
+        {
+            copy_rvec(pos[k0], cg_cm);
+        }
+        else
+        {
+            inv_ncg = 1.0/nrcg;
+
+            clear_rvec(cg_cm);
+            for (k = k0; (k < k1); k++)
+            {
+                rvec_inc(cg_cm, pos[k]);
+            }
+            for (d = 0; (d < DIM); d++)
+            {
+                cg_cm[d] *= inv_ncg;
+            }
+        }
+        /* Put the charge group in the box and determine the cell index */
+        for (d = DIM-1; d >= 0; d--)
+        {
+            pos_d = cg_cm[d];
+            if (d < dd->npbcdim)
+            {
+                bScrew = (dd->bScrewPBC && d == XX);
+                if (tric_dir[d] && dd->nc[d] > 1)
+                {
+                    /* Use triclinic coordinates for this dimension */
+                    for (j = d+1; j < DIM; j++)
+                    {
+                        pos_d += cg_cm[j]*tcm[j][d];
+                    }
+                }
+                while (pos_d >= box[d][d])
+                {
+                    pos_d -= box[d][d];
+                    rvec_dec(cg_cm, box[d]);
+                    if (bScrew)
+                    {
+                        cg_cm[YY] = box[YY][YY] - cg_cm[YY];
+                        cg_cm[ZZ] = box[ZZ][ZZ] - cg_cm[ZZ];
+                    }
+                    for (k = k0; (k < k1); k++)
+                    {
+                        rvec_dec(pos[k], box[d]);
+                        if (bScrew)
+                        {
+                            pos[k][YY] = box[YY][YY] - pos[k][YY];
+                            pos[k][ZZ] = box[ZZ][ZZ] - pos[k][ZZ];
+                        }
+                    }
+                }
+                while (pos_d < 0)
+                {
+                    pos_d += box[d][d];
+                    rvec_inc(cg_cm, box[d]);
+                    if (bScrew)
+                    {
+                        cg_cm[YY] = box[YY][YY] - cg_cm[YY];
+                        cg_cm[ZZ] = box[ZZ][ZZ] - cg_cm[ZZ];
+                    }
+                    for (k = k0; (k < k1); k++)
+                    {
+                        rvec_inc(pos[k], box[d]);
+                        if (bScrew)
+                        {
+                            pos[k][YY] = box[YY][YY] - pos[k][YY];
+                            pos[k][ZZ] = box[ZZ][ZZ] - pos[k][ZZ];
+                        }
+                    }
+                }
+            }
+            /* This could be done more efficiently */
+            ind[d] = 0;
+            while (ind[d]+1 < dd->nc[d] && pos_d >= ma->cell_x[d][ind[d]+1])
+            {
+                ind[d]++;
+            }
+        }
+        i = dd_index(dd->nc, ind);
+        if (ma->ncg[i] == tmp_nalloc[i])
+        {
+            tmp_nalloc[i] = over_alloc_large(ma->ncg[i]+1);
+            srenew(tmp_ind[i], tmp_nalloc[i]);
+        }
+        tmp_ind[i][ma->ncg[i]] = icg;
+        ma->ncg[i]++;
+        ma->nat[i] += cgindex[icg+1] - cgindex[icg];
+    }
+
+    k1 = 0;
+    for (i = 0; i < dd->nnodes; i++)
+    {
+        ma->index[i] = k1;
+        for (k = 0; k < ma->ncg[i]; k++)
+        {
+            ma->cg[k1++] = tmp_ind[i][k];
+        }
+    }
+    ma->index[dd->nnodes] = k1;
+
+    for (i = 0; i < dd->nnodes; i++)
+    {
+        sfree(tmp_ind[i]);
+    }
+    sfree(tmp_ind);
+    sfree(tmp_nalloc);
+
+    if (fplog)
+    {
+        // Use double for the sums to avoid natoms^2 overflowing
+        // (65537^2 > 2^32)
+        int    nat_sum, nat_min, nat_max;
+        double nat2_sum;
+
+        nat_sum  = 0;
+        nat2_sum = 0;
+        nat_min  = ma->nat[0];
+        nat_max  = ma->nat[0];
+        for (i = 0; i < dd->nnodes; i++)
+        {
+            nat_sum  += ma->nat[i];
+            // cast to double to avoid integer overflows when squaring
+            nat2_sum += gmx::square(static_cast<double>(ma->nat[i]));
+            nat_min   = std::min(nat_min, ma->nat[i]);
+            nat_max   = std::max(nat_max, ma->nat[i]);
+        }
+        nat_sum  /= dd->nnodes;
+        nat2_sum /= dd->nnodes;
+
+        fprintf(fplog, "Atom distribution over %d domains: av %d stddev %d min %d max %d\n",
+                dd->nnodes,
+                nat_sum,
+                static_cast<int>(std::sqrt(nat2_sum - gmx::square(static_cast<double>(nat_sum)) + 0.5)),
+                nat_min, nat_max);
+    }
+}
+
+static void get_cg_distribution(FILE *fplog, gmx_domdec_t *dd,
+                                const t_block *cgs,
+                                const matrix box, const gmx_ddbox_t *ddbox,
+                                rvec pos[])
+{
+    gmx_domdec_master_t *ma = nullptr;
+    ivec                 npulse;
+    int                  i, cg_gl;
+    int                 *ibuf, buf2[2] = { 0, 0 };
+    gmx_bool             bMaster = DDMASTER(dd);
+
+    if (bMaster)
+    {
+        ma = dd->ma;
+
+        if (dd->bScrewPBC)
+        {
+            check_screw_box(box);
+        }
+
+        set_dd_cell_sizes_slb(dd, ddbox, setcellsizeslbMASTER, npulse);
+
+        distribute_cg(fplog, box, ddbox->tric_dir, cgs, pos, dd);
+        for (i = 0; i < dd->nnodes; i++)
+        {
+            ma->ibuf[2*i]   = ma->ncg[i];
+            ma->ibuf[2*i+1] = ma->nat[i];
+        }
+        ibuf = ma->ibuf;
+    }
+    else
+    {
+        ibuf = nullptr;
+    }
+    dd_scatter(dd, 2*sizeof(int), ibuf, buf2);
+
+    dd->ncg_home = buf2[0];
+    dd->nat_home = buf2[1];
+    dd->ncg_tot  = dd->ncg_home;
+    dd->nat_tot  = dd->nat_home;
+    if (dd->ncg_home > dd->cg_nalloc || dd->cg_nalloc == 0)
+    {
+        dd->cg_nalloc = over_alloc_dd(dd->ncg_home);
+        srenew(dd->index_gl, dd->cg_nalloc);
+        srenew(dd->cgindex, dd->cg_nalloc+1);
+    }
+    if (bMaster)
+    {
+        for (i = 0; i < dd->nnodes; i++)
+        {
+            ma->ibuf[i]            = ma->ncg[i]*sizeof(int);
+            ma->ibuf[dd->nnodes+i] = ma->index[i]*sizeof(int);
+        }
+    }
+
+    dd_scatterv(dd,
+                bMaster ? ma->ibuf : nullptr,
+                bMaster ? ma->ibuf+dd->nnodes : nullptr,
+                bMaster ? ma->cg : nullptr,
+                dd->ncg_home*sizeof(int), dd->index_gl);
+
+    /* Determine the home charge group sizes */
+    dd->cgindex[0] = 0;
+    for (i = 0; i < dd->ncg_home; i++)
+    {
+        cg_gl            = dd->index_gl[i];
+        dd->cgindex[i+1] =
+            dd->cgindex[i] + cgs->index[cg_gl+1] - cgs->index[cg_gl];
+    }
+
+    if (debug)
+    {
+        fprintf(debug, "Home charge groups:\n");
+        for (i = 0; i < dd->ncg_home; i++)
+        {
+            fprintf(debug, " %d", dd->index_gl[i]);
+            if (i % 10 == 9)
+            {
+                fprintf(debug, "\n");
+            }
+        }
+        fprintf(debug, "\n");
+    }
+}
+
+void distributeState(FILE                *fplog,
+                     gmx_domdec_t        *dd,
+                     t_state             *state_global,
+                     const t_block       &cgs_gl,
+                     const gmx_ddbox_t   &ddbox,
+                     t_state             *state_local,
+                     PaddedRVecVector    *f)
+{
+    rvec *xGlobal = (DDMASTER(dd) ? as_rvec_array(state_global->x.data()) : nullptr);
+
+    get_cg_distribution(fplog, dd, &cgs_gl,
+                        DDMASTER(dd) ? state_global->box : nullptr,
+                        &ddbox, xGlobal);
+
+    dd_distribute_state(dd, &cgs_gl,
+                        state_global, state_local, f);
+}
diff --git a/src/gromacs/domdec/distribute.h b/src/gromacs/domdec/distribute.h
new file mode 100644 (file)
index 0000000..db558b2
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * This file is part of the GROMACS molecular simulation package.
+ *
+ * Copyright (c) 2018, by the GROMACS development team, led by
+ * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
+ * and including many others, as listed in the AUTHORS file in the
+ * top-level source directory and at http://www.gromacs.org.
+ *
+ * GROMACS is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1
+ * of the License, or (at your option) any later version.
+ *
+ * GROMACS is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GROMACS; if not, see
+ * http://www.gnu.org/licenses, or write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
+ *
+ * If you want to redistribute modifications to GROMACS, please
+ * consider that scientific software is very special. Version
+ * control is crucial - bugs must be traceable. We will be happy to
+ * consider code for inclusion in the official distribution, but
+ * derived work must not be called official GROMACS. Details are found
+ * in the README & COPYING files - if they are missing, get the
+ * official version at http://www.gromacs.org.
+ *
+ * To help us fund GROMACS development, we humbly ask that you cite
+ * the research papers on the package. Check out http://www.gromacs.org.
+ */
+/*! \internal \file
+ *
+ * \brief Declares atom distribution functions.
+ *
+ * \author Berk Hess <hess@kth.se>
+ * \ingroup module_domdec
+ */
+#ifndef GMX_DOMDEC_DOMDEC_DISTRIBUTE_H
+#define GMX_DOMDEC_DOMDEC_DISTRIBUTE_H
+
+#include <cstdio>
+
+#include "gromacs/math/paddedvector.h"
+#include "gromacs/utility/basedefinitions.h"
+
+struct gmx_ddbox_t;
+struct gmx_domdec_t;
+struct t_block;
+class t_state;
+
+/*! \brief Returns state scatter/gather buffer element counts and displacements
+ *
+ * NOTE: Should only be called on the master rank.
+ */
+void get_commbuffer_counts(gmx_domdec_t  *dd,
+                           int          **counts,
+                           int          **disps);
+
+/*! \brief Distributes the state from the master rank to all DD ranks */
+void distributeState(FILE                *fplog,
+                     gmx_domdec_t        *dd,
+                     t_state             *state_global,
+                     const t_block       &cgs_gl,
+                     const gmx_ddbox_t   &ddbox,
+                     t_state             *state_local,
+                     PaddedRVecVector    *f);
+
+#endif
index 77e71abf1f86fb3310e5af76cb53b1c3505aad1d..ba83908db530ae8c543898494ea185045b740686 100644 (file)
 #include "gromacs/utility/smalloc.h"
 #include "gromacs/utility/stringutil.h"
 
+#include "cellsizes.h"
+#include "distribute.h"
 #include "domdec_constraints.h"
 #include "domdec_internal.h"
 #include "domdec_vsite.h"
 #include "redistribute.h"
 #include "utility.h"
 
-#define DDRANK(dd, rank)    (rank)
-#define DDMASTERRANK(dd)   (dd->masterrank)
-
-struct gmx_domdec_master_t
-{
-    /* The cell boundaries */
-    real **cell_x;
-    /* The global charge group division */
-    int   *ncg;    /* Number of home charge groups for each node */
-    int   *index;  /* Index of nnodes+1 into cg */
-    int   *cg;     /* Global charge group index */
-    int   *nat;    /* Number of home atoms for each node. */
-    int   *ibuf;   /* Buffer for communication */
-    rvec  *vbuf;   /* Buffer for state scattering and gathering */
-};
-
 #define DD_NLOAD_MAX 9
 
 const char *edlbs_names[edlbsNR] = { "off", "auto", "locked", "on", "on" };
@@ -159,12 +145,6 @@ static const int
                                                  {2, 5, 6},
                                                  {3, 5, 7}};
 
-/* Factors used to avoid problems due to rounding issues */
-#define DD_CELL_MARGIN       1.0001
-#define DD_CELL_MARGIN2      1.00005
-/* Factor to account for pressure scaling during nstlist steps */
-#define DD_PRES_SCALE_MARGIN 1.02
-
 /* Turn on DLB when the load imbalance causes this amount of total loss.
  * There is a bit of overhead with DLB and it's difficult to achieve
  * a load imbalance of less than 2% with DLB.
@@ -174,15 +154,6 @@ static const int
 /* Warn about imbalance due to PP or PP/PME load imbalance at this loss */
 #define DD_PERF_LOSS_WARN    0.05
 
-#define DD_CELL_F_SIZE(dd, di) ((dd)->nc[(dd)->dim[(di)]]+1+(di)*2+1+(di))
-
-/* Use separate MPI send and receive commands
- * when nnodes <= GMX_DD_NNODES_SENDRECV.
- * This saves memory (and some copying for small nnodes).
- * For high parallelization scatter and gather calls are used.
- */
-#define GMX_DD_NNODES_SENDRECV 4
-
 
 /* We check if to turn on DLB at the first and every 100 DD partitionings.
  * With large imbalance DLB will turn on at the first step, so we can
@@ -212,11 +183,6 @@ static void dd_dlb_set_should_check_whether_to_turn_dlb_on(gmx_domdec_t *dd, gmx
    }
  */
 
-/* This order is required to minimize the coordinate communication in PME
- * which uses decomposition in the x direction.
- */
-#define dd_index(n, i) ((((i)[XX]*(n)[YY] + (i)[YY])*(n)[ZZ]) + (i)[ZZ])
-
 static void ddindex2xyz(ivec nc, int ind, ivec xyz)
 {
     xyz[XX] = ind / (nc[YY]*nc[ZZ]);
@@ -1159,7 +1125,7 @@ static void dd_collect_vec_sendrecv(gmx_domdec_t                  *dd,
     {
 #if GMX_MPI
         MPI_Send(const_cast<void *>(static_cast<const void *>(lv.data())), dd->nat_home*sizeof(rvec), MPI_BYTE,
-                 DDMASTERRANK(dd), dd->rank, dd->mpi_comm_all);
+                 dd->masterrank, dd->rank, dd->mpi_comm_all);
 #endif
     }
     else
@@ -1167,7 +1133,7 @@ static void dd_collect_vec_sendrecv(gmx_domdec_t                  *dd,
         /* Copy the master coordinates to the global array */
         cgs_gl = &dd->comm->cgs_gl;
 
-        n = DDMASTERRANK(dd);
+        n = dd->masterrank;
         a = 0;
         for (i = ma->index[n]; i < ma->index[n+1]; i++)
         {
@@ -1187,7 +1153,7 @@ static void dd_collect_vec_sendrecv(gmx_domdec_t                  *dd,
                     srenew(buf, nalloc);
                 }
 #if GMX_MPI
-                MPI_Recv(buf, ma->nat[n]*sizeof(rvec), MPI_BYTE, DDRANK(dd, n),
+                MPI_Recv(buf, ma->nat[n]*sizeof(rvec), MPI_BYTE, n,
                          n, dd->mpi_comm_all, MPI_STATUS_IGNORE);
 #endif
                 a = 0;
@@ -1204,24 +1170,6 @@ static void dd_collect_vec_sendrecv(gmx_domdec_t                  *dd,
     }
 }
 
-static void get_commbuffer_counts(gmx_domdec_t *dd,
-                                  int **counts, int **disps)
-{
-    gmx_domdec_master_t *ma;
-    int                  n;
-
-    ma = dd->ma;
-
-    /* Make the rvec count and displacment arrays */
-    *counts  = ma->ibuf;
-    *disps   = ma->ibuf + dd->nnodes;
-    for (n = 0; n < dd->nnodes; n++)
-    {
-        (*counts)[n] = ma->nat[n]*sizeof(rvec);
-        (*disps)[n]  = (n == 0 ? 0 : (*disps)[n-1] + (*counts)[n-1]);
-    }
-}
-
 static void dd_collect_vec_gatherv(gmx_domdec_t                  *dd,
                                    gmx::ArrayRef<const gmx::RVec> lv,
                                    gmx::ArrayRef<gmx::RVec>       v)
@@ -1268,7 +1216,7 @@ void dd_collect_vec(gmx_domdec_t                  *dd,
 {
     dd_collect_cg(dd, state_local);
 
-    if (dd->nnodes <= GMX_DD_NNODES_SENDRECV)
+    if (dd->nnodes <= c_maxNumRanksUseSendRecvForScatterAndGather)
     {
         dd_collect_vec_sendrecv(dd, lv, v);
     }
@@ -1337,226 +1285,6 @@ void dd_collect_state(gmx_domdec_t *dd,
     }
 }
 
-static void dd_distribute_vec_sendrecv(gmx_domdec_t *dd, t_block *cgs,
-                                       const rvec *v, rvec *lv)
-{
-    gmx_domdec_master_t *ma;
-    int                  n, i, c, a, nalloc = 0;
-    rvec                *buf = nullptr;
-
-    if (DDMASTER(dd))
-    {
-        ma  = dd->ma;
-
-        for (n = 0; n < dd->nnodes; n++)
-        {
-            if (n != dd->rank)
-            {
-                if (ma->nat[n] > nalloc)
-                {
-                    nalloc = over_alloc_dd(ma->nat[n]);
-                    srenew(buf, nalloc);
-                }
-                /* Use lv as a temporary buffer */
-                a = 0;
-                for (i = ma->index[n]; i < ma->index[n+1]; i++)
-                {
-                    for (c = cgs->index[ma->cg[i]]; c < cgs->index[ma->cg[i]+1]; c++)
-                    {
-                        copy_rvec(v[c], buf[a++]);
-                    }
-                }
-                if (a != ma->nat[n])
-                {
-                    gmx_fatal(FARGS, "Internal error a (%d) != nat (%d)",
-                              a, ma->nat[n]);
-                }
-
-#if GMX_MPI
-                MPI_Send(buf, ma->nat[n]*sizeof(rvec), MPI_BYTE,
-                         DDRANK(dd, n), n, dd->mpi_comm_all);
-#endif
-            }
-        }
-        sfree(buf);
-        n = DDMASTERRANK(dd);
-        a = 0;
-        for (i = ma->index[n]; i < ma->index[n+1]; i++)
-        {
-            for (c = cgs->index[ma->cg[i]]; c < cgs->index[ma->cg[i]+1]; c++)
-            {
-                copy_rvec(v[c], lv[a++]);
-            }
-        }
-    }
-    else
-    {
-#if GMX_MPI
-        MPI_Recv(lv, dd->nat_home*sizeof(rvec), MPI_BYTE, DDMASTERRANK(dd),
-                 MPI_ANY_TAG, dd->mpi_comm_all, MPI_STATUS_IGNORE);
-#endif
-    }
-}
-
-static void dd_distribute_vec_scatterv(gmx_domdec_t *dd, t_block *cgs,
-                                       const rvec *v, rvec *lv)
-{
-    gmx_domdec_master_t *ma;
-    int                 *scounts = nullptr, *disps = nullptr;
-    int                  n, i, c, a;
-    rvec                *buf = nullptr;
-
-    if (DDMASTER(dd))
-    {
-        ma  = dd->ma;
-
-        get_commbuffer_counts(dd, &scounts, &disps);
-
-        buf = ma->vbuf;
-        a   = 0;
-        for (n = 0; n < dd->nnodes; n++)
-        {
-            for (i = ma->index[n]; i < ma->index[n+1]; i++)
-            {
-                for (c = cgs->index[ma->cg[i]]; c < cgs->index[ma->cg[i]+1]; c++)
-                {
-                    copy_rvec(v[c], buf[a++]);
-                }
-            }
-        }
-    }
-
-    dd_scatterv(dd, scounts, disps, buf, dd->nat_home*sizeof(rvec), lv);
-}
-
-static void dd_distribute_vec(gmx_domdec_t *dd, t_block *cgs,
-                              const rvec *v, rvec *lv)
-{
-    if (dd->nnodes <= GMX_DD_NNODES_SENDRECV)
-    {
-        dd_distribute_vec_sendrecv(dd, cgs, v, lv);
-    }
-    else
-    {
-        dd_distribute_vec_scatterv(dd, cgs, v, lv);
-    }
-}
-
-static void dd_distribute_dfhist(gmx_domdec_t *dd, df_history_t *dfhist)
-{
-    if (dfhist == nullptr)
-    {
-        return;
-    }
-
-    dd_bcast(dd, sizeof(int), &dfhist->bEquil);
-    dd_bcast(dd, sizeof(int), &dfhist->nlambda);
-    dd_bcast(dd, sizeof(real), &dfhist->wl_delta);
-
-    if (dfhist->nlambda > 0)
-    {
-        int nlam = dfhist->nlambda;
-        dd_bcast(dd, sizeof(int)*nlam, dfhist->n_at_lam);
-        dd_bcast(dd, sizeof(real)*nlam, dfhist->wl_histo);
-        dd_bcast(dd, sizeof(real)*nlam, dfhist->sum_weights);
-        dd_bcast(dd, sizeof(real)*nlam, dfhist->sum_dg);
-        dd_bcast(dd, sizeof(real)*nlam, dfhist->sum_minvar);
-        dd_bcast(dd, sizeof(real)*nlam, dfhist->sum_variance);
-
-        for (int i = 0; i < nlam; i++)
-        {
-            dd_bcast(dd, sizeof(real)*nlam, dfhist->accum_p[i]);
-            dd_bcast(dd, sizeof(real)*nlam, dfhist->accum_m[i]);
-            dd_bcast(dd, sizeof(real)*nlam, dfhist->accum_p2[i]);
-            dd_bcast(dd, sizeof(real)*nlam, dfhist->accum_m2[i]);
-            dd_bcast(dd, sizeof(real)*nlam, dfhist->Tij[i]);
-            dd_bcast(dd, sizeof(real)*nlam, dfhist->Tij_empirical[i]);
-        }
-    }
-}
-
-static void dd_distribute_state(gmx_domdec_t *dd, t_block *cgs,
-                                t_state *state, t_state *state_local,
-                                PaddedRVecVector *f)
-{
-    int nh = state_local->nhchainlength;
-
-    if (DDMASTER(dd))
-    {
-        GMX_RELEASE_ASSERT(state->nhchainlength == nh, "The global and local Nose-Hoover chain lengths should match");
-
-        for (int i = 0; i < efptNR; i++)
-        {
-            state_local->lambda[i] = state->lambda[i];
-        }
-        state_local->fep_state = state->fep_state;
-        state_local->veta      = state->veta;
-        state_local->vol0      = state->vol0;
-        copy_mat(state->box, state_local->box);
-        copy_mat(state->box_rel, state_local->box_rel);
-        copy_mat(state->boxv, state_local->boxv);
-        copy_mat(state->svir_prev, state_local->svir_prev);
-        copy_mat(state->fvir_prev, state_local->fvir_prev);
-        if (state->dfhist != nullptr)
-        {
-            copy_df_history(state_local->dfhist, state->dfhist);
-        }
-        for (int i = 0; i < state_local->ngtc; i++)
-        {
-            for (int j = 0; j < nh; j++)
-            {
-                state_local->nosehoover_xi[i*nh+j]        = state->nosehoover_xi[i*nh+j];
-                state_local->nosehoover_vxi[i*nh+j]       = state->nosehoover_vxi[i*nh+j];
-            }
-            state_local->therm_integral[i] = state->therm_integral[i];
-        }
-        for (int i = 0; i < state_local->nnhpres; i++)
-        {
-            for (int j = 0; j < nh; j++)
-            {
-                state_local->nhpres_xi[i*nh+j]        = state->nhpres_xi[i*nh+j];
-                state_local->nhpres_vxi[i*nh+j]       = state->nhpres_vxi[i*nh+j];
-            }
-        }
-        state_local->baros_integral = state->baros_integral;
-    }
-    dd_bcast(dd, ((efptNR)*sizeof(real)), state_local->lambda.data());
-    dd_bcast(dd, sizeof(int), &state_local->fep_state);
-    dd_bcast(dd, sizeof(real), &state_local->veta);
-    dd_bcast(dd, sizeof(real), &state_local->vol0);
-    dd_bcast(dd, sizeof(state_local->box), state_local->box);
-    dd_bcast(dd, sizeof(state_local->box_rel), state_local->box_rel);
-    dd_bcast(dd, sizeof(state_local->boxv), state_local->boxv);
-    dd_bcast(dd, sizeof(state_local->svir_prev), state_local->svir_prev);
-    dd_bcast(dd, sizeof(state_local->fvir_prev), state_local->fvir_prev);
-    dd_bcast(dd, ((state_local->ngtc*nh)*sizeof(double)), state_local->nosehoover_xi.data());
-    dd_bcast(dd, ((state_local->ngtc*nh)*sizeof(double)), state_local->nosehoover_vxi.data());
-    dd_bcast(dd, state_local->ngtc*sizeof(double), state_local->therm_integral.data());
-    dd_bcast(dd, ((state_local->nnhpres*nh)*sizeof(double)), state_local->nhpres_xi.data());
-    dd_bcast(dd, ((state_local->nnhpres*nh)*sizeof(double)), state_local->nhpres_vxi.data());
-
-    /* communicate df_history -- required for restarting from checkpoint */
-    dd_distribute_dfhist(dd, state_local->dfhist);
-
-    dd_resize_state(state_local, f, dd->nat_home);
-
-    if (state_local->flags & (1 << estX))
-    {
-        const rvec *xGlobal = (DDMASTER(dd) ? as_rvec_array(state->x.data()) : nullptr);
-        dd_distribute_vec(dd, cgs, xGlobal, as_rvec_array(state_local->x.data()));
-    }
-    if (state_local->flags & (1 << estV))
-    {
-        const rvec *vGlobal = (DDMASTER(dd) ? as_rvec_array(state->v.data()) : nullptr);
-        dd_distribute_vec(dd, cgs, vGlobal, as_rvec_array(state_local->v.data()));
-    }
-    if (state_local->flags & (1 << estCGP))
-    {
-        const rvec *cgpGlobal = (DDMASTER(dd) ? as_rvec_array(state->cg_p.data()) : nullptr);
-        dd_distribute_vec(dd, cgs, cgpGlobal, as_rvec_array(state_local->cg_p.data()));
-    }
-}
-
 static void write_dd_grid_pdb(const char *fn, gmx_int64_t step,
                               gmx_domdec_t *dd, matrix box, gmx_ddbox_t *ddbox)
 {
@@ -2306,59 +2034,6 @@ static void clear_dd_indices(gmx_domdec_t *dd, int cg_start, int a_start)
     }
 }
 
-/* This function should be used for moving the domain boudaries during DLB,
- * for obtaining the minimum cell size. It checks the initially set limit
- * comm->cellsize_min, for bonded and initial non-bonded cut-offs,
- * and, possibly, a longer cut-off limit set for PME load balancing.
- */
-static real cellsize_min_dlb(gmx_domdec_comm_t *comm, int dim_ind, int dim)
-{
-    real cellsize_min;
-
-    cellsize_min = comm->cellsize_min[dim];
-
-    if (!comm->bVacDLBNoLimit)
-    {
-        /* The cut-off might have changed, e.g. by PME load balacning,
-         * from the value used to set comm->cellsize_min, so check it.
-         */
-        cellsize_min = std::max(cellsize_min, comm->cutoff/comm->cd[dim_ind].np_dlb);
-
-        if (comm->bPMELoadBalDLBLimits)
-        {
-            /* Check for the cut-off limit set by the PME load balancing */
-            cellsize_min = std::max(cellsize_min, comm->PMELoadBal_max_cutoff/comm->cd[dim_ind].np_dlb);
-        }
-    }
-
-    return cellsize_min;
-}
-
-static real grid_jump_limit(gmx_domdec_comm_t *comm, real cutoff,
-                            int dim_ind)
-{
-    real grid_jump_limit;
-
-    /* The distance between the boundaries of cells at distance
-     * x+-1,y+-1 or y+-1,z+-1 is limited by the cut-off restrictions
-     * and by the fact that cells should not be shifted by more than
-     * half their size, such that cg's only shift by one cell
-     * at redecomposition.
-     */
-    grid_jump_limit = comm->cellsize_limit;
-    if (!comm->bVacDLBNoLimit)
-    {
-        if (comm->bPMELoadBalDLBLimits)
-        {
-            cutoff = std::max(cutoff, comm->PMELoadBal_max_cutoff);
-        }
-        grid_jump_limit = std::max(grid_jump_limit,
-                                   cutoff/comm->cd[dim_ind].np);
-    }
-
-    return grid_jump_limit;
-}
-
 static gmx_bool check_grid_jump(gmx_int64_t     step,
                                 gmx_domdec_t   *dd,
                                 real            cutoff,
@@ -2405,11 +2080,6 @@ static gmx_bool check_grid_jump(gmx_int64_t     step,
     return bInvalid;
 }
 
-static int dd_load_count(gmx_domdec_comm_t *comm)
-{
-    return (comm->eFlop ? comm->flop_n : comm->cycl_n[ddCyclF]);
-}
-
 static float dd_force_load(gmx_domdec_comm_t *comm)
 {
     float load;
@@ -2444,966 +2114,135 @@ static float dd_force_load(gmx_domdec_comm_t *comm)
             if (comm->cycl_n[ddCyclF] > 1)
             {
                 /* We should remove the WaitGPU time of the same MD step
-                 * as the one with the maximum F time, since the F time
-                 * and the wait time are not independent.
-                 * Furthermore, the step for the max F time should be chosen
-                 * the same on all ranks that share the same GPU.
-                 * But to keep the code simple, we remove the average instead.
-                 * The main reason for artificially long times at some steps
-                 * is spurious CPU activity or MPI time, so we don't expect
-                 * that changes in the GPU wait time matter a lot here.
-                 */
-                gpu_wait *= (comm->cycl_n[ddCyclF] - 1)/(float)comm->cycl_n[ddCyclF];
-            }
-            /* Sum the wait times over the ranks that share the same GPU */
-            MPI_Allreduce(&gpu_wait, &gpu_wait_sum, 1, MPI_FLOAT, MPI_SUM,
-                          comm->mpi_comm_gpu_shared);
-            /* Replace the wait time by the average over the ranks */
-            load += -gpu_wait + gpu_wait_sum/comm->nrank_gpu_shared;
-        }
-#endif
-    }
-
-    return load;
-}
-
-static void set_slb_pme_dim_f(gmx_domdec_t *dd, int dim, real **dim_f)
-{
-    gmx_domdec_comm_t *comm;
-    int                i;
-
-    comm = dd->comm;
-
-    snew(*dim_f, dd->nc[dim]+1);
-    (*dim_f)[0] = 0;
-    for (i = 1; i < dd->nc[dim]; i++)
-    {
-        if (comm->slb_frac[dim])
-        {
-            (*dim_f)[i] = (*dim_f)[i-1] + comm->slb_frac[dim][i-1];
-        }
-        else
-        {
-            (*dim_f)[i] = (real)i/(real)dd->nc[dim];
-        }
-    }
-    (*dim_f)[dd->nc[dim]] = 1;
-}
-
-static void init_ddpme(gmx_domdec_t *dd, gmx_ddpme_t *ddpme, int dimind)
-{
-    int  pmeindex, slab, nso, i;
-    ivec xyz;
-
-    if (dimind == 0 && dd->dim[0] == YY && dd->comm->npmenodes_x == 1)
-    {
-        ddpme->dim = YY;
-    }
-    else
-    {
-        ddpme->dim = dimind;
-    }
-    ddpme->dim_match = (ddpme->dim == dd->dim[dimind]);
-
-    ddpme->nslab = (ddpme->dim == 0 ?
-                    dd->comm->npmenodes_x :
-                    dd->comm->npmenodes_y);
-
-    if (ddpme->nslab <= 1)
-    {
-        return;
-    }
-
-    nso = dd->comm->npmenodes/ddpme->nslab;
-    /* Determine for each PME slab the PP location range for dimension dim */
-    snew(ddpme->pp_min, ddpme->nslab);
-    snew(ddpme->pp_max, ddpme->nslab);
-    for (slab = 0; slab < ddpme->nslab; slab++)
-    {
-        ddpme->pp_min[slab] = dd->nc[dd->dim[dimind]] - 1;
-        ddpme->pp_max[slab] = 0;
-    }
-    for (i = 0; i < dd->nnodes; i++)
-    {
-        ddindex2xyz(dd->nc, i, xyz);
-        /* For y only use our y/z slab.
-         * This assumes that the PME x grid size matches the DD grid size.
-         */
-        if (dimind == 0 || xyz[XX] == dd->ci[XX])
-        {
-            pmeindex = ddindex2pmeindex(dd, i);
-            if (dimind == 0)
-            {
-                slab = pmeindex/nso;
-            }
-            else
-            {
-                slab = pmeindex % ddpme->nslab;
-            }
-            ddpme->pp_min[slab] = std::min(ddpme->pp_min[slab], xyz[dimind]);
-            ddpme->pp_max[slab] = std::max(ddpme->pp_max[slab], xyz[dimind]);
-        }
-    }
-
-    set_slb_pme_dim_f(dd, ddpme->dim, &ddpme->slb_dim_f);
-}
-
-int dd_pme_maxshift_x(const gmx_domdec_t *dd)
-{
-    if (dd->comm->ddpme[0].dim == XX)
-    {
-        return dd->comm->ddpme[0].maxshift;
-    }
-    else
-    {
-        return 0;
-    }
-}
-
-int dd_pme_maxshift_y(const gmx_domdec_t *dd)
-{
-    if (dd->comm->ddpme[0].dim == YY)
-    {
-        return dd->comm->ddpme[0].maxshift;
-    }
-    else if (dd->comm->npmedecompdim >= 2 && dd->comm->ddpme[1].dim == YY)
-    {
-        return dd->comm->ddpme[1].maxshift;
-    }
-    else
-    {
-        return 0;
-    }
-}
-
-static void set_pme_maxshift(gmx_domdec_t *dd, gmx_ddpme_t *ddpme,
-                             gmx_bool bUniform, const gmx_ddbox_t *ddbox,
-                             const real *cell_f)
-{
-    gmx_domdec_comm_t *comm;
-    int                nc, ns, s;
-    int               *xmin, *xmax;
-    real               range, pme_boundary;
-    int                sh;
-
-    comm = dd->comm;
-    nc   = dd->nc[ddpme->dim];
-    ns   = ddpme->nslab;
-
-    if (!ddpme->dim_match)
-    {
-        /* PP decomposition is not along dim: the worst situation */
-        sh = ns/2;
-    }
-    else if (ns <= 3 || (bUniform && ns == nc))
-    {
-        /* The optimal situation */
-        sh = 1;
-    }
-    else
-    {
-        /* We need to check for all pme nodes which nodes they
-         * could possibly need to communicate with.
-         */
-        xmin = ddpme->pp_min;
-        xmax = ddpme->pp_max;
-        /* Allow for atoms to be maximally 2/3 times the cut-off
-         * out of their DD cell. This is a reasonable balance between
-         * between performance and support for most charge-group/cut-off
-         * combinations.
-         */
-        range  = 2.0/3.0*comm->cutoff/ddbox->box_size[ddpme->dim];
-        /* Avoid extra communication when we are exactly at a boundary */
-        range *= 0.999;
-
-        sh = 1;
-        for (s = 0; s < ns; s++)
-        {
-            /* PME slab s spreads atoms between box frac. s/ns and (s+1)/ns */
-            pme_boundary = (real)s/ns;
-            while (sh+1 < ns &&
-                   ((s-(sh+1) >= 0 &&
-                     cell_f[xmax[s-(sh+1)   ]+1]     + range > pme_boundary) ||
-                    (s-(sh+1) <  0 &&
-                     cell_f[xmax[s-(sh+1)+ns]+1] - 1 + range > pme_boundary)))
-            {
-                sh++;
-            }
-            pme_boundary = (real)(s+1)/ns;
-            while (sh+1 < ns &&
-                   ((s+(sh+1) <  ns &&
-                     cell_f[xmin[s+(sh+1)   ]  ]     - range < pme_boundary) ||
-                    (s+(sh+1) >= ns &&
-                     cell_f[xmin[s+(sh+1)-ns]  ] + 1 - range < pme_boundary)))
-            {
-                sh++;
-            }
-        }
-    }
-
-    ddpme->maxshift = sh;
-
-    if (debug)
-    {
-        fprintf(debug, "PME slab communication range for dim %d is %d\n",
-                ddpme->dim, ddpme->maxshift);
-    }
-}
-
-static void check_box_size(gmx_domdec_t *dd, gmx_ddbox_t *ddbox)
-{
-    int d, dim;
-
-    for (d = 0; d < dd->ndim; d++)
-    {
-        dim = dd->dim[d];
-        if (dim < ddbox->nboundeddim &&
-            ddbox->box_size[dim]*ddbox->skew_fac[dim] <
-            dd->nc[dim]*dd->comm->cellsize_limit*DD_CELL_MARGIN)
-        {
-            gmx_fatal(FARGS, "The %c-size of the box (%f) times the triclinic skew factor (%f) is smaller than the number of DD cells (%d) times the smallest allowed cell size (%f)\n",
-                      dim2char(dim), ddbox->box_size[dim], ddbox->skew_fac[dim],
-                      dd->nc[dim], dd->comm->cellsize_limit);
-        }
-    }
-}
-
-enum {
-    setcellsizeslbLOCAL, setcellsizeslbMASTER, setcellsizeslbPULSE_ONLY
-};
-
-/* Set the domain boundaries. Use for static (or no) load balancing,
- * and also for the starting state for dynamic load balancing.
- * setmode determine if and where the boundaries are stored, use enum above.
- * Returns the number communication pulses in npulse.
- */
-static void set_dd_cell_sizes_slb(gmx_domdec_t *dd, const gmx_ddbox_t *ddbox,
-                                  int setmode, ivec npulse)
-{
-    gmx_domdec_comm_t *comm;
-    int                d, j;
-    rvec               cellsize_min;
-    real              *cell_x, cell_dx, cellsize;
-
-    comm = dd->comm;
-
-    for (d = 0; d < DIM; d++)
-    {
-        cellsize_min[d] = ddbox->box_size[d]*ddbox->skew_fac[d];
-        npulse[d]       = 1;
-        if (dd->nc[d] == 1 || comm->slb_frac[d] == nullptr)
-        {
-            /* Uniform grid */
-            cell_dx = ddbox->box_size[d]/dd->nc[d];
-            switch (setmode)
-            {
-                case setcellsizeslbMASTER:
-                    for (j = 0; j < dd->nc[d]+1; j++)
-                    {
-                        dd->ma->cell_x[d][j] = ddbox->box0[d] + j*cell_dx;
-                    }
-                    break;
-                case setcellsizeslbLOCAL:
-                    comm->cell_x0[d] = ddbox->box0[d] + (dd->ci[d]  )*cell_dx;
-                    comm->cell_x1[d] = ddbox->box0[d] + (dd->ci[d]+1)*cell_dx;
-                    break;
-                default:
-                    break;
-            }
-            cellsize = cell_dx*ddbox->skew_fac[d];
-            while (cellsize*npulse[d] < comm->cutoff)
-            {
-                npulse[d]++;
-            }
-            cellsize_min[d] = cellsize;
-        }
-        else
-        {
-            /* Statically load balanced grid */
-            /* Also when we are not doing a master distribution we determine
-             * all cell borders in a loop to obtain identical values
-             * to the master distribution case and to determine npulse.
-             */
-            if (setmode == setcellsizeslbMASTER)
-            {
-                cell_x = dd->ma->cell_x[d];
-            }
-            else
-            {
-                snew(cell_x, dd->nc[d]+1);
-            }
-            cell_x[0] = ddbox->box0[d];
-            for (j = 0; j < dd->nc[d]; j++)
-            {
-                cell_dx     = ddbox->box_size[d]*comm->slb_frac[d][j];
-                cell_x[j+1] = cell_x[j] + cell_dx;
-                cellsize    = cell_dx*ddbox->skew_fac[d];
-                while (cellsize*npulse[d] < comm->cutoff &&
-                       npulse[d] < dd->nc[d]-1)
-                {
-                    npulse[d]++;
-                }
-                cellsize_min[d] = std::min(cellsize_min[d], cellsize);
-            }
-            if (setmode == setcellsizeslbLOCAL)
-            {
-                comm->cell_x0[d] = cell_x[dd->ci[d]];
-                comm->cell_x1[d] = cell_x[dd->ci[d]+1];
-            }
-            if (setmode != setcellsizeslbMASTER)
-            {
-                sfree(cell_x);
-            }
-        }
-        /* The following limitation is to avoid that a cell would receive
-         * some of its own home charge groups back over the periodic boundary.
-         * Double charge groups cause trouble with the global indices.
-         */
-        if (d < ddbox->npbcdim &&
-            dd->nc[d] > 1 && npulse[d] >= dd->nc[d])
-        {
-            char error_string[STRLEN];
-
-            sprintf(error_string,
-                    "The box size in direction %c (%f) times the triclinic skew factor (%f) is too small for a cut-off of %f with %d domain decomposition cells, use 1 or more than %d %s or increase the box size in this direction",
-                    dim2char(d), ddbox->box_size[d], ddbox->skew_fac[d],
-                    comm->cutoff,
-                    dd->nc[d], dd->nc[d],
-                    dd->nnodes > dd->nc[d] ? "cells" : "ranks");
-
-            if (setmode == setcellsizeslbLOCAL)
-            {
-                gmx_fatal_collective(FARGS, dd->mpi_comm_all, DDMASTER(dd),
-                                     error_string);
-            }
-            else
-            {
-                gmx_fatal(FARGS, error_string);
-            }
-        }
-    }
-
-    if (!isDlbOn(comm))
-    {
-        copy_rvec(cellsize_min, comm->cellsize_min);
-    }
-
-    for (d = 0; d < comm->npmedecompdim; d++)
-    {
-        set_pme_maxshift(dd, &comm->ddpme[d],
-                         comm->slb_frac[dd->dim[d]] == nullptr, ddbox,
-                         comm->ddpme[d].slb_dim_f);
-    }
-}
-
-
-static void dd_cell_sizes_dlb_root_enforce_limits(gmx_domdec_t *dd,
-                                                  int d, int dim, domdec_root_t *root,
-                                                  const gmx_ddbox_t *ddbox,
-                                                  gmx_bool bUniform, gmx_int64_t step, real cellsize_limit_f, int range[])
-{
-    gmx_domdec_comm_t *comm;
-    int                ncd, i, j, nmin, nmin_old;
-    gmx_bool           bLimLo, bLimHi;
-    real              *cell_size;
-    real               fac, halfway, cellsize_limit_f_i, region_size;
-    gmx_bool           bPBC, bLastHi = FALSE;
-    int                nrange[] = {range[0], range[1]};
-
-    region_size = root->cell_f[range[1]]-root->cell_f[range[0]];
-
-    comm = dd->comm;
-
-    ncd = dd->nc[dim];
-
-    bPBC = (dim < ddbox->npbcdim);
-
-    cell_size = root->buf_ncd;
-
-    if (debug)
-    {
-        fprintf(debug, "enforce_limits: %d %d\n", range[0], range[1]);
-    }
-
-    /* First we need to check if the scaling does not make cells
-     * smaller than the smallest allowed size.
-     * We need to do this iteratively, since if a cell is too small,
-     * it needs to be enlarged, which makes all the other cells smaller,
-     * which could in turn make another cell smaller than allowed.
-     */
-    for (i = range[0]; i < range[1]; i++)
-    {
-        root->bCellMin[i] = FALSE;
-    }
-    nmin = 0;
-    do
-    {
-        nmin_old = nmin;
-        /* We need the total for normalization */
-        fac = 0;
-        for (i = range[0]; i < range[1]; i++)
-        {
-            if (root->bCellMin[i] == FALSE)
-            {
-                fac += cell_size[i];
-            }
-        }
-        fac = ( region_size - nmin*cellsize_limit_f)/fac; /* substracting cells already set to cellsize_limit_f */
-        /* Determine the cell boundaries */
-        for (i = range[0]; i < range[1]; i++)
-        {
-            if (root->bCellMin[i] == FALSE)
-            {
-                cell_size[i] *= fac;
-                if (!bPBC && (i == 0 || i == dd->nc[dim] -1))
-                {
-                    cellsize_limit_f_i = 0;
-                }
-                else
-                {
-                    cellsize_limit_f_i = cellsize_limit_f;
-                }
-                if (cell_size[i] < cellsize_limit_f_i)
-                {
-                    root->bCellMin[i] = TRUE;
-                    cell_size[i]      = cellsize_limit_f_i;
-                    nmin++;
-                }
-            }
-            root->cell_f[i+1] = root->cell_f[i] + cell_size[i];
-        }
-    }
-    while (nmin > nmin_old);
-
-    i            = range[1]-1;
-    cell_size[i] = root->cell_f[i+1] - root->cell_f[i];
-    /* For this check we should not use DD_CELL_MARGIN,
-     * but a slightly smaller factor,
-     * since rounding could get use below the limit.
-     */
-    if (bPBC && cell_size[i] < cellsize_limit_f*DD_CELL_MARGIN2/DD_CELL_MARGIN)
-    {
-        char buf[22];
-        gmx_fatal(FARGS, "step %s: the dynamic load balancing could not balance dimension %c: box size %f, triclinic skew factor %f, #cells %d, minimum cell size %f\n",
-                  gmx_step_str(step, buf),
-                  dim2char(dim), ddbox->box_size[dim], ddbox->skew_fac[dim],
-                  ncd, comm->cellsize_min[dim]);
-    }
-
-    root->bLimited = (nmin > 0) || (range[0] > 0) || (range[1] < ncd);
-
-    if (!bUniform)
-    {
-        /* Check if the boundary did not displace more than halfway
-         * each of the cells it bounds, as this could cause problems,
-         * especially when the differences between cell sizes are large.
-         * If changes are applied, they will not make cells smaller
-         * than the cut-off, as we check all the boundaries which
-         * might be affected by a change and if the old state was ok,
-         * the cells will at most be shrunk back to their old size.
-         */
-        for (i = range[0]+1; i < range[1]; i++)
-        {
-            halfway = 0.5*(root->old_cell_f[i] + root->old_cell_f[i-1]);
-            if (root->cell_f[i] < halfway)
-            {
-                root->cell_f[i] = halfway;
-                /* Check if the change also causes shifts of the next boundaries */
-                for (j = i+1; j < range[1]; j++)
-                {
-                    if (root->cell_f[j] < root->cell_f[j-1] + cellsize_limit_f)
-                    {
-                        root->cell_f[j] =  root->cell_f[j-1] + cellsize_limit_f;
-                    }
-                }
-            }
-            halfway = 0.5*(root->old_cell_f[i] + root->old_cell_f[i+1]);
-            if (root->cell_f[i] > halfway)
-            {
-                root->cell_f[i] = halfway;
-                /* Check if the change also causes shifts of the next boundaries */
-                for (j = i-1; j >= range[0]+1; j--)
-                {
-                    if (root->cell_f[j] > root->cell_f[j+1] - cellsize_limit_f)
-                    {
-                        root->cell_f[j] = root->cell_f[j+1] - cellsize_limit_f;
-                    }
-                }
-            }
-        }
-    }
-
-    /* nrange is defined as [lower, upper) range for new call to enforce_limits */
-    /* find highest violation of LimLo (a) and the following violation of LimHi (thus the lowest following) (b)
-     * then call enforce_limits for (oldb,a), (a,b). In the next step: (b,nexta). oldb and nexta can be the boundaries.
-     * for a and b nrange is used */
-    if (d > 0)
-    {
-        /* Take care of the staggering of the cell boundaries */
-        if (bUniform)
-        {
-            for (i = range[0]; i < range[1]; i++)
-            {
-                root->cell_f_max0[i] = root->cell_f[i];
-                root->cell_f_min1[i] = root->cell_f[i+1];
-            }
-        }
-        else
-        {
-            for (i = range[0]+1; i < range[1]; i++)
-            {
-                bLimLo = (root->cell_f[i] < root->bound_min[i]);
-                bLimHi = (root->cell_f[i] > root->bound_max[i]);
-                if (bLimLo && bLimHi)
-                {
-                    /* Both limits violated, try the best we can */
-                    /* For this case we split the original range (range) in two parts and care about the other limitiations in the next iteration. */
-                    root->cell_f[i] = 0.5*(root->bound_min[i] + root->bound_max[i]);
-                    nrange[0]       = range[0];
-                    nrange[1]       = i;
-                    dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
-
-                    nrange[0] = i;
-                    nrange[1] = range[1];
-                    dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
-
-                    return;
-                }
-                else if (bLimLo)
-                {
-                    /* root->cell_f[i] = root->bound_min[i]; */
-                    nrange[1] = i;  /* only store violation location. There could be a LimLo violation following with an higher index */
-                    bLastHi   = FALSE;
-                }
-                else if (bLimHi && !bLastHi)
-                {
-                    bLastHi = TRUE;
-                    if (nrange[1] < range[1])   /* found a LimLo before */
-                    {
-                        root->cell_f[nrange[1]] = root->bound_min[nrange[1]];
-                        dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
-                        nrange[0] = nrange[1];
-                    }
-                    root->cell_f[i] = root->bound_max[i];
-                    nrange[1]       = i;
-                    dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
-                    nrange[0] = i;
-                    nrange[1] = range[1];
-                }
-            }
-            if (nrange[1] < range[1])   /* found last a LimLo */
-            {
-                root->cell_f[nrange[1]] = root->bound_min[nrange[1]];
-                dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
-                nrange[0] = nrange[1];
-                nrange[1] = range[1];
-                dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
-            }
-            else if (nrange[0] > range[0]) /* found at least one LimHi */
-            {
-                dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, nrange);
-            }
-        }
-    }
-}
-
-
-static void set_dd_cell_sizes_dlb_root(gmx_domdec_t *dd,
-                                       int d, int dim, domdec_root_t *root,
-                                       const gmx_ddbox_t *ddbox,
-                                       gmx_bool bDynamicBox,
-                                       gmx_bool bUniform, gmx_int64_t step)
-{
-    gmx_domdec_comm_t *comm;
-    int                ncd, d1, i, pos;
-    real              *cell_size;
-    real               load_aver, load_i, imbalance, change, change_max, sc;
-    real               cellsize_limit_f, dist_min_f, dist_min_f_hard, space;
-    real               change_limit;
-    real               relax = 0.5;
-    gmx_bool           bPBC;
-    int                range[] = { 0, 0 };
-
-    comm = dd->comm;
-
-    /* Convert the maximum change from the input percentage to a fraction */
-    change_limit = comm->dlb_scale_lim*0.01;
-
-    ncd = dd->nc[dim];
-
-    bPBC = (dim < ddbox->npbcdim);
-
-    cell_size = root->buf_ncd;
-
-    /* Store the original boundaries */
-    for (i = 0; i < ncd+1; i++)
-    {
-        root->old_cell_f[i] = root->cell_f[i];
-    }
-    if (bUniform)
-    {
-        for (i = 0; i < ncd; i++)
-        {
-            cell_size[i] = 1.0/ncd;
-        }
-    }
-    else if (dd_load_count(comm) > 0)
-    {
-        load_aver  = comm->load[d].sum_m/ncd;
-        change_max = 0;
-        for (i = 0; i < ncd; i++)
-        {
-            /* Determine the relative imbalance of cell i */
-            load_i    = comm->load[d].load[i*comm->load[d].nload+2];
-            imbalance = (load_i - load_aver)/(load_aver > 0 ? load_aver : 1);
-            /* Determine the change of the cell size using underrelaxation */
-            change     = -relax*imbalance;
-            change_max = std::max(change_max, std::max(change, -change));
-        }
-        /* Limit the amount of scaling.
-         * We need to use the same rescaling for all cells in one row,
-         * otherwise the load balancing might not converge.
-         */
-        sc = relax;
-        if (change_max > change_limit)
-        {
-            sc *= change_limit/change_max;
-        }
-        for (i = 0; i < ncd; i++)
-        {
-            /* Determine the relative imbalance of cell i */
-            load_i    = comm->load[d].load[i*comm->load[d].nload+2];
-            imbalance = (load_i - load_aver)/(load_aver > 0 ? load_aver : 1);
-            /* Determine the change of the cell size using underrelaxation */
-            change       = -sc*imbalance;
-            cell_size[i] = (root->cell_f[i+1]-root->cell_f[i])*(1 + change);
-        }
-    }
-
-    cellsize_limit_f  = cellsize_min_dlb(comm, d, dim)/ddbox->box_size[dim];
-    cellsize_limit_f *= DD_CELL_MARGIN;
-    dist_min_f_hard   = grid_jump_limit(comm, comm->cutoff, d)/ddbox->box_size[dim];
-    dist_min_f        = dist_min_f_hard * DD_CELL_MARGIN;
-    if (ddbox->tric_dir[dim])
-    {
-        cellsize_limit_f /= ddbox->skew_fac[dim];
-        dist_min_f       /= ddbox->skew_fac[dim];
-    }
-    if (bDynamicBox && d > 0)
-    {
-        dist_min_f *= DD_PRES_SCALE_MARGIN;
-    }
-    if (d > 0 && !bUniform)
-    {
-        /* Make sure that the grid is not shifted too much */
-        for (i = 1; i < ncd; i++)
-        {
-            if (root->cell_f_min1[i] - root->cell_f_max0[i-1] < 2 * dist_min_f_hard)
-            {
-                gmx_incons("Inconsistent DD boundary staggering limits!");
-            }
-            root->bound_min[i] = root->cell_f_max0[i-1] + dist_min_f;
-            space              = root->cell_f[i] - (root->cell_f_max0[i-1] + dist_min_f);
-            if (space > 0)
-            {
-                root->bound_min[i] += 0.5*space;
-            }
-            root->bound_max[i] = root->cell_f_min1[i] - dist_min_f;
-            space              = root->cell_f[i] - (root->cell_f_min1[i] - dist_min_f);
-            if (space < 0)
-            {
-                root->bound_max[i] += 0.5*space;
-            }
-            if (debug)
-            {
-                fprintf(debug,
-                        "dim %d boundary %d %.3f < %.3f < %.3f < %.3f < %.3f\n",
-                        d, i,
-                        root->cell_f_max0[i-1] + dist_min_f,
-                        root->bound_min[i], root->cell_f[i], root->bound_max[i],
-                        root->cell_f_min1[i] - dist_min_f);
-            }
-        }
-    }
-    range[1]          = ncd;
-    root->cell_f[0]   = 0;
-    root->cell_f[ncd] = 1;
-    dd_cell_sizes_dlb_root_enforce_limits(dd, d, dim, root, ddbox, bUniform, step, cellsize_limit_f, range);
-
-
-    /* After the checks above, the cells should obey the cut-off
-     * restrictions, but it does not hurt to check.
-     */
-    for (i = 0; i < ncd; i++)
-    {
-        if (debug)
-        {
-            fprintf(debug, "Relative bounds dim %d  cell %d: %f %f\n",
-                    dim, i, root->cell_f[i], root->cell_f[i+1]);
-        }
-
-        if ((bPBC || (i != 0 && i != dd->nc[dim]-1)) &&
-            root->cell_f[i+1] - root->cell_f[i] <
-            cellsize_limit_f/DD_CELL_MARGIN)
-        {
-            char buf[22];
-            fprintf(stderr,
-                    "\nWARNING step %s: direction %c, cell %d too small: %f\n",
-                    gmx_step_str(step, buf), dim2char(dim), i,
-                    (root->cell_f[i+1] - root->cell_f[i])
-                    *ddbox->box_size[dim]*ddbox->skew_fac[dim]);
-        }
-    }
-
-    pos = ncd + 1;
-    /* Store the cell boundaries of the lower dimensions at the end */
-    for (d1 = 0; d1 < d; d1++)
-    {
-        root->cell_f[pos++] = comm->cell_f0[d1];
-        root->cell_f[pos++] = comm->cell_f1[d1];
-    }
-
-    if (d < comm->npmedecompdim)
-    {
-        /* The master determines the maximum shift for
-         * the coordinate communication between separate PME nodes.
-         */
-        set_pme_maxshift(dd, &comm->ddpme[d], bUniform, ddbox, root->cell_f);
-    }
-    root->cell_f[pos++] = comm->ddpme[0].maxshift;
-    if (d >= 1)
-    {
-        root->cell_f[pos++] = comm->ddpme[1].maxshift;
-    }
-}
-
-static void relative_to_absolute_cell_bounds(gmx_domdec_t      *dd,
-                                             const gmx_ddbox_t *ddbox,
-                                             int                dimind)
-{
-    gmx_domdec_comm_t *comm;
-    int                dim;
-
-    comm = dd->comm;
-
-    /* Set the cell dimensions */
-    dim                = dd->dim[dimind];
-    comm->cell_x0[dim] = comm->cell_f0[dimind]*ddbox->box_size[dim];
-    comm->cell_x1[dim] = comm->cell_f1[dimind]*ddbox->box_size[dim];
-    if (dim >= ddbox->nboundeddim)
-    {
-        comm->cell_x0[dim] += ddbox->box0[dim];
-        comm->cell_x1[dim] += ddbox->box0[dim];
-    }
-}
-
-static void distribute_dd_cell_sizes_dlb(gmx_domdec_t *dd,
-                                         int d, int dim, real *cell_f_row,
-                                         const gmx_ddbox_t *ddbox)
-{
-    gmx_domdec_comm_t *comm;
-    int                d1, pos;
-
-    comm = dd->comm;
-
-#if GMX_MPI
-    /* Each node would only need to know two fractions,
-     * but it is probably cheaper to broadcast the whole array.
-     */
-    MPI_Bcast(cell_f_row, DD_CELL_F_SIZE(dd, d)*sizeof(real), MPI_BYTE,
-              0, comm->mpi_comm_load[d]);
-#endif
-    /* Copy the fractions for this dimension from the buffer */
-    comm->cell_f0[d] = cell_f_row[dd->ci[dim]  ];
-    comm->cell_f1[d] = cell_f_row[dd->ci[dim]+1];
-    /* The whole array was communicated, so set the buffer position */
-    pos = dd->nc[dim] + 1;
-    for (d1 = 0; d1 <= d; d1++)
-    {
-        if (d1 < d)
-        {
-            /* Copy the cell fractions of the lower dimensions */
-            comm->cell_f0[d1] = cell_f_row[pos++];
-            comm->cell_f1[d1] = cell_f_row[pos++];
+                 * as the one with the maximum F time, since the F time
+                 * and the wait time are not independent.
+                 * Furthermore, the step for the max F time should be chosen
+                 * the same on all ranks that share the same GPU.
+                 * But to keep the code simple, we remove the average instead.
+                 * The main reason for artificially long times at some steps
+                 * is spurious CPU activity or MPI time, so we don't expect
+                 * that changes in the GPU wait time matter a lot here.
+                 */
+                gpu_wait *= (comm->cycl_n[ddCyclF] - 1)/(float)comm->cycl_n[ddCyclF];
+            }
+            /* Sum the wait times over the ranks that share the same GPU */
+            MPI_Allreduce(&gpu_wait, &gpu_wait_sum, 1, MPI_FLOAT, MPI_SUM,
+                          comm->mpi_comm_gpu_shared);
+            /* Replace the wait time by the average over the ranks */
+            load += -gpu_wait + gpu_wait_sum/comm->nrank_gpu_shared;
         }
-        relative_to_absolute_cell_bounds(dd, ddbox, d1);
-    }
-    /* Convert the communicated shift from float to int */
-    comm->ddpme[0].maxshift = (int)(cell_f_row[pos++] + 0.5);
-    if (d >= 1)
-    {
-        comm->ddpme[1].maxshift = (int)(cell_f_row[pos++] + 0.5);
+#endif
     }
+
+    return load;
 }
 
-static void set_dd_cell_sizes_dlb_change(gmx_domdec_t *dd,
-                                         const gmx_ddbox_t *ddbox,
-                                         gmx_bool bDynamicBox,
-                                         gmx_bool bUniform, gmx_int64_t step)
+static void set_slb_pme_dim_f(gmx_domdec_t *dd, int dim, real **dim_f)
 {
     gmx_domdec_comm_t *comm;
-    int                d, dim, d1;
-    gmx_bool           bRowMember, bRowRoot;
-    real              *cell_f_row;
+    int                i;
 
     comm = dd->comm;
 
-    for (d = 0; d < dd->ndim; d++)
+    snew(*dim_f, dd->nc[dim]+1);
+    (*dim_f)[0] = 0;
+    for (i = 1; i < dd->nc[dim]; i++)
     {
-        dim        = dd->dim[d];
-        bRowMember = TRUE;
-        bRowRoot   = TRUE;
-        for (d1 = d; d1 < dd->ndim; d1++)
+        if (comm->slb_frac[dim])
         {
-            if (dd->ci[dd->dim[d1]] > 0)
-            {
-                if (d1 != d)
-                {
-                    bRowMember = FALSE;
-                }
-                bRowRoot = FALSE;
-            }
+            (*dim_f)[i] = (*dim_f)[i-1] + comm->slb_frac[dim][i-1];
         }
-        if (bRowMember)
+        else
         {
-            if (bRowRoot)
-            {
-                set_dd_cell_sizes_dlb_root(dd, d, dim, comm->root[d],
-                                           ddbox, bDynamicBox, bUniform, step);
-                cell_f_row = comm->root[d]->cell_f;
-            }
-            else
-            {
-                cell_f_row = comm->cell_f_row;
-            }
-            distribute_dd_cell_sizes_dlb(dd, d, dim, cell_f_row, ddbox);
+            (*dim_f)[i] = (real)i/(real)dd->nc[dim];
         }
     }
+    (*dim_f)[dd->nc[dim]] = 1;
 }
 
-static void set_dd_cell_sizes_dlb_nochange(gmx_domdec_t      *dd,
-                                           const gmx_ddbox_t *ddbox)
+static void init_ddpme(gmx_domdec_t *dd, gmx_ddpme_t *ddpme, int dimind)
 {
-    int d;
+    int  pmeindex, slab, nso, i;
+    ivec xyz;
 
-    /* This function assumes the box is static and should therefore
-     * not be called when the box has changed since the last
-     * call to dd_partition_system.
-     */
-    for (d = 0; d < dd->ndim; d++)
+    if (dimind == 0 && dd->dim[0] == YY && dd->comm->npmenodes_x == 1)
     {
-        relative_to_absolute_cell_bounds(dd, ddbox, d);
+        ddpme->dim = YY;
     }
-}
-
-
-
-static void set_dd_cell_sizes_dlb(gmx_domdec_t *dd,
-                                  const gmx_ddbox_t *ddbox, gmx_bool bDynamicBox,
-                                  gmx_bool bUniform, gmx_bool bDoDLB, gmx_int64_t step,
-                                  gmx_wallcycle_t wcycle)
-{
-    gmx_domdec_comm_t *comm;
-    int                dim;
-
-    comm = dd->comm;
-
-    if (bDoDLB)
+    else
     {
-        wallcycle_start(wcycle, ewcDDCOMMBOUND);
-        set_dd_cell_sizes_dlb_change(dd, ddbox, bDynamicBox, bUniform, step);
-        wallcycle_stop(wcycle, ewcDDCOMMBOUND);
+        ddpme->dim = dimind;
     }
-    else if (bDynamicBox)
+    ddpme->dim_match = (ddpme->dim == dd->dim[dimind]);
+
+    ddpme->nslab = (ddpme->dim == 0 ?
+                    dd->comm->npmenodes_x :
+                    dd->comm->npmenodes_y);
+
+    if (ddpme->nslab <= 1)
     {
-        set_dd_cell_sizes_dlb_nochange(dd, ddbox);
+        return;
     }
 
-    /* Set the dimensions for which no DD is used */
-    for (dim = 0; dim < DIM; dim++)
+    nso = dd->comm->npmenodes/ddpme->nslab;
+    /* Determine for each PME slab the PP location range for dimension dim */
+    snew(ddpme->pp_min, ddpme->nslab);
+    snew(ddpme->pp_max, ddpme->nslab);
+    for (slab = 0; slab < ddpme->nslab; slab++)
     {
-        if (dd->nc[dim] == 1)
-        {
-            comm->cell_x0[dim] = 0;
-            comm->cell_x1[dim] = ddbox->box_size[dim];
-            if (dim >= ddbox->nboundeddim)
-            {
-                comm->cell_x0[dim] += ddbox->box0[dim];
-                comm->cell_x1[dim] += ddbox->box0[dim];
-            }
-        }
+        ddpme->pp_min[slab] = dd->nc[dd->dim[dimind]] - 1;
+        ddpme->pp_max[slab] = 0;
     }
-}
-
-static void realloc_comm_ind(gmx_domdec_t *dd, ivec npulse)
-{
-    int                    d, np, i;
-    gmx_domdec_comm_dim_t *cd;
-
-    for (d = 0; d < dd->ndim; d++)
+    for (i = 0; i < dd->nnodes; i++)
     {
-        cd = &dd->comm->cd[d];
-        np = npulse[dd->dim[d]];
-        if (np > cd->np_nalloc)
+        ddindex2xyz(dd->nc, i, xyz);
+        /* For y only use our y/z slab.
+         * This assumes that the PME x grid size matches the DD grid size.
+         */
+        if (dimind == 0 || xyz[XX] == dd->ci[XX])
         {
-            if (debug)
-            {
-                fprintf(debug, "(Re)allocing cd for %c to %d pulses\n",
-                        dim2char(dd->dim[d]), np);
-            }
-            if (DDMASTER(dd) && cd->np_nalloc > 0)
+            pmeindex = ddindex2pmeindex(dd, i);
+            if (dimind == 0)
             {
-                fprintf(stderr, "\nIncreasing the number of cell to communicate in dimension %c to %d for the first time\n", dim2char(dd->dim[d]), np);
+                slab = pmeindex/nso;
             }
-            srenew(cd->ind, np);
-            for (i = cd->np_nalloc; i < np; i++)
+            else
             {
-                cd->ind[i].index  = nullptr;
-                cd->ind[i].nalloc = 0;
+                slab = pmeindex % ddpme->nslab;
             }
-            cd->np_nalloc = np;
+            ddpme->pp_min[slab] = std::min(ddpme->pp_min[slab], xyz[dimind]);
+            ddpme->pp_max[slab] = std::max(ddpme->pp_max[slab], xyz[dimind]);
         }
-        cd->np = np;
     }
-}
 
+    set_slb_pme_dim_f(dd, ddpme->dim, &ddpme->slb_dim_f);
+}
 
-static void set_dd_cell_sizes(gmx_domdec_t *dd,
-                              gmx_ddbox_t *ddbox, gmx_bool bDynamicBox,
-                              gmx_bool bUniform, gmx_bool bDoDLB, gmx_int64_t step,
-                              gmx_wallcycle_t wcycle)
+int dd_pme_maxshift_x(const gmx_domdec_t *dd)
 {
-    gmx_domdec_comm_t *comm;
-    int                d;
-    ivec               npulse;
-
-    comm = dd->comm;
-
-    /* Copy the old cell boundaries for the cg displacement check */
-    copy_rvec(comm->cell_x0, comm->old_cell_x0);
-    copy_rvec(comm->cell_x1, comm->old_cell_x1);
-
-    if (isDlbOn(comm))
+    if (dd->comm->ddpme[0].dim == XX)
     {
-        if (DDMASTER(dd))
-        {
-            check_box_size(dd, ddbox);
-        }
-        set_dd_cell_sizes_dlb(dd, ddbox, bDynamicBox, bUniform, bDoDLB, step, wcycle);
+        return dd->comm->ddpme[0].maxshift;
     }
     else
     {
-        set_dd_cell_sizes_slb(dd, ddbox, setcellsizeslbLOCAL, npulse);
-        realloc_comm_ind(dd, npulse);
+        return 0;
     }
+}
 
-    if (debug)
+int dd_pme_maxshift_y(const gmx_domdec_t *dd)
+{
+    if (dd->comm->ddpme[0].dim == YY)
     {
-        for (d = 0; d < DIM; d++)
-        {
-            fprintf(debug, "cell_x[%d] %f - %f skew_fac %f\n",
-                    d, comm->cell_x0[d], comm->cell_x1[d], ddbox->skew_fac[d]);
-        }
+        return dd->comm->ddpme[0].maxshift;
+    }
+    else if (dd->comm->npmedecompdim >= 2 && dd->comm->ddpme[1].dim == YY)
+    {
+        return dd->comm->ddpme[1].maxshift;
+    }
+    else
+    {
+        return 0;
     }
 }
 
@@ -3449,269 +2288,6 @@ static void comm_dd_ns_cell_sizes(gmx_domdec_t *dd,
     }
 }
 
-static void distribute_cg(FILE *fplog,
-                          const matrix box, ivec tric_dir, t_block *cgs, rvec pos[],
-                          gmx_domdec_t *dd)
-{
-    gmx_domdec_master_t *ma;
-    int                **tmp_ind = nullptr, *tmp_nalloc = nullptr;
-    int                  i, icg, j, k, k0, k1, d;
-    matrix               tcm;
-    rvec                 cg_cm;
-    ivec                 ind;
-    real                 nrcg, inv_ncg, pos_d;
-    int                 *cgindex;
-    gmx_bool             bScrew;
-
-    ma = dd->ma;
-
-    snew(tmp_nalloc, dd->nnodes);
-    snew(tmp_ind, dd->nnodes);
-    for (i = 0; i < dd->nnodes; i++)
-    {
-        tmp_nalloc[i] = over_alloc_large(cgs->nr/dd->nnodes+1);
-        snew(tmp_ind[i], tmp_nalloc[i]);
-    }
-
-    /* Clear the count */
-    for (i = 0; i < dd->nnodes; i++)
-    {
-        ma->ncg[i] = 0;
-        ma->nat[i] = 0;
-    }
-
-    make_tric_corr_matrix(dd->npbcdim, box, tcm);
-
-    cgindex = cgs->index;
-
-    /* Compute the center of geometry for all charge groups */
-    for (icg = 0; icg < cgs->nr; icg++)
-    {
-        k0      = cgindex[icg];
-        k1      = cgindex[icg+1];
-        nrcg    = k1 - k0;
-        if (nrcg == 1)
-        {
-            copy_rvec(pos[k0], cg_cm);
-        }
-        else
-        {
-            inv_ncg = 1.0/nrcg;
-
-            clear_rvec(cg_cm);
-            for (k = k0; (k < k1); k++)
-            {
-                rvec_inc(cg_cm, pos[k]);
-            }
-            for (d = 0; (d < DIM); d++)
-            {
-                cg_cm[d] *= inv_ncg;
-            }
-        }
-        /* Put the charge group in the box and determine the cell index */
-        for (d = DIM-1; d >= 0; d--)
-        {
-            pos_d = cg_cm[d];
-            if (d < dd->npbcdim)
-            {
-                bScrew = (dd->bScrewPBC && d == XX);
-                if (tric_dir[d] && dd->nc[d] > 1)
-                {
-                    /* Use triclinic coordintates for this dimension */
-                    for (j = d+1; j < DIM; j++)
-                    {
-                        pos_d += cg_cm[j]*tcm[j][d];
-                    }
-                }
-                while (pos_d >= box[d][d])
-                {
-                    pos_d -= box[d][d];
-                    rvec_dec(cg_cm, box[d]);
-                    if (bScrew)
-                    {
-                        cg_cm[YY] = box[YY][YY] - cg_cm[YY];
-                        cg_cm[ZZ] = box[ZZ][ZZ] - cg_cm[ZZ];
-                    }
-                    for (k = k0; (k < k1); k++)
-                    {
-                        rvec_dec(pos[k], box[d]);
-                        if (bScrew)
-                        {
-                            pos[k][YY] = box[YY][YY] - pos[k][YY];
-                            pos[k][ZZ] = box[ZZ][ZZ] - pos[k][ZZ];
-                        }
-                    }
-                }
-                while (pos_d < 0)
-                {
-                    pos_d += box[d][d];
-                    rvec_inc(cg_cm, box[d]);
-                    if (bScrew)
-                    {
-                        cg_cm[YY] = box[YY][YY] - cg_cm[YY];
-                        cg_cm[ZZ] = box[ZZ][ZZ] - cg_cm[ZZ];
-                    }
-                    for (k = k0; (k < k1); k++)
-                    {
-                        rvec_inc(pos[k], box[d]);
-                        if (bScrew)
-                        {
-                            pos[k][YY] = box[YY][YY] - pos[k][YY];
-                            pos[k][ZZ] = box[ZZ][ZZ] - pos[k][ZZ];
-                        }
-                    }
-                }
-            }
-            /* This could be done more efficiently */
-            ind[d] = 0;
-            while (ind[d]+1 < dd->nc[d] && pos_d >= ma->cell_x[d][ind[d]+1])
-            {
-                ind[d]++;
-            }
-        }
-        i = dd_index(dd->nc, ind);
-        if (ma->ncg[i] == tmp_nalloc[i])
-        {
-            tmp_nalloc[i] = over_alloc_large(ma->ncg[i]+1);
-            srenew(tmp_ind[i], tmp_nalloc[i]);
-        }
-        tmp_ind[i][ma->ncg[i]] = icg;
-        ma->ncg[i]++;
-        ma->nat[i] += cgindex[icg+1] - cgindex[icg];
-    }
-
-    k1 = 0;
-    for (i = 0; i < dd->nnodes; i++)
-    {
-        ma->index[i] = k1;
-        for (k = 0; k < ma->ncg[i]; k++)
-        {
-            ma->cg[k1++] = tmp_ind[i][k];
-        }
-    }
-    ma->index[dd->nnodes] = k1;
-
-    for (i = 0; i < dd->nnodes; i++)
-    {
-        sfree(tmp_ind[i]);
-    }
-    sfree(tmp_ind);
-    sfree(tmp_nalloc);
-
-    if (fplog)
-    {
-        // Use double for the sums to avoid natoms^2 overflowing
-        // (65537^2 > 2^32)
-        int    nat_sum, nat_min, nat_max;
-        double nat2_sum;
-
-        nat_sum  = 0;
-        nat2_sum = 0;
-        nat_min  = ma->nat[0];
-        nat_max  = ma->nat[0];
-        for (i = 0; i < dd->nnodes; i++)
-        {
-            nat_sum  += ma->nat[i];
-            // cast to double to avoid integer overflows when squaring
-            nat2_sum += gmx::square(static_cast<double>(ma->nat[i]));
-            nat_min   = std::min(nat_min, ma->nat[i]);
-            nat_max   = std::max(nat_max, ma->nat[i]);
-        }
-        nat_sum  /= dd->nnodes;
-        nat2_sum /= dd->nnodes;
-
-        fprintf(fplog, "Atom distribution over %d domains: av %d stddev %d min %d max %d\n",
-                dd->nnodes,
-                nat_sum,
-                static_cast<int>(std::sqrt(nat2_sum - gmx::square(static_cast<double>(nat_sum)) + 0.5)),
-                nat_min, nat_max);
-    }
-}
-
-static void get_cg_distribution(FILE *fplog, gmx_domdec_t *dd,
-                                t_block *cgs, const matrix box, gmx_ddbox_t *ddbox,
-                                rvec pos[])
-{
-    gmx_domdec_master_t *ma = nullptr;
-    ivec                 npulse;
-    int                  i, cg_gl;
-    int                 *ibuf, buf2[2] = { 0, 0 };
-    gmx_bool             bMaster = DDMASTER(dd);
-
-    if (bMaster)
-    {
-        ma = dd->ma;
-
-        if (dd->bScrewPBC)
-        {
-            check_screw_box(box);
-        }
-
-        set_dd_cell_sizes_slb(dd, ddbox, setcellsizeslbMASTER, npulse);
-
-        distribute_cg(fplog, box, ddbox->tric_dir, cgs, pos, dd);
-        for (i = 0; i < dd->nnodes; i++)
-        {
-            ma->ibuf[2*i]   = ma->ncg[i];
-            ma->ibuf[2*i+1] = ma->nat[i];
-        }
-        ibuf = ma->ibuf;
-    }
-    else
-    {
-        ibuf = nullptr;
-    }
-    dd_scatter(dd, 2*sizeof(int), ibuf, buf2);
-
-    dd->ncg_home = buf2[0];
-    dd->nat_home = buf2[1];
-    dd->ncg_tot  = dd->ncg_home;
-    dd->nat_tot  = dd->nat_home;
-    if (dd->ncg_home > dd->cg_nalloc || dd->cg_nalloc == 0)
-    {
-        dd->cg_nalloc = over_alloc_dd(dd->ncg_home);
-        srenew(dd->index_gl, dd->cg_nalloc);
-        srenew(dd->cgindex, dd->cg_nalloc+1);
-    }
-    if (bMaster)
-    {
-        for (i = 0; i < dd->nnodes; i++)
-        {
-            ma->ibuf[i]            = ma->ncg[i]*sizeof(int);
-            ma->ibuf[dd->nnodes+i] = ma->index[i]*sizeof(int);
-        }
-    }
-
-    dd_scatterv(dd,
-                bMaster ? ma->ibuf : nullptr,
-                bMaster ? ma->ibuf+dd->nnodes : nullptr,
-                bMaster ? ma->cg : nullptr,
-                dd->ncg_home*sizeof(int), dd->index_gl);
-
-    /* Determine the home charge group sizes */
-    dd->cgindex[0] = 0;
-    for (i = 0; i < dd->ncg_home; i++)
-    {
-        cg_gl            = dd->index_gl[i];
-        dd->cgindex[i+1] =
-            dd->cgindex[i] + cgs->index[cg_gl+1] - cgs->index[cg_gl];
-    }
-
-    if (debug)
-    {
-        fprintf(debug, "Home charge groups:\n");
-        for (i = 0; i < dd->ncg_home; i++)
-        {
-            fprintf(debug, " %d", dd->index_gl[i]);
-            if (i % 10 == 9)
-            {
-                fprintf(debug, "\n");
-            }
-        }
-        fprintf(debug, "\n");
-    }
-}
-
 void dd_cycles_add(const gmx_domdec_t *dd, float cycles, int ddCycl)
 {
     /* Note that the cycles value can be incorrect, either 0 or some
@@ -4278,7 +2854,7 @@ static void make_load_communicator(gmx_domdec_t *dd, int dim_ind, ivec loc)
                 /* This is the root process of this row */
                 snew(dd->comm->root[dim_ind], 1);
                 root = dd->comm->root[dim_ind];
-                snew(root->cell_f, DD_CELL_F_SIZE(dd, dim_ind));
+                snew(root->cell_f, ddCellFractionBufferSize(dd, dim_ind));
                 snew(root->old_cell_f, dd->nc[dim]+1);
                 snew(root->bCellMin, dd->nc[dim]);
                 if (dim_ind > 0)
@@ -4293,7 +2869,7 @@ static void make_load_communicator(gmx_domdec_t *dd, int dim_ind, ivec loc)
             else
             {
                 /* This is not a root process, we only need to receive cell_f */
-                snew(dd->comm->cell_f_row, DD_CELL_F_SIZE(dd, dim_ind));
+                snew(dd->comm->cell_f_row, ddCellFractionBufferSize(dd, dim_ind));
             }
         }
         if (dd->ci[dim] == dd->master_ci[dim])
@@ -4690,7 +3266,7 @@ static gmx_domdec_master_t *init_gmx_domdec_master_t(gmx_domdec_t *dd,
         snew(ma->cell_x[i], dd->nc[i]+1);
     }
 
-    if (dd->nnodes <= GMX_DD_NNODES_SENDRECV)
+    if (dd->nnodes <= c_maxNumRanksUseSendRecvForScatterAndGather)
     {
         ma->vbuf = nullptr;
     }
@@ -8334,19 +6910,14 @@ void dd_partition_system(FILE                *fplog,
         clear_dd_indices(dd, 0, 0);
         ncgindex_set = 0;
 
-        rvec *xGlobal = (SIMMASTER(cr) ? as_rvec_array(state_global->x.data()) : nullptr);
+        rvec *xGlobal = (DDMASTER(dd) ? as_rvec_array(state_global->x.data()) : nullptr);
 
-        set_ddbox(dd, bMasterState, ir,
-                  SIMMASTER(cr) ? state_global->box : nullptr,
+        set_ddbox(dd, TRUE, ir,
+                  DDMASTER(dd) ? state_global->box : nullptr,
                   TRUE, cgs_gl, xGlobal,
                   &ddbox);
 
-        get_cg_distribution(fplog, dd, cgs_gl,
-                            SIMMASTER(cr) ? state_global->box : nullptr,
-                            &ddbox, xGlobal);
-
-        dd_distribute_state(dd, cgs_gl,
-                            state_global, state_local, f);
+        distributeState(fplog, dd, state_global, *cgs_gl, ddbox, state_local, f);
 
         dd_make_local_cgs(dd, &top_local->cgs);
 
index e1107052d6267a284124c005b92db7e985553b24..1fdf5d561f2905ea5684e6dc2e419499f539da69 100644 (file)
@@ -400,6 +400,20 @@ struct gmx_domdec_comm_t
     int  DD_debug;                     /**< DD debug print level: 0, 1, 2 */
 };
 
+/*! \brief Data only available on the master rank */
+struct gmx_domdec_master_t
+{
+    /* The cell boundaries */
+    real **cell_x; /**< Cell boundaries, size #dim x #cells_in_dim + 1 */
+    /* The global charge group division */
+    int   *ncg;    /**< Number of home charge groups for each node */
+    int   *index;  /**< Index of nnodes+1 into cg */
+    int   *cg;     /**< Global charge group index */
+    int   *nat;    /**< Number of home atoms for each node. */
+    int   *ibuf;   /**< Buffer for communication */
+    rvec  *vbuf;   /**< Buffer for state scattering and gathering */
+};
+
 /*! \brief DD zone permutation
  *
  * Zone permutation from the Cartesian x-major/z-minor order to an order
@@ -425,6 +439,42 @@ real dd_cutoff_multibody(const gmx_domdec_t *dd);
 /*! \brief Returns the DD cut-off distance for two-body interactions */
 real dd_cutoff_twobody(const gmx_domdec_t *dd);
 
+/*! \brief Returns the domain index given the number of domains and the domain coordinates
+ *
+ * This order is required to minimize the coordinate communication in PME
+ * which uses decomposition in the x direction.
+ */
+static inline int dd_index(const ivec numDomains,
+                           const ivec domainCoordinates)
+{
+    return ((domainCoordinates[XX]*numDomains[YY] + domainCoordinates[YY])*numDomains[ZZ]) + domainCoordinates[ZZ];
+};
+
+/*! Returns the size of the buffer to hold fractional cell boundaries for DD dimension index dimIndex */
+static inline int ddCellFractionBufferSize(const gmx_domdec_t *dd,
+                                           int                 dimIndex)
+{
+    return dd->nc[dd->dim[dimIndex ]] + 1 + dimIndex*2 + 1 + dimIndex;
+}
+
+/*! \brief Maximum number of ranks for using send/recv for state scattering and gathering
+ *
+ * Use separate MPI send and receive commands
+ * when #ranks <= c_maxNumRanksUseSendRecvForScatterAndGather
+ * This saves memory (and some copying for small #ranks).
+ * For high parallelization scatter and gather calls are used.
+ */
+static constexpr int c_maxNumRanksUseSendRecvForScatterAndGather = 4;
+
+/*! \brief Make DD cells larger by this factor than the limit to avoid rounding issues */
+static constexpr double DD_CELL_MARGIN       = 1.0001;
+
+/*! \brief Factor for checking DD cell size limitation during DLB, should be in between 1 and DD_CELL_MARGIN */
+static constexpr double DD_CELL_MARGIN2      = 1.00005;
+
+/*! \brief With pressure scaling, keep cell sizes 2% above the limit to allow for some scaling */
+static constexpr double DD_PRES_SCALE_MARGIN = 1.02;
+
 /*! \endcond */
 
 #endif
index a5a47caa6da92ee5831a8afba93e40f8de5fbe6c..53ef072259a9e06e7770224a28735a8128e28848 100644 (file)
@@ -98,6 +98,12 @@ static inline void vec_rvec_check_alloc(vec_rvec_t *v,
     }
 };
 
+/*! \brief Returns the number of MD steps for which load has been recorded */
+static inline int dd_load_count(const gmx_domdec_comm_t *comm)
+{
+    return (comm->eFlop ? comm->flop_n : comm->cycl_n[ddCyclF]);
+}
+
 /*! \brief Resize the state and f, if !=nullptr, to natoms */
 void dd_resize_state(t_state          *state,
                      PaddedRVecVector *f,