Allow gmxapi.mdrun to properly dispatch simulation ensembles.
author	M. Eric Irrgang <ericirrgang@gmail.com>
Mon, 9 Sep 2019 15:08:52 +0000 (18:08 +0300)
committer	Eric Irrgang <ericirrgang@gmail.com>
Wed, 11 Sep 2019 14:33:44 +0000 (16:33 +0200)
Update the gmxapi.simulation.mdrun implementation to be more normative.
Dispatch the simulation work to the gmxapi 0.0.7 implementation in
gmxapi.simulation.workflow and gmxapi.simulation.context.

Change-Id: Id223507fc064e178992bc87d6fcf89bdd4029523

python_packaging/src/gmxapi/simulation/abc.py [new file with mode: 0644]
python_packaging/src/gmxapi/simulation/mdrun.py
python_packaging/src/test/test_mdrun.py

diff --git a/python_packaging/src/gmxapi/simulation/abc.py b/python_packaging/src/gmxapi/simulation/abc.py
new file mode 100644 (file)
index 0000000..075a62e
--- /dev/null
@@ -0,0 +1,56 @@
+#
+# This file is part of the GROMACS molecular simulation package.
+#
+# Copyright (c) 2019, by the GROMACS development team, led by
+# Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
+# and including many others, as listed in the AUTHORS file in the
+# top-level source directory and at http://www.gromacs.org.
+#
+# GROMACS is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public License
+# as published by the Free Software Foundation; either version 2.1
+# of the License, or (at your option) any later version.
+#
+# GROMACS is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with GROMACS; if not, see
+# http://www.gnu.org/licenses, or write to the Free Software Foundation,
+# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
+#
+# If you want to redistribute modifications to GROMACS, please
+# consider that scientific software is very special. Version
+# control is crucial - bugs must be traceable. We will be happy to
+# consider code for inclusion in the official distribution, but
+# derived work must not be called official GROMACS. Details are found
+# in the README & COPYING files - if they are missing, get the
+# official version at http://www.gromacs.org.
+#
+# To help us fund GROMACS development, we humbly ask that you cite
+# the research papers on the package. Check out http://www.gromacs.org.
+
+"""
+Abstract base classes for gmxapi.simulation module
+==================================================
+
+These classes allow static and dynamic type checking for gmxapi Python
+interfaces. Some may be used as bases to inherit behaviors, but users should
+not assume that a gmxapi object actually inherits from classes in this module.
+
+For more information on the concept of abstract base classes in Python, refer
+to https://docs.python.org/3/library/abc.html
+
+For more on type hinting, see https://docs.python.org/3/library/typing.html
+"""
+
+import abc
+
+
+class ModuleObject(abc.ABC):
+    """Extended interface for objects in the gmxapi.simulation module.
+
+    Implies availability of additional binding details.
+    """
index 876ab8343043e6e6d338b4504077f8be829d1dc4..370668a1366229484c360ccf0a24c93f50f4a441 100644 (file)
@@ -39,52 +39,493 @@ not use the Python Context resource manager. It uses either the legacy 0.0.7
 Context or its own Context, also implemented in this module.
 """
 
+__all__ = ['mdrun']
+
+import inspect
+import os
 import typing
+from contextlib import contextmanager
 
+import gmxapi
+import gmxapi.abc
+import gmxapi.operation as _op
 from gmxapi import exceptions
-from gmxapi.operation import AbstractOperation
-from gmxapi.operation import function_wrapper
+
 # The following imports are not marked as public API.
-# TODO: Resolve public API and exposure.
+from .abc import ModuleObject
+
+from . import fileio
 from . import workflow
 
+# Initialize module-level logger
+from gmxapi import logger as root_logger
 
-def mdrun_dispatcher(context, *, input, label: str = None, **kwargs) -> typing.Type['AbstractOperation']:
-    """Dispatch to an appropriate director based on the context and input.
+logger = root_logger.getChild('mdrun')
+logger.info('Importing {}'.format(__name__))
 
-    Runs appropriate director code to set up an operation, returning a handle to
-    a simulation operation node.
 
-    Arguments:
-        context: Execution context in which to attempt to instantiate an operation from input
-        input: operation input. Argument type is context dependent.
-        label: human readable tag for the work node to be created.
+class OutputDataProxy(_op.DataProxyBase,
+                      descriptors={
+                          '_work_dir': _op.OutputDataDescriptor('_work_dir', str)}):
+    """Implement the 'output' attribute of MDRun operations."""
 
-    Additional key word arguments are passed on to the dispatched factory.
 
-    """
-    # TODO: This is a legacy import that should be updated.
-    from .context import get_context
-    if label is not None:
-        raise exceptions.NotImplementedError('sorry... no labels yet')
-    try:
-        legacy_context = get_context(work=input)
-    except Exception:
-        legacy_context = None
-
-    def run_session():
-        with legacy_context as session:
-            session.run()
-        return True
-
-    if context is not None and context is legacy_context:
-        helper = function_wrapper(output={'data': bool})(run_session)
-        return helper(**kwargs)
+class Publisher(gmxapi.operation.Publisher):
+    """Implement the publishing data descriptor for MDRun output."""
+
+    def __set__(self, instance: 'PublishingDataProxy', value):
+        super().__set__(instance, value)
+
+
+class PublishingDataProxy(_op.DataProxyBase,
+                          descriptors={
+                              '_work_dir': Publisher('_work_dir', str)}
+                          ):
+    """Manage output resource updates for MDRun operation."""
+
+
+#
+# Helpers
+#
+
+# Output in the gmxapi.operation Context.
+_output = _op.OutputCollectionDescription(_work_dir=str)
+
+# Input in the gmxapi.operation Context for the dispatching runner.
+# The default empty dictionary for parameters just means that there are no overrides
+# to the parameters already provided in _simulation_input.
+_input = _op.InputCollectionDescription(
+    [('_simulation_input', inspect.Parameter('_simulation_input',
+                                             inspect.Parameter.POSITIONAL_OR_KEYWORD,
+                                             annotation=str)),
+     ('parameters', inspect.Parameter('parameters',
+                                      inspect.Parameter.POSITIONAL_OR_KEYWORD,
+                                      annotation=dict,
+                                      default=dict()))
+     ])
+
+
+def _standard_node_resource_factory(*args, **kwargs):
+    """Translate Python UI input to the gmxapi.operation node builder inputs."""
+    source_collection = _input.bind(*args, **kwargs)
+    logger.info('mdrun input bound as source collection {}'.format(source_collection))
+    return source_collection
+
+
+_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
+                                    output_description=_output,
+                                    publishing_data_proxy=PublishingDataProxy)
+
+
+@contextmanager
+def scoped_communicator(original_comm, requested_size: int = None):
+    from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator
+
+    if requested_size is None:
+        communicator = _acquire_communicator(communicator=original_comm)
+
     else:
-        raise exceptions.ValueError('Could not dispatch MD input {} with context {}'.format(input, legacy_context))
+        if original_comm is None or not hasattr(original_comm, 'Get_size'):
+            raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
+        original_comm_size = original_comm.Get_size()
+        if original_comm_size < requested_size:
+            raise exceptions.FeatureNotAvailableError(
+                'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
+                    requested_size,
+                    original_comm_size
+                ))
+        assert original_comm_size >= requested_size
+        communicator = _get_ensemble_communicator(original_comm, requested_size)
+
+    try:
+        yield communicator
+    finally:
+        communicator.Free()
+
+
+class LegacyImplementationSubscription(object):
+    """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.
+
+    This input resource is a subscription to work that is dispatched to a sub-context.
+    The resource can be created from the standard data of the simulation module.
+    """
+
+    def __init__(self, resource_manager: _op.ResourceManager):
+        from .context import Context as LegacyContext
+        import gmxapi._gmxapi as _gmxapi
+        self._gmxapi = _gmxapi
+
+        assert isinstance(resource_manager, _op.ResourceManager)
+        # We use several details of the gmxapi.operation.Context.ResourceManager.
+        # These dependencies can evolve into the subscription protocol between Contexts.
+
+        # Configure and run a gmxapi 0.0.7 session.
+        # 0. Determine ensemble width.
+        # 1. Choose, create/check working directories.
+        # 2. Create source TPR.
+        # 3. Create workspec.
+        # 3.5 Add plugin potentials, if any.
+        # 4. Run.
+        # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.
+
+        # Determine ensemble width
+        ensemble_width = resource_manager.ensemble_width
+
+        # Choose working directories
+        # TODO: operation working directory naming scheme should be centrally well-defined.
+        # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
+        # so we cannot create the input file path in the working directory based on WorkSpec.uid.
+        workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
+                                                 member=member)
+                        for member in range(ensemble_width)]
+
+        # This is a reasonable place to start using MPI ensemble implementation details.
+        # We will want better abstraction in the future, but it is best if related filesystem
+        # accesses occur from the same processes, consistently. Note that we already
+        # handle some optimization and dependency reduction when the ensemble size is 1.
+        # TODO: multithread and multiprocess alternatives to MPI ensemble management.
+
+        # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
+        with scoped_communicator(None) as context_comm:
+            context_rank = context_comm.Get_rank()
+            with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
+                # Note that in the current implementation, extra ranks have nothing to do,
+                # but they may have a dummy communicator, so be sure to skip those members
+                # of the context_comm.
+                if context_rank < ensemble_width:
+                    assert ensemble_comm.Get_size() == ensemble_width
+                    ensemble_rank = ensemble_comm.Get_rank()
+                    # TODO: This should be a richer object that includes at least host information
+                    #  and preferably the full gmxapi Future interface.
+                    self.workdir = os.path.abspath(workdir_list[ensemble_rank])
+
+                    with resource_manager.local_input(member=ensemble_rank) as input_pack:
+                        source_file = input_pack.kwargs['_simulation_input']
+                        parameters = input_pack.kwargs['parameters']
+                        # If there are any other key word arguments to process from the gmxapi.mdrun
+                        # factory call, do it here.
+
+                    # TODO: We should really name this file with a useful input-dependent tag.
+                    tprfile = os.path.join(self.workdir, 'topol.tpr')
+
+                    expected_working_files = [tprfile]
+
+                    if os.path.exists(self.workdir):
+                        if os.path.isdir(self.workdir):
+                            # Confirm that this is a restarted simulation.
+                            # It is unspecified by the API, but at least through gmxapi 0.1,
+                            # all simulations are initialized with a checkpoint file named state.cpt
+                            # (see src/api/cpp/context.cpp)
+                            checkpoint_file = os.path.join(self.workdir, 'state.cpt')
+                            expected_working_files.append(checkpoint_file)
+
+                            for file in expected_working_files:
+                                if not os.path.exists(file):
+                                    raise exceptions.ApiError(
+                                        'Cannot determine working directory state: {}'.format(self.workdir))
+                        else:
+                            raise exceptions.ApiError(
+                                'Chosen working directory path exists but is not a directory: {}'.format(self.workdir))
+                    else:
+                        # Build the working directory and input files.
+                        os.mkdir(self.workdir)
+                        sim_input = fileio.read_tpr(source_file)
+                        for key, value in parameters.items():
+                            try:
+                                sim_input.parameters.set(key=key, value=value)
+                            except _gmxapi.Exception as e:
+                                raise exceptions.ApiError(
+                                    'Bug encountered. Unknown error when trying to set simulation '
+                                    'parameter {} to {}'.format(key, value)
+                                ) from e
+
+                        fileio.write_tpr_file(output=tprfile, input=sim_input)
+                    logger.info('Created {} on rank {}'.format(tprfile, context_rank))
+
+                    # Gather the actual working directories from the ensemble members.
+                    if hasattr(ensemble_comm, 'allgather'):
+                        # We should not assume that abspath expands the same on different MPI ranks.
+                        workdir_list = ensemble_comm.allgather(self.workdir)
+                        tpr_filenames = ensemble_comm.allgather(tprfile)
+                    else:
+                        workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
+                        # TODO: If we use better input file names, they need to be updated in multiple places.
+                        tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
+
+                    logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
+                                                                                             workdir_list))
+                    logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,
+                                                                      ensemble_rank,
+                                                                      self.workdir
+                                                                      ))
+                    work = workflow.from_tpr(tpr_filenames)
+                    self.workspec = work.workspec
+                    context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
+                    self.simulation_module_context = context
+                    # Go ahead and execute immediately. No need for lazy initialization in this basic case.
+                    with self.simulation_module_context as session:
+                        session.run()
+                        # TODO: There may be some additional results that we need to extract...
+                    # end: if context_rank < ensemble_width
+
+                # end scoped_communicator: ensemble_comm
+
+            if context_comm.Get_size() > 1:
+                workdir_list = context_comm.bcast(workdir_list, root=0)
+            # end scoped_communicator: context_comm
+
+        self.workdir = workdir_list
+
+
+class SubscriptionSessionResources(object):
+    """Input and output run-time resources for a MDRun subscription.
+
+    A generalization of this class is the probably the main hook for customizing the resources
+    provided to the operation at run time.
+
+    .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
+    """
+
+    def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
+        assert isinstance(input, LegacyImplementationSubscription)
+        assert isinstance(output, PublishingDataProxy)
+        self.output = output
+        member_id = self.output._client_identifier
+        # Before enabling the following, be sure we understand what is happening.
+        # if member_id is None:
+        #     member_id = 0
+        self.workdir = input.workdir[member_id]
+
+    # @staticmethod
+    # def create(input: LegacyImplementationSubscription,
+    #  output: PublishingDataProxy) -> 'SubscriptionSessionResources':
+    #     """Factory function to create new resource instances.
+    #
+    #     It is better to overload a factory member than to write a fancy __init__.
+    #     An alternative to overloading is to make this a classmethod for polymorphic
+    #     creation. Note that Python classes (can) know what their subclasses are.
+    #     """
+    #     if isinstance(input, LegacyImplementationSubscription):
+    #         return _session_resource_factory(input, output)
+
 
+class SubscriptionPublishingRunner(object):
+    """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""
+    input_resource_factory = LegacyImplementationSubscription
 
-def mdrun(input=None) -> typing.Type['AbstractOperation']:
+    def __init__(self, resources: SubscriptionSessionResources):
+        assert isinstance(resources, SubscriptionSessionResources)
+        # Note that the resources contain a reference to a simulation ensemble that has already run.
+        self.resources = resources
+
+    def run(self):
+        """Operation implementation in the gmxapi.operation module context."""
+        publisher = self.resources.output
+        publisher._work_dir = self.resources.workdir
+
+
+_next_uid = 0
+
+
+def _make_uid(input) -> str:
+    # TODO: Use input fingerprint for more useful identification.
+    salt = hash(input)
+    global _next_uid
+    new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
+    _next_uid += 1
+    return new_uid
+
+
+#
+# Implementation
+#
+
+
+class ResourceManager(gmxapi.operation.ResourceManager):
+    """Manage resources for the MDRun operation in the gmxapi.operation contexts.
+
+    Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
+    Futures managed by this resource manager may contain additional attributes.
+    """
+
+    def future(self, name: str, description: _op.ResultDescription):
+        tpr_future = super().future(name=name, description=description)
+        return tpr_future
+
+    def data(self) -> OutputDataProxy:
+        return OutputDataProxy(self)
+
+    def update_output(self):
+        """Override gmxapi.operation.ResourceManager.update_output because we handle parallelism as 0.0.7."""
+        # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
+        # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
+        # TODO: Reimplement as the resource factory and director for the operation target context.
+        if not self.done():
+            self.__operation_entrance_counter += 1
+            if self.__operation_entrance_counter > 1:
+                raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
+            if not self.done():
+                # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
+                #  operation's library/implementation context.
+
+                ###
+                # Note: this is the resource translation from gmxapi.operation context
+                # to the dispatching runner director. It uses details of the gmxapi.operation.Context
+                # and of the operation.
+
+                # TODO: Dispatch/discover this resource factory from a canonical place.
+                assert hasattr(self._runner_director, 'input_resource_factory')
+                # Create on all ranks.
+                input = self._runner_director.input_resource_factory(self)
+                # End of action of the InputResourceDirector[Context, MdRunSubscription].
+                ###
+
+                # We are giving the director a resource that contains the subscription
+                # to the dispatched work.
+
+                publishing_resources = self.publishing_resources()
+                for member in range(self.ensemble_width):
+                    with publishing_resources(ensemble_member=member) as output:
+                        resources = self._resource_factory(input=input, output=output)
+                        runner = self._runner_director(resources)
+                        runner.run()
+
+
+class StandardInputDescription(_op.InputDescription):
+    """Provide the mdrun input description in gmxapi.operation Contexts."""
+
+    def signature(self) -> _op.InputCollectionDescription:
+        return _input
+
+    def make_uid(self, input: _op.DataEdge) -> str:
+        return _make_uid(input)
+
+
+class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
+    """Provide the gmxapi compatible mdrun implementation."""
+
+    # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
+    @classmethod
+    def name(self) -> str:
+        """Canonical name for the operation."""
+        return 'mdrun'
+
+    @classmethod
+    def namespace(self) -> str:
+        """mdrun is importable from the gmxapi module."""
+        return 'gmxapi'
+
+    @classmethod
+    def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
+        if isinstance(context, _op.Context):
+            return StandardDirector(context)
+
+
+class StandardOperationHandle(_op.AbstractOperation):
+    """Handle used in Python UI or gmxapi.operation Contexts."""
+
+    def __init__(self, resource_manager: ResourceManager):
+        self.__resource_manager = resource_manager
+
+    def run(self):
+        self.__resource_manager.update_output()
+
+    @property
+    def output(self) -> OutputDataProxy:
+        return self.__resource_manager.data()
+
+
+class StandardDirector(gmxapi.abc.OperationDirector):
+    """Direct the instantiation of an mdrun node in a gmxapi.operation Context.
+
+    .. todo:: Compose this behavior in a more generic class.
+
+    .. todo:: Describe where instances live.
+    """
+
+    def __init__(self, context: _op.Context):
+        if not isinstance(context, _op.Context):
+            raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
+        self.context = context
+
+    def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
+        builder = self.context.node_builder(operation=RegisteredOperation, label=label)
+
+        builder.set_resource_factory(SubscriptionSessionResources)
+        builder.set_input_description(StandardInputDescription())
+        builder.set_handle(StandardOperationHandle)
+        # The runner in the gmxapi.operation context is the servicer for the legacy context.
+        builder.set_runner_director(SubscriptionPublishingRunner)
+        builder.set_output_factory(_output_factory)
+        builder.set_resource_manager(ResourceManager)
+
+        # Note: we have not yet done any dispatching based on *resources*. We should
+        # translate the resources provided into the form that the Context can subscribe to
+        # using the dispatching resource_factory. In the second draft, this operation
+        # is dispatched to a SimulationModuleContext, which can be subscribed directly
+        # to sources that are either literal filenames or gmxapi.simulation sources,
+        # while standard Futures can be resolved in the standard context.
+        #
+        assert isinstance(resources, _op.DataSourceCollection)
+        for name, source in resources.items():
+            builder.add_input(name, source)
+
+        handle = builder.build()
+        assert isinstance(handle, StandardOperationHandle)
+        return handle
+
+    def handle_type(self, context: gmxapi.abc.Context):
+        return StandardOperationHandle
+
+    # Developer note: the Director instance is a convenient place to get a dispatching
+    # factory. The Director may become generic or more universal, but the resource_factory
+    # would likely not be typed on the generic parameters of the Director class.
+    # Instead, it is likely a generic function with its own TypeVar parameters.
+    def resource_factory(self,
+                         source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
+                         target: gmxapi.abc.Context = None):
+        # Distinguish between the UIContext, in which input is in the form
+        # of function call arguments, and the StandardContext, implemented in
+        # gmxapi.operation. UIContext is probably a virtual context that is
+        # asserted by callers in order to get a factory that normalizes UI input
+        # for the StandardContext.
+        #
+        if target is None:
+            target = self.context
+        if source is None:
+            if isinstance(target, _op.Context):
+                # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
+                return _standard_node_resource_factory
+        if isinstance(source, _op.Context):
+            return SubscriptionSessionResources
+        if isinstance(source, ModuleObject):
+            if isinstance(target, _op.Context):
+                # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
+                # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
+                # In the first draft, though, we just access a special payload.
+                # Return a factory that will consume *_simulation_input* and *parameters*
+                # members of a received object.
+                logger.info('Building mdrun operation from source {}'.format(source))
+
+                def simulation_input_workaround(input):
+                    source = input
+                    if hasattr(source, 'output'):
+                        source = input.output
+                    assert hasattr(source, '_simulation_input')
+                    assert hasattr(source, 'parameters')
+                    logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
+                                                                      source._simulation_input.description))
+                    source_collection = _input.bind(_simulation_input=source._simulation_input,
+                                                    parameters=source.parameters)
+                    logger.info('mdrun input bound as source collection {}'.format(source_collection))
+                    return source_collection
+
+                return simulation_input_workaround
+
+        raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
+
+
+def mdrun(input, label: str = None, context=None):
     """MD simulation operation.
 
     Arguments:
@@ -93,10 +534,10 @@ def mdrun(input=None) -> typing.Type['AbstractOperation']:
     Returns:
         runnable operation to perform the specified simulation
 
-    The returned object has a `run()` method to launch the simulation.
-    Otherwise, this operation does not yet support the gmxapi data flow model.
+    The *output* attribute of the returned operation handle contains dynamically
+    determined outputs from the operation.
 
-    `input` may be a TPR file name.
+    `input` may be a TPR file name or an object providing the SimulationInput interface.
 
     Note:
         New function names will be appearing to handle tasks that are separate
@@ -106,21 +547,33 @@ def mdrun(input=None) -> typing.Type['AbstractOperation']:
         "test_particle_insertion," "legacy_simulation" (do_md), or "simulation"
         composition (which may be leap-frog, vv, and other algorithms)
     """
-    # If input looks like classic filename input, use the gmxapi 0.0.7
-    # implementation.
-    from .context import get_context
-    try:
-        work = workflow.from_tpr(input)
-        assert work is not None
-        work_context = get_context(work)
-        assert work_context is not None
-    except exceptions.UsageError:
-        # Input was not gmxapi 0.0.7 input.
-        work = None
-        work_context = None
-
-    # TODO: Inspect input to determine context.
-    kwargs = {}
-    handle = mdrun_dispatcher(context=work_context, input=work, label=None, **kwargs)
+    handle_context = context
+    if handle_context is not None:
+        raise gmxapi.exceptions.NotImplementedError(
+            'context must be None. This factory is only for the Python UI right now.')
+
+    target_context = _op.current_context()
+    assert isinstance(target_context, _op.Context)
+    # Get a director that will create a node in the standard context.
+    node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
+    assert isinstance(node_director, StandardDirector)
+    # TODO: refine this protocol
+    assert handle_context is None
+    # Examine the input.
+    if isinstance(input, ModuleObject):
+        # The input is from read_tpr or modify_input.
+        source_context = input
+    else:
+        # Allow automatic dispatching
+        source_context = None
 
+    resource_factory = node_director.resource_factory(source=source_context, target=target_context)
+    resources = resource_factory(input)
+    handle = node_director(resources=resources, label=label)
+    # Note: One effect of the assertions above is to help the type checker infer
+    # the return type of the handle. It is hard to convince the type checker that
+    # the return value of the node builder is up-cast. We might be able to resolve
+    # this by making both get_operation_director and RegisteredOperation
+    # generics of the handle type, or using the handle type as the key for
+    # get_operation_director.
     return handle
index 71afc70a3e226ce4c8a9a085c5b68bb5ac1e60b0..9fade56ff5a92cb21741d266c3645936972a37a7 100644 (file)
@@ -49,18 +49,26 @@ import pytest
 import gmxapi as gmx
 from gmxapi.testsupport import withmpi_only
 
-gmx.logger.setLevel(logging.WARNING)
+# Configure the `logging` module before proceeding any further.
+gmx.logger.setLevel(logging.DEBUG)
 
-# Configure the `logging` module before and non-built-in packages start to use it.
-logging.getLogger().setLevel(logging.WARNING)
-# create console handler
-ch = logging.StreamHandler()
-ch.setLevel(logging.WARNING)
-# create formatter and add it to the handler
-formatter = logging.Formatter('%(asctime)s:%(name)s:%(levelname)s: %(message)s')
-ch.setFormatter(formatter)
-# add the handlers to the logger
-logging.getLogger().addHandler(ch)
+try:
+    from mpi4py import MPI
+    rank_number = MPI.COMM_WORLD.Get_rank()
+except ImportError:
+    rank_tag = ''
+    MPI = None
+else:
+    rank_tag = 'rank{}:'.format(rank_number)
+
+# Use this formatter to improve the caplog log records.
+formatter = logging.Formatter(rank_tag + '%(name)s:%(levelname)s: %(message)s')
+
+# For additional console logging, create and attach a stream handler.
+# For example:
+#    ch = logging.StreamHandler()
+#    ch.setFormatter(formatter)
+#    logging.getLogger().addHandler(ch)
 
 
 @pytest.mark.usefixtures('cleandir')
@@ -69,3 +77,33 @@ def test_run_from_tpr(spc_water_box):
 
     md = gmx.mdrun(spc_water_box)
     md.run()
+    # TODO: better handling of output on unused MPI ranks.
+
+
+@withmpi_only
+@pytest.mark.usefixtures('cleandir')
+def test_run_trivial_ensemble(spc_water_box, caplog):
+    from mpi4py import MPI
+    current_rank = MPI.COMM_WORLD.Get_rank()
+    with caplog.at_level(logging.DEBUG):
+        caplog.handler.setFormatter(formatter)
+        with caplog.at_level(logging.WARNING, 'gmxapi'), \
+                caplog.at_level(logging.DEBUG, 'gmxapi.mdrun'), \
+                caplog.at_level(logging.DEBUG, 'gmxapi.modify_input'), \
+                caplog.at_level(logging.DEBUG, 'gmxapi.read_tpr'), \
+                caplog.at_level(logging.DEBUG, 'gmxapi.simulation'):
+
+            tpr_filename = spc_water_box
+            ensemble_width = 2
+            md = gmx.mdrun([tpr_filename] * ensemble_width)
+            assert md.output.ensemble_width == ensemble_width
+            md.run()
+
+            output_directory = md.output._work_dir.result()
+            logging.info('output_directory result: {}'.format(str(output_directory)))
+            assert len(output_directory) == 2
+
+            # Note that the 'cleandir' test fixture will clean up the output directory on
+            # other ranks, so only check the current rank.
+            assert output_directory[0] != output_directory[1]
+            assert os.path.exists(output_directory[current_rank])