2 # This file is part of the GROMACS molecular simulation package.
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
35 """mdrun operation module
37 The mdrun operation (in its first draft) conforms to the user-level API, but does
38 not use the Python Context resource manager. It uses either the legacy 0.0.7
39 Context or its own Context, also implemented in this module.
# Standard-library imports.
import hashlib
import inspect
import os
import typing
from contextlib import contextmanager

import gmxapi
import gmxapi.abc
import gmxapi.operation as _op
from gmxapi import exceptions

# The following imports are not marked as public API.
from .abc import ModuleObject
from . import fileio
from . import workflow

# Initialize module-level logger
from gmxapi import logger as root_logger
63 logger = root_logger.getChild('mdrun')
64 logger.info('Importing {}'.format(__name__))
# Output in the gmxapi.operation Context.
# TODO: Consider using a single base class for the DataProxy, but have distinct
# data descriptor behavior (or different descriptor implementations in different
# subclasses) so that static code inspection can more easily determine the
# attributes of the data proxies.
_output_descriptors = (
    _op.OutputDataDescriptor('_work_dir', str),
    _op.OutputDataDescriptor('trajectory', str),
    _op.OutputDataDescriptor('parameters', dict)
)
# One Publisher data descriptor per output, keyed by output name, for the
# publishing proxy used while the operation is running.
_publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
                           _output_descriptors}
# Complete description of the output data collection (name -> type).
_output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
                                             _output_descriptors})
class OutputDataProxy(_op.DataProxyBase,
                      descriptors=_output_descriptors):
    """Implement the 'output' attribute of MDRun operations.

    Attribute access is provided by the OutputDataDescriptor instances
    registered via the *descriptors* class keyword argument.
    """
class PublishingDataProxy(_op.DataProxyBase,
                          descriptors=_publishing_descriptors):
    """Manage output resource updates for MDRun operation.

    Writable proxy used by the runner to publish results while the
    operation executes; attributes come from *_publishing_descriptors*.
    """
# Factory wiring the output description to the read-only and publishing
# data proxy classes for mdrun nodes in the gmxapi.operation Context.
_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
                                    output_description=_output,
                                    publishing_data_proxy=PublishingDataProxy)
# Input in the gmxapi.operation Context for the dispatching runner.
# The default empty dictionary for parameters just means that there are no overrides
# to the parameters already provided in _simulation_input.
_input = _op.InputCollectionDescription(
    [('_simulation_input', inspect.Parameter('_simulation_input',
                                             inspect.Parameter.POSITIONAL_OR_KEYWORD)),
     ('parameters', inspect.Parameter('parameters',
                                      inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                      default=dict()))
     ])
def _standard_node_resource_factory(*args, **kwargs):
    """Translate Python UI input to the gmxapi.operation node builder inputs."""
    bound_input = _input.bind(*args, **kwargs)
    logger.info('mdrun input bound as source collection {}'.format(bound_input))
    return bound_input
@contextmanager
def scoped_communicator(original_comm, requested_size: int = None):
    """Provide a communicator of the requested size, freeing it on exit.

    When *requested_size* is None, acquire a communicator from (or on behalf of)
    *original_comm*. Otherwise, split a subcommunicator of exactly
    *requested_size* ranks from *original_comm*.

    Raises:
        exceptions.UsageError: if a size is requested but *original_comm* is not
            a valid communicator.
        exceptions.FeatureNotAvailableError: if *original_comm* is too small.
    """
    from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator

    if requested_size is None:
        communicator = _acquire_communicator(communicator=original_comm)
    else:
        if original_comm is None or not hasattr(original_comm, 'Get_size'):
            raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
        original_comm_size = original_comm.Get_size()
        if original_comm_size < requested_size:
            raise exceptions.FeatureNotAvailableError(
                'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
                    requested_size,
                    original_comm_size
                ))
        assert original_comm_size >= requested_size
        communicator = _get_ensemble_communicator(original_comm, requested_size)

    try:
        yield communicator
    finally:
        # Release the communicator resource whether or not the managed block raised.
        communicator.Free()
class LegacyImplementationSubscription(object):
    """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.

    This input resource is a subscription to work that is dispatched to a sub-context.
    The resource can be created from the standard data of the simulation module.
    """

    def __init__(self, resource_manager: _op.ResourceManager):
        from .context import Context as LegacyContext
        import gmxapi._gmxapi as _gmxapi
        self._gmxapi = _gmxapi

        assert isinstance(resource_manager, _op.ResourceManager)
        # We use several details of the gmxapi.operation.Context.ResourceManager.
        # These dependencies can evolve into the subscription protocol between Contexts.

        # Configure and run a gmxapi 0.0.7 session.
        # 0. Determine ensemble width.
        # 1. Choose, create/check working directories.
        # 2. Create source TPR.
        # 3. Create workspec.
        # 3.5 Add plugin potentials, if any.
        # 4. Run.
        # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.

        # Determine ensemble width
        ensemble_width = resource_manager.ensemble_width

        # Choose working directories
        # TODO: operation working directory naming scheme should be centrally well-defined.
        # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
        # so we cannot create the input file path in the working directory based on WorkSpec.uid.
        workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
                                                 member=member)
                        for member in range(ensemble_width)]
        parameters_dict_list = [{}] * ensemble_width

        # This is a reasonable place to start using MPI ensemble implementation details.
        # We will want better abstraction in the future, but it is best if related filesystem
        # accesses occur from the same processes, consistently. Note that we already
        # handle some optimization and dependency reduction when the ensemble size is 1.
        # TODO: multithread and multiprocess alternatives to MPI ensemble management.

        # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
        with scoped_communicator(None) as context_comm:
            context_rank = context_comm.Get_rank()
            with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
                # Note that in the current implementation, extra ranks have nothing to do,
                # but they may have a dummy communicator, so be sure to skip those members
                # of the context_comm.
                if context_rank < ensemble_width:
                    assert ensemble_comm.Get_size() == ensemble_width
                    ensemble_rank = ensemble_comm.Get_rank()
                    # TODO: This should be a richer object that includes at least host information
                    # and preferably the full gmxapi Future interface.
                    self.workdir = os.path.abspath(workdir_list[ensemble_rank])

                    with resource_manager.local_input(member=ensemble_rank) as input_pack:
                        source_file = input_pack.kwargs['_simulation_input']
                        parameters = input_pack.kwargs['parameters']
                        # If there are any other key word arguments to process from the gmxapi.mdrun
                        # factory call, do it here.

                        # TODO: We should really name this file with a useful input-dependent tag.
                        tprfile = os.path.join(self.workdir, 'topol.tpr')

                        expected_working_files = [tprfile]

                        if os.path.exists(self.workdir):
                            if os.path.isdir(self.workdir):
                                # Confirm that this is a restarted simulation.
                                # It is unspecified by the API, but at least through gmxapi 0.1,
                                # all simulations are initialized with a checkpoint file named state.cpt
                                # (see src/api/cpp/context.cpp)
                                # Fix: the checkpoint file extension is '.cpt', not '.cpp'.
                                checkpoint_file = os.path.join(self.workdir, 'state.cpt')
                                expected_working_files.append(checkpoint_file)

                                for file in expected_working_files:
                                    if not os.path.exists(file):
                                        raise exceptions.ApiError(
                                            'Cannot determine working directory state: {}'.format(self.workdir))
                            else:
                                raise exceptions.ApiError(
                                    'Chosen working directory path exists but is not a directory: {}'.format(
                                        self.workdir))
                        else:
                            # Build the working directory and input files.
                            os.mkdir(self.workdir)
                            sim_input = fileio.read_tpr(source_file)
                            for key, value in parameters.items():
                                try:
                                    sim_input.parameters.set(key=key, value=value)
                                except _gmxapi.Exception as e:
                                    raise exceptions.ApiError(
                                        'Bug encountered. Unknown error when trying to set simulation '
                                        'parameter {} to {}'.format(key, value)
                                    ) from e

                            fileio.write_tpr_file(output=tprfile, input=sim_input)
                        logger.info('Created {} on rank {}'.format(tprfile, context_rank))

                        # Gather the actual outputs from the ensemble members.
                        if hasattr(ensemble_comm, 'allgather'):
                            # We should not assume that abspath expands the same on different MPI ranks.
                            workdir_list = ensemble_comm.allgather(self.workdir)
                            tpr_filenames = ensemble_comm.allgather(tprfile)
                            parameters = fileio.read_tpr(tprfile).parameters.extract()
                            parameters_dict_list = ensemble_comm.allgather(parameters)
                        else:
                            workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
                            # TODO: If we use better input file names, they need to be updated in multiple places.
                            tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
                            parameters_dict_list = [fileio.read_tpr(tprfile).parameters.extract()
                                                    for tprfile in tpr_filenames]
                        logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
                                                                                                  workdir_list))
                        logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,
                                                                          ensemble_rank,
                                                                          self.workdir))
                        # TODO: We have not exposed the ability to pass any run time parameters to mdrun.
                        work = workflow.from_tpr(tpr_filenames)
                        self.workspec = work.workspec
                        context = LegacyContext(work=self.workspec, workdir_list=workdir_list,
                                                communicator=ensemble_comm)
                        self.simulation_module_context = context
                        # Go ahead and execute immediately. No need for lazy initialization in this basic case.
                        with self.simulation_module_context as session:
                            session.run()
                            # TODO: There may be some additional results that we need to extract...
                    # end: if context_rank < ensemble_width

                # end scoped_communicator: ensemble_comm

            if context_comm.Get_size() > 1:
                context_comm.bcast(workdir_list, root=0)
            # end scoped_communicator: context_comm

        self.workdir = workdir_list
        self.parameters = parameters_dict_list
class SubscriptionSessionResources(object):
    """Input and output run-time resources for a MDRun subscription.

    A generalization of this class is the probably the main hook for customizing the resources
    provided to the operation at run time.

    .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
    """

    def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
        assert isinstance(input, LegacyImplementationSubscription)
        assert isinstance(output, PublishingDataProxy)
        # Fix: *output* must be stored before it is read through self.output below.
        self.output = output
        member_id = self.output._client_identifier
        # Before enabling the following, be sure we understand what is happening.
        # if member_id is None:
        #     member_id = 0
        self.workdir = input.workdir[member_id]
        self.parameters = input.parameters[member_id]
class SubscriptionPublishingRunner(object):
    """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""
    input_resource_factory = LegacyImplementationSubscription

    def __init__(self, resources: SubscriptionSessionResources):
        assert isinstance(resources, SubscriptionSessionResources)
        # Note that the resources contain a reference to a simulation ensemble that has already run.
        self.resources = resources

    def run(self):
        """Operation implementation in the gmxapi.operation module context."""
        publisher = self.resources.output
        publisher._work_dir = self.resources.workdir
        publisher.parameters = self.resources.parameters
        # TODO: Make the return value a trajectory handle rather than a file path.
        # TODO: Decide how to handle append vs. noappend output.
        # TODO: More rigorous handling of the trajectory file(s)
        # We have no way to query the name of the trajectory produced, and we
        # have avoided exposing the ability to specify it, so we have to assume
        # GROMACS default behavior.
        publisher.trajectory = os.path.join(self.resources.workdir, 'traj.trr')
# Monotonic counter used to disambiguate otherwise-identical operation instances.
_next_uid = 0


def _make_uid(input) -> str:
    """Generate a unique identifier string for an mdrun task instance.

    Combines an md5 digest of the input fingerprint with a monotonically
    increasing counter so that repeated instantiations receive distinct ids.
    """
    # TODO: Use input fingerprint for more useful identification.
    salt = hashlib.md5(input.fingerprint()).hexdigest()
    global _next_uid
    new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
    _next_uid += 1
    return new_uid
class ResourceManager(gmxapi.operation.ResourceManager):
    """Manage resources for the MDRun operation in the gmxapi.operation contexts.

    Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
    Futures managed by this resource manager may contain additional attributes.
    """

    def future(self, name: str, description: _op.ResultDescription):
        """Get a Future for the named result, delegating to the base class."""
        tpr_future = super().future(name=name, description=description)
        return tpr_future

    def data(self) -> OutputDataProxy:
        """Get a read-only proxy to the managed output resources."""
        return OutputDataProxy(self)

    def update_output(self):
        """Override gmxapi.operation.ResourceManager.update_output because we handle parallelism as 0.0.7."""
        # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
        # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
        # TODO: Reimplement as the resource factory and director for the operation target context.
        # Note: because this class shares the name "ResourceManager" with its base class,
        # the name-mangled __operation_entrance_counter below resolves to the same
        # attribute the base class maintains.
        self.__operation_entrance_counter += 1
        if self.__operation_entrance_counter > 1:
            raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')

        # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
        # operation's library/implementation context.

        # Note: this is the resource translation from gmxapi.operation context
        # to the dispatching runner director. It uses details of the gmxapi.operation.Context
        # and of the operation.

        # TODO: Dispatch/discover this resource factory from a canonical place.
        assert hasattr(self._runner_director, 'input_resource_factory')
        # Create on all ranks.
        input = self._runner_director.input_resource_factory(self)
        # End of action of the InputResourceDirector[Context, MdRunSubscription].

        # We are giving the director a resource that contains the subscription
        # to the dispatched work.

        publishing_resources = self.publishing_resources()
        for member in range(self.ensemble_width):
            with publishing_resources(ensemble_member=member) as output:
                resources = self._resource_factory(input=input, output=output)
                runner = self._runner_director(resources)
                # Fix: the runner must actually be executed so that the publishing
                # proxy receives the outputs before the context closes.
                runner.run()
class StandardInputDescription(_op.InputDescription):
    """Provide the mdrun input description in gmxapi.operation Contexts."""

    def signature(self) -> _op.InputCollectionDescription:
        """Mapping of named inputs and their types for the mdrun operation."""
        return _input

    def make_uid(self, input: _op.DataEdge) -> str:
        """Get a unique identifier for the node fed by *input*."""
        return _make_uid(input)
class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
    """Provide the gmxapi compatible mdrun implementation."""

    # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
    @classmethod
    def name(self) -> str:
        """Canonical name for the operation."""
        return 'mdrun'

    @classmethod
    def namespace(self) -> str:
        """mdrun is importable from the gmxapi module."""
        return 'gmxapi'

    @classmethod
    def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
        """Get an operation director appropriate for *context*."""
        if isinstance(context, _op.Context):
            return StandardDirector(context)
class StandardOperationHandle(_op.AbstractOperation):
    """Handle used in Python UI or gmxapi.operation Contexts."""

    def __init__(self, resource_manager: ResourceManager):
        self.__resource_manager = resource_manager

    def run(self):
        """Resolve the operation, running the simulation if necessary."""
        self.__resource_manager.update_output()

    @property
    def output(self) -> OutputDataProxy:
        """Read-only proxy to the operation results."""
        return self.__resource_manager.data()
class StandardDirector(gmxapi.abc.OperationDirector):
    """Direct the instantiation of a mdrun node in a gmxapi.operation Context.

    .. todo:: Compose this behavior in a more generic class.

    .. todo:: Describe where instances live.
    """

    def __init__(self, context: _op.Context):
        if not isinstance(context, _op.Context):
            raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
        self.context = context

    def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
        builder = self.context.node_builder(operation=RegisteredOperation, label=label)

        builder.set_resource_factory(SubscriptionSessionResources)
        builder.set_input_description(StandardInputDescription())
        builder.set_handle(StandardOperationHandle)
        # The runner in the gmxapi.operation context is the servicer for the legacy context.
        builder.set_runner_director(SubscriptionPublishingRunner)
        builder.set_output_factory(_output_factory)
        builder.set_resource_manager(ResourceManager)

        # Note: we have not yet done any dispatching based on *resources*. We should
        # translate the resources provided into the form that the Context can subscribe to
        # using the dispatching resource_factory. In the second draft, this operation
        # is dispatched to a SimulationModuleContext, which can be subscribed directly
        # to sources that are either literal filenames or gmxapi.simulation sources,
        # while standard Futures can be resolved in the standard context.
        assert isinstance(resources, _op.DataSourceCollection)
        for name, source in resources.items():
            builder.add_input(name, source)

        handle = builder.build()
        assert isinstance(handle, StandardOperationHandle)
        return handle

    def handle_type(self, context: gmxapi.abc.Context):
        """Get the class of the handle produced for nodes in *context*."""
        return StandardOperationHandle

    # Developer note: the Director instance is a convenient place to get a dispatching
    # factory. The Director may become generic or more universal, but the resource_factory
    # would likely not be typed on the generic parameters of the Director class.
    # Instead, it is likely a generic function with its own TypeVar parameters.
    def resource_factory(self,
                         source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
                         target: gmxapi.abc.Context = None):
        # Distinguish between the UIContext, in which input is in the form
        # of function call arguments, and the StandardContext, implemented in
        # gmxapi.operation. UIContext is probably a virtual context that is
        # asserted by callers in order to get a factory that normalizes UI input
        # for the StandardContext.
        if target is None:
            target = self.context
        if source is None:
            if isinstance(target, _op.Context):
                # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
                return _standard_node_resource_factory
        if isinstance(source, _op.Context):
            return SubscriptionSessionResources
        if isinstance(source, ModuleObject):
            if isinstance(target, _op.Context):
                # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
                # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
                # In the first draft, though, we just access a special payload.
                # Return a factory that will consume *_simulation_input* and *parameters*
                # members of a received object.
                logger.info('Building mdrun operation from source {}'.format(source))

                def simulation_input_workaround(input):
                    # Initialize the local *source* from the call argument so that the
                    # name is always bound before it is read (the closure must not
                    # shadow the enclosing *source* with an unbound local).
                    source = input
                    if hasattr(source, 'output'):
                        source = input.output
                    assert hasattr(source, '_simulation_input')
                    assert hasattr(source, 'parameters')
                    logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
                                                                      source._simulation_input.description))
                    source_collection = _input.bind(_simulation_input=source._simulation_input,
                                                    parameters=source.parameters)
                    logger.info('mdrun input bound as source collection {}'.format(source_collection))
                    return source_collection

                return simulation_input_workaround

        raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
def mdrun(input, label: str = None, context=None):
    """MD simulation operation.

    Arguments:
        input : valid simulation input

    Returns:
        runnable operation to perform the specified simulation

    The *output* attribute of the returned operation handle contains dynamically
    determined outputs from the operation.

    `input` may be a TPR file name or an object providing the SimulationInput interface.

    Note:
        New function names will be appearing to handle tasks that are separate

        "simulate" is plausibly a dispatcher or base class for various tasks
        dispatched by mdrun. Specific work factories are likely "minimize,"
        "test_particle_insertion," "legacy_simulation" (do_md), or "simulation"
        composition (which may be leap-frog, vv, and other algorithms)
    """
    handle_context = context
    if handle_context is not None:
        raise gmxapi.exceptions.NotImplementedError(
            'context must be None. This factory is only for the Python UI right now.')

    target_context = _op.current_context()
    assert isinstance(target_context, _op.Context)
    # Get a director that will create a node in the standard context.
    node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
    assert isinstance(node_director, StandardDirector)
    # TODO: refine this protocol
    assert handle_context is None

    # Examine the input to determine the appropriate dispatching.
    if isinstance(input, ModuleObject):
        # The input is from read_tpr or modify_input.
        source_context = input
    else:
        # Allow automatic dispatching
        source_context = None

    resource_factory = node_director.resource_factory(source=source_context, target=target_context)
    resources = resource_factory(input)
    handle = node_director(resources=resources, label=label)
    # Note: One effect of the assertions above is to help the type checker infer
    # the return type of the handle. It is hard to convince the type checker that
    # the return value of the node builder is up-cast. We might be able to resolve
    # this by making both get_operation_director and ReadTprImplementation
    # generics of the handle type, or using the handle type as the key for
    # get_operation_director.
    return handle