Merge 'origin/release-2019' into release-2020
[alexxy/gromacs.git] / python_packaging / src / gmxapi / simulation / mdrun.py
1 #
2 # This file is part of the GROMACS molecular simulation package.
3 #
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
8 #
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
13 #
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 # Lesser General Public License for more details.
18 #
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23 #
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
31 #
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
34
35 """mdrun operation module
36
37 The mdrun operation (in its first draft) conforms to the user-level API, but does
38 not use the Python Context resource manager. It uses either the legacy 0.0.7
39 Context or its own Context, also implemented in this module.
40 """
41
42 __all__ = ['mdrun']
43
44 import inspect
45 import os
46 import typing
47 from contextlib import contextmanager
48
49 import gmxapi
50 import gmxapi.abc
51 import gmxapi.operation as _op
52 from gmxapi import exceptions
53
54 # The following imports are not marked as public API.
55 from .abc import ModuleObject
56
57 from . import fileio
58 from . import workflow
59
60 # Initialize module-level logger
61 from gmxapi import logger as root_logger
62
63 logger = root_logger.getChild('mdrun')
64 logger.info('Importing {}'.format(__name__))
65
66
67 # Output in the gmxapi.operation Context.
68 # TODO: Consider using a single base class for the DataProxy, but have distinct
69 #  data descriptor behavior (or different descriptor implementations in different
70 #  subclasses) so that static code inspection can more easily determine the
71 #  attributes of the data proxies.
72 _output_descriptors = (
73     _op.OutputDataDescriptor('_work_dir', str),
74     _op.OutputDataDescriptor('trajectory', str),
75     _op.OutputDataDescriptor('parameters', dict)
76 )
77 _publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
78                            _output_descriptors}
79 _output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
80                                              _output_descriptors})
81
82
83 class OutputDataProxy(_op.DataProxyBase,
84                       descriptors=_output_descriptors):
85     """Implement the 'output' attribute of MDRun operations."""
86
87
88 class PublishingDataProxy(_op.DataProxyBase,
89                           descriptors=_publishing_descriptors):
90     """Manage output resource updates for MDRun operation."""
91
92
93 _output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
94                                     output_description=_output,
95                                     publishing_data_proxy=PublishingDataProxy)
96
97
98 # Input in the gmxapi.operation Context for the dispatching runner.
99 # The default empty dictionary for parameters just means that there are no overrides
100 # to the parameters already provided in _simulation_input.
101 _input = _op.InputCollectionDescription(
102     [('_simulation_input', inspect.Parameter('_simulation_input',
103                                              inspect.Parameter.POSITIONAL_OR_KEYWORD,
104                                              annotation=str)),
105      ('parameters', inspect.Parameter('parameters',
106                                       inspect.Parameter.POSITIONAL_OR_KEYWORD,
107                                       annotation=dict,
108                                       default=dict()))
109      ])
110
111
112 def _standard_node_resource_factory(*args, **kwargs):
113     """Translate Python UI input to the gmxapi.operation node builder inputs."""
114     source_collection = _input.bind(*args, **kwargs)
115     logger.info('mdrun input bound as source collection {}'.format(source_collection))
116     return source_collection
117
118
119 @contextmanager
120 def scoped_communicator(original_comm, requested_size: int = None):
121     from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator
122
123     if requested_size is None:
124         communicator = _acquire_communicator(communicator=original_comm)
125
126     else:
127         if original_comm is None or not hasattr(original_comm, 'Get_size'):
128             raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
129         original_comm_size = original_comm.Get_size()
130         if original_comm_size < requested_size:
131             raise exceptions.FeatureNotAvailableError(
132                 'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
133                     requested_size,
134                     original_comm_size
135                 ))
136         assert original_comm_size >= requested_size
137         communicator = _get_ensemble_communicator(original_comm, requested_size)
138
139     try:
140         yield communicator
141     finally:
142         communicator.Free()
143
144
145 class LegacyImplementationSubscription(object):
146     """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.
147
148     This input resource is a subscription to work that is dispatched to a sub-context.
149     The resource can be created from the standard data of the simulation module.
150     """
151
152     def __init__(self, resource_manager: _op.ResourceManager):
153         from .context import Context as LegacyContext
154         import gmxapi._gmxapi as _gmxapi
155         self._gmxapi = _gmxapi
156
157         assert isinstance(resource_manager, _op.ResourceManager)
158         # We use several details of the gmxapi.operation.Context.ResourceManager.
159         # These dependencies can evolve into the subscription protocol between Contexts.
160
161         # Configure and run a gmxapi 0.0.7 session.
162         # 0. Determine ensemble width.
163         # 1. Choose, create/check working directories.
164         # 2. Create source TPR.
165         # 3. Create workspec.
166         # 3.5 Add plugin potentials, if any.
167         # 4. Run.
168         # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.
169
170         # Determine ensemble width
171         ensemble_width = resource_manager.ensemble_width
172
173         # Choose working directories
174         # TODO: operation working directory naming scheme should be centrally well-defined.
175         # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
176         # so we cannot create the input file path in the working directory based on WorkSpec.uid.
177         workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
178                                                  member=member)
179                         for member in range(ensemble_width)]
180         parameters_dict_list = [{}] * ensemble_width
181
182         # This is a reasonable place to start using MPI ensemble implementation details.
183         # We will want better abstraction in the future, but it is best if related filesystem
184         # accesses occur from the same processes, consistently. Note that we already
185         # handle some optimization and dependency reduction when the ensemble size is 1.
186         # TODO: multithread and multiprocess alternatives to MPI ensemble management.
187
188         # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
189         with scoped_communicator(None) as context_comm:
190             context_rank = context_comm.Get_rank()
191             with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
192                 # Note that in the current implementation, extra ranks have nothing to do,
193                 # but they may have a dummy communicator, so be sure to skip those members
194                 # of the context_comm.
195                 if context_rank < ensemble_width:
196                     assert ensemble_comm.Get_size() == ensemble_width
197                     ensemble_rank = ensemble_comm.Get_rank()
198                     # TODO: This should be a richer object that includes at least host information
199                     #  and preferably the full gmxapi Future interface.
200                     self.workdir = os.path.abspath(workdir_list[ensemble_rank])
201
202                     with resource_manager.local_input(member=ensemble_rank) as input_pack:
203                         source_file = input_pack.kwargs['_simulation_input']
204                         parameters = input_pack.kwargs['parameters']
205                         # If there are any other key word arguments to process from the gmxapi.mdrun
206                         # factory call, do it here.
207
208                     # TODO: We should really name this file with a useful input-dependent tag.
209                     tprfile = os.path.join(self.workdir, 'topol.tpr')
210
211                     expected_working_files = [tprfile]
212
213                     if os.path.exists(self.workdir):
214                         if os.path.isdir(self.workdir):
215                             # Confirm that this is a restarted simulation.
216                             # It is unspecified by the API, but at least through gmxapi 0.1,
217                             # all simulations are initialized with a checkpoint file named state.cpt
218                             # (see src/api/cpp/context.cpp)
219                             checkpoint_file = os.path.join(self.workdir, 'state.cpp')
220                             expected_working_files.append(checkpoint_file)
221
222                             for file in expected_working_files:
223                                 if not os.path.exists(file):
224                                     raise exceptions.ApiError(
225                                         'Cannot determine working directory state: {}'.format(self.workdir))
226                         else:
227                             raise exceptions.ApiError(
228                                 'Chosen working directory path exists but is not a directory: {}'.format(self.workdir))
229                     else:
230                         # Build the working directory and input files.
231                         os.mkdir(self.workdir)
232                         sim_input = fileio.read_tpr(source_file)
233                         for key, value in parameters.items():
234                             try:
235                                 sim_input.parameters.set(key=key, value=value)
236                             except _gmxapi.Exception as e:
237                                 raise exceptions.ApiError(
238                                     'Bug encountered. Unknown error when trying to set simulation '
239                                     'parameter {} to {}'.format(key, value)
240                                 ) from e
241
242                         fileio.write_tpr_file(output=tprfile, input=sim_input)
243                     logger.info('Created {} on rank {}'.format(tprfile, context_rank))
244
245                     # Gather the actual outputs from the ensemble members.
246                     if hasattr(ensemble_comm, 'allgather'):
247                         # We should not assume that abspath expands the same on different MPI ranks.
248                         workdir_list = ensemble_comm.allgather(self.workdir)
249                         tpr_filenames = ensemble_comm.allgather(tprfile)
250                         parameters = fileio.read_tpr(tprfile).parameters.extract()
251                         parameters_dict_list = ensemble_comm.allgather(parameters)
252                     else:
253                         workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
254                         # TODO: If we use better input file names, they need to be updated in multiple places.
255                         tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
256                         parameters_dict_list = [fileio.read_tpr(tprfile).parameters.extract() for tprfile in tpr_filenames]
257
258                     logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
259                                                                                              workdir_list))
260                     logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,
261                                                                       ensemble_rank,
262                                                                       self.workdir
263                                                                       ))
264                     # TODO: We have not exposed the ability to pass any run time parameters to mdrun.
265                     work = workflow.from_tpr(tpr_filenames)
266                     self.workspec = work.workspec
267                     context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
268                     self.simulation_module_context = context
269                     # Go ahead and execute immediately. No need for lazy initialization in this basic case.
270                     with self.simulation_module_context as session:
271                         session.run()
272                         # TODO: There may be some additional results that we need to extract...
273                     # end: if context_rank < ensemble_width
274
275                 # end scoped_communicator: ensemble_comm
276
277             if context_comm.Get_size() > 1:
278                 context_comm.bcast(workdir_list, root=0)
279             # end scoped_communicator: context_comm
280
281         self.workdir = workdir_list
282         self.parameters = parameters_dict_list
283
284
285 class SubscriptionSessionResources(object):
286     """Input and output run-time resources for a MDRun subscription.
287
288     A generalization of this class is the probably the main hook for customizing the resources
289     provided to the operation at run time.
290
291     .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
292     """
293
294     def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
295         assert isinstance(input, LegacyImplementationSubscription)
296         assert isinstance(output, PublishingDataProxy)
297         self.output = output
298         member_id = self.output._client_identifier
299         # Before enabling the following, be sure we understand what is happening.
300         # if member_id is None:
301         #     member_id = 0
302         self.workdir = input.workdir[member_id]
303         self.parameters = input.parameters[member_id]
304
305
306 class SubscriptionPublishingRunner(object):
307     """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""
308     input_resource_factory = LegacyImplementationSubscription
309
310     def __init__(self, resources: SubscriptionSessionResources):
311         assert isinstance(resources, SubscriptionSessionResources)
312         # Note that the resources contain a reference to a simulation ensemble that has already run.
313         self.resources = resources
314
315     def run(self):
316         """Operation implementation in the gmxapi.operation module context."""
317         publisher = self.resources.output
318         publisher._work_dir = self.resources.workdir
319         publisher.parameters = self.resources.parameters
320         # TODO: Make the return value a trajectory handle rather than a file path.
321         # TODO: Decide how to handle append vs. noappend output.
322         # TODO: More rigorous handling of the trajectory file(s)
323         # We have no way to query the name of the trajectory produced, and we
324         # have avoided exposing the ability to specify it, so we have to assume
325         # GROMACS default behavior.
326         publisher.trajectory = os.path.join(self.resources.workdir, 'traj.trr')
327
328
329 _next_uid = 0
330
331
332 def _make_uid(input) -> str:
333     # TODO: Use input fingerprint for more useful identification.
334     salt = hash(input)
335     global _next_uid
336     new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
337     _next_uid += 1
338     return new_uid
339
340
341 #
342 # Implementation
343 #
344
345
346 class ResourceManager(gmxapi.operation.ResourceManager):
347     """Manage resources for the MDRun operation in the gmxapi.operation contexts.
348
349     Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
350     Futures managed by this resource manager may contain additional attributes.
351     """
352
353     def future(self, name: str, description: _op.ResultDescription):
354         tpr_future = super().future(name=name, description=description)
355         return tpr_future
356
357     def data(self) -> OutputDataProxy:
358         return OutputDataProxy(self)
359
360     def update_output(self):
361         """Override gmxapi.operation.ResourceManager.update_output because we handle paralellism as 0.0.7."""
362         # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
363         # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
364         # TODO: Reimplement as the resource factory and director for the operation target context.
365         if not self.done():
366             self.__operation_entrance_counter += 1
367             if self.__operation_entrance_counter > 1:
368                 raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
369             if not self.done():
370                 # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
371                 #  operation's library/implementation context.
372
373                 ###
374                 # Note: this is the resource translation from gmxapi.operation context
375                 # to the dispatching runner director. It uses details of the gmxapi.operation.Context
376                 # and of the operation.
377
378                 # TODO: Dispatch/discover this resource factory from a canonical place.
379                 assert hasattr(self._runner_director, 'input_resource_factory')
380                 # Create on all ranks.
381                 input = self._runner_director.input_resource_factory(self)
382                 # End of action of the InputResourceDirector[Context, MdRunSubscription].
383                 ###
384
385                 # We are giving the director a resource that contains the subscription
386                 # to the dispatched work.
387
388                 publishing_resources = self.publishing_resources()
389                 for member in range(self.ensemble_width):
390                     with publishing_resources(ensemble_member=member) as output:
391                         resources = self._resource_factory(input=input, output=output)
392                         runner = self._runner_director(resources)
393                         runner.run()
394
395
396 class StandardInputDescription(_op.InputDescription):
397     """Provide the ReadTpr input description in gmxapi.operation Contexts."""
398
399     def signature(self) -> _op.InputCollectionDescription:
400         return _input
401
402     def make_uid(self, input: _op.DataEdge) -> str:
403         return _make_uid(input)
404
405
406 class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
407     """Provide the gmxapi compatible ReadTpr implementation."""
408
409     # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
410     @classmethod
411     def name(self) -> str:
412         """Canonical name for the operation."""
413         return 'mdrun'
414
415     @classmethod
416     def namespace(self) -> str:
417         """read_tpr is importable from the gmxapi module."""
418         return 'gmxapi'
419
420     @classmethod
421     def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
422         if isinstance(context, _op.Context):
423             return StandardDirector(context)
424
425
426 class StandardOperationHandle(_op.AbstractOperation):
427     """Handle used in Python UI or gmxapi.operation Contexts."""
428
429     def __init__(self, resource_manager: ResourceManager):
430         self.__resource_manager = resource_manager
431
432     def run(self):
433         self.__resource_manager.update_output()
434
435     @property
436     def output(self) -> OutputDataProxy:
437         return self.__resource_manager.data()
438
439
440 class StandardDirector(gmxapi.abc.OperationDirector):
441     """Direct the instantiation of a read_tpr node in a gmxapi.operation Context.
442
443     .. todo:: Compose this behavior in a more generic class.
444
445     .. todo:: Describe where instances live.
446     """
447
448     def __init__(self, context: _op.Context):
449         if not isinstance(context, _op.Context):
450             raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
451         self.context = context
452
453     def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
454         builder = self.context.node_builder(operation=RegisteredOperation, label=label)
455
456         builder.set_resource_factory(SubscriptionSessionResources)
457         builder.set_input_description(StandardInputDescription())
458         builder.set_handle(StandardOperationHandle)
459         # The runner in the gmxapi.operation context is the servicer for the legacy context.
460         builder.set_runner_director(SubscriptionPublishingRunner)
461         builder.set_output_factory(_output_factory)
462         builder.set_resource_manager(ResourceManager)
463
464         # Note: we have not yet done any dispatching based on *resources*. We should
465         # translate the resources provided into the form that the Context can subscribe to
466         # using the dispatching resource_factory. In the second draft, this operation
467         # is dispatched to a SimulationModuleContext, which can be subscribed directly
468         # to sources that are either literal filenames or gmxapi.simulation sources,
469         # while standard Futures can be resolved in the standard context.
470         #
471         assert isinstance(resources, _op.DataSourceCollection)
472         for name, source in resources.items():
473             builder.add_input(name, source)
474
475         handle = builder.build()
476         assert isinstance(handle, StandardOperationHandle)
477         return handle
478
479     def handle_type(self, context: gmxapi.abc.Context):
480         return StandardOperationHandle
481
482     # Developer note: the Director instance is a convenient place to get a dispatching
483     # factory. The Director may become generic or more universal, but the resource_factory
484     # would likely not be typed on the generic parameters of the Director class.
485     # Instead, it is likely a generic function with its own TypeVar parameters.
486     def resource_factory(self,
487                          source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
488                          target: gmxapi.abc.Context = None):
489         # Distinguish between the UIContext, in which input is in the form
490         # of function call arguments, and the StandardContext, implemented in
491         # gmxapi.operation. UIContext is probably a virtual context that is
492         # asserted by callers in order to get a factory that normalizes UI input
493         # for the StandardContext.
494         #
495         if target is None:
496             target = self.context
497         if source is None:
498             if isinstance(target, _op.Context):
499                 # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
500                 return _standard_node_resource_factory
501         if isinstance(source, _op.Context):
502             return SubscriptionSessionResources
503         if isinstance(source, ModuleObject):
504             if isinstance(target, _op.Context):
505                 # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
506                 # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
507                 # In the first draft, though, we just access a special payload.
508                 # Return a factory that will consume *_simulation_input* and *parameters*
509                 # members of a received object.
510                 logger.info('Building mdrun operation from source {}'.format(source))
511
512                 def simulation_input_workaround(input):
513                     source = input
514                     if hasattr(source, 'output'):
515                         source = input.output
516                     assert hasattr(source, '_simulation_input')
517                     assert hasattr(source, 'parameters')
518                     logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
519                                                                       source._simulation_input.description))
520                     source_collection = _input.bind(_simulation_input=source._simulation_input,
521                                                     parameters=source.parameters)
522                     logger.info('mdrun input bound as source collection {}'.format(source_collection))
523                     return source_collection
524
525                 return simulation_input_workaround
526
527         raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
528
529
530 def mdrun(input, label: str = None, context=None):
531     """MD simulation operation.
532
533     Arguments:
534         input : valid simulation input
535
536     Returns:
537         runnable operation to perform the specified simulation
538
539     The *output* attribute of the returned operation handle contains dynamically
540     determined outputs from the operation.
541
542     `input` may be a TPR file name or a an object providing the SimulationInput interface.
543
544     Note:
545         New function names will be appearing to handle tasks that are separate
546
547         "simulate" is plausibly a dispatcher or base class for various tasks
548         dispatched by mdrun. Specific work factories are likely "minimize,"
549         "test_particle_insertion," "legacy_simulation" (do_md), or "simulation"
550         composition (which may be leap-frog, vv, and other algorithms)
551     """
552     handle_context = context
553     if handle_context is not None:
554         raise gmxapi.exceptions.NotImplementedError(
555             'context must be None. This factory is only for the Python UI right now.')
556
557     target_context = _op.current_context()
558     assert isinstance(target_context, _op.Context)
559     # Get a director that will create a node in the standard context.
560     node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
561     assert isinstance(node_director, StandardDirector)
562     # TODO: refine this protocol
563     assert handle_context is None
564     # Examine the input.
565     if isinstance(input, ModuleObject):
566         # The input is from read_tpr or modify_input.
567         source_context = input
568     else:
569         # Allow automatic dispatching
570         source_context = None
571
572     resource_factory = node_director.resource_factory(source=source_context, target=target_context)
573     resources = resource_factory(input)
574     handle = node_director(resources=resources, label=label)
575     # Note: One effect of the assertions above is to help the type checker infer
576     # the return type of the handle. It is hard to convince the type checker that
577     # the return value of the node builder is up-cast. We might be able to resolve
578     # this by making both get_operation_director and ReadTprImplementation
579     # generics of the handle type, or using the handle type as the key for
580     # get_operation_director.
581     return handle