2 # This file is part of the GROMACS molecular simulation package.
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
35 """Define gmxapi-compliant Operations
37 Provide decorators and base classes to generate and validate gmxapi Operations.
39 Nodes in a work graph are created as instances of Operations. An Operation factory
40 accepts well-defined inputs as keyword arguments. The object returned by such
41 a factory is a handle to the node in the work graph. Its ``output`` attribute
42 is a collection of the Operation's results.
44 function_wrapper(...) produces a wrapper that converts a function to an Operation
45 factory. The Operation is defined when the wrapper is called. The Operation is
46 instantiated when the factory is called. The function is executed when the Operation
49 The framework ensures that an Operation instance is executed no more than once.
52 __all__ = ['computed_result',
62 from contextlib import contextmanager
65 from gmxapi import datamodel
66 from gmxapi import exceptions
67 from gmxapi import logger as root_logger
68 from gmxapi.abc import OperationImplementation, MutableResource, Node
69 from gmxapi.typing import _Context, ResultTypeVar, SourceTypeVar, valid_result_types, valid_source_types
70 from gmxapi.typing import Future as _Future
72 # Initialize module-level logger
73 logger = root_logger.getChild('operation')
74 logger.info('Importing {}'.format(__name__))
77 class ResultDescription:
78 """Describe what will be returned when `result()` is called."""
# NOTE(review): this listing is lossy — attribute assignments, @property
# decorators, and some return statements between the numbered lines are missing.
80 def __init__(self, dtype=None, width=1):
# dtype must be a concrete type from the gmxapi-supported result types.
# Despite the dtype=None default, None would fail the isinstance check below.
81 assert isinstance(dtype, type)
82 assert issubclass(dtype, valid_result_types)
# width: number of parallel (ensemble) members — presumably >= 1; TODO confirm.
83 assert isinstance(width, int)
88 def dtype(self) -> type:
89 """node output type"""
93 def width(self) -> int:
# Debug-friendly representation showing the described type and ensemble width.
98 return '{}(dtype={}, width={})'.format(self.__class__.__name__, self.dtype, self.width)
101 class OutputData(object):
102 """Encapsulate the description and storage of a data output."""
# NOTE(review): lossy listing — e.g. the name assignment, property decorators,
# the `done` guard in data(), and else branches are not shown here.
104 def __init__(self, name: str, description: ResultDescription):
107 assert isinstance(description, ResultDescription)
108 self._description = description
# One completion flag and one data slot per ensemble member.
109 self._done = [False] * self._description.width
110 self._data = [None] * self._description.width
114 """The name of the published output."""
117 # TODO: Change to regular member function and add ensemble member arg.
120 """Ensemble completion status for this output."""
121 return all(self._done)
123 def data(self, member: int = None):
124 """Access the raw data for localized output for the ensemble or the specified member."""
# Reading before the data has been published is a protocol violation.
126 raise exceptions.ApiError('Attempt to read before data has been published.')
# Defensive consistency check: "done" flags and stored values must agree.
127 if self._data is None or None in self._data:
128 raise exceptions.ApiError('Data marked "done" but contains null value.')
129 if member is not None:
130 return self._data[member]
134 def set(self, value, member: int):
135 """Set local data and mark as completed.
137 Used internally to maintain the data store.
# NDArray values are normalized through the ndarray() helper; otherwise the
# value is coerced through the declared dtype's constructor.
139 if self._description.dtype == datamodel.NDArray:
140 self._data[member] = datamodel.ndarray(value)
142 self._data[member] = self._description.dtype(value)
143 self._done[member] = True
146 """Reinitialize the data store.
149 This is a workaround until the execution model is more developed.
152 Remove this method when all operation handles provide factories to
153 reinstantiate on new Contexts and/or with new inputs.
# Clear all completion flags and stored values for every ensemble member.
156 self._done = [False] * self._description.width
157 self._data = [None] * self._description.width
160 class EnsembleDataSource(gmx.abc.EnsembleDataSource):
161 """A single source of data with ensemble data flow annotations.
163 Note that data sources may be Futures.
# NOTE(review): lossy listing — the __init__ body (storage of source/width/dtype)
# and the enclosing reset() def line are not shown.
166 def __init__(self, source=None, width=1, dtype=None):
171 def node(self, member: int):
# Per-member access: the wrapped source is indexable by ensemble member index.
172 return self.source[member]
# Propagate reset through whichever reset protocol the wrapped source supports.
175 protocols = ('reset', '_reset')
176 for protocol in protocols:
177 if hasattr(self.source, protocol):
178 getattr(self.source, protocol)()
181 class DataSourceCollection(collections.OrderedDict):
182 """Store and describe input data handles for an operation.
184 When created from InputCollectionDescription.bind(), the DataSourceCollection
185 has had default values inserted.
187 Note: DataSourceCollection is the input resource collection type for NodeBuilder.
188 To use with the NodeBuilder interface, first create the collection, then
189 iteratively add its resources.
191 TODO: We should probably normalize the collection aspect of input. Is this a
192 single input? Are all inputs added as collections? Should this be split
193 to a standard iterable? Does a resource factory always produce a mapping?
# NOTE(review): lossy listing — e.g. the `try:` line before the ndarray
# conversion and some `else:` branches are not shown here.
196 def __init__(self, **kwargs):
197 """Initialize from key/value pairs of named data sources.
199 Data sources may be any of the basic gmxapi data types, gmxapi Futures
200 of those types, or gmxapi ensemble data bundles of the above.
202 super(DataSourceCollection, self).__init__()
# Route every item through __setitem__ so type checking/conversion applies.
203 for name, value in kwargs.items():
206 def __setitem__(self, key: str, value: SourceTypeVar) -> None:
207 if not isinstance(key, str):
208 raise exceptions.TypeError('Data must be named with str type.')
209 # TODO: Encapsulate handling of proffered data sources to Context details.
210 # Preprocessed input should be self-describing gmxapi data types. Structured
211 # input must be recursively (depth-first) converted to gmxapi data types.
212 # TODO: Handle gmxapi Futures stored as dictionary elements!
213 if not isinstance(value, valid_source_types):
214 if isinstance(value, collections.abc.Iterable):
215 # Iterables here are treated as arrays, but we do not have a robust typing system.
216 # Warning: In the initial implementation, the iterable may contain Future objects.
217 # TODO: (#2993) Revisit as we sort out data shape and Future protocol.
219 value = datamodel.ndarray(value)
220 except (exceptions.ValueError, exceptions.TypeError) as e:
221 raise exceptions.TypeError('Iterable could not be converted to NDArray: {}'.format(value)) from e
222 elif hasattr(value, 'result'):
# Anything not a valid source type, iterable, or Future-like cannot be handled.
226 raise exceptions.ApiError('Cannot process data source {}'.format(value))
227 super().__setitem__(key, value)
230 """Reset all sources in the collection."""
# Sources may expose either a public reset() or a private _reset() protocol.
231 for source in self.values():
232 if hasattr(source, 'reset'):
234 if hasattr(source, '_reset'):
238 """Provide some sort of unique identifier.
240 We need a more deterministic fingerprinting scheme with well-specified
241 uniqueness semantics, but right now we just need something reasonably
# Hash every key and value (in insertion order) to fingerprint the contents.
244 hashed_keys_and_values = tuple(hash(entity) for item in self.items() for entity in item)
245 return hash(hashed_keys_and_values)
248 def computed_result(function):
249 """Decorate a function to get a helper that produces an object with Result behavior.
251 When called, the new function produces an ImmediateResult object.
253 The new function has the same signature as the original function, but can accept
254 gmxapi data proxies, assuming the provided proxy objects represent types
255 compatible with the original signature.
257 Calls to `result()` return the value that `function` would return when executed
258 in the local context with the inputs fully resolved.
260 The API does not specify when input data dependencies will be resolved
261 or when the wrapped function will be executed. That is, ``@computed_result``
262 functions may force immediate resolution of data dependencies and/or may
263 be called more than once to satisfy dependent operation inputs.
# NOTE(review): lossy listing — the `try:` line before inspect.signature and
# the tail of new_function (including its return statement) are not shown.
266 # Attempt to inspect function signature. Introspection can fail, so we catch
267 # the various exceptions. We re-raise as ApiError, indicating a bug, because
268 # we should do more to handle this intelligently and provide better user
270 # TODO: Figure out what to do with exceptions where this introspection
271 # and rebinding won't work.
272 # ref: https://docs.python.org/3/library/inspect.html#introspecting-callables-with-the-signature-object
274 sig = inspect.signature(function)
275 except TypeError as T:
276 raise exceptions.ApiError('Can not inspect type of provided function argument.') from T
277 except ValueError as V:
278 raise exceptions.ApiError('Can not inspect provided function signature.') from V
# Delegate the actual Operation machinery to function_wrapper().
280 wrapped_function = function_wrapper()(function)
282 @functools.wraps(function)
283 def new_function(*args, **kwargs):
284 # The signature of the new function will accept abstractions
285 # of whatever types it originally accepted. This wrapper must
286 # * Create a mapping to the original call signature from `input`
287 # * Add handling for typed abstractions in wrapper function.
288 # * Process arguments to the wrapper function into `input`
290 # 1. Inspect the return annotation to determine valid gmxapi type(s)
291 # 2. Generate a Result object advertising the correct type, bound to the
292 # Input and implementing function.
293 # 3. Transform the result() data to the correct type.
295 # TODO: (FR3+) create a serializable data structure for inputs discovered
296 # from function introspection.
# Positional-only parameters cannot be rebound by name and are rejected.
298 for name, param in sig.parameters.items():
299 assert not param.kind == param.POSITIONAL_ONLY
300 bound_arguments = sig.bind(*args, **kwargs)
301 handle = wrapped_function(**bound_arguments.arguments)
302 output = handle.output
303 # TODO: Find a type hinting / generic way to assert output attributes.
309 class OutputCollectionDescription(collections.OrderedDict):
"""Ordered mapping of output names to gmxapi result types for an operation."""
310 def __init__(self, **kwargs):
311 """Create the output description for an operation node from a dictionary of names and types."""
# NOTE(review): lossy listing — the `outputs = []` initialization before the
# loop is not shown here.
313 for name, flavor in kwargs.items():
314 if not isinstance(name, str):
315 raise exceptions.TypeError('Output descriptions are keyed by Python strings.')
316 # Multidimensional outputs are explicitly NDArray
317 if issubclass(flavor, (list, tuple)):
318 flavor = datamodel.NDArray
319 assert issubclass(flavor, valid_result_types)
320 outputs.append((name, flavor))
321 super().__init__(outputs)
324 class InputCollectionDescription(collections.OrderedDict):
325 """Describe acceptable inputs for an Operation.
327 Generally, an InputCollectionDescription is an aspect of the public API by
328 which an Operation expresses its possible inputs. This class includes details
329 of the Python package.
332 parameters : A sequence of named parameter descriptions.
334 Parameter descriptions are objects containing an `annotation` attribute
335 declaring the data type of the parameter and, optionally, a `default`
336 attribute declaring a default value for the parameter.
338 Instances can be used as an ordered map of parameter names to gmxapi data types.
340 Analogous to inspect.Signature, but generalized for gmxapi Operations.
341 Additional notable differences: typing is normalized at initialization, and
342 the bind() method does not return an object that can be directly used as
343 function input. The object returned from bind() is used to construct a data
344 graph Edge for subsequent execution.
# NOTE(review): lossy listing — e.g. the `inputs = []` initialization,
# `if disallowed:` guards, `else:` branches, the @staticmethod decorator on
# from_function, and the `try:` before bind_partial are not shown.
347 def __init__(self, parameters: typing.Iterable[typing.Tuple[str, inspect.Parameter]]):
348 """Create the input description for an operation node from a dictionary of names and types."""
350 for name, param in parameters:
351 if not isinstance(name, str):
352 raise exceptions.TypeError('Input descriptions are keyed by Python strings.')
353 # Multidimensional inputs are explicitly NDArray
354 dtype = param.annotation
355 if issubclass(dtype, collections.abc.Iterable) \
356 and not issubclass(dtype, (str, bytes, collections.abc.Mapping)):
357 # TODO: we can relax this with some more input conditioning.
358 if dtype != datamodel.NDArray:
359 raise exceptions.UsageError(
360 'Cannot accept input type {}. Sequence type inputs must use NDArray.'.format(param))
361 assert issubclass(dtype, valid_result_types)
# Positional-only and variadic parameters cannot be mapped to named inputs.
362 if hasattr(param, 'kind'):
363 disallowed = any([param.kind == param.POSITIONAL_ONLY,
364 param.kind == param.VAR_POSITIONAL,
365 param.kind == param.VAR_KEYWORD])
367 raise exceptions.ProtocolError(
368 'Cannot wrap function. Operations must have well-defined parameter names.')
# Normalize every parameter to POSITIONAL_OR_KEYWORD with the resolved dtype.
371 kind = inspect.Parameter.POSITIONAL_OR_KEYWORD
372 if hasattr(param, 'default'):
373 default = param.default
375 default = inspect.Parameter.empty
376 inputs.append(inspect.Parameter(name, kind, default=default, annotation=dtype))
377 super().__init__([(input.name, input.annotation) for input in inputs])
# Keep a real inspect.Signature around for argument binding in bind().
378 self.signature = inspect.Signature(inputs)
381 def from_function(function):
382 """Inspect a function to be wrapped.
384 Used internally by gmxapi.operation.function_wrapper()
387 exceptions.ProtocolError if function signature cannot be determined to be valid.
390 InputCollectionDescription for the function input signature.
392 # First, inspect the function.
393 assert callable(function)
394 signature = inspect.signature(function)
395 # The function must have clear and static input schema
396 # Make sure that all parameters have clear names, whether or not they are used in a call.
397 for name, param in signature.parameters.items():
398 disallowed = any([param.kind == param.POSITIONAL_ONLY,
399 param.kind == param.VAR_POSITIONAL,
400 param.kind == param.VAR_KEYWORD])
402 raise exceptions.ProtocolError(
403 'Cannot wrap function. Operations must have well-defined parameter names.')
404 if param.name == 'input':
405 raise exceptions.ProtocolError('Function signature includes the (reserved) "input" keyword argument.')
406 description = collections.OrderedDict()
407 for param in signature.parameters.values():
408 if param.name == 'output':
409 # Wrapped functions may accept the output parameter to publish results, but
410 # that is not part of the Operation input signature.
# When no annotation is given, infer the type from the default value.
412 if param.annotation == param.empty:
413 if param.default == param.empty or param.default is None:
414 raise exceptions.ProtocolError('Could not infer parameter type for {}'.format(param.name))
415 dtype = type(param.default)
416 if isinstance(dtype, collections.abc.Iterable) \
417 and not isinstance(dtype, (str, bytes, collections.abc.Mapping)):
418 dtype = datamodel.NDArray
420 dtype = param.annotation
421 description[param.name] = param.replace(annotation=dtype)
422 return InputCollectionDescription(description.items())
424 def bind(self, *args, **kwargs) -> DataSourceCollection:
425 """Create a compatible DataSourceCollection from provided arguments.
427 Pre-process input and function signature to get named input arguments.
429 This is a helper function to allow calling code to characterize the
430 arguments in a Python function call with hints from the factory that is
431 initializing an operation. Its most useful functionality is to allow a
432 factory to accept positional arguments where named inputs are usually
433 required. It also allows data sources to participate in multiple
434 DataSourceCollections with minimal constraints.
436 Note that the returned object has had data populated from any defaults
437 described in the InputCollectionDescription.
439 See wrapped_function_runner() and describe_function_input().
441 # For convenience, accept *args, but convert to **kwargs to pass to Operation.
442 # Factory accepts an unadvertised `input` keyword argument that is used as a default kwargs dict.
443 # If present, kwargs['input'] is treated as an input "pack" providing _default_ values.
444 input_kwargs = collections.OrderedDict()
445 if 'input' in kwargs:
446 provided_input = kwargs.pop('input')
447 if provided_input is not None:
448 input_kwargs.update(provided_input)
449 # `function` may accept an `output` keyword argument that should not be supplied to the factory.
# Explicit keyword arguments override any defaults taken from the input pack.
450 for key, value in kwargs.items():
452 raise exceptions.UsageError('Invalid keyword argument: output (reserved).')
453 input_kwargs[key] = value
455 bound_arguments = self.signature.bind_partial(*args, **input_kwargs)
456 except TypeError as e:
457 raise exceptions.UsageError('Could not bind operation parameters to function signature.') from e
458 assert 'output' not in bound_arguments.arguments
459 bound_arguments.apply_defaults()
460 assert 'input' not in bound_arguments.arguments
461 input_kwargs = collections.OrderedDict([pair for pair in bound_arguments.arguments.items()])
462 if 'output' in input_kwargs:
463 input_kwargs.pop('output')
464 return DataSourceCollection(**input_kwargs)
467 class ProxyDataDescriptor(object):
468 """Base class for data descriptors used in DataProxyBase subclasses.
470 Subclasses should either not define __init__ or should call the base class
471 __init__ explicitly: super().__init__(self, name, dtype)
# NOTE(review): lossy listing — the assignments of the _name and _dtype
# members (referenced elsewhere, e.g. by DataProxyMeta) are not shown here.
474 def __init__(self, name: str, dtype: ResultTypeVar = None):
476 # TODO: We should not allow dtype==None, but we currently have a weak data
477 # model that does not allow good support of structured Futures.
# When a dtype is provided, it must be a gmxapi-supported result type.
478 if dtype is not None:
479 assert isinstance(dtype, type)
480 assert issubclass(dtype, valid_result_types)
484 class DataProxyMeta(abc.ABCMeta):
"""Metaclass that lets DataProxy subclasses receive descriptors via class keyword arguments."""
485 # Key word arguments consumed by __prepare__
486 _prepare_keywords = ('descriptors',)
# NOTE(review): lossy listing — e.g. the @classmethod decorator on __prepare__,
# the plain-dict branch of the descriptor handling, the kwargs loop headers in
# __new__/__init__, and `continue`/`else:` lines are not shown here.
489 def __prepare__(mcs, name, bases, descriptors: collections.abc.Mapping = None):
490 """Allow dynamic sub-classing.
492 DataProxy class definitions are collections of data descriptors. This
493 class method allows subclasses to give the descriptor names and type(s)
494 in the class declaration as arguments instead of as class attribute
497 class MyProxy(DataProxyBase, descriptors={name: MyDescriptor() for name in datanames}): pass
500 If we are only using this metaclass for the __prepare__ hook by the
501 time we require Python >= 3.6, we could reimplement __prepare__ as
502 DataProxyBase.__init_subclass__ and remove this metaclass.
504 if descriptors is None:
# A tuple of descriptors is re-keyed by each descriptor's internal _name.
506 elif isinstance(descriptors, tuple):
507 namespace = collections.OrderedDict([(d._name, d) for d in descriptors])
510 assert isinstance(descriptors, collections.abc.Mapping)
513 def __new__(cls, name, bases: typing.Iterable, namespace, **kwargs):
# Reject any class-creation keyword that __prepare__ does not understand.
515 if key not in DataProxyMeta._prepare_keywords:
516 raise exceptions.ApiError('Unexpected class creation keyword: {}'.format(key))
517 # See note about DataProxyBase._reserved.
518 if '_reserved' not in namespace and not any(hasattr(base, '_reserved') for base in bases):
519 raise exceptions.ApiError(
520 'We currently expect DataProxy classes to provide a list of reserved attribute names.')
521 for key in namespace:
522 # Here we can check conformance with naming and typing rules.
523 assert isinstance(key, str)
524 if key.startswith('__'):
525 # Skip non-public attributes.
527 descriptor = namespace[key]
528 # The purpose of the current data proxies is to serve as a limited namespace
529 # containing only descriptors of a certain type. In the future, these proxies
530 # may be flattened into a facet of a richer OperationHandle class
531 # (this metaclass may become a decorator on an implementation class),
532 # but for now we check that the class is being used according to the
533 # documented framework. A nearer term update could be to restrict the
534 # type of the data descriptor:
535 # TODO: Use a member type of the derived cls (or a mix-in base) to specify a particular
536 # ProxyDataDescriptor subclass.
537 # Also, see note about DataProxyBase._reserved
538 if not isinstance(descriptor, ProxyDataDescriptor):
539 if key not in namespace['_reserved'] and not any(key in getattr(base, '_reserved') for base in
540 bases if hasattr(base, '_reserved')):
541 raise exceptions.ApiError('Unexpected data proxy attribute {}: {}'.format(key, repr(descriptor)))
543 assert isinstance(descriptor, ProxyDataDescriptor)
# Descriptors created without a name adopt the attribute name they are bound to.
544 if not isinstance(descriptor._name, str) or descriptor._name == '':
545 descriptor._name = key
547 if descriptor._name != key:
548 raise exceptions.ApiError(
549 'Descriptor internal name {} does not match attribute name {}'.format(
550 descriptor._name, key))
551 return super().__new__(cls, name, bases, namespace)
553 # TODO: This keyword argument stripping is not necessary in more recent Python versions.
554 # When Python minimum required version is increased, check if we can remove this.
555 def __init__(cls, name, bases, namespace, **kwargs):
557 if key not in DataProxyMeta._prepare_keywords:
558 raise exceptions.ApiError('Unexpected class initialization keyword: {}'.format(key))
# Keyword arguments handled by __prepare__/__new__ are not forwarded to type.__init__.
559 super().__init__(name, bases, namespace)
561 # TODO: See if we can use __dir__ in the metaclass to help hint class attributes for better tab completion.
562 # Ref: https://ipython.readthedocs.io/en/stable/config/integrating.html#tab-completion
563 # def __dir__(self) -> Iterable[str]:
564 # return super().__dir__()
567 class DataProxyBase(collections.abc.Mapping, metaclass=DataProxyMeta):
568 """Limited interface to managed resources.
570 Inherit from DataProxy to specialize an interface to an ``instance``.
571 In the derived class, either do not define ``__init__`` or be sure to
572 initialize the super class (DataProxy) with an instance of the object
575 A class deriving from DataProxyBase allows its instances to provide a namespace
576 for proxies to named data by defining attributes that are data descriptors
577 (subclasses of ProxyDataDescriptor).
578 The ProxyDataDescriptors are accessed as attributes of the
579 data proxy instance or by iterating on items(). Attributes that are not
580 ProxyDataDescriptors are possible, but will not be returned by items() which
581 is a necessary part of gmxapi execution protocol.
583 Acts as an owning handle to the resources provided by ``instance``,
584 preventing the reference count of ``instance`` from going to zero for the
585 lifetime of the proxy object.
587 When sub-classing DataProxyBase, data descriptors can be passed as a mapping
588 to the ``descriptors`` key word argument in the class declaration. This
589 allows data proxy subclasses to be easily defined dynamically.
591 mydescriptors = {'foo': Publisher('foo', int), 'data': Publisher('data', float)}
593 class MyDataProxy(DataProxyBase, descriptors=mydescriptors): pass
594 assert hasattr(MyDataProxy, 'foo')
# NOTE(review): lossy listing — e.g. @property decorators, the classmethod
# decorator and `yield` for the items() generator, and the `__len__`/`__iter__`
# def lines are not shown here.
597 # This class attribute (which subclasses are free to replace to augment) is an
598 # indication of a problem with the current data model. If we are allowing
599 # reserved words that would otherwise be valid data names, there is not a
600 # compelling reason for separate data proxy classes: we throw away the assertion
601 # that we are preparing a clean namespace and we could have accomplished the
602 # class responsibilities in the Operation handle with just descriptor classes.
603 # If we want the clean namespace, we should figure out how to keep this interface
604 # from growing and/or have some "hidden" internal interface.
605 _reserved = ('ensemble_width', 'items', '_reserved')
607 # This class can be expanded to be the attachment point for a metaclass for
608 # data proxies such as PublishingDataProxy or OutputDataProxy, which may be
609 # defined very dynamically and concisely as a set of Descriptors and a type()
611 # If development in this direction does not materialize, then this base
612 # class is not very useful and should be removed.
613 def __init__(self, instance: 'SourceResource', client_id: int = None):
614 """Get partial ownership of a resource provider.
617 instance : resource-owning object
618 client_id : identifier for client holding the resource handle (e.g. ensemble member id)
620 If client_id is not provided, the proxy scope is for all clients.
622 # TODO: Decide whether _resource_instance is public or not.
623 # Note: currently commonly needed for subclass implementations.
624 self._resource_instance = instance
625 # Developer note: subclasses should handle self._client_identifier == None
626 self._client_identifier = client_id
627 # Collection is fixed by the time of instance creation, so cache it.
628 self.__keys = tuple([key for key, _ in self.items()])
629 self.__length = len(self.__keys)
632 def ensemble_width(self) -> int:
# Delegates to the resource provider's width() (number of ensemble members).
633 return self._resource_instance.width()
637 """Generator for tuples of attribute name and descriptor instance.
639 This almost certainly doesn't do quite what we want...
# Only ProxyDataDescriptor attributes participate in the mapping protocol.
641 for name, value in cls.__dict__.items():
642 if isinstance(value, ProxyDataDescriptor):
645 def __getitem__(self, k):
# Mapping access routes through attribute access so the descriptors fire.
647 return getattr(self, k)
653 for key in self.__keys:
657 class Publisher(ProxyDataDescriptor):
658 """Data descriptor for write access to a specific named data resource.
660 For a wrapped function receiving an ``output`` argument, provides the
661 accessors for an attribute on the object passed as ``output``. Maps
662 read and write access by the wrapped function to appropriate details of
663 the resource manager.
665 Used internally to implement settable attributes on PublishingDataProxy.
666 Allows PublishingDataProxy to be dynamically defined in the scope of the
667 operation.function_wrapper closure. Each named output is represented by
668 an instance of Publisher in the PublishingDataProxy class definition for
671 Ref: https://docs.python.org/3/reference/datamodel.html#implementing-descriptors
674 Relies on implementation details of ResourceManager.
# NOTE(review): lossy listing — e.g. the `if instance is None:` guard in
# __get__, an `else:` line, and the __repr__ def line are not shown here.
677 def __get__(self, instance: DataProxyBase, owner):
679 # The current access has come through the class attribute of owner class
681 resource_manager = instance._resource_instance
682 client_id = instance._client_identifier
683 # TODO: Fix API scope.
684 # Either this class is a detail of the same implementation as ResourceManager,
685 # or we need to enforce that instance._resource_instance provides _data (or equivalent)
686 assert isinstance(resource_manager, ResourceManager)
# Without a client id the proxy spans the whole ensemble; with one, it is
# narrowed to the corresponding member's slot.
687 if client_id is None:
688 return getattr(resource_manager._data, self._name)
690 return getattr(resource_manager._data, self._name)[client_id]
692 def __set__(self, instance: DataProxyBase, value):
693 resource_manager = instance._resource_instance
694 # TODO: Fix API scope.
695 # Either this class is a detail of the same implementation as ResourceManager,
696 # or we need to enforce that instance._resource_instance provides _data (or equivalent)
697 assert isinstance(resource_manager, ResourceManager)
698 client_id = instance._client_identifier
# Writes are delegated to the resource manager, scoped to the client's member.
699 resource_manager.set_result(name=self._name, value=value, member=client_id)
702 return '{}(name={}, dtype={})'.format(self.__class__.__name__,
704 self._dtype.__qualname__)
707 def define_publishing_data_proxy(output_description) -> typing.Type[DataProxyBase]:
708 """Returns a class definition for a PublishingDataProxy for the provided output description."""
709 # This dynamic type creation hides collaborations with things like make_datastore.
710 # We should encapsulate these relationships in Context details, explicit collaborations
711 # between specific operations and Contexts, and in groups of Operation definition helpers.
# One writable Publisher descriptor per named output in the description.
713 descriptors = collections.OrderedDict([(name, Publisher(name)) for name in output_description])
715 class PublishingDataProxy(DataProxyBase, descriptors=descriptors):
716 """Handler for write access to the `output` of an operation.
718 Acts as a sort of PublisherCollection.
721 return PublishingDataProxy
724 # get symbols we can use to annotate input and output types more specifically.
# These TypeVars (bound to DataProxyBase) parameterize the generic resource
# classes below (_Resources, SourceResource) by their concrete proxy classes.
725 _OutputDataProxyType = typing.TypeVar('_OutputDataProxyType', bound=DataProxyBase)
726 _PublishingDataProxyType = typing.TypeVar('_PublishingDataProxyType', bound=DataProxyBase)
727 # Currently, the ClientID type is an integer, but this may change.
728 ClientID = typing.NewType('ClientID', int)
731 class _Resources(typing.Generic[_PublishingDataProxyType]):
# NOTE(review): class body is not shown in this lossy listing — confirm its
# contents (likely minimal) against the full source before relying on it.
735 # TODO: Why generic in publishingdataproxytype?
736 class SourceResource(typing.Generic[_OutputDataProxyType, _PublishingDataProxyType]):
737 """Resource Manager for a data provider.
739 Supports Future instances in a particular context.
# NOTE(review): lossy listing — the abstractmethod decorators and the stub
# bodies of these interface methods are not shown here; this class appears to
# define the abstract resource-manager interface implemented below.
742 # Note: ResourceManager members not yet included:
743 # future(), _data, set_result.
745 # This might not belong here. Maybe separate out for a OperationHandleManager?
747 def data(self) -> _OutputDataProxyType:
748 """Get the output data proxy."""
749 # Warning: this should probably be renamed, but "output_data_proxy" is already
750 # a member in at least one derived class.
754 def is_done(self, name: str) -> bool:
755 """Query whether the named output has been published."""
758 def get(self, name: str) -> 'OutputData':
759 """Get the stored OutputData for the named output."""
762 def update_output(self):
763 """Bring the _data member up to date and local."""
768 """Recursively reinitialize resources.
770 Set the resource manager to its initialized state.
771 All outputs are marked not "done".
772 All inputs supporting the interface have ``_reset()`` called on them.
776 def width(self) -> int:
777 """Ensemble width of the managed resources."""
781 def future(self, name: str, description: ResultDescription) -> 'Future':
782 """Get a Future handle for managed data.
784 Resource managers owned by subclasses of gmx.operation.Context provide
785 this method to get references to output data.
787 In addition to the interface described by gmx.abc.Future, returned objects
788 provide the interface described by gmx.operation.Future.
# NOTE(review): StaticSourceManager class header and __init__, truncated.
# Observed gaps (by the embedded original numbering):
#  - 813: an `else:` between the ensemble branch (812) and the scalar call (814)
#    appears to be missing — TODO confirm against the full source.
#  - 815, 819, 824, 828, 830-831: branch/continuation lines lost; in particular
#    the `.format(...)` call opened at 827 never closes here, and the
#    ValueError at 832 ('width must be an integer 1 or greater.') has lost its
#    guarding condition — presumably a width-validation branch; verify.
#  - 841-842, 844: the branches selecting `data = [str(...)]` vs
#    `data = [self._result]` are detached from their conditions.
# The surviving lines show the intent: apply `function` to proxied (possibly
# ensemble) data, validate the result against `width`, infer `dtype`, store
# the values in an OutputData of ResultDescription(dtype, width), and build an
# output-data-proxy class for the single named output. Do not trust the
# control flow as shown; restore from version control before editing logic.
792 class StaticSourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
793 """Provide the resource manager interface for local static data.
795 Allow data transformations on the proxied resource.
798 proxied_data: A gmxapi supported data object.
799 width: Size of (one-dimensional) shaped data produced by function.
800 function: Transformation to perform on the managed data.
802 The callable passed as ``function`` must accept a single argument. The
803 argument will be an iterable when proxied_data represents an ensemble,
804 or an object of the same type as proxied_data otherwise.
807 def __init__(self, *, name: str, proxied_data, width: int, function: typing.Callable):
808 assert not isinstance(proxied_data, Future)
809 if hasattr(proxied_data, 'width'):
810 # Ensemble data source
811 assert hasattr(proxied_data, 'source')
812 self._result = function(proxied_data.source)
814 self._result = function(proxied_data)
816 if isinstance(self._result, (str, bytes)):
817 # In this case, do not implicitly broadcast
818 raise exceptions.ValueError('"function" produced data incompatible with "width".')
820 if not isinstance(self._result, collections.abc.Iterable):
821 raise exceptions.DataShapeError(
822 'Expected iterable of size {} but "function" result is not iterable.')
823 data = list(self._result)
825 if len(data) != width:
826 raise exceptions.DataShapeError(
827 'Expected iterable of size {} but "function" produced a {} of size {}'.format(width, type(data),
829 dtype = type(data[0])
832 raise exceptions.ValueError('width must be an integer 1 or greater.')
833 dtype = type(self._result)
834 if issubclass(dtype, (list, tuple)):
835 dtype = datamodel.NDArray
836 data = [datamodel.ndarray(self._result)]
837 elif isinstance(self._result, collections.abc.Iterable):
838 if not isinstance(self._result, (str, bytes, dict)):
839 raise exceptions.ValueError(
840 'Expecting width 1 but "function" produced iterable type {}.'.format(type(self._result)))
843 data = [str(self._result)]
845 data = [self._result]
846 description = ResultDescription(dtype=dtype, width=width)
847 self._data = OutputData(name=name, description=description)
848 for member in range(width):
849 self._data.set(data[member], member=member)
851 output_collection_description = OutputCollectionDescription(**{name: dtype})
852 self.output_data_proxy = define_output_data_proxy(output_description=output_collection_description)
# NOTE(review): StaticSourceManager accessor methods, truncated:
#  - is_done (854): body lost (855-856); static data is presumably always
#    done — likely `return True`; verify.
#  - get (857-858): the `return self._data` (859) appears lost.
#  - data (861-862) and width (864-867): the `@property` decorator lines
#    (860, 863) are missing from the listing.
#  - update_output (869): body lost (870+); static data likely needs no
#    update — TODO confirm.
# width() reaches into OutputData._description — the TODO at 865-866 notes
# that this private access should move into the public interface.
854 def is_done(self, name: str) -> bool:
857 def get(self, name: str) -> 'OutputData':
858 assert self._data.name == name
861 def data(self) -> _OutputDataProxyType:
862 return self.output_data_proxy(self)
864 def width(self) -> int:
865 # TODO: It looks like the OutputData ResultDescription probably belongs
866 # in the public interface.
867 return self._data._description.width
869 def update_output(self):
def future(self, name: str, description: ResultDescription) -> 'Future':
    """Return a Future view of the named static output held by this manager."""
    handle = Future(self, name, description=description)
    return handle
# NOTE(review): ProxyResourceManager class header and __init__, truncated.
# Gaps by embedded numbering: 895, 897, 899 are lost from __init__.
# Given that get() (below) reads `self._width` and `self._result`, the lost
# lines presumably include `self._width = width`, a done-flag, and
# `self._result = None` initialization — TODO confirm against full source.
879 class ProxyResourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
880 """Act as a resource manager for data managed by another resource manager.
882 Allow data transformations on the proxied resource.
885 proxied_future: An object implementing the Future interface.
886 width: Size of (one-dimensional) shaped data produced by function.
887 function: Transformation to perform on the result of proxied_future.
889 The callable passed as ``function`` must accept a single argument, which will
890 be an iterable when proxied_future represents an ensemble, or an object of
891 type proxied_future.description.dtype otherwise.
894 def __init__(self, proxied_future: 'Future', width: int, function: typing.Callable):
896 self._proxied_future = proxied_future
898 self.name = self._proxied_future.name
900 assert callable(function)
901 self.function = function
# NOTE(review): several ProxyResourceManager methods, mangled by truncation:
#  - width (903): its `return` (904) is lost; the next surviving line (908,
#    `self._proxied_future._reset()`) belongs to a different method — the
#    missing `def reset(self):` (around 907) was dropped.
#  - is_done (911): body lost (912-913).
#  - get (914-925): the branch structure around 920 (presumably
#    `if self._width != 1:`), the `else:` before 927, the scalar
#    `data.set(...)` call, and the final `return data` are all lost.
# The surviving logic: validate the requested name, require readiness, apply
# self.function to the cached proxied result, and repackage it as OutputData
# (per-member for ensembles, single value otherwise). Restore from version
# control before editing.
903 def width(self) -> int:
908 self._proxied_future._reset()
911 def is_done(self, name: str) -> bool:
914 def get(self, name: str):
915 if name != self.name:
916 raise exceptions.ValueError('Request for unknown data.')
917 if not self.is_done(name):
918 raise exceptions.ProtocolError('Data not ready.')
919 result = self.function(self._result)
921 # TODO Fix this typing nightmare:
922 # ResultDescription should be fully knowable and defined when the resource manager is initialized.
923 data = OutputData(name=self.name, description=ResultDescription(dtype=type(result[0]), width=self._width))
924 for member, value in enumerate(result):
925 data.set(value, member)
927 data = OutputData(name=self.name, description=ResultDescription(dtype=type(result), width=self._width))
def update_output(self):
    """Fetch the proxied future's result and cache it locally.

    The cached value is later transformed by ``self.function`` when data is
    retrieved through get().
    """
    self._result = self._proxied_future.result()
def data(self) -> _OutputDataProxyType:
    """Unsupported: this manager does not yet provide a full output data proxy."""
    message = 'ProxyResourceManager cannot yet manage a full OutputDataProxy.'
    raise exceptions.ApiError(message)
def future(self, name: str, description: ResultDescription):
    """Get a Future through which the transformed data may be requested."""
    handle = Future(self, name, description=description)
    return handle
942 class AbstractOperation(typing.Generic[_OutputDataProxyType]):
943 """Client interface to an operation instance (graph node).
945 Note that this is a generic abstract class. Subclasses should provide a
946 class subscript to help static type checkers.
951 """Assert execution of an operation.
953 After calling run(), the operation results are guaranteed to be available
954 in the local context.
959 def output(self) -> _OutputDataProxyType:
960 """Get a proxy collection to the output of the operation.
962 Developer note: The 'output' property exists to isolate the namespace of
963 output data from other operation handle attributes and we should consider
964 whether it is actually necessary or helpful. To facilitate its possible
965 future removal, do not enrich its interface beyond that of a collection
966 of OutputDescriptor attributes.
class OperationRegistryKey(collections.namedtuple('OperationRegistryKey', 'namespace name'), collections.abc.Hashable):
    """Helper class to define the key type for OperationRegistry.

    A hashable (namespace, name) pair identifying a registered operation.
    """

    def __hash__(self):
        # Defined explicitly because overriding __eq__ would otherwise set
        # __hash__ to None. Keep consistent with __eq__: equal keys must
        # hash equal.
        return hash((self.namespace, self.name))

    def __eq__(self, other):
        """Test equivalence rather than identity.

        Note: Use `is` to test identity.
        """
        return other.namespace == self.namespace and other.name == self.name

    def __str__(self):
        # Period-delimited form, e.g. 'gmxapi.make_constant'.
        return '.'.join([self.namespace, self.name])

    def __repr__(self):
        # Bug fix: the original read `self.__qualname__`, but __qualname__ is
        # an attribute of the class object only (type.__new__ removes it from
        # the class namespace), so instance access raised AttributeError.
        return '{}(namespace={}, name={})'.format(type(self).__qualname__, self.namespace, self.name)
# NOTE(review): _make_registry_key, truncated. Gaps by embedded numbering:
#  - 996-997: the branch wrapping `return OperationRegistryKey(*args)` (998)
#    for multiple positional args is lost, as is the zero-args check guarding
#    the UsageError at 1001 and the unpacking of the single item (1002).
#  - 1004: the `return item` for an already-valid OperationRegistryKey is lost.
#  - 1012, 1015-1017: the `else:` branches and the name-resolution mirror of
#    the namespace-resolution code (callable vs. attribute) are lost.
# Surviving intent: accept an OperationRegistryKey, a 'namespace.name' string,
# positional (namespace, name) args, or any object exposing namespace/name
# attributes (possibly callables), and normalize to OperationRegistryKey;
# otherwise raise ValueError. Restore from version control before editing.
991 def _make_registry_key(*args) -> OperationRegistryKey:
992 """Normalize input to an OperationRegistryKey.
994 Used to implement OperationRegistry.__getitem__(), which catches and converts
995 the various exceptions this helper function can produce.
998 return OperationRegistryKey(*args)
1001 raise exceptions.UsageError('Empty index value passed to OperationRegistry instance[].')
1003 if isinstance(item, OperationRegistryKey):
1005 if isinstance(item, str):
1006 namespace, name = item.rsplit(sep='.', maxsplit=1)
1007 return OperationRegistryKey(namespace=namespace, name=name)
1008 # Item could be a class object or an instance object...
1009 if hasattr(item, 'namespace') and hasattr(item, 'name'):
1010 if callable(item.namespace):
1011 namespace = item.namespace()
1013 namespace = item.namespace
1014 if callable(item.name):
1018 return OperationRegistryKey(namespace=namespace, name=name)
1019 raise exceptions.ValueError('Not a usable OperationRegistryKey: {}'.format(item))
1022 class OperationRegistry(collections.UserDict):
1023 """Helper class to map identifiers to Operation implementation instances.
1025 This is an implementation detail of gmxapi.operation and should not be used from
1026 outside of the package until a stable interface can be specified.
1030 def __getitem__(self, item: OperationRegistryKey):
1034 def __getitem__(self, item: str):
1037 def __getitem__(self, *args):
1038 """Fetch the requested operation registrant.
1040 The getter is overloaded to support look-ups in multiple forms.
1042 The key can be given in the following forms.
1043 * As a period-delimited string of the form "namespace.operation".
1044 * As an OperationRegistryKey object.
1045 * As a sequence of positional arguments accepted by OperationRegistryKey.
1046 * As an object conforming to the OperationRegistryKey interface.
1049 item = _make_registry_key(*args)
1050 except exceptions.Error as e:
1051 raise exceptions.TypeError('Could not interpret key as a OperationRegistryKey.') from e
1052 return self.data[item]
1055 # Module level data store for locating operation implementations at run time.
1056 # TODO: This may make sense as instance data of a root Context instance, but we
1057 # don't have a stable interface for communicating between Contexts yet.
1058 # Alternatively, it could be held as class data in a RegisteredOperation class,
1059 # but note that the "register" member function would behave less like the abc.ABCMeta
1060 # support for "virtual subclassing" and more like the generic function machinery
1061 # of e.g. functools.singledispatch.
1062 _operation_registry = OperationRegistry()
def _register_operation(cls: typing.Type[OperationImplementation]):
    """Add *cls* to the module-level operation registry.

    Raises gmxapi ProtocolError if an operation with the same registry key
    has already been registered.
    """
    assert isinstance(cls, type)
    assert issubclass(cls, OperationImplementation)
    key = _make_registry_key(cls)
    if key in _operation_registry:
        raise exceptions.ProtocolError(
            'Attempting to redefine operation {}.'.format(str(key)))
    _operation_registry[key] = cls
1075 # TODO: replace with a generic function that we dispatch on so the type checker can infer a return type.
1076 def _get_operation_director(operation, context: gmx.abc.Context):
1081 :return: gmxapi.abc.OperationDirector
1083 registrant = _operation_registry[operation]
1084 director = registrant.director(context=context)
1088 class InputDescription(abc.ABC):
1089 """Node input description for gmxapi.operation module.
1091 Provides the support needed to understand operation inputs in gmxapi.operation
1094 .. todo:: Implementation base class with heritable behavior and/or helpers to
1095 compose this functionality from more normative description of
1096 operation inputs. This will probably become a facet of the ResourceDirector
1097 when specialized for gmxapi.operation.Context.
1101 def signature(self) -> InputCollectionDescription:
1102 """Mapping of named inputs and input type.
1104 Used to determine valid inputs before an Operation node is created.
1107 Related to the operation resource factory for this context.
1110 Better unification of this protocol, InputCollectionDescription, and
1112 Note, also, that the *bind* method of the returned InputCollectionDescription
1113 serves as the resource factory for input to the node builder.
1118 def make_uid(self, input: 'DataEdge') -> str:
1119 """The unique identity of an operation node tags the output with respect to the input.
1121 Combines information on the Operation details and the input to uniquely
1122 identify the Operation node.
1125 input : A (collection of) data source(s) that can provide Fingerprints.
1127 Used internally by the Context to manage ownership of data sources, to
1128 locate resources for nodes in work graphs, and to manage serialization,
1129 deserialization, and checkpointing of the work graph.
1131 The UID is a detail of the generic Operation that _should_ be independent
1132 of the Context details to allow the framework to manage when and where
1133 an operation is executed.
1135 TODO: We probably don't want to allow Operations to single-handedly determine their
1136 own uniqueness, but they probably should participate in the determination with the Context.
1138 TODO: Context implementations should be allowed to optimize handling of
1139 equivalent operations in different sessions or work graphs, but we do not
1140 yet guarantee that UIDs are globally unique!
1147 class ConcreteInputDescription(InputDescription):
1148 """Simple composed InputDescription."""
1151 input_signature: InputCollectionDescription,
1152 uid_helper: typing.Callable[['DataEdge'], str]
1154 self._input_signature_description = input_signature
1155 self._uid_helper = uid_helper
1157 def signature(self) -> InputCollectionDescription:
1158 return self._input_signature_description
1160 def make_uid(self, input: 'DataEdge') -> str:
1161 return self._uid_helper(input)
class OperationMeta(abc.ABCMeta):
    """Metaclass to manage the definition of Operation implementation classes.

    Note that this metaclass can be superseded by `__init_subclass__()` when
    the minimum Python version is increased to Python 3.6+.
    """

    def __new__(meta, name, bases, class_dict):
        """Create the class and register concrete Operation implementations.

        Fix: restore the terminating ``return cls`` that is missing in the
        truncated listing — without it every class defined with this
        metaclass would be bound to None.
        """
        cls = super().__new__(meta, name, bases, class_dict)
        # Register subclasses, but not the base class.
        if issubclass(cls, OperationImplementation) and cls is not OperationImplementation:
            # TODO: Remove OperationDetailsBase layer and this extra check.
            # Note: we do not yet register the Operations built dynamically because we
            # don't have a clear definition of unique implementations yet. For instance,
            # a new OperationDetails class is defined for each call to gmx.join_arrays
            # TODO: Properly register and reuse Operations defined dynamically
            # through function_wrapper (currently encompassed by OperationDetailsBase subclasses)
            # The name check short-circuits before OperationDetailsBase itself is
            # defined, so the forward reference below is safe during its creation.
            if name != 'OperationDetailsBase':
                if OperationDetailsBase not in bases:
                    _register_operation(cls)
        return cls
1187 class OperationDetailsBase(OperationImplementation, InputDescription,
1188 metaclass=OperationMeta):
1189 """Abstract base class for Operation details in this module's Python Context.
1191 Provides necessary interface for use with gmxapi.operation.ResourceManager.
1192 Separates the details of an Operation from those of the ResourceManager in
1195 OperationDetails classes are almost stateless, serving mainly to compose implementation
1196 details. Instances (operation objects) provide the Context-dependent interfaces
1197 for a specific node in a work graph.
1199 OperationDetails subclasses are created dynamically by function_wrapper and
1202 Developer note: when subclassing, note that the ResourceManager is responsible
1203 for managing Operation state. Do not add instance data members related to
1204 computation or output state.
1206 TODO: determine what is acceptable instance data and/or initialization information.
1207 Note that currently the subclass in function_wrapper has _no_ initialization input,
1208 but does not yet handle input-dependent output specification or node fingerprinting.
1209 It seems likely that instance initialization will require some characterization of
1210 supplied input, but nothing else. Even that much is not necessary if the instance
1211 is completely stateless, but that would require additional parameters to the member
1212 functions. However, an instance should be tied to a specific ResourceManager and
1213 Context, so weak references to these would be reasonable.
1217 def output_description(self) -> OutputCollectionDescription:
1218 """Mapping of available outputs and types for an existing Operation node."""
1222 def publishing_data_proxy(self, *,
1223 instance: SourceResource[typing.Any, _PublishingDataProxyType],
1224 client_id) -> _PublishingDataProxyType:
1225 """Factory for Operation output publishing resources.
1227 Used internally when the operation is run with resources provided by instance."""
1231 def output_data_proxy(self,
1232 instance: SourceResource[_OutputDataProxyType, typing.Any]
1233 ) -> _OutputDataProxyType:
1234 """Get an object that can provide Futures for output data managed by instance."""
1238 def __call__(self, resources: _Resources):
1239 """Execute the operation with provided resources.
1241 Resources are prepared in an execution context with aid of resource_director()
1243 After the first call, output data has been published and is trivially
1244 available through the output_data_proxy()
1250 def resource_director(cls, *, input, output: _PublishingDataProxyType) -> _Resources[_PublishingDataProxyType]:
1251 """a Director factory that helps build the Session Resources for the function.
1253 The Session launcher provides the director with all of the resources previously
1254 requested/negotiated/registered by the Operation. The director uses details of
1255 the operation to build the resources object required by the operation runner.
1257 For the Python Context, the protocol is for the Context to call the
1258 resource_director instance method, passing input and output containers.
1259 (See, for example, gmxapi.operation.PyFunctionRunnerResources)
1263 # TODO: Don't run the director. Just return the correct callable.
1265 def operation_director(cls, *args, context: 'Context', label=None, **kwargs) -> AbstractOperation:
1266 """Dispatching Director for adding a work node.
1268 A Director for input of a particular sort knows how to reconcile
1269 input with the requirements of the Operation and Context node builder.
1270 The Director (using a less flexible / more standard interface)
1271 builds the operation node using a node builder provided by the Context.
1273 This is essentially the creation method, instead of __init__, but the
1274 object is created and owned by the framework, and the caller receives
1275 an OperationHandle instead of a reference to an instance of cls.
1277 # TODO: We need a way to compose this functionality for arbitrary Contexts.
1278 # That likely requires traits on the Contexts, and registration of Context
1279 # implementations. It seems likely that an Operation will register Director
1280 # implementations on import, and dispatching will be moved to the Context
1281 # implementations, which can either find an appropriate OperationDirector
1282 # or raise a compatibility error. To avoid requirements on import order of
1283 # Operations and Context implementations, we can change this to a non-abstract
1284 # dispatching method, requiring registration in the global gmxapi.context
1285 # module, or get rid of this method and use something like pkg_resources
1286 # "entry point" groups for independent registration of Directors and Contexts,
1287 # each annotated with relevant traits. E.g.:
1288 # https://setuptools.readthedocs.io/en/latest/setuptools.html#dynamic-discovery-of-services-and-plugins
1290 if not isinstance(context, Context):
1291 raise exceptions.UsageError('Context instance needed for dispatch.')
1292 # TODO: use Context characteristics rather than isinstance checks.
1293 if isinstance(context, ModuleContext):
1294 construct = OperationDirector(*args, operation_details=cls, context=context, label=label, **kwargs)
1296 elif isinstance(context, SubgraphContext):
1297 construct = OperationDirector(*args, operation_details=cls, context=context, label=label, **kwargs)
1300 raise exceptions.ApiError('Cannot dispatch operation_director for context {}'.format(context))
1303 # TODO: Implement observer pattern for edge->node data flow.
1304 # Step 0: implement subject interface subscribe()
1305 # Step 1: implement subject interface get_state()
1306 # Step 2: implement observer interface update()
1307 # Step 3: implement subject interface notify()
1308 # Step 4: implement observer hook to support substantial change in source that
1309 # invalidates downstream fingerprinting, such as a new subgraph iteration.
1310 # class Observer(object):
1311 # """Abstract base class for data observers."""
1312 # def rebind(self, edge: DataEdge):
1313 # """Recreate the Operation at the consuming end of the DataEdge."""
1316 class Future(_Future):
1317 """gmxapi data handle.
1319 Future is currently more than a Future right now. (should be corrected / clarified.)
1320 Future is also a facade to other features of the data provider.
1322 Future objects are the most atomic interface between Contexts. User scripts
1323 may hold Futures from which they extract data with result(). Operation output
1324 used as input for another Operation can be decomposed such that the Operation
1325 factory has only Future objects in its input.
1327 TODO: ``subscribe`` method allows consumers to bind as Observers.
1329 TODO: extract the abstract class for input inspection?
1330 Currently abstraction is handled through SourceResource subclassing.
1333 description (ResultDescription): Describes the result to be obtained from this Future.
# NOTE(review): Future.__init__, truncated — line 1338 is lost. Since other
# methods in this chunk read `self.name` (e.g. Future.__hash__ and result()),
# the lost line is presumably `self.name = name` — verify against full source.
1337 def __init__(self, resource_manager: SourceResource, name: str, description: ResultDescription):
1339 if not isinstance(description, ResultDescription):
1340 raise exceptions.ValueError('Need description of requested data.')
1341 self.description = description # type: ResultDescription
1342 self.resource_manager = resource_manager
1344 # Deprecated. We should not "reset" futures, but reconstitute them, but we
1345 # need to move the data model to a subscription-based system so that we can
1346 # make Futures properly immutable and issue new ones across subgraph iterations.
1347 self._number_of_resets = 0
def __eq__(self, other):
    # __hash__() is customized for this class, and Python requires that
    # objects which compare equal also hash equal, so equality is
    # deliberately left as the default identity comparison here.
    # Keep this method and __hash__() semantically consistent.
    return object.__eq__(self, other)
1357 # We cannot properly determine equivalency beyond the scope of a ResourceManager instance
1358 # without more developed data flow fingerprinting.
1359 return hash((id(self.resource_manager), self.name, self.description, self._number_of_resets))
1362 return '<Future: name={}, description={}>'.format(self.name, self.description)
1364 def result(self) -> ResultTypeVar:
1365 """Fetch data to the caller's Context.
1367 Returns an object of the concrete type specified according to
1368 the operation that produces this Result.
1370 Ensemble data are returned as a list. Scalar results or results from single
1371 member ensembles are returned as scalars.
1373 self.resource_manager.update_output()
1374 # Return ownership of concrete data
1375 handle = self.resource_manager.get(self.name)
1377 # For intuitive use in non-ensemble cases, we represent data as bare scalars
1378 # when possible. It is easier for users to cast scalars to lists of length 1
1379 # than to introspect their own code to determine if a list of length 1 is
1380 # part of an ensemble or not. The data model will become clearer as we
1381 # develop more robust handling of multidimensional data and data flow topologies.
1383 # we may distinguish between data of shape () and shape (1,), but we will need
1384 # to be careful with semantics. We are already starting to adopt a rule-of-thumb
1385 # that data objects assume the minimum dimensionality necessary unless told
1386 # otherwise, and we could make that a hard rule if it doesn't make other things
1388 if self.description.width == 1:
1389 return handle.data(member=0)
1391 return handle.data()
1394 """Mark the Future "not done" to allow reexecution.
1396 Invalidates cached results, resets "done" markers in data sources, and
1397 triggers _reset recursively.
1399 Note: this is a hack that is inconsistent with the plan of unique mappings
1400 of inputs to outputs, but allows a quick prototype for looping operations.
1402 self._number_of_resets += 1
1403 self.resource_manager.reset()
1407 return self.description.dtype
def __getitem__(self, item):
    """Get a more limited view on the Future.

    Returns a new Future that resolves to ``result()[item]`` (applied
    per ensemble member when this Future has ensemble width > 1).

    Fix: restore the ``else:`` (lost in the truncated listing) between the
    scalar and ensemble assignments of *proxy* — as shown, the ensemble
    branch unconditionally overwrote the scalar branch.
    """
    description = ResultDescription(dtype=self.dtype, width=self.description.width)
    # TODO: Use explicit typing when we have more thorough typing.
    description._dtype = None
    if self.description.width == 1:
        proxy = ProxyResourceManager(self,
                                     width=description.width,
                                     function=lambda value, key=item: value[key])
    else:
        proxy = ProxyResourceManager(self,
                                     width=description.width,
                                     function=lambda value, key=item:
                                     [subscriptable[key] for subscriptable in value])
    return proxy.future(self.name, description=description)
1426 class OutputDataDescriptor(ProxyDataDescriptor):
1427 """Read-only data descriptor for proxied access to output data.
1429 Knows how to get a Future from the resource manager.
1432 # TODO: Reconcile the internal implementation details with the visibility and
1433 # usages of this class.
1435 def __get__(self, proxy: DataProxyBase, owner):
1437 # Access through class attribute of owner class
1439 result_description = ResultDescription(dtype=self._dtype, width=proxy.ensemble_width)
1441 return proxy._resource_instance.future(name=self._name, description=result_description)
1444 class MutableResourceDescriptor(ProxyDataDescriptor):
1445 """Accessor for rich binding interfaces.
1447 Allows operations to access resources beyond the scope of the current
1448 resource manager. Used by operations whose interactions are more complicated
1449 than standard typed data flow at the scope of the current Context.
1451 Instead of a Future interface, the returned object is a MutableResource with
1452 which a subscriber can collaborate with lower-level protocols.
1455 def __get__(self, proxy: DataProxyBase, owner) -> typing.Union[MutableResource, 'MutableResourceDescriptor']:
1457 # Access through class attribute of owner class. We don't have a
1458 # specified use case for that, so allow inspection of the data
1459 # descriptor instance, itself.
1462 # The protocol for MD extension plugins requires that the simulation operation
1463 # subscribe to the plugin. Then the Context allows the plugin to access the
1464 # MdRunner interface as the simulation is launched.
1465 # The protocol for modify_input and for mdrun to consume the TPR payload
1466 # of read_tpr or modify_input should allow us to use the gmxapi 0.0.7
1467 # WorkSpec to configure and launch a simulation, which we can do by feeding
1468 # forward and building a fused operation at the mdrun node. The information
1469 # fed forward can just be references to the inputs and parameters of the
1470 # earlier operations, with annotations so that we know the intended behavior.
1473 def define_output_data_proxy(output_description: OutputCollectionDescription) -> typing.Type[DataProxyBase]:
1474 descriptors = {name: OutputDataDescriptor(name, description) for name, description in output_description.items()}
1476 class OutputDataProxy(DataProxyBase, descriptors=descriptors):
1477 """Handler for read access to the `output` member of an operation handle.
1479 Acts as a sort of ResultCollection.
1481 A ResourceManager creates an OutputDataProxy instance at initialization to
1482 provide the ``output`` property of an operation handle.
1485 # Note: the OutputDataProxy has an inherent ensemble shape in the context
1486 # in which it is used, but that is an instance characteristic, not part of this type definition.
1487 # TODO: (FR5) The current tool does not support topology changing operations.
1488 return OutputDataProxy
1491 # Encapsulate the description of the input data flow.
1492 PyFuncInput = collections.namedtuple('Input', ('args', 'kwargs', 'dependencies'))
1495 class SinkTerminal(object):
1496 """Operation input end of a data edge.
1498 In addition to the information in an InputCollectionDescription, includes
1499 topological information for the Operation node (ensemble width).
1501 Collaborations: Required for creation of a DataEdge. Created with knowledge
1502 of a DataSourceCollection instance and a InputCollectionDescription.
1505 # TODO: This clearly maps to a Builder pattern.
1506 # I think we want to get the sink terminal builder from a factory parameterized by InputCollectionDescription,
1507 # add data source collections, and then build the sink terminal for the data edge.
1508 def __init__(self, input_collection_description: InputCollectionDescription):
1509 """Define an appropriate data sink for a new operation node.
1511 Resolve data sources and input description to determine connectability,
1512 topology, and any necessary implicit data transformations.
1514 :param input_collection_description: Available inputs for Operation
1515 :return: Fully formed description of the Sink terminal for a data edge to be created.
1517 Collaborations: Execution Context implementation.
1519 self.ensemble_width = 1
1520 self.inputs = input_collection_description
1523 return '<SinkTerminal: ensemble_width={}>'.format(self.ensemble_width)
# NOTE(review): SinkTerminal.update_width, truncated. Gaps by embedded
# numbering: 1527-1529 and 1531 are lost. From the surviving raise messages,
# the lost code presumably attempts int coercion of non-int widths (raising
# the TypeError at 1530 on failure) and guards the ValueError at 1532 with a
# positivity check (width < 1) — verify against the full source.
# Surviving contract: the ensemble width may be grown from the default 1
# exactly once; changing an already-set width to a different value raises.
1525 def update_width(self, width: int):
1526 if not isinstance(width, int):
1530 raise exceptions.TypeError('Need an integer width > 0.')
1532 raise exceptions.ValueError('Nonsensical ensemble width: {}'.format(int(width)))
1533 if self.ensemble_width != 1:
1534 if width != self.ensemble_width:
1535 raise exceptions.ValueError(
1536 'Cannot change ensemble width {} to width {}.'.format(self.ensemble_width, width))
1537 self.ensemble_width = width
1539 def update(self, data_source_collection: DataSourceCollection):
1540 """Update the SinkTerminal with the proposed data provider."""
1541 for name, sink_dtype in self.inputs.items():
1542 if name not in data_source_collection:
1543 # If/when we accept data from multiple sources, we'll need some additional sanity checking.
1544 if not hasattr(self.inputs.signature.parameters[name], 'default'):
1545 raise exceptions.UsageError('No data or default for {}'.format(name))
1547 # With a single data source, we need data to be in the source or have a default
1548 assert name in data_source_collection
1549 assert issubclass(sink_dtype, valid_result_types)
1550 source = data_source_collection[name]
1551 logger.debug('Updating Sink for source {}: {}.'.format(name, source))
1552 if isinstance(source, sink_dtype):
1553 logger.debug('Source matches sink. No update necessary.')
1556 if isinstance(source, collections.abc.Iterable) and not isinstance(source, (
1557 str, bytes, collections.abc.Mapping)):
1558 assert isinstance(source, datamodel.NDArray)
1559 if sink_dtype != datamodel.NDArray:
1560 # Source is NDArray, but sink is not. Implicitly scatter.
1561 self.update_width(len(source))
1563 if hasattr(source, 'description'):
1564 source_description = typing.cast(ResultDescription, source.description)
1565 source_dtype = source_description.dtype
1566 assert isinstance(sink_dtype, type)
1567 # TODO: Handle typing of Future slices when we have a better data model.
1568 if source_dtype is not None:
1569 assert isinstance(source_dtype, type)
1570 if not issubclass(source_dtype, sink_dtype):
1571 raise exceptions.TypeError('Expected {} but got {}.'.format(sink_dtype, source_dtype))
1572 source_width = source.description.width
1573 self.update_width(source_width)
1576 class DataEdge(object):
1577 """State and description of a data flow edge.
1579 A DataEdge connects a data source collection to a data sink. A sink is an
1580 input or collection of inputs of an operation (or fused operation). An operation's
1581 inputs may be fed from multiple data source collections, but an operation
1582 cannot be fully instantiated until all of its inputs are bound, so the DataEdge
1583 is instantiated at the same time the operation is instantiated because the
1584 required topology of a graph edge may be determined by the required topology
1585 of another graph edge.
1587 A data edge has a well-defined topology only when it is terminated by both
1588 a source and sink. Creation requires that a source collection is compared to
1591 Calling code initiates edge creation by passing well-described data sources
1592 to an operation factory. The data sources may be annotated with explicit scatter
1595 The resource manager for the new operation determines the
1596 required shape of the sink to handle all of the offered input.
1599 and transformations of the data sources are then determined and the edge is
1602 At that point, the fingerprint of the input data at each operation
1603 becomes available to the resource manager for the operation. The fingerprint
1604 has sufficient information for the resource manager of the operation to
1605 request and receive data through the execution context.
1607 Instantiating operations and data edges implicitly involves collaboration with
1608 a Context instance. The state of a given Context or the availability of a
1609 default Context through a module function may affect the ability to instantiate
1610 an operation or edge. In other words, behavior may be different for connections
1611 being made in the scripting environment versus the running Session, and implementation
1612 details can determine whether or not new operations or data flow can occur in
1613 different code environments.
1616 class ConstantResolver(object):
1617 def __init__(self, value):
1620 def __call__(self, member=None):
1623 def __init__(self, source_collection: DataSourceCollection, sink_terminal: SinkTerminal):
1624 # Adapters are callables that transform a source and node ID to local data.
1625 # Every key in the sink has an adapter.
1627 self.source_collection = source_collection
1628 self.sink_terminal = sink_terminal
1629 for name in sink_terminal.inputs:
1630 if name not in source_collection:
1631 if hasattr(sink_terminal.inputs[name], 'default'):
1632 self.adapters[name] = self.ConstantResolver(sink_terminal.inputs[name])
1634 # TODO: Initialize with multiple DataSourceCollections?
1635 raise exceptions.ValueError('No source or default for required input "{}".'.format(name))
1637 source = source_collection[name]
1638 sink = sink_terminal.inputs[name]
1639 if isinstance(source, (str, bool, int, float, dict)):
1640 if issubclass(sink, (str, bool, int, float, dict)):
1641 self.adapters[name] = self.ConstantResolver(source)
1643 assert issubclass(sink, datamodel.NDArray)
1644 self.adapters[name] = self.ConstantResolver(datamodel.ndarray([source]))
1645 elif isinstance(source, datamodel.NDArray):
1646 if issubclass(sink, datamodel.NDArray):
1647 # TODO: shape checking
1648 # Implicit broadcast may not be what is intended
1649 self.adapters[name] = self.ConstantResolver(source)
1651 if source.shape[0] != sink_terminal.ensemble_width:
1652 raise exceptions.ValueError(
1653 'Implicit broadcast could not match array source to ensemble sink')
1655 self.adapters[name] = lambda member, source=source: source[member]
1656 elif hasattr(source, 'result'):
1657 # Handle data futures...
1658 # If the Future is part of an ensemble, result() will return a list.
1659 # Otherwise, it will return a single object.
1660 ensemble_width = source.description.width
1661 # TODO: subscribe to futures so results can be pushed.
1662 if ensemble_width == 1:
1663 self.adapters[name] = lambda member, source=source: source.result()
1665 self.adapters[name] = lambda member, source=source: source.result()[member]
1667 assert isinstance(source, EnsembleDataSource)
1668 self.adapters[name] = lambda member, source=source: source.node(member)
1671 return '<DataEdge: source_collection={}, sink_terminal={}>'.format(self.source_collection, self.sink_terminal)
1674 self.source_collection.reset()
    def resolve(self, key: str, member: int):
        """Produce the local data value for sink input *key* for ensemble member *member*.

        Dispatches to the adapter that was established for *key* when the edge
        was constructed.
        """
        return self.adapters[key](member=member)
1679 def sink(self, node: int) -> dict:
1680 """Consume data for the specified sink terminal node.
1682 Run-time utility delivers data from the bound data source(s) for the
1683 specified terminal that was configured when the edge was created.
1685 Terminal node is identified by a member index number.
1688 A Python dictionary of the provided inputs as local data (not Future).
1691 sink_ports = self.sink_terminal.inputs
1692 for key in sink_ports:
1693 results[key] = self.resolve(key, node)
1697 class ResourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
1698 """Provides data publication and subscription services.
1700 Owns the data published by the operation implementation or served to consumers.
1701 Mediates read and write access to the managed data streams.
1703 This ResourceManager implementation is defined in conjunction with a
1704 run-time definition of an Operation that wraps a Python callable (function).
1705 ResourceManager is instantiated with a reference to the callable.
1707 When the Operation is run, the resource manager prepares resources for the wrapped
1708 function. Inputs provided to the Operation factory are provided to the
1709 function as keyword arguments. The wrapped function publishes its output
1710 through the (additional) ``output`` key word argument. This argument is
1711 a short-lived resource, prepared by the ResourceManager, with writable
1712 attributes named in the call to function_wrapper().
1714 After the Operation has run and the outputs published, the data managed
1715 by the ResourceManager is marked "done."
1719 The data() method produces a read-only collection of outputs named for
1720 the Operation when the Operation's ``output`` attribute is accessed.
1722 publishing_resources() can be called once during the ResourceManager lifetime
1723 to provide the ``output`` object for the wrapped function. (Used by update_output().)
1725 update_output() brings the managed output data up-to-date with the input
1726 when the Operation results are needed. If the Operation has not run, an
1727 execution session is prepared with input and output arguments for the
1728 wrapped Python callable. Output is publishable only during this session.
1730 TODO: This functionality should evolve to be a facet of Context implementations.
1731 There should be no more than one ResourceManager instance per work graph
1732 node in a Context. This will soon be at odds with letting the ResourceManager
1733 be owned by an operation instance handle.
1734 TODO: The publisher and data objects can be more strongly defined through
1735 interaction between the Context and clients.
1739 The normative pattern for updating data is to execute a node in the work
1740 graph, passing Resources for an execution Session to an operation runner.
1741 The resources and runner are dependent on the implementation details of
1742 the operation and the execution context, so logical execution may look
1745 resource_builder = ResourcesBuilder()
1746 runner_builder = RunnerBuilder()
1747 input_resource_director = input_resource_factory.director(input)
1748 output_resource_director = publishing_resource_factory.director(output)
1749 input_resource_director(resource_builder, runner_builder)
1750 output_resource_director(resource_builder, runner_builder)
1751 resources = resource_builder.build()
1752 runner = runner_builder.build()
1755 Only the final line is intended to be literal. The preceding code, if it
1756 exists in entirety, may be spread across several code comments.
1758 TODO: Data should be pushed, not pulled.
1759 Early implementations executed operation code and extracted results directly.
1760 While we need to be able to "wait for" results and alert the data provider that
1761 we are ready for input, we want to defer execution management and data flow to
1766 def __publishing_context(self, ensemble_member=0) -> typing.Iterator[_PublishingDataProxyType]:
1767 """Get a context manager for resolving the data dependencies of this node.
1769 The returned object is a Python context manager (used to open a `with` block)
1770 to define the scope in which the operation's output can be published.
1771 'output' type resources can be published exactly once, and only while the
1772 publishing context is active. (See operation.function_wrapper())
1774 Used internally to implement ResourceManager.publishing_resources()
1776 Responsibilities of the context manager are to:
1777 * (TODO) Make sure dependencies are resolved.
1778 * Make sure outputs are marked 'done' when leaving the context.
1783 # if self._data.done():
1784 # raise exceptions.ProtocolError('Resources have already been published.')
1786 # I don't think we want the OperationDetails to need to know about ensemble data,
1787 # (though the should probably be allowed to), so we may need a separate interface
1788 # for the resource manager with built-in scope-limiting to a single ensemble member.
1789 # Right now, one Operation handle owns one ResourceManager (which takes care of
1790 # the ensemble details), which owns one OperationDetails (which has no ensemble knowledge).
1791 # It is the responsibility of the calling code to make sure the PublishingDataProxy
1792 # gets used correctly.
1794 # ref: https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager
1796 if not self._done[ensemble_member]:
1797 resource = self.__publishing_data_proxy(instance=weakref.proxy(self),
1798 client_id=ensemble_member)
1800 except Exception as e:
1801 message = 'Uncaught {} while providing output-publishing resources for {}.'
1802 message.format(repr(e), self.operation_id)
1803 raise exceptions.ApiError(message) from e
1805 logger.debug('Published output for {} member {}'.format(self.operation_id, ensemble_member))
1806 self._done[ensemble_member] = True
1808 def __init__(self, *,
1811 output_description: OutputCollectionDescription,
1812 output_data_proxy: typing.Type[_OutputDataProxyType],
1813 publishing_data_proxy: typing.Type[_PublishingDataProxyType],
1816 output_context: 'Context'):
1817 """Initialize a resource manager for the inputs and outputs of an operation.
1819 # Note: This implementation assumes there is one ResourceManager instance per data source,
1820 # so we only stash the inputs and dependency information for a single set of resources.
1821 # TODO: validate input_fingerprint as its interface becomes clear.
1822 self._input_edge = source
1823 self.ensemble_width = self._input_edge.sink_terminal.ensemble_width
1826 self.operation_id = operation_id
1828 if isinstance(output_context, Context):
1829 self._output_context = output_context
1831 message = 'Provide an instance of gmxapi.operation.Context for output_context'
1832 raise exceptions.UsageError(message)
1833 assert self._output_context is not None
1835 self._output_data_proxy = output_data_proxy
1836 assert self._output_data_proxy is not None
1837 assert callable(self._output_data_proxy)
1839 self._output_description = output_description
1840 assert self._output_description is not None
1842 self.__publishing_data_proxy = publishing_data_proxy
1843 assert self.__publishing_data_proxy is not None
1844 assert callable(self.__publishing_data_proxy)
1846 self._runner_director = runner_director
1847 assert self._runner_director is not None
1848 self._resource_factory = resource_factory
1849 assert self._resource_factory is not None
1851 self._data = _make_datastore(output_description=self._output_description,
1852 ensemble_width=self.ensemble_width)
1854 # We store a rereference to the publishing context manager implementation
1855 # in a data structure that can only produce one per Python interpreter
1856 # (using list.pop()).
1857 # TODO: reimplement as a data descriptor
1858 # so that PublishingDataProxy does not need a bound circular reference.
1859 self.__publishing_resources = [self.__publishing_context]
1861 self._done = [False] * self.ensemble_width
1862 self.__operation_entrance_counter = 0
    def width(self) -> int:
        """Ensemble width of the data managed by this resource manager."""
        return self.ensemble_width
1868 self.__operation_entrance_counter = 0
1869 self._done = [False] * self.ensemble_width
1870 self.__publishing_resources = [self.__publishing_context]
1871 for data in self._data.values():
1873 self._input_edge.reset()
1874 assert self.__operation_entrance_counter == 0
1876 def done(self, member=None):
1878 return all(self._done)
1880 return self._done[member]
1882 def set_result(self, name, value, member: int):
1883 if not isinstance(value, (str, bytes)):
1886 # In this specification, it is antithetical to publish Futures.
1887 if hasattr(item, 'result'):
1888 raise exceptions.ApiError('Operation produced Future instead of real output.')
1890 # Ignore when `item` is not iterable.
1892 self._data[name].set(value=value, member=member)
    def is_done(self, name):
        """Report the *done* status of the managed output data named *name*."""
        return self._data[name].done
    def get(self, name: str):
        """Get the managed OutputData item for the named output.

        Raises exceptions.ProtocolError if requested data is not local yet.
        Raises exceptions.ValueError if data is requested for an unknown name.
        """
        if name not in self._data:
            raise exceptions.ValueError('Request for unknown data.')
        if not self.is_done(name):
            raise exceptions.ProtocolError('Data not ready.')
        assert isinstance(self._data[name], OutputData)
        return self._data[name]
1910 # TODO: Normalize. This is the no-argument, no-return callable member of an
1911 # operation handle that dispatches to another Context (the operation implementation).
1912 # TODO: Allow update of a single ensemble member. As written, this always updates all ensemble members. That can
1913 # be the default behavior, but we don't want to require non-local updates in all cases.
1914 def update_output(self):
1915 """Bring the output of the bound operation up to date.
1917 Execute the bound operation once if and only if it has not
1918 yet been run in the lifetime of this resource manager.
1920 Used internally to implement Futures for the local operation
1921 associated with this resource manager.
1923 TODO: We need a different implementation for an operation whose output
1924 is served by multiple resource managers. E.g. an operation whose output
1925 is available across the ensemble, but which should only be executed on
1926 a single ensemble member.
1928 # This code is not intended to be reentrant. We make a modest attempt to
1929 # catch unexpected reentrance, but this is not (yet) intended to be a thread-safe
1930 # resource manager implementation.
1931 # TODO: Handle checking just the ensemble members this resource manager is responsible for.
1932 # TODO: Replace with a managed observer pattern. Update once when input is available in the Context.
1934 # Note: This check could also be encapsulated in a run_once decorator that
1935 # could even set a data descriptor to change behavior.
1936 self.__operation_entrance_counter += 1
1937 if self.__operation_entrance_counter > 1:
1938 raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
1940 # Note! This is a detail of the ResourceManager in a SerialContext
1941 # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
1942 # operation's library/implementation context.
1943 publishing_resources = self.publishing_resources()
1944 for i in range(self.ensemble_width):
1945 # TODO: rewrite the following expression as a call to a resource factory.
1946 # TODO: Consider whether the resource_factory behavior should be normalized
1947 # to always use `with` blocks to indicate the lifetime of a resource handle.
1948 # That implies that an operation handle can expire, but the operation handle
1949 # could be "yield"ed
1950 # from within the `with` block to keep the resource scope alive until the resulting
1951 # generator is exhausted. Not sure what that looks like or what the use case would be.
1952 with self.local_input(i) as input:
1953 # Note: Resources are marked "done" by the publishing system
1954 # before the following context manager finishes exiting.
1955 with publishing_resources(ensemble_member=i) as output:
1956 # self._runner(*input.args, output=output, **input.kwargs)
1958 # Here we can make _runner a thing that accepts session resources, and
1959 # is created by specializable builders. Separate out the expression of
1962 # resource_builder = OperationDetails.ResourcesBuilder(context)
1963 # runner_builder = OperationDetails.RunnerBuilder(context)
1964 # input_resource_director = self._input_resource_factory.director(input)
1965 # output_resource_director = self._publishing_resource_factory.director(output)
1966 # input_resource_director(resource_builder, runner_builder)
1967 # output_resource_director(resource_builder, runner_builder)
1968 # resources = resource_builder.build()
1969 # runner = runner_builder.build()
1972 # This resource factory signature might need to be inverted or broken up
1973 # into a builder for consistency. I.e.
1974 # option 1: Make the input and output resources with separate factories and add_resource on
1975 # the runner builder.
1976 # option 2: Pass resource_builder to input_director and then output_director.
1977 resources = self._resource_factory(input=input, output=output)
1978 runner = self._runner_director(resources)
1981 raise exceptions.ApiError('update_output implementation failed to update all outputs.')
    def future(self, name: str, description: ResultDescription):
        """Retrieve a Future for a named output.

        Provide a description of the expected result to check for compatibility or
        implicit topological conversion.

        TODO: (FR5+) Normalize this part of the interface between operation definitions
        and resource managers.
        """
        if not isinstance(name, str) or name not in self._data:
            raise exceptions.ValueError('"name" argument must name an output.')
        assert description is not None
        requested_dtype = description.dtype
        available_dtype = self._data[name]._description.dtype
        if requested_dtype != available_dtype:
            # TODO: framework to check for implicit conversions
            message = 'Requested Future of type {} is not compatible with available type {}.'
            message = message.format(requested_dtype, available_dtype)
            raise exceptions.ApiError(message)
        return Future(self, name, description)
    def data(self) -> _OutputDataProxyType:
        """Get an adapter to the output resources to access results."""
        # The proxy type was provided at construction; it wraps this manager.
        return self._output_data_proxy(self)
2009 def local_input(self, member: int = None):
2010 """In an API session, get a handle to fully resolved locally available input data.
2012 Execution dependencies are resolved on creation of the context manager. Input data
2013 becomes available in the ``as`` object when entering the context manager, which
2014 becomes invalid after exiting the context manager. Resources allocated to hold the
2015 input data may be released when exiting the context manager.
2017 It is left as an implementation detail whether the context manager is reusable and
2018 under what circumstances one may be obtained.
2021 kwargs = self._input_edge.sink(node=member)
2022 assert 'input' not in kwargs
2024 # Check that we have real data
2025 for key, value in kwargs.items():
2026 assert not hasattr(value, 'result')
2027 assert not hasattr(value, 'run')
2029 if isinstance(value, list):
2031 if isinstance(value, datamodel.NDArray):
2032 value_list = value._values
2033 if isinstance(value, collections.abc.Mapping):
2034 value_list = value.values()
2035 assert not isinstance(value_list, Future)
2036 assert not hasattr(value_list, 'result')
2037 assert not hasattr(value_list, 'run')
2038 for item in value_list:
2039 assert not hasattr(item, 'result')
2041 input_pack = InputPack(kwargs=kwargs)
2043 # Prepare input data structure
2044 # Note: we use 'yield' instead of 'return' for the protocol expected by
2045 # the @contextmanager decorator
    def publishing_resources(self):
        """Get a context manager for resolving the data dependencies of this node.

        Use the returned object as a Python context manager.
        'output' type resources can be published exactly once, and only while the
        publishing context is active.

        Write access to publishing resources can be granted exactly once during the
        resource manager lifetime and conveys exclusive access.
        """
        # pop() from the single-element list enforces the grant-exactly-once
        # policy: a second call raises IndexError.
        return self.__publishing_resources.pop()
2061 class PyFunctionRunnerResources(collections.UserDict):
2062 """Runtime resources for Python functions.
2064 Produced by a ResourceDirector for a particular Operation.
2068 if 'output' in self:
2069 return self['output']
2074 return {key: value for key, value in self.items() if key != 'output'}
class PyFunctionRunner(abc.ABC):
    """Callable that executes a wrapped Python function with run time resources.

    Concrete subclasses differ in how the wrapped function communicates its
    output (return-value capture vs. an ``output`` parameter).
    """

    def __init__(self, *, function: typing.Callable, output_description: OutputCollectionDescription):
        assert callable(function)
        self.function = function
        self.output_description = output_description

    # NOTE(review): the reviewed listing elides a line just above this method;
    # in the full source it may carry a decorator (e.g. @abc.abstractmethod) —
    # confirm against version control.
    def __call__(self, resources: PyFunctionRunnerResources):
        # Default protocol: pass the publishing resource as *output* and the
        # remaining resources as keyword arguments.
        self.function(output=resources.output(), **resources.input())
class CapturedOutputRunner(PyFunctionRunner):
    """Function runner that captures return value as output.data"""

    def __call__(self, resources: PyFunctionRunnerResources):
        # Invoke the wrapped callable with only the input resources, then
        # record its return value on the 'output' resource's *data* attribute.
        captured = self.function(**resources.input())
        resources['output'].data = captured
class OutputParameterRunner(PyFunctionRunner):
    """Function runner for callables that publish through an *output* argument."""

    def __call__(self, resources: PyFunctionRunnerResources):
        # The resources mapping already contains the 'output' entry, so the
        # wrapped function receives it as a keyword argument alongside inputs.
        self.function(**resources)
def wrapped_function_runner(function, output_description: OutputCollectionDescription = None) -> PyFunctionRunner:
    """Get an adapter for a function to be wrapped.

    If the function does not accept a publishing data proxy as an `output`
    key word argument, the returned object has a `capture_output` attribute that
    must be re-assigned by the calling code before calling the runner. `capture_output`
    must be assigned to be a callable that will receive the output of the wrapped
    function.

    Returns:
        Callable with a signature `__call__(*args, **kwargs)` and no return value

    OperationDetails.resource_director assigns the `capture_output` member of the
    returned object.

    NOTE(review): a few structural lines (an argument line and two ``else:``
    lines) were elided in the reviewed listing and are reconstructed here.
    """
    assert callable(function)
    signature = inspect.signature(function)

    # Implementation note: this function dispatches an implementation with the
    # logic below. A better factoring would be a "chain of responsibility" in
    # which the concrete Runners would be tried in sequence and determine internally
    # whether to create a runner, raise an error, or defer.

    # Determine output details for proper dispatching.
    # First check for signature with output parameter.
    # TODO FR4: standardize typing
    if 'output' in signature.parameters:
        if not isinstance(output_description, OutputCollectionDescription):
            if not isinstance(output_description, collections.abc.Mapping):
                raise exceptions.UsageError(
                    'Function passes output through call argument, but output is not described.')
            return OutputParameterRunner(
                function=function,
                output_description=OutputCollectionDescription(**output_description))
        else:
            return OutputParameterRunner(function=function,
                                         output_description=output_description)
    # Next try output_description parameter or function return annotation.
    else:
        if isinstance(output_description, OutputCollectionDescription):
            return_type = output_description['data'].gmxapi_datatype
        elif output_description is not None:
            # output_description should be None for inferred output or
            # a singular mapping of the key 'data' to a gmxapi type.
            if not isinstance(output_description, collections.abc.Mapping) \
                    or set(output_description.keys()) != {'data'}:
                raise exceptions.ApiError(
                    'invalid output description for wrapped function: {}'.format(output_description))
            if signature.return_annotation != signature.empty:
                if signature.return_annotation != output_description['data']:
                    raise exceptions.ApiError(
                        'Wrapped function with return-value-capture provided with non-matching output description.')
            return_type = output_description['data']
        else:
            # Use return type inferred from function signature.
            return_type = signature.return_annotation
        if return_type == signature.empty or return_type is None:
            raise exceptions.ApiError('No return annotation or output_description for {}'.format(function))
        return CapturedOutputRunner(function=function,
                                    output_description=OutputCollectionDescription(data=return_type))
2164 # TODO: Refactor in terms of reference to a node in a Context.
2165 # ResourceManager is an implementation detail of how the Context
2167 class OperationHandle(AbstractOperation[_OutputDataProxyType]):
2168 """Generic Operation handle for dynamically defined operations.
2170 Define a gmxapi Operation for the functionality being wrapped by the enclosing code.
2172 An Operation type definition encapsulates description of allowed inputs
2173 of an Operation. An Operation instance represents a node in a work graph
2174 with uniquely fingerprinted inputs and well-defined output. The implementation
2175 of the operation is a collaboration with the resource managers resolving
2176 data flow for output Futures, which may depend on the execution context.
    def __init__(self, resource_manager: SourceResource[_OutputDataProxyType, typing.Any]):
        """Initialization defines the unique input requirements of a work graph node.

        Initialization parameters map to the parameters of the wrapped function with
        addition(s) to support gmxapi data flow and deferred execution.

        If provided, an ``input`` keyword argument is interpreted as a parameter pack
        of base input. Inputs also present as standalone keyword arguments override
        values in ``input``.

        Inputs that are handles to gmxapi operations or outputs induce data flow
        dependencies that the framework promises to satisfy before the Operation
        executes and produces output.
        """
        # TODO: When the resource manager can be kept alive by an enclosing or
        # module-level Context, convert to a weakref.
        self.__resource_manager = resource_manager
        # The unique identifier for the operation node allows the Context implementation
        # to manage the state of the handle. Reproducibility of node_uid is TBD, but
        # it must be unique in a Context where it references a different operation node.
        self.node_uid = None
2202 def output(self) -> _OutputDataProxyType:
2203 # TODO: We can configure `output` as a data descriptor
2204 # instead of a property so that we can get more information
2205 # from the class attribute before creating an instance of OperationDetails.OutputDataProxy.
2206 # The C++ equivalence would probably be a templated free function for examining traits.
2207 return self.__resource_manager.data()
2210 """Make a single attempt to resolve data flow conditions.
2212 This is a public method, but should not need to be called by users. Instead,
2213 just use the `output` data proxy for result handles, or force data flow to be
2214 resolved with the `result` methods on the result handles.
2216 `run()` may be useful to try to trigger computation (such as for remotely
2217 dispatched work) without retrieving results locally right away.
2219 `run()` is also useful internally as a facade to the Context implementation details
2220 that allow `result()` calls to ask for their data dependencies to be resolved.
2221 Typically, `run()` will cause results to be published to subscribing operations as
2222 they are calculated, so the `run()` hook allows execution dependency to be slightly
2223 decoupled from data dependency, as well as to allow some optimizations or to allow
2224 data flow to be resolved opportunistically. `result()` should not call `run()`
2225 directly, but should cause the resource manager / Context implementation to process
2226 the data flow graph.
2228 In one conception, `run()` can have a return value that supports control flow
2229 by itself being either runnable or not. The idea would be to support
2230 fault tolerance, implementations that require multiple iterations / triggers
2231 to complete, or looping operations.
2233 # Note: `run()` is a synonym for `resolve` or `update` or whatever we choose
2234 # to generically describe the request to bring a node up-to-date: i.e. the
2235 # non-returning callable on the object produced by a director.
2236 self.__resource_manager.update_output()
2239 class OperationPlaceholder(AbstractOperation):
2240 """Placeholder for Operation handle during subgraph definition."""
2242 def __init__(self, subgraph_resource_manager):
2246 raise exceptions.UsageError('This placeholder operation handle is not in an executable context.')
2250 """Allow subgraph components to be connected without instantiating actual operations."""
2251 if not isinstance(current_context(), SubgraphContext):
2252 raise exceptions.UsageError('Invalid access to subgraph internals.')
2255 _HandleType = typing.TypeVar('_HandleType', bound=gmx.abc.OperationReference)
2258 class NodeBuilder(gmx.abc.NodeBuilder):
2259 """Add an operation node to be managed by a Context.
2261 The NodeBuilder interface implies minimal internal logic, and instead
2262 specifies the set of information that must or may be provided to construct
2269 label: typing.Optional[str] = None):
2270 """Initialize the base NodeBuilder for gmxapi.operation module Nodes.
2273 Convert the *operation* argument to be the registrant in the Operation registry.
2274 Requires confirmation of conforming behavior for dynamically defined operations.
2277 self.context = context
2280 key = _make_registry_key(operation)
2281 except Exception as e:
2282 error = 'Could not create an operation registry key from {}'.format(operation)
2283 raise exceptions.ValueError(error) from e
2285 # TODO: sensibly handle dynamically defined operations.
2286 if key not in _operation_registry and not issubclass(operation, OperationDetailsBase):
2287 error = '{} must be initialized with a registered operation. Got {}.'
2288 raise exceptions.ValueError(error.format(__class__.__qualname__, operation))
2289 self.sources = DataSourceCollection()
2290 self._input_description = None
2291 self._resource_factory = None
2292 self._runner_director = None
2294 self._output_factory = None
2296 self._resource_manager = ResourceManager
    def set_input_description(self, input_description: InputDescription):
        """Set the provider of the input signature and node uid used by build()."""
        self._input_description = input_description
    def set_resource_factory(self, factory):
        """Set the factory that produces run time resources for the node's operation."""
        self._resource_factory = factory
    def set_runner_director(self, factory):
        """Set the factory that produces the runner for the node's operation."""
        self._runner_director = factory
    def set_handle(self, factory):
        """Set the factory used to construct the operation handle from the node's resource manager."""
        self._handle = factory
    def set_output_factory(self, factory: 'OutputFactory'):
        """Set the provider of output proxy types and output description for build()."""
        self._output_factory = factory
    def set_resource_manager(self, implementation: typing.Type[ResourceManager]):
        """Allow overriding the default ResourceManager implementation.

        This is a workaround until we figure out what parts of the ResourceManager
        could be composed, registered separately with the Context, or be customized
        through other dispatching. One likely part of the solution is for clients
        of the NodeBuilder to assert requirements of the Context.
        """
        assert issubclass(implementation, ResourceManager)
        self._resource_manager = implementation
2324 # TODO: Let the Context use the handle factory to produce a dynamically typed handle,
2325 # and figure out the right way to annotate the return value.
2327 """Create node and return a handle of the appropriate type."""
2329 # Check for the ability to instantiate operations.
2330 missing_details = list()
2331 for builder_resource in ['input_description',
2336 detail = '_' + builder_resource
2337 if getattr(self, detail, None) is None:
2338 missing_details.append(builder_resource)
2339 if len(missing_details) > 0:
2340 raise exceptions.UsageError(
2341 'Missing details needed for operation node: {}'.format(
2342 ', '.join(missing_details)
2345 assert hasattr(self._input_description, 'signature')
2346 input_sink = SinkTerminal(self._input_description.signature())
2347 input_sink.update(self.sources)
2348 logger.debug('SinkTerminal configured: {}'.format(SinkTerminal))
2349 edge = DataEdge(self.sources, input_sink)
2350 logger.debug('Created data edge {} with Sink {}'.format(edge, edge.sink_terminal))
2351 # TODO: Fingerprinting: Each operation instance has unique output based on the unique input.
2352 # input_data_fingerprint = edge.fingerprint()
2354 # Set up output proxy.
2355 assert hasattr(self._input_description, 'make_uid')
2356 uid = self._input_description.make_uid(edge)
2357 # TODO: ResourceManager should fetch the relevant factories from the Context
2358 # instead of getting an OperationDetails instance.
2359 output_data_proxy = self._output_factory.output_proxy()
2360 output_description = self._output_factory.output_description()
2361 publishing_data_proxy = self._output_factory.publishing_data_proxy()
2362 manager = self._resource_manager(output_context=self.context,
2365 output_data_proxy=output_data_proxy,
2366 output_description=output_description,
2367 publishing_data_proxy=publishing_data_proxy,
2368 resource_factory=self._resource_factory,
2369 runner_director=self._runner_director)
2370 self.context.work_graph[uid] = manager
2371 # TODO: Replace with a call to Node.handle()
2372 handle = self._handle(self.context.work_graph[uid])
2373 handle.node_uid = uid
    def add_input(self, name, source):
        """Attach a named data source to the node under construction."""
        # TODO: We can move some input checking here as the data model matures.
        self.sources[name] = source
class InputPack(object):
    """Input container for data sources provided to resource factories.

    When gmxapi.operation Contexts provide run time inputs to operations,
    instances of this class are provided to the operation's registered
    resource factory.

    Attributes:
        kwargs (dict): collection of named data sources.
    """

    def __init__(self, kwargs: typing.Mapping[str, SourceTypeVar]):
        self.kwargs = kwargs
class Context(gmx.abc.Context):
    """API Context for the gmxapi.operation module.

    All gmxapi data and operations are owned by a Context instance. The Context
    manages the details of how work is run and how data is managed.

    Context implementations are not required to inherit from gmxapi.context.Context,
    but this class definition serves to specify the current Context API.

    If subclassing is used to implement new Contexts, be sure to initialize the
    base class when providing a new __init__
    """

    def node(self, node_id) -> Node:
        """Locate a node by user label or by uid.

        Raises:
            exceptions.ValueError: if *node_id* matches neither a label nor a uid.
        """
        if node_id in self.labels:
            return self.labels[node_id]
        elif node_id in self.work_graph:
            return self.work_graph[node_id]
        else:
            raise exceptions.ValueError('Could not find a node identified by {}'.format(node_id))

    # NOTE(review): the __init__ header was elided in the reviewed excerpt;
    # the no-argument signature is inferred from the attribute assignments —
    # confirm against version control.
    def __init__(self):
        # Registry of operation implementations known to this Context.
        self.operations = dict()
        # Map of user-provided labels to nodes (set to None while a labeled
        # node is still being built — see node_builder implementations).
        self.labels = dict()
        # Insertion-ordered map of node uid to the node's resource manager.
        self.work_graph = collections.OrderedDict()

    @abc.abstractmethod
    def node_builder(self, *, operation,
                     label: typing.Optional[str] = None) -> NodeBuilder:
        """Get a builder for a new work graph node.

        Nodes are elements of computational work, with resources and execution
        managed by the Context. The Context handles parallelism resources, data
        placement, work scheduling, and data flow / execution dependencies.

        This method is used by Operation director code and helper functions to
        add work to the graph.

        Arguments:
            operation: a registered gmxapi operation
            label: optional user-provided identifier to provide human-readable node locators.
        """
        ...

    # TODO: *node()* accessor.
    # @abc.abstractmethod
    # def node(self, node_identifier) -> AbstractOperation:
    #     ...
class ModuleNodeBuilder(NodeBuilder):
    """Builder for work nodes in gmxapi.operation.ModuleContext.

    NOTE(review): no overrides are visible in this excerpt; presumably the
    NodeBuilder behavior is inherited unchanged — confirm against VCS.
    """
class ModuleContext(Context):
    """Context implementation for the gmxapi.operation module."""

    def node_builder(self, operation, label=None) -> NodeBuilder:
        """Get a builder for a new work node to add an operation in this context."""
        if label is not None:
            # A label may only be claimed once.
            if label in self.labels:
                raise exceptions.ValueError('Label {} is already in use.'.format(label))
            # Reserve the label now; the builder should update the labeled
            # node when it is done.
            self.labels[label] = None

        return ModuleNodeBuilder(context=weakref.proxy(self), operation=operation, label=label)
# Module-level stack of Context instances. The last element is the currently
# active Context; push_context()/pop_context() below manage sub-contexts.
__current_context = [ModuleContext()]
def current_context() -> Context:
    """Get a reference to the currently active Context.

    The imported gmxapi.context module maintains some state for the convenience
    of the scripting environment. Internally, all gmxapi activity occurs under
    the management of an API Context, explicitly or implicitly. Some actions or
    code constructs will generate separate contexts or sub-contexts. This utility
    command retrieves a reference to the currently active Context.
    """
    return __current_context[-1]
def push_context(context) -> Context:
    """Enter a sub-context by pushing a context to the global context stack.

    Returns:
        The new active Context (the *context* just pushed).
    """
    __current_context.append(context)
    return current_context()
def pop_context() -> Context:
    """Exit the current Context by popping it from the stack.

    Returns:
        The Context that was just deactivated.
    """
    deactivated = __current_context.pop()
    return deactivated
class OutputFactory(object):
    """Encapsulate the details of Operation output implementation in the gmxapi.operation Context.

    Currently, OutputFactory objects are containers that compose functionality
    with which to implement the required internal interface.
    """

    def __init__(self, *,
                 output_proxy: typing.Callable[[SourceResource], _OutputDataProxyType],
                 output_description: OutputCollectionDescription,
                 publishing_data_proxy: typing.Callable[[SourceResource, ClientID], _PublishingDataProxyType]):
        """Package the output details for an operation.

        Arguments:
            output_proxy: factory to produce the *output* facet of an operation instance (node)
            output_description: fully formed output description
            publishing_data_proxy: factory to produce the run time output publishing resources

        Raises:
            exceptions.ValueError: if either factory is not callable, or if
                *output_description* is not an OutputCollectionDescription.
        """
        if not callable(output_proxy):
            raise exceptions.ValueError('output_proxy argument must be a callable.')
        if not callable(publishing_data_proxy):
            raise exceptions.ValueError('publishing_data_proxy argument must be a callable.')
        if not isinstance(output_description, OutputCollectionDescription):
            raise exceptions.ValueError('output_description must be an instance of '
                                        'gmxapi.operation.OutputCollectionDescription')
        self._output_proxy = output_proxy
        self._output_description = output_description
        self._publishing_data_proxy = publishing_data_proxy

    # Annotation corrected for consistency with the *output_proxy* parameter
    # accepted by __init__ (the callable takes a single SourceResource; the
    # previous annotation declared a spurious extra argument).
    def output_proxy(self) -> typing.Callable[[SourceResource], _OutputDataProxyType]:
        return self._output_proxy

    def output_description(self) -> OutputCollectionDescription:
        return self._output_description

    def publishing_data_proxy(self) -> typing.Callable[[SourceResource, ClientID],
                                                       _PublishingDataProxyType]:
        return self._publishing_data_proxy
# TODO: Refactor in terms of gmx.abc.OperationDirector[_Op, gmx.operation.Context]
# Normalizing this OperationDirector may require other updates to the function_wrapper facilities.
class OperationDirector(object):
    """Direct the construction of an operation node in the gmxapi.operation module Context.

    Collaboration: used by OperationDetails.operation_director, which
    will likely dispatch to different implementations depending on
    requirements of work or context.
    """

    # NOTE(review): parts of this initializer were elided in the reviewed
    # excerpt; the signature is reconstructed from the attribute assignments
    # and from the uses in __call__ — confirm against version control.
    def __init__(self,
                 *args,
                 operation_details: typing.Type[OperationDetailsBase],
                 context,
                 label=None,
                 **kwargs):
        self.operation_details = operation_details
        # Hold only a weak proxy so the director does not keep the Context alive.
        self.context = weakref.proxy(context)
        self.args = args
        self.kwargs = kwargs
        self.label = label

    def __call__(self) -> AbstractOperation:
        """Add a node to the Context work graph and return a handle to it."""
        builder = self.context.node_builder(operation=self.operation_details, label=self.label)

        builder.set_resource_factory(self.operation_details.resource_director)
        builder.set_input_description(self.operation_details)
        builder.set_handle(OperationHandle)

        # Bind the user-provided arguments against the operation signature.
        operation_details = self.operation_details()
        node_input_factory = operation_details.signature().bind
        data_source_collection = node_input_factory(*self.args, **self.kwargs)
        for name, source in data_source_collection.items():
            builder.add_input(name, source)

        # NOTE(review): the inner closure was partially elided in the reviewed
        # excerpt; reconstructed as a director returning a zero-argument runner.
        def runner_director(resources):
            def runner():
                operation_details(resources)
            return runner

        builder.set_runner_director(runner_director)
        # Note that the call-backs held by OutputFactory cannot be annotated with
        # key word arguments under PEP 484, producing a weak warning in some cases.
        # We can consider more in the future how to balance call-back simplicity,
        # type annotation, and key word explicitness in helper functions like these.
        output_factory = OutputFactory(output_description=operation_details.output_description(),
                                       output_proxy=operation_details.output_data_proxy,
                                       publishing_data_proxy=operation_details.publishing_data_proxy)
        builder.set_output_factory(output_factory)

        handle = builder.build()
        return handle
def _make_datastore(output_description: OutputCollectionDescription, ensemble_width: int):
    """Create the data store for an operation with the described output.

    Create a container to hold the resources for an operation node.
    Used internally by the resource manager when setting up the node.
    Evolution of the C++ framework for creating the Operation SessionResources
    object will inform the future of this and the resource_director method, but
    this data store is how the Context manages output data sources for resources
    that have not yet been allocated.

    Arguments:
        output_description: named output data types for the operation.
        ensemble_width: number of ensemble members whose results are stored.

    Returns:
        Ordered mapping of output name to OutputData storage.
    """
    datastore = collections.OrderedDict()
    for name, dtype in output_description.items():
        assert isinstance(dtype, type)
        result_description = ResultDescription(dtype=dtype, width=ensemble_width)
        datastore[name] = OutputData(name=name, description=result_description)
    # Fix: the constructed datastore must be handed back to the caller; the
    # reviewed code fell off the end of the function, implicitly returning None.
    return datastore
# TODO: For outputs, distinguish between "results" and "events".
# Both are published to the resource manager in the same way, but the relationship
# with subscribers is potentially different.
def function_wrapper(output: dict = None):
    # Suppress warnings in the example code.
    # noinspection PyUnresolvedReferences
    """Generate a decorator for wrapped functions with signature manipulation.

    New function accepts the same arguments, with additional arguments required by
    the API.

    The new function returns an object with an `output` attribute containing the named outputs.

    Example:

        >>> @function_wrapper(output={'spam': str, 'foo': str})
        ... def myfunc(parameter: str = None, output=None):
        ...    output.spam = parameter
        ...    output.foo = parameter + ' ' + parameter
        ...
        >>> operation1 = myfunc(parameter='spam spam')
        >>> assert operation1.output.spam.result() == 'spam spam'
        >>> assert operation1.output.foo.result() == 'spam spam spam spam'

    If 'output' is provided to the wrapper, a data structure will be passed to
    the wrapped functions with the named attributes so that the function can easily
    publish multiple named results. Otherwise, the `output` of the generated operation
    will just capture the return value of the wrapped function.

    .. todo:: gmxapi typing stub file(s).
              The way this wrapper uses parameter annotations is not completely
              compatible with static type checking (PEP 484). If we decide to
              keep the convenience functionality by which operation details are
              inferred from parameter annotations, we should provide a separate
              stub file (.pyi) to support static type checking of the API.
    """
    if output is not None and not isinstance(output, collections.abc.Mapping):
        raise exceptions.TypeError('If provided, `output` argument must be a mapping of data names to types.')

    # TODO: (FR5+) gmxapi operations need to allow a context-dependent way to generate an implementation with input.
    # This function wrapper reproduces the wrapped function's kwargs, but does not allow chaining a
    # dynamic `input` kwarg and does not dispatch according to a `context` kwarg. We should allow
    # a default implementation and registration of alternate implementations. We don't have to do that
    # with functools.singledispatch, but we could, if we add yet another layer to generate a wrapper
    # that takes the context as the first argument. (`singledispatch` inspects the first argument rather
    # that a named argument)

    # Implementation note: The closure of the current function is used to
    # dynamically define several classes that support the operation to be
    # created by the returned decorator.

    def decorator(function) -> typing.Callable:
        # Explicitly capture `function` and `output` references.
        provided_output_map = output

        # Note: Allow operations to be defined entirely in template headers to facilitate
        # compile-time optimization of fused operations. Consider what distinction, if any,
        # exists between a fused operation and a more basic operation. Probably it amounts
        # to aspects related to interaction with the Context that get combined in a fused
        # operation, such as the resource director, builder, etc.
        class OperationDetails(OperationDetailsBase):
            # Warning: function.__qualname__ is not rigorous since function may be in a local scope.
            # TODO: Improve base identifier.
            # Suggest registering directly in the Context instead of in this local class definition.
            __basename = '.'.join((str(function.__module__), function.__qualname__))
            # Per-class counter used by make_uid() until input fingerprinting exists.
            __last_uid = 0
            _input_signature_description = InputCollectionDescription.from_function(function)
            # TODO: Separate the class and instance logic for the runner.
            # Logically, the runner is a detail of a context-specific implementation class,
            # though the output is not generally fully knowable until an instance is initialized
            # for a certain input fingerprint.
            # Note: We are almost at a point where this class can be subsumed into two
            # possible return types for wrapped_function_runner, acting as an operation helper.
            _runner = wrapped_function_runner(function, provided_output_map)
            _output_description = _runner.output_description
            _output_data_proxy_type = define_output_data_proxy(_output_description)
            _publishing_data_proxy_type = define_publishing_data_proxy(_output_description)
            _SourceResource = SourceResource[_output_data_proxy_type, _publishing_data_proxy_type]

            @classmethod
            def name(cls) -> str:
                return cls.__basename.split('.')[-1]

            @classmethod
            def namespace(cls) -> str:
                return cls.__basename.rstrip('.' + cls.name())

            @classmethod
            def director(cls, context: _Context):
                return cls.operation_director

            @classmethod
            def signature(cls) -> InputCollectionDescription:
                """Mapping of named inputs and input type.

                Used to determine valid inputs before an Operation node is created.

                Overrides OperationDetailsBase.signature() to provide an
                implementation for the bound operation.
                """
                return cls._input_signature_description

            def output_description(self) -> OutputCollectionDescription:
                """Mapping of available outputs and types for an existing Operation node.

                Overrides OperationDetailsBase.output_description() to provide an
                implementation for the bound operation.
                """
                return self._output_description

            def publishing_data_proxy(self, *,
                                      instance: _SourceResource,
                                      client_id
                                      ) -> _publishing_data_proxy_type:
                """Factory for Operation output publishing resources.

                Used internally when the operation is run with resources provided by instance.

                Overrides OperationDetailsBase.publishing_data_proxy() to provide an
                implementation for the bound operation.
                """
                assert isinstance(instance, ResourceManager)
                return self._publishing_data_proxy_type(instance=instance, client_id=client_id)

            def output_data_proxy(self, instance: _SourceResource) -> _output_data_proxy_type:
                assert isinstance(instance, ResourceManager)
                return self._output_data_proxy_type(instance=instance)

            def __call__(self, resources: PyFunctionRunnerResources):
                """Execute the operation with provided resources.

                Resources are prepared in an execution context with aid of resource_director()

                After the first call, output data has been published and is trivially
                available through the output_data_proxy()

                Overrides OperationDetailsBase.__call__().
                """
                self._runner(resources)

            @classmethod
            def make_uid(cls, input):
                """The unique identity of an operation node tags the output with respect to the input.

                Combines information on the Operation details and the input to uniquely
                identify the Operation node.

                Arguments:
                    input : A (collection of) data source(s) that can provide Fingerprints.

                Used internally by the Context to manage ownership of data sources, to
                locate resources for nodes in work graphs, and to manage serialization,
                deserialization, and checkpointing of the work graph.

                The UID is a detail of the generic Operation that _should_ be independent
                of the Context details to allow the framework to manage when and where
                an operation is executed.

                Design notes on further refinement:
                    TODO: Operations should not single-handedly determine their own uniqueness
                    (but they should participate in the determination with the Context).

                    Context implementations should be allowed to optimize handling of
                    equivalent operations in different sessions or work graphs, but we do not
                    yet TODO: guarantee that UIDs are globally unique!

                    The UID should uniquely indicate an operation node based on that node's input.
                    We need input fingerprinting to identify equivalent nodes in a work graph
                    or distinguish nodes across work graphs.
                """
                # NOTE(review): input fingerprinting is not implemented here; a
                # monotonically increasing per-class counter makes each node
                # unique within the current process only.
                uid = str(cls.__basename) + str(cls.__last_uid)
                cls.__last_uid += 1
                return uid

            @classmethod
            def resource_director(cls, *, input=None,
                                  output: _publishing_data_proxy_type = None) -> PyFunctionRunnerResources:
                """a Director factory that helps build the Session Resources for the function.

                The Session launcher provides the director with all of the resources previously
                requested/negotiated/registered by the Operation. The director uses details of
                the operation to build the resources object required by the operation runner.

                For the Python Context, the protocol is for the Context to call the
                resource_director instance method, passing input and output containers.
                """
                resources = PyFunctionRunnerResources()
                # *input* is expected to provide named data sources through a
                # ``kwargs`` mapping (see InputPack) — TODO confirm at call sites.
                resources.update(input.kwargs)
                resources.update({'output': output})

                # TODO: Remove this hack when we can better handle Futures of Containers and Future slicing.
                for name in resources:
                    if isinstance(resources[name], (list, tuple)):
                        resources[name] = datamodel.ndarray(resources[name])

                # Check data compatibility
                for name, value in resources.items():
                    if name != 'output':
                        expected = cls.signature()[name]
                        got = type(value)
                        if got != expected:
                            raise exceptions.TypeError('Expected {} but got {}.'.format(expected, got))
                return resources

        # TODO: (FR4) Update annotations with gmxapi data types. E.g. return -> Future.
        @functools.wraps(function)
        def helper(*args, context=None, **kwargs):
            # Description of the Operation input (and output) occurs in the
            # decorator closure. By the time this factory is (dynamically) defined,
            # the OperationDetails and ResourceManager are well defined, but not
            # yet instantiated.
            # Inspection of the offered input occurs when this factory is called,
            # and OperationDetails, ResourceManager, and Operation are instantiated.

            # This operation factory is specialized for the default package Context.
            if context is None:
                context = current_context()
            else:
                raise exceptions.ApiError('Non-default context handling not implemented.')

            # This calls a dispatching function that may not be able to reconcile the input
            # and Context capabilities. This is the place to handle various exceptions for
            # whatever reasons this reconciliation cannot occur.
            handle = OperationDetails.operation_director(*args, context=context, label=None, **kwargs)

            # TODO: NOW: The input fingerprint describes the provided input
            # as (a) ensemble input, (b) static, (c) future. By the time the
            # operation is instantiated, the topology of the node is known.
            # When compared to the InputCollectionDescription, the data compatibility
            # can be determined.

            return handle

        # to do: The factory itself needs to be able to register a factory with
        # the context that will be responsible for the Operation handle.
        # The factories need to be able to serve as dispatchers for themselves,
        # since an operation in one context may need to be reconstituted in a
        # different context.
        # The dispatching factory produces a director for a Context,
        # which will register a factory with the operation in that context.

        # The factory function has a DirectorFactory. Director instances talk to a NodeBuilder for a Context to
        # get handles to new operation nodes managed by the context. Part of that process includes registering
        # a DirectorFactory with the Context.
        return helper

    return decorator
class GraphVariableDescriptor(object):
    """Data descriptor for a named, typed variable on a subgraph class."""

    def __init__(self, name: str = None, dtype=None, default=None):
        self.name = name
        self.dtype = dtype
        self.default = default

    @property
    def internal_name(self):
        # Name of the instance attribute backing this descriptor.
        try:
            return '_' + self.name
        except TypeError:
            return None

    def __get__(self, instance, owner):
        if instance is None:
            # Access is through the class attribute of the owning class.
            # Allows the descriptor itself to be inspected or reconfigured after
            # class definition.
            # TODO: There is probably some usage checking that can be performed here.
            return self
        try:
            value = getattr(instance, self.internal_name)
        except AttributeError:
            value = self.default
            # Lazily initialize the instance value from the class default.
            if value is not None:
                try:
                    setattr(instance, self.internal_name, value)
                except Exception as e:
                    message = 'Could not assign default value to {} attribute of {}'.format(
                        self.internal_name, instance)
                    raise exceptions.ApiError(message) from e
        return value

    # Implementation note: If we have a version of the descriptor class with no `__set__` method,
    # it is a non-data descriptor that can be overridden by a data descriptor on an instance.
    # This could be one way to handle the polymorphism associated with state changes.
    def __set__(self, instance, value):
        if instance._editing:
            # Update the internal connections defining the subgraph.
            setattr(instance, self.internal_name, value)
        else:
            raise AttributeError('{} not assignable on {}'.format(self.name, instance))
class GraphMeta(type):
    """Meta-class for gmxapi data flow graphs and subgraphs.

    Used to implement ``subgraph`` as GraphMeta.__new__(...).
    Also allows subgraphs to be defined as Python class definitions by inheriting
    from Subgraph or by using the ``metaclass=GraphMeta`` hint in the class
    statement arguments.

    The Python class creation protocol allows Subgraphs to be defined in as follows.

    See the Subgraph class documentation for customization of instances through
    the Python context manager protocol.
    """
    # Class-statement keywords consumed by this metaclass.
    _prepare_keywords = ('variables',)

    # TODO: Python 3.7.2 introduces typing.OrderedDict
    # In practice, we are using collections.OrderedDict, but we should use the generic
    # ABC from the typing module to avoid being overly restrictive with type hints.
    try:
        from typing import OrderedDict
    except ImportError:
        from collections import OrderedDict

    @classmethod
    def __prepare__(mcs, name, bases, variables: OrderedDict = None, **kwargs):
        """Prepare the class namespace.

        Keyword Args:
            variables: mapping of persistent graph variables to type / default value (optional)
        """
        # Python runs this before executing the class body of Subgraph or its
        # subclasses. This is our chance to handle key word arguments given in the
        # class declaration.

        if kwargs is not None:
            for keyword in kwargs:
                raise exceptions.UsageError('Unexpected key word argument: {}'.format(keyword))

        namespace = collections.OrderedDict()

        if variables is not None:
            if isinstance(variables, collections.abc.Mapping):
                for name, value in variables.items():
                    # A bare type means "no default"; a value implies its type.
                    if isinstance(value, type):
                        dtype = value
                        if hasattr(value, 'default'):
                            default = value.default
                        else:
                            default = None
                    else:
                        default = value
                        if hasattr(default, 'dtype'):
                            dtype = default.dtype
                        else:
                            dtype = type(default)
                    namespace[name] = GraphVariableDescriptor(name, default=default, dtype=dtype)
                    # Note: we are not currently using the hook used by `inspect`
                    # to annotate with the class that defined the attribute.
                    # namespace[name].__objclass__ = mcs
                    assert not hasattr(namespace[name], '__objclass__')
            else:
                raise exceptions.ValueError('"variables" must be a mapping of graph variables to types or defaults.')

        return namespace

    def __new__(cls, name, bases, namespace, **kwargs):
        for key in kwargs:
            if key not in GraphMeta._prepare_keywords:
                raise exceptions.ApiError('Unexpected class creation keyword: {}'.format(key))
        return type.__new__(cls, name, bases, namespace)

    # TODO: This is keyword argument stripping is not necessary in more recent Python versions.
    # When Python minimum required version is increased, check if we can remove this.
    def __init__(cls, name, bases, namespace, **kwargs):
        for key in kwargs:
            if key not in GraphMeta._prepare_keywords:
                raise exceptions.ApiError('Unexpected class initialization keyword: {}'.format(key))
        super().__init__(name, bases, namespace)
class SubgraphNodeBuilder(NodeBuilder):
    """Builder for work nodes defined inside a SubgraphContext."""

    def __init__(self,
                 context: 'SubgraphContext',
                 operation,
                 label: typing.Optional[str] = None):
        super().__init__(context, operation, label)

    def add_input(self, name: str, source):
        """Add an input resource for the Node under construction.

        Extends NodeBuilder.add_input()
        """
        # Inspect inputs:
        # * Are they from outside the subgraph?
        # * Subgraph variables?
        # * Subgraph internal nodes?
        # Inputs from outside the subgraph are (provisionally) subgraph inputs.
        # Inputs that are subgraph variables or subgraph internal nodes mark operations that will need to be re-run.
        # For first implementation, let all operations be recreated, but we need to
        # manage the right input proxies.
        # For zeroeth implementation, try just tracking the entities that need a reset() method called.
        assert isinstance(self.context, SubgraphContext)
        if hasattr(source, 'reset'):
            self.context.add_resetter(source.reset)
        elif hasattr(source, '_reset'):
            self.context.add_resetter(source._reset)
        super().add_input(name, source)

    def build(self) -> OperationPlaceholder:
        """Get a reference to the internal node in the subgraph definition.

        In the SubgraphContext, these handles cannot represent uniquely identifiable
        results. They are placeholders for relative positions in graphs that are
        not defined until the subgraph is being executed.

        Such references should be tracked and invalidated when exiting the
        subgraph context. Within the subgraph context, they are used to establish
        the recipe for updating the subgraph's outputs or persistent data during
        execution.
        """
        # Placeholder handles in the subgraph definition don't have real resource managers.
        # Check for the ability to instantiate operations.
        handle = super().build()
        # handle = OperationPlaceholder()
        return typing.cast(OperationPlaceholder, handle)
class SubgraphContext(Context):
    """Provide a Python context manager in which to set up a graph of operations.

    Allows operations to be configured without adding nodes to the global execution
    context.
    """

    # NOTE(review): the __init__ header was elided in the reviewed excerpt;
    # reconstructed from the visible attribute assignment — confirm against VCS.
    def __init__(self):
        super().__init__()
        # Callables to invoke when the subgraph state must be reset between iterations.
        self.resetters = set()

    def node_builder(self, operation, label=None) -> NodeBuilder:
        if label is not None:
            if label in self.labels:
                raise exceptions.ValueError('Label {} is already in use.'.format(label))
            else:
                # The builder should update the labeled node when it is done.
                self.labels[label] = None

        return SubgraphNodeBuilder(context=weakref.proxy(self), operation=operation, label=label)

    def add_resetter(self, function):
        """Register a callable to be invoked when resetting subgraph state."""
        assert callable(function)
        self.resetters.add(function)
class Subgraph(object, metaclass=GraphMeta):
    """Base class for data flow subgraphs.

    When subclassing from Subgraph, aspects of the subgraph's data ports can be
    specified with keyword arguments in the class statement. Example::

        >>> class MySubgraph(Subgraph, variables={'int_with_default': 1, 'boolData': bool}): pass
        ...

    The key word *variables* is used in the class declaration to map the types
    of subgraph Variables with (optional) default values.

    Execution model:
        Subgraph execution must follow a well-defined protocol in order to sensibly
        resolve data Futures at predictable points. Note that subgraphs act as operations
        that can be automatically redefined at run time (in limited cases), such as
        to automatically form iteration chains to implement "while" loops. We refer
        to one copy / iteration / generation of a subgraph as an "element" below.

        When a subgraph element begins execution, each of its variables with an
        "updated" state from the previous iteration has the "updated" state moved
        to the new element's "initial" state, and the "updated" state is voided.

        Subgraph.next() appends an element of the subgraph to the chain. Subsequent
        calls to Subgraph.run() bring the new outputs up to date (and then call next()).
        Thus, for a subgraph with an output called ``is_converged``, calling
        ``while (not subgraph.is_converged): subgraph.run()`` has the desired effect,
        but is likely suboptimal for execution. Instead use the gmxapi while_loop.

        If the subgraph is not currently executing, it can be in one of two states:
        "editing" or "ready". In the "ready" state, class and instance Variables
        cannot be assigned, but can be read through the data descriptors. In this
        state, the descriptors only have a single state.

        If "editing," variables on the class object can be assigned to update the
        data flow defined for the subgraph. While "editing", reading or writing
        instance Variables is undefined behavior.

        When "editing" begins, each variable is readable as a proxy to the "initial"
        state in an element. Assignments while "editing" put variables in temporary
        states accessible only during editing.
        When "editing" finishes, the "updated" output data source of each
        variable for the element is set, if appropriate.
    """
    # TODO: Use a get_context to allow operation factories or accessors to mark
    # references for update/annotation when exiting the 'with' block.
class SubgraphBuilder(object):
    """Helper for defining new Subgraphs.

    Manages a Python context in which to define a new Subgraph type. Can be used
    in a Python ``with`` construct exactly once to provide the Subgraph class body.
    When the ``with`` block is exited (or ``build()`` is called explicitly), a
    new type instance becomes available. Subsequent calls to SubgraphBuilder.__call__(self, ...)
    are dispatched to the Subgraph constructor.

    Outside of the ``with`` block, read access to data members is proxied to
    descriptors on the built Subgraph.

    Instances of SubgraphBuilder are returned by the ``subgraph()`` utility function.
    """
    def __init__(self, variables):
        # Assign through __dict__ directly because this class overrides
        # __setattr__ to stage subgraph variable updates.
        self.__dict__.update({'variables': variables,
                              '_staging': collections.OrderedDict(),
                              '_editing': False,
                              '_subgraph_context': None,
                              '_subgraph_instance': None,
                              '_fused_operation': None,
                              })
        # Return a placeholder that we can update during iteration.
        # Long term, this is probably implemented with data descriptors
        # that will be moved to a new Subgraph type object.
        for name in self.variables:
            if not isinstance(self.variables[name], Future):
                self.variables[name] = gmx.make_constant(self.variables[name])

        # class MySubgraph(Subgraph, variables=variables):
        #     pass
        #
        # self._subgraph_instance = MySubgraph()
    def __getattr__(self, item):
        """Proxy reads of subgraph variables.

        While editing, resolve *item* against staged updates first, then the
        declared variables; otherwise return an accessor into instance values.
        """
        if self._editing:
            if item in self.variables:
                if item in self._staging:
                    logger.debug('Read access to intermediate value of subgraph variable {}'.format(item))
                    return self._staging[item]
                else:
                    logger.debug('Read access to subgraph variable {}'.format(item))
                    return self.variables[item]
            else:
                raise AttributeError('Invalid attribute: {}'.format(item))
        else:
            # TODO: this is not quite the described interface...
            return lambda obj: obj.values[item]
    def __setattr__(self, key, value):
        """Part of the builder interface.

        Real instance attributes are written directly; anything else is
        treated as a staged subgraph variable update (only while editing).
        """
        if key in self.__dict__:
            self.__dict__[key] = value
        else:
            if self._editing:
                self.add_update(key, value)
            else:
                raise exceptions.UsageError('Subgraph is not in an editable state.')
3171 def add_update(self, key, value):
3172 """Add a variable update to the internal subgraph."""
3173 if key not in self.variables:
3174 raise AttributeError('No such attribute: {}'.format(key))
3175 if not self._editing:
3176 raise exceptions.UsageError('Subgraph is not in an editable state.')
3177 # Else, stage the potential final value for the iteration.
3178 logger.debug('Staging subgraph update {} = {}'.format(key, value))
3179 # Return a placeholder that we can update during iteration.
3180 # Long term, this is probably implemented with data descriptors
3181 # that will be moved to a new Subgraph type object.
3182 if not isinstance(value, Future):
3183 value = gmx.make_constant(value)
3184 self._staging[key] = value
3185 self._staging.move_to_end(key)
    def __enter__(self):
        """Enter a Context managed by the subgraph to capture operation additions.

        Allows the internal data flow of the subgraph to be defined in the same
        manner as the default work graph while the Python context manager is active.

        The subgraph Context is activated when entering a ``with`` block and
        finalized at the end of the block.
        """
        # While editing the subgraph in the SubgraphContext, we need __get__ and __set__
        # data descriptor access on the Subgraph subclass type, but outside of the
        # context manager, the descriptor should be non-writable and more opaque,
        # while the instance should be readable, but not writable.
        self.__dict__['_editing'] = True
        # TODO: this probably needs to be configured with variables...
        self.__dict__['_subgraph_context'] = SubgraphContext()
        push_context(self._subgraph_context)
        # NOTE(review): the return statement was elided in the reviewed excerpt;
        # __enter__ must return the builder for ``with ... as`` usage.
        return self
3207 """Build the subgraph by defining some new operations.
3209 Examine the subgraph variables. Variables with handles to data sources
3210 in the SubgraphContext represent work that needs to be executed on
3211 a subgraph execution or iteration. Variables with handles to data sources
3212 outside of the subgraph represent work that needs to be executed once
3213 on only the first iteration to initialize the subgraph.
3215 Construct a factory for the fused operation that performs the work to
3216 update the variables on a single iteration.
3218 Construct a factory for the fused operation that performs the work to
3219 update the variables on subsequent iterations and which will be fed
3220 the outputs of a previous iteration. Both of the generated operations
3221 have the same output signature.
3223 Construct and wrap the generator function to recursively append work
3224 to the graph and update until condition is satisfied.
3226 TODO: Explore how to drop work from the graph once there are no more
3227 references to its output, including check-point machinery.
3229 logger.debug('Finalizing subgraph definition.')
3230 inputs = collections.OrderedDict()
3231 for key, value in self.variables.items():
3232 # TODO: What if we don't want to provide default values?
3235 updates = self._staging
3237 class Subgraph(object):
3238 def __init__(self, input_futures, update_sources):
3239 self.values = collections.OrderedDict([(key, value.result()) for key, value in input_futures.items()])
3240 logger.debug('subgraph initialized with {}'.format(
3241 ', '.join(['{}: {}'.format(key, value) for key, value in self.values.items()])))
3242 self.futures = collections.OrderedDict([(key, value) for key, value in input_futures.items()])
3243 self.update_sources = collections.OrderedDict([(key, value) for key, value in update_sources.items()])
3244 logger.debug('Subgraph updates staged:')
3245 for update, source in self.update_sources.items():
3246 logger.debug(' {} = {}'.format(update, source))
3249 for name in self.update_sources:
3250 result = self.update_sources[name].result()
3251 logger.debug('Update: {} = {}'.format(name, result))
3252 self.values[name] = result
3253 # Replace the data sources in the futures.
3254 for name in self.update_sources:
3255 self.futures[name].resource_manager = gmx.make_constant(self.values[name]).resource_manager
3256 for name in self.update_sources:
3257 self.update_sources[name]._reset()
3259 subgraph = Subgraph(inputs, updates)
3261 return lambda subgraph=subgraph: subgraph
3263 def __exit__(self, exc_type, exc_val, exc_tb):
3264 """End the Subgraph editing session and finalize the Subgraph build.
3266 After exiting, this instance forwards __call__() to a factory for an
3267 operation that carries out the work in the subgraph with inputs bound
3268 in the current context as defined by ``variables``.
3270 self._factory = self.build()
3272 context = pop_context()
3273 assert context is self._subgraph_context
3274 self.__dict__['_editing'] = False
3275 # Returning False causes exceptions in the `with` block to be reraised.
3276 # Remember to switch this to return True if we want to transform or suppress
3277 # such an exception (we probably do).
3278 if exc_type is not None:
3279 logger.error('Got exception {} while editing subgraph {}.'.format(exc_val, self))
3280 logger.debug('Subgraph exception traceback: \n{}'.format(exc_tb))
3284 # TODO: After build() has been called, this should dispatch to a factory
3285 # that returns an OperationHandle.
3286 return self._factory()
def while_loop(*, operation, condition, max_iteration=10):
    """Generate and run a chain of operations such that condition evaluates True.

    Returns an operation instance that acts like a single node in the current
    work graph, but which is a proxy to the operation at the end of a dynamically generated chain
    of operations. At run time, condition is evaluated for the last element in
    the current chain. If condition evaluates False, the chain is extended and
    the next element is executed. When condition evaluates True, the object
    returned by ``while_loop`` becomes a proxy for the last element in the chain.

    Equivalent to calling operation.while(condition), where available.

    Arguments:
        operation: a callable that produces an instance of an operation when called with no arguments.
        condition: a callable that accepts an object (returned by ``operation``) that returns a boolean.
        max_iteration: execute the loop no more than this many times (default 10)

    Warning:
        *max_iteration* is provided in part to minimize the cost of bugs in early
        versions of this software. The default value may be changed or
        removed on short notice.

    Warning:
        The protocol by which ``while_loop`` interacts with ``operation`` and ``condition``
        is very unstable right now. Please refer to this documentation when installing new
        versions of the package.

    Protocol:
        This protocol will be changed before the 0.1 API is finalized.

        When called, ``while_loop`` calls ``operation`` without arguments
        and captures the return value captured as ``_operation``.
        The object produced by ``operation()`` must have a ``reset``,
        a ``run`` method, and an ``output`` attribute. The object is examined
        to determine the output data proxy for the operation produced by the call
        to ``while_loop``. When that operation is called, it does the equivalent of

            while(condition(self._operation)):
                self._operation.reset()
                self._operation.run()

        Then, the output data proxy of ``self`` is updated with the results from
        self._operation.output.
    """
    # In the first implementation, Subgraph is NOT and OperationHandle.
    # if not isinstance(obj, AbstractOperationHandle):
    #     raise exceptions.UsageError(
    #         '"operation" key word argument must be a callable that produces an Operation handle.')
    # for name, descriptor in obj.output.items():
    #     outputs[name] = descriptor._dtype

    # Implementation sketch:
    # 1. Get the initial inputs.
    # 2. Initialize the subgraph with the initial inputs.
    # 3. Run the subgraph.
    # 4. Get the outputs.
    # 5. Initialize the subgraph with the outputs.
    # 6. Go to 3 if condition is not met.

    # Instantiate once at definition time to discover the output signature.
    obj = operation()
    assert hasattr(obj, 'values')
    outputs = collections.OrderedDict([(key, type(value)) for key, value in obj.values.items()])

    @function_wrapper(output=outputs)
    def run_loop(output: OutputCollectionDescription):
        iteration = 0
        # Fresh instance for the actual run.
        obj = operation()
        logger.debug('Created object {}'.format(obj))
        logger.debug(', '.join(['{}: {}'.format(key, obj.values[key]) for key in obj.values]))
        logger.debug('Condition: {}'.format(condition(obj)))
        while (condition(obj)):
            logger.debug('Running iteration {}'.format(iteration))
            obj.run()
            logger.debug(
                ', '.join(['{}: {}'.format(key, obj.values[key]) for key in obj.values]))
            logger.debug('Condition: {}'.format(condition(obj)))
            iteration += 1
            # Guard against runaway loops while the protocol is immature.
            if iteration > max_iteration:
                break
        # Publish the final variable values through the output data proxy.
        for name in outputs:
            setattr(output, name, obj.values[name])

    return run_loop()
def subgraph(variables=None):
    """Allow operations to be configured in a sub-context.

    The object returned functions as a Python context manager. When entering the
    context manager (the beginning of the ``with`` block), the object has an
    attribute for each of the named ``variables``. Reading from these variables
    gets a proxy for the initial value or its update from a previous loop iteration.
    At the end of the ``with`` block, any values or data flows assigned to these
    attributes become the output for an iteration.

    After leaving the ``with`` block, the variables are no longer assignable, but
    can be called as bound methods to get the current value of a variable.

    When the object is run, operations bound to the variables are ``reset`` and
    run to update the variables.

    Arguments:
        variables: mapping of variable names to initial values / data sources.
    """
    # Implementation note:
    # A Subgraph (type) has a subgraph context associated with it. The subgraph's
    # ability to capture operation additions is implemented in terms of the
    # subgraph context.
    logger.debug('Declare a new subgraph with variables {}'.format(variables))

    return SubgraphBuilder(variables)
def join_arrays(*, front: datamodel.NDArray = (), back: datamodel.NDArray = ()) -> datamodel.NDArray:
    """Operation that consumes two sequences and produces a concatenated single sequence.

    Note that the exact signature of the operation is not determined until this
    helper is called. Helper functions may dispatch to factories for different
    operations based on the inputs. In this case, the dtype and shape of the
    inputs determines dtype and shape of the output. An operation instance must
    have strongly typed output, but the input must be strongly typed on an
    object definition so that a Context can make runtime decisions about
    dispatching work and data before instantiating.
    """
    # NOTE(review): the caller (concatenate_lists) is annotated as returning a
    # Future, which suggests this function is wrapped by a decorator not visible
    # here — confirm against the surrounding file.
    # TODO: elaborate and clarify.
    # TODO: check type and shape.
    # TODO: figure out a better annotation.
    # TODO: (FR4) Returned list should be an NDArray.
    # Reject strings/bytes: they are sequences but not valid array inputs here.
    if isinstance(front, (str, bytes)) or isinstance(back, (str, bytes)):
        raise exceptions.ValueError('Input must be a pair of lists.')
    assert isinstance(front, datamodel.NDArray)
    assert isinstance(back, datamodel.NDArray)
    # Concatenate by copying front's values and extending with back's.
    new_list = list(front._values)
    new_list.extend(back._values)
    return datamodel.NDArray(new_list)
# Generic type parameter used to relate make_constant's input value type to
# the type carried by the Future it produces.
Scalar = typing.TypeVar('Scalar')
def concatenate_lists(sublists: list = ()) -> _Future[gmx.datamodel.NDArray]:
    """Combine data sources into a single list.

    A trivial data flow restructuring operation.
    """
    # Reject strings/bytes, which would otherwise be iterated character-wise.
    if isinstance(sublists, (str, bytes)):
        raise exceptions.ValueError('Input must be a list of lists.')
    if len(sublists) == 0:
        return datamodel.ndarray([])
    # Recursively fold the sublists from the left:
    # join(sublists[0], join(sublists[1], ...)).
    # TODO: Fix the data model so that this can type-check properly.
    return join_arrays(front=sublists[0],
                       back=typing.cast(datamodel.NDArray,
                                        concatenate_lists(sublists[1:])))
def make_constant(value: Scalar) -> _Future:
    """Provide a predetermined value at run time.

    This is a trivial operation that provides a (typed) value, primarily for
    internally use to manage gmxapi data flow.

    Accepts a value of any type. The object returned has a definite type and
    provides same interface as other gmxapi outputs. Additional constraints or
    guarantees on data type may appear in future versions.
    """
    # The result type is fixed by the concrete type of the provided value.
    dtype = type(value)
    # Identity function: the static source simply hands back the wrapped value.
    source = StaticSourceManager(name='data', proxied_data=value, width=1, function=lambda x: x)
    description = ResultDescription(dtype=dtype, width=1)
    future = Future(source, 'data', description=description)
    # Return the Future so callers can participate in gmxapi data flow.
    return future
def logical_not(value: bool) -> _Future:
    """Boolean negation.

    If the argument is a gmxapi compatible Data or Future object, a new View or
    Future is created that proxies the boolean opposite of the input.

    If the argument is a callable, logical_not returns a wrapper function that
    returns a Future for the logical opposite of the callable's result.
    """
    # TODO: Small data transformations like this don't need to be formal Operations.
    # This could be essentially a data annotation that affects the resolver in a
    # DataEdge. As an API detail, coding for different Contexts and optimizations
    # within those Context implementations could be simplified.
    # Wrap a one-liner negation as a gmxapi operation and return its output Future.
    operation = function_wrapper(output={'data': bool})(lambda data=bool(): not bool(data))
    return operation(data=value).output.data