Merge branch release-2019 into master
[alexxy/gromacs.git] / python_packaging / src / gmxapi / simulation / mdrun.py
1 #
2 # This file is part of the GROMACS molecular simulation package.
3 #
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
8 #
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
13 #
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 # Lesser General Public License for more details.
18 #
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23 #
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
31 #
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
34
35 """mdrun operation module
36
37 The mdrun operation (in its first draft) conforms to the user-level API, but does
38 not use the Python Context resource manager. It uses either the legacy 0.0.7
39 Context or its own Context, also implemented in this module.
40 """
41
42 __all__ = ['mdrun']
43
44 import inspect
45 import os
46 import typing
47 from contextlib import contextmanager
48
49 import gmxapi
50 import gmxapi.abc
51 import gmxapi.operation as _op
52 from gmxapi import exceptions
53
54 # The following imports are not marked as public API.
55 from .abc import ModuleObject
56
57 from . import fileio
58 from . import workflow
59
60 # Initialize module-level logger
61 from gmxapi import logger as root_logger
62
63 logger = root_logger.getChild('mdrun')
64 logger.info('Importing {}'.format(__name__))
65
66
67 class OutputDataProxy(_op.DataProxyBase,
68                       descriptors={
69                           '_work_dir': _op.OutputDataDescriptor('_work_dir', str)}):
70     """Implement the 'output' attribute of MDRun operations."""
71
72
73 class Publisher(gmxapi.operation.Publisher):
74     """Implement the publishing data descriptor for MDRun output."""
75
76     def __set__(self, instance: 'PublishingDataProxy', value):
77         super().__set__(instance, value)
78
79
80 class PublishingDataProxy(_op.DataProxyBase,
81                           descriptors={
82                               '_work_dir': Publisher('_work_dir', str)}
83                           ):
84     """Manage output resource updates for MDRun operation."""
85
86
87 #
88 # Helpers
89 #
90
91 # Output in the gmxapi.operation Context.
92 _output = _op.OutputCollectionDescription(_work_dir=str)
93
94 # Input in the gmxapi.operation Context for the dispatching runner.
95 # The default empty dictionary for parameters just means that there are no overrides
96 # to the parameters already provided in _simulation_input.
97 _input = _op.InputCollectionDescription(
98     [('_simulation_input', inspect.Parameter('_simulation_input',
99                                              inspect.Parameter.POSITIONAL_OR_KEYWORD,
100                                              annotation=str)),
101      ('parameters', inspect.Parameter('parameters',
102                                       inspect.Parameter.POSITIONAL_OR_KEYWORD,
103                                       annotation=dict,
104                                       default=dict()))
105      ])
106
107
108 def _standard_node_resource_factory(*args, **kwargs):
109     """Translate Python UI input to the gmxapi.operation node builder inputs."""
110     source_collection = _input.bind(*args, **kwargs)
111     logger.info('mdrun input bound as source collection {}'.format(source_collection))
112     return source_collection
113
114
115 _output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
116                                     output_description=_output,
117                                     publishing_data_proxy=PublishingDataProxy)
118
119
120 @contextmanager
121 def scoped_communicator(original_comm, requested_size: int = None):
122     from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator
123
124     if requested_size is None:
125         communicator = _acquire_communicator(communicator=original_comm)
126
127     else:
128         if original_comm is None or not hasattr(original_comm, 'Get_size'):
129             raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
130         original_comm_size = original_comm.Get_size()
131         if original_comm_size < requested_size:
132             raise exceptions.FeatureNotAvailableError(
133                 'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
134                     requested_size,
135                     original_comm_size
136                 ))
137         assert original_comm_size >= requested_size
138         communicator = _get_ensemble_communicator(original_comm, requested_size)
139
140     try:
141         yield communicator
142     finally:
143         communicator.Free()
144
145
146 class LegacyImplementationSubscription(object):
147     """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.
148
149     This input resource is a subscription to work that is dispatched to a sub-context.
150     The resource can be created from the standard data of the simulation module.
151     """
152
153     def __init__(self, resource_manager: _op.ResourceManager):
154         from .context import Context as LegacyContext
155         import gmxapi._gmxapi as _gmxapi
156         self._gmxapi = _gmxapi
157
158         assert isinstance(resource_manager, _op.ResourceManager)
159         # We use several details of the gmxapi.operation.Context.ResourceManager.
160         # These dependencies can evolve into the subscription protocol between Contexts.
161
162         # Configure and run a gmxapi 0.0.7 session.
163         # 0. Determine ensemble width.
164         # 1. Choose, create/check working directories.
165         # 2. Create source TPR.
166         # 3. Create workspec.
167         # 3.5 Add plugin potentials, if any.
168         # 4. Run.
169         # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.
170
171         # Determine ensemble width
172         ensemble_width = resource_manager.ensemble_width
173
174         # Choose working directories
175         # TODO: operation working directory naming scheme should be centrally well-defined.
176         # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
177         # so we cannot create the input file path in the working directory based on WorkSpec.uid.
178         workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
179                                                  member=member)
180                         for member in range(ensemble_width)]
181
182         # This is a reasonable place to start using MPI ensemble implementation details.
183         # We will want better abstraction in the future, but it is best if related filesystem
184         # accesses occur from the same processes, consistently. Note that we already
185         # handle some optimization and dependency reduction when the ensemble size is 1.
186         # TODO: multithread and multiprocess alternatives to MPI ensemble management.
187
188         # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
189         with scoped_communicator(None) as context_comm:
190             context_rank = context_comm.Get_rank()
191             with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
192                 # Note that in the current implementation, extra ranks have nothing to do,
193                 # but they may have a dummy communicator, so be sure to skip those members
194                 # of the context_comm.
195                 if context_rank < ensemble_width:
196                     assert ensemble_comm.Get_size() == ensemble_width
197                     ensemble_rank = ensemble_comm.Get_rank()
198                     # TODO: This should be a richer object that includes at least host information
199                     #  and preferably the full gmxapi Future interface.
200                     self.workdir = os.path.abspath(workdir_list[ensemble_rank])
201
202                     with resource_manager.local_input(member=ensemble_rank) as input_pack:
203                         source_file = input_pack.kwargs['_simulation_input']
204                         parameters = input_pack.kwargs['parameters']
205                         # If there are any other key word arguments to process from the gmxapi.mdrun
206                         # factory call, do it here.
207
208                     # TODO: We should really name this file with a useful input-dependent tag.
209                     tprfile = os.path.join(self.workdir, 'topol.tpr')
210
211                     expected_working_files = [tprfile]
212
213                     if os.path.exists(self.workdir):
214                         if os.path.isdir(self.workdir):
215                             # Confirm that this is a restarted simulation.
216                             # It is unspecified by the API, but at least through gmxapi 0.1,
217                             # all simulations are initialized with a checkpoint file named state.cpt
218                             # (see src/api/cpp/context.cpp)
219                             checkpoint_file = os.path.join(self.workdir, 'state.cpp')
220                             expected_working_files.append(checkpoint_file)
221
222                             for file in expected_working_files:
223                                 if not os.path.exists(file):
224                                     raise exceptions.ApiError(
225                                         'Cannot determine working directory state: {}'.format(self.workdir))
226                         else:
227                             raise exceptions.ApiError(
228                                 'Chosen working directory path exists but is not a directory: {}'.format(self.workdir))
229                     else:
230                         # Build the working directory and input files.
231                         os.mkdir(self.workdir)
232                         sim_input = fileio.read_tpr(source_file)
233                         for key, value in parameters.items():
234                             try:
235                                 sim_input.parameters.set(key=key, value=value)
236                             except _gmxapi.Exception as e:
237                                 raise exceptions.ApiError(
238                                     'Bug encountered. Unknown error when trying to set simulation '
239                                     'parameter {} to {}'.format(key, value)
240                                 ) from e
241
242                         fileio.write_tpr_file(output=tprfile, input=sim_input)
243                     logger.info('Created {} on rank {}'.format(tprfile, context_rank))
244
245                     # Gather the actual working directories from the ensemble members.
246                     if hasattr(ensemble_comm, 'allgather'):
247                         # We should not assume that abspath expands the same on different MPI ranks.
248                         workdir_list = ensemble_comm.allgather(self.workdir)
249                         tpr_filenames = ensemble_comm.allgather(tprfile)
250                     else:
251                         workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
252                         # TODO: If we use better input file names, they need to be updated in multiple places.
253                         tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
254
255                     logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
256                                                                                              workdir_list))
257                     logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,
258                                                                       ensemble_rank,
259                                                                       self.workdir
260                                                                       ))
261                     work = workflow.from_tpr(tpr_filenames)
262                     self.workspec = work.workspec
263                     context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
264                     self.simulation_module_context = context
265                     # Go ahead and execute immediately. No need for lazy initialization in this basic case.
266                     with self.simulation_module_context as session:
267                         session.run()
268                         # TODO: There may be some additional results that we need to extract...
269                     # end: if context_rank < ensemble_width
270
271                 # end scoped_communicator: ensemble_comm
272
273             if context_comm.Get_size() > 1:
274                 context_comm.bcast(workdir_list, root=0)
275             # end scoped_communicator: context_comm
276
277         self.workdir = workdir_list
278
279
280 class SubscriptionSessionResources(object):
281     """Input and output run-time resources for a MDRun subscription.
282
283     A generalization of this class is the probably the main hook for customizing the resources
284     provided to the operation at run time.
285
286     .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
287     """
288
289     def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
290         assert isinstance(input, LegacyImplementationSubscription)
291         assert isinstance(output, PublishingDataProxy)
292         self.output = output
293         member_id = self.output._client_identifier
294         # Before enabling the following, be sure we understand what is happening.
295         # if member_id is None:
296         #     member_id = 0
297         self.workdir = input.workdir[member_id]
298
299     # @staticmethod
300     # def create(input: LegacyImplementationSubscription,
301     #  output: PublishingDataProxy) -> 'SubscriptionSessionResources':
302     #     """Factory function to create new resource instances.
303     #
304     #     It is better to overload a factory member than to write a fancy __init__.
305     #     An alternative to overloading is to make this a classmethod for polymorphic
306     #     creation. Note that Python classes (can) know what their subclasses are.
307     #     """
308     #     if isinstance(input, LegacyImplementationSubscription):
309     #         return _session_resource_factory(input, output)
310
311
312 class SubscriptionPublishingRunner(object):
313     """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""
314     input_resource_factory = LegacyImplementationSubscription
315
316     def __init__(self, resources: SubscriptionSessionResources):
317         assert isinstance(resources, SubscriptionSessionResources)
318         # Note that the resources contain a reference to a simulation ensemble that has already run.
319         self.resources = resources
320
321     def run(self):
322         """Operation implementation in the gmxapi.operation module context."""
323         publisher = self.resources.output
324         publisher._work_dir = self.resources.workdir
325
326
327 _next_uid = 0
328
329
330 def _make_uid(input) -> str:
331     # TODO: Use input fingerprint for more useful identification.
332     salt = hash(input)
333     global _next_uid
334     new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
335     _next_uid += 1
336     return new_uid
337
338
339 #
340 # Implementation
341 #
342
343
344 class ResourceManager(gmxapi.operation.ResourceManager):
345     """Manage resources for the MDRun operation in the gmxapi.operation contexts.
346
347     Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
348     Futures managed by this resource manager may contain additional attributes.
349     """
350
351     def future(self, name: str, description: _op.ResultDescription):
352         tpr_future = super().future(name=name, description=description)
353         return tpr_future
354
355     def data(self) -> OutputDataProxy:
356         return OutputDataProxy(self)
357
358     def update_output(self):
359         """Override gmxapi.operation.ResourceManager.update_output because we handle paralellism as 0.0.7."""
360         # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
361         # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
362         # TODO: Reimplement as the resource factory and director for the operation target context.
363         if not self.done():
364             self.__operation_entrance_counter += 1
365             if self.__operation_entrance_counter > 1:
366                 raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
367             if not self.done():
368                 # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
369                 #  operation's library/implementation context.
370
371                 ###
372                 # Note: this is the resource translation from gmxapi.operation context
373                 # to the dispatching runner director. It uses details of the gmxapi.operation.Context
374                 # and of the operation.
375
376                 # TODO: Dispatch/discover this resource factory from a canonical place.
377                 assert hasattr(self._runner_director, 'input_resource_factory')
378                 # Create on all ranks.
379                 input = self._runner_director.input_resource_factory(self)
380                 # End of action of the InputResourceDirector[Context, MdRunSubscription].
381                 ###
382
383                 # We are giving the director a resource that contains the subscription
384                 # to the dispatched work.
385
386                 publishing_resources = self.publishing_resources()
387                 for member in range(self.ensemble_width):
388                     with publishing_resources(ensemble_member=member) as output:
389                         resources = self._resource_factory(input=input, output=output)
390                         runner = self._runner_director(resources)
391                         runner.run()
392
393
394 class StandardInputDescription(_op.InputDescription):
395     """Provide the ReadTpr input description in gmxapi.operation Contexts."""
396
397     def signature(self) -> _op.InputCollectionDescription:
398         return _input
399
400     def make_uid(self, input: _op.DataEdge) -> str:
401         return _make_uid(input)
402
403
404 class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
405     """Provide the gmxapi compatible ReadTpr implementation."""
406
407     # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
408     @classmethod
409     def name(self) -> str:
410         """Canonical name for the operation."""
411         return 'mdrun'
412
413     @classmethod
414     def namespace(self) -> str:
415         """read_tpr is importable from the gmxapi module."""
416         return 'gmxapi'
417
418     @classmethod
419     def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
420         if isinstance(context, _op.Context):
421             return StandardDirector(context)
422
423
424 class StandardOperationHandle(_op.AbstractOperation):
425     """Handle used in Python UI or gmxapi.operation Contexts."""
426
427     def __init__(self, resource_manager: ResourceManager):
428         self.__resource_manager = resource_manager
429
430     def run(self):
431         self.__resource_manager.update_output()
432
433     @property
434     def output(self) -> OutputDataProxy:
435         return self.__resource_manager.data()
436
437
438 class StandardDirector(gmxapi.abc.OperationDirector):
439     """Direct the instantiation of a read_tpr node in a gmxapi.operation Context.
440
441     .. todo:: Compose this behavior in a more generic class.
442
443     .. todo:: Describe where instances live.
444     """
445
446     def __init__(self, context: _op.Context):
447         if not isinstance(context, _op.Context):
448             raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
449         self.context = context
450
451     def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
452         builder = self.context.node_builder(operation=RegisteredOperation, label=label)
453
454         builder.set_resource_factory(SubscriptionSessionResources)
455         builder.set_input_description(StandardInputDescription())
456         builder.set_handle(StandardOperationHandle)
457         # The runner in the gmxapi.operation context is the servicer for the legacy context.
458         builder.set_runner_director(SubscriptionPublishingRunner)
459         builder.set_output_factory(_output_factory)
460         builder.set_resource_manager(ResourceManager)
461
462         # Note: we have not yet done any dispatching based on *resources*. We should
463         # translate the resources provided into the form that the Context can subscribe to
464         # using the dispatching resource_factory. In the second draft, this operation
465         # is dispatched to a SimulationModuleContext, which can be subscribed directly
466         # to sources that are either literal filenames or gmxapi.simulation sources,
467         # while standard Futures can be resolved in the standard context.
468         #
469         assert isinstance(resources, _op.DataSourceCollection)
470         for name, source in resources.items():
471             builder.add_input(name, source)
472
473         handle = builder.build()
474         assert isinstance(handle, StandardOperationHandle)
475         return handle
476
477     def handle_type(self, context: gmxapi.abc.Context):
478         return StandardOperationHandle
479
480     # Developer note: the Director instance is a convenient place to get a dispatching
481     # factory. The Director may become generic or more universal, but the resource_factory
482     # would likely not be typed on the generic parameters of the Director class.
483     # Instead, it is likely a generic function with its own TypeVar parameters.
484     def resource_factory(self,
485                          source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
486                          target: gmxapi.abc.Context = None):
487         # Distinguish between the UIContext, in which input is in the form
488         # of function call arguments, and the StandardContext, implemented in
489         # gmxapi.operation. UIContext is probably a virtual context that is
490         # asserted by callers in order to get a factory that normalizes UI input
491         # for the StandardContext.
492         #
493         if target is None:
494             target = self.context
495         if source is None:
496             if isinstance(target, _op.Context):
497                 # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
498                 return _standard_node_resource_factory
499         if isinstance(source, _op.Context):
500             return SubscriptionSessionResources
501         if isinstance(source, ModuleObject):
502             if isinstance(target, _op.Context):
503                 # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
504                 # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
505                 # In the first draft, though, we just access a special payload.
506                 # Return a factory that will consume *_simulation_input* and *parameters*
507                 # members of a received object.
508                 logger.info('Building mdrun operation from source {}'.format(source))
509
510                 def simulation_input_workaround(input):
511                     source = input
512                     if hasattr(source, 'output'):
513                         source = input.output
514                     assert hasattr(source, '_simulation_input')
515                     assert hasattr(source, 'parameters')
516                     logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
517                                                                       source._simulation_input.description))
518                     source_collection = _input.bind(_simulation_input=source._simulation_input,
519                                                     parameters=source.parameters)
520                     logger.info('mdrun input bound as source collection {}'.format(source_collection))
521                     return source_collection
522
523                 return simulation_input_workaround
524
525         raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
526
527
528 def mdrun(input, label: str = None, context=None):
529     """MD simulation operation.
530
531     Arguments:
532         input : valid simulation input
533
534     Returns:
535         runnable operation to perform the specified simulation
536
537     The *output* attribute of the returned operation handle contains dynamically
538     determined outputs from the operation.
539
540     `input` may be a TPR file name or a an object providing the SimulationInput interface.
541
542     Note:
543         New function names will be appearing to handle tasks that are separate
544
545         "simulate" is plausibly a dispatcher or base class for various tasks
546         dispatched by mdrun. Specific work factories are likely "minimize,"
547         "test_particle_insertion," "legacy_simulation" (do_md), or "simulation"
548         composition (which may be leap-frog, vv, and other algorithms)
549     """
550     handle_context = context
551     if handle_context is not None:
552         raise gmxapi.exceptions.NotImplementedError(
553             'context must be None. This factory is only for the Python UI right now.')
554
555     target_context = _op.current_context()
556     assert isinstance(target_context, _op.Context)
557     # Get a director that will create a node in the standard context.
558     node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
559     assert isinstance(node_director, StandardDirector)
560     # TODO: refine this protocol
561     assert handle_context is None
562     # Examine the input.
563     if isinstance(input, ModuleObject):
564         # The input is from read_tpr or modify_input.
565         source_context = input
566     else:
567         # Allow automatic dispatching
568         source_context = None
569
570     resource_factory = node_director.resource_factory(source=source_context, target=target_context)
571     resources = resource_factory(input)
572     handle = node_director(resources=resources, label=label)
573     # Note: One effect of the assertions above is to help the type checker infer
574     # the return type of the handle. It is hard to convince the type checker that
575     # the return value of the node builder is up-cast. We might be able to resolve
576     # this by making both get_operation_director and ReadTprImplementation
577     # generics of the handle type, or using the handle type as the key for
578     # get_operation_director.
579     return handle