SYCL: Avoid using no_init read accessor in rocFFT
[alexxy/gromacs.git] / python_packaging / src / gmxapi / simulation / context.py
1 #
2 # This file is part of the GROMACS molecular simulation package.
3 #
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
8 #
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
13 #
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 # Lesser General Public License for more details.
18 #
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23 #
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
31 #
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
34 #
35 # This file is based on the Kasson Lab gmxapi project release 0.0.7.4.
36 # https://github.com/kassonlab/gmxapi/blob/v0.0.7.4/src/gmx/context.py
37 # https://github.com/kassonlab/gmxapi/blob/v0.0.7.4/LICENSE
38 """
39 Execution Context
40 =================
41 """
42
43 __all__ = ['Context']
44
45 import importlib
46 import os
47 import warnings
48 import tempfile
49
50 from gmxapi import exceptions
51 from gmxapi import logger as root_logger
52 import gmxapi._gmxapi as _gmxapi
53
54 # Module-level logger
55 logger = root_logger.getChild('simulation.context')
56 logger.info('Importing {}'.format(__name__))
57
58
59 def _load_tpr(self, element):
60     """Implement the gromacs.load_tpr operation.
61
62     Updates the minimum width of the workflow parallelism. Does not add any API object to the graph.
63
64     Arguments:
65         self: The Context in which this operation is being loaded.
66         element: WorkElement specifying the operation.
67
68     Returns:
69         A Director that the Context can use in launching the Session.
70     """
71     class Builder(object):
72         def __init__(self, tpr_list):
73             logger.debug("Loading tpr builder for tpr_list {}".format(tpr_list))
74             self.tpr_list = tpr_list
75             self.subscribers = []
76             self.width = len(tpr_list)
77
78         def add_subscriber(self, builder):
79             builder.infile = self.tpr_list
80             self.subscribers.append(builder)
81
82         def build(self, dag):
83             width = len(self.tpr_list)
84             for builder in self.subscribers:
85                 builder.width = width
86             if 'width' in dag.graph:
87                 width = max(width, dag.graph['width'])
88             dag.graph['width'] = width
89
90     return Builder(element.params['input'])
91
92 def _md(context, element):
93     """Implement the gmxapi.md operation by returning a builder that can populate a data flow graph for the element.
94
95     Inspects dependencies to set up the simulation runner.
96
97     The graph node created will have `launch` and `run` attributes with function references, and a `width`
98     attribute declaring the workflow parallelism requirement.
99
100     Arguments:
101         context: The Context in which this operation is being loaded.
102         element: WorkElement specifying the operation.
103
104     Returns:
105         A Director that the Context can use in launching the Session.
106     """
107     class Builder(object):
108         """Translate md work element to a node in the session's DAG."""
109         def __init__(self, element):
110             try:
111                 self.name = element.name
112                 # Note that currently the calling code is in charge of subscribing this builder to its dependencies.
113                 # A list of tpr files will be set when the calling code subscribes this builder to a tpr provider.
114                 self.infile = None
115                 # Other dependencies in the element may register potentials when subscribed to.
116                 self.potential = []
117                 self.input_nodes = []
118                 self.runtime_params = element.params
119             except AttributeError:
120                 raise exceptions.ValueError("object provided does not seem to be a WorkElement.")
121         def add_subscriber(self, builder):
122             """The md operation does not yet have any subscribeable facilities."""
123             pass
124         def build(self, dag):
125             """Add a node to the graph that, when launched, will construct a simulation runner.
126
127             Complete the definition of appropriate graph edges for dependencies.
128
129             The launch() method of the added node creates the runner from the tpr file for the current rank and adds
130             modules from the incoming edges.
131             """
132             if not (hasattr(dag, 'add_node')
133                     and hasattr(dag, 'add_edge')
134                     and hasattr(dag, 'graph')
135                     and hasattr(dag, 'nodes')):
136                 raise exceptions.ProtocolError("Argument 'dag' must provide the DiGraph interface.")
137             name = self.name
138             dag.add_node(name)
139             for neighbor in self.input_nodes:
140                 dag.add_edge(neighbor, name)
141             infile = self.infile
142             assert not infile is None
143             potential_list = self.potential
144             assert dag.graph['width'] >= len(infile)
145
146             # Provide closure with which to execute tasks for this node.
147             def launch(rank=None):
148                 assert not rank is None
149
150                 # Copy and update, if required by `end_time` parameter.
151                 temp_filename = None
152                 if 'end_time' in self.runtime_params:
153                     # Note that mkstemp returns a file descriptor as the first part of the tuple.
154                     # We can make this cleaner in 0.0.7 with a separate node that manages the
155                     # altered input.
156                     _, temp_filename = tempfile.mkstemp(suffix='.tpr')
157                     logger.debug('Updating input. Using temp file {}'.format(temp_filename))
158                     _gmxapi.rewrite_tprfile(source=infile[rank],
159                                          destination=temp_filename,
160                                          end_time=self.runtime_params['end_time'])
161                     tpr_file = temp_filename
162                 else:
163                     tpr_file = infile[rank]
164
165                 logger.info('Loading TPR file: {}'.format(tpr_file))
166                 system = _gmxapi.from_tpr(tpr_file)
167                 dag.nodes[name]['system'] = system
168                 mdargs = _gmxapi.MDArgs()
169                 mdargs.set(self.runtime_params)
170                 # Workaround to give access to plugin potentials used in a context.
171                 pycontext = element.workspec._context
172                 pycontext.potentials = potential_list
173                 context = pycontext._api_object
174                 context.setMDArgs(mdargs)
175                 for potential in potential_list:
176                     context.add_mdmodule(potential)
177                 dag.nodes[name]['session'] = system.launch(context)
178                 dag.nodes[name]['close'] = dag.nodes[name]['session'].close
179
180                 if 'end_time' in self.runtime_params:
181                     def special_close():
182                         dag.nodes[name]['session'].close()
183                         logger.debug("Unlinking temporary TPR file {}.".format(temp_filename))
184                         os.unlink(temp_filename)
185                     dag.nodes[name]['close'] = special_close
186                 else:
187                     dag.nodes[name]['close'] = dag.nodes[name]['session'].close
188
189                 def runner():
190                     """Currently we only support a single call to run."""
191                     def done():
192                         raise StopIteration()
193                     # Replace the runner with a stop condition for subsequent passes.
194                     dag.nodes[name]['run'] = done
195                     return dag.nodes[name]['session'].run()
196                 dag.nodes[name]['run'] = runner
197                 return dag.nodes[name]['run']
198
199             dag.nodes[name]['launch'] = launch
200
201     return Builder(element)
202
203
204 def _get_mpi_ensemble_communicator(session_communicator, ensemble_size):
205     """Get an ensemble communicator from an MPI communicator.
206
207     An ensemble communicator is an object that implements mpi4py.MPI.Comm methods
208     as described elsewhere in this documentation.
209
210     :param session_communicator: MPI communicator with the interface described by mpi4py.MPI.Comm
211     :param ensemble_size: number of ensemble members
212     :return: communicator of described size on participating ranks and null communicator on any others.
213
214     Must be called exactly once in every process in `communicator`. It is the
215     responsibility of the calling code to refrain from running ensemble operations
216     if not part of the ensemble. The calling code determines this by comparing its
217     session_communicator.Get_rank() to ensemble_size. This is not a good solution
218     because it assumes there is a single ensemble communicator and that ensemble
219     work is allocated to ranks serially from session_rank 0. Future work might
220     use process groups associated with specific operations in the work graph so
221     that processes can check for membership in a group to decide whether to use
222     their ensemble communicator. Another possibility would be to return None
223     rather than a null communicator in processes that aren't participating in
224     a given ensemble.
225     """
226     try:
227         from mpi4py import MPI
228     except ImportError:
229         raise exceptions.FeatureNotAvailableError('MPI ensemble communicator requires mpi4py package.')
230
231     session_size = session_communicator.Get_size()
232     session_rank = session_communicator.Get_rank()
233
234     # Check the ensemble "width" against the available parallelism
235     if ensemble_size > session_size:
236         msg = 'ParallelArrayContext requires a work array that fits in the MPI communicator: '
237         msg += 'array width {} > size {}.'
238         msg = msg.format(ensemble_size, session_size)
239         raise exceptions.UsageError(msg)
240     if ensemble_size < session_size:
241         msg = 'MPI context is wider than necessary to run this work:  array width {} vs. size {}.'
242         warnings.warn(msg.format(ensemble_size, session_size))
243
244     # Create an appropriate sub-communicator for the present work. Extra ranks will be in a
245     # sub-communicator with no work.
246     if session_rank < ensemble_size:
247         # The session launcher should maintain an inventory of the ensembles and
248         # provide an appropriate tag, but right now we just have a sort of
249         # Boolean: ensemble or not.
250         color = 0
251     else:
252         color = MPI.UNDEFINED
253
254     ensemble_communicator = session_communicator.Split(color, session_rank)
255     try:
256         ensemble_communicator_size = ensemble_communicator.Get_size()
257         ensemble_communicator_rank = ensemble_communicator.Get_rank()
258     except:
259         warnings.warn("Possible API programming error: ensemble_communicator does not provide required methods...")
260         ensemble_communicator_size = 0
261         ensemble_communicator_rank = None
262     logger.info("Session rank {} assigned to rank {} of subcommunicator {} of size {}".format(
263         session_rank,
264         ensemble_communicator_rank,
265         ensemble_communicator,
266         ensemble_communicator_size
267     ))
268
269     # There isn't a good reason to worry about special handling for a null communicator,
270     # which we have to explicitly avoid "free"ing, so let's just get rid of it.
271     # To do: don't even get the null communicator in the first place. Use a group and create instead of split.
272     if ensemble_communicator == MPI.COMM_NULL:
273         ensemble_communicator = None
274
275     return ensemble_communicator
276
277
278 class _DummyCommunicator(object):
279     """Placeholder class for trivial communication resources.
280
281     Simplifies logic in this module until communications infrastructure is more robust.
282     """
283
284     def __init__(self):
285         import numpy
286         self._numpy = numpy
287
288     def Dup(self):
289         return self
290
291     def Free(self):
292         return
293
294     def Allreduce(self, send, recv):
295         logger.debug("Faking an Allreduce for ensemble of size 1.")
296         send_buffer = self._numpy.array(send, copy=False)
297         recv_buffer = self._numpy.array(recv, copy=False)
298         recv_buffer[:] = send_buffer[:]
299
300     def Get_size(self):
301         return 1
302
303     def Get_rank(self):
304         return 0
305
306     def __str__(self):
307         return '_DummyCommunicator'
308
309     def __repr__(self):
310         return '_DummyCommunicator()'
311
312
313 def _acquire_communicator(communicator=None):
314     """Get a workflow level communicator for the session.
315
316     This function is intended to be called by the __enter__ method that creates
317     a session get a communicator instance. The `Free` method of the returned
318     instance must be called exactly once. This should be performed by the
319     corresponding __exit__ method.
320
321     Arguments:
322         communicator : a communicator to duplicate (optional)
323
324     Returns:
325         A communicator that must be explicitly freed by the caller.
326
327     Currently only supports MPI multi-simulation parallelism dependent on
328     mpi4py. The mpi4py package should be installed and built with compilers
329     that are compatible with the gmxapi installation.
330
331     If provided, `communicator` must provide the mpi4py.MPI.Comm interface.
332     Returns either a duplicate of `communicator` or of MPI_COMM_WORLD if mpi4py
333     is available. Otherwise, returns a mock communicator that can only manage
334     sessions and ensembles of size 0 or 1.
335
336     gmx behavior is undefined if launched with mpiexec and without mpi4py
337     """
338
339     if communicator is None:
340         try:
341             import mpi4py.MPI as _MPI
342         except ImportError:
343             _MPI = None
344         if _MPI is None:
345             logger.info("mpi4py is not available for default session communication.")
346             communicator = _DummyCommunicator()
347         else:
348             communicator = _MPI.COMM_WORLD
349     else:
350         communicator = communicator
351
352     try:
353         new_communicator = communicator.Dup()
354     except Exception as e:
355         message = "Exception when duplicating communicator: {}".format(e)
356         raise exceptions.ApiError(message)
357
358     return new_communicator
359
360
361 def _get_ensemble_communicator(communicator, ensemble_size):
362     """Provide ensemble_communicator feature in active_context, if possible.
363
364     Must be called on all ranks in `communicator`. The communicator returned
365     must be freed by a call to its `Free()` instance method. This function is
366     best used in a context manager's `__enter__()` method so that the
367     corresponding `context.Free()` can be called in the `__exit__` method.
368
369     Arguments:
370         communicator : session communicator for the session with the ensemble.
371         ensemble_size : ensemble size of the requested ensemble communicator
372
373     The ensemble_communicator feature should be present if the Context can
374     provide communication between all ensemble members. The Context should
375     determine this at the launch of the session and set the
376     ``_session_ensemble_communicator`` attribute to provide an object that
377     implements the same interface as an mpi4py.MPI.Comm object. Actually, this is
378     a temporary shim, so the only methods that need to be available are `Get_size`,
379     `Get_rank` and something that can be called as
380     Allreduce(send, recv) where send and recv are objects providing the Python
381     buffer interface.
382
383     Currently, only one ensemble can be managed in a session.
384     """
385     ensemble_communicator = None
386
387     # For trivial cases, don't bother trying to use MPI
388     # Note: all ranks in communicator must agree on the size of the work!
389     # Note: If running with a dummy session communicator in an MPI session (user error)
390     # every rank will think it is the only rank and will try to perform the
391     # same work.
392     if communicator.Get_size() <= 1 or ensemble_size <= 1:
393         message = "Getting trivial ensemble communicator for ensemble of size {}".format((ensemble_size))
394         message += " for session rank {} in session communicator of size {}".format(
395             communicator.Get_rank(),
396             communicator.Get_size())
397         logger.debug(message)
398         ensemble_communicator = _DummyCommunicator()
399     else:
400         message = "Getting an MPI subcommunicator for ensemble of size {}".format(ensemble_size)
401         message += " for session rank {} in session communicator of size {}".format(
402             communicator.Get_rank(),
403             communicator.Get_size())
404         logger.debug(message)
405         ensemble_communicator = _get_mpi_ensemble_communicator(communicator, ensemble_size)
406
407     return ensemble_communicator
408
409 def _get_ensemble_update(context):
410     """Set up a simple ensemble resource.
411
412     The context should call this function once per session to get an `ensemble_update`
413     function object.
414
415     This is a draft of a Context feature that may not be available in all
416     Context implementations. This factory function can be wrapped as a
417     ``ensemble_update`` "property" in a Context instance method to produce a Python function
418     with the signature ``update(context, send, recv, tag=None)``.
419
420     This feature requires that the Context is capabable of providing the
421     ensemble_communicator feature and the numpy feature.
422     If both are available, the function object provided by
423     ``ensemble_update`` provides
424     the ensemble reduce operation used by the restraint potential plugin in the
425     gmxapi sample_restraint repository. Otherwise, the provided function object
426     will log an error and then raise an exception.
427
428     gmxapi 0.0.5 and 0.0.6 MD plugin clients look for a member function named
429     ``ensemble_update`` in the Context that launched them. In the future,
430     clients will use session resources to access ensemble reduce operations.
431     In the mean time, a transitional implementation can involve defining a
432     ``ensemble_update`` property in the Context object that acts as a factory
433     function to produce the reducing operation, if possible with the given
434     resources.
435     """
436     try:
437         import numpy
438     except ImportError:
439         message = "ensemble_update requires numpy, but numpy is not available."
440         logger.error(message)
441         raise exceptions.FeatureNotAvailableError(message)
442
443     def _ensemble_update(active_context, send, recv, tag=None):
444         assert not tag is None
445         assert str(tag) != ''
446         if not tag in active_context.part:
447             active_context.part[tag] = 0
448         logger.debug("Performing ensemble update.")
449         active_context._session_ensemble_communicator.Allreduce(send, recv)
450         buffer = numpy.array(recv, copy=False)
451         buffer /= active_context.work_width
452         suffix = '_{}.npz'.format(tag)
453         # These will end up in the working directory and each ensemble member will have one
454         filename = str("rank{}part{:04d}{}".format(active_context.rank, int(active_context.part[tag]), suffix))
455         numpy.savez(filename, recv=recv)
456         active_context.part[tag] += 1
457
458     def _no_ensemble_update(active_context, send, recv, tag=None):
459         message = "Attempt to call ensemble_update() in a Context that does not provide the operation."
460         # If we confirm effective exception handling, remove the extraneous log.
461         logger.error(message)
462         raise exceptions.FeatureNotAvailableError(message)
463
464     if context._session_ensemble_communicator is not None:
465         functor = _ensemble_update
466     else:
467         functor = _no_ensemble_update
468     context.part = {}
469     return functor
470
471
472 class Context(object):
473     """Manage an array of simulation work executing in parallel.
474
475     .. deprecated:: 0.0.7
476
477     This execution context implementation is imported from the gmxapi 0.0.7
478     package for GROMACS 2019 and does not conform to the protocols of gmxapi 0.1+.
479     It is used internally to support legacy code.
480
481     The following features are subject to be changed or removed without further
482     notice.
483
484     Attributes:
485         work :obj:`gmx.workflow.WorkSpec`: specification of work to be performed when a session is launched.
486         rank : numerical index of the current worker in a running session (None if not running)
487         work_width : Detected number of co-scheduled work elements.
488         elements : dictionary of references to elements of the workflow.
489
490     `rank`, `work_width`, and `elements` are empty or None until the work is processed, as during session launch.
491
492     To run multiple simulations at a time, whether ensembles or independent
493     simulations, Python should be invoked in an MPI environment with `mpi4py`.
494     The Context can then use one MPI rank per simulation. It is recommended that
495     GROMACS is compiled without MPI in this case. (Thread-MPI is recommended.)
496     MPI domain decomposition is not explicitly supported with this Context
497     implementation.
498
499     Example:
500
501         >>> from gmxapi.simulation.context import get_context
502         >>> from gmxapi.simulation.workflow import from_tpr
503         >>> work = from_tpr([tpr_filename, tpr_filename])
504         >>> context = get_context(work)
505         >>> with context as session:
506         ...    session.run()
507         ...    # The session is one abstraction too low to know what rank it is. It lets the spawning context manage
508         ...    # such things.
509         ...    # rank = session.rank
510         ...    # The local context object knows where it fits in the global array.
511         ...    rank = context.rank
512         ...    output_path = os.path.join(context.workdir_list[rank], 'traj.trr')
513         ...    assert(os.path.exists(output_path))
514         ...    print('Worker {} produced {}'.format(rank, output_path))
515
516     Warning:
517         Do not run a gmxapi script in an MPI environment without `mpi4py`. gmxapi
518         will not be able to determine that other processes are running the same
519         script.
520
521     Implementation notes:
522
523     To produce a running session, the Context __enter__() method is called, according to the Python context manager
524     protocol. At this time, the attached WorkSpec must be feasible on the available resources. To turn the specified
525     work into an executable directed acyclic graph (DAG), handle objects for the elements in the work spec are sequenced
526     in dependency-compatible order and the context creates a "builder" for each according to the element's operation.
527     Each builder is subscribed to the builders of its dependency elements. The DAG is then assembled by calling each
528     builder in sequence. A builder can add zero, one, or more nodes and edges to the DAG.
529
530     The Session is then launched from the DAG. What happens next is implementation-dependent, and it may take a while for
531     us to decide whether and how to standardize interfaces for the DAG nodes and edges and/or execution protocols. I
532     expect each node will at least have a `launch()` method, but will also probably have typed input and output ports as well as some signalling.
533     A sophisticated and abstract Session implementation could schedule work only to satisfy data dependencies of requested
534     output upon request. Our immediate implementation will use the following protocol.
535
536     Each node has a `launch()` method. When the session is entered, the `launch()` method is called for each node in
537     dependency order. The launch method returns either a callable (`run()` function) or None, raising an exception in
538     case of an error. The sequence of callables is stored by the Session. When Session.run() is called, the
539     sequence of callables is called in order. If StopIteration is raised by the callable, it is removed from the sequence.
540     The sequence is processed repeatedly until there are no more callables.
541
542     Note that this does not rigorously handle races or deadlocks, or flexibility in automatically chasing dependencies. A more
543     thorough implementation could recursively call launch on dependencies (launch could be idempotent or involve some
544     signalling to dependents when complete), run calls could be entirely event driven, and/or nodes could "publish"
545     output (including just a completion message), blocking for acknowledgement before looking for the next set of subscribed inputs.
546     """
547
548     def __init__(self, work=None, workdir_list=None, communicator=None):
549         """Create manager for computing resources.
550
551         Does not initialize resources because Python objects by themselves do
552         not have a good way to deinitialize resources. Instead, resources are
553         initialized using the Python context manager protocol when sessions are
554         entered and exited.
555
556         Appropriate computing resources need to be knowable when the Context is created.
557
558         Keyword Arguments:
559             work : work specification with which to initialize this context
560             workdir_list : deprecated
561             communicator : non-owning reference to a multiprocessing communicator
562
563         If provided, communicator must implement the mpi4py.MPI.Comm interface. The
564         Context will use this communicator as the parent for subcommunicators
565         used when launching sessions. If provided, communicator is owned by the
566         caller, and must be freed by the caller after any sessions are closed.
567         By default, the Context will get a reference to MPI_COMM_WORLD, which
568         will be freed when the Python process ends and cleans up its resources.
569         The communicator stored by the Context instance will not be used directly,
570         but will be duplicated when launching sessions using ``with``.
571         """
572
573         # self.__context_array = list([Context(work_element) for work_element in work])
574         from .workflow import WorkSpec
575
576         # Until better Session abstraction exists at the Python level, a
577         # _session_communicator attribute will be added to and removed from the
578         # context at session entry and exit. If necessary, a _session_ensemble_communicator
579         # will be split from _session_communicator for simulation ensembles
580         # present in the specified work.
581         self.__communicator = communicator
582
583         self.__work = WorkSpec()
584         self.__workdir_list = workdir_list
585
586         self._session = None
587         # Note: this attribute is a detail of MPI-based Contexts. Client access
588         # is subject to change.
589         self.rank = None
590
591         # `work_width` notes the required width of an array of synchronous tasks to perform the specified work.
592         # As work elements are processed, self.work_width will be increased as appropriate.
593         self.work_width = None
594
595         # initialize the operations map. May be extended during the lifetime of a Context.
596         # Note that there may be a difference between built-in operations provided by this module and
597         # additional operations registered at run time.
598         self.__operations = dict()
599         # The map contains a builder for each operation. The builder is created by passing the element to the function
600         # in the map. The object returned must have the following methods:
601         #
602         #   * add_subscriber(another_builder) : allow other builders to subscribe to this one.
603         #   * build(dag) : Fulfill the builder responsibilities by adding an arbitrary number of nodes and edges to a Graph.
604         #
605         # The gmxapi namespace of operations should be consistent with a specified universal set of functionalities
606         self.__operations['gmxapi'] = {'md': lambda element : _md(self, element),
607                                       }
608         # Even if TPR file loading were to become a common and stable enough operation to be specified in
609         # an API, it is unlikely to be implemented by any code outside of GROMACS, so let's not clutter
610         # a potentially more universal namespace.
611         self.__operations['gromacs'] = {'load_tpr': lambda element : _load_tpr(self, element),
612                                        }
613
614         # Right now we are treating workspec elements and work DAG nodes as equivalent, but they are
615         # emphatically not intended to be tightly coupled. The work specification is intended to be
616         # simple, user-friendly, general, and easy-to-implement. The DAG is an implementation detail
617         # and may differ across context types. It is likely to have stronger typing of nodes and/or
618         # edges. It is not yet specified whether we should translate the work into a graph before, after,
619         # or during processing of the elements, so it is not yet known whether we will need facilities
620         # to allow cross-referencing between the two graph-type structures. If we instantiate API objects
621         # as we process work elements, and the DAG in a context deviates from the work specification
622         # topology, we would need to use named dependencies to look up objects to bind to. Such facilities
623         # could be hidden in the WorkElement class(es), too, to delegate code away from the Context as a
624         # container class growing without bounds...
625         # In actuality, we will have to process the entire workspec to some degree to make sure we can
626         # run it on the available resources.
627         self.elements = None
628
629         # This setter must be called after the operations map has been populated.
630         self.work = work
631
632         try:
633             self._api_object = _gmxapi.Context()
634         except Exception as e:
635             logger.error('Got exception when trying to create default library context: ' + str(e))
636             raise exceptions.ApiError('Uncaught exception in API object creation.') from e
637
638     @property
639     def work(self):
640         return self.__work
641
642     @work.setter
643     def work(self, work):
644         """Set `work` attribute.
645
646         Raises:
647             gmx.exceptions.ApiError: work is not compatible with schema or
648                 known operations.
649             gmx.exceptions.UsageError: Context can not access operations in
650                 the name space given for an Element
651             gmx.exceptions.ValueError: assignment operation cannot be performed
652                 for the provided object (rhs)
653
654         For discussion on error handling, see https://github.com/kassonlab/gmxapi/issues/125
655         """
656         from .workflow import WorkSpec, WorkElement
657         if work is None:
658             return
659
660         if isinstance(work, WorkSpec):
661             workspec = work
662         elif hasattr(work, 'workspec') and isinstance(work.workspec, WorkSpec):
663             workspec = work.workspec
664         else:
665             raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
666         workspec._context = self
667
668         # Make sure this context knows how to run the specified work.
669         for e in workspec.elements:
670             element = WorkElement.deserialize(workspec.elements[e])
671
672             # Note: Non-built-in namespaces (non-native) are treated as modules to import.
673             # Native namespaces may not be completely implemented in a particular version of a particular Context.
674             if element.namespace in {'gmxapi', 'gromacs'}:
675                 assert element.namespace in self.__operations
676                 if not element.operation in self.__operations[element.namespace]:
677                     # The requested element is a built-in operation but not available in this Context.
678                     # element.namespace should be mapped, but not all operations are necessarily implemented.
679                     logger.error("Operation {} not found in map {}".format(element.operation,
680                                                                            str(self.__operations)))
681                     # This check should be performed when deciding if the context is appropriate for the work.
682                     # If we are just going to use a try/catch block for this test, then we should differentiate
683                     # this exception from those raised due to incorrect usage.
684                     # The exception thrown here may evolve with https://github.com/kassonlab/gmxapi/issues/125
685                     raise exceptions.FeatureNotAvailableError(
686                         'Specified work cannot be performed due to unimplemented operation {}.{}.'.format(
687                             element.namespace,
688                             element.operation))
689
690             else:
691                 assert element.namespace not in {'gmxapi', 'gromacs'}
692
693                 # Don't leave an empty nested dictionary if we couldn't map the operation.
694                 if element.namespace in self.__operations:
695                     namespace_map = self.__operations[element.namespace]
696                 else:
697                     namespace_map = dict()
698
699                 # Set or update namespace map iff we have something new.
700                 if element.operation not in namespace_map:
701                     try:
702                         element_module = importlib.import_module(element.namespace)
703                         element_operation = getattr(element_module, element.operation)
704                     except ImportError as e:
705                         raise exceptions.UsageError(
706                             'Cannot find implementation for namespace {}. ImportError: {}'.format(
707                                 element.namespace,
708                                 str(e)))
709                     except AttributeError:
710                         raise exceptions.UsageError(
711                             'Cannot find factory for operation {}.{}'.format(
712                                 element.namespace,
713                                 element.operation
714                             )
715                         )
716                     namespace_map[element.operation] = element_operation
717                     self.__operations[element.namespace] = namespace_map
718
719         self.__work = workspec
720
721     def add_operation(self, namespace, operation, get_builder):
722         # noinspection PyUnresolvedReferences
723         """Add a builder factory to the operation map.
724
725         Extends the known operations of the Context by mapping an operation in a namespace to a function
726         that returns a builder to process a work element referencing the operation. Must be called before
727         the work specification is added, since the spec is inspected to confirm that the Context can run it.
728
729         It may be more appropriate to add this functionality to the Context constructor or as auxiliary
730         information in the workspec, or to remove it entirely; it is straight-forward to just add snippets
731         of code to additional files in the working directory and to make them available as modules for the
732         Context to import.
733
734         Example:
735
736             >>> # Import some custom extension code.
737             >>> import myplugin
738             >>> myelement = myplugin.new_element()
739             >>> workspec = gmx.workflow.WorkSpec()
740             >>> workspec.add_element(myelement)
741             >>> context = gmx.context.ParallelArrayContext()
742             >>> context.add_operation(myelement.namespace, myelement.operation, myplugin.element_translator)
743             >>> context.work = workspec
744             >>> with get_context() as session:
745             ...    session.run()
746
747         """
748         if namespace not in self.__operations:
749             if namespace in {'gmxapi', 'gromacs'}:
750                 raise exceptions.UsageError("Cannot add operations to built-in namespaces.")
751             else:
752                 self.__operations[namespace] = dict()
753         else:
754             assert namespace in self.__operations
755
756         if operation in self.__operations[namespace]:
757             raise exceptions.UsageError("Operation {}.{} already defined in this context.".format(namespace, operation))
758         else:
759             self.__operations[namespace][operation] = get_builder
760
761     # Set up a simple ensemble resource
762     # This should be implemented for Session, not Context, and use an appropriate subcommunicator
763     # that is created and freed as the Session launches and exits.
764     def ensemble_update(self, send, recv, tag=None):
765         """Implement the ensemble_update member function that gmxapi through 0.0.6 expects.
766
767         """
768         # gmxapi through 0.0.6 expects to bind to this member function during "build".
769         # This behavior needs to be deprecated (bind during launch, instead), but this
770         # dispatching function should be an effective placeholder.
771         if tag is None or str(tag) == '':
772             raise exceptions.ApiError("ensemble_update must be called with a name tag.")
773         # __ensemble_update is an attribute, not an instance function, so we need to explicitly pass 'self'
774         return self.__ensemble_update(self, send, recv, tag)
775
776     def __enter__(self):
777         """Implement Python context manager protocol, producing a Session.
778
779         The work specified in this Context is inspected to build a directed
780         acyclic dependency graph (DAG). A Session is launched after reconciling
781         the configured work with available computing resources. Each time the
782         `run()` method of the Session is called, the graph is executed, and any
783         nodes that have no more work to do are pruned from the graph.
784
785         In practice, there are not currently any types of work implemented that
786         do not complete on the first pass.
787
788         Returns:
789             Session object that can be run and/or inspected.
790
791         Additional API operations are possible while the Session is active. When used as a Python context manager,
792         the Context will close the Session at the end of the `with` block by calling `__exit__`.
793
794         Note: this is probably where we will have to process the work specification to determine whether we
795         have appropriate resources (such as sufficiently wide parallelism). Until we have a better Session
796         abstraction, this means the clean approach should take two passes to first build a DAG and then
797         instantiate objects to perform the work. In the first implementation, we kind of muddle things into
798         a single pass.
799         """
800         try:
801             import networkx as nx
802             from networkx import DiGraph as _Graph
803         except ImportError:
804             raise exceptions.FeatureNotAvailableError("gmx requires the networkx package to execute work graphs.")
805
806         # Cache the working directory from which we were launched so that __exit__() can give us proper context
807         # management behavior.
808         self.__initial_cwd = os.getcwd()
809         logger.debug("Launching session from {}".format(self.__initial_cwd))
810
811         if self._session is not None:
812             raise exceptions.Error('Already running.')
813         if self.work is None:
814             raise exceptions.UsageError('No work to perform!')
815
816         # Set up the global and local context.
817         # Check the global MPI configuration
818         # Since the Context doesn't have a destructor, if we use an MPI communicator at this scope then
819         # it has to be owned and managed outside of Context.
820         self._session_communicator = _acquire_communicator(self.__communicator)
821         context_comm_size = self._session_communicator.Get_size()
822         context_rank = self._session_communicator.Get_rank()
823         self.rank = context_rank
824         # self._communicator = communicator
825         logger.debug("Context rank {} in context {} of size {}".format(context_rank,
826                                                                        self._session_communicator,
827                                                                        context_comm_size))
828
829         ###
830         # Process the work specification.
831         ###
832         logger.debug("Processing workspec:\n{}".format(str(self.work)))
833
834         # Get a builder for DAG components for each element
835         builders = {}
836         builder_sequence = []
837         for element in self.work:
838             # dispatch builders for operation implementations
839             try:
840                 new_builder = self.__operations[element.namespace][element.operation](element)
841                 assert hasattr(new_builder, 'add_subscriber')
842                 assert hasattr(new_builder, 'build')
843
844                 logger.info("Collected builder for {}".format(element.name))
845             except LookupError as e:
846                 request = '.'.join([element.namespace, element.operation])
847                 message = 'Could not find an implementation for the specified operation: {}. '.format(request)
848                 message += str(e)
849                 raise exceptions.ApiError(message)
850             # Subscribing builders is the Context's responsibility because otherwise the builders
851             # don't know about each other. Builders should not depend on the Context unless they
852             # are a facility provided by the Context, in which case they may be member functions
853             # of the Context. We will probably need to pass at least some
854             # of the Session to the `launch()` method, though...
855             dependencies = element.depends
856             for dependency in dependencies:
857                 # If a dependency is a list, assume it is an "ensemble" of dependencies
858                 # and pick the element for corresponding to the local rank.
859                 if isinstance(dependency, (list, tuple)):
860                     assert len(dependency) > context_rank
861                     name = str(dependency[context_rank])
862                 else:
863                     name = dependency
864                 logger.info("Subscribing {} to {}.".format(element.name, name))
865                 builders[name].add_subscriber(new_builder)
866             builders[element.name] = new_builder
867             builder_sequence.append(element.name)
868             logger.debug("Appended {} to builder sequence.".format(element.name))
869
870         # Call the builders in dependency order
871         # Note: session_communicator is available, but ensemble_communicator has not been created yet.
872         graph = _Graph(width=1)
873         logger.info("Building sequence {}".format(builder_sequence))
874         for name in builder_sequence:
875             builder = builders[name]
876             logger.info("Building {}".format(builder))
877             logger.debug("Has build attribute {}.".format(builder.build))
878             builder.build(graph)
879             logger.debug("Built.")
880         self.work_width = graph.graph['width']
881
882         # Prepare working directories. This should probably be moved to some aspect of the Session and either
883         # removed from here or made more explicit to the user.
884         workdir_list = self.__workdir_list
885         if workdir_list is None:
886             workdir_list = [os.path.join('.', str(i)) for i in range(self.work_width)]
887         self.__workdir_list = list([os.path.abspath(dir) for dir in workdir_list])
888
889         # For gmxapi 0.0.6, all ranks have a session_ensemble_communicator
890         self._session_ensemble_communicator = _get_ensemble_communicator(self._session_communicator, self.work_width)
891         self.__ensemble_update = _get_ensemble_update(self)
892
893         # launch() is currently a method of gmx.core.MDSystem and returns a gmxapi::Session.
894         # MDSystem objects are obtained from gmx.core.from_tpr(). They also provide add_potential().
895         # gmxapi::Session objects are exposed as gmx.core.MDSession and provide run() and close() methods.
896         #
897         # Here, I want to find the input appropriate for this rank and get an MDSession for it.
898         # E.g. Make a pass that allows meta-objects to bind (setting md_proxy._input_tpr and md_proxy._plugins,
899         # and then call a routine implemented by each object to run whatever protocol it needs, such
900         # as `system = gmx.core.from_tpr(md._input_tpr); system.add_potential(md._plugins)
901         # For future design plans, reference https://github.com/kassonlab/gmxapi/issues/65
902         #
903         # This `if` condition is currently the thing that ultimately determines whether the
904         # rank attempts to do work.
905         if context_rank < self.work_width:
906             # print(graph)
907             logger.debug(("Launching graph {}.".format(graph.graph)))
908             logger.debug("Graph nodes: {}".format(str(list(graph.nodes))))
909             logger.debug("Graph edges: {}".format(str(list(graph.edges))))
910
911             logger.info("Launching work on context rank {}, subcommunicator rank {}.".format(
912                 self.rank,
913                 self._session_ensemble_communicator.Get_rank()))
914
915             # Launch the work for this rank
916             self.workdir = self.__workdir_list[self.rank]
917             if os.path.exists(self.workdir):
918                 if not os.path.isdir(self.workdir):
919                     raise exceptions.Error('{} is not a valid working directory.'.format(self.workdir))
920             else:
921                 os.mkdir(self.workdir)
922             os.chdir(self.workdir)
923             logger.info('rank {} changed directory to {}'.format(self.rank, self.workdir))
924             sorted_nodes = nx.topological_sort(graph)
925             runners = []
926             closers = []
927             for name in sorted_nodes:
928                 launcher = graph.nodes[name]['launch']
929                 runner = launcher(self.rank)
930                 if not runner is None:
931                     runners.append(runner)
932                     closers.append(graph.nodes[name]['close'])
933
934             # Get a session object to return. It must simply provide a `run()` function.
935             context = self # Part of workaround for bug gmxapi-214
936             class Session(object):
937                 def __init__(self, runners, closers):
938                     self.runners = list(runners)
939                     self.closers = list(closers)
940
941                 def run(self):
942                     # Note we are not following the documented protocol of running repeatedly yet.
943                     to_be_deleted = []
944                     for i, runner in enumerate(self.runners):
945                         try:
946                             runner()
947                         except StopIteration:
948                             to_be_deleted.insert(0, i)
949                     for i in to_be_deleted:
950                         del self.runners[i]
951                     return True
952
953                 def close(self):
954                     for close in self.closers:
955                         logger.debug("Closing node: {}".format(close))
956                         close()
957                     # Workaround for bug gmxapi-214
958                     if not _gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
959                         context._api_object = _gmxapi.Context()
960
961             self._session = Session(runners, closers)
962         else:
963             logger.info("Context rank {} has no work to do".format(self.rank))
964
965             context = self # Part of workaround for bug gmxapi-214
966             class NullSession(object):
967                 def run(self):
968                     logger.info("Running null session on rank {}.".format(self.rank))
969                     return True
970                 def close(self):
971                     logger.info("Closing null session.")
972                     # Workaround for bug gmxapi-214
973                     if not _gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
974                         context._api_object = _gmxapi.Context()
975                     return
976
977             self._session = NullSession()
978             self._session.rank = self.rank
979
980         # Make sure session has started on all ranks before continuing?
981
982         self._session.graph = graph
983         return self._session
984
985     def __exit__(self, exception_type, exception_value, traceback):
986         """Implement Python context manager protocol."""
987         logger.info("Exiting session on context rank {}.".format(self.rank))
988         if self._session is not None:
989             logger.info("Calling session.close().")
990             self._session.close()
991             self._session = None
992         else:
993             # Note: we should not have a None session but rather an API-compliant Session that just has no work.
994             # Reference: https://github.com/kassonlab/gmxapi/issues/41
995             logger.info("No _session known to context or session already closed.")
996         if hasattr(self, '_session_ensemble_communicator'):
997             if self._session_communicator is not None:
998                 logger.info("Freeing sub-communicator {} on rank {}".format(
999                     self._session_ensemble_communicator,
1000                     self.rank))
1001                 self._session_ensemble_communicator.Free()
1002             else:
1003                 logger.debug('"None" ensemble communicator does not need to be "Free"d.')
1004             del self._session_ensemble_communicator
1005         else:
1006             logger.debug("No ensemble subcommunicator on context rank {}.".format(self.rank))
1007
1008         logger.debug("Freeing session communicator.")
1009         self._session_communicator.Free()
1010         logger.debug("Deleting session communicator reference.")
1011         del self._session_communicator
1012
1013         os.chdir(self.__initial_cwd)
1014         logger.info("Session closed on context rank {}.".format(self.rank))
1015         # Note: Since sessions running in different processes can have different work, sessions have not necessarily
1016         # ended on all ranks. As a result, starting another session on the same resources could block until the
1017         # resources are available.
1018
1019         # Python context managers return False when there were no exceptions to handle.
1020         return False
1021
1022
1023 def get_context(work=None):
1024     """Get a concrete Context object.
1025
1026     Args:
1027         work (gmx.workflow.WorkSpec): runnable work as a valid gmx.workflow.WorkSpec object
1028
1029     Returns:
1030         An object implementing the :py:class:`gmx.context.Context` interface, if possible.
1031
1032     Raises:
1033         gmx.exceptions.ValueError if an appropriate context for ``work`` could not be loaded.
1034
1035     If work is provided, return a Context object capable of running the provided work or produce an error.
1036
1037     The semantics for finding Context implementations needs more consideration, and a more informative exception
1038     is likely possible.
1039
1040     A Context can run the provided work if
1041
1042       * the Context supports can resolve all operations specified in the elements
1043       * the Context supports DAG topologies implied by the network of dependencies
1044       * the Context supports features required by the elements with the specified parameters,
1045         such as synchronous array jobs.
1046
1047     """
1048     # We need to define an interface for WorkSpec objects so that we don't need
1049     # to rely on typing and inter-module dependencies.
1050     from .workflow import WorkSpec
1051     workspec = None
1052     if work is not None:
1053         if isinstance(work, WorkSpec):
1054             workspec = work
1055         elif hasattr(work, 'workspec') and isinstance(work.workspec,
1056                                                       WorkSpec):
1057             workspec = work.workspec
1058         else:
1059             raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
1060     if workspec is not None and \
1061             hasattr(workspec, '_context') and \
1062             workspec._context is not None:
1063         context = workspec._context
1064     else:
1065         context = Context(work=workspec)
1066
1067     return context