2 # This file is part of the GROMACS molecular simulation package.
4 # Copyright (c) 2019,2020,2021, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
35 """Define gmxapi-compliant Operations
37 Provide decorators and base classes to generate and validate gmxapi Operations.
39 Nodes in a work graph are created as instances of Operations. An Operation factory
40 accepts well-defined inputs as key word arguments. The object returned by such
41 a factory is a handle to the node in the work graph. It's ``output`` attribute
42 is a collection of the Operation's results.
44 function_wrapper(...) produces a wrapper that converts a function to an Operation
45 factory. The Operation is defined when the wrapper is called. The Operation is
46 instantiated when the factory is called. The function is executed when the Operation
49 The framework ensures that an Operation instance is executed no more than once.
52 __all__ = ['computed_result',
62 from contextlib import contextmanager
65 from gmxapi import datamodel
66 from gmxapi import exceptions
67 from gmxapi import logger as root_logger
68 from gmxapi.abc import OperationImplementation, MutableResource, Node
69 from gmxapi.typing import _Context, ResultTypeVar, SourceTypeVar, valid_result_types, valid_source_types
70 from gmxapi.typing import Future as _Future
72 # Initialize module-level logger
73 logger = root_logger.getChild('operation')
74 logger.info('Importing {}'.format(__name__))
class ResultDescription:
    """Describe what will be returned when `result()` is called."""
    # NOTE(review): restored from a mangled numbered listing; lines absent from
    # the listing (attribute assignments, property decorators) were
    # reconstructed — verify against upstream gmxapi.

    def __init__(self, dtype=None, width=1):
        assert isinstance(dtype, type)
        assert issubclass(dtype, valid_result_types)
        assert isinstance(width, int)
        self._dtype = dtype
        self._width = width

    @property
    def dtype(self) -> type:
        """node output type"""
        return self._dtype

    @property
    def width(self) -> int:
        """ensemble width"""
        return self._width

    def __repr__(self):
        # Accesses the properties (not the private members) so subclass
        # overrides are reflected.
        return '{}(dtype={}, width={})'.format(self.__class__.__name__, self.dtype, self.width)
class OutputData(object):
    """Encapsulate the description and storage of a data output."""
    # NOTE(review): restored from a mangled numbered listing; dropped lines
    # (name storage, property decorators, else branches) were reconstructed —
    # verify against upstream gmxapi.

    def __init__(self, name: str, description: ResultDescription):
        assert name != ''
        self._name = name
        assert isinstance(description, ResultDescription)
        self._description = description
        # Per-ensemble-member completion flags and data slots.
        self._done = [False] * self._description.width
        self._data = [None] * self._description.width

    @property
    def name(self):
        """The name of the published output."""
        return self._name

    # TODO: Change to regular member function and add ensemble member arg.
    @property
    def done(self):
        """Ensemble completion status for this output."""
        return all(self._done)

    def data(self, member: int = None):
        """Access the raw data for localized output for the ensemble or the specified member."""
        if not self.done:
            raise exceptions.ApiError('Attempt to read before data has been published.')
        if self._data is None or None in self._data:
            raise exceptions.ApiError('Data marked "done" but contains null value.')
        if member is not None:
            return self._data[member]
        else:
            return self._data

    def set(self, value, member: int):
        """Set local data and mark as completed.

        Used internally to maintain the data store.
        """
        # Values are coerced to the declared result type; NDArray gets the
        # dedicated conversion helper.
        if self._description.dtype == datamodel.NDArray:
            self._data[member] = datamodel.ndarray(value)
        else:
            self._data[member] = self._description.dtype(value)
        self._done[member] = True

    def reset(self):
        """Reinitialize the data store.

        Note:
            This is a workaround until the execution model is more developed.

        Todo:
            Remove this method when all operation handles provide factories to
            reinstantiate on new Contexts and/or with new inputs.
        """
        self._done = [False] * self._description.width
        self._data = [None] * self._description.width
class EnsembleDataSource(gmx.abc.EnsembleDataSource):
    """A single source of data with ensemble data flow annotations.

    Note that data sources may be Futures.
    """
    # NOTE(review): restored from a mangled numbered listing; the __init__
    # body and the reset() def line were absent and are reconstructed —
    # verify against upstream gmxapi.

    def __init__(self, source=None, width=1, dtype=None):
        self.source = source
        self.width = width
        self.dtype = dtype

    def node(self, member: int):
        """Get the data source for a single ensemble member."""
        return self.source[member]

    def reset(self):
        """Propagate reset to the wrapped source, if it supports a reset protocol."""
        protocols = ('reset', '_reset')
        for protocol in protocols:
            if hasattr(self.source, protocol):
                getattr(self.source, protocol)()
class DataSourceCollection(collections.OrderedDict):
    """Store and describe input data handles for an operation.

    When created from InputCollectionDescription.bind(), the DataSourceCollection
    has had default values inserted.

    Note: DataSourceCollection is the input resource collection type for NodeBuilder.
    To use with the NodeBuilder interface, first create the collection, then
    iteratively add its resources.

    TODO: We should probably normalize the collection aspect of input. Is this a
     single input? Are all inputs added as collections? Should this be split
     to a standard iterable? Does a resource factory always produce a mapping?
    """
    # NOTE(review): restored from a mangled numbered listing; dropped lines
    # (the __init__ loop body, the try/else around iterable conversion, the
    # reset call bodies) were reconstructed — verify against upstream gmxapi.

    def __init__(self, **kwargs):
        """Initialize from key/value pairs of named data sources.

        Data sources may be any of the basic gmxapi data types, gmxapi Futures
        of those types, or gmxapi ensemble data bundles of the above.
        """
        super(DataSourceCollection, self).__init__()
        # Route every assignment through __setitem__ so validation applies.
        for name, value in kwargs.items():
            self[name] = value

    def __setitem__(self, key: str, value: SourceTypeVar) -> None:
        if not isinstance(key, str):
            raise exceptions.TypeError('Data must be named with str type.')
        # TODO: Encapsulate handling of proferred data sources to Context details.
        # Preprocessed input should be self-describing gmxapi data types. Structured
        # input must be recursively (depth-first) converted to gmxapi data types.
        # TODO: Handle gmxapi Futures stored as dictionary elements!
        if not isinstance(value, valid_source_types):
            if isinstance(value, collections.abc.Iterable):
                # Iterables here are treated as arrays, but we do not have a robust typing system.
                # Warning: In the initial implementation, the iterable may contain Future objects.
                # TODO: (#2993) Revisit as we sort out data shape and Future protocol.
                try:
                    value = datamodel.ndarray(value)
                except (exceptions.ValueError, exceptions.TypeError) as e:
                    raise exceptions.TypeError('Iterable could not be converted to NDArray: {}'.format(value)) from e
            elif hasattr(value, 'result'):
                # Presumably a Future: accepted as-is and resolved later.
                pass
            else:
                raise exceptions.ApiError('Cannot process data source {}'.format(value))
        super().__setitem__(key, value)

    def reset(self):
        """Reset all sources in the collection."""
        for source in self.values():
            if hasattr(source, 'reset'):
                source.reset()
            if hasattr(source, '_reset'):
                source._reset()

    def __hash__(self):
        """Provide some sort of unique identifier.

        We need a more deterministic fingerprinting scheme with well-specified
        uniqueness semantics, but right now we just need something reasonably
        unique.
        """
        hashed_keys_and_values = tuple(hash(entity) for item in self.items() for entity in item)
        return hash(hashed_keys_and_values)
def computed_result(function):
    """Decorate a function to get a helper that produces an object with Result behavior.

    When called, the new function produces an ImmediateResult object.

    The new function has the same signature as the original function, but can accept
    gmxapi data proxies, assuming the provided proxy objects represent types
    compatible with the original signature.

    Calls to `result()` return the value that `function` would return when executed
    in the local context with the inputs fully resolved.

    The API does not specify when input data dependencies will be resolved
    or when the wrapped function will be executed. That is, ``@computed_result``
    functions may force immediate resolution of data dependencies and/or may
    be called more than once to satisfy dependent operation inputs.
    """
    # NOTE(review): restored from a mangled numbered listing; the try: line
    # and the return statements at the end were absent and are reconstructed —
    # verify against upstream gmxapi.

    # Attempt to inspect function signature. Introspection can fail, so we catch
    # the various exceptions. We re-raise as ApiError, indicating a bug, because
    # we should do more to handle this intelligently and provide better user
    # feedback.
    # TODO: Figure out what to do with exceptions where this introspection
    #  and rebinding won't work.
    # ref: https://docs.python.org/3/library/inspect.html#introspecting-callables-with-the-signature-object
    try:
        sig = inspect.signature(function)
    except TypeError as T:
        raise exceptions.ApiError('Can not inspect type of provided function argument.') from T
    except ValueError as V:
        raise exceptions.ApiError('Can not inspect provided function signature.') from V

    wrapped_function = function_wrapper()(function)

    @functools.wraps(function)
    def new_function(*args, **kwargs):
        # The signature of the new function will accept abstractions
        # of whatever types it originally accepted. This wrapper must
        # * Create a mapping to the original call signature from `input`
        # * Add handling for typed abstractions in wrapper function.
        # * Process arguments to the wrapper function into `input`

        # 1. Inspect the return annotation to determine valid gmxapi type(s)
        # 2. Generate a Result object advertising the correct type, bound to the
        #    Input and implementing function.
        # 3. Transform the result() data to the correct type.

        # TODO: (FR3+) create a serializable data structure for inputs discovered
        #  from function introspection.

        for name, param in sig.parameters.items():
            assert not param.kind == param.POSITIONAL_ONLY
        bound_arguments = sig.bind(*args, **kwargs)
        handle = wrapped_function(**bound_arguments.arguments)
        output = handle.output
        # TODO: Find a type hinting / generic way to assert output attributes.
        # NOTE(review): returning the published data attribute is inferred —
        # confirm against upstream.
        return output.data

    return new_function
class OutputCollectionDescription(collections.OrderedDict):
    """Ordered mapping of operation output names to gmxapi data types."""

    def __init__(self, **kwargs):
        """Create the output description for an operation node from a dictionary of names and types."""
        # NOTE(review): the accumulator initialization was dropped from the
        # mangled listing and is restored here.
        outputs = []
        for name, flavor in kwargs.items():
            if not isinstance(name, str):
                raise exceptions.TypeError('Output descriptions are keyed by Python strings.')
            # Multidimensional outputs are explicitly NDArray
            if issubclass(flavor, (list, tuple)):
                flavor = datamodel.NDArray
            assert issubclass(flavor, valid_result_types)
            outputs.append((name, flavor))
        super().__init__(outputs)
class InputCollectionDescription(collections.OrderedDict):
    """Describe acceptable inputs for an Operation.

    Generally, an InputCollectionDescription is an aspect of the public API by
    which an Operation expresses its possible inputs. This class includes details
    of the Python package.

    Keyword Arguments:
        parameters : A sequence of named parameter descriptions.

    Parameter descriptions are objects containing an `annotation` attribute
    declaring the data type of the parameter and, optionally, a `default`
    attribute declaring a default value for the parameter.

    Instances can be used as an ordered map of parameter names to gmxapi data types.

    Analogous to inspect.Signature, but generalized for gmxapi Operations.
    Additional notable differences: typing is normalized at initialization, and
    the bind() method does not return an object that can be directly used as
    function input. The object returned from bind() is used to construct a data
    graph Edge for subsequent execution.
    """
    # NOTE(review): restored from a mangled numbered listing; dropped
    # structural lines (accumulator init, `if disallowed:`, else arms,
    # `continue`, `if key == 'output':`, `try:`) were reconstructed — verify
    # against upstream gmxapi.

    def __init__(self, parameters: typing.Iterable[typing.Tuple[str, inspect.Parameter]]):
        """Create the input description for an operation node from a dictionary of names and types."""
        inputs = []
        for name, param in parameters:
            if not isinstance(name, str):
                raise exceptions.TypeError('Input descriptions are keyed by Python strings.')
            # Multidimensional inputs are explicitly NDArray
            dtype = param.annotation
            if issubclass(dtype, collections.abc.Iterable) \
                    and not issubclass(dtype, (str, bytes, collections.abc.Mapping)):
                # TODO: we can relax this with some more input conditioning.
                if dtype != datamodel.NDArray:
                    raise exceptions.UsageError(
                        'Cannot accept input type {}. Sequence type inputs must use NDArray.'.format(param))
            assert issubclass(dtype, valid_result_types)
            if hasattr(param, 'kind'):
                disallowed = any([param.kind == param.POSITIONAL_ONLY,
                                  param.kind == param.VAR_POSITIONAL,
                                  param.kind == param.VAR_KEYWORD])
                if disallowed:
                    raise exceptions.ProtocolError(
                        'Cannot wrap function. Operations must have well-defined parameter names.')
                kind = param.kind
            else:
                kind = inspect.Parameter.POSITIONAL_OR_KEYWORD
            if hasattr(param, 'default'):
                default = param.default
            else:
                default = inspect.Parameter.empty
            inputs.append(inspect.Parameter(name, kind, default=default, annotation=dtype))
        super().__init__([(input.name, input.annotation) for input in inputs])
        self.signature = inspect.Signature(inputs)

    @staticmethod
    def from_function(function):
        """Inspect a function to be wrapped.

        Used internally by gmxapi.operation.function_wrapper()

        Raises:
            exceptions.ProtocolError if function signature cannot be determined to be valid.

        Returns:
            InputCollectionDescription for the function input signature.
        """
        # First, inspect the function.
        assert callable(function)
        signature = inspect.signature(function)
        # The function must have clear and static input schema
        # Make sure that all parameters have clear names, whether or not they are used in a call.
        for name, param in signature.parameters.items():
            disallowed = any([param.kind == param.POSITIONAL_ONLY,
                              param.kind == param.VAR_POSITIONAL,
                              param.kind == param.VAR_KEYWORD])
            if disallowed:
                raise exceptions.ProtocolError(
                    'Cannot wrap function. Operations must have well-defined parameter names.')
            if param.name == 'input':
                raise exceptions.ProtocolError('Function signature includes the (reserved) "input" keyword argument.')
        description = collections.OrderedDict()
        for param in signature.parameters.values():
            if param.name == 'output':
                # Wrapped functions may accept the output parameter to publish results, but
                # that is not part of the Operation input signature.
                continue
            if param.annotation == param.empty:
                if param.default == param.empty or param.default is None:
                    raise exceptions.ProtocolError('Could not infer parameter type for {}'.format(param.name))
                dtype = type(param.default)
                if isinstance(dtype, collections.abc.Iterable) \
                        and not isinstance(dtype, (str, bytes, collections.abc.Mapping)):
                    dtype = datamodel.NDArray
            else:
                dtype = param.annotation
            description[param.name] = param.replace(annotation=dtype)
        return InputCollectionDescription(description.items())

    def bind(self, *args, **kwargs) -> DataSourceCollection:
        """Create a compatible DataSourceCollection from provided arguments.

        Pre-process input and function signature to get named input arguments.

        This is a helper function to allow calling code to characterize the
        arguments in a Python function call with hints from the factory that is
        initializing an operation. Its most useful functionality is to allows a
        factory to accept positional arguments where named inputs are usually
        required. It also allows data sources to participate in multiple
        DataSourceCollections with minimal constraints.

        Note that the returned object has had data populated from any defaults
        described in the InputCollectionDescription.

        See wrapped_function_runner() and describe_function_input().
        """
        # For convenience, accept *args, but convert to **kwargs to pass to Operation.
        # Factory accepts an unadvertised `input` keyword argument that is used as a default kwargs dict.
        # If present, kwargs['input'] is treated as an input "pack" providing _default_ values.
        input_kwargs = collections.OrderedDict()
        if 'input' in kwargs:
            provided_input = kwargs.pop('input')
            if provided_input is not None:
                input_kwargs.update(provided_input)
        # `function` may accept an `output` keyword argument that should not be supplied to the factory.
        for key, value in kwargs.items():
            if key == 'output':
                raise exceptions.UsageError('Invalid keyword argument: output (reserved).')
            input_kwargs[key] = value
        try:
            bound_arguments = self.signature.bind_partial(*args, **input_kwargs)
        except TypeError as e:
            raise exceptions.UsageError('Could not bind operation parameters to function signature.') from e
        assert 'output' not in bound_arguments.arguments
        bound_arguments.apply_defaults()
        assert 'input' not in bound_arguments.arguments
        input_kwargs = collections.OrderedDict([pair for pair in bound_arguments.arguments.items()])
        if 'output' in input_kwargs:
            input_kwargs.pop('output')
        return DataSourceCollection(**input_kwargs)
class ProxyDataDescriptor(object):
    """Base class for data descriptors used in DataProxyBase subclasses.

    Subclasses should either not define __init__ or should call the base class
    __init__ explicitly: super().__init__(self, name, dtype)
    """

    def __init__(self, name: str, dtype: ResultTypeVar = None):
        # NOTE(review): the attribute assignments were dropped from the
        # mangled listing and are restored here (``_name`` is required by
        # DataProxyMeta, which reads/writes ``descriptor._name``).
        self._name = name
        # TODO: We should not allow dtype==None, but we currently have a weak data
        #  model that does not allow good support of structured Futures.
        if dtype is not None:
            assert isinstance(dtype, type)
            assert issubclass(dtype, valid_result_types)
        self._dtype = dtype
class DataProxyMeta(abc.ABCMeta):
    """Metaclass for data proxy classes built from collections of descriptors."""
    # NOTE(review): restored from a mangled numbered listing; dropped lines
    # (decorators, loop headers, branch bodies, return statements) were
    # reconstructed — verify against upstream gmxapi.

    # Key word arguments consumed by __prepare__
    _prepare_keywords = ('descriptors',)

    @classmethod
    def __prepare__(mcs, name, bases, descriptors: collections.abc.Mapping = None):
        """Allow dynamic sub-classing.

        DataProxy class definitions are collections of data descriptors. This
        class method allows subclasses to give the descriptor names and type(s)
        in the class declaration as arguments instead of as class attribute
        assignments.

            class MyProxy(DataProxyBase, descriptors={name: MyDescriptor() for name in datanames}): pass

        Note:
            Recent Python versions allow this to be replaced via ``__init_subclass__`` hook.
        """
        # NOTE(review): branch bodies reconstructed; return values inferred
        # from the visible branches — confirm upstream.
        if descriptors is None:
            return {}
        elif isinstance(descriptors, tuple):
            namespace = collections.OrderedDict([(d._name, d) for d in descriptors])
            return namespace
        else:
            assert isinstance(descriptors, collections.abc.Mapping)
            return descriptors

    def __new__(cls, name, bases: typing.Iterable, namespace, **kwargs):
        for key in kwargs:
            if key not in DataProxyMeta._prepare_keywords:
                raise exceptions.ApiError('Unexpected class creation keyword: {}'.format(key))
        # See note about DataProxyBase._reserved.
        if '_reserved' not in namespace and not any(hasattr(base, '_reserved') for base in bases):
            raise exceptions.ApiError(
                'We currently expect DataProxy classes to provide a list of reserved attribute names.')
        for key in namespace:
            # Here we can check conformance with naming and typing rules.
            assert isinstance(key, str)
            if key.startswith('__'):
                # Skip non-public attributes.
                continue
            descriptor = namespace[key]
            # The purpose of the current data proxies is to serve as a limited namespace
            # containing only descriptors of a certain type. In the future, these proxies
            # may be flattened into a facet of a richer OperationHandle class
            # (this metaclass may become a decorator on an implementation class),
            # but for now we check that the class is being used according to the
            # documented framework. A nearer term update could be to restrict the
            # type of the data descriptor:
            # TODO: Use a member type of the derived cls (or a mix-in base) to specify a particular
            #  ProxyDataDescriptor subclass.
            # Also, see note about DataProxyBase._reserved
            if not isinstance(descriptor, ProxyDataDescriptor):
                if key not in namespace['_reserved'] and not any(key in getattr(base, '_reserved') for base in
                                                                 bases if hasattr(base, '_reserved')):
                    raise exceptions.ApiError('Unexpected data proxy attribute {}: {}'.format(key, repr(descriptor)))
            else:
                assert isinstance(descriptor, ProxyDataDescriptor)
                if not isinstance(descriptor._name, str) or descriptor._name == '':
                    descriptor._name = key
                else:
                    if descriptor._name != key:
                        raise exceptions.ApiError(
                            'Descriptor internal name {} does not match attribute name {}'.format(
                                descriptor._name, key))
        return super().__new__(cls, name, bases, namespace)

    # TODO: This keyword argument stripping is not necessary in more recent Python versions.
    #  When Python minimum required version is increased, check if we can remove this.
    def __init__(cls, name, bases, namespace, **kwargs):
        for key in kwargs:
            if key not in DataProxyMeta._prepare_keywords:
                raise exceptions.ApiError('Unexpected class initialization keyword: {}'.format(key))
        super().__init__(name, bases, namespace)

    # TODO: See if we can use __dir__ in the metaclass to help hint class attributes for better tab completion.
    #  Ref: https://ipython.readthedocs.io/en/stable/config/integrating.html#tab-completion
    # def __dir__(self) -> Iterable[str]:
    #     return super().__dir__()
class DataProxyBase(collections.abc.Mapping, metaclass=DataProxyMeta):
    """Limited interface to managed resources.

    Inherit from DataProxy to specialize an interface to an ``instance``.
    In the derived class, either do not define ``__init__`` or be sure to
    initialize the super class (DataProxy) with an instance of the object
    to be proxied.

    A class deriving from DataProxyBase allows its instances to provide a namespace
    for proxies to named data by defining attributes that are data descriptors
    (subclasses of ProxyDataDescriptor).
    The ProxyDataDescriptors are accessed as attributes of the
    data proxy instance or by iterating on items(). Attributes that are not
    ProxyDataDescriptors are possible, but will not be returned by items() which
    is a necessary part of gmxapi execution protocol.

    Acts as an owning handle to the resources provide by ``instance``,
    preventing the reference count of ``instance`` from going to zero for the
    lifetime of the proxy object.

    When sub-classing DataProxyBase, data descriptors can be passed as a mapping
    to the ``descriptors`` key word argument in the class declaration. This
    allows data proxy subclasses to be easily defined dynamically.

        mydescriptors = {'foo': Publisher('foo', int), 'data': Publisher('data', float)}
        class MyDataProxy(DataProxyBase, descriptors=mydescriptors): pass
        assert hasattr(MyDataProxy, 'foo')
    """
    # NOTE(review): restored from a mangled numbered listing; decorators and
    # several method bodies were absent and are reconstructed — verify
    # against upstream gmxapi.

    # This class attribute (which subclasses are free to replace to augment) is an
    # indication of a problem with the current data model. If we are allowing
    # reserved words that would otherwise be valid data names, there is not a
    # compelling reason for separate data proxy classes: we throw away the assertion
    # that we are preparing a clean namespace and we could have accomplished the
    # class responsibilities in the Operation handle with just descriptor classes.
    # If we want the clean namespace, we should figure out how to keep this interface
    # from growing and/or have some "hidden" internal interface.
    _reserved = ('ensemble_width', 'items', '_reserved')

    # This class can be expanded to be the attachment point for a metaclass for
    # data proxies such as PublishingDataProxy or OutputDataProxy, which may be
    # defined very dynamically and concisely as a set of Descriptors and a type()
    # call.
    # If development in this direction does not materialize, then this base
    # class is not very useful and should be removed.
    def __init__(self, instance: 'SourceResource', client_id: int = None):
        """Get partial ownership of a resource provider.

        Arguments:
            instance : resource-owning object
            client_id : identifier for client holding the resource handle (e.g. ensemble member id)

        If client_id is not provided, the proxy scope is for all clients.
        """
        # TODO: Decide whether _resource_instance is public or not.
        # Note: currently commonly needed for subclass implementations.
        self._resource_instance = instance
        # Developer note subclasses should handle self._client_identifier == None
        self._client_identifier = client_id
        # Collection is fixed by the time of instance creation, so cache it.
        self.__keys = tuple([key for key, _ in self.items()])
        self.__length = len(self.__keys)

    @property
    def ensemble_width(self) -> int:
        return self._resource_instance.width()

    @classmethod
    def items(cls):
        """Generator for tuples of attribute name and descriptor instance.

        This almost certainly doesn't do quite what we want...
        """
        # NOTE(review): presumably this is the ``items`` named in _reserved,
        # shadowing Mapping.items so __init__ can enumerate descriptors
        # before the instance keys exist — confirm upstream.
        for name, value in cls.__dict__.items():
            if isinstance(value, ProxyDataDescriptor):
                yield name, value

    def __getitem__(self, k):
        # NOTE(review): the error-handling lines were dropped from the
        # listing; mapping attribute access to item access is the visible
        # intent — confirm upstream.
        try:
            return getattr(self, k)
        except AttributeError as e:
            raise KeyError(k) from e

    def __len__(self):
        return self.__length

    def __iter__(self):
        for key in self.__keys:
            yield key
class Publisher(ProxyDataDescriptor):
    """Data descriptor for write access to a specific named data resource.

    For a wrapped function receiving an ``output`` argument, provides the
    accessors for an attribute on the object passed as ``output``. Maps
    read and write access by the wrapped function to appropriate details of
    the resource manager.

    Used internally to implement settable attributes on PublishingDataProxy.
    Allows PublishingDataProxy to be dynamically defined in the scope of the
    operation.function_wrapper closure. Each named output is represented by
    an instance of Publisher in the PublishingDataProxy class definition for
    the operation.

    Ref: https://docs.python.org/3/reference/datamodel.html#implementing-descriptors

    Collaborations:
        Relies on implementation details of ResourceManager.
    """
    # NOTE(review): restored from a mangled numbered listing; the class-access
    # guard in __get__, the else branch, and the __repr__ def line were
    # reconstructed — verify against upstream gmxapi.

    def __get__(self, instance: DataProxyBase, owner):
        if instance is None:
            # The current access has come through the class attribute of owner class
            return self
        resource_manager = instance._resource_instance
        client_id = instance._client_identifier
        # TODO: Fix API scope.
        # Either this class is a detail of the same implementation as ResourceManager,
        # or we need to enforce that instance._resource_instance provides _data (or equivalent)
        assert isinstance(resource_manager, ResourceManager)
        if client_id is None:
            return getattr(resource_manager._data, self._name)
        else:
            return getattr(resource_manager._data, self._name)[client_id]

    def __set__(self, instance: DataProxyBase, value):
        resource_manager = instance._resource_instance
        # TODO: Fix API scope.
        # Either this class is a detail of the same implementation as ResourceManager,
        # or we need to enforce that instance._resource_instance provides _data (or equivalent)
        assert isinstance(resource_manager, ResourceManager)
        client_id = instance._client_identifier
        resource_manager.set_result(name=self._name, value=value, member=client_id)

    def __repr__(self):
        return '{}(name={}, dtype={})'.format(self.__class__.__name__,
                                              self._name,
                                              self._dtype.__qualname__)
def define_publishing_data_proxy(output_description) -> typing.Type[DataProxyBase]:
    """Returns a class definition for a PublishingDataProxy for the provided output description."""
    # This dynamic type creation hides collaborations with things like make_datastore.
    # We should encapsulate these relationships in Context details, explicit collaborations
    # between specific operations and Contexts, and in groups of Operation definition helpers.

    # One Publisher descriptor per declared output name.
    descriptors = collections.OrderedDict([(name, Publisher(name)) for name in output_description])

    class PublishingDataProxy(DataProxyBase, descriptors=descriptors):
        """Handler for write access to the `output` of an operation.

        Acts as a sort of PublisherCollection.
        """

    return PublishingDataProxy
# get symbols we can use to annotate input and output types more specifically.
_OutputDataProxyType = typing.TypeVar('_OutputDataProxyType', bound=DataProxyBase)
_PublishingDataProxyType = typing.TypeVar('_PublishingDataProxyType', bound=DataProxyBase)
# Currently, the ClientID type is an integer, but this may change.
ClientID = typing.NewType('ClientID', int)
class _Resources(typing.Generic[_PublishingDataProxyType]):
    """Generic marker for resources parameterized by a publishing data proxy type."""
    # NOTE(review): the class body was dropped from the mangled listing; the
    # docstring serves as the (empty) body — confirm upstream.
# TODO: Why generic in publishingdataproxytype?
class SourceResource(typing.Generic[_OutputDataProxyType, _PublishingDataProxyType]):
    """Resource Manager for a data provider.

    Supports Future instances in a particular context.
    """
    # NOTE(review): the method bodies were dropped from the mangled listing;
    # this reads as an abstract interface, so the methods are restored as
    # abstract stubs for subclasses to override — verify against upstream.

    # Note: ResourceManager members not yet included:
    #  future(), _data, set_result.

    # This might not belong here. Maybe separate out for a OperationHandleManager?
    @abc.abstractmethod
    def data(self) -> _OutputDataProxyType:
        """Get the output data proxy."""
        # Warning: this should probably be renamed, but "output_data_proxy" is already
        # a member in at least one derived class.
        ...

    @abc.abstractmethod
    def is_done(self, name: str) -> bool:
        ...

    @abc.abstractmethod
    def get(self, name: str) -> 'OutputData':
        ...

    @abc.abstractmethod
    def update_output(self):
        """Bring the _data member up to date and local."""
        ...

    @abc.abstractmethod
    def reset(self):
        """Recursively reinitialize resources.

        Set the resource manager to its initialized state.
        All outputs are marked not "done".
        All inputs supporting the interface have ``_reset()`` called on them.
        """
        ...

    @abc.abstractmethod
    def width(self) -> int:
        """Ensemble width of the managed resources."""
        ...

    @abc.abstractmethod
    def future(self, name: str, description: ResultDescription) -> 'Future':
        """Get a Future handle for managed data.

        Resource managers owned by subclasses of gmx.operation.Context provide
        this method to get references to output data.

        In addition to the interface described by gmx.abc.Future, returned objects
        provide the interface described by gmx.operation.Future.
        """
        ...
class StaticSourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
    """Provide the resource manager interface for local static data.

    Allow data transformations on the proxied resource.

    Keyword Args:
        proxied_data: A gmxapi supported data object.
        width: Size of (one-dimensional) shaped data produced by function.
        function: Transformation to perform on the managed data.

    The callable passed as ``function`` must accept a single argument. The
    argument will be an iterable when proxied_data represents an ensemble,
    or an object of the same type as proxied_data otherwise.
    """
    # NOTE(review): restored from a mangled numbered listing; several
    # structural lines (else arms, width branching, return lines, the
    # DataShapeError format continuation) were reconstructed — verify
    # against upstream gmxapi.

    def __init__(self, *, name: str, proxied_data, width: int, function: typing.Callable):
        assert not isinstance(proxied_data, Future)
        if hasattr(proxied_data, 'width'):
            # Ensemble data source
            assert hasattr(proxied_data, 'source')
            self._result = function(proxied_data.source)
        else:
            self._result = function(proxied_data)
        if width > 1:
            if isinstance(self._result, (str, bytes)):
                # In this case, do not implicitly broadcast
                raise exceptions.ValueError('"function" produced data incompatible with "width".')
            else:
                if not isinstance(self._result, collections.abc.Iterable):
                    raise exceptions.DataShapeError(
                        'Expected iterable of size {} but "function" result is not iterable.')
            data = list(self._result)
            if len(data) != width:
                raise exceptions.DataShapeError(
                    'Expected iterable of size {} but "function" produced a {} of size {}'.format(width, type(data),
                                                                                                  len(data)))
            dtype = type(data[0])
        else:
            if width != 1:
                raise exceptions.ValueError('width must be an integer 1 or greater.')
            dtype = type(self._result)
            if issubclass(dtype, (list, tuple)):
                dtype = datamodel.NDArray
                data = [datamodel.ndarray(self._result)]
            elif isinstance(self._result, collections.abc.Iterable):
                if not isinstance(self._result, (str, bytes, dict)):
                    raise exceptions.ValueError(
                        'Expecting width 1 but "function" produced iterable type {}.'.format(type(self._result)))
                else:
                    data = [str(self._result)]
            else:
                data = [self._result]
        description = ResultDescription(dtype=dtype, width=width)
        self._data = OutputData(name=name, description=description)
        for member in range(width):
            self._data.set(data[member], member=member)
        output_collection_description = OutputCollectionDescription(**{name: dtype})
        self.output_data_proxy = define_output_data_proxy(output_description=output_collection_description)

    def is_done(self, name: str) -> bool:
        # Static data is complete as soon as the manager is constructed.
        return True

    def get(self, name: str) -> 'OutputData':
        assert self._data.name == name
        return self._data

    @property
    def data(self) -> _OutputDataProxyType:
        return self.output_data_proxy(self)

    def width(self) -> int:
        # TODO: It looks like the OutputData ResultDescription probably belongs
        #  in the public interface.
        return self._data._description.width

    def update_output(self):
        # Data are fully local and static; nothing to update.
        pass

    def reset(self):
        # NOTE(review): dropped from the listing; static data has no state to
        # reinitialize — confirm upstream.
        pass

    def future(self, name: str, description: ResultDescription) -> 'Future':
        return Future(self, name, description=description)
class ProxyResourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
    """Act as a resource manager for data managed by another resource manager.

    Allow data transformations on the proxied resource.

    Args:
        proxied_future: An object implementing the Future interface.
        width: Size of (one-dimensional) shaped data produced by function.
        function: Transformation to perform on the result of proxied_future.

    The callable passed as ``function`` must accept a single argument, which will
    be an iterable when proxied_future represents an ensemble, or an object of
    type proxied_future.description.dtype otherwise.
    """

    def __init__(self, proxied_future: 'Future', width: int, function: typing.Callable):
        self._proxied_future = proxied_future
        # Reuse the name of the proxied data so consumers can look it up here.
        self.name = self._proxied_future.name
        assert callable(function)
        self.function = function

    def width(self) -> int:

        # NOTE(review): this statement appears to belong to a reset() method
        # whose 'def' line is elided in this view -- it invalidates the
        # upstream Future so it can be recomputed.
        self._proxied_future._reset()

    def is_done(self, name: str) -> bool:

    def get(self, name: str):
        # Only the single (renamed) proxied item can be requested.
        if name != self.name:
            raise exceptions.ValueError('Request for unknown data.')
        if not self.is_done(name):
            raise exceptions.ProtocolError('Data not ready.')
        # Apply the transformation to the locally cached upstream result
        # (cached by update_output()).
        result = self.function(self._result)
        # TODO Fix this typing nightmare:
        #  ResultDescription should be fully knowable and defined when the resource manager is initialized.
        data = OutputData(name=self.name, description=ResultDescription(dtype=type(result[0]), width=self._width))
        for member, value in enumerate(result):
            data.set(value, member)
        data = OutputData(name=self.name, description=ResultDescription(dtype=type(result), width=self._width))

    def update_output(self):
        # Force upstream execution and cache the realized result locally.
        self._result = self._proxied_future.result()

    def data(self) -> _OutputDataProxyType:
        raise exceptions.ApiError('ProxyResourceManager cannot yet manage a full OutputDataProxy.')

    def future(self, name: str, description: ResultDescription):
        """Get a Future handle backed by this proxying manager."""
        return Future(self, name, description=description)
class AbstractOperation(typing.Generic[_OutputDataProxyType]):
    """Client interface to an operation instance (graph node).

    Note that this is a generic abstract class. Subclasses should provide a
    class subscript to help static type checkers.
    """

        # NOTE(review): docstring of a run() method whose 'def' line is elided
        # in this view.
        """Assert execution of an operation.

        After calling run(), the operation results are guaranteed to be available
        in the local context.
        """

    def output(self) -> _OutputDataProxyType:
        """Get a proxy collection to the output of the operation.

        Developer note: The 'output' property exists to isolate the namespace of
        output data from other operation handle attributes and we should consider
        whether it is actually necessary or helpful. To facilitate its possible
        future removal, do not enrich its interface beyond that of a collection
        of OutputDescriptor attributes.
        """
class OperationRegistryKey(collections.namedtuple('OperationRegistryKey', 'namespace name'), collections.abc.Hashable):
    """Helper class to define the key type for OperationRegistry."""

        # (__hash__ body; 'def' line elided in this view.)
        # Hash on the (namespace, name) pair so that equal keys hash equally,
        # consistent with __eq__ below.
        return hash((self.namespace, self.name))

    def __eq__(self, other):
        """Test equivalence rather than identity.

        Note: Use `is` to test identity.
        """
        return other.namespace == self.namespace and other.name == self.name

        # (__str__ body; 'def' line elided.) Canonical form: "namespace.name".
        return '.'.join([self.namespace, self.name])

        # (__repr__ body; 'def' line elided.) Developer-friendly representation.
        return '{}(namespace={}, name={})'.format(self.__qualname__, self.namespace, self.name)
def _make_registry_key(*args) -> OperationRegistryKey:
    """Normalize input to an OperationRegistryKey.

    Used to implement OperationRegistry.__getitem__(), which catches and converts
    the various exceptions this helper function can produce.
    """
        # Multiple positional arguments are forwarded to the namedtuple ctor.
        return OperationRegistryKey(*args)
        raise exceptions.UsageError('Empty index value passed to OperationRegistry instance[].')
    if isinstance(item, OperationRegistryKey):
    if isinstance(item, str):
        # Split "namespace.name" on the final '.' only, so namespaces may be dotted.
        namespace, name = item.rsplit(sep='.', maxsplit=1)
        return OperationRegistryKey(namespace=namespace, name=name)
    # Item could be a class object or an instance object...
    if hasattr(item, 'namespace') and hasattr(item, 'name'):
        # *namespace* and *name* may each be plain attributes or accessor methods.
        if callable(item.namespace):
            namespace = item.namespace()
            namespace = item.namespace
        if callable(item.name):
        return OperationRegistryKey(namespace=namespace, name=name)
    raise exceptions.ValueError('Not a usable OperationRegistryKey: {}'.format(item))
class OperationRegistry(collections.UserDict):
    """Helper class to map identifiers to Operation implementation instances.

    This is an implementation detail of gmxapi.operation and should not be used from
    outside of the package until a stable interface can be specified.
    """

    # Typing overload: look up by OperationRegistryKey.
    def __getitem__(self, item: OperationRegistryKey):

    # Typing overload: look up by "namespace.name" string.
    def __getitem__(self, item: str):

    def __getitem__(self, *args):
        """Fetch the requested operation registrant.

        The getter is overloaded to support look-ups in multiple forms.

        The key can be given in the following forms.
        * As a period-delimited string of the form "namespace.operation".
        * As an OperationRegistryKey object.
        * As a sequence of positional arguments accepted by OperationRegistryKey.
        * As an object conforming to the OperationRegistryKey interface.
        """
            # Normalize the key; translate helper errors to TypeError for callers.
            item = _make_registry_key(*args)
        except exceptions.Error as e:
            raise exceptions.TypeError('Could not interpret key as a OperationRegistryKey.') from e
        return self.data[item]
# Module level data store for locating operation implementations at run time.
# TODO: This may make sense as instance data of a root Context instance, but we
#  don't have a stable interface for communicating between Contexts yet.
# Alternatively, it could be held as class data in a RegisteredOperation class,
#  but note that the "register" member function would behave less like the abc.ABCMeta
#  support for "virtual subclassing" and more like the generic function machinery
#  of e.g. functools.singledispatch.
_operation_registry = OperationRegistry()
def _register_operation(cls: typing.Type[OperationImplementation]):
    """Add *cls* to the module-level operation registry.

    The registry key is derived from the class's namespace and name.

    Raises:
        exceptions.ProtocolError: if an operation with the same registry key
            has already been registered.
    """
    assert isinstance(cls, type)
    assert issubclass(cls, OperationImplementation)
    key = _make_registry_key(cls)
    if key in _operation_registry:
        # Redefinition would silently shadow an existing implementation.
        raise exceptions.ProtocolError(
            'Attempting to redefine operation {}.'.format(str(key)))
    _operation_registry[key] = cls
# TODO: replace with a generic function that we dispatch on so the type checker can infer a return type.
def _get_operation_director(operation, context: gmx.abc.Context):
    """Get a Director that can add *operation* to the work managed by *context*.

    :return: gmxapi.abc.OperationDirector
    """
    # Look up the registered implementation class, then let it produce a
    # director bound to the target Context.
    registrant = _operation_registry[operation]
    director = registrant.director(context=context)
    # NOTE(review): a 'return director' is expected here (elided in this view)
    # -- confirm against upstream sources.
class InputDescription(abc.ABC):
    """Node input description for gmxapi.operation module.

    Provides the support needed to understand operation inputs in gmxapi.operation
    module Contexts.

    .. todo:: Implementation base class with heritable behavior and/or helpers to
              compose this functionality from more normative description of
              operation inputs. This will probably become a facet of the ResourceDirector
              when specialized for gmxapi.operation.Context.
    """

    def signature(self) -> InputCollectionDescription:
        """Mapping of named inputs and input type.

        Used to determine valid inputs before an Operation node is created.

        Related to the operation resource factory for this context.

        Better unification of this protocol, InputCollectionDescription, and
        the node resource factory is a pending design topic.
        Note, also, that the *bind* method of the returned InputCollectionDescription
        serves as the resource factory for input to the node builder.
        """

    def make_uid(self, input: 'DataEdge') -> str:
        """The unique identity of an operation node tags the output with respect to the input.

        Combines information on the Operation details and the input to uniquely
        identify the Operation node.

        Arguments:
            input : A (collection of) data source(s) that can provide Fingerprints.

        Used internally by the Context to manage ownership of data sources, to
        locate resources for nodes in work graphs, and to manage serialization,
        deserialization, and checkpointing of the work graph.

        The UID is a detail of the generic Operation that _should_ be independent
        of the Context details to allow the framework to manage when and where
        an operation is executed.

        TODO: We probably don't want to allow Operations to single-handedly determine their
         own uniqueness, but they probably should participate in the determination with the Context.

        TODO: Context implementations should be allowed to optimize handling of
         equivalent operations in different sessions or work graphs, but we do not
         yet guarantee that UIDs are globally unique!
        """
class ConcreteInputDescription(InputDescription):
    """Simple composed InputDescription.

    Wraps a provided input signature and a UID helper callable.
    """

    # NOTE(review): the 'def __init__(self,' header line is elided in this
    # view; these are its parameters.
                 input_signature: InputCollectionDescription,
                 uid_helper: typing.Callable[['DataEdge'], str]
        self._input_signature_description = input_signature
        self._uid_helper = uid_helper

    def signature(self) -> InputCollectionDescription:
        # Delegate to the composed description.
        return self._input_signature_description

    def make_uid(self, input: 'DataEdge') -> str:
        # Delegate UID generation to the composed helper.
        return self._uid_helper(input)
class OperationMeta(abc.ABCMeta):
    """Metaclass to manage the definition of Operation implementation classes.

    Registers concrete OperationImplementation subclasses in the module-level
    operation registry as they are defined.

    Note that this metaclass can be superseded by `__init_subclass__()`.
    """

    def __new__(meta, name, bases, class_dict):
        cls = super().__new__(meta, name, bases, class_dict)
        # Register subclasses, but not the base class.
        if issubclass(cls, OperationImplementation) and cls is not OperationImplementation:
            # TODO: Remove OperationDetailsBase layer and this extra check.
            # Note: we do not yet register the Operations built dynamically because we
            #  don't have a clear definition of unique implementations yet. For instance,
            #  a new OperationDetails class is defined for each call to gmx.join_arrays
            # TODO: Properly register and reuse Operations defined dynamically
            #  through function_wrapper (currently encompassed by OperationDetailsBase subclasses)
            if name != 'OperationDetailsBase':
                if OperationDetailsBase not in bases:
                    _register_operation(cls)
        # NOTE(review): a 'return cls' is expected here (elided in this view).
class OperationDetailsBase(OperationImplementation, InputDescription,
                           metaclass=OperationMeta):
    """Abstract base class for Operation details in this module's Python Context.

    Provides necessary interface for use with gmxapi.operation.ResourceManager.
    Separates the details of an Operation from those of the ResourceManager in
    a particular Context.

    OperationDetails classes are almost stateless, serving mainly to compose implementation
    details. Instances (operation objects) provide the Context-dependent interfaces
    for a specific node in a work graph.

    OperationDetails subclasses are created dynamically by function_wrapper and
    related factories.

    Developer note: when subclassing, note that the ResourceManager is responsible
    for managing Operation state. Do not add instance data members related to
    computation or output state.

    TODO: determine what is acceptable instance data and/or initialization information.
     Note that currently the subclass in function_wrapper has _no_ initialization input,
     but does not yet handle input-dependent output specification or node fingerprinting.
     It seems likely that instance initialization will require some characterization of
     supplied input, but nothing else. Even that much is not necessary if the instance
     is completely stateless, but that would require additional parameters to the member
     functions. However, an instance should be tied to a specific ResourceManager and
     Context, so weak references to these would be reasonable.
    """

    def output_description(self) -> OutputCollectionDescription:
        """Mapping of available outputs and types for an existing Operation node."""

    def publishing_data_proxy(self, *,
                              instance: SourceResource[typing.Any, _PublishingDataProxyType],
                              client_id) -> _PublishingDataProxyType:
        """Factory for Operation output publishing resources.

        Used internally when the operation is run with resources provided by instance."""

    def output_data_proxy(self,
                          instance: SourceResource[_OutputDataProxyType, typing.Any]
                          ) -> _OutputDataProxyType:
        """Get an object that can provide Futures for output data managed by instance."""

    def __call__(self, resources: _Resources):
        """Execute the operation with provided resources.

        Resources are prepared in an execution context with aid of resource_director()

        After the first call, output data has been published and is trivially
        available through the output_data_proxy()
        """

    def resource_director(cls, *, input, output: _PublishingDataProxyType) -> _Resources[_PublishingDataProxyType]:
        """a Director factory that helps build the Session Resources for the function.

        The Session launcher provides the director with all of the resources previously
        requested/negotiated/registered by the Operation. The director uses details of
        the operation to build the resources object required by the operation runner.

        For the Python Context, the protocol is for the Context to call the
        resource_director instance method, passing input and output containers.
        (See, for example, gmxapi.operation.PyFunctionRunnerResources)
        """

    # TODO: Don't run the director. Just return the correct callable.
    def operation_director(cls, *args, context: 'Context', label=None, **kwargs) -> AbstractOperation:
        """Dispatching Director for adding a work node.

        A Director for input of a particular sort knows how to reconcile
        input with the requirements of the Operation and Context node builder.
        The Director (using a less flexible / more standard interface)
        builds the operation node using a node builder provided by the Context.

        This is essentially the creation method, instead of __init__, but the
        object is created and owned by the framework, and the caller receives
        an OperationHandle instead of a reference to an instance of cls.
        """
        # TODO: We need a way to compose this functionality for arbitrary Contexts.
        #  That likely requires traits on the Contexts, and registration of Context
        #  implementations. It seems likely that an Operation will register Director
        #  implementations on import, and dispatching will be moved to the Context
        #  implementations, which can either find an appropriate OperationDirector
        #  or raise a compatibility error. To avoid requirements on import order of
        #  Operations and Context implementations, we can change this to a non-abstract
        #  dispatching method, requiring registration in the global gmxapi.context
        #  module, or get rid of this method and use something like pkg_resources
        #  "entry point" groups for independent registration of Directors and Contexts,
        #  each annotated with relevant traits. E.g.:
        #  https://setuptools.readthedocs.io/en/latest/setuptools.html#dynamic-discovery-of-services-and-plugins
        if not isinstance(context, Context):
            raise exceptions.UsageError('Context instance needed for dispatch.')
        # TODO: use Context characteristics rather than isinstance checks.
        if isinstance(context, ModuleContext):
            construct = OperationDirector(*args, operation_details=cls, context=context, label=label, **kwargs)
        elif isinstance(context, SubgraphContext):
            construct = OperationDirector(*args, operation_details=cls, context=context, label=label, **kwargs)
            # Unrecognized Context type: cannot choose a Director.
            raise exceptions.ApiError('Cannot dispatch operation_director for context {}'.format(context))
1304 # TODO: Implement observer pattern for edge->node data flow.
1305 # Step 0: implement subject interface subscribe()
1306 # Step 1: implement subject interface get_state()
1307 # Step 2: implement observer interface update()
1308 # Step 3: implement subject interface notify()
1309 # Step 4: implement observer hook to support substantial change in source that
1310 # invalidates downstream fingerprinting, such as a new subgraph iteration.
1311 # class Observer(object):
1312 # """Abstract base class for data observers."""
1313 # def rebind(self, edge: DataEdge):
1314 # """Recreate the Operation at the consuming end of the DataEdge."""
class Future(_Future):
    """gmxapi data handle.

    Future is currently more than a Future right now. (should be corrected / clarified.)
    Future is also a facade to other features of the data provider.

    Future objects are the most atomic interface between Contexts. User scripts
    may hold Futures from which they extract data with result(). Operation output
    used as input for another Operation can be decomposed such that the Operation
    factory has only Future objects in its input.

    TODO: ``subscribe`` method allows consumers to bind as Observers.

    TODO: extract the abstract class for input inspection?
     Currently abstraction is handled through SourceResource subclassing.

    Attributes:
        description (ResultDescription): Describes the result to be obtained from this Future.
    """

    def __init__(self, resource_manager: SourceResource, name: str, description: ResultDescription):
        if not isinstance(description, ResultDescription):
            raise exceptions.ValueError('Need description of requested data.')
        self.description = description  # type: ResultDescription
        self.resource_manager = resource_manager

        # Deprecated. We should not "reset" futures, but reconstitute them, but we
        # need to move the data model to a subscription-based system so that we can
        # make Futures properly immutable and issue new ones across subgraph iterations.
        self._number_of_resets = 0

    def __eq__(self, other):
        # This function is defined because we have defined __hash__().
        # Before customizing __eq__(), recall that Python objects that compare
        # equal should hash to the same value.
        # Please keep the two functions semantically correct.
        return object.__eq__(self, other)

        # (__hash__ body; 'def' line elided in this view.)
        # We cannot properly determine equivalency beyond the scope of a ResourceManager instance
        # without more developed data flow fingerprinting.
        return hash((id(self.resource_manager), self.name, self.description, self._number_of_resets))

        # (__repr__ body; 'def' line elided in this view.)
        return '<Future: name={}, description={}>'.format(self.name, self.description)

    def result(self) -> ResultTypeVar:
        """Fetch data to the caller's Context.

        Returns an object of the concrete type specified according to
        the operation that produces this Result.

        Ensemble data are returned as a list. Scalar results or results from single
        member ensembles are returned as scalars.
        """
        # Make sure the provider has produced (published) its output.
        self.resource_manager.update_output()
        # Return ownership of concrete data
        handle = self.resource_manager.get(self.name)

        # For intuitive use in non-ensemble cases, we represent data as bare scalars
        # when possible. It is easier for users to cast scalars to lists of length 1
        # than to introspect their own code to determine if a list of length 1 is
        # part of an ensemble or not. The data model will become clearer as we
        # develop more robust handling of multidimensional data and data flow topologies.
        # Eventually,
        # we may distinguish between data of shape () and shape (1,), but we will need
        # to be careful with semantics. We are already starting to adopt a rule-of-thumb
        # that data objects assume the minimum dimensionality necessary unless told
        # otherwise, and we could make that a hard rule if it doesn't make other things
        # too complicated.
        if self.description.width == 1:
            return handle.data(member=0)
        return handle.data()

        # (_reset method docstring and body; 'def' line elided in this view.)
        """Mark the Future "not done" to allow reexecution.

        Invalidates cached results, resets "done" markers in data sources, and
        triggers _reset recursively.

        Note: this is a hack that is inconsistent with the plan of unique mappings
        of inputs to outputs, but allows a quick prototype for looping operations.
        """
        self._number_of_resets += 1
        self.resource_manager.reset()

        # (dtype property body; 'def' line elided in this view.)
        return self.description.dtype

    def __getitem__(self, item):
        """Get a more limited view on the Future."""
        description = ResultDescription(dtype=self.dtype, width=self.description.width)
        # TODO: Use explicit typing when we have more thorough typing.
        description._dtype = None
        if self.description.width == 1:
            proxy = ProxyResourceManager(self,
                                         width=description.width,
                                         function=lambda value, key=item: value[key])
            # Ensemble case: subscript each member of the result.
            proxy = ProxyResourceManager(self,
                                         width=description.width,
                                         function=lambda value, key=item:
                                         [subscriptable[key] for subscriptable in value])
        return proxy.future(self.name, description=description)
class OutputDataDescriptor(ProxyDataDescriptor):
    """Read-only data descriptor for proxied access to output data.

    Knows how to get a Future from the resource manager.
    """

    # TODO: Reconcile the internal implementation details with the visibility and
    #  usages of this class.

    def __get__(self, proxy: DataProxyBase, owner):
        # Access through class attribute of owner class
        # Return a Future of appropriate ensemble width from the proxy's
        # resource provider.
        result_description = ResultDescription(dtype=self._dtype, width=proxy.ensemble_width)

        return proxy._resource_instance.future(name=self._name, description=result_description)
class MutableResourceDescriptor(ProxyDataDescriptor):
    """Accessor for rich binding interfaces.

    Allows operations to access resources beyond the scope of the current
    resource manager. Used by operations whose interactions are more complicated
    than standard typed data flow at the scope of the current Context.

    Instead of a Future interface, the returned object is a MutableResource with
    which a subscriber can collaborate with lower-level protocols.
    """

    def __get__(self, proxy: DataProxyBase, owner) -> typing.Union[MutableResource, 'MutableResourceDescriptor']:
        # Access through class attribute of owner class. We don't have a
        # specified use case for that, so allow inspection of the data
        # descriptor instance, itself.

        # The protocol for MD extension plugins requires that the simulation operation
        # subscribe to the plugin. Then the Context allows the plugin to access the
        # MdRunner interface as the simulation is launched.
        # The protocol for modify_input and for mdrun to consume the TPR payload
        # of read_tpr or modify_input should allow us to use the gmxapi 0.0.7
        # WorkSpec to configure and launch a simulation, which we can do by feeding
        # forward and building a fused operation at the mdrun node. The information
        # fed forward can just be references to the inputs and parameters of the
        # earlier operations, with annotations so that we know the intended behavior.
def define_output_data_proxy(output_description: OutputCollectionDescription) -> typing.Type[DataProxyBase]:
    """Create a new OutputDataProxy class bound to *output_description*.

    One read-only OutputDataDescriptor is generated per named output.
    """
    descriptors = {name: OutputDataDescriptor(name, description) for name, description in output_description.items()}

    class OutputDataProxy(DataProxyBase, descriptors=descriptors):
        """Handler for read access to the `output` member of an operation handle.

        Acts as a sort of ResultCollection.

        A ResourceManager creates an OutputDataProxy instance at initialization to
        provide the ``output`` property of an operation handle.
        """

    # Note: the OutputDataProxy has an inherent ensemble shape in the context
    # in which it is used, but that is an instance characteristic, not part of this type definition.
    # TODO: (FR5) The current tool does not support topology changing operations.
    return OutputDataProxy
# Encapsulate the description of the input data flow.
# Fields: positional 'args', keyword 'kwargs', and 'dependencies' (ordering-only inputs).
PyFuncInput = collections.namedtuple('Input', ('args', 'kwargs', 'dependencies'))
class SinkTerminal(object):
    """Operation input end of a data edge.

    In addition to the information in an InputCollectionDescription, includes
    topological information for the Operation node (ensemble width).

    Collaborations: Required for creation of a DataEdge. Created with knowledge
    of a DataSourceCollection instance and a InputCollectionDescription.
    """

    # TODO: This clearly maps to a Builder pattern.
    #  I think we want to get the sink terminal builder from a factory parameterized by InputCollectionDescription,
    #  add data source collections, and then build the sink terminal for the data edge.
    def __init__(self, input_collection_description: InputCollectionDescription):
        """Define an appropriate data sink for a new operation node.

        Resolve data sources and input description to determine connectability,
        topology, and any necessary implicit data transformations.

        :param input_collection_description: Available inputs for Operation
        :return: Fully formed description of the Sink terminal for a data edge to be created.

        Collaborations: Execution Context implementation.
        """
        # Default to a scalar (non-ensemble) node until a source widens it.
        self.ensemble_width = 1
        self.inputs = input_collection_description

        # (__str__ body; 'def' line elided in this view.)
        return '<SinkTerminal: ensemble_width={}>'.format(self.ensemble_width)

    def update_width(self, width: int):
        """Set (or re-check) the ensemble width implied by a data source."""
        if not isinstance(width, int):
            raise exceptions.TypeError('Need an integer width > 0.')
            raise exceptions.ValueError('Nonsensical ensemble width: {}'.format(int(width)))
        if self.ensemble_width != 1:
            # A width already established by another source must not conflict.
            if width != self.ensemble_width:
                raise exceptions.ValueError(
                    'Cannot change ensemble width {} to width {}.'.format(self.ensemble_width, width))
        self.ensemble_width = width

    def update(self, data_source_collection: DataSourceCollection):
        """Update the SinkTerminal with the proposed data provider."""
        for name, sink_dtype in self.inputs.items():
            if name not in data_source_collection:
                # If/when we accept data from multiple sources, we'll need some additional sanity checking.
                if not hasattr(self.inputs.signature.parameters[name], 'default'):
                    raise exceptions.UsageError('No data or default for {}'.format(name))
                # With a single data source, we need data to be in the source or have a default
                assert name in data_source_collection
                assert issubclass(sink_dtype, valid_result_types)
                source = data_source_collection[name]
                logger.debug('Updating Sink for source {}: {}.'.format(name, source))
                if isinstance(source, sink_dtype):
                    logger.debug('Source matches sink. No update necessary.')
                if isinstance(source, collections.abc.Iterable) and not isinstance(source, (
                        str, bytes, collections.abc.Mapping)):
                    assert isinstance(source, datamodel.NDArray)
                    if sink_dtype != datamodel.NDArray:
                        # Source is NDArray, but sink is not. Implicitly scatter.
                        self.update_width(len(source))
                if hasattr(source, 'description'):
                    # Source is a Future-like provider with a ResultDescription.
                    source_description = typing.cast(ResultDescription, source.description)
                    source_dtype = source_description.dtype
                    assert isinstance(sink_dtype, type)
                    # TODO: Handle typing of Future slices when we have a better data model.
                    if source_dtype is not None:
                        assert isinstance(source_dtype, type)
                        if not issubclass(source_dtype, sink_dtype):
                            raise exceptions.TypeError('Expected {} but got {}.'.format(sink_dtype, source_dtype))
                    source_width = source.description.width
                    self.update_width(source_width)
1577 class DataEdge(object):
1578 """State and description of a data flow edge.
1580 A DataEdge connects a data source collection to a data sink. A sink is an
1581 input or collection of inputs of an operation (or fused operation). An operation's
1582 inputs may be fed from multiple data source collections, but an operation
1583 cannot be fully instantiated until all of its inputs are bound, so the DataEdge
1584 is instantiated at the same time the operation is instantiated because the
1585 required topology of a graph edge may be determined by the required topology
1586 of another graph edge.
1588 A data edge has a well-defined topology only when it is terminated by both
1589 a source and sink. Creation requires that a source collection is compared to
1592 Calling code initiates edge creation by passing well-described data sources
1593 to an operation factory. The data sources may be annotated with explicit scatter
1596 The resource manager for the new operation determines the
1597 required shape of the sink to handle all of the offered input.
1600 and transformations of the data sources are then determined and the edge is
1603 At that point, the fingerprint of the input data at each operation
1604 becomes available to the resource manager for the operation. The fingerprint
1605 has sufficient information for the resource manager of the operation to
1606 request and receive data through the execution context.
1608 Instantiating operations and data edges implicitly involves collaboration with
1609 a Context instance. The state of a given Context or the availability of a
1610 default Context through a module function may affect the ability to instantiate
1611 an operation or edge. In other words, behavior may be different for connections
1612 being made in the scripting environment versus the running Session, and implementation
1613 details can determine whether or not new operations or data flow can occur in
1614 different code environments.
1617 class ConstantResolver(object):
1618 def __init__(self, value):
1621 def __call__(self, member=None):
def __init__(self, source_collection: DataSourceCollection, sink_terminal: SinkTerminal):
    """Bind a source collection to a sink terminal, building per-input adapters.

    Adapters are callables that transform a source and node ID to local data.
    Every key in the sink has an adapter.
    """
    self.adapters = {}
    self.source_collection = source_collection
    self.sink_terminal = sink_terminal
    for name in sink_terminal.inputs:
        if name not in source_collection:
            # No explicit source: fall back to a declared default, if any.
            # NOTE(review): this resolves to the input description object itself,
            # not an extracted default value — confirm intended semantics.
            if hasattr(sink_terminal.inputs[name], 'default'):
                self.adapters[name] = self.ConstantResolver(sink_terminal.inputs[name])
            else:
                # TODO: Initialize with multiple DataSourceCollections?
                raise exceptions.ValueError('No source or default for required input "{}".'.format(name))
        else:
            source = source_collection[name]
            sink = sink_terminal.inputs[name]
            if isinstance(source, (str, bool, int, float, dict)):
                if issubclass(sink, (str, bool, int, float, dict)):
                    # Scalar constant feeding a scalar sink.
                    self.adapters[name] = self.ConstantResolver(source)
                else:
                    # Scalar constant feeding an array sink: wrap as a 1-element array.
                    assert issubclass(sink, datamodel.NDArray)
                    self.adapters[name] = self.ConstantResolver(datamodel.ndarray([source]))
            elif isinstance(source, datamodel.NDArray):
                if issubclass(sink, datamodel.NDArray):
                    # TODO: shape checking
                    # Implicit broadcast may not be what is intended
                    self.adapters[name] = self.ConstantResolver(source)
                else:
                    # Array source feeding scalar sinks: scatter one element per member.
                    if source.shape[0] != sink_terminal.ensemble_width:
                        raise exceptions.ValueError(
                            'Implicit broadcast could not match array source to ensemble sink')
                    self.adapters[name] = lambda member, source=source: source[member]
            elif hasattr(source, 'result'):
                # Handle data futures...
                # If the Future is part of an ensemble, result() will return a list.
                # Otherwise, it will return a single object.
                ensemble_width = source.description.width
                # TODO: subscribe to futures so results can be pushed.
                if ensemble_width == 1:
                    self.adapters[name] = lambda member, source=source: source.result()
                else:
                    self.adapters[name] = lambda member, source=source: source.result()[member]
            else:
                # Remaining sources are expected to be ensemble data sources.
                assert isinstance(source, EnsembleDataSource)
                self.adapters[name] = lambda member, source=source: source.node(member)
def __str__(self):
    """Summarize the edge's source collection and sink terminal for debugging."""
    return '<DataEdge: source_collection={}, sink_terminal={}>'.format(self.source_collection, self.sink_terminal)
def reset(self):
    """Reset the bound data sources (e.g. to allow the edge to be re-resolved)."""
    self.source_collection.reset()
def resolve(self, key: str, member: int):
    """Deliver local data for sink input *key* on behalf of ensemble member *member*.

    Dispatches through the adapter installed for this input when the edge was built.
    """
    adapter = self.adapters[key]
    return adapter(member=member)
def sink(self, node: int) -> dict:
    """Consume data for the specified sink terminal node.

    Run-time utility delivers data from the bound data source(s) for the
    specified terminal that was configured when the edge was created.

    Terminal node is identified by a member index number.

    Returns:
        A Python dictionary of the provided inputs as local data (not Future).
    """
    return {key: self.resolve(key, node) for key in self.sink_terminal.inputs}
1698 class ResourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
1699 """Provides data publication and subscription services.
1701 Owns the data published by the operation implementation or served to consumers.
1702 Mediates read and write access to the managed data streams.
1704 This ResourceManager implementation is defined in conjunction with a
1705 run-time definition of an Operation that wraps a Python callable (function).
1706 ResourceManager is instantiated with a reference to the callable.
1708 When the Operation is run, the resource manager prepares resources for the wrapped
1709 function. Inputs provided to the Operation factory are provided to the
1710 function as keyword arguments. The wrapped function publishes its output
1711 through the (additional) ``output`` key word argument. This argument is
1712 a short-lived resource, prepared by the ResourceManager, with writable
1713 attributes named in the call to function_wrapper().
1715 After the Operation has run and the outputs published, the data managed
1716 by the ResourceManager is marked "done."
1720 The data() method produces a read-only collection of outputs named for
1721 the Operation when the Operation's ``output`` attribute is accessed.
1723 publishing_resources() can be called once during the ResourceManager lifetime
1724 to provide the ``output`` object for the wrapped function. (Used by update_output().)
1726 update_output() brings the managed output data up-to-date with the input
1727 when the Operation results are needed. If the Operation has not run, an
1728 execution session is prepared with input and output arguments for the
1729 wrapped Python callable. Output is publishable only during this session.
1731 TODO: This functionality should evolve to be a facet of Context implementations.
1732 There should be no more than one ResourceManager instance per work graph
1733 node in a Context. This will soon be at odds with letting the ResourceManager
1734 be owned by an operation instance handle.
1735 TODO: The publisher and data objects can be more strongly defined through
1736 interaction between the Context and clients.
1740 The normative pattern for updating data is to execute a node in the work
1741 graph, passing Resources for an execution Session to an operation runner.
1742 The resources and runner are dependent on the implementation details of
1743 the operation and the execution context, so logical execution may look
1746 resource_builder = ResourcesBuilder()
1747 runner_builder = RunnerBuilder()
1748 input_resource_director = input_resource_factory.director(input)
1749 output_resource_director = publishing_resource_factory.director(output)
1750 input_resource_director(resource_builder, runner_builder)
1751 output_resource_director(resource_builder, runner_builder)
1752 resources = resource_builder.build()
1753 runner = runner_builder.build()
1756 Only the final line is intended to be literal. The preceding code, if it
1757 exists in entirety, may be spread across several code comments.
1759 TODO: Data should be pushed, not pulled.
1760 Early implementations executed operation code and extracted results directly.
1761 While we need to be able to "wait for" results and alert the data provider that
1762 we are ready for input, we want to defer execution management and data flow to
@contextmanager
def __publishing_context(self, ensemble_member=0) -> typing.Iterator[_PublishingDataProxyType]:
    """Get a context manager for resolving the data dependencies of this node.

    The returned object is a Python context manager (used to open a `with` block)
    to define the scope in which the operation's output can be published.
    'output' type resources can be published exactly once, and only while the
    publishing context is active. (See operation.function_wrapper())

    Used internally to implement ResourceManager.publishing_resources()

    Responsibilities of the context manager are to:
        * (TODO) Make sure dependencies are resolved.
        * Make sure outputs are marked 'done' when leaving the context.
    """
    # TODO:
    # if self._data.done():
    #     raise exceptions.ProtocolError('Resources have already been published.')

    # I don't think we want the OperationDetails to need to know about ensemble data,
    # (though they should probably be allowed to), so we may need a separate interface
    # for the resource manager with built-in scope-limiting to a single ensemble member.
    # Right now, one Operation handle owns one ResourceManager (which takes care of
    # the ensemble details), which owns one OperationDetails (which has no ensemble knowledge).
    # It is the responsibility of the calling code to make sure the PublishingDataProxy
    # gets used correctly.

    # ref: https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager
    if self._done[ensemble_member]:
        raise exceptions.ProtocolError('Attempting to publish {}[{}] more than once.'.format(self.operation_id, ensemble_member))

    try:
        resource = self.__publishing_data_proxy(instance=weakref.proxy(self),
                                                client_id=ensemble_member)
    except Exception as e:
        logger.debug('Publishing context could not be created due to {}'.format(e))
        raise e

    yield resource

    # Note: The remaining lines are skipped if an exception occurs in the `with` block
    # for the contextmanager suite, which effectively raises at the line after 'yield'.
    logger.debug('Published output for {} member {}'.format(self.operation_id, ensemble_member))
    self._done[ensemble_member] = True
def __init__(self, *,
             source: DataEdge,
             operation_id,
             output_description: OutputCollectionDescription,
             output_data_proxy: typing.Type[_OutputDataProxyType],
             publishing_data_proxy: typing.Type[_PublishingDataProxyType],
             resource_factory,
             runner_director,
             output_context: 'Context'):
    """Initialize a resource manager for the inputs and outputs of an operation."""
    # Note: This implementation assumes there is one ResourceManager instance per data source,
    # so we only stash the inputs and dependency information for a single set of resources.
    # TODO: validate input_fingerprint as its interface becomes clear.
    self._input_edge = source
    self.ensemble_width = self._input_edge.sink_terminal.ensemble_width
    self.operation_id = operation_id

    if isinstance(output_context, Context):
        self._output_context = output_context
    else:
        message = 'Provide an instance of gmxapi.operation.Context for output_context'
        raise exceptions.UsageError(message)
    assert self._output_context is not None

    self._output_data_proxy = output_data_proxy
    assert self._output_data_proxy is not None
    assert callable(self._output_data_proxy)

    self._output_description = output_description
    assert self._output_description is not None

    self.__publishing_data_proxy = publishing_data_proxy
    assert self.__publishing_data_proxy is not None
    assert callable(self.__publishing_data_proxy)

    self._runner_director = runner_director
    assert self._runner_director is not None
    self._resource_factory = resource_factory
    assert self._resource_factory is not None

    self._data = _make_datastore(output_description=self._output_description,
                                 ensemble_width=self.ensemble_width)

    # We store a reference to the publishing context manager implementation
    # in a data structure that can only produce one per Python interpreter
    # (using list.pop()).
    # TODO: reimplement as a data descriptor
    #  so that PublishingDataProxy does not need a bound circular reference.
    self.__publishing_resources = [self.__publishing_context]

    self._done = [False] * self.ensemble_width
    self.__operation_entrance_counter = 0
def width(self) -> int:
    """Return the ensemble width (number of ensemble members) of the managed data."""
    return self.ensemble_width
def reset(self):
    """Return the resource manager to its initial state so the operation can run again.

    Clears the entrance counter and per-member completion flags, re-arms the
    single-use publishing resource, and resets the managed data and input edge.
    """
    self.__operation_entrance_counter = 0
    self._done = [False] * self.ensemble_width
    self.__publishing_resources = [self.__publishing_context]
    for data in self._data.values():
        data.reset()
    self._input_edge.reset()
    assert self.__operation_entrance_counter == 0
def done(self, member=None):
    """Report completion: for all ensemble members, or for one when *member* is given."""
    if member is None:
        return all(self._done)
    return self._done[member]
def set_result(self, name, value, member: int):
    """Store *value* as the named output for one ensemble member.

    Rejects values that are (or contain) Futures: published output must be
    real, local data.
    """
    if not isinstance(value, (str, bytes)):
        try:
            for item in value:
                # In this specification, it is antithetical to publish Futures.
                if hasattr(item, 'result'):
                    raise exceptions.ApiError('Operation produced Future instead of real output.')
        except TypeError:
            # Ignore when `item` is not iterable.
            pass
    self._data[name].set(value=value, member=member)
def is_done(self, name):
    """Report whether the named output has been fully published."""
    return self._data[name].done
def get(self, name: str):
    """Return the managed OutputData for *name*.

    Raises exceptions.ProtocolError if requested data is not local yet.
    Raises exceptions.ValueError if data is requested for an unknown name.
    """
    if name not in self._data:
        raise exceptions.ValueError('Request for unknown data.')
    if not self.is_done(name):
        raise exceptions.ProtocolError('Data not ready.')
    assert isinstance(self._data[name], OutputData)
    return self._data[name]
# TODO: Normalize. This is the no-argument, no-return callable member of an
#  operation handle that dispatches to another Context (the operation implementation).
# TODO: Allow update of a single ensemble member. As written, this always updates all ensemble members. That can
#  be the default behavior, but we don't want to require non-local updates in all cases.
def update_output(self):
    """Bring the output of the bound operation up to date.

    Execute the bound operation once if and only if it has not
    yet been run in the lifetime of this resource manager.

    Used internally to implement Futures for the local operation
    associated with this resource manager.

    Raises:
        exceptions.ApiError if operation runner fails to publish output.

    TODO: More comprehensive error handling for operations that fail to execute.

    TODO: We need a different implementation for an operation whose output
     is served by multiple resource managers. E.g. an operation whose output
     is available across the ensemble, but which should only be executed on
     a single ensemble member.
    """
    # This code is not intended to be reentrant. We make a modest attempt to
    # catch unexpected reentrance, but this is not (yet) intended to be a thread-safe
    # resource manager implementation.
    # TODO: Handle checking just the ensemble members this resource manager is responsible for.
    # TODO: Replace with a managed observer pattern. Update once when input is available in the Context.
    if not self.done():
        # Note: This check could also be encapsulated in a run_once decorator that
        # could even set a data descriptor to change behavior.
        self.__operation_entrance_counter += 1
        if self.__operation_entrance_counter > 1:
            raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')

        # Note! This is a detail of the ResourceManager in a SerialContext
        # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
        #  operation's library/implementation context.
        publishing_resources = self.publishing_resources()
        for i in range(self.ensemble_width):
            # TODO: rewrite the following expression as a call to a resource factory.
            # TODO: Consider whether the resource_factory behavior should be normalized
            #  to always use `with` blocks to indicate the lifetime of a resource handle.
            #  That implies that an operation handle can expire, but the operation handle
            #  could be "yield"ed from within the `with` block to keep the resource scope
            #  alive until the resulting generator is exhausted. Not sure what that looks
            #  like or what the use case would be.
            with self.local_input(i) as input:
                # Note: Resources are marked "done" by the publishing system
                # before the following context manager finishes exiting.
                with publishing_resources(ensemble_member=i) as output:
                    # self._runner(*input.args, output=output, **input.kwargs)
                    #
                    # Here we can make _runner a thing that accepts session resources, and
                    # is created by specializable builders. Separate out the expression of
                    # inputs, e.g.:
                    #
                    # resource_builder = OperationDetails.ResourcesBuilder(context)
                    # runner_builder = OperationDetails.RunnerBuilder(context)
                    # input_resource_director = self._input_resource_factory.director(input)
                    # output_resource_director = self._publishing_resource_factory.director(output)
                    # input_resource_director(resource_builder, runner_builder)
                    # output_resource_director(resource_builder, runner_builder)
                    # resources = resource_builder.build()
                    # runner = runner_builder.build()
                    #
                    # This resource factory signature might need to be inverted or broken up
                    # into a builder for consistency. I.e.
                    # option 1: Make the input and output resources with separate factories and add_resource on
                    # the runner builder.
                    # option 2: Pass resource_builder to input_director and then output_director.
                    error_message = 'Got {} while executing {} for operation {}.'
                    try:
                        resources = self._resource_factory(input=input, output=output)
                    except exceptions.TypeError as e:
                        message = error_message.format(e, self._resource_factory, self.operation_id)
                        raise exceptions.ApiError(message) from e

                    runner = self._runner_director(resources)
                    try:
                        runner()
                    except Exception as e:
                        message = error_message.format(e, runner, self.operation_id)
                        raise exceptions.ApiError(message) from e
        if not self.done():
            message = 'update_output implementation failed to update all outputs for {}.'
            message = message.format(self.operation_id)
            raise exceptions.ApiError(message)
def future(self, name: str, description: ResultDescription):
    """Retrieve a Future for a named output.

    Provide a description of the expected result to check for compatibility or
    implicit topological conversion.

    TODO: (FR5+) Normalize this part of the interface between operation definitions and
     resource managers.
    """
    if not isinstance(name, str) or name not in self._data:
        raise exceptions.ValueError('"name" argument must name an output.')
    assert description is not None
    requested_dtype = description.dtype
    available_dtype = self._data[name]._description.dtype
    if requested_dtype != available_dtype:
        # TODO: framework to check for implicit conversions
        message = 'Requested Future of type {} is not compatible with available type {}.'
        message = message.format(requested_dtype, available_dtype)
        raise exceptions.ApiError(message)
    return Future(self, name, description)
def data(self) -> _OutputDataProxyType:
    """Get an adapter to the output resources to access results."""
    # The proxy type was provided at construction; it wraps this manager.
    return self._output_data_proxy(self)
@contextmanager
def local_input(self, member: int = None):
    """In an API session, get a handle to fully resolved locally available input data.

    Execution dependencies are resolved on creation of the context manager. Input data
    becomes available in the ``as`` object when entering the context manager, which
    becomes invalid after exiting the context manager. Resources allocated to hold the
    input data may be released when exiting the context manager.

    It is left as an implementation detail whether the context manager is reusable and
    under what circumstances one may be obtained.
    """
    # Resolve the input edge for the requested ensemble member.
    kwargs = self._input_edge.sink(node=member)
    assert 'input' not in kwargs

    # Check that we have real data
    for key, value in kwargs.items():
        assert not hasattr(value, 'result')
        assert not hasattr(value, 'run')
        # Inspect the elements of containers as well as the container itself.
        value_list = []
        if isinstance(value, list):
            value_list = value
        if isinstance(value, datamodel.NDArray):
            value_list = value._values
        if isinstance(value, collections.abc.Mapping):
            value_list = value.values()
        assert not isinstance(value_list, Future)
        assert not hasattr(value_list, 'result')
        assert not hasattr(value_list, 'run')
        for item in value_list:
            assert not hasattr(item, 'result')

    input_pack = InputPack(kwargs=kwargs)

    # Prepare input data structure
    # Note: we use 'yield' instead of 'return' for the protocol expected by
    # the @contextmanager decorator
    yield input_pack
def publishing_resources(self):
    """Get a context manager for resolving the data dependencies of this node.

    Use the returned object as a Python context manager.
    'output' type resources can be published exactly once, and only while the
    publishing context is active.

    Write access to publishing resources can be granted exactly once during the
    resource manager lifetime and conveys exclusive access.
    """
    # list.pop() guarantees the single-use handout: a second call raises IndexError.
    return self.__publishing_resources.pop()
class PyFunctionRunnerResources(collections.UserDict):
    """Runtime resources for Python functions.

    Produced by a ResourceDirector for a particular Operation.
    """

    def output(self):
        """Return the publishing data proxy, or None if no 'output' resource was provided."""
        if 'output' in self:
            return self['output']
        return None

    def input(self):
        """Return the keyword arguments for the wrapped function (everything except 'output')."""
        return {key: value for key, value in self.items() if key != 'output'}
class PyFunctionRunner(abc.ABC):
    """Base class for adapters that execute a wrapped Python function.

    Holds the wrapped callable and a description of the output it produces.
    Concrete subclasses define how output is captured in __call__.
    """

    def __init__(self, *, function: typing.Callable, output_description: OutputCollectionDescription):
        assert callable(function)
        self.function = function
        self.output_description = output_description

    # NOTE(review): subclasses override __call__ with their own capture strategy;
    # this default forwards the publishing proxy as the *output* keyword argument.
    def __call__(self, resources: PyFunctionRunnerResources):
        self.function(output=resources.output(), **resources.input())
class CapturedOutputRunner(PyFunctionRunner):
    """Function runner that captures return value as output.data"""

    def __call__(self, resources: PyFunctionRunnerResources):
        # The wrapped function's return value is published as the 'data' attribute
        # of the 'output' resource.
        resources['output'].data = self.function(**resources.input())
class OutputParameterRunner(PyFunctionRunner):
    """Function runner that uses output parameter to let function publish output."""

    def __call__(self, resources: PyFunctionRunnerResources):
        # All resources (including 'output') are forwarded as keyword arguments,
        # so the wrapped function publishes through its own *output* parameter.
        self.function(**resources)
def wrapped_function_runner(function, output_description: OutputCollectionDescription = None) -> PyFunctionRunner:
    """Get an adapter for a function to be wrapped.

    If the function does not accept a publishing data proxy as an `output`
    key word argument, the returned object has a `capture_output` attribute that
    must be re-assigned by the calling code before calling the runner. `capture_output`
    must be assigned to be a callable that will receive the output of the wrapped
    function.

    Returns:
        Callable with a signature `__call__(*args, **kwargs)` and no return value

    OperationDetails.resource_director assigns the `capture_output` member of the returned object.
    """
    assert callable(function)
    signature = inspect.signature(function)

    # Implementation note: this function dispatches an implementation with the
    # logic below. A better factoring would be a "chain of responsibility" in
    # which the concrete Runners would be tried in sequence and determine internally
    # whether to create a runner, raise an error, or defer.

    # Determine output details for proper dispatching.
    # First check for signature with output parameter.
    # TODO FR4: standardize typing
    if 'output' in signature.parameters:
        if not isinstance(output_description, OutputCollectionDescription):
            if not isinstance(output_description, collections.abc.Mapping):
                raise exceptions.UsageError(
                    'Function passes output through call argument, but output is not described.')
            return OutputParameterRunner(
                function=function,
                output_description=OutputCollectionDescription(**output_description))
        return OutputParameterRunner(function=function,
                                     output_description=output_description)

    # Next try output_description parameter or function return annotation.
    if isinstance(output_description, OutputCollectionDescription):
        return_type = output_description['data'].gmxapi_datatype
    elif output_description is not None:
        # output_description should be None for inferred output or
        # a singular mapping of the key 'data' to a gmxapi type.
        if not isinstance(output_description, collections.abc.Mapping) \
                or set(output_description.keys()) != {'data'}:
            raise exceptions.ApiError(
                'invalid output description for wrapped function: {}'.format(output_description))
        if signature.return_annotation != signature.empty:
            if signature.return_annotation != output_description['data']:
                raise exceptions.ApiError(
                    'Wrapped function with return-value-capture provided with non-matching output description.')
        return_type = output_description['data']
    else:
        # Use return type inferred from function signature.
        return_type = signature.return_annotation
    if return_type == signature.empty or return_type is None:
        raise exceptions.ApiError('No return annotation or output_description for {}'.format(function))
    return CapturedOutputRunner(function=function,
                                output_description=OutputCollectionDescription(data=return_type))
# TODO: Refactor in terms of reference to a node in a Context.
#  ResourceManager is an implementation detail of how the Context
#  manages its operation nodes.
class OperationHandle(AbstractOperation[_OutputDataProxyType]):
    """Generic Operation handle for dynamically defined operations.

    Define a gmxapi Operation for the functionality being wrapped by the enclosing code.

    An Operation type definition encapsulates description of allowed inputs
    of an Operation. An Operation instance represents a node in a work graph
    with uniquely fingerprinted inputs and well-defined output. The implementation
    of the operation is a collaboration with the resource managers resolving
    data flow for output Futures, which may depend on the execution context.
    """

    def __init__(self, resource_manager: SourceResource[_OutputDataProxyType, typing.Any]):
        """Initialization defines the unique input requirements of a work graph node.

        Initialization parameters map to the parameters of the wrapped function with
        addition(s) to support gmxapi data flow and deferred execution.

        If provided, an ``input`` keyword argument is interpreted as a parameter pack
        of base input. Inputs also present as standalone keyword arguments override
        values in ``input``.

        Inputs that are handles to gmxapi operations or outputs induce data flow
        dependencies that the framework promises to satisfy before the Operation
        executes and produces output.
        """
        # TODO: When the resource manager can be kept alive by an enclosing or
        #  module-level Context, convert to a weakref.
        self.__resource_manager = resource_manager
        # The unique identifier for the operation node allows the Context implementation
        # to manage the state of the handle. Reproducibility of node_uid is TBD, but
        # it must be unique in a Context where it references a different operation node.
        self.node_uid = None

    @property
    def output(self) -> _OutputDataProxyType:
        # TODO: We can configure `output` as a data descriptor
        #  instead of a property so that we can get more information
        #  from the class attribute before creating an instance of OperationDetails.OutputDataProxy.
        # The C++ equivalence would probably be a templated free function for examining traits.
        return self.__resource_manager.data()

    def run(self):
        """Make a single attempt to resolve data flow conditions.

        This is a public method, but should not need to be called by users. Instead,
        just use the `output` data proxy for result handles, or force data flow to be
        resolved with the `result` methods on the result handles.

        `run()` may be useful to try to trigger computation (such as for remotely
        dispatched work) without retrieving results locally right away.

        `run()` is also useful internally as a facade to the Context implementation details
        that allow `result()` calls to ask for their data dependencies to be resolved.
        Typically, `run()` will cause results to be published to subscribing operations as
        they are calculated, so the `run()` hook allows execution dependency to be slightly
        decoupled from data dependency, as well as to allow some optimizations or to allow
        data flow to be resolved opportunistically. `result()` should not call `run()`
        directly, but should cause the resource manager / Context implementation to process
        the data flow graph.

        In one conception, `run()` can have a return value that supports control flow
        by itself being either runnable or not. The idea would be to support
        fault tolerance, implementations that require multiple iterations / triggers
        to complete, or looping operations.
        """
        # Note: `run()` is a synonym for `resolve` or `update` or whatever we choose
        #  to generically describe the request to bring a node up-to-date: i.e. the
        #  non-returning callable on the object produced by a director.
        self.__resource_manager.update_output()
class OperationPlaceholder(AbstractOperation):
    """Placeholder for Operation handle during subgraph definition."""

    def __init__(self, subgraph_resource_manager):
        # NOTE(review): no initialization state is visible for the placeholder;
        # the provided subgraph resource manager is not retained here.
        ...

    def run(self):
        # A placeholder cannot be executed: it only exists while a subgraph is
        # being defined.
        raise exceptions.UsageError('This placeholder operation handle is not in an executable context.')

    @property
    def output(self):
        """Allow subgraph components to be connected without instantiating actual operations."""
        if not isinstance(current_context(), SubgraphContext):
            raise exceptions.UsageError('Invalid access to subgraph internals.')
# Type variable for operation reference handles, bounded by the abstract
# OperationReference interface (used to annotate handle factories and builders).
_HandleType = typing.TypeVar('_HandleType', bound=gmx.abc.OperationReference)
class NodeBuilder(gmx.abc.NodeBuilder):
    """Add an operation node to be managed by a Context.

    The NodeBuilder interface implies minimal internal logic, and instead
    specifies the set of information that must or may be provided to construct
    a node.
    """

    def __init__(self,
                 context: 'Context',
                 operation,
                 label: typing.Optional[str] = None):
        """Initialize the base NodeBuilder for gmxapi.operation module Nodes.

        TODO:
            Convert the *operation* argument to be the registrant in the Operation registry.
            Requires confirmation of conforming behavior for dynamically defined operations.
        """
        self.context = context
        self.label = label
        try:
            key = _make_registry_key(operation)
        except Exception as e:
            error = 'Could not create an operation registry key from {}'.format(operation)
            raise exceptions.ValueError(error) from e
        # TODO: sensibly handle dynamically defined operations.
        if key not in _operation_registry and not issubclass(operation, OperationDetailsBase):
            error = '{} must be initialized with a registered operation. Got {}.'
            raise exceptions.ValueError(error.format(__class__.__qualname__, operation))
        self.sources = DataSourceCollection()
        # Builder details that must be supplied before build() may succeed.
        self._input_description = None
        self._resource_factory = None
        self._runner_director = None
        self._handle = None
        self._output_factory = None
        # Default implementation; may be overridden with set_resource_manager().
        self._resource_manager = ResourceManager

    def set_input_description(self, input_description: InputDescription):
        self._input_description = input_description

    def set_resource_factory(self, factory):
        self._resource_factory = factory

    def set_runner_director(self, factory):
        self._runner_director = factory

    def set_handle(self, factory):
        self._handle = factory

    def set_output_factory(self, factory: 'OutputFactory'):
        self._output_factory = factory

    def set_resource_manager(self, implementation: typing.Type[ResourceManager]):
        """Allow overriding the default ResourceManager implementation.

        This is a workaround until we figure out what parts of the ResourceManager
        could be composed, registered separately with the Context, or be customized
        through other dispatching. One likely part of the solution is for clients
        of the NodeBuilder to assert requirements of the Context.
        """
        assert issubclass(implementation, ResourceManager)
        self._resource_manager = implementation

    # TODO: Let the Context use the handle factory to produce a dynamically typed handle,
    #  and figure out the right way to annotate the return value.
    def build(self):
        """Create node and return a handle of the appropriate type."""

        # Check for the ability to instantiate operations.
        missing_details = list()
        for builder_resource in ['input_description',
                                 'resource_factory',
                                 'runner_director',
                                 'handle',
                                 'output_factory']:
            detail = '_' + builder_resource
            if getattr(self, detail, None) is None:
                missing_details.append(builder_resource)
        if len(missing_details) > 0:
            raise exceptions.UsageError(
                'Missing details needed for operation node: {}'.format(
                    ', '.join(missing_details)
                ))

        assert hasattr(self._input_description, 'signature')
        input_sink = SinkTerminal(self._input_description.signature())
        input_sink.update(self.sources)
        # Bug fix: previously formatted the SinkTerminal *class* into the log
        # message instead of the configured instance.
        logger.debug('SinkTerminal configured: {}'.format(input_sink))
        edge = DataEdge(self.sources, input_sink)
        logger.debug('Created data edge {} with Sink {}'.format(edge, edge.sink_terminal))
        # TODO: Fingerprinting: Each operation instance has unique output based on the unique input.
        # input_data_fingerprint = edge.fingerprint()

        # Set up output proxy.
        assert hasattr(self._input_description, 'make_uid')
        uid = self._input_description.make_uid(edge)
        # TODO: ResourceManager should fetch the relevant factories from the Context
        #  instead of getting an OperationDetails instance.
        output_data_proxy = self._output_factory.output_proxy()
        output_description = self._output_factory.output_description()
        publishing_data_proxy = self._output_factory.publishing_data_proxy()
        manager = self._resource_manager(output_context=self.context,
                                         source=edge,
                                         operation_id=uid,
                                         output_data_proxy=output_data_proxy,
                                         output_description=output_description,
                                         publishing_data_proxy=publishing_data_proxy,
                                         resource_factory=self._resource_factory,
                                         runner_director=self._runner_director)
        self.context.work_graph[uid] = manager
        # TODO: Replace with a call to Node.handle()
        handle = self._handle(self.context.work_graph[uid])
        handle.node_uid = uid
        return handle

    def add_input(self, name, source):
        # TODO: We can move some input checking here as the data model matures.
        self.sources[name] = source
class InputPack(object):
    """Input container for data sources provided to resource factories.

    When gmxapi.operation Contexts provide run time inputs to operations,
    instances of this class are provided to the operation's registered
    resource factory.

    Attributes:
        kwargs (dict): collection of named data sources.
    """

    def __init__(self, kwargs: typing.Mapping[str, SourceTypeVar]):
        # Simple value object: just hold the named data sources.
        self.kwargs = kwargs
class Context(gmx.abc.Context):
    """Owner and manager of gmxapi data and operations.

    All gmxapi data and operations are owned by a Context instance. The Context
    manages the details of how work is run and how data is managed.

    Context implementations are not required to inherit from gmxapi.context.Context,
    but this class definition serves to specify the current Context API.

    If subclassing is used to implement new Contexts, be sure to initialize the
    base class when providing a new __init__
    """
2430 def node(self, node_id) -> Node:
2431 if node_id in self.labels:
2432 return self.labels[node_id]
2433 elif node_id in self.work_graph:
2434 return self.work_graph[node_id]
2436 raise exceptions.ValueError('Could not find a node identified by {}'.format(node_id))
        # NOTE(review): the enclosing ``def __init__(self):`` header is elided
        # from this excerpt. These statements initialize the Context registries.
        # Registered operation implementations, keyed by name — TODO confirm keying.
        self.operations = dict()
        # User-provided labels mapping to nodes (claimed by node_builder()).
        self.labels = dict()
        # Node UID -> resource manager; insertion order preserved.
        self.work_graph = collections.OrderedDict()
    # NOTE(review): presumably decorated ``@abc.abstractmethod`` in the full
    # source (the decorator line is elided from this excerpt) — confirm.
    def node_builder(self, *, operation,
                     label: typing.Optional[str] = None) -> NodeBuilder:
        """Get a builder for a new work graph node.

        Nodes are elements of computational work, with resources and execution
        managed by the Context. The Context handles parallelism resources, data
        placement, work scheduling, and data flow / execution dependencies.

        This method is used by Operation director code and helper functions to
        add work to the graph.

        Arguments:
            operation: a registered gmxapi operation
            label: optional user-provided identifier to provide human-readable node locators.
        """

    # TODO: *node()* accessor.
    # @abc.abstractmethod
    # def node(self, node_identifier) -> AbstractOperation:
# Concrete builder returned by ModuleContext.node_builder(); behavior is
# inherited unchanged from NodeBuilder.
class ModuleNodeBuilder(NodeBuilder):
    """Builder for work nodes in gmxapi.operation.ModuleContext."""
class ModuleContext(Context):
    """Context implementation for the gmxapi.operation module."""

    def node_builder(self, operation, label=None) -> NodeBuilder:
        """Get a builder for a new work node to add an operation in this context."""
        if label is not None:
            if label in self.labels:
                raise exceptions.ValueError('Label {} is already in use.'.format(label))
            # Claim the label now; the builder fills in the node when it is done.
            self.labels[label] = None
        return ModuleNodeBuilder(context=weakref.proxy(self), operation=operation, label=label)
# Module-level stack of Context instances; the top of the stack is the active
# Context. Manipulated via current_context() / push_context() / pop_context().
__current_context = [ModuleContext()]
def current_context() -> Context:
    """Get a reference to the currently active Context.

    The imported gmxapi.context module maintains some state for the convenience
    of the scripting environment. Internally, all gmxapi activity occurs under
    the management of an API Context, explicitly or implicitly. Some actions or
    code constructs will generate separate contexts or sub-contexts. This utility
    command retrieves a reference to the currently active Context.
    """
    return __current_context[-1]
def push_context(context) -> Context:
    """Enter a sub-context by pushing a context to the global context stack.

    Returns:
        The newly activated Context (i.e. *context*).
    """
    __current_context.append(context)
    return current_context()
def pop_context() -> Context:
    """Exit the current Context by popping it from the stack.

    Returns:
        The Context that was just deactivated.
    """
    return __current_context.pop()
class OutputFactory(object):
    """Encapsulate the details of Operation output implementation in the gmxapi.operation Context.

    Currently, OutputFactory objects are containers that compose functionality
    with which to implement the required internal interface.
    """

    def __init__(self, *,
                 output_proxy: typing.Callable[[SourceResource], _OutputDataProxyType],
                 output_description: OutputCollectionDescription,
                 publishing_data_proxy: typing.Callable[[SourceResource, ClientID], _PublishingDataProxyType]):
        """Package the output details for an operation.

        Arguments:
            output_proxy: factory to produce the *output* facet of an operation instance (node)
            output_description: fully formed output description
            publishing_data_proxy: factory to produce the run time output publishing resources

        Raises:
            exceptions.ValueError: if either factory is not callable, or if
                *output_description* is not an OutputCollectionDescription.
        """
        if not callable(output_proxy):
            raise exceptions.ValueError('output_proxy argument must be a callable.')
        if not callable(publishing_data_proxy):
            raise exceptions.ValueError('publishing_data_proxy argument must be a callable.')
        if not isinstance(output_description, OutputCollectionDescription):
            raise exceptions.ValueError('output_description must be an instance of '
                                        'gmxapi.operation.OutputCollectionDescription')
        self._output_proxy = output_proxy
        self._output_description = output_description
        self._publishing_data_proxy = publishing_data_proxy

    def output_proxy(self) -> typing.Callable[[SourceResource], _OutputDataProxyType]:
        # Fix: annotation made consistent with the constructor parameter. The
        # stored callable takes only the SourceResource; the previous annotation
        # declared a spurious leading _OutputDataProxyType argument.
        return self._output_proxy

    def output_description(self) -> OutputCollectionDescription:
        return self._output_description

    def publishing_data_proxy(self) -> typing.Callable[[SourceResource, ClientID],
                                                       _PublishingDataProxyType]:
        return self._publishing_data_proxy
# TODO: Refactor in terms of gmx.abc.OperationDirector[_Op, gmx.operation.Context]
# Normalizing this OperationDirector may require other updates to the function_wrapper facilities.
class OperationDirector(object):
    """Direct the construction of an operation node in the gmxapi.operation module Context.

    Collaboration: used by OperationDetails.operation_director, which
    will likely dispatch to different implementations depending on
    requirements of work or context.
    """

    # NOTE(review): most of the __init__ signature (and the assignments of
    # *args* and *label*, which __call__ reads as self.args / self.label) are
    # elided from this excerpt; the surviving fragments are kept as-is.
                 operation_details: typing.Type[OperationDetailsBase],
        # Store the OperationDetails *class*; it is instantiated in __call__.
        self.operation_details = operation_details
        # Hold the Context weakly to avoid a reference cycle; presumably the
        # caller guarantees the Context outlives this director — TODO confirm.
        self.context = weakref.proxy(context)
        self.kwargs = kwargs

    def __call__(self) -> AbstractOperation:
        # Configure a node builder with the operation's factories and metadata.
        builder = self.context.node_builder(operation=self.operation_details, label=self.label)
        builder.set_resource_factory(self.operation_details.resource_director)
        builder.set_input_description(self.operation_details)
        builder.set_handle(OperationHandle)
        # Bind the provided call arguments against the operation's input signature.
        operation_details = self.operation_details()
        node_input_factory = operation_details.signature().bind
        data_source_collection = node_input_factory(*self.args, **self.kwargs)
        for name, source in data_source_collection.items():
            builder.add_input(name, source)

        def runner_director(resources):
            # NOTE(review): one line of this closure is elided in the excerpt.
            operation_details(resources)

        builder.set_runner_director(runner_director)
        # Note that the call-backs held by OutputFactory cannot be annotated with
        # key word arguments under PEP 484, producing a weak warning in some cases.
        # We can consider more in the future how to balance call-back simplicity,
        # type annotation, and key word explicitness in helper functions like these.
        output_factory = OutputFactory(output_description=operation_details.output_description(),
                                       output_proxy=operation_details.output_data_proxy,
                                       publishing_data_proxy=operation_details.publishing_data_proxy)
        builder.set_output_factory(output_factory)
        handle = builder.build()
        # NOTE(review): the method's return (presumably ``return handle``) is
        # elided from this excerpt.
def _make_datastore(output_description: OutputCollectionDescription, ensemble_width: int):
    """Create the data store for an operation with the described output.

    Create a container to hold the resources for an operation node.
    Used internally by the resource manager when setting up the node.
    Evolution of the C++ framework for creating the Operation SessionResources
    object will inform the future of this and the resource_director method, but
    this data store is how the Context manages output data sources for resources.

    Arguments:
        output_description: mapping of output names to Python types.
        ensemble_width: number of ensemble members; becomes the *width* of each
            ResultDescription.
    """
    datastore = collections.OrderedDict()
    for name, dtype in output_description.items():
        assert isinstance(dtype, type)
        result_description = ResultDescription(dtype=dtype, width=ensemble_width)
        datastore[name] = OutputData(name=name, description=result_description)
    # NOTE(review): the function's return statement (presumably
    # ``return datastore``) is elided from this excerpt.
# TODO: For outputs, distinguish between "results" and "events".
# Both are published to the resource manager in the same way, but the relationship
# with subscribers is potentially different.
def function_wrapper(output: dict = None):
    # Suppress warnings in the example code.
    # noinspection PyUnresolvedReferences
    """Generate a decorator for wrapped functions with signature manipulation.

    New function accepts the same arguments, with additional arguments required by
    the API machinery.

    The new function returns an object with an ``output`` attribute containing the named outputs.

    Example:

        >>> @function_wrapper(output={'spam': str, 'foo': str})
        ... def myfunc(parameter: str = None, output=None):
        ...    output.spam = parameter
        ...    output.foo = parameter + ' ' + parameter
        ...
        >>> operation1 = myfunc(parameter='spam spam')
        >>> assert operation1.output.spam.result() == 'spam spam'
        >>> assert operation1.output.foo.result() == 'spam spam spam spam'

    Arguments:
        output (dict): output names and types

    If ``output`` is provided to the wrapper, a data structure will be passed to
    the wrapped functions with the named attributes so that the function can easily
    publish multiple named results. Otherwise, the ``output`` of the generated operation
    will just capture the return value of the wrapped function.

    .. todo:: gmxapi typing stub file(s).
              The way this wrapper uses parameter annotations is not completely
              compatible with static type checking (PEP 484). If we decide to
              keep the convenience functionality by which operation details are
              inferred from parameter annotations, we should provide a separate
              stub file (.pyi) to support static type checking of the API.
    """
    if output is not None and not isinstance(output, collections.abc.Mapping):
        raise exceptions.TypeError('If provided, `output` argument must be a mapping of data names to types.')

    # TODO: (FR5+) gmxapi operations need to allow a context-dependent way to generate an implementation with input.
    # This function wrapper reproduces the wrapped function's kwargs, but does not allow chaining a
    # dynamic `input` kwarg and does not dispatch according to a `context` kwarg. We should allow
    # a default implementation and registration of alternate implementations. We don't have to do that
    # with functools.singledispatch, but we could, if we add yet another layer to generate a wrapper
    # that takes the context as the first argument. (`singledispatch` inspects the first argument rather
    # that a named argument)

    # Implementation note: The closure of the current function is used to
    # dynamically define several classes that support the operation to be
    # created by the returned decorator.

    def decorator(function) -> typing.Callable:
        # Explicitly capture `function` and `output` references.
        provided_output_map = output

        # Note: Allow operations to be defined entirely in template headers to facilitate
        # compile-time optimization of fused operations. Consider what distinction, if any,
        # exists between a fused operation and a more basic operation. Probably it amounts
        # to aspects related to interaction with the Context that get combined in a fused
        # operation, such as the resource director, builder, etc.
        class OperationDetails(OperationDetailsBase):
            # Warning: function.__qualname__ is not rigorous since function may be in a local scope.
            # TODO: Improve base identifier.
            # Suggest registering directly in the Context instead of in this local class definition.
            __basename = '.'.join((str(function.__module__), function.__qualname__))

            _input_signature_description = InputCollectionDescription.from_function(function)
            # TODO: Separate the class and instance logic for the runner.
            # Logically, the runner is a detail of a context-specific implementation class,
            # though the output is not generally fully knowable until an instance is initialized
            # for a certain input fingerprint.
            # Note: We are almost at a point where this class can be subsumed into two
            # possible return types for wrapped_function_runner, acting as an operation helper.
            _runner = wrapped_function_runner(function, provided_output_map)
            _output_description = _runner.output_description
            _output_data_proxy_type = define_output_data_proxy(_output_description)
            _publishing_data_proxy_type = define_publishing_data_proxy(_output_description)
            _SourceResource = SourceResource[_output_data_proxy_type, _publishing_data_proxy_type]

            # NOTE(review): decorator lines (presumably ``@classmethod`` on the
            # *cls*-taking methods) are elided throughout this class in the excerpt.
            def name(cls) -> str:
                return cls.__basename.split('.')[-1]

            def namespace(cls) -> str:
                # NOTE(review): str.rstrip strips a trailing *character set*,
                # not a suffix; this can over-strip when the module name ends in
                # characters drawn from '.' + cls.name(). Consider
                # str.removesuffix (Python 3.9+) — confirm against full source.
                return cls.__basename.rstrip('.' + cls.name())

            def director(cls, context: _Context):
                return cls.operation_director

            def signature(cls) -> InputCollectionDescription:
                """Mapping of named inputs and input type.

                Used to determine valid inputs before an Operation node is created.

                Overrides OperationDetailsBase.signature() to provide an
                implementation for the bound operation.
                """
                return cls._input_signature_description

            def output_description(self) -> OutputCollectionDescription:
                """Mapping of available outputs and types for an existing Operation node.

                Overrides OperationDetailsBase.output_description() to provide an
                implementation for the bound operation.
                """
                return self._output_description

            def publishing_data_proxy(self, *,
                                      instance: _SourceResource,
                                      # NOTE(review): the *client_id* parameter
                                      # line is elided here; the body uses it.
                                      ) -> _publishing_data_proxy_type:
                """Factory for Operation output publishing resources.

                Used internally when the operation is run with resources provided by instance.

                Overrides OperationDetailsBase.publishing_data_proxy() to provide an
                implementation for the bound operation.
                """
                assert isinstance(instance, ResourceManager)
                return self._publishing_data_proxy_type(instance=instance, client_id=client_id)

            def output_data_proxy(self, instance: _SourceResource) -> _output_data_proxy_type:
                assert isinstance(instance, ResourceManager)
                return self._output_data_proxy_type(instance=instance)

            def __call__(self, resources: PyFunctionRunnerResources):
                """Execute the operation with provided resources.

                Resources are prepared in an execution context with aid of resource_director()

                After the first call, output data has been published and is trivially
                available through the output_data_proxy()

                Overrides OperationDetailsBase.__call__().
                """
                self._runner(resources)

            def make_uid(cls, input):
                """The unique identity of an operation node tags the output with respect to the input.

                Combines information on the Operation details and the input to uniquely
                identify the Operation node.

                Arguments:
                    input : A (collection of) data source(s) that can provide Fingerprints.

                Used internally by the Context to manage ownership of data sources, to
                locate resources for nodes in work graphs, and to manage serialization,
                deserialization, and checkpointing of the work graph.

                The UID is a detail of the generic Operation that _should_ be independent
                of the Context details to allow the framework to manage when and where
                an operation is executed.

                Design notes on further refinement:
                    TODO: Operations should not single-handedly determine their own uniqueness
                    (but they should participate in the determination with the Context).

                    Context implementations should be allowed to optimize handling of
                    equivalent operations in different sessions or work graphs, but we do not
                    yet TODO: guarantee that UIDs are globally unique!

                    The UID should uniquely indicate an operation node based on that node's input.
                    We need input fingerprinting to identify equivalent nodes in a work graph
                    or distinguish nodes across work graphs.
                """
                uid = str(cls.__basename) + str(cls.__last_uid)
                # NOTE(review): the counter update and ``return uid`` (presumably
                # incrementing cls.__last_uid) are elided from this excerpt.

            def resource_director(cls, *, input=None,
                                  output: _publishing_data_proxy_type = None) -> PyFunctionRunnerResources:
                """a Director factory that helps build the Session Resources for the function.

                The Session launcher provides the director with all of the resources previously
                requested/negotiated/registered by the Operation. The director uses details of
                the operation to build the resources object required by the operation runner.

                For the Python Context, the protocol is for the Context to call the
                resource_director instance method, passing input and output containers.

                Raises:
                    exceptions.TypeError if provided resource type does not match input signature.
                """
                resources = PyFunctionRunnerResources()
                resources.update(input.kwargs)
                resources.update({'output': output})

                # TODO: Remove this hack when we can better handle Futures of Containers and Future slicing.
                for name in resources:
                    if isinstance(resources[name], (list, tuple)):
                        resources[name] = datamodel.ndarray(resources[name])

                # Check data compatibility
                for name, value in resources.items():
                    if name != 'output':
                        expected = cls.signature()[name]
                        # NOTE(review): the compatibility test and the rest of the
                        # error formatting are elided; only the failure branch head
                        # is visible below.
                            raise exceptions.TypeError(
                                'Expected {} but got {} for {} resource {}.'.format(expected,

        # TODO: (FR4) Update annotations with gmxapi data types. E.g. return -> Future.
        @functools.wraps(function)
        def helper(*args, context=None, **kwargs):
            # Description of the Operation input (and output) occurs in the
            # decorator closure. By the time this factory is (dynamically) defined,
            # the OperationDetails and ResourceManager are well defined, but not
            # yet instantiated.
            # Inspection of the offered input occurs when this factory is called,
            # and OperationDetails, ResourceManager, and Operation are instantiated.

            # This operation factory is specialized for the default package Context.
            # NOTE(review): the conditional on *context* is elided here; presumably
            # a None context falls back to the default and anything else raises.
                context = current_context()
                raise exceptions.ApiError('Non-default context handling not implemented.')

            # This calls a dispatching function that may not be able to reconcile the input
            # and Context capabilities. This is the place to handle various exceptions for
            # whatever reasons this reconciliation cannot occur.
            handle = OperationDetails.operation_director(*args, context=context, label=None, **kwargs)

            # TODO: NOW: The input fingerprint describes the provided input
            # as (a) ensemble input, (b) static, (c) future. By the time the
            # operation is instantiated, the topology of the node is known.
            # When compared to the InputCollectionDescription, the data compatibility
            # can be determined.
            # NOTE(review): the ``return handle`` expected here is elided from
            # this excerpt, as are the closing returns of decorator/function_wrapper.

        # to do: The factory itself needs to be able to register a factory with
        # the context that will be responsible for the Operation handle.
        # The factories need to be able to serve as dispatchers for themselves,
        # since an operation in one context may need to be reconstituted in a
        # different context.
        # The dispatching factory produces a director for a Context,
        # which will register a factory with the operation in that context.

        # The factory function has a DirectorFactory. Director instances talk to a NodeBuilder for a Context to
        # get handles to new operation nodes managed by the context. Part of that process includes registering
        # a DirectorFactory with the Context.
class GraphVariableDescriptor(object):
    """Data descriptor for a persistent subgraph variable.

    Lazily initializes the managed instance attribute from the class-level
    default, and only permits assignment while the owning instance is editable.
    """

    def __init__(self, name: str = None, dtype=None, default=None):
        # NOTE(review): the assignments of *name* and *dtype* to self are
        # elided from this excerpt; only *default* is visible.
        self.default = default

    # NOTE(review): presumably decorated ``@property`` in the full source
    # (the decorator line is elided) — confirm.
    def internal_name(self):
        # Name of the backing attribute on the managed instance.
        return '_' + self.name

    def __get__(self, instance, owner):
        if instance is None:
            # Access is through the class attribute of the owning class.
            # Allows the descriptor itself to be inspected or reconfigured after
            # class creation.
            # TODO: There is probably some usage checking that can be performed here.
            # NOTE(review): this branch's return (presumably ``return self``) is
            # elided from this excerpt.
        # NOTE(review): the ``try:`` opening the following lookup is elided.
            value = getattr(instance, self.internal_name)
        except AttributeError:
            value = self.default
            # Lazily initialize the instance value from the class default.
            if value is not None:
                # NOTE(review): another ``try:`` is elided before the setattr.
                    setattr(instance, self.internal_name, value)
                except Exception as e:
                    message = 'Could not assign default value to {} attribute of {}'.format(
                    raise exceptions.ApiError(message) from e
        # NOTE(review): the method's ``return value`` is elided from this excerpt.

    # Implementation note: If we have a version of the descriptor class with no `__set__` method,
    # it is a non-data descriptor that can be overridden by a data descriptor on an instance.
    # This could be one way to handle the polymorphism associated with state changes.
    def __set__(self, instance, value):
        if instance._editing:
            # Update the internal connections defining the subgraph.
            setattr(instance, self.internal_name, value)
        # NOTE(review): an ``else:`` is elided before the following raise.
            raise AttributeError('{} not assignable on {}'.format(self.name, instance))
class GraphMeta(type):
    """Meta-class for gmxapi data flow graphs and subgraphs.

    Used to implement ``subgraph`` as GraphMeta.__new__(...).
    Also allows subgraphs to be defined as Python class definitions by inheriting
    from Subgraph or by using the ``metaclass=GraphMeta`` hint in the class
    statement arguments.

    The Python class creation protocol allows Subgraphs to be defined in as follows.

    See the Subgraph class documentation for customization of instances through
    the Python context manager protocol.
    """
    _prepare_keywords = ('variables',)

    # TODO: Python 3.7.2 introduces typing.OrderedDict
    # In practice, we are using collections.OrderedDict, but we should use the generic
    # ABC from the typing module to avoid being overly restrictive with type hints.
    # NOTE(review): the try/except ImportError wrapping these two imports is
    # elided from this excerpt.
        from typing import OrderedDict
        from collections import OrderedDict

    # NOTE(review): presumably decorated ``@classmethod`` (elided) — confirm.
    def __prepare__(mcs, name, bases, variables: OrderedDict = None, **kwargs):
        """Prepare the class namespace.

        Keyword Args:
            variables: mapping of persistent graph variables to type / default value (optional)
        """
        # Python runs this before executing the class body of Subgraph or its
        # subclasses. This is our chance to handle key word arguments given in the
        # class declaration.

        if kwargs is not None:
            for keyword in kwargs:
                raise exceptions.UsageError('Unexpected key word argument: {}'.format(keyword))

        namespace = collections.OrderedDict()

        if variables is not None:
            if isinstance(variables, collections.abc.Mapping):
                for name, value in variables.items():
                    if isinstance(value, type):
                        # NOTE(review): part of this branch is elided; presumably
                        # the declared type is captured as *dtype* here.
                        if hasattr(value, 'default'):
                            default = value.default
                            # NOTE(review): the else-branches that derive
                            # *default*/*dtype* for non-type values are elided.
                            if hasattr(default, 'dtype'):
                                dtype = default.dtype
                            # NOTE(review): an ``else:`` is elided before this line.
                                dtype = type(default)
                    namespace[name] = GraphVariableDescriptor(name, default=default, dtype=dtype)
                    # Note: we are not currently using the hook used by `inspect`
                    # to annotate with the class that defined the attribute.
                    # namespace[name].__objclass__ = mcs
                    assert not hasattr(namespace[name], '__objclass__')
            # NOTE(review): an ``else:`` is elided before the following raise.
                raise exceptions.ValueError('"variables" must be a mapping of graph variables to types or defaults.')
        # NOTE(review): ``return namespace`` is elided from this excerpt.

    def __new__(cls, name, bases, namespace, **kwargs):
        # Strip our custom class keywords before delegating to type.__new__.
        # NOTE(review): the ``for key in kwargs:`` header is elided here.
            if key not in GraphMeta._prepare_keywords:
                raise exceptions.ApiError('Unexpected class creation keyword: {}'.format(key))
        return type.__new__(cls, name, bases, namespace)

    # TODO: This keyword-argument stripping is not necessary in more recent Python versions.
    # When Python minimum required version is increased, check if we can remove this.
    def __init__(cls, name, bases, namespace, **kwargs):
        # NOTE(review): the ``for key in kwargs:`` header is elided here.
            if key not in GraphMeta._prepare_keywords:
                raise exceptions.ApiError('Unexpected class initialization keyword: {}'.format(key))
        super().__init__(name, bases, namespace)
class SubgraphNodeBuilder(NodeBuilder):
    # NOTE(review): the ``def __init__`` header line (and the *operation*
    # parameter line) are elided from this excerpt; the visible parameters are
    # forwarded unchanged to the NodeBuilder base class.
                 context: 'SubgraphContext',
                 label: typing.Optional[str] = None):
        super().__init__(context, operation, label)
3027 def add_input(self, name: str, source):
3028 """Add an input resource for the Node under construction.
3030 Extends NodeBuilder.add_input()
3033 # * Are they from outside the subgraph?
3034 # * Subgraph variables?
3035 # * Subgraph internal nodes?
3036 # Inputs from outside the subgraph are (provisionally) subgraph inputs.
3037 # Inputs that are subgraph variables or subgraph internal nodes mark operations that will need to be re-run.
3038 # For first implementation, let all operations be recreated, but we need to
3039 # manage the right input proxies.
3040 # For zeroeth implementation, try just tracking the entities that need a reset() method called.
3041 assert isinstance(self.context, SubgraphContext)
3042 if hasattr(source, 'reset'):
3043 self.context.add_resetter(source.reset)
3044 elif hasattr(source, '_reset'):
3045 self.context.add_resetter(source._reset)
3046 super().add_input(name, source)
3048 def build(self) -> OperationPlaceholder:
3049 """Get a reference to the internal node in the subgraph definition.
3051 In the SubgraphContext, these handles cannot represent uniquely identifiable
3052 results. They are placeholders for relative positions in graphs that are
3053 not defined until the subgraph is being executed.
3055 Such references should be tracked and invalidated when exiting the
3056 subgraph context. Within the subgraph context, they are used to establish
3057 the recipe for updating the subgraph's outputs or persistent data during
3060 # Placeholder handles in the subgraph definition don't have real resource managers.
3061 # Check for the ability to instantiate operations.
3062 handle = super().build()
3063 # handle = OperationPlaceholder()
3064 return typing.cast(OperationPlaceholder, handle)
class SubgraphContext(Context):
    """Provide a Python context manager in which to set up a graph of operations.

    Allows operations to be configured without adding nodes to the global execution
    context.
    """

    # NOTE(review): the ``def __init__(self):`` header and its call to the
    # base-class initializer are elided from this excerpt.
        # Callables to invoke between subgraph iterations (see add_resetter()).
        self.resetters = set()
3078 def node_builder(self, operation, label=None) -> NodeBuilder:
3079 if label is not None:
3080 if label in self.labels:
3081 raise exceptions.ValueError('Label {} is already in use.'.format(label))
3083 # The builder should update the labeled node when it is done.
3084 self.labels[label] = None
3086 return SubgraphNodeBuilder(context=weakref.proxy(self), operation=operation, label=label)
    def add_resetter(self, function):
        """Register a callable to be invoked to reset a data source between subgraph iterations."""
        assert callable(function)
        self.resetters.add(function)
class Subgraph(object, metaclass=GraphMeta):
    """Base class for defining gmxapi subgraphs via Python class statements.

    When subclassing from Subgraph, aspects of the subgraph's data ports can be
    specified with keyword arguments in the class statement. Example::

        >>> class MySubgraph(Subgraph, variables={'int_with_default': 1, 'boolData': bool}): pass
        ...

    The key word *variables* is used in the class declaration to map the types
    of subgraph Variables with (optional) default values.

    Execution protocol:

    Subgraph execution must follow a well-defined protocol in order to sensibly
    resolve data Futures at predictable points. Note that subgraphs act as operations
    that can be automatically redefined at run time (in limited cases), such as
    to automatically form iteration chains to implement "while" loops. We refer
    to one copy / iteration / generation of a subgraph as an "element" below.

    When a subgraph element begins execution, each of its variables with an
    "updated" state from the previous iteration has the "updated" state moved
    to the new element's "initial" state, and the "updated" state is voided.

    Subgraph.next() appends an element of the subgraph to the chain. Subsequent
    calls to Subgraph.run() bring the new outputs up to date (and then call next()).
    Thus, for a subgraph with an output called ``is_converged``, calling
    ``while (not subgraph.is_converged): subgraph.run()`` has the desired effect,
    but is likely suboptimal for execution. Instead use the gmxapi while_loop.

    If the subgraph is not currently executing, it can be in one of two states:
    "editing" or "ready". In the "ready" state, class and instance Variables
    cannot be assigned, but can be read through the data descriptors. In this
    state, the descriptors only have a single state.

    If "editing," variables on the class object can be assigned to update the
    data flow defined for the subgraph. While "editing", reading or writing
    instance Variables is undefined behavior.

    When "editing" begins, each variable is readable as a proxy to the "initial"
    state in an element. Assignments while "editing" put variables in temporary
    states accessible only during editing.
    When "editing" finishes, the "updated" output data source of each
    variable for the element is set, if appropriate.
    """
    # TODO: Use a get_context to allow operation factories or accessors to mark
    # references for update/annotation when exiting the 'with' block.
class SubgraphBuilder(object):
    """Helper for defining new Subgraphs.

    Manages a Python context in which to define a new Subgraph type. Can be used
    in a Python ``with`` construct exactly once to provide the Subgraph class body.
    When the ``with`` block is exited (or ``build()`` is called explicitly), a
    new type instance becomes available. Subsequent calls to SubgraphBuilder.__call__(self, ...)
    are dispatched to the Subgraph constructor.

    Outside of the ``with`` block, read access to data members is proxied to
    descriptors on the built Subgraph.

    Instances of SubgraphBuilder are returned by the ``subgraph()`` utility function.
    """
    def __init__(self, variables):
        # Write directly to __dict__ to bypass this class's __setattr__, which
        # dispatches non-internal assignments to add_update().
        self.__dict__.update({'variables': variables,
                              '_staging': collections.OrderedDict(),
                              # NOTE(review): at least one entry (presumably
                              # '_editing': False) is elided from this literal.
                              '_subgraph_context': None,
                              '_subgraph_instance': None,
                              '_fused_operation': None,
        # Return a placeholder that we can update during iteration.
        # Long term, this is probably implemented with data descriptors
        # that will be moved to a new Subgraph type object.
        for name in self.variables:
            # Normalize plain values to constant data sources.
            if not isinstance(self.variables[name], Future):
                self.variables[name] = gmx.make_constant(self.variables[name])

        # class MySubgraph(Subgraph, variables=variables):
        #
        # self._subgraph_instance = MySubgraph()
    def __getattr__(self, item):
        # Called only when normal attribute lookup fails: dispatch reads of
        # subgraph variables to the staged (editing) value or the stored source.
        if item in self.variables:
            if item in self._staging:
                logger.debug('Read access to intermediate value of subgraph variable {}'.format(item))
                return self._staging[item]
            # NOTE(review): an ``else:`` is elided before the following lines.
            logger.debug('Read access to subgraph variable {}'.format(item))
            return self.variables[item]
        # NOTE(review): an ``else:`` is elided before the following raise.
            raise AttributeError('Invalid attribute: {}'.format(item))

        # TODO: this is not quite the described interface...
        # NOTE(review): as shown, the following line is unreachable after the
        # raise above; its original context is elided — confirm against full source.
        return lambda obj: obj.values[item]
    def __setattr__(self, key, value):
        """Part of the builder interface."""
        # Internal attributes are assigned normally.
        if key in self.__dict__:
            self.__dict__[key] = value
        # NOTE(review): the branch header dispatching to add_update (presumably
        # ``elif self._editing:``) is elided from this excerpt.
            self.add_update(key, value)
        # NOTE(review): an ``else:`` is elided before the following raise.
            raise exceptions.UsageError('Subgraph is not in an editable state.')
3202 def add_update(self, key, value):
3203 """Add a variable update to the internal subgraph."""
3204 if key not in self.variables:
3205 raise AttributeError('No such attribute: {}'.format(key))
3206 if not self._editing:
3207 raise exceptions.UsageError('Subgraph is not in an editable state.')
3208 # Else, stage the potential final value for the iteration.
3209 logger.debug('Staging subgraph update {} = {}'.format(key, value))
3210 # Return a placeholder that we can update during iteration.
3211 # Long term, this is probably implemented with data descriptors
3212 # that will be moved to a new Subgraph type object.
3213 if not isinstance(value, Future):
3214 value = gmx.make_constant(value)
3215 self._staging[key] = value
3216 self._staging.move_to_end(key)
    def __enter__(self):
        """Enter a Context managed by the subgraph to capture operation additions.

        Allows the internal data flow of the subgraph to be defined in the same
        manner as the default work graph while the Python context manager is active.

        The subgraph Context is activated when entering a ``with`` block and
        finalized at the end of the block.
        """
        # While editing the subgraph in the SubgraphContext, we need __get__ and __set__
        # data descriptor access on the Subgraph subclass type, but outside of the
        # context manager, the descriptor should be non-writable and more opaque,
        # while the instance should be readable, but not writable.
        self.__dict__['_editing'] = True
        # TODO: this probably needs to be configured with variables...
        self.__dict__['_subgraph_context'] = SubgraphContext()
        push_context(self._subgraph_context)
        # NOTE(review): the ``return self`` expected from __enter__ is elided
        # from this excerpt.
3238 """Build the subgraph by defining some new operations.
3240 Examine the subgraph variables. Variables with handles to data sources
3241 in the SubgraphContext represent work that needs to be executed on
3242 a subgraph execution or iteration. Variables with handles to data sources
3243 outside of the subgraph represent work that needs to be executed once
3244 on only the first iteration to initialize the subgraph.
3246 Construct a factory for the fused operation that performs the work to
3247 update the variables on a single iteration.
3249 Construct a factory for the fused operation that performs the work to
3250 update the variables on subsequent iterations and which will be fed
3251 the outputs of a previous iteration. Both of the generated operations
3252 have the same output signature.
3254 Construct and wrap the generator function to recursively append work
3255 to the graph and update until condition is satisfied.
3257 TODO: Explore how to drop work from the graph once there are no more
3258 references to its output, including check-point machinery.
3260 logger.debug('Finalizing subgraph definition.')
3261 inputs = collections.OrderedDict()
3262 for key, value in self.variables.items():
3263 # TODO: What if we don't want to provide default values?
3266 updates = self._staging
3268 class Subgraph(object):
3269 def __init__(self, input_futures, update_sources):
3270 self.values = collections.OrderedDict([(key, value.result()) for key, value in input_futures.items()])
3271 logger.debug('subgraph initialized with {}'.format(
3272 ', '.join(['{}: {}'.format(key, value) for key, value in self.values.items()])))
3273 self.futures = collections.OrderedDict([(key, value) for key, value in input_futures.items()])
3274 self.update_sources = collections.OrderedDict([(key, value) for key, value in update_sources.items()])
3275 logger.debug('Subgraph updates staged:')
3276 for update, source in self.update_sources.items():
3277 logger.debug(' {} = {}'.format(update, source))
3280 for name in self.update_sources:
3281 result = self.update_sources[name].result()
3282 logger.debug('Update: {} = {}'.format(name, result))
3283 self.values[name] = result
3284 # Replace the data sources in the futures.
3285 for name in self.update_sources:
3286 self.futures[name].resource_manager = gmx.make_constant(self.values[name]).resource_manager
3287 for name in self.update_sources:
3288 self.update_sources[name]._reset()
3290 subgraph = Subgraph(inputs, updates)
3292 return lambda subgraph=subgraph: subgraph
3294 def __exit__(self, exc_type, exc_val, exc_tb):
3295 """End the Subgraph editing session and finalize the Subgraph build.
3297 After exiting, this instance forwards __call__() to a factory for an
3298 operation that carries out the work in the subgraph with inputs bound
3299 in the current context as defined by ``variables``.
3301 self._factory = self.build()
3303 context = pop_context()
3304 assert context is self._subgraph_context
3305 self.__dict__['_editing'] = False
3306 # Returning False causes exceptions in the `with` block to be reraised.
3307 # Remember to switch this to return True if we want to transform or suppress
3308 # such an exception (we probably do).
3309 if exc_type is not None:
3310 logger.error('Got exception {} while editing subgraph {}.'.format(exc_val, self))
3311 logger.debug('Subgraph exception traceback: \n{}'.format(exc_tb))
3315 # TODO: After build() has been called, this should dispatch to a factory
3316 # that returns an OperationHandle.
3317 return self._factory()
def while_loop(*, operation, condition, max_iteration=10):
    """Generate and run a chain of operations such that condition evaluates True.

    Returns an operation instance that acts like a single node in the current
    work graph, but which is a proxy to the operation at the end of a dynamically generated chain
    of operations. At run time, condition is evaluated for the last element in
    the current chain. If condition evaluates False, the chain is extended and
    the next element is executed. When condition evaluates True, the object
    returned by ``while_loop`` becomes a proxy for the last element in the chain.

    Equivalent to calling operation.while(condition), where available.

    Arguments:
        operation: a callable that produces an instance of an operation when called with no arguments.
        condition: a callable that accepts an object (returned by ``operation``) that returns a boolean.
        max_iteration: execute the loop no more than this many times (default 10)

    Note:
        *max_iteration* is provided in part to minimize the cost of bugs in early
        versions of this software. The default value may be changed or
        removed on short notice.

    Warning:
        The protocol by which ``while_loop`` interacts with ``operation`` and ``condition``
        is very unstable right now. Please refer to this documentation when installing new
        versions of the package.

    Protocol:
        This protocol will be changed before the 0.1 API is finalized.

        When called, ``while_loop`` calls ``operation`` without arguments
        and captures the return value captured as ``_operation``.
        The object produced by ``operation()`` must have a ``reset``,
        a ``run`` method, and an ``output`` attribute.

        The ``values`` of the object are inspected
        to determine the output data proxy for the operation produced by the call
        to ``while_loop``. When that operation is called, it does the equivalent of

            while(condition(self._operation)):
                self._operation.reset()
                self._operation.run()

        Then, the output data proxy of ``self`` is updated with the results from
        self._operation.output.
    """
    # In the first implementation, Subgraph is NOT an OperationHandle.
    # if not isinstance(obj, AbstractOperationHandle):
    #     raise exceptions.UsageError(
    #     '"operation" key word argument must be a callable that produces an Operation handle.')
    # outputs = {}
    # for name, descriptor in obj.output.items():
    #     outputs[name] = descriptor._dtype

    # 1. Get the initial inputs.
    # 2. Initialize the subgraph with the initial inputs.
    # 3. Run the subgraph.
    # 4. Get the outputs.
    # 5. Initialize the subgraph with the outputs.
    # 6. Go to 3 if condition is not met.

    # Inspect a throw-away instance to determine the output signature.
    obj = operation()
    assert hasattr(obj, 'values')
    outputs = collections.OrderedDict([(key, type(value)) for key, value in obj.values.items()])

    @function_wrapper(output=outputs)
    def run_loop(output: OutputCollectionDescription):
        iteration = 0
        # Create a fresh subgraph state for this execution.
        obj = operation()
        logger.debug('Created object {}'.format(obj))
        logger.debug(', '.join(['{}: {}'.format(key, obj.values[key]) for key in obj.values]))
        logger.debug('Condition: {}'.format(condition(obj)))
        while (condition(obj)):
            logger.debug('Running iteration {}'.format(iteration))
            obj.run()
            logger.debug(
                ', '.join(['{}: {}'.format(key, obj.values[key]) for key in obj.values]))
            logger.debug('Condition: {}'.format(condition(obj)))
            iteration += 1
            # Safety valve: cap the number of iterations (see docstring Note).
            if iteration > max_iteration:
                break
        # Publish the final variable values as the operation outputs.
        for name in outputs:
            setattr(output, name, obj.values[name])
        return

    return run_loop
def subgraph(variables=None):
    """Allow operations to be configured in a sub-context.

    The object returned functions as a Python context manager. When entering the
    context manager (the beginning of the ``with`` block), the object has an
    attribute for each of the named ``variables``. Reading from these variables
    gets a proxy for the initial value or its update from a previous loop iteration.
    At the end of the ``with`` block, any values or data flows assigned to these
    attributes become the output for an iteration.

    After leaving the ``with`` block, the variables are no longer assignable, but
    can be called as bound methods to get the current value of a variable.

    When the object is run, operations bound to the variables are ``reset`` and
    run to update the variables.
    """
    # Implementation note:
    # A Subgraph (type) has a subgraph context associated with it. The subgraph's
    # ability to capture operation additions is implemented in terms of the
    # subgraph context.
    logger.debug('Declare a new subgraph with variables {}'.format(variables))

    return SubgraphBuilder(variables)
def join_arrays(*, front: datamodel.NDArray = (), back: datamodel.NDArray = ()) -> datamodel.NDArray:
    """Operation that consumes two sequences and produces a concatenated single sequence.

    Note that the exact signature of the operation is not determined until this
    helper is called. Helper functions may dispatch to factories for different
    operations based on the inputs. In this case, the dtype and shape of the
    inputs determines dtype and shape of the output. An operation instance must
    have strongly typed output, but the input must be strongly typed on an
    object definition so that a Context can make runtime decisions about
    dispatching work and data before instantiating.

    Raises:
        exceptions.ValueError: if either input is a string or bytes object
            rather than a sequence of values.
    """
    # TODO: elaborate and clarify.
    # TODO: check type and shape.
    # TODO: figure out a better annotation.

    # TODO: (FR4) Returned list should be an NDArray.
    if isinstance(front, (str, bytes)) or isinstance(back, (str, bytes)):
        raise exceptions.ValueError('Input must be a pair of lists.')
    assert isinstance(front, datamodel.NDArray)
    assert isinstance(back, datamodel.NDArray)
    new_list = list(front._values)
    new_list.extend(back._values)
    return datamodel.NDArray(new_list)
# Generic type parameter used to annotate helpers (e.g. make_constant) that
# accept a value of arbitrary type and preserve that type in their result.
Scalar = typing.TypeVar('Scalar')
def concatenate_lists(sublists: list = ()) -> _Future[gmx.datamodel.NDArray]:
    """Combine data sources into a single list.

    A trivial data flow restructuring operation.

    Recursively folds *sublists* left-to-right with join_arrays, producing a
    single concatenated sequence.

    Raises:
        exceptions.ValueError: if *sublists* is a string or bytes object.
    """
    if isinstance(sublists, (str, bytes)):
        raise exceptions.ValueError('Input must be a list of lists.')
    # Base case: no inputs produce an empty array.
    if len(sublists) == 0:
        return datamodel.ndarray([])
    # TODO: Fix the data model so that this can type-check properly.
    return join_arrays(front=sublists[0],
                       back=typing.cast(datamodel.NDArray,
                                        concatenate_lists(sublists[1:])))
def make_constant(value: Scalar) -> _Future:
    """Provide a predetermined value at run time.

    This is a trivial operation that provides a (typed) value, primarily for
    internal use to manage gmxapi data flow.

    Accepts a value of any type. The object returned has a definite type and
    provides same interface as other gmxapi outputs. Additional constraints or
    guarantees on data type may appear in future versions.
    """
    # The result type is fixed by the (native) type of the provided value.
    dtype = type(value)
    source = StaticSourceManager(name='data', proxied_data=value, width=1, function=lambda x: x)
    description = ResultDescription(dtype=dtype, width=1)
    future = Future(source, 'data', description=description)
    return future
def logical_not(value: bool) -> _Future:
    """Boolean negation.

    If the argument is a gmxapi compatible Data or Future object, a new View or
    Future is created that proxies the boolean opposite of the input.

    If the argument is a callable, logical_not returns a wrapper function that
    returns a Future for the logical opposite of the callable's result.
    """
    # TODO: Small data transformations like this don't need to be formal Operations.
    # This could be essentially a data annotation that affects the resolver in a
    # DataEdge. As an API detail, coding for different Contexts and optimizations
    # within those Context implementations could be simplified.
    operation = function_wrapper(output={'data': bool})(lambda data=bool(): not bool(data))
    return operation(data=value).output.data