Introduce ObservablesReducer
authorMark Abraham <mark.j.abraham@gmail.com>
Wed, 18 Aug 2021 14:36:19 +0000 (14:36 +0000)
committerJoe Jordan <ejjordan12@gmail.com>
Wed, 18 Aug 2021 14:36:19 +0000 (14:36 +0000)
src/gromacs/mdtypes/CMakeLists.txt
src/gromacs/mdtypes/observablesreducer.cpp [new file with mode: 0644]
src/gromacs/mdtypes/observablesreducer.h [new file with mode: 0644]
src/gromacs/mdtypes/tests/CMakeLists.txt
src/gromacs/mdtypes/tests/observablesreducer.cpp [new file with mode: 0644]

index a9ee14787a7b079ca1e99219d41d7afa73b1cef8..bdc90b01615f757d5a410159e24128b891580d66 100644 (file)
@@ -48,6 +48,7 @@ file(GLOB MDTYPES_SOURCES
     md_enums.cpp
     multipletimestepping.cpp
     observableshistory.cpp
+    observablesreducer.cpp
     state.cpp)
 
 if(GMX_GPU)
diff --git a/src/gromacs/mdtypes/observablesreducer.cpp b/src/gromacs/mdtypes/observablesreducer.cpp
new file mode 100644 (file)
index 0000000..83db804
--- /dev/null
@@ -0,0 +1,262 @@
+/*
+ * This file is part of the GROMACS molecular simulation package.
+ *
+ * Copyright (c) 2021, by the GROMACS development team, led by
+ * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
+ * and including many others, as listed in the AUTHORS file in the
+ * top-level source directory and at http://www.gromacs.org.
+ *
+ * GROMACS is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1
+ * of the License, or (at your option) any later version.
+ *
+ * GROMACS is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GROMACS; if not, see
+ * http://www.gnu.org/licenses, or write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
+ *
+ * If you want to redistribute modifications to GROMACS, please
+ * consider that scientific software is very special. Version
+ * control is crucial - bugs must be traceable. We will be happy to
+ * consider code for inclusion in the official distribution, but
+ * derived work must not be called official GROMACS. Details are found
+ * in the README & COPYING files - if they are missing, get the
+ * official version at http://www.gromacs.org.
+ *
+ * To help us fund GROMACS development, we humbly ask that you cite
+ * the research papers on the package. Check out http://www.gromacs.org.
+ */
+/*! \internal \file
+ * \brief
+ * Defines gmx::ObservablesReducer and its builder
+ *
+ * These are defined in the same translation unit so that the Impl
+ * object of the ObservablesReducer can be built by the builder.
+ *
+ * \inlibraryapi
+ * \ingroup module_mdtypes
+ */
+#include "gmxpre.h"
+
+#include "observablesreducer.h"
+
+#include <algorithm>
+#include <numeric>
+
+#include "gromacs/utility/arrayref.h"
+#include "gromacs/utility/gmxassert.h"
+
+namespace gmx
+{
+
+//! Impl class for ObservablesReducer
+class ObservablesReducer::Impl
+{
+public:
+    //! Constructor
+    Impl(std::vector<double>&&                                            communicationBuffer,
+         std::vector<ObservablesReducerBuilder::CallbackAfterReduction>&& registeredCallbacks) :
+        communicationBuffer_(std::move(communicationBuffer)),
+        registeredCallbacks_(std::move(registeredCallbacks))
+    {
+    }
+
+    /*! \brief May be called by any subscribed module, via the callback
+     * supplied by ObservablesReducerBuilder::build().
+     *
+     * If this method has been called on this rank since the last call
+     * to reduceComplete() with a \c requirement to communicate soon,
+     * then it will make the communication buffer available via
+     * communicationBuffer() to \c compute_globals(), so it can copy
+     * it to the buffer it uses for MPI communication.
+     *
+     * It is the subscribers' responsibility to coordinate so that
+     * all subscribers on all ranks agree on the need to
+     * communicate, e.g. by orchestating communication based on
+     * the current step number or a previous message.
+     *
+     * Does not check that the callback corresponds to a module that
+     * subscribed to the builder().
+     *
+     * Returns the status of the ObservablesReducer about whether
+     * reduction has already been called on this step.
+     */
+    ObservablesReducerStatus requireReduction(int callbackIndex, ReductionRequirement requirement)
+    {
+        if (requirement == ReductionRequirement::Soon)
+        {
+            reduceSoon_ = true;
+        }
+        callbacksAfterReduction_.push_back(callbackIndex);
+        return status_;
+    }
+
+    /*! \brief Storage for communication buffer
+     *
+     * Must never be resized, because that would potentially
+     * invalidate views of it held by the subscribers. */
+    std::vector<double> communicationBuffer_;
+    //! Registered callbacks that might be used after communication
+    std::vector<ObservablesReducerBuilder::CallbackAfterReduction> registeredCallbacks_;
+    //! Indices into registeredCallbacks_ of callbacks to use after the next reduction
+    std::vector<int> callbacksAfterReduction_;
+    /*! \brief Whether the reduction will occur soon because a
+     * module required it
+     *
+     * "Soon" means this step or next, depending when during the step
+     * it was required, as there is only one point during a normal
+     * simulator step where observables reduction might occur. */
+    bool reduceSoon_ = false;
+    //! Whether reduction has taken place this step
+    ObservablesReducerStatus status_ = ObservablesReducerStatus::ReadyToReduce;
+};
+
+ObservablesReducer::ObservablesReducer(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
+
+ObservablesReducer::ObservablesReducer(ObservablesReducer&&) noexcept = default;
+
+ObservablesReducer::~ObservablesReducer() = default;
+
+ObservablesReducer& ObservablesReducer::operator=(ObservablesReducer&& other) noexcept
+{
+    impl_ = std::move(other.impl_);
+    return *this;
+}
+
+ArrayRef<double> ObservablesReducer::communicationBuffer()
+{
+    if (!impl_->reduceSoon_)
+    {
+        // Nothing to reduce
+        return {};
+    }
+    return impl_->communicationBuffer_;
+}
+
+void ObservablesReducer::reductionComplete(Step step)
+{
+    impl_->status_ = ObservablesReducerStatus::AlreadyReducedThisStep;
+    for (auto& callbackIndex : impl_->callbacksAfterReduction_)
+    {
+        impl_->registeredCallbacks_[callbackIndex](step);
+    }
+    // Prepare for the next reduction
+    std::fill(impl_->communicationBuffer_.begin(), impl_->communicationBuffer_.end(), 0.0);
+    impl_->callbacksAfterReduction_.clear();
+    impl_->reduceSoon_ = false;
+}
+
+void ObservablesReducer::stepComplete()
+{
+    impl_->status_ = ObservablesReducerStatus::ReadyToReduce;
+}
+
+//! Impl class for ObservablesReducerBuilder
+class ObservablesReducerBuilder::Impl
+{
+public:
+    //! Data required to set up a subscription
+    struct Subscription
+    {
+        //! Number of doubles required to reduce
+        int sizeRequired;
+        //! The callback to notify of the view and future callback to require reduction
+        ObservablesReducerBuilder::CallbackFromBuilder callbackFromBuilder;
+        //! The callback later used by ObservablesReducer after reduction events
+        ObservablesReducerBuilder::CallbackAfterReduction callbackAfterReduction;
+    };
+
+    //! Contains all subscriptions received
+    std::vector<Subscription> subscriptions_;
+    //! Whether build() has already been called on the owner object
+    bool buildHasBeenCalled_ = false;
+};
+
+ObservablesReducerBuilder::ObservablesReducerBuilder() : impl_(std::make_unique<Impl>()) {}
+
+ObservablesReducerBuilder::ObservablesReducerBuilder(ObservablesReducerBuilder&&) noexcept = default;
+
+ObservablesReducerBuilder& ObservablesReducerBuilder::operator=(ObservablesReducerBuilder&& other) noexcept
+{
+    impl_ = std::move(other.impl_);
+    return *this;
+}
+
+ObservablesReducerBuilder::~ObservablesReducerBuilder() = default;
+
+void ObservablesReducerBuilder::addSubscriber(const int                sizeRequired,
+                                              CallbackFromBuilder&&    callbackFromBuilder,
+                                              CallbackAfterReduction&& callbackAfterReduction)
+{
+    GMX_RELEASE_ASSERT(!impl_->buildHasBeenCalled_,
+                       "Cannot add subscribers to a builder once build() has been called");
+    impl_->subscriptions_.emplace_back(Impl::Subscription{
+            sizeRequired, std::move(callbackFromBuilder), std::move(callbackAfterReduction) });
+}
+
+ObservablesReducer ObservablesReducerBuilder::build()
+{
+    GMX_RELEASE_ASSERT(!impl_->buildHasBeenCalled_,
+                       "Cannot build ObservablesReducer again from the same builder");
+
+    // Prepare the communication buffer
+    const int           totalSizeRequired = std::accumulate(impl_->subscriptions_.begin(),
+                                                  impl_->subscriptions_.end(),
+                                                  0.0,
+                                                  [](int subtotal, const auto& subscription) {
+                                                      return subtotal + subscription.sizeRequired;
+                                                  });
+    std::vector<double> communicationBuffer(totalSizeRequired);
+    // Set up a view of the communication buffer that we can use after
+    // ownership has been transferred to the impl object.
+    const ArrayRef<double> bufferView(communicationBuffer);
+
+    std::vector<ObservablesReducerBuilder::CallbackAfterReduction> registeredCallbacksAfterReduction;
+    registeredCallbacksAfterReduction.reserve(impl_->subscriptions_.size());
+    for (const Impl::Subscription& subscription : impl_->subscriptions_)
+    {
+        registeredCallbacksAfterReduction.emplace_back(subscription.callbackAfterReduction);
+    }
+
+    // Make the impl object so we can set up callbacks to it. This is
+    // safe because the impl object is allocated on the heap, so we
+    // have a stable value to share with the subscribers.
+    auto implPtr = std::make_unique<ObservablesReducer::Impl>(
+            std::move(communicationBuffer), std::move(registeredCallbacksAfterReduction));
+    auto* impl = implPtr.get();
+
+    // Then use the impl object to make the real one. Note that there
+    // is no need for an ObservablesReducer to keep track of all the
+    // modules that might notify it, so it doesn't do that.
+    ObservablesReducer observablesReducer(std::move(implPtr));
+
+    // Now let the subscribers know how to require reduction in
+    // future, and which memory they should use for input and output.
+    size_t start                         = 0;
+    int    indexToCallbackAfterReduction = 0;
+    for (const Impl::Subscription& subscription : impl_->subscriptions_)
+    {
+        // Construct the callback that will hereafter be owned by the
+        // subscriber.
+        CallbackToRequireReduction callbackToRequireReduction =
+                [impl, indexToCallbackAfterReduction](ReductionRequirement requirement) {
+                    return impl->requireReduction(indexToCallbackAfterReduction, requirement);
+                };
+        subscription.callbackFromBuilder(std::move(callbackToRequireReduction),
+                                         bufferView.subArray(start, subscription.sizeRequired));
+        start += subscription.sizeRequired;
+        ++indexToCallbackAfterReduction;
+    }
+
+    impl_->buildHasBeenCalled_ = true;
+
+    return observablesReducer;
+}
+
+} // namespace gmx
diff --git a/src/gromacs/mdtypes/observablesreducer.h b/src/gromacs/mdtypes/observablesreducer.h
new file mode 100644 (file)
index 0000000..4221f20
--- /dev/null
@@ -0,0 +1,463 @@
+/*
+ * This file is part of the GROMACS molecular simulation package.
+ *
+ * Copyright (c) 2021, by the GROMACS development team, led by
+ * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
+ * and including many others, as listed in the AUTHORS file in the
+ * top-level source directory and at http://www.gromacs.org.
+ *
+ * GROMACS is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1
+ * of the License, or (at your option) any later version.
+ *
+ * GROMACS is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GROMACS; if not, see
+ * http://www.gnu.org/licenses, or write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
+ *
+ * If you want to redistribute modifications to GROMACS, please
+ * consider that scientific software is very special. Version
+ * control is crucial - bugs must be traceable. We will be happy to
+ * consider code for inclusion in the official distribution, but
+ * derived work must not be called official GROMACS. Details are found
+ * in the README & COPYING files - if they are missing, get the
+ * official version at http://www.gromacs.org.
+ *
+ * To help us fund GROMACS development, we humbly ask that you cite
+ * the research papers on the package. Check out http://www.gromacs.org.
+ */
+/*! \libinternal \file
+ * \brief
+ * Declares gmx::ObservablesReducer and builder
+ *
+ * Periodically modules implementing MD simulations need to
+ * communicate with all collaborating ranks to do things like compute
+ * observables like total energies, signal conditions, and check
+ * internal consistency. This communication synchronizes all
+ * participating ranks, which limits scaling and performance, so it is
+ * done rarely (typically once per step) and only when required.
+ *
+ * Modules may provide data of type double to be reduced across
+ * all ranks via an MPI all-reduce with MPI_SUM. Double-precision
+ * floating-point is chosen so that no meaningful precision is lost
+ * e.g. in computing energies, while also permitting integral or
+ * boolean messages to be passed as double-precision floating-point
+ * values.
+ *
+ * Different modules typically need to communicate on different MD
+ * steps, so in principle one might optimize by filling a
+ * std::vector<double> with the values required on the current
+ * step. However, that requires that each module produce and then copy
+ * to the reduction buffer the data for this step. The typical amount
+ * of data required even if all modules need to participate (ie.
+ * hundreds of doubles) is smaller than the message headers that are
+ * used by the underlying network transport protocol. So optimizing
+ * for minimum message size is not particularly effective because it
+ * does not meaningfully reduce the total time taken to communicate.
+ *
+ * Instead, we always reduce a buffer of the size that would be needed
+ * if all active modules required communication this step. Then no
+ * module needs to copy data merely to achieve reduction. To achieve
+ * this, each module needs a stable view of memory into which it can
+ * store data for which reduction is desired. It also means that
+ * modules not active in the current simulation do not contribute to
+ * the workload at run time. Also, modules that are active but don't
+ * need communication at any particular MD step can passively opt out
+ * and that incurs no overhead.
+ *
+ * The functionality is separated two main components, one that does
+ * work during the simulation, and a builder that is used only during
+ * setup time. This separates the responsibilities of
+ * - allowing subscription and building the communication buffer, from
+ * - orchestrating the minimum activity needed for this MD step.
+ *
+ * The interaction diagrams for those two workflows are depicted
+ * below.
+ *
+\msc
+wordwraparcs=true,
+hscale="2";
+
+runner [label="runner"],
+builder [label="builder"],
+moduleA [label="moduleA"],
+moduleB [label="moduleB"],
+observablesReducer [label="observablesReducer"];
+
+runner =>> builder [label="makes"];
+
+runner =>> moduleA [label="makes"];
+runner =>> moduleA [label="passes builder to"];
+moduleA =>> builder [label="subscribes itself to"];
+
+runner =>> moduleB [label="makes"];
+runner =>> moduleB [label="passes builder to"];
+moduleB =>> builder [label="subscribes itself to"];
+
+runner =>> builder [label="calls build()"];
+builder =>> builder [label="makes communication\nbuffer"];
+builder =>> moduleA [label="notifies of\ncallback and view"];
+builder =>> moduleB [label="notifies of\ncallback and view"];
+builder =>> observablesReducer [label="makes"];
+
+\endmsc
+
+Once the \c observablesReducer is built, the builder may be
+destructed.
+
+The \c observablesReducer and its modules operate entirely by
+passing callbacks.
+
+\msc
+wordwraparcs=true,
+hscale="2";
+
+runner [label="runner"],
+moduleA [label="moduleA"],
+moduleB [label="moduleB"],
+observablesReducer [label="observablesReducer"],
+compute_globals [label="compute_globals()"];
+
+runner =>> moduleA [label="asks for work"];
+moduleA =>> moduleA [label="Produces values\nto reduce"];
+moduleA =>> observablesReducer [label="requires reduction from"];
+
+runner =>> moduleB [label="asks for work"];
+moduleB =>> moduleB [label="Produces values\nto reduce"];
+moduleB =>> observablesReducer [label="requires reduction from"];
+
+runner =>> runner [label="Does other things also"];
+runner =>> compute_globals [label="asks to do reduction"];
+compute_globals =>> compute_globals [label="prepares data to\nreduce in\nlegacy style"];
+compute_globals =>> observablesReducer [label="asks for\nbuffer view"];
+observablesReducer =>> compute_globals [label="provides\nbuffer view"];
+compute_globals =>> compute_globals [label="Does MPI_Allreduce"];
+compute_globals =>> observablesReducer [label="notifies after\nreduction"];
+observablesReducer =>> moduleA [label="notifies after reduction"];
+moduleA =>> moduleA [label="Uses reduced values"];
+moduleA =>> observablesReducer [label="returns"];
+observablesReducer =>> moduleB [label="notifies after reduction"];
+moduleB =>> moduleB [label="Uses reduced values"];
+moduleB =>> observablesReducer [label="returns"];
+observablesReducer =>> observablesReducer [label="zeroes reduction buffer"];
+observablesReducer =>> compute_globals [label="returns"];
+
+\endmsc
+ *
+ * Three callbacks are produced and called per participating module:
+ *
+ * 1. One produced by the module and passed to the builder so that
+ *    later the ObservablesReducer can call it to notify the module
+ *    that reduction is complete.
+ * 2. One produced by the builder and returned to the module so the
+ *    latter can call it to require reduction when it wishes
+ * 3. One produced by the module and passed to the builder so the
+ *    latter can call it to notify the former of the buffer view
+ *    it should use in the first callback and receive a copy
+ *    of the second callback.
+ *
+ * Modules often request that reduction occur "soon" ie. this step or
+ * next step, depending whether reduction has already take place this
+ * MD step. However they are also able to request reduction to occur
+ * "eventually" ie. only whenever some other module requires it, so
+ * the total number of reductions is minimized. Naturally, the
+ * callback to such a module happens only after the eventual
+ * reduction, which may happen on the same step or a later one. If a
+ * module makes more than one "eventually" reduction request before
+ * reduction takes place, the callback to that module will be called
+ * multiple times when eventually reduction does take place. It is the
+ * responsibility of the module to refrain from making those requests
+ * if the multiple callbacks would be a problem (e.g. maintain an
+ * internal record of whether a reduction request has been made).
+ * Modules are not required to set any value for reduction unless they
+ * are requesting reduction.
+ *
+ * An ObservablesReducer object is intended to replace the use of \c
+ * compute_globals() by simulations, as
+ * https://gitlab.com/gromacs/gromacs/-/issues/3887 progresses. When
+ * no modules using the legacy style communication remain, it is
+ * anticipated that this class will change to contain an MPI
+ * communicator to use to implement the MPI_Allreduce internally.  At
+ * that time, communicationBuffer() and reductionComplete() will
+ * likely change into a doReduction() method, or similar. The flow of
+ * the whole propagator loop will now be less clear inasmuch as the
+ * responsibility for requesting reduction now lies with each module,
+ * however this is probably still more clear than the large forest of
+ * flags that resulted from all modules having to have their control
+ * logic in the propagator loop.
+ *
+ * \inlibraryapi
+ * \ingroup module_mdtypes
+ */
+#ifndef GMX_MDTYPES_OBSERVABLESREDUCER_H
+#define GMX_MDTYPES_OBSERVABLESREDUCER_H
+
+#include <cstdint>
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+namespace gmx
+{
+template<typename>
+class ArrayRef;
+
+class ObservablesReducer;
+using Step = int64_t;
+
+/*! \brief Control whether reduction is required soon. */
+enum class ReductionRequirement : int
+{
+    //! Reduce whenever the runner next checks with the ObservablesReducer.
+    Soon,
+    /*! \brief Reduce whenever the runner next checks with the
+     * ObservablesReducer after some module requires reduction Soon */
+    Eventually
+};
+
+/*! \brief Report whether the reduction has happened this step */
+enum class ObservablesReducerStatus : int
+{
+    //! Reduction has not yet happened this step
+    ReadyToReduce,
+    //! Reduction has happened this step
+    AlreadyReducedThisStep
+};
+
+/*! \libinternal \brief
+ * Builder for ObservablesReducer
+ *
+ * Receives subscriptions from MD modules. Caller should call \c
+ * build() once all subscriptions have been received, and then not
+ * attempt any further subscriptions or builds. At that time, the
+ * builder may be destructed.
+ *
+ * This builder will
+ * - receive all subscriptions from MD modules, then
+ * - build the communication buffer used by the subscribers,
+ * - build the \c ObservablesReducer object that manages the
+ *   lifetime of that buffer, and
+ * - notify the subscribers via callback of the view of that buffer
+ *   that is theirs to use and a further callback to require
+ *   reduction of that buffer.
+ * See also the interaction diagram in the documentation
+ * for observablesreducer.h file.
+ *
+ * Note that the builder callbacks do not follow the approach of \c
+ * MDModulesNotifier because that requires that the same value is
+ * passed to all recipients. Here a distinct value goes to each
+ * recipient, ie. a different view of the communication buffer.
+ *
+ * In order to avoid circular build-time dependencies between the
+ * ObservablesReducer (and its builder) with the modules that use it,
+ * the latter can directly call methods on the former, supplying
+ * anonymous callbacks to be used by the former to contact the
+ * latter. CallbackAfterReduction and CallbackFromBuilder are of this
+ * type.
+ *
+ * A callback type CallBackToRequireReduction is also used instead of
+ * a direct method call on ObservablesReducer to require reduction.
+ * This is implemented by calling a method on the Impl object of a
+ * ObservablesReducer. This extends the interface of
+ * ObservablesReducer in a way that is not directly visible. That
+ * complexity provides two benefits:
+ * - only registered subscribers can require reduction (which helps
+ *   ensure correctness by construction)
+ * - the ObservablesReducer::Impl has a stable address from the heap
+ *   allocation needed for std::unique_ptr to use in forming the
+ *   callback to request reduction.
+ * Alternatives exist for the latter, but create requirements on the
+ * stability of the address of ObservablesReducer, and/or extra
+ * coordination to only pass that address to subscribers once it is
+ * stable.
+ *
+ * It is the subscribers' responsibility to coordinate so that all
+ * subscribers on all ranks agree on the need to communicate, e.g. by
+ * orchestrating communication based on the current step number or a
+ * previous message.
+ *
+ */
+class ObservablesReducerBuilder
+{
+public:
+    //! Constructor
+    ObservablesReducerBuilder();
+    //! Destructor
+    ~ObservablesReducerBuilder();
+    //! Move constructor
+    ObservablesReducerBuilder(ObservablesReducerBuilder&& other) noexcept;
+    //! Move assignment operator
+    ObservablesReducerBuilder& operator=(ObservablesReducerBuilder&& other) noexcept;
+
+    /*! \brief Convenience type for the callback subscribers to
+     * provide when they require reduction. */
+    using CallbackAfterReduction = std::function<void(Step)>;
+    /*! \brief Convenience type for the callback subscribers
+     * call to require reduction.
+     *
+     * When called, the status it returns can be used for checking the
+     * internal expectations of the subscriber on whether reduction
+     * has already occured this step, or not. */
+    using CallbackToRequireReduction = std::function<ObservablesReducerStatus(ReductionRequirement)>;
+    /*! \brief Convenience type for the callback from the builder to
+     * notify the subscribers of the callback they will own and later
+     * use to require reduction and the view of the communication
+     * buffer they will later use. */
+    using CallbackFromBuilder = std::function<void(CallbackToRequireReduction&&, ArrayRef<double>)>;
+
+    /*! \brief Add a subscriber to the \c ObservablesReducer that will
+     * later be built in \c build()
+     *
+     * Takes ownership of both callbacks supplied by the subscribing
+     * module. This approach ensures that time is not spent in the MD
+     * loop constructing std::function objects, because constructing
+     * one of those requires 1-2 heap allocations (depending on the
+     * size of the lambda capture).
+     *
+     * Must not be called after build() */
+    void addSubscriber(int                      sizeRequired,
+                       CallbackFromBuilder&&    callbackFromBuilder,
+                       CallbackAfterReduction&& callbackAfterReduction);
+
+    /*! \brief Build a \c ObservablesReducer to which any subscribers
+     * have been added
+     *
+     * Must be called only once. Notifies each subscriber (via the
+     * CallbackFromBuilder that it supplied) of the view of the
+     * reduction buffer that they will use and the
+     * CallbackToRequireReduction that they will use. */
+    ObservablesReducer build();
+
+private:
+    class Impl;
+    //! Impl object
+    std::unique_ptr<Impl> impl_;
+};
+
+/*! \libinternal \brief
+ * Manage reduction of observables for registered subscribers
+ *
+ * Modules can require that the \c ObservablesReducer object to which
+ * they have subscribed do communication this step.  After reduction
+ * is complete, notifications are made to the callbacks that modules
+ * previously supplied to the ObservablesReducerBuilder. Then the
+ * reduction buffer is zeroed. Thus the subscribers may not depend on
+ * the values in their buffer view after the notification callback
+ * returns, so they should do any necessary processing during that
+ * callback.
+ *
+ * Modules are free to request reduction whenever they wish, and have
+ * no obligations to do anything at any time. In particular, they
+ * do not have to set values for their reduction buffer except when
+ * they are requesting reduction.
+ *
+ * The \c ObservablesReducerBuilder object is responsible for
+ * preparing a vector of doubles and notifying the subscribers of the
+ * mutually disjoint views of the buffer that they should use for both
+ * input and output of the reduction. The ObservablesReducer object
+ * that it builds retains no record of the subscribers, because its
+ * responsibility is solely to orchestrate the MPI communication and
+ * callbacks.
+ *
+ * Subscribers automatically use the correct \c ObservablesReducer
+ * object because the callback they received is bound to the correct
+ * one. The only way a module can participate in an \c
+ * ObservablesReducer is to have registered with its builder.
+ *
+ * The owner of an ObservablesReducer must maintain the lifetime of
+ * the \c ObservablesReducer object until all subscribers no longer
+ * need it. After the destruction of an \c ObservablesReducer, if
+ * subscribers access their view of the communication buffer, the
+ * behavior is undefined.
+ *
+ * \inlibraryapi
+ * \ingroup module_mdtypes
+ */
+class ObservablesReducer
+{
+private:
+    class Impl;
+    std::unique_ptr<Impl> impl_;
+
+public:
+    //! Constructor only usable by ObservablesReducerBuilder
+    explicit ObservablesReducer(std::unique_ptr<Impl> impl);
+    // Destructor
+    ~ObservablesReducer();
+    //! Move constructor
+    ObservablesReducer(ObservablesReducer&& other) noexcept;
+    //! Move assignment operator
+    ObservablesReducer& operator=(ObservablesReducer&& other) noexcept;
+
+    /*! \brief Provide view of communication buffer for MPI reduction
+     *
+     * If no subscriber used ReductionRequirement::Soon since the last
+     * call to reductionComplete(), then this method returns an empty
+     * buffer. Otherwise it returns a view over the buffer potentially
+     * filled by all subscribed modules.
+     */
+    ArrayRef<double> communicationBuffer();
+    /*! \brief Called by the runner after MPI communication is complete
+     *
+     * Notifies all subscribers who required reduction since the last
+     * call to reductionComplete() and passes the \c step value so
+     * they can check internally that the simulation state is
+     * consistent.
+     *
+     * After all notifications, zeroes the communication buffer. It is
+     * the responsibility of the subscribers that required reduction
+     * to react suitably to the data available during their
+     * notification. This ensures that modules cannot get arbitrary
+     * but realistic-looking values left behind from previous
+     * communication stages. It also ensures that subsequent
+     * communication stages will not be able to keep reducing values
+     * until they overflow or underflow. This zeroing is most efficient
+     * to do centrally in an object of this class.
+     *
+     * The choice of zero for the sentinel value is not perfect. In
+     * principle, a value of zero is potentially significant to any
+     * subscriber, so could be provided to a subscriber as the result
+     * of an incorrect implementation of ObservablesReducer or
+     * inconsistent use by subscribers. However by construction (which
+     * is tested), the integration tests never produce a zero result
+     * from an reduced value provided by a subscriber. So, if the
+     * coverage there is high then there is good reason to expect that
+     * when a zero value is used by a subscribers it is the result of
+     * a reduction and thus significant, rather than an artefact of
+     * the zeroing of the communication buffer after notifications are
+     * complete.
+     *
+     * The choice of zero ensures that the MPI reduction will produce
+     * a valid numerical result in all cases except when a module that
+     * required reduction set buffer contents that produced a
+     * problematic output after reduction.
+     */
+    void reductionComplete(Step step);
+    /*! \brief Notify the ObservablesReducer that this MD step is complete
+     *
+     * Any runner using the ObservablesReducer must call this method
+     * whenever a step completes, so that subscribed modules can use
+     * that information to check whether reduction is happening on the
+     * step that they expect.
+     *
+     * The ObservablesReducer keeps track of whether reduction has
+     * already occured this step, so that when modules request
+     * reduction it can notify them of that status. This permits them
+     * to check their own requirements, e.g. that
+     * ReductionRequirement::Soon will operate this step or next
+     * step. */
+    void stepComplete();
+    //! The builder needs to be able to make the Impl object
+    friend class ObservablesReducerBuilder;
+};
+
+} // namespace gmx
+
+#endif
index 90d37b1d51389edbc0d15720cd30f7b4f16e7d8d..f403b1319a2b0a27ac4a6f0e275b5485d11adb5e 100644 (file)
@@ -1,7 +1,7 @@
 #
 # This file is part of the GROMACS molecular simulation package.
 #
-# Copyright (c) 2020, by the GROMACS development team, led by
+# Copyright (c) 2020,2021, by the GROMACS development team, led by
 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
 # and including many others, as listed in the AUTHORS file in the
 # top-level source directory and at http://www.gromacs.org.
@@ -34,6 +34,7 @@
 
 gmx_add_unit_test(MdtypesUnitTest mdtypes-test
     CPP_SOURCE_FILES
+        observablesreducer.cpp
         checkpointdata.cpp
         forcebuffers.cpp
         multipletimestepping.cpp
diff --git a/src/gromacs/mdtypes/tests/observablesreducer.cpp b/src/gromacs/mdtypes/tests/observablesreducer.cpp
new file mode 100644 (file)
index 0000000..f621949
--- /dev/null
@@ -0,0 +1,542 @@
+/*
+ * This file is part of the GROMACS molecular simulation package.
+ *
+ * Copyright (c) 2021, by the GROMACS development team, led by
+ * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
+ * and including many others, as listed in the AUTHORS file in the
+ * top-level source directory and at http://www.gromacs.org.
+ *
+ * GROMACS is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1
+ * of the License, or (at your option) any later version.
+ *
+ * GROMACS is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GROMACS; if not, see
+ * http://www.gnu.org/licenses, or write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
+ *
+ * If you want to redistribute modifications to GROMACS, please
+ * consider that scientific software is very special. Version
+ * control is crucial - bugs must be traceable. We will be happy to
+ * consider code for inclusion in the official distribution, but
+ * derived work must not be called official GROMACS. Details are found
+ * in the README & COPYING files - if they are missing, get the
+ * official version at http://www.gromacs.org.
+ *
+ * To help us fund GROMACS development, we humbly ask that you cite
+ * the research papers on the package. Check out http://www.gromacs.org.
+ */
+/*! \internal \file
+ * \brief
+ * Tests for ObservablesReducer.
+ *
+ * \ingroup module_mdtypes
+ */
+#include "gmxpre.h"
+
+#include "gromacs/mdtypes/observablesreducer.h"
+
+#include <numeric>
+#include <optional>
+#include <tuple>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "gromacs/utility/arrayref.h"
+#include "gromacs/utility/basedefinitions.h"
+#include "gromacs/utility/stringutil.h"
+
+namespace gmx::test
+{
+namespace
+{
+
+TEST(ObservablesReducerTest, CanMoveAssign)
+{
+    ObservablesReducerBuilder builder;
+    ObservablesReducer        observablesReducer = builder.build();
+    EXPECT_TRUE(observablesReducer.communicationBuffer().empty())
+            << "no buffer available when no subscribers requested reduction";
+}
+
+TEST(ObservablesReducerTest, CanMoveConstruct)
+{
+    ObservablesReducerBuilder builder;
+    ObservablesReducer        observablesReducerOriginal = builder.build();
+    ObservablesReducer        observablesReducer(std::move(observablesReducerOriginal));
+    EXPECT_TRUE(observablesReducer.communicationBuffer().empty())
+            << "no buffer available when no subscribers requested reduction";
+}
+
+TEST(ObservablesReducerTest, CanBuildAndUseWithNoSubscribers)
+{
+    ObservablesReducerBuilder builder;
+
+    ObservablesReducer observablesReducer = builder.build();
+    EXPECT_TRUE(observablesReducer.communicationBuffer().empty())
+            << "no buffer available when no subscribers requested reduction";
+    observablesReducer.reductionComplete(0);
+
+    EXPECT_TRUE(observablesReducer.communicationBuffer().empty())
+            << "no buffer available after reductionComplete()";
+    observablesReducer.stepComplete();
+}
+
+TEST(ObservablesReducerTest, CanBuildAndUseWithOneSubscriber)
+{
+    ObservablesReducerBuilder builder;
+
+    // This test implements the caller, the builder and the
+    // ObservablesReducer all in the one scope, which likely does not
+    // resemble any actual use case. More realistic test cases are
+    // found below.
+
+    std::optional<int>                                    stepUponWhichReductionOccured;
+    ObservablesReducerBuilder::CallbackToRequireReduction callbackToRequireReduction;
+    ArrayRef<double>                                      bufferView;
+    ObservablesReducerBuilder::CallbackFromBuilder        callbackFromBuilder =
+            [&](ObservablesReducerBuilder::CallbackToRequireReduction&& c, ArrayRef<double> b) {
+                callbackToRequireReduction = std::move(c);
+                bufferView                 = b;
+            };
+
+    ObservablesReducerBuilder::CallbackAfterReduction callbackAfterReduction =
+            [&stepUponWhichReductionOccured](Step step) { stepUponWhichReductionOccured = step; };
+    const int requiredBufferSize = 2;
+    builder.addSubscriber(
+            requiredBufferSize, std::move(callbackFromBuilder), std::move(callbackAfterReduction));
+
+    ObservablesReducer observablesReducer = builder.build();
+    EXPECT_TRUE(observablesReducer.communicationBuffer().empty())
+            << "no buffer available when no subscribers requested reduction";
+    ASSERT_EQ(requiredBufferSize, bufferView.size());
+    ASSERT_NE(callbackToRequireReduction, nullptr)
+            << "must have valid callback supplied by the builder";
+    EXPECT_FALSE(stepUponWhichReductionOccured.has_value())
+            << "no callbacks until reductionComplete() is called";
+
+    // Fill some dummy data, so we can check the zeroing later
+    bufferView[0] = 3.0;
+    bufferView[1] = 4.0;
+
+    {
+        SCOPED_TRACE("Test that ReductionRequirement::Eventually doesn't trigger behavior");
+
+        EXPECT_EQ(callbackToRequireReduction(ReductionRequirement::Eventually),
+                  ObservablesReducerStatus::ReadyToReduce);
+        EXPECT_TRUE(observablesReducer.communicationBuffer().empty())
+                << "no buffer available when the only subscribers requested reduction eventually";
+        EXPECT_FALSE(stepUponWhichReductionOccured.has_value())
+                << "no callbacks until reductionComplete() is called";
+
+        // Note that there's nothing else to check here, because the
+        // empty buffer means that no reduction should take place.
+        observablesReducer.stepComplete();
+    }
+    {
+        SCOPED_TRACE("Test that ReductionRequirement::Soon does trigger behavior");
+
+        EXPECT_EQ(callbackToRequireReduction(ReductionRequirement::Soon),
+                  ObservablesReducerStatus::ReadyToReduce);
+        EXPECT_EQ(observablesReducer.communicationBuffer().size(), requiredBufferSize)
+                << "buffer available when a subscriber requested reduction soon";
+        EXPECT_FALSE(stepUponWhichReductionOccured.has_value())
+                << "no callbacks until reductionComplete() is called";
+
+        // In the intended use case, some external component must do
+        // the actual reduction across ranks using the buffer at this
+        // point. Here, we just pretend it happened.
+
+        int step = 2;
+        observablesReducer.reductionComplete(step);
+        ASSERT_TRUE(stepUponWhichReductionOccured.has_value()) << "reduction callbacks took place";
+        EXPECT_EQ(stepUponWhichReductionOccured.value(), step)
+                << "reduction step is passed through correctly";
+        EXPECT_THAT(bufferView, testing::AllOf(testing::SizeIs(requiredBufferSize), testing::Each(0.0)))
+                << "buffer is zeroed after reduction";
+        observablesReducer.stepComplete();
+    }
+}
+
+// Integration tests of ObservablesReducer, builder, and fake
+// subscriber(s). These will model multiple ranks each with multiple
+// subscribers. Building tests that used actual MPI would be extra
+// complexity that is not needed at this time.
+
+//! Helper class that models an MD module that needs to make a subscription to \c ObservablesReducer
+class Subscriber
+{
+public:
+    //! Ensure that each subscriber sends an interesting amount of data
+    static constexpr int s_subscriberBufferMinimumSize = 3;
+    /*! \brief Base value used to ensure the data reduced by each
+     * subscriber is distinct, to help diagnose bugs.
+     *
+     * Also contributes to ensuring that the reduced total is
+     * never zero.
+     *
+     * Note that in a real use case, the subscribers will generally be
+     * located in multiple modules. */
+    static constexpr double s_subscriberOffset = 1000;
+    //! Constructor
+    Subscriber(int subscriberIndex, int numRanks) :
+        // Ensure each subscriber sends a different amount of data, to expose bugs
+        sizeRequired_(s_subscriberBufferMinimumSize + subscriberIndex),
+        // Ensure each subscriber sends a distinct range of data, to expose bugs
+        valueOffset_(s_subscriberOffset * (subscriberIndex + 1)),
+        numRanks_(numRanks),
+        subscriberIndex_(subscriberIndex)
+    {
+    }
+
+    //! Make the subscription via the \c observablesReducerBuilder
+    void makeSubscription(ObservablesReducerBuilder* observablesReducerBuilder)
+    {
+        observablesReducerBuilder->addSubscriber(
+                sizeRequired_,
+                [this](ObservablesReducerBuilder::CallbackToRequireReduction callback,
+                       ArrayRef<double>                                      bufferView) {
+                    this->callbackWhenBufferAvailable(std::move(callback), bufferView);
+                },
+                [this](Step step) { this->callbackAfterReduction(step); });
+    }
+
+    //! Callback to receive data from the builder
+    void callbackWhenBufferAvailable(ObservablesReducerBuilder::CallbackToRequireReduction&& callbackToRequireReduction,
+                                     ArrayRef<double> bufferView)
+    {
+        SCOPED_TRACE("In callback from builder");
+
+        callbackToRequireReduction_ = std::move(callbackToRequireReduction);
+        communicationBuffer_        = bufferView;
+        EXPECT_THAT(communicationBuffer_, testing::AllOf(testing::SizeIs(sizeRequired_), testing::Each(0.0)))
+                << "size of buffer did not match request";
+    }
+
+    //! Pretend to do some simulation work characteristic of \c step
+    void doSimulationWork(Step step, ReductionRequirement reductionRequirement) const
+    {
+        // In a real case, MD simulation work for this PP rank and
+        // step would go here.
+        // ...
+        // Then we put values that model its intermediate output into
+        // the communication buffer. Those values vary with the step,
+        // so that we can test for correctness over multiple reduction
+        // events.
+        std::iota(communicationBuffer_.begin(), communicationBuffer_.end(), valueOffset_ + double(step));
+        // Then we require reduction.
+        EXPECT_EQ(callbackToRequireReduction_(reductionRequirement), ObservablesReducerStatus::ReadyToReduce);
+    }
+
+    //! After the reduction, check the values for this subscriber are as expected
+    void callbackAfterReduction(Step step)
+    {
+        SCOPED_TRACE(formatString("In callback after reduction for subscriber %d", subscriberIndex_));
+
+        // Expected values are different for each subscriber, and
+        // vary with step and number of ranks.
+        std::vector<double> expectedResult(communicationBuffer_.size());
+        std::iota(expectedResult.begin(), expectedResult.end(), valueOffset_ + double(step));
+        std::for_each(expectedResult.begin(), expectedResult.end(), [this](auto& v) {
+            v *= this->numRanks_;
+        });
+        EXPECT_THAT(communicationBuffer_, testing::Pointwise(testing::Eq(), expectedResult))
+                << "wrong values were reduced";
+        // Ensuring that zero is never computed by a reduction helps
+        // test that the zeroing of the communication buffer is
+        // working correctly, as we will only observe zero after
+        // zeroing and no subsequent activity.
+        EXPECT_THAT(communicationBuffer_, testing::Not(testing::Each(0)))
+                << "zero may not be the result of an reduction during testing";
+    }
+
+    //! The number of doubles required to reduce
+    int sizeRequired_;
+    //! The callback used to require reduction
+    ObservablesReducerBuilder::CallbackToRequireReduction callbackToRequireReduction_;
+    //! The buffer used for communication, supplied by an \c ObservablesReducer
+    ArrayRef<double> communicationBuffer_;
+    //! Offset that differentiates the values reduced by each subscriber
+    double valueOffset_;
+    //! Number of ranks, used in constructing test expectations
+    int numRanks_;
+    //! Index within the group of subscribers
+    int subscriberIndex_;
+};
+
+//! Test fixture class
+class ObservablesReducerIntegrationTest : public testing::TestWithParam<std::tuple<int, int>>
+{
+public:
+    //! Helper struct to model data on a single MPI rank
+    struct RankData
+    {
+        /*! \brief Builder of \c observablesReducer for this "rank,"
+         * valid until after its build() method has been called. */
+        std::optional<ObservablesReducerBuilder> builder = ObservablesReducerBuilder{};
+        //! Subscribers to \c observablesReducer
+        std::vector<Subscriber> subscribers;
+        /*! \brief Manages reduction of observables on behalf of this
+         * "rank", valid only after the ObserbalesReducerBuilder
+         * builds it. */
+        std::optional<ObservablesReducer> observablesReducer;
+    };
+
+    //! Constructor
+    ObservablesReducerIntegrationTest() : numSubscribers_(std::get<0>(GetParam()))
+    {
+        int numRanks(std::get<1>(GetParam()));
+
+        rankData_.resize(numRanks);
+        for (auto& rankData : rankData_)
+        {
+            for (int i = 0; i < numSubscribers_; ++i)
+            {
+                // Ensure each subscriber sends a different (but small) amount of data
+                rankData.subscribers.emplace_back(Subscriber(i, numRanks));
+            }
+            // Now that the addresses of the subscribers are
+            // stable, set up the build-time callback.
+            for (auto& subscriber : rankData.subscribers)
+            {
+                subscriber.makeSubscription(&rankData.builder.value());
+            }
+        }
+    }
+
+    /*! \brief Performs the equivalent of MPI_Allreduce on the
+     * communication buffer over \c rankData_ */
+    void fakeMpiAllReduce()
+    {
+        std::vector<double> reducedValues(
+                rankData_[0].observablesReducer.value().communicationBuffer().size(), 0.0);
+        // Reduce the values across "ranks"
+        for (auto& rankData : rankData_)
+        {
+            for (size_t i = 0; i != reducedValues.size(); ++i)
+            {
+                reducedValues[i] += rankData.observablesReducer.value().communicationBuffer()[i];
+            }
+        }
+        // Copy the reduced values to all "ranks"
+        for (auto& rankData : rankData_)
+        {
+            auto buffer = rankData.observablesReducer.value().communicationBuffer();
+            std::copy(reducedValues.begin(), reducedValues.end(), buffer.begin());
+        }
+    }
+
+    //! The number of subscribers
+    int numSubscribers_;
+    //! Models data distributed over MPI ranks
+    std::vector<RankData> rankData_;
+};
+
+TEST_P(ObservablesReducerIntegrationTest, CanBuildAndUseSimply)
+{
+    for (auto& rankData : rankData_)
+    {
+        rankData.observablesReducer = rankData.builder.value().build();
+        rankData.builder.reset();
+        EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty())
+                << "no buffer available when no subscribers requested reduction";
+    }
+
+    Step step = 0;
+    for (auto& rankData : rankData_)
+    {
+        for (auto& subscriber : rankData.subscribers)
+        {
+            subscriber.doSimulationWork(step, ReductionRequirement::Soon);
+        }
+        EXPECT_EQ(numSubscribers_ == 0, rankData.observablesReducer.value().communicationBuffer().empty())
+                << "buffer should be available only when there are active subscribers";
+    }
+
+    // This does reduction work, and calls the callbacks that check
+    // the buffer contents.
+    fakeMpiAllReduce();
+
+    for (auto& rankData : rankData_)
+    {
+        rankData.observablesReducer.value().reductionComplete(step);
+        EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty())
+                << "no buffer available after reductionComplete()";
+        rankData.observablesReducer.value().stepComplete();
+    }
+}
+
+TEST_P(ObservablesReducerIntegrationTest, CanBuildAndUseOverMultipleSteps)
+{
+    for (auto& rankData : rankData_)
+    {
+        rankData.observablesReducer = rankData.builder.value().build();
+        rankData.builder.reset();
+        EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty())
+                << "no buffer available when no subscribers requested reduction";
+    }
+
+    for (Step step = 0; step < 20; step += 10)
+    {
+        for (auto& rankData : rankData_)
+        {
+            for (auto& subscriber : rankData.subscribers)
+            {
+                subscriber.doSimulationWork(step, ReductionRequirement::Soon);
+            }
+            EXPECT_EQ(numSubscribers_ == 0,
+                      rankData.observablesReducer.value().communicationBuffer().empty())
+                    << "buffer should be available only when there are subscribers";
+        }
+
+        // This does reduction work, and calls the callbacks that
+        // check the buffer contents.
+        fakeMpiAllReduce();
+
+        for (auto& rankData : rankData_)
+        {
+            rankData.observablesReducer.value().reductionComplete(step);
+            EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty())
+                    << "no buffer available after reductionComplete()";
+            rankData.observablesReducer.value().stepComplete();
+        }
+    }
+}
+
+TEST_P(ObservablesReducerIntegrationTest, CanBuildAndUseWithoutAllNeedingReduction)
+{
+    if (numSubscribers_ == 0)
+    {
+        // Test is meaningless with no subscribers
+        return;
+    }
+
+    for (auto& rankData : rankData_)
+    {
+        rankData.observablesReducer = rankData.builder.value().build();
+        rankData.builder.reset();
+        EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty())
+                << "no buffer available when no subscribers requested reduction";
+    }
+
+    // Only one subscriber does work leading to reduction
+    size_t subscriberNeedingReduction = 0;
+    Step   step                       = 0;
+    for (auto& rankData : rankData_)
+    {
+        auto& subscriber = rankData.subscribers[subscriberNeedingReduction];
+        subscriber.doSimulationWork(step, ReductionRequirement::Soon);
+        EXPECT_FALSE(rankData.observablesReducer.value().communicationBuffer().empty())
+                << "buffer should be available when there is an active subscriber";
+    }
+
+    // This does reduction work, and calls the callbacks that check
+    // the buffer contents.
+    fakeMpiAllReduce();
+
+    // Check that other subscribers didn't reduce anything
+    for (auto& rankData : rankData_)
+    {
+        for (size_t r = 0; r != rankData.subscribers.size(); ++r)
+        {
+            if (r == subscriberNeedingReduction)
+            {
+                continue;
+            }
+            EXPECT_THAT(rankData.subscribers[r].communicationBuffer_, testing::Each(0.0))
+                    << "buffer for non-subscribers should be zero";
+        }
+    }
+
+    for (auto& rankData : rankData_)
+    {
+        rankData.observablesReducer.value().reductionComplete(step);
+        EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty())
+                << "no buffer available after reductionComplete()";
+        rankData.observablesReducer.value().stepComplete();
+    }
+}
+
+TEST_P(ObservablesReducerIntegrationTest, CanBuildAndUseWhenASubscriberUsesEventually)
+{
+    if (numSubscribers_ < 2)
+    {
+        // Test is meaningful only with multiple subscribers
+        return;
+    }
+
+    for (auto& rankData : rankData_)
+    {
+        rankData.observablesReducer = rankData.builder.value().build();
+        rankData.builder.reset();
+        EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty())
+                << "no buffer available when no subscribers requested reduction";
+    }
+
+    // Only one subscriber does work leading to reduction
+    size_t subscriberUsingEventually = 1;
+    Step   step                      = 1;
+    for (auto& rankData : rankData_)
+    {
+        auto& subscriber = rankData.subscribers[subscriberUsingEventually];
+        subscriber.doSimulationWork(step, ReductionRequirement::Eventually);
+        EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty())
+                << "buffer should not be available when the only active subscriber used "
+                   "ReductionRequirement::Eventually";
+    }
+
+    // This will do nothing, as all the communication buffers are
+    // empty, but we can't directly test that nothing
+    // occured. Instead, we will later do some
+    // ReductionRequirement::Soon work and observe that result is
+    // consistent with exactly one reduction.
+    fakeMpiAllReduce();
+
+    for (auto& rankData : rankData_)
+    {
+        for (size_t i = 0; i != rankData.subscribers.size(); ++i)
+        {
+            if (i == subscriberUsingEventually)
+            {
+                continue;
+            }
+            rankData.subscribers[i].doSimulationWork(step, ReductionRequirement::Soon);
+        }
+        EXPECT_FALSE(rankData.observablesReducer.value().communicationBuffer().empty())
+                << "buffer should be available since there are subscribers";
+    }
+
+    // This does reduction work, and calls the callbacks that check
+    // the buffer contents.
+    fakeMpiAllReduce();
+
+    for (auto& rankData : rankData_)
+    {
+        rankData.observablesReducer.value().reductionComplete(step);
+        EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty())
+                << "no buffer available after reductionComplete()";
+        rankData.observablesReducer.value().stepComplete();
+    }
+}
+
+//! Help GoogleTest name our test cases
+std::string namesOfTests(const testing::TestParamInfo<ObservablesReducerIntegrationTest::ParamType>& info)
+{
+    // NB alphanumeric characters only
+    return formatString("numSubscribers%dnumRanks%d", std::get<0>(info.param), std::get<1>(info.param));
+}
+INSTANTIATE_TEST_SUITE_P(WithVariousSubscriberCounts,
+                         ObservablesReducerIntegrationTest,
+                         testing::Combine(testing::Values(0, 1, 2, 3), // subscriber counts
+                                          testing::Values(1, 2, 3)),   // rank counts
+                         namesOfTests);
+
+} // namespace
+} // namespace gmx::test