From d7e298594f19e8473531a01dccf6c8700b2d104e Mon Sep 17 00:00:00 2001 From: Mark Abraham Date: Wed, 18 Aug 2021 14:36:19 +0000 Subject: [PATCH] Introduce ObservablesReducer --- src/gromacs/mdtypes/CMakeLists.txt | 1 + src/gromacs/mdtypes/observablesreducer.cpp | 262 +++++++++ src/gromacs/mdtypes/observablesreducer.h | 463 +++++++++++++++ src/gromacs/mdtypes/tests/CMakeLists.txt | 3 +- .../mdtypes/tests/observablesreducer.cpp | 542 ++++++++++++++++++ 5 files changed, 1270 insertions(+), 1 deletion(-) create mode 100644 src/gromacs/mdtypes/observablesreducer.cpp create mode 100644 src/gromacs/mdtypes/observablesreducer.h create mode 100644 src/gromacs/mdtypes/tests/observablesreducer.cpp diff --git a/src/gromacs/mdtypes/CMakeLists.txt b/src/gromacs/mdtypes/CMakeLists.txt index a9ee14787a..bdc90b0161 100644 --- a/src/gromacs/mdtypes/CMakeLists.txt +++ b/src/gromacs/mdtypes/CMakeLists.txt @@ -48,6 +48,7 @@ file(GLOB MDTYPES_SOURCES md_enums.cpp multipletimestepping.cpp observableshistory.cpp + observablesreducer.cpp state.cpp) if(GMX_GPU) diff --git a/src/gromacs/mdtypes/observablesreducer.cpp b/src/gromacs/mdtypes/observablesreducer.cpp new file mode 100644 index 0000000000..83db804cc1 --- /dev/null +++ b/src/gromacs/mdtypes/observablesreducer.cpp @@ -0,0 +1,262 @@ +/* + * This file is part of the GROMACS molecular simulation package. + * + * Copyright (c) 2021, by the GROMACS development team, led by + * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl, + * and including many others, as listed in the AUTHORS file in the + * top-level source directory and at http://www.gromacs.org. + * + * GROMACS is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation; either version 2.1 + * of the License, or (at your option) any later version. + * + * GROMACS is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with GROMACS; if not, see + * http://www.gnu.org/licenses, or write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * If you want to redistribute modifications to GROMACS, please + * consider that scientific software is very special. Version + * control is crucial - bugs must be traceable. We will be happy to + * consider code for inclusion in the official distribution, but + * derived work must not be called official GROMACS. Details are found + * in the README & COPYING files - if they are missing, get the + * official version at http://www.gromacs.org. + * + * To help us fund GROMACS development, we humbly ask that you cite + * the research papers on the package. Check out http://www.gromacs.org. + */ +/*! \internal \file + * \brief + * Defines gmx::ObservablesReducer and its builder + * + * These are defined in the same translation unit so that the Impl + * object of the ObservablesReducer can be built by the builder. + * + * \inlibraryapi + * \ingroup module_mdtypes + */ +#include "gmxpre.h" + +#include "observablesreducer.h" + +#include +#include + +#include "gromacs/utility/arrayref.h" +#include "gromacs/utility/gmxassert.h" + +namespace gmx +{ + +//! Impl class for ObservablesReducer +class ObservablesReducer::Impl +{ +public: + //! Constructor + Impl(std::vector&& communicationBuffer, + std::vector&& registeredCallbacks) : + communicationBuffer_(std::move(communicationBuffer)), + registeredCallbacks_(std::move(registeredCallbacks)) + { + } + + /*! \brief May be called by any subscribed module, via the callback + * supplied by ObservablesReducerBuilder::build(). + * + * If this method has been called on this rank since the last call + * to reduceComplete() with a \c requirement to communicate soon, + * then it will make the communication buffer available via + * communicationBuffer() to \c compute_globals(), so it can copy + * it to the buffer it uses for MPI communication. + * + * It is the subscribers' responsibility to coordinate so that + * all subscribers on all ranks agree on the need to + * communicate, e.g. by orchestating communication based on + * the current step number or a previous message. + * + * Does not check that the callback corresponds to a module that + * subscribed to the builder(). + * + * Returns the status of the ObservablesReducer about whether + * reduction has already been called on this step. + */ + ObservablesReducerStatus requireReduction(int callbackIndex, ReductionRequirement requirement) + { + if (requirement == ReductionRequirement::Soon) + { + reduceSoon_ = true; + } + callbacksAfterReduction_.push_back(callbackIndex); + return status_; + } + + /*! \brief Storage for communication buffer + * + * Must never be resized, because that would potentially + * invalidate views of it held by the subscribers. */ + std::vector communicationBuffer_; + //! Registered callbacks that might be used after communication + std::vector registeredCallbacks_; + //! Indices into registeredCallbacks_ of callbacks to use after the next reduction + std::vector callbacksAfterReduction_; + /*! \brief Whether the reduction will occur soon because a + * module required it + * + * "Soon" means this step or next, depending when during the step + * it was required, as there is only one point during a normal + * simulator step where observables reduction might occur. */ + bool reduceSoon_ = false; + //! Whether reduction has taken place this step + ObservablesReducerStatus status_ = ObservablesReducerStatus::ReadyToReduce; +}; + +ObservablesReducer::ObservablesReducer(std::unique_ptr impl) : impl_(std::move(impl)) {} + +ObservablesReducer::ObservablesReducer(ObservablesReducer&&) noexcept = default; + +ObservablesReducer::~ObservablesReducer() = default; + +ObservablesReducer& ObservablesReducer::operator=(ObservablesReducer&& other) noexcept +{ + impl_ = std::move(other.impl_); + return *this; +} + +ArrayRef ObservablesReducer::communicationBuffer() +{ + if (!impl_->reduceSoon_) + { + // Nothing to reduce + return {}; + } + return impl_->communicationBuffer_; +} + +void ObservablesReducer::reductionComplete(Step step) +{ + impl_->status_ = ObservablesReducerStatus::AlreadyReducedThisStep; + for (auto& callbackIndex : impl_->callbacksAfterReduction_) + { + impl_->registeredCallbacks_[callbackIndex](step); + } + // Prepare for the next reduction + std::fill(impl_->communicationBuffer_.begin(), impl_->communicationBuffer_.end(), 0.0); + impl_->callbacksAfterReduction_.clear(); + impl_->reduceSoon_ = false; +} + +void ObservablesReducer::stepComplete() +{ + impl_->status_ = ObservablesReducerStatus::ReadyToReduce; +} + +//! Impl class for ObservablesReducerBuilder +class ObservablesReducerBuilder::Impl +{ +public: + //! Data required to set up a subscription + struct Subscription + { + //! Number of doubles required to reduce + int sizeRequired; + //! The callback to notify of the view and future callback to require reduction + ObservablesReducerBuilder::CallbackFromBuilder callbackFromBuilder; + //! The callback later used by ObservablesReducer after reduction events + ObservablesReducerBuilder::CallbackAfterReduction callbackAfterReduction; + }; + + //! Contains all subscriptions received + std::vector subscriptions_; + //! Whether build() has already been called on the owner object + bool buildHasBeenCalled_ = false; +}; + +ObservablesReducerBuilder::ObservablesReducerBuilder() : impl_(std::make_unique()) {} + +ObservablesReducerBuilder::ObservablesReducerBuilder(ObservablesReducerBuilder&&) noexcept = default; + +ObservablesReducerBuilder& ObservablesReducerBuilder::operator=(ObservablesReducerBuilder&& other) noexcept +{ + impl_ = std::move(other.impl_); + return *this; +} + +ObservablesReducerBuilder::~ObservablesReducerBuilder() = default; + +void ObservablesReducerBuilder::addSubscriber(const int sizeRequired, + CallbackFromBuilder&& callbackFromBuilder, + CallbackAfterReduction&& callbackAfterReduction) +{ + GMX_RELEASE_ASSERT(!impl_->buildHasBeenCalled_, + "Cannot add subscribers to a builder once build() has been called"); + impl_->subscriptions_.emplace_back(Impl::Subscription{ + sizeRequired, std::move(callbackFromBuilder), std::move(callbackAfterReduction) }); +} + +ObservablesReducer ObservablesReducerBuilder::build() +{ + GMX_RELEASE_ASSERT(!impl_->buildHasBeenCalled_, + "Cannot build ObservablesReducer again from the same builder"); + + // Prepare the communication buffer + const int totalSizeRequired = std::accumulate(impl_->subscriptions_.begin(), + impl_->subscriptions_.end(), + 0.0, + [](int subtotal, const auto& subscription) { + return subtotal + subscription.sizeRequired; + }); + std::vector communicationBuffer(totalSizeRequired); + // Set up a view of the communication buffer that we can use after + // ownership has been transferred to the impl object. + const ArrayRef bufferView(communicationBuffer); + + std::vector registeredCallbacksAfterReduction; + registeredCallbacksAfterReduction.reserve(impl_->subscriptions_.size()); + for (const Impl::Subscription& subscription : impl_->subscriptions_) + { + registeredCallbacksAfterReduction.emplace_back(subscription.callbackAfterReduction); + } + + // Make the impl object so we can set up callbacks to it. This is + // safe because the impl object is allocated on the heap, so we + // have a stable value to share with the subscribers. + auto implPtr = std::make_unique( + std::move(communicationBuffer), std::move(registeredCallbacksAfterReduction)); + auto* impl = implPtr.get(); + + // Then use the impl object to make the real one. Note that there + // is no need for an ObservablesReducer to keep track of all the + // modules that might notify it, so it doesn't do that. + ObservablesReducer observablesReducer(std::move(implPtr)); + + // Now let the subscribers know how to require reduction in + // future, and which memory they should use for input and output. + size_t start = 0; + int indexToCallbackAfterReduction = 0; + for (const Impl::Subscription& subscription : impl_->subscriptions_) + { + // Construct the callback that will hereafter be owned by the + // subscriber. + CallbackToRequireReduction callbackToRequireReduction = + [impl, indexToCallbackAfterReduction](ReductionRequirement requirement) { + return impl->requireReduction(indexToCallbackAfterReduction, requirement); + }; + subscription.callbackFromBuilder(std::move(callbackToRequireReduction), + bufferView.subArray(start, subscription.sizeRequired)); + start += subscription.sizeRequired; + ++indexToCallbackAfterReduction; + } + + impl_->buildHasBeenCalled_ = true; + + return observablesReducer; +} + +} // namespace gmx diff --git a/src/gromacs/mdtypes/observablesreducer.h b/src/gromacs/mdtypes/observablesreducer.h new file mode 100644 index 0000000000..4221f20940 --- /dev/null +++ b/src/gromacs/mdtypes/observablesreducer.h @@ -0,0 +1,463 @@ +/* + * This file is part of the GROMACS molecular simulation package. + * + * Copyright (c) 2021, by the GROMACS development team, led by + * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl, + * and including many others, as listed in the AUTHORS file in the + * top-level source directory and at http://www.gromacs.org. + * + * GROMACS is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation; either version 2.1 + * of the License, or (at your option) any later version. + * + * GROMACS is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with GROMACS; if not, see + * http://www.gnu.org/licenses, or write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * If you want to redistribute modifications to GROMACS, please + * consider that scientific software is very special. Version + * control is crucial - bugs must be traceable. We will be happy to + * consider code for inclusion in the official distribution, but + * derived work must not be called official GROMACS. Details are found + * in the README & COPYING files - if they are missing, get the + * official version at http://www.gromacs.org. + * + * To help us fund GROMACS development, we humbly ask that you cite + * the research papers on the package. Check out http://www.gromacs.org. + */ +/*! \libinternal \file + * \brief + * Declares gmx::ObservablesReducer and builder + * + * Periodically modules implementing MD simulations need to + * communicate with all collaborating ranks to do things like compute + * observables like total energies, signal conditions, and check + * internal consistency. This communication synchronizes all + * participating ranks, which limits scaling and performance, so it is + * done rarely (typically once per step) and only when required. + * + * Modules may provide data of type double to be reduced across + * all ranks via an MPI all-reduce with MPI_SUM. Double-precision + * floating-point is chosen so that no meaningful precision is lost + * e.g. in computing energies, while also permitting integral or + * boolean messages to be passed as double-precision floating-point + * values. + * + * Different modules typically need to communicate on different MD + * steps, so in principle one might optimize by filling a + * std::vector with the values required on the current + * step. However, that requires that each module produce and then copy + * to the reduction buffer the data for this step. The typical amount + * of data required even if all modules need to participate (ie. + * hundreds of doubles) is smaller than the message headers that are + * used by the underlying network transport protocol. So optimizing + * for minimum message size is not particularly effective because it + * does not meaningfully reduce the total time taken to communicate. + * + * Instead, we always reduce a buffer of the size that would be needed + * if all active modules required communication this step. Then no + * module needs to copy data merely to achieve reduction. To achieve + * this, each module needs a stable view of memory into which it can + * store data for which reduction is desired. It also means that + * modules not active in the current simulation do not contribute to + * the workload at run time. Also, modules that are active but don't + * need communication at any particular MD step can passively opt out + * and that incurs no overhead. + * + * The functionality is separated two main components, one that does + * work during the simulation, and a builder that is used only during + * setup time. This separates the responsibilities of + * - allowing subscription and building the communication buffer, from + * - orchestrating the minimum activity needed for this MD step. + * + * The interaction diagrams for those two workflows are depicted + * below. + * +\msc +wordwraparcs=true, +hscale="2"; + +runner [label="runner"], +builder [label="builder"], +moduleA [label="moduleA"], +moduleB [label="moduleB"], +observablesReducer [label="observablesReducer"]; + +runner =>> builder [label="makes"]; + +runner =>> moduleA [label="makes"]; +runner =>> moduleA [label="passes builder to"]; +moduleA =>> builder [label="subscribes itself to"]; + +runner =>> moduleB [label="makes"]; +runner =>> moduleB [label="passes builder to"]; +moduleB =>> builder [label="subscribes itself to"]; + +runner =>> builder [label="calls build()"]; +builder =>> builder [label="makes communication\nbuffer"]; +builder =>> moduleA [label="notifies of\ncallback and view"]; +builder =>> moduleB [label="notifies of\ncallback and view"]; +builder =>> observablesReducer [label="makes"]; + +\endmsc + +Once the \c observablesReducer is built, the builder may be +destructed. + +The \c observablesReducer and its modules operate entirely by +passing callbacks. + +\msc +wordwraparcs=true, +hscale="2"; + +runner [label="runner"], +moduleA [label="moduleA"], +moduleB [label="moduleB"], +observablesReducer [label="observablesReducer"], +compute_globals [label="compute_globals()"]; + +runner =>> moduleA [label="asks for work"]; +moduleA =>> moduleA [label="Produces values\nto reduce"]; +moduleA =>> observablesReducer [label="requires reduction from"]; + +runner =>> moduleB [label="asks for work"]; +moduleB =>> moduleB [label="Produces values\nto reduce"]; +moduleB =>> observablesReducer [label="requires reduction from"]; + +runner =>> runner [label="Does other things also"]; +runner =>> compute_globals [label="asks to do reduction"]; +compute_globals =>> compute_globals [label="prepares data to\nreduce in\nlegacy style"]; +compute_globals =>> observablesReducer [label="asks for\nbuffer view"]; +observablesReducer =>> compute_globals [label="provides\nbuffer view"]; +compute_globals =>> compute_globals [label="Does MPI_Allreduce"]; +compute_globals =>> observablesReducer [label="notifies after\nreduction"]; +observablesReducer =>> moduleA [label="notifies after reduction"]; +moduleA =>> moduleA [label="Uses reduced values"]; +moduleA =>> observablesReducer [label="returns"]; +observablesReducer =>> moduleB [label="notifies after reduction"]; +moduleB =>> moduleB [label="Uses reduced values"]; +moduleB =>> observablesReducer [label="returns"]; +observablesReducer =>> observablesReducer [label="zeroes reduction buffer"]; +observablesReducer =>> compute_globals [label="returns"]; + +\endmsc + * + * Three callbacks are produced and called per participating module: + * + * 1. One produced by the module and passed to the builder so that + * later the ObservablesReducer can call it to notify the module + * that reduction is complete. + * 2. One produced by the builder and returned to the module so the + * latter can call it to require reduction when it wishes + * 3. One produced by the module and passed to the builder so the + * latter can call it to notify the former of the buffer view + * it should use in the first callback and receive a copy + * of the second callback. + * + * Modules often request that reduction occur "soon" ie. this step or + * next step, depending whether reduction has already take place this + * MD step. However they are also able to request reduction to occur + * "eventually" ie. only whenever some other module requires it, so + * the total number of reductions is minimized. Naturally, the + * callback to such a module happens only after the eventual + * reduction, which may happen on the same step or a later one. If a + * module makes more than one "eventually" reduction request before + * reduction takes place, the callback to that module will be called + * multiple times when eventually reduction does take place. It is the + * responsibility of the module to refrain from making those requests + * if the multiple callbacks would be a problem (e.g. maintain an + * internal record of whether a reduction request has been made). + * Modules are not required to set any value for reduction unless they + * are requesting reduction. + * + * An ObservablesReducer object is intended to replace the use of \c + * compute_globals() by simulations, as + * https://gitlab.com/gromacs/gromacs/-/issues/3887 progresses. When + * no modules using the legacy style communication remain, it is + * anticipated that this class will change to contain an MPI + * communicator to use to implement the MPI_Allreduce internally. At + * that time, communicationBuffer() and reductionComplete() will + * likely change into a doReduction() method, or similar. The flow of + * the whole propagator loop will now be less clear inasmuch as the + * responsibility for requesting reduction now lies with each module, + * however this is probably still more clear than the large forest of + * flags that resulted from all modules having to have their control + * logic in the propagator loop. + * + * \inlibraryapi + * \ingroup module_mdtypes + */ +#ifndef GMX_MDTYPES_OBSERVABLESREDUCER_H +#define GMX_MDTYPES_OBSERVABLESREDUCER_H + +#include + +#include +#include +#include + +namespace gmx +{ +template +class ArrayRef; + +class ObservablesReducer; +using Step = int64_t; + +/*! \brief Control whether reduction is required soon. */ +enum class ReductionRequirement : int +{ + //! Reduce whenever the runner next checks with the ObservablesReducer. + Soon, + /*! \brief Reduce whenever the runner next checks with the + * ObservablesReducer after some module requires reduction Soon */ + Eventually +}; + +/*! \brief Report whether the reduction has happened this step */ +enum class ObservablesReducerStatus : int +{ + //! Reduction has not yet happened this step + ReadyToReduce, + //! Reduction has happened this step + AlreadyReducedThisStep +}; + +/*! \libinternal \brief + * Builder for ObservablesReducer + * + * Receives subscriptions from MD modules. Caller should call \c + * build() once all subscriptions have been received, and then not + * attempt any further subscriptions or builds. At that time, the + * builder may be destructed. + * + * This builder will + * - receive all subscriptions from MD modules, then + * - build the communication buffer used by the subscribers, + * - build the \c ObservablesReducer object that manages the + * lifetime of that buffer, and + * - notify the subscribers via callback of the view of that buffer + * that is theirs to use and a further callback to require + * reduction of that buffer. + * See also the interaction diagram in the documentation + * for observablesreducer.h file. + * + * Note that the builder callbacks do not follow the approach of \c + * MDModulesNotifier because that requires that the same value is + * passed to all recipients. Here a distinct value goes to each + * recipient, ie. a different view of the communication buffer. + * + * In order to avoid circular build-time dependencies between the + * ObservablesReducer (and its builder) with the modules that use it, + * the latter can directly call methods on the former, supplying + * anonymous callbacks to be used by the former to contact the + * latter. CallbackAfterReduction and CallbackFromBuilder are of this + * type. + * + * A callback type CallBackToRequireReduction is also used instead of + * a direct method call on ObservablesReducer to require reduction. + * This is implemented by calling a method on the Impl object of a + * ObservablesReducer. This extends the interface of + * ObservablesReducer in a way that is not directly visible. That + * complexity provides two benefits: + * - only registered subscribers can require reduction (which helps + * ensure correctness by construction) + * - the ObservablesReducer::Impl has a stable address from the heap + * allocation needed for std::unique_ptr to use in forming the + * callback to request reduction. + * Alternatives exist for the latter, but create requirements on the + * stability of the address of ObservablesReducer, and/or extra + * coordination to only pass that address to subscribers once it is + * stable. + * + * It is the subscribers' responsibility to coordinate so that all + * subscribers on all ranks agree on the need to communicate, e.g. by + * orchestrating communication based on the current step number or a + * previous message. + * + */ +class ObservablesReducerBuilder +{ +public: + //! Constructor + ObservablesReducerBuilder(); + //! Destructor + ~ObservablesReducerBuilder(); + //! Move constructor + ObservablesReducerBuilder(ObservablesReducerBuilder&& other) noexcept; + //! Move assignment operator + ObservablesReducerBuilder& operator=(ObservablesReducerBuilder&& other) noexcept; + + /*! \brief Convenience type for the callback subscribers to + * provide when they require reduction. */ + using CallbackAfterReduction = std::function; + /*! \brief Convenience type for the callback subscribers + * call to require reduction. + * + * When called, the status it returns can be used for checking the + * internal expectations of the subscriber on whether reduction + * has already occured this step, or not. */ + using CallbackToRequireReduction = std::function; + /*! \brief Convenience type for the callback from the builder to + * notify the subscribers of the callback they will own and later + * use to require reduction and the view of the communication + * buffer they will later use. */ + using CallbackFromBuilder = std::function)>; + + /*! \brief Add a subscriber to the \c ObservablesReducer that will + * later be built in \c build() + * + * Takes ownership of both callbacks supplied by the subscribing + * module. This approach ensures that time is not spent in the MD + * loop constructing std::function objects, because constructing + * one of those requires 1-2 heap allocations (depending on the + * size of the lambda capture). + * + * Must not be called after build() */ + void addSubscriber(int sizeRequired, + CallbackFromBuilder&& callbackFromBuilder, + CallbackAfterReduction&& callbackAfterReduction); + + /*! \brief Build a \c ObservablesReducer to which any subscribers + * have been added + * + * Must be called only once. Notifies each subscriber (via the + * CallbackFromBuilder that it supplied) of the view of the + * reduction buffer that they will use and the + * CallbackToRequireReduction that they will use. */ + ObservablesReducer build(); + +private: + class Impl; + //! Impl object + std::unique_ptr impl_; +}; + +/*! \libinternal \brief + * Manage reduction of observables for registered subscribers + * + * Modules can require that the \c ObservablesReducer object to which + * they have subscribed do communication this step. After reduction + * is complete, notifications are made to the callbacks that modules + * previously supplied to the ObservablesReducerBuilder. Then the + * reduction buffer is zeroed. Thus the subscribers may not depend on + * the values in their buffer view after the notification callback + * returns, so they should do any necessary processing during that + * callback. + * + * Modules are free to request reduction whenever they wish, and have + * no obligations to do anything at any time. In particular, they + * do not have to set values for their reduction buffer except when + * they are requesting reduction. + * + * The \c ObservablesReducerBuilder object is responsible for + * preparing a vector of doubles and notifying the subscribers of the + * mutually disjoint views of the buffer that they should use for both + * input and output of the reduction. The ObservablesReducer object + * that it builds retains no record of the subscribers, because its + * responsibility is solely to orchestrate the MPI communication and + * callbacks. + * + * Subscribers automatically use the correct \c ObservablesReducer + * object because the callback they received is bound to the correct + * one. The only way a module can participate in an \c + * ObservablesReducer is to have registered with its builder. + * + * The owner of an ObservablesReducer must maintain the lifetime of + * the \c ObservablesReducer object until all subscribers no longer + * need it. After the destruction of an \c ObservablesReducer, if + * subscribers access their view of the communication buffer, the + * behavior is undefined. + * + * \inlibraryapi + * \ingroup module_mdtypes + */ +class ObservablesReducer +{ +private: + class Impl; + std::unique_ptr impl_; + +public: + //! Constructor only usable by ObservablesReducerBuilder + explicit ObservablesReducer(std::unique_ptr impl); + // Destructor + ~ObservablesReducer(); + //! Move constructor + ObservablesReducer(ObservablesReducer&& other) noexcept; + //! Move assignment operator + ObservablesReducer& operator=(ObservablesReducer&& other) noexcept; + + /*! \brief Provide view of communication buffer for MPI reduction + * + * If no subscriber used ReductionRequirement::Soon since the last + * call to reductionComplete(), then this method returns an empty + * buffer. Otherwise it returns a view over the buffer potentially + * filled by all subscribed modules. + */ + ArrayRef communicationBuffer(); + /*! \brief Called by the runner after MPI communication is complete + * + * Notifies all subscribers who required reduction since the last + * call to reductionComplete() and passes the \c step value so + * they can check internally that the simulation state is + * consistent. + * + * After all notifications, zeroes the communication buffer. It is + * the responsibility of the subscribers that required reduction + * to react suitably to the data available during their + * notification. This ensures that modules cannot get arbitrary + * but realistic-looking values left behind from previous + * communication stages. It also ensures that subsequent + * communication stages will not be able to keep reducing values + * until they overflow or underflow. This zeroing is most efficient + * to do centrally in an object of this class. + * + * The choice of zero for the sentinel value is not perfect. In + * principle, a value of zero is potentially significant to any + * subscriber, so could be provided to a subscriber as the result + * of an incorrect implementation of ObservablesReducer or + * inconsistent use by subscribers. However by construction (which + * is tested), the integration tests never produce a zero result + * from an reduced value provided by a subscriber. So, if the + * coverage there is high then there is good reason to expect that + * when a zero value is used by a subscribers it is the result of + * a reduction and thus significant, rather than an artefact of + * the zeroing of the communication buffer after notifications are + * complete. + * + * The choice of zero ensures that the MPI reduction will produce + * a valid numerical result in all cases except when a module that + * required reduction set buffer contents that produced a + * problematic output after reduction. + */ + void reductionComplete(Step step); + /*! \brief Notify the ObservablesReducer that this MD step is complete + * + * Any runner using the ObservablesReducer must call this method + * whenever a step completes, so that subscribed modules can use + * that information to check whether reduction is happening on the + * step that they expect. + * + * The ObservablesReducer keeps track of whether reduction has + * already occured this step, so that when modules request + * reduction it can notify them of that status. This permits them + * to check their own requirements, e.g. that + * ReductionRequirement::Soon will operate this step or next + * step. */ + void stepComplete(); + //! The builder needs to be able to make the Impl object + friend class ObservablesReducerBuilder; +}; + +} // namespace gmx + +#endif diff --git a/src/gromacs/mdtypes/tests/CMakeLists.txt b/src/gromacs/mdtypes/tests/CMakeLists.txt index 90d37b1d51..f403b1319a 100644 --- a/src/gromacs/mdtypes/tests/CMakeLists.txt +++ b/src/gromacs/mdtypes/tests/CMakeLists.txt @@ -1,7 +1,7 @@ # # This file is part of the GROMACS molecular simulation package. # -# Copyright (c) 2020, by the GROMACS development team, led by +# Copyright (c) 2020,2021, by the GROMACS development team, led by # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl, # and including many others, as listed in the AUTHORS file in the # top-level source directory and at http://www.gromacs.org. @@ -34,6 +34,7 @@ gmx_add_unit_test(MdtypesUnitTest mdtypes-test CPP_SOURCE_FILES + observablesreducer.cpp checkpointdata.cpp forcebuffers.cpp multipletimestepping.cpp diff --git a/src/gromacs/mdtypes/tests/observablesreducer.cpp b/src/gromacs/mdtypes/tests/observablesreducer.cpp new file mode 100644 index 0000000000..f621949245 --- /dev/null +++ b/src/gromacs/mdtypes/tests/observablesreducer.cpp @@ -0,0 +1,542 @@ +/* + * This file is part of the GROMACS molecular simulation package. + * + * Copyright (c) 2021, by the GROMACS development team, led by + * Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl, + * and including many others, as listed in the AUTHORS file in the + * top-level source directory and at http://www.gromacs.org. + * + * GROMACS is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation; either version 2.1 + * of the License, or (at your option) any later version. + * + * GROMACS is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with GROMACS; if not, see + * http://www.gnu.org/licenses, or write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * If you want to redistribute modifications to GROMACS, please + * consider that scientific software is very special. Version + * control is crucial - bugs must be traceable. We will be happy to + * consider code for inclusion in the official distribution, but + * derived work must not be called official GROMACS. Details are found + * in the README & COPYING files - if they are missing, get the + * official version at http://www.gromacs.org. + * + * To help us fund GROMACS development, we humbly ask that you cite + * the research papers on the package. Check out http://www.gromacs.org. + */ +/*! \internal \file + * \brief + * Tests for ObservablesReducer. + * + * \ingroup module_mdtypes + */ +#include "gmxpre.h" + +#include "gromacs/mdtypes/observablesreducer.h" + +#include +#include +#include + +#include +#include + +#include "gromacs/utility/arrayref.h" +#include "gromacs/utility/basedefinitions.h" +#include "gromacs/utility/stringutil.h" + +namespace gmx::test +{ +namespace +{ + +TEST(ObservablesReducerTest, CanMoveAssign) +{ + ObservablesReducerBuilder builder; + ObservablesReducer observablesReducer = builder.build(); + EXPECT_TRUE(observablesReducer.communicationBuffer().empty()) + << "no buffer available when no subscribers requested reduction"; +} + +TEST(ObservablesReducerTest, CanMoveConstruct) +{ + ObservablesReducerBuilder builder; + ObservablesReducer observablesReducerOriginal = builder.build(); + ObservablesReducer observablesReducer(std::move(observablesReducerOriginal)); + EXPECT_TRUE(observablesReducer.communicationBuffer().empty()) + << "no buffer available when no subscribers requested reduction"; +} + +TEST(ObservablesReducerTest, CanBuildAndUseWithNoSubscribers) +{ + ObservablesReducerBuilder builder; + + ObservablesReducer observablesReducer = builder.build(); + EXPECT_TRUE(observablesReducer.communicationBuffer().empty()) + << "no buffer available when no subscribers requested reduction"; + observablesReducer.reductionComplete(0); + + EXPECT_TRUE(observablesReducer.communicationBuffer().empty()) + << "no buffer available after reductionComplete()"; + observablesReducer.stepComplete(); +} + +TEST(ObservablesReducerTest, CanBuildAndUseWithOneSubscriber) +{ + ObservablesReducerBuilder builder; + + // This test implements the caller, the builder and the + // ObservablesReducer all in the one scope, which likely does not + // resemble any actual use case. More realistic test cases are + // found below. + + std::optional stepUponWhichReductionOccured; + ObservablesReducerBuilder::CallbackToRequireReduction callbackToRequireReduction; + ArrayRef bufferView; + ObservablesReducerBuilder::CallbackFromBuilder callbackFromBuilder = + [&](ObservablesReducerBuilder::CallbackToRequireReduction&& c, ArrayRef b) { + callbackToRequireReduction = std::move(c); + bufferView = b; + }; + + ObservablesReducerBuilder::CallbackAfterReduction callbackAfterReduction = + [&stepUponWhichReductionOccured](Step step) { stepUponWhichReductionOccured = step; }; + const int requiredBufferSize = 2; + builder.addSubscriber( + requiredBufferSize, std::move(callbackFromBuilder), std::move(callbackAfterReduction)); + + ObservablesReducer observablesReducer = builder.build(); + EXPECT_TRUE(observablesReducer.communicationBuffer().empty()) + << "no buffer available when no subscribers requested reduction"; + ASSERT_EQ(requiredBufferSize, bufferView.size()); + ASSERT_NE(callbackToRequireReduction, nullptr) + << "must have valid callback supplied by the builder"; + EXPECT_FALSE(stepUponWhichReductionOccured.has_value()) + << "no callbacks until reductionComplete() is called"; + + // Fill some dummy data, so we can check the zeroing later + bufferView[0] = 3.0; + bufferView[1] = 4.0; + + { + SCOPED_TRACE("Test that ReductionRequirement::Eventually doesn't trigger behavior"); + + EXPECT_EQ(callbackToRequireReduction(ReductionRequirement::Eventually), + ObservablesReducerStatus::ReadyToReduce); + EXPECT_TRUE(observablesReducer.communicationBuffer().empty()) + << "no buffer available when the only subscribers requested reduction eventually"; + EXPECT_FALSE(stepUponWhichReductionOccured.has_value()) + << "no callbacks until reductionComplete() is called"; + + // Note that there's nothing else to check here, because the + // empty buffer means that no reduction should take place. + observablesReducer.stepComplete(); + } + { + SCOPED_TRACE("Test that ReductionRequirement::Soon does trigger behavior"); + + EXPECT_EQ(callbackToRequireReduction(ReductionRequirement::Soon), + ObservablesReducerStatus::ReadyToReduce); + EXPECT_EQ(observablesReducer.communicationBuffer().size(), requiredBufferSize) + << "buffer available when a subscriber requested reduction soon"; + EXPECT_FALSE(stepUponWhichReductionOccured.has_value()) + << "no callbacks until reductionComplete() is called"; + + // In the intended use case, some external component must do + // the actual reduction across ranks using the buffer at this + // point. Here, we just pretend it happened. + + int step = 2; + observablesReducer.reductionComplete(step); + ASSERT_TRUE(stepUponWhichReductionOccured.has_value()) << "reduction callbacks took place"; + EXPECT_EQ(stepUponWhichReductionOccured.value(), step) + << "reduction step is passed through correctly"; + EXPECT_THAT(bufferView, testing::AllOf(testing::SizeIs(requiredBufferSize), testing::Each(0.0))) + << "buffer is zeroed after reduction"; + observablesReducer.stepComplete(); + } +} + +// Integration tests of ObservablesReducer, builder, and fake +// subscriber(s). These will model multiple ranks each with multiple +// subscribers. Building tests that used actual MPI would be extra +// complexity that is not needed at this time. + +//! Helper class that models an MD module that needs to make a subscription to \c ObservablesReducer +class Subscriber +{ +public: + //! Ensure that each subscriber sends an interesting amount of data + static constexpr int s_subscriberBufferMinimumSize = 3; + /*! \brief Base value used to ensure the data reduced by each + * subscriber is distinct, to help diagnose bugs. + * + * Also contributes to ensuring that the reduced total is + * never zero. + * + * Note that in a real use case, the subscribers will generally be + * located in multiple modules. */ + static constexpr double s_subscriberOffset = 1000; + //! Constructor + Subscriber(int subscriberIndex, int numRanks) : + // Ensure each subscriber sends a different amount of data, to expose bugs + sizeRequired_(s_subscriberBufferMinimumSize + subscriberIndex), + // Ensure each subscriber sends a distinct range of data, to expose bugs + valueOffset_(s_subscriberOffset * (subscriberIndex + 1)), + numRanks_(numRanks), + subscriberIndex_(subscriberIndex) + { + } + + //! Make the subscription via the \c observablesReducerBuilder + void makeSubscription(ObservablesReducerBuilder* observablesReducerBuilder) + { + observablesReducerBuilder->addSubscriber( + sizeRequired_, + [this](ObservablesReducerBuilder::CallbackToRequireReduction callback, + ArrayRef bufferView) { + this->callbackWhenBufferAvailable(std::move(callback), bufferView); + }, + [this](Step step) { this->callbackAfterReduction(step); }); + } + + //! Callback to receive data from the builder + void callbackWhenBufferAvailable(ObservablesReducerBuilder::CallbackToRequireReduction&& callbackToRequireReduction, + ArrayRef bufferView) + { + SCOPED_TRACE("In callback from builder"); + + callbackToRequireReduction_ = std::move(callbackToRequireReduction); + communicationBuffer_ = bufferView; + EXPECT_THAT(communicationBuffer_, testing::AllOf(testing::SizeIs(sizeRequired_), testing::Each(0.0))) + << "size of buffer did not match request"; + } + + //! Pretend to do some simulation work characteristic of \c step + void doSimulationWork(Step step, ReductionRequirement reductionRequirement) const + { + // In a real case, MD simulation work for this PP rank and + // step would go here. + // ... + // Then we put values that model its intermediate output into + // the communication buffer. Those values vary with the step, + // so that we can test for correctness over multiple reduction + // events. + std::iota(communicationBuffer_.begin(), communicationBuffer_.end(), valueOffset_ + double(step)); + // Then we require reduction. + EXPECT_EQ(callbackToRequireReduction_(reductionRequirement), ObservablesReducerStatus::ReadyToReduce); + } + + //! After the reduction, check the values for this subscriber are as expected + void callbackAfterReduction(Step step) + { + SCOPED_TRACE(formatString("In callback after reduction for subscriber %d", subscriberIndex_)); + + // Expected values are different for each subscriber, and + // vary with step and number of ranks. + std::vector expectedResult(communicationBuffer_.size()); + std::iota(expectedResult.begin(), expectedResult.end(), valueOffset_ + double(step)); + std::for_each(expectedResult.begin(), expectedResult.end(), [this](auto& v) { + v *= this->numRanks_; + }); + EXPECT_THAT(communicationBuffer_, testing::Pointwise(testing::Eq(), expectedResult)) + << "wrong values were reduced"; + // Ensuring that zero is never computed by a reduction helps + // test that the zeroing of the communication buffer is + // working correctly, as we will only observe zero after + // zeroing and no subsequent activity. + EXPECT_THAT(communicationBuffer_, testing::Not(testing::Each(0))) + << "zero may not be the result of an reduction during testing"; + } + + //! The number of doubles required to reduce + int sizeRequired_; + //! The callback used to require reduction + ObservablesReducerBuilder::CallbackToRequireReduction callbackToRequireReduction_; + //! The buffer used for communication, supplied by an \c ObservablesReducer + ArrayRef communicationBuffer_; + //! Offset that differentiates the values reduced by each subscriber + double valueOffset_; + //! Number of ranks, used in constructing test expectations + int numRanks_; + //! Index within the group of subscribers + int subscriberIndex_; +}; + +//! Test fixture class +class ObservablesReducerIntegrationTest : public testing::TestWithParam> +{ +public: + //! Helper struct to model data on a single MPI rank + struct RankData + { + /*! \brief Builder of \c observablesReducer for this "rank," + * valid until after its build() method has been called. */ + std::optional builder = ObservablesReducerBuilder{}; + //! Subscribers to \c observablesReducer + std::vector subscribers; + /*! \brief Manages reduction of observables on behalf of this + * "rank", valid only after the ObserbalesReducerBuilder + * builds it. */ + std::optional observablesReducer; + }; + + //! Constructor + ObservablesReducerIntegrationTest() : numSubscribers_(std::get<0>(GetParam())) + { + int numRanks(std::get<1>(GetParam())); + + rankData_.resize(numRanks); + for (auto& rankData : rankData_) + { + for (int i = 0; i < numSubscribers_; ++i) + { + // Ensure each subscriber sends a different (but small) amount of data + rankData.subscribers.emplace_back(Subscriber(i, numRanks)); + } + // Now that the addresses of the subscribers are + // stable, set up the build-time callback. + for (auto& subscriber : rankData.subscribers) + { + subscriber.makeSubscription(&rankData.builder.value()); + } + } + } + + /*! \brief Performs the equivalent of MPI_Allreduce on the + * communication buffer over \c rankData_ */ + void fakeMpiAllReduce() + { + std::vector reducedValues( + rankData_[0].observablesReducer.value().communicationBuffer().size(), 0.0); + // Reduce the values across "ranks" + for (auto& rankData : rankData_) + { + for (size_t i = 0; i != reducedValues.size(); ++i) + { + reducedValues[i] += rankData.observablesReducer.value().communicationBuffer()[i]; + } + } + // Copy the reduced values to all "ranks" + for (auto& rankData : rankData_) + { + auto buffer = rankData.observablesReducer.value().communicationBuffer(); + std::copy(reducedValues.begin(), reducedValues.end(), buffer.begin()); + } + } + + //! The number of subscribers + int numSubscribers_; + //! Models data distributed over MPI ranks + std::vector rankData_; +}; + +TEST_P(ObservablesReducerIntegrationTest, CanBuildAndUseSimply) +{ + for (auto& rankData : rankData_) + { + rankData.observablesReducer = rankData.builder.value().build(); + rankData.builder.reset(); + EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty()) + << "no buffer available when no subscribers requested reduction"; + } + + Step step = 0; + for (auto& rankData : rankData_) + { + for (auto& subscriber : rankData.subscribers) + { + subscriber.doSimulationWork(step, ReductionRequirement::Soon); + } + EXPECT_EQ(numSubscribers_ == 0, rankData.observablesReducer.value().communicationBuffer().empty()) + << "buffer should be available only when there are active subscribers"; + } + + // This does reduction work, and calls the callbacks that check + // the buffer contents. + fakeMpiAllReduce(); + + for (auto& rankData : rankData_) + { + rankData.observablesReducer.value().reductionComplete(step); + EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty()) + << "no buffer available after reductionComplete()"; + rankData.observablesReducer.value().stepComplete(); + } +} + +TEST_P(ObservablesReducerIntegrationTest, CanBuildAndUseOverMultipleSteps) +{ + for (auto& rankData : rankData_) + { + rankData.observablesReducer = rankData.builder.value().build(); + rankData.builder.reset(); + EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty()) + << "no buffer available when no subscribers requested reduction"; + } + + for (Step step = 0; step < 20; step += 10) + { + for (auto& rankData : rankData_) + { + for (auto& subscriber : rankData.subscribers) + { + subscriber.doSimulationWork(step, ReductionRequirement::Soon); + } + EXPECT_EQ(numSubscribers_ == 0, + rankData.observablesReducer.value().communicationBuffer().empty()) + << "buffer should be available only when there are subscribers"; + } + + // This does reduction work, and calls the callbacks that + // check the buffer contents. + fakeMpiAllReduce(); + + for (auto& rankData : rankData_) + { + rankData.observablesReducer.value().reductionComplete(step); + EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty()) + << "no buffer available after reductionComplete()"; + rankData.observablesReducer.value().stepComplete(); + } + } +} + +TEST_P(ObservablesReducerIntegrationTest, CanBuildAndUseWithoutAllNeedingReduction) +{ + if (numSubscribers_ == 0) + { + // Test is meaningless with no subscribers + return; + } + + for (auto& rankData : rankData_) + { + rankData.observablesReducer = rankData.builder.value().build(); + rankData.builder.reset(); + EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty()) + << "no buffer available when no subscribers requested reduction"; + } + + // Only one subscriber does work leading to reduction + size_t subscriberNeedingReduction = 0; + Step step = 0; + for (auto& rankData : rankData_) + { + auto& subscriber = rankData.subscribers[subscriberNeedingReduction]; + subscriber.doSimulationWork(step, ReductionRequirement::Soon); + EXPECT_FALSE(rankData.observablesReducer.value().communicationBuffer().empty()) + << "buffer should be available when there is an active subscriber"; + } + + // This does reduction work, and calls the callbacks that check + // the buffer contents. + fakeMpiAllReduce(); + + // Check that other subscribers didn't reduce anything + for (auto& rankData : rankData_) + { + for (size_t r = 0; r != rankData.subscribers.size(); ++r) + { + if (r == subscriberNeedingReduction) + { + continue; + } + EXPECT_THAT(rankData.subscribers[r].communicationBuffer_, testing::Each(0.0)) + << "buffer for non-subscribers should be zero"; + } + } + + for (auto& rankData : rankData_) + { + rankData.observablesReducer.value().reductionComplete(step); + EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty()) + << "no buffer available after reductionComplete()"; + rankData.observablesReducer.value().stepComplete(); + } +} + +TEST_P(ObservablesReducerIntegrationTest, CanBuildAndUseWhenASubscriberUsesEventually) +{ + if (numSubscribers_ < 2) + { + // Test is meaningful only with multiple subscribers + return; + } + + for (auto& rankData : rankData_) + { + rankData.observablesReducer = rankData.builder.value().build(); + rankData.builder.reset(); + EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty()) + << "no buffer available when no subscribers requested reduction"; + } + + // Only one subscriber does work leading to reduction + size_t subscriberUsingEventually = 1; + Step step = 1; + for (auto& rankData : rankData_) + { + auto& subscriber = rankData.subscribers[subscriberUsingEventually]; + subscriber.doSimulationWork(step, ReductionRequirement::Eventually); + EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty()) + << "buffer should not be available when the only active subscriber used " + "ReductionRequirement::Eventually"; + } + + // This will do nothing, as all the communication buffers are + // empty, but we can't directly test that nothing + // occured. Instead, we will later do some + // ReductionRequirement::Soon work and observe that result is + // consistent with exactly one reduction. + fakeMpiAllReduce(); + + for (auto& rankData : rankData_) + { + for (size_t i = 0; i != rankData.subscribers.size(); ++i) + { + if (i == subscriberUsingEventually) + { + continue; + } + rankData.subscribers[i].doSimulationWork(step, ReductionRequirement::Soon); + } + EXPECT_FALSE(rankData.observablesReducer.value().communicationBuffer().empty()) + << "buffer should be available since there are subscribers"; + } + + // This does reduction work, and calls the callbacks that check + // the buffer contents. + fakeMpiAllReduce(); + + for (auto& rankData : rankData_) + { + rankData.observablesReducer.value().reductionComplete(step); + EXPECT_TRUE(rankData.observablesReducer.value().communicationBuffer().empty()) + << "no buffer available after reductionComplete()"; + rankData.observablesReducer.value().stepComplete(); + } +} + +//! Help GoogleTest name our test cases +std::string namesOfTests(const testing::TestParamInfo& info) +{ + // NB alphanumeric characters only + return formatString("numSubscribers%dnumRanks%d", std::get<0>(info.param), std::get<1>(info.param)); +} +INSTANTIATE_TEST_SUITE_P(WithVariousSubscriberCounts, + ObservablesReducerIntegrationTest, + testing::Combine(testing::Values(0, 1, 2, 3), // subscriber counts + testing::Values(1, 2, 3)), // rank counts + namesOfTests); + +} // namespace +} // namespace gmx::test -- 2.22.0