From 87f443c761f3cfed0798f7740502e04fd7450fc8 Mon Sep 17 00:00:00 2001 From: "M. Eric Irrgang" Date: Mon, 9 Sep 2019 18:08:52 +0300 Subject: [PATCH] Allow gmxapi.mdrun to properly dispatch simulation ensembles. Update the gmxapi.simulation.mdrun implementation to be more normative. Dispatch the simulation work to the gmxapi 0.0.7 implementation in gmxapi.simulation.workflow and gmxapi.simulation.context. Change-Id: Id223507fc064e178992bc87d6fcf89bdd4029523 --- python_packaging/src/gmxapi/simulation/abc.py | 56 ++ .../src/gmxapi/simulation/mdrun.py | 555 ++++++++++++++++-- python_packaging/src/test/test_mdrun.py | 60 +- 3 files changed, 609 insertions(+), 62 deletions(-) create mode 100644 python_packaging/src/gmxapi/simulation/abc.py diff --git a/python_packaging/src/gmxapi/simulation/abc.py b/python_packaging/src/gmxapi/simulation/abc.py new file mode 100644 index 0000000000..075a62e6be --- /dev/null +++ b/python_packaging/src/gmxapi/simulation/abc.py @@ -0,0 +1,56 @@ +# +# This file is part of the GROMACS molecular simulation package. +# +# Copyright (c) 2019, by the GROMACS development team, led by +# Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl, +# and including many others, as listed in the AUTHORS file in the +# top-level source directory and at http://www.gromacs.org. +# +# GROMACS is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public License +# as published by the Free Software Foundation; either version 2.1 +# of the License, or (at your option) any later version. +# +# GROMACS is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with GROMACS; if not, see +# http://www.gnu.org/licenses, or write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# If you want to redistribute modifications to GROMACS, please +# consider that scientific software is very special. Version +# control is crucial - bugs must be traceable. We will be happy to +# consider code for inclusion in the official distribution, but +# derived work must not be called official GROMACS. Details are found +# in the README & COPYING files - if they are missing, get the +# official version at http://www.gromacs.org. +# +# To help us fund GROMACS development, we humbly ask that you cite +# the research papers on the package. Check out http://www.gromacs.org. + +""" +Abstract base classes for gmxapi.simulation module +================================================== + +These classes allow static and dynamic type checking for gmxapi Python +interfaces. Some may be used as bases to inherit behaviors, but users should +not assume that a gmxapi object actually inherits from classes in this module. + +For more information on the concept of abstract base classes in Python, refer +to https://docs.python.org/3/library/abc.html + +For more on type hinting, see https://docs.python.org/3/library/typing.html +""" + +import abc + + +class ModuleObject(abc.ABC): + """Extended interface for objects in the gmaxpi.simulation module. + + Implies availability of additional binding details. + """ diff --git a/python_packaging/src/gmxapi/simulation/mdrun.py b/python_packaging/src/gmxapi/simulation/mdrun.py index 876ab83430..370668a136 100644 --- a/python_packaging/src/gmxapi/simulation/mdrun.py +++ b/python_packaging/src/gmxapi/simulation/mdrun.py @@ -39,52 +39,493 @@ not use the Python Context resource manager. It uses either the legacy 0.0.7 Context or its own Context, also implemented in this module. """ +__all__ = ['mdrun'] + +import inspect +import os import typing +from contextlib import contextmanager +import gmxapi +import gmxapi.abc +import gmxapi.operation as _op from gmxapi import exceptions -from gmxapi.operation import AbstractOperation -from gmxapi.operation import function_wrapper + # The following imports are not marked as public API. -# TODO: Resolve public API and exposure. +from .abc import ModuleObject + +from . import fileio from . import workflow +# Initialize module-level logger +from gmxapi import logger as root_logger -def mdrun_dispatcher(context, *, input, label: str = None, **kwargs) -> typing.Type['AbstractOperation']: - """Dispatch to an appropriate director based on the context and input. +logger = root_logger.getChild('mdrun') +logger.info('Importing {}'.format(__name__)) - Runs appropriate director code to set up an operation, returning a handle to - a simulation operation node. - Arguments: - context: Execution context in which to attempt to instantiate an operation from input - input: operation input. Argument type is context dependent. - label: human readable tag for the work node to be created. +class OutputDataProxy(_op.DataProxyBase, + descriptors={ + '_work_dir': _op.OutputDataDescriptor('_work_dir', str)}): + """Implement the 'output' attribute of MDRun operations.""" - Additional key word arguments are passed on to the dispatched factory. - """ - # TODO: This is a legacy import that should be updated. - from .context import get_context - if label is not None: - raise exceptions.NotImplementedError('sorry... no labels yet') - try: - legacy_context = get_context(work=input) - except Exception: - legacy_context = None - - def run_session(): - with legacy_context as session: - session.run() - return True - - if context is not None and context is legacy_context: - helper = function_wrapper(output={'data': bool})(run_session) - return helper(**kwargs) +class Publisher(gmxapi.operation.Publisher): + """Implement the publishing data descriptor for MDRun output.""" + + def __set__(self, instance: 'PublishingDataProxy', value): + super().__set__(instance, value) + + +class PublishingDataProxy(_op.DataProxyBase, + descriptors={ + '_work_dir': Publisher('_work_dir', str)} + ): + """Manage output resource updates for MDRun operation.""" + + +# +# Helpers +# + +# Output in the gmxapi.operation Context. +_output = _op.OutputCollectionDescription(_work_dir=str) + +# Input in the gmxapi.operation Context for the dispatching runner. +# The default empty dictionary for parameters just means that there are no overrides +# to the parameters already provided in _simulation_input. +_input = _op.InputCollectionDescription( + [('_simulation_input', inspect.Parameter('_simulation_input', + inspect.Parameter.POSITIONAL_OR_KEYWORD, + annotation=str)), + ('parameters', inspect.Parameter('parameters', + inspect.Parameter.POSITIONAL_OR_KEYWORD, + annotation=dict, + default=dict())) + ]) + + +def _standard_node_resource_factory(*args, **kwargs): + """Translate Python UI input to the gmxapi.operation node builder inputs.""" + source_collection = _input.bind(*args, **kwargs) + logger.info('mdrun input bound as source collection {}'.format(source_collection)) + return source_collection + + +_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy, + output_description=_output, + publishing_data_proxy=PublishingDataProxy) + + +@contextmanager +def scoped_communicator(original_comm, requested_size: int = None): + from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator + + if requested_size is None: + communicator = _acquire_communicator(communicator=original_comm) + else: - raise exceptions.ValueError('Could not dispatch MD input {} with context {}'.format(input, legacy_context)) + if original_comm is None or not hasattr(original_comm, 'Get_size'): + raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.') + original_comm_size = original_comm.Get_size() + if original_comm_size < requested_size: + raise exceptions.FeatureNotAvailableError( + 'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format( + requested_size, + original_comm_size + )) + assert original_comm_size >= requested_size + communicator = _get_ensemble_communicator(original_comm, requested_size) + + try: + yield communicator + finally: + communicator.Free() + + +class LegacyImplementationSubscription(object): + """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation. + + This input resource is a subscription to work that is dispatched to a sub-context. + The resource can be created from the standard data of the simulation module. + """ + + def __init__(self, resource_manager: _op.ResourceManager): + from .context import Context as LegacyContext + import gmxapi._gmxapi as _gmxapi + self._gmxapi = _gmxapi + + assert isinstance(resource_manager, _op.ResourceManager) + # We use several details of the gmxapi.operation.Context.ResourceManager. + # These dependencies can evolve into the subscription protocol between Contexts. + + # Configure and run a gmxapi 0.0.7 session. + # 0. Determine ensemble width. + # 1. Choose, create/check working directories. + # 2. Create source TPR. + # 3. Create workspec. + # 3.5 Add plugin potentials, if any. + # 4. Run. + # 5. Collect outputs from context (including plugin outputs) and be ready to publish them. + + # Determine ensemble width + ensemble_width = resource_manager.ensemble_width + + # Choose working directories + # TODO: operation working directory naming scheme should be centrally well-defined. + # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter, + # so we cannot create the input file path in the working directory based on WorkSpec.uid. + workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id, + member=member) + for member in range(ensemble_width)] + + # This is a reasonable place to start using MPI ensemble implementation details. + # We will want better abstraction in the future, but it is best if related filesystem + # accesses occur from the same processes, consistently. Note that we already + # handle some optimization and dependency reduction when the ensemble size is 1. + # TODO: multithread and multiprocess alternatives to MPI ensemble management. + + # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD + with scoped_communicator(None) as context_comm: + context_rank = context_comm.Get_rank() + with scoped_communicator(context_comm, ensemble_width) as ensemble_comm: + # Note that in the current implementation, extra ranks have nothing to do, + # but they may have a dummy communicator, so be sure to skip those members + # of the context_comm. + if context_rank < ensemble_width: + assert ensemble_comm.Get_size() == ensemble_width + ensemble_rank = ensemble_comm.Get_rank() + # TODO: This should be a richer object that includes at least host information + # and preferably the full gmxapi Future interface. + self.workdir = os.path.abspath(workdir_list[ensemble_rank]) + + with resource_manager.local_input(member=ensemble_rank) as input_pack: + source_file = input_pack.kwargs['_simulation_input'] + parameters = input_pack.kwargs['parameters'] + # If there are any other key word arguments to process from the gmxapi.mdrun + # factory call, do it here. + + # TODO: We should really name this file with a useful input-dependent tag. + tprfile = os.path.join(self.workdir, 'topol.tpr') + + expected_working_files = [tprfile] + + if os.path.exists(self.workdir): + if os.path.isdir(self.workdir): + # Confirm that this is a restarted simulation. + # It is unspecified by the API, but at least through gmxapi 0.1, + # all simulations are initialized with a checkpoint file named state.cpt + # (see src/api/cpp/context.cpp) + checkpoint_file = os.path.join(self.workdir, 'state.cpp') + expected_working_files.append(checkpoint_file) + + for file in expected_working_files: + if not os.path.exists(file): + raise exceptions.ApiError( + 'Cannot determine working directory state: {}'.format(self.workdir)) + else: + raise exceptions.ApiError( + 'Chosen working directory path exists but is not a directory: {}'.format(self.workdir)) + else: + # Build the working directory and input files. + os.mkdir(self.workdir) + sim_input = fileio.read_tpr(source_file) + for key, value in parameters.items(): + try: + sim_input.parameters.set(key=key, value=value) + except _gmxapi.Exception as e: + raise exceptions.ApiError( + 'Bug encountered. Unknown error when trying to set simulation ' + 'parameter {} to {}'.format(key, value) + ) from e + + fileio.write_tpr_file(output=tprfile, input=sim_input) + logger.info('Created {} on rank {}'.format(tprfile, context_rank)) + + # Gather the actual working directories from the ensemble members. + if hasattr(ensemble_comm, 'allgather'): + # We should not assume that abspath expands the same on different MPI ranks. + workdir_list = ensemble_comm.allgather(self.workdir) + tpr_filenames = ensemble_comm.allgather(tprfile) + else: + workdir_list = [os.path.abspath(workdir) for workdir in workdir_list] + # TODO: If we use better input file names, they need to be updated in multiple places. + tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list] + + logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank, + workdir_list)) + logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id, + ensemble_rank, + self.workdir + )) + work = workflow.from_tpr(tpr_filenames) + self.workspec = work.workspec + context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm) + self.simulation_module_context = context + # Go ahead and execute immediately. No need for lazy initialization in this basic case. + with self.simulation_module_context as session: + session.run() + # TODO: There may be some additional results that we need to extract... + # end: if context_rank < ensemble_width + + # end scoped_communicator: ensemble_comm + + if context_comm.Get_size() > 1: + context_comm.bcast(workdir_list, root=0) + # end scoped_communicator: context_comm + + self.workdir = workdir_list + + +class SubscriptionSessionResources(object): + """Input and output run-time resources for a MDRun subscription. + + A generalization of this class is the probably the main hook for customizing the resources + provided to the operation at run time. + + .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory. + """ + + def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy): + assert isinstance(input, LegacyImplementationSubscription) + assert isinstance(output, PublishingDataProxy) + self.output = output + member_id = self.output._client_identifier + # Before enabling the following, be sure we understand what is happening. + # if member_id is None: + # member_id = 0 + self.workdir = input.workdir[member_id] + + # @staticmethod + # def create(input: LegacyImplementationSubscription, + # output: PublishingDataProxy) -> 'SubscriptionSessionResources': + # """Factory function to create new resource instances. + # + # It is better to overload a factory member than to write a fancy __init__. + # An alternative to overloading is to make this a classmethod for polymorphic + # creation. Note that Python classes (can) know what their subclasses are. + # """ + # if isinstance(input, LegacyImplementationSubscription): + # return _session_resource_factory(input, output) + +class SubscriptionPublishingRunner(object): + """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context.""" + input_resource_factory = LegacyImplementationSubscription -def mdrun(input=None) -> typing.Type['AbstractOperation']: + def __init__(self, resources: SubscriptionSessionResources): + assert isinstance(resources, SubscriptionSessionResources) + # Note that the resources contain a reference to a simulation ensemble that has already run. + self.resources = resources + + def run(self): + """Operation implementation in the gmxapi.operation module context.""" + publisher = self.resources.output + publisher._work_dir = self.resources.workdir + + +_next_uid = 0 + + +def _make_uid(input) -> str: + # TODO: Use input fingerprint for more useful identification. + salt = hash(input) + global _next_uid + new_uid = 'mdrun_{}_{}'.format(_next_uid, salt) + _next_uid += 1 + return new_uid + + +# +# Implementation +# + + +class ResourceManager(gmxapi.operation.ResourceManager): + """Manage resources for the MDRun operation in the gmxapi.operation contexts. + + Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads. + Futures managed by this resource manager may contain additional attributes. + """ + + def future(self, name: str, description: _op.ResultDescription): + tpr_future = super().future(name=name, description=description) + return tpr_future + + def data(self) -> OutputDataProxy: + return OutputDataProxy(self) + + def update_output(self): + """Override gmxapi.operation.ResourceManager.update_output because we handle paralellism as 0.0.7.""" + # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager, + # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)` + # TODO: Reimplement as the resource factory and director for the operation target context. + if not self.done(): + self.__operation_entrance_counter += 1 + if self.__operation_entrance_counter > 1: + raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.') + if not self.done(): + # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the + # operation's library/implementation context. + + ### + # Note: this is the resource translation from gmxapi.operation context + # to the dispatching runner director. It uses details of the gmxapi.operation.Context + # and of the operation. + + # TODO: Dispatch/discover this resource factory from a canonical place. + assert hasattr(self._runner_director, 'input_resource_factory') + # Create on all ranks. + input = self._runner_director.input_resource_factory(self) + # End of action of the InputResourceDirector[Context, MdRunSubscription]. + ### + + # We are giving the director a resource that contains the subscription + # to the dispatched work. + + publishing_resources = self.publishing_resources() + for member in range(self.ensemble_width): + with publishing_resources(ensemble_member=member) as output: + resources = self._resource_factory(input=input, output=output) + runner = self._runner_director(resources) + runner.run() + + +class StandardInputDescription(_op.InputDescription): + """Provide the ReadTpr input description in gmxapi.operation Contexts.""" + + def signature(self) -> _op.InputCollectionDescription: + return _input + + def make_uid(self, input: _op.DataEdge) -> str: + return _make_uid(input) + + +class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta): + """Provide the gmxapi compatible ReadTpr implementation.""" + + # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key + @classmethod + def name(self) -> str: + """Canonical name for the operation.""" + return 'mdrun' + + @classmethod + def namespace(self) -> str: + """read_tpr is importable from the gmxapi module.""" + return 'gmxapi' + + @classmethod + def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector: + if isinstance(context, _op.Context): + return StandardDirector(context) + + +class StandardOperationHandle(_op.AbstractOperation): + """Handle used in Python UI or gmxapi.operation Contexts.""" + + def __init__(self, resource_manager: ResourceManager): + self.__resource_manager = resource_manager + + def run(self): + self.__resource_manager.update_output() + + @property + def output(self) -> OutputDataProxy: + return self.__resource_manager.data() + + +class StandardDirector(gmxapi.abc.OperationDirector): + """Direct the instantiation of a read_tpr node in a gmxapi.operation Context. + + .. todo:: Compose this behavior in a more generic class. + + .. todo:: Describe where instances live. + """ + + def __init__(self, context: _op.Context): + if not isinstance(context, _op.Context): + raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.') + self.context = context + + def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle: + builder = self.context.node_builder(operation=RegisteredOperation, label=label) + + builder.set_resource_factory(SubscriptionSessionResources) + builder.set_input_description(StandardInputDescription()) + builder.set_handle(StandardOperationHandle) + # The runner in the gmxapi.operation context is the servicer for the legacy context. + builder.set_runner_director(SubscriptionPublishingRunner) + builder.set_output_factory(_output_factory) + builder.set_resource_manager(ResourceManager) + + # Note: we have not yet done any dispatching based on *resources*. We should + # translate the resources provided into the form that the Context can subscribe to + # using the dispatching resource_factory. In the second draft, this operation + # is dispatched to a SimulationModuleContext, which can be subscribed directly + # to sources that are either literal filenames or gmxapi.simulation sources, + # while standard Futures can be resolved in the standard context. + # + assert isinstance(resources, _op.DataSourceCollection) + for name, source in resources.items(): + builder.add_input(name, source) + + handle = builder.build() + assert isinstance(handle, StandardOperationHandle) + return handle + + def handle_type(self, context: gmxapi.abc.Context): + return StandardOperationHandle + + # Developer note: the Director instance is a convenient place to get a dispatching + # factory. The Director may become generic or more universal, but the resource_factory + # would likely not be typed on the generic parameters of the Director class. + # Instead, it is likely a generic function with its own TypeVar parameters. + def resource_factory(self, + source: typing.Union[gmxapi.abc.Context, ModuleObject, None], + target: gmxapi.abc.Context = None): + # Distinguish between the UIContext, in which input is in the form + # of function call arguments, and the StandardContext, implemented in + # gmxapi.operation. UIContext is probably a virtual context that is + # asserted by callers in order to get a factory that normalizes UI input + # for the StandardContext. + # + if target is None: + target = self.context + if source is None: + if isinstance(target, _op.Context): + # Return a factory that can bind to function call arguments to produce a DataSourceCollection. + return _standard_node_resource_factory + if isinstance(source, _op.Context): + return SubscriptionSessionResources + if isinstance(source, ModuleObject): + if isinstance(target, _op.Context): + # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation. + # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context. + # In the first draft, though, we just access a special payload. + # Return a factory that will consume *_simulation_input* and *parameters* + # members of a received object. + logger.info('Building mdrun operation from source {}'.format(source)) + + def simulation_input_workaround(input): + source = input + if hasattr(source, 'output'): + source = input.output + assert hasattr(source, '_simulation_input') + assert hasattr(source, 'parameters') + logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name, + source._simulation_input.description)) + source_collection = _input.bind(_simulation_input=source._simulation_input, + parameters=source.parameters) + logger.info('mdrun input bound as source collection {}'.format(source_collection)) + return source_collection + + return simulation_input_workaround + + raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target)) + + +def mdrun(input, label: str = None, context=None): """MD simulation operation. Arguments: @@ -93,10 +534,10 @@ def mdrun(input=None) -> typing.Type['AbstractOperation']: Returns: runnable operation to perform the specified simulation - The returned object has a `run()` method to launch the simulation. - Otherwise, this operation does not yet support the gmxapi data flow model. + The *output* attribute of the returned operation handle contains dynamically + determined outputs from the operation. - `input` may be a TPR file name. + `input` may be a TPR file name or a an object providing the SimulationInput interface. Note: New function names will be appearing to handle tasks that are separate @@ -106,21 +547,33 @@ def mdrun(input=None) -> typing.Type['AbstractOperation']: "test_particle_insertion," "legacy_simulation" (do_md), or "simulation" composition (which may be leap-frog, vv, and other algorithms) """ - # If input looks like classic filename input, use the gmxapi 0.0.7 - # implementation. - from .context import get_context - try: - work = workflow.from_tpr(input) - assert work is not None - work_context = get_context(work) - assert work_context is not None - except exceptions.UsageError: - # Input was not gmxapi 0.0.7 input. - work = None - work_context = None - - # TODO: Inspect input to determine context. - kwargs = {} - handle = mdrun_dispatcher(context=work_context, input=work, label=None, **kwargs) + handle_context = context + if handle_context is not None: + raise gmxapi.exceptions.NotImplementedError( + 'context must be None. This factory is only for the Python UI right now.') + + target_context = _op.current_context() + assert isinstance(target_context, _op.Context) + # Get a director that will create a node in the standard context. + node_director = _op._get_operation_director(RegisteredOperation, context=target_context) + assert isinstance(node_director, StandardDirector) + # TODO: refine this protocol + assert handle_context is None + # Examine the input. + if isinstance(input, ModuleObject): + # The input is from read_tpr or modify_input. + source_context = input + else: + # Allow automatic dispatching + source_context = None + resource_factory = node_director.resource_factory(source=source_context, target=target_context) + resources = resource_factory(input) + handle = node_director(resources=resources, label=label) + # Note: One effect of the assertions above is to help the type checker infer + # the return type of the handle. It is hard to convince the type checker that + # the return value of the node builder is up-cast. We might be able to resolve + # this by making both get_operation_director and ReadTprImplementation + # generics of the handle type, or using the handle type as the key for + # get_operation_director. return handle diff --git a/python_packaging/src/test/test_mdrun.py b/python_packaging/src/test/test_mdrun.py index 71afc70a3e..9fade56ff5 100644 --- a/python_packaging/src/test/test_mdrun.py +++ b/python_packaging/src/test/test_mdrun.py @@ -49,18 +49,26 @@ import pytest import gmxapi as gmx from gmxapi.testsupport import withmpi_only -gmx.logger.setLevel(logging.WARNING) +# Configure the `logging` module before proceeding any further. +gmx.logger.setLevel(logging.DEBUG) -# Configure the `logging` module before and non-built-in packages start to use it. -logging.getLogger().setLevel(logging.WARNING) -# create console handler -ch = logging.StreamHandler() -ch.setLevel(logging.WARNING) -# create formatter and add it to the handler -formatter = logging.Formatter('%(asctime)s:%(name)s:%(levelname)s: %(message)s') -ch.setFormatter(formatter) -# add the handlers to the logger -logging.getLogger().addHandler(ch) +try: + from mpi4py import MPI + rank_number = MPI.COMM_WORLD.Get_rank() +except ImportError: + rank_tag = '' + MPI = None +else: + rank_tag = 'rank{}:'.format(rank_number) + +# Use this formatter to improve the caplog log records. +formatter = logging.Formatter(rank_tag + '%(name)s:%(levelname)s: %(message)s') + +# For additional console logging, create and attach a stream handler. +# For example: +# ch = logging.StreamHandler() +# ch.setFormatter(formatter) +# logging.getLogger().addHandler(ch) @pytest.mark.usefixtures('cleandir') @@ -69,3 +77,33 @@ def test_run_from_tpr(spc_water_box): md = gmx.mdrun(spc_water_box) md.run() + # TODO: better handling of output on unused MPI ranks. + + +@withmpi_only +@pytest.mark.usefixtures('cleandir') +def test_run_trivial_ensemble(spc_water_box, caplog): + from mpi4py import MPI + current_rank = MPI.COMM_WORLD.Get_rank() + with caplog.at_level(logging.DEBUG): + caplog.handler.setFormatter(formatter) + with caplog.at_level(logging.WARNING, 'gmxapi'), \ + caplog.at_level(logging.DEBUG, 'gmxapi.mdrun'), \ + caplog.at_level(logging.DEBUG, 'gmxapi.modify_input'), \ + caplog.at_level(logging.DEBUG, 'gmxapi.read_tpr'), \ + caplog.at_level(logging.DEBUG, 'gmxapi.simulation'): + + tpr_filename = spc_water_box + ensemble_width = 2 + md = gmx.mdrun([tpr_filename] * ensemble_width) + assert md.output.ensemble_width == ensemble_width + md.run() + + output_directory = md.output._work_dir.result() + logging.info('output_directory result: {}'.format(str(output_directory))) + assert len(output_directory) == 2 + + # Note that the 'cleandir' test fixture will clean up the output directory on + # other ranks, so only check the current rank. + assert output_directory[0] != output_directory[1] + assert os.path.exists(output_directory[current_rank]) -- 2.22.0