Merged
27 changes: 20 additions & 7 deletions ARCHITECTURE.md
@@ -254,14 +254,23 @@ Auto-Approve Route to Inspector

### Sync to Cloud

Only **approved episodes** (QA score 90% or inspector-approved) are synced to cloud:
Only **approved episodes** (QA score >= 90% or inspector-approved) are eligible for cloud sync:

```
Edge MinIO ══(push)══► Cloud S3
```

Sync is handled via `POST /api/v1/sync/episode` with:
- Sidecar JSON (metadata)
Keystone should default to manual cloud sync scheduling. `KEYSTONE_SYNC_ENABLED`
controls whether cloud sync capability and the worker are available.
`KEYSTONE_SYNC_AUTO_SCAN_ENABLED` controls whether the worker periodically
discovers newly eligible approved unsynced episodes, and its default is
`false`. With the default setting, recorded data remains in edge MinIO until an
admin manually syncs one episode or explicitly scans and queues eligible
episodes.

Sync upload is executed by the edge `SyncWorker` through the cloud data gateway
with:
- Sidecar JSON metadata converted to raw tags
- MCAP file (raw sensor data)
- SHA-256 checksums for integrity validation

@@ -376,10 +385,14 @@ EDGE (Source of Truth) CLOUD (Eventually Consistent Replica)

**After network restored**:

1. Edge sync worker resumes
2. Queued episodes uploaded (`POST /api/v1/sync/episode`)
3. Cloud processes each asynchronously (returns `202 Accepted`)
4. Cloud DB becomes eventually consistent with edge
1. Edge sync worker resumes and drains persisted `pending` sync rows.
2. Already queued or retryable sync jobs continue according to retry policy.
3. If `KEYSTONE_SYNC_AUTO_SCAN_ENABLED=true`, newly eligible approved unsynced
episodes are discovered and queued automatically.
4. If automatic discovery is disabled, new local episodes remain unsynced until
an admin manually syncs one episode or explicitly scans eligible episodes.
5. Cloud processes each upload asynchronously and cloud DB becomes eventually
consistent with edge.
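
The recovery order above can be sketched as a small function that lists which steps one polling cycle would run. This is only an illustration: `pollPlan` and the step labels are hypothetical and do not correspond to identifiers in the `SyncWorker` implementation.

```go
package main

import "fmt"

// pollPlan is a hypothetical helper that returns, in order, the steps a
// single polling cycle would run. Only the discovery step depends on the
// automatic-scan switch; pending-row and retry handling always run.
func pollPlan(autoScanEnabled bool) []string {
	steps := []string{
		"dispatch persisted pending rows",
		"continue queued or retryable jobs per retry policy",
	}
	if autoScanEnabled {
		steps = append(steps, "discover and queue newly eligible approved unsynced episodes")
	}
	return steps
}

func main() {
	for _, enabled := range []bool{false, true} {
		fmt.Printf("KEYSTONE_SYNC_AUTO_SCAN_ENABLED=%t\n", enabled)
		for i, step := range pollPlan(enabled) {
			fmt.Printf("  %d. %s\n", i+1, step)
		}
	}
}
```

With the default `false`, the plan contains only the two recovery steps, which matches the manual-only behavior described in the list.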

### Sync Guarantees

10 changes: 10 additions & 0 deletions README.md
@@ -76,6 +76,8 @@ Configuration is loaded from environment variables. See [`docker/.env.example`](
| `KEYSTONE_MINIO_ENDPOINT` | `http://localhost:9000` | MinIO endpoint |
| `KEYSTONE_MYSQL_HOST` | `localhost` | MySQL host |
| `KEYSTONE_MYSQL_PASSWORD` | *required* | MySQL password |
| `KEYSTONE_SYNC_ENABLED` | `true` | Enable cloud sync capability, worker, and manual sync APIs when cloud endpoints and credentials are configured |
| `KEYSTONE_SYNC_AUTO_SCAN_ENABLED` | `false` | Enable periodic automatic discovery of newly eligible approved unsynced episodes |

### Cloud Sync Credentials

@@ -85,6 +87,14 @@ to `AuthService.ExchangeCredential` as `credential_base64`. Keystone does not
decode, split, validate, or derive `site_id` / secret values from this key; the
cloud AuthService owns credential interpretation and validation.

Cloud sync capability and automatic scheduling are separate. Keep
`KEYSTONE_SYNC_ENABLED=true` when admins should be able to manually sync data to
cloud. Leave `KEYSTONE_SYNC_AUTO_SCAN_ENABLED=false` for the default manual-only
mode, where newly recorded or newly approved episodes remain local until an
admin triggers single-episode sync or an explicit batch scan. Set
`KEYSTONE_SYNC_AUTO_SCAN_ENABLED=true` only when the site should automatically
queue every newly eligible approved unsynced episode.

## Project Structure

```
27 changes: 17 additions & 10 deletions cmd/keystone-edge/main.go
@@ -7,6 +7,7 @@ package main

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"os"
@@ -154,17 +155,18 @@ func main() {
}

		syncWorker = services.NewSyncWorker(db.DB, uploader, s3Client, cfg.Storage.Bucket, services.SyncWorkerConfig{
			BatchSize:      cfg.Sync.BatchSize,
			MaxConcurrent:  cfg.Sync.MaxConcurrent,
			MaxRetries:     cfg.Sync.MaxRetries,
			IntervalSec:    cfg.Sync.WorkerIntervalSec,
			RetryBaseSec:   cfg.Sync.RetryBaseSec,
			RetryMaxSec:    cfg.Sync.RetryMaxSec,
			RetryJitterSec: cfg.Sync.RetryJitterSec,
			BatchSize:       cfg.Sync.BatchSize,
			MaxConcurrent:   cfg.Sync.MaxConcurrent,
			MaxRetries:      cfg.Sync.MaxRetries,
			AutoScanEnabled: cfg.Sync.AutoScanEnabled,
			IntervalSec:     cfg.Sync.WorkerIntervalSec,
			RetryBaseSec:    cfg.Sync.RetryBaseSec,
			RetryMaxSec:     cfg.Sync.RetryMaxSec,
			RetryJitterSec:  cfg.Sync.RetryJitterSec,
		}, &cfg.Sync)

		syncWorker.Start()
		logger.Printf("[SYNC] Cloud sync worker started: auth=%s gateway=%s", cfg.Sync.AuthEndpoint, cfg.Sync.GatewayEndpoint)
		logger.Printf("[SYNC] Cloud sync worker started: auth=%s gateway=%s auto_scan=%t", cfg.Sync.AuthEndpoint, cfg.Sync.GatewayEndpoint, cfg.Sync.AutoScanEnabled)
	} else {
		logger.Println("[SYNC] Cloud sync disabled (KEYSTONE_SYNC_ENABLED=false or missing endpoints)")
	}
@@ -184,11 +186,16 @@ func main() {

	logger.Println("[SERVER] Shutting down...")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	shutdownTimeout := 30 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()

	if err := srv.Shutdown(ctx); err != nil {
		logger.Printf("[SERVER] Error during shutdown: %v", err)
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(ctx.Err(), context.DeadlineExceeded) {
			logger.Printf("[SERVER] Error during shutdown after %s (timeout_ms=%d): %v", shutdownTimeout, shutdownTimeout.Milliseconds(), err)
		} else {
			logger.Printf("[SERVER] Error during shutdown: %v", err)
		}
	}

	logger.Println("[SERVER] Keystone Edge stopped")
66 changes: 62 additions & 4 deletions docs/designs/cloud-sync-persistent-queue-design.md
@@ -76,7 +76,25 @@ The polling worker periodically:

Automatic retry also does not create an observable queued period.

### 2.3 Frontend Listing
### 2.3 Automatic Discovery

The worker also has a discovery path that finds approved, unsynced episodes and
enqueues them without an operator action. That behavior should be controlled by
`KEYSTONE_SYNC_AUTO_SCAN_ENABLED` and should default to `false`.

With the default disabled setting:

- newly recorded or newly approved episodes remain local after upload/QA;
- single-episode manual sync still works through `POST /api/v1/sync/episodes/:id`;
- explicit batch scan still works through `POST /api/v1/sync/episodes`;
- persisted `pending` rows and due retries from already attempted sync jobs are
still processed by the worker.

This separates automatic discovery from retry/backoff recovery. A failed manual
sync can continue retrying according to sync retry policy without re-enabling
automatic discovery for unrelated new episodes.

### 2.4 Frontend Listing

Cloud Sync Center lists one row per episode using the latest `sync_logs` row.
Its queued count is the number of latest rows whose status is `pending`.
@@ -90,6 +108,8 @@ Therefore, a job that only exists in memory cannot be counted as queued.
- Preserve manual retry semantics: operators can retry exhausted or backoff
failures explicitly.
- Preserve automatic retry limits and backoff behavior.
- Disable automatic discovery of newly eligible episodes by default while keeping
manual enqueue and retry recovery available.
- Allow queued work to recover after Keystone restarts.
- Keep the change scoped to the existing `sync_logs` model unless stronger
queue semantics become necessary later.
@@ -164,10 +184,12 @@ The worker loop should process DB-backed queued work before discovering new work

1. Dispatch latest `pending` sync logs.
2. Promote due failed rows to `pending`, then dispatch them.
3. Discover approved, unsynced episodes with no active/latest completed log.
3. Discover approved, unsynced episodes with no active/latest completed log only
when automatic discovery is enabled.

This makes the queue restart-safe. If Keystone crashes after writing `pending`
but before placing the job into memory, the next polling cycle will pick it up.
Disabling automatic discovery must not disable this pending-row recovery path.

### 5.4 Claiming Pending Rows

@@ -229,6 +251,24 @@ After a successful manual retry:
The frontend should continue treating the backend summary endpoint as the source
of truth.

### 5.7 Automatic Discovery Switch

Add a worker configuration field backed by an environment variable:

| Field | Environment Variable | Default | Meaning |
|-------|----------------------|---------|---------|
| `AutoScanEnabled` | `KEYSTONE_SYNC_AUTO_SCAN_ENABLED` | `false` | Permit periodic discovery of newly eligible approved unsynced episodes |

`KEYSTONE_SYNC_ENABLED` remains the cloud sync capability switch. Setting it to
`false` disables the worker and therefore disables manual sync APIs. To support
manual-only production mode, keep `KEYSTONE_SYNC_ENABLED=true` and leave
`KEYSTONE_SYNC_AUTO_SCAN_ENABLED=false`.

The worker polling loop should be structured so that this switch only gates the
new-episode discovery step. It should not gate dispatch of existing `pending`
rows, manual enqueue acceleration, or retry/backoff handling for already
attempted sync work.

## 6. Risks and Mitigations

### 6.1 Duplicate Pending Rows
@@ -308,6 +348,16 @@ Mitigation:

## 7. Implementation Plan

### Phase 0: Automatic Discovery Control

- Add `KEYSTONE_SYNC_AUTO_SCAN_ENABLED` with default `false`.
- Add matching `SyncConfig.AutoScanEnabled` and worker config fields.
- Return `auto_scan_enabled` from `GET /api/v1/sync/config` as read-only
runtime state.
- Gate only the worker's newly eligible episode discovery path.
- Keep manual single-episode sync, explicit batch scan, pending-row recovery,
and retry/backoff processing available while the worker is running.

### Phase 1: Durable Manual Queue

- Add a worker method to create or reuse a pending sync log transactionally.
@@ -347,6 +397,12 @@ Retry 4 failed episodes with MaxConcurrent=2

### Backend Unit Tests

- Worker polling does not auto-discover approved unsynced episodes when
`AutoScanEnabled=false`.
- Worker polling still dispatches persisted pending rows when
`AutoScanEnabled=false`.
- Worker polling auto-discovers approved unsynced episodes when
`AutoScanEnabled=true`.
- Manual retry creates a pending row for a failed exhausted episode.
- Manual retry rejects an episode whose latest row is pending.
- Manual retry rejects an episode whose latest row is in_progress.
@@ -374,8 +430,10 @@ Retry 4 failed episodes with MaxConcurrent=2

## 9. Recommendation

Implement Phase 1 first. It fixes the operator-visible problem with a small,
contained backend change and does not require a new database table.
Implement Phase 0 and Phase 1 first. Phase 0 prevents newly recorded data from
automatically leaving the edge by default, while Phase 1 makes manually queued
work durable and visible to operators. Both are small, contained backend
changes that do not require a new database table.

Do not ship durable pending without DB-backed duplicate protection and polling
recovery. Those two pieces are required for correctness; otherwise the system
44 changes: 40 additions & 4 deletions docs/designs/cloud-sync-ui-implementation.md
@@ -34,6 +34,11 @@ Synapse. Data production statistics should keep cloud sync as a metric only, but
should not be the primary place for queue control, retry diagnosis, navigation,
or future sync operations.

Cloud sync scheduling should be explicit by default. Keystone should keep cloud
sync capability available when configured, but automatic discovery of newly
approved unsynced episodes should default to off. Operators or admins can still
manually enqueue one episode or explicitly scan and enqueue eligible episodes.

## 2. Current Backend Capability

### 2.1 Existing Cloud Sync APIs
@@ -43,7 +48,7 @@ endpoints recommended for the episode-centered Cloud Sync Center redesign:

| Method | Path | Purpose |
|--------|------|---------|
| `POST` | `/api/v1/sync/episodes` | Enqueue pending approved, unsynced episodes for cloud sync |
| `POST` | `/api/v1/sync/episodes` | Manually scan and enqueue approved, unsynced episodes for cloud sync |
| `POST` | `/api/v1/sync/episodes/:id` | Enqueue one episode for cloud sync by numeric episode ID |
| `GET` | `/api/v1/sync/episodes` | Existing: list raw sync log entries for history/diagnosis |
| `GET` | `/api/v1/sync/episodes/summary` | Recommended new endpoint: list latest sync state grouped by episode |
@@ -77,7 +82,33 @@ from local MinIO, uploads it through the cloud DataGateway/OSS flow, and updates
- `episodes.cloud_mcap_path`
- `episodes.cloud_processed`

### 2.3 Manual Trigger Semantics
### 2.3 Scheduling Modes

Keystone should separate two concerns:

| Concern | Configuration | Default | Behavior |
|---------|---------------|---------|----------|
| Cloud sync capability | `KEYSTONE_SYNC_ENABLED` | `true` | Creates the cloud clients, `SyncWorker`, sync APIs, and manual enqueue path when cloud endpoints and credentials are present |
| Automatic episode discovery | `KEYSTONE_SYNC_AUTO_SCAN_ENABLED` | `false` | When enabled, the worker periodically finds newly eligible approved unsynced episodes and enqueues them without an operator action |

When `KEYSTONE_SYNC_AUTO_SCAN_ENABLED=false`, Keystone must not automatically
enqueue newly completed or newly approved episodes just because they match the
eligibility rules. This is the recommended default for controlled production
sites where data should remain local until an admin explicitly syncs it.

Manual sync remains available while `KEYSTONE_SYNC_ENABLED=true` and the worker
is running:

- `POST /api/v1/sync/episodes/:id` enqueues one approved unsynced episode.
- `POST /api/v1/sync/episodes` explicitly scans and enqueues all currently
eligible approved unsynced episodes.

Automatic retry is separate from automatic discovery. Once an episode has been
manually enqueued and a `sync_logs` row exists, retry/backoff may continue using
the existing retry rules. Disabling automatic discovery should not strand
already queued or already attempted sync work.

### 2.4 Manual Trigger Semantics

The existing manual APIs have different retry behavior.

@@ -110,7 +141,7 @@ Behavior:
For product clarity, the current batch API should be presented as "scan and
enqueue eligible pending episodes", not as "force retry all failures".

### 2.4 Listing Model
### 2.5 Listing Model

`sync_logs` is an audit-style attempt-chain table. One episode may have multiple
rows because manual retry can create a fresh `sync_logs` row when the latest
@@ -138,6 +169,7 @@ as an expandable row, drawer, or "View history" action.
| Admin | Retry failed cloud sync jobs safely |
| Admin | Understand why a sync failed |
| Admin | Trigger sync for all eligible unsynced episodes |
| Admin | Keep newly recorded local data from syncing automatically by default |
| Operator | See whether collected data is available, without managing retries |

### 3.2 Design Goals
@@ -147,6 +179,7 @@ as an expandable row, drawer, or "View history" action.
- Avoid fake progress bars because Keystone does not expose upload percentage.
- Make failure recovery explicit and auditable.
- Keep destructive or expensive bulk actions deliberate.
- Make automatic discovery opt-in; default operation is manual enqueue.
- Preserve current backend behavior while identifying API gaps for follow-up.
- Keep the data production statistics page focused on analysis, not operations.

@@ -243,7 +276,7 @@ Primary actions:
| Scan and queue eligible unsynced episodes | `POST /api/v1/sync/episodes` | Confirm before enqueue, then refresh counts/table |
| Retry one failed episode | `POST /api/v1/sync/episodes/:id` | Row-level action, then refresh the row/list |
| Retry failed episodes in bulk | Not explicit yet | TODO; requires backend API semantics |
| Pause/resume automatic sync | Not supported yet | TODO; do not show as active control |
| Enable/disable automatic discovery | Config only | Default off; show read-only state, do not expose as a live UI toggle |
| Edit sync configuration | Not supported yet | TODO; read-only summary for now |

The page should make queue state explicit:
@@ -730,6 +763,7 @@ TODO: extend `GET /sync/config` with non-sensitive runtime values:
```json
{
  "worker_running": true,
  "auto_scan_enabled": false,
  "batch_size": 10,
  "max_concurrent": 2,
  "max_retries": 5,
@@ -822,6 +856,7 @@ Acceptance criteria:
- Add latest sync summary to episode list/detail responses.
- Add batch trigger body with `force` and filters.
- Extend sync config response with read-only non-sensitive values.
- Add `auto_scan_enabled` to sync config response.
- Add admin auth guard for sync APIs.

Acceptance criteria:
@@ -841,3 +876,4 @@ Acceptance criteria:
| Should batch sync use current statistics filters? | Only after backend supports filter parameters |
| Should long-running uploads show percentage? | No; backend does not expose progress |
| Should `.env` sync config be editable in Synapse? | Not now; keep as TODO after read-only config visibility |
| Should automatic discovery run by default? | No; default `KEYSTONE_SYNC_AUTO_SCAN_ENABLED=false` |