Fix typo.
[alexxy/gromacs.git] / python_packaging / src / gmxapi / operation.py
1 #
2 # This file is part of the GROMACS molecular simulation package.
3 #
4 # Copyright (c) 2019,2020,2021, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
8 #
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
13 #
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 # Lesser General Public License for more details.
18 #
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23 #
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
31 #
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
34
35 """Define gmxapi-compliant Operations
36
37 Provide decorators and base classes to generate and validate gmxapi Operations.
38
39 Nodes in a work graph are created as instances of Operations. An Operation factory
40 accepts well-defined inputs as key word arguments. The object returned by such
41 a factory is a handle to the node in the work graph. It's ``output`` attribute
42 is a collection of the Operation's results.
43
44 function_wrapper(...) produces a wrapper that converts a function to an Operation
45 factory. The Operation is defined when the wrapper is called. The Operation is
46 instantiated when the factory is called. The function is executed when the Operation
47 instance is run.
48
49 The framework ensures that an Operation instance is executed no more than once.
50 """
51
52 __all__ = ['computed_result',
53            'function_wrapper',
54            ]
55
56 import abc
57 import collections
58 import functools
59 import inspect
60 import typing
61 import weakref
62 from contextlib import contextmanager
63
64 import gmxapi as gmx
65 from gmxapi import datamodel
66 from gmxapi import exceptions
67 from gmxapi import logger as root_logger
68 from gmxapi.abc import OperationImplementation, MutableResource, Node
69 from gmxapi.typing import _Context, ResultTypeVar, SourceTypeVar, valid_result_types, valid_source_types
70 from gmxapi.typing import Future as _Future
71
72 # Initialize module-level logger
73 logger = root_logger.getChild('operation')
74 logger.info('Importing {}'.format(__name__))
75
76
77 class ResultDescription:
78     """Describe what will be returned when `result()` is called."""
79
80     def __init__(self, dtype=None, width=1):
81         assert isinstance(dtype, type)
82         assert issubclass(dtype, valid_result_types)
83         assert isinstance(width, int)
84         self._dtype = dtype
85         self._width = width
86
87     @property
88     def dtype(self) -> type:
89         """node output type"""
90         return self._dtype
91
92     @property
93     def width(self) -> int:
94         """ensemble width"""
95         return self._width
96
97     def __repr__(self):
98         return '{}(dtype={}, width={})'.format(self.__class__.__name__, self.dtype, self.width)
99
100
101 class OutputData(object):
102     """Encapsulate the description and storage of a data output."""
103
104     def __init__(self, name: str, description: ResultDescription):
105         assert name != ''
106         self._name = name
107         assert isinstance(description, ResultDescription)
108         self._description = description
109         self._done = [False] * self._description.width
110         self._data = [None] * self._description.width
111
112     @property
113     def name(self):
114         """The name of the published output."""
115         return self._name
116
117     # TODO: Change to regular member function and add ensemble member arg.
118     @property
119     def done(self):
120         """Ensemble completion status for this output."""
121         return all(self._done)
122
123     def data(self, member: int = None):
124         """Access the raw data for localized output for the ensemble or the specified member."""
125         if not self.done:
126             raise exceptions.ApiError('Attempt to read before data has been published.')
127         if self._data is None or None in self._data:
128             raise exceptions.ApiError('Data marked "done" but contains null value.')
129         if member is not None:
130             return self._data[member]
131         else:
132             return self._data
133
134     def set(self, value, member: int):
135         """Set local data and mark as completed.
136
137         Used internally to maintain the data store.
138         """
139         if self._description.dtype == datamodel.NDArray:
140             self._data[member] = datamodel.ndarray(value)
141         else:
142             self._data[member] = self._description.dtype(value)
143         self._done[member] = True
144
145     def reset(self):
146         """Reinitialize the data store.
147
148         Note:
149             This is a workaround until the execution model is more developed.
150
151         Todo:
152             Remove this method when all operation handles provide factories to
153             reinstantiate on new Contexts and/or with new inputs.
154
155         """
156         self._done = [False] * self._description.width
157         self._data = [None] * self._description.width
158
159
160 class EnsembleDataSource(gmx.abc.EnsembleDataSource):
161     """A single source of data with ensemble data flow annotations.
162
163     Note that data sources may be Futures.
164     """
165
166     def __init__(self, source=None, width=1, dtype=None):
167         self.source = source
168         self.width = width
169         self.dtype = dtype
170
171     def node(self, member: int):
172         return self.source[member]
173
174     def reset(self):
175         protocols = ('reset', '_reset')
176         for protocol in protocols:
177             if hasattr(self.source, protocol):
178                 getattr(self.source, protocol)()
179
180
181 class DataSourceCollection(collections.OrderedDict):
182     """Store and describe input data handles for an operation.
183
184     When created from InputCollectionDescription.bind(), the DataSourceCollection
185     has had default values inserted.
186
187     Note: DataSourceCollection is the input resource collection type for NodeBuilder.
188     To use with the NodeBuilder interface, first create the collection, then
189     iteratively add its resources.
190
191     TODO: We should probably normalize the collection aspect of input. Is this a
192           single input? Are all inputs added as collections? Should this be split
193           to a standard iterable? Does a resource factory always produce a mapping?
194     """
195
196     def __init__(self, **kwargs):
197         """Initialize from key/value pairs of named data sources.
198
199         Data sources may be any of the basic gmxapi data types, gmxapi Futures
200         of those types, or gmxapi ensemble data bundles of the above.
201         """
202         super(DataSourceCollection, self).__init__()
203         for name, value in kwargs.items():
204             self[name] = value
205
206     def __setitem__(self, key: str, value: SourceTypeVar) -> None:
207         if not isinstance(key, str):
208             raise exceptions.TypeError('Data must be named with str type.')
209         # TODO: Encapsulate handling of proferred data sources to Context details.
210         # Preprocessed input should be self-describing gmxapi data types. Structured
211         # input must be recursively (depth-first) converted to gmxapi data types.
212         # TODO: Handle gmxapi Futures stored as dictionary elements!
213         if not isinstance(value, valid_source_types):
214             if isinstance(value, collections.abc.Iterable):
215                 # Iterables here are treated as arrays, but we do not have a robust typing system.
216                 # Warning: In the initial implementation, the iterable may contain Future objects.
217                 # TODO: (#2993) Revisit as we sort out data shape and Future protocol.
218                 try:
219                     value = datamodel.ndarray(value)
220                 except (exceptions.ValueError, exceptions.TypeError) as e:
221                     raise exceptions.TypeError('Iterable could not be converted to NDArray: {}'.format(value)) from e
222             elif hasattr(value, 'result'):
223                 # A Future object.
224                 pass
225             else:
226                 raise exceptions.ApiError('Cannot process data source {}'.format(value))
227         super().__setitem__(key, value)
228
229     def reset(self):
230         """Reset all sources in the collection."""
231         for source in self.values():
232             if hasattr(source, 'reset'):
233                 source.reset()
234             if hasattr(source, '_reset'):
235                 source._reset()
236
237     def __hash__(self):
238         """Provide some sort of unique identifier.
239
240         We need a more deterministic fingerprinting scheme with well-specified
241         uniqueness semantics, but right now we just need something reasonably
242         unique.
243         """
244         hashed_keys_and_values = tuple(hash(entity) for item in self.items() for entity in item)
245         return hash(hashed_keys_and_values)
246
247
248 def computed_result(function):
249     """Decorate a function to get a helper that produces an object with Result behavior.
250
251     When called, the new function produces an ImmediateResult object.
252
253     The new function has the same signature as the original function, but can accept
254     gmxapi data proxies, assuming the provided proxy objects represent types
255     compatible with the original signature.
256
257     Calls to `result()` return the value that `function` would return when executed
258     in the local context with the inputs fully resolved.
259
260     The API does not specify when input data dependencies will be resolved
261     or when the wrapped function will be executed. That is, ``@computed_result``
262     functions may force immediate resolution of data dependencies and/or may
263     be called more than once to satisfy dependent operation inputs.
264     """
265
266     # Attempt to inspect function signature. Introspection can fail, so we catch
267     # the various exceptions. We re-raise as ApiError, indicating a bug, because
268     # we should do more to handle this intelligently and provide better user
269     # feedback.
270     # TODO: Figure out what to do with exceptions where this introspection
271     #  and rebinding won't work.
272     # ref: https://docs.python.org/3/library/inspect.html#introspecting-callables-with-the-signature-object
273     try:
274         sig = inspect.signature(function)
275     except TypeError as T:
276         raise exceptions.ApiError('Can not inspect type of provided function argument.') from T
277     except ValueError as V:
278         raise exceptions.ApiError('Can not inspect provided function signature.') from V
279
280     wrapped_function = function_wrapper()(function)
281
282     @functools.wraps(function)
283     def new_function(*args, **kwargs):
284         # The signature of the new function will accept abstractions
285         # of whatever types it originally accepted. This wrapper must
286         # * Create a mapping to the original call signature from `input`
287         # * Add handling for typed abstractions in wrapper function.
288         # * Process arguments to the wrapper function into `input`
289
290         # 1. Inspect the return annotation to determine valid gmxapi type(s)
291         # 2. Generate a Result object advertising the correct type, bound to the
292         #    Input and implementing function.
293         # 3. Transform the result() data to the correct type.
294
295         # TODO: (FR3+) create a serializable data structure for inputs discovered
296         #  from function introspection.
297
298         for name, param in sig.parameters.items():
299             assert not param.kind == param.POSITIONAL_ONLY
300         bound_arguments = sig.bind(*args, **kwargs)
301         handle = wrapped_function(**bound_arguments.arguments)
302         output = handle.output
303         # TODO: Find a type hinting / generic way to assert output attributes.
304         return output.data
305
306     return new_function
307
308
309 class OutputCollectionDescription(collections.OrderedDict):
310     def __init__(self, **kwargs):
311         """Create the output description for an operation node from a dictionary of names and types."""
312         outputs = []
313         for name, flavor in kwargs.items():
314             if not isinstance(name, str):
315                 raise exceptions.TypeError('Output descriptions are keyed by Python strings.')
316             # Multidimensional outputs are explicitly NDArray
317             if issubclass(flavor, (list, tuple)):
318                 flavor = datamodel.NDArray
319             assert issubclass(flavor, valid_result_types)
320             outputs.append((name, flavor))
321         super().__init__(outputs)
322
323
324 class InputCollectionDescription(collections.OrderedDict):
325     """Describe acceptable inputs for an Operation.
326
327     Generally, an InputCollectionDescription is an aspect of the public API by
328     which an Operation expresses its possible inputs. This class includes details
329     of the Python package.
330
331     Keyword Arguments:
332         parameters : A sequence of named parameter descriptions.
333
334     Parameter descriptions are objects containing an `annotation` attribute
335     declaring the data type of the parameter and, optionally, a `default`
336     attribute declaring a default value for the parameter.
337
338     Instances can be used as an ordered map of parameter names to gmxapi data types.
339
340     Analogous to inspect.Signature, but generalized for gmxapi Operations.
341     Additional notable differences: typing is normalized at initialization, and
342     the bind() method does not return an object that can be directly used as
343     function input. The object returned from bind() is used to construct a data
344     graph Edge for subsequent execution.
345     """
346
347     def __init__(self, parameters: typing.Iterable[typing.Tuple[str, inspect.Parameter]]):
348         """Create the input description for an operation node from a dictionary of names and types."""
349         inputs = []
350         for name, param in parameters:
351             if not isinstance(name, str):
352                 raise exceptions.TypeError('Input descriptions are keyed by Python strings.')
353             # Multidimensional inputs are explicitly NDArray
354             dtype = param.annotation
355             if issubclass(dtype, collections.abc.Iterable) \
356                     and not issubclass(dtype, (str, bytes, collections.abc.Mapping)):
357                 # TODO: we can relax this with some more input conditioning.
358                 if dtype != datamodel.NDArray:
359                     raise exceptions.UsageError(
360                         'Cannot accept input type {}. Sequence type inputs must use NDArray.'.format(param))
361             assert issubclass(dtype, valid_result_types)
362             if hasattr(param, 'kind'):
363                 disallowed = any([param.kind == param.POSITIONAL_ONLY,
364                                   param.kind == param.VAR_POSITIONAL,
365                                   param.kind == param.VAR_KEYWORD])
366                 if disallowed:
367                     raise exceptions.ProtocolError(
368                         'Cannot wrap function. Operations must have well-defined parameter names.')
369                 kind = param.kind
370             else:
371                 kind = inspect.Parameter.POSITIONAL_OR_KEYWORD
372             if hasattr(param, 'default'):
373                 default = param.default
374             else:
375                 default = inspect.Parameter.empty
376             inputs.append(inspect.Parameter(name, kind, default=default, annotation=dtype))
377         super().__init__([(input.name, input.annotation) for input in inputs])
378         self.signature = inspect.Signature(inputs)
379
380     @staticmethod
381     def from_function(function):
382         """Inspect a function to be wrapped.
383
384         Used internally by gmxapi.operation.function_wrapper()
385
386             Raises:
387                 exceptions.ProtocolError if function signature cannot be determined to be valid.
388
389             Returns:
390                 InputCollectionDescription for the function input signature.
391         """
392         # First, inspect the function.
393         assert callable(function)
394         signature = inspect.signature(function)
395         # The function must have clear and static input schema
396         # Make sure that all parameters have clear names, whether or not they are used in a call.
397         for name, param in signature.parameters.items():
398             disallowed = any([param.kind == param.POSITIONAL_ONLY,
399                               param.kind == param.VAR_POSITIONAL,
400                               param.kind == param.VAR_KEYWORD])
401             if disallowed:
402                 raise exceptions.ProtocolError(
403                     'Cannot wrap function. Operations must have well-defined parameter names.')
404             if param.name == 'input':
405                 raise exceptions.ProtocolError('Function signature includes the (reserved) "input" keyword argument.')
406         description = collections.OrderedDict()
407         for param in signature.parameters.values():
408             if param.name == 'output':
409                 # Wrapped functions may accept the output parameter to publish results, but
410                 # that is not part of the Operation input signature.
411                 continue
412             if param.annotation == param.empty:
413                 if param.default == param.empty or param.default is None:
414                     raise exceptions.ProtocolError('Could not infer parameter type for {}'.format(param.name))
415                 dtype = type(param.default)
416                 if isinstance(dtype, collections.abc.Iterable) \
417                         and not isinstance(dtype, (str, bytes, collections.abc.Mapping)):
418                     dtype = datamodel.NDArray
419             else:
420                 dtype = param.annotation
421             description[param.name] = param.replace(annotation=dtype)
422         return InputCollectionDescription(description.items())
423
424     def bind(self, *args, **kwargs) -> DataSourceCollection:
425         """Create a compatible DataSourceCollection from provided arguments.
426
427         Pre-process input and function signature to get named input arguments.
428
429         This is a helper function to allow calling code to characterize the
430         arguments in a Python function call with hints from the factory that is
431         initializing an operation. Its most useful functionality is to  allows a
432         factory to accept positional arguments where named inputs are usually
433         required. It also allows data sources to participate in multiple
434         DataSourceCollections with minimal constraints.
435
436         Note that the returned object has had data populated from any defaults
437         described in the InputCollectionDescription.
438
439         See wrapped_function_runner() and describe_function_input().
440         """
441         # For convenience, accept *args, but convert to **kwargs to pass to Operation.
442         # Factory accepts an unadvertised `input` keyword argument that is used as a default kwargs dict.
443         # If present, kwargs['input'] is treated as an input "pack" providing _default_ values.
444         input_kwargs = collections.OrderedDict()
445         if 'input' in kwargs:
446             provided_input = kwargs.pop('input')
447             if provided_input is not None:
448                 input_kwargs.update(provided_input)
449         # `function` may accept an `output` keyword argument that should not be supplied to the factory.
450         for key, value in kwargs.items():
451             if key == 'output':
452                 raise exceptions.UsageError('Invalid keyword argument: output (reserved).')
453             input_kwargs[key] = value
454         try:
455             bound_arguments = self.signature.bind_partial(*args, **input_kwargs)
456         except TypeError as e:
457             raise exceptions.UsageError('Could not bind operation parameters to function signature.') from e
458         assert 'output' not in bound_arguments.arguments
459         bound_arguments.apply_defaults()
460         assert 'input' not in bound_arguments.arguments
461         input_kwargs = collections.OrderedDict([pair for pair in bound_arguments.arguments.items()])
462         if 'output' in input_kwargs:
463             input_kwargs.pop('output')
464         return DataSourceCollection(**input_kwargs)
465
466
467 class ProxyDataDescriptor(object):
468     """Base class for data descriptors used in DataProxyBase subclasses.
469
470     Subclasses should either not define __init__ or should call the base class
471     __init__ explicitly: super().__init__(self, name, dtype)
472     """
473
474     def __init__(self, name: str, dtype: ResultTypeVar = None):
475         self._name = name
476         # TODO: We should not allow dtype==None, but we currently have a weak data
477         #  model that does not allow good support of structured Futures.
478         if dtype is not None:
479             assert isinstance(dtype, type)
480             assert issubclass(dtype, valid_result_types)
481         self._dtype = dtype
482
483
484 class DataProxyMeta(abc.ABCMeta):
485     # Key word arguments consumed by __prepare__
486     _prepare_keywords = ('descriptors',)
487
488     @classmethod
489     def __prepare__(mcs, name, bases, descriptors: collections.abc.Mapping = None):
490         """Allow dynamic sub-classing.
491
492         DataProxy class definitions are collections of data descriptors. This
493         class method allows subclasses to give the descriptor names and type(s)
494         in the class declaration as arguments instead of as class attribute
495         assignments.
496
497             class MyProxy(DataProxyBase, descriptors={name: MyDescriptor() for name in datanames}): pass
498
499         Note:
500             Recent Python versions allow this to be replaced via ``__init_subclass__`` hook.
501             See :issue:`4116`
502         """
503         if descriptors is None:
504             return {}
505         elif isinstance(descriptors, tuple):
506             namespace = collections.OrderedDict([(d._name, d) for d in descriptors])
507             return namespace
508         else:
509             assert isinstance(descriptors, collections.abc.Mapping)
510             return descriptors
511
512     def __new__(cls, name, bases: typing.Iterable, namespace, **kwargs):
513         for key in kwargs:
514             if key not in DataProxyMeta._prepare_keywords:
515                 raise exceptions.ApiError('Unexpected class creation keyword: {}'.format(key))
516         # See note about DataProxyBase._reserved.
517         if '_reserved' not in namespace and not any(hasattr(base, '_reserved') for base in bases):
518             raise exceptions.ApiError(
519                 'We currently expect DataProxy classes to provide a list of reserved attribute names.')
520         for key in namespace:
521             # Here we can check conformance with naming and typing rules.
522             assert isinstance(key, str)
523             if key.startswith('__'):
524                 # Skip non-public attributes.
525                 continue
526             descriptor = namespace[key]
527             # The purpose of the current data proxies is to serve as a limited namespace
528             # containing only descriptors of a certain type. In the future, these proxies
529             # may be flattened into a facet of a richer OperationHandle class
530             # (this metaclass may become a decorator on an implementation class),
531             # but for now we check that the class is being used according to the
532             # documented framework. A nearer term update could be to restrict the
533             # type of the data descriptor:
534             # TODO: Use a member type of the derived cls (or a mix-in base) to specify a particular
535             #  ProxyDataDescriptor subclass.
536             # Also, see note about DataProxyBase._reserved
537             if not isinstance(descriptor, ProxyDataDescriptor):
538                 if key not in namespace['_reserved'] and not any(key in getattr(base, '_reserved') for base in
539                                                                  bases if hasattr(base, '_reserved')):
540                     raise exceptions.ApiError('Unexpected data proxy attribute {}: {}'.format(key, repr(descriptor)))
541             else:
542                 assert isinstance(descriptor, ProxyDataDescriptor)
543                 if not isinstance(descriptor._name, str) or descriptor._name == '':
544                     descriptor._name = key
545                 else:
546                     if descriptor._name != key:
547                         raise exceptions.ApiError(
548                             'Descriptor internal name {} does not match attribute name {}'.format(
549                                 descriptor._name, key))
550         return super().__new__(cls, name, bases, namespace)
551
552     # TODO: This keyword argument stripping is not necessary in more recent Python versions.
553     # When Python minimum required version is increased, check if we can remove this.
554     def __init__(cls, name, bases, namespace, **kwargs):
555         for key in kwargs:
556             if key not in DataProxyMeta._prepare_keywords:
557                 raise exceptions.ApiError('Unexpected class initialization keyword: {}'.format(key))
558         super().__init__(name, bases, namespace)
559
560     # TODO: See if we can use __dir__ in the metaclass to help hint class attributes for better tab completion.
561     #  Ref: https://ipython.readthedocs.io/en/stable/config/integrating.html#tab-completion
562     # def __dir__(self) -> Iterable[str]:
563     #     return super().__dir__()
564
565
566 class DataProxyBase(collections.abc.Mapping, metaclass=DataProxyMeta):
567     """Limited interface to managed resources.
568
569     Inherit from DataProxy to specialize an interface to an ``instance``.
570     In the derived class, either do not define ``__init__`` or be sure to
571     initialize the super class (DataProxy) with an instance of the object
572     to be proxied.
573
574     A class deriving from DataProxyBase allows its instances to provide a namespace
575     for proxies to named data by defining attributes that are data descriptors
576     (subclasses of ProxyDataDescriptor).
577     The ProxyDataDescriptors are accessed as attributes of the
578     data proxy instance or by iterating on items(). Attributes that are not
579     ProxyDataDescriptors are possible, but will not be returned by items() which
580     is a necessary part of gmxapi execution protocol.
581
582     Acts as an owning handle to the resources provide by ``instance``,
583     preventing the reference count of ``instance`` from going to zero for the
584     lifetime of the proxy object.
585
586     When sub-classing DataProxyBase, data descriptors can be passed as a mapping
587     to the ``descriptors`` key word argument in the class declaration. This
588     allows data proxy subclasses to be easily defined dynamically.
589
590         mydescriptors = {'foo': Publisher('foo', int), 'data': Publisher('data', float)}
591         ...
592         class MyDataProxy(DataProxyBase, descriptors=mydescriptors): pass
593         assert hasattr(MyDataProxy, 'foo')
594
595     """
596     # This class attribute (which subclasses are free to replace to augment) is an
597     # indication of a problem with the current data model. If we are allowing
598     # reserved words that would otherwise be valid data names, there is not a
599     # compelling reason for separate data proxy classes: we throw away the assertion
600     # that we are preparing a clean namespace and we could have accomplished the
601     # class responsibilities in the Operation handle with just descriptor classes.
602     # If we want the clean namespace, we should figure out how to keep this interface
603     # from growing and/or have some "hidden" internal interface.
604     _reserved = ('ensemble_width', 'items', '_reserved')
605
606     # This class can be expanded to be the attachment point for a metaclass for
607     # data proxies such as PublishingDataProxy or OutputDataProxy, which may be
608     # defined very dynamically and concisely as a set of Descriptors and a type()
609     # call.
610     # If development in this direction does not materialize, then this base
611     # class is not very useful and should be removed.
612     def __init__(self, instance: 'SourceResource', client_id: int = None):
613         """Get partial ownership of a resource provider.
614
615         Arguments:
616             instance : resource-owning object
617             client_id : identifier for client holding the resource handle (e.g. ensemble member id)
618
619         If client_id is not provided, the proxy scope is for all clients.
620         """
621         # TODO: Decide whether _resource_instance is public or not.
622         # Note: currently commonly needed for subclass implementations.
623         self._resource_instance = instance
624         # Developer note subclasses should handle self._client_identifier == None
625         self._client_identifier = client_id
626         # Collection is fixed by the time of instance creation, so cache it.
627         self.__keys = tuple([key for key, _ in self.items()])
628         self.__length = len(self.__keys)
629
630     @property
631     def ensemble_width(self) -> int:
632         return self._resource_instance.width()
633
634     @classmethod
635     def items(cls):
636         """Generator for tuples of attribute name and descriptor instance.
637
638         This almost certainly doesn't do quite what we want...
639         """
640         for name, value in cls.__dict__.items():
641             if isinstance(value, ProxyDataDescriptor):
642                 yield name, value
643
644     def __getitem__(self, k):
645         if hasattr(self, k):
646             return getattr(self, k)
647
648     def __len__(self):
649         return self.__length
650
651     def __iter__(self):
652         for key in self.__keys:
653             yield key
654
655
656 class Publisher(ProxyDataDescriptor):
657     """Data descriptor for write access to a specific named data resource.
658
659     For a wrapped function receiving an ``output`` argument, provides the
660     accessors for an attribute on the object passed as ``output``. Maps
661     read and write access by the wrapped function to appropriate details of
662     the resource manager.
663
664     Used internally to implement settable attributes on PublishingDataProxy.
665     Allows PublishingDataProxy to be dynamically defined in the scope of the
666     operation.function_wrapper closure. Each named output is represented by
667     an instance of Publisher in the PublishingDataProxy class definition for
668     the operation.
669
670     Ref: https://docs.python.org/3/reference/datamodel.html#implementing-descriptors
671
672     Collaborations:
673     Relies on implementation details of ResourceManager.
674     """
675
676     def __get__(self, instance: DataProxyBase, owner):
677         if instance is None:
678             # The current access has come through the class attribute of owner class
679             return self
680         resource_manager = instance._resource_instance
681         client_id = instance._client_identifier
682         # TODO: Fix API scope.
683         # Either this class is a detail of the same implementation as ResourceManager,
684         # or we need to enforce that instance._resource_instance provides _data (or equivalent)
685         assert isinstance(resource_manager, ResourceManager)
686         if client_id is None:
687             return getattr(resource_manager._data, self._name)
688         else:
689             return getattr(resource_manager._data, self._name)[client_id]
690
691     def __set__(self, instance: DataProxyBase, value):
692         resource_manager = instance._resource_instance
693         # TODO: Fix API scope.
694         # Either this class is a detail of the same implementation as ResourceManager,
695         # or we need to enforce that instance._resource_instance provides _data (or equivalent)
696         assert isinstance(resource_manager, ResourceManager)
697         client_id = instance._client_identifier
698         resource_manager.set_result(name=self._name, value=value, member=client_id)
699
700     def __repr__(self):
701         return '{}(name={}, dtype={})'.format(self.__class__.__name__,
702                                               self._name,
703                                               self._dtype.__qualname__)
704
705
706 def define_publishing_data_proxy(output_description) -> typing.Type[DataProxyBase]:
707     """Returns a class definition for a PublishingDataProxy for the provided output description."""
708     # This dynamic type creation hides collaborations with things like make_datastore.
709     # We should encapsulate these relationships in Context details, explicit collaborations
710     # between specific operations and Contexts, and in groups of Operation definition helpers.
711
712     descriptors = collections.OrderedDict([(name, Publisher(name)) for name in output_description])
713
714     class PublishingDataProxy(DataProxyBase, descriptors=descriptors):
715         """Handler for write access to the `output` of an operation.
716
717         Acts as a sort of PublisherCollection.
718         """
719
720     return PublishingDataProxy
721
722
723 # get symbols we can use to annotate input and output types more specifically.
724 _OutputDataProxyType = typing.TypeVar('_OutputDataProxyType', bound=DataProxyBase)
725 _PublishingDataProxyType = typing.TypeVar('_PublishingDataProxyType', bound=DataProxyBase)
726 # Currently, the ClientID type is an integer, but this may change.
727 ClientID = typing.NewType('ClientID', int)
728
729
730 class _Resources(typing.Generic[_PublishingDataProxyType]):
731     pass
732
733
734 # TODO: Why generic in publishingdataproxytype?
735 class SourceResource(typing.Generic[_OutputDataProxyType, _PublishingDataProxyType]):
736     """Resource Manager for a data provider.
737
738     Supports Future instances in a particular context.
739     """
740
741     # Note: ResourceManager members not yet included:
742     # future(), _data, set_result.
743
744     # This might not belong here. Maybe separate out for a OperationHandleManager?
745     @abc.abstractmethod
746     def data(self) -> _OutputDataProxyType:
747         """Get the output data proxy."""
748         # Warning: this should probably be renamed, but "output_data_proxy" is already
749         # a member in at least one derived class.
750         ...
751
752     @abc.abstractmethod
753     def is_done(self, name: str) -> bool:
754         return False
755
756     @abc.abstractmethod
757     def get(self, name: str) -> 'OutputData':
758         ...
759
760     @abc.abstractmethod
761     def update_output(self):
762         """Bring the _data member up to date and local."""
763         pass
764
765     @abc.abstractmethod
766     def reset(self):
767         """Recursively reinitialize resources.
768
769         Set the resource manager to its initialized state.
770         All outputs are marked not "done".
771         All inputs supporting the interface have ``_reset()`` called on them.
772         """
773
774     @abc.abstractmethod
775     def width(self) -> int:
776         """Ensemble width of the managed resources."""
777         ...
778
779     @abc.abstractmethod
780     def future(self, name: str, description: ResultDescription) -> 'Future':
781         """Get a Future handle for managed data.
782
783         Resource managers owned by subclasses of gmx.operation.Context provide
784         this method to get references to output data.
785
786         In addition to the interface described by gmx.abc.Future, returned objects
787         provide the interface described by gmx.operation.Future.
788         """
789
790
791 class StaticSourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
792     """Provide the resource manager interface for local static data.
793
794     Allow data transformations on the proxied resource.
795
796     Keyword Args:
797         proxied_data: A gmxapi supported data object.
798         width: Size of (one-dimensional) shaped data produced by function.
799         function: Transformation to perform on the managed data.
800
801     The callable passed as ``function`` must accept a single argument. The
802     argument will be an iterable when proxied_data represents an ensemble,
803     or an object of the same type as proxied_data otherwise.
804     """
805
806     def __init__(self, *, name: str, proxied_data, width: int, function: typing.Callable):
807         assert not isinstance(proxied_data, Future)
808         if hasattr(proxied_data, 'width'):
809             # Ensemble data source
810             assert hasattr(proxied_data, 'source')
811             self._result = function(proxied_data.source)
812         else:
813             self._result = function(proxied_data)
814         if width > 1:
815             if isinstance(self._result, (str, bytes)):
816                 # In this case, do not implicitly broadcast
817                 raise exceptions.ValueError('"function" produced data incompatible with "width".')
818             else:
819                 if not isinstance(self._result, collections.abc.Iterable):
820                     raise exceptions.DataShapeError(
821                         'Expected iterable of size {} but "function" result is not iterable.')
822             data = list(self._result)
823             size = len(data)
824             if len(data) != width:
825                 raise exceptions.DataShapeError(
826                     'Expected iterable of size {} but "function" produced a {} of size {}'.format(width, type(data),
827                                                                                                   size))
828             dtype = type(data[0])
829         else:
830             if width != 1:
831                 raise exceptions.ValueError('width must be an integer 1 or greater.')
832             dtype = type(self._result)
833             if issubclass(dtype, (list, tuple)):
834                 dtype = datamodel.NDArray
835                 data = [datamodel.ndarray(self._result)]
836             elif isinstance(self._result, collections.abc.Iterable):
837                 if not isinstance(self._result, (str, bytes, dict)):
838                     raise exceptions.ValueError(
839                         'Expecting width 1 but "function" produced iterable type {}.'.format(type(self._result)))
840                 else:
841                     dtype = str
842                     data = [str(self._result)]
843             else:
844                 data = [self._result]
845         description = ResultDescription(dtype=dtype, width=width)
846         self._data = OutputData(name=name, description=description)
847         for member in range(width):
848             self._data.set(data[member], member=member)
849
850         output_collection_description = OutputCollectionDescription(**{name: dtype})
851         self.output_data_proxy = define_output_data_proxy(output_description=output_collection_description)
852
853     def is_done(self, name: str) -> bool:
854         return True
855
856     def get(self, name: str) -> 'OutputData':
857         assert self._data.name == name
858         return self._data
859
860     def data(self) -> _OutputDataProxyType:
861         return self.output_data_proxy(self)
862
863     def width(self) -> int:
864         # TODO: It looks like the OutputData ResultDescription probably belongs
865         #  in the public interface.
866         return self._data._description.width
867
868     def update_output(self):
869         pass
870
871     def reset(self):
872         pass
873
874     def future(self, name: str, description: ResultDescription) -> 'Future':
875         return Future(self, name, description=description)
876
877
878 class ProxyResourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
879     """Act as a resource manager for data managed by another resource manager.
880
881     Allow data transformations on the proxied resource.
882
883     Keyword Args:
884         proxied_future: An object implementing the Future interface.
885         width: Size of (one-dimensional) shaped data produced by function.
886         function: Transformation to perform on the result of proxied_future.
887
888     The callable passed as ``function`` must accept a single argument, which will
889     be an iterable when proxied_future represents an ensemble, or an object of
890     type proxied_future.description.dtype otherwise.
891     """
892
893     def __init__(self, proxied_future: 'Future', width: int, function: typing.Callable):
894         self._done = False
895         self._proxied_future = proxied_future
896         self._width = width
897         self.name = self._proxied_future.name
898         self._result = None
899         assert callable(function)
900         self.function = function
901
902     def width(self) -> int:
903         return self._width
904
905     def reset(self):
906         self._done = False
907         self._proxied_future._reset()
908         self._result = None
909
910     def is_done(self, name: str) -> bool:
911         return self._done
912
913     def get(self, name: str):
914         if name != self.name:
915             raise exceptions.ValueError('Request for unknown data.')
916         if not self.is_done(name):
917             raise exceptions.ProtocolError('Data not ready.')
918         result = self.function(self._result)
919         if self._width != 1:
920             # TODO Fix this typing nightmare:
921             #  ResultDescription should be fully knowable and defined when the resource manager is initialized.
922             data = OutputData(name=self.name, description=ResultDescription(dtype=type(result[0]), width=self._width))
923             for member, value in enumerate(result):
924                 data.set(value, member)
925         else:
926             data = OutputData(name=self.name, description=ResultDescription(dtype=type(result), width=self._width))
927             data.set(result, 0)
928         return data
929
930     def update_output(self):
931         self._result = self._proxied_future.result()
932         self._done = True
933
934     def data(self) -> _OutputDataProxyType:
935         raise exceptions.ApiError('ProxyResourceManager cannot yet manage a full OutputDataProxy.')
936
937     def future(self, name: str, description: ResultDescription):
938         return Future(self, name, description=description)
939
940
941 class AbstractOperation(typing.Generic[_OutputDataProxyType]):
942     """Client interface to an operation instance (graph node).
943
944     Note that this is a generic abstract class. Subclasses should provide a
945     class subscript to help static type checkers.
946     """
947
948     @abc.abstractmethod
949     def run(self):
950         """Assert execution of an operation.
951
952         After calling run(), the operation results are guaranteed to be available
953         in the local context.
954         """
955
956     @property
957     @abc.abstractmethod
958     def output(self) -> _OutputDataProxyType:
959         """Get a proxy collection to the output of the operation.
960
961         Developer note: The 'output' property exists to isolate the namespace of
962         output data from other operation handle attributes and we should consider
963         whether it is actually necessary or helpful. To facilitate its possible
964         future removal, do not enrich its interface beyond that of a collection
965         of OutputDescriptor attributes.
966         """
967         ...
968
969
970 class OperationRegistryKey(collections.namedtuple('OperationRegistryKey', 'namespace name'), collections.abc.Hashable):
971     """Helper class to define the key type for OperationRegistry."""
972
973     def __hash__(self):
974         return hash((self.namespace, self.name))
975
976     def __eq__(self, other):
977         """Test equivalence rather than identity.
978
979         Note: Use `is` to test identity.
980         """
981         return other.namespace == self.namespace and other.name == self.name
982
983     def __str__(self):
984         return '.'.join([self.namespace, self.name])
985
986     def __repr__(self):
987         return '{}(namespace={}, name={})'.format(self.__qualname__, self.namespace, self.name)
988
989
990 def _make_registry_key(*args) -> OperationRegistryKey:
991     """Normalize input to an OperationRegistryKey.
992
993     Used to implement OperationRegistry.__getitem__(), which catches and converts
994     the various exceptions this helper function can produce.
995     """
996     if len(args) > 1:
997         return OperationRegistryKey(*args)
998     else:
999         if len(args) != 1:
1000             raise exceptions.UsageError('Empty index value passed to OperationRegistry instance[].')
1001         item = args[0]
1002     if isinstance(item, OperationRegistryKey):
1003         return item
1004     if isinstance(item, str):
1005         namespace, name = item.rsplit(sep='.', maxsplit=1)
1006         return OperationRegistryKey(namespace=namespace, name=name)
1007     # Item could be a class object or an instance object...
1008     if hasattr(item, 'namespace') and hasattr(item, 'name'):
1009         if callable(item.namespace):
1010             namespace = item.namespace()
1011         else:
1012             namespace = item.namespace
1013         if callable(item.name):
1014             name = item.name()
1015         else:
1016             name = item.name
1017         return OperationRegistryKey(namespace=namespace, name=name)
1018     raise exceptions.ValueError('Not a usable OperationRegistryKey: {}'.format(item))
1019
1020
1021 class OperationRegistry(collections.UserDict):
1022     """Helper class to map identifiers to Operation implementation instances.
1023
1024     This is an implementation detail of gmxapi.operation and should not be used from
1025     outside of the package until a stable interface can be specified.
1026     """
1027
1028     @typing.overload
1029     def __getitem__(self, item: OperationRegistryKey):
1030         ...
1031
1032     @typing.overload
1033     def __getitem__(self, item: str):
1034         ...
1035
1036     def __getitem__(self, *args):
1037         """Fetch the requested operation registrant.
1038
1039         The getter is overloaded to support look-ups in multiple forms.
1040
1041         The key can be given in the following forms.
1042         * As a period-delimited string of the form "namespace.operation".
1043         * As an OperationRegistryKey object.
1044         * As a sequence of positional arguments accepted by OperationRegistryKey.
1045         * As an object conforming to the OperationRegistryKey interface.
1046         """
1047         try:
1048             item = _make_registry_key(*args)
1049         except exceptions.Error as e:
1050             raise exceptions.TypeError('Could not interpret key as a OperationRegistryKey.') from e
1051         return self.data[item]
1052
1053
1054 # Module level data store for locating operation implementations at run time.
1055 # TODO: This may make sense as instance data of a root Context instance, but we
1056 #  don't have a stable interface for communicating between Contexts yet.
1057 # Alternatively, it could be held as class data in a RegisteredOperation class,
1058 # but note that the "register" member function would behave less like the abc.ABCMeta
1059 # support for "virtual subclassing" and more like the generic function machinery
1060 # of e.g. functools.singledispatch.
1061 _operation_registry = OperationRegistry()
1062
1063
1064 def _register_operation(cls: typing.Type[OperationImplementation]):
1065     assert isinstance(cls, type)
1066     assert issubclass(cls, OperationImplementation)
1067     operation = _make_registry_key(cls)
1068     if operation in _operation_registry:
1069         full_name = str(operation)
1070         raise exceptions.ProtocolError('Attempting to redefine operation {}.'.format(full_name))
1071     _operation_registry[operation] = cls
1072
1073
1074 # TODO: replace with a generic function that we dispatch on so the type checker can infer a return type.
1075 def _get_operation_director(operation, context: gmx.abc.Context):
1076     """
1077
1078     :param operation:
1079     :param context:
1080     :return: gmxapi.abc.OperationDirector
1081     """
1082     registrant = _operation_registry[operation]
1083     director = registrant.director(context=context)
1084     return director
1085
1086
1087 class InputDescription(abc.ABC):
1088     """Node input description for gmxapi.operation module.
1089
1090     Provides the support needed to understand operation inputs in gmxapi.operation
1091     module Contexts.
1092
1093     .. todo:: Implementation base class with heritable behavior and/or helpers to
1094               compose this functionality from more normative description of
1095               operation inputs. This will probably become a facet of the ResourceDirector
1096               when specialized for gmxapi.operation.Context.
1097     """
1098
1099     @abc.abstractmethod
1100     def signature(self) -> InputCollectionDescription:
1101         """Mapping of named inputs and input type.
1102
1103         Used to determine valid inputs before an Operation node is created.
1104
1105         Collaborations:
1106             Related to the operation resource factory for this context.
1107
1108         ..  todo::
1109             Better unification of this protocol, InputCollectionDescription, and
1110             resource factory.
1111             Note, also, that the *bind* method of the returned InputCollectionDescription
1112             serves as the resource factory for input to the node builder.
1113         """
1114         ...
1115
1116     @abc.abstractmethod
1117     def make_uid(self, input: 'DataEdge') -> str:
1118         """The unique identity of an operation node tags the output with respect to the input.
1119
1120         Combines information on the Operation details and the input to uniquely
1121         identify the Operation node.
1122
1123         Arguments:
1124             input : A (collection of) data source(s) that can provide Fingerprints.
1125
1126         Used internally by the Context to manage ownership of data sources, to
1127         locate resources for nodes in work graphs, and to manage serialization,
1128         deserialization, and checkpointing of the work graph.
1129
1130         The UID is a detail of the generic Operation that _should_ be independent
1131         of the Context details to allow the framework to manage when and where
1132         an operation is executed.
1133
1134         TODO: We probably don't want to allow Operations to single-handedly determine their
1135          own uniqueness, but they probably should participate in the determination with the Context.
1136
1137         TODO: Context implementations should be allowed to optimize handling of
1138          equivalent operations in different sessions or work graphs, but we do not
1139          yet guarantee that UIDs are globally unique!
1140
1141         To be refined...
1142         """
1143         ...
1144
1145
1146 class ConcreteInputDescription(InputDescription):
1147     """Simple composed InputDescription."""
1148
1149     def __init__(self,
1150                  input_signature: InputCollectionDescription,
1151                  uid_helper: typing.Callable[['DataEdge'], str]
1152                  ):
1153         self._input_signature_description = input_signature
1154         self._uid_helper = uid_helper
1155
1156     def signature(self) -> InputCollectionDescription:
1157         return self._input_signature_description
1158
1159     def make_uid(self, input: 'DataEdge') -> str:
1160         return self._uid_helper(input)
1161
1162
1163 class OperationMeta(abc.ABCMeta):
1164     """Metaclass to manage the definition of Operation implementation classes.
1165
1166     Design Note:
1167         Note that this metaclass can be superseded by `__init_subclass__()`.
1168         See :issue:`4116`.
1169
1170     """
1171
1172     def __new__(meta, name, bases, class_dict):
1173         cls = super().__new__(meta, name, bases, class_dict)
1174         # Register subclasses, but not the base class.
1175         if issubclass(cls, OperationImplementation) and cls is not OperationImplementation:
1176             # TODO: Remove OperationDetailsBase layer and this extra check.
1177             # Note: we do not yet register the Operations built dynamically because we
1178             # don't have a clear definition of unique implementations yet. For instance,
1179             # a new OperationDetails class is defined for each call to gmx.join_arrays
1180             # TODO: Properly register and reuse Operations defined dynamically
1181             #  through function_wrapper (currently encompassed by OperationDetailsBase subclasses)
1182             if name != 'OperationDetailsBase':
1183                 if OperationDetailsBase not in bases:
1184                     _register_operation(cls)
1185         return cls
1186
1187
1188 class OperationDetailsBase(OperationImplementation, InputDescription,
1189                            metaclass=OperationMeta):
1190     """Abstract base class for Operation details in this module's Python Context.
1191
1192     Provides necessary interface for use with gmxapi.operation.ResourceManager.
1193     Separates the details of an Operation from those of the ResourceManager in
1194     a given Context.
1195
1196     OperationDetails classes are almost stateless, serving mainly to compose implementation
1197     details. Instances (operation objects) provide the Context-dependent interfaces
1198     for a specific node in a work graph.
1199
1200     OperationDetails subclasses are created dynamically by function_wrapper and
1201     make_operation.
1202
1203     Developer note: when subclassing, note that the ResourceManager is responsible
1204     for managing Operation state. Do not add instance data members related to
1205     computation or output state.
1206
1207     TODO: determine what is acceptable instance data and/or initialization information.
1208     Note that currently the subclass in function_wrapper has _no_ initialization input,
1209     but does not yet handle input-dependent output specification or node fingerprinting.
1210     It seems likely that instance initialization will require some characterization of
1211     supplied input, but nothing else. Even that much is not necessary if the instance
1212     is completely stateless, but that would require additional parameters to the member
1213     functions. However, an instance should be tied to a specific ResourceManager and
1214     Context, so weak references to these would be reasonable.
1215     """
1216
1217     @abc.abstractmethod
1218     def output_description(self) -> OutputCollectionDescription:
1219         """Mapping of available outputs and types for an existing Operation node."""
1220         ...
1221
1222     @abc.abstractmethod
1223     def publishing_data_proxy(self, *,
1224                               instance: SourceResource[typing.Any, _PublishingDataProxyType],
1225                               client_id) -> _PublishingDataProxyType:
1226         """Factory for Operation output publishing resources.
1227
1228         Used internally when the operation is run with resources provided by instance."""
1229         ...
1230
1231     @abc.abstractmethod
1232     def output_data_proxy(self,
1233                           instance: SourceResource[_OutputDataProxyType, typing.Any]
1234                           ) -> _OutputDataProxyType:
1235         """Get an object that can provide Futures for output data managed by instance."""
1236         ...
1237
1238     @abc.abstractmethod
1239     def __call__(self, resources: _Resources):
1240         """Execute the operation with provided resources.
1241
1242         Resources are prepared in an execution context with aid of resource_director()
1243
1244         After the first call, output data has been published and is trivially
1245         available through the output_data_proxy()
1246         """
1247         ...
1248
1249     @classmethod
1250     @abc.abstractmethod
1251     def resource_director(cls, *, input, output: _PublishingDataProxyType) -> _Resources[_PublishingDataProxyType]:
1252         """a Director factory that helps build the Session Resources for the function.
1253
1254         The Session launcher provides the director with all of the resources previously
1255         requested/negotiated/registered by the Operation. The director uses details of
1256         the operation to build the resources object required by the operation runner.
1257
1258         For the Python Context, the protocol is for the Context to call the
1259         resource_director instance method, passing input and output containers.
1260         (See, for example, gmxapi.operation.PyFunctionRunnerResources)
1261         """
1262         ...
1263
1264     # TODO: Don't run the director. Just return the correct callable.
1265     @classmethod
1266     def operation_director(cls, *args, context: 'Context', label=None, **kwargs) -> AbstractOperation:
1267         """Dispatching Director for adding a work node.
1268
1269         A Director for input of a particular sort knows how to reconcile
1270         input with the requirements of the Operation and Context node builder.
1271         The Director (using a less flexible / more standard interface)
1272         builds the operation node using a node builder provided by the Context.
1273
1274         This is essentially the creation method, instead of __init__, but the
1275         object is created and owned by the framework, and the caller receives
1276         an OperationHandle instead of a reference to an instance of cls.
1277
1278         # TODO: We need a way to compose this functionality for arbitrary Contexts.
1279         # That likely requires traits on the Contexts, and registration of Context
1280         # implementations. It seems likely that an Operation will register Director
1281         # implementations on import, and dispatching will be moved to the Context
1282         # implementations, which can either find an appropriate OperationDirector
1283         # or raise a compatibility error. To avoid requirements on import order of
1284         # Operations and Context implementations, we can change this to a non-abstract
1285         # dispatching method, requiring registration in the global gmxapi.context
1286         # module, or get rid of this method and use something like pkg_resources
1287         # "entry point" groups for independent registration of Directors and Contexts,
1288         # each annotated with relevant traits. E.g.:
1289         # https://setuptools.readthedocs.io/en/latest/setuptools.html#dynamic-discovery-of-services-and-plugins
1290         """
1291         if not isinstance(context, Context):
1292             raise exceptions.UsageError('Context instance needed for dispatch.')
1293         # TODO: use Context characteristics rather than isinstance checks.
1294         if isinstance(context, ModuleContext):
1295             construct = OperationDirector(*args, operation_details=cls, context=context, label=label, **kwargs)
1296             return construct()
1297         elif isinstance(context, SubgraphContext):
1298             construct = OperationDirector(*args, operation_details=cls, context=context, label=label, **kwargs)
1299             return construct()
1300         else:
1301             raise exceptions.ApiError('Cannot dispatch operation_director for context {}'.format(context))
1302
1303
1304 # TODO: Implement observer pattern for edge->node data flow.
1305 # Step 0: implement subject interface subscribe()
1306 # Step 1: implement subject interface get_state()
1307 # Step 2: implement observer interface update()
1308 # Step 3: implement subject interface notify()
1309 # Step 4: implement observer hook to support substantial change in source that
1310 #         invalidates downstream fingerprinting, such as a new subgraph iteration.
1311 # class Observer(object):
1312 #     """Abstract base class for data observers."""
1313 #     def rebind(self, edge: DataEdge):
1314 #         """Recreate the Operation at the consuming end of the DataEdge."""
1315
1316
1317 class Future(_Future):
1318     """gmxapi data handle.
1319
1320     Future is currently more than a Future right now. (should be corrected / clarified.)
1321     Future is also a facade to other features of the data provider.
1322
1323     Future objects are the most atomic interface between Contexts. User scripts
1324     may hold Futures from which they extract data with result(). Operation output
1325     used as input for another Operation can be decomposed such that the Operation
1326     factory has only Future objects in its input.
1327
1328     TODO: ``subscribe`` method allows consumers to bind as Observers.
1329
1330     TODO: extract the abstract class for input inspection?
1331     Currently abstraction is handled through SourceResource subclassing.
1332
1333     Attributes:
1334         description (ResultDescription): Describes the result to be obtained from this Future.
1335
1336     """
1337
1338     def __init__(self, resource_manager: SourceResource, name: str, description: ResultDescription):
1339         self.name = name
1340         if not isinstance(description, ResultDescription):
1341             raise exceptions.ValueError('Need description of requested data.')
1342         self.description = description  # type: ResultDescription
1343         self.resource_manager = resource_manager
1344
1345         # Deprecated. We should not "reset" futures, but reconstitute them, but we
1346         # need to move the data model to a subscription-based system so that we can
1347         # make Futures properly immutable and issue new ones across subgraph iterations.
1348         self._number_of_resets = 0
1349
1350     def __eq__(self, other):
1351         # This function is defined because we have defined __hash__().
1352         # Before customizing __eq__(), recall that Python objects that compare
1353         # equal should hash to the same value.
1354         # Please keep the two functions semantically correct.
1355         return object.__eq__(self, other)
1356
1357     def __hash__(self):
1358         # We cannot properly determine equivalency beyond the scope of a ResourceManager instance
1359         # without more developed data flow fingerprinting.
1360         return hash((id(self.resource_manager), self.name, self.description, self._number_of_resets))
1361
1362     def __str__(self):
1363         return '<Future: name={}, description={}>'.format(self.name, self.description)
1364
1365     def result(self) -> ResultTypeVar:
1366         """Fetch data to the caller's Context.
1367
1368         Returns an object of the concrete type specified according to
1369         the operation that produces this Result.
1370
1371         Ensemble data are returned as a list. Scalar results or results from single
1372         member ensembles are returned as scalars.
1373         """
1374         self.resource_manager.update_output()
1375         # Return ownership of concrete data
1376         handle = self.resource_manager.get(self.name)
1377
1378         # For intuitive use in non-ensemble cases, we represent data as bare scalars
1379         # when possible. It is easier for users to cast scalars to lists of length 1
1380         # than to introspect their own code to determine if a list of length 1 is
1381         # part of an ensemble or not. The data model will become clearer as we
1382         # develop more robust handling of multidimensional data and data flow topologies.
1383         # In the future,
1384         # we may distinguish between data of shape () and shape (1,), but we will need
1385         # to be careful with semantics. We are already starting to adopt a rule-of-thumb
1386         # that data objects assume the minimum dimensionality necessary unless told
1387         # otherwise, and we could make that a hard rule if it doesn't make other things
1388         # too difficult.
1389         if self.description.width == 1:
1390             return handle.data(member=0)
1391         else:
1392             return handle.data()
1393
1394     def _reset(self):
1395         """Mark the Future "not done" to allow reexecution.
1396
1397         Invalidates cached results, resets "done" markers in data sources, and
1398         triggers _reset recursively.
1399
1400         Note: this is a hack that is inconsistent with the plan of unique mappings
1401         of inputs to outputs, but allows a quick prototype for looping operations.
1402         """
1403         self._number_of_resets += 1
1404         self.resource_manager.reset()
1405
1406     @property
1407     def dtype(self):
1408         return self.description.dtype
1409
1410     def __getitem__(self, item):
1411         """Get a more limited view on the Future."""
1412         description = ResultDescription(dtype=self.dtype, width=self.description.width)
1413         # TODO: Use explicit typing when we have more thorough typing.
1414         description._dtype = None
1415         if self.description.width == 1:
1416             proxy = ProxyResourceManager(self,
1417                                          width=description.width,
1418                                          function=lambda value, key=item: value[key])
1419         else:
1420             proxy = ProxyResourceManager(self,
1421                                          width=description.width,
1422                                          function=lambda value, key=item:
1423                                          [subscriptable[key] for subscriptable in value])
1424         return proxy.future(self.name, description=description)
1425
1426
1427 class OutputDataDescriptor(ProxyDataDescriptor):
1428     """Read-only data descriptor for proxied access to output data.
1429
1430     Knows how to get a Future from the resource manager.
1431     """
1432
1433     # TODO: Reconcile the internal implementation details with the visibility and
1434     #  usages of this class.
1435
1436     def __get__(self, proxy: DataProxyBase, owner):
1437         if proxy is None:
1438             # Access through class attribute of owner class
1439             return self
1440         result_description = ResultDescription(dtype=self._dtype, width=proxy.ensemble_width)
1441
1442         return proxy._resource_instance.future(name=self._name, description=result_description)
1443
1444
1445 class MutableResourceDescriptor(ProxyDataDescriptor):
1446     """Accessor for rich binding interfaces.
1447
1448     Allows operations to access resources beyond the scope of the current
1449     resource manager. Used by operations whose interactions are more complicated
1450     than standard typed data flow at the scope of the current Context.
1451
1452     Instead of a Future interface, the returned object is a MutableResource with
1453     which a subscriber can collaborate with lower-level protocols.
1454     """
1455
1456     def __get__(self, proxy: DataProxyBase, owner) -> typing.Union[MutableResource, 'MutableResourceDescriptor']:
1457         if proxy is None:
1458             # Access through class attribute of owner class. We don't have a
1459             # specified use case for that, so allow inspection of the data
1460             # descriptor instance, itself.
1461             return self
1462         # TODO: implement.
1463         # The protocol for MD extension plugins requires that the simulation operation
1464         # subscribe to the plugin. Then the Context allows the plugin to access the
1465         # MdRunner interface as the simulation is launched.
1466         # The protocol for modify_input and for mdrun to consume the TPR payload
1467         # of read_tpr or modify_input should allow us to use the gmxapi 0.0.7
1468         # WorkSpec to configure and launch a simulation, which we can do by feeding
1469         # forward and building a fused operation at the mdrun node. The information
1470         # fed forward can just be references to the inputs and parameters of the
1471         # earlier operations, with annotations so that we know the intended behavior.
1472
1473
1474 def define_output_data_proxy(output_description: OutputCollectionDescription) -> typing.Type[DataProxyBase]:
1475     descriptors = {name: OutputDataDescriptor(name, description) for name, description in output_description.items()}
1476
1477     class OutputDataProxy(DataProxyBase, descriptors=descriptors):
1478         """Handler for read access to the `output` member of an operation handle.
1479
1480         Acts as a sort of ResultCollection.
1481
1482         A ResourceManager creates an OutputDataProxy instance at initialization to
1483         provide the ``output`` property of an operation handle.
1484         """
1485
1486     # Note: the OutputDataProxy has an inherent ensemble shape in the context
1487     # in which it is used, but that is an instance characteristic, not part of this type definition.
1488     # TODO: (FR5) The current tool does not support topology changing operations.
1489     return OutputDataProxy
1490
1491
1492 # Encapsulate the description of the input data flow.
1493 PyFuncInput = collections.namedtuple('Input', ('args', 'kwargs', 'dependencies'))
1494
1495
1496 class SinkTerminal(object):
1497     """Operation input end of a data edge.
1498
1499     In addition to the information in an InputCollectionDescription, includes
1500     topological information for the Operation node (ensemble width).
1501
1502     Collaborations: Required for creation of a DataEdge. Created with knowledge
1503     of a DataSourceCollection instance and a InputCollectionDescription.
1504     """
1505
1506     # TODO: This clearly maps to a Builder pattern.
1507     # I think we want to get the sink terminal builder from a factory parameterized by InputCollectionDescription,
1508     # add data source collections, and then build the sink terminal for the data edge.
1509     def __init__(self, input_collection_description: InputCollectionDescription):
1510         """Define an appropriate data sink for a new operation node.
1511
1512         Resolve data sources and input description to determine connectability,
1513         topology, and any necessary implicit data transformations.
1514
1515         :param input_collection_description: Available inputs for Operation
1516         :return: Fully formed description of the Sink terminal for a data edge to be created.
1517
1518         Collaborations: Execution Context implementation.
1519         """
1520         self.ensemble_width = 1
1521         self.inputs = input_collection_description
1522
1523     def __str__(self):
1524         return '<SinkTerminal: ensemble_width={}>'.format(self.ensemble_width)
1525
1526     def update_width(self, width: int):
1527         if not isinstance(width, int):
1528             try:
1529                 width = int(width)
1530             except TypeError:
1531                 raise exceptions.TypeError('Need an integer width > 0.')
1532         if width < 1:
1533             raise exceptions.ValueError('Nonsensical ensemble width: {}'.format(int(width)))
1534         if self.ensemble_width != 1:
1535             if width != self.ensemble_width:
1536                 raise exceptions.ValueError(
1537                     'Cannot change ensemble width {} to width {}.'.format(self.ensemble_width, width))
1538         self.ensemble_width = width
1539
1540     def update(self, data_source_collection: DataSourceCollection):
1541         """Update the SinkTerminal with the proposed data provider."""
1542         for name, sink_dtype in self.inputs.items():
1543             if name not in data_source_collection:
1544                 # If/when we accept data from multiple sources, we'll need some additional sanity checking.
1545                 if not hasattr(self.inputs.signature.parameters[name], 'default'):
1546                     raise exceptions.UsageError('No data or default for {}'.format(name))
1547             else:
1548                 # With a single data source, we need data to be in the source or have a default
1549                 assert name in data_source_collection
1550                 assert issubclass(sink_dtype, valid_result_types)
1551                 source = data_source_collection[name]
1552                 logger.debug('Updating Sink for source {}: {}.'.format(name, source))
1553                 if isinstance(source, sink_dtype):
1554                     logger.debug('Source matches sink. No update necessary.')
1555                     continue
1556                 else:
1557                     if isinstance(source, collections.abc.Iterable) and not isinstance(source, (
1558                             str, bytes, collections.abc.Mapping)):
1559                         assert isinstance(source, datamodel.NDArray)
1560                         if sink_dtype != datamodel.NDArray:
1561                             # Source is NDArray, but sink is not. Implicitly scatter.
1562                             self.update_width(len(source))
1563                         continue
1564                     if hasattr(source, 'description'):
1565                         source_description = typing.cast(ResultDescription, source.description)
1566                         source_dtype = source_description.dtype
1567                         assert isinstance(sink_dtype, type)
1568                         # TODO: Handle typing of Future slices when we have a better data model.
1569                         if source_dtype is not None:
1570                             assert isinstance(source_dtype, type)
1571                             if not issubclass(source_dtype, sink_dtype):
1572                                 raise exceptions.TypeError('Expected {} but got {}.'.format(sink_dtype, source_dtype))
1573                         source_width = source.description.width
1574                         self.update_width(source_width)
1575
1576
1577 class DataEdge(object):
1578     """State and description of a data flow edge.
1579
1580     A DataEdge connects a data source collection to a data sink. A sink is an
1581     input or collection of inputs of an operation (or fused operation). An operation's
1582     inputs may be fed from multiple data source collections, but an operation
1583     cannot be fully instantiated until all of its inputs are bound, so the DataEdge
1584     is instantiated at the same time the operation is instantiated because the
1585     required topology of a graph edge may be determined by the required topology
1586     of another graph edge.
1587
1588     A data edge has a well-defined topology only when it is terminated by both
1589     a source and sink. Creation requires that a source collection is compared to
1590     a sink description.
1591
1592     Calling code initiates edge creation by passing well-described data sources
1593     to an operation factory. The data sources may be annotated with explicit scatter
1594     or gather commands.
1595
1596     The resource manager for the new operation determines the
1597     required shape of the sink to handle all of the offered input.
1598
1599     Broadcasting
1600     and transformations of the data sources are then determined and the edge is
1601     established.
1602
1603     At that point, the fingerprint of the input data at each operation
1604     becomes available to the resource manager for the operation. The fingerprint
1605     has sufficient information for the resource manager of the operation to
1606     request and receive data through the execution context.
1607
1608     Instantiating operations and data edges implicitly involves collaboration with
1609     a Context instance. The state of a given Context or the availability of a
1610     default Context through a module function may affect the ability to instantiate
1611     an operation or edge. In other words, behavior may be different for connections
1612     being made in the scripting environment versus the running Session, and implementation
1613     details can determine whether or not new operations or data flow can occur in
1614     different code environments.
1615     """
1616
1617     class ConstantResolver(object):
1618         def __init__(self, value):
1619             self.value = value
1620
1621         def __call__(self, member=None):
1622             return self.value
1623
1624     def __init__(self, source_collection: DataSourceCollection, sink_terminal: SinkTerminal):
1625         # Adapters are callables that transform a source and node ID to local data.
1626         # Every key in the sink has an adapter.
1627         self.adapters = {}
1628         self.source_collection = source_collection
1629         self.sink_terminal = sink_terminal
1630         for name in sink_terminal.inputs:
1631             if name not in source_collection:
1632                 if hasattr(sink_terminal.inputs[name], 'default'):
1633                     self.adapters[name] = self.ConstantResolver(sink_terminal.inputs[name])
1634                 else:
1635                     # TODO: Initialize with multiple DataSourceCollections?
1636                     raise exceptions.ValueError('No source or default for required input "{}".'.format(name))
1637             else:
1638                 source = source_collection[name]
1639                 sink = sink_terminal.inputs[name]
1640                 if isinstance(source, (str, bool, int, float, dict)):
1641                     if issubclass(sink, (str, bool, int, float, dict)):
1642                         self.adapters[name] = self.ConstantResolver(source)
1643                     else:
1644                         assert issubclass(sink, datamodel.NDArray)
1645                         self.adapters[name] = self.ConstantResolver(datamodel.ndarray([source]))
1646                 elif isinstance(source, datamodel.NDArray):
1647                     if issubclass(sink, datamodel.NDArray):
1648                         # TODO: shape checking
1649                         # Implicit broadcast may not be what is intended
1650                         self.adapters[name] = self.ConstantResolver(source)
1651                     else:
1652                         if source.shape[0] != sink_terminal.ensemble_width:
1653                             raise exceptions.ValueError(
1654                                 'Implicit broadcast could not match array source to ensemble sink')
1655                         else:
1656                             self.adapters[name] = lambda member, source=source: source[member]
1657                 elif hasattr(source, 'result'):
1658                     # Handle data futures...
1659                     # If the Future is part of an ensemble, result() will return a list.
1660                     # Otherwise, it will return a single object.
1661                     ensemble_width = source.description.width
1662                     # TODO: subscribe to futures so results can be pushed.
1663                     if ensemble_width == 1:
1664                         self.adapters[name] = lambda member, source=source: source.result()
1665                     else:
1666                         self.adapters[name] = lambda member, source=source: source.result()[member]
1667                 else:
1668                     assert isinstance(source, EnsembleDataSource)
1669                     self.adapters[name] = lambda member, source=source: source.node(member)
1670
1671     def __str__(self):
1672         return '<DataEdge: source_collection={}, sink_terminal={}>'.format(self.source_collection, self.sink_terminal)
1673
1674     def reset(self):
1675         self.source_collection.reset()
1676
1677     def resolve(self, key: str, member: int):
1678         return self.adapters[key](member=member)
1679
1680     def sink(self, node: int) -> dict:
1681         """Consume data for the specified sink terminal node.
1682
1683         Run-time utility delivers data from the bound data source(s) for the
1684         specified terminal that was configured when the edge was created.
1685
1686         Terminal node is identified by a member index number.
1687
1688         Returns:
1689             A Python dictionary of the provided inputs as local data (not Future).
1690         """
1691         results = {}
1692         sink_ports = self.sink_terminal.inputs
1693         for key in sink_ports:
1694             results[key] = self.resolve(key, node)
1695         return results
1696
1697
1698 class ResourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
1699     """Provides data publication and subscription services.
1700
1701         Owns the data published by the operation implementation or served to consumers.
1702         Mediates read and write access to the managed data streams.
1703
1704         This ResourceManager implementation is defined in conjunction with a
1705         run-time definition of an Operation that wraps a Python callable (function).
1706         ResourceManager is instantiated with a reference to the callable.
1707
1708         When the Operation is run, the resource manager prepares resources for the wrapped
1709         function. Inputs provided to the Operation factory are provided to the
1710         function as keyword arguments. The wrapped function publishes its output
1711         through the (additional) ``output`` key word argument. This argument is
1712         a short-lived resource, prepared by the ResourceManager, with writable
1713         attributes named in the call to function_wrapper().
1714
1715         After the Operation has run and the outputs published, the data managed
1716         by the ResourceManager is marked "done."
1717
1718         Protocols:
1719
1720         The data() method produces a read-only collection of outputs named for
1721         the Operation when the Operation's ``output`` attribute is accessed.
1722
1723         publishing_resources() can be called once during the ResourceManager lifetime
1724         to provide the ``output`` object for the wrapped function. (Used by update_output().)
1725
1726         update_output() brings the managed output data up-to-date with the input
1727         when the Operation results are needed. If the Operation has not run, an
1728         execution session is prepared with input and output arguments for the
1729         wrapped Python callable. Output is publishable only during this session.
1730
1731     TODO: This functionality should evolve to be a facet of Context implementations.
1732      There should be no more than one ResourceManager instance per work graph
1733      node in a Context. This will soon be at odds with letting the ResourceManager
1734      be owned by an operation instance handle.
1735     TODO: The publisher and data objects can be more strongly defined through
1736      interaction between the Context and clients.
1737
1738     Design notes:
1739
1740     The normative pattern for updating data is to execute a node in the work
1741     graph, passing Resources for an execution Session to an operation runner.
1742     The resources and runner are dependent on the implementation details of
1743     the operation and the execution context, so logical execution may look
1744     like the following.
1745
1746         resource_builder = ResourcesBuilder()
1747         runner_builder = RunnerBuilder()
1748         input_resource_director = input_resource_factory.director(input)
1749         output_resource_director = publishing_resource_factory.director(output)
1750         input_resource_director(resource_builder, runner_builder)
1751         output_resource_director(resource_builder, runner_builder)
1752         resources = resource_builder.build()
1753         runner = runner_builder.build()
1754         runner(resources)
1755
1756     Only the final line is intended to be literal. The preceding code, if it
1757     exists in entirety, may be spread across several code comments.
1758
1759     TODO: Data should be pushed, not pulled.
1760     Early implementations executed operation code and extracted results directly.
1761     While we need to be able to "wait for" results and alert the data provider that
1762     we are ready for input, we want to defer execution management and data flow to
1763     the framework.
1764     """
1765
1766     @contextmanager
1767     def __publishing_context(self, ensemble_member=0) -> typing.Iterator[_PublishingDataProxyType]:
1768         """Get a context manager for resolving the data dependencies of this node.
1769
1770         The returned object is a Python context manager (used to open a `with` block)
1771         to define the scope in which the operation's output can be published.
1772         'output' type resources can be published exactly once, and only while the
1773         publishing context is active. (See operation.function_wrapper())
1774
1775         Used internally to implement ResourceManager.publishing_resources()
1776
1777         Responsibilities of the context manager are to:
1778             * (TODO) Make sure dependencies are resolved.
1779             * Make sure outputs are marked 'done' when leaving the context.
1780
1781         """
1782
1783         # TODO:
1784         # if self._data.done():
1785         #     raise exceptions.ProtocolError('Resources have already been published.')
1786
1787         # I don't think we want the OperationDetails to need to know about ensemble data,
1788         # (though the should probably be allowed to), so we may need a separate interface
1789         # for the resource manager with built-in scope-limiting to a single ensemble member.
1790         # Right now, one Operation handle owns one ResourceManager (which takes care of
1791         # the ensemble details), which owns one OperationDetails (which has no ensemble knowledge).
1792         # It is the responsibility of the calling code to make sure the PublishingDataProxy
1793         # gets used correctly.
1794
1795         # ref: https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager
1796         if self._done[ensemble_member]:
1797             raise exceptions.ProtocolError('Attempting to publish {}[{}] more than once.'.format(self.operation_id, ensemble_member))
1798
1799         try:
1800             resource = self.__publishing_data_proxy(instance=weakref.proxy(self),
1801                                                     client_id=ensemble_member)
1802         except Exception as e:
1803             logger.debug('Publishing context could not be created due to {}'.format(e))
1804             raise e
1805
1806         yield resource
1807         # Note: The remaining lines are skipped if an exception occurs in the `with` block
1808         # for the contextmanager suite, which effectively raises at the line after 'yield'.
1809         logger.debug('Published output for {} member {}'.format(self.operation_id, ensemble_member))
1810         self._done[ensemble_member] = True
1811
1812     def __init__(self, *,
1813                  source: DataEdge,
1814                  operation_id,
1815                  output_description: OutputCollectionDescription,
1816                  output_data_proxy: typing.Type[_OutputDataProxyType],
1817                  publishing_data_proxy: typing.Type[_PublishingDataProxyType],
1818                  resource_factory,
1819                  runner_director,
1820                  output_context: 'Context'):
1821         """Initialize a resource manager for the inputs and outputs of an operation.
1822         """
1823         # Note: This implementation assumes there is one ResourceManager instance per data source,
1824         # so we only stash the inputs and dependency information for a single set of resources.
1825         # TODO: validate input_fingerprint as its interface becomes clear.
1826         self._input_edge = source
1827         self.ensemble_width = self._input_edge.sink_terminal.ensemble_width
1828
1829         # Node UID.
1830         self.operation_id = operation_id
1831
1832         if isinstance(output_context, Context):
1833             self._output_context = output_context
1834         else:
1835             message = 'Provide an instance of gmxapi.operation.Context for output_context'
1836             raise exceptions.UsageError(message)
1837         assert self._output_context is not None
1838
1839         self._output_data_proxy = output_data_proxy
1840         assert self._output_data_proxy is not None
1841         assert callable(self._output_data_proxy)
1842
1843         self._output_description = output_description
1844         assert self._output_description is not None
1845
1846         self.__publishing_data_proxy = publishing_data_proxy
1847         assert self.__publishing_data_proxy is not None
1848         assert callable(self.__publishing_data_proxy)
1849
1850         self._runner_director = runner_director
1851         assert self._runner_director is not None
1852         self._resource_factory = resource_factory
1853         assert self._resource_factory is not None
1854
1855         self._data = _make_datastore(output_description=self._output_description,
1856                                      ensemble_width=self.ensemble_width)
1857
1858         # We store a rereference to the publishing context manager implementation
1859         # in a data structure that can only produce one per Python interpreter
1860         # (using list.pop()).
1861         # TODO: reimplement as a data descriptor
1862         #  so that PublishingDataProxy does not need a bound circular reference.
1863         self.__publishing_resources = [self.__publishing_context]
1864
1865         self._done = [False] * self.ensemble_width
1866         self.__operation_entrance_counter = 0
1867
1868     def width(self) -> int:
1869         return self.ensemble_width
1870
1871     def reset(self):
1872         self.__operation_entrance_counter = 0
1873         self._done = [False] * self.ensemble_width
1874         self.__publishing_resources = [self.__publishing_context]
1875         for data in self._data.values():
1876             data.reset()
1877         self._input_edge.reset()
1878         assert self.__operation_entrance_counter == 0
1879
1880     def done(self, member=None):
1881         if member is None:
1882             return all(self._done)
1883         else:
1884             return self._done[member]
1885
1886     def set_result(self, name, value, member: int):
1887         if not isinstance(value, (str, bytes)):
1888             try:
1889                 for item in value:
1890                     # In this specification, it is antithetical to publish Futures.
1891                     if hasattr(item, 'result'):
1892                         raise exceptions.ApiError('Operation produced Future instead of real output.')
1893             except TypeError:
1894                 # Ignore when `item` is not iterable.
1895                 pass
1896         self._data[name].set(value=value, member=member)
1897
1898     def is_done(self, name):
1899         return self._data[name].done
1900
1901     def get(self, name: str):
1902         """
1903
1904         Raises exceptions.ProtocolError if requested data is not local yet.
1905         Raises exceptions.ValueError if data is requested for an unknown name.
1906         """
1907         if name not in self._data:
1908             raise exceptions.ValueError('Request for unknown data.')
1909         if not self.is_done(name):
1910             raise exceptions.ProtocolError('Data not ready.')
1911         assert isinstance(self._data[name], OutputData)
1912         return self._data[name]
1913
1914     # TODO: Normalize. This is the no-argument, no-return callable member of an
1915     #  operation handle that dispatches to another Context (the operation implementation).
1916     # TODO: Allow update of a single ensemble member. As written, this always updates all ensemble members. That can
1917     #  be the default behavior, but we don't want to require non-local updates in all cases.
1918     def update_output(self):
1919         """Bring the output of the bound operation up to date.
1920
1921         Execute the bound operation once if and only if it has not
1922         yet been run in the lifetime of this resource manager.
1923
1924         Used internally to implement Futures for the local operation
1925         associated with this resource manager.
1926
1927         Raises:
1928             exceptions.ApiError if operation runner fails to publish output.
1929
1930         TODO: More comprehensive error handling for operations that fail to execute.
1931
1932         TODO: We need a different implementation for an operation whose output
1933          is served by multiple resource managers. E.g. an operation whose output
1934          is available across the ensemble, but which should only be executed on
1935          a single ensemble member.
1936         """
1937         # This code is not intended to be reentrant. We make a modest attempt to
1938         # catch unexpected reentrance, but this is not (yet) intended to be a thread-safe
1939         # resource manager implementation.
1940         # TODO: Handle checking just the ensemble members this resource manager is responsible for.
1941         # TODO: Replace with a managed observer pattern. Update once when input is available in the Context.
1942         if not self.done():
1943             # Note: This check could also be encapsulated in a run_once decorator that
1944             # could even set a data descriptor to change behavior.
1945             self.__operation_entrance_counter += 1
1946             if self.__operation_entrance_counter > 1:
1947                 raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
1948             if not self.done():
1949                 # Note! This is a detail of the ResourceManager in a SerialContext
1950                 # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
1951                 #  operation's library/implementation context.
1952                 publishing_resources = self.publishing_resources()
1953                 for i in range(self.ensemble_width):
1954                     # TODO: rewrite the following expression as a call to a resource factory.
1955                     # TODO: Consider whether the resource_factory behavior should be normalized
1956                     #  to always use `with` blocks to indicate the lifetime of a resource handle.
1957                     #  That implies that an operation handle can expire, but the operation handle
1958                     #  could be "yield"ed
1959                     #  from within the `with` block to keep the resource scope alive until the resulting
1960                     #  generator is exhausted. Not sure what that looks like or what the use case would be.
1961                     with self.local_input(i) as input:
1962                         # Note: Resources are marked "done" by the publishing system
1963                         # before the following context manager finishes exiting.
1964                         with publishing_resources(ensemble_member=i) as output:
1965                             # self._runner(*input.args, output=output, **input.kwargs)
1966                             ####
1967                             # Here we can make _runner a thing that accepts session resources, and
1968                             # is created by specializable builders. Separate out the expression of
1969                             # inputs.
1970                             #
1971                             # resource_builder = OperationDetails.ResourcesBuilder(context)
1972                             # runner_builder = OperationDetails.RunnerBuilder(context)
1973                             # input_resource_director = self._input_resource_factory.director(input)
1974                             # output_resource_director = self._publishing_resource_factory.director(output)
1975                             # input_resource_director(resource_builder, runner_builder)
1976                             # output_resource_director(resource_builder, runner_builder)
1977                             # resources = resource_builder.build()
1978                             # runner = runner_builder.build()
1979                             # runner(resources)
1980                             #
1981                             # This resource factory signature might need to be inverted or broken up
1982                             # into a builder for consistency. I.e.
1983                             # option 1: Make the input and output resources with separate factories and add_resource on
1984                             # the runner builder.
1985                             # option 2: Pass resource_builder to input_director and then output_director.
1986                             error_message = 'Got {} while executing {} for operation {}.'
1987                             try:
1988                                 resources = self._resource_factory(input=input, output=output)
1989                             except exceptions.TypeError as e:
1990                                 message = error_message.format(e, self._resource_factory, self.operation_id)
1991                                 raise exceptions.ApiError(message) from e
1992
1993                             runner = self._runner_director(resources)
1994                             try:
1995                                 runner()
1996                             except Exception as e:
1997                                 message = error_message.format(e, runner, self.operation_id)
1998                                 raise exceptions.ApiError(message) from e
1999             if not self.done():
2000                 message = 'update_output implementation failed to update all outputs for {}.'
2001                 message = message.format(self.operation_id)
2002                 raise exceptions.ApiError(message)
2003
2004     def future(self, name: str, description: ResultDescription):
2005         """Retrieve a Future for a named output.
2006
2007         Provide a description of the expected result to check for compatibility or
2008         implicit topological conversion.
2009
2010         TODO: (FR5+) Normalize this part of the interface between operation definitions and
2011          resource managers.
2012         """
2013         if not isinstance(name, str) or name not in self._data:
2014             raise exceptions.ValueError('"name" argument must name an output.')
2015         assert description is not None
2016         requested_dtype = description.dtype
2017         available_dtype = self._data[name]._description.dtype
2018         if requested_dtype != available_dtype:
2019             # TODO: framework to check for implicit conversions
2020             message = 'Requested Future of type {} is not compatible with available type {}.'
2021             message = message.format(requested_dtype, available_dtype)
2022             raise exceptions.ApiError(message)
2023         return Future(self, name, description)
2024
2025     def data(self) -> _OutputDataProxyType:
2026         """Get an adapter to the output resources to access results."""
2027         return self._output_data_proxy(self)
2028
2029     @contextmanager
2030     def local_input(self, member: int = None):
2031         """In an API session, get a handle to fully resolved locally available input data.
2032
2033         Execution dependencies are resolved on creation of the context manager. Input data
2034         becomes available in the ``as`` object when entering the context manager, which
2035         becomes invalid after exiting the context manager. Resources allocated to hold the
2036         input data may be released when exiting the context manager.
2037
2038         It is left as an implementation detail whether the context manager is reusable and
2039         under what circumstances one may be obtained.
2040         """
2041         # Localize data
2042         kwargs = self._input_edge.sink(node=member)
2043         assert 'input' not in kwargs
2044
2045         # Check that we have real data
2046         for key, value in kwargs.items():
2047             assert not hasattr(value, 'result')
2048             assert not hasattr(value, 'run')
2049             value_list = []
2050             if isinstance(value, list):
2051                 value_list = value
2052             if isinstance(value, datamodel.NDArray):
2053                 value_list = value._values
2054             if isinstance(value, collections.abc.Mapping):
2055                 value_list = value.values()
2056             assert not isinstance(value_list, Future)
2057             assert not hasattr(value_list, 'result')
2058             assert not hasattr(value_list, 'run')
2059             for item in value_list:
2060                 assert not hasattr(item, 'result')
2061
2062         input_pack = InputPack(kwargs=kwargs)
2063
2064         # Prepare input data structure
2065         # Note: we use 'yield' instead of 'return' for the protocol expected by
2066         # the @contextmanager decorator
2067         yield input_pack
2068
2069     def publishing_resources(self):
2070         """Get a context manager for resolving the data dependencies of this node.
2071
2072         Use the returned object as a Python context manager.
2073         'output' type resources can be published exactly once, and only while the
2074         publishing context is active.
2075
2076         Write access to publishing resources can be granted exactly once during the
2077         resource manager lifetime and conveys exclusive access.
2078         """
2079         return self.__publishing_resources.pop()
2080
2081
2082 class PyFunctionRunnerResources(collections.UserDict):
2083     """Runtime resources for Python functions.
2084
2085     Produced by a ResourceDirector for a particular Operation.
2086     """
2087
2088     def output(self):
2089         if 'output' in self:
2090             return self['output']
2091         else:
2092             return None
2093
2094     def input(self):
2095         return {key: value for key, value in self.items() if key != 'output'}
2096
2097
2098 class PyFunctionRunner(abc.ABC):
2099     def __init__(self, *, function: typing.Callable, output_description: OutputCollectionDescription):
2100         assert callable(function)
2101         self.function = function
2102         self.output_description = output_description
2103
2104     @abc.abstractmethod
2105     def __call__(self, resources: PyFunctionRunnerResources):
2106         self.function(output=resources.output(), **resources.input())
2107
2108
2109 class CapturedOutputRunner(PyFunctionRunner):
2110     """Function runner that captures return value as output.data"""
2111
2112     def __call__(self, resources: PyFunctionRunnerResources):
2113         resources['output'].data = self.function(**resources.input())
2114
2115
2116 class OutputParameterRunner(PyFunctionRunner):
2117     """Function runner that uses output parameter to let function publish output."""
2118
2119     def __call__(self, resources: PyFunctionRunnerResources):
2120         self.function(**resources)
2121
2122
2123 def wrapped_function_runner(function, output_description: OutputCollectionDescription = None) -> PyFunctionRunner:
2124     """Get an adapter for a function to be wrapped.
2125
2126     If the function does not accept a publishing data proxy as an `output`
2127     key word argument, the returned object has a `capture_output` attribute that
2128     must be re-assigned by the calling code before calling the runner. `capture_output`
2129     must be assigned to be a callable that will receive the output of the wrapped
2130     function.
2131
2132     Returns:
2133         Callable with a signature `__call__(*args, **kwargs)` and no return value
2134
2135     Collaborations:
2136         OperationDetails.resource_director assigns the `capture_output` member of the returned object.
2137     """
2138     assert callable(function)
2139     signature = inspect.signature(function)
2140
2141     # Implementation note: this function dispatches an implementation with the
2142     # logic below. A better factoring would be a "chain of responsibility" in
2143     # which the concrete Runners would be tried in sequence and determine internally
2144     # whether to create a runner, raise an error, or defer.
2145
2146     # Determine output details for proper dispatching.
2147     # First check for signature with output parameter.
2148     # TODO FR4: standardize typing
2149     if 'output' in signature.parameters:
2150         if not isinstance(output_description, OutputCollectionDescription):
2151             if not isinstance(output_description, collections.abc.Mapping):
2152                 raise exceptions.UsageError(
2153                     'Function passes output through call argument, but output is not described.')
2154             return OutputParameterRunner(
2155                 function=function,
2156                 output_description=OutputCollectionDescription(**output_description))
2157         else:
2158             return OutputParameterRunner(function=function,
2159                                          output_description=output_description)
2160     # Next try output_description parameter or function return annotation.
2161     else:
2162         if isinstance(output_description, OutputCollectionDescription):
2163             return_type = output_description['data'].gmxapi_datatype
2164         elif output_description is not None:
2165             # output_description should be None for inferred output or
2166             # a singular mapping of the key 'data' to a gmxapi type.
2167             if not isinstance(output_description, collections.abc.Mapping) \
2168                     or set(output_description.keys()) != {'data'}:
2169                 raise exceptions.ApiError(
2170                     'invalid output description for wrapped function: {}'.format(output_description))
2171             if signature.return_annotation != signature.empty:
2172                 if signature.return_annotation != output_description['data']:
2173                     raise exceptions.ApiError(
2174                         'Wrapped function with return-value-capture provided with non-matching output description.')
2175             return_type = output_description['data']
2176         else:
2177             # Use return type inferred from function signature.
2178             return_type = signature.return_annotation
2179         if return_type == signature.empty or return_type is None:
2180             raise exceptions.ApiError('No return annotation or output_description for {}'.format(function))
2181         return CapturedOutputRunner(function=function,
2182                                     output_description=OutputCollectionDescription(data=return_type))
2183
2184
2185 # TODO: Refactor in terms of reference to a node in a Context.
2186 #  ResourceManager is an implementation detail of how the Context
2187 #  manages a node.
2188 class OperationHandle(AbstractOperation[_OutputDataProxyType]):
2189     """Generic Operation handle for dynamically defined operations.
2190
2191     Define a gmxapi Operation for the functionality being wrapped by the enclosing code.
2192
2193     An Operation type definition encapsulates description of allowed inputs
2194     of an Operation. An Operation instance represents a node in a work graph
2195     with uniquely fingerprinted inputs and well-defined output. The implementation
2196     of the operation is a collaboration with the resource managers resolving
2197     data flow for output Futures, which may depend on the execution context.
2198     """
2199
2200     def __init__(self, resource_manager: SourceResource[_OutputDataProxyType, typing.Any]):
2201         """Initialization defines the unique input requirements of a work graph node.
2202
2203         Initialization parameters map to the parameters of the wrapped function with
2204         addition(s) to support gmxapi data flow and deferred execution.
2205
2206         If provided, an ``input`` keyword argument is interpreted as a parameter pack
2207         of base input. Inputs also present as standalone keyword arguments override
2208         values in ``input``.
2209
2210         Inputs that are handles to gmxapi operations or outputs induce data flow
2211         dependencies that the framework promises to satisfy before the Operation
2212         executes and produces output.
2213         """
2214         # TODO: When the resource manager can be kept alive by an enclosing or
2215         #  module-level Context, convert to a weakref.
2216         self.__resource_manager = resource_manager
2217         # The unique identifier for the operation node allows the Context implementation
2218         # to manage the state of the handle. Reproducibility of node_uid is TBD, but
2219         # it must be unique in a Context where it references a different operation node.
2220         self.node_uid = None
2221
2222     @property
2223     def output(self) -> _OutputDataProxyType:
2224         # TODO: We can configure `output` as a data descriptor
2225         #  instead of a property so that we can get more information
2226         #  from the class attribute before creating an instance of OperationDetails.OutputDataProxy.
2227         # The C++ equivalence would probably be a templated free function for examining traits.
2228         return self.__resource_manager.data()
2229
2230     def run(self):
2231         """Make a single attempt to resolve data flow conditions.
2232
2233         This is a public method, but should not need to be called by users. Instead,
2234         just use the `output` data proxy for result handles, or force data flow to be
2235         resolved with the `result` methods on the result handles.
2236
2237         `run()` may be useful to try to trigger computation (such as for remotely
2238         dispatched work) without retrieving results locally right away.
2239
2240         `run()` is also useful internally as a facade to the Context implementation details
2241         that allow `result()` calls to ask for their data dependencies to be resolved.
2242         Typically, `run()` will cause results to be published to subscribing operations as
2243         they are calculated, so the `run()` hook allows execution dependency to be slightly
2244         decoupled from data dependency, as well as to allow some optimizations or to allow
2245         data flow to be resolved opportunistically. `result()` should not call `run()`
2246         directly, but should cause the resource manager / Context implementation to process
2247         the data flow graph.
2248
2249         In one conception, `run()` can have a return value that supports control flow
2250         by itself being either runnable or not. The idea would be to support
2251         fault tolerance, implementations that require multiple iterations / triggers
2252         to complete, or looping operations.
2253         """
2254         # Note: `run()` is a synonym for `resolve` or `update` or whatever we choose
2255         #  to generically describe the request to bring a node up-to-date: i.e. the
2256         #  non-returning callable on the object produced by a director.
2257         self.__resource_manager.update_output()
2258
2259
2260 class OperationPlaceholder(AbstractOperation):
2261     """Placeholder for Operation handle during subgraph definition."""
2262
2263     def __init__(self, subgraph_resource_manager):
2264         ...
2265
2266     def run(self):
2267         raise exceptions.UsageError('This placeholder operation handle is not in an executable context.')
2268
2269     @property
2270     def output(self):
2271         """Allow subgraph components to be connected without instantiating actual operations."""
2272         if not isinstance(current_context(), SubgraphContext):
2273             raise exceptions.UsageError('Invalid access to subgraph internals.')
2274
2275
2276 _HandleType = typing.TypeVar('_HandleType', bound=gmx.abc.OperationReference)
2277
2278
2279 class NodeBuilder(gmx.abc.NodeBuilder):
2280     """Add an operation node to be managed by a Context.
2281
2282     The NodeBuilder interface implies minimal internal logic, and instead
2283     specifies the set of information that must or may be provided to construct
2284     a node.
2285     """
2286
2287     def __init__(self,
2288                  context: 'Context',
2289                  operation,
2290                  label: typing.Optional[str] = None):
2291         """Initialize the base NodeBuilder for gmxapi.operation module Nodes.
2292
2293         TODO:
2294             Convert the *operation* argument to be the registrant in the Operation registry.
2295             Requires confirmation of conforming behavior for dynamically defined operations.
2296
2297         """
2298         self.context = context
2299         self.label = label
2300         try:
2301             key = _make_registry_key(operation)
2302         except Exception as e:
2303             error = 'Could not create an operation registry key from {}'.format(operation)
2304             raise exceptions.ValueError(error) from e
2305         else:
2306             # TODO: sensibly handle dynamically defined operations.
2307             if key not in _operation_registry and not issubclass(operation, OperationDetailsBase):
2308                 error = '{} must be initialized with a registered operation. Got {}.'
2309                 raise exceptions.ValueError(error.format(__class__.__qualname__, operation))
2310         self.sources = DataSourceCollection()
2311         self._input_description = None
2312         self._resource_factory = None
2313         self._runner_director = None
2314         self._handle = None
2315         self._output_factory = None
2316
2317         self._resource_manager = ResourceManager
2318
2319     def set_input_description(self, input_description: InputDescription):
2320         self._input_description = input_description
2321
2322     def set_resource_factory(self, factory):
2323         self._resource_factory = factory
2324
2325     def set_runner_director(self, factory):
2326         self._runner_director = factory
2327
2328     def set_handle(self, factory):
2329         self._handle = factory
2330
2331     def set_output_factory(self, factory: 'OutputFactory'):
2332         self._output_factory = factory
2333
2334     def set_resource_manager(self, implementation: typing.Type[ResourceManager]):
2335         """Allow overriding the default ResourceManager implementation.
2336
2337         This is a workaround until we figure out what parts of the ResourceManager
2338         could be composed, registered separately with the Context, or be customized
2339         through other dispatching. One likely part of the solution is for clients
2340         of the NodeBuilder to assert requirements of the Context.
2341         """
2342         assert issubclass(implementation, ResourceManager)
2343         self._resource_manager = implementation
2344
2345     # TODO: Let the Context use the handle factory to produce a dynamically typed handle,
2346     #  and figure out the right way to annotate the return value.
2347     def build(self):
2348         """Create node and return a handle of the appropriate type."""
2349
2350         # Check for the ability to instantiate operations.
2351         missing_details = list()
2352         for builder_resource in ['input_description',
2353                                  'resource_factory',
2354                                  'runner_director',
2355                                  'handle',
2356                                  'output_factory']:
2357             detail = '_' + builder_resource
2358             if getattr(self, detail, None) is None:
2359                 missing_details.append(builder_resource)
2360         if len(missing_details) > 0:
2361             raise exceptions.UsageError(
2362                 'Missing details needed for operation node: {}'.format(
2363                     ', '.join(missing_details)
2364                 ))
2365
2366         assert hasattr(self._input_description, 'signature')
2367         input_sink = SinkTerminal(self._input_description.signature())
2368         input_sink.update(self.sources)
2369         logger.debug('SinkTerminal configured: {}'.format(SinkTerminal))
2370         edge = DataEdge(self.sources, input_sink)
2371         logger.debug('Created data edge {} with Sink {}'.format(edge, edge.sink_terminal))
2372         # TODO: Fingerprinting: Each operation instance has unique output based on the unique input.
2373         #            input_data_fingerprint = edge.fingerprint()
2374
2375         # Set up output proxy.
2376         assert hasattr(self._input_description, 'make_uid')
2377         uid = self._input_description.make_uid(edge)
2378         # TODO: ResourceManager should fetch the relevant factories from the Context
2379         #  instead of getting an OperationDetails instance.
2380         output_data_proxy = self._output_factory.output_proxy()
2381         output_description = self._output_factory.output_description()
2382         publishing_data_proxy = self._output_factory.publishing_data_proxy()
2383         manager = self._resource_manager(output_context=self.context,
2384                                          source=edge,
2385                                          operation_id=uid,
2386                                          output_data_proxy=output_data_proxy,
2387                                          output_description=output_description,
2388                                          publishing_data_proxy=publishing_data_proxy,
2389                                          resource_factory=self._resource_factory,
2390                                          runner_director=self._runner_director)
2391         self.context.work_graph[uid] = manager
2392         # TODO: Replace with a call to Node.handle()
2393         handle = self._handle(self.context.work_graph[uid])
2394         handle.node_uid = uid
2395         return handle
2396
2397     def add_input(self, name, source):
2398         # TODO: We can move some input checking here as the data model matures.
2399         self.sources[name] = source
2400
2401
2402 class InputPack(object):
2403     """Input container for data sources provided to resource factories.
2404
2405     When gmxapi.operation Contexts provide run time inputs to operations,
2406     instances of this class are provided to the operation's registered
2407     Resource factory.
2408
2409     Attributes:
2410         kwargs (dict): collection of named data sources.
2411     """
2412
2413     def __init__(self, kwargs: typing.Mapping[str, SourceTypeVar]):
2414         self.kwargs = kwargs
2415
2416
2417 class Context(gmx.abc.Context):
2418     """API Context.
2419
2420     All gmxapi data and operations are owned by a Context instance. The Context
2421     manages the details of how work is run and how data is managed.
2422
2423     Context implementations are not required to inherit from gmxapi.context.Context,
2424     but this class definition serves to specify the current Context API.
2425
2426     If subclassing is used to implement new Contexts, be sure to initialize the
2427     base class when providing a new __init__
2428     """
2429
2430     def node(self, node_id) -> Node:
2431         if node_id in self.labels:
2432             return self.labels[node_id]
2433         elif node_id in self.work_graph:
2434             return self.work_graph[node_id]
2435         else:
2436             raise exceptions.ValueError('Could not find a node identified by {}'.format(node_id))
2437
2438     def __init__(self):
2439         self.operations = dict()
2440         self.labels = dict()
2441         self.work_graph = collections.OrderedDict()
2442
2443     @abc.abstractmethod
2444     def node_builder(self, *, operation,
2445                      label: typing.Optional[str] = None) -> NodeBuilder:
2446         """Get a builder for a new work graph node.
2447
2448         Nodes are elements of computational work, with resources and execution
2449         managed by the Context. The Context handles parallelism resources, data
2450         placement, work scheduling, and data flow / execution dependencies.
2451
2452         This method is used by Operation director code and helper functions to
2453         add work to the graph.
2454
2455         Arguments:
2456             operation: a registered gmxapi operation
2457             label: optional user-provided identifier to provide human-readable node locators.
2458
2459         """
2460         ...
2461     # TODO: *node()* accessor.
2462     # @abc.abstractmethod
2463     # def node(self, node_identifier) -> AbstractOperation:
2464     #     ...
2465
2466
2467 class ModuleNodeBuilder(NodeBuilder):
2468     """Builder for work nodes in gmxapi.operation.ModuleContext."""
2469
2470
2471 class ModuleContext(Context):
2472     """Context implementation for the gmxapi.operation module.
2473
2474     """
2475     __version__ = 0
2476
2477     def node_builder(self, operation, label=None) -> NodeBuilder:
2478         """Get a builder for a new work node to add an operation in this context."""
2479         if label is not None:
2480             if label in self.labels:
2481                 raise exceptions.ValueError('Label {} is already in use.'.format(label))
2482             else:
2483                 # The builder should update the labeled node when it is done.
2484                 self.labels[label] = None
2485
2486         return ModuleNodeBuilder(context=weakref.proxy(self), operation=operation, label=label)
2487
2488
2489 # Context stack.
2490 __current_context = [ModuleContext()]
2491
2492
2493 def current_context() -> Context:
2494     """Get a reference to the currently active Context.
2495
2496     The imported gmxapi.context module maintains some state for the convenience
2497     of the scripting environment. Internally, all gmxapi activity occurs under
2498     the management of an API Context, explicitly or implicitly. Some actions or
2499     code constructs will generate separate contexts or sub-contexts. This utility
2500     command retrieves a reference to the currently active Context.
2501     """
2502     return __current_context[-1]
2503
2504
2505 def push_context(context) -> Context:
2506     """Enter a sub-context by pushing a context to the global context stack.
2507     """
2508     __current_context.append(context)
2509     return current_context()
2510
2511
2512 def pop_context() -> Context:
2513     """Exit the current Context by popping it from the stack."""
2514     return __current_context.pop()
2515
2516
2517 class OutputFactory(object):
2518     """Encapsulate the details of Operation output implementation in the gmxapi.operation Context.
2519
2520     Currently, OutputFactory objects are containers that compose functionality
2521     with which to implement the required internal interface.
2522     """
2523
2524     def __init__(self, *,
2525                  output_proxy: typing.Callable[[SourceResource], _OutputDataProxyType],
2526                  output_description: OutputCollectionDescription,
2527                  publishing_data_proxy: typing.Callable[[SourceResource, ClientID], _PublishingDataProxyType]):
2528         """Package the output details for an operation.
2529
2530         Arguments:
2531             output_proxy: factory to produce the *output* facet of an operation instance (node)
2532             output_description: fully formed output description
2533             publishing_data_proxy: factory to produce the run time output publishing resources
2534
2535         """
2536         if not callable(output_proxy):
2537             raise exceptions.ValueError('output_proxy argument must be a callable.')
2538         if not callable(publishing_data_proxy):
2539             raise exceptions.ValueError('publishing_data_proxy argument must be a callable.')
2540         if not isinstance(output_description, OutputCollectionDescription):
2541             raise exceptions.ValueError('output_description must be an instance of '
2542                                         'gmxapi.operation.OutputCollectionDescription')
2543         self._output_proxy = output_proxy
2544         self._output_description = output_description
2545         self._publishing_data_proxy = publishing_data_proxy
2546
2547     def output_proxy(self) -> typing.Callable[[_OutputDataProxyType, SourceResource], _OutputDataProxyType]:
2548         return self._output_proxy
2549
2550     def output_description(self) -> OutputCollectionDescription:
2551         return self._output_description
2552
2553     def publishing_data_proxy(self) -> typing.Callable[[SourceResource, ClientID],
2554                                                        _PublishingDataProxyType]:
2555         return self._publishing_data_proxy
2556
2557
2558 # TODO: Refactor in terms of gmx.abc.OperationDirector[_Op, gmx.operation.Context]
2559 # Normalizing this OperationDirector may require other updates to the function_wrapper facilities.
2560 class OperationDirector(object):
2561     """Direct the construction of an operation node in the gmxapi.operation module Context.
2562
2563     Collaboration: used by OperationDetails.operation_director, which
2564     will likely dispatch to different implementations depending on
2565     requirements of work or context.
2566     """
2567
2568     def __init__(self,
2569                  *args,
2570                  operation_details: typing.Type[OperationDetailsBase],
2571                  context: Context,
2572                  label=None,
2573                  **kwargs):
2574         self.operation_details = operation_details
2575         self.context = weakref.proxy(context)
2576         self.args = args
2577         self.kwargs = kwargs
2578         self.label = label
2579
2580     def __call__(self) -> AbstractOperation:
2581         builder = self.context.node_builder(operation=self.operation_details, label=self.label)
2582
2583         builder.set_resource_factory(self.operation_details.resource_director)
2584         builder.set_input_description(self.operation_details)
2585         builder.set_handle(OperationHandle)
2586
2587         operation_details = self.operation_details()
2588         node_input_factory = operation_details.signature().bind
2589         data_source_collection = node_input_factory(*self.args, **self.kwargs)
2590         for name, source in data_source_collection.items():
2591             builder.add_input(name, source)
2592
2593         def runner_director(resources):
2594             def runner():
2595                 operation_details(resources)
2596             return runner
2597
2598         builder.set_runner_director(runner_director)
2599         # Note that the call-backs held by OutputFactory cannot be annotated with
2600         # key word arguments under PEP 484, producing a weak warning in some cases.
2601         # We can consider more in the future how to balance call-back simplicity,
2602         # type annotation, and key word explicitness in helper functions like these.
2603         output_factory = OutputFactory(output_description=operation_details.output_description(),
2604                                        output_proxy=operation_details.output_data_proxy,
2605                                        publishing_data_proxy=operation_details.publishing_data_proxy)
2606         builder.set_output_factory(output_factory)
2607
2608         handle = builder.build()
2609         return handle
2610
2611
2612 def _make_datastore(output_description: OutputCollectionDescription, ensemble_width: int):
2613     """Create the data store for an operation with the described output.
2614
2615     Create a container to hold the resources for an operation node.
2616     Used internally by the resource manager when setting up the node.
2617     Evolution of the C++ framework for creating the Operation SessionResources
2618     object will inform the future of this and the resource_director method, but
2619     this data store is how the Context manages output data sources for resources
2620     that it manages.
2621     """
2622
2623     datastore = collections.OrderedDict()
2624     for name, dtype in output_description.items():
2625         assert isinstance(dtype, type)
2626         result_description = ResultDescription(dtype=dtype, width=ensemble_width)
2627         datastore[name] = OutputData(name=name, description=result_description)
2628     return datastore
2629
2630
2631 # TODO: For outputs, distinguish between "results" and "events".
2632 #  Both are published to the resource manager in the same way, but the relationship
2633 #  with subscribers is potentially different.
2634 def function_wrapper(output: dict = None):
2635     # Suppress warnings in the example code.
2636     # noinspection PyUnresolvedReferences
2637     """Generate a decorator for wrapped functions with signature manipulation.
2638
2639     New function accepts the same arguments, with additional arguments required by
2640     the API.
2641
2642     The new function returns an object with an ``output`` attribute containing the named outputs.
2643
2644     Example:
2645
2646         >>> @function_wrapper(output={'spam': str, 'foo': str})
2647         ... def myfunc(parameter: str = None, output=None):
2648         ...    output.spam = parameter
2649         ...    output.foo = parameter + ' ' + parameter
2650         ...
2651         >>> operation1 = myfunc(parameter='spam spam')
2652         >>> assert operation1.output.spam.result() == 'spam spam'
2653         >>> assert operation1.output.foo.result() == 'spam spam spam spam'
2654
2655     Arguments:
2656         output (dict): output names and types
2657
2658     If ``output`` is provided to the wrapper, a data structure will be passed to
2659     the wrapped functions with the named attributes so that the function can easily
2660     publish multiple named results. Otherwise, the ``output`` of the generated operation
2661     will just capture the return value of the wrapped function.
2662
2663     .. todo:: gmxapi typing stub file(s).
2664               The way this wrapper uses parameter annotations is not completely
2665               compatible with static type checking (PEP 484). If we decide to
2666               keep the convenience functionality by which operation details are
2667               inferred from parameter annotations, we should provide a separate
2668               stub file (.pyi) to support static type checking of the API.
2669     """
2670
2671     if output is not None and not isinstance(output, collections.abc.Mapping):
2672         raise exceptions.TypeError('If provided, `output` argument must be a mapping of data names to types.')
2673
2674     # TODO: (FR5+) gmxapi operations need to allow a context-dependent way to generate an implementation with input.
2675     # This function wrapper reproduces the wrapped function's kwargs, but does not allow chaining a
2676     # dynamic `input` kwarg and does not dispatch according to a `context` kwarg. We should allow
2677     # a default implementation and registration of alternate implementations. We don't have to do that
2678     # with functools.singledispatch, but we could, if we add yet another layer to generate a wrapper
2679     # that takes the context as the first argument. (`singledispatch` inspects the first argument rather
2680     # that a named argument)
2681
2682     # Implementation note: The closure of the current function is used to
2683     # dynamically define several classes that support the operation to be
2684     # created by the returned decorator.
2685
2686     def decorator(function) -> typing.Callable:
2687         # Explicitly capture `function` and `output` references.
2688         provided_output_map = output
2689
2690         # Note: Allow operations to be defined entirely in template headers to facilitate
2691         # compile-time optimization of fused operations. Consider what distinction, if any,
2692         # exists between a fused operation and a more basic operation. Probably it amounts
2693         # to aspects related to interaction with the Context that get combined in a fused
2694         # operation, such as the resource director, builder, etc.
2695         class OperationDetails(OperationDetailsBase):
2696             # Warning: function.__qualname__ is not rigorous since function may be in a local scope.
2697             # TODO: Improve base identifier.
2698             # Suggest registering directly in the Context instead of in this local class definition.
2699             __basename = '.'.join((str(function.__module__), function.__qualname__))
2700             __last_uid = 0
2701             _input_signature_description = InputCollectionDescription.from_function(function)
2702             # TODO: Separate the class and instance logic for the runner.
2703             # Logically, the runner is a detail of a context-specific implementation class,
2704             # though the output is not generally fully knowable until an instance is initialized
2705             # for a certain input fingerprint.
2706             # Note: We are almost at a point where this class can be subsumed into two
2707             # possible return types for wrapped_function_runner, acting as an operation helper.
2708             _runner = wrapped_function_runner(function, provided_output_map)
2709             _output_description = _runner.output_description
2710             _output_data_proxy_type = define_output_data_proxy(_output_description)
2711             _publishing_data_proxy_type = define_publishing_data_proxy(_output_description)
2712             _SourceResource = SourceResource[_output_data_proxy_type, _publishing_data_proxy_type]
2713
2714             @classmethod
2715             def name(cls) -> str:
2716                 return cls.__basename.split('.')[-1]
2717
2718             @classmethod
2719             def namespace(cls) -> str:
2720                 return cls.__basename.rstrip('.' + cls.name())
2721
2722             @classmethod
2723             def director(cls, context: _Context):
2724                 return cls.operation_director
2725
2726             @classmethod
2727             def signature(cls) -> InputCollectionDescription:
2728                 """Mapping of named inputs and input type.
2729
2730                 Used to determine valid inputs before an Operation node is created.
2731
2732                 Overrides OperationDetailsBase.signature() to provide an
2733                 implementation for the bound operation.
2734                 """
2735                 return cls._input_signature_description
2736
2737             def output_description(self) -> OutputCollectionDescription:
2738                 """Mapping of available outputs and types for an existing Operation node.
2739
2740                 Overrides OperationDetailsBase.output_description() to provide an
2741                 implementation for the bound operation.
2742                 """
2743                 return self._output_description
2744
2745             def publishing_data_proxy(self, *,
2746                                       instance: _SourceResource,
2747                                       client_id: int
2748                                       ) -> _publishing_data_proxy_type:
2749                 """Factory for Operation output publishing resources.
2750
2751                 Used internally when the operation is run with resources provided by instance.
2752
2753                 Overrides OperationDetailsBase.publishing_data_proxy() to provide an
2754                 implementation for the bound operation.
2755                 """
2756                 assert isinstance(instance, ResourceManager)
2757                 return self._publishing_data_proxy_type(instance=instance, client_id=client_id)
2758
2759             def output_data_proxy(self, instance: _SourceResource) -> _output_data_proxy_type:
2760                 assert isinstance(instance, ResourceManager)
2761                 return self._output_data_proxy_type(instance=instance)
2762
2763             def __call__(self, resources: PyFunctionRunnerResources):
2764                 """Execute the operation with provided resources.
2765
2766                 Resources are prepared in an execution context with aid of resource_director()
2767
2768                 After the first call, output data has been published and is trivially
2769                 available through the output_data_proxy()
2770
2771                 Overrides OperationDetailsBase.__call__().
2772                 """
2773                 self._runner(resources)
2774
2775             @classmethod
2776             def make_uid(cls, input):
2777                 """The unique identity of an operation node tags the output with respect to the input.
2778
2779                 Combines information on the Operation details and the input to uniquely
2780                 identify the Operation node.
2781
2782                 Arguments:
2783                     input : A (collection of) data source(s) that can provide Fingerprints.
2784
2785                 Used internally by the Context to manage ownership of data sources, to
2786                 locate resources for nodes in work graphs, and to manage serialization,
2787                 deserialization, and checkpointing of the work graph.
2788
2789                 The UID is a detail of the generic Operation that _should_ be independent
2790                 of the Context details to allow the framework to manage when and where
2791                 an operation is executed.
2792
2793                 Design notes on further refinement:
2794                     TODO: Operations should not single-handedly determine their own uniqueness
2795                     (but they should participate in the determination with the Context).
2796
2797                     Context implementations should be allowed to optimize handling of
2798                     equivalent operations in different sessions or work graphs, but we do not
2799                     yet TODO: guarantee that UIDs are globally unique!
2800
2801                     The UID should uniquely indicate an operation node based on that node's input.
2802                     We need input fingerprinting to identify equivalent nodes in a work graph
2803                     or distinguish nodes across work graphs.
2804
2805                 """
2806                 uid = str(cls.__basename) + str(cls.__last_uid)
2807                 cls.__last_uid += 1
2808                 return uid
2809
2810             @classmethod
2811             def resource_director(cls, *, input=None,
2812                                   output: _publishing_data_proxy_type = None) -> PyFunctionRunnerResources:
2813                 """a Director factory that helps build the Session Resources for the function.
2814
2815                 The Session launcher provides the director with all of the resources previously
2816                 requested/negotiated/registered by the Operation. The director uses details of
2817                 the operation to build the resources object required by the operation runner.
2818
2819                 For the Python Context, the protocol is for the Context to call the
2820                 resource_director instance method, passing input and output containers.
2821
2822                 Raises:
2823                     exceptions.TypeError if provided resource type does not match input signature.
2824                 """
2825                 resources = PyFunctionRunnerResources()
2826                 resources.update(input.kwargs)
2827                 resources.update({'output': output})
2828
2829                 # TODO: Remove this hack when we can better handle Futures of Containers and Future slicing.
2830                 for name in resources:
2831                     if isinstance(resources[name], (list, tuple)):
2832                         resources[name] = datamodel.ndarray(resources[name])
2833
2834                 # Check data compatibility
2835                 for name, value in resources.items():
2836                     if name != 'output':
2837                         expected = cls.signature()[name]
2838                         got = type(value)
2839                         if got != expected:
2840                             raise exceptions.TypeError(
2841                                 'Expected {} but got {} for {} resource {}.'.format(expected,
2842                                                                                     got,
2843                                                                                     cls.__basename,
2844                                                                                     name))
2845                 return resources
2846
2847         # TODO: (FR4) Update annotations with gmxapi data types. E.g. return -> Future.
2848         @functools.wraps(function)
2849         def helper(*args, context=None, **kwargs):
2850             # Description of the Operation input (and output) occurs in the
2851             # decorator closure. By the time this factory is (dynamically) defined,
2852             # the OperationDetails and ResourceManager are well defined, but not
2853             # yet instantiated.
2854             # Inspection of the offered input occurs when this factory is called,
2855             # and OperationDetails, ResourceManager, and Operation are instantiated.
2856
2857             # This operation factory is specialized for the default package Context.
2858             if context is None:
2859                 context = current_context()
2860             else:
2861                 raise exceptions.ApiError('Non-default context handling not implemented.')
2862
2863             # This calls a dispatching function that may not be able to reconcile the input
2864             # and Context capabilities. This is the place to handle various exceptions for
2865             # whatever reasons this reconciliation cannot occur.
2866             handle = OperationDetails.operation_director(*args, context=context, label=None, **kwargs)
2867
2868             # TODO: NOW: The input fingerprint describes the provided input
2869             # as (a) ensemble input, (b) static, (c) future. By the time the
2870             # operation is instantiated, the topology of the node is known.
2871             # When compared to the InputCollectionDescription, the data compatibility
2872             # can be determined.
2873
2874             return handle
2875
2876         # to do: The factory itself needs to be able to register a factory with
2877         # the context that will be responsible for the Operation handle.
2878         # The factories need to be able to serve as dispatchers for themselves,
2879         # since an operation in one context may need to be reconstituted in a
2880         # different context.
2881         # The dispatching factory produces a director for a Context,
2882         # which will register a factory with the operation in that context.
2883
2884         # The factory function has a DirectorFactory. Director instances talk to a NodeBuilder for a Context to
2885         # get handles to new operation nodes managed by the context. Part of that process includes registering
2886         # a DirectorFactory with the Context.
2887         return helper
2888
2889     return decorator
2890
2891
2892 class GraphVariableDescriptor(object):
2893     def __init__(self, name: str = None, dtype=None, default=None):
2894         self.name = name
2895         self.dtype = dtype
2896         self.default = default
2897         self.state = None
2898
2899     @property
2900     def internal_name(self):
2901         try:
2902             return '_' + self.name
2903         except TypeError:
2904             return None
2905
2906     def __get__(self, instance, owner):
2907         if instance is None:
2908             # Access is through the class attribute of the owning class.
2909             # Allows the descriptor itself to be inspected or reconfigured after
2910             # class definition.
2911             # TODO: There is probably some usage checking that can be performed here.
2912             return self
2913         try:
2914             value = getattr(instance, self.internal_name)
2915         except AttributeError:
2916             value = self.default
2917             # Lazily initialize the instance value from the class default.
2918             if value is not None:
2919                 try:
2920                     setattr(instance, self.internal_name, value)
2921                 except Exception as e:
2922                     message = 'Could not assign default value to {} attribute of {}'.format(
2923                         self.internal_name,
2924                         instance)
2925                     raise exceptions.ApiError(message) from e
2926         return value
2927
2928     # Implementation note: If we have a version of the descriptor class with no `__set__` method,
2929     # it is a non-data descriptor that can be overridden by a data descriptor on an instance.
2930     # This could be one way to handle the polymorphism associated with state changes.
2931     def __set__(self, instance, value):
2932         if instance._editing:
2933             # Update the internal connections defining the subgraph.
2934             setattr(instance, self.internal_name, value)
2935         else:
2936             raise AttributeError('{} not assignable on {}'.format(self.name, instance))
2937
2938
2939 class GraphMeta(type):
2940     """Meta-class for gmxapi data flow graphs and subgraphs.
2941
2942     Used to implement ``subgraph`` as GraphMeta.__new__(...).
2943     Also allows subgraphs to be defined as Python class definitions by inheriting
2944     from Subgraph or by using the ``metaclass=GraphMeta`` hint in the class
2945     statement arguments.
2946
2947     The Python class creation protocol allows Subgraphs to be defined in as follows.
2948
2949     See the Subgraph class documentation for customization of instances through
2950     the Python context manager protocol.
2951     """
2952     _prepare_keywords = ('variables',)
2953
2954     # TODO: Python 3.7.2 introduces typing.OrderedDict
2955     # In practice, we are using collections.OrderedDict, but we should use the generic
2956     # ABC from the typing module to avoid being overly restrictive with type hints.
2957     try:
2958         from typing import OrderedDict
2959     except ImportError:
2960         from collections import OrderedDict
2961
2962     @classmethod
2963     def __prepare__(mcs, name, bases, variables: OrderedDict = None, **kwargs):
2964         """Prepare the class namespace.
2965
2966         Keyword Args:
2967               variables: mapping of persistent graph variables to type / default value (optional)
2968         """
2969         # Python runs this before executing the class body of Subgraph or its
2970         # subclasses. This is our chance to handle key word arguments given in the
2971         # class declaration.
2972
2973         if kwargs is not None:
2974             for keyword in kwargs:
2975                 raise exceptions.UsageError('Unexpected key word argument: {}'.format(keyword))
2976
2977         namespace = collections.OrderedDict()
2978
2979         if variables is not None:
2980             if isinstance(variables, collections.abc.Mapping):
2981                 for name, value in variables.items():
2982                     if isinstance(value, type):
2983                         dtype = value
2984                         if hasattr(value, 'default'):
2985                             default = value.default
2986                         else:
2987                             default = None
2988                     else:
2989                         default = value
2990                         if hasattr(default, 'dtype'):
2991                             dtype = default.dtype
2992                         else:
2993                             dtype = type(default)
2994                     namespace[name] = GraphVariableDescriptor(name, default=default, dtype=dtype)
2995                     # Note: we are not currently using the hook used by `inspect`
2996                     # to annotate with the class that defined the attribute.
2997                     # namespace[name].__objclass__ = mcs
2998                     assert not hasattr(namespace[name], '__objclass__')
2999             else:
3000                 raise exceptions.ValueError('"variables" must be a mapping of graph variables to types or defaults.')
3001
3002         return namespace
3003
3004     def __new__(cls, name, bases, namespace, **kwargs):
3005         for key in kwargs:
3006             if key not in GraphMeta._prepare_keywords:
3007                 raise exceptions.ApiError('Unexpected class creation keyword: {}'.format(key))
3008         return type.__new__(cls, name, bases, namespace)
3009
3010     # TODO: This is keyword argument stripping is not necessary in more recent Python versions.
3011     # When Python minimum required version is increased, check if we can remove this.
3012     def __init__(cls, name, bases, namespace, **kwargs):
3013         for key in kwargs:
3014             if key not in GraphMeta._prepare_keywords:
3015                 raise exceptions.ApiError('Unexpected class initialization keyword: {}'.format(key))
3016         super().__init__(name, bases, namespace)
3017
3018
3019 class SubgraphNodeBuilder(NodeBuilder):
3020
3021     def __init__(self,
3022                  context: 'SubgraphContext',
3023                  operation,
3024                  label: typing.Optional[str] = None):
3025         super().__init__(context, operation, label)
3026
3027     def add_input(self, name: str, source):
3028         """Add an input resource for the Node under construction.
3029
3030         Extends NodeBuilder.add_input()
3031         """
3032         # Inspect inputs.
3033         #  * Are they from outside the subgraph?
3034         #  * Subgraph variables?
3035         #  * Subgraph internal nodes?
3036         # Inputs from outside the subgraph are (provisionally) subgraph inputs.
3037         # Inputs that are subgraph variables or subgraph internal nodes mark operations that will need to be re-run.
3038         # For first implementation, let all operations be recreated, but we need to
3039         # manage the right input proxies.
3040         # For zeroeth implementation, try just tracking the entities that need a reset() method called.
3041         assert isinstance(self.context, SubgraphContext)
3042         if hasattr(source, 'reset'):
3043             self.context.add_resetter(source.reset)
3044         elif hasattr(source, '_reset'):
3045             self.context.add_resetter(source._reset)
3046         super().add_input(name, source)
3047
3048     def build(self) -> OperationPlaceholder:
3049         """Get a reference to the internal node in the subgraph definition.
3050
3051         In the SubgraphContext, these handles cannot represent uniquely identifiable
3052         results. They are placeholders for relative positions in graphs that are
3053         not defined until the subgraph is being executed.
3054
3055         Such references should be tracked and invalidated when exiting the
3056         subgraph context. Within the subgraph context, they are used to establish
3057         the recipe for updating the subgraph's outputs or persistent data during
3058         execution.
3059         """
3060         # Placeholder handles in the subgraph definition don't have real resource managers.
3061         # Check for the ability to instantiate operations.
3062         handle = super().build()
3063         # handle = OperationPlaceholder()
3064         return typing.cast(OperationPlaceholder, handle)
3065
3066
3067 class SubgraphContext(Context):
3068     """Provide a Python context manager in which to set up a graph of operations.
3069
3070     Allows operations to be configured without adding nodes to the global execution
3071     context.
3072     """
3073
3074     def __init__(self):
3075         super().__init__()
3076         self.resetters = set()
3077
3078     def node_builder(self, operation, label=None) -> NodeBuilder:
3079         if label is not None:
3080             if label in self.labels:
3081                 raise exceptions.ValueError('Label {} is already in use.'.format(label))
3082             else:
3083                 # The builder should update the labeled node when it is done.
3084                 self.labels[label] = None
3085
3086         return SubgraphNodeBuilder(context=weakref.proxy(self), operation=operation, label=label)
3087
3088     def add_resetter(self, function):
3089         assert callable(function)
3090         self.resetters.add(function)
3091
3092
3093 class Subgraph(object, metaclass=GraphMeta):
3094     """
3095
3096     When subclassing from Subgraph, aspects of the subgraph's data ports can be
3097     specified with keyword arguments in the class statement. Example::
3098
3099         >>> class MySubgraph(Subgraph, variables={'int_with_default': 1, 'boolData': bool}): pass
3100         ...
3101
3102     The key word *variables* is used in the class declaration to map the types
3103     of subgraph Variables with (optional) default values.
3104
3105     Execution model:
3106         Subgraph execution must follow a well-defined protocol in order to sensibly
3107         resolve data Futures at predictable points. Note that subgraphs act as operations
3108         that can be automatically redefined at run time (in limited cases), such as
3109         to automatically form iteration chains to implement "while" loops. We refer
3110         to one copy / iteration / generation of a subgraph as an "element" below.
3111
3112         When a subgraph element begins execution, each of its variables with an
3113         "updated" state from the previous iteration has the "updated" state moved
3114         to the new element's "initial" state, and the "updated" state is voided.
3115
3116         Subgraph.next() appends an element of the subgraph to the chain. Subsequent
3117         calls to Subgraph.run() bring the new outputs up to date (and then call next()).
3118         Thus, for a subgraph with an output called ``is_converged``, calling
3119         ``while (not subgraph.is_converged): subgraph.run()`` has the desired effect,
3120         but is likely suboptimal for execution. Instead use the gmxapi while_loop.
3121
3122         If the subgraph is not currently executing, it can be in one of two states:
3123         "editing" or "ready". In the "ready" state, class and instance Variables
3124         cannot be assigned, but can be read through the data descriptors. In this
3125         state, the descriptors only have a single state.
3126
3127         If "editing," variables on the class object can be assigned to update the
3128         data flow defined for the subgraph. While "editing", reading or writing
3129         instance Variables is undefined behavior.
3130
3131         When "editing" begins, each variable is readable as a proxy to the "initial"
3132         state in an element. Assignments while "editing" put variables in temporary
3133         states accessible only during editing.
3134         When "editing" finishes, the "updated" output data source of each
3135         variable for the element is set, if appropriate.
3136
3137         # TODO: Use a get_context to allow operation factories or accessors to mark
3138         #  references for update/annotation when exiting the 'with' block.
3139     """
3140
3141
3142 class SubgraphBuilder(object):
3143     """Helper for defining new Subgraphs.
3144
3145     Manages a Python context in which to define a new Subgraph type. Can be used
3146     in a Python ``with`` construct exactly once to provide the Subgraph class body.
3147     When the ``with`` block is exited (or ``build()`` is called explicitly), a
3148     new type instance becomes available. Subsequent calls to SubgraphBuilder.__call__(self, ...)
3149     are dispatched to the Subgraph constructor.
3150
3151     Outside of the ``with`` block, read access to data members is proxied to
3152     descriptors on the built Subgraph.
3153
3154     Instances of SubgraphBuilder are returned by the ``subgraph()`` utility function.
3155     """
3156
3157     def __init__(self, variables):
3158         self.__dict__.update({'variables': variables,
3159                               '_staging': collections.OrderedDict(),
3160                               '_editing': False,
3161                               '_subgraph_context': None,
3162                               '_subgraph_instance': None,
3163                               '_fused_operation': None,
3164                               '_factory': None})
3165         # Return a placeholder that we can update during iteration.
3166         # Long term, this is probably implemented with data descriptors
3167         # that will be moved to a new Subgraph type object.
3168         for name in self.variables:
3169             if not isinstance(self.variables[name], Future):
3170                 self.variables[name] = gmx.make_constant(self.variables[name])
3171
3172         # class MySubgraph(Subgraph, variables=variables):
3173         #     pass
3174         #
3175         # self._subgraph_instance = MySubgraph()
3176
3177     def __getattr__(self, item):
3178         if self._editing:
3179             if item in self.variables:
3180                 if item in self._staging:
3181                     logger.debug('Read access to intermediate value of subgraph variable {}'.format(item))
3182                     return self._staging[item]
3183                 else:
3184                     logger.debug('Read access to subgraph variable {}'.format(item))
3185                     return self.variables[item]
3186             else:
3187                 raise AttributeError('Invalid attribute: {}'.format(item))
3188         else:
3189             # TODO: this is not quite the described interface...
3190             return lambda obj: obj.values[item]
3191
3192     def __setattr__(self, key, value):
3193         """Part of the builder interface."""
3194         if key in self.__dict__:
3195             self.__dict__[key] = value
3196         else:
3197             if self._editing:
3198                 self.add_update(key, value)
3199             else:
3200                 raise exceptions.UsageError('Subgraph is not in an editable state.')
3201
3202     def add_update(self, key, value):
3203         """Add a variable update to the internal subgraph."""
3204         if key not in self.variables:
3205             raise AttributeError('No such attribute: {}'.format(key))
3206         if not self._editing:
3207             raise exceptions.UsageError('Subgraph is not in an editable state.')
3208         # Else, stage the potential final value for the iteration.
3209         logger.debug('Staging subgraph update {} = {}'.format(key, value))
3210         # Return a placeholder that we can update during iteration.
3211         # Long term, this is probably implemented with data descriptors
3212         # that will be moved to a new Subgraph type object.
3213         if not isinstance(value, Future):
3214             value = gmx.make_constant(value)
3215         self._staging[key] = value
3216         self._staging.move_to_end(key)
3217
3218     def __enter__(self):
3219         """Enter a Context managed by the subgraph to capture operation additions.
3220
3221         Allows the internal data flow of the subgraph to be defined in the same
3222         manner as the default work graph while the Python context manager is active.
3223
3224         The subgraph Context is activated when entering a ``with`` block and
3225         finalized at the end of the block.
3226         """
3227         # While editing the subgraph in the SubgraphContext, we need __get__ and __set__
3228         # data descriptor access on the Subgraph subclass type, but outside of the
3229         # context manager, the descriptor should be non-writable and more opaque,
3230         # while the instance should be readable, but not writable.
3231         self.__dict__['_editing'] = True
3232         # TODO: this probably needs to be configured with variables...
3233         self.__dict__['_subgraph_context'] = SubgraphContext()
3234         push_context(self._subgraph_context)
3235         return self
3236
3237     def build(self):
3238         """Build the subgraph by defining some new operations.
3239
3240         Examine the subgraph variables. Variables with handles to data sources
3241         in the SubgraphContext represent work that needs to be executed on
3242         a subgraph execution or iteration. Variables with handles to data sources
3243         outside of the subgraph represent work that needs to be executed once
3244         on only the first iteration to initialize the subgraph.
3245
3246         Construct a factory for the fused operation that performs the work to
3247         update the variables on a single iteration.
3248
3249         Construct a factory for the fused operation that performs the work to
3250         update the variables on subsequent iterations and which will be fed
3251         the outputs of a previous iteration. Both of the generated operations
3252         have the same output signature.
3253
3254         Construct and wrap the generator function to recursively append work
3255         to the graph and update until condition is satisfied.
3256
3257         TODO: Explore how to drop work from the graph once there are no more
3258          references to its output, including check-point machinery.
3259         """
3260         logger.debug('Finalizing subgraph definition.')
3261         inputs = collections.OrderedDict()
3262         for key, value in self.variables.items():
3263             # TODO: What if we don't want to provide default values?
3264             inputs[key] = value
3265
3266         updates = self._staging
3267
3268         class Subgraph(object):
3269             def __init__(self, input_futures, update_sources):
3270                 self.values = collections.OrderedDict([(key, value.result()) for key, value in input_futures.items()])
3271                 logger.debug('subgraph initialized with {}'.format(
3272                     ', '.join(['{}: {}'.format(key, value) for key, value in self.values.items()])))
3273                 self.futures = collections.OrderedDict([(key, value) for key, value in input_futures.items()])
3274                 self.update_sources = collections.OrderedDict([(key, value) for key, value in update_sources.items()])
3275                 logger.debug('Subgraph updates staged:')
3276                 for update, source in self.update_sources.items():
3277                     logger.debug('    {} = {}'.format(update, source))
3278
3279             def run(self):
3280                 for name in self.update_sources:
3281                     result = self.update_sources[name].result()
3282                     logger.debug('Update: {} = {}'.format(name, result))
3283                     self.values[name] = result
3284                 # Replace the data sources in the futures.
3285                 for name in self.update_sources:
3286                     self.futures[name].resource_manager = gmx.make_constant(self.values[name]).resource_manager
3287                 for name in self.update_sources:
3288                     self.update_sources[name]._reset()
3289
3290         subgraph = Subgraph(inputs, updates)
3291
3292         return lambda subgraph=subgraph: subgraph
3293
3294     def __exit__(self, exc_type, exc_val, exc_tb):
3295         """End the Subgraph editing session and finalize the Subgraph build.
3296
3297         After exiting, this instance forwards __call__() to a factory for an
3298         operation that carries out the work in the subgraph with inputs bound
3299         in the current context as defined by ``variables``.
3300         """
3301         self._factory = self.build()
3302
3303         context = pop_context()
3304         assert context is self._subgraph_context
3305         self.__dict__['_editing'] = False
3306         # Returning False causes exceptions in the `with` block to be reraised.
3307         # Remember to switch this to return True if we want to transform or suppress
3308         # such an exception (we probably do).
3309         if exc_type is not None:
3310             logger.error('Got exception {} while editing subgraph {}.'.format(exc_val, self))
3311             logger.debug('Subgraph exception traceback: \n{}'.format(exc_tb))
3312         return False
3313
3314     def __call__(self):
3315         # TODO: After build() has been called, this should dispatch to a factory
3316         #  that returns an OperationHandle.
3317         return self._factory()
3318
3319
3320 def while_loop(*, operation, condition, max_iteration=10):
3321     """Generate and run a chain of operations such that condition evaluates True.
3322
3323     Returns and operation instance that acts like a single node in the current
3324     work graph, but which is a proxy to the operation at the end of a dynamically generated chain
3325     of operations. At run time, condition is evaluated for the last element in
3326     the current chain. If condition evaluates False, the chain is extended and
3327     the next element is executed. When condition evaluates True, the object
3328     returned by ``while_loop`` becomes a proxy for the last element in the chain.
3329
3330     Equivalent to calling operation.while(condition), where available.
3331
3332     Arguments:
3333         operation: a callable that produces an instance of an operation when called with no arguments.
3334         condition: a callable that accepts an object (returned by ``operation``) that returns a boolean.
3335         max_iteration: execute the loop no more than this many times (default 10)
3336
3337     Warning:
3338         *max_iteration* is provided in part to minimize the cost of bugs in early
3339         versions of this software. The default value may be changed or
3340         removed on short notice.
3341
3342     Warning:
3343         The protocol by which ``while_loop`` interacts with ``operation`` and ``condition``
3344         is very unstable right now. Please refer to this documentation when installing new
3345         versions of the package.
3346
3347     Protocol:
3348         Warning:
3349             This protocol will be changed before the 0.1 API is finalized.
3350
3351         When called, ``while_loop`` calls ``operation`` without arguments
3352         and captures the return value captured as ``_operation``.
3353         The object produced by ``operation()`` must have a ``reset``,
3354         a ``run`` method, and an ``output`` attribute.
3355
3356         This is inspected
3357         to determine the output data proxy for the operation produced by the call
3358         to ``while_loop``. When that operation is called, it does the equivalent of
3359
3360             while(condition(self._operation)):
3361                 self._operation.reset()
3362                 self._operation.run()
3363
3364         Then, the output data proxy of ``self`` is updated with the results from
3365         self._operation.output.
3366
3367     """
3368     # In the first implementation, Subgraph is NOT and OperationHandle.
3369     # if not isinstance(obj, AbstractOperationHandle):
3370     #     raise exceptions.UsageError(
3371     #     '"operation" key word argument must be a callable that produces an Operation handle.')
3372     # outputs = {}
3373     # for name, descriptor in obj.output.items():
3374     #     outputs[name] = descriptor._dtype
3375
3376     # 1. Get the initial inputs.
3377     # 2. Initialize the subgraph with the initial inputs.
3378     # 3. Run the subgraph.
3379     # 4. Get the outputs.
3380     # 5. Initialize the subgraph with the outputs.
3381     # 6. Go to 3 if condition is not met.
3382
3383     obj = operation()
3384     assert hasattr(obj, 'values')
3385     outputs = collections.OrderedDict([(key, type(value)) for key, value in obj.values.items()])
3386
3387     @function_wrapper(output=outputs)
3388     def run_loop(output: OutputCollectionDescription):
3389         iteration = 0
3390         obj = operation()
3391         logger.debug('Created object {}'.format(obj))
3392         logger.debug(', '.join(['{}: {}'.format(key, obj.values[key]) for key in obj.values]))
3393         logger.debug('Condition: {}'.format(condition(obj)))
3394         while (condition(obj)):
3395             logger.debug('Running iteration {}'.format(iteration))
3396             obj.run()
3397             logger.debug(
3398                 ', '.join(['{}: {}'.format(key, obj.values[key]) for key in obj.values]))
3399             logger.debug('Condition: {}'.format(condition(obj)))
3400             iteration += 1
3401             if iteration > max_iteration:
3402                 break
3403         for name in outputs:
3404             setattr(output, name, obj.values[name])
3405
3406         return obj
3407
3408     return run_loop
3409
3410
3411 def subgraph(variables=None):
3412     """Allow operations to be configured in a sub-context.
3413
3414     The object returned functions as a Python context manager. When entering the
3415     context manager (the beginning of the ``with`` block), the object has an
3416     attribute for each of the named ``variables``. Reading from these variables
3417     gets a proxy for the initial value or its update from a previous loop iteration.
3418     At the end of the ``with`` block, any values or data flows assigned to these
3419     attributes become the output for an iteration.
3420
3421     After leaving the ``with`` block, the variables are no longer assignable, but
3422     can be called as bound methods to get the current value of a variable.
3423
3424     When the object is run, operations bound to the variables are ``reset`` and
3425     run to update the variables.
3426     """
3427     # Implementation note:
3428     # A Subgraph (type) has a subgraph context associated with it. The subgraph's
3429     # ability to capture operation additions is implemented in terms of the
3430     # subgraph context.
3431     logger.debug('Declare a new subgraph with variables {}'.format(variables))
3432
3433     return SubgraphBuilder(variables)
3434
3435
3436 @computed_result
3437 def join_arrays(*, front: datamodel.NDArray = (), back: datamodel.NDArray = ()) -> datamodel.NDArray:
3438     """Operation that consumes two sequences and produces a concatenated single sequence.
3439
3440     Note that the exact signature of the operation is not determined until this
3441     helper is called. Helper functions may dispatch to factories for different
3442     operations based on the inputs. In this case, the dtype and shape of the
3443     inputs determines dtype and shape of the output. An operation instance must
3444     have strongly typed output, but the input must be strongly typed on an
3445     object definition so that a Context can make runtime decisions about
3446     dispatching work and data before instantiating.
3447     # TODO: elaborate and clarify.
3448     # TODO: check type and shape.
3449     # TODO: figure out a better annotation.
3450     """
3451     # TODO: (FR4) Returned list should be an NDArray.
3452     if isinstance(front, (str, bytes)) or isinstance(back, (str, bytes)):
3453         raise exceptions.ValueError('Input must be a pair of lists.')
3454     assert isinstance(front, datamodel.NDArray)
3455     assert isinstance(back, datamodel.NDArray)
3456     new_list = list(front._values)
3457     new_list.extend(back._values)
3458     return datamodel.NDArray(new_list)
3459
3460
3461 # TODO: Constrain
3462 Scalar = typing.TypeVar('Scalar')
3463
3464
3465 def concatenate_lists(sublists: list = ()) -> _Future[gmx.datamodel.NDArray]:
3466     """Combine data sources into a single list.
3467
3468     A trivial data flow restructuring operation.
3469     """
3470     if isinstance(sublists, (str, bytes)):
3471         raise exceptions.ValueError('Input must be a list of lists.')
3472     if len(sublists) == 0:
3473         return datamodel.ndarray([])
3474     else:
3475         # TODO: Fix the data model so that this can type-check properly.
3476         return join_arrays(front=sublists[0],
3477                            back=typing.cast(datamodel.NDArray,
3478                                             concatenate_lists(sublists[1:])))
3479
3480
3481 def make_constant(value: Scalar) -> _Future:
3482     """Provide a predetermined value at run time.
3483
3484     This is a trivial operation that provides a (typed) value, primarily for
3485     internally use to manage gmxapi data flow.
3486
3487     Accepts a value of any type. The object returned has a definite type and
3488     provides same interface as other gmxapi outputs. Additional constraints or
3489     guarantees on data type may appear in future versions.
3490     """
3491     dtype = type(value)
3492     source = StaticSourceManager(name='data', proxied_data=value, width=1, function=lambda x: x)
3493     description = ResultDescription(dtype=dtype, width=1)
3494     future = Future(source, 'data', description=description)
3495     return future
3496
3497
3498 def logical_not(value: bool) -> _Future:
3499     """Boolean negation.
3500
3501     If the argument is a gmxapi compatible Data or Future object, a new View or
3502     Future is created that proxies the boolean opposite of the input.
3503
3504     If the argument is a callable, logical_not returns a wrapper function that
3505     returns a Future for the logical opposite of the callable's result.
3506     """
3507     # TODO: Small data transformations like this don't need to be formal Operations.
3508     # This could be essentially a data annotation that affects the resolver in a
3509     # DataEdge. As an API detail, coding for different Contexts and optimizations
3510     # within those Context implementations could be simplified.
3511     operation = function_wrapper(output={'data': bool})(lambda data=bool(): not bool(data))
3512     return operation(data=value).output.data