Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0529bd8
feat: add EventsStorage interface, writer, and S2 backend for browser…
archandatta May 11, 2026
6e50eb4
feat: wire S2 storage writer into main with ordered shutdown
archandatta May 11, 2026
7868205
feat: pass S2_BASIN, S2_ACCESS_TOKEN, and S2_STREAM through superviso…
archandatta May 11, 2026
bbf4a35
fix: update config tests to expect S2 batcher defaults
archandatta May 11, 2026
c9a6445
chore: promote s2-sdk-go to direct dependency
archandatta May 11, 2026
f42f992
review: rename Storage/StorageWriter, use time.Duration for batcher l…
archandatta May 11, 2026
92a96b6
fix: remove dead defaults, bound ack goroutine contexts, embed s2Prod…
archandatta May 11, 2026
64e42f0
fix: return err directly from Run, inject logger, enforce single-use …
archandatta May 11, 2026
4d63909
fix: include dropped seq range in storage writer log
archandatta May 11, 2026
9df3e6c
feat: count append errors on StorageWriter for ops visibility
archandatta May 11, 2026
cc1d744
fix: drain ring after HTTP shutdown to prevent data loss on exit
archandatta May 11, 2026
fc0c7db
review: redact S2AccessToken in config logs, bound Close with context…
archandatta May 12, 2026
040f361
review: update eventsstorage comments and test error handling
archandatta May 12, 2026
c6467b3
review: remove DroppedFrom/DroppedTo fields and trim redundant comments
archandatta May 12, 2026
ab5d8ba
fix: pass context.Background() to NewS2Storage so S2 pipeline outlive…
archandatta May 12, 2026
42f1c47
fix: apply S2Config documented defaults when zero values are passed
archandatta May 12, 2026
3f9550f
fix: always call p.Close() in s2Producer.close even when ack drain ti…
archandatta May 12, 2026
c9e555a
review: bound p.Close() with ctx, give Close its own deadline, collap…
archandatta May 13, 2026
a1742b3
review: bound AppendSession setup with a 30s signal-aware context; ba…
archandatta May 13, 2026
5383db9
review: collapse s2 storage into S2StorageWriter with Start/Stop life…
archandatta May 13, 2026
d174e12
fix: give AppendSession its own context so SIGTERM doesn't kill the s…
archandatta May 13, 2026
750e244
fix: add ctx guard to newS2Storage; fix misleading comments on sessio…
archandatta May 13, 2026
b478192
review: add comment for s2writer shutdown
archandatta May 13, 2026
0f67535
Merge branch 'main' into archand/kernel-1116/browser-telemetry/add-s2
archandatta May 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions images/chromium-headful/run-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then
RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" )
fi

# S2 durable event storage. Each variable is appended to RUN_ARGS on its own,
# and only when it is non-empty on the host; the API server enables the sink
# only when all three are set.
# `${VAR:-}` expands to an empty string when VAR is unset, so the checks stay
# safe even if the script runs with `set -u`.
if [[ -n "${S2_BASIN:-}" ]]; then
RUN_ARGS+=( -e S2_BASIN="$S2_BASIN" )
fi
# NOTE(review): this puts the token value into the argument list as NAME=value.
# If RUN_ARGS ends up on a `docker run` command line, the secret is presumably
# visible in process listings; confirm whether that is acceptable here.
if [[ -n "${S2_ACCESS_TOKEN:-}" ]]; then
RUN_ARGS+=( -e S2_ACCESS_TOKEN="$S2_ACCESS_TOKEN" )
fi
if [[ -n "${S2_STREAM:-}" ]]; then
RUN_ARGS+=( -e S2_STREAM="$S2_STREAM" )
fi

# WebRTC port mapping
if [[ "${ENABLE_WEBRTC:-}" == "true" ]]; then
echo "Running container with WebRTC"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[program:kernel-images-api]
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" S2_STREAM="${S2_STREAM:-}" exec /usr/local/bin/kernel-images-api'
autostart=false
autorestart=true
startsecs=0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[program:kernel-images-api]
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" S2_STREAM="${S2_STREAM:-}" exec /usr/local/bin/kernel-images-api'
autostart=false
autorestart=true
startsecs=0
Expand Down
11 changes: 11 additions & 0 deletions images/chromium-headless/run-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then
RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" )
fi

# S2 durable event storage. Each variable is appended to RUN_ARGS on its own,
# and only when it is non-empty on the host; the API server enables the sink
# only when all three are set.
# `${VAR:-}` expands to an empty string when VAR is unset, so the checks stay
# safe even if the script runs with `set -u`.
if [[ -n "${S2_BASIN:-}" ]]; then
RUN_ARGS+=( -e S2_BASIN="$S2_BASIN" )
fi
# NOTE(review): this puts the token value into the argument list as NAME=value.
# If RUN_ARGS ends up on a `docker run` command line, the secret is presumably
# visible in process listings; confirm whether that is acceptable here.
if [[ -n "${S2_ACCESS_TOKEN:-}" ]]; then
RUN_ARGS+=( -e S2_ACCESS_TOKEN="$S2_ACCESS_TOKEN" )
fi
if [[ -n "${S2_STREAM:-}" ]]; then
RUN_ARGS+=( -e S2_STREAM="$S2_STREAM" )
fi

# If a positional argument is given, use it as the entrypoint
ENTRYPOINT_ARG=()
if [[ $# -ge 1 && -n "$1" ]]; then
Expand Down
21 changes: 21 additions & 0 deletions server/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ func main() {
}
captureSession := capturesession.NewCaptureSession(eventStream)

// Optional S2 storage sink.
var s2Writer *events.S2StorageWriter
if config.S2Basin != "" && config.S2AccessToken != "" && config.S2Stream != "" {
slogger.Info("S2 storage enabled", "basin", config.S2Basin, "stream", config.S2Stream)
s2Writer = events.NewS2StorageWriter(eventStream, config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{}, slogger)
if err := s2Writer.Start(ctx); err != nil {
slogger.Error("failed to start S2 storage writer", "err", err)
os.Exit(1)
}
}

apiService, err := api.New(
recorder.NewFFmpegManager(),
recorder.NewFFmpegRecorderFactory(config.PathToFFmpeg, defaultParams, stz),
Expand Down Expand Up @@ -269,6 +280,16 @@ func main() {
if err := g.Wait(); err != nil {
slogger.Error("server failed to shutdown", "err", err)
}

// s2Writer shuts down after the servers above, since they might produce events we
// want to capture into the stream; we must let them finish before closing the writer.
if s2Writer != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should have a comment here explaining why this shutdown happens after the other shutdown()s (because the other things above could produce events into the stream, and we don't want to drop those events, so we wait for them to complete shutdown)

stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer stopCancel()
if err := s2Writer.Stop(stopCtx); err != nil {
slogger.Error("s2 storage writer stop failed", "err", err)
}
Comment thread
cursor[bot] marked this conversation as resolved.
}
}

func mustFFmpeg() {
Expand Down
31 changes: 31 additions & 0 deletions server/cmd/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"fmt"
"log/slog"
"time"

"github.com/kelseyhightower/envconfig"
Expand Down Expand Up @@ -35,6 +36,36 @@ type Config struct {
// DevTools proxy address passed to ChromeDriver as goog:chromeOptions.debuggerAddress.
// If empty, it is derived from DevToolsProxyPort as 127.0.0.1:<port>.
DevToolsProxyAddr string `envconfig:"DEVTOOLS_PROXY_ADDR" default:""`

// S2 durable event storage. All three fields must be set to enable the S2 sink.
S2Basin string `envconfig:"S2_BASIN" default:""`
S2AccessToken string `envconfig:"S2_ACCESS_TOKEN" default:""`
S2Stream string `envconfig:"S2_STREAM" default:""`
}

// LogValue implements slog.LogValuer. It renders the configuration as a group
// of attributes and replaces a non-empty S2 access token with a fixed
// placeholder so the secret itself is never written to the logs.
func (c *Config) LogValue() slog.Value {
	// An unset token stays empty, so the log still shows whether one was
	// configured at all.
	token := c.S2AccessToken
	if token != "" {
		token = "[redacted]"
	}

	attrs := []slog.Attr{
		slog.Int("port", c.Port),
		slog.Int("frame_rate", c.FrameRate),
		slog.Int("display_num", c.DisplayNum),
		slog.Int("max_size_mb", c.MaxSizeInMB),
		slog.String("output_dir", c.OutputDir),
		slog.String("ffmpeg_path", c.PathToFFmpeg),
		slog.Int("devtools_proxy_port", c.DevToolsProxyPort),
		slog.Bool("log_cdp_messages", c.LogCDPMessages),
		slog.Duration("scale_to_zero_cooldown", c.ScaleToZeroCooldown),
		slog.Int("chromedriver_proxy_port", c.ChromeDriverProxyPort),
		slog.String("chromedriver_upstream_addr", c.ChromeDriverUpstreamAddr),
		slog.String("devtools_proxy_addr", c.DevToolsProxyAddr),
		slog.String("s2_basin", c.S2Basin),
		slog.String("s2_access_token", token),
		slog.String("s2_stream", c.S2Stream),
	}
	return slog.GroupValue(attrs...)
}

// Load loads configuration from environment variables
Expand Down
104 changes: 104 additions & 0 deletions server/e2e/e2e_s2_storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package e2e

import (
"context"
"net/http"
"os"
"os/exec"
"testing"
"time"

"github.com/s2-streamstore/s2-sdk-go/s2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

instanceoapi "github.com/kernel/kernel-images/server/lib/oapi"
)

// TestS2StorageWriter starts a headless container with S2 credentials, runs a
// capture session, and verifies that at least one record lands in the
// configured S2 stream after the tail position observed before the session.
//
// Skips automatically when S2_BASIN, S2_ACCESS_TOKEN, or S2_STREAM are unset,
// or when the docker binary is not on PATH.
func TestS2StorageWriter(t *testing.T) {
basin := os.Getenv("S2_BASIN")
accessToken := os.Getenv("S2_ACCESS_TOKEN")
stream := os.Getenv("S2_STREAM")
if basin == "" || accessToken == "" || stream == "" {
t.Skip("S2_BASIN, S2_ACCESS_TOKEN, and S2_STREAM must be set to run this test")
}

if _, err := exec.LookPath("docker"); err != nil {
t.Skipf("docker not available: %v", err)
}

// One deadline bounds the whole test: container start, API calls, and the
// S2 read context derived from it below.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

// Hand the same three S2 settings to the container that this test uses to
// read the stream back.
c := NewTestContainer(t, headlessImage)
require.NoError(t, c.Start(ctx, ContainerConfig{
Env: map[string]string{
"S2_BASIN": basin,
"S2_ACCESS_TOKEN": accessToken,
"S2_STREAM": stream,
},
}), "failed to start container")
// NOTE(review): Stop reuses the test ctx; if the 3-minute deadline has
// already expired, cleanup runs with a dead context — confirm Stop copes.
defer c.Stop(ctx)

require.NoError(t, c.WaitReady(ctx), "api not ready")

client, err := c.APIClient()
require.NoError(t, err)

// Note the current S2 stream tail seq before we write anything so we only
// read records produced by this test run.
// NOTE(review): the container is already running at this point, and other
// writers to the same stream would also be counted — presumably the stream
// is dedicated to this test; confirm.
s2Client := s2.New(accessToken, nil)
streamClient := s2Client.Basin(basin).Stream(s2.StreamName(stream))

checkResp, err := streamClient.CheckTail(ctx)
require.NoError(t, err, "check tail before test")
startSeq := checkResp.Tail.SeqNum

// Start a capture session.
startResp, err := client.StartCaptureSessionWithResponse(ctx, instanceoapi.StartCaptureSessionJSONRequestBody{})
require.NoError(t, err)
require.Equal(t, http.StatusCreated, startResp.StatusCode(), "start capture session: %s", string(startResp.Body))
require.NotNil(t, startResp.JSON201)
sessionID := startResp.JSON201.Id
t.Logf("capture session started: %s", sessionID)

// Let the session run briefly so at least one event is published (the
// session_started system event is emitted on session start).
time.Sleep(500 * time.Millisecond)

// Stop the capture session.
stopResp, err := client.StopCaptureSessionWithResponse(ctx)
require.NoError(t, err)
require.Equal(t, http.StatusOK, stopResp.StatusCode(), "stop capture session: %s", string(stopResp.Body))
t.Log("capture session stopped")

// Give the storage writer time to flush to S2 (batcher linger + network).
time.Sleep(2 * time.Second)

// Read records written after the pre-test tail and verify at least one
// envelope is present. The read gets its own 10s budget inside the overall
// test deadline.
readCtx, readCancel := context.WithTimeout(ctx, 10*time.Second)
defer readCancel()

readSession, err := streamClient.ReadSession(readCtx, &s2.ReadOptions{
SeqNum: s2.Uint64(startSeq),
})
require.NoError(t, err, "open S2 read session")
defer readSession.Close()

// Count every record the session yields; the payloads are not inspected.
var count int
for readSession.Next() {
count++
}
// A session error is only fatal while readCtx is still live; an error seen
// after readCtx has expired or been cancelled is tolerated.
// NOTE(review): presumably the session waits at the tail until readCtx
// expires rather than returning EOF — confirm against the S2 SDK.
if err := readSession.Err(); err != nil && readCtx.Err() == nil {
t.Fatalf("S2 read session error: %v", err)
}

assert.Greater(t, count, 0, "expected at least one event record in S2 stream %q", stream)
t.Logf("found %d record(s) in S2 stream after seq %d", count, startSeq)
}
2 changes: 2 additions & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/m1k1o/neko/server v0.0.0-20251008185748-46e2fc7d3866
github.com/nrednav/cuid2 v1.1.0
github.com/oapi-codegen/runtime v1.2.0
github.com/s2-streamstore/s2-sdk-go v0.16.1
github.com/samber/lo v1.52.0
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.40.0
Expand Down Expand Up @@ -99,6 +100,7 @@ require (
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/mod v0.28.0 // indirect
golang.org/x/net v0.45.0 // indirect
golang.org/x/text v0.30.0 // indirect
golang.org/x/tools v0.37.0 // indirect
google.golang.org/protobuf v1.36.10 // indirect
Expand Down
2 changes: 2 additions & 0 deletions server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/s2-streamstore/s2-sdk-go v0.16.1 h1:18Qht850wUhIb9JZkMwF5EJWfnmZnjdtW3z8xOuL7Ys=
github.com/s2-streamstore/s2-sdk-go v0.16.1/go.mod h1:1a+v2sGqU+s5neI8XwqRJz78ktStkR+mZH/JEi9HNSo=
github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw=
github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
Expand Down
96 changes: 96 additions & 0 deletions server/lib/events/eventsstorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package events

import (
"context"
"fmt"
"log/slog"
"sync"
"sync/atomic"
)

// Storage is the backend that StorageWriter forwards envelopes to.
type Storage interface {
// Append hands a single envelope to the backend. StorageWriter logs and
// counts a non-nil error but does not retry the envelope.
Append(ctx context.Context, env Envelope) error
// Close is what StorageWriter.Close delegates to; ctx is passed through
// unchanged.
Close(ctx context.Context) error
}

// StorageWriter reads from the ring buffer and forwards each envelope to
// Storage. Single-use and not thread-safe: call Run once, then after
// it returns call Drain followed by Close. Reads start from the oldest
// available event in the ring, not the current tail. Delivery is
// at-least-once; consumers should dedupe by env.Seq.
type StorageWriter struct {
// reader is the ring-buffer cursor, opened at seq 0 by NewStorageWriter.
reader *Reader
// storage receives every envelope via Append and is shut down by Close.
storage Storage
// log receives dropped-event warnings and append-failure errors.
log *slog.Logger
// once makes Run single-use: a second call returns an error.
once sync.Once
appendErrors atomic.Uint64 // total append failures; best-effort, not retried
}

// NewStorageWriter builds a StorageWriter that forwards envelopes from es to
// storage and reports problems through log. Its reader is opened at seq 0.
func NewStorageWriter(es *EventStream, storage Storage, log *slog.Logger) *StorageWriter {
	w := new(StorageWriter)
	w.reader = es.NewReader(0)
	w.storage = storage
	w.log = log
	return w
}

// Run blocks in a read loop: it takes the next result from the ring buffer
// and hands it to processResult, over and over, until the reader reports an
// error. That error (the context error on a clean shutdown) is returned
// as-is. Run is single-use; any call after the first returns an error
// without reading anything.
func (w *StorageWriter) Run(ctx context.Context) error {
	// Assume a repeat call; only the one invocation that wins the Once
	// clears the flag.
	alreadyStarted := true
	w.once.Do(func() { alreadyStarted = false })
	if alreadyStarted {
		return fmt.Errorf("events: StorageWriter.Run called more than once")
	}

	for {
		res, readErr := w.reader.Read(ctx)
		if readErr != nil {
			return readErr
		}
		if procErr := w.processResult(ctx, res); procErr != nil {
			return procErr
		}
	}
}

// Drain empties whatever is still readable from the ring without blocking.
// It returns nil once TryRead reports nothing more to read, or the context
// error if ctx ends first. Call it after all publishers have stopped and Run
// has returned, so that no events are silently skipped on shutdown.
func (w *StorageWriter) Drain(ctx context.Context) error {
	for {
		// Check the deadline before every read so a long backlog cannot
		// outlive ctx.
		if ctxErr := ctx.Err(); ctxErr != nil {
			w.log.Warn("storage writer: drain deadline exceeded, ring may have unread events")
			return ctxErr
		}

		res, more := w.reader.TryRead()
		if !more {
			return nil
		}
		if procErr := w.processResult(ctx, res); procErr != nil {
			return procErr
		}
	}
}

// processResult handles one result read from the ring. A result that reports
// dropped events is logged and nothing is appended. Otherwise the envelope is
// appended to storage; a failed append is counted and logged but not
// returned, so the method always returns nil and a failing backend never
// stops the caller's loop.
func (w *StorageWriter) processResult(ctx context.Context, res ReadResult) error {
	if dropped := res.Dropped; dropped > 0 {
		w.log.Warn("storage writer: dropped events", "count", dropped)
		return nil
	}

	env := res.Envelope
	appendErr := w.storage.Append(ctx, *env)
	if appendErr == nil {
		return nil
	}

	// Best-effort delivery: record the failure and move on without retrying.
	total := w.appendErrors.Add(1)
	w.log.Error("storage writer: append failed", "seq", env.Seq, "err", appendErr, "total_append_errors", total)
	return nil
}

// Close shuts down the storage backend, which drains in-flight writes and
// releases its resources. ctx is passed through to the backend unchanged and
// the backend's error is returned as-is.
func (w *StorageWriter) Close(ctx context.Context) error {
	backend := w.storage
	return backend.Close(ctx)
}
Loading
Loading