logger.info('Importing {}'.format(__name__))
-class OutputDataProxy(_op.DataProxyBase,
- descriptors={
- '_work_dir': _op.OutputDataDescriptor('_work_dir', str)}):
- """Implement the 'output' attribute of MDRun operations."""
-
+# Output in the gmxapi.operation Context.
+# TODO: Consider using a single base class for the DataProxy, but have distinct
+# data descriptor behavior (or different descriptor implementations in different
+# subclasses) so that static code inspection can more easily determine the
+# attributes of the data proxies.
+_output_descriptors = (
+ _op.OutputDataDescriptor('_work_dir', str),
+)
+_publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
+ _output_descriptors}
+_output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
+ _output_descriptors})
-class Publisher(gmxapi.operation.Publisher):
- """Implement the publishing data descriptor for MDRun output."""
- def __set__(self, instance: 'PublishingDataProxy', value):
- super().__set__(instance, value)
+class OutputDataProxy(_op.DataProxyBase,
+ descriptors=_output_descriptors):
+ """Implement the 'output' attribute of MDRun operations."""
class PublishingDataProxy(_op.DataProxyBase,
- descriptors={
- '_work_dir': Publisher('_work_dir', str)}
- ):
+ descriptors=_publishing_descriptors):
"""Manage output resource updates for MDRun operation."""
-#
-# Helpers
-#
+_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
+ output_description=_output,
+ publishing_data_proxy=PublishingDataProxy)
-# Output in the gmxapi.operation Context.
-_output = _op.OutputCollectionDescription(_work_dir=str)
# Input in the gmxapi.operation Context for the dispatching runner.
# The default empty dictionary for parameters just means that there are no overrides
return source_collection
-_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
- output_description=_output,
- publishing_data_proxy=PublishingDataProxy)
-
-
@contextmanager
def scoped_communicator(original_comm, requested_size: int = None):
from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator
ensemble_rank,
self.workdir
))
+ # TODO: We have not exposed the ability to pass any run time parameters to mdrun.
work = workflow.from_tpr(tpr_filenames)
self.workspec = work.workspec
context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
# member_id = 0
self.workdir = input.workdir[member_id]
- # @staticmethod
- # def create(input: LegacyImplementationSubscription,
- # output: PublishingDataProxy) -> 'SubscriptionSessionResources':
- # """Factory function to create new resource instances.
- #
- # It is better to overload a factory member than to write a fancy __init__.
- # An alternative to overloading is to make this a classmethod for polymorphic
- # creation. Note that Python classes (can) know what their subclasses are.
- # """
- # if isinstance(input, LegacyImplementationSubscription):
- # return _session_resource_factory(input, output)
-
class SubscriptionPublishingRunner(object):
"""Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""