Python wrapping code for gmxapi mdrun bindings.
[alexxy/gromacs.git] / python_packaging / src / gmxapi / simulation / context.py
1 #
2 # This file is part of the GROMACS molecular simulation package.
3 #
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
8 #
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
13 #
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 # Lesser General Public License for more details.
18 #
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23 #
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
31 #
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
34 #
35 # This file is based on the Kasson Lab gmxapi project release 0.0.7.4.
36 # https://github.com/kassonlab/gmxapi/blob/v0.0.7.4/src/gmx/context.py
37 # https://github.com/kassonlab/gmxapi/blob/v0.0.7.4/LICENSE
38 """
39 Execution Context
40 =================
41 """
42
43 __all__ = ['Context']
44
45 import importlib
46 import os
47 import warnings
48 import tempfile
49
50 from gmxapi import exceptions
51 from gmxapi import logger as root_logger
52 import gmxapi._gmxapi as _gmxapi
53
54 # Module-level logger
55 logger = root_logger.getChild('context')
56 logger.info('Importing {}'.format(__name__))
57
58
59 def _load_tpr(self, element):
60     """Implement the gromacs.load_tpr operation.
61
62     Updates the minimum width of the workflow parallelism. Does not add any API object to the graph.
63
64     Arguments:
65         self: The Context in which this operation is being loaded.
66         element: WorkElement specifying the operation.
67
68     Returns:
69         A Director that the Context can use in launching the Session.
70     """
71     class Builder(object):
72         def __init__(self, tpr_list):
73             logger.debug("Loading tpr builder for tpr_list {}".format(tpr_list))
74             self.tpr_list = tpr_list
75             self.subscribers = []
76             self.width = len(tpr_list)
77
78         def add_subscriber(self, builder):
79             builder.infile = self.tpr_list
80             self.subscribers.append(builder)
81
82         def build(self, dag):
83             width = len(self.tpr_list)
84             for builder in self.subscribers:
85                 builder.width = width
86             if 'width' in dag.graph:
87                 width = max(width, dag.graph['width'])
88             dag.graph['width'] = width
89
90     return Builder(element.params['input'])
91
92 def _md(context, element):
93     """Implement the gmxapi.md operation by returning a builder that can populate a data flow graph for the element.
94
95     Inspects dependencies to set up the simulation runner.
96
97     The graph node created will have `launch` and `run` attributes with function references, and a `width`
98     attribute declaring the workflow parallelism requirement.
99
100     Arguments:
101         context: The Context in which this operation is being loaded.
102         element: WorkElement specifying the operation.
103
104     Returns:
105         A Director that the Context can use in launching the Session.
106     """
107     class Builder(object):
108         """Translate md work element to a node in the session's DAG."""
109         def __init__(self, element):
110             try:
111                 self.name = element.name
112                 # Note that currently the calling code is in charge of subscribing this builder to its dependencies.
113                 # A list of tpr files will be set when the calling code subscribes this builder to a tpr provider.
114                 self.infile = None
115                 # Other dependencies in the element may register potentials when subscribed to.
116                 self.potential = []
117                 self.input_nodes = []
118                 self.runtime_params = element.params
119             except AttributeError:
120                 raise exceptions.ValueError("object provided does not seem to be a WorkElement.")
121         def add_subscriber(self, builder):
122             """The md operation does not yet have any subscribeable facilities."""
123             pass
124         def build(self, dag):
125             """Add a node to the graph that, when launched, will construct a simulation runner.
126
127             Complete the definition of appropriate graph edges for dependencies.
128
129             The launch() method of the added node creates the runner from the tpr file for the current rank and adds
130             modules from the incoming edges.
131             """
132             if not (hasattr(dag, 'add_node')
133                     and hasattr(dag, 'add_edge')
134                     and hasattr(dag, 'graph')
135                     and hasattr(dag, 'nodes')):
136                 raise exceptions.ProtocolError("Argument 'dag' must provide the DiGraph interface.")
137             name = self.name
138             dag.add_node(name)
139             for neighbor in self.input_nodes:
140                 dag.add_edge(neighbor, name)
141             infile = self.infile
142             assert not infile is None
143             potential_list = self.potential
144             assert dag.graph['width'] >= len(infile)
145
146             # Provide closure with which to execute tasks for this node.
147             def launch(rank=None):
148                 assert not rank is None
149
150                 # Copy and update, if required by `end_time` parameter.
151                 temp_filename = None
152                 if 'end_time' in self.runtime_params:
153                     # Note that mkstemp returns a file descriptor as the first part of the tuple.
154                     # We can make this cleaner in 0.0.7 with a separate node that manages the
155                     # altered input.
156                     _, temp_filename = tempfile.mkstemp(suffix='.tpr')
157                     logger.debug('Updating input. Using temp file {}'.format(temp_filename))
158                     _gmxapi.rewrite_tprfile(source=infile[rank],
159                                          destination=temp_filename,
160                                          end_time=self.runtime_params['end_time'])
161                     tpr_file = temp_filename
162                 else:
163                     tpr_file = infile[rank]
164
165                 logger.info('Loading TPR file: {}'.format(tpr_file))
166                 system = _gmxapi.from_tpr(tpr_file)
167                 dag.nodes[name]['system'] = system
168                 mdargs = _gmxapi.MDArgs()
169                 mdargs.set(self.runtime_params)
170                 # Workaround to give access to plugin potentials used in a context.
171                 pycontext = element.workspec._context
172                 pycontext.potentials = potential_list
173                 context = pycontext._api_object
174                 context.setMDArgs(mdargs)
175                 for potential in potential_list:
176                     context.add_mdmodule(potential)
177                 dag.nodes[name]['session'] = system.launch(context)
178                 dag.nodes[name]['close'] = dag.nodes[name]['session'].close
179
180                 if 'end_time' in self.runtime_params:
181                     def special_close():
182                         dag.nodes[name]['session'].close()
183                         logger.debug("Unlinking temporary TPR file {}.".format(temp_filename))
184                         os.unlink(temp_filename)
185                     dag.nodes[name]['close'] = special_close
186                 else:
187                     dag.nodes[name]['close'] = dag.nodes[name]['session'].close
188
189                 def runner():
190                     """Currently we only support a single call to run."""
191                     def done():
192                         raise StopIteration()
193                     # Replace the runner with a stop condition for subsequent passes.
194                     dag.nodes[name]['run'] = done
195                     return dag.nodes[name]['session'].run()
196                 dag.nodes[name]['run'] = runner
197                 return dag.nodes[name]['run']
198
199             dag.nodes[name]['launch'] = launch
200
201     return Builder(element)
202
203
204 def _get_mpi_ensemble_communicator(session_communicator, ensemble_size):
205     """Get an ensemble communicator from an MPI communicator.
206
207     An ensemble communicator is an object that implements mpi4py.MPI.Comm methods
208     as described elsewhere in this documentation.
209
210     :param session_communicator: MPI communicator with the interface described by mpi4py.MPI.Comm
211     :param ensemble_size: number of ensemble members
212     :return: communicator of described size on participating ranks and null communicator on any others.
213
214     Must be called exactly once in every process in `communicator`. It is the
215     responsibility of the calling code to refrain from running ensemble operations
216     if not part of the ensemble. The calling code determines this by comparing its
217     session_communicator.Get_rank() to ensemble_size. This is not a good solution
218     because it assumes there is a single ensemble communicator and that ensemble
219     work is allocated to ranks serially from session_rank 0. Future work might
220     use process groups associated with specific operations in the work graph so
221     that processes can check for membership in a group to decide whether to use
222     their ensemble communicator. Another possibility would be to return None
223     rather than a null communicator in processes that aren't participating in
224     a given ensemble.
225     """
226     from mpi4py import MPI
227
228     session_size = session_communicator.Get_size()
229     session_rank = session_communicator.Get_rank()
230
231     # Check the ensemble "width" against the available parallelism
232     if ensemble_size > session_size:
233         msg = 'ParallelArrayContext requires a work array that fits in the MPI communicator: '
234         msg += 'array width {} > size {}.'
235         msg = msg.format(ensemble_size, session_size)
236         raise exceptions.UsageError(msg)
237     if ensemble_size < session_size:
238         msg = 'MPI context is wider than necessary to run this work:  array width {} vs. size {}.'
239         warnings.warn(msg.format(ensemble_size, session_size))
240
241     # Create an appropriate sub-communicator for the present work. Extra ranks will be in a
242     # sub-communicator with no work.
243     if session_rank < ensemble_size:
244         # The session launcher should maintain an inventory of the ensembles and
245         # provide an appropriate tag, but right now we just have a sort of
246         # Boolean: ensemble or not.
247         color = 0
248     else:
249         color = MPI.UNDEFINED
250
251     ensemble_communicator = session_communicator.Split(color, session_rank)
252     try:
253         ensemble_communicator_size = ensemble_communicator.Get_size()
254         ensemble_communicator_rank = ensemble_communicator.Get_rank()
255     except:
256         warnings.warn("Possible API programming error: ensemble_communicator does not provide required methods...")
257         ensemble_communicator_size = 0
258         ensemble_communicator_rank = None
259     logger.info("Session rank {} assigned to rank {} of subcommunicator {} of size {}".format(
260         session_rank,
261         ensemble_communicator_rank,
262         ensemble_communicator,
263         ensemble_communicator_size
264     ))
265
266     # There isn't a good reason to worry about special handling for a null communicator,
267     # which we have to explicitly avoid "free"ing, so let's just get rid of it.
268     # To do: don't even get the null communicator in the first place. Use a group and create instead of split.
269     if ensemble_communicator == MPI.COMM_NULL:
270         ensemble_communicator = None
271
272     return ensemble_communicator
273
274
275 def _acquire_communicator(communicator=None):
276     """Get a workflow level communicator for the session.
277
278     This function is intended to be called by the __enter__ method that creates
279     a session get a communicator instance. The `Free` method of the returned
280     instance must be called exactly once. This should be performed by the
281     corresponding __exit__ method.
282
283     Arguments:
284         communicator : a communicator to duplicate (optional)
285
286     Returns:
287         A communicator that must be explicitly freed by the caller.
288
289     Currently only supports MPI multi-simulation parallelism dependent on
290     mpi4py. The mpi4py package should be installed and built with compilers
291     that are compatible with the gmxapi installation.
292
293     If provided, `communicator` must provide the mpi4py.MPI.Comm interface.
294     Returns either a duplicate of `communicator` or of MPI_COMM_WORLD if mpi4py
295     is available. Otherwise, returns a mock communicator that can only manage
296     sessions and ensembles of size 0 or 1.
297
298     gmx behavior is undefined if launched with mpiexec and without mpi4py
299     """
300
301     class MockSessionCommunicator(object):
302         def Dup(self):
303             return self
304
305         def Free(self):
306             return
307
308         def Get_size(self):
309             return 1
310
311         def Get_rank(self):
312             return 0
313
314         def __str__(self):
315             return 'Basic'
316
317         def __repr__(self):
318             return 'MockSessionCommunicator()'
319
320     if communicator is None:
321         try:
322             import mpi4py.MPI as MPI
323             communicator = MPI.COMM_WORLD
324         except ImportError:
325             logger.info("mpi4py is not available for default session communication.")
326             communicator = MockSessionCommunicator()
327     else:
328         communicator = communicator
329
330     try:
331         new_communicator = communicator.Dup()
332     except Exception as e:
333         message = "Exception when duplicating communicator: {}".format(e)
334         raise exceptions.ApiError(message)
335
336     return new_communicator
337
338
339 def _get_ensemble_communicator(communicator, ensemble_size):
340     """Provide ensemble_communicator feature in active_context, if possible.
341
342     Must be called on all ranks in `communicator`. The communicator returned
343     must be freed by a call to its `Free()` instance method. This function is
344     best used in a context manager's `__enter__()` method so that the
345     corresponding `context.Free()` can be called in the `__exit__` method.
346
347     Arguments:
348         communicator : session communicator for the session with the ensemble.
349         ensemble_size : ensemble size of the requested ensemble communicator
350
351     The ensemble_communicator feature should be present if the Context can
352     provide communication between all ensemble members. The Context should
353     determine this at the launch of the session and set the
354     ``_session_ensemble_communicator`` attribute to provide an object that
355     implements the same interface as an mpi4py.MPI.Comm object. Actually, this is
356     a temporary shim, so the only methods that need to be available are `Get_size`,
357     `Get_rank` and something that can be called as
358     Allreduce(send, recv) where send and recv are objects providing the Python
359     buffer interface.
360
361     Currently, only one ensemble can be managed in a session.
362     """
363     ensemble_communicator = None
364
365     class TrivialEnsembleCommunicator(object):
366         def __init__(self):
367             import numpy
368             self._numpy = numpy
369
370         def Free(self):
371             return
372
373         def Allreduce(self, send, recv):
374             logger.debug("Faking an Allreduce for ensemble of size 1.")
375             send_buffer = self._numpy.array(send, copy=False)
376             recv_buffer = self._numpy.array(recv, copy=False)
377             recv_buffer[:] = send_buffer[:]
378
379         def Get_size(self):
380             return 1
381
382         def Get_rank(self):
383             return 0
384
385     # For trivial cases, don't bother trying to use MPI
386     # Note: all ranks in communicator must agree on the size of the work!
387     # Note: If running with a Mock session communicator in an MPI session (user error)
388     # every rank will think it is the only rank and will try to perform the
389     # same work.
390     if communicator.Get_size() <= 1 or ensemble_size <= 1:
391         message = "Getting TrivialEnsembleCommunicator for ensemble of size {}".format((ensemble_size))
392         message += " for session rank {} in session communicator of size {}".format(
393             communicator.Get_rank(),
394             communicator.Get_size())
395         logger.debug(message)
396         ensemble_communicator = TrivialEnsembleCommunicator()
397     else:
398         message = "Getting an MPI subcommunicator for ensemble of size {}".format(ensemble_size)
399         message += " for session rank {} in session communicator of size {}".format(
400             communicator.Get_rank(),
401             communicator.Get_size())
402         logger.debug(message)
403         ensemble_communicator = _get_mpi_ensemble_communicator(communicator, ensemble_size)
404
405     return ensemble_communicator
406
407 def _get_ensemble_update(context):
408     """Set up a simple ensemble resource.
409
410     The context should call this function once per session to get an `ensemble_update`
411     function object.
412
413     This is a draft of a Context feature that may not be available in all
414     Context implementations. This factory function can be wrapped as a
415     ``ensemble_update`` "property" in a Context instance method to produce a Python function
416     with the signature ``update(context, send, recv, tag=None)``.
417
418     This feature requires that the Context is capabable of providing the
419     ensemble_communicator feature and the numpy feature.
420     If both are available, the function object provided by
421     ``ensemble_update`` provides
422     the ensemble reduce operation used by the restraint potential plugin in the
423     gmxapi sample_restraint repository. Otherwise, the provided function object
424     will log an error and then raise an exception.
425
426     gmxapi 0.0.5 and 0.0.6 MD plugin clients look for a member function named
427     ``ensemble_update`` in the Context that launched them. In the future,
428     clients will use session resources to access ensemble reduce operations.
429     In the mean time, a transitional implementation can involve defining a
430     ``ensemble_update`` property in the Context object that acts as a factory
431     function to produce the reducing operation, if possible with the given
432     resources.
433     """
434     try:
435         import numpy
436     except ImportError:
437         message = "ensemble_update requires numpy, but numpy is not available."
438         logger.error(message)
439         raise exceptions.FeatureNotAvailableError(message)
440
441     def _ensemble_update(active_context, send, recv, tag=None):
442         assert not tag is None
443         assert str(tag) != ''
444         if not tag in active_context.part:
445             active_context.part[tag] = 0
446         logger.debug("Performing ensemble update.")
447         active_context._session_ensemble_communicator.Allreduce(send, recv)
448         buffer = numpy.array(recv, copy=False)
449         buffer /= active_context.work_width
450         suffix = '_{}.npz'.format(tag)
451         # These will end up in the working directory and each ensemble member will have one
452         filename = str("rank{}part{:04d}{}".format(active_context.rank, int(active_context.part[tag]), suffix))
453         numpy.savez(filename, recv=recv)
454         active_context.part[tag] += 1
455
456     def _no_ensemble_update(active_context, send, recv, tag=None):
457         message = "Attempt to call ensemble_update() in a Context that does not provide the operation."
458         # If we confirm effective exception handling, remove the extraneous log.
459         logger.error(message)
460         raise exceptions.FeatureNotAvailableError(message)
461
462     if context._session_ensemble_communicator is not None:
463         functor = _ensemble_update
464     else:
465         functor = _no_ensemble_update
466     context.part = {}
467     return functor
468
469
470 class Context(object):
471     """Manage an array of simulation work executing in parallel.
472
473     .. deprecated:: 0.0.7
474
475     This execution context implementation is imported from the gmxapi 0.0.7
476     package for GROMACS 2019 and does not conform to the protocols of gmxapi 0.1+.
477     It is used internally to support legacy code.
478
479     The following features are subject to be changed or removed without further
480     notice.
481
482     Attributes:
483         work :obj:`gmx.workflow.WorkSpec`: specification of work to be performed when a session is launched.
484         rank : numerical index of the current worker in a running session (None if not running)
485         work_width : Detected number of co-scheduled work elements.
486         elements : dictionary of references to elements of the workflow.
487
488     `rank`, `work_width`, and `elements` are empty or None until the work is processed, as during session launch.
489
490     To run multiple simulations at a time, whether ensembles or independent
491     simulations, Python should be invoked in an MPI environment with `mpi4py`.
492     The Context can then use one MPI rank per simulation. It is recommended that
493     GROMACS is compiled without MPI in this case. (Thread-MPI is recommended.)
494     MPI domain decomposition is not explicitly supported with this Context
495     implementation.
496
497     Example:
498
499         >>> from gmxapi.simulation.context import get_context
500         >>> from gmxapi.simulation.workflow import from_tpr
501         >>> work = from_tpr([tpr_filename, tpr_filename])
502         >>> context = get_context(work)
503         >>> with context as session:
504         ...    session.run()
505         ...    # The session is one abstraction too low to know what rank it is. It lets the spawning context manage
506         ...    # such things.
507         ...    # rank = session.rank
508         ...    # The local context object knows where it fits in the global array.
509         ...    rank = context.rank
510         ...    output_path = os.path.join(context.workdir_list[rank], 'traj.trr')
511         ...    assert(os.path.exists(output_path))
512         ...    print('Worker {} produced {}'.format(rank, output_path))
513
514     Warning:
515         Do not run a gmxapi script in an MPI environment without `mpi4py`. gmxapi
516         will not be able to determine that other processes are running the same
517         script.
518
519     Implementation notes:
520
521     To produce a running session, the Context __enter__() method is called, according to the Python context manager
522     protocol. At this time, the attached WorkSpec must be feasible on the available resources. To turn the specified
523     work into an executable directed acyclic graph (DAG), handle objects for the elements in the work spec are sequenced
524     in dependency-compatible order and the context creates a "builder" for each according to the element's operation.
525     Each builder is subscribed to the builders of its dependency elements. The DAG is then assembled by calling each
526     builder in sequence. A builder can add zero, one, or more nodes and edges to the DAG.
527
528     The Session is then launched from the DAG. What happens next is implementation-dependent, and it may take a while for
529     us to decide whether and how to standardize interfaces for the DAG nodes and edges and/or execution protocols. I
530     expect each node will at least have a `launch()` method, but will also probably have typed input and output ports as well as some signalling.
531     A sophisticated and abstract Session implementation could schedule work only to satisfy data dependencies of requested
532     output upon request. Our immediate implementation will use the following protocol.
533
534     Each node has a `launch()` method. When the session is entered, the `launch()` method is called for each node in
535     dependency order. The launch method returns either a callable (`run()` function) or None, raising an exception in
536     case of an error. The sequence of callables is stored by the Session. When Session.run() is called, the
537     sequence of callables is called in order. If StopIteration is raised by the callable, it is removed from the sequence.
538     The sequence is processed repeatedly until there are no more callables.
539
540     Note that this does not rigorously handle races or deadlocks, or flexibility in automatically chasing dependencies. A more
541     thorough implementation could recursively call launch on dependencies (launch could be idempotent or involve some
542     signalling to dependents when complete), run calls could be entirely event driven, and/or nodes could "publish"
543     output (including just a completion message), blocking for acknowledgement before looking for the next set of subscribed inputs.
544     """
545
546     def __init__(self, work=None, workdir_list=None, communicator=None):
547         """Create manager for computing resources.
548
549         Does not initialize resources because Python objects by themselves do
550         not have a good way to deinitialize resources. Instead, resources are
551         initialized using the Python context manager protocol when sessions are
552         entered and exited.
553
554         Appropriate computing resources need to be knowable when the Context is created.
555
556         Keyword Arguments:
557             work : work specification with which to initialize this context
558             workdir_list : deprecated
559             communicator : non-owning reference to a multiprocessing communicator
560
561         If provided, communicator must implement the mpi4py.MPI.Comm interface. The
562         Context will use this communicator as the parent for subcommunicators
563         used when launching sessions. If provided, communicator is owned by the
564         caller, and must be freed by the caller after any sessions are closed.
565         By default, the Context will get a reference to MPI_COMM_WORLD, which
566         will be freed when the Python process ends and cleans up its resources.
567         The communicator stored by the Context instance will not be used directly,
568         but will be duplicated when launching sessions using ``with``.
569         """
570
571         # self.__context_array = list([Context(work_element) for work_element in work])
572         from .workflow import WorkSpec
573
574         # Until better Session abstraction exists at the Python level, a
575         # _session_communicator attribute will be added to and removed from the
576         # context at session entry and exit. If necessary, a _session_ensemble_communicator
577         # will be split from _session_communicator for simulation ensembles
578         # present in the specified work.
579         self.__communicator = communicator
580
581         self.__work = WorkSpec()
582         self.__workdir_list = workdir_list
583
584         self._session = None
585         # Note: this attribute is a detail of MPI-based Contexts. Client access
586         # is subject to change.
587         self.rank = None
588
589         # `work_width` notes the required width of an array of synchronous tasks to perform the specified work.
590         # As work elements are processed, self.work_width will be increased as appropriate.
591         self.work_width = None
592
593         # initialize the operations map. May be extended during the lifetime of a Context.
594         # Note that there may be a difference between built-in operations provided by this module and
595         # additional operations registered at run time.
596         self.__operations = dict()
597         # The map contains a builder for each operation. The builder is created by passing the element to the function
598         # in the map. The object returned must have the following methods:
599         #
600         #   * add_subscriber(another_builder) : allow other builders to subscribe to this one.
601         #   * build(dag) : Fulfill the builder responsibilities by adding an arbitrary number of nodes and edges to a Graph.
602         #
603         # The gmxapi namespace of operations should be consistent with a specified universal set of functionalities
604         self.__operations['gmxapi'] = {'md': lambda element : _md(self, element),
605                                       }
606         # Even if TPR file loading were to become a common and stable enough operation to be specified in
607         # an API, it is unlikely to be implemented by any code outside of GROMACS, so let's not clutter
608         # a potentially more universal namespace.
609         self.__operations['gromacs'] = {'load_tpr': lambda element : _load_tpr(self, element),
610                                        }
611
612         # Right now we are treating workspec elements and work DAG nodes as equivalent, but they are
613         # emphatically not intended to be tightly coupled. The work specification is intended to be
614         # simple, user-friendly, general, and easy-to-implement. The DAG is an implementation detail
615         # and may differ across context types. It is likely to have stronger typing of nodes and/or
616         # edges. It is not yet specified whether we should translate the work into a graph before, after,
617         # or during processing of the elements, so it is not yet known whether we will need facilities
618         # to allow cross-referencing between the two graph-type structures. If we instantiate API objects
619         # as we process work elements, and the DAG in a context deviates from the work specification
620         # topology, we would need to use named dependencies to look up objects to bind to. Such facilities
621         # could be hidden in the WorkElement class(es), too, to delegate code away from the Context as a
622         # container class growing without bounds...
623         # In actuality, we will have to process the entire workspec to some degree to make sure we can
624         # run it on the available resources.
625         self.elements = None
626
627         # This setter must be called after the operations map has been populated.
628         self.work = work
629
630         try:
631             self._api_object = _gmxapi.Context()
632         except Exception as e:
633             logger.error('Got exception when trying to create default library context: ' + str(e))
634             raise exceptions.ApiError('Uncaught exception in API object creation.') from e
635
636     @property
637     def work(self):
638         return self.__work
639
640     @work.setter
641     def work(self, work):
642         """Set `work` attribute.
643
644         Raises:
645             gmx.exceptions.ApiError: work is not compatible with schema or
646                 known operations.
647             gmx.exceptions.UsageError: Context can not access operations in
648                 the name space given for an Element
649             gmx.exceptions.ValueError: assignment operation cannot be performed
650                 for the provided object (rhs)
651
652         For discussion on error handling, see https://github.com/kassonlab/gmxapi/issues/125
653         """
654         from .workflow import WorkSpec, WorkElement
655         if work is None:
656             return
657
658         if isinstance(work, WorkSpec):
659             workspec = work
660         elif hasattr(work, 'workspec') and isinstance(work.workspec, WorkSpec):
661             workspec = work.workspec
662         else:
663             raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
664         workspec._context = self
665
666         # Make sure this context knows how to run the specified work.
667         for e in workspec.elements:
668             element = WorkElement.deserialize(workspec.elements[e])
669
670             # Note: Non-built-in namespaces (non-native) are treated as modules to import.
671             # Native namespaces may not be completely implemented in a particular version of a particular Context.
672             if element.namespace in {'gmxapi', 'gromacs'}:
673                 assert element.namespace in self.__operations
674                 if not element.operation in self.__operations[element.namespace]:
675                     # The requested element is a built-in operation but not available in this Context.
676                     # element.namespace should be mapped, but not all operations are necessarily implemented.
677                     logger.error("Operation {} not found in map {}".format(element.operation,
678                                                                            str(self.__operations)))
679                     # This check should be performed when deciding if the context is appropriate for the work.
680                     # If we are just going to use a try/catch block for this test, then we should differentiate
681                     # this exception from those raised due to incorrect usage.
682                     # The exception thrown here may evolve with https://github.com/kassonlab/gmxapi/issues/125
683                     raise exceptions.FeatureNotAvailableError(
684                         'Specified work cannot be performed due to unimplemented operation {}.{}.'.format(
685                             element.namespace,
686                             element.operation))
687
688             else:
689                 assert element.namespace not in {'gmxapi', 'gromacs'}
690
691                 # Don't leave an empty nested dictionary if we couldn't map the operation.
692                 if element.namespace in self.__operations:
693                     namespace_map = self.__operations[element.namespace]
694                 else:
695                     namespace_map = dict()
696
697                 # Set or update namespace map iff we have something new.
698                 if element.operation not in namespace_map:
699                     try:
700                         element_module = importlib.import_module(element.namespace)
701                         element_operation = getattr(element_module, element.operation)
702                     except ImportError as e:
703                         raise exceptions.UsageError(
704                             'Cannot find implementation for namespace {}. ImportError: {}'.format(
705                                 element.namespace,
706                                 str(e)))
707                     except AttributeError:
708                         raise exceptions.UsageError(
709                             'Cannot find factory for operation {}.{}'.format(
710                                 element.namespace,
711                                 element.operation
712                             )
713                         )
714                     namespace_map[element.operation] = element_operation
715                     self.__operations[element.namespace] = namespace_map
716
717         self.__work = workspec
718
719     def add_operation(self, namespace, operation, get_builder):
720         # noinspection PyUnresolvedReferences
721         """Add a builder factory to the operation map.
722
723         Extends the known operations of the Context by mapping an operation in a namespace to a function
724         that returns a builder to process a work element referencing the operation. Must be called before
725         the work specification is added, since the spec is inspected to confirm that the Context can run it.
726
727         It may be more appropriate to add this functionality to the Context constructor or as auxiliary
728         information in the workspec, or to remove it entirely; it is straight-forward to just add snippets
729         of code to additional files in the working directory and to make them available as modules for the
730         Context to import.
731
732         Example:
733
734             >>> # Import some custom extension code.
735             >>> import myplugin
736             >>> myelement = myplugin.new_element()
737             >>> workspec = gmx.workflow.WorkSpec()
738             >>> workspec.add_element(myelement)
739             >>> context = gmx.context.ParallelArrayContext()
740             >>> context.add_operation(myelement.namespace, myelement.operation, myplugin.element_translator)
741             >>> context.work = workspec
742             >>> with get_context() as session:
743             ...    session.run()
744
745         """
746         if namespace not in self.__operations:
747             if namespace in {'gmxapi', 'gromacs'}:
748                 raise exceptions.UsageError("Cannot add operations to built-in namespaces.")
749             else:
750                 self.__operations[namespace] = dict()
751         else:
752             assert namespace in self.__operations
753
754         if operation in self.__operations[namespace]:
755             raise exceptions.UsageError("Operation {}.{} already defined in this context.".format(namespace, operation))
756         else:
757             self.__operations[namespace][operation] = get_builder
758
759     # Set up a simple ensemble resource
760     # This should be implemented for Session, not Context, and use an appropriate subcommunicator
761     # that is created and freed as the Session launches and exits.
762     def ensemble_update(self, send, recv, tag=None):
763         """Implement the ensemble_update member function that gmxapi through 0.0.6 expects.
764
765         """
766         # gmxapi through 0.0.6 expects to bind to this member function during "build".
767         # This behavior needs to be deprecated (bind during launch, instead), but this
768         # dispatching function should be an effective placeholder.
769         if tag is None or str(tag) == '':
770             raise exceptions.ApiError("ensemble_update must be called with a name tag.")
771         # __ensemble_update is an attribute, not an instance function, so we need to explicitly pass 'self'
772         return self.__ensemble_update(self, send, recv, tag)
773
774     def __enter__(self):
775         """Implement Python context manager protocol, producing a Session.
776
777         The work specified in this Context is inspected to build a directed
778         acyclic dependency graph (DAG). A Session is launched after reconciling
779         the configured work with available computing resources. Each time the
780         `run()` method of the Session is called, the graph is executed, and any
781         nodes that have no more work to do are pruned from the graph.
782
783         In practice, there are not currently any types of work implemented that
784         do not complete on the first pass.
785
786         Returns:
787             Session object that can be run and/or inspected.
788
789         Additional API operations are possible while the Session is active. When used as a Python context manager,
790         the Context will close the Session at the end of the `with` block by calling `__exit__`.
791
792         Note: this is probably where we will have to process the work specification to determine whether we
793         have appropriate resources (such as sufficiently wide parallelism). Until we have a better Session
794         abstraction, this means the clean approach should take two passes to first build a DAG and then
795         instantiate objects to perform the work. In the first implementation, we kind of muddle things into
796         a single pass.
797         """
798         try:
799             import networkx as nx
800             from networkx import DiGraph as _Graph
801         except ImportError:
802             raise exceptions.FeatureNotAvailableError("gmx requires the networkx package to execute work graphs.")
803
804         # Cache the working directory from which we were launched so that __exit__() can give us proper context
805         # management behavior.
806         self.__initial_cwd = os.getcwd()
807         logger.debug("Launching session from {}".format(self.__initial_cwd))
808
809         if self._session is not None:
810             raise exceptions.Error('Already running.')
811         if self.work is None:
812             raise exceptions.UsageError('No work to perform!')
813
814         # Set up the global and local context.
815         # Check the global MPI configuration
816         # Since the Context doesn't have a destructor, if we use an MPI communicator at this scope then
817         # it has to be owned and managed outside of Context.
818         self._session_communicator = _acquire_communicator(self.__communicator)
819         context_comm_size = self._session_communicator.Get_size()
820         context_rank = self._session_communicator.Get_rank()
821         self.rank = context_rank
822         # self._communicator = communicator
823         logger.debug("Context rank {} in context {} of size {}".format(context_rank,
824                                                                        self._session_communicator,
825                                                                        context_comm_size))
826
827         ###
828         # Process the work specification.
829         ###
830         logger.debug("Processing workspec:\n{}".format(str(self.work)))
831
832         # Get a builder for DAG components for each element
833         builders = {}
834         builder_sequence = []
835         for element in self.work:
836             # dispatch builders for operation implementations
837             try:
838                 new_builder = self.__operations[element.namespace][element.operation](element)
839                 assert hasattr(new_builder, 'add_subscriber')
840                 assert hasattr(new_builder, 'build')
841
842                 logger.info("Collected builder for {}".format(element.name))
843             except LookupError as e:
844                 request = '.'.join([element.namespace, element.operation])
845                 message = 'Could not find an implementation for the specified operation: {}. '.format(request)
846                 message += str(e)
847                 raise exceptions.ApiError(message)
848             # Subscribing builders is the Context's responsibility because otherwise the builders
849             # don't know about each other. Builders should not depend on the Context unless they
850             # are a facility provided by the Context, in which case they may be member functions
851             # of the Context. We will probably need to pass at least some
852             # of the Session to the `launch()` method, though...
853             dependencies = element.depends
854             for dependency in dependencies:
855                 # If a dependency is a list, assume it is an "ensemble" of dependencies
856                 # and pick the element for corresponding to the local rank.
857                 if isinstance(dependency, (list, tuple)):
858                     assert len(dependency) > context_rank
859                     name = str(dependency[context_rank])
860                 else:
861                     name = dependency
862                 logger.info("Subscribing {} to {}.".format(element.name, name))
863                 builders[name].add_subscriber(new_builder)
864             builders[element.name] = new_builder
865             builder_sequence.append(element.name)
866             logger.debug("Appended {} to builder sequence.".format(element.name))
867
868         # Call the builders in dependency order
869         # Note: session_communicator is available, but ensemble_communicator has not been created yet.
870         graph = _Graph(width=1)
871         logger.info("Building sequence {}".format(builder_sequence))
872         for name in builder_sequence:
873             builder = builders[name]
874             logger.info("Building {}".format(builder))
875             logger.debug("Has build attribute {}.".format(builder.build))
876             builder.build(graph)
877             logger.debug("Built.")
878         self.work_width = graph.graph['width']
879
880         # Prepare working directories. This should probably be moved to some aspect of the Session and either
881         # removed from here or made more explicit to the user.
882         workdir_list = self.__workdir_list
883         if workdir_list is None:
884             workdir_list = [os.path.join('.', str(i)) for i in range(self.work_width)]
885         self.__workdir_list = list([os.path.abspath(dir) for dir in workdir_list])
886
887         # For gmxapi 0.0.6, all ranks have a session_ensemble_communicator
888         self._session_ensemble_communicator = _get_ensemble_communicator(self._session_communicator, self.work_width)
889         self.__ensemble_update = _get_ensemble_update(self)
890
891         # launch() is currently a method of gmx.core.MDSystem and returns a gmxapi::Session.
892         # MDSystem objects are obtained from gmx.core.from_tpr(). They also provide add_potential().
893         # gmxapi::Session objects are exposed as gmx.core.MDSession and provide run() and close() methods.
894         #
895         # Here, I want to find the input appropriate for this rank and get an MDSession for it.
896         # E.g. Make a pass that allows meta-objects to bind (setting md_proxy._input_tpr and md_proxy._plugins,
897         # and then call a routine implemented by each object to run whatever protocol it needs, such
898         # as `system = gmx.core.from_tpr(md._input_tpr); system.add_potential(md._plugins)
899         # For future design plans, reference https://github.com/kassonlab/gmxapi/issues/65
900         #
901         # This `if` condition is currently the thing that ultimately determines whether the
902         # rank attempts to do work.
903         if context_rank < self.work_width:
904             # print(graph)
905             logger.debug(("Launching graph {}.".format(graph.graph)))
906             logger.debug("Graph nodes: {}".format(str(list(graph.nodes))))
907             logger.debug("Graph edges: {}".format(str(list(graph.edges))))
908
909             logger.info("Launching work on context rank {}, subcommunicator rank {}.".format(
910                 self.rank,
911                 self._session_ensemble_communicator.Get_rank()))
912
913             # Launch the work for this rank
914             self.workdir = self.__workdir_list[self.rank]
915             if os.path.exists(self.workdir):
916                 if not os.path.isdir(self.workdir):
917                     raise exceptions.Error('{} is not a valid working directory.'.format(self.workdir))
918             else:
919                 os.mkdir(self.workdir)
920             os.chdir(self.workdir)
921             logger.info('rank {} changed directory to {}'.format(self.rank, self.workdir))
922             sorted_nodes = nx.topological_sort(graph)
923             runners = []
924             closers = []
925             for name in sorted_nodes:
926                 launcher = graph.nodes[name]['launch']
927                 runner = launcher(self.rank)
928                 if not runner is None:
929                     runners.append(runner)
930                     closers.append(graph.nodes[name]['close'])
931
932             # Get a session object to return. It must simply provide a `run()` function.
933             context = self # Part of workaround for bug gmxapi-214
934             class Session(object):
935                 def __init__(self, runners, closers):
936                     self.runners = list(runners)
937                     self.closers = list(closers)
938
939                 def run(self):
940                     # Note we are not following the documented protocol of running repeatedly yet.
941                     to_be_deleted = []
942                     for i, runner in enumerate(self.runners):
943                         try:
944                             runner()
945                         except StopIteration:
946                             to_be_deleted.insert(0, i)
947                     for i in to_be_deleted:
948                         del self.runners[i]
949                     return True
950
951                 def close(self):
952                     for close in self.closers:
953                         logger.debug("Closing node: {}".format(close))
954                         close()
955                     # Workaround for bug gmxapi-214
956                     if not _gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
957                         context._api_object = _gmxapi.Context()
958
959             self._session = Session(runners, closers)
960         else:
961             logger.info("Context rank {} has no work to do".format(self.rank))
962
963             context = self # Part of workaround for bug gmxapi-214
964             class NullSession(object):
965                 def run(self):
966                     logger.info("Running null session on rank {}.".format(self.rank))
967                     return True
968                 def close(self):
969                     logger.info("Closing null session.")
970                     # Workaround for bug gmxapi-214
971                     if not _gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
972                         context._api_object = _gmxapi.Context()
973                     return
974
975             self._session = NullSession()
976             self._session.rank = self.rank
977
978         # Make sure session has started on all ranks before continuing?
979
980         self._session.graph = graph
981         return self._session
982
983     def __exit__(self, exception_type, exception_value, traceback):
984         """Implement Python context manager protocol."""
985         logger.info("Exiting session on context rank {}.".format(self.rank))
986         if self._session is not None:
987             logger.info("Calling session.close().")
988             self._session.close()
989             self._session = None
990         else:
991             # Note: we should not have a None session but rather an API-compliant Session that just has no work.
992             # Reference: https://github.com/kassonlab/gmxapi/issues/41
993             logger.info("No _session known to context or session already closed.")
994         if hasattr(self, '_session_ensemble_communicator'):
995             if self._session_communicator is not None:
996                 logger.info("Freeing sub-communicator {} on rank {}".format(
997                     self._session_ensemble_communicator,
998                     self.rank))
999                 self._session_ensemble_communicator.Free()
1000             else:
1001                 logger.debug('"None" ensemble communicator does not need to be "Free"d.')
1002             del self._session_ensemble_communicator
1003         else:
1004             logger.debug("No ensemble subcommunicator on context rank {}.".format(self.rank))
1005
1006         logger.debug("Freeing session communicator.")
1007         self._session_communicator.Free()
1008         logger.debug("Deleting session communicator reference.")
1009         del self._session_communicator
1010
1011         os.chdir(self.__initial_cwd)
1012         logger.info("Session closed on context rank {}.".format(self.rank))
1013         # Note: Since sessions running in different processes can have different work, sessions have not necessarily
1014         # ended on all ranks. As a result, starting another session on the same resources could block until the
1015         # resources are available.
1016
1017         # Python context managers return False when there were no exceptions to handle.
1018         return False
1019
1020
1021 def get_context(work=None):
1022     """Get a concrete Context object.
1023
1024     Args:
1025         work (gmx.workflow.WorkSpec): runnable work as a valid gmx.workflow.WorkSpec object
1026
1027     Returns:
1028         An object implementing the :py:class:`gmx.context.Context` interface, if possible.
1029
1030     Raises:
1031         gmx.exceptions.ValueError if an appropriate context for ``work`` could not be loaded.
1032
1033     If work is provided, return a Context object capable of running the provided work or produce an error.
1034
1035     The semantics for finding Context implementations needs more consideration, and a more informative exception
1036     is likely possible.
1037
1038     A Context can run the provided work if
1039
1040       * the Context supports can resolve all operations specified in the elements
1041       * the Context supports DAG topologies implied by the network of dependencies
1042       * the Context supports features required by the elements with the specified parameters,
1043         such as synchronous array jobs.
1044
1045     """
1046     # We need to define an interface for WorkSpec objects so that we don't need
1047     # to rely on typing and inter-module dependencies.
1048     from .workflow import WorkSpec
1049     workspec = None
1050     if work is not None:
1051         if isinstance(work, WorkSpec):
1052             workspec = work
1053         elif hasattr(work, 'workspec') and isinstance(work.workspec,
1054                                                       WorkSpec):
1055             workspec = work.workspec
1056         else:
1057             raise exceptions.ValueError('work argument must provide a gmx.workflow.WorkSpec.')
1058     if workspec is not None and \
1059             hasattr(workspec, '_context') and \
1060             workspec._context is not None:
1061         context = workspec._context
1062     else:
1063         context = Context(work=workspec)
1064
1065     return context