2 # This file is part of the GROMACS molecular simulation package.
4 # Copyright (c) 2019,2020, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
35 """mdrun operation module
37 The mdrun operation (in its first draft) conforms to the user-level API, but does
38 not use the Python Context resource manager. It uses either the legacy 0.0.7
39 Context or its own Context, also implemented in this module.
47 from contextlib import contextmanager
51 import gmxapi.operation as _op
52 from gmxapi import exceptions
54 # The following imports are not marked as public API.
55 from .abc import ModuleObject
58 from . import workflow
60 # Initialize module-level logger
61 from gmxapi import logger as root_logger
63 logger = root_logger.getChild('mdrun')
64 logger.info('Importing {}'.format(__name__))
67 # Output in the gmxapi.operation Context.
68 # TODO: Consider using a single base class for the DataProxy, but have distinct
69 # data descriptor behavior (or different descriptor implementations in different
70 # subclasses) so that static code inspection can more easily determine the
71 # attributes of the data proxies.
# Data descriptors for the three outputs published by an mdrun node.
_output_descriptors = (
    _op.OutputDataDescriptor('_work_dir', str),
    _op.OutputDataDescriptor('trajectory', str),
    _op.OutputDataDescriptor('parameters', dict)
)
# One Publisher per output, keyed by output name, for the publishing proxy.
_publishing_descriptors = {desc._name: _op.Publisher(desc._name, desc._dtype) for desc in
                           _output_descriptors}
# Aggregate description of the complete output collection.
_output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
                                             _output_descriptors})
class OutputDataProxy(_op.DataProxyBase,
                      descriptors=_output_descriptors):
    """Implement the 'output' attribute of MDRun operations.

    Attribute access is provided by the data descriptors in *_output_descriptors*.
    """
class PublishingDataProxy(_op.DataProxyBase,
                          descriptors=_publishing_descriptors):
    """Manage output resource updates for MDRun operation."""
# Factory bundling the output proxies and collection description for the node builder.
_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
                                    output_description=_output,
                                    publishing_data_proxy=PublishingDataProxy)
# Input in the gmxapi.operation Context for the dispatching runner.
# The default empty dictionary for parameters just means that there are no overrides
# to the parameters already provided in _simulation_input.
# NOTE(review): the annotation/default arguments to inspect.Parameter were elided in
# the pasted listing; str/dict annotations and the empty-dict default are inferred
# from the comment above and the output descriptors — confirm against upstream.
_input = _op.InputCollectionDescription(
    [('_simulation_input', inspect.Parameter('_simulation_input',
                                             inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                             annotation=str)),
     ('parameters', inspect.Parameter('parameters',
                                      inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                      default=dict(),
                                      annotation=dict))
     ])
def _standard_node_resource_factory(*args, **kwargs):
    """Translate Python UI input to the gmxapi.operation node builder inputs.

    Binds the positional and keyword arguments against the module-level *_input*
    collection description and returns the resulting source collection.
    """
    source_collection = _input.bind(*args, **kwargs)
    logger.info('mdrun input bound as source collection {}'.format(source_collection))
    return source_collection
@contextmanager
def scoped_communicator(original_comm, requested_size: int = None):
    """Manage the lifetime of a communicator for this Context.

    With no *requested_size*, acquires a communicator derived from *original_comm*
    (or a default when None). With *requested_size*, produces an ensemble
    subcommunicator of exactly that size, which requires a valid parent
    communicator of at least that size.

    Raises:
        exceptions.UsageError: if a size is requested without a usable parent communicator.
        exceptions.FeatureNotAvailableError: if the parent communicator is too small.
    """
    # Imported locally to avoid a circular import with the simulation context module.
    from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator

    if requested_size is None:
        communicator = _acquire_communicator(communicator=original_comm)
    else:
        if original_comm is None or not hasattr(original_comm, 'Get_size'):
            raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
        original_comm_size = original_comm.Get_size()
        if original_comm_size < requested_size:
            raise exceptions.FeatureNotAvailableError(
                'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
                    requested_size,
                    original_comm_size))
        assert original_comm_size >= requested_size
        communicator = _get_ensemble_communicator(original_comm, requested_size)
    # NOTE(review): the teardown below was elided in the pasted listing; yielding the
    # communicator and freeing it on exit matches the `with scoped_communicator(...)`
    # call sites in this module — confirm against upstream.
    try:
        yield communicator
    finally:
        communicator.Free()
# NOTE(review): this listing was pasted with per-line numbers and flattened
# indentation, and the embedded numbering skips (e.g. 147, 178, 234, 252, 275),
# indicating elided lines. Code is preserved byte-for-byte pending recovery of
# the original file; review notes below flag the apparent gaps.
145 class LegacyImplementationSubscription(object):
146 """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.
148 This input resource is a subscription to work that is dispatched to a sub-context.
149 The resource can be created from the standard data of the simulation module.
152 def __init__(self, resource_manager: _op.ResourceManager):
# Local imports avoid a circular dependency with the legacy context module.
153 from .context import Context as LegacyContext
154 import gmxapi._gmxapi as _gmxapi
155 self._gmxapi = _gmxapi
157 assert isinstance(resource_manager, _op.ResourceManager)
158 # We use several details of the gmxapi.operation.Context.ResourceManager.
159 # These dependencies can evolve into the subscription protocol between Contexts.
161 # Configure and run a gmxapi 0.0.7 session.
162 # 0. Determine ensemble width.
163 # 1. Choose, create/check working directories.
164 # 2. Create source TPR.
165 # 3. Create workspec.
166 # 3.5 Add plugin potentials, if any.
168 # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.
170 # Determine ensemble width
171 ensemble_width = resource_manager.ensemble_width
173 # Choose working directories
174 # TODO: operation working directory naming scheme should be centrally well-defined.
175 # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
176 # so we cannot create the input file path in the working directory based on WorkSpec.uid.
177 workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
# NOTE(review): the 'member=member)' continuation (original line 178) appears elided.
179 for member in range(ensemble_width)]
180 parameters_dict_list = [{}] * ensemble_width
182 # This is a reasonable place to start using MPI ensemble implementation details.
183 # We will want better abstraction in the future, but it is best if related filesystem
184 # accesses occur from the same processes, consistently. Note that we already
185 # handle some optimization and dependency reduction when the ensemble size is 1.
186 # TODO: multithread and multiprocess alternatives to MPI ensemble management.
188 # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
189 with scoped_communicator(None) as context_comm:
190 context_rank = context_comm.Get_rank()
191 with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
192 # Note that in the current implementation, extra ranks have nothing to do,
193 # but they may have a dummy communicator, so be sure to skip those members
194 # of the context_comm.
195 if context_rank < ensemble_width:
196 assert ensemble_comm.Get_size() == ensemble_width
197 ensemble_rank = ensemble_comm.Get_rank()
198 # TODO: This should be a richer object that includes at least host information
199 # and preferably the full gmxapi Future interface.
200 self.workdir = os.path.abspath(workdir_list[ensemble_rank])
202 with resource_manager.local_input(member=ensemble_rank) as input_pack:
203 source_file = input_pack.kwargs['_simulation_input']
204 parameters = input_pack.kwargs['parameters']
205 # If there are any other key word arguments to process from the gmxapi.mdrun
206 # factory call, do it here.
208 # TODO: We should really name this file with a useful input-dependent tag.
209 tprfile = os.path.join(self.workdir, 'topol.tpr')
211 expected_working_files = [tprfile]
213 if os.path.exists(self.workdir):
214 if os.path.isdir(self.workdir):
215 # Confirm that this is a restarted simulation.
216 # It is unspecified by the API, but at least through gmxapi 0.1,
217 # all simulations are initialized with a checkpoint file named state.cpt
218 # (see src/api/cpp/context.cpp)
# NOTE(review): 'state.cpp' contradicts the comment above ('state.cpt') — presumably
# a typo. Confirm before changing: it alters which file a restart requires.
219 checkpoint_file = os.path.join(self.workdir, 'state.cpp')
220 expected_working_files.append(checkpoint_file)
222 for file in expected_working_files:
223 if not os.path.exists(file):
224 raise exceptions.ApiError(
225 'Cannot determine working directory state: {}'.format(self.workdir))
# NOTE(review): an 'else:' introducing this raise (original line 226) appears elided.
227 raise exceptions.ApiError(
228 'Chosen working directory path exists but is not a directory: {}'.format(self.workdir))
# NOTE(review): an 'else:' for the fresh-directory branch (original line 229) appears elided.
230 # Build the working directory and input files.
231 os.mkdir(self.workdir)
232 sim_input = fileio.read_tpr(source_file)
233 for key, value in parameters.items():
# NOTE(review): the 'try:' matching the 'except' below (original line 234) appears elided.
235 sim_input.parameters.set(key=key, value=value)
236 except _gmxapi.Exception as e:
237 raise exceptions.ApiError(
238 'Bug encountered. Unknown error when trying to set simulation '
239 'parameter {} to {}'.format(key, value)
# NOTE(review): the closing ') from e' for the raise (original lines 240-241) appears elided.
242 fileio.write_tpr_file(output=tprfile, input=sim_input)
243 logger.info('Created {} on rank {}'.format(tprfile, context_rank))
245 # Gather the actual outputs from the ensemble members.
246 if hasattr(ensemble_comm, 'allgather'):
247 # We should not assume that abspath expands the same on different MPI ranks.
248 workdir_list = ensemble_comm.allgather(self.workdir)
249 tpr_filenames = ensemble_comm.allgather(tprfile)
250 parameters = fileio.read_tpr(tprfile).parameters.extract()
251 parameters_dict_list = ensemble_comm.allgather(parameters)
# NOTE(review): this looks like the no-allgather fallback; an 'else:' (original
# line 252) appears elided.
253 workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
254 # TODO: If we use better input file names, they need to be updated in multiple places.
255 tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
256 parameters_dict_list = [fileio.read_tpr(tprfile).parameters.extract() for tprfile in tpr_filenames]
258 logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
260 logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,
# NOTE(review): continuation arguments for both logger.debug calls above (original
# lines 259, 261-263) appear elided.
264 # TODO: Normalize the way we pass run time parameters to mdrun.
266 kwargs = getattr(resource_manager, 'mdrun_kwargs', {})
267 for key, value in kwargs.items():
268 logger.debug('Adding mdrun run time argument: {}'.format(key + '=' + str(value)))
269 work = workflow.from_tpr(tpr_filenames, **kwargs)
270 self.workspec = work.workspec
271 context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
272 self.simulation_module_context = context
273 # Go ahead and execute immediately. No need for lazy initialization in this basic case.
274 with self.simulation_module_context as session:
# NOTE(review): the body of this 'with' block (presumably session.run(), original
# line 275) appears elided.
276 # TODO: There may be some additional results that we need to extract...
277 # end: if context_rank < ensemble_width
279 # end scoped_communicator: ensemble_comm
281 if context_comm.Get_size() > 1:
# NOTE(review): the bcast result is not captured here — confirm whether an
# assignment (workdir_list = context_comm.bcast(...)) was elided, since non-root
# ranks would otherwise keep their local workdir_list.
282 context_comm.bcast(workdir_list, root=0)
283 # end scoped_communicator: context_comm
# Published values: per-member working directories and effective parameter dicts.
285 self.workdir = workdir_list
286 self.parameters = parameters_dict_list
class SubscriptionSessionResources(object):
    """Input and output run-time resources for a MDRun subscription.

    A generalization of this class is probably the main hook for customizing the resources
    provided to the operation at run time.

    .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
    """

    def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
        """Bind an executed ensemble subscription to the publisher for one member.

        Arguments:
            input: subscription to the (already executed) legacy simulation work.
            output: publishing proxy for a single ensemble member's results.
        """
        assert isinstance(input, LegacyImplementationSubscription)
        assert isinstance(output, PublishingDataProxy)
        # The publishing proxy identifies which ensemble member it serves.
        self.output = output
        member_id = self.output._client_identifier
        # Before enabling the following, be sure we understand what is happening.
        # if member_id is None:
        self.workdir = input.workdir[member_id]
        self.parameters = input.parameters[member_id]
class SubscriptionPublishingRunner(object):
    """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""
    # The input resource is the (eagerly executed) legacy-context subscription.
    input_resource_factory = LegacyImplementationSubscription

    def __init__(self, resources: SubscriptionSessionResources):
        assert isinstance(resources, SubscriptionSessionResources)
        # Note that the resources contain a reference to a simulation ensemble that has already run.
        self.resources = resources

    def run(self):
        """Operation implementation in the gmxapi.operation module context."""
        publisher = self.resources.output
        publisher._work_dir = self.resources.workdir
        publisher.parameters = self.resources.parameters
        # TODO: Make the return value a trajectory handle rather than a file path.
        # TODO: Decide how to handle append vs. noappend output.
        # TODO: More rigorous handling of the trajectory file(s)
        # We have no way to query the name of the trajectory produced, and we
        # have avoided exposing the ability to specify it, so we have to assume
        # GROMACS default behavior.
        publisher.trajectory = os.path.join(self.resources.workdir, 'traj.trr')
336 def _make_uid(input) -> str:
337 # TODO: Use input fingerprint for more useful identification.
340 new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
# NOTE(review): pasted listing with per-line numbers and flattened indentation;
# the embedded numbering skips (359-360, 369, 373, 376-377, 381, 387-391, 397+),
# indicating elided lines. Code preserved byte-for-byte; gaps flagged below.
350 class ResourceManager(gmxapi.operation.ResourceManager):
351 """Manage resources for the MDRun operation in the gmxapi.operation contexts.
353 Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
354 Futures managed by this resource manager may contain additional attributes.
357 def future(self, name: str, description: _op.ResultDescription):
358 tpr_future = super().future(name=name, description=description)
# NOTE(review): no visible 'return tpr_future' — presumably elided (original
# lines 359-360); confirm against upstream.
361 def data(self) -> OutputDataProxy:
362 return OutputDataProxy(self)
364 def update_output(self):
365 """Override gmxapi.operation.ResourceManager.update_output because we handle parallelism as 0.0.7."""
366 # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
367 # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
368 # TODO: Reimplement as the resource factory and director for the operation target context.
# NOTE(review): name mangling resolves self.__operation_entrance_counter to
# _ResourceManager__operation_entrance_counter, which only matches the attribute
# initialized by the base class because this subclass reuses the class name
# 'ResourceManager' — fragile if either class is renamed; confirm intent.
370 self.__operation_entrance_counter += 1
371 if self.__operation_entrance_counter > 1:
372 raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
374 # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
375 # operation's library/implementation context.
378 # Note: this is the resource translation from gmxapi.operation context
379 # to the dispatching runner director. It uses details of the gmxapi.operation.Context
380 # and of the operation.
382 # TODO: Dispatch/discover this resource factory from a canonical place.
383 assert hasattr(self._runner_director, 'input_resource_factory')
384 # Create on all ranks.
385 input = self._runner_director.input_resource_factory(self)
386 # End of action of the InputResourceDirector[Context, MdRunSubscription].
389 # We are giving the director a resource that contains the subscription
390 # to the dispatched work.
392 publishing_resources = self.publishing_resources()
393 for member in range(self.ensemble_width):
394 with publishing_resources(ensemble_member=member) as output:
395 resources = self._resource_factory(input=input, output=output)
396 runner = self._runner_director(resources)
# NOTE(review): the runner is constructed but never invoked in the visible lines;
# a 'runner()' call (original line ~397) appears elided.
class StandardInputDescription(_op.InputDescription):
    """Provide the mdrun input description in gmxapi.operation Contexts."""

    def signature(self) -> _op.InputCollectionDescription:
        """Describe the input signature of the mdrun operation."""
        return _input

    def make_uid(self, input: _op.DataEdge) -> str:
        """Generate a unique identifier for a node with the given input."""
        return _make_uid(input)
410 class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
411 """Provide the gmxapi compatible mdrun implementation."""
413 # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
# NOTE(review): the '@classmethod' decorators implied by the comment above, and the
# return statements of name() and namespace(), appear elided from this pasted
# listing (original lines 414, 417-419, 422-424) — recover from upstream rather
# than guessing the registry name/namespace strings.
415 def name(self) -> str:
416 """Canonical name for the operation."""
420 def namespace(self) -> str:
421 """mdrun is importable from the gmxapi module."""
425 def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
426 if isinstance(context, _op.Context):
427 return StandardDirector(context)
class StandardOperationHandle(_op.AbstractOperation):
    """Handle used in Python UI or gmxapi.operation Contexts."""

    def __init__(self, resource_manager: ResourceManager):
        self.__resource_manager = resource_manager

    def run(self):
        """Ensure that the operation has executed, blocking until output is available."""
        self.__resource_manager.update_output()

    @property
    def output(self) -> OutputDataProxy:
        """Get a proxy to the dynamically determined outputs of the operation."""
        return self.__resource_manager.data()
# NOTE(review): pasted listing with per-line numbers and flattened indentation;
# embedded numbering skips (446, 481, 498-499, 501, 515, 517, 528, 530),
# indicating elided lines. Code preserved byte-for-byte; gaps flagged below.
444 class StandardDirector(gmxapi.abc.OperationDirector):
445 """Direct the instantiation of an mdrun node in a gmxapi.operation Context.
447 .. todo:: Compose this behavior in a more generic class.
449 .. todo:: Describe where instances live.
452 def __init__(self, context: _op.Context):
453 if not isinstance(context, _op.Context):
454 raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
455 self.context = context
457 def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
458 builder = self.context.node_builder(operation=RegisteredOperation, label=label)
460 builder.set_resource_factory(SubscriptionSessionResources)
461 builder.set_input_description(StandardInputDescription())
462 builder.set_handle(StandardOperationHandle)
463 # The runner in the gmxapi.operation context is the servicer for the legacy context.
464 builder.set_runner_director(SubscriptionPublishingRunner)
465 builder.set_output_factory(_output_factory)
466 builder.set_resource_manager(ResourceManager)
468 # Note: we have not yet done any dispatching based on *resources*. We should
469 # translate the resources provided into the form that the Context can subscribe to
470 # using the dispatching resource_factory. In the second draft, this operation
471 # is dispatched to a SimulationModuleContext, which can be subscribed directly
472 # to sources that are either literal filenames or gmxapi.simulation sources,
473 # while standard Futures can be resolved in the standard context.
475 assert isinstance(resources, _op.DataSourceCollection)
476 for name, source in resources.items():
477 builder.add_input(name, source)
479 handle = builder.build()
480 assert isinstance(handle, StandardOperationHandle)
# NOTE(review): no visible 'return handle' despite the annotated return type —
# presumably elided (original line ~481).
483 def handle_type(self, context: gmxapi.abc.Context):
484 return StandardOperationHandle
486 # Developer note: the Director instance is a convenient place to get a dispatching
487 # factory. The Director may become generic or more universal, but the resource_factory
488 # would likely not be typed on the generic parameters of the Director class.
489 # Instead, it is likely a generic function with its own TypeVar parameters.
490 def resource_factory(self,
491 source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
492 target: gmxapi.abc.Context = None):
493 # Distinguish between the UIContext, in which input is in the form
494 # of function call arguments, and the StandardContext, implemented in
495 # gmxapi.operation. UIContext is probably a virtual context that is
496 # asserted by callers in order to get a factory that normalizes UI input
497 # for the StandardContext.
# NOTE(review): guard lines (e.g. 'if source is None:' and 'if target is None:')
# appear elided before this default assignment (original lines 498-499).
500 target = self.context
502 if isinstance(target, _op.Context):
503 # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
504 return _standard_node_resource_factory
505 if isinstance(source, _op.Context):
506 return SubscriptionSessionResources
507 if isinstance(source, ModuleObject):
508 if isinstance(target, _op.Context):
509 # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
510 # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
511 # In the first draft, though, we just access a special payload.
512 # Return a factory that will consume *_simulation_input* and *parameters*
513 # members of a received object.
514 logger.info('Building mdrun operation from source {}'.format(source))
516 def simulation_input_workaround(input):
# NOTE(review): a binding such as 'source = input' (original line 517) appears
# elided; without it, reading 'source' before the assignment at the next line
# would raise UnboundLocalError inside this closure.
518 if hasattr(source, 'output'):
519 source = input.output
520 assert hasattr(source, '_simulation_input')
521 assert hasattr(source, 'parameters')
522 logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
523 source._simulation_input.description))
524 source_collection = _input.bind(_simulation_input=source._simulation_input,
525 parameters=source.parameters)
526 logger.info('mdrun input bound as source collection {}'.format(source_collection))
527 return source_collection
529 return simulation_input_workaround
531 raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
# NOTE(review): pasted listing with per-line numbers and flattened indentation;
# the embedded numbering skips within the docstring and body (536-537, 539-540,
# 542, 545, 547-548, 550, 555, 560, 568, 572, 575), and the function continues
# beyond this excerpt — a 'return handle' is expected after the closing comments.
534 def mdrun(input, label: str = None, context=None):
535 """MD simulation operation.
538 input : valid simulation input
541 runnable operation to perform the specified simulation
543 The *output* attribute of the returned operation handle contains dynamically
544 determined outputs from the operation.
546 `input` may be a TPR file name or an object providing the SimulationInput interface.
549 New function names will be appearing to handle tasks that are separate
551 "simulate" is plausibly a dispatcher or base class for various tasks
552 dispatched by mdrun. Specific work factories are likely "minimize,"
553 "test_particle_insertion," "legacy_simulation" (do_md), or "simulation"
554 composition (which may be leap-frog, vv, and other algorithms)
556 handle_context = context
557 if handle_context is not None:
558 raise gmxapi.exceptions.NotImplementedError(
559 'context must be None. This factory is only for the Python UI right now.')
561 target_context = _op.current_context()
562 assert isinstance(target_context, _op.Context)
563 # Get a director that will create a node in the standard context.
564 node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
565 assert isinstance(node_director, StandardDirector)
566 # TODO: refine this protocol
567 assert handle_context is None
569 if isinstance(input, ModuleObject):
570 # The input is from read_tpr or modify_input.
571 source_context = input
# NOTE(review): an 'else:' introducing this branch (original line ~572) appears elided.
573 # Allow automatic dispatching
574 source_context = None
576 resource_factory = node_director.resource_factory(source=source_context, target=target_context)
577 resources = resource_factory(input)
578 handle = node_director(resources=resources, label=label)
579 # Note: One effect of the assertions above is to help the type checker infer
580 # the return type of the handle. It is hard to convince the type checker that
581 # the return value of the node builder is up-cast. We might be able to resolve
582 # this by making both get_operation_director and ReadTprImplementation
583 # generics of the handle type, or using the handle type as the key for
584 # get_operation_director.