2 # This file is part of the GROMACS molecular simulation package.
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
35 # This file is based on the Kasson Lab gmxapi project release 0.0.7.4.
36 # https://github.com/kassonlab/gmxapi/blob/v0.0.7.4/src/gmx/context.py
37 # https://github.com/kassonlab/gmxapi/blob/v0.0.7.4/LICENSE
50 from gmxapi import exceptions
51 from gmxapi import logger as root_logger
52 import gmxapi._gmxapi as _gmxapi
55 logger = root_logger.getChild('context')
56 logger.info('Importing {}'.format(__name__))
59 def _load_tpr(self, element):
60 """Implement the gromacs.load_tpr operation.
62 Updates the minimum width of the workflow parallelism. Does not add any API object to the graph.
65 self: The Context in which this operation is being loaded.
66 element: WorkElement specifying the operation.
69 A Director that the Context can use in launching the Session.
71 class Builder(object):
72 def __init__(self, tpr_list):
73 logger.debug("Loading tpr builder for tpr_list {}".format(tpr_list))
74 self.tpr_list = tpr_list
76 self.width = len(tpr_list)
78 def add_subscriber(self, builder):
79 builder.infile = self.tpr_list
80 self.subscribers.append(builder)
83 width = len(self.tpr_list)
84 for builder in self.subscribers:
86 if 'width' in dag.graph:
87 width = max(width, dag.graph['width'])
88 dag.graph['width'] = width
90 return Builder(element.params['input'])
92 def _md(context, element):
93 """Implement the gmxapi.md operation by returning a builder that can populate a data flow graph for the element.
95 Inspects dependencies to set up the simulation runner.
97 The graph node created will have `launch` and `run` attributes with function references, and a `width`
98 attribute declaring the workflow parallelism requirement.
101 context: The Context in which this operation is being loaded.
102 element: WorkElement specifying the operation.
105 A Director that the Context can use in launching the Session.
107 class Builder(object):
108 """Translate md work element to a node in the session's DAG."""
109 def __init__(self, element):
111 self.name = element.name
112 # Note that currently the calling code is in charge of subscribing this builder to its dependencies.
113 # A list of tpr files will be set when the calling code subscribes this builder to a tpr provider.
115 # Other dependencies in the element may register potentials when subscribed to.
117 self.input_nodes = []
118 self.runtime_params = element.params
119 except AttributeError:
120 raise exceptions.ValueError("object provided does not seem to be a WorkElement.")
121 def add_subscriber(self, builder):
122 """The md operation does not yet have any subscribeable facilities."""
124 def build(self, dag):
125 """Add a node to the graph that, when launched, will construct a simulation runner.
127 Complete the definition of appropriate graph edges for dependencies.
129 The launch() method of the added node creates the runner from the tpr file for the current rank and adds
130 modules from the incoming edges.
132 if not (hasattr(dag, 'add_node')
133 and hasattr(dag, 'add_edge')
134 and hasattr(dag, 'graph')
135 and hasattr(dag, 'nodes')):
136 raise exceptions.ProtocolError("Argument 'dag' must provide the DiGraph interface.")
139 for neighbor in self.input_nodes:
140 dag.add_edge(neighbor, name)
142 assert not infile is None
143 potential_list = self.potential
144 assert dag.graph['width'] >= len(infile)
146 # Provide closure with which to execute tasks for this node.
147 def launch(rank=None):
148 assert not rank is None
150 # Copy and update, if required by `end_time` parameter.
152 if 'end_time' in self.runtime_params:
153 # Note that mkstemp returns a file descriptor as the first part of the tuple.
154 # We can make this cleaner in 0.0.7 with a separate node that manages the
156 _, temp_filename = tempfile.mkstemp(suffix='.tpr')
157 logger.debug('Updating input. Using temp file {}'.format(temp_filename))
158 _gmxapi.rewrite_tprfile(source=infile[rank],
159 destination=temp_filename,
160 end_time=self.runtime_params['end_time'])
161 tpr_file = temp_filename
163 tpr_file = infile[rank]
165 logger.info('Loading TPR file: {}'.format(tpr_file))
166 system = _gmxapi.from_tpr(tpr_file)
167 dag.nodes[name]['system'] = system
168 mdargs = _gmxapi.MDArgs()
169 mdargs.set(self.runtime_params)
170 # Workaround to give access to plugin potentials used in a context.
171 pycontext = element.workspec._context
172 pycontext.potentials = potential_list
173 context = pycontext._api_object
174 context.setMDArgs(mdargs)
175 for potential in potential_list:
176 context.add_mdmodule(potential)
177 dag.nodes[name]['session'] = system.launch(context)
178 dag.nodes[name]['close'] = dag.nodes[name]['session'].close
180 if 'end_time' in self.runtime_params:
182 dag.nodes[name]['session'].close()
183 logger.debug("Unlinking temporary TPR file {}.".format(temp_filename))
184 os.unlink(temp_filename)
185 dag.nodes[name]['close'] = special_close
187 dag.nodes[name]['close'] = dag.nodes[name]['session'].close
190 """Currently we only support a single call to run."""
192 raise StopIteration()
193 # Replace the runner with a stop condition for subsequent passes.
194 dag.nodes[name]['run'] = done
195 return dag.nodes[name]['session'].run()
196 dag.nodes[name]['run'] = runner
197 return dag.nodes[name]['run']
199 dag.nodes[name]['launch'] = launch
201 return Builder(element)
204 def _get_mpi_ensemble_communicator(session_communicator, ensemble_size):
205 """Get an ensemble communicator from an MPI communicator.
207 An ensemble communicator is an object that implements mpi4py.MPI.Comm methods
208 as described elsewhere in this documentation.
210 :param session_communicator: MPI communicator with the interface described by mpi4py.MPI.Comm
211 :param ensemble_size: number of ensemble members
212 :return: communicator of described size on participating ranks and null communicator on any others.
214 Must be called exactly once in every process in `communicator`. It is the
215 responsibility of the calling code to refrain from running ensemble operations
216 if not part of the ensemble. The calling code determines this by comparing its
217 session_communicator.Get_rank() to ensemble_size. This is not a good solution
218 because it assumes there is a single ensemble communicator and that ensemble
219 work is allocated to ranks serially from session_rank 0. Future work might
220 use process groups associated with specific operations in the work graph so
221 that processes can check for membership in a group to decide whether to use
222 their ensemble communicator. Another possibility would be to return None
223 rather than a null communicator in processes that aren't participating in
226 from mpi4py import MPI
228 session_size = session_communicator.Get_size()
229 session_rank = session_communicator.Get_rank()
231 # Check the ensemble "width" against the available parallelism
232 if ensemble_size > session_size:
233 msg = 'ParallelArrayContext requires a work array that fits in the MPI communicator: '
234 msg += 'array width {} > size {}.'
235 msg = msg.format(ensemble_size, session_size)
236 raise exceptions.UsageError(msg)
237 if ensemble_size < session_size:
238 msg = 'MPI context is wider than necessary to run this work: array width {} vs. size {}.'
239 warnings.warn(msg.format(ensemble_size, session_size))
241 # Create an appropriate sub-communicator for the present work. Extra ranks will be in a
242 # sub-communicator with no work.
243 if session_rank < ensemble_size:
244 # The session launcher should maintain an inventory of the ensembles and
245 # provide an appropriate tag, but right now we just have a sort of
246 # Boolean: ensemble or not.
249 color = MPI.UNDEFINED
251 ensemble_communicator = session_communicator.Split(color, session_rank)
253 ensemble_communicator_size = ensemble_communicator.Get_size()
254 ensemble_communicator_rank = ensemble_communicator.Get_rank()
256 warnings.warn("Possible API programming error: ensemble_communicator does not provide required methods...")
257 ensemble_communicator_size = 0
258 ensemble_communicator_rank = None
259 logger.info("Session rank {} assigned to rank {} of subcommunicator {} of size {}".format(
261 ensemble_communicator_rank,
262 ensemble_communicator,
263 ensemble_communicator_size
266 # There isn't a good reason to worry about special handling for a null communicator,
267 # which we have to explicitly avoid "free"ing, so let's just get rid of it.
268 # To do: don't even get the null communicator in the first place. Use a group and create instead of split.
269 if ensemble_communicator == MPI.COMM_NULL:
270 ensemble_communicator = None
272 return ensemble_communicator
275 def _acquire_communicator(communicator=None):
276 """Get a workflow level communicator for the session.
278 This function is intended to be called by the __enter__ method that creates
279 a session get a communicator instance. The `Free` method of the returned
280 instance must be called exactly once. This should be performed by the
281 corresponding __exit__ method.
284 communicator : a communicator to duplicate (optional)
287 A communicator that must be explicitly freed by the caller.
289 Currently only supports MPI multi-simulation parallelism dependent on
290 mpi4py. The mpi4py package should be installed and built with compilers
291 that are compatible with the gmxapi installation.
293 If provided, `communicator` must provide the mpi4py.MPI.Comm interface.
294 Returns either a duplicate of `communicator` or of MPI_COMM_WORLD if mpi4py
295 is available. Otherwise, returns a mock communicator that can only manage
296 sessions and ensembles of size 0 or 1.
298 gmx behavior is undefined if launched with mpiexec and without mpi4py
301 class MockSessionCommunicator(object):
318 return 'MockSessionCommunicator()'
320 if communicator is None:
322 import mpi4py.MPI as MPI
323 communicator = MPI.COMM_WORLD
325 logger.info("mpi4py is not available for default session communication.")
326 communicator = MockSessionCommunicator()
328 communicator = communicator
331 new_communicator = communicator.Dup()
332 except Exception as e:
333 message = "Exception when duplicating communicator: {}".format(e)
334 raise exceptions.ApiError(message)
336 return new_communicator
339 def _get_ensemble_communicator(communicator, ensemble_size):
340 """Provide ensemble_communicator feature in active_context, if possible.
342 Must be called on all ranks in `communicator`. The communicator returned
343 must be freed by a call to its `Free()` instance method. This function is
344 best used in a context manager's `__enter__()` method so that the
345 corresponding `context.Free()` can be called in the `__exit__` method.
348 communicator : session communicator for the session with the ensemble.
349 ensemble_size : ensemble size of the requested ensemble communicator
351 The ensemble_communicator feature should be present if the Context can
352 provide communication between all ensemble members. The Context should
353 determine this at the launch of the session and set the
354 ``_session_ensemble_communicator`` attribute to provide an object that
355 implements the same interface as an mpi4py.MPI.Comm object. Actually, this is
356 a temporary shim, so the only methods that need to be available are `Get_size`,
357 `Get_rank` and something that can be called as
358 Allreduce(send, recv) where send and recv are objects providing the Python
361 Currently, only one ensemble can be managed in a session.
363 ensemble_communicator = None
365 class TrivialEnsembleCommunicator(object):
373 def Allreduce(self, send, recv):
374 logger.debug("Faking an Allreduce for ensemble of size 1.")
375 send_buffer = self._numpy.array(send, copy=False)
376 recv_buffer = self._numpy.array(recv, copy=False)
377 recv_buffer[:] = send_buffer[:]
385 # For trivial cases, don't bother trying to use MPI
386 # Note: all ranks in communicator must agree on the size of the work!
387 # Note: If running with a Mock session communicator in an MPI session (user error)
388 # every rank will think it is the only rank and will try to perform the
390 if communicator.Get_size() <= 1 or ensemble_size <= 1:
391 message = "Getting TrivialEnsembleCommunicator for ensemble of size {}".format((ensemble_size))
392 message += " for session rank {} in session communicator of size {}".format(
393 communicator.Get_rank(),
394 communicator.Get_size())
395 logger.debug(message)
396 ensemble_communicator = TrivialEnsembleCommunicator()
398 message = "Getting an MPI subcommunicator for ensemble of size {}".format(ensemble_size)
399 message += " for session rank {} in session communicator of size {}".format(
400 communicator.Get_rank(),
401 communicator.Get_size())
402 logger.debug(message)
403 ensemble_communicator = _get_mpi_ensemble_communicator(communicator, ensemble_size)
405 return ensemble_communicator
407 def _get_ensemble_update(context):
408 """Set up a simple ensemble resource.
410 The context should call this function once per session to get an `ensemble_update`
413 This is a draft of a Context feature that may not be available in all
414 Context implementations. This factory function can be wrapped as a
415 ``ensemble_update`` "property" in a Context instance method to produce a Python function
416 with the signature ``update(context, send, recv, tag=None)``.
418 This feature requires that the Context is capabable of providing the
419 ensemble_communicator feature and the numpy feature.
420 If both are available, the function object provided by
421 ``ensemble_update`` provides
422 the ensemble reduce operation used by the restraint potential plugin in the
423 gmxapi sample_restraint repository. Otherwise, the provided function object
424 will log an error and then raise an exception.
426 gmxapi 0.0.5 and 0.0.6 MD plugin clients look for a member function named
427 ``ensemble_update`` in the Context that launched them. In the future,
428 clients will use session resources to access ensemble reduce operations.
429 In the mean time, a transitional implementation can involve defining a
430 ``ensemble_update`` property in the Context object that acts as a factory
431 function to produce the reducing operation, if possible with the given
437 message = "ensemble_update requires numpy, but numpy is not available."
438 logger.error(message)
439 raise exceptions.FeatureNotAvailableError(message)
441 def _ensemble_update(active_context, send, recv, tag=None):
442 assert not tag is None
443 assert str(tag) != ''
444 if not tag in active_context.part:
445 active_context.part[tag] = 0
446 logger.debug("Performing ensemble update.")
447 active_context._session_ensemble_communicator.Allreduce(send, recv)
448 buffer = numpy.array(recv, copy=False)
449 buffer /= active_context.work_width
450 suffix = '_{}.npz'.format(tag)
451 # These will end up in the working directory and each ensemble member will have one
452 filename = str("rank{}part{:04d}{}".format(active_context.rank, int(active_context.part[tag]), suffix))
453 numpy.savez(filename, recv=recv)
454 active_context.part[tag] += 1
456 def _no_ensemble_update(active_context, send, recv, tag=None):
457 message = "Attempt to call ensemble_update() in a Context that does not provide the operation."
458 # If we confirm effective exception handling, remove the extraneous log.
459 logger.error(message)
460 raise exceptions.FeatureNotAvailableError(message)
462 if context._session_ensemble_communicator is not None:
463 functor = _ensemble_update
465 functor = _no_ensemble_update
470 class Context(object):
471 """Manage an array of simulation work executing in parallel.
473 .. deprecated:: 0.0.7
475 This execution context implementation is imported from the gmxapi 0.0.7
476 package for GROMACS 2019 and does not conform to the protocols of gmxapi 0.1+.
477 It is used internally to support legacy code.
479 The following features are subject to be changed or removed without further
483 work :obj:`gmx.workflow.WorkSpec`: specification of work to be performed when a session is launched.
484 rank : numerical index of the current worker in a running session (None if not running)
485 work_width : Detected number of co-scheduled work elements.
486 elements : dictionary of references to elements of the workflow.
488 `rank`, `work_width`, and `elements` are empty or None until the work is processed, as during session launch.
490 To run multiple simulations at a time, whether ensembles or independent
491 simulations, Python should be invoked in an MPI environment with `mpi4py`.
492 The Context can then use one MPI rank per simulation. It is recommended that
493 GROMACS is compiled without MPI in this case. (Thread-MPI is recommended.)
494 MPI domain decomposition is not explicitly supported with this Context
499 >>> from gmxapi.simulation.context import get_context
500 >>> from gmxapi.simulation.workflow import from_tpr
501 >>> work = from_tpr([tpr_filename, tpr_filename])
502 >>> context = get_context(work)
503 >>> with context as session:
505 ... # The session is one abstraction too low to know what rank it is. It lets the spawning context manage
507 ... # rank = session.rank
508 ... # The local context object knows where it fits in the global array.
509 ... rank = context.rank
510 ... output_path = os.path.join(context.workdir_list[rank], 'traj.trr')
511 ... assert(os.path.exists(output_path))
512 ... print('Worker {} produced {}'.format(rank, output_path))
515 Do not run a gmxapi script in an MPI environment without `mpi4py`. gmxapi
516 will not be able to determine that other processes are running the same
519 Implementation notes:
521 To produce a running session, the Context __enter__() method is called, according to the Python context manager
522 protocol. At this time, the attached WorkSpec must be feasible on the available resources. To turn the specified
523 work into an executable directed acyclic graph (DAG), handle objects for the elements in the work spec are sequenced
524 in dependency-compatible order and the context creates a "builder" for each according to the element's operation.
525 Each builder is subscribed to the builders of its dependency elements. The DAG is then assembled by calling each
526 builder in sequence. A builder can add zero, one, or more nodes and edges to the DAG.
528 The Session is then launched from the DAG. What happens next is implementation-dependent, and it may take a while for
529 us to decide whether and how to standardize interfaces for the DAG nodes and edges and/or execution protocols. I
530 expect each node will at least have a `launch()` method, but will also probably have typed input and output ports as well as some signalling.
531 A sophisticated and abstract Session implementation could schedule work only to satisfy data dependencies of requested
532 output upon request. Our immediate implementation will use the following protocol.
534 Each node has a `launch()` method. When the session is entered, the `launch()` method is called for each node in
535 dependency order. The launch method returns either a callable (`run()` function) or None, raising an exception in
536 case of an error. The sequence of callables is stored by the Session. When Session.run() is called, the
537 sequence of callables is called in order. If StopIteration is raised by the callable, it is removed from the sequence.
538 The sequence is processed repeatedly until there are no more callables.
540 Note that this does not rigorously handle races or deadlocks, or flexibility in automatically chasing dependencies. A more
541 thorough implementation could recursively call launch on dependencies (launch could be idempotent or involve some
542 signalling to dependents when complete), run calls could be entirely event driven, and/or nodes could "publish"
543 output (including just a completion message), blocking for acknowledgement before looking for the next set of subscribed inputs.
546 def __init__(self, work=None, workdir_list=None, communicator=None):
547 """Create manager for computing resources.
549 Does not initialize resources because Python objects by themselves do
550 not have a good way to deinitialize resources. Instead, resources are
551 initialized using the Python context manager protocol when sessions are
554 Appropriate computing resources need to be knowable when the Context is created.
557 work : work specification with which to initialize this context
558 workdir_list : deprecated
559 communicator : non-owning reference to a multiprocessing communicator
561 If provided, communicator must implement the mpi4py.MPI.Comm interface. The
562 Context will use this communicator as the parent for subcommunicators
563 used when launching sessions. If provided, communicator is owned by the
564 caller, and must be freed by the caller after any sessions are closed.
565 By default, the Context will get a reference to MPI_COMM_WORLD, which
566 will be freed when the Python process ends and cleans up its resources.
567 The communicator stored by the Context instance will not be used directly,
568 but will be duplicated when launching sessions using ``with``.
571 # self.__context_array = list([Context(work_element) for work_element in work])
572 from .workflow import WorkSpec
574 # Until better Session abstraction exists at the Python level, a
575 # _session_communicator attribute will be added to and removed from the
576 # context at session entry and exit. If necessary, a _session_ensemble_communicator
577 # will be split from _session_communicator for simulation ensembles
578 # present in the specified work.
579 self.__communicator = communicator
581 self.__work = WorkSpec()
582 self.__workdir_list = workdir_list
585 # Note: this attribute is a detail of MPI-based Contexts. Client access
586 # is subject to change.
589 # `work_width` notes the required width of an array of synchronous tasks to perform the specified work.
590 # As work elements are processed, self.work_width will be increased as appropriate.
591 self.work_width = None
593 # initialize the operations map. May be extended during the lifetime of a Context.
594 # Note that there may be a difference between built-in operations provided by this module and
595 # additional operations registered at run time.
596 self.__operations = dict()
597 # The map contains a builder for each operation. The builder is created by passing the element to the function
598 # in the map. The object returned must have the following methods:
600 # * add_subscriber(another_builder) : allow other builders to subscribe to this one.
601 # * build(dag) : Fulfill the builder responsibilities by adding an arbitrary number of nodes and edges to a Graph.
603 # The gmxapi namespace of operations should be consistent with a specified universal set of functionalities
604 self.__operations['gmxapi'] = {'md': lambda element : _md(self, element),
606 # Even if TPR file loading were to become a common and stable enough operation to be specified in
607 # an API, it is unlikely to be implemented by any code outside of GROMACS, so let's not clutter
608 # a potentially more universal namespace.
609 self.__operations['gromacs'] = {'load_tpr': lambda element : _load_tpr(self, element),
612 # Right now we are treating workspec elements and work DAG nodes as equivalent, but they are
613 # emphatically not intended to be tightly coupled. The work specification is intended to be
614 # simple, user-friendly, general, and easy-to-implement. The DAG is an implementation detail
615 # and may differ across context types. It is likely to have stronger typing of nodes and/or
616 # edges. It is not yet specified whether we should translate the work into a graph before, after,
617 # or during processing of the elements, so it is not yet known whether we will need facilities
618 # to allow cross-referencing between the two graph-type structures. If we instantiate API objects
619 # as we process work elements, and the DAG in a context deviates from the work specification
620 # topology, we would need to use named dependencies to look up objects to bind to. Such facilities
621 # could be hidden in the WorkElement class(es), too, to delegate code away from the Context as a
622 # container class growing without bounds...
623 # In actuality, we will have to process the entire workspec to some degree to make sure we can
624 # run it on the available resources.
627 # This setter must be called after the operations map has been populated.
631 self._api_object = _gmxapi.Context()
632 except Exception as e:
633 logger.error('Got exception when trying to create default library context: ' + str(e))
634 raise exceptions.ApiError('Uncaught exception in API object creation.') from e
641 def work(self, work):
642 """Set `work` attribute.
645 gmx.exceptions.ApiError: work is not compatible with schema or
647 gmx.exceptions.UsageError: Context can not access operations in
648 the name space given for an Element
649 gmx.exceptions.ValueError: assignment operation cannot be performed
650 for the provided object (rhs)
652 For discussion on error handling, see https://github.com/kassonlab/gmxapi/issues/125
654 from .workflow import WorkSpec, WorkElement
658 if isinstance(work, WorkSpec):
660 elif hasattr(work, 'workspec') and isinstance(work.workspec, WorkSpec):
661 workspec = work.workspec
663 raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
664 workspec._context = self
666 # Make sure this context knows how to run the specified work.
667 for e in workspec.elements:
668 element = WorkElement.deserialize(workspec.elements[e])
670 # Note: Non-built-in namespaces (non-native) are treated as modules to import.
671 # Native namespaces may not be completely implemented in a particular version of a particular Context.
672 if element.namespace in {'gmxapi', 'gromacs'}:
673 assert element.namespace in self.__operations
674 if not element.operation in self.__operations[element.namespace]:
675 # The requested element is a built-in operation but not available in this Context.
676 # element.namespace should be mapped, but not all operations are necessarily implemented.
677 logger.error("Operation {} not found in map {}".format(element.operation,
678 str(self.__operations)))
679 # This check should be performed when deciding if the context is appropriate for the work.
680 # If we are just going to use a try/catch block for this test, then we should differentiate
681 # this exception from those raised due to incorrect usage.
682 # The exception thrown here may evolve with https://github.com/kassonlab/gmxapi/issues/125
683 raise exceptions.FeatureNotAvailableError(
684 'Specified work cannot be performed due to unimplemented operation {}.{}.'.format(
689 assert element.namespace not in {'gmxapi', 'gromacs'}
691 # Don't leave an empty nested dictionary if we couldn't map the operation.
692 if element.namespace in self.__operations:
693 namespace_map = self.__operations[element.namespace]
695 namespace_map = dict()
697 # Set or update namespace map iff we have something new.
698 if element.operation not in namespace_map:
700 element_module = importlib.import_module(element.namespace)
701 element_operation = getattr(element_module, element.operation)
702 except ImportError as e:
703 raise exceptions.UsageError(
704 'Cannot find implementation for namespace {}. ImportError: {}'.format(
707 except AttributeError:
708 raise exceptions.UsageError(
709 'Cannot find factory for operation {}.{}'.format(
714 namespace_map[element.operation] = element_operation
715 self.__operations[element.namespace] = namespace_map
717 self.__work = workspec
719 def add_operation(self, namespace, operation, get_builder):
720 # noinspection PyUnresolvedReferences
721 """Add a builder factory to the operation map.
723 Extends the known operations of the Context by mapping an operation in a namespace to a function
724 that returns a builder to process a work element referencing the operation. Must be called before
725 the work specification is added, since the spec is inspected to confirm that the Context can run it.
727 It may be more appropriate to add this functionality to the Context constructor or as auxiliary
728 information in the workspec, or to remove it entirely; it is straight-forward to just add snippets
729 of code to additional files in the working directory and to make them available as modules for the
734 >>> # Import some custom extension code.
736 >>> myelement = myplugin.new_element()
737 >>> workspec = gmx.workflow.WorkSpec()
738 >>> workspec.add_element(myelement)
739 >>> context = gmx.context.ParallelArrayContext()
740 >>> context.add_operation(myelement.namespace, myelement.operation, myplugin.element_translator)
741 >>> context.work = workspec
742 >>> with get_context() as session:
746 if namespace not in self.__operations:
747 if namespace in {'gmxapi', 'gromacs'}:
748 raise exceptions.UsageError("Cannot add operations to built-in namespaces.")
750 self.__operations[namespace] = dict()
752 assert namespace in self.__operations
754 if operation in self.__operations[namespace]:
755 raise exceptions.UsageError("Operation {}.{} already defined in this context.".format(namespace, operation))
757 self.__operations[namespace][operation] = get_builder
759 # Set up a simple ensemble resource
760 # This should be implemented for Session, not Context, and use an appropriate subcommunicator
761 # that is created and freed as the Session launches and exits.
762 def ensemble_update(self, send, recv, tag=None):
763 """Implement the ensemble_update member function that gmxapi through 0.0.6 expects.
766 # gmxapi through 0.0.6 expects to bind to this member function during "build".
767 # This behavior needs to be deprecated (bind during launch, instead), but this
768 # dispatching function should be an effective placeholder.
769 if tag is None or str(tag) == '':
770 raise exceptions.ApiError("ensemble_update must be called with a name tag.")
771 # __ensemble_update is an attribute, not an instance function, so we need to explicitly pass 'self'
772 return self.__ensemble_update(self, send, recv, tag)
775 """Implement Python context manager protocol, producing a Session.
777 The work specified in this Context is inspected to build a directed
778 acyclic dependency graph (DAG). A Session is launched after reconciling
779 the configured work with available computing resources. Each time the
780 `run()` method of the Session is called, the graph is executed, and any
781 nodes that have no more work to do are pruned from the graph.
783 In practice, there are not currently any types of work implemented that
784 do not complete on the first pass.
787 Session object that can be run and/or inspected.
789 Additional API operations are possible while the Session is active. When used as a Python context manager,
790 the Context will close the Session at the end of the `with` block by calling `__exit__`.
792 Note: this is probably where we will have to process the work specification to determine whether we
793 have appropriate resources (such as sufficiently wide parallelism). Until we have a better Session
794 abstraction, this means the clean approach should take two passes to first build a DAG and then
795 instantiate objects to perform the work. In the first implementation, we kind of muddle things into
799 import networkx as nx
800 from networkx import DiGraph as _Graph
802 raise exceptions.FeatureNotAvailableError("gmx requires the networkx package to execute work graphs.")
804 # Cache the working directory from which we were launched so that __exit__() can give us proper context
805 # management behavior.
806 self.__initial_cwd = os.getcwd()
807 logger.debug("Launching session from {}".format(self.__initial_cwd))
809 if self._session is not None:
810 raise exceptions.Error('Already running.')
811 if self.work is None:
812 raise exceptions.UsageError('No work to perform!')
814 # Set up the global and local context.
815 # Check the global MPI configuration
816 # Since the Context doesn't have a destructor, if we use an MPI communicator at this scope then
817 # it has to be owned and managed outside of Context.
818 self._session_communicator = _acquire_communicator(self.__communicator)
819 context_comm_size = self._session_communicator.Get_size()
820 context_rank = self._session_communicator.Get_rank()
821 self.rank = context_rank
822 # self._communicator = communicator
823 logger.debug("Context rank {} in context {} of size {}".format(context_rank,
824 self._session_communicator,
828 # Process the work specification.
830 logger.debug("Processing workspec:\n{}".format(str(self.work)))
832 # Get a builder for DAG components for each element
834 builder_sequence = []
835 for element in self.work:
836 # dispatch builders for operation implementations
838 new_builder = self.__operations[element.namespace][element.operation](element)
839 assert hasattr(new_builder, 'add_subscriber')
840 assert hasattr(new_builder, 'build')
842 logger.info("Collected builder for {}".format(element.name))
843 except LookupError as e:
844 request = '.'.join([element.namespace, element.operation])
845 message = 'Could not find an implementation for the specified operation: {}. '.format(request)
847 raise exceptions.ApiError(message)
848 # Subscribing builders is the Context's responsibility because otherwise the builders
849 # don't know about each other. Builders should not depend on the Context unless they
850 # are a facility provided by the Context, in which case they may be member functions
851 # of the Context. We will probably need to pass at least some
852 # of the Session to the `launch()` method, though...
853 dependencies = element.depends
854 for dependency in dependencies:
855 # If a dependency is a list, assume it is an "ensemble" of dependencies
856 # and pick the element for corresponding to the local rank.
857 if isinstance(dependency, (list, tuple)):
858 assert len(dependency) > context_rank
859 name = str(dependency[context_rank])
862 logger.info("Subscribing {} to {}.".format(element.name, name))
863 builders[name].add_subscriber(new_builder)
864 builders[element.name] = new_builder
865 builder_sequence.append(element.name)
866 logger.debug("Appended {} to builder sequence.".format(element.name))
868 # Call the builders in dependency order
869 # Note: session_communicator is available, but ensemble_communicator has not been created yet.
870 graph = _Graph(width=1)
871 logger.info("Building sequence {}".format(builder_sequence))
872 for name in builder_sequence:
873 builder = builders[name]
874 logger.info("Building {}".format(builder))
875 logger.debug("Has build attribute {}.".format(builder.build))
877 logger.debug("Built.")
878 self.work_width = graph.graph['width']
880 # Prepare working directories. This should probably be moved to some aspect of the Session and either
881 # removed from here or made more explicit to the user.
882 workdir_list = self.__workdir_list
883 if workdir_list is None:
884 workdir_list = [os.path.join('.', str(i)) for i in range(self.work_width)]
885 self.__workdir_list = list([os.path.abspath(dir) for dir in workdir_list])
887 # For gmxapi 0.0.6, all ranks have a session_ensemble_communicator
888 self._session_ensemble_communicator = _get_ensemble_communicator(self._session_communicator, self.work_width)
889 self.__ensemble_update = _get_ensemble_update(self)
891 # launch() is currently a method of gmx.core.MDSystem and returns a gmxapi::Session.
892 # MDSystem objects are obtained from gmx.core.from_tpr(). They also provide add_potential().
893 # gmxapi::Session objects are exposed as gmx.core.MDSession and provide run() and close() methods.
895 # Here, I want to find the input appropriate for this rank and get an MDSession for it.
896 # E.g. Make a pass that allows meta-objects to bind (setting md_proxy._input_tpr and md_proxy._plugins,
897 # and then call a routine implemented by each object to run whatever protocol it needs, such
898 # as `system = gmx.core.from_tpr(md._input_tpr); system.add_potential(md._plugins)
899 # For future design plans, reference https://github.com/kassonlab/gmxapi/issues/65
901 # This `if` condition is currently the thing that ultimately determines whether the
902 # rank attempts to do work.
903 if context_rank < self.work_width:
905 logger.debug(("Launching graph {}.".format(graph.graph)))
906 logger.debug("Graph nodes: {}".format(str(list(graph.nodes))))
907 logger.debug("Graph edges: {}".format(str(list(graph.edges))))
909 logger.info("Launching work on context rank {}, subcommunicator rank {}.".format(
911 self._session_ensemble_communicator.Get_rank()))
913 # Launch the work for this rank
914 self.workdir = self.__workdir_list[self.rank]
915 if os.path.exists(self.workdir):
916 if not os.path.isdir(self.workdir):
917 raise exceptions.Error('{} is not a valid working directory.'.format(self.workdir))
919 os.mkdir(self.workdir)
920 os.chdir(self.workdir)
921 logger.info('rank {} changed directory to {}'.format(self.rank, self.workdir))
922 sorted_nodes = nx.topological_sort(graph)
925 for name in sorted_nodes:
926 launcher = graph.nodes[name]['launch']
927 runner = launcher(self.rank)
928 if not runner is None:
929 runners.append(runner)
930 closers.append(graph.nodes[name]['close'])
932 # Get a session object to return. It must simply provide a `run()` function.
933 context = self # Part of workaround for bug gmxapi-214
934 class Session(object):
935 def __init__(self, runners, closers):
936 self.runners = list(runners)
937 self.closers = list(closers)
940 # Note we are not following the documented protocol of running repeatedly yet.
942 for i, runner in enumerate(self.runners):
945 except StopIteration:
946 to_be_deleted.insert(0, i)
947 for i in to_be_deleted:
952 for close in self.closers:
953 logger.debug("Closing node: {}".format(close))
955 # Workaround for bug gmxapi-214
956 if not _gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
957 context._api_object = _gmxapi.Context()
959 self._session = Session(runners, closers)
961 logger.info("Context rank {} has no work to do".format(self.rank))
963 context = self # Part of workaround for bug gmxapi-214
964 class NullSession(object):
966 logger.info("Running null session on rank {}.".format(self.rank))
969 logger.info("Closing null session.")
970 # Workaround for bug gmxapi-214
971 if not _gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
972 context._api_object = _gmxapi.Context()
975 self._session = NullSession()
976 self._session.rank = self.rank
978 # Make sure session has started on all ranks before continuing?
980 self._session.graph = graph
983 def __exit__(self, exception_type, exception_value, traceback):
984 """Implement Python context manager protocol."""
985 logger.info("Exiting session on context rank {}.".format(self.rank))
986 if self._session is not None:
987 logger.info("Calling session.close().")
988 self._session.close()
991 # Note: we should not have a None session but rather an API-compliant Session that just has no work.
992 # Reference: https://github.com/kassonlab/gmxapi/issues/41
993 logger.info("No _session known to context or session already closed.")
994 if hasattr(self, '_session_ensemble_communicator'):
995 if self._session_communicator is not None:
996 logger.info("Freeing sub-communicator {} on rank {}".format(
997 self._session_ensemble_communicator,
999 self._session_ensemble_communicator.Free()
1001 logger.debug('"None" ensemble communicator does not need to be "Free"d.')
1002 del self._session_ensemble_communicator
1004 logger.debug("No ensemble subcommunicator on context rank {}.".format(self.rank))
1006 logger.debug("Freeing session communicator.")
1007 self._session_communicator.Free()
1008 logger.debug("Deleting session communicator reference.")
1009 del self._session_communicator
1011 os.chdir(self.__initial_cwd)
1012 logger.info("Session closed on context rank {}.".format(self.rank))
1013 # Note: Since sessions running in different processes can have different work, sessions have not necessarily
1014 # ended on all ranks. As a result, starting another session on the same resources could block until the
1015 # resources are available.
1017 # Python context managers return False when there were no exceptions to handle.
1021 def get_context(work=None):
1022 """Get a concrete Context object.
1025 work (gmx.workflow.WorkSpec): runnable work as a valid gmx.workflow.WorkSpec object
1028 An object implementing the :py:class:`gmx.context.Context` interface, if possible.
1031 gmx.exceptions.ValueError if an appropriate context for ``work`` could not be loaded.
1033 If work is provided, return a Context object capable of running the provided work or produce an error.
1035 The semantics for finding Context implementations needs more consideration, and a more informative exception
1038 A Context can run the provided work if
1040 * the Context supports can resolve all operations specified in the elements
1041 * the Context supports DAG topologies implied by the network of dependencies
1042 * the Context supports features required by the elements with the specified parameters,
1043 such as synchronous array jobs.
1046 # We need to define an interface for WorkSpec objects so that we don't need
1047 # to rely on typing and inter-module dependencies.
1048 from .workflow import WorkSpec
1050 if work is not None:
1051 if isinstance(work, WorkSpec):
1053 elif hasattr(work, 'workspec') and isinstance(work.workspec,
1055 workspec = work.workspec
1057 raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
1058 if workspec is not None and \
1059 hasattr(workspec, '_context') and \
1060 workspec._context is not None:
1061 context = workspec._context
1063 context = Context(work=workspec)