Add trajectory output to mdrun operation.
[alexxy/gromacs.git] / python_packaging / src / gmxapi / simulation / mdrun.py
1 #
2 # This file is part of the GROMACS molecular simulation package.
3 #
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
8 #
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
13 #
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 # Lesser General Public License for more details.
18 #
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23 #
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
31 #
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
34
35 """mdrun operation module
36
37 The mdrun operation (in its first draft) conforms to the user-level API, but does
38 not use the Python Context resource manager. It uses either the legacy 0.0.7
39 Context or its own Context, also implemented in this module.
40 """
41
42 __all__ = ['mdrun']
43
44 import inspect
45 import os
46 import typing
47 from contextlib import contextmanager
48
49 import gmxapi
50 import gmxapi.abc
51 import gmxapi.operation as _op
52 from gmxapi import exceptions
53
54 # The following imports are not marked as public API.
55 from .abc import ModuleObject
56
57 from . import fileio
58 from . import workflow
59
60 # Initialize module-level logger
61 from gmxapi import logger as root_logger
62
63 logger = root_logger.getChild('mdrun')
64 logger.info('Importing {}'.format(__name__))
65
66
67 # Output in the gmxapi.operation Context.
68 # TODO: Consider using a single base class for the DataProxy, but have distinct
69 #  data descriptor behavior (or different descriptor implementations in different
70 #  subclasses) so that static code inspection can more easily determine the
71 #  attributes of the data proxies.
72 _output_descriptors = (
73     _op.OutputDataDescriptor('_work_dir', str),
74     _op.OutputDataDescriptor('trajectory', str)
75 )
76 _publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
77                            _output_descriptors}
78 _output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
79                                              _output_descriptors})
80
81
82 class OutputDataProxy(_op.DataProxyBase,
83                       descriptors=_output_descriptors):
84     """Implement the 'output' attribute of MDRun operations."""
85
86
87 class PublishingDataProxy(_op.DataProxyBase,
88                           descriptors=_publishing_descriptors):
89     """Manage output resource updates for MDRun operation."""
90
91
92 _output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
93                                     output_description=_output,
94                                     publishing_data_proxy=PublishingDataProxy)
95
96
97 # Input in the gmxapi.operation Context for the dispatching runner.
98 # The default empty dictionary for parameters just means that there are no overrides
99 # to the parameters already provided in _simulation_input.
100 _input = _op.InputCollectionDescription(
101     [('_simulation_input', inspect.Parameter('_simulation_input',
102                                              inspect.Parameter.POSITIONAL_OR_KEYWORD,
103                                              annotation=str)),
104      ('parameters', inspect.Parameter('parameters',
105                                       inspect.Parameter.POSITIONAL_OR_KEYWORD,
106                                       annotation=dict,
107                                       default=dict()))
108      ])
109
110
111 def _standard_node_resource_factory(*args, **kwargs):
112     """Translate Python UI input to the gmxapi.operation node builder inputs."""
113     source_collection = _input.bind(*args, **kwargs)
114     logger.info('mdrun input bound as source collection {}'.format(source_collection))
115     return source_collection
116
117
118 @contextmanager
119 def scoped_communicator(original_comm, requested_size: int = None):
120     from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator
121
122     if requested_size is None:
123         communicator = _acquire_communicator(communicator=original_comm)
124
125     else:
126         if original_comm is None or not hasattr(original_comm, 'Get_size'):
127             raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
128         original_comm_size = original_comm.Get_size()
129         if original_comm_size < requested_size:
130             raise exceptions.FeatureNotAvailableError(
131                 'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
132                     requested_size,
133                     original_comm_size
134                 ))
135         assert original_comm_size >= requested_size
136         communicator = _get_ensemble_communicator(original_comm, requested_size)
137
138     try:
139         yield communicator
140     finally:
141         communicator.Free()
142
143
144 class LegacyImplementationSubscription(object):
145     """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.
146
147     This input resource is a subscription to work that is dispatched to a sub-context.
148     The resource can be created from the standard data of the simulation module.
149     """
150
151     def __init__(self, resource_manager: _op.ResourceManager):
152         from .context import Context as LegacyContext
153         import gmxapi._gmxapi as _gmxapi
154         self._gmxapi = _gmxapi
155
156         assert isinstance(resource_manager, _op.ResourceManager)
157         # We use several details of the gmxapi.operation.Context.ResourceManager.
158         # These dependencies can evolve into the subscription protocol between Contexts.
159
160         # Configure and run a gmxapi 0.0.7 session.
161         # 0. Determine ensemble width.
162         # 1. Choose, create/check working directories.
163         # 2. Create source TPR.
164         # 3. Create workspec.
165         # 3.5 Add plugin potentials, if any.
166         # 4. Run.
167         # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.
168
169         # Determine ensemble width
170         ensemble_width = resource_manager.ensemble_width
171
172         # Choose working directories
173         # TODO: operation working directory naming scheme should be centrally well-defined.
174         # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
175         # so we cannot create the input file path in the working directory based on WorkSpec.uid.
176         workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
177                                                  member=member)
178                         for member in range(ensemble_width)]
179
180         # This is a reasonable place to start using MPI ensemble implementation details.
181         # We will want better abstraction in the future, but it is best if related filesystem
182         # accesses occur from the same processes, consistently. Note that we already
183         # handle some optimization and dependency reduction when the ensemble size is 1.
184         # TODO: multithread and multiprocess alternatives to MPI ensemble management.
185
186         # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
187         with scoped_communicator(None) as context_comm:
188             context_rank = context_comm.Get_rank()
189             with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
190                 # Note that in the current implementation, extra ranks have nothing to do,
191                 # but they may have a dummy communicator, so be sure to skip those members
192                 # of the context_comm.
193                 if context_rank < ensemble_width:
194                     assert ensemble_comm.Get_size() == ensemble_width
195                     ensemble_rank = ensemble_comm.Get_rank()
196                     # TODO: This should be a richer object that includes at least host information
197                     #  and preferably the full gmxapi Future interface.
198                     self.workdir = os.path.abspath(workdir_list[ensemble_rank])
199
200                     with resource_manager.local_input(member=ensemble_rank) as input_pack:
201                         source_file = input_pack.kwargs['_simulation_input']
202                         parameters = input_pack.kwargs['parameters']
203                         # If there are any other key word arguments to process from the gmxapi.mdrun
204                         # factory call, do it here.
205
206                     # TODO: We should really name this file with a useful input-dependent tag.
207                     tprfile = os.path.join(self.workdir, 'topol.tpr')
208
209                     expected_working_files = [tprfile]
210
211                     if os.path.exists(self.workdir):
212                         if os.path.isdir(self.workdir):
213                             # Confirm that this is a restarted simulation.
214                             # It is unspecified by the API, but at least through gmxapi 0.1,
215                             # all simulations are initialized with a checkpoint file named state.cpt
216                             # (see src/api/cpp/context.cpp)
217                             checkpoint_file = os.path.join(self.workdir, 'state.cpp')
218                             expected_working_files.append(checkpoint_file)
219
220                             for file in expected_working_files:
221                                 if not os.path.exists(file):
222                                     raise exceptions.ApiError(
223                                         'Cannot determine working directory state: {}'.format(self.workdir))
224                         else:
225                             raise exceptions.ApiError(
226                                 'Chosen working directory path exists but is not a directory: {}'.format(self.workdir))
227                     else:
228                         # Build the working directory and input files.
229                         os.mkdir(self.workdir)
230                         sim_input = fileio.read_tpr(source_file)
231                         for key, value in parameters.items():
232                             try:
233                                 sim_input.parameters.set(key=key, value=value)
234                             except _gmxapi.Exception as e:
235                                 raise exceptions.ApiError(
236                                     'Bug encountered. Unknown error when trying to set simulation '
237                                     'parameter {} to {}'.format(key, value)
238                                 ) from e
239
240                         fileio.write_tpr_file(output=tprfile, input=sim_input)
241                     logger.info('Created {} on rank {}'.format(tprfile, context_rank))
242
243                     # Gather the actual working directories from the ensemble members.
244                     if hasattr(ensemble_comm, 'allgather'):
245                         # We should not assume that abspath expands the same on different MPI ranks.
246                         workdir_list = ensemble_comm.allgather(self.workdir)
247                         tpr_filenames = ensemble_comm.allgather(tprfile)
248                     else:
249                         workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
250                         # TODO: If we use better input file names, they need to be updated in multiple places.
251                         tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
252
253                     logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
254                                                                                              workdir_list))
255                     logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,
256                                                                       ensemble_rank,
257                                                                       self.workdir
258                                                                       ))
259                     # TODO: We have not exposed the ability to pass any run time parameters to mdrun.
260                     work = workflow.from_tpr(tpr_filenames)
261                     self.workspec = work.workspec
262                     context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
263                     self.simulation_module_context = context
264                     # Go ahead and execute immediately. No need for lazy initialization in this basic case.
265                     with self.simulation_module_context as session:
266                         session.run()
267                         # TODO: There may be some additional results that we need to extract...
268                     # end: if context_rank < ensemble_width
269
270                 # end scoped_communicator: ensemble_comm
271
272             if context_comm.Get_size() > 1:
273                 context_comm.bcast(workdir_list, root=0)
274             # end scoped_communicator: context_comm
275
276         self.workdir = workdir_list
277
278
279 class SubscriptionSessionResources(object):
280     """Input and output run-time resources for a MDRun subscription.
281
282     A generalization of this class is the probably the main hook for customizing the resources
283     provided to the operation at run time.
284
285     .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
286     """
287
288     def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
289         assert isinstance(input, LegacyImplementationSubscription)
290         assert isinstance(output, PublishingDataProxy)
291         self.output = output
292         member_id = self.output._client_identifier
293         # Before enabling the following, be sure we understand what is happening.
294         # if member_id is None:
295         #     member_id = 0
296         self.workdir = input.workdir[member_id]
297
298
299 class SubscriptionPublishingRunner(object):
300     """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""
301     input_resource_factory = LegacyImplementationSubscription
302
303     def __init__(self, resources: SubscriptionSessionResources):
304         assert isinstance(resources, SubscriptionSessionResources)
305         # Note that the resources contain a reference to a simulation ensemble that has already run.
306         self.resources = resources
307
308     def run(self):
309         """Operation implementation in the gmxapi.operation module context."""
310         publisher = self.resources.output
311         publisher._work_dir = self.resources.workdir
312         # TODO: Make the return value a trajectory handle rather than a file path.
313         # TODO: Decide how to handle append vs. noappend output.
314         # TODO: More rigorous handling of the trajectory file(s)
315         # We have no way to query the name of the trajectory produced, and we
316         # have avoided exposing the ability to specify it, so we have to assume
317         # GROMACS default behavior.
318         publisher.trajectory = os.path.join(self.resources.workdir, 'traj.trr')
319
320
321 _next_uid = 0
322
323
324 def _make_uid(input) -> str:
325     # TODO: Use input fingerprint for more useful identification.
326     salt = hash(input)
327     global _next_uid
328     new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
329     _next_uid += 1
330     return new_uid
331
332
333 #
334 # Implementation
335 #
336
337
338 class ResourceManager(gmxapi.operation.ResourceManager):
339     """Manage resources for the MDRun operation in the gmxapi.operation contexts.
340
341     Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
342     Futures managed by this resource manager may contain additional attributes.
343     """
344
345     def future(self, name: str, description: _op.ResultDescription):
346         tpr_future = super().future(name=name, description=description)
347         return tpr_future
348
349     def data(self) -> OutputDataProxy:
350         return OutputDataProxy(self)
351
352     def update_output(self):
353         """Override gmxapi.operation.ResourceManager.update_output because we handle paralellism as 0.0.7."""
354         # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
355         # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
356         # TODO: Reimplement as the resource factory and director for the operation target context.
357         if not self.done():
358             self.__operation_entrance_counter += 1
359             if self.__operation_entrance_counter > 1:
360                 raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
361             if not self.done():
362                 # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
363                 #  operation's library/implementation context.
364
365                 ###
366                 # Note: this is the resource translation from gmxapi.operation context
367                 # to the dispatching runner director. It uses details of the gmxapi.operation.Context
368                 # and of the operation.
369
370                 # TODO: Dispatch/discover this resource factory from a canonical place.
371                 assert hasattr(self._runner_director, 'input_resource_factory')
372                 # Create on all ranks.
373                 input = self._runner_director.input_resource_factory(self)
374                 # End of action of the InputResourceDirector[Context, MdRunSubscription].
375                 ###
376
377                 # We are giving the director a resource that contains the subscription
378                 # to the dispatched work.
379
380                 publishing_resources = self.publishing_resources()
381                 for member in range(self.ensemble_width):
382                     with publishing_resources(ensemble_member=member) as output:
383                         resources = self._resource_factory(input=input, output=output)
384                         runner = self._runner_director(resources)
385                         runner.run()
386
387
388 class StandardInputDescription(_op.InputDescription):
389     """Provide the ReadTpr input description in gmxapi.operation Contexts."""
390
391     def signature(self) -> _op.InputCollectionDescription:
392         return _input
393
394     def make_uid(self, input: _op.DataEdge) -> str:
395         return _make_uid(input)
396
397
398 class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
399     """Provide the gmxapi compatible ReadTpr implementation."""
400
401     # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
402     @classmethod
403     def name(self) -> str:
404         """Canonical name for the operation."""
405         return 'mdrun'
406
407     @classmethod
408     def namespace(self) -> str:
409         """read_tpr is importable from the gmxapi module."""
410         return 'gmxapi'
411
412     @classmethod
413     def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
414         if isinstance(context, _op.Context):
415             return StandardDirector(context)
416
417
418 class StandardOperationHandle(_op.AbstractOperation):
419     """Handle used in Python UI or gmxapi.operation Contexts."""
420
421     def __init__(self, resource_manager: ResourceManager):
422         self.__resource_manager = resource_manager
423
424     def run(self):
425         self.__resource_manager.update_output()
426
427     @property
428     def output(self) -> OutputDataProxy:
429         return self.__resource_manager.data()
430
431
432 class StandardDirector(gmxapi.abc.OperationDirector):
433     """Direct the instantiation of a read_tpr node in a gmxapi.operation Context.
434
435     .. todo:: Compose this behavior in a more generic class.
436
437     .. todo:: Describe where instances live.
438     """
439
440     def __init__(self, context: _op.Context):
441         if not isinstance(context, _op.Context):
442             raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
443         self.context = context
444
445     def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
446         builder = self.context.node_builder(operation=RegisteredOperation, label=label)
447
448         builder.set_resource_factory(SubscriptionSessionResources)
449         builder.set_input_description(StandardInputDescription())
450         builder.set_handle(StandardOperationHandle)
451         # The runner in the gmxapi.operation context is the servicer for the legacy context.
452         builder.set_runner_director(SubscriptionPublishingRunner)
453         builder.set_output_factory(_output_factory)
454         builder.set_resource_manager(ResourceManager)
455
456         # Note: we have not yet done any dispatching based on *resources*. We should
457         # translate the resources provided into the form that the Context can subscribe to
458         # using the dispatching resource_factory. In the second draft, this operation
459         # is dispatched to a SimulationModuleContext, which can be subscribed directly
460         # to sources that are either literal filenames or gmxapi.simulation sources,
461         # while standard Futures can be resolved in the standard context.
462         #
463         assert isinstance(resources, _op.DataSourceCollection)
464         for name, source in resources.items():
465             builder.add_input(name, source)
466
467         handle = builder.build()
468         assert isinstance(handle, StandardOperationHandle)
469         return handle
470
471     def handle_type(self, context: gmxapi.abc.Context):
472         return StandardOperationHandle
473
474     # Developer note: the Director instance is a convenient place to get a dispatching
475     # factory. The Director may become generic or more universal, but the resource_factory
476     # would likely not be typed on the generic parameters of the Director class.
477     # Instead, it is likely a generic function with its own TypeVar parameters.
478     def resource_factory(self,
479                          source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
480                          target: gmxapi.abc.Context = None):
481         # Distinguish between the UIContext, in which input is in the form
482         # of function call arguments, and the StandardContext, implemented in
483         # gmxapi.operation. UIContext is probably a virtual context that is
484         # asserted by callers in order to get a factory that normalizes UI input
485         # for the StandardContext.
486         #
487         if target is None:
488             target = self.context
489         if source is None:
490             if isinstance(target, _op.Context):
491                 # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
492                 return _standard_node_resource_factory
493         if isinstance(source, _op.Context):
494             return SubscriptionSessionResources
495         if isinstance(source, ModuleObject):
496             if isinstance(target, _op.Context):
497                 # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
498                 # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
499                 # In the first draft, though, we just access a special payload.
500                 # Return a factory that will consume *_simulation_input* and *parameters*
501                 # members of a received object.
502                 logger.info('Building mdrun operation from source {}'.format(source))
503
504                 def simulation_input_workaround(input):
505                     source = input
506                     if hasattr(source, 'output'):
507                         source = input.output
508                     assert hasattr(source, '_simulation_input')
509                     assert hasattr(source, 'parameters')
510                     logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
511                                                                       source._simulation_input.description))
512                     source_collection = _input.bind(_simulation_input=source._simulation_input,
513                                                     parameters=source.parameters)
514                     logger.info('mdrun input bound as source collection {}'.format(source_collection))
515                     return source_collection
516
517                 return simulation_input_workaround
518
519         raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
520
521
522 def mdrun(input, label: str = None, context=None):
523     """MD simulation operation.
524
525     Arguments:
526         input : valid simulation input
527
528     Returns:
529         runnable operation to perform the specified simulation
530
531     The *output* attribute of the returned operation handle contains dynamically
532     determined outputs from the operation.
533
534     `input` may be a TPR file name or a an object providing the SimulationInput interface.
535
536     Note:
537         New function names will be appearing to handle tasks that are separate
538
539         "simulate" is plausibly a dispatcher or base class for various tasks
540         dispatched by mdrun. Specific work factories are likely "minimize,"
541         "test_particle_insertion," "legacy_simulation" (do_md), or "simulation"
542         composition (which may be leap-frog, vv, and other algorithms)
543     """
544     handle_context = context
545     if handle_context is not None:
546         raise gmxapi.exceptions.NotImplementedError(
547             'context must be None. This factory is only for the Python UI right now.')
548
549     target_context = _op.current_context()
550     assert isinstance(target_context, _op.Context)
551     # Get a director that will create a node in the standard context.
552     node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
553     assert isinstance(node_director, StandardDirector)
554     # TODO: refine this protocol
555     assert handle_context is None
556     # Examine the input.
557     if isinstance(input, ModuleObject):
558         # The input is from read_tpr or modify_input.
559         source_context = input
560     else:
561         # Allow automatic dispatching
562         source_context = None
563
564     resource_factory = node_director.resource_factory(source=source_context, target=target_context)
565     resources = resource_factory(input)
566     handle = node_director(resources=resources, label=label)
567     # Note: One effect of the assertions above is to help the type checker infer
568     # the return type of the handle. It is hard to convince the type checker that
569     # the return value of the node builder is up-cast. We might be able to resolve
570     # this by making both get_operation_director and ReadTprImplementation
571     # generics of the handle type, or using the handle type as the key for
572     # get_operation_director.
573     return handle