2 # This file is part of the GROMACS molecular simulation package.
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
35 # This file is based on the Kasson Lab gmxapi project release 0.0.7.4.
36 # https://github.com/kassonlab/gmxapi/blob/v0.0.7.4/src/gmx/context.py
37 # https://github.com/kassonlab/gmxapi/blob/v0.0.7.4/LICENSE
50 from gmxapi import exceptions
51 from gmxapi import logger as root_logger
52 import gmxapi._gmxapi as _gmxapi
55 logger = root_logger.getChild('simulation.context')
56 logger.info('Importing {}'.format(__name__))
59 def _load_tpr(self, element):
60 """Implement the gromacs.load_tpr operation.
62 Updates the minimum width of the workflow parallelism. Does not add any API object to the graph.
65 self: The Context in which this operation is being loaded.
66 element: WorkElement specifying the operation.
69 A Director that the Context can use in launching the Session.
71 class Builder(object):
72 def __init__(self, tpr_list):
73 logger.debug("Loading tpr builder for tpr_list {}".format(tpr_list))
74 self.tpr_list = tpr_list
76 self.width = len(tpr_list)
78 def add_subscriber(self, builder):
79 builder.infile = self.tpr_list
80 self.subscribers.append(builder)
83 width = len(self.tpr_list)
84 for builder in self.subscribers:
86 if 'width' in dag.graph:
87 width = max(width, dag.graph['width'])
88 dag.graph['width'] = width
90 return Builder(element.params['input'])
92 def _md(context, element):
93 """Implement the gmxapi.md operation by returning a builder that can populate a data flow graph for the element.
95 Inspects dependencies to set up the simulation runner.
97 The graph node created will have `launch` and `run` attributes with function references, and a `width`
98 attribute declaring the workflow parallelism requirement.
101 context: The Context in which this operation is being loaded.
102 element: WorkElement specifying the operation.
105 A Director that the Context can use in launching the Session.
107 class Builder(object):
108 """Translate md work element to a node in the session's DAG."""
109 def __init__(self, element):
111 self.name = element.name
112 # Note that currently the calling code is in charge of subscribing this builder to its dependencies.
113 # A list of tpr files will be set when the calling code subscribes this builder to a tpr provider.
115 # Other dependencies in the element may register potentials when subscribed to.
117 self.input_nodes = []
118 self.runtime_params = element.params
119 except AttributeError:
120 raise exceptions.ValueError("object provided does not seem to be a WorkElement.")
121 def add_subscriber(self, builder):
122 """The md operation does not yet have any subscribeable facilities."""
124 def build(self, dag):
125 """Add a node to the graph that, when launched, will construct a simulation runner.
127 Complete the definition of appropriate graph edges for dependencies.
129 The launch() method of the added node creates the runner from the tpr file for the current rank and adds
130 modules from the incoming edges.
132 if not (hasattr(dag, 'add_node')
133 and hasattr(dag, 'add_edge')
134 and hasattr(dag, 'graph')
135 and hasattr(dag, 'nodes')):
136 raise exceptions.ProtocolError("Argument 'dag' must provide the DiGraph interface.")
139 for neighbor in self.input_nodes:
140 dag.add_edge(neighbor, name)
142 assert not infile is None
143 potential_list = self.potential
144 assert dag.graph['width'] >= len(infile)
146 # Provide closure with which to execute tasks for this node.
147 def launch(rank=None):
148 assert not rank is None
150 # Copy and update, if required by `end_time` parameter.
152 if 'end_time' in self.runtime_params:
153 # Note that mkstemp returns a file descriptor as the first part of the tuple.
154 # We can make this cleaner in 0.0.7 with a separate node that manages the
156 _, temp_filename = tempfile.mkstemp(suffix='.tpr')
157 logger.debug('Updating input. Using temp file {}'.format(temp_filename))
158 _gmxapi.rewrite_tprfile(source=infile[rank],
159 destination=temp_filename,
160 end_time=self.runtime_params['end_time'])
161 tpr_file = temp_filename
163 tpr_file = infile[rank]
165 logger.info('Loading TPR file: {}'.format(tpr_file))
166 system = _gmxapi.from_tpr(tpr_file)
167 dag.nodes[name]['system'] = system
168 mdargs = _gmxapi.MDArgs()
169 mdargs.set(self.runtime_params)
170 # Workaround to give access to plugin potentials used in a context.
171 pycontext = element.workspec._context
172 pycontext.potentials = potential_list
173 context = pycontext._api_object
174 context.setMDArgs(mdargs)
175 for potential in potential_list:
176 context.add_mdmodule(potential)
177 dag.nodes[name]['session'] = system.launch(context)
178 dag.nodes[name]['close'] = dag.nodes[name]['session'].close
180 if 'end_time' in self.runtime_params:
182 dag.nodes[name]['session'].close()
183 logger.debug("Unlinking temporary TPR file {}.".format(temp_filename))
184 os.unlink(temp_filename)
185 dag.nodes[name]['close'] = special_close
187 dag.nodes[name]['close'] = dag.nodes[name]['session'].close
190 """Currently we only support a single call to run."""
192 raise StopIteration()
193 # Replace the runner with a stop condition for subsequent passes.
194 dag.nodes[name]['run'] = done
195 return dag.nodes[name]['session'].run()
196 dag.nodes[name]['run'] = runner
197 return dag.nodes[name]['run']
199 dag.nodes[name]['launch'] = launch
201 return Builder(element)
204 def _get_mpi_ensemble_communicator(session_communicator, ensemble_size):
205 """Get an ensemble communicator from an MPI communicator.
207 An ensemble communicator is an object that implements mpi4py.MPI.Comm methods
208 as described elsewhere in this documentation.
210 :param session_communicator: MPI communicator with the interface described by mpi4py.MPI.Comm
211 :param ensemble_size: number of ensemble members
212 :return: communicator of described size on participating ranks and null communicator on any others.
214 Must be called exactly once in every process in `communicator`. It is the
215 responsibility of the calling code to refrain from running ensemble operations
216 if not part of the ensemble. The calling code determines this by comparing its
217 session_communicator.Get_rank() to ensemble_size. This is not a good solution
218 because it assumes there is a single ensemble communicator and that ensemble
219 work is allocated to ranks serially from session_rank 0. Future work might
220 use process groups associated with specific operations in the work graph so
221 that processes can check for membership in a group to decide whether to use
222 their ensemble communicator. Another possibility would be to return None
223 rather than a null communicator in processes that aren't participating in
227 from mpi4py import MPI
229 raise exceptions.FeatureNotAvailableError('MPI ensemble communicator requires mpi4py package.')
231 session_size = session_communicator.Get_size()
232 session_rank = session_communicator.Get_rank()
234 # Check the ensemble "width" against the available parallelism
235 if ensemble_size > session_size:
236 msg = 'ParallelArrayContext requires a work array that fits in the MPI communicator: '
237 msg += 'array width {} > size {}.'
238 msg = msg.format(ensemble_size, session_size)
239 raise exceptions.UsageError(msg)
240 if ensemble_size < session_size:
241 msg = 'MPI context is wider than necessary to run this work: array width {} vs. size {}.'
242 warnings.warn(msg.format(ensemble_size, session_size))
244 # Create an appropriate sub-communicator for the present work. Extra ranks will be in a
245 # sub-communicator with no work.
246 if session_rank < ensemble_size:
247 # The session launcher should maintain an inventory of the ensembles and
248 # provide an appropriate tag, but right now we just have a sort of
249 # Boolean: ensemble or not.
252 color = MPI.UNDEFINED
254 ensemble_communicator = session_communicator.Split(color, session_rank)
256 ensemble_communicator_size = ensemble_communicator.Get_size()
257 ensemble_communicator_rank = ensemble_communicator.Get_rank()
259 warnings.warn("Possible API programming error: ensemble_communicator does not provide required methods...")
260 ensemble_communicator_size = 0
261 ensemble_communicator_rank = None
262 logger.info("Session rank {} assigned to rank {} of subcommunicator {} of size {}".format(
264 ensemble_communicator_rank,
265 ensemble_communicator,
266 ensemble_communicator_size
269 # There isn't a good reason to worry about special handling for a null communicator,
270 # which we have to explicitly avoid "free"ing, so let's just get rid of it.
271 # To do: don't even get the null communicator in the first place. Use a group and create instead of split.
272 if ensemble_communicator == MPI.COMM_NULL:
273 ensemble_communicator = None
275 return ensemble_communicator
278 class _DummyCommunicator(object):
279 """Placeholder class for trivial communication resources.
281 Simplifies logic in this module until communications infrastructure is more robust.
294 def Allreduce(self, send, recv):
295 logger.debug("Faking an Allreduce for ensemble of size 1.")
296 send_buffer = self._numpy.array(send, copy=False)
297 recv_buffer = self._numpy.array(recv, copy=False)
298 recv_buffer[:] = send_buffer[:]
307 return '_DummyCommunicator'
310 return '_DummyCommunicator()'
313 def _acquire_communicator(communicator=None):
314 """Get a workflow level communicator for the session.
316 This function is intended to be called by the __enter__ method that creates
317 a session get a communicator instance. The `Free` method of the returned
318 instance must be called exactly once. This should be performed by the
319 corresponding __exit__ method.
322 communicator : a communicator to duplicate (optional)
325 A communicator that must be explicitly freed by the caller.
327 Currently only supports MPI multi-simulation parallelism dependent on
328 mpi4py. The mpi4py package should be installed and built with compilers
329 that are compatible with the gmxapi installation.
331 If provided, `communicator` must provide the mpi4py.MPI.Comm interface.
332 Returns either a duplicate of `communicator` or of MPI_COMM_WORLD if mpi4py
333 is available. Otherwise, returns a mock communicator that can only manage
334 sessions and ensembles of size 0 or 1.
336 gmx behavior is undefined if launched with mpiexec and without mpi4py
339 if communicator is None:
341 import mpi4py.MPI as _MPI
345 logger.info("mpi4py is not available for default session communication.")
346 communicator = _DummyCommunicator()
348 communicator = _MPI.COMM_WORLD
350 communicator = communicator
353 new_communicator = communicator.Dup()
354 except Exception as e:
355 message = "Exception when duplicating communicator: {}".format(e)
356 raise exceptions.ApiError(message)
358 return new_communicator
361 def _get_ensemble_communicator(communicator, ensemble_size):
362 """Provide ensemble_communicator feature in active_context, if possible.
364 Must be called on all ranks in `communicator`. The communicator returned
365 must be freed by a call to its `Free()` instance method. This function is
366 best used in a context manager's `__enter__()` method so that the
367 corresponding `context.Free()` can be called in the `__exit__` method.
370 communicator : session communicator for the session with the ensemble.
371 ensemble_size : ensemble size of the requested ensemble communicator
373 The ensemble_communicator feature should be present if the Context can
374 provide communication between all ensemble members. The Context should
375 determine this at the launch of the session and set the
376 ``_session_ensemble_communicator`` attribute to provide an object that
377 implements the same interface as an mpi4py.MPI.Comm object. Actually, this is
378 a temporary shim, so the only methods that need to be available are `Get_size`,
379 `Get_rank` and something that can be called as
380 Allreduce(send, recv) where send and recv are objects providing the Python
383 Currently, only one ensemble can be managed in a session.
385 ensemble_communicator = None
387 # For trivial cases, don't bother trying to use MPI
388 # Note: all ranks in communicator must agree on the size of the work!
389 # Note: If running with a dummy session communicator in an MPI session (user error)
390 # every rank will think it is the only rank and will try to perform the
392 if communicator.Get_size() <= 1 or ensemble_size <= 1:
393 message = "Getting trivial ensemble communicator for ensemble of size {}".format((ensemble_size))
394 message += " for session rank {} in session communicator of size {}".format(
395 communicator.Get_rank(),
396 communicator.Get_size())
397 logger.debug(message)
398 ensemble_communicator = _DummyCommunicator()
400 message = "Getting an MPI subcommunicator for ensemble of size {}".format(ensemble_size)
401 message += " for session rank {} in session communicator of size {}".format(
402 communicator.Get_rank(),
403 communicator.Get_size())
404 logger.debug(message)
405 ensemble_communicator = _get_mpi_ensemble_communicator(communicator, ensemble_size)
407 return ensemble_communicator
409 def _get_ensemble_update(context):
410 """Set up a simple ensemble resource.
412 The context should call this function once per session to get an `ensemble_update`
415 This is a draft of a Context feature that may not be available in all
416 Context implementations. This factory function can be wrapped as a
417 ``ensemble_update`` "property" in a Context instance method to produce a Python function
418 with the signature ``update(context, send, recv, tag=None)``.
420 This feature requires that the Context is capabable of providing the
421 ensemble_communicator feature and the numpy feature.
422 If both are available, the function object provided by
423 ``ensemble_update`` provides
424 the ensemble reduce operation used by the restraint potential plugin in the
425 gmxapi sample_restraint repository. Otherwise, the provided function object
426 will log an error and then raise an exception.
428 gmxapi 0.0.5 and 0.0.6 MD plugin clients look for a member function named
429 ``ensemble_update`` in the Context that launched them. In the future,
430 clients will use session resources to access ensemble reduce operations.
431 In the mean time, a transitional implementation can involve defining a
432 ``ensemble_update`` property in the Context object that acts as a factory
433 function to produce the reducing operation, if possible with the given
439 message = "ensemble_update requires numpy, but numpy is not available."
440 logger.error(message)
441 raise exceptions.FeatureNotAvailableError(message)
443 def _ensemble_update(active_context, send, recv, tag=None):
444 assert not tag is None
445 assert str(tag) != ''
446 if not tag in active_context.part:
447 active_context.part[tag] = 0
448 logger.debug("Performing ensemble update.")
449 active_context._session_ensemble_communicator.Allreduce(send, recv)
450 buffer = numpy.array(recv, copy=False)
451 buffer /= active_context.work_width
452 suffix = '_{}.npz'.format(tag)
453 # These will end up in the working directory and each ensemble member will have one
454 filename = str("rank{}part{:04d}{}".format(active_context.rank, int(active_context.part[tag]), suffix))
455 numpy.savez(filename, recv=recv)
456 active_context.part[tag] += 1
458 def _no_ensemble_update(active_context, send, recv, tag=None):
459 message = "Attempt to call ensemble_update() in a Context that does not provide the operation."
460 # If we confirm effective exception handling, remove the extraneous log.
461 logger.error(message)
462 raise exceptions.FeatureNotAvailableError(message)
464 if context._session_ensemble_communicator is not None:
465 functor = _ensemble_update
467 functor = _no_ensemble_update
472 class Context(object):
473 """Manage an array of simulation work executing in parallel.
475 .. deprecated:: 0.0.7
477 This execution context implementation is imported from the gmxapi 0.0.7
478 package for GROMACS 2019 and does not conform to the protocols of gmxapi 0.1+.
479 It is used internally to support legacy code.
481 The following features are subject to be changed or removed without further
485 work :obj:`gmx.workflow.WorkSpec`: specification of work to be performed when a session is launched.
486 rank : numerical index of the current worker in a running session (None if not running)
487 work_width : Detected number of co-scheduled work elements.
488 elements : dictionary of references to elements of the workflow.
490 `rank`, `work_width`, and `elements` are empty or None until the work is processed, as during session launch.
492 To run multiple simulations at a time, whether ensembles or independent
493 simulations, Python should be invoked in an MPI environment with `mpi4py`.
494 The Context can then use one MPI rank per simulation. It is recommended that
495 GROMACS is compiled without MPI in this case. (Thread-MPI is recommended.)
496 MPI domain decomposition is not explicitly supported with this Context
501 >>> from gmxapi.simulation.context import get_context
502 >>> from gmxapi.simulation.workflow import from_tpr
503 >>> work = from_tpr([tpr_filename, tpr_filename])
504 >>> context = get_context(work)
505 >>> with context as session:
507 ... # The session is one abstraction too low to know what rank it is. It lets the spawning context manage
509 ... # rank = session.rank
510 ... # The local context object knows where it fits in the global array.
511 ... rank = context.rank
512 ... output_path = os.path.join(context.workdir_list[rank], 'traj.trr')
513 ... assert(os.path.exists(output_path))
514 ... print('Worker {} produced {}'.format(rank, output_path))
517 Do not run a gmxapi script in an MPI environment without `mpi4py`. gmxapi
518 will not be able to determine that other processes are running the same
521 Implementation notes:
523 To produce a running session, the Context __enter__() method is called, according to the Python context manager
524 protocol. At this time, the attached WorkSpec must be feasible on the available resources. To turn the specified
525 work into an executable directed acyclic graph (DAG), handle objects for the elements in the work spec are sequenced
526 in dependency-compatible order and the context creates a "builder" for each according to the element's operation.
527 Each builder is subscribed to the builders of its dependency elements. The DAG is then assembled by calling each
528 builder in sequence. A builder can add zero, one, or more nodes and edges to the DAG.
530 The Session is then launched from the DAG. What happens next is implementation-dependent, and it may take a while for
531 us to decide whether and how to standardize interfaces for the DAG nodes and edges and/or execution protocols. I
532 expect each node will at least have a `launch()` method, but will also probably have typed input and output ports as well as some signalling.
533 A sophisticated and abstract Session implementation could schedule work only to satisfy data dependencies of requested
534 output upon request. Our immediate implementation will use the following protocol.
536 Each node has a `launch()` method. When the session is entered, the `launch()` method is called for each node in
537 dependency order. The launch method returns either a callable (`run()` function) or None, raising an exception in
538 case of an error. The sequence of callables is stored by the Session. When Session.run() is called, the
539 sequence of callables is called in order. If StopIteration is raised by the callable, it is removed from the sequence.
540 The sequence is processed repeatedly until there are no more callables.
542 Note that this does not rigorously handle races or deadlocks, or flexibility in automatically chasing dependencies. A more
543 thorough implementation could recursively call launch on dependencies (launch could be idempotent or involve some
544 signalling to dependents when complete), run calls could be entirely event driven, and/or nodes could "publish"
545 output (including just a completion message), blocking for acknowledgement before looking for the next set of subscribed inputs.
548 def __init__(self, work=None, workdir_list=None, communicator=None):
549 """Create manager for computing resources.
551 Does not initialize resources because Python objects by themselves do
552 not have a good way to deinitialize resources. Instead, resources are
553 initialized using the Python context manager protocol when sessions are
556 Appropriate computing resources need to be knowable when the Context is created.
559 work : work specification with which to initialize this context
560 workdir_list : deprecated
561 communicator : non-owning reference to a multiprocessing communicator
563 If provided, communicator must implement the mpi4py.MPI.Comm interface. The
564 Context will use this communicator as the parent for subcommunicators
565 used when launching sessions. If provided, communicator is owned by the
566 caller, and must be freed by the caller after any sessions are closed.
567 By default, the Context will get a reference to MPI_COMM_WORLD, which
568 will be freed when the Python process ends and cleans up its resources.
569 The communicator stored by the Context instance will not be used directly,
570 but will be duplicated when launching sessions using ``with``.
573 # self.__context_array = list([Context(work_element) for work_element in work])
574 from .workflow import WorkSpec
576 # Until better Session abstraction exists at the Python level, a
577 # _session_communicator attribute will be added to and removed from the
578 # context at session entry and exit. If necessary, a _session_ensemble_communicator
579 # will be split from _session_communicator for simulation ensembles
580 # present in the specified work.
581 self.__communicator = communicator
583 self.__work = WorkSpec()
584 self.__workdir_list = workdir_list
587 # Note: this attribute is a detail of MPI-based Contexts. Client access
588 # is subject to change.
591 # `work_width` notes the required width of an array of synchronous tasks to perform the specified work.
592 # As work elements are processed, self.work_width will be increased as appropriate.
593 self.work_width = None
595 # initialize the operations map. May be extended during the lifetime of a Context.
596 # Note that there may be a difference between built-in operations provided by this module and
597 # additional operations registered at run time.
598 self.__operations = dict()
599 # The map contains a builder for each operation. The builder is created by passing the element to the function
600 # in the map. The object returned must have the following methods:
602 # * add_subscriber(another_builder) : allow other builders to subscribe to this one.
603 # * build(dag) : Fulfill the builder responsibilities by adding an arbitrary number of nodes and edges to a Graph.
605 # The gmxapi namespace of operations should be consistent with a specified universal set of functionalities
606 self.__operations['gmxapi'] = {'md': lambda element : _md(self, element),
608 # Even if TPR file loading were to become a common and stable enough operation to be specified in
609 # an API, it is unlikely to be implemented by any code outside of GROMACS, so let's not clutter
610 # a potentially more universal namespace.
611 self.__operations['gromacs'] = {'load_tpr': lambda element : _load_tpr(self, element),
614 # Right now we are treating workspec elements and work DAG nodes as equivalent, but they are
615 # emphatically not intended to be tightly coupled. The work specification is intended to be
616 # simple, user-friendly, general, and easy-to-implement. The DAG is an implementation detail
617 # and may differ across context types. It is likely to have stronger typing of nodes and/or
618 # edges. It is not yet specified whether we should translate the work into a graph before, after,
619 # or during processing of the elements, so it is not yet known whether we will need facilities
620 # to allow cross-referencing between the two graph-type structures. If we instantiate API objects
621 # as we process work elements, and the DAG in a context deviates from the work specification
622 # topology, we would need to use named dependencies to look up objects to bind to. Such facilities
623 # could be hidden in the WorkElement class(es), too, to delegate code away from the Context as a
624 # container class growing without bounds...
625 # In actuality, we will have to process the entire workspec to some degree to make sure we can
626 # run it on the available resources.
629 # This setter must be called after the operations map has been populated.
633 self._api_object = _gmxapi.Context()
634 except Exception as e:
635 logger.error('Got exception when trying to create default library context: ' + str(e))
636 raise exceptions.ApiError('Uncaught exception in API object creation.') from e
643 def work(self, work):
644 """Set `work` attribute.
647 gmx.exceptions.ApiError: work is not compatible with schema or
649 gmx.exceptions.UsageError: Context can not access operations in
650 the name space given for an Element
651 gmx.exceptions.ValueError: assignment operation cannot be performed
652 for the provided object (rhs)
654 For discussion on error handling, see https://github.com/kassonlab/gmxapi/issues/125
656 from .workflow import WorkSpec, WorkElement
660 if isinstance(work, WorkSpec):
662 elif hasattr(work, 'workspec') and isinstance(work.workspec, WorkSpec):
663 workspec = work.workspec
665 raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
666 workspec._context = self
668 # Make sure this context knows how to run the specified work.
669 for e in workspec.elements:
670 element = WorkElement.deserialize(workspec.elements[e])
672 # Note: Non-built-in namespaces (non-native) are treated as modules to import.
673 # Native namespaces may not be completely implemented in a particular version of a particular Context.
674 if element.namespace in {'gmxapi', 'gromacs'}:
675 assert element.namespace in self.__operations
676 if not element.operation in self.__operations[element.namespace]:
677 # The requested element is a built-in operation but not available in this Context.
678 # element.namespace should be mapped, but not all operations are necessarily implemented.
679 logger.error("Operation {} not found in map {}".format(element.operation,
680 str(self.__operations)))
681 # This check should be performed when deciding if the context is appropriate for the work.
682 # If we are just going to use a try/catch block for this test, then we should differentiate
683 # this exception from those raised due to incorrect usage.
684 # The exception thrown here may evolve with https://github.com/kassonlab/gmxapi/issues/125
685 raise exceptions.FeatureNotAvailableError(
686 'Specified work cannot be performed due to unimplemented operation {}.{}.'.format(
691 assert element.namespace not in {'gmxapi', 'gromacs'}
693 # Don't leave an empty nested dictionary if we couldn't map the operation.
694 if element.namespace in self.__operations:
695 namespace_map = self.__operations[element.namespace]
697 namespace_map = dict()
699 # Set or update namespace map iff we have something new.
700 if element.operation not in namespace_map:
702 element_module = importlib.import_module(element.namespace)
703 element_operation = getattr(element_module, element.operation)
704 except ImportError as e:
705 raise exceptions.UsageError(
706 'Cannot find implementation for namespace {}. ImportError: {}'.format(
709 except AttributeError:
710 raise exceptions.UsageError(
711 'Cannot find factory for operation {}.{}'.format(
716 namespace_map[element.operation] = element_operation
717 self.__operations[element.namespace] = namespace_map
719 self.__work = workspec
721 def add_operation(self, namespace, operation, get_builder):
722 # noinspection PyUnresolvedReferences
723 """Add a builder factory to the operation map.
725 Extends the known operations of the Context by mapping an operation in a namespace to a function
726 that returns a builder to process a work element referencing the operation. Must be called before
727 the work specification is added, since the spec is inspected to confirm that the Context can run it.
729 It may be more appropriate to add this functionality to the Context constructor or as auxiliary
730 information in the workspec, or to remove it entirely; it is straight-forward to just add snippets
731 of code to additional files in the working directory and to make them available as modules for the
736 >>> # Import some custom extension code.
738 >>> myelement = myplugin.new_element()
739 >>> workspec = gmx.workflow.WorkSpec()
740 >>> workspec.add_element(myelement)
741 >>> context = gmx.context.ParallelArrayContext()
742 >>> context.add_operation(myelement.namespace, myelement.operation, myplugin.element_translator)
743 >>> context.work = workspec
744 >>> with get_context() as session:
748 if namespace not in self.__operations:
749 if namespace in {'gmxapi', 'gromacs'}:
750 raise exceptions.UsageError("Cannot add operations to built-in namespaces.")
752 self.__operations[namespace] = dict()
754 assert namespace in self.__operations
756 if operation in self.__operations[namespace]:
757 raise exceptions.UsageError("Operation {}.{} already defined in this context.".format(namespace, operation))
759 self.__operations[namespace][operation] = get_builder
761 # Set up a simple ensemble resource
762 # This should be implemented for Session, not Context, and use an appropriate subcommunicator
763 # that is created and freed as the Session launches and exits.
764 def ensemble_update(self, send, recv, tag=None):
765 """Implement the ensemble_update member function that gmxapi through 0.0.6 expects.
768 # gmxapi through 0.0.6 expects to bind to this member function during "build".
769 # This behavior needs to be deprecated (bind during launch, instead), but this
770 # dispatching function should be an effective placeholder.
771 if tag is None or str(tag) == '':
772 raise exceptions.ApiError("ensemble_update must be called with a name tag.")
773 # __ensemble_update is an attribute, not an instance function, so we need to explicitly pass 'self'
774 return self.__ensemble_update(self, send, recv, tag)
777 """Implement Python context manager protocol, producing a Session.
779 The work specified in this Context is inspected to build a directed
780 acyclic dependency graph (DAG). A Session is launched after reconciling
781 the configured work with available computing resources. Each time the
782 `run()` method of the Session is called, the graph is executed, and any
783 nodes that have no more work to do are pruned from the graph.
785 In practice, there are not currently any types of work implemented that
786 do not complete on the first pass.
789 Session object that can be run and/or inspected.
791 Additional API operations are possible while the Session is active. When used as a Python context manager,
792 the Context will close the Session at the end of the `with` block by calling `__exit__`.
794 Note: this is probably where we will have to process the work specification to determine whether we
795 have appropriate resources (such as sufficiently wide parallelism). Until we have a better Session
796 abstraction, this means the clean approach should take two passes to first build a DAG and then
797 instantiate objects to perform the work. In the first implementation, we kind of muddle things into
801 import networkx as nx
802 from networkx import DiGraph as _Graph
804 raise exceptions.FeatureNotAvailableError("gmx requires the networkx package to execute work graphs.")
806 # Cache the working directory from which we were launched so that __exit__() can give us proper context
807 # management behavior.
808 self.__initial_cwd = os.getcwd()
809 logger.debug("Launching session from {}".format(self.__initial_cwd))
811 if self._session is not None:
812 raise exceptions.Error('Already running.')
813 if self.work is None:
814 raise exceptions.UsageError('No work to perform!')
816 # Set up the global and local context.
817 # Check the global MPI configuration
818 # Since the Context doesn't have a destructor, if we use an MPI communicator at this scope then
819 # it has to be owned and managed outside of Context.
820 self._session_communicator = _acquire_communicator(self.__communicator)
821 context_comm_size = self._session_communicator.Get_size()
822 context_rank = self._session_communicator.Get_rank()
823 self.rank = context_rank
824 # self._communicator = communicator
825 logger.debug("Context rank {} in context {} of size {}".format(context_rank,
826 self._session_communicator,
830 # Process the work specification.
832 logger.debug("Processing workspec:\n{}".format(str(self.work)))
834 # Get a builder for DAG components for each element
836 builder_sequence = []
837 for element in self.work:
838 # dispatch builders for operation implementations
840 new_builder = self.__operations[element.namespace][element.operation](element)
841 assert hasattr(new_builder, 'add_subscriber')
842 assert hasattr(new_builder, 'build')
844 logger.info("Collected builder for {}".format(element.name))
845 except LookupError as e:
846 request = '.'.join([element.namespace, element.operation])
847 message = 'Could not find an implementation for the specified operation: {}. '.format(request)
849 raise exceptions.ApiError(message)
850 # Subscribing builders is the Context's responsibility because otherwise the builders
851 # don't know about each other. Builders should not depend on the Context unless they
852 # are a facility provided by the Context, in which case they may be member functions
853 # of the Context. We will probably need to pass at least some
854 # of the Session to the `launch()` method, though...
855 dependencies = element.depends
856 for dependency in dependencies:
857 # If a dependency is a list, assume it is an "ensemble" of dependencies
858 # and pick the element for corresponding to the local rank.
859 if isinstance(dependency, (list, tuple)):
860 assert len(dependency) > context_rank
861 name = str(dependency[context_rank])
864 logger.info("Subscribing {} to {}.".format(element.name, name))
865 builders[name].add_subscriber(new_builder)
866 builders[element.name] = new_builder
867 builder_sequence.append(element.name)
868 logger.debug("Appended {} to builder sequence.".format(element.name))
870 # Call the builders in dependency order
871 # Note: session_communicator is available, but ensemble_communicator has not been created yet.
872 graph = _Graph(width=1)
873 logger.info("Building sequence {}".format(builder_sequence))
874 for name in builder_sequence:
875 builder = builders[name]
876 logger.info("Building {}".format(builder))
877 logger.debug("Has build attribute {}.".format(builder.build))
879 logger.debug("Built.")
880 self.work_width = graph.graph['width']
882 # Prepare working directories. This should probably be moved to some aspect of the Session and either
883 # removed from here or made more explicit to the user.
884 workdir_list = self.__workdir_list
885 if workdir_list is None:
886 workdir_list = [os.path.join('.', str(i)) for i in range(self.work_width)]
887 self.__workdir_list = list([os.path.abspath(dir) for dir in workdir_list])
889 # For gmxapi 0.0.6, all ranks have a session_ensemble_communicator
890 self._session_ensemble_communicator = _get_ensemble_communicator(self._session_communicator, self.work_width)
891 self.__ensemble_update = _get_ensemble_update(self)
893 # launch() is currently a method of gmx.core.MDSystem and returns a gmxapi::Session.
894 # MDSystem objects are obtained from gmx.core.from_tpr(). They also provide add_potential().
895 # gmxapi::Session objects are exposed as gmx.core.MDSession and provide run() and close() methods.
897 # Here, I want to find the input appropriate for this rank and get an MDSession for it.
898 # E.g. Make a pass that allows meta-objects to bind (setting md_proxy._input_tpr and md_proxy._plugins,
899 # and then call a routine implemented by each object to run whatever protocol it needs, such
900 # as `system = gmx.core.from_tpr(md._input_tpr); system.add_potential(md._plugins)
901 # For future design plans, reference https://github.com/kassonlab/gmxapi/issues/65
903 # This `if` condition is currently the thing that ultimately determines whether the
904 # rank attempts to do work.
905 if context_rank < self.work_width:
907 logger.debug(("Launching graph {}.".format(graph.graph)))
908 logger.debug("Graph nodes: {}".format(str(list(graph.nodes))))
909 logger.debug("Graph edges: {}".format(str(list(graph.edges))))
911 logger.info("Launching work on context rank {}, subcommunicator rank {}.".format(
913 self._session_ensemble_communicator.Get_rank()))
915 # Launch the work for this rank
916 self.workdir = self.__workdir_list[self.rank]
917 if os.path.exists(self.workdir):
918 if not os.path.isdir(self.workdir):
919 raise exceptions.Error('{} is not a valid working directory.'.format(self.workdir))
921 os.mkdir(self.workdir)
922 os.chdir(self.workdir)
923 logger.info('rank {} changed directory to {}'.format(self.rank, self.workdir))
924 sorted_nodes = nx.topological_sort(graph)
927 for name in sorted_nodes:
928 launcher = graph.nodes[name]['launch']
929 runner = launcher(self.rank)
930 if not runner is None:
931 runners.append(runner)
932 closers.append(graph.nodes[name]['close'])
934 # Get a session object to return. It must simply provide a `run()` function.
935 context = self # Part of workaround for bug gmxapi-214
936 class Session(object):
937 def __init__(self, runners, closers):
938 self.runners = list(runners)
939 self.closers = list(closers)
942 # Note we are not following the documented protocol of running repeatedly yet.
944 for i, runner in enumerate(self.runners):
947 except StopIteration:
948 to_be_deleted.insert(0, i)
949 for i in to_be_deleted:
954 for close in self.closers:
955 logger.debug("Closing node: {}".format(close))
957 # Workaround for bug gmxapi-214
958 if not _gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
959 context._api_object = _gmxapi.Context()
961 self._session = Session(runners, closers)
963 logger.info("Context rank {} has no work to do".format(self.rank))
965 context = self # Part of workaround for bug gmxapi-214
966 class NullSession(object):
968 logger.info("Running null session on rank {}.".format(self.rank))
971 logger.info("Closing null session.")
972 # Workaround for bug gmxapi-214
973 if not _gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
974 context._api_object = _gmxapi.Context()
977 self._session = NullSession()
978 self._session.rank = self.rank
980 # Make sure session has started on all ranks before continuing?
982 self._session.graph = graph
985 def __exit__(self, exception_type, exception_value, traceback):
986 """Implement Python context manager protocol."""
987 logger.info("Exiting session on context rank {}.".format(self.rank))
988 if self._session is not None:
989 logger.info("Calling session.close().")
990 self._session.close()
993 # Note: we should not have a None session but rather an API-compliant Session that just has no work.
994 # Reference: https://github.com/kassonlab/gmxapi/issues/41
995 logger.info("No _session known to context or session already closed.")
996 if hasattr(self, '_session_ensemble_communicator'):
997 if self._session_communicator is not None:
998 logger.info("Freeing sub-communicator {} on rank {}".format(
999 self._session_ensemble_communicator,
1001 self._session_ensemble_communicator.Free()
1003 logger.debug('"None" ensemble communicator does not need to be "Free"d.')
1004 del self._session_ensemble_communicator
1006 logger.debug("No ensemble subcommunicator on context rank {}.".format(self.rank))
1008 logger.debug("Freeing session communicator.")
1009 self._session_communicator.Free()
1010 logger.debug("Deleting session communicator reference.")
1011 del self._session_communicator
1013 os.chdir(self.__initial_cwd)
1014 logger.info("Session closed on context rank {}.".format(self.rank))
1015 # Note: Since sessions running in different processes can have different work, sessions have not necessarily
1016 # ended on all ranks. As a result, starting another session on the same resources could block until the
1017 # resources are available.
1019 # Python context managers return False when there were no exceptions to handle.
1023 def get_context(work=None):
1024 """Get a concrete Context object.
1027 work (gmx.workflow.WorkSpec): runnable work as a valid gmx.workflow.WorkSpec object
1030 An object implementing the :py:class:`gmx.context.Context` interface, if possible.
1033 gmx.exceptions.ValueError if an appropriate context for ``work`` could not be loaded.
1035 If work is provided, return a Context object capable of running the provided work or produce an error.
1037 The semantics for finding Context implementations needs more consideration, and a more informative exception
1040 A Context can run the provided work if
1042 * the Context supports can resolve all operations specified in the elements
1043 * the Context supports DAG topologies implied by the network of dependencies
1044 * the Context supports features required by the elements with the specified parameters,
1045 such as synchronous array jobs.
1048 # We need to define an interface for WorkSpec objects so that we don't need
1049 # to rely on typing and inter-module dependencies.
1050 from .workflow import WorkSpec
1052 if work is not None:
1053 if isinstance(work, WorkSpec):
1055 elif hasattr(work, 'workspec') and isinstance(work.workspec,
1057 workspec = work.workspec
1059 raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
1060 if workspec is not None and \
1061 hasattr(workspec, '_context') and \
1062 workspec._context is not None:
1063 context = workspec._context
1065 context = Context(work=workspec)