dae0c1effe3073ffb2a40dbaf6b611cd54d6d8e6
[alexxy/gromacs.git] / python_packaging / src / gmxapi / simulation / mdrun.py
1 #
2 # This file is part of the GROMACS molecular simulation package.
3 #
4 # Copyright (c) 2019,2020,2021, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
8 #
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
13 #
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 # Lesser General Public License for more details.
18 #
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23 #
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
31 #
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
34
35 """mdrun operation module
36
37 The mdrun operation (in its first draft) conforms to the user-level API, but does
38 not use the Python Context resource manager. It uses either the legacy 0.0.7
39 Context or its own Context, also implemented in this module.
40 """
41
42 __all__ = ['mdrun']
43
44 import inspect
45 import os
46 import typing
47 from contextlib import contextmanager
48
49 import gmxapi
50 import gmxapi.abc
51 import gmxapi.operation as _op
52 from gmxapi import exceptions
53
54 # The following imports are not marked as public API.
55 from .abc import ModuleObject
56
57 from . import fileio
58 from . import workflow
59
60 # Initialize module-level logger
61 from gmxapi import logger as root_logger
62
63 logger = root_logger.getChild('mdrun')
64 logger.info('Importing {}'.format(__name__))
65
66
67 # Output in the gmxapi.operation Context.
68 # TODO: Consider using a single base class for the DataProxy, but have distinct
69 #  data descriptor behavior (or different descriptor implementations in different
70 #  subclasses) so that static code inspection can more easily determine the
71 #  attributes of the data proxies.
72 _output_descriptors = (
73     _op.OutputDataDescriptor('_work_dir', str),
74     _op.OutputDataDescriptor('trajectory', str),
75     _op.OutputDataDescriptor('parameters', dict)
76 )
77 _publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
78                            _output_descriptors}
79 _output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
80                                              _output_descriptors})
81
82
83 class OutputDataProxy(_op.DataProxyBase,
84                       descriptors=_output_descriptors):
85     """Implement the 'output' attribute of MDRun operations."""
86
87
88 class PublishingDataProxy(_op.DataProxyBase,
89                           descriptors=_publishing_descriptors):
90     """Manage output resource updates for MDRun operation."""
91
92
93 _output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
94                                     output_description=_output,
95                                     publishing_data_proxy=PublishingDataProxy)
96
97
98 # Input in the gmxapi.operation Context for the dispatching runner.
99 # The default empty dictionary for parameters just means that there are no overrides
100 # to the parameters already provided in _simulation_input.
101 _input = _op.InputCollectionDescription(
102     [('_simulation_input', inspect.Parameter('_simulation_input',
103                                              inspect.Parameter.POSITIONAL_OR_KEYWORD,
104                                              annotation=str)),
105      ('parameters', inspect.Parameter('parameters',
106                                       inspect.Parameter.POSITIONAL_OR_KEYWORD,
107                                       annotation=dict,
108                                       default=dict()))
109      ])
110
111
112 def _standard_node_resource_factory(*args, **kwargs):
113     """Translate Python UI input to the gmxapi.operation node builder inputs."""
114     source_collection = _input.bind(*args, **kwargs)
115     logger.info('mdrun input bound as source collection {}'.format(source_collection))
116     return source_collection
117
118
119 @contextmanager
120 def scoped_communicator(original_comm, requested_size: int = None):
121     from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator
122
123     if requested_size is None:
124         communicator = _acquire_communicator(communicator=original_comm)
125
126     else:
127         if original_comm is None or not hasattr(original_comm, 'Get_size'):
128             raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
129         original_comm_size = original_comm.Get_size()
130         if original_comm_size < requested_size:
131             raise exceptions.FeatureNotAvailableError(
132                 'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
133                     requested_size,
134                     original_comm_size
135                 ))
136         assert original_comm_size >= requested_size
137         communicator = _get_ensemble_communicator(original_comm, requested_size)
138
139     try:
140         yield communicator
141     finally:
142         communicator.Free()
143
144
145 class LegacyImplementationSubscription(object):
146     """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.
147
148     This input resource is a subscription to work that is dispatched to a sub-context.
149     The resource can be created from the standard data of the simulation module.
150     """
151
152     def __init__(self, resource_manager: _op.ResourceManager):
153         from .context import Context as LegacyContext
154         import gmxapi._gmxapi as _gmxapi
155         self._gmxapi = _gmxapi
156
157         assert isinstance(resource_manager, _op.ResourceManager)
158         # We use several details of the gmxapi.operation.Context.ResourceManager.
159         # These dependencies can evolve into the subscription protocol between Contexts.
160
161         # Configure and run a gmxapi 0.0.7 session.
162         # 0. Determine ensemble width.
163         # 1. Choose, create/check working directories.
164         # 2. Create source TPR.
165         # 3. Create workspec.
166         # 3.5 Add plugin potentials, if any.
167         # 4. Run.
168         # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.
169
170         # Determine ensemble width
171         ensemble_width = resource_manager.ensemble_width
172
173         # Choose working directories
174         # TODO: operation working directory naming scheme should be centrally well-defined.
175         # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
176         # so we cannot create the input file path in the working directory based on WorkSpec.uid.
177         workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
178                                                  member=member)
179                         for member in range(ensemble_width)]
180         parameters_dict_list = [{}] * ensemble_width
181
182         # This is a reasonable place to start using MPI ensemble implementation details.
183         # We will want better abstraction in the future, but it is best if related filesystem
184         # accesses occur from the same processes, consistently. Note that we already
185         # handle some optimization and dependency reduction when the ensemble size is 1.
186         # TODO: multithread and multiprocess alternatives to MPI ensemble management.
187
188         # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
189         with scoped_communicator(None) as context_comm:
190             context_rank = context_comm.Get_rank()
191             with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
192                 # Note that in the current implementation, extra ranks have nothing to do,
193                 # but they may have a dummy communicator, so be sure to skip those members
194                 # of the context_comm.
195                 if context_rank < ensemble_width:
196                     assert ensemble_comm.Get_size() == ensemble_width
197                     ensemble_rank = ensemble_comm.Get_rank()
198                     # TODO: This should be a richer object that includes at least host information
199                     #  and preferably the full gmxapi Future interface.
200                     self.workdir = os.path.abspath(workdir_list[ensemble_rank])
201
202                     with resource_manager.local_input(member=ensemble_rank) as input_pack:
203                         source_file = input_pack.kwargs['_simulation_input']
204                         parameters = input_pack.kwargs['parameters']
205                         # If there are any other key word arguments to process from the gmxapi.mdrun
206                         # factory call, do it here.
207
208                     # TODO: We should really name this file with a useful input-dependent tag.
209                     tprfile = os.path.join(self.workdir, 'topol.tpr')
210
211                     expected_working_files = [tprfile]
212
213                     if os.path.exists(self.workdir):
214                         if os.path.isdir(self.workdir):
215                             # Confirm that this is a restarted simulation.
216                             # It is unspecified by the API, but at least through gmxapi 0.1,
217                             # all simulations are initialized with a checkpoint file named state.cpt
218                             # (see src/api/cpp/context.cpp)
219                             checkpoint_file = os.path.join(self.workdir, 'state.cpt')
220                             expected_working_files.append(checkpoint_file)
221
222                             for file in expected_working_files:
223                                 if not os.path.exists(file):
224                                     raise exceptions.ApiError(
225                                         'Cannot determine working directory state: {}'.format(self.workdir))
226                         else:
227                             raise exceptions.ApiError(
228                                 'Chosen working directory path exists but is not a directory: {}'.format(self.workdir))
229                     else:
230                         # Build the working directory and input files.
231                         os.mkdir(self.workdir)
232                         sim_input = fileio.read_tpr(source_file)
233                         for key, value in parameters.items():
234                             try:
235                                 sim_input.parameters.set(key=key, value=value)
236                             except _gmxapi.Exception as e:
237                                 raise exceptions.ApiError(
238                                     'Bug encountered. Unknown error when trying to set simulation '
239                                     'parameter {} to {}'.format(key, value)
240                                 ) from e
241
242                         fileio.write_tpr_file(output=tprfile, input=sim_input)
243                     logger.info('Created {} on rank {}'.format(tprfile, context_rank))
244
245                     # Gather the actual outputs from the ensemble members.
246                     if hasattr(ensemble_comm, 'allgather'):
247                         # We should not assume that abspath expands the same on different MPI ranks.
248                         workdir_list = ensemble_comm.allgather(self.workdir)
249                         tpr_filenames = ensemble_comm.allgather(tprfile)
250                         parameters = fileio.read_tpr(tprfile).parameters.extract()
251                         parameters_dict_list = ensemble_comm.allgather(parameters)
252                     else:
253                         workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
254                         # TODO: If we use better input file names, they need to be updated in multiple places.
255                         tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
256                         parameters_dict_list = [fileio.read_tpr(tprfile).parameters.extract() for tprfile in tpr_filenames]
257
258                     logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
259                                                                                              workdir_list))
260                     logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,
261                                                                       ensemble_rank,
262                                                                       self.workdir
263                                                                       ))
264                     # TODO: (#3718) Normalize the way we pass run time parameters to mdrun.
265                     kwargs = getattr(resource_manager, 'mdrun_kwargs', {})
266                     for key, value in kwargs.items():
267                         logger.debug('Adding mdrun run time argument: {}'.format(key + '=' + str(value)))
268                     work = workflow.from_tpr(tpr_filenames, **kwargs)
269                     self.workspec = work.workspec
270                     context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
271                     self.simulation_module_context = context
272                     # Go ahead and execute immediately. No need for lazy initialization in this basic case.
273                     with self.simulation_module_context as session:
274                         session.run()
275                         # TODO: There may be some additional results that we need to extract...
276                     # end: if context_rank < ensemble_width
277
278                 # end scoped_communicator: ensemble_comm
279
280             if context_comm.Get_size() > 1:
281                 context_comm.bcast(workdir_list, root=0)
282             # end scoped_communicator: context_comm
283
284         self.workdir = workdir_list
285         self.parameters = parameters_dict_list
286
287
288 class SubscriptionSessionResources(object):
289     """Input and output run-time resources for a MDRun subscription.
290
291     A generalization of this class is the probably the main hook for customizing the resources
292     provided to the operation at run time.
293
294     .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
295     """
296
297     def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
298         assert isinstance(input, LegacyImplementationSubscription)
299         assert isinstance(output, PublishingDataProxy)
300         self.output = output
301         member_id = self.output._client_identifier
302         # Before enabling the following, be sure we understand what is happening.
303         # if member_id is None:
304         #     member_id = 0
305         self.workdir = input.workdir[member_id]
306         self.parameters = input.parameters[member_id]
307
308
309 class SubscriptionPublishingRunner(object):
310     """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""
311     input_resource_factory = LegacyImplementationSubscription
312
313     def __init__(self, resources: SubscriptionSessionResources):
314         assert isinstance(resources, SubscriptionSessionResources)
315         # Note that the resources contain a reference to a simulation ensemble that has already run.
316         self.resources = resources
317
318     def run(self):
319         """Operation implementation in the gmxapi.operation module context."""
320         publisher = self.resources.output
321         publisher._work_dir = self.resources.workdir
322         publisher.parameters = self.resources.parameters
323         # TODO: Make the return value a trajectory handle rather than a file path.
324         # TODO: Decide how to handle append vs. noappend output.
325         # TODO: More rigorous handling of the trajectory file(s)
326         # We have no way to query the name of the trajectory produced, and we
327         # have avoided exposing the ability to specify it, so we have to assume
328         # GROMACS default behavior.
329         publisher.trajectory = os.path.join(self.resources.workdir, 'traj.trr')
330
331
332 _next_uid = 0
333
334
335 def _make_uid(input) -> str:
336     # TODO: Use input fingerprint for more useful identification.
337     salt = hash(input)
338     global _next_uid
339     new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
340     _next_uid += 1
341     return new_uid
342
343
344 #
345 # Implementation
346 #
347
348
349 class ResourceManager(gmxapi.operation.ResourceManager):
350     """Manage resources for the MDRun operation in the gmxapi.operation contexts.
351
352     Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
353     Futures managed by this resource manager may contain additional attributes.
354     """
355
356     def future(self, name: str, description: _op.ResultDescription):
357         tpr_future = super().future(name=name, description=description)
358         return tpr_future
359
360     def data(self) -> OutputDataProxy:
361         return OutputDataProxy(self)
362
363     def update_output(self):
364         """Override gmxapi.operation.ResourceManager.update_output because we handle paralellism as 0.0.7."""
365         # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
366         # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
367         # TODO: Reimplement as the resource factory and director for the operation target context.
368         if not self.done():
369             self.__operation_entrance_counter += 1
370             if self.__operation_entrance_counter > 1:
371                 raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
372             if not self.done():
373                 # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
374                 #  operation's library/implementation context.
375
376                 ###
377                 # Note: this is the resource translation from gmxapi.operation context
378                 # to the dispatching runner director. It uses details of the gmxapi.operation.Context
379                 # and of the operation.
380
381                 # TODO: Dispatch/discover this resource factory from a canonical place.
382                 assert hasattr(self._runner_director, 'input_resource_factory')
383                 # Create on all ranks.
384                 input = self._runner_director.input_resource_factory(self)
385                 # End of action of the InputResourceDirector[Context, MdRunSubscription].
386                 ###
387
388                 # We are giving the director a resource that contains the subscription
389                 # to the dispatched work.
390
391                 publishing_resources = self.publishing_resources()
392                 for member in range(self.ensemble_width):
393                     with publishing_resources(ensemble_member=member) as output:
394                         resources = self._resource_factory(input=input, output=output)
395                         runner = self._runner_director(resources)
396                         runner.run()
397
398
399 class StandardInputDescription(_op.InputDescription):
400     """Provide the ReadTpr input description in gmxapi.operation Contexts."""
401
402     def signature(self) -> _op.InputCollectionDescription:
403         return _input
404
405     def make_uid(self, input: _op.DataEdge) -> str:
406         return _make_uid(input)
407
408
409 class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
410     """Provide the gmxapi compatible ReadTpr implementation."""
411
412     # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
413     @classmethod
414     def name(self) -> str:
415         """Canonical name for the operation."""
416         return 'mdrun'
417
418     @classmethod
419     def namespace(self) -> str:
420         """read_tpr is importable from the gmxapi module."""
421         return 'gmxapi'
422
423     @classmethod
424     def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
425         if isinstance(context, _op.Context):
426             return StandardDirector(context)
427
428
429 class StandardOperationHandle(_op.AbstractOperation):
430     """Handle used in Python UI or gmxapi.operation Contexts."""
431
432     def __init__(self, resource_manager: ResourceManager):
433         self.__resource_manager = resource_manager
434
435     def run(self):
436         self.__resource_manager.update_output()
437
438     @property
439     def output(self) -> OutputDataProxy:
440         return self.__resource_manager.data()
441
442
443 class StandardDirector(gmxapi.abc.OperationDirector):
444     """Direct the instantiation of a read_tpr node in a gmxapi.operation Context.
445
446     .. todo:: Compose this behavior in a more generic class.
447
448     .. todo:: Describe where instances live.
449     """
450
451     def __init__(self, context: _op.Context):
452         if not isinstance(context, _op.Context):
453             raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
454         self.context = context
455
456     def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
457         builder = self.context.node_builder(operation=RegisteredOperation, label=label)
458
459         builder.set_resource_factory(SubscriptionSessionResources)
460         builder.set_input_description(StandardInputDescription())
461         builder.set_handle(StandardOperationHandle)
462         # The runner in the gmxapi.operation context is the servicer for the legacy context.
463         builder.set_runner_director(SubscriptionPublishingRunner)
464         builder.set_output_factory(_output_factory)
465         builder.set_resource_manager(ResourceManager)
466
467         # Note: we have not yet done any dispatching based on *resources*. We should
468         # translate the resources provided into the form that the Context can subscribe to
469         # using the dispatching resource_factory. In the second draft, this operation
470         # is dispatched to a SimulationModuleContext, which can be subscribed directly
471         # to sources that are either literal filenames or gmxapi.simulation sources,
472         # while standard Futures can be resolved in the standard context.
473         #
474         assert isinstance(resources, _op.DataSourceCollection)
475         for name, source in resources.items():
476             builder.add_input(name, source)
477
478         handle = builder.build()
479         assert isinstance(handle, StandardOperationHandle)
480         return handle
481
482     def handle_type(self, context: gmxapi.abc.Context):
483         return StandardOperationHandle
484
485     # Developer note: the Director instance is a convenient place to get a dispatching
486     # factory. The Director may become generic or more universal, but the resource_factory
487     # would likely not be typed on the generic parameters of the Director class.
488     # Instead, it is likely a generic function with its own TypeVar parameters.
489     def resource_factory(self,
490                          source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
491                          target: gmxapi.abc.Context = None):
492         # Distinguish between the UIContext, in which input is in the form
493         # of function call arguments, and the StandardContext, implemented in
494         # gmxapi.operation. UIContext is probably a virtual context that is
495         # asserted by callers in order to get a factory that normalizes UI input
496         # for the StandardContext.
497         #
498         if target is None:
499             target = self.context
500         if source is None:
501             if isinstance(target, _op.Context):
502                 # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
503                 return _standard_node_resource_factory
504         if isinstance(source, _op.Context):
505             return SubscriptionSessionResources
506         if isinstance(source, ModuleObject):
507             if isinstance(target, _op.Context):
508                 # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
509                 # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
510                 # In the first draft, though, we just access a special payload.
511                 # Return a factory that will consume *_simulation_input* and *parameters*
512                 # members of a received object.
513                 logger.info('Building mdrun operation from source {}'.format(source))
514
515                 def simulation_input_workaround(input):
516                     source = input
517                     if hasattr(source, 'output'):
518                         source = input.output
519                     assert hasattr(source, '_simulation_input')
520                     assert hasattr(source, 'parameters')
521                     logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
522                                                                       source._simulation_input.description))
523                     source_collection = _input.bind(_simulation_input=source._simulation_input,
524                                                     parameters=source.parameters)
525                     logger.info('mdrun input bound as source collection {}'.format(source_collection))
526                     return source_collection
527
528                 return simulation_input_workaround
529
530         raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
531
532
533 def mdrun(input, label: str = None, context=None):
534     """MD simulation operation.
535
536     Arguments:
537         input : valid simulation input
538
539     Returns:
540         runnable operation to perform the specified simulation
541
542     The *output* attribute of the returned operation handle contains dynamically
543     determined outputs from the operation.
544
545     `input` may be a TPR file name or a an object providing the SimulationInput interface.
546
547     Note:
548         New function names will be appearing to handle tasks that are separate
549
550         "simulate" is plausibly a dispatcher or base class for various tasks
551         dispatched by mdrun. Specific work factories are likely "minimize,"
552         "test_particle_insertion," "legacy_simulation" (do_md), or "simulation"
553         composition (which may be leap-frog, vv, and other algorithms)
554     """
555     handle_context = context
556     if handle_context is not None:
557         raise gmxapi.exceptions.NotImplementedError(
558             'context must be None. This factory is only for the Python UI right now.')
559
560     target_context = _op.current_context()
561     assert isinstance(target_context, _op.Context)
562     # Get a director that will create a node in the standard context.
563     node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
564     assert isinstance(node_director, StandardDirector)
565     # TODO: refine this protocol
566     assert handle_context is None
567     # Examine the input.
568     if isinstance(input, ModuleObject):
569         # The input is from read_tpr or modify_input.
570         source_context = input
571     else:
572         # Allow automatic dispatching
573         source_context = None
574
575     resource_factory = node_director.resource_factory(source=source_context, target=target_context)
576     resources = resource_factory(input)
577     handle = node_director(resources=resources, label=label)
578     # Note: One effect of the assertions above is to help the type checker infer
579     # the return type of the handle. It is hard to convince the type checker that
580     # the return value of the node builder is up-cast. We might be able to resolve
581     # this by making both get_operation_director and ReadTprImplementation
582     # generics of the handle type, or using the handle type as the key for
583     # get_operation_director.
584     return handle