plangym.vectorization.parallel
Handle parallelization for plangym.Environment
that allows vectorized steps.
Module Contents
Classes
ExternalProcess | Step environment in a separate process for lock-free parallelism.
BatchEnv | Combine multiple environments to step them in batch.
ParallelEnv | Allow any environment to be stepped in parallel when step_batch is called.
- class plangym.vectorization.parallel.ExternalProcess(constructor)[source]
Step environment in a separate process for lock-free parallelism.
The environment will be created in the external process by calling the specified callable. This can be an environment class, or a function creating the environment and potentially wrapping it. The returned environment should not access global variables.
- Parameters
constructor – Callable that creates and returns an OpenAI gym environment.
- observation_space
The cached observation space of the environment.
- action_space
The cached action space of the environment.
- Notes
This is mostly a copy of https://github.com/tensorflow/agents/blob/master/agents/tools/wrappers.py, extended so that the environment state can be set and read.
- _ACCESS = 1
- _CALL = 2
- _RESULT = 3
- _EXCEPTION = 4
- _CLOSE = 5
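The numeric constants above read like message tags for the conversation between the parent and the worker. The following is a minimal sketch of how such a tagged request/reply protocol can work; it is not plangym's actual implementation. To stay short it runs the worker in a thread rather than a separate process, and `Counter`, `worker` and `request` are hypothetical names introduced only for illustration.

```python
import threading
from multiprocessing import Pipe

# Message tags mirroring the constants listed above.
ACCESS, CALL, RESULT, EXCEPTION, CLOSE = 1, 2, 3, 4, 5


class Counter:
    """Hypothetical stand-in for an environment."""

    def __init__(self):
        self.total = 0

    def add(self, amount):
        self.total += amount
        return self.total


def worker(constructor, conn):
    """Build the object, then serve requests until a CLOSE message arrives."""
    target = constructor()
    while True:
        tag, payload = conn.recv()
        if tag == CLOSE:
            break
        try:
            if tag == ACCESS:
                # Attribute lookup: payload is the attribute name.
                conn.send((RESULT, getattr(target, payload)))
            elif tag == CALL:
                # Method call: payload is (name, args, kwargs).
                name, args, kwargs = payload
                conn.send((RESULT, getattr(target, name)(*args, **kwargs)))
        except Exception as exc:
            # Report failures to the caller instead of killing the worker.
            conn.send((EXCEPTION, repr(exc)))
    conn.close()


def request(conn, tag, payload):
    """Send one request and wait for its reply."""
    conn.send((tag, payload))
    reply_tag, value = conn.recv()
    if reply_tag == EXCEPTION:
        raise RuntimeError(value)
    return value


parent, child = Pipe()
thread = threading.Thread(target=worker, args=(Counter, child), daemon=True)
thread.start()

first = request(parent, CALL, ("add", (3,), {}))
total = request(parent, ACCESS, "total")
parent.send((CLOSE, None))
thread.join(timeout=5)
```

Every attribute access or call costs one round trip over the pipe, which is why remote attribute access can be slow compared with a local lookup.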
- property observation_space(self)
Return the observation space of the internal environment.
- property action_space(self)
Return the action space of the internal environment.
- __getattr__(self, name)[source]
Request an attribute from the environment.
Note that this involves communication with the external process, so it can be slow.
- Parameters
name – Attribute to access.
- Returns
Value of the attribute.
- call(self, name, *args, **kwargs)[source]
Asynchronously call a method of the external environment.
- Parameters
name – Name of the method to call.
*args – Positional arguments to forward to the method.
**kwargs – Keyword arguments to forward to the method.
- Returns
Promise object that blocks and provides the return value when called.
- step_batch(self, actions, states=None, dt=None, return_state=None, blocking=True)[source]
Vectorized version of the step method. It steps a vector of states and actions. The signature and behaviour are the same as step, but it takes a list of states, actions and dts as input.
- Parameters
actions – Iterable containing the different actions to be applied.
states – Iterable containing the different states to be set.
dt (Union[numpy.ndarray, int]) – int or array containing the frameskips that will be applied.
blocking – If True, execute sequentially.
return_state (bool) – Whether to return the state in the returned tuple. If None, step will return the state if state was passed as a parameter.
- Returns
If states is None, returns (observs, rewards, ends, infos); otherwise returns (new_states, observs, rewards, ends, infos).
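The shape of the returned tuple and the handling of dt can be illustrated with a small sketch. It assumes a toy environment whose state is a plain integer; `toy_step` and this standalone `step_batch` function are hypothetical and only mimic the documented contract, they are not the library code.

```python
def toy_step(action, state=None, dt=1):
    """Hypothetical transition: add `action` to the state `dt` times."""
    start = 0 if state is None else state
    new_state = start + action * dt
    # (new_state, observation, reward, end, info)
    return new_state, new_state, float(dt), False, {}


def step_batch(actions, states=None, dt=1, return_state=None):
    """Step every (state, action, dt) triple and regroup results by field."""
    n = len(actions)
    # A single int is broadcast to the whole batch; a sequence is used as-is.
    dts = [dt] * n if isinstance(dt, int) else list(dt)
    # By default the states are returned only if states were passed in.
    if return_state is None:
        return_state = states is not None
    batch_states = [None] * n if states is None else list(states)
    results = [toy_step(a, s, d) for a, s, d in zip(actions, batch_states, dts)]
    new_states, observs, rewards, ends, infos = (list(x) for x in zip(*results))
    if return_state:
        return new_states, observs, rewards, ends, infos
    return observs, rewards, ends, infos


with_states = step_batch(actions=[1, 2], states=[10, 20], dt=[1, 2])
without_states = step_batch(actions=[1, 2])
```

Here `with_states` is a 5-tuple that starts with the new states, while `without_states` is a 4-tuple that starts with the observations.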
- step(self, action, state=None, dt=1, blocking=True)[source]
Step the environment.
- Parameters
action – The action to apply to the environment.
state – State to be set on the environment before stepping it.
dt (int) – Number of consecutive times that action will be applied.
blocking – Whether to wait for the result.
- Returns
Transition tuple when blocking, otherwise callable that returns the transition tuple.
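The difference between the two return modes can be sketched with the standard library. This is an illustration under stated assumptions, not the library's code: a thread pool stands in for the external process, and `toy_transition` and this module-level `step` function are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

# A single background worker stands in for the external process.
_pool = ThreadPoolExecutor(max_workers=1)


def toy_transition(action, dt=1):
    """Hypothetical transition: apply `action` dt times starting from 0."""
    return action * dt, float(dt), False, {"dt": dt}


def step(action, dt=1, blocking=True):
    """Return the transition when blocking, else a callable that yields it."""
    future = _pool.submit(toy_transition, action, dt)
    # future.result is a bound method: calling it later waits for the value.
    return future.result() if blocking else future.result


transition = step(2, dt=3)               # waits and returns the tuple
promise = step(2, dt=3, blocking=False)  # returns immediately
deferred = promise()                     # waits only at this point
```

With `blocking=False` the caller can submit work to several environments first and pay the waiting cost once, when the callables are finally invoked.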
- reset(self, blocking=True, return_states=False)[source]
Reset the environment.
- Parameters
blocking – Whether to wait for the result.
return_states (bool) – If True, also return the initial state of the environment.
- Returns
New observation when blocking, otherwise callable that returns the new observation.
- class plangym.vectorization.parallel.BatchEnv(envs, blocking)[source]
Combine multiple environments to step them in batch.
It is mostly a copy of https://github.com/tensorflow/agents/blob/master/agents/tools/wrappers.py that also allows setting and getting the states.
To step environments in parallel, environments must support a blocking=False argument to their step and reset functions that makes them return callables instead, so the result can be received at a later time.
- Parameters
envs – List of environments.
blocking – Step environments one after another rather than in parallel.
- Raises
ValueError – Environments have different observation or action spaces.
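Stepping a batch in parallel amounts to dispatching every step first and collecting the results afterwards. A minimal sketch of that dispatch-then-collect pattern follows, assuming environments whose step accepts blocking=False as described above; `ToyEnv` and `step_all` are hypothetical names, and a shared thread pool replaces the external processes.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyEnv:
    """Hypothetical environment whose step accepts blocking=False."""

    _pool = ThreadPoolExecutor(max_workers=4)  # shared by all instances

    def __init__(self, offset):
        self.offset = offset

    def _transition(self, action):
        # (observation, reward, end, info)
        return action + self.offset, 1.0, False, {}

    def step(self, action, blocking=True):
        future = self._pool.submit(self._transition, action)
        return future.result() if blocking else future.result


def step_all(envs, actions, blocking=False):
    """Step one action per environment and regroup the results by field."""
    if blocking:
        # One after another: each step finishes before the next starts.
        transitions = [env.step(a) for env, a in zip(envs, actions)]
    else:
        # Dispatch everything first, then wait for each result in order.
        promises = [env.step(a, blocking=False) for env, a in zip(envs, actions)]
        transitions = [promise() for promise in promises]
    observs, rewards, ends, infos = zip(*transitions)
    return list(observs), list(rewards), list(ends), list(infos)


envs = [ToyEnv(offset) for offset in (0, 10, 20)]
parallel = step_all(envs, [1, 2, 3], blocking=False)
sequential = step_all(envs, [1, 2, 3], blocking=True)
```

Both modes return the same values in the same order; only the time at which the caller waits changes.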
- __getattr__(self, name)[source]
Forward unimplemented attributes to one of the original environments.
- Parameters
name – Attribute that was accessed.
- Returns
Value behind the attribute name in one of the wrapped environments.
- make_transitions(self, actions, states=None, dt=1, return_state=None)[source]
Implement the logic for stepping the environment in parallel.
- Parameters
dt (Union[numpy.ndarray, int]) –
return_state (bool) –
- sync_states(self, state, blocking=True)[source]
Set the same state to all the environments that are inside an external process.
- Parameters
state – Target state to set on the environments.
blocking (bool) – If True, perform the update sequentially. If False, step the environments in parallel.
- Returns
None.
- Return type
None
- reset(self, indices=None, return_states=True)[source]
Reset the environment and return the resulting batch observations, or batch of observations and states.
- Parameters
indices – The batch indices of environments to reset; defaults to all.
return_states (bool) – Whether to return the corresponding states after reset.
- Returns
Batch of observations. If return_states is True, return a tuple containing (batch_of_observations, batch_of_states).
- class plangym.vectorization.parallel.ParallelEnv(env_class, name, frameskip=1, autoreset=True, delay_setup=False, n_workers=8, blocking=False, **kwargs)[source]
Bases: plangym.vectorization.env.VectorizedEnv
Allow any environment to be stepped in parallel when step_batch is called.
It creates a local instance of the target environment to call all other methods.
Example:
>>> from plangym.vectorization.parallel import ParallelEnv
>>> from plangym.videogames import AtariEnv
>>> env = ParallelEnv(env_class=AtariEnv,
...                   name="MsPacman-v0",
...                   clone_seeds=True,
...                   autoreset=True,
...                   blocking=False)
>>>
>>> state, obs = env.reset()
>>>
>>> states = [state.copy() for _ in range(10)]
>>> actions = [env.sample_action() for _ in range(10)]
>>>
>>> data = env.step_batch(states=states, actions=actions)
>>> new_states, observs, rewards, ends, infos = data
- Parameters
name (str) –
frameskip (int) –
autoreset (bool) –
delay_setup (bool) –
n_workers (int) –
blocking (bool) –
- property blocking(self)
If True, the steps are performed sequentially.
- Return type
bool
- setup(self)[source]
Run environment initialization and create the subprocesses for stepping in parallel.
- make_transitions(self, actions, states=None, dt=1, return_state=None)[source]
Vectorized version of the step method. It steps a vector of states and actions. The signature and behaviour are the same as step, but it takes a list of states, actions and dts as input.
- Parameters
actions (numpy.ndarray) – Iterable containing the different actions to be applied.
states (numpy.ndarray) – Iterable containing the different states to be set.
dt (Union[numpy.ndarray, int]) – int or array containing the frameskips that will be applied.
return_state (bool) – Whether to return the state in the returned tuple. If None, step will return the state if state was passed as a parameter.
- Returns
If states is None, returns (observs, rewards, ends, infos); otherwise returns (new_states, observs, rewards, ends, infos).
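How a batch is divided among the n_workers subprocesses is not described here. One plausible scheme, shown purely as an assumption, splits the batch into contiguous chunks, processes each chunk on its own worker, and concatenates the results in the original order. `split_chunks`, `step_chunk` and `parallel_map` are hypothetical helpers, and threads stand in for the worker processes.

```python
from concurrent.futures import ThreadPoolExecutor


def split_chunks(items, n_chunks):
    """Split a list into at most n_chunks contiguous, near-equal chunks."""
    size, extra = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` chunks take one additional item each.
        stop = start + size + (1 if i < extra else 0)
        if stop > start:  # skip empty chunks when the batch is small
            chunks.append(items[start:stop])
        start = stop
    return chunks


def step_chunk(chunk):
    """Hypothetical per-worker job: double every action in the chunk."""
    return [2 * action for action in chunk]


def parallel_map(actions, n_workers):
    """Process the chunks concurrently and flatten them back in order."""
    chunks = split_chunks(actions, n_workers)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # Executor.map yields results in the order of its inputs.
        results = list(pool.map(step_chunk, chunks))
    return [item for chunk in results for item in chunk]


chunks = split_chunks(list(range(10)), 4)
doubled = parallel_map(list(range(10)), n_workers=4)
```

Keeping the chunks contiguous means that concatenating the per-worker results restores the original batch order without any extra bookkeeping.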