"""Plangym API implementation."""
from abc import ABC
from typing import Any, Iterable
import gymnasium as gym
import numpy
from plangym.core import PlangymEnv, wrap_callable
LIFE_KEY = "lifes"
[docs]
class VideogameEnv(PlangymEnv, ABC):
"""Common interface for working with video games that run using an emulator."""
AVAILABLE_OBS_TYPES = {"rgb", "grayscale", "ram"}
DEFAULT_OBS_TYPE = "rgb"
def __init__(
self,
name: str,
frameskip: int = 5,
episodic_life: bool = False,
autoreset: bool = True,
delay_setup: bool = False,
remove_time_limit: bool = True,
obs_type: str = "rgb", # ram | rgb | grayscale
render_mode: str | None = None, # None | human | rgb_array
wrappers: Iterable[wrap_callable] | None = None,
**kwargs,
):
"""Initialize a :class:`VideogameEnv`.
Args:
name: Name of the environment. Follows standard gym syntax conventions.
frameskip: Number of times an action will be applied for each step
in dt.
episodic_life: Return ``end = True`` when losing a life.
autoreset: Restart environment when reaching a terminal state.
delay_setup: If ``True`` do not initialize the ``gym.Environment``
and wait for ``setup`` to be called later.
remove_time_limit: If True, remove the time limit from the environment.
obs_type: One of {"rgb", "ram", "grayscale"}.
mode: Integer or string indicating the game mode, when available.
difficulty: Difficulty level of the game, when available.
repeat_action_probability: Repeat the last action with this probability.
full_action_space: Whether to use the full range of possible actions
or only those available in the game.
render_mode: One of {None, "human", "rgb_aray"}.
wrappers: Wrappers that will be applied to the underlying OpenAI env.
Every element of the iterable can be either a :class:`gym.Wrapper`
or a tuple containing ``(gym.Wrapper, kwargs)``.
kwargs: Additional arguments to be passed to the ``gym.make`` function.
"""
self.episodic_life = episodic_life
self._info_step = {LIFE_KEY: -1, "lost_life": False}
super().__init__(
name=name,
frameskip=frameskip,
autoreset=autoreset,
wrappers=wrappers,
delay_setup=delay_setup,
render_mode=render_mode,
remove_time_limit=remove_time_limit,
obs_type=obs_type,
**kwargs,
)
@property
def n_actions(self) -> int:
"""Return the number of actions available."""
return self.action_space.n
[docs]
@staticmethod
def get_lifes_from_info(info: dict[str, Any]) -> int:
"""Return the number of lifes remaining in the current game."""
return info.get("life", -1)
[docs]
def apply_action(self, action):
"""Evolve the environment for one time step applying the provided action."""
obs, reward, terminal, truncated, info = super().apply_action(action=action)
info[LIFE_KEY] = self.get_lifes_from_info(info)
past_lifes = self._info_step.get(LIFE_KEY, -1)
lost_life = past_lifes > info[LIFE_KEY] or self._info_step.get("lost_life")
info["lost_life"] = lost_life
terminal = (terminal or lost_life) if self.episodic_life else terminal
return obs, reward, terminal, truncated, info
[docs]
def clone(self, **kwargs) -> "VideogameEnv":
"""Return a copy of the environment."""
params = {
"episodic_life": self.episodic_life,
"obs_type": self.obs_type,
"render_mode": self.render_mode,
}
params.update(**kwargs)
return super().clone(**params)
[docs]
def begin_step(
self, action=None, dt=None, state=None, return_state: bool | None = None
) -> None:
"""Perform setup of step variables before starting `step_with_dt`."""
self._info_step = {LIFE_KEY: -1, "lost_life": False}
super().begin_step(
action=action,
dt=dt,
state=state,
return_state=return_state,
)
[docs]
def init_spaces(self) -> None:
"""Initialize the action_space and the observation_space of the environment."""
super().init_spaces()
if self.obs_type == "ram":
if self.DEFAULT_OBS_TYPE == "ram":
space = self.gym_env.observation_space
else:
ram_size = self.get_ram().shape
space = gym.spaces.Box(low=0, high=255, dtype=numpy.uint8, shape=ram_size)
self._obs_space = space
[docs]
def process_obs(self, obs, **kwargs):
"""Return the ram vector if obs_type == "ram" or and image otherwise."""
obs = super().process_obs(obs, **kwargs)
if self.obs_type == "ram" and self.DEFAULT_OBS_TYPE != "ram":
obs = self.get_ram()
return obs
[docs]
def get_ram(self) -> numpy.ndarray:
"""Return the ram of the emulator as a numpy array."""
raise NotImplementedError()