Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,8 @@ import type {
| `PlaybackOverlayContribution` | Registers a region above playback track (`id`, optional `order`, optional `height`, `render(context)`). |
| `TimelineOverlayContribution` | Alias of `PlaybackOverlayContribution` for naming clarity. |
| `RosViewExtensionContext` | Stable context object passed into extension renderers. |
| `PlaybackControlsApi` | Playback controls: `seek`, `play`, `pause`, `setSpeed`, `setLooping`, `stepBy`, `stepMessage`, `playUntil`, `subscribeCurrentTime`, `getSnapshot`. |
| `PlaybackSnapshot` | Playback state including `currentTime`, `startTime`, `endTime`, `isPlaying`, `speed`, optional `progressPercent`, `buffering`, `problems`. |
| `PlaybackControlsApi` | Playback controls: `seek`, `play`, `pause`, `setSpeed`, `setLooping`, `stepBy`, `stepMessage`, `playUntil`, `subscribeCurrentTime`, `getCurrentTime`, `getSnapshot`. |
| `PlaybackSnapshot` | Low-frequency playback state including `currentTime`, `startTime`, `endTime`, `isPlaying`, `speed`, optional `progressPercent`, `buffering`, `problems`. |
| `TimelineApi` | Helpers aligned with the scrubber: `getTimeBounds`, `timeToPercent`, `percentToTime`. |
| `MessageAccessApi` | Read-only `getMessagesInTimeRange` when the underlying player supports it. |

Expand Down Expand Up @@ -431,7 +431,8 @@ const annotationExtension: RosViewExtension = {
### Best practices

- Prefer `playback.subscribeCurrentTime()` for high-frequency visuals (canvas/ref updates) instead of React setState on every frame.
- Use `playback.getSnapshot()` for low-frequency state checks like `isPlaying`, `startTime`, and `endTime`.
- Use `playback.getCurrentTime()` for one-off real-time playhead reads.
- Use `playback.getSnapshot()` for low-frequency state checks like `isPlaying`, `startTime`, and `endTime`; its `currentTime` is a compatibility snapshot, not a React-driven playhead source.
- Keep extension renderers resilient; runtime errors are isolated from core playback controls.

---
Expand All @@ -440,6 +441,8 @@ const annotationExtension: RosViewExtension = {

For advanced use cases — subscribing to playback state and decoded messages from within custom React components rendered inside the viewer.

`useMessagePipeline` is intended for slowly-changing metadata such as presence, topics, bounds, progress, speed, and decoded message availability. Do not use `playerState.activeData.currentTime` for live playback UI; use `playback.subscribeCurrentTime()` or `playback.getCurrentTime()` instead.

```ts
import { useMessagePipeline } from '@ioai/rosview';
```
Expand Down
9 changes: 6 additions & 3 deletions docs/API.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,8 @@ import type {
| `SidebarTabContribution` | 注册侧边栏 Tab(`id`、`title`、可选 `icon`、`order`、`render(context)`)。 |
| `PlaybackOverlayContribution` | 在播放条上方注册一块区域(`id`、可选 `order`、`height`、`render(context)`)。 |
| `RosViewExtensionContext` | 传给扩展渲染器的稳定上下文(含 `playback`、`timeline`、`messages`、`hostContext`)。 |
| `PlaybackControlsApi` | 播放控制:`seek`、`play`、`pause`、`setSpeed`、`setLooping`、`stepBy`、`stepMessage`、`playUntil`、`subscribeCurrentTime`、`getSnapshot`。 |
| `PlaybackSnapshot` | 播放状态快照;可含 `progressPercent`、`buffering`、`problems` 等。 |
| `PlaybackControlsApi` | 播放控制:`seek`、`play`、`pause`、`setSpeed`、`setLooping`、`stepBy`、`stepMessage`、`playUntil`、`subscribeCurrentTime`、`getCurrentTime`、`getSnapshot`。 |
| `PlaybackSnapshot` | 低频播放状态快照;包含 `currentTime`、`startTime`、`endTime`、`isPlaying`、`speed`,可含 `progressPercent`、`buffering`、`problems` 等。 |
| `TimelineApi` | 与主 scrubber 对齐:`getTimeBounds`、`timeToPercent`、`percentToTime`。 |
| `MessageAccessApi` | 只读 `getMessagesInTimeRange`(播放器支持时)。 |

Expand Down Expand Up @@ -417,7 +417,8 @@ const annotationExtension: RosViewExtension = {
### 最佳实践

- 高频视觉更新优先用 `playback.subscribeCurrentTime()`,避免每帧 `setState`。
- 低频状态检查(如 `isPlaying`、`startTime`、`endTime`)用 `playback.getSnapshot()`。
- 一次性读取实时播放头时用 `playback.getCurrentTime()`。
- 低频状态检查(如 `isPlaying`、`startTime`、`endTime`)用 `playback.getSnapshot()`;其中的 `currentTime` 是兼容快照,不适合作为 React 驱动的实时播放头来源。
- 扩展渲染器应容错;运行时错误与核心播放控制隔离。

---
Expand All @@ -426,6 +427,8 @@ const annotationExtension: RosViewExtension = {

高级用法:在查看器内嵌的自定义 React 组件中订阅播放状态与解码消息。

`useMessagePipeline` 适合订阅低频元数据,例如 presence、topics、bounds、progress、speed 和解码消息可用性。实时播放 UI 不应依赖 `playerState.activeData.currentTime`;请使用 `playback.subscribeCurrentTime()` 或 `playback.getCurrentTime()`。

```ts
import { useMessagePipeline } from '@ioai/rosview';
```
Expand Down
13 changes: 11 additions & 2 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ flowchart TB
BufferedSource -->|"messageIterator\n(17ms batches)"| Player
BlockLoader -->|"background prefill\ntime-block cache"| BufferedSource

Player -->|"PlayerState\n(messages, currentTime,\ntopics, progress)"| Pipeline
Player -->|"PlayerState\nmetadata, topics, progress"| Pipeline
Player -->|"subscribeCurrentTime()\ngetCurrentTime()"| FrameThrottle
Pipeline --> FrameThrottle
FrameThrottle -->|"messageEventsBySubscriberId"| ImagePanel
FrameThrottle --> PlotPanel
Expand Down Expand Up @@ -408,11 +409,14 @@ Key design decisions:
- **BlockLoader**: divides the entire file timeline into up to 400 blocks and preloads in the background
- **Seek backfill**: when seeking to a target time, fetches the most recent message per subscribed topic so panels always have data to display
- **Startup delay**: waits 100 ms after initialization before starting playback, giving panels time to call `setSubscriptions`
- **Playback time channel**: high-frequency `currentTime` updates go through `subscribeCurrentTime()` / `getCurrentTime()` instead of the Zustand pipeline store

#### MessagePipeline Layer

Core message distribution pipeline (Zustand store + custom pub/sub):

The Zustand store is intentionally limited to slowly-changing metadata. `PlayerState.activeData.currentTime` remains as a compatibility snapshot for discrete events such as initialization, seek, pause, loop/end boundaries, and close, but it is not updated for every playback tick.

```typescript
interface MessagePipelineState {
// Player state
Expand Down Expand Up @@ -443,7 +447,7 @@ Frame-rate control mechanism:
1. Player pushes `PlayerState` via `setListener`
2. The pipeline waits for all panels to call `renderDone` before processing the next frame
3. `msPerFrame` enforces a minimum interval between frames
4. Panels subscribe with `useMessagePipeline(selector)` to receive only the slice they need
4. Panels subscribe with `useMessagePipeline(selector)` to receive only the slow metadata slice they need; live playhead UI uses `subscribeCurrentTime()` or `getCurrentTime()`

#### Panel Layer

Expand Down Expand Up @@ -598,9 +602,14 @@ const subscribersRef = useRef<Set<(time: Time) => void>>(new Set());

function subscribeCurrentTime(callback: (time: Time) => void) {
subscribersRef.current.add(callback);
if (currentTimeRef.current) callback(currentTimeRef.current);
return () => subscribersRef.current.delete(callback);
}

function getCurrentTime() {
return currentTimeRef.current;
}

// Advance playback without triggering React renders
function advanceTime(time: Time) {
currentTimeRef.current = time;
Expand Down
13 changes: 11 additions & 2 deletions docs/ARCHITECTURE.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ flowchart TB
BufferedSource -->|"messageIterator\n(17ms 批量)"| Player
BlockLoader -->|"后台预填充\n时间块缓存"| BufferedSource

Player -->|"PlayerState\n(messages, currentTime,\ntopics, progress)"| Pipeline
Player -->|"PlayerState\nmetadata, topics, progress"| Pipeline
Player -->|"subscribeCurrentTime()\ngetCurrentTime()"| FrameThrottle
Pipeline --> FrameThrottle
FrameThrottle -->|"messageEventsBySubscriberId"| ImagePanel
FrameThrottle --> PlotPanel
Expand Down Expand Up @@ -403,11 +404,14 @@ interface Initialization {
- **BlockLoader**:将整个文件时间线划分为最多 400 个 block,后台预加载
- **Seek backfill**:跳转到目标时间时,为每个已订阅 Topic 获取最近一条消息(确保面板立即有数据显示)
- **启动延迟**:初始化后等待 100ms 再开始播放,让面板先完成 `setSubscriptions`
- **播放时间通道**:高频 `currentTime` 通过 `subscribeCurrentTime()` / `getCurrentTime()` 暴露,不经过 Zustand pipeline store

#### MessagePipeline 层

核心消息分发管线(Zustand store + 自定义发布/订阅):

Zustand store 只承载低频元数据。`PlayerState.activeData.currentTime` 作为兼容快照保留,只在初始化、seek、pause、loop/end 边界、close 等离散事件刷新,不会在播放 tick 中持续更新。

```typescript
interface MessagePipelineState {
// Player 状态
Expand Down Expand Up @@ -438,7 +442,7 @@ interface MessagePipelineState {
1. Player 通过 `setListener` 推送 PlayerState
2. Pipeline 收到后,必须等上一帧所有面板 `renderDone` 后才处理下一帧
3. `msPerFrame` 限制两帧之间的最小间隔
4. 面板通过 `useMessagePipeline(selector)` 只订阅需要的切片
4. 面板通过 `useMessagePipeline(selector)` 只订阅需要的低频元数据切片;实时播放头使用 `subscribeCurrentTime()` 或 `getCurrentTime()`

#### 面板层

Expand Down Expand Up @@ -596,9 +600,14 @@ const subscribersRef = useRef<Set<(time: Time) => void>>(new Set());

function subscribeCurrentTime(callback: (time: Time) => void) {
subscribersRef.current.add(callback);
if (currentTimeRef.current) callback(currentTimeRef.current);
return () => subscribersRef.current.delete(callback);
}

function getCurrentTime() {
return currentTimeRef.current;
}

// 播放推进时直接调 subscriber(不触发 React 渲染)
function advanceTime(time: Time) {
currentTimeRef.current = time;
Expand Down
12 changes: 8 additions & 4 deletions src/core/extensions/buildContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ function timeToPercentInternal(current: Time, start: Time, end: Time): number {
return clampPercent(Number((currentNano * 10000n) / total) / 100);
}

function buildPlaybackSnapshot(state: MessagePipelineState['playerState']): PlaybackSnapshot {
function buildPlaybackSnapshot(
state: MessagePipelineState['playerState'],
currentTime: Time | undefined,
): PlaybackSnapshot {
const activeData = state.activeData;
const pr = state.progress;
return {
presence: state.presence,
startTime: activeData?.startTime,
endTime: activeData?.endTime,
currentTime: activeData?.currentTime,
currentTime,
isPlaying: activeData?.isPlaying ?? false,
isLooping: activeData?.isLooping ?? true,
speed: activeData?.speed ?? 1,
Expand Down Expand Up @@ -104,7 +107,7 @@ export function createPlaybackControlsApi(
playUntil: (time) =>
new Promise<void>((resolve) => {
const targetNs = toNano(time);
const snap = buildPlaybackSnapshot(getPlayerState());
const snap = buildPlaybackSnapshot(getPlayerState(), player.getCurrentTime());
const cur = snap.currentTime;
if (cur && toNano(cur) >= targetNs) {
resolve();
Expand All @@ -120,7 +123,8 @@ export function createPlaybackControlsApi(
player.play();
}),
subscribeCurrentTime: (cb) => player.subscribeCurrentTime(cb),
getSnapshot: () => buildPlaybackSnapshot(getPlayerState()),
getCurrentTime: () => player.getCurrentTime(),
getSnapshot: () => buildPlaybackSnapshot(getPlayerState(), player.getCurrentTime()),
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/core/extensions/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ export interface PlaybackControlsApi {
*/
playUntil(time: Time): Promise<void>;
subscribeCurrentTime(cb: (time: Time) => void): Unsubscribe;
/** Latest playback time without subscribing React to high-frequency pipeline state. */
getCurrentTime(): Time | undefined;
getSnapshot(): PlaybackSnapshot;
}

Expand Down
7 changes: 4 additions & 3 deletions src/core/pipeline/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const EMPTY_DATATYPES = {} as RosDatatypes;
* Message state (lastMessageByTopic, per-subscriber batches, seq counters) is
* kept out of this store and lives in `messageBus` with per-key subscriptions.
* This store only holds slowly-changing metadata so that per-tick fan-out does
* not wake every useMessagePipeline subscriber.
* not wake every useMessagePipeline subscriber. Real-time playback time is
* exposed through Player.subscribeCurrentTime/getCurrentTime instead.
*/
export const useMessagePipelineStore = create<MessagePipelineState>((set) => ({
playerState: { presence: 'preinit', progress: {} },
Expand All @@ -42,8 +43,8 @@ export const useMessagePipelineStore = create<MessagePipelineState>((set) => ({
}
// Re-use existing references when the underlying identity has not changed
// so that selectors returning these fields are Object.is-equal and React
// skips the re-render, while still notifying selectors that depend on
// playerState itself (e.g. currentTime via activeData).
// skips the re-render for selectors that do not depend on playerState
// identity itself.
return {
playerState,
sortedTopics: ad.topics === state.sortedTopics ? state.sortedTopics : ad.topics,
Expand Down
70 changes: 70 additions & 0 deletions src/core/players/IterablePlayer.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { afterEach, describe, expect, it, vi } from 'vitest';
import { IterablePlayer } from './IterablePlayer';
import { messageBus } from '@/core/pipeline/messageBus';
import { useMessagePipelineStore } from '@/core/pipeline/store';
import type { Initialization, MessageEvent } from '@/core/types/ros';
import type { PlayerState } from '@/core/types/player';
import type { WorkerSerializedSource } from '@/infra/workers/WorkerSerializedSource';
Expand Down Expand Up @@ -266,6 +267,75 @@ describe('IterablePlayer playback clock', () => {
player.close();
});

it('returns current time through the imperative getter', async () => {
const source = makeSource([]);
const player = new IterablePlayer(source);

await player.initialize({});
expect(player.getCurrentTime()).toEqual({ sec: 0, nsec: 0 });

player.seek({ sec: 3, nsec: 250_000_000 });
await flushAsyncWork();
expect(player.getCurrentTime()).toEqual({ sec: 3, nsec: 250_000_000 });

player.close();
});

it('does not advance pipeline-store currentTime on pure playback ticks', async () => {
let now = 0;
let nextRafId = 1;
const oldPerformanceNow = performance.now;
const oldRequestAnimationFrame = globalThis.requestAnimationFrame;
const oldCancelAnimationFrame = globalThis.cancelAnimationFrame;
const rafCallbacks = new Map<number, FrameRequestCallback>();
Object.defineProperty(performance, 'now', {
configurable: true,
value: () => now,
});
globalThis.requestAnimationFrame = vi.fn((cb: FrameRequestCallback) => {
const id = nextRafId++;
rafCallbacks.set(id, cb);
return id;
});
globalThis.cancelAnimationFrame = vi.fn((id: number) => {
rafCallbacks.delete(id);
});

const source = makeSource([]);
const player = new IterablePlayer(source);
const seenTimes: number[] = [];

try {
await player.initialize({});
player.play();
const pipelineTimeBefore = useMessagePipelineStore.getState().playerState.activeData?.currentTime;
const unsubscribeTime = player.subscribeCurrentTime((time) => {
seenTimes.push(time.sec + time.nsec / 1e9);
});

now = 1000;
const firstRaf = Math.min(...rafCallbacks.keys());
rafCallbacks.get(firstRaf)?.(now);
await flushAsyncWork();

expect(seenTimes.at(-1)).toBeCloseTo(1, 3);
expect(player.getCurrentTime()).toEqual({ sec: 1, nsec: 0 });
expect(useMessagePipelineStore.getState().playerState.activeData?.currentTime).toBe(
pipelineTimeBefore,
);

unsubscribeTime();
} finally {
player.close();
Object.defineProperty(performance, 'now', {
configurable: true,
value: oldPerformanceNow,
});
globalThis.requestAnimationFrame = oldRequestAnimationFrame;
globalThis.cancelAnimationFrame = oldCancelAnimationFrame;
}
});

it('does not catch up wall time elapsed while the page is hidden', async () => {
let now = 0;
const oldPerformanceNow = performance.now;
Expand Down
45 changes: 38 additions & 7 deletions src/core/players/IterablePlayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ function isSameTimeRanges(nextValue?: TimeRange[], prevValue?: TimeRange[]): boo
return true;
}

function isSameTime(nextValue: Time | undefined, prevValue: Time | undefined): boolean {
if (nextValue === prevValue) {
return true;
}
if (!nextValue || !prevValue) {
return false;
}
return nextValue.sec === prevValue.sec && nextValue.nsec === prevValue.nsec;
}

type SharedPayloadRingProgress = NonNullable<PlayerState["progress"]["sharedPayloadRing"]>;

function isSameSharedPayloadRing(
Expand Down Expand Up @@ -178,6 +188,10 @@ export class IterablePlayer implements Player {
};
}

getCurrentTime(): Time | undefined {
return this._state.presence === "closed" ? undefined : this._currentTime;
}

registerSubscriptions(panelId: string, subscriptions: Subscription[]): void {
this._subscriptionsByPanel.set(panelId, subscriptions);
void this._rebuildSubscriptions();
Expand Down Expand Up @@ -661,29 +675,46 @@ export class IterablePlayer implements Player {
}
}

/** Replace activeData with a shallow copy so Zustand/React see a new reference. */
private _syncActiveDataSlice(): void {
/**
* Replace activeData only when slow metadata changes.
*
* Playback time itself is high-frequency and is delivered through
* subscribeCurrentTime/getCurrentTime. Keeping it out of the periodic pipeline
* emit prevents every useMessagePipeline selector from waking during playback.
*/
private _syncActiveDataSlice(options: { includeCurrentTime?: boolean } = {}): boolean {
const cur = this._state.activeData;
if (!cur) return;
if (!cur) return false;
const nextCurrentTime = options.includeCurrentTime === true ? this._currentTime : cur.currentTime;
const unchanged =
isSameTime(cur.currentTime, nextCurrentTime) &&
cur.isPlaying === this._isPlaying &&
cur.isLooping === this._isLooping &&
cur.speed === this._speed;
if (unchanged) {
return false;
}
this._state.activeData = {
...cur,
currentTime: this._currentTime,
currentTime: nextCurrentTime,
isPlaying: this._isPlaying,
isLooping: this._isLooping,
speed: this._speed,
};
return true;
}

private _maybeEmitPipelineState(): void {
const now = performance.now();
if (now - this._lastPipelineEmitMs < PIPELINE_EMIT_INTERVAL_MS) return;
this._lastPipelineEmitMs = now;
this._syncActiveDataSlice();
useMessagePipelineStore.getState().setPlayerState(this._state);
if (this._syncActiveDataSlice({ includeCurrentTime: false })) {
useMessagePipelineStore.getState().setPlayerState(this._state);
}
}

private _emitState() {
this._syncActiveDataSlice();
this._syncActiveDataSlice({ includeCurrentTime: true });

if (this._listener) {
this._listener(this._state);
Expand Down
Loading
Loading