#
# This file is part of the GROMACS molecular simulation package.
#
-# Copyright (c) 2019, by the GROMACS development team, led by
+# Copyright (c) 2019,2020,2021, by the GROMACS development team, led by
# Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
# and including many others, as listed in the AUTHORS file in the
# top-level source directory and at http://www.gromacs.org.
logger.info('Importing {}'.format(__name__))
-class OutputDataProxy(_op.DataProxyBase,
- descriptors={
- '_work_dir': _op.OutputDataDescriptor('_work_dir', str)}):
- """Implement the 'output' attribute of MDRun operations."""
-
+# Output in the gmxapi.operation Context.
+# TODO: Consider using a single base class for the DataProxy, but have distinct
+# data descriptor behavior (or different descriptor implementations in different
+# subclasses) so that static code inspection can more easily determine the
+# attributes of the data proxies.
+_output_descriptors = (
+ _op.OutputDataDescriptor('_work_dir', str),
+ _op.OutputDataDescriptor('trajectory', str),
+ _op.OutputDataDescriptor('parameters', dict)
+)
+_publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
+ _output_descriptors}
+_output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
+ _output_descriptors})
-class Publisher(gmxapi.operation.Publisher):
- """Implement the publishing data descriptor for MDRun output."""
- def __set__(self, instance: 'PublishingDataProxy', value):
- super().__set__(instance, value)
+class OutputDataProxy(_op.DataProxyBase,
+ descriptors=_output_descriptors):
+ """Implement the 'output' attribute of MDRun operations."""
class PublishingDataProxy(_op.DataProxyBase,
- descriptors={
- '_work_dir': Publisher('_work_dir', str)}
- ):
+ descriptors=_publishing_descriptors):
"""Manage output resource updates for MDRun operation."""
-#
-# Helpers
-#
+_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
+ output_description=_output,
+ publishing_data_proxy=PublishingDataProxy)
-# Output in the gmxapi.operation Context.
-_output = _op.OutputCollectionDescription(_work_dir=str)
# Input in the gmxapi.operation Context for the dispatching runner.
# The default empty dictionary for parameters just means that there are no overrides
return source_collection
-_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
- output_description=_output,
- publishing_data_proxy=PublishingDataProxy)
-
-
@contextmanager
def scoped_communicator(original_comm, requested_size: int = None):
from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator
workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
member=member)
for member in range(ensemble_width)]
+ parameters_dict_list = [{}] * ensemble_width
# This is a reasonable place to start using MPI ensemble implementation details.
# We will want better abstraction in the future, but it is best if related filesystem
# It is unspecified by the API, but at least through gmxapi 0.1,
# all simulations are initialized with a checkpoint file named state.cpt
# (see src/api/cpp/context.cpp)
- checkpoint_file = os.path.join(self.workdir, 'state.cpp')
+ checkpoint_file = os.path.join(self.workdir, 'state.cpt')
expected_working_files.append(checkpoint_file)
for file in expected_working_files:
fileio.write_tpr_file(output=tprfile, input=sim_input)
logger.info('Created {} on rank {}'.format(tprfile, context_rank))
- # Gather the actual working directories from the ensemble members.
+ # Gather the actual outputs from the ensemble members.
if hasattr(ensemble_comm, 'allgather'):
# We should not assume that abspath expands the same on different MPI ranks.
workdir_list = ensemble_comm.allgather(self.workdir)
tpr_filenames = ensemble_comm.allgather(tprfile)
+ parameters = fileio.read_tpr(tprfile).parameters.extract()
+ parameters_dict_list = ensemble_comm.allgather(parameters)
else:
workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
# TODO: If we use better input file names, they need to be updated in multiple places.
tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
+ parameters_dict_list = [fileio.read_tpr(tprfile).parameters.extract() for tprfile in tpr_filenames]
logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
workdir_list))
ensemble_rank,
self.workdir
))
- work = workflow.from_tpr(tpr_filenames)
+ # TODO: (#3718) Normalize the way we pass run time parameters to mdrun.
+ kwargs = getattr(resource_manager, 'mdrun_kwargs', {})
+ for key, value in kwargs.items():
+ logger.debug('Adding mdrun run time argument: {}'.format(key + '=' + str(value)))
+ work = workflow.from_tpr(tpr_filenames, **kwargs)
self.workspec = work.workspec
context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
self.simulation_module_context = context
# end scoped_communicator: context_comm
self.workdir = workdir_list
+ self.parameters = parameters_dict_list
class SubscriptionSessionResources(object):
# if member_id is None:
# member_id = 0
self.workdir = input.workdir[member_id]
-
- # @staticmethod
- # def create(input: LegacyImplementationSubscription,
- # output: PublishingDataProxy) -> 'SubscriptionSessionResources':
- # """Factory function to create new resource instances.
- #
- # It is better to overload a factory member than to write a fancy __init__.
- # An alternative to overloading is to make this a classmethod for polymorphic
- # creation. Note that Python classes (can) know what their subclasses are.
- # """
- # if isinstance(input, LegacyImplementationSubscription):
- # return _session_resource_factory(input, output)
+ self.parameters = input.parameters[member_id]
class SubscriptionPublishingRunner(object):
"""Operation implementation in the gmxapi.operation module context."""
publisher = self.resources.output
publisher._work_dir = self.resources.workdir
+ publisher.parameters = self.resources.parameters
+ # TODO: Make the return value a trajectory handle rather than a file path.
+ # TODO: Decide how to handle append vs. noappend output.
+ # TODO: More rigorous handling of the trajectory file(s)
+ # We have no way to query the name of the trajectory produced, and we
+ # have avoided exposing the ability to specify it, so we have to assume
+ # GROMACS default behavior.
+ publisher.trajectory = os.path.join(self.resources.workdir, 'traj.trr')
_next_uid = 0