Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 80 additions & 49 deletions examples/camera_viz/camera_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import yaml

from pipeline import FrameSource
from sources import build_local_camera
from sources import PairedFrameSource, build_local_camera
from transports import RtpH264Sender, make_encoder

logger = logging.getLogger("camera_streamer")
Expand All @@ -45,16 +45,22 @@
SUPERVISOR_TICK_S = 1.0


def _pick_mono_source(sources: List[FrameSource], camera_name: str) -> FrameSource:
"""Mono-only sender: exactly one source per camera. Stereo cameras
aren't supported here pending per-eye QuadLayer binding."""
def _eye_sources(sources: List[FrameSource], camera_name: str) -> List[FrameSource]:
    """Normalize ``build_local_camera`` output into a 1-or-2 element list.

    Mono cameras → ``[src]``. Stereo cameras → ``[left, right]``, unwrapped
    from the single ``PairedFrameSource`` wrapper. The streamer then fires
    one independent RTP stream per returned element.

    Args:
        sources: Sources produced for one camera.
        camera_name: Camera name, used only in the error message.

    Returns:
        A list holding one source (mono) or two sources (stereo, left first).

    Raises:
        ValueError: If ``sources`` does not contain exactly one element.
    """
    # Anything other than exactly one entry is unsupported, whether that
    # entry is a plain source or a paired wrapper.
    if len(sources) != 1:
        names = [s.spec.name for s in sources]
        raise ValueError(
            f"camera {camera_name!r} produced {len(sources)} streams {names}; "
            "expected 1 (mono) or a PairedFrameSource (stereo)."
        )
    only = sources[0]
    if isinstance(only, PairedFrameSource):
        return [only.left, only.right]
    # Always return a list so callers can iterate uniformly over eyes.
    return [only]


class CameraSupervisor:
Expand Down Expand Up @@ -96,66 +102,91 @@ def stop(self) -> None:
return
self._thread = None

def _build_sender(self) -> RtpH264Sender:
source = _pick_mono_source(build_local_camera(self._cfg), self._name)
def _build_senders(self) -> List[RtpH264Sender]:
    """Build (but do not start) one RTP sender per eye for this camera.

    A mono camera yields a single sender targeting ``rtp.port``. A stereo
    camera yields ``[left, right]`` targeting ``rtp.port`` and
    ``rtp.port_right`` respectively. Each sender gets its own encoder.

    Returns:
        A list of one (mono) or two (stereo) senders.

    Raises:
        ValueError: If ``rtp.port`` is missing, or the camera is stereo
            and ``rtp.port_right`` is missing.
    """
    eyes = _eye_sources(build_local_camera(self._cfg), self._name)
    # ``or {}`` also covers an ``rtp:`` key that is present but empty.
    rtp = self._cfg.get("rtp") or {}
    if "port" not in rtp:
        raise ValueError(f"camera {self._name!r} missing rtp.port")
    is_stereo = len(eyes) == 2
    if is_stereo and "port_right" not in rtp:
        raise ValueError(
            f"camera {self._name!r}: stereo requires rtp.port_right (the "
            "left eye goes to rtp.port, the right eye to rtp.port_right)"
        )

    # Shared settings are identical for both eyes; convert them once so a
    # malformed value fails before any encoder is created.
    width = int(self._cfg["width"])
    height = int(self._cfg["height"])
    fps = int(self._cfg.get("fps", 30))
    ports = [int(rtp["port"])]
    if is_stereo:
        ports.append(int(rtp["port_right"]))

    def build_one(source: FrameSource, port: int) -> RtpH264Sender:
        # One encoder per stream: the eyes are encoded independently.
        encoder = make_encoder(
            rtp.get("encoder", self._default_encoder),
            width=width,
            height=height,
            bitrate=int(rtp.get("bitrate_mbps", 15)) * 1_000_000,
            fps=fps,
            gop=int(rtp["gop"]) if "gop" in rtp else None,
            gpu_id=int(rtp.get("gpu_id", 0)),
        )
        return RtpH264Sender(
            source=source,
            encoder=encoder,
            host=self._host,
            port=port,
            width=width,
            height=height,
            fps=fps,
            mtu=int(rtp.get("mtu", 1400)),
        )

    senders: List[RtpH264Sender] = []
    try:
        for source, port in zip(eyes, ports):
            senders.append(build_one(source, port))
    except Exception:
        # If the second eye fails to build, the caller never receives the
        # first sender and so cannot stop it. Release it here, best-effort.
        for built in senders:
            try:
                built.stop()
            except Exception:
                logger.debug(
                    "camera %r: sender.stop() raised", self._name, exc_info=True
                )
        raise
    return senders

def _run(self) -> None:
attempt = 0
while not self._stop.is_set():
attempt += 1
sender: Optional[RtpH264Sender] = None
senders: List[RtpH264Sender] = []
started_at: Optional[float] = None
try:
logger.info("camera %r: building (attempt %d)", self._name, attempt)
sender = self._build_sender()
sender.start()
senders = self._build_senders()
for s in senders:
s.start()
started_at = time.monotonic()
logger.info(
"camera %r: streaming → %s:%s",
self._name,
self._host,
self._cfg["rtp"]["port"],
)
# Poll sender liveness. If the send-loop thread dies
rtp = self._cfg.get("rtp", {})
if len(senders) == 2:
logger.info(
"camera %r: streaming stereo → %s:%s (L) + %s:%s (R)",
self._name,
self._host,
rtp.get("port"),
self._host,
rtp.get("port_right"),
)
else:
logger.info(
"camera %r: streaming → %s:%s",
self._name,
self._host,
rtp.get("port"),
)
# Poll sender liveness. If any send-loop thread dies
# after startup (GStreamer pipeline error, encoder
# crash, etc.) raise into the retry path; otherwise a
# silent-but-dead supervisor would keep the service
# "healthy" while nothing is being streamed.
# crash, etc.) raise into the retry path — for stereo
# we treat the pair atomically: if one eye drops, we
# restart both.
while not self._stop.is_set():
self._stop.wait(timeout=SUPERVISOR_TICK_S)
if not sender.is_alive():
raise RuntimeError("RtpH264Sender thread exited unexpectedly")
dead = [s for s in senders if not s.is_alive()]
if dead:
raise RuntimeError(
f"{len(dead)}/{len(senders)} RtpH264Sender thread(s) exited unexpectedly"
)
except KeyboardInterrupt:
# SIGINT during ``sender.start()`` arrives as KeyboardInterrupt
# in this thread; surface as a stop, not a retry.
self._stop.set()
break
except Exception as e:
# ``camera_streamer`` is supposed to never exit. Log full
# traceback at debug and a one-liner at warning so journalctl
# stays readable while preserving the detail for triage.
uptime = (time.monotonic() - started_at) if started_at else 0.0
logger.warning(
"camera %r: failure after %.1fs uptime: %s — retrying in %.1fs",
Expand All @@ -166,9 +197,9 @@ def _run(self) -> None:
)
logger.debug("camera %r: traceback", self._name, exc_info=True)
finally:
if sender is not None:
for s in senders:
try:
sender.stop()
s.stop()
except Exception:
logger.debug(
"camera %r: sender.stop() raised", self._name, exc_info=True
Expand Down
121 changes: 103 additions & 18 deletions examples/camera_viz/camera_viz.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import argparse
import signal
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple

Expand All @@ -29,9 +30,23 @@

from pipeline import FrameSource, VizRunner
from placements import PlacementConfig, PlacementStrategy, build as build_placement
from sources import RtpH264Source, build_local_camera
from sources import PairedFrameSource, RtpH264Source, build_local_camera

SourceEntry = Tuple[FrameSource, Optional[PlacementStrategy]]

@dataclass
class SourceEntry:
    """A single layer-plan row: frame source, placement, and stereo settings.

    The ``stereo`` flag comes from the camera spec
    (``cameras.<cam>.stereo``); ``stereo_baseline_mm`` comes from the
    placement spec (``placements.<cam>.stereo_baseline_mm``). Both are
    copied onto the QuadLayer config when the layer is added to the
    session.
    """

    # Frame producer backing this layer.
    source: FrameSource
    # Placement strategy for the layer; may be None.
    placement: Optional[PlacementStrategy]
    # True when the layer should be configured as stereo.
    stereo: bool = False
    # Display-time stereo baseline, in millimetres.
    stereo_baseline_mm: float = 0.0


def _build_placement(spec: Optional[dict], is_xr: bool) -> Optional[PlacementStrategy]:
Expand Down Expand Up @@ -82,19 +97,48 @@ def _placement_with_aspect(
return _build_placement(spec, is_xr)


def _stereo_for(cam: dict, placements_cfg: dict) -> Tuple[bool, float]:
"""Resolve stereo + baseline for one camera.

``stereo`` lives on the camera (so the producer side knows). The
rendering knob ``stereo_baseline_mm`` lives on the placement (it's
a display-time parameter). 0.0 means both eyes see the same world
quad — all parallax comes from the captured frames.
"""
stereo = bool(cam.get("stereo", False))
pspec = placements_cfg.get(cam["name"]) or {}
baseline_mm = float(pspec.get("stereo_baseline_mm", 0.0))
return stereo, baseline_mm


def _build_local_entries(cfg: dict, is_xr: bool) -> List[SourceEntry]:
    """source=local: open each enabled camera directly.

    Every source returned by ``build_local_camera`` for a camera becomes
    one ``SourceEntry``. All entries from the same camera share that
    camera's placement and stereo settings.

    Args:
        cfg: Full configuration mapping; placements are read from
            ``display.placements``.
        is_xr: Forwarded to the placement builder.

    Returns:
        One ``SourceEntry`` per source, in camera order.
    """
    placements_cfg = cfg.get("display", {}).get("placements", {})
    entries: List[SourceEntry] = []
    for cam in _enabled_cameras(cfg):
        placement = _placement_with_aspect(placements_cfg.get(cam["name"]), cam, is_xr)
        stereo, baseline_mm = _stereo_for(cam, placements_cfg)
        # Append SourceEntry objects only. A bare (source, placement)
        # tuple would lack the attributes the layer-building loop reads.
        entries.extend(
            SourceEntry(
                source=source,
                placement=placement,
                stereo=stereo,
                stereo_baseline_mm=baseline_mm,
            )
            for source in build_local_camera(cam)
        )
    return entries


def _build_rtp_entries(cfg: dict, is_xr: bool) -> List[SourceEntry]:
"""source=rtp: build an RTP listener per camera using its ``rtp.port``."""
"""source=rtp: build an RTP listener per camera using its ``rtp.port``.

Stereo cameras open TWO listeners (rtp.port for left, rtp.port_right
for right) and pair them via PairedFrameSource. The wire path treats
the two eyes as independent streams — drift is acceptable (the user
accepted "no sync" for RTP stereo); paired-frame atomicity at the
QuadLayer mailbox is what stops torn pairs from reaching the GPU.
"""
placements_cfg = cfg.get("display", {}).get("placements", {})
entries: List[SourceEntry] = []
for cam in _enabled_cameras(cfg):
Expand All @@ -104,16 +148,52 @@ def _build_rtp_entries(cfg: dict, is_xr: bool) -> List[SourceEntry]:
f"camera_viz: camera {cam.get('name')!r} missing rtp.port; "
"required when source: rtp"
)
source = RtpH264Source(
name=cam["name"],
width=int(cam["width"]),
height=int(cam["height"]),
port=int(rtp["port"]),
rtp_buffer_size=int(rtp.get("rtp_buffer_size", 212992)),
gpu_id=int(rtp.get("gpu_id", 0)),
)
placement = _placement_with_aspect(placements_cfg.get(cam["name"]), cam, is_xr)
entries.append((source, placement))
stereo, baseline_mm = _stereo_for(cam, placements_cfg)

if stereo:
if "port_right" not in rtp:
raise ValueError(
f"camera_viz: stereo camera {cam.get('name')!r} missing "
"rtp.port_right (required when stereo + source: rtp)"
)
left = RtpH264Source(
name=f"{cam['name']}.left",
width=int(cam["width"]),
height=int(cam["height"]),
port=int(rtp["port"]),
rtp_buffer_size=int(rtp.get("rtp_buffer_size", 212992)),
gpu_id=int(rtp.get("gpu_id", 0)),
)
right = RtpH264Source(
name=f"{cam['name']}.right",
width=int(cam["width"]),
height=int(cam["height"]),
port=int(rtp["port_right"]),
rtp_buffer_size=int(rtp.get("rtp_buffer_size", 212992)),
gpu_id=int(rtp.get("gpu_id", 0)),
)
source: FrameSource = PairedFrameSource(
name=cam["name"], left=left, right=right
)
else:
source = RtpH264Source(
name=cam["name"],
width=int(cam["width"]),
height=int(cam["height"]),
port=int(rtp["port"]),
rtp_buffer_size=int(rtp.get("rtp_buffer_size", 212992)),
gpu_id=int(rtp.get("gpu_id", 0)),
)

entries.append(
SourceEntry(
source=source,
placement=placement,
stereo=stereo,
stereo_baseline_mm=baseline_mm,
)
)
return entries


Expand Down Expand Up @@ -168,14 +248,19 @@ def main(argv: Optional[list[str]] = None) -> int:

# Build sources, layers, and placement strategies in parallel arrays.
sources, layers, strategies = [], [], []
for source, placement in entries:
sources.append(source)
for entry in entries:
sources.append(entry.source)
layer_cfg = viz.QuadLayerConfig()
layer_cfg.name = source.spec.name
layer_cfg.resolution = viz.Resolution(source.spec.width, source.spec.height)
layer_cfg.name = entry.source.spec.name
layer_cfg.resolution = viz.Resolution(
entry.source.spec.width, entry.source.spec.height
)
layer_cfg.format = viz.PixelFormat.kRGBA8
if entry.stereo:
layer_cfg.stereo = True
layer_cfg.stereo_baseline_mm = entry.stereo_baseline_mm
layers.append(session.add_quad_layer(layer_cfg))
strategies.append(placement)
strategies.append(entry.placement)

print(
f"camera_viz: source={source_mode}, mode={cfg.get('display', {}).get('mode')}, "
Expand Down
45 changes: 45 additions & 0 deletions examples/camera_viz/configs/synthetic_stereo.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Synthetic stereo source — exercises the stereo QuadLayer path without
# stereo camera hardware. Same animated pattern as synthetic.yaml, but
# with a horizontal pixel disparity between the eyes so the parallax is
# visible in XR. Window mode renders the LEFT eye (per the documented
# single-view fallback).

source: local
streaming:
host: 127.0.0.1
encoder: auto

cameras:
- name: synth
enabled: true
type: synthetic
stereo: true # → SyntheticStereoSource + stereo QuadLayer
width: 1280
height: 720
fps: 60
hue_speed_hz: 0.25
disparity_px: 20 # horizontal pixel offset between eyes
rtp:
port: 5000 # left eye; stereo over RTP (source: rtp) additionally requires rtp.port_right for the right eye
bitrate_mbps: 15

display:
mode: xr # window | xr — stereo really only shines in xr
window:
width: 1280
height: 720
xr:
near_z: 0.05
far_z: 100.0
clear_color: [0.05, 0.05, 0.08, 1.0]
placements:
synth:
lock_mode: lazy
distance: 1.5
stereo_baseline_mm: 0.0 # 0 → both eyes see the same world quad
# (parallax purely from the captured frames).
# Try 65.0 (typical IPD) to virtually push the
# screen further; -30.0 to bring it closer.
Loading
Loading