afb674f10ca63d0f7c39d2f9595405d21935fe12
[alexxy/gromacs.git] / python_packaging / src / gmxapi / simulation / context.py
1 """
2 Execution Context
3 =================
4 """
5 from __future__ import absolute_import
6 from __future__ import division
7 from __future__ import print_function
8 from __future__ import unicode_literals
9
10 __all__ = ['Context']
11
12 import importlib
13 import os
14 import warnings
15 import tempfile
16
17 from gmx import exceptions
18 from gmx import logging
19 from gmx import status
20 import gmx.core as gmxapi
21
22
23 # Module-level logger
24 logger = logging.getLogger(__name__)
25 logger.info('Importing gmx.context')
26
27
28 def _load_tpr(self, element):
29     """Implement the gromacs.load_tpr operation.
30
31     Updates the minimum width of the workflow parallelism. Does not add any API object to the graph.
32
33     Arguments:
34         self: The Context in which this operation is being loaded.
35         element: WorkElement specifying the operation.
36
37     Returns:
38         A Director that the Context can use in launching the Session.
39     """
40     class Builder(object):
41         def __init__(self, tpr_list):
42             logger.debug("Loading tpr builder for tpr_list {}".format(tpr_list))
43             self.tpr_list = tpr_list
44             self.subscribers = []
45             self.width = len(tpr_list)
46
47         def add_subscriber(self, builder):
48             builder.infile = self.tpr_list
49             self.subscribers.append(builder)
50
51         def build(self, dag):
52             width = len(self.tpr_list)
53             for builder in self.subscribers:
54                 builder.width = width
55             if 'width' in dag.graph:
56                 width = max(width, dag.graph['width'])
57             dag.graph['width'] = width
58
59     return Builder(element.params['input'])
60
61 def _md(context, element):
62     """Implement the gmxapi.md operation by returning a builder that can populate a data flow graph for the element.
63
64     Inspects dependencies to set up the simulation runner.
65
66     The graph node created will have `launch` and `run` attributes with function references, and a `width`
67     attribute declaring the workflow parallelism requirement.
68
69     Arguments:
70         context: The Context in which this operation is being loaded.
71         element: WorkElement specifying the operation.
72
73     Returns:
74         A Director that the Context can use in launching the Session.
75     """
76     class Builder(object):
77         """Translate md work element to a node in the session's DAG."""
78         def __init__(self, element):
79             try:
80                 self.name = element.name
81                 # Note that currently the calling code is in charge of subscribing this builder to its dependencies.
82                 # A list of tpr files will be set when the calling code subscribes this builder to a tpr provider.
83                 self.infile = None
84                 # Other dependencies in the element may register potentials when subscribed to.
85                 self.potential = []
86                 self.input_nodes = []
87                 self.runtime_params = element.params
88             except AttributeError:
89                 raise exceptions.ValueError("object provided does not seem to be a WorkElement.")
90         def add_subscriber(self, builder):
91             """The md operation does not yet have any subscribeable facilities."""
92             pass
93         def build(self, dag):
94             """Add a node to the graph that, when launched, will construct a simulation runner.
95
96             Complete the definition of appropriate graph edges for dependencies.
97
98             The launch() method of the added node creates the runner from the tpr file for the current rank and adds
99             modules from the incoming edges.
100             """
101             if not (hasattr(dag, 'add_node')
102                     and hasattr(dag, 'add_edge')
103                     and hasattr(dag, 'graph')
104                     and hasattr(dag, 'nodes')):
105                 raise gmx.exceptions.TypeError("dag argument does not have a DiGraph interface.")
106             name = self.name
107             dag.add_node(name)
108             for neighbor in self.input_nodes:
109                 dag.add_edge(neighbor, name)
110             infile = self.infile
111             assert not infile is None
112             potential_list = self.potential
113             assert dag.graph['width'] >= len(infile)
114
115             # Provide closure with which to execute tasks for this node.
116             def launch(rank=None):
117                 assert not rank is None
118
119                 # Copy and update, if required by `end_time` parameter.
120                 temp_filename = None
121                 if 'end_time' in self.runtime_params:
122                     # Note that mkstemp returns a file descriptor as the first part of the tuple.
123                     # We can make this cleaner in 0.0.7 with a separate node that manages the
124                     # altered input.
125                     _, temp_filename = tempfile.mkstemp(suffix='.tpr')
126                     logger.debug('Updating input. Using temp file {}'.format(temp_filename))
127                     gmxapi.copy_tprfile(source=infile[rank],
128                                           destination=temp_filename,
129                                           end_time=self.runtime_params['end_time'])
130                     tpr_file = temp_filename
131                 else:
132                     tpr_file = infile[rank]
133
134                 logger.info('Loading TPR file: {}'.format(tpr_file))
135                 system = gmxapi.from_tpr(tpr_file)
136                 dag.nodes[name]['system'] = system
137                 mdargs = gmxapi.MDArgs()
138                 mdargs.set(self.runtime_params)
139                 # Workaround to give access to plugin potentials used in a context.
140                 pycontext = element.workspec._context
141                 pycontext.potentials = potential_list
142                 context = pycontext._api_object
143                 context.setMDArgs(mdargs)
144                 for potential in potential_list:
145                     context.add_mdmodule(potential)
146                 dag.nodes[name]['session'] = system.launch(context)
147                 dag.nodes[name]['close'] = dag.nodes[name]['session'].close
148
149                 if 'end_time' in self.runtime_params:
150                     def special_close():
151                         dag.nodes[name]['session'].close()
152                         logger.debug("Unlinking temporary TPR file {}.".format(temp_filename))
153                         os.unlink(temp_filename)
154                     dag.nodes[name]['close'] = special_close
155                 else:
156                     dag.nodes[name]['close'] = dag.nodes[name]['session'].close
157
158                 def runner():
159                     """Currently we only support a single call to run."""
160                     def done():
161                         raise StopIteration()
162                     # Replace the runner with a stop condition for subsequent passes.
163                     dag.nodes[name]['run'] = done
164                     return dag.nodes[name]['session'].run()
165                 dag.nodes[name]['run'] = runner
166                 return dag.nodes[name]['run']
167
168             dag.nodes[name]['launch'] = launch
169
170     return Builder(element)
171
172
173 def _get_mpi_ensemble_communicator(session_communicator, ensemble_size):
174     """Get an ensemble communicator from an MPI communicator.
175
176     An ensemble communicator is an object that implements mpi4py.MPI.Comm methods
177     as described elsewhere in this documentation.
178
179     :param session_communicator: MPI communicator with the interface described by mpi4py.MPI.Comm
180     :param ensemble_size: number of ensemble members
181     :return: communicator of described size on participating ranks and null communicator on any others.
182
183     Must be called exactly once in every process in `communicator`. It is the
184     responsibility of the calling code to refrain from running ensemble operations
185     if not part of the ensemble. The calling code determines this by comparing its
186     session_communicator.Get_rank() to ensemble_size. This is not a good solution
187     because it assumes there is a single ensemble communicator and that ensemble
188     work is allocated to ranks serially from session_rank 0. Future work might
189     use process groups associated with specific operations in the work graph so
190     that processes can check for membership in a group to decide whether to use
191     their ensemble communicator. Another possibility would be to return None
192     rather than a null communicator in processes that aren't participating in
193     a given ensemble.
194     """
195     from mpi4py import MPI
196
197     session_size = session_communicator.Get_size()
198     session_rank = session_communicator.Get_rank()
199
200     # Check the ensemble "width" against the available parallelism
201     if ensemble_size > session_size:
202         msg = 'ParallelArrayContext requires a work array that fits in the MPI communicator: '
203         msg += 'array width {} > size {}.'
204         msg = msg.format(ensemble_size, session_size)
205         raise exceptions.UsageError(msg)
206     if ensemble_size < session_size:
207         msg = 'MPI context is wider than necessary to run this work:  array width {} vs. size {}.'
208         warnings.warn(msg.format(ensemble_size, session_size))
209
210     # Create an appropriate sub-communicator for the present work. Extra ranks will be in a
211     # sub-communicator with no work.
212     if session_rank < ensemble_size:
213         # The session launcher should maintain an inventory of the ensembles and
214         # provide an appropriate tag, but right now we just have a sort of
215         # Boolean: ensemble or not.
216         color = 0
217     else:
218         color = MPI.UNDEFINED
219
220     ensemble_communicator = session_communicator.Split(color, session_rank)
221     try:
222         ensemble_communicator_size = ensemble_communicator.Get_size()
223         ensemble_communicator_rank = ensemble_communicator.Get_rank()
224     except:
225         warnings.warn("Possible API programming error: ensemble_communicator does not provide required methods...")
226         ensemble_communicator_size = 0
227         ensemble_communicator_rank = None
228     logger.info("Session rank {} assigned to rank {} of subcommunicator {} of size {}".format(
229         session_rank,
230         ensemble_communicator_rank,
231         ensemble_communicator,
232         ensemble_communicator_size
233     ))
234
235     # There isn't a good reason to worry about special handling for a null communicator,
236     # which we have to explicitly avoid "free"ing, so let's just get rid of it.
237     # To do: don't even get the null communicator in the first place. Use a group and create instead of split.
238     if ensemble_communicator == MPI.COMM_NULL:
239         ensemble_communicator = None
240
241     return ensemble_communicator
242
243
244 def _acquire_communicator(communicator=None):
245     """Get a workflow level communicator for the session.
246
247     This function is intended to be called by the __enter__ method that creates
248     a session get a communicator instance. The `Free` method of the returned
249     instance must be called exactly once. This should be performed by the
250     corresponding __exit__ method.
251
252     Arguments:
253         communicator : a communicator to duplicate (optional)
254
255     Returns:
256         A communicator that must be explicitly freed by the caller.
257
258     Currently only supports MPI multi-simulation parallelism dependent on
259     mpi4py. The mpi4py package should be installed and built with compilers
260     that are compatible with the gmxapi installation.
261
262     If provided, `communicator` must provide the mpi4py.MPI.Comm interface.
263     Returns either a duplicate of `communicator` or of MPI_COMM_WORLD if mpi4py
264     is available. Otherwise, returns a mock communicator that can only manage
265     sessions and ensembles of size 0 or 1.
266
267     gmx behavior is undefined if launched with mpiexec and without mpi4py
268     """
269
270     class MockSessionCommunicator(object):
271         def Dup(self):
272             return self
273
274         def Free(self):
275             return
276
277         def Get_size(self):
278             return 1
279
280         def Get_rank(self):
281             return 0
282
283         def __str__(self):
284             return 'Basic'
285
286         def __repr__(self):
287             return 'MockSessionCommunicator()'
288
289     if communicator is None:
290         try:
291             import mpi4py.MPI as MPI
292             communicator = MPI.COMM_WORLD
293         except ImportError:
294             logger.info("mpi4py is not available for default session communication.")
295             communicator = MockSessionCommunicator()
296     else:
297         communicator = communicator
298
299     try:
300         new_communicator = communicator.Dup()
301     except Exception as e:
302         message = "Exception when duplicating communicator: {}".format(e)
303         raise exceptions.ApiError(message)
304
305     return new_communicator
306
307
308 def _get_ensemble_communicator(communicator, ensemble_size):
309     """Provide ensemble_communicator feature in active_context, if possible.
310
311     Must be called on all ranks in `communicator`. The communicator returned
312     must be freed by a call to its `Free()` instance method. This function is
313     best used in a context manager's `__enter__()` method so that the
314     corresponding `context.Free()` can be called in the `__exit__` method.
315
316     Arguments:
317         communicator : session communicator for the session with the ensemble.
318         ensemble_size : ensemble size of the requested ensemble communicator
319
320     The ensemble_communicator feature should be present if the Context can
321     provide communication between all ensemble members. The Context should
322     determine this at the launch of the session and set the
323     ``_session_ensemble_communicator`` attribute to provide an object that
324     implements the same interface as an mpi4py.MPI.Comm object. Actually, this is
325     a temporary shim, so the only methods that need to be available are `Get_size`,
326     `Get_rank` and something that can be called as
327     Allreduce(send, recv) where send and recv are objects providing the Python
328     buffer interface.
329
330     Currently, only one ensemble can be managed in a session.
331     """
332     ensemble_communicator = None
333
334     class TrivialEnsembleCommunicator(object):
335         def __init__(self):
336             import numpy
337             self._numpy = numpy
338
339         def Free(self):
340             return
341
342         def Allreduce(self, send, recv):
343             logger.debug("Faking an Allreduce for ensemble of size 1.")
344             send_buffer = self._numpy.array(send, copy=False)
345             recv_buffer = self._numpy.array(recv, copy=False)
346             recv_buffer[:] = send_buffer[:]
347
348         def Get_size(self):
349             return 1
350
351         def Get_rank(self):
352             return 0
353
354     # For trivial cases, don't bother trying to use MPI
355     # Note: all ranks in communicator must agree on the size of the work!
356     # Note: If running with a Mock session communicator in an MPI session (user error)
357     # every rank will think it is the only rank and will try to perform the
358     # same work.
359     if communicator.Get_size() <= 1 or ensemble_size <= 1:
360         message = "Getting TrivialEnsembleCommunicator for ensemble of size {}".format((ensemble_size))
361         message += " for session rank {} in session communicator of size {}".format(
362             communicator.Get_rank(),
363             communicator.Get_size())
364         logger.debug(message)
365         ensemble_communicator = TrivialEnsembleCommunicator()
366     else:
367         message = "Getting an MPI subcommunicator for ensemble of size {}".format(ensemble_size)
368         message += " for session rank {} in session communicator of size {}".format(
369             communicator.Get_rank(),
370             communicator.Get_size())
371         logger.debug(message)
372         ensemble_communicator = _get_mpi_ensemble_communicator(communicator, ensemble_size)
373
374     return ensemble_communicator
375
376 def _get_ensemble_update(context):
377     """Set up a simple ensemble resource.
378
379     The context should call this function once per session to get an `ensemble_update`
380     function object.
381
382     This is a draft of a Context feature that may not be available in all
383     Context implementations. This factory function can be wrapped as a
384     ``ensemble_update`` "property" in a Context instance method to produce a Python function
385     with the signature ``update(context, send, recv, tag=None)``.
386
387     This feature requires that the Context is capabable of providing the
388     ensemble_communicator feature and the numpy feature.
389     If both are available, the function object provided by
390     ``ensemble_update`` provides
391     the ensemble reduce operation used by the restraint potential plugin in the
392     gmxapi sample_restraint repository. Otherwise, the provided function object
393     will log an error and then raise an exception.
394
395     gmxapi 0.0.5 and 0.0.6 MD plugin clients look for a member function named
396     ``ensemble_update`` in the Context that launched them. In the future,
397     clients will use session resources to access ensemble reduce operations.
398     In the mean time, a transitional implementation can involve defining a
399     ``ensemble_update`` property in the Context object that acts as a factory
400     function to produce the reducing operation, if possible with the given
401     resources.
402     """
403     try:
404         import numpy
405     except ImportError:
406         message = "ensemble_update requires numpy, but numpy is not available."
407         logger.error(message)
408         raise exceptions.FeatureNotAvailableError(message)
409
410     def _ensemble_update(active_context, send, recv, tag=None):
411         assert not tag is None
412         assert str(tag) != ''
413         if not tag in active_context.part:
414             active_context.part[tag] = 0
415         logger.debug("Performing ensemble update.")
416         active_context._session_ensemble_communicator.Allreduce(send, recv)
417         buffer = numpy.array(recv, copy=False)
418         buffer /= active_context.work_width
419         suffix = '_{}.npz'.format(tag)
420         # These will end up in the working directory and each ensemble member will have one
421         filename = str("rank{}part{:04d}{}".format(active_context.rank, int(active_context.part[tag]), suffix))
422         numpy.savez(filename, recv=recv)
423         active_context.part[tag] += 1
424
425     def _no_ensemble_update(active_context, send, recv, tag=None):
426         message = "Attempt to call ensemble_update() in a Context that does not provide the operation."
427         # If we confirm effective exception handling, remove the extraneous log.
428         logger.error(message)
429         raise exceptions.FeatureNotAvailableError(message)
430
431     if context._session_ensemble_communicator is not None:
432         functor = _ensemble_update
433     else:
434         functor = _no_ensemble_update
435     context.part = {}
436     return functor
437
438
439 class _libgromacsContext(object):
440     """Low level API to libgromacs library context provides Python context manager.
441
442     Binds to a workflow and manages computation resources.
443
444     Attributes:
445         workflow (:obj:`gmx.workflow.WorkSpec`): bound workflow to be executed.
446
447     Example:
448         >>> with _libgromacsContext(my_workflow) as session: # doctest: +SKIP
449         ...    session.run()
450
451     Things are still fluid, but what we might do is have all of the WorkSpec operations that are supported
452     by a given Context to correspond to member functions in the Context, a SessionBuilder, or Session. In
453     any case, the operations allow a Context implementation to transform a work specification into a
454     directed acyclic graph of schedulable work.
455     """
456     # The Context is the appropriate entity to own or mediate access to an appropriate logging facility,
457     # but right now we are using the module-level Python logger.
458     # Reference https://github.com/kassonlab/gmxapi/issues/135
459     def __init__(self, workflow=None):
460         """Create new context bound to the provided workflow, if any.
461
462         Args:
463             workflow (gmx.workflow.WorkSpec) work specification object to bind.
464
465         """
466         self._session = None
467         self.__workflow = workflow
468
469     @property
470     def workflow(self):
471         return self.__workflow
472
473     @workflow.setter
474     def workflow(self, workflow):
475         """Before accepting a workflow, the context must check whether it can interpret the work specification."""
476
477         self.__workflow = workflow
478
479     @classmethod
480     def check_workspec(cls, workspec, raises=False):
481         """Check the validity of the work specification in this Context.
482
483         Args:
484             workspec: work specification to check
485             raises: Boolean (default False)
486
487         If raises == True, raises exceptions for problems found in the work specification.
488
489         Returns:
490             True if workspec is processable in this Context, else False.
491         """
492         from gmx.workflow import workspec_version, get_source_elements, WorkElement
493         # initialize return value.
494         is_valid = True
495         # Check compatibility
496         if workspec.version != workspec_version:
497             is_valid = False
498             if raises:
499                 raise exceptions.ApiError('Incompatible workspec version.')
500         # Check that Elements are uniquely identifiable.
501         elements = dict()
502         for element in workspec.elements:
503             if element.name is not None and element.name not in elements:
504                 elements[element.name] = element
505             else:
506                 is_valid = False
507                 if raises:
508                     raise exceptions.ApiError('WorkSpec must contain uniquely named elements.')
509         # Check that the specification is complete. There must be at least one source element and all
510         # dependencies must be fulfilled.
511         sources = set([element.name for element in get_source_elements(workspec)])
512         if len(sources) < 1:
513             is_valid = False
514             if raises:
515                 raise exceptions.ApiError('WorkSpec must contain at least one source element')
516         return is_valid
517
518
519     def __enter__(self):
520         """Implement Python context manager protocol.
521
522         Returns:
523             runnable session object.
524         """
525         if self._session is not None:
526             raise exceptions.Error('Already running.')
527         # The API runner currently has an implicit context.
528         try:
529             # launch() with no arguments is deprecated.
530             # Ref: https://github.com/kassonlab/gmxapi/issues/124
531             self._session = self.workflow.launch()
532         except:
533             self._session = None
534             raise
535         return self._session
536
537     def __exit__(self, exception_type, exception_value, traceback):
538         """Implement Python context manager protocol.
539
540         Closing a session should not produce Python exceptions. Instead, exit
541         state is accessible through API objects like Status.
542         For evolving design points, see
543         - https://github.com/kassonlab/gmxapi/issues/41
544         - https://github.com/kassonlab/gmxapi/issues/121
545         """
546         self._session.close()
547         self._session = None
548         return False
549
550
551 class DefaultContext(_libgromacsContext):
552     """ Produce an appropriate context for the work and compute environment.
553
554     Deprecated:
555         Use gmx.context.get_context() to find an appropriate high-level API
556         context. For lower-level access to the library that does not employ the
557         full API Context abstraction, but instead explicitly uses a local
558         libgromacs instance, a replacement still needs to be devised. It may
559         have this same interface, but the name and scoping of DefaultContext is
560         misleading.
561     """
562     def __init__(self, work):
563         # There is very little context abstraction at this point...
564         warnings.warn("Behavior of DefaultContext is unspecified starting in gmxapi 0.0.8.", DeprecationWarning)
565         super(DefaultContext, self).__init__(work)
566
567 class Context(object):
568     """Manage an array of simulation work executing in parallel.
569
570     This is the first implementation of a new style of Context class that has some extra abstraction
571     and uses the new WorkSpec idea.
572
573     Additional facilities are available to elements of the array members.
574
575       * array element corresponding to work in the current sub-context
576       * "global" resources managed by the ParallelArrayContext
577
578     Attributes:
579         work :obj:`gmx.workflow.WorkSpec`: specification of work to be performed when a session is launched.
580         rank : numerical index of the current worker in a running session (None if not running)
581         work_width : Minimum width needed for the parallelism required by the array of work being executed.
582         elements : dictionary of references to elements of the workflow.
583
584     `rank`, `work_width`, and `elements` are empty or None until the work is processed, as during session launch.
585
586     Example:
587         Use ``mpiexec -n 2 python -m mpi4py myscript.py`` to run two jobs at the same time.
588         In this example the jobs are identical. In myscript.py:
589
590         >>> import gmx
591         >>> import gmx.core
592         >>> from gmx.data import tpr_filename # Get a test tpr filename
593         >>> work = gmx.workflow.from_tpr([tpr_filename, tpr_filename])
594         >>> gmx.run(work)
595
596     Example:
597
598         >>> import gmx
599         >>> import gmx.core
600         >>> from gmx.data import tpr_filename # Get a test tpr filename
601         >>> work = gmx.workflow.from_tpr([tpr_filename, tpr_filename])
602         >>> context = gmx.context.get_context(work)
603         >>> with context as session:
604         ...    session.run()
605         ...    # The session is one abstraction too low to know what rank it is. It lets the spawning context manage
606         ...    # such things.
607         ...    # rank = session.rank
608         ...    # The local context object knows where it fits in the global array.
609         ...    rank = context.rank
610         ...    output_path = os.path.join(context.workdir_list[rank], 'traj.trr')
611         ...    assert(os.path.exists(output_path))
612         ...    print('Worker {} produced {}'.format(rank, output_path))
613
614     Implementation notes:
615
616     To produce a running session, the Context __enter__() method is called, according to the Python context manager
617     protocol. At this time, the attached WorkSpec must be feasible on the available resources. To turn the specified
618     work into an executable directed acyclic graph (DAG), handle objects for the elements in the work spec are sequenced
619     in dependency-compatible order and the context creates a "builder" for each according to the element's operation.
620     Each builder is subscribed to the builders of its dependency elements. The DAG is then assembled by calling each
621     builder in sequence. A builder can add zero, one, or more nodes and edges to the DAG.
622
623     The Session is then launched from the DAG. What happens next is implementation-dependent, and it may take a while for
624     us to decide whether and how to standardize interfaces for the DAG nodes and edges and/or execution protocols. I
625     expect each node will at least have a `launch()` method, but will also probably have typed input and output ports as well as some signalling.
626     A sophisticated and abstract Session implementation could schedule work only to satisfy data dependencies of requested
627     output upon request. Our immediate implementation will use the following protocol.
628
629     Each node has a `launch()` method. When the session is entered, the `launch()` method is called for each node in
630     dependency order. The launch method returns either a callable (`run()` function) or None, raising an exception in
631     case of an error. The sequence of non-None callables is stored by the Session. When Session.run() is called, the
632     sequence of callables is called in order. If StopIteration is raised by the callable, it is removed from the sequence.
633     The sequence is processed repeatedly until there are no more callables.
634
635     Note that this does not rigorously handle races or deadlocks, or flexibility in automatically chasing dependencies. A more
636     thorough implementation could recursively call launch on dependencies (launch could be idempotent or involve some
637     signalling to dependents when complete), run calls could be entirely event driven, and/or nodes could "publish"
638     output (including just a completion message), blocking for acknowledgement before looking for the next set of subscribed inputs.
639     """
640
641     def __init__(self, work=None, workdir_list=None, communicator=None):
642         """Create manager for computing resources.
643
644         Does not initialize resources because Python objects by themselves do
645         not have a good way to deinitialize resources. Instead, resources are
646         initialized using the Python context manager protocol when sessions are
647         entered and exited.
648
649         Appropriate computing resources need to be knowable when the Context is created.
650
651         Keyword Arguments:
652             work : work specification with which to initialize this context
653             workdir_list : deprecated
654             communicator : non-owning reference to a multiprocessing communicator
655
656         If provided, communicator must implement the mpi4py.MPI.Comm interface. The
657         Context will use this communicator as the parent for subcommunicators
658         used when launching sessions. If provided, communicator is owned by the
659         caller, and must be freed by the caller after any sessions are closed.
660         By default, the Context will get a reference to MPI_COMM_WORLD, which
661         will be freed when the Python process ends and cleans up its resources.
662         The communicator stored by the Context instance will not be used directly,
663         but will be duplicated when launching sessions using ``with``.
664         """
665
666         # self.__context_array = list([Context(work_element) for work_element in work])
667         from gmx.workflow import WorkSpec
668
669         # Until better Session abstraction exists at the Python level, a
670         # _session_communicator attribute will be added to and removed from the
671         # context at session entry and exit. If necessary, a _session_ensemble_communicator
672         # will be split from _session_communicator for simulation ensembles
673         # present in the specified work.
674         self.__communicator = communicator
675
676         self.__work = WorkSpec()
677         self.__workdir_list = workdir_list
678
679         self._session = None
680         # This may not belong here. Is it confusing for the Context to have both global and local properties?
681         # Alternatively, maybe a trivial `property` that gets the rank from a bound session, if any.
682         self.rank = None
683
684         # `work_width` notes the required width of an array of synchronous tasks to perform the specified work.
685         # As work elements are processed, self.work_width will be increased as appropriate.
686         self.work_width = None
687
688         # initialize the operations map. May be extended during the lifetime of a Context.
689         # Note that there may be a difference between built-in operations provided by this module and
690         # additional operations registered at run time.
691         self.__operations = dict()
692         # The map contains a builder for each operation. The builder is created by passing the element to the function
693         # in the map. The object returned must have the following methods:
694         #
695         #   * add_subscriber(another_builder) : allow other builders to subscribe to this one.
696         #   * build(dag) : Fulfill the builder responsibilities by adding an arbitrary number of nodes and edges to a Graph.
697         #
698         # The gmxapi namespace of operations should be consistent with a specified universal set of functionalities
699         self.__operations['gmxapi'] = {'md': lambda element : _md(self, element),
700                                        # 'global_data' : shared_data_maker,
701                                       }
702         # Even if TPR file loading were to become a common and stable enough operation to be specified in
703         # and API, it is unlikely to be implemented by any code outside of GROMACS, so let's not clutter
704         # a potentially more universal namespace.
705         self.__operations['gromacs'] = {'load_tpr': lambda element : _load_tpr(self, element),
706                                        }
707
708         # Right now we are treating workspec elements and work DAG nodes as equivalent, but they are
709         # emphatically not intended to be tightly coupled. The work specification is intended to be
710         # simple, user-friendly, general, and easy-to-implement. The DAG is an implementation detail
711         # and may differ across context types. It is likely to have stronger typing of nodes and/or
712         # edges. It is not yet specified whether we should translate the work into a graph before, after,
713         # or during processing of the elements, so it is not yet known whether we will need facilities
714         # to allow cross-referencing between the two graph-type structures. If we instantiate API objects
715         # as we process work elements, and the DAG in a context deviates from the work specification
716         # topology, we would need to use named dependencies to look up objects to bind to. Such facilities
717         # could be hidden in the WorkElement class(es), too, to delegate code away from the Context as a
718         # container class growing without bounds...
719         # In actuality, we will have to process the entire workspec to some degree to make sure we can
720         # run it on the available resources.
721         self.elements = None
722
723         # This setter must be called after the operations map has been populated.
724         self.work = work
725
726         self._api_object = gmxapi.Context()
727
728     @property
729     def work(self):
730         return self.__work
731
732     @work.setter
733     def work(self, work):
734         """Set `work` attribute.
735
736         Raises:
737             gmx.exceptions.ApiError: work is not compatible with schema or
738                 known operations.
739             gmx.exceptions.UsageError: Context can not access operations in
740                 the name space given for an Element
741             gmx.exceptions.ValueError: assignment operation cannot be performed
742                 for the provided object (rhs)
743
744         For discussion on error handling, see https://github.com/kassonlab/gmxapi/issues/125
745         """
746         from gmx.workflow import WorkSpec, WorkElement
747         if work is None:
748             return
749
750         if isinstance(work, WorkSpec):
751             workspec = work
752         elif hasattr(work, 'workspec') and isinstance(work.workspec, WorkSpec):
753             workspec = work.workspec
754         else:
755             raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
756         workspec._context = self
757
758         # Make sure this context knows how to run the specified work.
759         for e in workspec.elements:
760             element = WorkElement.deserialize(workspec.elements[e])
761
762             # Note: Non-built-in namespaces (non-native) are treated as modules to import.
763             # Native namespaces may not be completely implemented in a particular version of a particular Context.
764             if element.namespace in {'gmxapi', 'gromacs'}:
765                 assert element.namespace in self.__operations
766                 if not element.operation in self.__operations[element.namespace]:
767                     # The requested element is a built-in operation but not available in this Context.
768                     # element.namespace should be mapped, but not all operations are necessarily implemented.
769                     logger.error("Operation {} not found in map {}".format(element.operation,
770                                                                            str(self.__operations)))
771                     # This check should be performed when deciding if the context is appropriate for the work.
772                     # If we are just going to use a try/catch block for this test, then we should differentiate
773                     # this exception from those raised due to incorrect usage.
774                     # The exception thrown here may evolve with https://github.com/kassonlab/gmxapi/issues/125
775                     raise exceptions.FeatureNotAvailableError(
776                         'Specified work cannot be performed due to unimplemented operation {}.{}.'.format(
777                             element.namespace,
778                             element.operation))
779
780             else:
781                 assert element.namespace not in {'gmxapi', 'gromacs'}
782
783                 # Don't leave an empty nested dictionary if we couldn't map the operation.
784                 if element.namespace in self.__operations:
785                     namespace_map = self.__operations[element.namespace]
786                 else:
787                     namespace_map = dict()
788
789                 # Set or update namespace map iff we have something new.
790                 if element.operation not in namespace_map:
791                     try:
792                         element_module = importlib.import_module(element.namespace)
793                         element_operation = getattr(element_module, element.operation)
794                     except ImportError as e:
795                         raise exceptions.UsageError(
796                             'Cannot find implementation for namespace {}. ImportError: {}'.format(
797                                 element.namespace,
798                                 str(e)))
799                     except AttributeError:
800                         raise exceptions.UsageError(
801                             'Cannot find factory for operation {}.{}'.format(
802                                 element.namespace,
803                                 element.operation
804                             )
805                         )
806                     namespace_map[element.operation] = element_operation
807                     self.__operations[element.namespace] = namespace_map
808
809         self.__work = workspec
810
811     def add_operation(self, namespace, operation, get_builder):
812         """Add a builder factory to the operation map.
813
814         Extends the known operations of the Context by mapping an operation in a namespace to a function
815         that returns a builder to process a work element referencing the operation. Must be called before
816         the work specification is added, since the spec is inspected to confirm that the Context can run it.
817
818         It may be more appropriate to add this functionality to the Context constructor or as auxiliary
819         information in the workspec, or to remove it entirely; it is straight-forward to just add snippets
820         of code to additional files in the working directory and to make them available as modules for the
821         Context to import.
822
823         Example:
824
825             >>> # Import some custom extension code.
826             >>> import myplugin
827             >>> myelement = myplugin.new_element()
828             >>> workspec = gmx.workflow.WorkSpec()
829             >>> workspec.add_element(myelement)
830             >>> context = gmx.context.ParallelArrayContext()
831             >>> context.add_operation(myelement.namespace, myelement.operation, myplugin.element_translator)
832             >>> context.work = workspec
833             >>> with get_context() as session:
834             ...    session.run()
835
836         """
837         if namespace not in self.__operations:
838             if namespace in {'gmxapi', 'gromacs'}:
839                 raise exceptions.UsageError("Cannot add operations to built-in namespaces.")
840             else:
841                 self.__operations[namespace] = dict()
842         else:
843             assert namespace in self.__operations
844
845         if operation in self.__operations[namespace]:
846             raise exceptions.UsageError("Operation {}.{} already defined in this context.".format(namespace, operation))
847         else:
848             self.__operations[namespace][operation] = get_builder
849
850     # Set up a simple ensemble resource
851     # This should be implemented for Session, not Context, and use an appropriate subcommunicator
852     # that is created and freed as the Session launches and exits.
853     def ensemble_update(self, send, recv, tag=None):
854         """Implement the ensemble_update member function that gmxapi through 0.0.6 expects.
855
856         """
857         # gmxapi through 0.0.6 expects to bind to this member function during "build".
858         # This behavior needs to be deprecated (bind during launch, instead), but this
859         # dispatching function should be an effective placeholder.
860         if tag is None or str(tag) == '':
861             raise exceptions.ApiError("ensemble_update must be called with a name tag.")
862         # __ensemble_update is an attribute, not an instance function, so we need to explicitly pass 'self'
863         return self.__ensemble_update(self, send, recv, tag)
864
865     def __enter__(self):
866         """Implement Python context manager protocol, producing a Session for the specified work in this Context.
867
868         Returns:
869             Session object that can be run and/or inspected.
870
871         Additional API operations are possible while the Session is active. When used as a Python context manager,
872         the Context will close the Session at the end of the `with` block by calling `__exit__`.
873
874         Note: this is probably where we will have to process the work specification to determine whether we
875         have appropriate resources (such as sufficiently wide parallelism). Until we have a better Session
876         abstraction, this means the clean approach should take two passes to first build a DAG and then
877         instantiate objects to perform the work. In the first implementation, we kind of muddle things into
878         a single pass.
879         """
880         try:
881             import networkx as nx
882             from networkx import DiGraph as _Graph
883         except ImportError:
884             raise exceptions.FeatureNotAvailableError("gmx requires the networkx package to execute work graphs.")
885
886         # Cache the working directory from which we were launched so that __exit__() can give us proper context
887         # management behavior.
888         self.__initial_cwd = os.getcwd()
889         logger.debug("Launching session from {}".format(self.__initial_cwd))
890
891         if self._session is not None:
892             raise exceptions.Error('Already running.')
893         if self.work is None:
894             raise exceptions.UsageError('No work to perform!')
895
896         # Set up the global and local context.
897         # Check the global MPI configuration
898         # Since the Context doesn't have a destructor, if we use an MPI communicator at this scope then
899         # it has to be owned and managed outside of Context.
900         self._session_communicator = _acquire_communicator(self.__communicator)
901         context_comm_size = self._session_communicator.Get_size()
902         context_rank = self._session_communicator.Get_rank()
903         self.rank = context_rank
904         # self._communicator = communicator
905         logger.debug("Context rank {} in context {} of size {}".format(context_rank,
906                                                                        self._session_communicator,
907                                                                        context_comm_size))
908
909         ###
910         # Process the work specification.
911         ###
912         logger.debug("Processing workspec:\n{}".format(str(self.work)))
913
914         # Get a builder for DAG components for each element
915         builders = {}
916         builder_sequence = []
917         for element in self.work:
918             # dispatch builders for operation implementations
919             try:
920                 new_builder = self.__operations[element.namespace][element.operation](element)
921                 assert hasattr(new_builder, 'add_subscriber')
922                 assert hasattr(new_builder, 'build')
923
924                 logger.info("Collected builder for {}".format(element.name))
925             except LookupError as e:
926                 request = '.'.join([element.namespace, element.operation])
927                 message = 'Could not find an implementation for the specified operation: {}. '.format(request)
928                 message += str(e)
929                 raise exceptions.ApiError(message)
930             # Subscribing builders is the Context's responsibility because otherwise the builders
931             # don't know about each other. Builders should not depend on the Context unless they
932             # are a facility provided by the Context, in which case they may be member functions
933             # of the Context. We will probably need to pass at least some
934             # of the Session to the `launch()` method, though...
935             dependencies = element.depends
936             for dependency in dependencies:
937                 # If a dependency is a list, assume it is an "ensemble" of dependencies
938                 # and pick the element for corresponding to the local rank.
939                 if isinstance(dependency, (list, tuple)):
940                     assert len(dependency) > context_rank
941                     name = str(dependency[context_rank])
942                 else:
943                     name = dependency
944                 logger.info("Subscribing {} to {}.".format(element.name, name))
945                 builders[name].add_subscriber(new_builder)
946             builders[element.name] = new_builder
947             builder_sequence.append(element.name)
948
949         # Call the builders in dependency order
950         # Note: session_communicator is available, but ensemble_communicator has not been created yet.
951         graph = _Graph(width=1)
952         logger.info("Building sequence {}".format(builder_sequence))
953         for name in builder_sequence:
954             builder = builders[name]
955             logger.info("Building {}".format(builder))
956             logger.debug("Has build attribute {}.".format(builder.build))
957             builder.build(graph)
958         self.work_width = graph.graph['width']
959
960         # Prepare working directories. This should probably be moved to some aspect of the Session and either
961         # removed from here or made more explicit to the user.
962         workdir_list = self.__workdir_list
963         if workdir_list is None:
964             workdir_list = [os.path.join('.', str(i)) for i in range(self.work_width)]
965         self.__workdir_list = list([os.path.abspath(dir) for dir in workdir_list])
966
967         # For gmxapi 0.0.6, all ranks have a session_ensemble_communicator
968         self._session_ensemble_communicator = _get_ensemble_communicator(self._session_communicator, self.work_width)
969         self.__ensemble_update = _get_ensemble_update(self)
970
971         # launch() is currently a method of gmx.core.MDSystem and returns a gmxapi::Session.
972         # MDSystem objects are obtained from gmx.core.from_tpr(). They also provide add_potential().
973         # gmxapi::Session objects are exposed as gmx.core.MDSession and provide run() and close() methods.
974         #
975         # Here, I want to find the input appropriate for this rank and get an MDSession for it.
976         # E.g. Make a pass that allows meta-objects to bind (setting md_proxy._input_tpr and md_proxy._plugins,
977         # and then call a routine implemented by each object to run whatever protocol it needs, such
978         # as `system = gmx.core.from_tpr(md._input_tpr); system.add_potential(md._plugins)
979         # For future design plans, reference https://github.com/kassonlab/gmxapi/issues/65
980         #
981         # This `if` condition is currently the thing that ultimately determines whether the
982         # rank attempts to do work.
983         if context_rank < self.work_width:
984             # print(graph)
985             logger.debug(("Launching graph {}.".format(graph.graph)))
986             logger.debug("Graph nodes: {}".format(str(list(graph.nodes))))
987             logger.debug("Graph edges: {}".format(str(list(graph.edges))))
988
989             logger.info("Launching work on context rank {}, subcommunicator rank {}.".format(
990                 self.rank,
991                 self._session_ensemble_communicator.Get_rank()))
992
993             # Launch the work for this rank
994             self.workdir = self.__workdir_list[self.rank]
995             if os.path.exists(self.workdir):
996                 if not os.path.isdir(self.workdir):
997                     raise exceptions.FileError('{} is not a valid working directory.'.format(self.workdir))
998             else:
999                 os.mkdir(self.workdir)
1000             os.chdir(self.workdir)
1001             logger.info('rank {} changed directory to {}'.format(self.rank, self.workdir))
1002             sorted_nodes = nx.topological_sort(graph)
1003             runners = []
1004             closers = []
1005             for name in sorted_nodes:
1006                 launcher = graph.nodes[name]['launch']
1007                 runner = launcher(self.rank)
1008                 if not runner is None:
1009                     runners.append(runner)
1010                     closers.append(graph.nodes[name]['close'])
1011
1012             # Get a session object to return. It must simply provide a `run()` function.
1013             context = self # Part of workaround for bug gmxapi-214
1014             class Session(object):
1015                 def __init__(self, runners, closers):
1016                     self.runners = list(runners)
1017                     self.closers = list(closers)
1018
1019                 def run(self):
1020                     # Note we are not following the documented protocol of running repeatedly yet.
1021                     to_be_deleted = []
1022                     for i, runner in enumerate(self.runners):
1023                         try:
1024                             runner()
1025                         except StopIteration:
1026                             to_be_deleted.insert(0, i)
1027                     for i in to_be_deleted:
1028                         del self.runners[i]
1029                     return True
1030
1031                 def close(self):
1032                     for close in self.closers:
1033                         logger.debug("Closing node: {}".format(close))
1034                         close()
1035                     # Workaround for bug gmxapi-214
1036                     if not gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
1037                         context._api_object = gmxapi.Context()
1038
1039
1040             self._session = Session(runners, closers)
1041         else:
1042             logger.info("Context rank {} has no work to do".format(self.rank))
1043
1044             context = self # Part of workaround for bug gmxapi-214
1045             class NullSession(object):
1046                 def run(self):
1047                     logger.info("Running null session on rank {}.".format(self.rank))
1048                     return status.Status()
1049                 def close(self):
1050                     logger.info("Closing null session.")
1051                     # Workaround for bug gmxapi-214
1052                     if not gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
1053                         context._api_object = gmxapi.Context()
1054                     return
1055
1056             self._session = NullSession()
1057             self._session.rank = self.rank
1058
1059         # Make sure session has started on all ranks before continuing?
1060
1061         self._session.graph = graph
1062         return self._session
1063
1064     def __exit__(self, exception_type, exception_value, traceback):
1065         """Implement Python context manager protocol."""
1066         logger.info("Exiting session on context rank {}.".format(self.rank))
1067         if self._session is not None:
1068             logger.info("Calling session.close().")
1069             self._session.close()
1070             self._session = None
1071         else:
1072             # Note: we should not have a None session but rather an API-compliant Session that just has no work.
1073             # Reference: https://github.com/kassonlab/gmxapi/issues/41
1074             logger.info("No _session known to context or session already closed.")
1075         if hasattr(self, '_session_ensemble_communicator'):
1076             if self._session_communicator is not None:
1077                 logger.info("Freeing sub-communicator {} on rank {}".format(
1078                     self._session_ensemble_communicator,
1079                     self.rank))
1080                 self._session_ensemble_communicator.Free()
1081             else:
1082                 logger.debug('"None" ensemble communicator does not need to be "Free"d.')
1083             del self._session_ensemble_communicator
1084         else:
1085             logger.debug("No ensemble subcommunicator on context rank {}.".format(self.rank))
1086
1087         logger.debug("Freeing session communicator.")
1088         self._session_communicator.Free()
1089         logger.debug("Deleting session communicator reference.")
1090         del self._session_communicator
1091
1092         os.chdir(self.__initial_cwd)
1093         logger.info("Session closed on context rank {}.".format(self.rank))
1094         # Note: Since sessions running in different processes can have different work, sessions have not necessarily
1095         # ended on all ranks. As a result, starting another session on the same resources could block until the
1096         # resources are available.
1097
1098         # Python context managers return False when there were no exceptions to handle.
1099         return False
1100
1101
1102 # The interface and functionality of ParallelArrayContext is the new generic
1103 # Context behavior, but we need to keep the old name for compatibility for
1104 # the moment.
1105 ParallelArrayContext = Context
1106
1107 def get_context(work=None):
1108     """Get a concrete Context object.
1109
1110     Args:
1111         work (gmx.workflow.WorkSpec): runnable work as a valid gmx.workflow.WorkSpec object
1112
1113     Returns:
1114         An object implementing the :py:class:`gmx.context.Context` interface, if possible.
1115
1116     Raises:
1117         gmx.exceptions.ValueError if an appropriate context for ``work`` could not be loaded.
1118
1119     If work is provided, return a Context object capable of running the provided work or produce an error.
1120
1121     The semantics for finding Context implementations needs more consideration, and a more informative exception
1122     is likely possible.
1123
1124     A Context can run the provided work if
1125
1126       * the Context supports can resolve all operations specified in the elements
1127       * the Context supports DAG topologies implied by the network of dependencies
1128       * the Context supports features required by the elements with the specified parameters,
1129         such as synchronous array jobs.
1130
1131     """
1132     # We need to define an interface for WorkSpec objects so that we don't need
1133     # to rely on typing and inter-module dependencies.
1134     from gmx import workflow
1135     workspec = None
1136     if work is not None:
1137         if isinstance(work, workflow.WorkSpec):
1138             workspec = work
1139         elif hasattr(work, 'workspec') and isinstance(work.workspec,
1140                                                       workflow.WorkSpec):
1141             workspec = work.workspec
1142         else:
1143             raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
1144     if workspec is not None and \
1145             hasattr(workspec, '_context') and \
1146             workspec._context is not None:
1147         context = workspec._context
1148     else:
1149         context = Context(work=workspec)
1150
1151     return context