Minor tidying to mdrun.py
[alexxy/gromacs.git] / python_packaging / src / gmxapi / simulation / mdrun.py
1 #
2 # This file is part of the GROMACS molecular simulation package.
3 #
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
8 #
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
13 #
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 # Lesser General Public License for more details.
18 #
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23 #
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
31 #
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
34
35 """mdrun operation module
36
37 The mdrun operation (in its first draft) conforms to the user-level API, but does
38 not use the Python Context resource manager. It uses either the legacy 0.0.7
39 Context or its own Context, also implemented in this module.
40 """
41
42 __all__ = ['mdrun']
43
44 import inspect
45 import os
46 import typing
47 from contextlib import contextmanager
48
49 import gmxapi
50 import gmxapi.abc
51 import gmxapi.operation as _op
52 from gmxapi import exceptions
53
54 # The following imports are not marked as public API.
55 from .abc import ModuleObject
56
57 from . import fileio
58 from . import workflow
59
60 # Initialize module-level logger
61 from gmxapi import logger as root_logger
62
63 logger = root_logger.getChild('mdrun')
64 logger.info('Importing {}'.format(__name__))
65
66
67 # Output in the gmxapi.operation Context.
68 # TODO: Consider using a single base class for the DataProxy, but have distinct
69 #  data descriptor behavior (or different descriptor implementations in different
70 #  subclasses) so that static code inspection can more easily determine the
71 #  attributes of the data proxies.
72 _output_descriptors = (
73     _op.OutputDataDescriptor('_work_dir', str),
74 )
75 _publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
76                            _output_descriptors}
77 _output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
78                                              _output_descriptors})
79
80
81 class OutputDataProxy(_op.DataProxyBase,
82                       descriptors=_output_descriptors):
83     """Implement the 'output' attribute of MDRun operations."""
84
85
86 class PublishingDataProxy(_op.DataProxyBase,
87                           descriptors=_publishing_descriptors):
88     """Manage output resource updates for MDRun operation."""
89
90
91 _output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
92                                     output_description=_output,
93                                     publishing_data_proxy=PublishingDataProxy)
94
95
96 # Input in the gmxapi.operation Context for the dispatching runner.
97 # The default empty dictionary for parameters just means that there are no overrides
98 # to the parameters already provided in _simulation_input.
99 _input = _op.InputCollectionDescription(
100     [('_simulation_input', inspect.Parameter('_simulation_input',
101                                              inspect.Parameter.POSITIONAL_OR_KEYWORD,
102                                              annotation=str)),
103      ('parameters', inspect.Parameter('parameters',
104                                       inspect.Parameter.POSITIONAL_OR_KEYWORD,
105                                       annotation=dict,
106                                       default=dict()))
107      ])
108
109
110 def _standard_node_resource_factory(*args, **kwargs):
111     """Translate Python UI input to the gmxapi.operation node builder inputs."""
112     source_collection = _input.bind(*args, **kwargs)
113     logger.info('mdrun input bound as source collection {}'.format(source_collection))
114     return source_collection
115
116
117 @contextmanager
118 def scoped_communicator(original_comm, requested_size: int = None):
119     from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator
120
121     if requested_size is None:
122         communicator = _acquire_communicator(communicator=original_comm)
123
124     else:
125         if original_comm is None or not hasattr(original_comm, 'Get_size'):
126             raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
127         original_comm_size = original_comm.Get_size()
128         if original_comm_size < requested_size:
129             raise exceptions.FeatureNotAvailableError(
130                 'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
131                     requested_size,
132                     original_comm_size
133                 ))
134         assert original_comm_size >= requested_size
135         communicator = _get_ensemble_communicator(original_comm, requested_size)
136
137     try:
138         yield communicator
139     finally:
140         communicator.Free()
141
142
143 class LegacyImplementationSubscription(object):
144     """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.
145
146     This input resource is a subscription to work that is dispatched to a sub-context.
147     The resource can be created from the standard data of the simulation module.
148     """
149
150     def __init__(self, resource_manager: _op.ResourceManager):
151         from .context import Context as LegacyContext
152         import gmxapi._gmxapi as _gmxapi
153         self._gmxapi = _gmxapi
154
155         assert isinstance(resource_manager, _op.ResourceManager)
156         # We use several details of the gmxapi.operation.Context.ResourceManager.
157         # These dependencies can evolve into the subscription protocol between Contexts.
158
159         # Configure and run a gmxapi 0.0.7 session.
160         # 0. Determine ensemble width.
161         # 1. Choose, create/check working directories.
162         # 2. Create source TPR.
163         # 3. Create workspec.
164         # 3.5 Add plugin potentials, if any.
165         # 4. Run.
166         # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.
167
168         # Determine ensemble width
169         ensemble_width = resource_manager.ensemble_width
170
171         # Choose working directories
172         # TODO: operation working directory naming scheme should be centrally well-defined.
173         # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
174         # so we cannot create the input file path in the working directory based on WorkSpec.uid.
175         workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
176                                                  member=member)
177                         for member in range(ensemble_width)]
178
179         # This is a reasonable place to start using MPI ensemble implementation details.
180         # We will want better abstraction in the future, but it is best if related filesystem
181         # accesses occur from the same processes, consistently. Note that we already
182         # handle some optimization and dependency reduction when the ensemble size is 1.
183         # TODO: multithread and multiprocess alternatives to MPI ensemble management.
184
185         # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
186         with scoped_communicator(None) as context_comm:
187             context_rank = context_comm.Get_rank()
188             with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
189                 # Note that in the current implementation, extra ranks have nothing to do,
190                 # but they may have a dummy communicator, so be sure to skip those members
191                 # of the context_comm.
192                 if context_rank < ensemble_width:
193                     assert ensemble_comm.Get_size() == ensemble_width
194                     ensemble_rank = ensemble_comm.Get_rank()
195                     # TODO: This should be a richer object that includes at least host information
196                     #  and preferably the full gmxapi Future interface.
197                     self.workdir = os.path.abspath(workdir_list[ensemble_rank])
198
199                     with resource_manager.local_input(member=ensemble_rank) as input_pack:
200                         source_file = input_pack.kwargs['_simulation_input']
201                         parameters = input_pack.kwargs['parameters']
202                         # If there are any other key word arguments to process from the gmxapi.mdrun
203                         # factory call, do it here.
204
205                     # TODO: We should really name this file with a useful input-dependent tag.
206                     tprfile = os.path.join(self.workdir, 'topol.tpr')
207
208                     expected_working_files = [tprfile]
209
210                     if os.path.exists(self.workdir):
211                         if os.path.isdir(self.workdir):
212                             # Confirm that this is a restarted simulation.
213                             # It is unspecified by the API, but at least through gmxapi 0.1,
214                             # all simulations are initialized with a checkpoint file named state.cpt
215                             # (see src/api/cpp/context.cpp)
216                             checkpoint_file = os.path.join(self.workdir, 'state.cpp')
217                             expected_working_files.append(checkpoint_file)
218
219                             for file in expected_working_files:
220                                 if not os.path.exists(file):
221                                     raise exceptions.ApiError(
222                                         'Cannot determine working directory state: {}'.format(self.workdir))
223                         else:
224                             raise exceptions.ApiError(
225                                 'Chosen working directory path exists but is not a directory: {}'.format(self.workdir))
226                     else:
227                         # Build the working directory and input files.
228                         os.mkdir(self.workdir)
229                         sim_input = fileio.read_tpr(source_file)
230                         for key, value in parameters.items():
231                             try:
232                                 sim_input.parameters.set(key=key, value=value)
233                             except _gmxapi.Exception as e:
234                                 raise exceptions.ApiError(
235                                     'Bug encountered. Unknown error when trying to set simulation '
236                                     'parameter {} to {}'.format(key, value)
237                                 ) from e
238
239                         fileio.write_tpr_file(output=tprfile, input=sim_input)
240                     logger.info('Created {} on rank {}'.format(tprfile, context_rank))
241
242                     # Gather the actual working directories from the ensemble members.
243                     if hasattr(ensemble_comm, 'allgather'):
244                         # We should not assume that abspath expands the same on different MPI ranks.
245                         workdir_list = ensemble_comm.allgather(self.workdir)
246                         tpr_filenames = ensemble_comm.allgather(tprfile)
247                     else:
248                         workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
249                         # TODO: If we use better input file names, they need to be updated in multiple places.
250                         tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
251
252                     logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
253                                                                                              workdir_list))
254                     logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,
255                                                                       ensemble_rank,
256                                                                       self.workdir
257                                                                       ))
258                     # TODO: We have not exposed the ability to pass any run time parameters to mdrun.
259                     work = workflow.from_tpr(tpr_filenames)
260                     self.workspec = work.workspec
261                     context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
262                     self.simulation_module_context = context
263                     # Go ahead and execute immediately. No need for lazy initialization in this basic case.
264                     with self.simulation_module_context as session:
265                         session.run()
266                         # TODO: There may be some additional results that we need to extract...
267                     # end: if context_rank < ensemble_width
268
269                 # end scoped_communicator: ensemble_comm
270
271             if context_comm.Get_size() > 1:
272                 context_comm.bcast(workdir_list, root=0)
273             # end scoped_communicator: context_comm
274
275         self.workdir = workdir_list
276
277
278 class SubscriptionSessionResources(object):
279     """Input and output run-time resources for a MDRun subscription.
280
281     A generalization of this class is the probably the main hook for customizing the resources
282     provided to the operation at run time.
283
284     .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
285     """
286
287     def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
288         assert isinstance(input, LegacyImplementationSubscription)
289         assert isinstance(output, PublishingDataProxy)
290         self.output = output
291         member_id = self.output._client_identifier
292         # Before enabling the following, be sure we understand what is happening.
293         # if member_id is None:
294         #     member_id = 0
295         self.workdir = input.workdir[member_id]
296
297
298 class SubscriptionPublishingRunner(object):
299     """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""
300     input_resource_factory = LegacyImplementationSubscription
301
302     def __init__(self, resources: SubscriptionSessionResources):
303         assert isinstance(resources, SubscriptionSessionResources)
304         # Note that the resources contain a reference to a simulation ensemble that has already run.
305         self.resources = resources
306
307     def run(self):
308         """Operation implementation in the gmxapi.operation module context."""
309         publisher = self.resources.output
310         publisher._work_dir = self.resources.workdir
311
312
313 _next_uid = 0
314
315
316 def _make_uid(input) -> str:
317     # TODO: Use input fingerprint for more useful identification.
318     salt = hash(input)
319     global _next_uid
320     new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
321     _next_uid += 1
322     return new_uid
323
324
325 #
326 # Implementation
327 #
328
329
330 class ResourceManager(gmxapi.operation.ResourceManager):
331     """Manage resources for the MDRun operation in the gmxapi.operation contexts.
332
333     Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
334     Futures managed by this resource manager may contain additional attributes.
335     """
336
337     def future(self, name: str, description: _op.ResultDescription):
338         tpr_future = super().future(name=name, description=description)
339         return tpr_future
340
341     def data(self) -> OutputDataProxy:
342         return OutputDataProxy(self)
343
344     def update_output(self):
345         """Override gmxapi.operation.ResourceManager.update_output because we handle paralellism as 0.0.7."""
346         # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
347         # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
348         # TODO: Reimplement as the resource factory and director for the operation target context.
349         if not self.done():
350             self.__operation_entrance_counter += 1
351             if self.__operation_entrance_counter > 1:
352                 raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
353             if not self.done():
354                 # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
355                 #  operation's library/implementation context.
356
357                 ###
358                 # Note: this is the resource translation from gmxapi.operation context
359                 # to the dispatching runner director. It uses details of the gmxapi.operation.Context
360                 # and of the operation.
361
362                 # TODO: Dispatch/discover this resource factory from a canonical place.
363                 assert hasattr(self._runner_director, 'input_resource_factory')
364                 # Create on all ranks.
365                 input = self._runner_director.input_resource_factory(self)
366                 # End of action of the InputResourceDirector[Context, MdRunSubscription].
367                 ###
368
369                 # We are giving the director a resource that contains the subscription
370                 # to the dispatched work.
371
372                 publishing_resources = self.publishing_resources()
373                 for member in range(self.ensemble_width):
374                     with publishing_resources(ensemble_member=member) as output:
375                         resources = self._resource_factory(input=input, output=output)
376                         runner = self._runner_director(resources)
377                         runner.run()
378
379
380 class StandardInputDescription(_op.InputDescription):
381     """Provide the ReadTpr input description in gmxapi.operation Contexts."""
382
383     def signature(self) -> _op.InputCollectionDescription:
384         return _input
385
386     def make_uid(self, input: _op.DataEdge) -> str:
387         return _make_uid(input)
388
389
390 class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
391     """Provide the gmxapi compatible ReadTpr implementation."""
392
393     # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
394     @classmethod
395     def name(self) -> str:
396         """Canonical name for the operation."""
397         return 'mdrun'
398
399     @classmethod
400     def namespace(self) -> str:
401         """read_tpr is importable from the gmxapi module."""
402         return 'gmxapi'
403
404     @classmethod
405     def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
406         if isinstance(context, _op.Context):
407             return StandardDirector(context)
408
409
410 class StandardOperationHandle(_op.AbstractOperation):
411     """Handle used in Python UI or gmxapi.operation Contexts."""
412
413     def __init__(self, resource_manager: ResourceManager):
414         self.__resource_manager = resource_manager
415
416     def run(self):
417         self.__resource_manager.update_output()
418
419     @property
420     def output(self) -> OutputDataProxy:
421         return self.__resource_manager.data()
422
423
424 class StandardDirector(gmxapi.abc.OperationDirector):
425     """Direct the instantiation of a read_tpr node in a gmxapi.operation Context.
426
427     .. todo:: Compose this behavior in a more generic class.
428
429     .. todo:: Describe where instances live.
430     """
431
432     def __init__(self, context: _op.Context):
433         if not isinstance(context, _op.Context):
434             raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
435         self.context = context
436
437     def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
438         builder = self.context.node_builder(operation=RegisteredOperation, label=label)
439
440         builder.set_resource_factory(SubscriptionSessionResources)
441         builder.set_input_description(StandardInputDescription())
442         builder.set_handle(StandardOperationHandle)
443         # The runner in the gmxapi.operation context is the servicer for the legacy context.
444         builder.set_runner_director(SubscriptionPublishingRunner)
445         builder.set_output_factory(_output_factory)
446         builder.set_resource_manager(ResourceManager)
447
448         # Note: we have not yet done any dispatching based on *resources*. We should
449         # translate the resources provided into the form that the Context can subscribe to
450         # using the dispatching resource_factory. In the second draft, this operation
451         # is dispatched to a SimulationModuleContext, which can be subscribed directly
452         # to sources that are either literal filenames or gmxapi.simulation sources,
453         # while standard Futures can be resolved in the standard context.
454         #
455         assert isinstance(resources, _op.DataSourceCollection)
456         for name, source in resources.items():
457             builder.add_input(name, source)
458
459         handle = builder.build()
460         assert isinstance(handle, StandardOperationHandle)
461         return handle
462
463     def handle_type(self, context: gmxapi.abc.Context):
464         return StandardOperationHandle
465
466     # Developer note: the Director instance is a convenient place to get a dispatching
467     # factory. The Director may become generic or more universal, but the resource_factory
468     # would likely not be typed on the generic parameters of the Director class.
469     # Instead, it is likely a generic function with its own TypeVar parameters.
470     def resource_factory(self,
471                          source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
472                          target: gmxapi.abc.Context = None):
473         # Distinguish between the UIContext, in which input is in the form
474         # of function call arguments, and the StandardContext, implemented in
475         # gmxapi.operation. UIContext is probably a virtual context that is
476         # asserted by callers in order to get a factory that normalizes UI input
477         # for the StandardContext.
478         #
479         if target is None:
480             target = self.context
481         if source is None:
482             if isinstance(target, _op.Context):
483                 # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
484                 return _standard_node_resource_factory
485         if isinstance(source, _op.Context):
486             return SubscriptionSessionResources
487         if isinstance(source, ModuleObject):
488             if isinstance(target, _op.Context):
489                 # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
490                 # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
491                 # In the first draft, though, we just access a special payload.
492                 # Return a factory that will consume *_simulation_input* and *parameters*
493                 # members of a received object.
494                 logger.info('Building mdrun operation from source {}'.format(source))
495
496                 def simulation_input_workaround(input):
497                     source = input
498                     if hasattr(source, 'output'):
499                         source = input.output
500                     assert hasattr(source, '_simulation_input')
501                     assert hasattr(source, 'parameters')
502                     logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
503                                                                       source._simulation_input.description))
504                     source_collection = _input.bind(_simulation_input=source._simulation_input,
505                                                     parameters=source.parameters)
506                     logger.info('mdrun input bound as source collection {}'.format(source_collection))
507                     return source_collection
508
509                 return simulation_input_workaround
510
511         raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
512
513
514 def mdrun(input, label: str = None, context=None):
515     """MD simulation operation.
516
517     Arguments:
518         input : valid simulation input
519
520     Returns:
521         runnable operation to perform the specified simulation
522
523     The *output* attribute of the returned operation handle contains dynamically
524     determined outputs from the operation.
525
526     `input` may be a TPR file name or a an object providing the SimulationInput interface.
527
528     Note:
529         New function names will be appearing to handle tasks that are separate
530
531         "simulate" is plausibly a dispatcher or base class for various tasks
532         dispatched by mdrun. Specific work factories are likely "minimize,"
533         "test_particle_insertion," "legacy_simulation" (do_md), or "simulation"
534         composition (which may be leap-frog, vv, and other algorithms)
535     """
536     handle_context = context
537     if handle_context is not None:
538         raise gmxapi.exceptions.NotImplementedError(
539             'context must be None. This factory is only for the Python UI right now.')
540
541     target_context = _op.current_context()
542     assert isinstance(target_context, _op.Context)
543     # Get a director that will create a node in the standard context.
544     node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
545     assert isinstance(node_director, StandardDirector)
546     # TODO: refine this protocol
547     assert handle_context is None
548     # Examine the input.
549     if isinstance(input, ModuleObject):
550         # The input is from read_tpr or modify_input.
551         source_context = input
552     else:
553         # Allow automatic dispatching
554         source_context = None
555
556     resource_factory = node_director.resource_factory(source=source_context, target=target_context)
557     resources = resource_factory(input)
558     handle = node_director(resources=resources, label=label)
559     # Note: One effect of the assertions above is to help the type checker infer
560     # the return type of the handle. It is hard to convince the type checker that
561     # the return value of the node builder is up-cast. We might be able to resolve
562     # this by making both get_operation_director and ReadTprImplementation
563     # generics of the handle type, or using the handle type as the key for
564     # get_operation_director.
565     return handle