2 # This file is part of the GROMACS molecular simulation package.
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
35 """mdrun operation module
37 The mdrun operation (in its first draft) conforms to the user-level API, but does
38 not use the Python Context resource manager. It uses either the legacy 0.0.7
39 Context or its own Context, also implemented in this module.
47 from contextlib import contextmanager
51 import gmxapi.operation as _op
52 from gmxapi import exceptions
54 # The following imports are not marked as public API.
55 from .abc import ModuleObject
58 from . import workflow
60 # Initialize module-level logger
61 from gmxapi import logger as root_logger
63 logger = root_logger.getChild('mdrun')
64 logger.info('Importing {}'.format(__name__))
67 # Output in the gmxapi.operation Context.
68 # TODO: Consider using a single base class for the DataProxy, but have distinct
69 # data descriptor behavior (or different descriptor implementations in different
70 # subclasses) so that static code inspection can more easily determine the
71 # attributes of the data proxies.
72 _output_descriptors = (
73 _op.OutputDataDescriptor('_work_dir', str),
74 _op.OutputDataDescriptor('trajectory', str)
76 _publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
78 _output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
82 class OutputDataProxy(_op.DataProxyBase,
83 descriptors=_output_descriptors):
84 """Implement the 'output' attribute of MDRun operations."""
87 class PublishingDataProxy(_op.DataProxyBase,
88 descriptors=_publishing_descriptors):
89 """Manage output resource updates for MDRun operation."""
92 _output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
93 output_description=_output,
94 publishing_data_proxy=PublishingDataProxy)
97 # Input in the gmxapi.operation Context for the dispatching runner.
98 # The default empty dictionary for parameters just means that there are no overrides
99 # to the parameters already provided in _simulation_input.
100 _input = _op.InputCollectionDescription(
101 [('_simulation_input', inspect.Parameter('_simulation_input',
102 inspect.Parameter.POSITIONAL_OR_KEYWORD,
104 ('parameters', inspect.Parameter('parameters',
105 inspect.Parameter.POSITIONAL_OR_KEYWORD,
111 def _standard_node_resource_factory(*args, **kwargs):
112 """Translate Python UI input to the gmxapi.operation node builder inputs."""
113 source_collection = _input.bind(*args, **kwargs)
114 logger.info('mdrun input bound as source collection {}'.format(source_collection))
115 return source_collection
119 def scoped_communicator(original_comm, requested_size: int = None):
120 from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator
122 if requested_size is None:
123 communicator = _acquire_communicator(communicator=original_comm)
126 if original_comm is None or not hasattr(original_comm, 'Get_size'):
127 raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
128 original_comm_size = original_comm.Get_size()
129 if original_comm_size < requested_size:
130 raise exceptions.FeatureNotAvailableError(
131 'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
135 assert original_comm_size >= requested_size
136 communicator = _get_ensemble_communicator(original_comm, requested_size)
144 class LegacyImplementationSubscription(object):
145 """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.
147 This input resource is a subscription to work that is dispatched to a sub-context.
148 The resource can be created from the standard data of the simulation module.
151 def __init__(self, resource_manager: _op.ResourceManager):
152 from .context import Context as LegacyContext
153 import gmxapi._gmxapi as _gmxapi
154 self._gmxapi = _gmxapi
156 assert isinstance(resource_manager, _op.ResourceManager)
157 # We use several details of the gmxapi.operation.Context.ResourceManager.
158 # These dependencies can evolve into the subscription protocol between Contexts.
160 # Configure and run a gmxapi 0.0.7 session.
161 # 0. Determine ensemble width.
162 # 1. Choose, create/check working directories.
163 # 2. Create source TPR.
164 # 3. Create workspec.
165 # 3.5 Add plugin potentials, if any.
167 # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.
169 # Determine ensemble width
170 ensemble_width = resource_manager.ensemble_width
172 # Choose working directories
173 # TODO: operation working directory naming scheme should be centrally well-defined.
174 # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
175 # so we cannot create the input file path in the working directory based on WorkSpec.uid.
176 workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
178 for member in range(ensemble_width)]
180 # This is a reasonable place to start using MPI ensemble implementation details.
181 # We will want better abstraction in the future, but it is best if related filesystem
182 # accesses occur from the same processes, consistently. Note that we already
183 # handle some optimization and dependency reduction when the ensemble size is 1.
184 # TODO: multithread and multiprocess alternatives to MPI ensemble management.
186 # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
187 with scoped_communicator(None) as context_comm:
188 context_rank = context_comm.Get_rank()
189 with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
190 # Note that in the current implementation, extra ranks have nothing to do,
191 # but they may have a dummy communicator, so be sure to skip those members
192 # of the context_comm.
193 if context_rank < ensemble_width:
194 assert ensemble_comm.Get_size() == ensemble_width
195 ensemble_rank = ensemble_comm.Get_rank()
196 # TODO: This should be a richer object that includes at least host information
197 # and preferably the full gmxapi Future interface.
198 self.workdir = os.path.abspath(workdir_list[ensemble_rank])
200 with resource_manager.local_input(member=ensemble_rank) as input_pack:
201 source_file = input_pack.kwargs['_simulation_input']
202 parameters = input_pack.kwargs['parameters']
203 # If there are any other key word arguments to process from the gmxapi.mdrun
204 # factory call, do it here.
206 # TODO: We should really name this file with a useful input-dependent tag.
207 tprfile = os.path.join(self.workdir, 'topol.tpr')
209 expected_working_files = [tprfile]
211 if os.path.exists(self.workdir):
212 if os.path.isdir(self.workdir):
213 # Confirm that this is a restarted simulation.
214 # It is unspecified by the API, but at least through gmxapi 0.1,
215 # all simulations are initialized with a checkpoint file named state.cpt
216 # (see src/api/cpp/context.cpp)
217 checkpoint_file = os.path.join(self.workdir, 'state.cpp')
218 expected_working_files.append(checkpoint_file)
220 for file in expected_working_files:
221 if not os.path.exists(file):
222 raise exceptions.ApiError(
223 'Cannot determine working directory state: {}'.format(self.workdir))
225 raise exceptions.ApiError(
226 'Chosen working directory path exists but is not a directory: {}'.format(self.workdir))
228 # Build the working directory and input files.
229 os.mkdir(self.workdir)
230 sim_input = fileio.read_tpr(source_file)
231 for key, value in parameters.items():
233 sim_input.parameters.set(key=key, value=value)
234 except _gmxapi.Exception as e:
235 raise exceptions.ApiError(
236 'Bug encountered. Unknown error when trying to set simulation '
237 'parameter {} to {}'.format(key, value)
240 fileio.write_tpr_file(output=tprfile, input=sim_input)
241 logger.info('Created {} on rank {}'.format(tprfile, context_rank))
243 # Gather the actual working directories from the ensemble members.
244 if hasattr(ensemble_comm, 'allgather'):
245 # We should not assume that abspath expands the same on different MPI ranks.
246 workdir_list = ensemble_comm.allgather(self.workdir)
247 tpr_filenames = ensemble_comm.allgather(tprfile)
249 workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
250 # TODO: If we use better input file names, they need to be updated in multiple places.
251 tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
253 logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
255 logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,
259 # TODO: We have not exposed the ability to pass any run time parameters to mdrun.
260 work = workflow.from_tpr(tpr_filenames)
261 self.workspec = work.workspec
262 context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
263 self.simulation_module_context = context
264 # Go ahead and execute immediately. No need for lazy initialization in this basic case.
265 with self.simulation_module_context as session:
267 # TODO: There may be some additional results that we need to extract...
268 # end: if context_rank < ensemble_width
270 # end scoped_communicator: ensemble_comm
272 if context_comm.Get_size() > 1:
273 context_comm.bcast(workdir_list, root=0)
274 # end scoped_communicator: context_comm
276 self.workdir = workdir_list
279 class SubscriptionSessionResources(object):
280 """Input and output run-time resources for a MDRun subscription.
282 A generalization of this class is the probably the main hook for customizing the resources
283 provided to the operation at run time.
285 .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
288 def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
289 assert isinstance(input, LegacyImplementationSubscription)
290 assert isinstance(output, PublishingDataProxy)
292 member_id = self.output._client_identifier
293 # Before enabling the following, be sure we understand what is happening.
294 # if member_id is None:
296 self.workdir = input.workdir[member_id]
299 class SubscriptionPublishingRunner(object):
300 """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""
301 input_resource_factory = LegacyImplementationSubscription
303 def __init__(self, resources: SubscriptionSessionResources):
304 assert isinstance(resources, SubscriptionSessionResources)
305 # Note that the resources contain a reference to a simulation ensemble that has already run.
306 self.resources = resources
309 """Operation implementation in the gmxapi.operation module context."""
310 publisher = self.resources.output
311 publisher._work_dir = self.resources.workdir
312 # TODO: Make the return value a trajectory handle rather than a file path.
313 # TODO: Decide how to handle append vs. noappend output.
314 # TODO: More rigorous handling of the trajectory file(s)
315 # We have no way to query the name of the trajectory produced, and we
316 # have avoided exposing the ability to specify it, so we have to assume
317 # GROMACS default behavior.
318 publisher.trajectory = os.path.join(self.resources.workdir, 'traj.trr')
324 def _make_uid(input) -> str:
325 # TODO: Use input fingerprint for more useful identification.
328 new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
338 class ResourceManager(gmxapi.operation.ResourceManager):
339 """Manage resources for the MDRun operation in the gmxapi.operation contexts.
341 Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
342 Futures managed by this resource manager may contain additional attributes.
345 def future(self, name: str, description: _op.ResultDescription):
346 tpr_future = super().future(name=name, description=description)
349 def data(self) -> OutputDataProxy:
350 return OutputDataProxy(self)
352 def update_output(self):
353 """Override gmxapi.operation.ResourceManager.update_output because we handle paralellism as 0.0.7."""
354 # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
355 # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
356 # TODO: Reimplement as the resource factory and director for the operation target context.
358 self.__operation_entrance_counter += 1
359 if self.__operation_entrance_counter > 1:
360 raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
362 # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
363 # operation's library/implementation context.
366 # Note: this is the resource translation from gmxapi.operation context
367 # to the dispatching runner director. It uses details of the gmxapi.operation.Context
368 # and of the operation.
370 # TODO: Dispatch/discover this resource factory from a canonical place.
371 assert hasattr(self._runner_director, 'input_resource_factory')
372 # Create on all ranks.
373 input = self._runner_director.input_resource_factory(self)
374 # End of action of the InputResourceDirector[Context, MdRunSubscription].
377 # We are giving the director a resource that contains the subscription
378 # to the dispatched work.
380 publishing_resources = self.publishing_resources()
381 for member in range(self.ensemble_width):
382 with publishing_resources(ensemble_member=member) as output:
383 resources = self._resource_factory(input=input, output=output)
384 runner = self._runner_director(resources)
388 class StandardInputDescription(_op.InputDescription):
389 """Provide the ReadTpr input description in gmxapi.operation Contexts."""
391 def signature(self) -> _op.InputCollectionDescription:
394 def make_uid(self, input: _op.DataEdge) -> str:
395 return _make_uid(input)
398 class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
399 """Provide the gmxapi compatible ReadTpr implementation."""
401 # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
403 def name(self) -> str:
404 """Canonical name for the operation."""
408 def namespace(self) -> str:
409 """read_tpr is importable from the gmxapi module."""
413 def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
414 if isinstance(context, _op.Context):
415 return StandardDirector(context)
418 class StandardOperationHandle(_op.AbstractOperation):
419 """Handle used in Python UI or gmxapi.operation Contexts."""
421 def __init__(self, resource_manager: ResourceManager):
422 self.__resource_manager = resource_manager
425 self.__resource_manager.update_output()
428 def output(self) -> OutputDataProxy:
429 return self.__resource_manager.data()
432 class StandardDirector(gmxapi.abc.OperationDirector):
433 """Direct the instantiation of a read_tpr node in a gmxapi.operation Context.
435 .. todo:: Compose this behavior in a more generic class.
437 .. todo:: Describe where instances live.
440 def __init__(self, context: _op.Context):
441 if not isinstance(context, _op.Context):
442 raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
443 self.context = context
445 def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
446 builder = self.context.node_builder(operation=RegisteredOperation, label=label)
448 builder.set_resource_factory(SubscriptionSessionResources)
449 builder.set_input_description(StandardInputDescription())
450 builder.set_handle(StandardOperationHandle)
451 # The runner in the gmxapi.operation context is the servicer for the legacy context.
452 builder.set_runner_director(SubscriptionPublishingRunner)
453 builder.set_output_factory(_output_factory)
454 builder.set_resource_manager(ResourceManager)
456 # Note: we have not yet done any dispatching based on *resources*. We should
457 # translate the resources provided into the form that the Context can subscribe to
458 # using the dispatching resource_factory. In the second draft, this operation
459 # is dispatched to a SimulationModuleContext, which can be subscribed directly
460 # to sources that are either literal filenames or gmxapi.simulation sources,
461 # while standard Futures can be resolved in the standard context.
463 assert isinstance(resources, _op.DataSourceCollection)
464 for name, source in resources.items():
465 builder.add_input(name, source)
467 handle = builder.build()
468 assert isinstance(handle, StandardOperationHandle)
471 def handle_type(self, context: gmxapi.abc.Context):
472 return StandardOperationHandle
474 # Developer note: the Director instance is a convenient place to get a dispatching
475 # factory. The Director may become generic or more universal, but the resource_factory
476 # would likely not be typed on the generic parameters of the Director class.
477 # Instead, it is likely a generic function with its own TypeVar parameters.
478 def resource_factory(self,
479 source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
480 target: gmxapi.abc.Context = None):
481 # Distinguish between the UIContext, in which input is in the form
482 # of function call arguments, and the StandardContext, implemented in
483 # gmxapi.operation. UIContext is probably a virtual context that is
484 # asserted by callers in order to get a factory that normalizes UI input
485 # for the StandardContext.
488 target = self.context
490 if isinstance(target, _op.Context):
491 # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
492 return _standard_node_resource_factory
493 if isinstance(source, _op.Context):
494 return SubscriptionSessionResources
495 if isinstance(source, ModuleObject):
496 if isinstance(target, _op.Context):
497 # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
498 # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
499 # In the first draft, though, we just access a special payload.
500 # Return a factory that will consume *_simulation_input* and *parameters*
501 # members of a received object.
502 logger.info('Building mdrun operation from source {}'.format(source))
504 def simulation_input_workaround(input):
506 if hasattr(source, 'output'):
507 source = input.output
508 assert hasattr(source, '_simulation_input')
509 assert hasattr(source, 'parameters')
510 logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
511 source._simulation_input.description))
512 source_collection = _input.bind(_simulation_input=source._simulation_input,
513 parameters=source.parameters)
514 logger.info('mdrun input bound as source collection {}'.format(source_collection))
515 return source_collection
517 return simulation_input_workaround
519 raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
522 def mdrun(input, label: str = None, context=None):
523 """MD simulation operation.
526 input : valid simulation input
529 runnable operation to perform the specified simulation
531 The *output* attribute of the returned operation handle contains dynamically
532 determined outputs from the operation.
534 `input` may be a TPR file name or a an object providing the SimulationInput interface.
537 New function names will be appearing to handle tasks that are separate
539 "simulate" is plausibly a dispatcher or base class for various tasks
540 dispatched by mdrun. Specific work factories are likely "minimize,"
541 "test_particle_insertion," "legacy_simulation" (do_md), or "simulation"
542 composition (which may be leap-frog, vv, and other algorithms)
544 handle_context = context
545 if handle_context is not None:
546 raise gmxapi.exceptions.NotImplementedError(
547 'context must be None. This factory is only for the Python UI right now.')
549 target_context = _op.current_context()
550 assert isinstance(target_context, _op.Context)
551 # Get a director that will create a node in the standard context.
552 node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
553 assert isinstance(node_director, StandardDirector)
554 # TODO: refine this protocol
555 assert handle_context is None
557 if isinstance(input, ModuleObject):
558 # The input is from read_tpr or modify_input.
559 source_context = input
561 # Allow automatic dispatching
562 source_context = None
564 resource_factory = node_director.resource_factory(source=source_context, target=target_context)
565 resources = resource_factory(input)
566 handle = node_director(resources=resources, label=label)
567 # Note: One effect of the assertions above is to help the type checker infer
568 # the return type of the handle. It is hard to convince the type checker that
569 # the return value of the node builder is up-cast. We might be able to resolve
570 # this by making both get_operation_director and ReadTprImplementation
571 # generics of the handle type, or using the handle type as the key for
572 # get_operation_director.