src.plangym.vectorization.parallel
==================================

.. py:module:: src.plangym.vectorization.parallel

.. autoapi-nested-parse::

   Handle parallelization for ``plangym.Environment`` that allows vectorized steps.



Classes
-------

.. autoapisummary::

   src.plangym.vectorization.parallel.ExternalProcess
   src.plangym.vectorization.parallel.BatchEnv
   src.plangym.vectorization.parallel.ParallelEnv


Module Contents
---------------

.. py:class:: ExternalProcess(constructor)

   Step environment in a separate process for lock free paralellism.

   The environment will be created in the external process by calling the
   specified callable. This can be an environment class, or a function
   creating the environment and potentially wrapping it. The returned
   environment should not access global variables.

   :param constructor: Callable that creates and returns an OpenAI gym environment.

   .. attribute:: observation_space

      The cached observation space of the environment.

   .. attribute:: action_space

      The cached action space of the environment.

   ..notes:
       This is mostly a copy paste from
       https://github.com/tensorflow/agents/blob/master/agents/tools/wrappers.py,
       that lets us set and read the environment state.



   .. py:attribute:: _ACCESS
      :value: 1



   .. py:attribute:: _CALL
      :value: 2



   .. py:attribute:: _RESULT
      :value: 3



   .. py:attribute:: _EXCEPTION
      :value: 4



   .. py:attribute:: _CLOSE
      :value: 5



   .. py:attribute:: _process


   .. py:attribute:: _observ_space
      :value: None



   .. py:attribute:: _action_space
      :value: None



   .. py:property:: observation_space
      Return the observation space of the internal environment.


   .. py:property:: action_space
      Return the action space of the internal environment.


   .. py:method:: __getattr__(name)

      Request an attribute from the environment.

      Note that this involves communication with the external process, so it can         be slow.

      :param name: Attribute to access.

      :returns: Value of the attribute.



   .. py:method:: call(name, *args, **kwargs)

      Asynchronously call a method of the external environment.

      :param name: Name of the method to call.
      :param \*args: Positional arguments to forward to the method.
      :param \*\*kwargs: Keyword arguments to forward to the method.

      :returns: Promise object that blocks and provides the return value when called.



   .. py:method:: close()

      Send a close message to the external process and join it.



   .. py:method:: set_state(state, blocking=True)

      Set the state of the internal environment.



   .. py:method:: step_batch(actions, states=None, dt = None, return_state = None, blocking=True)

      Vectorized version of the ``step`` method.

      It allows to step a vector of states and actions. The signature and         behaviour is the same as ``step``, but taking a list of states, actions         and dts as input.

      :param actions: Iterable containing the different actions to be applied.
      :param states: Iterable containing the different states to be set.
      :param dt: int or array containing the frameskips that will be applied.
      :param blocking: If True, execute sequentially.
      :param return_state: Whether to return the state in the returned tuple.                 If None, `step` will return the state if `state` was passed as a parameter.

      :returns: if states is None returns ``(observs, rewards, ends, infos)``
                else returns ``(new_states, observs, rewards, ends, infos)``



   .. py:method:: step(action, state=None, dt = 1, blocking=True)

      Step the environment.

      :param action: The action to apply to the environment.
      :param state: State to be set on the environment before stepping it.
      :param dt: Number of consecutive times that action will be applied.
      :param blocking: Whether to wait for the result.

      :returns: Transition tuple when blocking, otherwise callable that returns the           transition tuple.



   .. py:method:: reset(blocking=True, return_states = False)

      Reset the environment.

      :param blocking: Whether to wait for the result.
      :param return_states: If true return also the initial state of the environment.

      :returns: New observation when blocking, otherwise callable that returns the new           observation.



   .. py:method:: _receive()

      Wait for a message from the worker process and return its payload.

      Raises
        Exception: An exception was raised inside the worker process.
        KeyError: The received message is of an unknown type.

      Returns
        Payload object of the message.




   .. py:method:: _worker(constructor, conn)

      Wait for actions and send back environment results.

      :param constructor: Constructor for the OpenAI Gym environment.
      :param conn: Connection for communication to the main process.

      :raises KeyError: When receiving a message of unknown type.



.. py:class:: BatchEnv(envs, blocking)

   Combine multiple environments to step them in batch.

   It is mostly a copy paste from     https://github.com/tensorflow/agents/blob/master/agents/tools/wrappers.py     that also allows to set and get the states.

   To step environments in parallel, environments must support a     ``blocking=False`` argument to their step and reset functions that     makes them return callables instead to receive the result at a later time.

   :param envs: List of environments.
   :param blocking: Step environments after another rather than in parallel.

   :raises ValueError: Environments have different observation or action spaces.


   .. py:attribute:: _envs


   .. py:attribute:: _blocking


   .. py:method:: __len__()

      Return the number of combined environments.



   .. py:method:: __getitem__(index)

      Access an underlying environment by index.



   .. py:method:: __getattr__(name)

      Forward unimplemented attributes to one of the original environments.

      :param name: Attribute that was accessed.

      :returns: Value behind the attribute name one of the wrapped environments.



   .. py:method:: make_transitions(actions, states=None, dt = 1, return_state = None)

      Implement the logic for stepping the environment in parallel.



   .. py:method:: sync_states(state, blocking = True)

      Set the same state to all the environments that are inside an external process.

      :param state: Target state to set on the environments.
      :param blocking: If ``True`` perform the update sequentially. If ``False`` step                      the environments in parallel.

      :returns: None.



   .. py:method:: reset(indices=None, return_states = True)

      Reset the environment and return the resulting batch data.

      :param indices: The batch indices of environments to reset; defaults to all.
      :param return_states: return the corresponding states after reset.

      :returns: Batch of observations. If ``return_states`` is ``True`` return a tuple           containing ``(batch_of_observations, batch_of_states)``.



   .. py:method:: close()

      Send close messages to the external process and join them.



.. py:class:: ParallelEnv(env_class, name, frameskip = 1, autoreset = True, delay_setup = False, n_workers = 8, blocking = False, **kwargs)

   Bases: :py:obj:`plangym.vectorization.env.VectorizedEnv`


   Allow any environment to be stepped in parallel when step_batch is called.

   It creates a local instance of the target environment to call all other methods.

   Example::

       >>> from plangym.videogames import AtariEnv
       >>> env = ParallelEnv(env_class=AtariEnv,
       ...                           name="MsPacman-v0",
       ...                           clone_seeds=True,
       ...                           autoreset=True,
       ...                           blocking=False)
       >>>
       >>> state, obs, info = env.reset()
       >>>
       >>> states = [state.copy() for _ in range(10)]
       >>> actions = [env.sample_action() for _ in range(10)]
       >>>
       >>> data =  env.step_batch(states=states, actions=actions)
       >>> new_states, observs, rewards, ends, truncateds, infos = data



   .. py:attribute:: _blocking


   .. py:attribute:: _batch_env
      :value: None



   .. py:property:: blocking
      :type: bool

      If True the steps are performed sequentially.


   .. py:method:: setup()

      Run environment initialization and create the subprocesses for stepping in parallel.



   .. py:method:: clone(**kwargs)

      Return a copy of the environment.



   .. py:method:: make_transitions(actions, states = None, dt = 1, return_state = None)

      Vectorized version of the ``step`` method.

      It allows to step a vector of states and actions. The signature and
      behaviour is the same as ``step``, but taking a list of states, actions
      and dts as input.

      :param actions: Iterable containing the different actions to be applied.
      :param states: Iterable containing the different states to be set.
      :param dt: int or array containing the frameskips that will be applied.
      :param return_state: Whether to return the state in the returned tuple.                 If None, `step` will return the state if `state` was passed as a parameter.

      :returns: if states is None returns ``(observs, rewards, ends, truncateds, infos)`` else             ``(new_states, observs, rewards, ends, truncateds, infos)``



   .. py:method:: sync_states(state)

      Synchronize all the copies of the wrapped environment.

      Set all the states of the different workers of the internal :class:`BatchEnv`
       to the same state as the internal :class:`Environment` used to apply the
       non-vectorized steps.



   .. py:method:: close()

      Close the environment and the spawned processes.



