"""
Nyx Camera Sensor for Genesis - GPU-accelerated rendering via Nyx renderer.
"""
# External imports
import ctypes
import os
from importlib.metadata import PackageNotFoundError, distribution
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional
import torch
# Internal imports
import genesis as gs
# Required for the pick result
if TYPE_CHECKING:
from genesis.engine.entities.base_entity import Entity
from genesis.engine.sensors.camera import BaseCameraSensor
from genesis.engine.sensors.base_sensor import (
Sensor,
)
from gs_nyx import nyx_py_sdk as nps
# Relative imports
from .nyx_renderer import NyxPyRenderer
from .nyx_scene_exporter import NyxSceneExporter
from .nyx_camera_options import NyxCameraOptions
from .nyx_camera_shared_metadata import NyxCameraSharedMetadata, _nyx_make_complete_path
__all__ = ["NyxCameraSensor", "NyxCameraData", "NyxPickPixelResult"]
# ========================== Data Class ==========================
[docs]
class NyxCameraData(NamedTuple):
"""Return type of :meth:`NyxCameraSensor.read`.
Attributes
----------
rgb : torch.Tensor
Rendered RGB image, shape ``(B, H, W, 3)`` with ``dtype=torch.uint8``,
resident on the CUDA device. ``B`` is the number of parallel Genesis
environments (``1`` for non-batched scenes); ``H`` and ``W`` match
the camera's configured :attr:`~NyxCameraOptions.res`. Channel order
is RGB, values in ``[0, 255]``.
"""
rgb: torch.Tensor
# ========================== Nyx Pick Pixel Result ==========================
[docs]
class NyxPickPixelResult(NamedTuple):
"""Hit information returned by :meth:`NyxCameraSensor.pick_pixel`.
Attributes
----------
entity : genesis.engine.entities.base_entity.Entity
The Genesis entity the ray hit.
link_name : str
Name of the hit link or sub-geometry. For URDF entities this is the
link name; for MJCF entities it is ``"<link>_<vgeom_idx>"``;
empty string for all other morph types.
position : tuple of float
World-space hit point ``(x, y, z)`` in Genesis Z-up coordinates.
"""
entity: "Entity"
link_name: str
position: tuple[float, float, float]
# ========================== Nyx Camera Sensor ==========================
[docs]
class NyxCameraSensor(
BaseCameraSensor, Sensor[NyxCameraOptions, NyxCameraSharedMetadata, NyxCameraData]
):
"""GPU-accelerated camera sensor backed by the Nyx renderer.
Registers with the Genesis sensor manager, contributes one camera (plus
any configured lights / env maps / light fields) to a renderer instance
shared across all :class:`NyxCameraSensor` sensors in the scene, and
produces RGB images on demand via the standard Genesis
:meth:`~genesis.engine.sensors.base_sensor.Sensor.read` API.
Configure with :class:`NyxCameraOptions`. The first ``NyxCameraSensor``
to finish building drives the actual renderer setup (scene export +
native renderer startup); subsequent sensors slot into the same shared
instance.
See the user guide for a configured example; this class is rarely
instantiated directly — pass :class:`NyxCameraOptions` to
:meth:`Scene.add_sensor` instead.
"""
[docs]
def __init__(
self,
options: NyxCameraOptions,
idx: int,
manager: "gs.SensorManager",
):
"""Bind the sensor to a Genesis sensor manager.
Called by :meth:`Scene.add_sensor` — users should not invoke this
directly. The native renderer is **not** started here; setup runs
inside :meth:`build` once the Genesis scene is built and every
``NyxCameraSensor`` in the scene has been added.
Parameters
----------
options : NyxCameraOptions
Per-camera configuration (resolution, FOV, pose, render mode,
lights / env maps, etc.).
idx : int
Genesis-side sensor index assigned by the manager.
manager : genesis.SensorManager
The sensor manager that owns this sensor.
"""
super().__init__(options, idx, manager)
self._options: NyxCameraOptions
self._camera_idx: Optional[int] = None
# Per-env camera poses from move_to_attach(), shape (B, 3) each, or None if not attached
self._attached_pos: Optional[torch.Tensor] = None
self._attached_lookat: Optional[torch.Tensor] = None
self._attached_up: Optional[torch.Tensor] = None
# ========================== Light Conversion ==========================
def _convert_light_dict_to_asset(self, light_dict: Dict[str, Any]) -> Any:
"""
Convert a light configuration dict to an Nyx LightAsset. Handles coordinate conversion from Genesis Z-up to Nyx Y-up.
Parameters
----------
light_dict : dict
Light configuration with keys:
- type: "point", "directional", or "spot"
- pos: (x, y, z) position in Z-up coordinates (for point/spot)
- dir: (x, y, z) direction in Z-up coordinates (for directional/spot)
- color: (r, g, b) RGB color [0-1]
- intensity: Light intensity
- shadow: Whether to cast shadows (default True)
- inner_angle, outer_angle: For spot lights (degrees)
Returns
-------
LightAsset
Nyx LightAsset object configured with Y-up coordinates.
"""
light = nps.LightAsset()
light_type = light_dict.get("type", "directional")
if light_type == "point":
light.type = nps.ELightType.Point
# Get the position and convert it.
light.point_position = nps.float3(*light_dict.get("pos", (0, 0, 5)))
light.point_position = nps.float3_z_up_to_y_up_a(light.point_position)
light.point_range = light_dict.get("range", 100.0)
light.unit = nps.ELightUnit.Lumen
elif light_type == "directional":
light.type = nps.ELightType.Directional
# Get the direction and convert it.
light.directional_direction = nps.float3(*light_dict.get("dir", (0, 0, -1)))
light.directional_direction = nps.float3_z_up_to_y_up_a(
light.directional_direction
)
light.unit = nps.ELightUnit.Lux
elif light_type == "spot":
light.type = nps.ELightType.Spot
# Get the position and convert it.
light.spot_position = nps.float3(*light_dict.get("pos", (0, 0, 5)))
light.spot_position = nps.float3_z_up_to_y_up_a(light.spot_position)
# Get the direction and convert it.
light.spot_direction = nps.float3(*light_dict.get("dir", (0, 0, -1)))
light.spot_direction = nps.float3_z_up_to_y_up_a(light.spot_direction)
light.spot_innerAngle = light_dict.get("inner_angle", 15.0)
light.spot_outerAngle = light_dict.get("outer_angle", 30.0)
light.spot_range = light_dict.get("range", 100.0)
light.unit = nps.ELightUnit.Lumen
else:
gs.raise_exception(f"Unknown light type: {light_type}")
# Common light properties
color = light_dict.get("color", (1, 1, 1))
light.color = nps.float3(color[0], color[1], color[2])
light.intensity = light_dict.get("intensity", 1.0)
light.shadow = light_dict.get("shadow", True)
return light
def _apply_camera_transform(self, camera_T: torch.Tensor):
"""
Store per-env camera transforms from the attachment system.
Called by BaseCameraSensor.move_to_attach() with the computed world transform.
The stored values are sent to the renderer in _render_current_state().
Parameters
----------
camera_T : torch.Tensor
Transform matrix, shape (B, 4, 4) for batched envs or (4, 4) for single.
"""
if len(camera_T.shape) == 2:
camera_T = camera_T.unsqueeze(0)
# Store per-env pos / lookat / up, all (B, 3)
self._attached_pos = camera_T[:, :3, 3]
self._attached_lookat = self._attached_pos + (-camera_T[:, :3, 2])
self._attached_up = camera_T[:, :3, 1]
self._stale = True
[docs]
def build(self):
"""Finalize the sensor and lazily start the shared Nyx renderer.
Called by the Genesis sensor manager once per sensor after the scene
has been built. Each call:
1. Resolves the optional rig attachment via :class:`RigidSensorMixin`.
2. Verifies CUDA is available (Nyx is GPU-only).
3. Registers this camera in the shared metadata, assigns it a Nyx
camera index, and captures its configuration as a definition dict.
4. If this is the final ``NyxCameraSensor`` left to build, collects
lights / env maps / light fields from **all** sibling sensors,
writes the scene-description JSON via :class:`NyxSceneExporter`,
and constructs the shared :class:`NyxPyRenderer`.
5. Pre-allocates this sensor's slot in the shared image cache:
``(B, H, W, 3)`` ``uint8`` CUDA tensor.
Raises
------
genesis.GenesisException
If CUDA is unavailable, or if sibling
:class:`NyxCameraSensor` instances disagree on ``render_mode``
(the shared renderer can serve only one mode).
"""
super().build() # Sets up _link from RigidSensorMixin
if not torch.cuda.is_available():
gs.raise_exception("NyxCameraSensor requires CUDA to be available.")
scene = self._manager._sim.scene
if self._shared_metadata.sensors is None:
self._shared_metadata.sensors = []
self._shared_metadata.camera_defs = []
self._shared_metadata.image_cache = {}
# Register this sensor and assign a camera index
self._camera_idx = len(self._shared_metadata.sensors)
self._shared_metadata.sensors.append(self)
# Store camera configuration as a dict
camera_def = {
"pos": self._options.pos,
"lookat": self._options.lookat,
"up": self._options.up,
"fov": self._options.fov,
"res": self._options.res,
"spp": self._options.spp,
"denoise": self._options.denoise,
"aperture": getattr(self._options, "aperture", 2.8),
"focal_len": getattr(self._options, "focal_length", 10.0),
"near": self._options.near,
"far": self._options.far,
"tone_mapper": self._options.tone_mapper,
"anti_aliasing": self._options.anti_aliasing,
}
self._shared_metadata.camera_defs.append(camera_def)
# Initialize renderer once all cameras are registered
all_nyx_sensors = self._manager._sensors_by_type[type(self)]
all_sensors_built = len(self._shared_metadata.sensors) == len(all_nyx_sensors)
if self._shared_metadata.renderer is None and all_sensors_built:
# Preload Nvidia compiler runtime if available (i.e. torch is not
# built from source). Must happen after the CUDA check and before
# the renderer is created, loading it at package-import time via
# RTLD_GLOBAL interferes with torch's CUDA detection.
try:
nvrtc_dist = distribution("nvidia_cuda_nvrtc")
for file in nvrtc_dist.files:
if file.name.startswith("libnvrtc-builtins.so.1"):
ctypes.CDLL(nvrtc_dist.locate_file(file), ctypes.RTLD_GLOBAL)
break
except (PackageNotFoundError, OSError):
pass
# Collect lights from all sensors and convert to LightAssets
lights = []
for sensor in self._shared_metadata.sensors:
for light_dict in sensor._options.lights:
lights.append(sensor._convert_light_dict_to_asset(light_dict))
# Collect env_maps from all sensors
env_maps = []
for sensor in self._shared_metadata.sensors:
for env_map in sensor._options.env_maps:
env_maps.append(env_map)
# Collect light_fields from all sensors
light_fields = []
for sensor in self._shared_metadata.sensors:
for light_field in sensor._options.light_fields:
light_fields.append(light_field)
# When there is no lights or env maps, we notify the user.
if len(lights) == 0 and len(env_maps) == 0:
print(
"\033[93m [WARNING] NyxCameraSensor: The current scene has no lights or environment maps. "
"If this is unintentional, please add a light source or an environment map.\033[0m"
)
# Generate the complete export path.
complete_path = _nyx_make_complete_path(
self._shared_metadata.scene_description_export_path
)
# Build the scene description
self._shared_metadata.scene_exporter = NyxSceneExporter(
scene,
cameras=self._shared_metadata.camera_defs,
export_folder=complete_path,
lights=lights,
env_maps=env_maps,
light_fields=light_fields,
)
# Export the scene to a json file that the Nyx renderer can read.
self._shared_metadata.scene_exporter.export_to_file(
os.path.join(
complete_path,
"nyx_scene.json",
)
)
# Get max resolution across all cameras
max_width, max_height = self._get_max_resolution(
[s._options for s in all_nyx_sensors]
)
# Verify all sensors use the same render_mode
render_mode = self._shared_metadata.sensors[0]._options.render_mode
for sensor in self._shared_metadata.sensors[1:]:
if sensor._options.render_mode != render_mode:
gs.raise_exception(
f"All Nyx cameras must use the same render_mode. "
f"Found '{render_mode}' and '{sensor._options.render_mode}'. "
f"Please ensure all NyxCameraOptions use the same render_mode."
)
# Create the Nyx renderer instance.
n_envs = max(self._manager._sim._B, 1)
debug_view = self._shared_metadata.sensors[0]._options.debug_view
self._shared_metadata.renderer = NyxPyRenderer(
scene,
os.path.join(
complete_path,
"nyx_scene.json",
),
max_width,
max_height,
n_envs=n_envs,
open_window=self._options.open_window,
camera_defs=self._shared_metadata.camera_defs,
render_mode=render_mode,
debug_view=debug_view,
)
self._shared_metadata.renderer.build(
self._shared_metadata.scene_exporter._entity_uuid_pairs
)
# Initialize image cache for this camera (always on CUDA for Nyx renderer)
n_envs = max(self._manager._sim._B, 1)
h, w = self._options.res[1], self._options.res[0]
self._shared_metadata.image_cache[self._idx] = torch.zeros(
(n_envs, h, w, 3), dtype=torch.uint8, device="cuda"
)
[docs]
def pick_pixel(
self, camera_index: int, x: int, y: int
) -> Optional[NyxPickPixelResult]:
"""
Cast a ray through pixel (x, y) of the given camera and return what it hits.
Parameters
----------
camera_index : int
Index of the camera within this sensor's shared metadata. For a single
NyxCameraSensor this is 0; multi-camera setups index in registration order.
x, y : int
Pixel coordinates in image space, with the origin at the top-left of the
framebuffer. Must lie within the camera's configured resolution.
Returns
-------
NyxPickPixelResult or None
``None`` if the ray missed all scene geometry (e.g. background / sky).
Otherwise a NyxPickPixelResult with fields:
- ``entity``: the Genesis Entity that was hit
- ``link_name``: name of the hit link (URDF) or ``<link>_<vgeom_idx>`` (MJCF);
empty string for other morph types
- ``position``: world-space hit point as ``(x, y, z)`` in Genesis Z-up coordinates
"""
# Pick a pixel using the Nyx renderer's picking functionality.
res = self._shared_metadata.renderer.pick_pixel(
self._shared_metadata.scene_exporter._entity_uuid_pairs, camera_index, x, y
)
# If res is None, it means no entity was picked at that pixel.
if res is None:
return None
# If not, convert the picked result to Genesis entities and return as NyxPickPixelResult.
return NyxPickPixelResult(*res)
def _get_image_cache_entry(self):
"""Return this sensor's entry in the shared image cache."""
return self._shared_metadata.image_cache[self._idx]
def _render_current_state(self):
"""Perform the actual render for all cameras."""
# Shorthand
sensors = self._shared_metadata.sensors or [self]
renderer = self._shared_metadata.renderer
scene = self._manager._sim.scene
# Update visualization transforms
scene.visualizer.update_visual_states()
# Compute per-env attachment transforms for ALL attached sensors
for sensor in sensors:
if sensor._link is not None:
sensor.move_to_attach()
# Update window events, delta time, etc.
renderer.update()
# Nyx Renderer can only render a single env at a time
# So we loop over all envs and render all the sensors in this env
for env_idx in range(max(self._manager._sim._B, 1)):
# Update camera tensors for this env
for sensor in sensors:
if sensor._attached_pos is not None:
# Attached camera: use per-env transform from move_to_attach
renderer.update_camera_tensor(
sensor._camera_idx,
sensor._attached_pos[env_idx],
sensor._attached_lookat[env_idx],
sensor._attached_up[env_idx],
)
else:
# Static camera: same pose for all envs
renderer.update_camera_tensor(
sensor._camera_idx,
sensor._options.pos,
sensor._options.lookat,
sensor._options.up,
)
# Sync geometry data from Genesis to Nyx for this env
renderer.update_scene(env_idx)
# Render all cameras
for sensor in sensors:
rgb_tensor = renderer.render(camera_index=sensor._camera_idx)
sensor._shared_metadata.image_cache[sensor._idx][env_idx] = rgb_tensor
# Mark all sensors as fresh
for sensor in sensors:
sensor._stale = False
self._shared_metadata.last_render_timestep = self._manager._sim.scene.t
[docs]
def update_camera_pose(self, pos=None, lookat=None, up=None):
"""Update camera pose dynamically (applies to all envs on next render).
Detaches the camera from any rig so the specified world-space pose
is used uniformly across all environments.
Parameters
----------
pos : tuple, optional
New camera position (x, y, z) in Z-up coordinates.
lookat : tuple, optional
New look-at point (x, y, z) in Z-up coordinates.
up : tuple, optional
New up vector (x, y, z) in Z-up coordinates.
"""
if pos is not None:
self._options.pos = tuple(pos)
if lookat is not None:
self._options.lookat = tuple(lookat)
if up is not None:
self._options.up = tuple(up)
# Detach from rig
self._link = None
self._attached_pos = None
self._attached_lookat = None
self._attached_up = None
self._stale = True
@staticmethod
def _get_max_resolution(options_list: List[NyxCameraOptions]) -> tuple[int, int]:
"""Get the maximum resolution across all camera options."""
if not options_list:
return 1024, 1024
max_width = max(opt.res[0] for opt in options_list)
max_height = max(opt.res[1] for opt in options_list)
return max_width, max_height