Python wrapping code for gmxapi mdrun bindings.
[alexxy/gromacs.git] / python_packaging / src / gmxapi / simulation / workflow.py
index 0e504f28bd0bdfb387fda84a643c207393530c13..91e4a77fa6a39ba96081cc144bd8c2643fdced34 100644 (file)
+#
+# This file is part of the GROMACS molecular simulation package.
+#
+# Copyright (c) 2019, by the GROMACS development team, led by
+# Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
+# and including many others, as listed in the AUTHORS file in the
+# top-level source directory and at http://www.gromacs.org.
+#
+# GROMACS is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public License
+# as published by the Free Software Foundation; either version 2.1
+# of the License, or (at your option) any later version.
+#
+# GROMACS is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with GROMACS; if not, see
+# http://www.gnu.org/licenses, or write to the Free Software Foundation,
+# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
+#
+# If you want to redistribute modifications to GROMACS, please
+# consider that scientific software is very special. Version
+# control is crucial - bugs must be traceable. We will be happy to
+# consider code for inclusion in the official distribution, but
+# derived work must not be called official GROMACS. Details are found
+# in the README & COPYING files - if they are missing, get the
+# official version at http://www.gromacs.org.
+#
+# To help us fund GROMACS development, we humbly ask that you cite
+# the research papers on the package. Check out http://www.gromacs.org.
+#
+# This file is based on the Kasson Lab gmxapi project release 0.0.7.4.
+# https://github.com/kassonlab/gmxapi/blob/v0.0.7.4/src/gmx/workflow.py
+# # https://github.com/kassonlab/gmxapi/blob/v0.0.7.4/LICENSE
 """
 Provide workflow-level utilities and classes
 ============================================
 
-Single-sim example:
-
-.. code-block:: python
-
-    >>> md = gmx.workflow.from_tpr(filename)
-    >>> gmx.run(md)
-    >>>
-    >>> # The above is shorthand for
-    >>> md = gmx.workflow.from_tpr(filename)
-    >>> with gmx.get_context(md.workspec) as session:
-    ...    session.run()
-
-Array sim example:
-
-.. code-block:: python
-
-    >>> md = gmx.workflow.from_tpr([filename1, filename2])
-    >>> gmx.run(md)
-
-The representation of work and the way it is dispatched are areas of active
-development. See also https://github.com/kassonlab/gmxapi/milestone/3
+Supports the implementation of operations in the gmxapi.simulation module.
 """
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-__all__ = ['WorkSpec', 'WorkElement']
+__all__ = ['from_tpr', 'WorkSpec', 'WorkElement']
 
+import collections
 import warnings
 import weakref
+from typing import Text, Iterable, Set
 
-from gmx import exceptions
-from gmx import logging
-from gmx.util import to_string
-from gmx.util import to_utf8
+import gmxapi as gmx
+from gmxapi import exceptions
 
 # Module-level logger
-logger = logging.getLogger(__name__)
+logger = gmx.logger.getChild(__name__)
 logger.info('Importing gmx.workflow')
 
 # Work specification version string.
 workspec_version = "gmxapi_workspec_0_1"
 logger.info("Using schema version {}.".format(workspec_version))
 
-# module-level constant indicating a workflow implementing parallel array work.
-# ARRAY = 0
+
+def to_utf8(input) -> bytes:
+    """Return a utf8 encoded byte sequence of the Unicode ``input`` or its string representation.
+
+    Returns:
+         :py:bytes byte sequence.
+    """
+    if isinstance(input, str):
+        value = input.encode('utf-8')
+    elif isinstance(input, bytes):
+        value = input
+    else:
+        try:
+            string = str(input)
+            value = string.encode('utf-8')
+        except Exception as e:
+            raise exceptions.ValueError("Input cannot be interpreted as a UTF-8 compatible string.") from e
+    return value
+
+
+def to_string(input) -> str:
+    """Return a Unicode string representation of ``input``.
+
+    If ``input`` or its string representation is not already a Unicode object, attempt to decode as utf-8.
+
+    Returns a native string, decoding utf-8 encoded byte sequences if necessary.
+    """
+    if isinstance(input, str):
+        value = input
+    else:
+        try:
+            value = input.decode('utf-8')
+        except Exception:
+            try:
+                value = str(input)
+            except Exception as e:
+                raise exceptions.ValueError("Cannot find a string representation of input.") from e
+    return value
 
 
 class GmxMap(dict):
     """Utility/compatibility class to ensure consistent keys.
 
-    Allow subscripting to use native str or Python 2 Unicode objects.
     Internally, converts all keys to native str for the current interpreter.
     """
     def keys(self):
@@ -163,7 +210,7 @@ class WorkSpec(object):
         # not own the context.
         self.__context_weak_ref = weakref.ref(context)
 
-    def _chase_deps(self, source_set, name_list):
+    def __chase_deps(self, source_set: Set[str], name_list: Iterable[Text]):
         """Helper to recursively generate dependencies before dependents.
 
         Given a set of WorkElement objects and a list of element names, generate WorkElements for
@@ -172,13 +219,13 @@ class WorkSpec(object):
 
         For example, to sequence an entire work specification into a reasonable order for instantiation, use
 
-            >>> workspec._chase_deps(set(workspec.elements.keys()), list(workspec.elements.keys()))
+            >>> workspec.__chase_deps(set(workspec.elements.keys()), list(workspec.elements.keys()))
 
         Note: as a member function of WorkSpec, we have access to the full WorkSpec elements data at all
         times, giving us extra flexibility in implementation and arguments.
 
         Args:
-            sources: a (super)set of element names from the current work spec (will be consumed)
+            source_set: a (super)set of element names from the current work spec (will be consumed)
             name_list: subset of *sources* to be sequenced
 
         Returns:
@@ -188,8 +235,9 @@ class WorkSpec(object):
         they depend are all named in *source_list* and available in the current
         work spec.
 
-        Note: *source_set* is a reference to an object that is modified arbitrarily.
+        Warning: *source_set* is a reference to an object that is modified arbitrarily.
         The caller should not re-use the object after calling _chase_deps().
+        (Make a copy first, if needed.)
 
         TODO: Separate out DAG topology operations from here and Context.__enter__()
         Our needs are simple enough that we probably don't need an external dependency
@@ -197,13 +245,14 @@ class WorkSpec(object):
         """
         # Recursively (depth-first) generate a topologically valid serialized DAG from source_set.
         assert isinstance(source_set, set)
-        # Warning: This is not at all a rigorous check.
-        # It is hard to check whether this is string-like or list-like in both Py 2.7 and 3.x
-        if not isinstance(name_list, (list, tuple, set)):
-            raise exceptions.ValueError('Must disambiguate "name_list" by passing a list or tuple.')
+        if isinstance(name_list, (str, bytes)):
+            warnings.warn('name_list appears to be a single name. Disambiguate a string by passing a list or tuple.')
+        assert isinstance(name_list, collections.abc.Iterable)
+
         # Make a copy of name_list in case the input reference is being used elsewhere during
         # iteration, such as for source_set, which is modified during the loop.
         for name in tuple(name_list):
+            assert isinstance(name, str)
             if name in source_set:
                 source_set.remove(name)
                 element = WorkElement.deserialize(self.elements[name], name=name, workspec=self)
@@ -218,7 +267,7 @@ class WorkSpec(object):
                                 'Dependencies should be a string or sequence of strings. Got {}'.format(type(item)))
                         dependency_list = [item]
                     for dependency in dependency_list:
-                        for recursive_dep in self._chase_deps(source_set, (dependency,)):
+                        for recursive_dep in self.__chase_deps(source_set, (dependency,)):
                             yield recursive_dep
                 yield element
             else:
@@ -229,15 +278,15 @@ class WorkSpec(object):
 
     def __iter__(self):
         source_set = set(self.elements.keys())
-        for element in self._chase_deps(source_set, source_set):
+        for element in self.__chase_deps(source_set, source_set):
             yield element
 
     def __hash__(self):
         """Uniquely identify this work specification.
 
-        Allows the spec to be used as a dictionary key in Python. Note that this hash is possibly dependent on the Python
-        implementation. It is not part of the gmxapi specification and should not be used outside of a single invocation
-        of a script.
+        Allows the spec to be used as a dictionary key in Python. Note that this hash is possibly dependent on the
+        Python implementation. It is not part of the gmxapi specification and should not be used outside of a single
+        invocation of a script.
         """
         # Hash the serialized elements, concatenated as a single string. Note that the order of elements and their
         # contents is not guaranteed, but should be consistent within a script invocation.
@@ -260,45 +309,20 @@ class WorkSpec(object):
             if hasattr(element, "depends"):
                 for dependency in element.depends:
                     if not dependency in self.elements:
-                        raise exceptions.UsageError("Element dependencies must already be specified before an Element may be added.")
+                        raise exceptions.UsageError(
+                            "Element dependencies must already be specified before an Element may be added.")
             # Okay, it looks like we have an element we can add
             if hasattr(element, "workspec") and element.workspec is not None and element.workspec is not self:
-                raise exceptions.Error("Element must be removed from its current WorkSpec to be added to this WorkSpec, but element removal is not yet implemented.")
+                raise exceptions.Error(
+                    "Element must be removed from its current WorkSpec to be added to this WorkSpec, but element "
+                    "removal is not yet implemented.")
             self.elements[element.name] = element.serialize()
             element.workspec = self
         else:
-            raise exceptions.ValueError("Provided object does not appear to be compatible with gmx.workflow.WorkElement.")
+            raise exceptions.ValueError(
+                "Provided object does not appear to be compatible with gmx.workflow.WorkElement.")
         logger.info("Added element {} to workspec.".format(element.name))
 
-    # def remove_element(self, name):
-    #     """Remove named element from work specification.
-    #
-    #     Does not delete references to WorkElement objects, but WorkElement objects will be moved to a None WorkSpec."""
-    # To implement, WorkElement attributes should be reworked as properties that dynamically act on the
-    # workspec reference. Additionally, WorkSpec may have to keep weak references to WorkElements in order
-    # to reset the WorkElement.workspec strong reference.
-
-    # def add(self, spec):
-    #     """
-    #     Merge the provided spec into this one.
-    #
-    #     We can't easily replace references to ``spec`` with references to the WorkSpec we are merging into, but we can
-    #     steal the work elements out of ``spec`` and leave it empty. We could also set an ``alias`` attribute in it or
-    #     something, but that seems unnecessary. Alternatively, we can set the new and old spec to be equal, but we would
-    #     need an additional abstraction layer to keep them from diverging again. Since client code will retain references
-    #     to the elements in the work spec, we need to be clear about when we are duplicating a WorkSpec versus obtaining
-    #     different references to the same.
-    #
-    #     This is an implementation detail that can be unresolved and hidden for now. The high-level interface only
-    #     requires that client code can bind different workflow elements together in a sensible way and get expected
-    #     results.
-    #
-    #     :param spec: WorkSpec to be merged into this one.
-    #     :return:
-    #
-    #     To do: consider instead a gmx.workflow.merge(workspecA, workspecB) free function that returns a new WorkSpec.
-    #     """
-
     def serialize(self):
         """Serialize the work specification in a form suitable to pass to any Context implementation.
 
@@ -328,7 +352,7 @@ class WorkSpec(object):
                                }
         for name, element in [(e, json.loads(to_string(self.elements[e]))) for e in sorted(self.elements.keys())]:
             dict_representation['elements'][str(name)] = element
-        serialization = json.dumps(dict_representation, ensure_ascii=True, sort_keys=True, separators=(',',':'))
+        serialization = json.dumps(dict_representation, ensure_ascii=True, sort_keys=True, separators=(',', ':'))
         return serialization.encode('utf-8')
 
     @classmethod
@@ -340,7 +364,7 @@ class WorkSpec(object):
         ver_out = workspec.version
         if ver_in != ver_out:
             message = "Expected work spec version {}. Got work spec version {}.".format(ver_out, ver_in)
-            raise exceptions.CompatibilityError(message)
+            raise exceptions.ValueError(message)
         for element in dict_representation['elements']:
             workspec.elements[element] = dict_representation['elements'][element]
         return workspec
@@ -574,7 +598,7 @@ def from_tpr(input=None, **kwargs):
 
     Setting ``end_time`` redefines the end point of the simulation trajectory from what was provided in
     ``input``. It is equivalent to changing the number of steps requested in the MDP (or TPR) input, but
-    it time is provided as picoseconds instead of a number of time steps.
+    the time is provided as picoseconds instead of a number of time steps.
 
     .. deprecated:: 0.0.7
         If ``steps=N`` is provided and N is an integer
@@ -594,12 +618,15 @@ def from_tpr(input=None, **kwargs):
         grid (tuple): Domain decomposition grid divisions (nx, ny, nz). (-dd)
         max_hours (float): Terminate after 0.99 times this many hours if simulation is still running. (-maxh)
         pme_ranks (int): number of separate ranks to be used for PME electrostatics. (-npme)
-        pme_threads_per_rank (int): Number of OpenMP threads per PME rank. (-ntomp_pme)
+        threads_per_pme_rank (int): Number of OpenMP threads per PME rank. (-ntomp_pme)
         steps (int): Override input files and run for this many steps. (-nsteps; deprecated)
         threads (int): Total number of threads to start. (-nt)
         threads_per_rank (int): number of OpenMP threads to start per MPI rank. (-ntomp)
         tmpi (int): number of thread-MPI ranks to start. (-ntmpi)
 
+    ..  versionchanged:: 0.1
+        *pme_threads_per_rank* renamed to *threads_per_pme_rank*.
+
     Returns:
         simulation member of a gmx.workflow.WorkSpec object
 
@@ -654,8 +681,13 @@ def from_tpr(input=None, **kwargs):
             params['tmpi'] = int(kwargs[arg_key])
         elif arg_key == 'threads_per_rank' or arg_key == 'ntomp':
             params['threads_per_rank'] = int(kwargs[arg_key])
-        elif arg_key == 'pme_threads_per_rank' or arg_key == 'ntomp_pme':
-            params['pme_threads_per_rank'] = int(kwargs[arg_key])
+        elif arg_key == 'pme_threads_per_rank' or arg_key == 'threads_per_pme_rank' or arg_key == 'ntomp_pme':
+            # TODO: Remove this temporary accommodation.
+            assert not gmx.version.api_is_at_least(0, 2)
+            if arg_key == 'pme_threads_per_rank':
+                warnings.warn("Key word pme_threads_per_rank has been renamed to threads_per_pme_rank.",
+                              DeprecationWarning)
+            params['threads_per_pme_rank'] = int(kwargs[arg_key])
         elif arg_key == 'steps' or arg_key == 'nsteps':
             if kwargs[arg_key] is None:
                 # None means "don't override the input" which is indicated by a parameter value of -2 in GROMACS 2019