Fix typos.
[alexxy/gromacs.git] / python_packaging / src / gmxapi / operation.py
1 #
2 # This file is part of the GROMACS molecular simulation package.
3 #
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
8 #
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
13 #
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 # Lesser General Public License for more details.
18 #
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23 #
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
31 #
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
34
35 """Define gmxapi-compliant Operations
36
37 Provide decorators and base classes to generate and validate gmxapi Operations.
38
39 Nodes in a work graph are created as instances of Operations. An Operation factory
40 accepts well-defined inputs as key word arguments. The object returned by such
41 a factory is a handle to the node in the work graph. It's ``output`` attribute
42 is a collection of the Operation's results.
43
44 function_wrapper(...) produces a wrapper that converts a function to an Operation
45 factory. The Operation is defined when the wrapper is called. The Operation is
46 instantiated when the factory is called. The function is executed when the Operation
47 instance is run.
48
49 The framework ensures that an Operation instance is executed no more than once.
50 """
51
52 __all__ = ['computed_result',
53            'function_wrapper',
54            ]
55
56 import abc
57 import collections
58 import functools
59 import inspect
60 import typing
61 import weakref
62 from contextlib import contextmanager
63
64 import gmxapi as gmx
65 from gmxapi import datamodel
66 from gmxapi import exceptions
67 from gmxapi import logger as root_logger
68 from gmxapi.abc import OperationImplementation, MutableResource, Node
69 from gmxapi.typing import _Context, ResultTypeVar, SourceTypeVar, valid_result_types, valid_source_types
70 from gmxapi.typing import Future as _Future
71
72 # Initialize module-level logger
73 logger = root_logger.getChild('operation')
74 logger.info('Importing {}'.format(__name__))
75
76
77 class ResultDescription:
78     """Describe what will be returned when `result()` is called."""
79
80     def __init__(self, dtype=None, width=1):
81         assert isinstance(dtype, type)
82         assert issubclass(dtype, valid_result_types)
83         assert isinstance(width, int)
84         self._dtype = dtype
85         self._width = width
86
87     @property
88     def dtype(self) -> type:
89         """node output type"""
90         return self._dtype
91
92     @property
93     def width(self) -> int:
94         """ensemble width"""
95         return self._width
96
97     def __repr__(self):
98         return '{}(dtype={}, width={})'.format(self.__class__.__name__, self.dtype, self.width)
99
100
101 class OutputData(object):
102     """Encapsulate the description and storage of a data output."""
103
104     def __init__(self, name: str, description: ResultDescription):
105         assert name != ''
106         self._name = name
107         assert isinstance(description, ResultDescription)
108         self._description = description
109         self._done = [False] * self._description.width
110         self._data = [None] * self._description.width
111
112     @property
113     def name(self):
114         """The name of the published output."""
115         return self._name
116
117     # TODO: Change to regular member function and add ensemble member arg.
118     @property
119     def done(self):
120         """Ensemble completion status for this output."""
121         return all(self._done)
122
123     def data(self, member: int = None):
124         """Access the raw data for localized output for the ensemble or the specified member."""
125         if not self.done:
126             raise exceptions.ApiError('Attempt to read before data has been published.')
127         if self._data is None or None in self._data:
128             raise exceptions.ApiError('Data marked "done" but contains null value.')
129         if member is not None:
130             return self._data[member]
131         else:
132             return self._data
133
134     def set(self, value, member: int):
135         """Set local data and mark as completed.
136
137         Used internally to maintain the data store.
138         """
139         if self._description.dtype == datamodel.NDArray:
140             self._data[member] = datamodel.ndarray(value)
141         else:
142             self._data[member] = self._description.dtype(value)
143         self._done[member] = True
144
145     def reset(self):
146         """Reinitialize the data store.
147
148         Note:
149             This is a workaround until the execution model is more developed.
150
151         Todo:
152             Remove this method when all operation handles provide factories to
153             reinstantiate on new Contexts and/or with new inputs.
154
155         """
156         self._done = [False] * self._description.width
157         self._data = [None] * self._description.width
158
159
160 class EnsembleDataSource(gmx.abc.EnsembleDataSource):
161     """A single source of data with ensemble data flow annotations.
162
163     Note that data sources may be Futures.
164     """
165
166     def __init__(self, source=None, width=1, dtype=None):
167         self.source = source
168         self.width = width
169         self.dtype = dtype
170
171     def node(self, member: int):
172         return self.source[member]
173
174     def reset(self):
175         protocols = ('reset', '_reset')
176         for protocol in protocols:
177             if hasattr(self.source, protocol):
178                 getattr(self.source, protocol)()
179
180
181 class DataSourceCollection(collections.OrderedDict):
182     """Store and describe input data handles for an operation.
183
184     When created from InputCollectionDescription.bind(), the DataSourceCollection
185     has had default values inserted.
186
187     Note: DataSourceCollection is the input resource collection type for NodeBuilder.
188     To use with the NodeBuilder interface, first create the collection, then
189     iteratively add its resources.
190
191     TODO: We should probably normalize the collection aspect of input. Is this a
192           single input? Are all inputs added as collections? Should this be split
193           to a standard iterable? Does a resource factory always produce a mapping?
194     """
195
196     def __init__(self, **kwargs):
197         """Initialize from key/value pairs of named data sources.
198
199         Data sources may be any of the basic gmxapi data types, gmxapi Futures
200         of those types, or gmxapi ensemble data bundles of the above.
201         """
202         super(DataSourceCollection, self).__init__()
203         for name, value in kwargs.items():
204             self[name] = value
205
206     def __setitem__(self, key: str, value: SourceTypeVar) -> None:
207         if not isinstance(key, str):
208             raise exceptions.TypeError('Data must be named with str type.')
209         # TODO: Encapsulate handling of proferred data sources to Context details.
210         # Preprocessed input should be self-describing gmxapi data types. Structured
211         # input must be recursively (depth-first) converted to gmxapi data types.
212         # TODO: Handle gmxapi Futures stored as dictionary elements!
213         if not isinstance(value, valid_source_types):
214             if isinstance(value, collections.abc.Iterable):
215                 # Iterables here are treated as arrays, but we do not have a robust typing system.
216                 # Warning: In the initial implementation, the iterable may contain Future objects.
217                 # TODO: (#2993) Revisit as we sort out data shape and Future protocol.
218                 try:
219                     value = datamodel.ndarray(value)
220                 except (exceptions.ValueError, exceptions.TypeError) as e:
221                     raise exceptions.TypeError('Iterable could not be converted to NDArray: {}'.format(value)) from e
222             elif hasattr(value, 'result'):
223                 # A Future object.
224                 pass
225             else:
226                 raise exceptions.ApiError('Cannot process data source {}'.format(value))
227         super().__setitem__(key, value)
228
229     def reset(self):
230         """Reset all sources in the collection."""
231         for source in self.values():
232             if hasattr(source, 'reset'):
233                 source.reset()
234             if hasattr(source, '_reset'):
235                 source._reset()
236
237     def __hash__(self):
238         """Provide some sort of unique identifier.
239
240         We need a more deterministic fingerprinting scheme with well-specified
241         uniqueness semantics, but right now we just need something reasonably
242         unique.
243         """
244         hashed_keys_and_values = tuple(hash(entity) for item in self.items() for entity in item)
245         return hash(hashed_keys_and_values)
246
247
248 def computed_result(function):
249     """Decorate a function to get a helper that produces an object with Result behavior.
250
251     When called, the new function produces an ImmediateResult object.
252
253     The new function has the same signature as the original function, but can accept
254     gmxapi data proxies, assuming the provided proxy objects represent types
255     compatible with the original signature.
256
257     Calls to `result()` return the value that `function` would return when executed
258     in the local context with the inputs fully resolved.
259
260     The API does not specify when input data dependencies will be resolved
261     or when the wrapped function will be executed. That is, ``@computed_result``
262     functions may force immediate resolution of data dependencies and/or may
263     be called more than once to satisfy dependent operation inputs.
264     """
265
266     # Attempt to inspect function signature. Introspection can fail, so we catch
267     # the various exceptions. We re-raise as ApiError, indicating a bug, because
268     # we should do more to handle this intelligently and provide better user
269     # feedback.
270     # TODO: Figure out what to do with exceptions where this introspection
271     #  and rebinding won't work.
272     # ref: https://docs.python.org/3/library/inspect.html#introspecting-callables-with-the-signature-object
273     try:
274         sig = inspect.signature(function)
275     except TypeError as T:
276         raise exceptions.ApiError('Can not inspect type of provided function argument.') from T
277     except ValueError as V:
278         raise exceptions.ApiError('Can not inspect provided function signature.') from V
279
280     wrapped_function = function_wrapper()(function)
281
282     @functools.wraps(function)
283     def new_function(*args, **kwargs):
284         # The signature of the new function will accept abstractions
285         # of whatever types it originally accepted. This wrapper must
286         # * Create a mapping to the original call signature from `input`
287         # * Add handling for typed abstractions in wrapper function.
288         # * Process arguments to the wrapper function into `input`
289
290         # 1. Inspect the return annotation to determine valid gmxapi type(s)
291         # 2. Generate a Result object advertising the correct type, bound to the
292         #    Input and implementing function.
293         # 3. Transform the result() data to the correct type.
294
295         # TODO: (FR3+) create a serializable data structure for inputs discovered
296         #  from function introspection.
297
298         for name, param in sig.parameters.items():
299             assert not param.kind == param.POSITIONAL_ONLY
300         bound_arguments = sig.bind(*args, **kwargs)
301         handle = wrapped_function(**bound_arguments.arguments)
302         output = handle.output
303         # TODO: Find a type hinting / generic way to assert output attributes.
304         return output.data
305
306     return new_function
307
308
309 class OutputCollectionDescription(collections.OrderedDict):
310     def __init__(self, **kwargs):
311         """Create the output description for an operation node from a dictionary of names and types."""
312         outputs = []
313         for name, flavor in kwargs.items():
314             if not isinstance(name, str):
315                 raise exceptions.TypeError('Output descriptions are keyed by Python strings.')
316             # Multidimensional outputs are explicitly NDArray
317             if issubclass(flavor, (list, tuple)):
318                 flavor = datamodel.NDArray
319             assert issubclass(flavor, valid_result_types)
320             outputs.append((name, flavor))
321         super().__init__(outputs)
322
323
324 class InputCollectionDescription(collections.OrderedDict):
325     """Describe acceptable inputs for an Operation.
326
327     Generally, an InputCollectionDescription is an aspect of the public API by
328     which an Operation expresses its possible inputs. This class includes details
329     of the Python package.
330
331     Keyword Arguments:
332         parameters : A sequence of named parameter descriptions.
333
334     Parameter descriptions are objects containing an `annotation` attribute
335     declaring the data type of the parameter and, optionally, a `default`
336     attribute declaring a default value for the parameter.
337
338     Instances can be used as an ordered map of parameter names to gmxapi data types.
339
340     Analogous to inspect.Signature, but generalized for gmxapi Operations.
341     Additional notable differences: typing is normalized at initialization, and
342     the bind() method does not return an object that can be directly used as
343     function input. The object returned from bind() is used to construct a data
344     graph Edge for subsequent execution.
345     """
346
347     def __init__(self, parameters: typing.Iterable[typing.Tuple[str, inspect.Parameter]]):
348         """Create the input description for an operation node from a dictionary of names and types."""
349         inputs = []
350         for name, param in parameters:
351             if not isinstance(name, str):
352                 raise exceptions.TypeError('Input descriptions are keyed by Python strings.')
353             # Multidimensional inputs are explicitly NDArray
354             dtype = param.annotation
355             if issubclass(dtype, collections.abc.Iterable) \
356                     and not issubclass(dtype, (str, bytes, collections.abc.Mapping)):
357                 # TODO: we can relax this with some more input conditioning.
358                 if dtype != datamodel.NDArray:
359                     raise exceptions.UsageError(
360                         'Cannot accept input type {}. Sequence type inputs must use NDArray.'.format(param))
361             assert issubclass(dtype, valid_result_types)
362             if hasattr(param, 'kind'):
363                 disallowed = any([param.kind == param.POSITIONAL_ONLY,
364                                   param.kind == param.VAR_POSITIONAL,
365                                   param.kind == param.VAR_KEYWORD])
366                 if disallowed:
367                     raise exceptions.ProtocolError(
368                         'Cannot wrap function. Operations must have well-defined parameter names.')
369                 kind = param.kind
370             else:
371                 kind = inspect.Parameter.POSITIONAL_OR_KEYWORD
372             if hasattr(param, 'default'):
373                 default = param.default
374             else:
375                 default = inspect.Parameter.empty
376             inputs.append(inspect.Parameter(name, kind, default=default, annotation=dtype))
377         super().__init__([(input.name, input.annotation) for input in inputs])
378         self.signature = inspect.Signature(inputs)
379
380     @staticmethod
381     def from_function(function):
382         """Inspect a function to be wrapped.
383
384         Used internally by gmxapi.operation.function_wrapper()
385
386             Raises:
387                 exceptions.ProtocolError if function signature cannot be determined to be valid.
388
389             Returns:
390                 InputCollectionDescription for the function input signature.
391         """
392         # First, inspect the function.
393         assert callable(function)
394         signature = inspect.signature(function)
395         # The function must have clear and static input schema
396         # Make sure that all parameters have clear names, whether or not they are used in a call.
397         for name, param in signature.parameters.items():
398             disallowed = any([param.kind == param.POSITIONAL_ONLY,
399                               param.kind == param.VAR_POSITIONAL,
400                               param.kind == param.VAR_KEYWORD])
401             if disallowed:
402                 raise exceptions.ProtocolError(
403                     'Cannot wrap function. Operations must have well-defined parameter names.')
404             if param.name == 'input':
405                 raise exceptions.ProtocolError('Function signature includes the (reserved) "input" keyword argument.')
406         description = collections.OrderedDict()
407         for param in signature.parameters.values():
408             if param.name == 'output':
409                 # Wrapped functions may accept the output parameter to publish results, but
410                 # that is not part of the Operation input signature.
411                 continue
412             if param.annotation == param.empty:
413                 if param.default == param.empty or param.default is None:
414                     raise exceptions.ProtocolError('Could not infer parameter type for {}'.format(param.name))
415                 dtype = type(param.default)
416                 if isinstance(dtype, collections.abc.Iterable) \
417                         and not isinstance(dtype, (str, bytes, collections.abc.Mapping)):
418                     dtype = datamodel.NDArray
419             else:
420                 dtype = param.annotation
421             description[param.name] = param.replace(annotation=dtype)
422         return InputCollectionDescription(description.items())
423
424     def bind(self, *args, **kwargs) -> DataSourceCollection:
425         """Create a compatible DataSourceCollection from provided arguments.
426
427         Pre-process input and function signature to get named input arguments.
428
429         This is a helper function to allow calling code to characterize the
430         arguments in a Python function call with hints from the factory that is
431         initializing an operation. Its most useful functionality is to  allows a
432         factory to accept positional arguments where named inputs are usually
433         required. It also allows data sources to participate in multiple
434         DataSourceCollections with minimal constraints.
435
436         Note that the returned object has had data populated from any defaults
437         described in the InputCollectionDescription.
438
439         See wrapped_function_runner() and describe_function_input().
440         """
441         # For convenience, accept *args, but convert to **kwargs to pass to Operation.
442         # Factory accepts an unadvertised `input` keyword argument that is used as a default kwargs dict.
443         # If present, kwargs['input'] is treated as an input "pack" providing _default_ values.
444         input_kwargs = collections.OrderedDict()
445         if 'input' in kwargs:
446             provided_input = kwargs.pop('input')
447             if provided_input is not None:
448                 input_kwargs.update(provided_input)
449         # `function` may accept an `output` keyword argument that should not be supplied to the factory.
450         for key, value in kwargs.items():
451             if key == 'output':
452                 raise exceptions.UsageError('Invalid keyword argument: output (reserved).')
453             input_kwargs[key] = value
454         try:
455             bound_arguments = self.signature.bind_partial(*args, **input_kwargs)
456         except TypeError as e:
457             raise exceptions.UsageError('Could not bind operation parameters to function signature.') from e
458         assert 'output' not in bound_arguments.arguments
459         bound_arguments.apply_defaults()
460         assert 'input' not in bound_arguments.arguments
461         input_kwargs = collections.OrderedDict([pair for pair in bound_arguments.arguments.items()])
462         if 'output' in input_kwargs:
463             input_kwargs.pop('output')
464         return DataSourceCollection(**input_kwargs)
465
466
467 class ProxyDataDescriptor(object):
468     """Base class for data descriptors used in DataProxyBase subclasses.
469
470     Subclasses should either not define __init__ or should call the base class
471     __init__ explicitly: super().__init__(self, name, dtype)
472     """
473
474     def __init__(self, name: str, dtype: ResultTypeVar = None):
475         self._name = name
476         # TODO: We should not allow dtype==None, but we currently have a weak data
477         #  model that does not allow good support of structured Futures.
478         if dtype is not None:
479             assert isinstance(dtype, type)
480             assert issubclass(dtype, valid_result_types)
481         self._dtype = dtype
482
483
484 class DataProxyMeta(abc.ABCMeta):
485     # Key word arguments consumed by __prepare__
486     _prepare_keywords = ('descriptors',)
487
488     @classmethod
489     def __prepare__(mcs, name, bases, descriptors: collections.abc.Mapping = None):
490         """Allow dynamic sub-classing.
491
492         DataProxy class definitions are collections of data descriptors. This
493         class method allows subclasses to give the descriptor names and type(s)
494         in the class declaration as arguments instead of as class attribute
495         assignments.
496
497             class MyProxy(DataProxyBase, descriptors={name: MyDescriptor() for name in datanames}): pass
498
499         Note:
500             If we are only using this metaclass for the __prepare__ hook by the
501             time we require Python >= 3.6, we could reimplement __prepare__ as
502             DataProxyBase.__init_subclass__ and remove this metaclass.
503         """
504         if descriptors is None:
505             return {}
506         elif isinstance(descriptors, tuple):
507             namespace = collections.OrderedDict([(d._name, d) for d in descriptors])
508             return namespace
509         else:
510             assert isinstance(descriptors, collections.abc.Mapping)
511             return descriptors
512
513     def __new__(cls, name, bases: typing.Iterable, namespace, **kwargs):
514         for key in kwargs:
515             if key not in DataProxyMeta._prepare_keywords:
516                 raise exceptions.ApiError('Unexpected class creation keyword: {}'.format(key))
517         # See note about DataProxyBase._reserved.
518         if '_reserved' not in namespace and not any(hasattr(base, '_reserved') for base in bases):
519             raise exceptions.ApiError(
520                 'We currently expect DataProxy classes to provide a list of reserved attribute names.')
521         for key in namespace:
522             # Here we can check conformance with naming and typing rules.
523             assert isinstance(key, str)
524             if key.startswith('__'):
525                 # Skip non-public attributes.
526                 continue
527             descriptor = namespace[key]
528             # The purpose of the current data proxies is to serve as a limited namespace
529             # containing only descriptors of a certain type. In the future, these proxies
530             # may be flattened into a facet of a richer OperationHandle class
531             # (this metaclass may become a decorator on an implementation class),
532             # but for now we check that the class is being used according to the
533             # documented framework. A nearer term update could be to restrict the
534             # type of the data descriptor:
535             # TODO: Use a member type of the derived cls (or a mix-in base) to specify a particular
536             #  ProxyDataDescriptor subclass.
537             # Also, see note about DataProxyBase._reserved
538             if not isinstance(descriptor, ProxyDataDescriptor):
539                 if key not in namespace['_reserved'] and not any(key in getattr(base, '_reserved') for base in
540                                                                  bases if hasattr(base, '_reserved')):
541                     raise exceptions.ApiError('Unexpected data proxy attribute {}: {}'.format(key, repr(descriptor)))
542             else:
543                 assert isinstance(descriptor, ProxyDataDescriptor)
544                 if not isinstance(descriptor._name, str) or descriptor._name == '':
545                     descriptor._name = key
546                 else:
547                     if descriptor._name != key:
548                         raise exceptions.ApiError(
549                             'Descriptor internal name {} does not match attribute name {}'.format(
550                                 descriptor._name, key))
551         return super().__new__(cls, name, bases, namespace)
552
553     # TODO: This keyword argument stripping is not necessary in more recent Python versions.
554     # When Python minimum required version is increased, check if we can remove this.
555     def __init__(cls, name, bases, namespace, **kwargs):
556         for key in kwargs:
557             if key not in DataProxyMeta._prepare_keywords:
558                 raise exceptions.ApiError('Unexpected class initialization keyword: {}'.format(key))
559         super().__init__(name, bases, namespace)
560
561     # TODO: See if we can use __dir__ in the metaclass to help hint class attributes for better tab completion.
562     #  Ref: https://ipython.readthedocs.io/en/stable/config/integrating.html#tab-completion
563     # def __dir__(self) -> Iterable[str]:
564     #     return super().__dir__()
565
566
567 class DataProxyBase(collections.abc.Mapping, metaclass=DataProxyMeta):
568     """Limited interface to managed resources.
569
570     Inherit from DataProxy to specialize an interface to an ``instance``.
571     In the derived class, either do not define ``__init__`` or be sure to
572     initialize the super class (DataProxy) with an instance of the object
573     to be proxied.
574
575     A class deriving from DataProxyBase allows its instances to provide a namespace
576     for proxies to named data by defining attributes that are data descriptors
577     (subclasses of ProxyDataDescriptor).
578     The ProxyDataDescriptors are accessed as attributes of the
579     data proxy instance or by iterating on items(). Attributes that are not
580     ProxyDataDescriptors are possible, but will not be returned by items() which
581     is a necessary part of gmxapi execution protocol.
582
583     Acts as an owning handle to the resources provide by ``instance``,
584     preventing the reference count of ``instance`` from going to zero for the
585     lifetime of the proxy object.
586
587     When sub-classing DataProxyBase, data descriptors can be passed as a mapping
588     to the ``descriptors`` key word argument in the class declaration. This
589     allows data proxy subclasses to be easily defined dynamically.
590
591         mydescriptors = {'foo': Publisher('foo', int), 'data': Publisher('data', float)}
592         ...
593         class MyDataProxy(DataProxyBase, descriptors=mydescriptors): pass
594         assert hasattr(MyDataProxy, 'foo')
595
596     """
597     # This class attribute (which subclasses are free to replace to augment) is an
598     # indication of a problem with the current data model. If we are allowing
599     # reserved words that would otherwise be valid data names, there is not a
600     # compelling reason for separate data proxy classes: we throw away the assertion
601     # that we are preparing a clean namespace and we could have accomplished the
602     # class responsibilities in the Operation handle with just descriptor classes.
603     # If we want the clean namespace, we should figure out how to keep this interface
604     # from growing and/or have some "hidden" internal interface.
605     _reserved = ('ensemble_width', 'items', '_reserved')
606
607     # This class can be expanded to be the attachment point for a metaclass for
608     # data proxies such as PublishingDataProxy or OutputDataProxy, which may be
609     # defined very dynamically and concisely as a set of Descriptors and a type()
610     # call.
611     # If development in this direction does not materialize, then this base
612     # class is not very useful and should be removed.
613     def __init__(self, instance: 'SourceResource', client_id: int = None):
614         """Get partial ownership of a resource provider.
615
616         Arguments:
617             instance : resource-owning object
618             client_id : identifier for client holding the resource handle (e.g. ensemble member id)
619
620         If client_id is not provided, the proxy scope is for all clients.
621         """
622         # TODO: Decide whether _resource_instance is public or not.
623         # Note: currently commonly needed for subclass implementations.
624         self._resource_instance = instance
625         # Developer note subclasses should handle self._client_identifier == None
626         self._client_identifier = client_id
627         # Collection is fixed by the time of instance creation, so cache it.
628         self.__keys = tuple([key for key, _ in self.items()])
629         self.__length = len(self.__keys)
630
631     @property
632     def ensemble_width(self) -> int:
633         return self._resource_instance.width()
634
635     @classmethod
636     def items(cls):
637         """Generator for tuples of attribute name and descriptor instance.
638
639         This almost certainly doesn't do quite what we want...
640         """
641         for name, value in cls.__dict__.items():
642             if isinstance(value, ProxyDataDescriptor):
643                 yield name, value
644
645     def __getitem__(self, k):
646         if hasattr(self, k):
647             return getattr(self, k)
648
649     def __len__(self):
650         return self.__length
651
652     def __iter__(self):
653         for key in self.__keys:
654             yield key
655
656
657 class Publisher(ProxyDataDescriptor):
658     """Data descriptor for write access to a specific named data resource.
659
660     For a wrapped function receiving an ``output`` argument, provides the
661     accessors for an attribute on the object passed as ``output``. Maps
662     read and write access by the wrapped function to appropriate details of
663     the resource manager.
664
665     Used internally to implement settable attributes on PublishingDataProxy.
666     Allows PublishingDataProxy to be dynamically defined in the scope of the
667     operation.function_wrapper closure. Each named output is represented by
668     an instance of Publisher in the PublishingDataProxy class definition for
669     the operation.
670
671     Ref: https://docs.python.org/3/reference/datamodel.html#implementing-descriptors
672
673     Collaborations:
674     Relies on implementation details of ResourceManager.
675     """
676
677     def __get__(self, instance: DataProxyBase, owner):
678         if instance is None:
679             # The current access has come through the class attribute of owner class
680             return self
681         resource_manager = instance._resource_instance
682         client_id = instance._client_identifier
683         # TODO: Fix API scope.
684         # Either this class is a detail of the same implementation as ResourceManager,
685         # or we need to enforce that instance._resource_instance provides _data (or equivalent)
686         assert isinstance(resource_manager, ResourceManager)
687         if client_id is None:
688             return getattr(resource_manager._data, self._name)
689         else:
690             return getattr(resource_manager._data, self._name)[client_id]
691
692     def __set__(self, instance: DataProxyBase, value):
693         resource_manager = instance._resource_instance
694         # TODO: Fix API scope.
695         # Either this class is a detail of the same implementation as ResourceManager,
696         # or we need to enforce that instance._resource_instance provides _data (or equivalent)
697         assert isinstance(resource_manager, ResourceManager)
698         client_id = instance._client_identifier
699         resource_manager.set_result(name=self._name, value=value, member=client_id)
700
701     def __repr__(self):
702         return '{}(name={}, dtype={})'.format(self.__class__.__name__,
703                                               self._name,
704                                               self._dtype.__qualname__)
705
706
707 def define_publishing_data_proxy(output_description) -> typing.Type[DataProxyBase]:
708     """Returns a class definition for a PublishingDataProxy for the provided output description."""
709     # This dynamic type creation hides collaborations with things like make_datastore.
710     # We should encapsulate these relationships in Context details, explicit collaborations
711     # between specific operations and Contexts, and in groups of Operation definition helpers.
712
713     descriptors = collections.OrderedDict([(name, Publisher(name)) for name in output_description])
714
715     class PublishingDataProxy(DataProxyBase, descriptors=descriptors):
716         """Handler for write access to the `output` of an operation.
717
718         Acts as a sort of PublisherCollection.
719         """
720
721     return PublishingDataProxy
722
723
724 # get symbols we can use to annotate input and output types more specifically.
725 _OutputDataProxyType = typing.TypeVar('_OutputDataProxyType', bound=DataProxyBase)
726 _PublishingDataProxyType = typing.TypeVar('_PublishingDataProxyType', bound=DataProxyBase)
727 # Currently, the ClientID type is an integer, but this may change.
728 ClientID = typing.NewType('ClientID', int)
729
730
731 class _Resources(typing.Generic[_PublishingDataProxyType]):
732     pass
733
734
735 # TODO: Why generic in publishingdataproxytype?
736 class SourceResource(typing.Generic[_OutputDataProxyType, _PublishingDataProxyType]):
737     """Resource Manager for a data provider.
738
739     Supports Future instances in a particular context.
740     """
741
742     # Note: ResourceManager members not yet included:
743     # future(), _data, set_result.
744
745     # This might not belong here. Maybe separate out for a OperationHandleManager?
746     @abc.abstractmethod
747     def data(self) -> _OutputDataProxyType:
748         """Get the output data proxy."""
749         # Warning: this should probably be renamed, but "output_data_proxy" is already
750         # a member in at least one derived class.
751         ...
752
753     @abc.abstractmethod
754     def is_done(self, name: str) -> bool:
755         return False
756
757     @abc.abstractmethod
758     def get(self, name: str) -> 'OutputData':
759         ...
760
761     @abc.abstractmethod
762     def update_output(self):
763         """Bring the _data member up to date and local."""
764         pass
765
766     @abc.abstractmethod
767     def reset(self):
768         """Recursively reinitialize resources.
769
770         Set the resource manager to its initialized state.
771         All outputs are marked not "done".
772         All inputs supporting the interface have ``_reset()`` called on them.
773         """
774
775     @abc.abstractmethod
776     def width(self) -> int:
777         """Ensemble width of the managed resources."""
778         ...
779
780     @abc.abstractmethod
781     def future(self, name: str, description: ResultDescription) -> 'Future':
782         """Get a Future handle for managed data.
783
784         Resource managers owned by subclasses of gmx.operation.Context provide
785         this method to get references to output data.
786
787         In addition to the interface described by gmx.abc.Future, returned objects
788         provide the interface described by gmx.operation.Future.
789         """
790
791
792 class StaticSourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
793     """Provide the resource manager interface for local static data.
794
795     Allow data transformations on the proxied resource.
796
797     Keyword Args:
798         proxied_data: A gmxapi supported data object.
799         width: Size of (one-dimensional) shaped data produced by function.
800         function: Transformation to perform on the managed data.
801
802     The callable passed as ``function`` must accept a single argument. The
803     argument will be an iterable when proxied_data represents an ensemble,
804     or an object of the same type as proxied_data otherwise.
805     """
806
807     def __init__(self, *, name: str, proxied_data, width: int, function: typing.Callable):
808         assert not isinstance(proxied_data, Future)
809         if hasattr(proxied_data, 'width'):
810             # Ensemble data source
811             assert hasattr(proxied_data, 'source')
812             self._result = function(proxied_data.source)
813         else:
814             self._result = function(proxied_data)
815         if width > 1:
816             if isinstance(self._result, (str, bytes)):
817                 # In this case, do not implicitly broadcast
818                 raise exceptions.ValueError('"function" produced data incompatible with "width".')
819             else:
820                 if not isinstance(self._result, collections.abc.Iterable):
821                     raise exceptions.DataShapeError(
822                         'Expected iterable of size {} but "function" result is not iterable.')
823             data = list(self._result)
824             size = len(data)
825             if len(data) != width:
826                 raise exceptions.DataShapeError(
827                     'Expected iterable of size {} but "function" produced a {} of size {}'.format(width, type(data),
828                                                                                                   size))
829             dtype = type(data[0])
830         else:
831             if width != 1:
832                 raise exceptions.ValueError('width must be an integer 1 or greater.')
833             dtype = type(self._result)
834             if issubclass(dtype, (list, tuple)):
835                 dtype = datamodel.NDArray
836                 data = [datamodel.ndarray(self._result)]
837             elif isinstance(self._result, collections.abc.Iterable):
838                 if not isinstance(self._result, (str, bytes, dict)):
839                     raise exceptions.ValueError(
840                         'Expecting width 1 but "function" produced iterable type {}.'.format(type(self._result)))
841                 else:
842                     dtype = str
843                     data = [str(self._result)]
844             else:
845                 data = [self._result]
846         description = ResultDescription(dtype=dtype, width=width)
847         self._data = OutputData(name=name, description=description)
848         for member in range(width):
849             self._data.set(data[member], member=member)
850
851         output_collection_description = OutputCollectionDescription(**{name: dtype})
852         self.output_data_proxy = define_output_data_proxy(output_description=output_collection_description)
853
854     def is_done(self, name: str) -> bool:
855         return True
856
857     def get(self, name: str) -> 'OutputData':
858         assert self._data.name == name
859         return self._data
860
861     def data(self) -> _OutputDataProxyType:
862         return self.output_data_proxy(self)
863
864     def width(self) -> int:
865         # TODO: It looks like the OutputData ResultDescription probably belongs
866         #  in the public interface.
867         return self._data._description.width
868
869     def update_output(self):
870         pass
871
872     def reset(self):
873         pass
874
875     def future(self, name: str, description: ResultDescription) -> 'Future':
876         return Future(self, name, description=description)
877
878
879 class ProxyResourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
880     """Act as a resource manager for data managed by another resource manager.
881
882     Allow data transformations on the proxied resource.
883
884     Keyword Args:
885         proxied_future: An object implementing the Future interface.
886         width: Size of (one-dimensional) shaped data produced by function.
887         function: Transformation to perform on the result of proxied_future.
888
889     The callable passed as ``function`` must accept a single argument, which will
890     be an iterable when proxied_future represents an ensemble, or an object of
891     type proxied_future.description.dtype otherwise.
892     """
893
894     def __init__(self, proxied_future: 'Future', width: int, function: typing.Callable):
895         self._done = False
896         self._proxied_future = proxied_future
897         self._width = width
898         self.name = self._proxied_future.name
899         self._result = None
900         assert callable(function)
901         self.function = function
902
903     def width(self) -> int:
904         return self._width
905
906     def reset(self):
907         self._done = False
908         self._proxied_future._reset()
909         self._result = None
910
911     def is_done(self, name: str) -> bool:
912         return self._done
913
914     def get(self, name: str):
915         if name != self.name:
916             raise exceptions.ValueError('Request for unknown data.')
917         if not self.is_done(name):
918             raise exceptions.ProtocolError('Data not ready.')
919         result = self.function(self._result)
920         if self._width != 1:
921             # TODO Fix this typing nightmare:
922             #  ResultDescription should be fully knowable and defined when the resource manager is initialized.
923             data = OutputData(name=self.name, description=ResultDescription(dtype=type(result[0]), width=self._width))
924             for member, value in enumerate(result):
925                 data.set(value, member)
926         else:
927             data = OutputData(name=self.name, description=ResultDescription(dtype=type(result), width=self._width))
928             data.set(result, 0)
929         return data
930
931     def update_output(self):
932         self._result = self._proxied_future.result()
933         self._done = True
934
935     def data(self) -> _OutputDataProxyType:
936         raise exceptions.ApiError('ProxyResourceManager cannot yet manage a full OutputDataProxy.')
937
938     def future(self, name: str, description: ResultDescription):
939         return Future(self, name, description=description)
940
941
942 class AbstractOperation(typing.Generic[_OutputDataProxyType]):
943     """Client interface to an operation instance (graph node).
944
945     Note that this is a generic abstract class. Subclasses should provide a
946     class subscript to help static type checkers.
947     """
948
949     @abc.abstractmethod
950     def run(self):
951         """Assert execution of an operation.
952
953         After calling run(), the operation results are guaranteed to be available
954         in the local context.
955         """
956
957     @property
958     @abc.abstractmethod
959     def output(self) -> _OutputDataProxyType:
960         """Get a proxy collection to the output of the operation.
961
962         Developer note: The 'output' property exists to isolate the namespace of
963         output data from other operation handle attributes and we should consider
964         whether it is actually necessary or helpful. To facilitate its possible
965         future removal, do not enrich its interface beyond that of a collection
966         of OutputDescriptor attributes.
967         """
968         ...
969
970
971 class OperationRegistryKey(collections.namedtuple('OperationRegistryKey', 'namespace name'), collections.abc.Hashable):
972     """Helper class to define the key type for OperationRegistry."""
973
974     def __hash__(self):
975         return hash((self.namespace, self.name))
976
977     def __eq__(self, other):
978         """Test equivalence rather than identity.
979
980         Note: Use `is` to test identity.
981         """
982         return other.namespace == self.namespace and other.name == self.name
983
984     def __str__(self):
985         return '.'.join([self.namespace, self.name])
986
987     def __repr__(self):
988         return '{}(namespace={}, name={})'.format(self.__qualname__, self.namespace, self.name)
989
990
991 def _make_registry_key(*args) -> OperationRegistryKey:
992     """Normalize input to an OperationRegistryKey.
993
994     Used to implement OperationRegistry.__getitem__(), which catches and converts
995     the various exceptions this helper function can produce.
996     """
997     if len(args) > 1:
998         return OperationRegistryKey(*args)
999     else:
1000         if len(args) != 1:
1001             raise exceptions.UsageError('Empty index value passed to OperationRegistry instance[].')
1002         item = args[0]
1003     if isinstance(item, OperationRegistryKey):
1004         return item
1005     if isinstance(item, str):
1006         namespace, name = item.rsplit(sep='.', maxsplit=1)
1007         return OperationRegistryKey(namespace=namespace, name=name)
1008     # Item could be a class object or an instance object...
1009     if hasattr(item, 'namespace') and hasattr(item, 'name'):
1010         if callable(item.namespace):
1011             namespace = item.namespace()
1012         else:
1013             namespace = item.namespace
1014         if callable(item.name):
1015             name = item.name()
1016         else:
1017             name = item.name
1018         return OperationRegistryKey(namespace=namespace, name=name)
1019     raise exceptions.ValueError('Not a usable OperationRegistryKey: {}'.format(item))
1020
1021
1022 class OperationRegistry(collections.UserDict):
1023     """Helper class to map identifiers to Operation implementation instances.
1024
1025     This is an implementation detail of gmxapi.operation and should not be used from
1026     outside of the package until a stable interface can be specified.
1027     """
1028
1029     @typing.overload
1030     def __getitem__(self, item: OperationRegistryKey):
1031         ...
1032
1033     @typing.overload
1034     def __getitem__(self, item: str):
1035         ...
1036
1037     def __getitem__(self, *args):
1038         """Fetch the requested operation registrant.
1039
1040         The getter is overloaded to support look-ups in multiple forms.
1041
1042         The key can be given in the following forms.
1043         * As a period-delimited string of the form "namespace.operation".
1044         * As an OperationRegistryKey object.
1045         * As a sequence of positional arguments accepted by OperationRegistryKey.
1046         * As an object conforming to the OperationRegistryKey interface.
1047         """
1048         try:
1049             item = _make_registry_key(*args)
1050         except exceptions.Error as e:
1051             raise exceptions.TypeError('Could not interpret key as a OperationRegistryKey.') from e
1052         return self.data[item]
1053
1054
1055 # Module level data store for locating operation implementations at run time.
1056 # TODO: This may make sense as instance data of a root Context instance, but we
1057 #  don't have a stable interface for communicating between Contexts yet.
1058 # Alternatively, it could be held as class data in a RegisteredOperation class,
1059 # but note that the "register" member function would behave less like the abc.ABCMeta
1060 # support for "virtual subclassing" and more like the generic function machinery
1061 # of e.g. functools.singledispatch.
1062 _operation_registry = OperationRegistry()
1063
1064
1065 def _register_operation(cls: typing.Type[OperationImplementation]):
1066     assert isinstance(cls, type)
1067     assert issubclass(cls, OperationImplementation)
1068     operation = _make_registry_key(cls)
1069     if operation in _operation_registry:
1070         full_name = str(operation)
1071         raise exceptions.ProtocolError('Attempting to redefine operation {}.'.format(full_name))
1072     _operation_registry[operation] = cls
1073
1074
1075 # TODO: replace with a generic function that we dispatch on so the type checker can infer a return type.
1076 def _get_operation_director(operation, context: gmx.abc.Context):
1077     """
1078
1079     :param operation:
1080     :param context:
1081     :return: gmxapi.abc.OperationDirector
1082     """
1083     registrant = _operation_registry[operation]
1084     director = registrant.director(context=context)
1085     return director
1086
1087
1088 class InputDescription(abc.ABC):
1089     """Node input description for gmxapi.operation module.
1090
1091     Provides the support needed to understand operation inputs in gmxapi.operation
1092     module Contexts.
1093
1094     .. todo:: Implementation base class with heritable behavior and/or helpers to
1095               compose this functionality from more normative description of
1096               operation inputs. This will probably become a facet of the ResourceDirector
1097               when specialized for gmxapi.operation.Context.
1098     """
1099
1100     @abc.abstractmethod
1101     def signature(self) -> InputCollectionDescription:
1102         """Mapping of named inputs and input type.
1103
1104         Used to determine valid inputs before an Operation node is created.
1105
1106         Collaborations:
1107             Related to the operation resource factory for this context.
1108
1109         ..  todo::
1110             Better unification of this protocol, InputCollectionDescription, and
1111             resource factory.
1112             Note, also, that the *bind* method of the returned InputCollectionDescription
1113             serves as the resource factory for input to the node builder.
1114         """
1115         ...
1116
1117     @abc.abstractmethod
1118     def make_uid(self, input: 'DataEdge') -> str:
1119         """The unique identity of an operation node tags the output with respect to the input.
1120
1121         Combines information on the Operation details and the input to uniquely
1122         identify the Operation node.
1123
1124         Arguments:
1125             input : A (collection of) data source(s) that can provide Fingerprints.
1126
1127         Used internally by the Context to manage ownership of data sources, to
1128         locate resources for nodes in work graphs, and to manage serialization,
1129         deserialization, and checkpointing of the work graph.
1130
1131         The UID is a detail of the generic Operation that _should_ be independent
1132         of the Context details to allow the framework to manage when and where
1133         an operation is executed.
1134
1135         TODO: We probably don't want to allow Operations to single-handedly determine their
1136          own uniqueness, but they probably should participate in the determination with the Context.
1137
1138         TODO: Context implementations should be allowed to optimize handling of
1139          equivalent operations in different sessions or work graphs, but we do not
1140          yet guarantee that UIDs are globally unique!
1141
1142         To be refined...
1143         """
1144         ...
1145
1146
1147 class ConcreteInputDescription(InputDescription):
1148     """Simple composed InputDescription."""
1149
1150     def __init__(self,
1151                  input_signature: InputCollectionDescription,
1152                  uid_helper: typing.Callable[['DataEdge'], str]
1153                  ):
1154         self._input_signature_description = input_signature
1155         self._uid_helper = uid_helper
1156
1157     def signature(self) -> InputCollectionDescription:
1158         return self._input_signature_description
1159
1160     def make_uid(self, input: 'DataEdge') -> str:
1161         return self._uid_helper(input)
1162
1163
1164 class OperationMeta(abc.ABCMeta):
1165     """Metaclass to manage the definition of Operation implementation classes.
1166
1167     Note that this metaclass can be superseded by `__init_subclass__()` when
1168     the minimum Python version is increased to Python 3.6+.
1169     """
1170
1171     def __new__(meta, name, bases, class_dict):
1172         cls = super().__new__(meta, name, bases, class_dict)
1173         # Register subclasses, but not the base class.
1174         if issubclass(cls, OperationImplementation) and cls is not OperationImplementation:
1175             # TODO: Remove OperationDetailsBase layer and this extra check.
1176             # Note: we do not yet register the Operations built dynamically because we
1177             # don't have a clear definition of unique implementations yet. For instance,
1178             # a new OperationDetails class is defined for each call to gmx.join_arrays
1179             # TODO: Properly register and reuse Operations defined dynamically
1180             #  through function_wrapper (currently encompassed by OperationDetailsBase subclasses)
1181             if name != 'OperationDetailsBase':
1182                 if OperationDetailsBase not in bases:
1183                     _register_operation(cls)
1184         return cls
1185
1186
1187 class OperationDetailsBase(OperationImplementation, InputDescription,
1188                            metaclass=OperationMeta):
1189     """Abstract base class for Operation details in this module's Python Context.
1190
1191     Provides necessary interface for use with gmxapi.operation.ResourceManager.
1192     Separates the details of an Operation from those of the ResourceManager in
1193     a given Context.
1194
1195     OperationDetails classes are almost stateless, serving mainly to compose implementation
1196     details. Instances (operation objects) provide the Context-dependent interfaces
1197     for a specific node in a work graph.
1198
1199     OperationDetails subclasses are created dynamically by function_wrapper and
1200     make_operation.
1201
1202     Developer note: when subclassing, note that the ResourceManager is responsible
1203     for managing Operation state. Do not add instance data members related to
1204     computation or output state.
1205
1206     TODO: determine what is acceptable instance data and/or initialization information.
1207     Note that currently the subclass in function_wrapper has _no_ initialization input,
1208     but does not yet handle input-dependent output specification or node fingerprinting.
1209     It seems likely that instance initialization will require some characterization of
1210     supplied input, but nothing else. Even that much is not necessary if the instance
1211     is completely stateless, but that would require additional parameters to the member
1212     functions. However, an instance should be tied to a specific ResourceManager and
1213     Context, so weak references to these would be reasonable.
1214     """
1215
1216     @abc.abstractmethod
1217     def output_description(self) -> OutputCollectionDescription:
1218         """Mapping of available outputs and types for an existing Operation node."""
1219         ...
1220
1221     @abc.abstractmethod
1222     def publishing_data_proxy(self, *,
1223                               instance: SourceResource[typing.Any, _PublishingDataProxyType],
1224                               client_id) -> _PublishingDataProxyType:
1225         """Factory for Operation output publishing resources.
1226
1227         Used internally when the operation is run with resources provided by instance."""
1228         ...
1229
1230     @abc.abstractmethod
1231     def output_data_proxy(self,
1232                           instance: SourceResource[_OutputDataProxyType, typing.Any]
1233                           ) -> _OutputDataProxyType:
1234         """Get an object that can provide Futures for output data managed by instance."""
1235         ...
1236
1237     @abc.abstractmethod
1238     def __call__(self, resources: _Resources):
1239         """Execute the operation with provided resources.
1240
1241         Resources are prepared in an execution context with aid of resource_director()
1242
1243         After the first call, output data has been published and is trivially
1244         available through the output_data_proxy()
1245         """
1246         ...
1247
1248     @classmethod
1249     @abc.abstractmethod
1250     def resource_director(cls, *, input, output: _PublishingDataProxyType) -> _Resources[_PublishingDataProxyType]:
1251         """a Director factory that helps build the Session Resources for the function.
1252
1253         The Session launcher provides the director with all of the resources previously
1254         requested/negotiated/registered by the Operation. The director uses details of
1255         the operation to build the resources object required by the operation runner.
1256
1257         For the Python Context, the protocol is for the Context to call the
1258         resource_director instance method, passing input and output containers.
1259         (See, for example, gmxapi.operation.PyFunctionRunnerResources)
1260         """
1261         ...
1262
1263     # TODO: Don't run the director. Just return the correct callable.
1264     @classmethod
1265     def operation_director(cls, *args, context: 'Context', label=None, **kwargs) -> AbstractOperation:
1266         """Dispatching Director for adding a work node.
1267
1268         A Director for input of a particular sort knows how to reconcile
1269         input with the requirements of the Operation and Context node builder.
1270         The Director (using a less flexible / more standard interface)
1271         builds the operation node using a node builder provided by the Context.
1272
1273         This is essentially the creation method, instead of __init__, but the
1274         object is created and owned by the framework, and the caller receives
1275         an OperationHandle instead of a reference to an instance of cls.
1276
1277         # TODO: We need a way to compose this functionality for arbitrary Contexts.
1278         # That likely requires traits on the Contexts, and registration of Context
1279         # implementations. It seems likely that an Operation will register Director
1280         # implementations on import, and dispatching will be moved to the Context
1281         # implementations, which can either find an appropriate OperationDirector
1282         # or raise a compatibility error. To avoid requirements on import order of
1283         # Operations and Context implementations, we can change this to a non-abstract
1284         # dispatching method, requiring registration in the global gmxapi.context
1285         # module, or get rid of this method and use something like pkg_resources
1286         # "entry point" groups for independent registration of Directors and Contexts,
1287         # each annotated with relevant traits. E.g.:
1288         # https://setuptools.readthedocs.io/en/latest/setuptools.html#dynamic-discovery-of-services-and-plugins
1289         """
1290         if not isinstance(context, Context):
1291             raise exceptions.UsageError('Context instance needed for dispatch.')
1292         # TODO: use Context characteristics rather than isinstance checks.
1293         if isinstance(context, ModuleContext):
1294             construct = OperationDirector(*args, operation_details=cls, context=context, label=label, **kwargs)
1295             return construct()
1296         elif isinstance(context, SubgraphContext):
1297             construct = OperationDirector(*args, operation_details=cls, context=context, label=label, **kwargs)
1298             return construct()
1299         else:
1300             raise exceptions.ApiError('Cannot dispatch operation_director for context {}'.format(context))
1301
1302
1303 # TODO: Implement observer pattern for edge->node data flow.
1304 # Step 0: implement subject interface subscribe()
1305 # Step 1: implement subject interface get_state()
1306 # Step 2: implement observer interface update()
1307 # Step 3: implement subject interface notify()
1308 # Step 4: implement observer hook to support substantial change in source that
1309 #         invalidates downstream fingerprinting, such as a new subgraph iteration.
1310 # class Observer(object):
1311 #     """Abstract base class for data observers."""
1312 #     def rebind(self, edge: DataEdge):
1313 #         """Recreate the Operation at the consuming end of the DataEdge."""
1314
1315
1316 class Future(_Future):
1317     """gmxapi data handle.
1318
1319     Future is currently more than a Future right now. (should be corrected / clarified.)
1320     Future is also a facade to other features of the data provider.
1321
1322     Future objects are the most atomic interface between Contexts. User scripts
1323     may hold Futures from which they extract data with result(). Operation output
1324     used as input for another Operation can be decomposed such that the Operation
1325     factory has only Future objects in its input.
1326
1327     TODO: ``subscribe`` method allows consumers to bind as Observers.
1328
1329     TODO: extract the abstract class for input inspection?
1330     Currently abstraction is handled through SourceResource subclassing.
1331
1332     Attributes:
1333         description (ResultDescription): Describes the result to be obtained from this Future.
1334
1335     """
1336
1337     def __init__(self, resource_manager: SourceResource, name: str, description: ResultDescription):
1338         self.name = name
1339         if not isinstance(description, ResultDescription):
1340             raise exceptions.ValueError('Need description of requested data.')
1341         self.description = description  # type: ResultDescription
1342         self.resource_manager = resource_manager
1343
1344         # Deprecated. We should not "reset" futures, but reconstitute them, but we
1345         # need to move the data model to a subscription-based system so that we can
1346         # make Futures properly immutable and issue new ones across subgraph iterations.
1347         self._number_of_resets = 0
1348
1349     def __eq__(self, other):
1350         # This function is defined because we have defined __hash__().
1351         # Before customizing __eq__(), recall that Python objects that compare
1352         # equal should hash to the same value.
1353         # Please keep the two functions semantically correct.
1354         return object.__eq__(self, other)
1355
1356     def __hash__(self):
1357         # We cannot properly determine equivalency beyond the scope of a ResourceManager instance
1358         # without more developed data flow fingerprinting.
1359         return hash((id(self.resource_manager), self.name, self.description, self._number_of_resets))
1360
1361     def __str__(self):
1362         return '<Future: name={}, description={}>'.format(self.name, self.description)
1363
1364     def result(self) -> ResultTypeVar:
1365         """Fetch data to the caller's Context.
1366
1367         Returns an object of the concrete type specified according to
1368         the operation that produces this Result.
1369
1370         Ensemble data are returned as a list. Scalar results or results from single
1371         member ensembles are returned as scalars.
1372         """
1373         self.resource_manager.update_output()
1374         # Return ownership of concrete data
1375         handle = self.resource_manager.get(self.name)
1376
1377         # For intuitive use in non-ensemble cases, we represent data as bare scalars
1378         # when possible. It is easier for users to cast scalars to lists of length 1
1379         # than to introspect their own code to determine if a list of length 1 is
1380         # part of an ensemble or not. The data model will become clearer as we
1381         # develop more robust handling of multidimensional data and data flow topologies.
1382         # In the future,
1383         # we may distinguish between data of shape () and shape (1,), but we will need
1384         # to be careful with semantics. We are already starting to adopt a rule-of-thumb
1385         # that data objects assume the minimum dimensionality necessary unless told
1386         # otherwise, and we could make that a hard rule if it doesn't make other things
1387         # too difficult.
1388         if self.description.width == 1:
1389             return handle.data(member=0)
1390         else:
1391             return handle.data()
1392
1393     def _reset(self):
1394         """Mark the Future "not done" to allow reexecution.
1395
1396         Invalidates cached results, resets "done" markers in data sources, and
1397         triggers _reset recursively.
1398
1399         Note: this is a hack that is inconsistent with the plan of unique mappings
1400         of inputs to outputs, but allows a quick prototype for looping operations.
1401         """
1402         self._number_of_resets += 1
1403         self.resource_manager.reset()
1404
1405     @property
1406     def dtype(self):
1407         return self.description.dtype
1408
1409     def __getitem__(self, item):
1410         """Get a more limited view on the Future."""
1411         description = ResultDescription(dtype=self.dtype, width=self.description.width)
1412         # TODO: Use explicit typing when we have more thorough typing.
1413         description._dtype = None
1414         if self.description.width == 1:
1415             proxy = ProxyResourceManager(self,
1416                                          width=description.width,
1417                                          function=lambda value, key=item: value[key])
1418         else:
1419             proxy = ProxyResourceManager(self,
1420                                          width=description.width,
1421                                          function=lambda value, key=item:
1422                                          [subscriptable[key] for subscriptable in value])
1423         return proxy.future(self.name, description=description)
1424
1425
1426 class OutputDataDescriptor(ProxyDataDescriptor):
1427     """Read-only data descriptor for proxied access to output data.
1428
1429     Knows how to get a Future from the resource manager.
1430     """
1431
1432     # TODO: Reconcile the internal implementation details with the visibility and
1433     #  usages of this class.
1434
1435     def __get__(self, proxy: DataProxyBase, owner):
1436         if proxy is None:
1437             # Access through class attribute of owner class
1438             return self
1439         result_description = ResultDescription(dtype=self._dtype, width=proxy.ensemble_width)
1440
1441         return proxy._resource_instance.future(name=self._name, description=result_description)
1442
1443
1444 class MutableResourceDescriptor(ProxyDataDescriptor):
1445     """Accessor for rich binding interfaces.
1446
1447     Allows operations to access resources beyond the scope of the current
1448     resource manager. Used by operations whose interactions are more complicated
1449     than standard typed data flow at the scope of the current Context.
1450
1451     Instead of a Future interface, the returned object is a MutableResource with
1452     which a subscriber can collaborate with lower-level protocols.
1453     """
1454
1455     def __get__(self, proxy: DataProxyBase, owner) -> typing.Union[MutableResource, 'MutableResourceDescriptor']:
1456         if proxy is None:
1457             # Access through class attribute of owner class. We don't have a
1458             # specified use case for that, so allow inspection of the data
1459             # descriptor instance, itself.
1460             return self
1461         # TODO: implement.
1462         # The protocol for MD extension plugins requires that the simulation operation
1463         # subscribe to the plugin. Then the Context allows the plugin to access the
1464         # MdRunner interface as the simulation is launched.
1465         # The protocol for modify_input and for mdrun to consume the TPR payload
1466         # of read_tpr or modify_input should allow us to use the gmxapi 0.0.7
1467         # WorkSpec to configure and launch a simulation, which we can do by feeding
1468         # forward and building a fused operation at the mdrun node. The information
1469         # fed forward can just be references to the inputs and parameters of the
1470         # earlier operations, with annotations so that we know the intended behavior.
1471
1472
1473 def define_output_data_proxy(output_description: OutputCollectionDescription) -> typing.Type[DataProxyBase]:
1474     descriptors = {name: OutputDataDescriptor(name, description) for name, description in output_description.items()}
1475
1476     class OutputDataProxy(DataProxyBase, descriptors=descriptors):
1477         """Handler for read access to the `output` member of an operation handle.
1478
1479         Acts as a sort of ResultCollection.
1480
1481         A ResourceManager creates an OutputDataProxy instance at initialization to
1482         provide the ``output`` property of an operation handle.
1483         """
1484
1485     # Note: the OutputDataProxy has an inherent ensemble shape in the context
1486     # in which it is used, but that is an instance characteristic, not part of this type definition.
1487     # TODO: (FR5) The current tool does not support topology changing operations.
1488     return OutputDataProxy
1489
1490
1491 # Encapsulate the description of the input data flow.
1492 PyFuncInput = collections.namedtuple('Input', ('args', 'kwargs', 'dependencies'))
1493
1494
1495 class SinkTerminal(object):
1496     """Operation input end of a data edge.
1497
1498     In addition to the information in an InputCollectionDescription, includes
1499     topological information for the Operation node (ensemble width).
1500
1501     Collaborations: Required for creation of a DataEdge. Created with knowledge
1502     of a DataSourceCollection instance and a InputCollectionDescription.
1503     """
1504
1505     # TODO: This clearly maps to a Builder pattern.
1506     # I think we want to get the sink terminal builder from a factory parameterized by InputCollectionDescription,
1507     # add data source collections, and then build the sink terminal for the data edge.
1508     def __init__(self, input_collection_description: InputCollectionDescription):
1509         """Define an appropriate data sink for a new operation node.
1510
1511         Resolve data sources and input description to determine connectability,
1512         topology, and any necessary implicit data transformations.
1513
1514         :param input_collection_description: Available inputs for Operation
1515         :return: Fully formed description of the Sink terminal for a data edge to be created.
1516
1517         Collaborations: Execution Context implementation.
1518         """
1519         self.ensemble_width = 1
1520         self.inputs = input_collection_description
1521
1522     def __str__(self):
1523         return '<SinkTerminal: ensemble_width={}>'.format(self.ensemble_width)
1524
1525     def update_width(self, width: int):
1526         if not isinstance(width, int):
1527             try:
1528                 width = int(width)
1529             except TypeError:
1530                 raise exceptions.TypeError('Need an integer width > 0.')
1531         if width < 1:
1532             raise exceptions.ValueError('Nonsensical ensemble width: {}'.format(int(width)))
1533         if self.ensemble_width != 1:
1534             if width != self.ensemble_width:
1535                 raise exceptions.ValueError(
1536                     'Cannot change ensemble width {} to width {}.'.format(self.ensemble_width, width))
1537         self.ensemble_width = width
1538
1539     def update(self, data_source_collection: DataSourceCollection):
1540         """Update the SinkTerminal with the proposed data provider."""
1541         for name, sink_dtype in self.inputs.items():
1542             if name not in data_source_collection:
1543                 # If/when we accept data from multiple sources, we'll need some additional sanity checking.
1544                 if not hasattr(self.inputs.signature.parameters[name], 'default'):
1545                     raise exceptions.UsageError('No data or default for {}'.format(name))
1546             else:
1547                 # With a single data source, we need data to be in the source or have a default
1548                 assert name in data_source_collection
1549                 assert issubclass(sink_dtype, valid_result_types)
1550                 source = data_source_collection[name]
1551                 logger.debug('Updating Sink for source {}: {}.'.format(name, source))
1552                 if isinstance(source, sink_dtype):
1553                     logger.debug('Source matches sink. No update necessary.')
1554                     continue
1555                 else:
1556                     if isinstance(source, collections.abc.Iterable) and not isinstance(source, (
1557                             str, bytes, collections.abc.Mapping)):
1558                         assert isinstance(source, datamodel.NDArray)
1559                         if sink_dtype != datamodel.NDArray:
1560                             # Source is NDArray, but sink is not. Implicitly scatter.
1561                             self.update_width(len(source))
1562                         continue
1563                     if hasattr(source, 'description'):
1564                         source_description = typing.cast(ResultDescription, source.description)
1565                         source_dtype = source_description.dtype
1566                         assert isinstance(sink_dtype, type)
1567                         # TODO: Handle typing of Future slices when we have a better data model.
1568                         if source_dtype is not None:
1569                             assert isinstance(source_dtype, type)
1570                             if not issubclass(source_dtype, sink_dtype):
1571                                 raise exceptions.TypeError('Expected {} but got {}.'.format(sink_dtype, source_dtype))
1572                         source_width = source.description.width
1573                         self.update_width(source_width)
1574
1575
1576 class DataEdge(object):
1577     """State and description of a data flow edge.
1578
1579     A DataEdge connects a data source collection to a data sink. A sink is an
1580     input or collection of inputs of an operation (or fused operation). An operation's
1581     inputs may be fed from multiple data source collections, but an operation
1582     cannot be fully instantiated until all of its inputs are bound, so the DataEdge
1583     is instantiated at the same time the operation is instantiated because the
1584     required topology of a graph edge may be determined by the required topology
1585     of another graph edge.
1586
1587     A data edge has a well-defined topology only when it is terminated by both
1588     a source and sink. Creation requires that a source collection is compared to
1589     a sink description.
1590
1591     Calling code initiates edge creation by passing well-described data sources
1592     to an operation factory. The data sources may be annotated with explicit scatter
1593     or gather commands.
1594
1595     The resource manager for the new operation determines the
1596     required shape of the sink to handle all of the offered input.
1597
1598     Broadcasting
1599     and transformations of the data sources are then determined and the edge is
1600     established.
1601
1602     At that point, the fingerprint of the input data at each operation
1603     becomes available to the resource manager for the operation. The fingerprint
1604     has sufficient information for the resource manager of the operation to
1605     request and receive data through the execution context.
1606
1607     Instantiating operations and data edges implicitly involves collaboration with
1608     a Context instance. The state of a given Context or the availability of a
1609     default Context through a module function may affect the ability to instantiate
1610     an operation or edge. In other words, behavior may be different for connections
1611     being made in the scripting environment versus the running Session, and implementation
1612     details can determine whether or not new operations or data flow can occur in
1613     different code environments.
1614     """
1615
1616     class ConstantResolver(object):
1617         def __init__(self, value):
1618             self.value = value
1619
1620         def __call__(self, member=None):
1621             return self.value
1622
1623     def __init__(self, source_collection: DataSourceCollection, sink_terminal: SinkTerminal):
1624         # Adapters are callables that transform a source and node ID to local data.
1625         # Every key in the sink has an adapter.
1626         self.adapters = {}
1627         self.source_collection = source_collection
1628         self.sink_terminal = sink_terminal
1629         for name in sink_terminal.inputs:
1630             if name not in source_collection:
1631                 if hasattr(sink_terminal.inputs[name], 'default'):
1632                     self.adapters[name] = self.ConstantResolver(sink_terminal.inputs[name])
1633                 else:
1634                     # TODO: Initialize with multiple DataSourceCollections?
1635                     raise exceptions.ValueError('No source or default for required input "{}".'.format(name))
1636             else:
1637                 source = source_collection[name]
1638                 sink = sink_terminal.inputs[name]
1639                 if isinstance(source, (str, bool, int, float, dict)):
1640                     if issubclass(sink, (str, bool, int, float, dict)):
1641                         self.adapters[name] = self.ConstantResolver(source)
1642                     else:
1643                         assert issubclass(sink, datamodel.NDArray)
1644                         self.adapters[name] = self.ConstantResolver(datamodel.ndarray([source]))
1645                 elif isinstance(source, datamodel.NDArray):
1646                     if issubclass(sink, datamodel.NDArray):
1647                         # TODO: shape checking
1648                         # Implicit broadcast may not be what is intended
1649                         self.adapters[name] = self.ConstantResolver(source)
1650                     else:
1651                         if source.shape[0] != sink_terminal.ensemble_width:
1652                             raise exceptions.ValueError(
1653                                 'Implicit broadcast could not match array source to ensemble sink')
1654                         else:
1655                             self.adapters[name] = lambda member, source=source: source[member]
1656                 elif hasattr(source, 'result'):
1657                     # Handle data futures...
1658                     # If the Future is part of an ensemble, result() will return a list.
1659                     # Otherwise, it will return a single object.
1660                     ensemble_width = source.description.width
1661                     # TODO: subscribe to futures so results can be pushed.
1662                     if ensemble_width == 1:
1663                         self.adapters[name] = lambda member, source=source: source.result()
1664                     else:
1665                         self.adapters[name] = lambda member, source=source: source.result()[member]
1666                 else:
1667                     assert isinstance(source, EnsembleDataSource)
1668                     self.adapters[name] = lambda member, source=source: source.node(member)
1669
1670     def __str__(self):
1671         return '<DataEdge: source_collection={}, sink_terminal={}>'.format(self.source_collection, self.sink_terminal)
1672
1673     def reset(self):
1674         self.source_collection.reset()
1675
1676     def resolve(self, key: str, member: int):
1677         return self.adapters[key](member=member)
1678
1679     def sink(self, node: int) -> dict:
1680         """Consume data for the specified sink terminal node.
1681
1682         Run-time utility delivers data from the bound data source(s) for the
1683         specified terminal that was configured when the edge was created.
1684
1685         Terminal node is identified by a member index number.
1686
1687         Returns:
1688             A Python dictionary of the provided inputs as local data (not Future).
1689         """
1690         results = {}
1691         sink_ports = self.sink_terminal.inputs
1692         for key in sink_ports:
1693             results[key] = self.resolve(key, node)
1694         return results
1695
1696
1697 class ResourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
1698     """Provides data publication and subscription services.
1699
1700         Owns the data published by the operation implementation or served to consumers.
1701         Mediates read and write access to the managed data streams.
1702
1703         This ResourceManager implementation is defined in conjunction with a
1704         run-time definition of an Operation that wraps a Python callable (function).
1705         ResourceManager is instantiated with a reference to the callable.
1706
1707         When the Operation is run, the resource manager prepares resources for the wrapped
1708         function. Inputs provided to the Operation factory are provided to the
1709         function as keyword arguments. The wrapped function publishes its output
1710         through the (additional) ``output`` key word argument. This argument is
1711         a short-lived resource, prepared by the ResourceManager, with writable
1712         attributes named in the call to function_wrapper().
1713
1714         After the Operation has run and the outputs published, the data managed
1715         by the ResourceManager is marked "done."
1716
1717         Protocols:
1718
1719         The data() method produces a read-only collection of outputs named for
1720         the Operation when the Operation's ``output`` attribute is accessed.
1721
1722         publishing_resources() can be called once during the ResourceManager lifetime
1723         to provide the ``output`` object for the wrapped function. (Used by update_output().)
1724
1725         update_output() brings the managed output data up-to-date with the input
1726         when the Operation results are needed. If the Operation has not run, an
1727         execution session is prepared with input and output arguments for the
1728         wrapped Python callable. Output is publishable only during this session.
1729
1730     TODO: This functionality should evolve to be a facet of Context implementations.
1731      There should be no more than one ResourceManager instance per work graph
1732      node in a Context. This will soon be at odds with letting the ResourceManager
1733      be owned by an operation instance handle.
1734     TODO: The publisher and data objects can be more strongly defined through
1735      interaction between the Context and clients.
1736
1737     Design notes:
1738
1739     The normative pattern for updating data is to execute a node in the work
1740     graph, passing Resources for an execution Session to an operation runner.
1741     The resources and runner are dependent on the implementation details of
1742     the operation and the execution context, so logical execution may look
1743     like the following.
1744
1745         resource_builder = ResourcesBuilder()
1746         runner_builder = RunnerBuilder()
1747         input_resource_director = input_resource_factory.director(input)
1748         output_resource_director = publishing_resource_factory.director(output)
1749         input_resource_director(resource_builder, runner_builder)
1750         output_resource_director(resource_builder, runner_builder)
1751         resources = resource_builder.build()
1752         runner = runner_builder.build()
1753         runner(resources)
1754
1755     Only the final line is intended to be literal. The preceding code, if it
1756     exists in entirety, may be spread across several code comments.
1757
1758     TODO: Data should be pushed, not pulled.
1759     Early implementations executed operation code and extracted results directly.
1760     While we need to be able to "wait for" results and alert the data provider that
1761     we are ready for input, we want to defer execution management and data flow to
1762     the framework.
1763     """
1764
1765     @contextmanager
1766     def __publishing_context(self, ensemble_member=0) -> typing.Iterator[_PublishingDataProxyType]:
1767         """Get a context manager for resolving the data dependencies of this node.
1768
1769         The returned object is a Python context manager (used to open a `with` block)
1770         to define the scope in which the operation's output can be published.
1771         'output' type resources can be published exactly once, and only while the
1772         publishing context is active. (See operation.function_wrapper())
1773
1774         Used internally to implement ResourceManager.publishing_resources()
1775
1776         Responsibilities of the context manager are to:
1777             * (TODO) Make sure dependencies are resolved.
1778             * Make sure outputs are marked 'done' when leaving the context.
1779
1780         """
1781
1782         # TODO:
1783         # if self._data.done():
1784         #     raise exceptions.ProtocolError('Resources have already been published.')
1785
1786         # I don't think we want the OperationDetails to need to know about ensemble data,
1787         # (though the should probably be allowed to), so we may need a separate interface
1788         # for the resource manager with built-in scope-limiting to a single ensemble member.
1789         # Right now, one Operation handle owns one ResourceManager (which takes care of
1790         # the ensemble details), which owns one OperationDetails (which has no ensemble knowledge).
1791         # It is the responsibility of the calling code to make sure the PublishingDataProxy
1792         # gets used correctly.
1793
1794         # ref: https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager
1795         try:
1796             if not self._done[ensemble_member]:
1797                 resource = self.__publishing_data_proxy(instance=weakref.proxy(self),
1798                                                         client_id=ensemble_member)
1799                 yield resource
1800         except Exception as e:
1801             message = 'Uncaught {} while providing output-publishing resources for {}.'
1802             message.format(repr(e), self.operation_id)
1803             raise exceptions.ApiError(message) from e
1804         finally:
1805             logger.debug('Published output for {} member {}'.format(self.operation_id, ensemble_member))
1806             self._done[ensemble_member] = True
1807
1808     def __init__(self, *,
1809                  source: DataEdge,
1810                  operation_id,
1811                  output_description: OutputCollectionDescription,
1812                  output_data_proxy: typing.Type[_OutputDataProxyType],
1813                  publishing_data_proxy: typing.Type[_PublishingDataProxyType],
1814                  resource_factory,
1815                  runner_director,
1816                  output_context: 'Context'):
1817         """Initialize a resource manager for the inputs and outputs of an operation.
1818         """
1819         # Note: This implementation assumes there is one ResourceManager instance per data source,
1820         # so we only stash the inputs and dependency information for a single set of resources.
1821         # TODO: validate input_fingerprint as its interface becomes clear.
1822         self._input_edge = source
1823         self.ensemble_width = self._input_edge.sink_terminal.ensemble_width
1824
1825         # Node UID.
1826         self.operation_id = operation_id
1827
1828         if isinstance(output_context, Context):
1829             self._output_context = output_context
1830         else:
1831             message = 'Provide an instance of gmxapi.operation.Context for output_context'
1832             raise exceptions.UsageError(message)
1833         assert self._output_context is not None
1834
1835         self._output_data_proxy = output_data_proxy
1836         assert self._output_data_proxy is not None
1837         assert callable(self._output_data_proxy)
1838
1839         self._output_description = output_description
1840         assert self._output_description is not None
1841
1842         self.__publishing_data_proxy = publishing_data_proxy
1843         assert self.__publishing_data_proxy is not None
1844         assert callable(self.__publishing_data_proxy)
1845
1846         self._runner_director = runner_director
1847         assert self._runner_director is not None
1848         self._resource_factory = resource_factory
1849         assert self._resource_factory is not None
1850
1851         self._data = _make_datastore(output_description=self._output_description,
1852                                      ensemble_width=self.ensemble_width)
1853
1854         # We store a rereference to the publishing context manager implementation
1855         # in a data structure that can only produce one per Python interpreter
1856         # (using list.pop()).
1857         # TODO: reimplement as a data descriptor
1858         #  so that PublishingDataProxy does not need a bound circular reference.
1859         self.__publishing_resources = [self.__publishing_context]
1860
1861         self._done = [False] * self.ensemble_width
1862         self.__operation_entrance_counter = 0
1863
1864     def width(self) -> int:
1865         return self.ensemble_width
1866
1867     def reset(self):
1868         self.__operation_entrance_counter = 0
1869         self._done = [False] * self.ensemble_width
1870         self.__publishing_resources = [self.__publishing_context]
1871         for data in self._data.values():
1872             data.reset()
1873         self._input_edge.reset()
1874         assert self.__operation_entrance_counter == 0
1875
1876     def done(self, member=None):
1877         if member is None:
1878             return all(self._done)
1879         else:
1880             return self._done[member]
1881
1882     def set_result(self, name, value, member: int):
1883         if not isinstance(value, (str, bytes)):
1884             try:
1885                 for item in value:
1886                     # In this specification, it is antithetical to publish Futures.
1887                     if hasattr(item, 'result'):
1888                         raise exceptions.ApiError('Operation produced Future instead of real output.')
1889             except TypeError:
1890                 # Ignore when `item` is not iterable.
1891                 pass
1892         self._data[name].set(value=value, member=member)
1893
1894     def is_done(self, name):
1895         return self._data[name].done
1896
1897     def get(self, name: str):
1898         """
1899
1900         Raises exceptions.ProtocolError if requested data is not local yet.
1901         Raises exceptions.ValueError if data is requested for an unknown name.
1902         """
1903         if name not in self._data:
1904             raise exceptions.ValueError('Request for unknown data.')
1905         if not self.is_done(name):
1906             raise exceptions.ProtocolError('Data not ready.')
1907         assert isinstance(self._data[name], OutputData)
1908         return self._data[name]
1909
1910     # TODO: Normalize. This is the no-argument, no-return callable member of an
1911     #  operation handle that dispatches to another Context (the operation implementation).
1912     # TODO: Allow update of a single ensemble member. As written, this always updates all ensemble members. That can
1913     #  be the default behavior, but we don't want to require non-local updates in all cases.
1914     def update_output(self):
1915         """Bring the output of the bound operation up to date.
1916
1917         Execute the bound operation once if and only if it has not
1918         yet been run in the lifetime of this resource manager.
1919
1920         Used internally to implement Futures for the local operation
1921         associated with this resource manager.
1922
1923         TODO: We need a different implementation for an operation whose output
1924          is served by multiple resource managers. E.g. an operation whose output
1925          is available across the ensemble, but which should only be executed on
1926          a single ensemble member.
1927         """
1928         # This code is not intended to be reentrant. We make a modest attempt to
1929         # catch unexpected reentrance, but this is not (yet) intended to be a thread-safe
1930         # resource manager implementation.
1931         # TODO: Handle checking just the ensemble members this resource manager is responsible for.
1932         # TODO: Replace with a managed observer pattern. Update once when input is available in the Context.
1933         if not self.done():
1934             # Note: This check could also be encapsulated in a run_once decorator that
1935             # could even set a data descriptor to change behavior.
1936             self.__operation_entrance_counter += 1
1937             if self.__operation_entrance_counter > 1:
1938                 raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
1939             if not self.done():
1940                 # Note! This is a detail of the ResourceManager in a SerialContext
1941                 # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
1942                 #  operation's library/implementation context.
1943                 publishing_resources = self.publishing_resources()
1944                 for i in range(self.ensemble_width):
1945                     # TODO: rewrite the following expression as a call to a resource factory.
1946                     # TODO: Consider whether the resource_factory behavior should be normalized
1947                     #  to always use `with` blocks to indicate the lifetime of a resource handle.
1948                     #  That implies that an operation handle can expire, but the operation handle
1949                     #  could be "yield"ed
1950                     #  from within the `with` block to keep the resource scope alive until the resulting
1951                     #  generator is exhausted. Not sure what that looks like or what the use case would be.
1952                     with self.local_input(i) as input:
1953                         # Note: Resources are marked "done" by the publishing system
1954                         # before the following context manager finishes exiting.
1955                         with publishing_resources(ensemble_member=i) as output:
1956                             # self._runner(*input.args, output=output, **input.kwargs)
1957                             ####
1958                             # Here we can make _runner a thing that accepts session resources, and
1959                             # is created by specializable builders. Separate out the expression of
1960                             # inputs.
1961                             #
1962                             # resource_builder = OperationDetails.ResourcesBuilder(context)
1963                             # runner_builder = OperationDetails.RunnerBuilder(context)
1964                             # input_resource_director = self._input_resource_factory.director(input)
1965                             # output_resource_director = self._publishing_resource_factory.director(output)
1966                             # input_resource_director(resource_builder, runner_builder)
1967                             # output_resource_director(resource_builder, runner_builder)
1968                             # resources = resource_builder.build()
1969                             # runner = runner_builder.build()
1970                             # runner(resources)
1971                             #
1972                             # This resource factory signature might need to be inverted or broken up
1973                             # into a builder for consistency. I.e.
1974                             # option 1: Make the input and output resources with separate factories and add_resource on
1975                             # the runner builder.
1976                             # option 2: Pass resource_builder to input_director and then output_director.
1977                             resources = self._resource_factory(input=input, output=output)
1978                             runner = self._runner_director(resources)
1979                             runner()
1980             if not self.done():
1981                 raise exceptions.ApiError('update_output implementation failed to update all outputs.')
1982
1983     def future(self, name: str, description: ResultDescription):
1984         """Retrieve a Future for a named output.
1985
1986         Provide a description of the expected result to check for compatibility or
1987         implicit topological conversion.
1988
1989         TODO: (FR5+) Normalize this part of the interface between operation definitions and
1990          resource managers.
1991         """
1992         if not isinstance(name, str) or name not in self._data:
1993             raise exceptions.ValueError('"name" argument must name an output.')
1994         assert description is not None
1995         requested_dtype = description.dtype
1996         available_dtype = self._data[name]._description.dtype
1997         if requested_dtype != available_dtype:
1998             # TODO: framework to check for implicit conversions
1999             message = 'Requested Future of type {} is not compatible with available type {}.'
2000             message = message.format(requested_dtype, available_dtype)
2001             raise exceptions.ApiError(message)
2002         return Future(self, name, description)
2003
2004     def data(self) -> _OutputDataProxyType:
2005         """Get an adapter to the output resources to access results."""
2006         return self._output_data_proxy(self)
2007
2008     @contextmanager
2009     def local_input(self, member: int = None):
2010         """In an API session, get a handle to fully resolved locally available input data.
2011
2012         Execution dependencies are resolved on creation of the context manager. Input data
2013         becomes available in the ``as`` object when entering the context manager, which
2014         becomes invalid after exiting the context manager. Resources allocated to hold the
2015         input data may be released when exiting the context manager.
2016
2017         It is left as an implementation detail whether the context manager is reusable and
2018         under what circumstances one may be obtained.
2019         """
2020         # Localize data
2021         kwargs = self._input_edge.sink(node=member)
2022         assert 'input' not in kwargs
2023
2024         # Check that we have real data
2025         for key, value in kwargs.items():
2026             assert not hasattr(value, 'result')
2027             assert not hasattr(value, 'run')
2028             value_list = []
2029             if isinstance(value, list):
2030                 value_list = value
2031             if isinstance(value, datamodel.NDArray):
2032                 value_list = value._values
2033             if isinstance(value, collections.abc.Mapping):
2034                 value_list = value.values()
2035             assert not isinstance(value_list, Future)
2036             assert not hasattr(value_list, 'result')
2037             assert not hasattr(value_list, 'run')
2038             for item in value_list:
2039                 assert not hasattr(item, 'result')
2040
2041         input_pack = InputPack(kwargs=kwargs)
2042
2043         # Prepare input data structure
2044         # Note: we use 'yield' instead of 'return' for the protocol expected by
2045         # the @contextmanager decorator
2046         yield input_pack
2047
2048     def publishing_resources(self):
2049         """Get a context manager for resolving the data dependencies of this node.
2050
2051         Use the returned object as a Python context manager.
2052         'output' type resources can be published exactly once, and only while the
2053         publishing context is active.
2054
2055         Write access to publishing resources can be granted exactly once during the
2056         resource manager lifetime and conveys exclusive access.
2057         """
2058         return self.__publishing_resources.pop()
2059
2060
2061 class PyFunctionRunnerResources(collections.UserDict):
2062     """Runtime resources for Python functions.
2063
2064     Produced by a ResourceDirector for a particular Operation.
2065     """
2066
2067     def output(self):
2068         if 'output' in self:
2069             return self['output']
2070         else:
2071             return None
2072
2073     def input(self):
2074         return {key: value for key, value in self.items() if key != 'output'}
2075
2076
2077 class PyFunctionRunner(abc.ABC):
2078     def __init__(self, *, function: typing.Callable, output_description: OutputCollectionDescription):
2079         assert callable(function)
2080         self.function = function
2081         self.output_description = output_description
2082
2083     @abc.abstractmethod
2084     def __call__(self, resources: PyFunctionRunnerResources):
2085         self.function(output=resources.output(), **resources.input())
2086
2087
2088 class CapturedOutputRunner(PyFunctionRunner):
2089     """Function runner that captures return value as output.data"""
2090
2091     def __call__(self, resources: PyFunctionRunnerResources):
2092         resources['output'].data = self.function(**resources.input())
2093
2094
2095 class OutputParameterRunner(PyFunctionRunner):
2096     """Function runner that uses output parameter to let function publish output."""
2097
2098     def __call__(self, resources: PyFunctionRunnerResources):
2099         self.function(**resources)
2100
2101
2102 def wrapped_function_runner(function, output_description: OutputCollectionDescription = None) -> PyFunctionRunner:
2103     """Get an adapter for a function to be wrapped.
2104
2105     If the function does not accept a publishing data proxy as an `output`
2106     key word argument, the returned object has a `capture_output` attribute that
2107     must be re-assigned by the calling code before calling the runner. `capture_output`
2108     must be assigned to be a callable that will receive the output of the wrapped
2109     function.
2110
2111     Returns:
2112         Callable with a signature `__call__(*args, **kwargs)` and no return value
2113
2114     Collaborations:
2115         OperationDetails.resource_director assigns the `capture_output` member of the returned object.
2116     """
2117     assert callable(function)
2118     signature = inspect.signature(function)
2119
2120     # Implementation note: this function dispatches an implementation with the
2121     # logic below. A better factoring would be a "chain of responsibility" in
2122     # which the concrete Runners would be tried in sequence and determine internally
2123     # whether to create a runner, raise an error, or defer.
2124
2125     # Determine output details for proper dispatching.
2126     # First check for signature with output parameter.
2127     # TODO FR4: standardize typing
2128     if 'output' in signature.parameters:
2129         if not isinstance(output_description, OutputCollectionDescription):
2130             if not isinstance(output_description, collections.abc.Mapping):
2131                 raise exceptions.UsageError(
2132                     'Function passes output through call argument, but output is not described.')
2133             return OutputParameterRunner(
2134                 function=function,
2135                 output_description=OutputCollectionDescription(**output_description))
2136         else:
2137             return OutputParameterRunner(function=function,
2138                                          output_description=output_description)
2139     # Next try output_description parameter or function return annotation.
2140     else:
2141         if isinstance(output_description, OutputCollectionDescription):
2142             return_type = output_description['data'].gmxapi_datatype
2143         elif output_description is not None:
2144             # output_description should be None for inferred output or
2145             # a singular mapping of the key 'data' to a gmxapi type.
2146             if not isinstance(output_description, collections.abc.Mapping) \
2147                     or set(output_description.keys()) != {'data'}:
2148                 raise exceptions.ApiError(
2149                     'invalid output description for wrapped function: {}'.format(output_description))
2150             if signature.return_annotation != signature.empty:
2151                 if signature.return_annotation != output_description['data']:
2152                     raise exceptions.ApiError(
2153                         'Wrapped function with return-value-capture provided with non-matching output description.')
2154             return_type = output_description['data']
2155         else:
2156             # Use return type inferred from function signature.
2157             return_type = signature.return_annotation
2158         if return_type == signature.empty or return_type is None:
2159             raise exceptions.ApiError('No return annotation or output_description for {}'.format(function))
2160         return CapturedOutputRunner(function=function,
2161                                     output_description=OutputCollectionDescription(data=return_type))
2162
2163
2164 # TODO: Refactor in terms of reference to a node in a Context.
2165 #  ResourceManager is an implementation detail of how the Context
2166 #  manages a node.
2167 class OperationHandle(AbstractOperation[_OutputDataProxyType]):
2168     """Generic Operation handle for dynamically defined operations.
2169
2170     Define a gmxapi Operation for the functionality being wrapped by the enclosing code.
2171
2172     An Operation type definition encapsulates description of allowed inputs
2173     of an Operation. An Operation instance represents a node in a work graph
2174     with uniquely fingerprinted inputs and well-defined output. The implementation
2175     of the operation is a collaboration with the resource managers resolving
2176     data flow for output Futures, which may depend on the execution context.
2177     """
2178
2179     def __init__(self, resource_manager: SourceResource[_OutputDataProxyType, typing.Any]):
2180         """Initialization defines the unique input requirements of a work graph node.
2181
2182         Initialization parameters map to the parameters of the wrapped function with
2183         addition(s) to support gmxapi data flow and deferred execution.
2184
2185         If provided, an ``input`` keyword argument is interpreted as a parameter pack
2186         of base input. Inputs also present as standalone keyword arguments override
2187         values in ``input``.
2188
2189         Inputs that are handles to gmxapi operations or outputs induce data flow
2190         dependencies that the framework promises to satisfy before the Operation
2191         executes and produces output.
2192         """
2193         # TODO: When the resource manager can be kept alive by an enclosing or
2194         #  module-level Context, convert to a weakref.
2195         self.__resource_manager = resource_manager
2196         # The unique identifier for the operation node allows the Context implementation
2197         # to manage the state of the handle. Reproducibility of node_uid is TBD, but
2198         # it must be unique in a Context where it references a different operation node.
2199         self.node_uid = None
2200
2201     @property
2202     def output(self) -> _OutputDataProxyType:
2203         # TODO: We can configure `output` as a data descriptor
2204         #  instead of a property so that we can get more information
2205         #  from the class attribute before creating an instance of OperationDetails.OutputDataProxy.
2206         # The C++ equivalence would probably be a templated free function for examining traits.
2207         return self.__resource_manager.data()
2208
2209     def run(self):
2210         """Make a single attempt to resolve data flow conditions.
2211
2212         This is a public method, but should not need to be called by users. Instead,
2213         just use the `output` data proxy for result handles, or force data flow to be
2214         resolved with the `result` methods on the result handles.
2215
2216         `run()` may be useful to try to trigger computation (such as for remotely
2217         dispatched work) without retrieving results locally right away.
2218
2219         `run()` is also useful internally as a facade to the Context implementation details
2220         that allow `result()` calls to ask for their data dependencies to be resolved.
2221         Typically, `run()` will cause results to be published to subscribing operations as
2222         they are calculated, so the `run()` hook allows execution dependency to be slightly
2223         decoupled from data dependency, as well as to allow some optimizations or to allow
2224         data flow to be resolved opportunistically. `result()` should not call `run()`
2225         directly, but should cause the resource manager / Context implementation to process
2226         the data flow graph.
2227
2228         In one conception, `run()` can have a return value that supports control flow
2229         by itself being either runnable or not. The idea would be to support
2230         fault tolerance, implementations that require multiple iterations / triggers
2231         to complete, or looping operations.
2232         """
2233         # Note: `run()` is a synonym for `resolve` or `update` or whatever we choose
2234         #  to generically describe the request to bring a node up-to-date: i.e. the
2235         #  non-returning callable on the object produced by a director.
2236         self.__resource_manager.update_output()
2237
2238
2239 class OperationPlaceholder(AbstractOperation):
2240     """Placeholder for Operation handle during subgraph definition."""
2241
2242     def __init__(self, subgraph_resource_manager):
2243         ...
2244
2245     def run(self):
2246         raise exceptions.UsageError('This placeholder operation handle is not in an executable context.')
2247
2248     @property
2249     def output(self):
2250         """Allow subgraph components to be connected without instantiating actual operations."""
2251         if not isinstance(current_context(), SubgraphContext):
2252             raise exceptions.UsageError('Invalid access to subgraph internals.')
2253
2254
2255 _HandleType = typing.TypeVar('_HandleType', bound=gmx.abc.OperationReference)
2256
2257
2258 class NodeBuilder(gmx.abc.NodeBuilder):
2259     """Add an operation node to be managed by a Context.
2260
2261     The NodeBuilder interface implies minimal internal logic, and instead
2262     specifies the set of information that must or may be provided to construct
2263     a node.
2264     """
2265
2266     def __init__(self,
2267                  context: 'Context',
2268                  operation,
2269                  label: typing.Optional[str] = None):
2270         """Initialize the base NodeBuilder for gmxapi.operation module Nodes.
2271
2272         TODO:
2273             Convert the *operation* argument to be the registrant in the Operation registry.
2274             Requires confirmation of conforming behavior for dynamically defined operations.
2275
2276         """
2277         self.context = context
2278         self.label = label
2279         try:
2280             key = _make_registry_key(operation)
2281         except Exception as e:
2282             error = 'Could not create an operation registry key from {}'.format(operation)
2283             raise exceptions.ValueError(error) from e
2284         else:
2285             # TODO: sensibly handle dynamically defined operations.
2286             if key not in _operation_registry and not issubclass(operation, OperationDetailsBase):
2287                 error = '{} must be initialized with a registered operation. Got {}.'
2288                 raise exceptions.ValueError(error.format(__class__.__qualname__, operation))
2289         self.sources = DataSourceCollection()
2290         self._input_description = None
2291         self._resource_factory = None
2292         self._runner_director = None
2293         self._handle = None
2294         self._output_factory = None
2295
2296         self._resource_manager = ResourceManager
2297
2298     def set_input_description(self, input_description: InputDescription):
2299         self._input_description = input_description
2300
2301     def set_resource_factory(self, factory):
2302         self._resource_factory = factory
2303
2304     def set_runner_director(self, factory):
2305         self._runner_director = factory
2306
2307     def set_handle(self, factory):
2308         self._handle = factory
2309
2310     def set_output_factory(self, factory: 'OutputFactory'):
2311         self._output_factory = factory
2312
2313     def set_resource_manager(self, implementation: typing.Type[ResourceManager]):
2314         """Allow overriding the default ResourceManager implementation.
2315
2316         This is a workaround until we figure out what parts of the ResourceManager
2317         could be composed, registered separately with the Context, or be customized
2318         through other dispatching. One likely part of the solution is for clients
2319         of the NodeBuilder to assert requirements of the Context.
2320         """
2321         assert issubclass(implementation, ResourceManager)
2322         self._resource_manager = implementation
2323
2324     # TODO: Let the Context use the handle factory to produce a dynamically typed handle,
2325     #  and figure out the right way to annotate the return value.
2326     def build(self):
2327         """Create node and return a handle of the appropriate type."""
2328
2329         # Check for the ability to instantiate operations.
2330         missing_details = list()
2331         for builder_resource in ['input_description',
2332                                  'resource_factory',
2333                                  'runner_director',
2334                                  'handle',
2335                                  'output_factory']:
2336             detail = '_' + builder_resource
2337             if getattr(self, detail, None) is None:
2338                 missing_details.append(builder_resource)
2339         if len(missing_details) > 0:
2340             raise exceptions.UsageError(
2341                 'Missing details needed for operation node: {}'.format(
2342                     ', '.join(missing_details)
2343                 ))
2344
2345         assert hasattr(self._input_description, 'signature')
2346         input_sink = SinkTerminal(self._input_description.signature())
2347         input_sink.update(self.sources)
2348         logger.debug('SinkTerminal configured: {}'.format(SinkTerminal))
2349         edge = DataEdge(self.sources, input_sink)
2350         logger.debug('Created data edge {} with Sink {}'.format(edge, edge.sink_terminal))
2351         # TODO: Fingerprinting: Each operation instance has unique output based on the unique input.
2352         #            input_data_fingerprint = edge.fingerprint()
2353
2354         # Set up output proxy.
2355         assert hasattr(self._input_description, 'make_uid')
2356         uid = self._input_description.make_uid(edge)
2357         # TODO: ResourceManager should fetch the relevant factories from the Context
2358         #  instead of getting an OperationDetails instance.
2359         output_data_proxy = self._output_factory.output_proxy()
2360         output_description = self._output_factory.output_description()
2361         publishing_data_proxy = self._output_factory.publishing_data_proxy()
2362         manager = self._resource_manager(output_context=self.context,
2363                                          source=edge,
2364                                          operation_id=uid,
2365                                          output_data_proxy=output_data_proxy,
2366                                          output_description=output_description,
2367                                          publishing_data_proxy=publishing_data_proxy,
2368                                          resource_factory=self._resource_factory,
2369                                          runner_director=self._runner_director)
2370         self.context.work_graph[uid] = manager
2371         # TODO: Replace with a call to Node.handle()
2372         handle = self._handle(self.context.work_graph[uid])
2373         handle.node_uid = uid
2374         return handle
2375
2376     def add_input(self, name, source):
2377         # TODO: We can move some input checking here as the data model matures.
2378         self.sources[name] = source
2379
2380
2381 class InputPack(object):
2382     """Input container for data sources provided to resource factories.
2383
2384     When gmxapi.operation Contexts provide run time inputs to operations,
2385     instances of this class are provided to the operation's registered
2386     Resource factory.
2387
2388     Attributes:
2389         kwargs (dict): collection of named data sources.
2390     """
2391
2392     def __init__(self, kwargs: typing.Mapping[str, SourceTypeVar]):
2393         self.kwargs = kwargs
2394
2395
2396 class Context(gmx.abc.Context):
2397     """API Context.
2398
2399     All gmxapi data and operations are owned by a Context instance. The Context
2400     manages the details of how work is run and how data is managed.
2401
2402     Context implementations are not required to inherit from gmxapi.context.Context,
2403     but this class definition serves to specify the current Context API.
2404
2405     If subclassing is used to implement new Contexts, be sure to initialize the
2406     base class when providing a new __init__
2407     """
2408
2409     def node(self, node_id) -> Node:
2410         if node_id in self.labels:
2411             return self.labels[node_id]
2412         elif node_id in self.work_graph:
2413             return self.work_graph[node_id]
2414         else:
2415             raise exceptions.ValueError('Could not find a node identified by {}'.format(node_id))
2416
2417     def __init__(self):
2418         self.operations = dict()
2419         self.labels = dict()
2420         self.work_graph = collections.OrderedDict()
2421
2422     @abc.abstractmethod
2423     def node_builder(self, *, operation,
2424                      label: typing.Optional[str] = None) -> NodeBuilder:
2425         """Get a builder for a new work graph node.
2426
2427         Nodes are elements of computational work, with resources and execution
2428         managed by the Context. The Context handles parallelism resources, data
2429         placement, work scheduling, and data flow / execution dependencies.
2430
2431         This method is used by Operation director code and helper functions to
2432         add work to the graph.
2433
2434         Arguments:
2435             operation: a registered gmxapi operation
2436             label: optional user-provided identifier to provide human-readable node locators.
2437
2438         """
2439         ...
2440     # TODO: *node()* accessor.
2441     # @abc.abstractmethod
2442     # def node(self, node_identifier) -> AbstractOperation:
2443     #     ...
2444
2445
2446 class ModuleNodeBuilder(NodeBuilder):
2447     """Builder for work nodes in gmxapi.operation.ModuleContext."""
2448
2449
2450 class ModuleContext(Context):
2451     """Context implementation for the gmxapi.operation module.
2452
2453     """
2454     __version__ = 0
2455
2456     def node_builder(self, operation, label=None) -> NodeBuilder:
2457         """Get a builder for a new work node to add an operation in this context."""
2458         if label is not None:
2459             if label in self.labels:
2460                 raise exceptions.ValueError('Label {} is already in use.'.format(label))
2461             else:
2462                 # The builder should update the labeled node when it is done.
2463                 self.labels[label] = None
2464
2465         return ModuleNodeBuilder(context=weakref.proxy(self), operation=operation, label=label)
2466
2467
2468 # Context stack.
2469 __current_context = [ModuleContext()]
2470
2471
2472 def current_context() -> Context:
2473     """Get a reference to the currently active Context.
2474
2475     The imported gmxapi.context module maintains some state for the convenience
2476     of the scripting environment. Internally, all gmxapi activity occurs under
2477     the management of an API Context, explicitly or implicitly. Some actions or
2478     code constructs will generate separate contexts or sub-contexts. This utility
2479     command retrieves a reference to the currently active Context.
2480     """
2481     return __current_context[-1]
2482
2483
2484 def push_context(context) -> Context:
2485     """Enter a sub-context by pushing a context to the global context stack.
2486     """
2487     __current_context.append(context)
2488     return current_context()
2489
2490
2491 def pop_context() -> Context:
2492     """Exit the current Context by popping it from the stack."""
2493     return __current_context.pop()
2494
2495
2496 class OutputFactory(object):
2497     """Encapsulate the details of Operation output implementation in the gmxapi.operation Context.
2498
2499     Currently, OutputFactory objects are containers that compose functionality
2500     with which to implement the required internal interface.
2501     """
2502
2503     def __init__(self, *,
2504                  output_proxy: typing.Callable[[SourceResource], _OutputDataProxyType],
2505                  output_description: OutputCollectionDescription,
2506                  publishing_data_proxy: typing.Callable[[SourceResource, ClientID], _PublishingDataProxyType]):
2507         """Package the output details for an operation.
2508
2509         Arguments:
2510             output_proxy: factory to produce the *output* facet of an operation instance (node)
2511             output_description: fully formed output description
2512             publishing_data_proxy: factory to produce the run time output publishing resources
2513
2514         """
2515         if not callable(output_proxy):
2516             raise exceptions.ValueError('output_proxy argument must be a callable.')
2517         if not callable(publishing_data_proxy):
2518             raise exceptions.ValueError('publishing_data_proxy argument must be a callable.')
2519         if not isinstance(output_description, OutputCollectionDescription):
2520             raise exceptions.ValueError('output_description must be an instance of '
2521                                         'gmxapi.operation.OutputCollectionDescription')
2522         self._output_proxy = output_proxy
2523         self._output_description = output_description
2524         self._publishing_data_proxy = publishing_data_proxy
2525
2526     def output_proxy(self) -> typing.Callable[[_OutputDataProxyType, SourceResource], _OutputDataProxyType]:
2527         return self._output_proxy
2528
2529     def output_description(self) -> OutputCollectionDescription:
2530         return self._output_description
2531
2532     def publishing_data_proxy(self) -> typing.Callable[[SourceResource, ClientID],
2533                                                        _PublishingDataProxyType]:
2534         return self._publishing_data_proxy
2535
2536
2537 # TODO: Refactor in terms of gmx.abc.OperationDirector[_Op, gmx.operation.Context]
2538 # Normalizing this OperationDirector may require other updates to the function_wrapper facilities.
2539 class OperationDirector(object):
2540     """Direct the construction of an operation node in the gmxapi.operation module Context.
2541
2542     Collaboration: used by OperationDetails.operation_director, which
2543     will likely dispatch to different implementations depending on
2544     requirements of work or context.
2545     """
2546
2547     def __init__(self,
2548                  *args,
2549                  operation_details: typing.Type[OperationDetailsBase],
2550                  context: Context,
2551                  label=None,
2552                  **kwargs):
2553         self.operation_details = operation_details
2554         self.context = weakref.proxy(context)
2555         self.args = args
2556         self.kwargs = kwargs
2557         self.label = label
2558
2559     def __call__(self) -> AbstractOperation:
2560         builder = self.context.node_builder(operation=self.operation_details, label=self.label)
2561
2562         builder.set_resource_factory(self.operation_details.resource_director)
2563         builder.set_input_description(self.operation_details)
2564         builder.set_handle(OperationHandle)
2565
2566         operation_details = self.operation_details()
2567         node_input_factory = operation_details.signature().bind
2568         data_source_collection = node_input_factory(*self.args, **self.kwargs)
2569         for name, source in data_source_collection.items():
2570             builder.add_input(name, source)
2571
2572         def runner_director(resources):
2573             def runner():
2574                 operation_details(resources)
2575             return runner
2576
2577         builder.set_runner_director(runner_director)
2578         # Note that the call-backs held by OutputFactory cannot be annotated with
2579         # key word arguments under PEP 484, producing a weak warning in some cases.
2580         # We can consider more in the future how to balance call-back simplicity,
2581         # type annotation, and key word explicitness in helper functions like these.
2582         output_factory = OutputFactory(output_description=operation_details.output_description(),
2583                                        output_proxy=operation_details.output_data_proxy,
2584                                        publishing_data_proxy=operation_details.publishing_data_proxy)
2585         builder.set_output_factory(output_factory)
2586
2587         handle = builder.build()
2588         return handle
2589
2590
2591 def _make_datastore(output_description: OutputCollectionDescription, ensemble_width: int):
2592     """Create the data store for an operation with the described output.
2593
2594     Create a container to hold the resources for an operation node.
2595     Used internally by the resource manager when setting up the node.
2596     Evolution of the C++ framework for creating the Operation SessionResources
2597     object will inform the future of this and the resource_director method, but
2598     this data store is how the Context manages output data sources for resources
2599     that it manages.
2600     """
2601
2602     datastore = collections.OrderedDict()
2603     for name, dtype in output_description.items():
2604         assert isinstance(dtype, type)
2605         result_description = ResultDescription(dtype=dtype, width=ensemble_width)
2606         datastore[name] = OutputData(name=name, description=result_description)
2607     return datastore
2608
2609
2610 # TODO: For outputs, distinguish between "results" and "events".
2611 #  Both are published to the resource manager in the same way, but the relationship
2612 #  with subscribers is potentially different.
2613 def function_wrapper(output: dict = None):
2614     # Suppress warnings in the example code.
2615     # noinspection PyUnresolvedReferences
2616     """Generate a decorator for wrapped functions with signature manipulation.
2617
2618     New function accepts the same arguments, with additional arguments required by
2619     the API.
2620
2621     The new function returns an object with an `output` attribute containing the named outputs.
2622
2623     Example:
2624
2625         >>> @function_wrapper(output={'spam': str, 'foo': str})
2626         ... def myfunc(parameter: str = None, output=None):
2627         ...    output.spam = parameter
2628         ...    output.foo = parameter + ' ' + parameter
2629         ...
2630         >>> operation1 = myfunc(parameter='spam spam')
2631         >>> assert operation1.output.spam.result() == 'spam spam'
2632         >>> assert operation1.output.foo.result() == 'spam spam spam spam'
2633
2634     If 'output' is provided to the wrapper, a data structure will be passed to
2635     the wrapped functions with the named attributes so that the function can easily
2636     publish multiple named results. Otherwise, the `output` of the generated operation
2637     will just capture the return value of the wrapped function.
2638
2639     .. todo:: gmxapi typing stub file(s).
2640               The way this wrapper uses parameter annotations is not completely
2641               compatible with static type checking (PEP 484). If we decide to
2642               keep the convenience functionality by which operation details are
2643               inferred from parameter annotations, we should provide a separate
2644               stub file (.pyi) to support static type checking of the API.
2645     """
2646
2647     if output is not None and not isinstance(output, collections.abc.Mapping):
2648         raise exceptions.TypeError('If provided, `output` argument must be a mapping of data names to types.')
2649
2650     # TODO: (FR5+) gmxapi operations need to allow a context-dependent way to generate an implementation with input.
2651     # This function wrapper reproduces the wrapped function's kwargs, but does not allow chaining a
2652     # dynamic `input` kwarg and does not dispatch according to a `context` kwarg. We should allow
2653     # a default implementation and registration of alternate implementations. We don't have to do that
2654     # with functools.singledispatch, but we could, if we add yet another layer to generate a wrapper
2655     # that takes the context as the first argument. (`singledispatch` inspects the first argument rather
2656     # that a named argument)
2657
2658     # Implementation note: The closure of the current function is used to
2659     # dynamically define several classes that support the operation to be
2660     # created by the returned decorator.
2661
2662     def decorator(function) -> typing.Callable:
2663         # Explicitly capture `function` and `output` references.
2664         provided_output_map = output
2665
2666         # Note: Allow operations to be defined entirely in template headers to facilitate
2667         # compile-time optimization of fused operations. Consider what distinction, if any,
2668         # exists between a fused operation and a more basic operation. Probably it amounts
2669         # to aspects related to interaction with the Context that get combined in a fused
2670         # operation, such as the resource director, builder, etc.
2671         class OperationDetails(OperationDetailsBase):
2672             # Warning: function.__qualname__ is not rigorous since function may be in a local scope.
2673             # TODO: Improve base identifier.
2674             # Suggest registering directly in the Context instead of in this local class definition.
2675             __basename = '.'.join((str(function.__module__), function.__qualname__))
2676             __last_uid = 0
2677             _input_signature_description = InputCollectionDescription.from_function(function)
2678             # TODO: Separate the class and instance logic for the runner.
2679             # Logically, the runner is a detail of a context-specific implementation class,
2680             # though the output is not generally fully knowable until an instance is initialized
2681             # for a certain input fingerprint.
2682             # Note: We are almost at a point where this class can be subsumed into two
2683             # possible return types for wrapped_function_runner, acting as an operation helper.
2684             _runner = wrapped_function_runner(function, provided_output_map)
2685             _output_description = _runner.output_description
2686             _output_data_proxy_type = define_output_data_proxy(_output_description)
2687             _publishing_data_proxy_type = define_publishing_data_proxy(_output_description)
2688             _SourceResource = SourceResource[_output_data_proxy_type, _publishing_data_proxy_type]
2689
2690             @classmethod
2691             def name(cls) -> str:
2692                 return cls.__basename.split('.')[-1]
2693
2694             @classmethod
2695             def namespace(cls) -> str:
2696                 return cls.__basename.rstrip('.' + cls.name())
2697
2698             @classmethod
2699             def director(cls, context: _Context):
2700                 return cls.operation_director
2701
2702             @classmethod
2703             def signature(cls) -> InputCollectionDescription:
2704                 """Mapping of named inputs and input type.
2705
2706                 Used to determine valid inputs before an Operation node is created.
2707
2708                 Overrides OperationDetailsBase.signature() to provide an
2709                 implementation for the bound operation.
2710                 """
2711                 return cls._input_signature_description
2712
2713             def output_description(self) -> OutputCollectionDescription:
2714                 """Mapping of available outputs and types for an existing Operation node.
2715
2716                 Overrides OperationDetailsBase.output_description() to provide an
2717                 implementation for the bound operation.
2718                 """
2719                 return self._output_description
2720
2721             def publishing_data_proxy(self, *,
2722                                       instance: _SourceResource,
2723                                       client_id: int
2724                                       ) -> _publishing_data_proxy_type:
2725                 """Factory for Operation output publishing resources.
2726
2727                 Used internally when the operation is run with resources provided by instance.
2728
2729                 Overrides OperationDetailsBase.publishing_data_proxy() to provide an
2730                 implementation for the bound operation.
2731                 """
2732                 assert isinstance(instance, ResourceManager)
2733                 return self._publishing_data_proxy_type(instance=instance, client_id=client_id)
2734
2735             def output_data_proxy(self, instance: _SourceResource) -> _output_data_proxy_type:
2736                 assert isinstance(instance, ResourceManager)
2737                 return self._output_data_proxy_type(instance=instance)
2738
2739             def __call__(self, resources: PyFunctionRunnerResources):
2740                 """Execute the operation with provided resources.
2741
2742                 Resources are prepared in an execution context with aid of resource_director()
2743
2744                 After the first call, output data has been published and is trivially
2745                 available through the output_data_proxy()
2746
2747                 Overrides OperationDetailsBase.__call__().
2748                 """
2749                 self._runner(resources)
2750
2751             @classmethod
2752             def make_uid(cls, input):
2753                 """The unique identity of an operation node tags the output with respect to the input.
2754
2755                 Combines information on the Operation details and the input to uniquely
2756                 identify the Operation node.
2757
2758                 Arguments:
2759                     input : A (collection of) data source(s) that can provide Fingerprints.
2760
2761                 Used internally by the Context to manage ownership of data sources, to
2762                 locate resources for nodes in work graphs, and to manage serialization,
2763                 deserialization, and checkpointing of the work graph.
2764
2765                 The UID is a detail of the generic Operation that _should_ be independent
2766                 of the Context details to allow the framework to manage when and where
2767                 an operation is executed.
2768
2769                 Design notes on further refinement:
2770                     TODO: Operations should not single-handedly determine their own uniqueness
2771                     (but they should participate in the determination with the Context).
2772
2773                     Context implementations should be allowed to optimize handling of
2774                     equivalent operations in different sessions or work graphs, but we do not
2775                     yet TODO: guarantee that UIDs are globally unique!
2776
2777                     The UID should uniquely indicate an operation node based on that node's input.
2778                     We need input fingerprinting to identify equivalent nodes in a work graph
2779                     or distinguish nodes across work graphs.
2780
2781                 """
2782                 uid = str(cls.__basename) + str(cls.__last_uid)
2783                 cls.__last_uid += 1
2784                 return uid
2785
2786             @classmethod
2787             def resource_director(cls, *, input=None,
2788                                   output: _publishing_data_proxy_type = None) -> PyFunctionRunnerResources:
2789                 """a Director factory that helps build the Session Resources for the function.
2790
2791                 The Session launcher provides the director with all of the resources previously
2792                 requested/negotiated/registered by the Operation. The director uses details of
2793                 the operation to build the resources object required by the operation runner.
2794
2795                 For the Python Context, the protocol is for the Context to call the
2796                 resource_director instance method, passing input and output containers.
2797                 """
2798                 resources = PyFunctionRunnerResources()
2799                 resources.update(input.kwargs)
2800                 resources.update({'output': output})
2801
2802                 # TODO: Remove this hack when we can better handle Futures of Containers and Future slicing.
2803                 for name in resources:
2804                     if isinstance(resources[name], (list, tuple)):
2805                         resources[name] = datamodel.ndarray(resources[name])
2806
2807                 # Check data compatibility
2808                 for name, value in resources.items():
2809                     if name != 'output':
2810                         expected = cls.signature()[name]
2811                         got = type(value)
2812                         if got != expected:
2813                             raise exceptions.TypeError('Expected {} but got {}.'.format(expected, got))
2814                 return resources
2815
2816         # TODO: (FR4) Update annotations with gmxapi data types. E.g. return -> Future.
2817         @functools.wraps(function)
2818         def helper(*args, context=None, **kwargs):
2819             # Description of the Operation input (and output) occurs in the
2820             # decorator closure. By the time this factory is (dynamically) defined,
2821             # the OperationDetails and ResourceManager are well defined, but not
2822             # yet instantiated.
2823             # Inspection of the offered input occurs when this factory is called,
2824             # and OperationDetails, ResourceManager, and Operation are instantiated.
2825
2826             # This operation factory is specialized for the default package Context.
2827             if context is None:
2828                 context = current_context()
2829             else:
2830                 raise exceptions.ApiError('Non-default context handling not implemented.')
2831
2832             # This calls a dispatching function that may not be able to reconcile the input
2833             # and Context capabilities. This is the place to handle various exceptions for
2834             # whatever reasons this reconciliation cannot occur.
2835             handle = OperationDetails.operation_director(*args, context=context, label=None, **kwargs)
2836
2837             # TODO: NOW: The input fingerprint describes the provided input
2838             # as (a) ensemble input, (b) static, (c) future. By the time the
2839             # operation is instantiated, the topology of the node is known.
2840             # When compared to the InputCollectionDescription, the data compatibility
2841             # can be determined.
2842
2843             return handle
2844
2845         # to do: The factory itself needs to be able to register a factory with
2846         # the context that will be responsible for the Operation handle.
2847         # The factories need to be able to serve as dispatchers for themselves,
2848         # since an operation in one context may need to be reconstituted in a
2849         # different context.
2850         # The dispatching factory produces a director for a Context,
2851         # which will register a factory with the operation in that context.
2852
2853         # The factory function has a DirectorFactory. Director instances talk to a NodeBuilder for a Context to
2854         # get handles to new operation nodes managed by the context. Part of that process includes registering
2855         # a DirectorFactory with the Context.
2856         return helper
2857
2858     return decorator
2859
2860
2861 class GraphVariableDescriptor(object):
2862     def __init__(self, name: str = None, dtype=None, default=None):
2863         self.name = name
2864         self.dtype = dtype
2865         self.default = default
2866         self.state = None
2867
2868     @property
2869     def internal_name(self):
2870         try:
2871             return '_' + self.name
2872         except TypeError:
2873             return None
2874
2875     def __get__(self, instance, owner):
2876         if instance is None:
2877             # Access is through the class attribute of the owning class.
2878             # Allows the descriptor itself to be inspected or reconfigured after
2879             # class definition.
2880             # TODO: There is probably some usage checking that can be performed here.
2881             return self
2882         try:
2883             value = getattr(instance, self.internal_name)
2884         except AttributeError:
2885             value = self.default
2886             # Lazily initialize the instance value from the class default.
2887             if value is not None:
2888                 try:
2889                     setattr(instance, self.internal_name, value)
2890                 except Exception as e:
2891                     message = 'Could not assign default value to {} attribute of {}'.format(
2892                         self.internal_name,
2893                         instance)
2894                     raise exceptions.ApiError(message) from e
2895         return value
2896
2897     # Implementation note: If we have a version of the descriptor class with no `__set__` method,
2898     # it is a non-data descriptor that can be overridden by a data descriptor on an instance.
2899     # This could be one way to handle the polymorphism associated with state changes.
2900     def __set__(self, instance, value):
2901         if instance._editing:
2902             # Update the internal connections defining the subgraph.
2903             setattr(instance, self.internal_name, value)
2904         else:
2905             raise AttributeError('{} not assignable on {}'.format(self.name, instance))
2906
2907
2908 class GraphMeta(type):
2909     """Meta-class for gmxapi data flow graphs and subgraphs.
2910
2911     Used to implement ``subgraph`` as GraphMeta.__new__(...).
2912     Also allows subgraphs to be defined as Python class definitions by inheriting
2913     from Subgraph or by using the ``metaclass=GraphMeta`` hint in the class
2914     statement arguments.
2915
2916     The Python class creation protocol allows Subgraphs to be defined in as follows.
2917
2918     See the Subgraph class documentation for customization of instances through
2919     the Python context manager protocol.
2920     """
2921     _prepare_keywords = ('variables',)
2922
2923     # TODO: Python 3.7.2 introduces typing.OrderedDict
2924     # In practice, we are using collections.OrderedDict, but we should use the generic
2925     # ABC from the typing module to avoid being overly restrictive with type hints.
2926     try:
2927         from typing import OrderedDict
2928     except ImportError:
2929         from collections import OrderedDict
2930
2931     @classmethod
2932     def __prepare__(mcs, name, bases, variables: OrderedDict = None, **kwargs):
2933         """Prepare the class namespace.
2934
2935         Keyword Args:
2936               variables: mapping of persistent graph variables to type / default value (optional)
2937         """
2938         # Python runs this before executing the class body of Subgraph or its
2939         # subclasses. This is our chance to handle key word arguments given in the
2940         # class declaration.
2941
2942         if kwargs is not None:
2943             for keyword in kwargs:
2944                 raise exceptions.UsageError('Unexpected key word argument: {}'.format(keyword))
2945
2946         namespace = collections.OrderedDict()
2947
2948         if variables is not None:
2949             if isinstance(variables, collections.abc.Mapping):
2950                 for name, value in variables.items():
2951                     if isinstance(value, type):
2952                         dtype = value
2953                         if hasattr(value, 'default'):
2954                             default = value.default
2955                         else:
2956                             default = None
2957                     else:
2958                         default = value
2959                         if hasattr(default, 'dtype'):
2960                             dtype = default.dtype
2961                         else:
2962                             dtype = type(default)
2963                     namespace[name] = GraphVariableDescriptor(name, default=default, dtype=dtype)
2964                     # Note: we are not currently using the hook used by `inspect`
2965                     # to annotate with the class that defined the attribute.
2966                     # namespace[name].__objclass__ = mcs
2967                     assert not hasattr(namespace[name], '__objclass__')
2968             else:
2969                 raise exceptions.ValueError('"variables" must be a mapping of graph variables to types or defaults.')
2970
2971         return namespace
2972
2973     def __new__(cls, name, bases, namespace, **kwargs):
2974         for key in kwargs:
2975             if key not in GraphMeta._prepare_keywords:
2976                 raise exceptions.ApiError('Unexpected class creation keyword: {}'.format(key))
2977         return type.__new__(cls, name, bases, namespace)
2978
2979     # TODO: This is keyword argument stripping is not necessary in more recent Python versions.
2980     # When Python minimum required version is increased, check if we can remove this.
2981     def __init__(cls, name, bases, namespace, **kwargs):
2982         for key in kwargs:
2983             if key not in GraphMeta._prepare_keywords:
2984                 raise exceptions.ApiError('Unexpected class initialization keyword: {}'.format(key))
2985         super().__init__(name, bases, namespace)
2986
2987
2988 class SubgraphNodeBuilder(NodeBuilder):
2989
2990     def __init__(self,
2991                  context: 'SubgraphContext',
2992                  operation,
2993                  label: typing.Optional[str] = None):
2994         super().__init__(context, operation, label)
2995
2996     def add_input(self, name: str, source):
2997         """Add an input resource for the Node under construction.
2998
2999         Extends NodeBuilder.add_input()
3000         """
3001         # Inspect inputs.
3002         #  * Are they from outside the subgraph?
3003         #  * Subgraph variables?
3004         #  * Subgraph internal nodes?
3005         # Inputs from outside the subgraph are (provisionally) subgraph inputs.
3006         # Inputs that are subgraph variables or subgraph internal nodes mark operations that will need to be re-run.
3007         # For first implementation, let all operations be recreated, but we need to
3008         # manage the right input proxies.
3009         # For zeroeth implementation, try just tracking the entities that need a reset() method called.
3010         assert isinstance(self.context, SubgraphContext)
3011         if hasattr(source, 'reset'):
3012             self.context.add_resetter(source.reset)
3013         elif hasattr(source, '_reset'):
3014             self.context.add_resetter(source._reset)
3015         super().add_input(name, source)
3016
3017     def build(self) -> OperationPlaceholder:
3018         """Get a reference to the internal node in the subgraph definition.
3019
3020         In the SubgraphContext, these handles cannot represent uniquely identifiable
3021         results. They are placeholders for relative positions in graphs that are
3022         not defined until the subgraph is being executed.
3023
3024         Such references should be tracked and invalidated when exiting the
3025         subgraph context. Within the subgraph context, they are used to establish
3026         the recipe for updating the subgraph's outputs or persistent data during
3027         execution.
3028         """
3029         # Placeholder handles in the subgraph definition don't have real resource managers.
3030         # Check for the ability to instantiate operations.
3031         handle = super().build()
3032         # handle = OperationPlaceholder()
3033         return typing.cast(OperationPlaceholder, handle)
3034
3035
3036 class SubgraphContext(Context):
3037     """Provide a Python context manager in which to set up a graph of operations.
3038
3039     Allows operations to be configured without adding nodes to the global execution
3040     context.
3041     """
3042
3043     def __init__(self):
3044         super().__init__()
3045         self.resetters = set()
3046
3047     def node_builder(self, operation, label=None) -> NodeBuilder:
3048         if label is not None:
3049             if label in self.labels:
3050                 raise exceptions.ValueError('Label {} is already in use.'.format(label))
3051             else:
3052                 # The builder should update the labeled node when it is done.
3053                 self.labels[label] = None
3054
3055         return SubgraphNodeBuilder(context=weakref.proxy(self), operation=operation, label=label)
3056
3057     def add_resetter(self, function):
3058         assert callable(function)
3059         self.resetters.add(function)
3060
3061
3062 class Subgraph(object, metaclass=GraphMeta):
3063     """
3064
3065     When subclassing from Subgraph, aspects of the subgraph's data ports can be
3066     specified with keyword arguments in the class statement. Example::
3067
3068         >>> class MySubgraph(Subgraph, variables={'int_with_default': 1, 'boolData': bool}): pass
3069         ...
3070
3071     The key word *variables* is used in the class declaration to map the types
3072     of subgraph Variables with (optional) default values.
3073
3074     Execution model:
3075         Subgraph execution must follow a well-defined protocol in order to sensibly
3076         resolve data Futures at predictable points. Note that subgraphs act as operations
3077         that can be automatically redefined at run time (in limited cases), such as
3078         to automatically form iteration chains to implement "while" loops. We refer
3079         to one copy / iteration / generation of a subgraph as an "element" below.
3080
3081         When a subgraph element begins execution, each of its variables with an
3082         "updated" state from the previous iteration has the "updated" state moved
3083         to the new element's "initial" state, and the "updated" state is voided.
3084
3085         Subgraph.next() appends an element of the subgraph to the chain. Subsequent
3086         calls to Subgraph.run() bring the new outputs up to date (and then call next()).
3087         Thus, for a subgraph with an output called ``is_converged``, calling
3088         ``while (not subgraph.is_converged): subgraph.run()`` has the desired effect,
3089         but is likely suboptimal for execution. Instead use the gmxapi while_loop.
3090
3091         If the subgraph is not currently executing, it can be in one of two states:
3092         "editing" or "ready". In the "ready" state, class and instance Variables
3093         cannot be assigned, but can be read through the data descriptors. In this
3094         state, the descriptors only have a single state.
3095
3096         If "editing," variables on the class object can be assigned to update the
3097         data flow defined for the subgraph. While "editing", reading or writing
3098         instance Variables is undefined behavior.
3099
3100         When "editing" begins, each variable is readable as a proxy to the "initial"
3101         state in an element. Assignments while "editing" put variables in temporary
3102         states accessible only during editing.
3103         When "editing" finishes, the "updated" output data source of each
3104         variable for the element is set, if appropriate.
3105
3106         # TODO: Use a get_context to allow operation factories or accessors to mark
3107         #  references for update/annotation when exiting the 'with' block.
3108     """
3109
3110
3111 class SubgraphBuilder(object):
3112     """Helper for defining new Subgraphs.
3113
3114     Manages a Python context in which to define a new Subgraph type. Can be used
3115     in a Python ``with`` construct exactly once to provide the Subgraph class body.
3116     When the ``with`` block is exited (or ``build()`` is called explicitly), a
3117     new type instance becomes available. Subsequent calls to SubgraphBuilder.__call__(self, ...)
3118     are dispatched to the Subgraph constructor.
3119
3120     Outside of the ``with`` block, read access to data members is proxied to
3121     descriptors on the built Subgraph.
3122
3123     Instances of SubgraphBuilder are returned by the ``subgraph()`` utility function.
3124     """
3125
3126     def __init__(self, variables):
3127         self.__dict__.update({'variables': variables,
3128                               '_staging': collections.OrderedDict(),
3129                               '_editing': False,
3130                               '_subgraph_context': None,
3131                               '_subgraph_instance': None,
3132                               '_fused_operation': None,
3133                               '_factory': None})
3134         # Return a placeholder that we can update during iteration.
3135         # Long term, this is probably implemented with data descriptors
3136         # that will be moved to a new Subgraph type object.
3137         for name in self.variables:
3138             if not isinstance(self.variables[name], Future):
3139                 self.variables[name] = gmx.make_constant(self.variables[name])
3140
3141         # class MySubgraph(Subgraph, variables=variables):
3142         #     pass
3143         #
3144         # self._subgraph_instance = MySubgraph()
3145
3146     def __getattr__(self, item):
3147         if self._editing:
3148             if item in self.variables:
3149                 if item in self._staging:
3150                     logger.debug('Read access to intermediate value of subgraph variable {}'.format(item))
3151                     return self._staging[item]
3152                 else:
3153                     logger.debug('Read access to subgraph variable {}'.format(item))
3154                     return self.variables[item]
3155             else:
3156                 raise AttributeError('Invalid attribute: {}'.format(item))
3157         else:
3158             # TODO: this is not quite the described interface...
3159             return lambda obj: obj.values[item]
3160
3161     def __setattr__(self, key, value):
3162         """Part of the builder interface."""
3163         if key in self.__dict__:
3164             self.__dict__[key] = value
3165         else:
3166             if self._editing:
3167                 self.add_update(key, value)
3168             else:
3169                 raise exceptions.UsageError('Subgraph is not in an editable state.')
3170
3171     def add_update(self, key, value):
3172         """Add a variable update to the internal subgraph."""
3173         if key not in self.variables:
3174             raise AttributeError('No such attribute: {}'.format(key))
3175         if not self._editing:
3176             raise exceptions.UsageError('Subgraph is not in an editable state.')
3177         # Else, stage the potential final value for the iteration.
3178         logger.debug('Staging subgraph update {} = {}'.format(key, value))
3179         # Return a placeholder that we can update during iteration.
3180         # Long term, this is probably implemented with data descriptors
3181         # that will be moved to a new Subgraph type object.
3182         if not isinstance(value, Future):
3183             value = gmx.make_constant(value)
3184         self._staging[key] = value
3185         self._staging.move_to_end(key)
3186
3187     def __enter__(self):
3188         """Enter a Context managed by the subgraph to capture operation additions.
3189
3190         Allows the internal data flow of the subgraph to be defined in the same
3191         manner as the default work graph while the Python context manager is active.
3192
3193         The subgraph Context is activated when entering a ``with`` block and
3194         finalized at the end of the block.
3195         """
3196         # While editing the subgraph in the SubgraphContext, we need __get__ and __set__
3197         # data descriptor access on the Subgraph subclass type, but outside of the
3198         # context manager, the descriptor should be non-writable and more opaque,
3199         # while the instance should be readable, but not writable.
3200         self.__dict__['_editing'] = True
3201         # TODO: this probably needs to be configured with variables...
3202         self.__dict__['_subgraph_context'] = SubgraphContext()
3203         push_context(self._subgraph_context)
3204         return self
3205
3206     def build(self):
3207         """Build the subgraph by defining some new operations.
3208
3209         Examine the subgraph variables. Variables with handles to data sources
3210         in the SubgraphContext represent work that needs to be executed on
3211         a subgraph execution or iteration. Variables with handles to data sources
3212         outside of the subgraph represent work that needs to be executed once
3213         on only the first iteration to initialize the subgraph.
3214
3215         Construct a factory for the fused operation that performs the work to
3216         update the variables on a single iteration.
3217
3218         Construct a factory for the fused operation that performs the work to
3219         update the variables on subsequent iterations and which will be fed
3220         the outputs of a previous iteration. Both of the generated operations
3221         have the same output signature.
3222
3223         Construct and wrap the generator function to recursively append work
3224         to the graph and update until condition is satisfied.
3225
3226         TODO: Explore how to drop work from the graph once there are no more
3227          references to its output, including check-point machinery.
3228         """
3229         logger.debug('Finalizing subgraph definition.')
3230         inputs = collections.OrderedDict()
3231         for key, value in self.variables.items():
3232             # TODO: What if we don't want to provide default values?
3233             inputs[key] = value
3234
3235         updates = self._staging
3236
3237         class Subgraph(object):
3238             def __init__(self, input_futures, update_sources):
3239                 self.values = collections.OrderedDict([(key, value.result()) for key, value in input_futures.items()])
3240                 logger.debug('subgraph initialized with {}'.format(
3241                     ', '.join(['{}: {}'.format(key, value) for key, value in self.values.items()])))
3242                 self.futures = collections.OrderedDict([(key, value) for key, value in input_futures.items()])
3243                 self.update_sources = collections.OrderedDict([(key, value) for key, value in update_sources.items()])
3244                 logger.debug('Subgraph updates staged:')
3245                 for update, source in self.update_sources.items():
3246                     logger.debug('    {} = {}'.format(update, source))
3247
3248             def run(self):
3249                 for name in self.update_sources:
3250                     result = self.update_sources[name].result()
3251                     logger.debug('Update: {} = {}'.format(name, result))
3252                     self.values[name] = result
3253                 # Replace the data sources in the futures.
3254                 for name in self.update_sources:
3255                     self.futures[name].resource_manager = gmx.make_constant(self.values[name]).resource_manager
3256                 for name in self.update_sources:
3257                     self.update_sources[name]._reset()
3258
3259         subgraph = Subgraph(inputs, updates)
3260
3261         return lambda subgraph=subgraph: subgraph
3262
3263     def __exit__(self, exc_type, exc_val, exc_tb):
3264         """End the Subgraph editing session and finalize the Subgraph build.
3265
3266         After exiting, this instance forwards __call__() to a factory for an
3267         operation that carries out the work in the subgraph with inputs bound
3268         in the current context as defined by ``variables``.
3269         """
3270         self._factory = self.build()
3271
3272         context = pop_context()
3273         assert context is self._subgraph_context
3274         self.__dict__['_editing'] = False
3275         # Returning False causes exceptions in the `with` block to be reraised.
3276         # Remember to switch this to return True if we want to transform or suppress
3277         # such an exception (we probably do).
3278         if exc_type is not None:
3279             logger.error('Got exception {} while editing subgraph {}.'.format(exc_val, self))
3280             logger.debug('Subgraph exception traceback: \n{}'.format(exc_tb))
3281         return False
3282
3283     def __call__(self):
3284         # TODO: After build() has been called, this should dispatch to a factory
3285         #  that returns an OperationHandle.
3286         return self._factory()
3287
3288
3289 def while_loop(*, operation, condition, max_iteration=10):
3290     """Generate and run a chain of operations such that condition evaluates True.
3291
3292     Returns and operation instance that acts like a single node in the current
3293     work graph, but which is a proxy to the operation at the end of a dynamically generated chain
3294     of operations. At run time, condition is evaluated for the last element in
3295     the current chain. If condition evaluates False, the chain is extended and
3296     the next element is executed. When condition evaluates True, the object
3297     returned by ``while_loop`` becomes a proxy for the last element in the chain.
3298
3299     Equivalent to calling operation.while(condition), where available.
3300
3301     Arguments:
3302         operation: a callable that produces an instance of an operation when called with no arguments.
3303         condition: a callable that accepts an object (returned by ``operation``) that returns a boolean.
3304         max_iteration: execute the loop no more than this many times (default 10)
3305
3306     Warning:
3307         *max_iteration* is provided in part to minimize the cost of bugs in early
3308         versions of this software. The default value may be changed or
3309         removed on short notice.
3310
3311     Warning:
3312         The protocol by which ``while_loop`` interacts with ``operation`` and ``condition``
3313         is very unstable right now. Please refer to this documentation when installing new
3314         versions of the package.
3315
3316     Protocol:
3317         Warning:
3318             This protocol will be changed before the 0.1 API is finalized.
3319
3320         When called, ``while_loop`` calls ``operation`` without arguments
3321         and captures the return value captured as ``_operation``.
3322         The object produced by ``operation()`` must have a ``reset``,
3323         a ``run`` method, and an ``output`` attribute.
3324
3325         This is inspected
3326         to determine the output data proxy for the operation produced by the call
3327         to ``while_loop``. When that operation is called, it does the equivalent of
3328
3329             while(condition(self._operation)):
3330                 self._operation.reset()
3331                 self._operation.run()
3332
3333         Then, the output data proxy of ``self`` is updated with the results from
3334         self._operation.output.
3335
3336     """
3337     # In the first implementation, Subgraph is NOT and OperationHandle.
3338     # if not isinstance(obj, AbstractOperationHandle):
3339     #     raise exceptions.UsageError(
3340     #     '"operation" key word argument must be a callable that produces an Operation handle.')
3341     # outputs = {}
3342     # for name, descriptor in obj.output.items():
3343     #     outputs[name] = descriptor._dtype
3344
3345     # 1. Get the initial inputs.
3346     # 2. Initialize the subgraph with the initial inputs.
3347     # 3. Run the subgraph.
3348     # 4. Get the outputs.
3349     # 5. Initialize the subgraph with the outputs.
3350     # 6. Go to 3 if condition is not met.
3351
3352     obj = operation()
3353     assert hasattr(obj, 'values')
3354     outputs = collections.OrderedDict([(key, type(value)) for key, value in obj.values.items()])
3355
3356     @function_wrapper(output=outputs)
3357     def run_loop(output: OutputCollectionDescription):
3358         iteration = 0
3359         obj = operation()
3360         logger.debug('Created object {}'.format(obj))
3361         logger.debug(', '.join(['{}: {}'.format(key, obj.values[key]) for key in obj.values]))
3362         logger.debug('Condition: {}'.format(condition(obj)))
3363         while (condition(obj)):
3364             logger.debug('Running iteration {}'.format(iteration))
3365             obj.run()
3366             logger.debug(
3367                 ', '.join(['{}: {}'.format(key, obj.values[key]) for key in obj.values]))
3368             logger.debug('Condition: {}'.format(condition(obj)))
3369             iteration += 1
3370             if iteration > max_iteration:
3371                 break
3372         for name in outputs:
3373             setattr(output, name, obj.values[name])
3374
3375         return obj
3376
3377     return run_loop
3378
3379
3380 def subgraph(variables=None):
3381     """Allow operations to be configured in a sub-context.
3382
3383     The object returned functions as a Python context manager. When entering the
3384     context manager (the beginning of the ``with`` block), the object has an
3385     attribute for each of the named ``variables``. Reading from these variables
3386     gets a proxy for the initial value or its update from a previous loop iteration.
3387     At the end of the ``with`` block, any values or data flows assigned to these
3388     attributes become the output for an iteration.
3389
3390     After leaving the ``with`` block, the variables are no longer assignable, but
3391     can be called as bound methods to get the current value of a variable.
3392
3393     When the object is run, operations bound to the variables are ``reset`` and
3394     run to update the variables.
3395     """
3396     # Implementation note:
3397     # A Subgraph (type) has a subgraph context associated with it. The subgraph's
3398     # ability to capture operation additions is implemented in terms of the
3399     # subgraph context.
3400     logger.debug('Declare a new subgraph with variables {}'.format(variables))
3401
3402     return SubgraphBuilder(variables)
3403
3404
3405 @computed_result
3406 def join_arrays(*, front: datamodel.NDArray = (), back: datamodel.NDArray = ()) -> datamodel.NDArray:
3407     """Operation that consumes two sequences and produces a concatenated single sequence.
3408
3409     Note that the exact signature of the operation is not determined until this
3410     helper is called. Helper functions may dispatch to factories for different
3411     operations based on the inputs. In this case, the dtype and shape of the
3412     inputs determines dtype and shape of the output. An operation instance must
3413     have strongly typed output, but the input must be strongly typed on an
3414     object definition so that a Context can make runtime decisions about
3415     dispatching work and data before instantiating.
3416     # TODO: elaborate and clarify.
3417     # TODO: check type and shape.
3418     # TODO: figure out a better annotation.
3419     """
3420     # TODO: (FR4) Returned list should be an NDArray.
3421     if isinstance(front, (str, bytes)) or isinstance(back, (str, bytes)):
3422         raise exceptions.ValueError('Input must be a pair of lists.')
3423     assert isinstance(front, datamodel.NDArray)
3424     assert isinstance(back, datamodel.NDArray)
3425     new_list = list(front._values)
3426     new_list.extend(back._values)
3427     return datamodel.NDArray(new_list)
3428
3429
3430 # TODO: Constrain
3431 Scalar = typing.TypeVar('Scalar')
3432
3433
3434 def concatenate_lists(sublists: list = ()) -> _Future[gmx.datamodel.NDArray]:
3435     """Combine data sources into a single list.
3436
3437     A trivial data flow restructuring operation.
3438     """
3439     if isinstance(sublists, (str, bytes)):
3440         raise exceptions.ValueError('Input must be a list of lists.')
3441     if len(sublists) == 0:
3442         return datamodel.ndarray([])
3443     else:
3444         # TODO: Fix the data model so that this can type-check properly.
3445         return join_arrays(front=sublists[0],
3446                            back=typing.cast(datamodel.NDArray,
3447                                             concatenate_lists(sublists[1:])))
3448
3449
3450 def make_constant(value: Scalar) -> _Future:
3451     """Provide a predetermined value at run time.
3452
3453     This is a trivial operation that provides a (typed) value, primarily for
3454     internally use to manage gmxapi data flow.
3455
3456     Accepts a value of any type. The object returned has a definite type and
3457     provides same interface as other gmxapi outputs. Additional constraints or
3458     guarantees on data type may appear in future versions.
3459     """
3460     dtype = type(value)
3461     source = StaticSourceManager(name='data', proxied_data=value, width=1, function=lambda x: x)
3462     description = ResultDescription(dtype=dtype, width=1)
3463     future = Future(source, 'data', description=description)
3464     return future
3465
3466
3467 def logical_not(value: bool) -> _Future:
3468     """Boolean negation.
3469
3470     If the argument is a gmxapi compatible Data or Future object, a new View or
3471     Future is created that proxies the boolean opposite of the input.
3472
3473     If the argument is a callable, logical_not returns a wrapper function that
3474     returns a Future for the logical opposite of the callable's result.
3475     """
3476     # TODO: Small data transformations like this don't need to be formal Operations.
3477     # This could be essentially a data annotation that affects the resolver in a
3478     # DataEdge. As an API detail, coding for different Contexts and optimizations
3479     # within those Context implementations could be simplified.
3480     operation = function_wrapper(output={'data': bool})(lambda data=bool(): not bool(data))
3481     return operation(data=value).output.data