2 # This file is part of the GROMACS molecular simulation package.
4 # Copyright (c) 2019, by the GROMACS development team, led by
5 # Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6 # and including many others, as listed in the AUTHORS file in the
7 # top-level source directory and at http://www.gromacs.org.
9 # GROMACS is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public License
11 # as published by the Free Software Foundation; either version 2.1
12 # of the License, or (at your option) any later version.
14 # GROMACS is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with GROMACS; if not, see
21 # http://www.gnu.org/licenses, or write to the Free Software Foundation,
22 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 # If you want to redistribute modifications to GROMACS, please
25 # consider that scientific software is very special. Version
26 # control is crucial - bugs must be traceable. We will be happy to
27 # consider code for inclusion in the official distribution, but
28 # derived work must not be called official GROMACS. Details are found
29 # in the README & COPYING files - if they are missing, get the
30 # official version at http://www.gromacs.org.
32 # To help us fund GROMACS development, we humbly ask that you cite
33 # the research papers on the package. Check out http://www.gromacs.org.
35 """mdrun operation module
37 The mdrun operation (in its first draft) conforms to the user-level API, but does
38 not use the Python Context resource manager. It uses either the legacy 0.0.7
39 Context or its own Context, also implemented in this module.
47 from contextlib import contextmanager
51 import gmxapi.operation as _op
52 from gmxapi import exceptions
54 # The following imports are not marked as public API.
55 from .abc import ModuleObject
58 from . import workflow
60 # Initialize module-level logger
61 from gmxapi import logger as root_logger
63 logger = root_logger.getChild('mdrun')
64 logger.info('Importing {}'.format(__name__))
67 class OutputDataProxy(_op.DataProxyBase,
# NOTE(review): a continuation line appears to be missing here (the keyword argument,
# presumably ``descriptors={``, that opens the dict literal closed on the next line).
# Restore from the upstream file before executing this module.
69 '_work_dir': _op.OutputDataDescriptor('_work_dir', str)}):
70 """Implement the 'output' attribute of MDRun operations."""
class Publisher(gmxapi.operation.Publisher):
    """Publishing (write-side) data descriptor for MDRun output.

    Counterpart to the read-side OutputDataProxy descriptors: instances of this
    descriptor are set on a PublishingDataProxy while an operation publishes
    its results.
    """

    def __set__(self, instance: 'PublishingDataProxy', value):
        # No MDRun-specific publishing behavior is needed yet; delegate
        # directly to the base descriptor implementation.
        gmxapi.operation.Publisher.__set__(self, instance, value)
80 class PublishingDataProxy(_op.DataProxyBase,
# NOTE(review): as with OutputDataProxy above, the ``descriptors={`` continuation
# line and the closing ``):`` of this class statement appear to be missing from
# this extract — restore from upstream.
82 '_work_dir': Publisher('_work_dir', str)}
84 """Manage output resource updates for MDRun operation."""
91 # Output in the gmxapi.operation Context.
# The single declared mdrun output is the working directory path (a string).
92 _output = _op.OutputCollectionDescription(_work_dir=str)
94 # Input in the gmxapi.operation Context for the dispatching runner.
95 # The default empty dictionary for parameters just means that there are no overrides
96 # to the parameters already provided in _simulation_input.
97 _input = _op.InputCollectionDescription(
98 [('_simulation_input', inspect.Parameter('_simulation_input',
99 inspect.Parameter.POSITIONAL_OR_KEYWORD,
101 ('parameters', inspect.Parameter('parameters',
102 inspect.Parameter.POSITIONAL_OR_KEYWORD,
# NOTE(review): the ``annotation=``/``default=`` arguments for both parameters and
# the closing brackets of this InputCollectionDescription call appear to be missing
# from this extract — restore from upstream.
def _standard_node_resource_factory(*args, **kwargs):
    """Translate Python UI input to the gmxapi.operation node builder inputs.

    Binds the user-facing factory-call arguments against the module's input
    collection description and returns the resulting source collection.
    """
    bound_input = _input.bind(*args, **kwargs)
    logger.info('mdrun input bound as source collection {}'.format(bound_input))
    return bound_input
# Wire the mdrun output description to its read-side (OutputDataProxy) and
# write-side (PublishingDataProxy) proxy classes for the framework to use.
115 _output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
116 output_description=_output,
117 publishing_data_proxy=PublishingDataProxy)
# Scoped acquisition of an MPI (or MPI-like) communicator.
# NOTE(review): this function is used as a context manager below
# (``with scoped_communicator(...)``), but the ``@contextmanager`` decorator, the
# ``else:`` separating the two branches, the ``yield communicator`` statement, and
# the communicator clean-up/free lines all appear to be missing from this extract —
# restore from upstream before use.
121 def scoped_communicator(original_comm, requested_size: int = None):
122 from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator
124 if requested_size is None:
# With no size requested, just (possibly) take ownership of the provided communicator.
125 communicator = _acquire_communicator(communicator=original_comm)
# A specific size was requested: a valid parent communicator is mandatory.
128 if original_comm is None or not hasattr(original_comm, 'Get_size'):
129 raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
130 original_comm_size = original_comm.Get_size()
131 if original_comm_size < requested_size:
132 raise exceptions.FeatureNotAvailableError(
133 'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
137 assert original_comm_size >= requested_size
138 communicator = _get_ensemble_communicator(original_comm, requested_size)
# NOTE(review): several original lines are missing from this extract (a ``try:``
# before the parameter-setting loop, the ``else:`` branches of some conditionals,
# the ``session.run()`` call, and docstring terminators). Review notes below mark
# the evident gaps; restore from upstream before executing.
146 class LegacyImplementationSubscription(object):
147 """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.
149 This input resource is a subscription to work that is dispatched to a sub-context.
150 The resource can be created from the standard data of the simulation module.
153 def __init__(self, resource_manager: _op.ResourceManager):
154 from .context import Context as LegacyContext
155 import gmxapi._gmxapi as _gmxapi
156 self._gmxapi = _gmxapi
158 assert isinstance(resource_manager, _op.ResourceManager)
159 # We use several details of the gmxapi.operation.Context.ResourceManager.
160 # These dependencies can evolve into the subscription protocol between Contexts.
162 # Configure and run a gmxapi 0.0.7 session.
163 # 0. Determine ensemble width.
164 # 1. Choose, create/check working directories.
165 # 2. Create source TPR.
166 # 3. Create workspec.
167 # 3.5 Add plugin potentials, if any.
169 # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.
171 # Determine ensemble width
172 ensemble_width = resource_manager.ensemble_width
174 # Choose working directories
175 # TODO: operation working directory naming scheme should be centrally well-defined.
176 # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
177 # so we cannot create the input file path in the working directory based on WorkSpec.uid.
178 workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
180 for member in range(ensemble_width)]
182 # This is a reasonable place to start using MPI ensemble implementation details.
183 # We will want better abstraction in the future, but it is best if related filesystem
184 # accesses occur from the same processes, consistently. Note that we already
185 # handle some optimization and dependency reduction when the ensemble size is 1.
186 # TODO: multithread and multiprocess alternatives to MPI ensemble management.
188 # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
189 with scoped_communicator(None) as context_comm:
190 context_rank = context_comm.Get_rank()
191 with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
192 # Note that in the current implementation, extra ranks have nothing to do,
193 # but they may have a dummy communicator, so be sure to skip those members
194 # of the context_comm.
195 if context_rank < ensemble_width:
196 assert ensemble_comm.Get_size() == ensemble_width
197 ensemble_rank = ensemble_comm.Get_rank()
198 # TODO: This should be a richer object that includes at least host information
199 # and preferably the full gmxapi Future interface.
200 self.workdir = os.path.abspath(workdir_list[ensemble_rank])
202 with resource_manager.local_input(member=ensemble_rank) as input_pack:
203 source_file = input_pack.kwargs['_simulation_input']
204 parameters = input_pack.kwargs['parameters']
205 # If there are any other key word arguments to process from the gmxapi.mdrun
206 # factory call, do it here.
208 # TODO: We should really name this file with a useful input-dependent tag.
209 tprfile = os.path.join(self.workdir, 'topol.tpr')
211 expected_working_files = [tprfile]
213 if os.path.exists(self.workdir):
214 if os.path.isdir(self.workdir):
215 # Confirm that this is a restarted simulation.
216 # It is unspecified by the API, but at least through gmxapi 0.1,
217 # all simulations are initialized with a checkpoint file named state.cpt
218 # (see src/api/cpp/context.cpp)
# NOTE(review): the comment above documents the checkpoint file as 'state.cpt',
# but this line joins 'state.cpp' — almost certainly a filename typo, which would
# make every legitimate restart directory fail the existence check below. Fix to
# 'state.cpt' upstream (cannot be corrected in a comments-only pass).
219 checkpoint_file = os.path.join(self.workdir, 'state.cpp')
220 expected_working_files.append(checkpoint_file)
222 for file in expected_working_files:
223 if not os.path.exists(file):
224 raise exceptions.ApiError(
225 'Cannot determine working directory state: {}'.format(self.workdir))
227 raise exceptions.ApiError(
228 'Chosen working directory path exists but is not a directory: {}'.format(self.workdir))
230 # Build the working directory and input files.
231 os.mkdir(self.workdir)
232 sim_input = fileio.read_tpr(source_file)
233 for key, value in parameters.items():
# NOTE(review): a ``try:`` line is evidently missing here — the ``except`` two
# lines below has no visible matching ``try``.
235 sim_input.parameters.set(key=key, value=value)
236 except _gmxapi.Exception as e:
237 raise exceptions.ApiError(
238 'Bug encountered. Unknown error when trying to set simulation '
239 'parameter {} to {}'.format(key, value)
242 fileio.write_tpr_file(output=tprfile, input=sim_input)
243 logger.info('Created {} on rank {}'.format(tprfile, context_rank))
245 # Gather the actual working directories from the ensemble members.
246 if hasattr(ensemble_comm, 'allgather'):
247 # We should not assume that abspath expands the same on different MPI ranks.
248 workdir_list = ensemble_comm.allgather(self.workdir)
249 tpr_filenames = ensemble_comm.allgather(tprfile)
# NOTE(review): an ``else:`` branch header appears to be missing before the
# fallback (non-MPI) path on the next lines.
251 workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
252 # TODO: If we use better input file names, they need to be updated in multiple places.
253 tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
255 logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
257 logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,
261 work = workflow.from_tpr(tpr_filenames)
262 self.workspec = work.workspec
263 context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
264 self.simulation_module_context = context
265 # Go ahead and execute immediately. No need for lazy initialization in this basic case.
266 with self.simulation_module_context as session:
# NOTE(review): the ``session.run()`` call appears to be missing from this extract.
268 # TODO: There may be some additional results that we need to extract...
269 # end: if context_rank < ensemble_width
271 # end scoped_communicator: ensemble_comm
273 if context_comm.Get_size() > 1:
274 context_comm.bcast(workdir_list, root=0)
275 # end scoped_communicator: context_comm
# After the communicator scopes close, self.workdir becomes the gathered list of
# ensemble member directories (it was a single path string inside the scope above).
277 self.workdir = workdir_list
280 class SubscriptionSessionResources(object):
281 """Input and output run-time resources for a MDRun subscription.
283 A generalization of this class is probably the main hook for customizing the resources
284 provided to the operation at run time.
286 .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
289 def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
290 assert isinstance(input, LegacyImplementationSubscription)
291 assert isinstance(output, PublishingDataProxy)
# NOTE(review): a ``self.output = output`` assignment appears to be missing here —
# ``self.output`` is read on the next line but never assigned in the visible code.
# Restore from upstream.
293 member_id = self.output._client_identifier
294 # Before enabling the following, be sure we understand what is happening.
295 # if member_id is None:
# Select this member's working directory from the subscription's gathered list.
297 self.workdir = input.workdir[member_id]
300 # def create(input: LegacyImplementationSubscription,
301 # output: PublishingDataProxy) -> 'SubscriptionSessionResources':
302 # """Factory function to create new resource instances.
304 # It is better to overload a factory member than to write a fancy __init__.
305 # An alternative to overloading is to make this a classmethod for polymorphic
306 # creation. Note that Python classes (can) know what their subclasses are.
308 # if isinstance(input, LegacyImplementationSubscription):
309 # return _session_resource_factory(input, output)
312 class SubscriptionPublishingRunner(object):
313 """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""
# Class attribute naming the factory that produces this runner's run-time input
# (the legacy-context subscription defined above).
314 input_resource_factory = LegacyImplementationSubscription
316 def __init__(self, resources: SubscriptionSessionResources):
317 assert isinstance(resources, SubscriptionSessionResources)
318 # Note that the resources contain a reference to a simulation ensemble that has already run.
319 self.resources = resources
# NOTE(review): a method definition line (presumably ``def run(self):``) appears to
# be missing before the docstring and statements below — restore from upstream.
322 """Operation implementation in the gmxapi.operation module context."""
# Publish the working directory so downstream consumers can find the results.
323 publisher = self.resources.output
324 publisher._work_dir = self.resources.workdir
# Generate a unique identifier string for an mdrun operation node.
330 def _make_uid(input) -> str:
331 # TODO: Use input fingerprint for more useful identification.
# NOTE(review): the lines defining ``salt`` and the module-level ``_next_uid``
# counter (its ``global`` declaration, increment, and the ``return new_uid``
# statement) appear to be missing from this extract — restore from upstream.
334 new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
344 class ResourceManager(gmxapi.operation.ResourceManager):
345 """Manage resources for the MDRun operation in the gmxapi.operation contexts.
347 Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
348 Futures managed by this resource manager may contain additional attributes.
351 def future(self, name: str, description: _op.ResultDescription):
352 tpr_future = super().future(name=name, description=description)
# NOTE(review): the rest of this method (presumably decoration of ``tpr_future``
# and a ``return`` statement) appears to be missing from this extract.
355 def data(self) -> OutputDataProxy:
# Provide read access to this operation's outputs through the module's proxy type.
356 return OutputDataProxy(self)
358 def update_output(self):
359 """Override gmxapi.operation.ResourceManager.update_output because we handle parallelism as 0.0.7."""
360 # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
361 # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
362 # TODO: Reimplement as the resource factory and director for the operation target context.
# NOTE(review): a guard (e.g. ``if not self.done():``) appears to be missing before
# the re-entrance counter below — restore from upstream.
364 self.__operation_entrance_counter += 1
365 if self.__operation_entrance_counter > 1:
366 raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
368 # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
369 # operation's library/implementation context.
372 # Note: this is the resource translation from gmxapi.operation context
373 # to the dispatching runner director. It uses details of the gmxapi.operation.Context
374 # and of the operation.
376 # TODO: Dispatch/discover this resource factory from a canonical place.
377 assert hasattr(self._runner_director, 'input_resource_factory')
378 # Create on all ranks.
379 input = self._runner_director.input_resource_factory(self)
380 # End of action of the InputResourceDirector[Context, MdRunSubscription].
383 # We are giving the director a resource that contains the subscription
384 # to the dispatched work.
386 publishing_resources = self.publishing_resources()
387 for member in range(self.ensemble_width):
388 with publishing_resources(ensemble_member=member) as output:
389 resources = self._resource_factory(input=input, output=output)
390 runner = self._runner_director(resources)
# NOTE(review): the invocation of the runner (e.g. ``runner.run()``) appears to be
# missing after this line — restore from upstream.
394 class StandardInputDescription(_op.InputDescription):
395 """Provide the mdrun input description in gmxapi.operation Contexts."""
397 def signature(self) -> _op.InputCollectionDescription:
# NOTE(review): this method body (presumably ``return _input``) appears to be
# missing from this extract — restore from upstream.
400 def make_uid(self, input: _op.DataEdge) -> str:
# Delegate unique-id generation to the module-level helper.
401 return _make_uid(input)
404 class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
405 """Provide the gmxapi compatible mdrun implementation."""
407 # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
# NOTE(review): the ``@classmethod`` decorators for the methods below, and the
# return statements of ``name`` and ``namespace`` (presumably the operation name
# and its namespace strings), appear to be missing from this extract.
409 def name(self) -> str:
410 """Canonical name for the operation."""
414 def namespace(self) -> str:
415 """mdrun is importable from the gmxapi module."""
419 def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
# Only the gmxapi.operation Context is currently supported as a target.
420 if isinstance(context, _op.Context):
421 return StandardDirector(context)
424 class StandardOperationHandle(_op.AbstractOperation):
425 """Handle used in Python UI or gmxapi.operation Contexts."""
427 def __init__(self, resource_manager: ResourceManager):
428 self.__resource_manager = resource_manager
# NOTE(review): a ``def run(self):`` line appears to be missing before the
# statement below, and a ``@property`` decorator before ``output`` — restore
# from upstream.
431 self.__resource_manager.update_output()
434 def output(self) -> OutputDataProxy:
435 return self.__resource_manager.data()
438 class StandardDirector(gmxapi.abc.OperationDirector):
439 """Direct the instantiation of an mdrun node in a gmxapi.operation Context.
441 .. todo:: Compose this behavior in a more generic class.
443 .. todo:: Describe where instances live.
446 def __init__(self, context: _op.Context):
447 if not isinstance(context, _op.Context):
448 raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
449 self.context = context
451 def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
# Assemble the node: wire this module's resource factory, input description,
# handle type, runner, output factory, and resource manager into the builder.
452 builder = self.context.node_builder(operation=RegisteredOperation, label=label)
454 builder.set_resource_factory(SubscriptionSessionResources)
455 builder.set_input_description(StandardInputDescription())
456 builder.set_handle(StandardOperationHandle)
457 # The runner in the gmxapi.operation context is the servicer for the legacy context.
458 builder.set_runner_director(SubscriptionPublishingRunner)
459 builder.set_output_factory(_output_factory)
460 builder.set_resource_manager(ResourceManager)
462 # Note: we have not yet done any dispatching based on *resources*. We should
463 # translate the resources provided into the form that the Context can subscribe to
464 # using the dispatching resource_factory. In the second draft, this operation
465 # is dispatched to a SimulationModuleContext, which can be subscribed directly
466 # to sources that are either literal filenames or gmxapi.simulation sources,
467 # while standard Futures can be resolved in the standard context.
469 assert isinstance(resources, _op.DataSourceCollection)
470 for name, source in resources.items():
471 builder.add_input(name, source)
473 handle = builder.build()
474 assert isinstance(handle, StandardOperationHandle)
# NOTE(review): a ``return handle`` statement appears to be missing here —
# restore from upstream.
477 def handle_type(self, context: gmxapi.abc.Context):
478 return StandardOperationHandle
480 # Developer note: the Director instance is a convenient place to get a dispatching
481 # factory. The Director may become generic or more universal, but the resource_factory
482 # would likely not be typed on the generic parameters of the Director class.
483 # Instead, it is likely a generic function with its own TypeVar parameters.
484 def resource_factory(self,
485 source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
486 target: gmxapi.abc.Context = None):
487 # Distinguish between the UIContext, in which input is in the form
488 # of function call arguments, and the StandardContext, implemented in
489 # gmxapi.operation. UIContext is probably a virtual context that is
490 # asserted by callers in order to get a factory that normalizes UI input
491 # for the StandardContext.
# NOTE(review): an ``if target is None:`` guard appears to be missing before the
# default assignment on the next line — restore from upstream.
494 target = self.context
# Dispatch on (source, target) context types to pick the right factory.
496 if isinstance(target, _op.Context):
497 # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
498 return _standard_node_resource_factory
499 if isinstance(source, _op.Context):
500 return SubscriptionSessionResources
501 if isinstance(source, ModuleObject):
502 if isinstance(target, _op.Context):
503 # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
504 # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
505 # In the first draft, though, we just access a special payload.
506 # Return a factory that will consume *_simulation_input* and *parameters*
507 # members of a received object.
508 logger.info('Building mdrun operation from source {}'.format(source))
510 def simulation_input_workaround(input):
# NOTE(review): a ``source = input`` line appears to be missing here: ``source``
# is assigned later in this function, which makes it a local name, so reading it
# on the next line as-is would raise UnboundLocalError — restore from upstream.
512 if hasattr(source, 'output'):
513 source = input.output
514 assert hasattr(source, '_simulation_input')
515 assert hasattr(source, 'parameters')
516 logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
517 source._simulation_input.description))
518 source_collection = _input.bind(_simulation_input=source._simulation_input,
519 parameters=source.parameters)
520 logger.info('mdrun input bound as source collection {}'.format(source_collection))
521 return source_collection
523 return simulation_input_workaround
# No dispatch rule matched the provided contexts.
525 raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
528 def mdrun(input, label: str = None, context=None):
529 """MD simulation operation.
532 input : valid simulation input
535 runnable operation to perform the specified simulation
537 The *output* attribute of the returned operation handle contains dynamically
538 determined outputs from the operation.
540 `input` may be a TPR file name or an object providing the SimulationInput interface.
543 New function names will be appearing to handle tasks that are separate
545 "simulate" is plausibly a dispatcher or base class for various tasks
546 dispatched by mdrun. Specific work factories are likely "minimize,"
547 "test_particle_insertion," "legacy_simulation" (do_md), or "simulation"
548 composition (which may be leap-frog, vv, and other algorithms)
# The *context* argument is reserved; only the default (None) is supported so far.
550 handle_context = context
551 if handle_context is not None:
552 raise gmxapi.exceptions.NotImplementedError(
553 'context must be None. This factory is only for the Python UI right now.')
555 target_context = _op.current_context()
556 assert isinstance(target_context, _op.Context)
557 # Get a director that will create a node in the standard context.
558 node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
559 assert isinstance(node_director, StandardDirector)
560 # TODO: refine this protocol
561 assert handle_context is None
563 if isinstance(input, ModuleObject):
564 # The input is from read_tpr or modify_input.
565 source_context = input
# NOTE(review): an ``else:`` line appears to be missing before the fallback below.
567 # Allow automatic dispatching
568 source_context = None
# Obtain the appropriate resource translation for the (source, target) pair,
# bind the user input, and direct construction of the operation node.
570 resource_factory = node_director.resource_factory(source=source_context, target=target_context)
571 resources = resource_factory(input)
572 handle = node_director(resources=resources, label=label)
573 # Note: One effect of the assertions above is to help the type checker infer
574 # the return type of the handle. It is hard to convince the type checker that
575 # the return value of the node builder is up-cast. We might be able to resolve
576 # this by making both get_operation_director and ReadTprImplementation
577 # generics of the handle type, or using the handle type as the key for
578 # get_operation_director.
# NOTE(review): a final ``return handle`` statement appears to be missing from
# this extract — restore from upstream.