src.plangym.vectorization.parallel#
Handle parallelization for plangym.Environment that allows vectorized steps.
Classes#
Step environment in a separate process for lock free paralellism. |
|
Combine multiple environments to step them in batch. |
|
Allow any environment to be stepped in parallel when step_batch is called. |
Module Contents#
- class src.plangym.vectorization.parallel.ExternalProcess(constructor)[source]#
Step environment in a separate process for lock free paralellism.
The environment will be created in the external process by calling the specified callable. This can be an environment class, or a function creating the environment and potentially wrapping it. The returned environment should not access global variables.
- Parameters:
constructor – Callable that creates and returns an OpenAI gym environment.
- observation_space#
The cached observation space of the environment.
- action_space#
The cached action space of the environment.
- ..notes:
This is mostly a copy paste from tensorflow/agents, that lets us set and read the environment state.
- _ACCESS = 1#
- _CALL = 2#
- _RESULT = 3#
- _EXCEPTION = 4#
- _CLOSE = 5#
- _process#
- _observ_space = None#
- _action_space = None#
- property observation_space#
- Return the observation space of the internal environment.
- property action_space#
- Return the action space of the internal environment.
- __getattr__(name)[source]#
Request an attribute from the environment.
Note that this involves communication with the external process, so it can be slow.
- Parameters:
name – Attribute to access.
- Returns:
Value of the attribute.
- call(name, *args, **kwargs)[source]#
Asynchronously call a method of the external environment.
- Parameters:
name – Name of the method to call.
*args – Positional arguments to forward to the method.
**kwargs – Keyword arguments to forward to the method.
- Returns:
Promise object that blocks and provides the return value when called.
- step_batch(actions, states=None, dt=None, return_state=None, blocking=True)[source]#
Vectorized version of the
stepmethod.It allows to step a vector of states and actions. The signature and behaviour is the same as
step, but taking a list of states, actions and dts as input.- Parameters:
actions – Iterable containing the different actions to be applied.
states – Iterable containing the different states to be set.
dt (numpy.ndarray | int) – int or array containing the frameskips that will be applied.
blocking – If True, execute sequentially.
return_state (bool | None) – Whether to return the state in the returned tuple. If None, step will return the state if state was passed as a parameter.
- Returns:
if states is None returns
(observs, rewards, ends, infos)else returns(new_states, observs, rewards, ends, infos)
- step(action, state=None, dt=1, blocking=True)[source]#
Step the environment.
- Parameters:
action – The action to apply to the environment.
state – State to be set on the environment before stepping it.
dt (int) – Number of consecutive times that action will be applied.
blocking – Whether to wait for the result.
- Returns:
Transition tuple when blocking, otherwise callable that returns the transition tuple.
- reset(blocking=True, return_states=False)[source]#
Reset the environment.
- Parameters:
blocking – Whether to wait for the result.
return_states (bool) – If true return also the initial state of the environment.
- Returns:
New observation when blocking, otherwise callable that returns the new observation.
- class src.plangym.vectorization.parallel.BatchEnv(envs, blocking)[source]#
Combine multiple environments to step them in batch.
It is mostly a copy paste from tensorflow/agents that also allows to set and get the states.
To step environments in parallel, environments must support a
blocking=Falseargument to their step and reset functions that makes them return callables instead to receive the result at a later time.- Parameters:
envs – List of environments.
blocking – Step environments after another rather than in parallel.
- Raises:
ValueError – Environments have different observation or action spaces.
- _envs#
- _blocking#
- __getattr__(name)[source]#
Forward unimplemented attributes to one of the original environments.
- Parameters:
name – Attribute that was accessed.
- Returns:
Value behind the attribute name one of the wrapped environments.
- make_transitions(actions, states=None, dt=1, return_state=None)[source]#
Implement the logic for stepping the environment in parallel.
- Parameters:
dt (numpy.ndarray | int)
return_state (bool | None)
- sync_states(state, blocking=True)[source]#
Set the same state to all the environments that are inside an external process.
- Parameters:
state – Target state to set on the environments.
blocking (bool) – If
Trueperform the update sequentially. IfFalsestep the environments in parallel.
- Returns:
None.
- Return type:
None
- reset(indices=None, return_states=True)[source]#
Reset the environment and return the resulting batch data.
- Parameters:
indices – The batch indices of environments to reset; defaults to all.
return_states (bool) – return the corresponding states after reset.
- Returns:
Batch of observations. If
return_statesisTruereturn a tuple containing(batch_of_observations, batch_of_states).
- class src.plangym.vectorization.parallel.ParallelEnv(env_class, name, frameskip=1, autoreset=True, delay_setup=False, n_workers=8, blocking=False, **kwargs)[source]#
Bases:
plangym.vectorization.env.VectorizedEnvAllow any environment to be stepped in parallel when step_batch is called.
It creates a local instance of the target environment to call all other methods.
Example:
>>> from plangym.videogames import AtariEnv >>> env = ParallelEnv(env_class=AtariEnv, ... name="MsPacman-v0", ... clone_seeds=True, ... autoreset=True, ... blocking=False) >>> >>> state, obs, info = env.reset() >>> >>> states = [state.copy() for _ in range(10)] >>> actions = [env.sample_action() for _ in range(10)] >>> >>> data = env.step_batch(states=states, actions=actions) >>> new_states, observs, rewards, ends, truncateds, infos = data
- Parameters:
name (str)
frameskip (int)
autoreset (bool)
delay_setup (bool)
n_workers (int)
blocking (bool)
- _blocking#
- _batch_env = None#
- property blocking: bool#
If True the steps are performed sequentially.
- Return type:
bool
- setup()[source]#
Run environment initialization and create the subprocesses for stepping in parallel.
- make_transitions(actions, states=None, dt=1, return_state=None)[source]#
Vectorized version of the
stepmethod.It allows to step a vector of states and actions. The signature and behaviour is the same as
step, but taking a list of states, actions and dts as input.- Parameters:
actions (numpy.ndarray) – Iterable containing the different actions to be applied.
states (numpy.ndarray) – Iterable containing the different states to be set.
dt (numpy.ndarray | int) – int or array containing the frameskips that will be applied.
return_state (bool | None) – Whether to return the state in the returned tuple. If None, step will return the state if state was passed as a parameter.
- Returns:
if states is None returns
(observs, rewards, ends, truncateds, infos)else(new_states, observs, rewards, ends, truncateds, infos)