2 # This file is part of the GROMACS molecular simulation package.
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
"""mdrun operation module

The mdrun operation (in its first draft) conforms to the user-level API, but does
not use the Python Context resource manager. It uses either the legacy 0.0.7
Context or its own Context, also implemented in this module.
"""
from contextlib import contextmanager

import inspect
import os
import typing

import gmxapi
import gmxapi.abc
import gmxapi.operation as _op
from gmxapi import exceptions

# The following imports are not marked as public API.
from .abc import ModuleObject
from . import fileio
from . import workflow

# Initialize module-level logger
from gmxapi import logger as root_logger
# Child logger so mdrun messages are filterable under the gmxapi root logger.
logger = root_logger.getChild('mdrun')
# Log at import time so module loading is traceable in the gmxapi log.
logger.info('Importing {}'.format(__name__))
# Output in the gmxapi.operation Context.
# TODO: Consider using a single base class for the DataProxy, but have distinct
# data descriptor behavior (or different descriptor implementations in different
# subclasses) so that static code inspection can more easily determine the
# attributes of the data proxies.
_output_descriptors = (
    _op.OutputDataDescriptor('_work_dir', str),
)
# One Publisher per declared output, keyed by output name, for the publishing proxy.
_publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
                           _output_descriptors}
# Map of output names to data types describing the operation's output collection.
_output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
                                             _output_descriptors})
class OutputDataProxy(_op.DataProxyBase, descriptors=_output_descriptors):
    """Read-side accessor for MDRun results, provided as the handle's *output* attribute."""
class PublishingDataProxy(_op.DataProxyBase, descriptors=_publishing_descriptors):
    """Write-side proxy through which an MDRun operation publishes its output resources."""
# Bundle the output description with the read and write proxies so the node
# builder can wire up output handling for mdrun nodes.
_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
                                    output_description=_output,
                                    publishing_data_proxy=PublishingDataProxy)
# Input in the gmxapi.operation Context for the dispatching runner.
# The default empty dictionary for parameters just means that there are no overrides
# to the parameters already provided in _simulation_input.
_input = _op.InputCollectionDescription(
    [('_simulation_input', inspect.Parameter('_simulation_input',
                                             inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                             annotation=str)),
     ('parameters', inspect.Parameter('parameters',
                                      inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                      annotation=dict,
                                      default=dict()))
     ])
def _standard_node_resource_factory(*args, **kwargs):
    """Normalize Python UI arguments into node builder input.

    Binds the caller's positional and keyword arguments against the module's
    input signature description and returns the resulting source collection.
    """
    bound_input = _input.bind(*args, **kwargs)
    logger.info('mdrun input bound as source collection {}'.format(bound_input))
    return bound_input
@contextmanager
def scoped_communicator(original_comm, requested_size: int = None):
    """Provide a context-managed communicator, optionally of a specific size.

    With no *requested_size*, acquire a communicator from *original_comm* (or a
    suitable default). Otherwise, produce a subcommunicator with exactly
    *requested_size* ranks drawn from *original_comm*.

    Raises:
        UsageError: a size was requested but *original_comm* is not a valid communicator.
        FeatureNotAvailableError: *original_comm* has fewer ranks than requested.
    """
    # Imported locally to avoid a circular import at module load time.
    from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator

    if requested_size is None:
        communicator = _acquire_communicator(communicator=original_comm)
    else:
        if original_comm is None or not hasattr(original_comm, 'Get_size'):
            raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
        original_comm_size = original_comm.Get_size()
        if original_comm_size < requested_size:
            raise exceptions.FeatureNotAvailableError(
                'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
                    requested_size,
                    original_comm_size))
        assert original_comm_size >= requested_size
        communicator = _get_ensemble_communicator(original_comm, requested_size)

    # The function is used via `with scoped_communicator(...)`, so it must be a
    # generator-based context manager: yield the communicator and release it on exit.
    try:
        yield communicator
    finally:
        # NOTE(review): assumes the acquired communicator object exposes free() —
        # confirm against gmxapi.simulation.context.
        if hasattr(communicator, 'free'):
            communicator.free()
class LegacyImplementationSubscription(object):
    """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.

    This input resource is a subscription to work that is dispatched to a sub-context.
    The resource can be created from the standard data of the simulation module.
    """

    def __init__(self, resource_manager: _op.ResourceManager):
        from .context import Context as LegacyContext
        import gmxapi._gmxapi as _gmxapi
        self._gmxapi = _gmxapi

        assert isinstance(resource_manager, _op.ResourceManager)
        # We use several details of the gmxapi.operation.Context.ResourceManager.
        # These dependencies can evolve into the subscription protocol between Contexts.

        # Configure and run a gmxapi 0.0.7 session.
        # 0. Determine ensemble width.
        # 1. Choose, create/check working directories.
        # 2. Create source TPR.
        # 3. Create workspec.
        # 3.5 Add plugin potentials, if any.
        # 4. Run.
        # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.

        # Determine ensemble width
        ensemble_width = resource_manager.ensemble_width

        # Choose working directories
        # TODO: operation working directory naming scheme should be centrally well-defined.
        # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
        # so we cannot create the input file path in the working directory based on WorkSpec.uid.
        workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
                                                 member=member)
                        for member in range(ensemble_width)]

        # This is a reasonable place to start using MPI ensemble implementation details.
        # We will want better abstraction in the future, but it is best if related filesystem
        # accesses occur from the same processes, consistently. Note that we already
        # handle some optimization and dependency reduction when the ensemble size is 1.
        # TODO: multithread and multiprocess alternatives to MPI ensemble management.

        # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
        with scoped_communicator(None) as context_comm:
            context_rank = context_comm.Get_rank()
            with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
                # Note that in the current implementation, extra ranks have nothing to do,
                # but they may have a dummy communicator, so be sure to skip those members
                # of the context_comm.
                if context_rank < ensemble_width:
                    assert ensemble_comm.Get_size() == ensemble_width
                    ensemble_rank = ensemble_comm.Get_rank()
                    # TODO: This should be a richer object that includes at least host information
                    # and preferably the full gmxapi Future interface.
                    self.workdir = os.path.abspath(workdir_list[ensemble_rank])

                    with resource_manager.local_input(member=ensemble_rank) as input_pack:
                        source_file = input_pack.kwargs['_simulation_input']
                        parameters = input_pack.kwargs['parameters']
                        # If there are any other key word arguments to process from the gmxapi.mdrun
                        # factory call, do it here.

                        # TODO: We should really name this file with a useful input-dependent tag.
                        tprfile = os.path.join(self.workdir, 'topol.tpr')

                        expected_working_files = [tprfile]

                        if os.path.exists(self.workdir):
                            if os.path.isdir(self.workdir):
                                # Confirm that this is a restarted simulation.
                                # It is unspecified by the API, but at least through gmxapi 0.1,
                                # all simulations are initialized with a checkpoint file named state.cpt
                                # (see src/api/cpp/context.cpp)
                                # Fix: the checkpoint file extension is '.cpt', not '.cpp',
                                # per the comment above and the C++ implementation.
                                checkpoint_file = os.path.join(self.workdir, 'state.cpt')
                                expected_working_files.append(checkpoint_file)

                                for file in expected_working_files:
                                    if not os.path.exists(file):
                                        raise exceptions.ApiError(
                                            'Cannot determine working directory state: {}'.format(self.workdir))
                            else:
                                raise exceptions.ApiError(
                                    'Chosen working directory path exists but is not a directory: {}'.format(self.workdir))
                        else:
                            # Build the working directory and input files.
                            os.mkdir(self.workdir)
                            sim_input = fileio.read_tpr(source_file)
                            for key, value in parameters.items():
                                try:
                                    sim_input.parameters.set(key=key, value=value)
                                except _gmxapi.Exception as e:
                                    raise exceptions.ApiError(
                                        'Bug encountered. Unknown error when trying to set simulation '
                                        'parameter {} to {}'.format(key, value)
                                    ) from e

                            fileio.write_tpr_file(output=tprfile, input=sim_input)
                    logger.info('Created {} on rank {}'.format(tprfile, context_rank))

                    # Gather the actual working directories from the ensemble members.
                    if hasattr(ensemble_comm, 'allgather'):
                        # We should not assume that abspath expands the same on different MPI ranks.
                        workdir_list = ensemble_comm.allgather(self.workdir)
                        tpr_filenames = ensemble_comm.allgather(tprfile)
                    else:
                        workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
                        # TODO: If we use better input file names, they need to be updated in multiple places.
                        tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]

                    logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
                                                                                              workdir_list))
                    logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,
                                                                      ensemble_rank,
                                                                      self.workdir))
                    # TODO: We have not exposed the ability to pass any run time parameters to mdrun.
                    work = workflow.from_tpr(tpr_filenames)
                    self.workspec = work.workspec
                    context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
                    self.simulation_module_context = context
                    # Go ahead and execute immediately. No need for lazy initialization in this basic case.
                    with self.simulation_module_context as session:
                        session.run()
                        # TODO: There may be some additional results that we need to extract...
                # end: if context_rank < ensemble_width
            # end scoped_communicator: ensemble_comm

            if context_comm.Get_size() > 1:
                # Fix: mpi4py's bcast *returns* the broadcast object; non-root
                # ranks must capture the return value or they keep a stale list.
                workdir_list = context_comm.bcast(workdir_list, root=0)
        # end scoped_communicator: context_comm

        # After the communicators are released, all ranks hold the full list of
        # ensemble working directories.
        self.workdir = workdir_list
class SubscriptionSessionResources(object):
    """Input and output run-time resources for a MDRun subscription.

    A generalization of this class is probably the main hook for customizing the resources
    provided to the operation at run time.

    .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
    """

    def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
        assert isinstance(input, LegacyImplementationSubscription)
        assert isinstance(output, PublishingDataProxy)
        # Fix: *output* was read through self.output below without ever being stored.
        self.output = output
        member_id = self.output._client_identifier
        # Before enabling the following, be sure we understand what is happening.
        # if member_id is None:
        #     member_id = 0
        # The subscription carries one working directory per ensemble member.
        self.workdir = input.workdir[member_id]
class SubscriptionPublishingRunner(object):
    """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""
    # Used by the framework to create the input resource (the subscription) on all ranks.
    input_resource_factory = LegacyImplementationSubscription

    def __init__(self, resources: SubscriptionSessionResources):
        assert isinstance(resources, SubscriptionSessionResources)
        # Note that the resources contain a reference to a simulation ensemble that has already run.
        self.resources = resources

    def run(self):
        """Operation implementation in the gmxapi.operation module context."""
        # The simulation was executed when the input resource was created;
        # here we only publish the working directory through the output proxy.
        publisher = self.resources.output
        publisher._work_dir = self.resources.workdir
316 def _make_uid(input) -> str:
317 # TODO: Use input fingerprint for more useful identification.
320 new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
class ResourceManager(gmxapi.operation.ResourceManager):
    """Manage resources for the MDRun operation in the gmxapi.operation contexts.

    Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
    Futures managed by this resource manager may contain additional attributes.
    """

    def future(self, name: str, description: _op.ResultDescription):
        # Delegate Future creation to the base class implementation.
        # NOTE(review): tpr_future is created but no `return` is visible in this
        # copy — confirm a `return tpr_future` is not missing.
        tpr_future = super().future(name=name, description=description)

    def data(self) -> OutputDataProxy:
        # Bind an output accessor proxy to this resource manager.
        return OutputDataProxy(self)

    def update_output(self):
        """Override gmxapi.operation.ResourceManager.update_output because we handle parallelism as 0.0.7."""
        # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
        # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
        # TODO: Reimplement as the resource factory and director for the operation target context.
        # Guard against recursive or repeated execution of the operation.
        # (Private-name mangling resolves identically for this subclass and the
        # base class because both are named ResourceManager.)
        self.__operation_entrance_counter += 1
        if self.__operation_entrance_counter > 1:
            raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')

        # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
        # operation's library/implementation context.

        # Note: this is the resource translation from gmxapi.operation context
        # to the dispatching runner director. It uses details of the gmxapi.operation.Context
        # and of the operation.

        # TODO: Dispatch/discover this resource factory from a canonical place.
        assert hasattr(self._runner_director, 'input_resource_factory')
        # Create on all ranks.
        input = self._runner_director.input_resource_factory(self)
        # End of action of the InputResourceDirector[Context, MdRunSubscription].

        # We are giving the director a resource that contains the subscription
        # to the dispatched work.

        publishing_resources = self.publishing_resources()
        for member in range(self.ensemble_width):
            with publishing_resources(ensemble_member=member) as output:
                resources = self._resource_factory(input=input, output=output)
                runner = self._runner_director(resources)
                # NOTE(review): the runner is constructed but not invoked in this
                # copy — confirm a `runner.run()` call is not missing here.
class StandardInputDescription(_op.InputDescription):
    """Provide the MDRun input description in gmxapi.operation Contexts."""

    def signature(self) -> _op.InputCollectionDescription:
        """Describe the input signature for mdrun operation nodes."""
        # Fix: the module-level input description must be returned to satisfy
        # the InputDescription interface (the return statement was missing).
        return _input

    def make_uid(self, input: _op.DataEdge) -> str:
        """Generate a unique identifier for a node consuming *input*."""
        return _make_uid(input)
class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
    """Provide the gmxapi compatible mdrun implementation."""

    # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
    @classmethod
    def name(self) -> str:
        """Canonical name for the operation."""
        return 'mdrun'

    @classmethod
    def namespace(self) -> str:
        """mdrun is importable from the gmxapi module."""
        return 'gmxapi'

    @classmethod
    def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
        """Dispatch a node director for the given Context, when supported."""
        if isinstance(context, _op.Context):
            return StandardDirector(context)
        # Unsupported Contexts implicitly return None.
        # NOTE(review): consider raising an ApiError for unsupported Contexts.
class StandardOperationHandle(_op.AbstractOperation):
    """Handle used in Python UI or gmxapi.operation Contexts."""

    def __init__(self, resource_manager: ResourceManager):
        self.__resource_manager = resource_manager

    def run(self):
        """Make sure the operation has been executed and outputs published."""
        # Fix: the update_output() call was orphaned without its enclosing
        # run() method, which the AbstractOperation interface requires.
        self.__resource_manager.update_output()

    @property
    def output(self) -> OutputDataProxy:
        """Access the operation results through an OutputDataProxy."""
        return self.__resource_manager.data()
class StandardDirector(gmxapi.abc.OperationDirector):
    """Direct the instantiation of an mdrun node in a gmxapi.operation Context.

    .. todo:: Compose this behavior in a more generic class.

    .. todo:: Describe where instances live.
    """

    def __init__(self, context: _op.Context):
        if not isinstance(context, _op.Context):
            raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
        self.context = context

    def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
        builder = self.context.node_builder(operation=RegisteredOperation, label=label)

        builder.set_resource_factory(SubscriptionSessionResources)
        builder.set_input_description(StandardInputDescription())
        builder.set_handle(StandardOperationHandle)
        # The runner in the gmxapi.operation context is the servicer for the legacy context.
        builder.set_runner_director(SubscriptionPublishingRunner)
        builder.set_output_factory(_output_factory)
        builder.set_resource_manager(ResourceManager)

        # Note: we have not yet done any dispatching based on *resources*. We should
        # translate the resources provided into the form that the Context can subscribe to
        # using the dispatching resource_factory. In the second draft, this operation
        # is dispatched to a SimulationModuleContext, which can be subscribed directly
        # to sources that are either literal filenames or gmxapi.simulation sources,
        # while standard Futures can be resolved in the standard context.
        assert isinstance(resources, _op.DataSourceCollection)
        for name, source in resources.items():
            builder.add_input(name, source)

        handle = builder.build()
        assert isinstance(handle, StandardOperationHandle)
        # Fix: the constructed handle must be returned to the caller.
        return handle

    def handle_type(self, context: gmxapi.abc.Context):
        """Report the handle type produced in the given Context."""
        return StandardOperationHandle

    # Developer note: the Director instance is a convenient place to get a dispatching
    # factory. The Director may become generic or more universal, but the resource_factory
    # would likely not be typed on the generic parameters of the Director class.
    # Instead, it is likely a generic function with its own TypeVar parameters.
    def resource_factory(self,
                         source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
                         target: gmxapi.abc.Context = None):
        # Distinguish between the UIContext, in which input is in the form
        # of function call arguments, and the StandardContext, implemented in
        # gmxapi.operation. UIContext is probably a virtual context that is
        # asserted by callers in order to get a factory that normalizes UI input
        # for the StandardContext.
        # Fix: guard the default so an explicit *target* is not clobbered.
        if target is None:
            target = self.context
        if source is None:
            if isinstance(target, _op.Context):
                # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
                return _standard_node_resource_factory
        if isinstance(source, _op.Context):
            return SubscriptionSessionResources
        if isinstance(source, ModuleObject):
            if isinstance(target, _op.Context):
                # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
                # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
                # In the first draft, though, we just access a special payload.
                # Return a factory that will consume *_simulation_input* and *parameters*
                # members of a received object.
                logger.info('Building mdrun operation from source {}'.format(source))

                def simulation_input_workaround(input):
                    # Fix: start from the received object; follow its *output*
                    # attribute when present (operation handles vs. raw payloads).
                    source = input
                    if hasattr(source, 'output'):
                        source = input.output
                    assert hasattr(source, '_simulation_input')
                    assert hasattr(source, 'parameters')
                    logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
                                                                      source._simulation_input.description))
                    source_collection = _input.bind(_simulation_input=source._simulation_input,
                                                    parameters=source.parameters)
                    logger.info('mdrun input bound as source collection {}'.format(source_collection))
                    return source_collection

                return simulation_input_workaround

        raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
def mdrun(input, label: str = None, context=None):
    """MD simulation operation.

    Arguments:
        input : valid simulation input

    Returns:
        runnable operation to perform the specified simulation

    The *output* attribute of the returned operation handle contains dynamically
    determined outputs from the operation.

    `input` may be a TPR file name or an object providing the SimulationInput interface.

    Note:
        New function names will be appearing to handle tasks that are separate.

        "simulate" is plausibly a dispatcher or base class for various tasks
        dispatched by mdrun. Specific work factories are likely "minimize,"
        "test_particle_insertion," "legacy_simulation" (do_md), or "simulation"
        composition (which may be leap-frog, vv, and other algorithms)
    """
    handle_context = context
    if handle_context is not None:
        raise gmxapi.exceptions.NotImplementedError(
            'context must be None. This factory is only for the Python UI right now.')

    target_context = _op.current_context()
    assert isinstance(target_context, _op.Context)
    # Get a director that will create a node in the standard context.
    node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
    assert isinstance(node_director, StandardDirector)
    # TODO: refine this protocol
    assert handle_context is None

    if isinstance(input, ModuleObject):
        # The input is from read_tpr or modify_input.
        source_context = input
    # NOTE(review): an `else:` appears to be missing before the next statement
    # in this copy — as written, source_context is unconditionally reset to
    # None. Confirm against upstream.
    # Allow automatic dispatching
    source_context = None

    # Normalize the input into a data source collection and build the node.
    resource_factory = node_director.resource_factory(source=source_context, target=target_context)
    resources = resource_factory(input)
    handle = node_director(resources=resources, label=label)
    # Note: One effect of the assertions above is to help the type checker infer
    # the return type of the handle. It is hard to convince the type checker that
    # the return value of the node builder is up-cast. We might be able to resolve
    # this by making both get_operation_director and ReadTprImplementation
    # generics of the handle type, or using the handle type as the key for
    # get_operation_director.
    # NOTE(review): the handle is presumably returned just after this comment
    # block; the function continues beyond this view.