plangym.vectorization.parallel

Handle parallelization for plangym.Environment that allows vectorized steps.

Module Contents

Classes

ExternalProcess

Step environment in a separate process for lock-free parallelism.

BatchEnv

Combine multiple environments to step them in batch.

ParallelEnv

Allow any environment to be stepped in parallel when step_batch is called.

class plangym.vectorization.parallel.ExternalProcess(constructor)[source]

Step environment in a separate process for lock-free parallelism.

The environment will be created in the external process by calling the specified callable. This can be an environment class, or a function creating the environment and potentially wrapping it. The returned environment should not access global variables.

Parameters

constructor – Callable that creates and returns an OpenAI gym environment.

observation_space

The cached observation space of the environment.

action_space

The cached action space of the environment.

Notes

This class is mostly copied from https://github.com/tensorflow/agents/blob/master/agents/tools/wrappers.py, extended so that the environment state can be set and read.

_ACCESS = 1
_CALL = 2
_RESULT = 3
_EXCEPTION = 4
_CLOSE = 5

property observation_space(self)

Return the observation space of the internal environment.

property action_space(self)

Return the action space of the internal environment.

__getattr__(self, name)[source]

Request an attribute from the environment.

Note that this involves communication with the external process, so it can be slow.

Parameters

name – Attribute to access.

Returns

Value of the attribute.

call(self, name, *args, **kwargs)[source]

Asynchronously call a method of the external environment.

Parameters
  • name – Name of the method to call.

  • *args – Positional arguments to forward to the method.

  • **kwargs – Keyword arguments to forward to the method.

Returns

Promise object that blocks and provides the return value when called.
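
The request/promise pattern described for call can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's implementation: the message constants reuse the values of the class attributes listed earlier, while Counter, worker, and call are hypothetical names. The worker runs in a thread here only to keep the sketch self-contained; the class documented on this page uses a separate process.

```python
import threading
from multiprocessing import Pipe

# Message types, reusing the values of the ExternalProcess class attributes.
_CALL, _RESULT, _EXCEPTION, _CLOSE = 2, 3, 4, 5


class Counter:
    """Hypothetical stand-in for an environment."""

    def __init__(self):
        self.total = 0

    def add(self, amount):
        self.total += amount
        return self.total


def worker(constructor, conn):
    """Build the object, then answer messages until asked to close."""
    target = constructor()
    while True:
        message, payload = conn.recv()
        if message == _CLOSE:
            break
        if message == _CALL:
            name, args, kwargs = payload
            try:
                conn.send((_RESULT, getattr(target, name)(*args, **kwargs)))
            except Exception as error:  # report the failure to the caller
                conn.send((_EXCEPTION, repr(error)))
    conn.close()


def call(conn, name, *args, **kwargs):
    """Send the request now and return a promise that blocks when called."""
    conn.send((_CALL, (name, args, kwargs)))

    def promise():
        message, payload = conn.recv()
        if message == _EXCEPTION:
            raise RuntimeError(payload)
        return payload

    return promise


parent_conn, child_conn = Pipe()
thread = threading.Thread(target=worker, args=(Counter, child_conn))
thread.start()

promise = call(parent_conn, "add", 5)  # returns without waiting for a reply
result = promise()                     # blocks until the worker replies
parent_conn.send((_CLOSE, None))
thread.join()
```

Returning a callable instead of the value lets the caller issue several requests to different workers before waiting on any of them.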

close(self)[source]

Send a close message to the external process and join it.

set_state(self, state, blocking=True)[source]

Set the state of the internal environment.

step_batch(self, actions, states=None, dt=None, return_state=None, blocking=True)[source]

Vectorized version of the step method.

Step a vector of states and actions. The signature and behaviour are the same as those of step, but it takes lists of states, actions, and dts as input.

Parameters
  • actions – Iterable containing the different actions to be applied.

  • states – Iterable containing the different states to be set.

  • dt (Union[numpy.ndarray, int]) – int or array containing the frameskips that will be applied.

  • blocking – If True, execute the steps sequentially.

  • return_state (bool) – Whether to return the state in the returned tuple. If None, the state is returned only when states is passed as a parameter.

Returns

If states is None, returns (observs, rewards, ends, infos); otherwise returns (new_states, observs, rewards, ends, infos).
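
The return convention above can be illustrated with a toy stand-in. ToyEnv and this step_batch function are hypothetical and only mimic the documented tuple shapes; the return_state and blocking arguments are left out for brevity.

```python
class ToyEnv:
    """Hypothetical environment whose state is a single integer."""

    def __init__(self):
        self.state = 0

    def step(self, action, state=None, dt=1):
        if state is not None:
            self.state = state
        for _ in range(dt):  # apply the action dt consecutive times
            self.state += action
        obs, reward, end, info = self.state, float(action), False, {}
        if state is None:
            return obs, reward, end, info
        return self.state, obs, reward, end, info


def step_batch(env, actions, states=None, dt=1):
    """Step every (action, state, dt) triple and transpose the results."""
    n = len(actions)
    dts = [dt] * n if isinstance(dt, int) else list(dt)
    states = [None] * n if states is None else list(states)
    results = [env.step(a, state=s, dt=d) for a, s, d in zip(actions, states, dts)]
    # Turn a list of per-step tuples into one list per field.
    return tuple(list(column) for column in zip(*results))


env = ToyEnv()
# Without states: four lists are returned.
observs, rewards, ends, infos = step_batch(env, actions=[1, 2, 3])
# With states: the new states come first, followed by the same four lists.
new_states, observs2, rewards2, ends2, infos2 = step_batch(
    env, actions=[1, 2], states=[10, 20], dt=[1, 2]
)
```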

step(self, action, state=None, dt=1, blocking=True)[source]

Step the environment.

Parameters
  • action – The action to apply to the environment.

  • state – State to be set on the environment before stepping it.

  • dt (int) – Number of consecutive times that action will be applied.

  • blocking – Whether to wait for the result.

Returns

Transition tuple when blocking, otherwise callable that returns the transition tuple.

reset(self, blocking=True, return_states=False)[source]

Reset the environment.

Parameters
  • blocking – Whether to wait for the result.

  • return_states (bool) – If True, also return the initial state of the environment.

Returns

New observation when blocking, otherwise callable that returns the new observation.

_receive(self)[source]

Wait for a message from the worker process and return its payload.

Raises
  • Exception – An exception was raised inside the worker process.

  • KeyError – The received message is of an unknown type.

Returns

Payload object of the message.

_worker(self, constructor, conn)[source]

Wait for actions and send back environment results.

Parameters
  • constructor – Constructor for the OpenAI Gym environment.

  • conn – Connection for communication to the main process.

Raises

KeyError – When receiving a message of unknown type.

class plangym.vectorization.parallel.BatchEnv(envs, blocking)[source]

Combine multiple environments to step them in batch.

It is mostly copied from https://github.com/tensorflow/agents/blob/master/agents/tools/wrappers.py, extended to set and get the states.

To step environments in parallel, environments must support a blocking=False argument to their step and reset functions that makes them return callables, which deliver the result at a later time.

Parameters
  • envs – List of environments.

  • blocking – Step environments one after another rather than in parallel.

Raises

ValueError – Environments have different observation or action spaces.
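
The blocking=False contract described above amounts to launching every step before waiting on any of them. The sketch below assumes a hypothetical SlowEnv whose step returns a callable when blocking=False; a thread pool stands in for the external processes, and step_all is an illustrative helper, not a method of this class.

```python
from concurrent.futures import ThreadPoolExecutor


class SlowEnv:
    """Hypothetical environment honoring the blocking=False contract."""

    _pool = ThreadPoolExecutor(max_workers=4)  # stand-in for external processes

    def __init__(self, offset):
        self.offset = offset

    def _transition(self, action):
        obs = self.offset + action
        return obs, 1.0, False, {}

    def step(self, action, blocking=True):
        if blocking:
            return self._transition(action)
        future = self._pool.submit(self._transition, action)
        return future.result  # callable that waits for the transition


def step_all(envs, actions, blocking):
    if blocking:
        transitions = [env.step(a, blocking=True) for env, a in zip(envs, actions)]
    else:
        # Launch every step first, then collect the results in order.
        promises = [env.step(a, blocking=False) for env, a in zip(envs, actions)]
        transitions = [promise() for promise in promises]
    observs, rewards, ends, infos = zip(*transitions)
    return list(observs), list(rewards), list(ends), list(infos)


envs = [SlowEnv(offset) for offset in (0, 10, 20)]
sequential = step_all(envs, actions=[1, 2, 3], blocking=True)
parallel = step_all(envs, actions=[1, 2, 3], blocking=False)
SlowEnv._pool.shutdown()
```

Both modes return the same batch; only the order in which the work is issued and awaited differs.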

__len__(self)[source]

Return the number of combined environments.

Return type

int

__getitem__(self, index)[source]

Access an underlying environment by index.

__getattr__(self, name)[source]

Forward unimplemented attributes to one of the original environments.

Parameters

name – Attribute that was accessed.

Returns

Value of the attribute in one of the wrapped environments.
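
A minimal sketch of how __len__, __getitem__, and __getattr__ can cooperate on a container of environments. Batch and ToyEnv are hypothetical names, and forwarding to the first environment is an assumption of this sketch; the page only says that one of the wrapped environments is used.

```python
class ToyEnv:
    """Hypothetical environment exposing one custom attribute."""

    def __init__(self, label):
        self.label = label


class Batch:
    """Hypothetical container mimicking the three methods above."""

    def __init__(self, envs):
        self._envs = list(envs)

    def __len__(self):
        return len(self._envs)

    def __getitem__(self, index):
        return self._envs[index]

    def __getattr__(self, name):
        # Python calls this only after normal attribute lookup fails.
        # Forwarding to the first environment is an assumption of this sketch.
        return getattr(self._envs[0], name)


batch = Batch([ToyEnv("a"), ToyEnv("b")])
size = len(batch)
second_label = batch[1].label
forwarded_label = batch.label  # not defined on Batch, so it is forwarded
```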

make_transitions(self, actions, states=None, dt=1, return_state=None)[source]

Implement the logic for stepping the environment in parallel.

Parameters
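  • dt (Union[numpy.ndarray, int]) – int or array containing the frameskips that will be applied.

  • return_state (bool) – Whether to return the state in the returned tuple.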

sync_states(self, state, blocking=True)[source]

Set the same state to all the environments that are inside an external process.

Parameters
  • state – Target state to set on the environments.

  • blocking (bool) – If True, perform the update sequentially. If False, update the environments in parallel.

Returns

None.

Return type

None

reset(self, indices=None, return_states=True)[source]

Reset the environments and return the resulting batch of observations, or a batch of observations and states.

Parameters
  • indices – The batch indices of environments to reset; defaults to all.

  • return_states (bool) – Whether to return the corresponding states after reset.

Returns

Batch of observations. If return_states is True, returns a tuple containing (batch_of_observations, batch_of_states).
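
The indices and return_states semantics can be sketched with a toy stand-in. ToyEnv, its get_state method, and reset_batch are hypothetical names used only to illustrate the documented return shapes.

```python
class ToyEnv:
    """Hypothetical environment with an integer state."""

    def __init__(self, start):
        self.start = start
        self.state = start

    def reset(self):
        self.state = self.start
        return self.state * 2  # observation derived from the state

    def get_state(self):
        return self.state


def reset_batch(envs, indices=None, return_states=True):
    """Reset the selected environments; default to all of them."""
    if indices is None:
        indices = range(len(envs))
    observs = [envs[i].reset() for i in indices]
    if not return_states:
        return observs
    states = [envs[i].get_state() for i in indices]
    return observs, states


envs = [ToyEnv(start) for start in (1, 2, 3)]
envs[2].state = 99  # pretend the third environment has advanced
observs, states = reset_batch(envs)
subset_observs = reset_batch(envs, indices=[0, 2], return_states=False)
```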

close(self)[source]

Send close messages to the external processes and join them.

class plangym.vectorization.parallel.ParallelEnv(env_class, name, frameskip=1, autoreset=True, delay_setup=False, n_workers=8, blocking=False, **kwargs)[source]

Bases: plangym.vectorization.env.VectorizedEnv

Allow any environment to be stepped in parallel when step_batch is called.

It creates a local instance of the target environment to call all other methods.

Example:

>>> from plangym.videogames import AtariEnv
>>> from plangym.vectorization.parallel import ParallelEnv
>>> env = ParallelEnv(env_class=AtariEnv,
...                   name="MsPacman-v0",
...                   clone_seeds=True,
...                   autoreset=True,
...                   blocking=False)
>>>
>>> state, obs = env.reset()
>>>
>>> states = [state.copy() for _ in range(10)]
>>> actions = [env.sample_action() for _ in range(10)]
>>>
>>> data = env.step_batch(states=states, actions=actions)
>>> new_states, observs, rewards, ends, infos = data

Parameters
  • name (str) –

  • frameskip (int) –

  • autoreset (bool) –

  • delay_setup (bool) –

  • n_workers (int) –

  • blocking (bool) –

property blocking(self)

If True, the steps are performed sequentially.

Return type

bool

setup(self)[source]

Run environment initialization and create the subprocesses for stepping in parallel.

clone(self, **kwargs)[source]

Return a copy of the environment.

Return type

plangym.core.PlanEnv

make_transitions(self, actions, states=None, dt=1, return_state=None)[source]

Vectorized version of the step method.

Step a vector of states and actions. The signature and behaviour are the same as those of step, but it takes lists of states, actions, and dts as input.

Parameters
  • actions (numpy.ndarray) – Iterable containing the different actions to be applied.

  • states (numpy.ndarray) – Iterable containing the different states to be set.

  • dt (Union[numpy.ndarray, int]) – int or array containing the frameskips that will be applied.

  • return_state (bool) – Whether to return the state in the returned tuple. If None, the state is returned only when states is passed as a parameter.

Returns

If states is None, returns (observs, rewards, ends, infos); otherwise returns (new_states, observs, rewards, ends, infos).
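
When a batch holds more items than there are workers, the work has to be divided and the results reassembled in order. This page does not say how ParallelEnv does that, so the following is only one plausible scheme; split_batch, toy_worker, and merge are hypothetical helpers.

```python
def split_batch(items, n_workers):
    """Split items into at most n_workers contiguous, near-equal chunks."""
    if not items:
        return []
    n_chunks = min(n_workers, len(items))
    size, extra = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def toy_worker(actions, states):
    """Hypothetical worker: returns (new_states, observs) for its chunk."""
    new_states = [s + a for s, a in zip(states, actions)]
    observs = [2 * s for s in new_states]
    return new_states, observs


def merge(chunk_results):
    """Concatenate per-chunk result tuples field by field, preserving order."""
    return tuple(
        [item for chunk in column for item in chunk]
        for column in zip(*chunk_results)
    )


actions = list(range(10))
states = [100] * 10
action_chunks = split_batch(actions, n_workers=4)
state_chunks = split_batch(states, n_workers=4)
results = [toy_worker(a, s) for a, s in zip(action_chunks, state_chunks)]
new_states, observs = merge(results)
```

Contiguous chunks keep the merged output aligned with the input order, so callers can index results by the position of the original action.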

sync_states(self, state)[source]

Synchronize all the copies of the wrapped environment.

Set all the states of the different workers of the internal BatchEnv to the same state as the internal Environment used to apply the non-vectorized steps.

Parameters

state (None) –

close(self)[source]

Close the environment and the spawned processes.

Return type

None