"Too many calls to startData() compared to provided options");
if (impl_->handles_.empty())
{
- impl_->storage_.setParallelOptions(opt);
- impl_->storage_.startDataStorage(this, &moduleManager());
+ impl_->storage_.startParallelDataStorage(this, &moduleManager(), opt);
}
Impl::HandlePointer handle(new internal::AnalysisDataHandleImpl(this));
--- /dev/null
+/*
+ * This file is part of the GROMACS molecular simulation package.
+ *
+ * Copyright (c) 2013, by the GROMACS development team, led by
+ * David van der Spoel, Berk Hess, Erik Lindahl, and including many
+ * others, as listed in the AUTHORS file in the top-level source
+ * directory and at http://www.gromacs.org.
+ *
+ * GROMACS is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1
+ * of the License, or (at your option) any later version.
+ *
+ * GROMACS is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GROMACS; if not, see
+ * http://www.gnu.org/licenses, or write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * If you want to redistribute modifications to GROMACS, please
+ * consider that scientific software is very special. Version
+ * control is crucial - bugs must be traceable. We will be happy to
+ * consider code for inclusion in the official distribution, but
+ * derived work must not be called official GROMACS. Details are found
+ * in the README & COPYING files - if they are missing, get the
+ * official version at http://www.gromacs.org.
+ *
+ * To help us fund GROMACS development, we humbly ask that you cite
+ * the research papers on the package. Check out http://www.gromacs.org.
+ */
+/*! \internal \file
+ * \brief
+ * Implements classes from datamodule.h.
+ *
+ * \author Teemu Murtola <teemu.murtola@gmail.com>
+ * \ingroup module_analysisdata
+ */
+#include "gromacs/analysisdata/datamodule.h"
+
+#include "gromacs/analysisdata/paralleloptions.h"
+
+namespace gmx
+{
+
+bool AnalysisDataModuleSerial::parallelDataStarted(
+ AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions & /*options*/)
+{
+ dataStarted(data);
+ return false;
+}
+
+void AnalysisDataModuleParallel::dataStarted(AbstractAnalysisData *data)
+{
+ AnalysisDataParallelOptions options;
+ (void)parallelDataStarted(data, options);
+}
+
+} // namespace gmx
*/
/*! \file
* \brief
- * Declares gmx::AnalysisDataModuleInterface.
+ * Declares gmx::AnalysisDataModuleInterface and related convenience classes.
*
* \author Teemu Murtola <teemu.murtola@gmail.com>
* \inlibraryapi
class AbstractAnalysisData;
class AnalysisDataFrameHeader;
+class AnalysisDataParallelOptions;
class AnalysisDataPointSetRef;
/*! \brief
*
* The interface provides one method (flags()) that describes features of
* data objects the module supports. Only most common features are included
- * in the flags; custom checks can be implemented in the dataStarted() method
- * (see below).
+ * in the flags; custom checks can be implemented in the dataStarted() and/or
+ * parallelDataStarted() methods (see below).
* All other methods in the interface are callbacks that are called by the
* data object to which the module is attached to describe the data.
*
- * The frames are presented to the module always in the order of increasing
- * indices, even if they become ready in a different order in the attached
- * data.
+ * The modules can operate in two modes: serial or parallel.
+ * In serial mode, the frames are presented to the module always in the order
+ * of increasing indices, even if they become ready in a different order in the
+ * attached data.
+ * In parallel mode, the frames are presented in the order that they become
+ * available in the input data, which may not be sequential. This mode allows
+ * the input data to optimize its behavior if it does not need to store and
+ * sort the frames.
+ * If the input data supports parallel mode, it calls parallelDataStarted().
+ * If the module returns true from this method, then it will process the frames
+ * in the parallel mode. If the module returns false, it will get the frames
+ * in serial order.
+ * If the input data does not support parallel mode, it calls dataStarted().
+ *
+ * Concrete modules typically do not directly derive from this interface, but
+ * from either AnalysisDataModuleSerial or AnalysisDataModuleParallel.
+ * Both these classes implement one of dataStarted()/parallelDataStarted() by
+ * forwarding the calls to the other method of this pair. This allows the
+ * module to only implement the initialization once, without needing to worry
+ * how to correctly handle both cases.
*
* Currently, if the module throws an exception, it requires the analysis tool
* to terminate, since AbstractAnalysisData will be left in a state where it
* \throws unspecified Can throw any exception required by the
* implementing class to report errors.
*
+ * When the data is ready, either this method or parallelDataStarted()
+ * is called, depending on the nature of the input data. If this
+ * method is called, the input data will always present the frames in
+ * sequential order.
+ *
* The data to which the module is attached is passed as an argument
* to provide access to properties of the data for initialization
* and/or validation. The module can also call
* AbstractAnalysisData::addColumnModule() was called.
*/
virtual void dataStarted(AbstractAnalysisData *data) = 0;
+ /*! \brief
+ * Called (once) for parallel data when the data has been set up.
+ *
+ * \param[in] data Data object to which the module is added.
+ * \param[in] options Parallelization properties of the input data.
+ * \returns true if the module can process the input in
+ * non-sequential order.
+ * \throws APIError if the provided data is not compatible.
+ * \throws unspecified Can throw any exception required by the
+ * implementing class to report errors.
+ *
+ * This method is called instead of dataStarted() if the input data has
+ * the capability to present data in non-sequential order.
+ * If the method returns true, then the module accepts this and frame
+ * notification methods may be called in that non-sequential order.
+ * If the method returns false, then the frame notification methods are
+ * called in sequential order, as if dataStarted() had been called.
+ *
+ * See dataStarted() for general information on initializing the data.
+ * That applies to this method as well, with the exception that calling
+ * AbstractAnalysisData::requestStorage() is currently not very well
+ * supported (or rather, accessing the requested storage doesn't work).
+ */
+ virtual bool parallelDataStarted(
+ AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions &options) = 0;
/*! \brief
* Called at the start of each data frame.
*
* implementing class to report errors.
*/
virtual void dataFinished() = 0;
+};
+
+/*! \brief
+ * Convenience base class for serial analysis data modules.
+ *
+ * Implements the parallelDataStarted() method such that initialization is
+ * always forwarded to dataStarted(), and the module always behaves as serial
+ * (parallelDataStarted() returns false).
+ *
+ * \inlibraryapi
+ * \ingroup module_analysisdata
+ */
+class AnalysisDataModuleSerial : public AnalysisDataModuleInterface
+{
+ public:
+ virtual ~AnalysisDataModuleSerial() {}
+
+ virtual int flags() const = 0;
+
+ virtual void dataStarted(AbstractAnalysisData *data) = 0;
+ virtual void frameStarted(const AnalysisDataFrameHeader &frame) = 0;
+ virtual void pointsAdded(const AnalysisDataPointSetRef &points) = 0;
+ virtual void frameFinished(const AnalysisDataFrameHeader &header) = 0;
+ virtual void dataFinished() = 0;
+
+ private:
+ virtual bool parallelDataStarted(
+ AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions &options);
+};
+
+/*! \brief
+ * Convenience base class for parallel analysis data modules.
+ *
+ * Implements the dataStarted() method such that initialization is always done
+ * in parallelDataStarted(). dataStarted() calls are forwarded to
+ * parallelDataStarted() using a dummy serial AnalysisDataParallelOptions.
+ *
+ * \inlibraryapi
+ * \ingroup module_analysisdata
+ */
+class AnalysisDataModuleParallel : public AnalysisDataModuleInterface
+{
+ public:
+ virtual ~AnalysisDataModuleParallel() {}
+
+ virtual int flags() const = 0;
+
+ virtual bool parallelDataStarted(
+ AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions &options) = 0;
+ virtual void frameStarted(const AnalysisDataFrameHeader &frame) = 0;
+ virtual void pointsAdded(const AnalysisDataPointSetRef &points) = 0;
+ virtual void frameFinished(const AnalysisDataFrameHeader &header) = 0;
+ virtual void dataFinished() = 0;
- protected:
- AnalysisDataModuleInterface() {}
+ private:
+ virtual void dataStarted(AbstractAnalysisData *data);
};
} // namespace gmx
#include "gromacs/analysisdata/abstractdata.h"
#include "gromacs/analysisdata/dataframe.h"
#include "gromacs/analysisdata/datamodule.h"
+#include "gromacs/analysisdata/paralleloptions.h"
#include "gromacs/utility/exceptions.h"
#include "gromacs/utility/gmxassert.h"
class AnalysisDataModuleManager::Impl
{
public:
+ //! Stores information about an attached module.
+ struct ModuleInfo
+ {
+ //! Initializes the module information.
+ explicit ModuleInfo(AnalysisDataModulePointer module)
+ : module(module), bParallel(false)
+ {
+ }
+
+ //! Pointer to the actual module.
+ AnalysisDataModulePointer module;
+ //! Whether the module supports parallel processing.
+ bool bParallel;
+ };
+
//! Shorthand for list of modules added to the data.
- typedef std::vector<AnalysisDataModulePointer> ModuleList;
+ typedef std::vector<ModuleInfo> ModuleList;
//! Describes the current state of the notification methods.
enum State
bool bDataProperty_[eDataPropertyNR];
//! true if all modules support missing data.
bool bAllowMissing_;
+ //! true if there are modules that do not support parallel processing.
+ bool bSerialModules_;
+ //! true if there are modules that support parallel processing.
+ bool bParallelModules_;
/*! \brief
* Current state of the notification methods.
};
AnalysisDataModuleManager::Impl::Impl()
- : bAllowMissing_(true), state_(eNotStarted), currIndex_(0)
+ : bAllowMissing_(true), bSerialModules_(false), bParallelModules_(false),
+ state_(eNotStarted), currIndex_(0)
{
// This must be in sync with how AbstractAnalysisData is actually
// initialized.
Impl::ModuleList::const_iterator i;
for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
{
- impl_->checkModuleProperty(**i, property, bSet);
+ impl_->checkModuleProperty(*i->module, property, bSet);
}
impl_->bDataProperty_[property] = bSet;
}
AnalysisDataModulePointer module)
{
impl_->checkModuleProperties(*module);
+ // TODO: Ensure that the system does not end up in an inconsistent state by
+ // adding a module in mid-data during parallel processing (probably best to
+ // prevent alltogether).
GMX_RELEASE_ASSERT(impl_->state_ != Impl::eInFrame,
"Cannot add a data module in mid-frame");
impl_->presentData(data, module.get());
{
impl_->bAllowMissing_ = false;
}
- impl_->modules_.push_back(module);
+ impl_->modules_.push_back(Impl::ModuleInfo(module));
}
void
}
+bool
+AnalysisDataModuleManager::hasSerialModules() const
+{
+ GMX_ASSERT(impl_->state_ != Impl::eNotStarted,
+ "Module state not accessible before data is started");
+ return impl_->bSerialModules_;
+}
+
+
void
-AnalysisDataModuleManager::notifyDataStart(AbstractAnalysisData *data) const
+AnalysisDataModuleManager::notifyDataStart(AbstractAnalysisData *data)
{
GMX_RELEASE_ASSERT(impl_->state_ == Impl::eNotStarted,
"notifyDataStart() called more than once");
GMX_RELEASE_ASSERT(data->columnCount(d) > 0,
"Data column count is not set");
}
- impl_->state_ = Impl::eInData;
+ impl_->state_ = Impl::eInData;
+ impl_->bSerialModules_ = !impl_->modules_.empty();
+ impl_->bParallelModules_ = false;
Impl::ModuleList::const_iterator i;
for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
// This should not fail, since addModule() and
// dataPropertyAboutToChange() already do the checks, but kept here to
// catch potential bugs (perhaps it would be best to assert on failure).
- impl_->checkModuleProperties(**i);
- (*i)->dataStarted(data);
+ impl_->checkModuleProperties(*i->module);
+ i->module->dataStarted(data);
+ }
+}
+
+
+void
+AnalysisDataModuleManager::notifyParallelDataStart(
+ AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions &options)
+{
+ GMX_RELEASE_ASSERT(impl_->state_ == Impl::eNotStarted,
+ "notifyDataStart() called more than once");
+ for (int d = 0; d < data->dataSetCount(); ++d)
+ {
+ GMX_RELEASE_ASSERT(data->columnCount(d) > 0,
+ "Data column count is not set");
+ }
+ impl_->state_ = Impl::eInData;
+ impl_->bSerialModules_ = false;
+ impl_->bParallelModules_ = false;
+
+ Impl::ModuleList::iterator i;
+ for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
+ {
+ // This should not fail, since addModule() and
+ // dataPropertyAboutToChange() already do the checks, but kept here to
+ // catch potential bugs (perhaps it would be best to assert on failure).
+ impl_->checkModuleProperties(*i->module);
+ i->bParallel = i->module->parallelDataStarted(data, options);
+ if (i->bParallel)
+ {
+ impl_->bParallelModules_ = true;
+ }
+ else
+ {
+ impl_->bSerialModules_ = true;
+ }
}
}
GMX_ASSERT(header.index() == impl_->currIndex_, "Out of order frames");
impl_->state_ = Impl::eInFrame;
- Impl::ModuleList::const_iterator i;
- for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
+ if (impl_->bSerialModules_)
{
- (*i)->frameStarted(header);
+ Impl::ModuleList::const_iterator i;
+ for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
+ {
+ if (!i->bParallel)
+ {
+ i->module->frameStarted(header);
+ }
+ }
+ }
+}
+
+void
+AnalysisDataModuleManager::notifyParallelFrameStart(
+ const AnalysisDataFrameHeader &header) const
+{
+ if (impl_->bParallelModules_)
+ {
+ Impl::ModuleList::const_iterator i;
+ for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
+ {
+ if (i->bParallel)
+ {
+ i->module->frameStarted(header);
+ }
+ }
}
}
// "Invalid columns");
GMX_ASSERT(points.frameIndex() == impl_->currIndex_,
"Points do not correspond to current frame");
- if (!impl_->bAllowMissing_ && !points.allPresent())
+ if (impl_->bSerialModules_)
{
- GMX_THROW(APIError("Missing data not supported by a module"));
+ if (!impl_->bAllowMissing_ && !points.allPresent())
+ {
+ GMX_THROW(APIError("Missing data not supported by a module"));
+ }
+
+ Impl::ModuleList::const_iterator i;
+ for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
+ {
+ if (!i->bParallel)
+ {
+ i->module->pointsAdded(points);
+ }
+ }
}
+}
- Impl::ModuleList::const_iterator i;
- for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
+
+void
+AnalysisDataModuleManager::notifyParallelPointsAdd(
+ const AnalysisDataPointSetRef &points) const
+{
+ // TODO: Add checks for column spans (requires passing the information
+ // about the column counts from somewhere).
+ //GMX_ASSERT(points.lastColumn() < columnCount(points.dataSetIndex()),
+ // "Invalid columns");
+ if (impl_->bParallelModules_)
{
- (*i)->pointsAdded(points);
+ if (!impl_->bAllowMissing_ && !points.allPresent())
+ {
+ GMX_THROW(APIError("Missing data not supported by a module"));
+ }
+
+ Impl::ModuleList::const_iterator i;
+ for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
+ {
+ if (i->bParallel)
+ {
+ i->module->pointsAdded(points);
+ }
+ }
}
}
impl_->state_ = Impl::eInData;
++impl_->currIndex_;
- Impl::ModuleList::const_iterator i;
- for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
+ if (impl_->bSerialModules_)
+ {
+ Impl::ModuleList::const_iterator i;
+ for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
+ {
+ if (!i->bParallel)
+ {
+ i->module->frameFinished(header);
+ }
+ }
+ }
+}
+
+
+void
+AnalysisDataModuleManager::notifyParallelFrameFinish(
+ const AnalysisDataFrameHeader &header) const
+{
+ if (impl_->bParallelModules_)
{
- (*i)->frameFinished(header);
+ Impl::ModuleList::const_iterator i;
+ for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
+ {
+ if (i->bParallel)
+ {
+ i->module->frameFinished(header);
+ }
+ }
}
}
Impl::ModuleList::const_iterator i;
for (i = impl_->modules_.begin(); i != impl_->modules_.end(); ++i)
{
- (*i)->dataFinished();
+ i->module->dataFinished();
}
}
namespace gmx
{
+class AnalysisDataParallelOptions;
+
/*! \libinternal \brief
* Encapsulates handling of data modules attached to AbstractAnalysisData.
*
*/
void dataPropertyAboutToChange(DataProperty property, bool bSet);
+ /*! \brief
+ * Whether there are modules that do not support parallel processing.
+ *
+ * Must not be called before notifyDataStart()/notifyParallelDataStart().
+ * If notifyDataStart() has been called, returns true if there are any
+ * modules (all modules are treated as serial).
+ *
+ * Does not throw.
+ */
+ bool hasSerialModules() const;
+
/*! \brief
* Adds a module to process the data.
*
AnalysisDataModuleInterface *module);
/*! \brief
- * Notifies attached modules of the start of data.
+ * Notifies attached modules of the start of serial data.
*
* \param data Data object that is starting.
* \throws APIError if any attached data module is not compatible.
*
* \p data should typically be \c this when calling from a class
* derived from AbstractAnalysisData.
+ *
+ * This method initializes all modules for serial processing by calling
+ * AnalysisDataModuleInterface::dataStarted().
*/
- void notifyDataStart(AbstractAnalysisData *data) const;
+ void notifyDataStart(AbstractAnalysisData *data);
/*! \brief
- * Notifies attached modules of the start of a frame.
+ * Notifies attached modules of the start of parallel data.
+ *
+ * \param data Data object that is starting.
+ * \param[in] options Parallelization properties of the input data.
+ * \throws APIError if any attached data module is not compatible.
+ * \throws unspecified Any exception thrown by attached data modules
+ * in AnalysisDataModuleInterface::parallelDataStarted().
+ *
+ * Can be called instead of notifyDataStart() if \p data supports
+ * non-sequential creation of frames. Works as notifyDataStart(),
+ * but instead calls AnalysisDataModuleInterface::parallelDataStarted()
+ * and records whether the module supports the parallel mode.
+ * Subsequent notification calls then notify the modules according to
+ * the mode they accept.
+ *
+ * See notifyDataStart() for general constraints.
+ */
+ void notifyParallelDataStart(
+ AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions &options);
+ /*! \brief
+ * Notifies attached serial modules of the start of a frame.
*
* \param[in] header Header information for the frame that is starting.
* \throws unspecified Any exception thrown by attached data modules
*/
void notifyFrameStart(const AnalysisDataFrameHeader &header) const;
/*! \brief
- * Notifies attached modules of the addition of points to the
+ * Notifies attached parallel modules of the start of a frame.
+ *
+ * \param[in] header Header information for the frame that is starting.
+ * \throws unspecified Any exception thrown by attached data modules
+ * in AnalysisDataModuleInterface::frameStarted().
+ *
+ * If notifyParallelDataStart() has been called, should be called once
+ * for each frame, before notifyParallelPointsAdd() calls for that
+ * frame.
+ * It is allowed to call this method in any order for the frames, but
+ * should be called exactly once for each frame.
+ */
+ void notifyParallelFrameStart(const AnalysisDataFrameHeader &header) const;
+ /*! \brief
+ * Notifies attached serial modules of the addition of points to the
* current frame.
*
* \param[in] points Set of points added (also provides access to
*/
void notifyPointsAdd(const AnalysisDataPointSetRef &points) const;
/*! \brief
- * Notifies attached modules of the end of a frame.
+ * Notifies attached parallel modules of the addition of points to a frame.
+ *
+ * \param[in] points Set of points added (also provides access to
+ * frame-level data).
+ * \throws APIError if any attached data module is not compatible.
+ * \throws unspecified Any exception thrown by attached data modules
+ * in AnalysisDataModuleInterface::pointsAdded().
+ *
+ * See notifyPointsAdd() for information on the structure of the point
+ * sets.
+ */
+ void notifyParallelPointsAdd(const AnalysisDataPointSetRef &points) const;
+ /*! \brief
+ * Notifies attached serial modules of the end of a frame.
*
* \param[in] header Header information for the frame that is ending.
* \throws unspecified Any exception thrown by attached data modules
* notifyFrameStart() call.
*/
void notifyFrameFinish(const AnalysisDataFrameHeader &header) const;
+ /*! \brief
+ * Notifies attached parallel modules of the end of a frame.
+ *
+ * \param[in] header Header information for the frame that is ending.
+ * \throws unspecified Any exception thrown by attached data modules
+ * in AnalysisDataModuleInterface::frameFinished().
+ *
+ * Should be called once for each call of notifyParallelFrameStart(),
+ * after any notifyParallelPointsAdd() calls for the frame.
+ * \p header should be identical to that used in the corresponding
+ * notifyParallelFrameStart() call.
+ */
+ void notifyParallelFrameFinish(const AnalysisDataFrameHeader &header) const;
/*! \brief
* Notifies attached modules of the end of data.
*
AnalysisDataProxy::AnalysisDataProxy(int firstColumn, int columnSpan,
AbstractAnalysisData *data)
- : source_(*data), firstColumn_(firstColumn), columnSpan_(columnSpan)
+ : source_(*data), firstColumn_(firstColumn), columnSpan_(columnSpan),
+ bParallel_(false)
{
GMX_RELEASE_ASSERT(data != NULL, "Source data must not be NULL");
GMX_RELEASE_ASSERT(firstColumn >= 0 && columnSpan > 0, "Invalid proxy column");
}
+bool
+AnalysisDataProxy::parallelDataStarted(
+ AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions &options)
+{
+ GMX_RELEASE_ASSERT(data == &source_, "Source data mismatch");
+ setDataSetCount(data->dataSetCount());
+ for (int i = 0; i < data->dataSetCount(); ++i)
+ {
+ setColumnCount(i, columnSpan_);
+ }
+ moduleManager().notifyParallelDataStart(this, options);
+ bParallel_ = !moduleManager().hasSerialModules();
+ return bParallel_;
+}
+
+
void
AnalysisDataProxy::frameStarted(const AnalysisDataFrameHeader &frame)
{
- moduleManager().notifyFrameStart(frame);
+ if (bParallel_)
+ {
+ moduleManager().notifyParallelFrameStart(frame);
+ }
+ else
+ {
+ moduleManager().notifyFrameStart(frame);
+ }
}
AnalysisDataPointSetRef columns(points, firstColumn_, columnSpan_);
if (columns.columnCount() > 0)
{
- moduleManager().notifyPointsAdd(columns);
+ if (bParallel_)
+ {
+ moduleManager().notifyParallelPointsAdd(columns);
+ }
+ else
+ {
+ moduleManager().notifyPointsAdd(columns);
+ }
}
}
void
AnalysisDataProxy::frameFinished(const AnalysisDataFrameHeader &header)
{
- moduleManager().notifyFrameFinish(header);
+ if (bParallel_)
+ {
+ moduleManager().notifyParallelFrameFinish(header);
+ }
+ else
+ {
+ moduleManager().notifyFrameFinish(header);
+ }
}
virtual int flags() const;
virtual void dataStarted(AbstractAnalysisData *data);
+ virtual bool parallelDataStarted(
+ AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions &options);
virtual void frameStarted(const AnalysisDataFrameHeader &frame);
virtual void pointsAdded(const AnalysisDataPointSetRef &points);
virtual void frameFinished(const AnalysisDataFrameHeader &header);
AbstractAnalysisData &source_;
int firstColumn_;
int columnSpan_;
+ bool bParallel_;
// Copy and assign disallowed by base.
};
{
return isMultipoint() && storageLimit_ == 0 && pendingLimit_ == 1;
}
+ /*! \brief
+ * Returns whether data needs to be stored at all.
+ *
+ * This is used to optimize multipoint handling for parallel cases
+ * (where shouldNotifyImmediately() returns false),
+ * where it is not necessary to store even a single frame.
+ *
+ * \todo
+ * This could be extended to non-multipoint data as well.
+ *
+ * Does not throw.
+ */
+ bool needStorage() const
+ {
+ return storageLimit_ > 0 || (pendingLimit_ > 1 && modules_->hasSerialModules());
+ }
/*! \brief
* Calls notification methods for new frames.
*
*
* Should always be at least one.
*
+ * \todo
+ * Get rid of this alltogether, as it is no longer used much.
+ *
* \see AnalysisDataStorage::startFrame()
*/
int pendingLimit_;
GMX_RELEASE_ASSERT(storedFrame.frameIndex() == index,
"Inconsistent internal frame indexing");
builders_.push_back(storedFrame.finishFrame(isMultipoint()));
+ modules_->notifyParallelFrameFinish(storedFrame.header());
if (shouldNotifyImmediately())
{
++firstUnnotifiedIndex_;
AnalysisDataStorageFrameData::addPointSet(int dataSetIndex, int firstColumn,
ValueIterator begin, ValueIterator end)
{
- const int valueCount = end - begin;
+ const int valueCount = end - begin;
+ AnalysisDataPointSetInfo pointSetInfo(0, valueCount,
+ dataSetIndex, firstColumn);
+ AnalysisDataPointSetRef pointSet(header(), pointSetInfo,
+ AnalysisDataValuesRef(begin, end));
+ storageImpl().modules_->notifyParallelPointsAdd(pointSet);
if (storageImpl().shouldNotifyImmediately())
{
- AnalysisDataPointSetInfo pointSetInfo(0, valueCount,
- dataSetIndex, firstColumn);
- storageImpl().modules_->notifyPointsAdd(
- AnalysisDataPointSetRef(header(), pointSetInfo,
- AnalysisDataValuesRef(begin, end)));
+ storageImpl().modules_->notifyPointsAdd(pointSet);
}
- else
+ else if (storageImpl().needStorage())
{
pointSets_.push_back(
AnalysisDataPointSetInfo(values_.size(), valueCount,
"Point sets created for non-multipoint data");
values_ = builder_->values_;
builder_->clearValues();
+ for (int i = 0; i < pointSetCount(); ++i)
+ {
+ storageImpl().modules_->notifyParallelPointsAdd(pointSet(i));
+ }
}
else
{
}
-void
-AnalysisDataStorage::setParallelOptions(const AnalysisDataParallelOptions &opt)
-{
- impl_->pendingLimit_ = 2 * opt.parallelizationFactor() - 1;
-}
-
-
int
AnalysisDataStorage::frameCount() const
{
impl_->modules_ = modules;
if (!impl_->storeAll())
{
- impl_->extendBuffer(impl_->storageLimit_ + impl_->pendingLimit_ + 1);
+ // 2 = pending limit (1) + 1
+ impl_->extendBuffer(impl_->storageLimit_ + 2);
+ }
+}
+
+
+void
+AnalysisDataStorage::startParallelDataStorage(
+ AbstractAnalysisData *data,
+ AnalysisDataModuleManager *modules,
+ const AnalysisDataParallelOptions &options)
+{
+ const int pendingLimit = 2 * options.parallelizationFactor() - 1;
+ impl_->pendingLimit_ = pendingLimit;
+ modules->notifyParallelDataStart(data, options);
+ // Data needs to be set before calling extendBuffer()
+ impl_->data_ = data;
+ impl_->modules_ = modules;
+ if (!impl_->storeAll())
+ {
+ impl_->extendBuffer(impl_->storageLimit_ + pendingLimit + 1);
}
}
GMX_RELEASE_ASSERT(storedFrame->frameIndex() == header.index(),
"Inconsistent internal frame indexing");
storedFrame->startFrame(header, impl_->getFrameBuilder());
+ impl_->modules_->notifyParallelFrameStart(header);
if (impl_->shouldNotifyImmediately())
{
impl_->modules_->notifyFrameStart(header);
*
* After this method has been called, all values appear as not set.
*
- * May call AnalysisDataModuleManager::notifyPointsAdd(), and may throw
- * any exception this method throws.
+ * May call AnalysisDataModuleManager::notifyPointsAdd() and
+ * AnalysisDataModuleManager::notifyParallelPointsAdd(), and may throw
+ * any exception these methods throw.
*/
void finishPointSet();
/*! \brief
* To use this class in a class derived from AbstractAnalysisData, a member
* variable of this type should be declared and the pure virtual methods
* forwarded to frameCount(), tryGetDataFrame() and requestStorage().
- * Storage properties should be set up, and then startDataStorage() called.
+ * Storage properties should be set up, and then startDataStorage() or
+ * startParallelDataStorage() called.
* New frames can then be added using startFrame(), currentFrame() and
* finishFrame() methods. When all frames are ready, finishDataStorage() must
* be called. These methods (and AnalysisDataStorageFrame::finishPointSet())
AnalysisDataStorage();
~AnalysisDataStorage();
- /*! \brief
- * Set parallelization options for the storage.
- *
- * \param[in] opt Parallization options to use.
- *
- * If this method is not called, the storage is set up for serial
- * storage only.
- *
- * Does not throw.
- */
- void setParallelOptions(const AnalysisDataParallelOptions &opt);
-
/*! \brief
* Returns the number of ready frames.
*
*/
void startDataStorage(AbstractAnalysisData *data,
AnalysisDataModuleManager *modules);
+ /*! \brief
+ * Start storing data in parallel.
+ *
+ * \param[in] data AbstractAnalysisData object containing this
+ * storage.
+ * \param[in] options Parallelization options to use.
+ * \param modules Module manager for \p data.
+ * \exception std::bad_alloc if storage allocation fails.
+ *
+ * Should be called instead of startDataStorage() if the data will be
+ * produced in parallel. Works as startDataStorage(), but additionally
+ * initializes the storage and the attached modules to prepare for
+ * out-of-order data frames.
+ *
+ * Calls AnalysisDataModuleManager::notifyParallelDataStart(), and
+ * throws any exceptions this method throws.
+ */
+ void startParallelDataStorage(
+ AbstractAnalysisData *data,
+ AnalysisDataModuleManager *modules,
+ const AnalysisDataParallelOptions &options);
/*! \brief
* Starts storing a new frame.
*
* setParallelOptions().
* Throws APIError if this constraint is violated.
*
- * Calls AnalysisDataModuleManager::notifyFrameStart() in certain
- * cases, and throws any exceptions this method throws.
+ * Calls AnalysisDataModuleManager::notifyFrameStart() (in certain
+ * cases) and AnalysisDataModuleManager::notifyParallelFrameStart(),
+ * and throws any exceptions these methods throw.
*/
AnalysisDataStorageFrame &startFrame(const AnalysisDataFrameHeader &header);
/*! \brief
* \ingroup module_analysisdata
*/
class AnalysisDataAverageModule : public AbstractAnalysisArrayData,
- public AnalysisDataModuleInterface
+ public AnalysisDataModuleSerial
{
public:
AnalysisDataAverageModule();
* \ingroup module_analysisdata
*/
class AnalysisDataFrameAverageModule : public AbstractAnalysisData,
- public AnalysisDataModuleInterface
+ public AnalysisDataModuleSerial
{
public:
AnalysisDataFrameAverageModule();
/*
* This file is part of the GROMACS molecular simulation package.
*
- * Copyright (c) 2010,2011,2012, by the GROMACS development team, led by
+ * Copyright (c) 2010,2011,2012,2013, by the GROMACS development team, led by
* David van der Spoel, Berk Hess, Erik Lindahl, and including many
* others, as listed in the AUTHORS file in the top-level source
* directory and at http://www.gromacs.org.
* \ingroup module_analysisdata
*/
class AnalysisDataDisplacementModule : public AbstractAnalysisData,
- public AnalysisDataModuleInterface
+ public AnalysisDataModuleSerial
{
public:
AnalysisDataDisplacementModule();
{
sum += value(i, c).value();
}
- scaleSingle(c, 1.0 / (sum * xstep()));
+ if (sum > 0.0)
+ {
+ scaleSingle(c, 1.0 / (sum * xstep()));
+ }
}
}
* \ingroup module_analysisdata
*/
class BasicAverageHistogramModule : public AbstractAverageHistogram,
- public AnalysisDataModuleInterface
+ public AnalysisDataModuleSerial
{
public:
BasicAverageHistogramModule();
}
-void
-AnalysisDataSimpleHistogramModule::dataStarted(AbstractAnalysisData *data)
+bool
+AnalysisDataSimpleHistogramModule::parallelDataStarted(
+ AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions &options)
{
addModule(impl_->averager_);
setDataSetCount(data->dataSetCount());
{
setColumnCount(i, settings().binCount());
}
- impl_->storage_.startDataStorage(this, &moduleManager());
+ impl_->storage_.startParallelDataStorage(this, &moduleManager(), options);
+ return true;
}
}
-void
-AnalysisDataWeightedHistogramModule::dataStarted(AbstractAnalysisData *data)
+bool
+AnalysisDataWeightedHistogramModule::parallelDataStarted(
+ AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions &options)
{
addModule(impl_->averager_);
setDataSetCount(data->dataSetCount());
{
setColumnCount(i, settings().binCount());
}
- impl_->storage_.startDataStorage(this, &moduleManager());
+ impl_->storage_.startParallelDataStorage(this, &moduleManager(), options);
+ return true;
}
* \ingroup module_analysisdata
*/
class AnalysisDataSimpleHistogramModule : public AbstractAnalysisData,
- public AnalysisDataModuleInterface
+ public AnalysisDataModuleParallel
{
public:
/*! \brief
virtual int flags() const;
- virtual void dataStarted(AbstractAnalysisData *data);
+ virtual bool parallelDataStarted(
+ AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions &options);
virtual void frameStarted(const AnalysisDataFrameHeader &header);
virtual void pointsAdded(const AnalysisDataPointSetRef &points);
virtual void frameFinished(const AnalysisDataFrameHeader &header);
* \ingroup module_analysisdata
*/
class AnalysisDataWeightedHistogramModule : public AbstractAnalysisData,
- public AnalysisDataModuleInterface
+ public AnalysisDataModuleParallel
{
public:
//! \copydoc AnalysisDataSimpleHistogramModule::AnalysisDataSimpleHistogramModule()
virtual int flags() const;
- virtual void dataStarted(AbstractAnalysisData *data);
+ virtual bool parallelDataStarted(
+ AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions &options);
virtual void frameStarted(const AnalysisDataFrameHeader &header);
virtual void pointsAdded(const AnalysisDataPointSetRef &points);
virtual void frameFinished(const AnalysisDataFrameHeader &header);
* \ingroup module_analysisdata
*/
class AnalysisDataBinAverageModule : public AbstractAnalysisArrayData,
- public AnalysisDataModuleInterface
+ public AnalysisDataModuleSerial
{
public:
//! \copydoc AnalysisDataSimpleHistogramModule::AnalysisDataSimpleHistogramModule()
* \ingroup module_analysisdata
*/
class AnalysisDataLifetimeModule : public AbstractAnalysisArrayData,
- public AnalysisDataModuleInterface
+ public AnalysisDataModuleSerial
{
public:
AnalysisDataLifetimeModule();
*
* \ingroup module_analysisdata
*/
-class AbstractPlotModule : public AnalysisDataModuleInterface
+class AbstractPlotModule : public AnalysisDataModuleSerial
{
public:
virtual ~AbstractPlotModule();
{
AnalysisDataTestFixture::addStaticCheckerModule(input_, &data_);
}
+ void addStaticParallelCheckerModule()
+ {
+ AnalysisDataTestFixture::addStaticParallelCheckerModule(input_, &data_);
+ }
void addStaticColumnCheckerModule(int firstColumn, int columnCount)
{
AnalysisDataTestFixture::addStaticColumnCheckerModule(
ASSERT_NO_THROW_GMX(AnalysisDataTest::presentAllData());
}
+/*
+ * Tests that data is forwarded correctly to modules when there are only
+ * parallel modules.
+ */
+TYPED_TEST(AnalysisDataCommonTest, CallsParallelModuleCorrectly)
+{
+ ASSERT_NO_THROW_GMX(AnalysisDataTest::addStaticParallelCheckerModule());
+ ASSERT_NO_THROW_GMX(AnalysisDataTest::addStaticParallelCheckerModule());
+ ASSERT_NO_THROW_GMX(AnalysisDataTest::presentAllData());
+}
+
+/*
+ * Tests that data is forwarded correctly to modules when there are both
+ * parallel and serial modules.
+ */
+TYPED_TEST(AnalysisDataCommonTest, CallsMixedModulesCorrectly)
+{
+ ASSERT_NO_THROW_GMX(AnalysisDataTest::addStaticCheckerModule());
+ ASSERT_NO_THROW_GMX(AnalysisDataTest::addStaticParallelCheckerModule());
+ ASSERT_NO_THROW_GMX(AnalysisDataTest::presentAllData());
+}
+
/*
* Tests that data is forwarded correctly to modules that are added using
* addColumnModule().
TYPED_TEST(AnalysisDataCommonTest, CallsModuleCorrectlyWithOutOfOrderFrames)
{
ASSERT_NO_THROW_GMX(AnalysisDataTest::addStaticCheckerModule());
+ ASSERT_NO_THROW_GMX(AnalysisDataTest::addStaticParallelCheckerModule());
ASSERT_NO_THROW_GMX(AnalysisDataTest::addStaticColumnCheckerModule(1, 2));
gmx::AnalysisDataHandle handle1;
gmx::AnalysisDataHandle handle2;
*
* \ingroup module_analysisdata
*/
-class IndexFileWriterModule : public AnalysisDataModuleInterface
+class IndexFileWriterModule : public AnalysisDataModuleSerial
{
public:
IndexFileWriterModule();
AbstractAnalysisData *source)
{
MockAnalysisDataModulePointer module(new MockAnalysisDataModule(0));
- module->setupStaticCheck(data, source);
+ module->setupStaticCheck(data, source, false);
+ source->addModule(module);
+}
+
+
+void
+AnalysisDataTestFixture::addStaticParallelCheckerModule(
+ const AnalysisDataTestInput &data,
+ AbstractAnalysisData *source)
+{
+ MockAnalysisDataModulePointer module(new MockAnalysisDataModule(0));
+ module->setupStaticCheck(data, source, true);
source->addModule(module);
}
*/
static void addStaticCheckerModule(const AnalysisDataTestInput &data,
AbstractAnalysisData *source);
+ /*! \brief
+ * Adds a mock module that verifies parallel output against
+ * AnalysisDataTestInput.
+ *
+ * \param[in] data Data to compare against.
+ * \param source Data object to verify.
+ *
+ * Creates a parallel mock module that verifies that the
+ * AnalysisDataModuleInterface methods are called correctly by
+ * \p source. Parameters for the calls are verified against \p data.
+ * Adds the created module to \p source using \p data->addModule().
+ * Any exceptions from the called functions should be caught by the
+ * caller.
+ *
+ * Differs from addStaticCheckerModule() in that the created mock
+ * module reports that it accepts parallel input data, and accepts and
+ * verifies notification calls following the parallel pattern.
+ *
+ * \see AbstractAnalysisData::addModule()
+ */
+ static void addStaticParallelCheckerModule(
+ const AnalysisDataTestInput &data,
+ AbstractAnalysisData *source);
/*! \brief
* Adds a column mock module that verifies output against
* AnalysisDataTestInput.
int storageCount_;
};
+/*! \brief
+ * Sets the mock object expectation to mimick AnalysisDataModuleSerial.
+ *
+ * Makes MockAnalysisDataModule::parallelDataStarted() behave as if the mock
+ * object was an AnalysisDataModuleSerial object: forward the call to
+ * MockAnalysisDataModule::dataStarted() and return false.
+ */
+void setSerialExpectationForParallelDataStarted(MockAnalysisDataModule *mock)
+{
+ using ::testing::_;
+ using ::testing::AtMost;
+ using ::testing::DoAll;
+ using ::testing::Invoke;
+ using ::testing::Return;
+ using ::testing::WithArg;
+ EXPECT_CALL(*mock, parallelDataStarted(_, _))
+ .Times(AtMost(1))
+ .WillOnce(DoAll(WithArg<0>(Invoke(mock, &MockAnalysisDataModule::dataStarted)),
+ Return(false)));
+}
+
} // anonymous namespace
void
MockAnalysisDataModule::setupStaticCheck(const AnalysisDataTestInput &data,
- AbstractAnalysisData *source)
+ AbstractAnalysisData *source,
+ bool bParallel)
{
impl_->flags_ |= efAllowMulticolumn | efAllowMultipoint | efAllowMultipleDataSets;
- ::testing::InSequence dummy;
using ::testing::_;
using ::testing::Invoke;
+ using ::testing::Property;
+ using ::testing::Return;
- EXPECT_CALL(*this, dataStarted(source));
- for (int row = 0; row < data.frameCount(); ++row)
+ if (bParallel)
{
- const AnalysisDataTestInputFrame &frame = data.frame(row);
- EXPECT_CALL(*this, frameStarted(_))
- .WillOnce(Invoke(StaticDataFrameHeaderChecker(&frame)));
- for (int ps = 0; ps < frame.pointSetCount(); ++ps)
+ ::testing::Expectation init =
+ EXPECT_CALL(*this, parallelDataStarted(source, _))
+ .WillOnce(Return(true));
+ ::testing::ExpectationSet framesFinished;
+ for (int row = 0; row < data.frameCount(); ++row)
{
- const AnalysisDataTestInputPointSet &points = frame.pointSet(ps);
- StaticDataPointsChecker checker(&frame, &points, 0,
- data.columnCount(points.dataSetIndex()));
- EXPECT_CALL(*this, pointsAdded(_)).WillOnce(Invoke(checker));
+ ::testing::InSequence frameSequence;
+ const AnalysisDataTestInputFrame &frame = data.frame(row);
+ EXPECT_CALL(*this, frameStarted(Property(&AnalysisDataFrameHeader::index, row)))
+ .After(init)
+ .WillOnce(Invoke(StaticDataFrameHeaderChecker(&frame)));
+ for (int ps = 0; ps < frame.pointSetCount(); ++ps)
+ {
+ const AnalysisDataTestInputPointSet &points = frame.pointSet(ps);
+ StaticDataPointsChecker checker(&frame, &points, 0,
+ data.columnCount(points.dataSetIndex()));
+ EXPECT_CALL(*this, pointsAdded(Property(&AnalysisDataPointSetRef::frameIndex, row)))
+ .WillOnce(Invoke(checker));
+ }
+ framesFinished +=
+ EXPECT_CALL(*this, frameFinished(Property(&AnalysisDataFrameHeader::index, row)))
+ .WillOnce(Invoke(StaticDataFrameHeaderChecker(&frame)));
}
- EXPECT_CALL(*this, frameFinished(_))
- .WillOnce(Invoke(StaticDataFrameHeaderChecker(&frame)));
+ EXPECT_CALL(*this, dataFinished())
+ .After(framesFinished);
+ }
+ else
+ {
+ ::testing::InSequence dummy;
+ setSerialExpectationForParallelDataStarted(this);
+ EXPECT_CALL(*this, dataStarted(source));
+ for (int row = 0; row < data.frameCount(); ++row)
+ {
+ const AnalysisDataTestInputFrame &frame = data.frame(row);
+ EXPECT_CALL(*this, frameStarted(_))
+ .WillOnce(Invoke(StaticDataFrameHeaderChecker(&frame)));
+ for (int ps = 0; ps < frame.pointSetCount(); ++ps)
+ {
+ const AnalysisDataTestInputPointSet &points = frame.pointSet(ps);
+ StaticDataPointsChecker checker(&frame, &points, 0,
+ data.columnCount(points.dataSetIndex()));
+ EXPECT_CALL(*this, pointsAdded(_)).WillOnce(Invoke(checker));
+ }
+ EXPECT_CALL(*this, frameFinished(_))
+ .WillOnce(Invoke(StaticDataFrameHeaderChecker(&frame)));
+ }
+ EXPECT_CALL(*this, dataFinished());
}
- EXPECT_CALL(*this, dataFinished());
}
using ::testing::_;
using ::testing::Invoke;
+ setSerialExpectationForParallelDataStarted(this);
EXPECT_CALL(*this, dataStarted(_));
for (int row = 0; row < data.frameCount(); ++row)
{
using ::testing::_;
using ::testing::Invoke;
+ setSerialExpectationForParallelDataStarted(this);
EXPECT_CALL(*this, dataStarted(source))
.WillOnce(Invoke(DataStorageRequester(storageCount)));
for (int row = 0; row < data.frameCount(); ++row)
using ::testing::Expectation;
using ::testing::Invoke;
+ setSerialExpectationForParallelDataStarted(this);
Expectation dataStart = EXPECT_CALL(*this, dataStarted(source))
.WillOnce(Invoke(impl_.get(), &Impl::startReferenceData));
Expectation frameStart = EXPECT_CALL(*this, frameStarted(_))
/*
* This file is part of the GROMACS molecular simulation package.
*
- * Copyright (c) 2011,2012, by the GROMACS development team, led by
+ * Copyright (c) 2011,2012,2013, by the GROMACS development team, led by
* David van der Spoel, Berk Hess, Erik Lindahl, and including many
* others, as listed in the AUTHORS file in the top-level source
* directory and at http://www.gromacs.org.
#include "gromacs/analysisdata/dataframe.h"
#include "gromacs/analysisdata/datamodule.h"
+#include "gromacs/analysisdata/paralleloptions.h"
#include "gromacs/utility/common.h"
namespace gmx
virtual int flags() const;
+ MOCK_METHOD2(parallelDataStarted,
+ bool(AbstractAnalysisData *data,
+ const AnalysisDataParallelOptions &options));
MOCK_METHOD1(dataStarted, void(AbstractAnalysisData *data));
MOCK_METHOD1(frameStarted, void(const AnalysisDataFrameHeader &header));
MOCK_METHOD1(pointsAdded, void(const AnalysisDataPointSetRef &points));
MOCK_METHOD0(dataFinished, void());
void setupStaticCheck(const AnalysisDataTestInput &data,
- AbstractAnalysisData *source);
+ AbstractAnalysisData *source,
+ bool bParallel);
void setupStaticColumnCheck(const AnalysisDataTestInput &data,
int firstcol, int n,
AbstractAnalysisData *source);