2 # This file is part of the GROMACS molecular simulation package.
4 # Copyright (c) 2019,2020,2021, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
"""mdrun operation module

The mdrun operation (in its first draft) conforms to the user-level API, but does
not use the Python Context resource manager. It uses either the legacy 0.0.7
Context or its own Context, also implemented in this module.
"""
47 from contextlib import contextmanager
51 import gmxapi.operation as _op
52 from gmxapi import exceptions
54 # The following imports are not marked as public API.
55 from .abc import ModuleObject
58 from . import workflow
60 # Initialize module-level logger
61 from gmxapi import logger as root_logger
# Module-level logger, namespaced under the gmxapi package root logger.
logger = root_logger.getChild('mdrun')
# Lazy %-style args: the message is only rendered if the INFO level is enabled.
logger.info('Importing %s', __name__)
# Output in the gmxapi.operation Context.
# TODO: Consider using a single base class for the DataProxy, but have distinct
# data descriptor behavior (or different descriptor implementations in different
# subclasses) so that static code inspection can more easily determine the
# attributes of the data proxies.
# Data descriptors for the three outputs published by an mdrun node:
# the working directory path, the trajectory file path, and the dict of
# simulation parameters.
_output_descriptors = (
    _op.OutputDataDescriptor('_work_dir', str),
    _op.OutputDataDescriptor('trajectory', str),
    _op.OutputDataDescriptor('parameters', dict)
# Map each output name to a Publisher data descriptor for the PublishingDataProxy.
# NOTE(review): this uses the fully qualified gmxapi.operation.Publisher although
# the module aliases the package as _op elsewhere — consider unifying.
_publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
# Static description of the output collection, keyed by output name and dtype.
_output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
class OutputDataProxy(_op.DataProxyBase,
                      descriptors=_output_descriptors):
    """Implement the 'output' attribute of MDRun operations.

    Exposes read access to the outputs described by _output_descriptors:
    '_work_dir', 'trajectory', and 'parameters'.
    """
class PublishingDataProxy(_op.DataProxyBase,
                          descriptors=_publishing_descriptors):
    """Manage output resource updates for MDRun operation.

    Counterpart to OutputDataProxy: the runner assigns to the attributes
    described by _publishing_descriptors to publish results.
    """
# Factory tying together the output description and the two data proxies for
# mdrun nodes created in the gmxapi.operation Context.
_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
                                    output_description=_output,
                                    publishing_data_proxy=PublishingDataProxy)
# Input in the gmxapi.operation Context for the dispatching runner.
# The default empty dictionary for parameters just means that there are no overrides
# to the parameters already provided in _simulation_input.
# Two named inputs: *_simulation_input* (the TPR payload source) and
# *parameters* (a dict of per-key overrides applied to that input).
_input = _op.InputCollectionDescription(
    [('_simulation_input', inspect.Parameter('_simulation_input',
                                             inspect.Parameter.POSITIONAL_OR_KEYWORD,
     ('parameters', inspect.Parameter('parameters',
                                      inspect.Parameter.POSITIONAL_OR_KEYWORD,
def _standard_node_resource_factory(*args, **kwargs):
    """Translate Python UI input to the gmxapi.operation node builder inputs.

    Binds the caller's positional and keyword arguments against the
    module-level *_input* collection description.

    Returns:
        The bound data source collection produced by ``_input.bind``.
    """
    source_collection = _input.bind(*args, **kwargs)
    # Lazy %-style logging arg: the message is only rendered if INFO is enabled.
    logger.info('mdrun input bound as source collection %s', source_collection)
    return source_collection
def scoped_communicator(original_comm, requested_size: int = None):
    """Provide a communicator scoped to this block of work.

    When *requested_size* is None, acquire (or wrap) a communicator based on
    *original_comm*. Otherwise, derive an ensemble sub-communicator with
    exactly *requested_size* ranks from *original_comm*.
    (Presumably decorated with @contextmanager — imported above — with the
    decorator and yield/cleanup lines elided from this excerpt.)

    Raises:
        exceptions.UsageError: a size was requested but *original_comm* is not
            a valid communicator.
        exceptions.FeatureNotAvailableError: *original_comm* has fewer ranks
            than *requested_size*.
    """
    # Local import avoids loading the simulation context module until needed.
    from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator

    if requested_size is None:
        # No specific size requested: delegate to the package helper, which
        # handles a None placeholder communicator.
        communicator = _acquire_communicator(communicator=original_comm)
    # A specific ensemble size was requested: validate original_comm first.
    # (The separating 'else:' is elided in this excerpt.)
    if original_comm is None or not hasattr(original_comm, 'Get_size'):
        raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
    original_comm_size = original_comm.Get_size()
    if original_comm_size < requested_size:
        raise exceptions.FeatureNotAvailableError(
            'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
    assert original_comm_size >= requested_size
    communicator = _get_ensemble_communicator(original_comm, requested_size)
class LegacyImplementationSubscription(object):
    """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.

    This input resource is a subscription to work that is dispatched to a sub-context.
    The resource can be created from the standard data of the simulation module.
    """

    def __init__(self, resource_manager: _op.ResourceManager):
        # Local imports: the legacy Context and C++ bindings are only needed
        # once a subscription is actually instantiated.
        from .context import Context as LegacyContext
        import gmxapi._gmxapi as _gmxapi
        self._gmxapi = _gmxapi

        assert isinstance(resource_manager, _op.ResourceManager)
        # We use several details of the gmxapi.operation.Context.ResourceManager.
        # These dependencies can evolve into the subscription protocol between Contexts.

        # Configure and run a gmxapi 0.0.7 session.
        # 0. Determine ensemble width.
        # 1. Choose, create/check working directories.
        # 2. Create source TPR.
        # 3. Create workspec.
        # 3.5 Add plugin potentials, if any.
        # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.

        # Determine ensemble width
        ensemble_width = resource_manager.ensemble_width

        # Choose working directories
        # TODO: operation working directory naming scheme should be centrally well-defined.
        # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
        # so we cannot create the input file path in the working directory based on WorkSpec.uid.
        workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
                        for member in range(ensemble_width)]
        # NOTE(review): [{}] * n aliases a single dict object n times. This is
        # safe only because entries are replaced wholesale (never mutated in
        # place) below; a list comprehension would be more robust.
        parameters_dict_list = [{}] * ensemble_width

        # This is a reasonable place to start using MPI ensemble implementation details.
        # We will want better abstraction in the future, but it is best if related filesystem
        # accesses occur from the same processes, consistently. Note that we already
        # handle some optimization and dependency reduction when the ensemble size is 1.
        # TODO: multithread and multiprocess alternatives to MPI ensemble management.

        # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
        with scoped_communicator(None) as context_comm:
            context_rank = context_comm.Get_rank()
            with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
                # Note that in the current implementation, extra ranks have nothing to do,
                # but they may have a dummy communicator, so be sure to skip those members
                # of the context_comm.
                if context_rank < ensemble_width:
                    assert ensemble_comm.Get_size() == ensemble_width
                    ensemble_rank = ensemble_comm.Get_rank()
                    # TODO: This should be a richer object that includes at least host information
                    # and preferably the full gmxapi Future interface.
                    self.workdir = os.path.abspath(workdir_list[ensemble_rank])

                    with resource_manager.local_input(member=ensemble_rank) as input_pack:
                        source_file = input_pack.kwargs['_simulation_input']
                        parameters = input_pack.kwargs['parameters']
                        # If there are any other key word arguments to process from the gmxapi.mdrun
                        # factory call, do it here.

                        # TODO: We should really name this file with a useful input-dependent tag.
                        tprfile = os.path.join(self.workdir, 'topol.tpr')

                        expected_working_files = [tprfile]

                        if os.path.exists(self.workdir):
                            if os.path.isdir(self.workdir):
                                # Confirm that this is a restarted simulation.
                                # It is unspecified by the API, but at least through gmxapi 0.1,
                                # all simulations are initialized with a checkpoint file named state.cpt
                                # (see src/api/cpp/context.cpp)
                                checkpoint_file = os.path.join(self.workdir, 'state.cpt')
                                expected_working_files.append(checkpoint_file)

                                for file in expected_working_files:
                                    if not os.path.exists(file):
                                        raise exceptions.ApiError(
                                            'Cannot determine working directory state: {}'.format(self.workdir))
                            # NOTE(review): this raise belongs to the non-directory
                            # branch; its separating 'else:' is elided here.
                            raise exceptions.ApiError(
                                'Chosen working directory path exists but is not a directory: {}'.format(self.workdir))
                        # Fresh start (the working directory does not yet exist):
                        # Build the working directory and input files.
                        os.mkdir(self.workdir)
                        sim_input = fileio.read_tpr(source_file)
                        for key, value in parameters.items():
                            # Apply each user-supplied parameter override.
                            # (The 'try:' paired with the except below is elided.)
                                sim_input.parameters.set(key=key, value=value)
                            except _gmxapi.Exception as e:
                                # Translate the bindings exception: a failure here
                                # indicates a bug at this layer, not user error.
                                raise exceptions.ApiError(
                                    'Bug encountered. Unknown error when trying to set simulation '
                                    'parameter {} to {}'.format(key, value)
                        fileio.write_tpr_file(output=tprfile, input=sim_input)
                        logger.info('Created {} on rank {}'.format(tprfile, context_rank))

                    # Gather the actual outputs from the ensemble members.
                    if hasattr(ensemble_comm, 'allgather'):
                        # We should not assume that abspath expands the same on different MPI ranks.
                        workdir_list = ensemble_comm.allgather(self.workdir)
                        tpr_filenames = ensemble_comm.allgather(tprfile)
                        parameters = fileio.read_tpr(tprfile).parameters.extract()
                        parameters_dict_list = ensemble_comm.allgather(parameters)
                    # Alternate path (communicator without allgather, e.g. a dummy
                    # single-rank comm): recompute paths/parameters locally.
                        workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
                        # TODO: If we use better input file names, they need to be updated in multiple places.
                        tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
                        parameters_dict_list = [fileio.read_tpr(tprfile).parameters.extract() for tprfile in tpr_filenames]

                    logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
                    logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,

                    # TODO: (#3718) Normalize the way we pass run time parameters to mdrun.
                    kwargs = getattr(resource_manager, 'mdrun_kwargs', {})
                    for key, value in kwargs.items():
                        logger.debug('Adding mdrun run time argument: {}'.format(key + '=' + str(value)))
                    work = workflow.from_tpr(tpr_filenames, **kwargs)
                    self.workspec = work.workspec
                    context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
                    self.simulation_module_context = context
                    # Go ahead and execute immediately. No need for lazy initialization in this basic case.
                    with self.simulation_module_context as session:
                        # TODO: There may be some additional results that we need to extract...
                    # end: if context_rank < ensemble_width
                # end scoped_communicator: ensemble_comm

                if context_comm.Get_size() > 1:
                    # NOTE(review): mpi4py bcast *returns* the broadcast object;
                    # the return value appears unused here, so non-root ranks may
                    # keep a stale workdir_list — confirm against upstream.
                    context_comm.bcast(workdir_list, root=0)
            # end scoped_communicator: context_comm

        # Publish ensemble-wide results: lists indexed by ensemble member.
        # (Note: self.workdir changes meaning here, from this rank's own path
        # string to the list of all members' paths.)
        self.workdir = workdir_list
        self.parameters = parameters_dict_list
class SubscriptionSessionResources(object):
    """Input and output run-time resources for a MDRun subscription.

    A generalization of this class is probably the main hook for customizing the resources
    provided to the operation at run time.

    .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
    """

    def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
        assert isinstance(input, LegacyImplementationSubscription)
        assert isinstance(output, PublishingDataProxy)
        # NOTE(review): self.output is presumably assigned from *output* on an
        # elided line before this read — confirm.
        member_id = self.output._client_identifier
        # Before enabling the following, be sure we understand what is happening.
        # if member_id is None:
        # Select this ensemble member's working directory and parameters from
        # the ensemble-wide lists gathered by the subscription.
        self.workdir = input.workdir[member_id]
        self.parameters = input.parameters[member_id]
class SubscriptionPublishingRunner(object):
    """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""

    # Run-time input resource factory: instantiating the subscription dispatches
    # and executes the legacy (0.0.7) simulation.
    input_resource_factory = LegacyImplementationSubscription

    def __init__(self, resources: SubscriptionSessionResources):
        assert isinstance(resources, SubscriptionSessionResources)
        # Note that the resources contain a reference to a simulation ensemble that has already run.
        self.resources = resources

    # NOTE(review): the lines below belong to a method whose 'def' line is
    # elided from this excerpt (the operation's run entry point).
        """Operation implementation in the gmxapi.operation module context."""
        # Publish the already-computed results through the output proxy.
        publisher = self.resources.output
        publisher._work_dir = self.resources.workdir
        publisher.parameters = self.resources.parameters
        # TODO: Make the return value a trajectory handle rather than a file path.
        # TODO: Decide how to handle append vs. noappend output.
        # TODO: More rigorous handling of the trajectory file(s)
        # We have no way to query the name of the trajectory produced, and we
        # have avoided exposing the ability to specify it, so we have to assume
        # GROMACS default behavior.
        publisher.trajectory = os.path.join(self.resources.workdir, 'traj.trr')
def _make_uid(input) -> str:
    """Construct a unique identifier string for an mdrun node."""
    # TODO: Use input fingerprint for more useful identification.
    # NOTE(review): _next_uid and salt are defined on elided lines; the uid
    # appears not to depend on *input* yet (see TODO above).
    new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
class ResourceManager(gmxapi.operation.ResourceManager):
    """Manage resources for the MDRun operation in the gmxapi.operation contexts.

    Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
    Futures managed by this resource manager may contain additional attributes.
    """

    def future(self, name: str, description: _op.ResultDescription):
        """Get a Future for the named result (decoration of the Future is on elided lines)."""
        tpr_future = super().future(name=name, description=description)

    def data(self) -> OutputDataProxy:
        """Get a read-only proxy for this operation's published outputs."""
        return OutputDataProxy(self)

    def update_output(self):
        """Override gmxapi.operation.ResourceManager.update_output because we handle parallelism as 0.0.7."""
        # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
        # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
        # TODO: Reimplement as the resource factory and director for the operation target context.

        # Guard against re-entrant execution: the operation must run exactly once.
        self.__operation_entrance_counter += 1
        if self.__operation_entrance_counter > 1:
            raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')

        # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
        # operation's library/implementation context.

        # Note: this is the resource translation from gmxapi.operation context
        # to the dispatching runner director. It uses details of the gmxapi.operation.Context
        # and of the operation.

        # TODO: Dispatch/discover this resource factory from a canonical place.
        assert hasattr(self._runner_director, 'input_resource_factory')
        # Create on all ranks.
        input = self._runner_director.input_resource_factory(self)
        # End of action of the InputResourceDirector[Context, MdRunSubscription].

        # We are giving the director a resource that contains the subscription
        # to the dispatched work.

        publishing_resources = self.publishing_resources()
        for member in range(self.ensemble_width):
            # Publish each ensemble member's output within its own scoped
            # publishing context.
            with publishing_resources(ensemble_member=member) as output:
                resources = self._resource_factory(input=input, output=output)
                runner = self._runner_director(resources)
class StandardInputDescription(_op.InputDescription):
    """Provide the mdrun input description in gmxapi.operation Contexts."""

    def signature(self) -> _op.InputCollectionDescription:
        """Mapping of named inputs and their types (presumably the module-level _input; body elided)."""

    def make_uid(self, input: _op.DataEdge) -> str:
        """Generate the unique node identifier for the given input edge."""
        return _make_uid(input)
class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
    """Provide the gmxapi compatible mdrun implementation."""

    # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
    def name(self) -> str:
        """Canonical name for the operation."""

    def namespace(self) -> str:
        """mdrun is importable from the gmxapi module."""

    def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
        """Get a director for instantiating this operation in *context*."""
        if isinstance(context, _op.Context):
            return StandardDirector(context)
class StandardOperationHandle(_op.AbstractOperation):
    """Handle used in Python UI or gmxapi.operation Contexts."""

    def __init__(self, resource_manager: ResourceManager):
        self.__resource_manager = resource_manager

    # NOTE(review): the line below belongs to the run() method ('def' elided):
    # executing the handle triggers the resource manager update.
        self.__resource_manager.update_output()

    def output(self) -> OutputDataProxy:
        """Get a read-only proxy for the operation's outputs."""
        return self.__resource_manager.data()
class StandardDirector(gmxapi.abc.OperationDirector):
    """Direct the instantiation of a mdrun node in a gmxapi.operation Context.

    .. todo:: Compose this behavior in a more generic class.

    .. todo:: Describe where instances live.
    """

    def __init__(self, context: _op.Context):
        if not isinstance(context, _op.Context):
            raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
        self.context = context

    def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
        """Build an mdrun node from *resources*, returning its operation handle."""
        builder = self.context.node_builder(operation=RegisteredOperation, label=label)

        builder.set_resource_factory(SubscriptionSessionResources)
        builder.set_input_description(StandardInputDescription())
        builder.set_handle(StandardOperationHandle)
        # The runner in the gmxapi.operation context is the servicer for the legacy context.
        builder.set_runner_director(SubscriptionPublishingRunner)
        builder.set_output_factory(_output_factory)
        builder.set_resource_manager(ResourceManager)

        # Note: we have not yet done any dispatching based on *resources*. We should
        # translate the resources provided into the form that the Context can subscribe to
        # using the dispatching resource_factory. In the second draft, this operation
        # is dispatched to a SimulationModuleContext, which can be subscribed directly
        # to sources that are either literal filenames or gmxapi.simulation sources,
        # while standard Futures can be resolved in the standard context.
        assert isinstance(resources, _op.DataSourceCollection)
        for name, source in resources.items():
            builder.add_input(name, source)

        handle = builder.build()
        assert isinstance(handle, StandardOperationHandle)

    def handle_type(self, context: gmxapi.abc.Context):
        """Report the operation handle type produced in *context*."""
        return StandardOperationHandle

    # Developer note: the Director instance is a convenient place to get a dispatching
    # factory. The Director may become generic or more universal, but the resource_factory
    # would likely not be typed on the generic parameters of the Director class.
    # Instead, it is likely a generic function with its own TypeVar parameters.
    def resource_factory(self,
                         source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
                         target: gmxapi.abc.Context = None):
        """Get a run-time resource factory, dispatched on source/target context types."""
        # Distinguish between the UIContext, in which input is in the form
        # of function call arguments, and the StandardContext, implemented in
        # gmxapi.operation. UIContext is probably a virtual context that is
        # asserted by callers in order to get a factory that normalizes UI input
        # for the StandardContext.
        # NOTE(review): dispatch guards appear elided in this excerpt (e.g. a
        # 'target is None' / 'source is None' check around the lines below);
        # confirm control flow against upstream.
            target = self.context

        if isinstance(target, _op.Context):
            # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
            return _standard_node_resource_factory
        if isinstance(source, _op.Context):
            return SubscriptionSessionResources
        if isinstance(source, ModuleObject):
            if isinstance(target, _op.Context):
                # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
                # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
                # In the first draft, though, we just access a special payload.
                # Return a factory that will consume *_simulation_input* and *parameters*
                # members of a received object.
                logger.info('Building mdrun operation from source {}'.format(source))

                def simulation_input_workaround(input):
                    # NOTE(review): assigning to *source* makes it local to this
                    # closure; an initializing 'source = input' presumably
                    # precedes (elided) — otherwise the hasattr check would
                    # raise UnboundLocalError. Confirm.
                    if hasattr(source, 'output'):
                        source = input.output
                    assert hasattr(source, '_simulation_input')
                    assert hasattr(source, 'parameters')
                    logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
                                                                      source._simulation_input.description))
                    source_collection = _input.bind(_simulation_input=source._simulation_input,
                                                    parameters=source.parameters)
                    logger.info('mdrun input bound as source collection {}'.format(source_collection))
                    return source_collection

                return simulation_input_workaround

        raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
def mdrun(input, label: str = None, context=None):
    """MD simulation operation.

    Arguments:
        input : valid simulation input
        label : optional user-friendly identifier for the workflow node
        context : must be None (reserved; see below)

    Returns:
        runnable operation to perform the specified simulation

    The *output* attribute of the returned operation handle contains dynamically
    determined outputs from the operation.

    `input` may be a TPR file name or an object providing the SimulationInput interface.

    Note:
        New function names will be appearing to handle tasks that are separate
        from the main simulation driver:
        "simulate" is plausibly a dispatcher or base class for various tasks
        dispatched by mdrun. Specific work factories are likely "minimize,"
        "test_particle_insertion," "legacy_simulation" (do_md), or "simulation"
        composition (which may be leap-frog, vv, and other algorithms)
    """
    handle_context = context
    if handle_context is not None:
        # Only the default (Python UI) context is supported so far.
        raise gmxapi.exceptions.NotImplementedError(
            'context must be None. This factory is only for the Python UI right now.')

    target_context = _op.current_context()
    assert isinstance(target_context, _op.Context)
    # Get a director that will create a node in the standard context.
    node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
    assert isinstance(node_director, StandardDirector)
    # TODO: refine this protocol
    assert handle_context is None

    if isinstance(input, ModuleObject):
        # The input is from read_tpr or modify_input.
        source_context = input
    # Allow automatic dispatching
    # (The separating 'else:' for this branch is elided in this excerpt.)
        source_context = None

    resource_factory = node_director.resource_factory(source=source_context, target=target_context)
    resources = resource_factory(input)
    handle = node_director(resources=resources, label=label)
    # Note: One effect of the assertions above is to help the type checker infer
    # the return type of the handle. It is hard to convince the type checker that
    # the return value of the node builder is up-cast. We might be able to resolve
    # this by making both get_operation_director and ReadTprImplementation
    # generics of the handle type, or using the handle type as the key for
    # get_operation_director.