from gmxapi import logger as root_logger
from gmxapi.abc import OperationImplementation, MutableResource, Node
from gmxapi.typing import _Context, ResultTypeVar, SourceTypeVar, valid_result_types, valid_source_types
+from gmxapi.typing import Future as _Future
# Initialize module-level logger
logger = root_logger.getChild('operation')
Data sources may be any of the basic gmxapi data types, gmxapi Futures
of those types, or gmxapi ensemble data bundles of the above.
-
- Note that the checking and conditioning could be moved to one or more
- creation functions. In conjunction with an InputCollectionDescription,
- such creation functions could make decisions about automatically shaping
- the data flow topology or making conversions of data type or shape.
"""
- named_data = []
+ super(DataSourceCollection, self).__init__()
for name, value in kwargs.items():
- if not isinstance(name, str):
- raise exceptions.TypeError('Data must be named with str type.')
- # TODO: Encapsulate handling of proferred data sources to Context details.
- # Preprocessed input should be self-describing gmxapi data types. Structured
- # input must be recursively (depth-first) converted to gmxapi data types.
- # TODO: Handle gmxapi Futures stored as dictionary elements!
- if not isinstance(value, valid_source_types):
- if isinstance(value, collections.abc.Iterable):
- # Iterables here are treated as arrays, but we do not have a robust typing system.
- # Warning: In the initial implementation, the iterable may contain Future objects.
- # TODO: (#2993) Revisit as we sort out data shape and Future protocol.
- value = datamodel.ndarray(value)
- elif hasattr(value, 'result'):
- # A Future object.
- pass
- else:
- raise exceptions.ApiError('Cannot process data source {}'.format(value))
- named_data.append((name, value))
- super().__init__(named_data)
+ self[name] = value
def __setitem__(self, key: str, value: SourceTypeVar) -> None:
if not isinstance(key, str):
raise exceptions.TypeError('Data must be named with str type.')
+ # TODO: Encapsulate handling of proferred data sources to Context details.
+ # Preprocessed input should be self-describing gmxapi data types. Structured
+ # input must be recursively (depth-first) converted to gmxapi data types.
+ # TODO: Handle gmxapi Futures stored as dictionary elements!
if not isinstance(value, valid_source_types):
if isinstance(value, collections.abc.Iterable):
# Iterables here are treated as arrays, but we do not have a robust typing system.
# Warning: In the initial implementation, the iterable may contain Future objects.
# TODO: (#2993) Revisit as we sort out data shape and Future protocol.
- value = datamodel.ndarray(value)
+ try:
+ value = datamodel.ndarray(value)
+ except (exceptions.ValueError, exceptions.TypeError) as e:
+ raise exceptions.TypeError('Iterable could not be converted to NDArray: {}'.format(value)) from e
elif hasattr(value, 'result'):
# A Future object.
pass
# """Recreate the Operation at the consuming end of the DataEdge."""
-class Future(typing.Generic[ResultTypeVar]):
+class Future(_Future):
"""gmxapi data handle.
Future is currently more than a Future right now. (should be corrected / clarified.)
TODO: extract the abstract class for input inspection?
Currently abstraction is handled through SourceResource subclassing.
+
+ Attributes:
+ description (ResultDescription): Describes the result to be obtained from this Future.
+
"""
def __init__(self, resource_manager: SourceResource, name: str, description: ResultDescription):
self.name = name
if not isinstance(description, ResultDescription):
raise exceptions.ValueError('Need description of requested data.')
+ self.description = description # type: ResultDescription
self.resource_manager = resource_manager
- self.description = description
# Deprecated. We should not "reset" futures, but reconstitute them, but we
# need to move the data model to a subscription-based system so that we can
datastore = collections.OrderedDict()
for name, dtype in output_description.items():
assert isinstance(dtype, type)
- result_description = ResultDescription(dtype, width=ensemble_width)
+ result_description = ResultDescription(dtype=dtype, width=ensemble_width)
datastore[name] = OutputData(name=name, description=result_description)
return datastore
Scalar = typing.TypeVar('Scalar')
-def concatenate_lists(sublists: list = ()) -> Future[gmx.datamodel.NDArray]:
+def concatenate_lists(sublists: list = ()) -> _Future[gmx.datamodel.NDArray]:
"""Combine data sources into a single list.
A trivial data flow restructuring operation.
concatenate_lists(sublists[1:])))
-def make_constant(value: Scalar) -> Future:
+def make_constant(value: Scalar) -> _Future:
"""Provide a predetermined value at run time.
This is a trivial operation that provides a (typed) value, primarily for
return future
-def logical_not(value: bool) -> Future:
+def logical_not(value: bool) -> _Future:
"""Boolean negation.
If the argument is a gmxapi compatible Data or Future object, a new View or
Extends the specifications of the abstract base classes in gmxapi.abc.
"""
-from typing import TypeVar, Generic, NewType, Type, Callable
+from typing import TypeVar, Generic, NewType, Type
-import gmxapi as gmx
+import gmxapi.abc
# Use SourceTypeVar and ResultTypeVar for static type hints, annotations, and as a parameter to generics.
# Use valid_source_types and valid_result_types for run-time type checking.
-ResultTypeVar = TypeVar('ResultTypeVar', *(str, bool, int, float, dict, gmx.abc.NDArray))
+ResultTypeVar = TypeVar('ResultTypeVar', *(str, bool, int, float, dict, gmxapi.abc.NDArray))
valid_result_types = ResultTypeVar.__constraints__
SourceTypeVar = TypeVar('SourceTypeVar',
- *(str, bool, int, float, dict, gmx.abc.NDArray, gmx.abc.EnsembleDataSource))
+ *(str, bool, int, float, dict, gmxapi.abc.NDArray, gmxapi.abc.EnsembleDataSource))
valid_source_types = SourceTypeVar.__constraints__
# Place holder for type annotations of Context objects.
# TODO: Expand to support some static type checking.
-_Context = TypeVar('_Context', bound=gmx.abc.Context)
+_Context = TypeVar('_Context', bound=gmxapi.abc.Context)
# Type variable that binds to subclasses of the forward referenced OperationImplentation ABC.
-_Op = TypeVar('_Op', bound=gmx.abc.OperationImplementation)
+_Op = TypeVar('_Op', bound=gmxapi.abc.OperationImplementation)
# Note: We could make many types generic in SourceContext and TargetContext and
# compose functionality into instances of a helpful base class with the option of
...
-class OperationReference(gmx.abc.OperationReference, Generic[_Op]):
+class OperationReference(gmxapi.abc.OperationReference, Generic[_Op]):
"""Object with an OperationReference interface.
Generic version of AbstractOperationReference, parameterized by operation
"""
-class OperationImplementation(Generic[_Op], gmx.abc.OperationImplementation):
+class OperationImplementation(Generic[_Op], gmxapi.abc.OperationImplementation):
"""Essential interface of an Operation implementation.
Describe the essential features of an Operation that can be registered with