5 from __future__ import absolute_import
6 from __future__ import division
7 from __future__ import print_function
8 from __future__ import unicode_literals
17 from gmx import exceptions
18 from gmx import logging
19 from gmx import status
20 import gmx.core as gmxapi
24 logger = logging.getLogger(__name__)
25 logger.info('Importing gmx.context')
def _load_tpr(self, element):
    """Implement the gromacs.load_tpr operation.

    Updates the minimum width of the workflow parallelism. Does not add any
    API object to the graph.

    Arguments:
        self: The Context in which this operation is being loaded.
        element: WorkElement specifying the operation. Its ``params['input']``
            entry supplies the list of TPR files.

    Returns:
        A Director that the Context can use in launching the Session.
    """
    class Builder(object):
        """Publish a list of TPR input files to the builders subscribed to it."""

        def __init__(self, tpr_list):
            logger.debug("Loading tpr builder for tpr_list {}".format(tpr_list))
            self.tpr_list = tpr_list
            self.subscribers = []
            self.width = len(tpr_list)

        def add_subscriber(self, builder):
            """Give `builder` the TPR file list and remember it for `build()`."""
            builder.infile = self.tpr_list
            self.subscribers.append(builder)

        def build(self, dag):
            """Make the graph-wide 'width' at least the number of TPR files."""
            required = len(self.tpr_list)
            for subscriber in self.subscribers:
                subscriber.width = required
            # Never shrink a width that another builder has already requested.
            current = dag.graph['width'] if 'width' in dag.graph else required
            dag.graph['width'] = max(required, current)

    return Builder(element.params['input'])
def _md(context, element):
    """Implement the gmxapi.md operation by returning a builder that can populate a data flow graph for the element.

    Inspects dependencies to set up the simulation runner.

    The graph node created will have `launch` and `run` attributes with function references, and a `width`
    attribute declaring the workflow parallelism requirement.

    Arguments:
        context: The Context in which this operation is being loaded.
        element: WorkElement specifying the operation.

    Returns:
        A Director that the Context can use in launching the Session.
    """
    class Builder(object):
        """Translate md work element to a node in the session's DAG."""

        def __init__(self, element):
            try:
                self.name = element.name
                # Note that currently the calling code is in charge of subscribing this builder to its
                # dependencies. A list of tpr files will be set when the calling code subscribes this
                # builder to a tpr provider.
                self.infile = None
                # Other dependencies in the element may register potentials when subscribed to.
                self.potential = []
                self.input_nodes = []
                self.runtime_params = element.params
            except AttributeError:
                raise exceptions.ValueError("object provided does not seem to be a WorkElement.")

        def add_subscriber(self, builder):
            """The md operation does not yet have any subscribable facilities."""
            pass

        def build(self, dag):
            """Add a node to the graph that, when launched, will construct a simulation runner.

            Complete the definition of appropriate graph edges for dependencies.

            The launch() method of the added node creates the runner from the tpr file for the current rank
            and adds modules from the incoming edges.

            Raises:
                gmx.exceptions.TypeError: `dag` lacks the attributes used here
                    (`add_node`, `add_edge`, `graph`, `nodes`).
            """
            required_interface = ('add_node', 'add_edge', 'graph', 'nodes')
            if not all(hasattr(dag, attribute) for attribute in required_interface):
                # Use the module's `exceptions` import, like every other raise in this file.
                raise exceptions.TypeError("dag argument does not have a DiGraph interface.")
            name = self.name
            dag.add_node(name)
            for neighbor in self.input_nodes:
                dag.add_edge(neighbor, name)
            infile = self.infile
            assert infile is not None
            potential_list = self.potential
            assert dag.graph['width'] >= len(infile)

            # Provide closure with which to execute tasks for this node.
            def launch(rank=None):
                """Start the simulation session for `rank` and return the node's `run` callable."""
                assert rank is not None

                # Copy and update the input, if required by the `end_time` parameter.
                temp_filename = None
                if 'end_time' in self.runtime_params:
                    # mkstemp returns an open OS-level file descriptor along with the path. Only the
                    # path is needed, so close the descriptor right away instead of leaking it.
                    descriptor, temp_filename = tempfile.mkstemp(suffix='.tpr')
                    os.close(descriptor)
                    logger.debug('Updating input. Using temp file {}'.format(temp_filename))
                    gmxapi.copy_tprfile(source=infile[rank],
                                        destination=temp_filename,
                                        end_time=self.runtime_params['end_time'])
                    tpr_file = temp_filename
                else:
                    tpr_file = infile[rank]

                logger.info('Loading TPR file: {}'.format(tpr_file))
                system = gmxapi.from_tpr(tpr_file)
                dag.nodes[name]['system'] = system
                mdargs = gmxapi.MDArgs()
                mdargs.set(self.runtime_params)
                # Workaround to give access to plugin potentials used in a context.
                pycontext = element.workspec._context
                pycontext.potentials = potential_list
                api_context = pycontext._api_object
                api_context.setMDArgs(mdargs)
                for potential in potential_list:
                    api_context.add_mdmodule(potential)
                dag.nodes[name]['session'] = system.launch(api_context)

                if temp_filename is None:
                    dag.nodes[name]['close'] = dag.nodes[name]['session'].close
                else:
                    def special_close():
                        """Close the session, then remove the temporary TPR file."""
                        dag.nodes[name]['session'].close()
                        logger.debug("Unlinking temporary TPR file {}.".format(temp_filename))
                        os.unlink(temp_filename)
                    dag.nodes[name]['close'] = special_close

                def runner():
                    """Currently we only support a single call to run."""
                    def done():
                        raise StopIteration()
                    # Replace the runner with a stop condition for subsequent passes.
                    dag.nodes[name]['run'] = done
                    return dag.nodes[name]['session'].run()
                dag.nodes[name]['run'] = runner
                return dag.nodes[name]['run']

            dag.nodes[name]['launch'] = launch

    return Builder(element)
def _get_mpi_ensemble_communicator(session_communicator, ensemble_size):
    """Get an ensemble communicator from an MPI communicator.

    An ensemble communicator is an object that implements mpi4py.MPI.Comm methods
    as described elsewhere in this documentation.

    :param session_communicator: MPI communicator with the interface described by mpi4py.MPI.Comm
    :param ensemble_size: number of ensemble members
    :return: communicator of the described size on participating ranks and None on any others.
    :raises gmx.exceptions.UsageError: if `ensemble_size` exceeds the size of `session_communicator`.

    Must be called exactly once in every process in `communicator`. It is the
    responsibility of the calling code to refrain from running ensemble operations
    if not part of the ensemble. The calling code determines this by comparing its
    session_communicator.Get_rank() to ensemble_size. This is not a good solution
    because it assumes there is a single ensemble communicator and that ensemble
    work is allocated to ranks serially from session_rank 0. Future work might
    use process groups associated with specific operations in the work graph so
    that processes can check for membership in a group to decide whether to use
    their ensemble communicator.
    """
    from mpi4py import MPI

    session_size = session_communicator.Get_size()
    session_rank = session_communicator.Get_rank()

    # Check the ensemble "width" against the available parallelism
    if ensemble_size > session_size:
        msg = 'ParallelArrayContext requires a work array that fits in the MPI communicator: '
        msg += 'array width {} > size {}.'
        msg = msg.format(ensemble_size, session_size)
        raise exceptions.UsageError(msg)
    if ensemble_size < session_size:
        msg = 'MPI context is wider than necessary to run this work: array width {} vs. size {}.'
        warnings.warn(msg.format(ensemble_size, session_size))

    # Create an appropriate sub-communicator for the present work. Extra ranks are excluded.
    # The session launcher should maintain an inventory of the ensembles and
    # provide an appropriate tag, but right now we just have a sort of
    # Boolean: ensemble or not.
    color = 0 if session_rank < ensemble_size else MPI.UNDEFINED
    ensemble_communicator = session_communicator.Split(color, session_rank)

    # Ranks that passed MPI.UNDEFINED receive the null communicator. That is the
    # expected outcome for extra ranks, not an API error, so do not query it
    # (and do not warn about it).
    is_null = ensemble_communicator == MPI.COMM_NULL
    ensemble_communicator_size = 0
    ensemble_communicator_rank = None
    if not is_null:
        try:
            ensemble_communicator_size = ensemble_communicator.Get_size()
            ensemble_communicator_rank = ensemble_communicator.Get_rank()
        except Exception:
            warnings.warn("Possible API programming error: ensemble_communicator does not provide required methods...")
            ensemble_communicator_size = 0
            ensemble_communicator_rank = None
    logger.info("Session rank {} assigned to rank {} of subcommunicator {} of size {}".format(
        session_rank,
        ensemble_communicator_rank,
        ensemble_communicator,
        ensemble_communicator_size
    ))

    # There isn't a good reason to worry about special handling for a null communicator,
    # which we have to explicitly avoid "free"ing, so let's just get rid of it.
    # To do: don't even get the null communicator in the first place. Use a group and create instead of split.
    if is_null:
        ensemble_communicator = None

    return ensemble_communicator
def _acquire_communicator(communicator=None):
    """Get a workflow level communicator for the session.

    This function is intended to be called by the __enter__ method that creates
    a session to get a communicator instance. The `Free` method of the returned
    instance must be called exactly once. This should be performed by the
    corresponding __exit__ method.

    Arguments:
        communicator : a communicator to duplicate (optional)

    Returns:
        A communicator that must be explicitly freed by the caller.

    Raises:
        gmx.exceptions.ApiError: duplicating the communicator failed.

    Currently only supports MPI multi-simulation parallelism dependent on
    mpi4py. The mpi4py package should be installed and built with compilers
    that are compatible with the gmxapi installation.

    If provided, `communicator` must provide the mpi4py.MPI.Comm interface.
    Returns either a duplicate of `communicator` or of MPI_COMM_WORLD if mpi4py
    is available. Otherwise, returns a mock communicator that can only manage
    sessions and ensembles of size 0 or 1.

    gmx behavior is undefined if launched with mpiexec and without mpi4py
    """

    class MockSessionCommunicator(object):
        """Single-process stand-in used when mpi4py cannot be imported."""

        def Dup(self):
            return self

        def Free(self):
            return

        def Get_size(self):
            return 1

        def Get_rank(self):
            return 0

        def __str__(self):
            return 'Basic'

        def __repr__(self):
            return 'MockSessionCommunicator()'

    # A caller-supplied communicator is used as given; otherwise fall back to
    # MPI_COMM_WORLD, or to the mock when mpi4py is missing.
    if communicator is None:
        try:
            import mpi4py.MPI as MPI
            communicator = MPI.COMM_WORLD
        except ImportError:
            logger.info("mpi4py is not available for default session communication.")
            communicator = MockSessionCommunicator()

    try:
        return communicator.Dup()
    except Exception as error:
        raise exceptions.ApiError("Exception when duplicating communicator: {}".format(error))
def _get_ensemble_communicator(communicator, ensemble_size):
    """Provide ensemble_communicator feature in active_context, if possible.

    Must be called on all ranks in `communicator`. The communicator returned
    must be freed by a call to its `Free()` instance method. This function is
    best used in a context manager's `__enter__()` method so that the
    corresponding `context.Free()` can be called in the `__exit__` method.

    Arguments:
        communicator : session communicator for the session with the ensemble.
        ensemble_size : ensemble size of the requested ensemble communicator

    Returns:
        A trivial single-member communicator when the session or the ensemble
        has at most one member; otherwise whatever
        `_get_mpi_ensemble_communicator` returns for this rank.

    The ensemble_communicator feature should be present if the Context can
    provide communication between all ensemble members. The Context should
    determine this at the launch of the session and set the
    ``_session_ensemble_communicator`` attribute to provide an object that
    implements the same interface as an mpi4py.MPI.Comm object. Actually, this is
    a temporary shim, so the only methods that need to be available are `Get_size`,
    `Get_rank` and something that can be called as Allreduce(send, recv), where
    send and recv are array-like objects that numpy can wrap.

    Currently, only one ensemble can be managed in a session.
    """

    class TrivialEnsembleCommunicator(object):
        """Ensemble communicator for an ensemble of one: Allreduce is a copy."""

        def __init__(self):
            import numpy
            self._numpy = numpy

        def Free(self):
            return

        def Allreduce(self, send, recv):
            logger.debug("Faking an Allreduce for ensemble of size 1.")
            # asarray wraps without copying when it can and copies otherwise.
            # `numpy.array(..., copy=False)` raises under NumPy 2 whenever a
            # copy would be required.
            send_buffer = self._numpy.asarray(send)
            recv_buffer = self._numpy.asarray(recv)
            recv_buffer[:] = send_buffer[:]

        def Get_size(self):
            return 1

        def Get_rank(self):
            return 0

    session_size = communicator.Get_size()
    session_rank = communicator.Get_rank()
    location = " for session rank {} in session communicator of size {}".format(
        session_rank,
        session_size)

    # For trivial cases, don't bother trying to use MPI
    # Note: all ranks in communicator must agree on the size of the work!
    # Note: If running with a Mock session communicator in an MPI session (user error)
    # every rank will think it is the only rank.
    if session_size <= 1 or ensemble_size <= 1:
        message = "Getting TrivialEnsembleCommunicator for ensemble of size {}".format((ensemble_size))
        logger.debug(message + location)
        ensemble_communicator = TrivialEnsembleCommunicator()
    else:
        message = "Getting an MPI subcommunicator for ensemble of size {}".format(ensemble_size)
        logger.debug(message + location)
        ensemble_communicator = _get_mpi_ensemble_communicator(communicator, ensemble_size)

    return ensemble_communicator
def _get_ensemble_update(context):
    """Set up a simple ensemble resource.

    The context should call this function once per session to get an `ensemble_update`
    function object.

    This is a draft of a Context feature that may not be available in all
    Context implementations. This factory function can be wrapped as a
    ``ensemble_update`` "property" in a Context instance method to produce a Python function
    with the signature ``update(context, send, recv, tag=None)``.

    This feature requires that the Context is capable of providing the
    ensemble_communicator feature and the numpy feature.
    If both are available, the function object provided by
    ``ensemble_update`` provides
    the ensemble reduce operation used by the restraint potential plugin in the
    gmxapi sample_restraint repository. Otherwise, the provided function object
    will log an error and then raise an exception.

    gmxapi 0.0.5 and 0.0.6 MD plugin clients look for a member function named
    ``ensemble_update`` in the Context that launched them. In the future,
    clients will use session resources to access ensemble reduce operations.
    In the mean time, a transitional implementation can involve defining a
    ``ensemble_update`` property in the Context object that acts as a factory
    function to produce the reducing operation, if possible.

    Raises:
        gmx.exceptions.FeatureNotAvailableError: numpy cannot be imported.
    """
    try:
        import numpy
    except ImportError:
        message = "ensemble_update requires numpy, but numpy is not available."
        logger.error(message)
        raise exceptions.FeatureNotAvailableError(message)

    def _ensemble_update(active_context, send, recv, tag=None):
        """Allreduce `send` into `recv`, divide by the work width, and save the result to an .npz file."""
        assert tag is not None
        assert str(tag) != ''
        if tag not in active_context.part:
            active_context.part[tag] = 0
        logger.debug("Performing ensemble update.")
        active_context._session_ensemble_communicator.Allreduce(send, recv)
        # asarray wraps `recv` without copying when it can; `numpy.array(..., copy=False)`
        # raises under NumPy 2 whenever a copy would be required.
        averaged = numpy.asarray(recv)
        averaged /= active_context.work_width
        suffix = '_{}.npz'.format(tag)
        # These will end up in the working directory and each ensemble member will have one
        filename = str("rank{}part{:04d}{}".format(active_context.rank, int(active_context.part[tag]), suffix))
        numpy.savez(filename, recv=recv)
        active_context.part[tag] += 1

    def _no_ensemble_update(active_context, send, recv, tag=None):
        """Placeholder used when the Context has no ensemble communicator."""
        message = "Attempt to call ensemble_update() in a Context that does not provide the operation."
        # If we confirm effective exception handling, remove the extraneous log.
        logger.error(message)
        raise exceptions.FeatureNotAvailableError(message)

    if context._session_ensemble_communicator is not None:
        functor = _ensemble_update
    else:
        functor = _no_ensemble_update
    return functor
class _libgromacsContext(object):
    """Low level API to libgromacs library context provides Python context manager.

    Binds to a workflow and manages computation resources.

    Attributes:
        workflow (:obj:`gmx.workflow.WorkSpec`): bound workflow to be executed.

    Example:
        >>> with _libgromacsContext(my_workflow) as session: # doctest: +SKIP
        ...    session.run()

    Things are still fluid, but what we might do is have all of the WorkSpec operations that are supported
    by a given Context to correspond to member functions in the Context, a SessionBuilder, or Session. In
    any case, the operations allow a Context implementation to transform a work specification into a
    directed acyclic graph of schedulable work.
    """
    # The Context is the appropriate entity to own or mediate access to an appropriate logging facility,
    # but right now we are using the module-level Python logger.
    # Reference https://github.com/kassonlab/gmxapi/issues/135

    def __init__(self, workflow=None):
        """Create new context bound to the provided workflow, if any.

        Args:
            workflow (gmx.workflow.WorkSpec) work specification object to bind.

        """
        self._session = None
        self.__workflow = workflow

    @property
    def workflow(self):
        return self.__workflow

    @workflow.setter
    def workflow(self, workflow):
        """Before accepting a workflow, the context must check whether it can interpret the work specification."""
        self.__workflow = workflow

    @classmethod
    def check_workspec(cls, workspec, raises=False):
        """Check the validity of the work specification in this Context.

        Args:
            workspec: work specification to check
            raises: Boolean (default False)

        If raises == True, raises exceptions for problems found in the work specification.

        Returns:
            True if workspec is processable in this Context, else False.
        """
        from gmx.workflow import workspec_version, get_source_elements, WorkElement

        problems = []

        def report(message):
            # Either abort on the first problem or collect it, per `raises`.
            if raises:
                raise exceptions.ApiError(message)
            problems.append(message)

        # Check compatibility
        if workspec.version != workspec_version:
            report('Incompatible workspec version.')

        # Check that Elements are uniquely identifiable.
        # NOTE(review): Context.work treats `workspec.elements` as a mapping whose values need
        # WorkElement.deserialize(); here its items are used directly as elements — confirm.
        seen = dict()
        for element in workspec.elements:
            if element.name is not None and element.name not in seen:
                seen[element.name] = element
            else:
                report('WorkSpec must contain uniquely named elements.')

        # Check that the specification is complete. There must be at least one source element.
        sources = {element.name for element in get_source_elements(workspec)}
        if not sources:
            report('WorkSpec must contain at least one source element')

        return not problems

    def __enter__(self):
        """Implement Python context manager protocol.

        Returns:
            runnable session object.
        """
        if self._session is not None:
            raise exceptions.Error('Already running.')
        # The API runner currently has an implicit context.
        try:
            # launch() with no arguments is deprecated.
            # Ref: https://github.com/kassonlab/gmxapi/issues/124
            self._session = self.workflow.launch()
        except:
            # Leave the context reusable, then let the failure propagate.
            self._session = None
            raise
        return self._session

    def __exit__(self, exception_type, exception_value, traceback):
        """Implement Python context manager protocol.

        Closing a session should not produce Python exceptions. Instead, exit
        state is accessible through API objects like Status.
        For evolving design points, see
        - https://github.com/kassonlab/gmxapi/issues/41
        - https://github.com/kassonlab/gmxapi/issues/121
        """
        self._session.close()
        self._session = None
        return False
class DefaultContext(_libgromacsContext):
    """Produce an appropriate context for the work and compute environment.

    Deprecated:
        Use gmx.context.get_context() to find an appropriate high-level API
        context. For lower-level access to the library that does not employ the
        full API Context abstraction, but instead explicitly uses a local
        libgromacs instance, a replacement still needs to be devised. It may
        have this same interface, but the name and scoping of DefaultContext is
        misleading.
    """

    def __init__(self, work):
        """Bind `work` to a `_libgromacsContext`, emitting a DeprecationWarning first."""
        # There is very little context abstraction at this point...
        notice = "Behavior of DefaultContext is unspecified starting in gmxapi 0.0.8."
        warnings.warn(notice, DeprecationWarning)
        super(DefaultContext, self).__init__(work)
567 class Context(object):
568 """Manage an array of simulation work executing in parallel.
570 This is the first implementation of a new style of Context class that has some extra abstraction
571 and uses the new WorkSpec idea.
573 Additional facilities are available to elements of the array members.
575 * array element corresponding to work in the current sub-context
576 * "global" resources managed by the ParallelArrayContext
579 work :obj:`gmx.workflow.WorkSpec`: specification of work to be performed when a session is launched.
580 rank : numerical index of the current worker in a running session (None if not running)
581 work_width : Minimum width needed for the parallelism required by the array of work being executed.
582 elements : dictionary of references to elements of the workflow.
584 `rank`, `work_width`, and `elements` are empty or None until the work is processed, as during session launch.
587 Use ``mpiexec -n 2 python -m mpi4py myscript.py`` to run two jobs at the same time.
588 In this example the jobs are identical. In myscript.py:
592 >>> from gmx.data import tpr_filename # Get a test tpr filename
593 >>> work = gmx.workflow.from_tpr([tpr_filename, tpr_filename])
600 >>> from gmx.data import tpr_filename # Get a test tpr filename
601 >>> work = gmx.workflow.from_tpr([tpr_filename, tpr_filename])
602 >>> context = gmx.context.get_context(work)
603 >>> with context as session:
605 ... # The session is one abstraction too low to know what rank it is. It lets the spawning context manage
607 ... # rank = session.rank
608 ... # The local context object knows where it fits in the global array.
609 ... rank = context.rank
610 ... output_path = os.path.join(context.workdir_list[rank], 'traj.trr')
611 ... assert(os.path.exists(output_path))
612 ... print('Worker {} produced {}'.format(rank, output_path))
614 Implementation notes:
616 To produce a running session, the Context __enter__() method is called, according to the Python context manager
617 protocol. At this time, the attached WorkSpec must be feasible on the available resources. To turn the specified
618 work into an executable directed acyclic graph (DAG), handle objects for the elements in the work spec are sequenced
619 in dependency-compatible order and the context creates a "builder" for each according to the element's operation.
620 Each builder is subscribed to the builders of its dependency elements. The DAG is then assembled by calling each
621 builder in sequence. A builder can add zero, one, or more nodes and edges to the DAG.
623 The Session is then launched from the DAG. What happens next is implementation-dependent, and it may take a while for
624 us to decide whether and how to standardize interfaces for the DAG nodes and edges and/or execution protocols. I
625 expect each node will at least have a `launch()` method, but will also probably have typed input and output ports as well as some signalling.
626 A sophisticated and abstract Session implementation could schedule work only to satisfy data dependencies of requested
627 output upon request. Our immediate implementation will use the following protocol.
629 Each node has a `launch()` method. When the session is entered, the `launch()` method is called for each node in
630 dependency order. The launch method returns either a callable (`run()` function) or None, raising an exception in
631 case of an error. The sequence of non-None callables is stored by the Session. When Session.run() is called, the
632 sequence of callables is called in order. If StopIteration is raised by the callable, it is removed from the sequence.
633 The sequence is processed repeatedly until there are no more callables.
635 Note that this does not rigorously handle races or deadlocks, or flexibility in automatically chasing dependencies. A more
636 thorough implementation could recursively call launch on dependencies (launch could be idempotent or involve some
637 signalling to dependents when complete), run calls could be entirely event driven, and/or nodes could "publish"
638 output (including just a completion message), blocking for acknowledgement before looking for the next set of subscribed inputs.
def __init__(self, work=None, workdir_list=None, communicator=None):
    """Create manager for computing resources.

    Does not initialize resources because Python objects by themselves do
    not have a good way to deinitialize resources. Instead, resources are
    initialized using the Python context manager protocol when sessions are
    launched.

    Appropriate computing resources need to be knowable when the Context is created.

    Arguments:
        work : work specification with which to initialize this context
        workdir_list : deprecated
        communicator : non-owning reference to a multiprocessing communicator

    If provided, communicator must implement the mpi4py.MPI.Comm interface. The
    Context will use this communicator as the parent for subcommunicators
    used when launching sessions. If provided, communicator is owned by the
    caller, and must be freed by the caller after any sessions are closed.
    By default, the Context will get a reference to MPI_COMM_WORLD, which
    will be freed when the Python process ends and cleans up its resources.
    The communicator stored by the Context instance will not be used directly,
    but will be duplicated when launching sessions using ``with``.
    """
    from gmx.workflow import WorkSpec

    # Until better Session abstraction exists at the Python level, a
    # _session_communicator attribute will be added to and removed from the
    # context at session entry and exit. If necessary, a _session_ensemble_communicator
    # will be split from _session_communicator for simulation ensembles
    # present in the specified work.
    self.__communicator = communicator

    self.__work = WorkSpec()
    self.__workdir_list = workdir_list

    self._session = None
    # This may not belong here. Is it confusing for the Context to have both global and local properties?
    # Alternatively, maybe a trivial `property` that gets the rank from a bound session, if any.
    self.rank = None

    # `work_width` notes the required width of an array of synchronous tasks to perform the specified work.
    # As work elements are processed, self.work_width will be increased as appropriate.
    self.work_width = None

    # Operations map: namespace -> operation name -> builder factory. May be extended during the
    # lifetime of a Context; built-in operations provided by this module may differ from additional
    # operations registered at run time. A factory receives the element and must return an object with
    #
    # * add_subscriber(another_builder) : allow other builders to subscribe to this one.
    # * build(dag) : add an arbitrary number of nodes and edges to a Graph.
    #
    # The gmxapi namespace of operations should be consistent with a specified universal set of
    # functionalities. Even if TPR file loading were to become a common and stable enough operation to
    # be specified in an API, it is unlikely to be implemented by any code outside of GROMACS, so it
    # lives in its own namespace rather than cluttering a potentially more universal one.
    self.__operations = {
        'gmxapi': {'md': lambda element: _md(self, element)},
        'gromacs': {'load_tpr': lambda element: _load_tpr(self, element)},
    }

    # Right now we are treating workspec elements and work DAG nodes as equivalent, but they are
    # emphatically not intended to be tightly coupled. The work specification is intended to be
    # simple, user-friendly, general, and easy-to-implement. The DAG is an implementation detail
    # and may differ across context types. It is not yet specified whether we should translate the
    # work into a graph before, after, or during processing of the elements. In actuality, we will
    # have to process the entire workspec to some degree to make sure we can run it on the
    # available resources.
    self.elements = None

    # This setter must be called after the operations map has been populated.
    self.work = work

    self._api_object = gmxapi.Context()
def work(self, work):
    """Set `work` attribute.

    Raises:
        gmx.exceptions.ApiError: work is not compatible with schema or
            known operations.
        gmx.exceptions.UsageError: Context can not access operations in
            the name space given for an Element
        gmx.exceptions.ValueError: assignment operation cannot be performed
            for the provided object (rhs)

    For discussion on error handling, see https://github.com/kassonlab/gmxapi/issues/125
    """
    from gmx.workflow import WorkSpec, WorkElement

    if work is None:
        return

    # Accept either a WorkSpec or an object that carries one.
    if isinstance(work, WorkSpec):
        workspec = work
    elif hasattr(work, 'workspec') and isinstance(work.workspec, WorkSpec):
        workspec = work.workspec
    else:
        raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
    workspec._context = self

    builtin_namespaces = {'gmxapi', 'gromacs'}

    # Make sure this context knows how to run the specified work.
    for key in workspec.elements:
        element = WorkElement.deserialize(workspec.elements[key])
        namespace = element.namespace
        operation = element.operation

        # Note: Non-built-in namespaces (non-native) are treated as modules to import.
        # Native namespaces may not be completely implemented in a particular version of a particular Context.
        if namespace in builtin_namespaces:
            assert namespace in self.__operations
            if operation not in self.__operations[namespace]:
                # The requested element is a built-in operation but not available in this Context.
                logger.error("Operation {} not found in map {}".format(operation,
                                                                       str(self.__operations)))
                # The exception thrown here may evolve with https://github.com/kassonlab/gmxapi/issues/125
                raise exceptions.FeatureNotAvailableError(
                    'Specified work cannot be performed due to unimplemented operation {}.{}.'.format(
                        namespace,
                        operation))
            continue

        # Don't leave an empty nested dictionary if we couldn't map the operation.
        namespace_map = self.__operations[namespace] if namespace in self.__operations else dict()

        # Set or update namespace map iff we have something new.
        if operation in namespace_map:
            continue
        try:
            element_module = importlib.import_module(namespace)
            element_operation = getattr(element_module, operation)
        except ImportError as import_error:
            raise exceptions.UsageError(
                'Cannot find implementation for namespace {}. ImportError: {}'.format(
                    namespace,
                    str(import_error)))
        except AttributeError:
            raise exceptions.UsageError(
                'Cannot find factory for operation {}.{}'.format(
                    namespace,
                    operation))
        namespace_map[operation] = element_operation
        self.__operations[namespace] = namespace_map

    self.__work = workspec
def add_operation(self, namespace, operation, get_builder):
    """Add a builder factory to the operation map.

    Extends the known operations of the Context by mapping an operation in a namespace to a function
    that returns a builder to process a work element referencing the operation. Must be called before
    the work specification is added, since the spec is inspected to confirm that the Context can run it.

    It may be more appropriate to add this functionality to the Context constructor or as auxiliary
    information in the workspec, or to remove it entirely; it is straight-forward to just add snippets
    of code to additional files in the working directory and to make them available as modules for the
    Context.

    Example:

        >>> # Import some custom extension code.
        >>> import myplugin
        >>> myelement = myplugin.new_element()
        >>> workspec = gmx.workflow.WorkSpec()
        >>> workspec.add_element(myelement)
        >>> context = gmx.context.ParallelArrayContext()
        >>> context.add_operation(myelement.namespace, myelement.operation, myplugin.element_translator)
        >>> context.work = workspec
        >>> with get_context() as session:
        ...    session.run()

    Raises:
        gmx.exceptions.UsageError: the namespace is built in but unmapped, or the
            operation is already defined in this context.
    """
    operations = self.__operations
    if namespace not in operations:
        # NOTE(review): the built-in namespaces are populated in __init__, so this guard looks
        # unreachable for them in practice — confirm whether they were meant to be rejected outright.
        if namespace in {'gmxapi', 'gromacs'}:
            raise exceptions.UsageError("Cannot add operations to built-in namespaces.")
        operations[namespace] = dict()

    namespace_map = operations[namespace]
    if operation in namespace_map:
        raise exceptions.UsageError("Operation {}.{} already defined in this context.".format(namespace, operation))
    namespace_map[operation] = get_builder
850 # Set up a simple ensemble resource
851 # This should be implemented for Session, not Context, and use an appropriate subcommunicator
852 # that is created and freed as the Session launches and exits.
def ensemble_update(self, send, recv, tag=None):
    """Dispatch an ensemble update through the callable stored on this Context.

    gmxapi through 0.0.6 expects to bind to this member function during "build". That
    behavior needs to be deprecated (bind during launch, instead); until then this method
    acts as a placeholder that forwards to the stored ``__ensemble_update`` callable.

    Args:
        send: forwarded unchanged to the stored callable.
        recv: forwarded unchanged to the stored callable.
        tag: name tag for the update; must be neither None nor an empty string.

    Returns:
        Whatever the stored ``__ensemble_update`` callable returns.

    Raises:
        gmx.exceptions.ApiError: if no usable ``tag`` is provided.
    """
    tag_is_usable = tag is not None and str(tag) != ''
    if not tag_is_usable:
        raise exceptions.ApiError("ensemble_update must be called with a name tag.")

    # The stored callable is a plain attribute rather than a bound method, so the Context
    # has to be handed over explicitly as the first argument.
    updater = self.__ensemble_update
    return updater(self, send, recv, tag)
866 """Implement Python context manager protocol, producing a Session for the specified work in this Context.
869 Session object that can be run and/or inspected.
871 Additional API operations are possible while the Session is active. When used as a Python context manager,
872 the Context will close the Session at the end of the `with` block by calling `__exit__`.
874 Note: this is probably where we will have to process the work specification to determine whether we
875 have appropriate resources (such as sufficiently wide parallelism). Until we have a better Session
876 abstraction, this means the clean approach should take two passes to first build a DAG and then
877 instantiate objects to perform the work. In the first implementation, we kind of muddle things into
881 import networkx as nx
882 from networkx import DiGraph as _Graph
884 raise exceptions.FeatureNotAvailableError("gmx requires the networkx package to execute work graphs.")
886 # Cache the working directory from which we were launched so that __exit__() can give us proper context
887 # management behavior.
888 self.__initial_cwd = os.getcwd()
889 logger.debug("Launching session from {}".format(self.__initial_cwd))
891 if self._session is not None:
892 raise exceptions.Error('Already running.')
893 if self.work is None:
894 raise exceptions.UsageError('No work to perform!')
896 # Set up the global and local context.
897 # Check the global MPI configuration
898 # Since the Context doesn't have a destructor, if we use an MPI communicator at this scope then
899 # it has to be owned and managed outside of Context.
900 self._session_communicator = _acquire_communicator(self.__communicator)
901 context_comm_size = self._session_communicator.Get_size()
902 context_rank = self._session_communicator.Get_rank()
903 self.rank = context_rank
904 # self._communicator = communicator
905 logger.debug("Context rank {} in context {} of size {}".format(context_rank,
906 self._session_communicator,
910 # Process the work specification.
912 logger.debug("Processing workspec:\n{}".format(str(self.work)))
914 # Get a builder for DAG components for each element
916 builder_sequence = []
917 for element in self.work:
918 # dispatch builders for operation implementations
920 new_builder = self.__operations[element.namespace][element.operation](element)
921 assert hasattr(new_builder, 'add_subscriber')
922 assert hasattr(new_builder, 'build')
924 logger.info("Collected builder for {}".format(element.name))
925 except LookupError as e:
926 request = '.'.join([element.namespace, element.operation])
927 message = 'Could not find an implementation for the specified operation: {}. '.format(request)
929 raise exceptions.ApiError(message)
930 # Subscribing builders is the Context's responsibility because otherwise the builders
931 # don't know about each other. Builders should not depend on the Context unless they
932 # are a facility provided by the Context, in which case they may be member functions
933 # of the Context. We will probably need to pass at least some
934 # of the Session to the `launch()` method, though...
935 dependencies = element.depends
936 for dependency in dependencies:
937 # If a dependency is a list, assume it is an "ensemble" of dependencies
# and pick the element corresponding to the local rank.
939 if isinstance(dependency, (list, tuple)):
940 assert len(dependency) > context_rank
941 name = str(dependency[context_rank])
944 logger.info("Subscribing {} to {}.".format(element.name, name))
945 builders[name].add_subscriber(new_builder)
946 builders[element.name] = new_builder
947 builder_sequence.append(element.name)
949 # Call the builders in dependency order
950 # Note: session_communicator is available, but ensemble_communicator has not been created yet.
951 graph = _Graph(width=1)
952 logger.info("Building sequence {}".format(builder_sequence))
953 for name in builder_sequence:
954 builder = builders[name]
955 logger.info("Building {}".format(builder))
956 logger.debug("Has build attribute {}.".format(builder.build))
958 self.work_width = graph.graph['width']
960 # Prepare working directories. This should probably be moved to some aspect of the Session and either
961 # removed from here or made more explicit to the user.
962 workdir_list = self.__workdir_list
963 if workdir_list is None:
964 workdir_list = [os.path.join('.', str(i)) for i in range(self.work_width)]
965 self.__workdir_list = list([os.path.abspath(dir) for dir in workdir_list])
967 # For gmxapi 0.0.6, all ranks have a session_ensemble_communicator
968 self._session_ensemble_communicator = _get_ensemble_communicator(self._session_communicator, self.work_width)
969 self.__ensemble_update = _get_ensemble_update(self)
971 # launch() is currently a method of gmx.core.MDSystem and returns a gmxapi::Session.
972 # MDSystem objects are obtained from gmx.core.from_tpr(). They also provide add_potential().
973 # gmxapi::Session objects are exposed as gmx.core.MDSession and provide run() and close() methods.
975 # Here, I want to find the input appropriate for this rank and get an MDSession for it.
976 # E.g. Make a pass that allows meta-objects to bind (setting md_proxy._input_tpr and md_proxy._plugins,
977 # and then call a routine implemented by each object to run whatever protocol it needs, such
978 # as `system = gmx.core.from_tpr(md._input_tpr); system.add_potential(md._plugins)
979 # For future design plans, reference https://github.com/kassonlab/gmxapi/issues/65
981 # This `if` condition is currently the thing that ultimately determines whether the
982 # rank attempts to do work.
983 if context_rank < self.work_width:
985 logger.debug(("Launching graph {}.".format(graph.graph)))
986 logger.debug("Graph nodes: {}".format(str(list(graph.nodes))))
987 logger.debug("Graph edges: {}".format(str(list(graph.edges))))
989 logger.info("Launching work on context rank {}, subcommunicator rank {}.".format(
991 self._session_ensemble_communicator.Get_rank()))
993 # Launch the work for this rank
994 self.workdir = self.__workdir_list[self.rank]
995 if os.path.exists(self.workdir):
996 if not os.path.isdir(self.workdir):
997 raise exceptions.FileError('{} is not a valid working directory.'.format(self.workdir))
999 os.mkdir(self.workdir)
1000 os.chdir(self.workdir)
1001 logger.info('rank {} changed directory to {}'.format(self.rank, self.workdir))
1002 sorted_nodes = nx.topological_sort(graph)
1005 for name in sorted_nodes:
1006 launcher = graph.nodes[name]['launch']
1007 runner = launcher(self.rank)
1008 if not runner is None:
1009 runners.append(runner)
1010 closers.append(graph.nodes[name]['close'])
1012 # Get a session object to return. It must simply provide a `run()` function.
1013 context = self # Part of workaround for bug gmxapi-214
1014 class Session(object):
1015 def __init__(self, runners, closers):
1016 self.runners = list(runners)
1017 self.closers = list(closers)
1020 # Note we are not following the documented protocol of running repeatedly yet.
1022 for i, runner in enumerate(self.runners):
1025 except StopIteration:
1026 to_be_deleted.insert(0, i)
1027 for i in to_be_deleted:
1032 for close in self.closers:
1033 logger.debug("Closing node: {}".format(close))
1035 # Workaround for bug gmxapi-214
1036 if not gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
1037 context._api_object = gmxapi.Context()
1040 self._session = Session(runners, closers)
1042 logger.info("Context rank {} has no work to do".format(self.rank))
1044 context = self # Part of workaround for bug gmxapi-214
1045 class NullSession(object):
1047 logger.info("Running null session on rank {}.".format(self.rank))
1048 return status.Status()
1050 logger.info("Closing null session.")
1051 # Workaround for bug gmxapi-214
1052 if not gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
1053 context._api_object = gmxapi.Context()
1056 self._session = NullSession()
1057 self._session.rank = self.rank
1059 # Make sure session has started on all ranks before continuing?
1061 self._session.graph = graph
1062 return self._session
def __exit__(self, exception_type, exception_value, traceback):
    """Implement Python context manager protocol.

    Closes the active Session (if there is one), frees the ensemble sub-communicator and the
    session communicator, and changes back to the working directory that was cached when the
    session was launched.

    Args:
        exception_type: unused; accepted for the context manager protocol.
        exception_value: unused; accepted for the context manager protocol.
        traceback: unused; accepted for the context manager protocol.

    Returns:
        False, so that an exception raised inside the ``with`` block is not suppressed.
    """
    logger.info("Exiting session on context rank {}.".format(self.rank))
    if self._session is not None:
        logger.info("Calling session.close().")
        self._session.close()
        self._session = None
    else:
        # Note: we should not have a None session but rather an API-compliant Session that just has no work.
        # Reference: https://github.com/kassonlab/gmxapi/issues/41
        logger.info("No _session known to context or session already closed.")

    if hasattr(self, '_session_ensemble_communicator'):
        # Test the ensemble sub-communicator itself. The previous check looked at the session
        # communicator, so a None sub-communicator would still have reached `.Free()`.
        if self._session_ensemble_communicator is not None:
            logger.info("Freeing sub-communicator {} on rank {}".format(
                self._session_ensemble_communicator,
                self.rank))
            self._session_ensemble_communicator.Free()
        else:
            logger.debug('"None" ensemble communicator does not need to be "Free"d.')
        del self._session_ensemble_communicator
    else:
        logger.debug("No ensemble subcommunicator on context rank {}.".format(self.rank))

    logger.debug("Freeing session communicator.")
    self._session_communicator.Free()
    logger.debug("Deleting session communicator reference.")
    del self._session_communicator

    # Return to the directory the session was launched from.
    os.chdir(self.__initial_cwd)
    logger.info("Session closed on context rank {}.".format(self.rank))
    # Note: Since sessions running in different processes can have different work, sessions have not necessarily
    # ended on all ranks. As a result, starting another session on the same resources could block until the
    # resources are available.

    # Python context managers return False when there were no exceptions to handle.
    return False
# The interface and functionality of ParallelArrayContext is the new generic
# Context behavior, so ParallelArrayContext is now just an alias of Context.
# The old name is kept so that it remains available for compatibility.
ParallelArrayContext = Context
1107 def get_context(work=None):
1108 """Get a concrete Context object.
1111 work (gmx.workflow.WorkSpec): runnable work as a valid gmx.workflow.WorkSpec object
1114 An object implementing the :py:class:`gmx.context.Context` interface, if possible.
1117 gmx.exceptions.ValueError if an appropriate context for ``work`` could not be loaded.
1119 If work is provided, return a Context object capable of running the provided work or produce an error.
1121 The semantics for finding Context implementations needs more consideration, and a more informative exception
1124 A Context can run the provided work if
* the Context can resolve all operations specified in the elements
1127 * the Context supports DAG topologies implied by the network of dependencies
1128 * the Context supports features required by the elements with the specified parameters,
1129 such as synchronous array jobs.
1132 # We need to define an interface for WorkSpec objects so that we don't need
1133 # to rely on typing and inter-module dependencies.
1134 from gmx import workflow
1136 if work is not None:
1137 if isinstance(work, workflow.WorkSpec):
1139 elif hasattr(work, 'workspec') and isinstance(work.workspec,
1141 workspec = work.workspec
1143 raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
1144 if workspec is not None and \
1145 hasattr(workspec, '_context') and \
1146 workspec._context is not None:
1147 context = workspec._context
1149 context = Context(work=workspec)