# Source code for gs_nyx_plugin.nyx_camera_sensor

"""
Nyx Camera Sensor for Genesis - GPU-accelerated rendering via Nyx renderer.
"""

# External imports
import ctypes
import os
from importlib.metadata import PackageNotFoundError, distribution
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional
import torch

# Internal imports
import genesis as gs

# Required for the pick result
if TYPE_CHECKING:
    from genesis.engine.entities.base_entity import Entity
from genesis.engine.sensors.camera import BaseCameraSensor
from genesis.engine.sensors.base_sensor import (
    Sensor,
)
from gs_nyx import nyx_py_sdk as nps

# Relative imports
from .nyx_renderer import NyxPyRenderer
from .nyx_scene_exporter import NyxSceneExporter
from .nyx_camera_options import NyxCameraOptions
from .nyx_camera_shared_metadata import NyxCameraSharedMetadata, _nyx_make_complete_path


# Names exported by ``from ... import *``: the sensor class and the two
# NamedTuple result types defined in this module.
__all__ = ["NyxCameraSensor", "NyxCameraData", "NyxPickPixelResult"]


# ========================== Data Class ==========================
class NyxCameraData(NamedTuple):
    """Image payload handed back by :meth:`NyxCameraSensor.read`.

    Attributes
    ----------
    rgb : torch.Tensor
        The rendered colour image as a CUDA-resident ``torch.uint8`` tensor of
        shape ``(B, H, W, 3)``. ``B`` counts the parallel Genesis environments
        (``1`` when the scene is not batched), while ``H`` and ``W`` follow the
        camera's configured :attr:`~NyxCameraOptions.res`. Channels are ordered
        R, G, B and hold values in ``[0, 255]``.
    """

    rgb: torch.Tensor
# ========================== Nyx Pick Pixel Result ==========================
class NyxPickPixelResult(NamedTuple):
    """What a pick ray hit, as reported by :meth:`NyxCameraSensor.pick_pixel`.

    Attributes
    ----------
    entity : genesis.engine.entities.base_entity.Entity
        Genesis entity struck by the ray.
    link_name : str
        Identifier of the link or sub-geometry that was hit: the link name for
        URDF entities, ``"<link>_<vgeom_idx>"`` for MJCF entities, and an empty
        string for every other morph type.
    position : tuple of float
        Hit point ``(x, y, z)`` in world space, expressed in Genesis Z-up
        coordinates.
    """

    entity: "Entity"
    link_name: str
    position: tuple[float, float, float]
# ========================== Nyx Camera Sensor ==========================
class NyxCameraSensor(
    BaseCameraSensor, Sensor[NyxCameraOptions, NyxCameraSharedMetadata, NyxCameraData]
):
    """GPU-accelerated camera sensor backed by the Nyx renderer.

    Registers with the Genesis sensor manager, contributes one camera (plus any
    configured lights / env maps / light fields) to a renderer instance shared
    across all :class:`NyxCameraSensor` sensors in the scene, and produces RGB
    images on demand via the standard Genesis
    :meth:`~genesis.engine.sensors.base_sensor.Sensor.read` API.

    Configure with :class:`NyxCameraOptions`. The renderer setup (scene export +
    native renderer startup) is driven by the ``NyxCameraSensor`` whose
    :meth:`build` call completes the set of registered sensors; all sensors
    share that single renderer instance.

    See the user guide for a configured example; this class is rarely
    instantiated directly — pass :class:`NyxCameraOptions` to
    :meth:`Scene.add_sensor` instead.
    """

    def __init__(
        self,
        options: NyxCameraOptions,
        idx: int,
        manager: "gs.SensorManager",
    ):
        """Bind the sensor to a Genesis sensor manager.

        Called by :meth:`Scene.add_sensor` — users should not invoke this
        directly. The native renderer is **not** started here; setup runs
        inside :meth:`build` once the Genesis scene is built and every
        ``NyxCameraSensor`` in the scene has been added.

        Parameters
        ----------
        options : NyxCameraOptions
            Per-camera configuration (resolution, FOV, pose, render mode,
            lights / env maps, etc.).
        idx : int
            Genesis-side sensor index assigned by the manager.
        manager : genesis.SensorManager
            The sensor manager that owns this sensor.
        """
        super().__init__(options, idx, manager)

        # Annotation only (no assignment): narrows the type of the attribute
        # for static checkers.
        self._options: NyxCameraOptions

        # Index of this camera inside the shared renderer; assigned in build().
        self._camera_idx: Optional[int] = None

        # Per-env camera poses from move_to_attach(), shape (B, 3) each, or
        # None if not attached.
        self._attached_pos: Optional[torch.Tensor] = None
        self._attached_lookat: Optional[torch.Tensor] = None
        self._attached_up: Optional[torch.Tensor] = None

    # ========================== Light Conversion ==========================
    def _convert_light_dict_to_asset(self, light_dict: Dict[str, Any]) -> Any:
        """Convert a light configuration dict to a Nyx ``LightAsset``.

        Handles coordinate conversion from Genesis Z-up to Nyx Y-up.

        Parameters
        ----------
        light_dict : dict
            Light configuration with keys:

            - ``type``: ``"point"``, ``"directional"`` (default) or ``"spot"``
            - ``pos``: (x, y, z) position in Z-up coordinates (point / spot),
              default ``(0, 0, 5)``
            - ``dir``: (x, y, z) direction in Z-up coordinates
              (directional / spot), default ``(0, 0, -1)``
            - ``range``: light range (point / spot), default ``100.0``
            - ``inner_angle``, ``outer_angle``: spot cone angles, defaults
              ``15.0`` / ``30.0``
            - ``color``: (r, g, b) colour, default ``(1, 1, 1)``
            - ``intensity``: light intensity, default ``1.0``
            - ``shadow``: whether to cast shadows, default ``True``

        Returns
        -------
        LightAsset
            Nyx ``LightAsset`` configured with Y-up coordinates.

        Raises
        ------
        genesis.GenesisException
            If ``type`` is not one of the supported light types.
        """
        light = nps.LightAsset()
        light_type = light_dict.get("type", "directional")

        if light_type == "point":
            light.type = nps.ELightType.Point
            position = nps.float3(*light_dict.get("pos", (0, 0, 5)))
            light.point_position = nps.float3_z_up_to_y_up_a(position)
            light.point_range = light_dict.get("range", 100.0)
            light.unit = nps.ELightUnit.Lumen
        elif light_type == "directional":
            light.type = nps.ELightType.Directional
            direction = nps.float3(*light_dict.get("dir", (0, 0, -1)))
            light.directional_direction = nps.float3_z_up_to_y_up_a(direction)
            light.unit = nps.ELightUnit.Lux
        elif light_type == "spot":
            light.type = nps.ELightType.Spot
            position = nps.float3(*light_dict.get("pos", (0, 0, 5)))
            light.spot_position = nps.float3_z_up_to_y_up_a(position)
            direction = nps.float3(*light_dict.get("dir", (0, 0, -1)))
            light.spot_direction = nps.float3_z_up_to_y_up_a(direction)
            light.spot_innerAngle = light_dict.get("inner_angle", 15.0)
            light.spot_outerAngle = light_dict.get("outer_angle", 30.0)
            light.spot_range = light_dict.get("range", 100.0)
            light.unit = nps.ELightUnit.Lumen
        else:
            gs.raise_exception(f"Unknown light type: {light_type}")

        # Properties shared by every light type.
        color = light_dict.get("color", (1, 1, 1))
        light.color = nps.float3(color[0], color[1], color[2])
        light.intensity = light_dict.get("intensity", 1.0)
        light.shadow = light_dict.get("shadow", True)
        return light

    def _apply_camera_transform(self, camera_T: torch.Tensor) -> None:
        """Store per-env camera transforms from the attachment system.

        Called by ``BaseCameraSensor.move_to_attach()`` with the computed world
        transform. The stored values are sent to the renderer in
        :meth:`_render_current_state`.

        Parameters
        ----------
        camera_T : torch.Tensor
            Transform matrix, shape (B, 4, 4) for batched envs or (4, 4) for
            a single env.
        """
        if camera_T.ndim == 2:
            # Promote a single transform to a batch of one.
            camera_T = camera_T.unsqueeze(0)

        # Per-env pos / lookat / up, all (B, 3). The look-at point is one unit
        # along the negated third rotation column from the camera position.
        self._attached_pos = camera_T[:, :3, 3]
        self._attached_lookat = self._attached_pos + (-camera_T[:, :3, 2])
        self._attached_up = camera_T[:, :3, 1]
        self._stale = True

    def build(self):
        """Finalize the sensor and lazily start the shared Nyx renderer.

        Called by the Genesis sensor manager once per sensor after the scene
        has been built. Each call:

        1. Resolves the optional rig attachment via :class:`RigidSensorMixin`.
        2. Verifies CUDA is available (Nyx is GPU-only).
        3. Registers this camera in the shared metadata, assigns it a Nyx
           camera index, and captures its configuration as a definition dict.
        4. If this is the final ``NyxCameraSensor`` left to build, collects
           lights / env maps / light fields from **all** sibling sensors,
           writes the scene-description JSON via :class:`NyxSceneExporter`,
           and constructs the shared :class:`NyxPyRenderer`.
        5. Pre-allocates this sensor's slot in the shared image cache:
           ``(B, H, W, 3)`` ``uint8`` CUDA tensor.

        Raises
        ------
        genesis.GenesisException
            If CUDA is unavailable, or if sibling :class:`NyxCameraSensor`
            instances disagree on ``render_mode`` (the shared renderer can
            serve only one mode).
        """
        super().build()  # Sets up _link from RigidSensorMixin

        if not torch.cuda.is_available():
            gs.raise_exception("NyxCameraSensor requires CUDA to be available.")

        shared = self._shared_metadata
        options = self._options
        scene = self._manager._sim.scene

        # First sensor to build initializes the shared containers.
        if shared.sensors is None:
            shared.sensors = []
            shared.camera_defs = []
            shared.image_cache = {}

        # Register this sensor; its camera index is its registration order.
        self._camera_idx = len(shared.sensors)
        shared.sensors.append(self)

        # Store the camera configuration as a plain dict.
        shared.camera_defs.append(
            {
                "pos": options.pos,
                "lookat": options.lookat,
                "up": options.up,
                "fov": options.fov,
                "res": options.res,
                "spp": options.spp,
                "denoise": options.denoise,
                "aperture": getattr(options, "aperture", 2.8),
                "focal_len": getattr(options, "focal_length", 10.0),
                "near": options.near,
                "far": options.far,
                "tone_mapper": options.tone_mapper,
                "anti_aliasing": options.anti_aliasing,
            }
        )

        # Start the renderer once every Nyx camera has registered itself.
        all_nyx_sensors = self._manager._sensors_by_type[type(self)]
        all_sensors_built = len(shared.sensors) == len(all_nyx_sensors)
        if shared.renderer is None and all_sensors_built:
            self._nyx_start_shared_renderer(scene, all_nyx_sensors)

        # Image cache slot for this camera (always on CUDA for the Nyx renderer).
        n_envs = max(self._manager._sim._B, 1)
        width, height = options.res[0], options.res[1]
        shared.image_cache[self._idx] = torch.zeros(
            (n_envs, height, width, 3), dtype=torch.uint8, device="cuda"
        )

    @staticmethod
    def _nyx_preload_nvrtc_builtins() -> None:
        """Best-effort preload of the NVRTC builtins library from the pip wheel.

        Preloads the Nvidia compiler runtime if available (i.e. torch is not
        built from source). Must happen after the CUDA check and before the
        renderer is created: loading it at package-import time via
        ``RTLD_GLOBAL`` interferes with torch's CUDA detection.
        """
        try:
            nvrtc_dist = distribution("nvidia_cuda_nvrtc")
            # ``files`` is None when the distribution carries no file listing;
            # treat that the same as "library not found".
            for file in nvrtc_dist.files or ():
                if file.name.startswith("libnvrtc-builtins.so.1"):
                    # Pass a plain string: path-like names are only documented
                    # as accepted by ctypes from Python 3.12 onwards.
                    ctypes.CDLL(str(nvrtc_dist.locate_file(file)), ctypes.RTLD_GLOBAL)
                    break
        except (PackageNotFoundError, OSError):
            # Deliberately ignored: the preload is optional.
            pass

    def _nyx_start_shared_renderer(self, scene, all_nyx_sensors) -> None:
        """Export the scene description and create the shared Nyx renderer.

        Parameters
        ----------
        scene
            The Genesis scene owning this sensor's manager.
        all_nyx_sensors
            Every ``NyxCameraSensor`` known to the sensor manager; used to
            size the renderer to the largest configured resolution.

        Raises
        ------
        genesis.GenesisException
            If registered sensors disagree on ``render_mode``.
        """
        shared = self._shared_metadata
        self._nyx_preload_nvrtc_builtins()

        # Gather lighting configuration from every registered sensor.
        lights = [
            sensor._convert_light_dict_to_asset(light_dict)
            for sensor in shared.sensors
            for light_dict in sensor._options.lights
        ]
        env_maps = [
            env_map for sensor in shared.sensors for env_map in sensor._options.env_maps
        ]
        light_fields = [
            light_field
            for sensor in shared.sensors
            for light_field in sensor._options.light_fields
        ]

        # When there are no lights or env maps, notify the user.
        if not lights and not env_maps:
            print(
                "\033[93m [WARNING] NyxCameraSensor: The current scene has no lights or environment maps. "
                "If this is unintentional, please add a light source or an environment map.\033[0m"
            )

        # Resolve the export folder and the scene file inside it.
        complete_path = _nyx_make_complete_path(shared.scene_description_export_path)
        scene_json_path = os.path.join(complete_path, "nyx_scene.json")

        # Build the scene description and export it to a JSON file that the
        # Nyx renderer can read.
        shared.scene_exporter = NyxSceneExporter(
            scene,
            cameras=shared.camera_defs,
            export_folder=complete_path,
            lights=lights,
            env_maps=env_maps,
            light_fields=light_fields,
        )
        shared.scene_exporter.export_to_file(scene_json_path)

        # Max resolution across all cameras.
        max_width, max_height = self._get_max_resolution(
            [s._options for s in all_nyx_sensors]
        )

        # Verify all sensors use the same render_mode.
        render_mode = shared.sensors[0]._options.render_mode
        for sensor in shared.sensors[1:]:
            if sensor._options.render_mode != render_mode:
                gs.raise_exception(
                    f"All Nyx cameras must use the same render_mode. "
                    f"Found '{render_mode}' and '{sensor._options.render_mode}'. "
                    f"Please ensure all NyxCameraOptions use the same render_mode."
                )

        # Create the Nyx renderer instance.
        # NOTE(review): render_mode and debug_view come from the first
        # registered sensor, whereas open_window comes from the sensor that
        # triggers renderer start-up (self) — confirm this is intended.
        n_envs = max(self._manager._sim._B, 1)
        debug_view = shared.sensors[0]._options.debug_view
        shared.renderer = NyxPyRenderer(
            scene,
            scene_json_path,
            max_width,
            max_height,
            n_envs=n_envs,
            open_window=self._options.open_window,
            camera_defs=shared.camera_defs,
            render_mode=render_mode,
            debug_view=debug_view,
        )
        shared.renderer.build(shared.scene_exporter._entity_uuid_pairs)

    def pick_pixel(
        self, camera_index: int, x: int, y: int
    ) -> Optional[NyxPickPixelResult]:
        """Cast a ray through pixel (x, y) of the given camera and return what it hits.

        Parameters
        ----------
        camera_index : int
            Index of the camera within this sensor's shared metadata. For a
            single NyxCameraSensor this is 0; multi-camera setups index in
            registration order.
        x, y : int
            Pixel coordinates in image space, with the origin at the top-left
            of the framebuffer. Must lie within the camera's configured
            resolution.

        Returns
        -------
        NyxPickPixelResult or None
            ``None`` if the ray missed all scene geometry (e.g. background /
            sky). Otherwise a NyxPickPixelResult with fields:

            - ``entity``: the Genesis Entity that was hit
            - ``link_name``: name of the hit link (URDF) or
              ``<link>_<vgeom_idx>`` (MJCF); empty string for other morph types
            - ``position``: world-space hit point as ``(x, y, z)`` in Genesis
              Z-up coordinates

        Raises
        ------
        genesis.GenesisException
            If the shared renderer has not been started yet.
        """
        shared = self._shared_metadata
        renderer = shared.renderer
        if renderer is None:
            # The renderer is only created in build(); fail with a clear
            # message instead of an AttributeError on None.
            gs.raise_exception(
                "NyxCameraSensor.pick_pixel() requires the Nyx renderer to be started. "
                "Build the scene before picking."
            )

        # Pick a pixel using the Nyx renderer's picking functionality.
        res = renderer.pick_pixel(
            shared.scene_exporter._entity_uuid_pairs, camera_index, x, y
        )

        # None means no entity was picked at that pixel.
        if res is None:
            return None
        return NyxPickPixelResult(*res)

    def _get_image_cache_entry(self):
        """Return this sensor's entry in the shared image cache."""
        return self._shared_metadata.image_cache[self._idx]

    def _render_current_state(self):
        """Perform the actual render for all cameras.

        Renders every registered sensor for every environment, writes the
        results into the shared image cache, clears each sensor's stale flag
        and records the scene time of this render in the shared metadata.
        """
        shared = self._shared_metadata
        sensors = shared.sensors or [self]
        renderer = shared.renderer
        scene = self._manager._sim.scene

        # Update visualization transforms.
        scene.visualizer.update_visual_states()

        # Compute per-env attachment transforms for ALL attached sensors.
        for sensor in sensors:
            if sensor._link is not None:
                sensor.move_to_attach()

        # Update window events, delta time, etc.
        renderer.update()

        # The Nyx renderer can only render a single env at a time, so loop
        # over all envs and render all the sensors in each one.
        n_envs = max(self._manager._sim._B, 1)
        for env_idx in range(n_envs):
            # Update camera tensors for this env.
            for sensor in sensors:
                if sensor._attached_pos is not None:
                    # Attached camera: per-env transform from move_to_attach.
                    renderer.update_camera_tensor(
                        sensor._camera_idx,
                        sensor._attached_pos[env_idx],
                        sensor._attached_lookat[env_idx],
                        sensor._attached_up[env_idx],
                    )
                else:
                    # Static camera: same pose for all envs.
                    renderer.update_camera_tensor(
                        sensor._camera_idx,
                        sensor._options.pos,
                        sensor._options.lookat,
                        sensor._options.up,
                    )

            # Sync geometry data from Genesis to Nyx for this env.
            renderer.update_scene(env_idx)

            # Render all cameras.
            for sensor in sensors:
                rgb_tensor = renderer.render(camera_index=sensor._camera_idx)
                sensor._shared_metadata.image_cache[sensor._idx][env_idx] = rgb_tensor

        # Mark all sensors as fresh.
        for sensor in sensors:
            sensor._stale = False
        shared.last_render_timestep = scene.t

    def update_camera_pose(self, pos=None, lookat=None, up=None):
        """Update camera pose dynamically (applies to all envs on next render).

        Detaches the camera from any rig so the specified world-space pose is
        used uniformly across all environments. Arguments left as ``None``
        keep their current value.

        Parameters
        ----------
        pos : tuple, optional
            New camera position (x, y, z) in Z-up coordinates.
        lookat : tuple, optional
            New look-at point (x, y, z) in Z-up coordinates.
        up : tuple, optional
            New up vector (x, y, z) in Z-up coordinates.
        """
        if pos is not None:
            self._options.pos = tuple(pos)
        if lookat is not None:
            self._options.lookat = tuple(lookat)
        if up is not None:
            self._options.up = tuple(up)

        # Detach from rig so the static pose above is the one rendered.
        self._link = None
        self._attached_pos = None
        self._attached_lookat = None
        self._attached_up = None
        self._stale = True

    @staticmethod
    def _get_max_resolution(options_list: List[NyxCameraOptions]) -> tuple[int, int]:
        """Get the maximum resolution across all camera options.

        Parameters
        ----------
        options_list : list of NyxCameraOptions
            Camera options whose ``res`` is read as ``(width, height)``.

        Returns
        -------
        tuple of int
            ``(max_width, max_height)``; ``(1024, 1024)`` when the list is
            empty.
        """
        if not options_list:
            return 1024, 1024
        max_width = max(opt.res[0] for opt in options_list)
        max_height = max(opt.res[1] for opt in options_list)
        return max_width, max_height