src.plangym.vectorization.parallel

Handle parallelization for plangym.Environment to allow vectorized steps.

Classes

ExternalProcess

Step environment in a separate process for lock-free parallelism.

BatchEnv

Combine multiple environments to step them in batch.

ParallelEnv

Allow any environment to be stepped in parallel when step_batch is called.

Module Contents

class src.plangym.vectorization.parallel.ExternalProcess(constructor)

Step environment in a separate process for lock-free parallelism.

The environment is created in the external process by calling the specified callable. This can be an environment class or a function that creates the environment and optionally wraps it. The returned environment should not access global variables.

Parameters:

constructor – Callable that creates and returns an OpenAI Gym environment.
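The constructor only needs to be a zero-argument callable that returns a fresh environment. The sketch below shows the two forms described above; ToyEnv, StepCounterWrapper and make_wrapped_env are hypothetical illustrations, not plangym classes.

```python
import functools


class ToyEnv:
    """Hypothetical environment: a counter that each action increments."""

    def __init__(self, start=0):
        self.value = start

    def step(self, action):
        self.value += action
        return self.value


class StepCounterWrapper:
    """Hypothetical wrapper that counts how many steps were taken."""

    def __init__(self, env):
        self.env = env
        self.steps = 0

    def step(self, action):
        self.steps += 1
        return self.env.step(action)


# Form 1: the environment class itself, called with no arguments.
constructor_a = ToyEnv


# Form 2: a factory that builds and wraps the environment. Everything it
# needs is passed explicitly, so it does not read any global variables.
def make_wrapped_env(start):
    return StepCounterWrapper(ToyEnv(start=start))


constructor_b = functools.partial(make_wrapped_env, start=10)

env_a = constructor_a()
env_b = constructor_b()
```

Using functools.partial over a module-level function, rather than a lambda, also keeps the constructor picklable, which matters if the worker process is started with the spawn method.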

observation_space

The cached observation space of the environment.

action_space

The cached action space of the environment.

Notes:

This is mostly a copy of the implementation in tensorflow/agents, extended so that the environment state can be set and read.

_ACCESS = 1
_CALL = 2
_RESULT = 3
_EXCEPTION = 4
_CLOSE = 5
_process
_observ_space = None
_action_space = None
property observation_space
Return the observation space of the internal environment.
property action_space
Return the action space of the internal environment.
__getattr__(name)

Request an attribute from the environment.

Note that this involves communication with the external process, so it can be slow.

Parameters:

name – Attribute to access.

Returns:

Value of the attribute.

call(name, *args, **kwargs)

Asynchronously call a method of the external environment.

Parameters:
  • name – Name of the method to call.

  • *args – Positional arguments to forward to the method.

  • **kwargs – Keyword arguments to forward to the method.

Returns:

Promise object that blocks and provides the return value when called.

close()

Send a close message to the external process and join it.

set_state(state, blocking=True)

Set the state of the internal environment.

step_batch(actions, states=None, dt=None, return_state=None, blocking=True)

Vectorized version of the step method.

It steps a batch of states and actions. The signature and behaviour are the same as step, but it takes lists of states, actions and dts as input.

Parameters:
  • actions – Iterable containing the different actions to be applied.

  • states – Iterable containing the different states to be set.

  • dt (numpy.ndarray | int) – int or array containing the frameskips that will be applied.

  • blocking – If True, execute sequentially.

  • return_state (bool | None) – Whether to return the state in the returned tuple. If None, step will return the state if state was passed as a parameter.

Returns:

If states is None, returns (observs, rewards, ends, infos); otherwise returns (new_states, observs, rewards, ends, infos).
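The dt and return_state parameters follow two small rules: an int dt applies the same frameskip to every item, and return_state=None means "return states only if states were passed". A minimal sketch of both rules, assuming NumPy; resolve_return_state and as_dt_array are hypothetical helper names, not part of plangym.

```python
import numpy as np


def resolve_return_state(return_state, states):
    """Hypothetical helper: None means 'return states only if states were given'."""
    if return_state is None:
        return states is not None
    return bool(return_state)


def as_dt_array(dt, n):
    """Hypothetical helper: one frameskip per batch item."""
    if np.isscalar(dt):
        # An int dt is broadcast to every item in the batch.
        return np.full(n, dt, dtype=int)
    dt = np.asarray(dt, dtype=int)
    if dt.shape != (n,):
        raise ValueError(f"Expected {n} dt values, got shape {dt.shape}")
    return dt
```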

step(action, state=None, dt=1, blocking=True)

Step the environment.

Parameters:
  • action – The action to apply to the environment.

  • state – State to be set on the environment before stepping it.

  • dt (int) – Number of consecutive times that action will be applied.

  • blocking – Whether to wait for the result.

Returns:

Transition tuple when blocking, otherwise callable that returns the transition tuple.

reset(blocking=True, return_states=False)

Reset the environment.

Parameters:
  • blocking – Whether to wait for the result.

  • return_states (bool) – If True, also return the initial state of the environment.

Returns:

New observation when blocking, otherwise callable that returns the new observation.

_receive()

Wait for a message from the worker process and return its payload.

Raises:
  • Exception – An exception was raised inside the worker process.

  • KeyError – The received message is of an unknown type.

Returns:

Payload object of the message.

_worker(constructor, conn)

Wait for actions and send back environment results.

Parameters:
  • constructor – Constructor for the OpenAI Gym environment.

  • conn – Connection for communication to the main process.

Raises:

KeyError – When receiving a message of unknown type.
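The message constants, __getattr__, call, _receive and _worker fit together as a small request/reply protocol over a pipe. The sketch below reconstructs that idea under stated assumptions; it is not plangym's code. MiniExternalProcess, worker and CounterEnv are hypothetical names, the tag values mirror the constants listed above, and the "fork" start method is used so the sketch runs without a __main__ guard (it is POSIX-only).

```python
import multiprocessing
import traceback

# Message tags mirroring the class constants documented above.
_ACCESS, _CALL, _RESULT, _EXCEPTION, _CLOSE = 1, 2, 3, 4, 5


class CounterEnv:
    """Hypothetical toy environment used only for this sketch."""

    def __init__(self):
        self.state = 0

    def step(self, action):
        self.state += action
        return self.state


def worker(constructor, conn):
    """Build the env inside the child process and answer requests until _CLOSE."""
    env = constructor()
    while True:
        try:
            message, payload = conn.recv()
        except (EOFError, KeyboardInterrupt):
            break
        if message == _CLOSE:
            break
        try:
            if message == _ACCESS:
                result = getattr(env, payload)
            elif message == _CALL:
                name, args, kwargs = payload
                result = getattr(env, name)(*args, **kwargs)
            else:
                raise KeyError(f"Received message of unknown type {message}")
            conn.send((_RESULT, result))
        except Exception:
            # Send the formatted traceback back so the parent can re-raise it.
            conn.send((_EXCEPTION, traceback.format_exc()))
    conn.close()


class MiniExternalProcess:
    """Client side of the sketch: sends requests and returns promises."""

    def __init__(self, constructor):
        ctx = multiprocessing.get_context("fork")
        self._conn, child_conn = ctx.Pipe()
        self._process = ctx.Process(target=worker, args=(constructor, child_conn))
        self._process.start()

    def __getattr__(self, name):
        # Private names are never forwarded, which avoids infinite recursion
        # if an attribute such as _conn is looked up before __init__ sets it.
        if name.startswith("_"):
            raise AttributeError(name)
        self._conn.send((_ACCESS, name))
        return self._receive()

    def call(self, name, *args, **kwargs):
        self._conn.send((_CALL, (name, args, kwargs)))
        return self._receive  # calling the promise blocks until the reply arrives

    def _receive(self):
        message, payload = self._conn.recv()
        if message == _EXCEPTION:
            raise Exception(payload)
        if message == _RESULT:
            return payload
        raise KeyError(f"Received message of unknown type {message}")

    def close(self):
        self._conn.send((_CLOSE, None))
        self._process.join()
```

Because every reply travels over the same pipe in order, this sketch requires each promise to be resolved before the next request is sent.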

class src.plangym.vectorization.parallel.BatchEnv(envs, blocking)

Combine multiple environments to step them in batch.

It is mostly a copy of the implementation in tensorflow/agents, extended so that the states can also be set and read.

To step environments in parallel, the environments must accept a blocking=False argument in their step and reset methods, which makes them return callables that retrieve the result at a later time.

Parameters:
  • envs – List of environments.

  • blocking – If True, step the environments one after another rather than in parallel.

Raises:

ValueError – Environments have different observation or action spaces.
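The blocking=False convention is what lets a batch wrapper overlap work: it first starts every step, collecting one callable per environment, and only then resolves them. A minimal sketch of that pattern, plus the space check and attribute forwarding described for this class; PromiseEnv and MiniBatchEnv are hypothetical. In this toy, PromiseEnv merely defers the computation, whereas an ExternalProcess-backed environment would already be working in its own process when the callable is returned.

```python
class PromiseEnv:
    """Hypothetical env whose step returns a callable when blocking=False."""

    def __init__(self, offset, observation_space="box", action_space="discrete"):
        self.offset = offset
        self.observation_space = observation_space
        self.action_space = action_space

    def step(self, action, blocking=True):
        def result():
            return self.offset + action
        return result() if blocking else result


class MiniBatchEnv:
    """Sketch of batch stepping on top of the blocking=False convention."""

    def __init__(self, envs, blocking):
        first = envs[0]
        if any(env.observation_space != first.observation_space for env in envs):
            raise ValueError("All environments must have the same observation space.")
        if any(env.action_space != first.action_space for env in envs):
            raise ValueError("All environments must have the same action space.")
        self._envs = envs
        self._blocking = blocking

    def __len__(self):
        return len(self._envs)

    def __getitem__(self, index):
        return self._envs[index]

    def __getattr__(self, name):
        # Forward unknown public attributes to the first wrapped environment.
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._envs[0], name)

    def step(self, actions):
        if self._blocking:
            # Step the environments one after another.
            return [env.step(a) for env, a in zip(self._envs, actions)]
        # Start every step first, then collect the results.
        promises = [env.step(a, blocking=False) for env, a in zip(self._envs, actions)]
        return [promise() for promise in promises]
```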

_envs
_blocking
__len__()

Return the number of combined environments.

Return type:

int

__getitem__(index)

Access an underlying environment by index.

__getattr__(name)

Forward unimplemented attributes to one of the original environments.

Parameters:

name – Attribute that was accessed.

Returns:

Value of the attribute name in one of the wrapped environments.

make_transitions(actions, states=None, dt=1, return_state=None)

Implement the logic for stepping the environment in parallel.

Parameters:
  • dt (numpy.ndarray | int)

  • return_state (bool | None)

sync_states(state, blocking=True)

Set the same state to all the environments that are inside an external process.

Parameters:
  • state – Target state to set on the environments.

  • blocking (bool) – If True, perform the update sequentially. If False, update the environments in parallel.

Returns:

None.

Return type:

None
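Setting the same state everywhere follows the same fan-out pattern as stepping: in the non-blocking case every set_state call is issued first and the returned callables are awaited afterwards. A minimal sketch, assuming each environment exposes set_state(state, blocking) as ExternalProcess does; StatefulEnv and this free-standing sync_states function are hypothetical.

```python
import copy


class StatefulEnv:
    """Hypothetical env supporting set_state(state, blocking=False)."""

    def __init__(self):
        self.state = None

    def set_state(self, state, blocking=True):
        def apply():
            self.state = copy.deepcopy(state)  # each env gets its own copy
        if blocking:
            apply()
            return None
        return apply


def sync_states(envs, state, blocking=True):
    """Set the same state on every env, sequentially or by fanning out promises."""
    if blocking:
        for env in envs:
            env.set_state(state, blocking=True)
        return None
    promises = [env.set_state(state, blocking=False) for env in envs]
    for promise in promises:
        promise()  # wait until every environment has applied the update
    return None
```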

reset(indices=None, return_states=True)

Reset the environments and return the resulting batch data.

Parameters:
  • indices – The batch indices of environments to reset; defaults to all.

  • return_states (bool) – If True, also return the corresponding states after the reset.

Returns:

Batch of observations. If return_states is True, returns a tuple containing (batch_of_observations, batch_of_states).
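Resetting a subset of the batch can reuse the non-blocking convention and then stack the per-environment results. A minimal sketch, assuming NumPy and assuming each environment's reset returns (observation, state) when return_states=True; ResettableEnv and batch_reset are hypothetical names.

```python
import numpy as np


class ResettableEnv:
    """Hypothetical env; reset(blocking=False) returns a callable."""

    def __init__(self, seed):
        self.seed = seed

    def reset(self, blocking=True, return_states=False):
        def result():
            obs = np.full(2, self.seed, dtype=float)
            state = np.array([self.seed])
            return (obs, state) if return_states else obs
        return result() if blocking else result


def batch_reset(envs, indices=None, return_states=True):
    """Reset the selected envs, then stack their outputs into batch arrays."""
    if indices is None:
        indices = range(len(envs))  # default: reset every environment
    promises = [envs[i].reset(blocking=False, return_states=return_states) for i in indices]
    results = [promise() for promise in promises]
    if not return_states:
        return np.stack(results)
    observs, states = zip(*results)
    return np.stack(observs), np.stack(states)
```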

close()

Send close messages to the external processes and join them.

class src.plangym.vectorization.parallel.ParallelEnv(env_class, name, frameskip=1, autoreset=True, delay_setup=False, n_workers=8, blocking=False, **kwargs)

Bases: plangym.vectorization.env.VectorizedEnv

Allow any environment to be stepped in parallel when step_batch is called.

It creates a local instance of the target environment, which handles all the other method calls.

Example:

>>> from plangym.videogames import AtariEnv
>>> env = ParallelEnv(env_class=AtariEnv,
...                   name="MsPacman-v0",
...                   clone_seeds=True,
...                   autoreset=True,
...                   blocking=False)
>>>
>>> state, obs, info = env.reset()
>>>
>>> states = [state.copy() for _ in range(10)]
>>> actions = [env.sample_action() for _ in range(10)]
>>>
>>> data = env.step_batch(states=states, actions=actions)
>>> new_states, observs, rewards, ends, truncateds, infos = data

Parameters:
  • name (str)

  • frameskip (int)

  • autoreset (bool)

  • delay_setup (bool)

  • n_workers (int)

  • blocking (bool)

_blocking
_batch_env = None
property blocking: bool

If True the steps are performed sequentially.

Return type:

bool

setup()

Run environment initialization and create the subprocesses for stepping in parallel.

clone(**kwargs)

Return a copy of the environment.

Return type:

plangym.core.PlanEnv

make_transitions(actions, states=None, dt=1, return_state=None)

Vectorized version of the step method.

It steps a batch of states and actions. The signature and behaviour are the same as step, but it takes lists of states, actions and dts as input.

Parameters:
  • actions (numpy.ndarray) – Iterable containing the different actions to be applied.

  • states (numpy.ndarray) – Iterable containing the different states to be set.

  • dt (numpy.ndarray | int) – int or array containing the frameskips that will be applied.

  • return_state (bool | None) – Whether to return the state in the returned tuple. If None, step will return the state if state was passed as a parameter.

Returns:

If states is None, returns (observs, rewards, ends, truncateds, infos); otherwise returns (new_states, observs, rewards, ends, truncateds, infos).
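One way to distribute a batch over n_workers is to split it into contiguous chunks, step each chunk, and concatenate the results in their original order. The sketch below assumes that chunking scheme (via numpy.array_split) and processes the chunks in a plain loop instead of real worker processes; step_one and make_transitions_sketch are hypothetical and stand in for whatever ParallelEnv actually dispatches.

```python
import numpy as np


def step_one(state, action, dt):
    """Hypothetical per-item step: the state is a counter advanced dt times."""
    new_state = state + action * dt
    reward = float(action * dt)
    return new_state, reward


def make_transitions_sketch(states, actions, dt, n_workers):
    """Split the batch into contiguous chunks, one per worker, then reassemble it."""
    states, actions = np.asarray(states), np.asarray(actions)
    dt = np.full(len(actions), dt) if np.isscalar(dt) else np.asarray(dt)
    chunks = zip(
        np.array_split(states, n_workers),
        np.array_split(actions, n_workers),
        np.array_split(dt, n_workers),
    )
    # Each chunk would go to a different worker; here they run sequentially.
    new_states, rewards = [], []
    for chunk_states, chunk_actions, chunk_dt in chunks:
        for s, a, d in zip(chunk_states, chunk_actions, chunk_dt):
            ns, r = step_one(s, a, d)
            new_states.append(ns)
            rewards.append(r)
    return np.array(new_states), np.array(rewards)
```

When n_workers exceeds the batch size, numpy.array_split produces empty chunks, which the loop simply skips, so the output order always matches the input order.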

sync_states(state)

Synchronize all the copies of the wrapped environment.

Set all the states of the different workers of the internal BatchEnv to the same state as the internal environment used to apply the non-vectorized steps.

Parameters:

state (None)

close()

Close the environment and the spawned processes.

Return type:

None