Fix typo.
[alexxy/gromacs.git] / python_packaging / src / gmxapi / simulation / mdrun.py
index 370668a1366229484c360ccf0a24c93f50f4a441..dae0c1effe3073ffb2a40dbaf6b611cd54d6d8e6 100644 (file)
@@ -1,7 +1,7 @@
 #
 # This file is part of the GROMACS molecular simulation package.
 #
-# Copyright (c) 2019, by the GROMACS development team, led by
+# Copyright (c) 2019,2020,2021, by the GROMACS development team, led by
 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
 # and including many others, as listed in the AUTHORS file in the
 # top-level source directory and at http://www.gromacs.org.
@@ -64,32 +64,36 @@ logger = root_logger.getChild('mdrun')
 logger.info('Importing {}'.format(__name__))
 
 
-class OutputDataProxy(_op.DataProxyBase,
-                      descriptors={
-                          '_work_dir': _op.OutputDataDescriptor('_work_dir', str)}):
-    """Implement the 'output' attribute of MDRun operations."""
-
+# Output in the gmxapi.operation Context.
+# TODO: Consider using a single base class for the DataProxy, but have distinct
+#  data descriptor behavior (or different descriptor implementations in different
+#  subclasses) so that static code inspection can more easily determine the
+#  attributes of the data proxies.
+_output_descriptors = (
+    _op.OutputDataDescriptor('_work_dir', str),
+    _op.OutputDataDescriptor('trajectory', str),
+    _op.OutputDataDescriptor('parameters', dict)
+)
+_publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
+                           _output_descriptors}
+_output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
+                                             _output_descriptors})
 
-class Publisher(gmxapi.operation.Publisher):
-    """Implement the publishing data descriptor for MDRun output."""
 
-    def __set__(self, instance: 'PublishingDataProxy', value):
-        super().__set__(instance, value)
+class OutputDataProxy(_op.DataProxyBase,
+                      descriptors=_output_descriptors):
+    """Implement the 'output' attribute of MDRun operations."""
 
 
 class PublishingDataProxy(_op.DataProxyBase,
-                          descriptors={
-                              '_work_dir': Publisher('_work_dir', str)}
-                          ):
+                          descriptors=_publishing_descriptors):
     """Manage output resource updates for MDRun operation."""
 
 
-#
-# Helpers
-#
+_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
+                                    output_description=_output,
+                                    publishing_data_proxy=PublishingDataProxy)
 
-# Output in the gmxapi.operation Context.
-_output = _op.OutputCollectionDescription(_work_dir=str)
 
 # Input in the gmxapi.operation Context for the dispatching runner.
 # The default empty dictionary for parameters just means that there are no overrides
@@ -112,11 +116,6 @@ def _standard_node_resource_factory(*args, **kwargs):
     return source_collection
 
 
-_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
-                                    output_description=_output,
-                                    publishing_data_proxy=PublishingDataProxy)
-
-
 @contextmanager
 def scoped_communicator(original_comm, requested_size: int = None):
     from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator
@@ -178,6 +177,7 @@ class LegacyImplementationSubscription(object):
         workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
                                                  member=member)
                         for member in range(ensemble_width)]
+        parameters_dict_list = [{}] * ensemble_width
 
         # This is a reasonable place to start using MPI ensemble implementation details.
         # We will want better abstraction in the future, but it is best if related filesystem
@@ -216,7 +216,7 @@ class LegacyImplementationSubscription(object):
                             # It is unspecified by the API, but at least through gmxapi 0.1,
                             # all simulations are initialized with a checkpoint file named state.cpt
                             # (see src/api/cpp/context.cpp)
-                            checkpoint_file = os.path.join(self.workdir, 'state.cpp')
+                            checkpoint_file = os.path.join(self.workdir, 'state.cpt')
                             expected_working_files.append(checkpoint_file)
 
                             for file in expected_working_files:
@@ -242,15 +242,18 @@ class LegacyImplementationSubscription(object):
                         fileio.write_tpr_file(output=tprfile, input=sim_input)
                     logger.info('Created {} on rank {}'.format(tprfile, context_rank))
 
-                    # Gather the actual working directories from the ensemble members.
+                    # Gather the actual outputs from the ensemble members.
                     if hasattr(ensemble_comm, 'allgather'):
                         # We should not assume that abspath expands the same on different MPI ranks.
                         workdir_list = ensemble_comm.allgather(self.workdir)
                         tpr_filenames = ensemble_comm.allgather(tprfile)
+                        parameters = fileio.read_tpr(tprfile).parameters.extract()
+                        parameters_dict_list = ensemble_comm.allgather(parameters)
                     else:
                         workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
                         # TODO: If we use better input file names, they need to be updated in multiple places.
                         tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
+                        parameters_dict_list = [fileio.read_tpr(tprfile).parameters.extract() for tprfile in tpr_filenames]
 
                     logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
                                                                                              workdir_list))
@@ -258,7 +261,11 @@ class LegacyImplementationSubscription(object):
                                                                       ensemble_rank,
                                                                       self.workdir
                                                                       ))
-                    work = workflow.from_tpr(tpr_filenames)
+                    # TODO: (#3718) Normalize the way we pass run time parameters to mdrun.
+                    kwargs = getattr(resource_manager, 'mdrun_kwargs', {})
+                    for key, value in kwargs.items():
+                        logger.debug('Adding mdrun run time argument: {}'.format(key + '=' + str(value)))
+                    work = workflow.from_tpr(tpr_filenames, **kwargs)
                     self.workspec = work.workspec
                     context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
                     self.simulation_module_context = context
@@ -275,6 +282,7 @@ class LegacyImplementationSubscription(object):
             # end scoped_communicator: context_comm
 
         self.workdir = workdir_list
+        self.parameters = parameters_dict_list
 
 
 class SubscriptionSessionResources(object):
@@ -295,18 +303,7 @@ class SubscriptionSessionResources(object):
         # if member_id is None:
         #     member_id = 0
         self.workdir = input.workdir[member_id]
-
-    # @staticmethod
-    # def create(input: LegacyImplementationSubscription,
-    #  output: PublishingDataProxy) -> 'SubscriptionSessionResources':
-    #     """Factory function to create new resource instances.
-    #
-    #     It is better to overload a factory member than to write a fancy __init__.
-    #     An alternative to overloading is to make this a classmethod for polymorphic
-    #     creation. Note that Python classes (can) know what their subclasses are.
-    #     """
-    #     if isinstance(input, LegacyImplementationSubscription):
-    #         return _session_resource_factory(input, output)
+        self.parameters = input.parameters[member_id]
 
 
 class SubscriptionPublishingRunner(object):
@@ -322,6 +319,14 @@ class SubscriptionPublishingRunner(object):
         """Operation implementation in the gmxapi.operation module context."""
         publisher = self.resources.output
         publisher._work_dir = self.resources.workdir
+        publisher.parameters = self.resources.parameters
+        # TODO: Make the return value a trajectory handle rather than a file path.
+        # TODO: Decide how to handle append vs. noappend output.
+        # TODO: More rigorous handling of the trajectory file(s).
+        # We have no way to query the name of the trajectory produced, and we
+        # have avoided exposing the ability to specify it, so we have to assume
+        # the GROMACS default output file name ('traj.trr') in the working directory.
+        publisher.trajectory = os.path.join(self.resources.workdir, 'traj.trr')
 
 
 _next_uid = 0