diff --git a/images/chromium-headful/run-docker.sh b/images/chromium-headful/run-docker.sh index aad41331..0c1255d7 100755 --- a/images/chromium-headful/run-docker.sh +++ b/images/chromium-headful/run-docker.sh @@ -71,6 +71,17 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" ) fi +# S2 durable event storage (all three must be set to enable the sink) +if [[ -n "${S2_BASIN:-}" ]]; then + RUN_ARGS+=( -e S2_BASIN="$S2_BASIN" ) +fi +if [[ -n "${S2_ACCESS_TOKEN:-}" ]]; then + RUN_ARGS+=( -e S2_ACCESS_TOKEN="$S2_ACCESS_TOKEN" ) +fi +if [[ -n "${S2_STREAM:-}" ]]; then + RUN_ARGS+=( -e S2_STREAM="$S2_STREAM" ) +fi + # WebRTC port mapping if [[ "${ENABLE_WEBRTC:-}" == "true" ]]; then echo "Running container with WebRTC" diff --git a/images/chromium-headful/supervisor/services/kernel-images-api.conf b/images/chromium-headful/supervisor/services/kernel-images-api.conf index 0638dea8..064aa538 100644 --- a/images/chromium-headful/supervisor/services/kernel-images-api.conf +++ b/images/chromium-headful/supervisor/services/kernel-images-api.conf @@ -1,5 +1,5 @@ [program:kernel-images-api] -command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api' +command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" S2_STREAM="${S2_STREAM:-}" exec /usr/local/bin/kernel-images-api' autostart=false autorestart=true startsecs=0 diff --git a/images/chromium-headless/image/supervisor/services/kernel-images-api.conf b/images/chromium-headless/image/supervisor/services/kernel-images-api.conf index 0638dea8..064aa538 100644 --- a/images/chromium-headless/image/supervisor/services/kernel-images-api.conf +++ b/images/chromium-headless/image/supervisor/services/kernel-images-api.conf @@ -1,5 +1,5 @@ [program:kernel-images-api] -command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api' +command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" S2_STREAM="${S2_STREAM:-}" exec /usr/local/bin/kernel-images-api' autostart=false autorestart=true startsecs=0 diff --git a/images/chromium-headless/run-docker.sh b/images/chromium-headless/run-docker.sh index 56f582bf..4a670748 100755 --- a/images/chromium-headless/run-docker.sh +++ b/images/chromium-headless/run-docker.sh @@ -24,6 +24,17 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" ) fi +# S2 durable event storage (all three must be set to enable the sink) +if [[ -n "${S2_BASIN:-}" ]]; then + RUN_ARGS+=( -e S2_BASIN="$S2_BASIN" ) +fi +if [[ -n "${S2_ACCESS_TOKEN:-}" ]]; then + RUN_ARGS+=( -e S2_ACCESS_TOKEN="$S2_ACCESS_TOKEN" ) +fi +if [[ -n "${S2_STREAM:-}" ]]; then + RUN_ARGS+=( -e S2_STREAM="$S2_STREAM" ) +fi + # If a positional argument is given, use it as the entrypoint ENTRYPOINT_ARG=() if [[ $# -ge 1 && -n "$1" ]]; then diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index 48d17351..dfaa05b6 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -102,6 +102,17 @@ func main() { } captureSession := capturesession.NewCaptureSession(eventStream) + // Optional S2 storage sink. + var s2Writer *events.S2StorageWriter + if config.S2Basin != "" && config.S2AccessToken != "" && config.S2Stream != "" { + slogger.Info("S2 storage enabled", "basin", config.S2Basin, "stream", config.S2Stream) + s2Writer = events.NewS2StorageWriter(eventStream, config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{}, slogger) + if err := s2Writer.Start(ctx); err != nil { + slogger.Error("failed to start S2 storage writer", "err", err) + os.Exit(1) + } + } + apiService, err := api.New( recorder.NewFFmpegManager(), recorder.NewFFmpegRecorderFactory(config.PathToFFmpeg, defaultParams, stz), @@ -269,6 +280,16 @@ func main() { if err := g.Wait(); err != nil { slogger.Error("server failed to shutdown", "err", err) } + + // s2Writer shuts down after the servers above, since they might produce events we + // want to capture into the stream; we must let them finish before closing the writer. + if s2Writer != nil { + stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer stopCancel() + if err := s2Writer.Stop(stopCtx); err != nil { + slogger.Error("s2 storage writer stop failed", "err", err) + } + } } func mustFFmpeg() { diff --git a/server/cmd/config/config.go b/server/cmd/config/config.go index 2fdd4bdb..c2dddced 100644 --- a/server/cmd/config/config.go +++ b/server/cmd/config/config.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "log/slog" "time" "github.com/kelseyhightower/envconfig" @@ -35,6 +36,36 @@ type Config struct { // DevTools proxy address passed to ChromeDriver as goog:chromeOptions.debuggerAddress. // If empty, it is derived from DevToolsProxyPort as 127.0.0.1:. DevToolsProxyAddr string `envconfig:"DEVTOOLS_PROXY_ADDR" default:""` + + // S2 durable event storage. All three fields must be set to enable the S2 sink. + S2Basin string `envconfig:"S2_BASIN" default:""` + S2AccessToken string `envconfig:"S2_ACCESS_TOKEN" default:""` + S2Stream string `envconfig:"S2_STREAM" default:""` +} + +// LogValue implements slog.LogValuer, redacting secret fields. +func (c *Config) LogValue() slog.Value { + s2AccessToken := "" + if c.S2AccessToken != "" { + s2AccessToken = "[redacted]" + } + return slog.GroupValue( + slog.Int("port", c.Port), + slog.Int("frame_rate", c.FrameRate), + slog.Int("display_num", c.DisplayNum), + slog.Int("max_size_mb", c.MaxSizeInMB), + slog.String("output_dir", c.OutputDir), + slog.String("ffmpeg_path", c.PathToFFmpeg), + slog.Int("devtools_proxy_port", c.DevToolsProxyPort), + slog.Bool("log_cdp_messages", c.LogCDPMessages), + slog.Duration("scale_to_zero_cooldown", c.ScaleToZeroCooldown), + slog.Int("chromedriver_proxy_port", c.ChromeDriverProxyPort), + slog.String("chromedriver_upstream_addr", c.ChromeDriverUpstreamAddr), + slog.String("devtools_proxy_addr", c.DevToolsProxyAddr), + slog.String("s2_basin", c.S2Basin), + slog.String("s2_access_token", s2AccessToken), + slog.String("s2_stream", c.S2Stream), + ) } // Load loads configuration from environment variables diff --git a/server/e2e/e2e_s2_storage_test.go b/server/e2e/e2e_s2_storage_test.go new file mode 100644 index 00000000..64d8c9a7 --- /dev/null +++ b/server/e2e/e2e_s2_storage_test.go @@ -0,0 +1,104 @@ +package e2e + +import ( + "context" + "net/http" + "os" + "os/exec" + "testing" + "time" + + "github.com/s2-streamstore/s2-sdk-go/s2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + instanceoapi "github.com/kernel/kernel-images/server/lib/oapi" +) + +// TestS2StorageWriter starts a headless container with S2 credentials, runs a +// capture session, and verifies that events land in the configured S2 stream. +// +// Skips automatically when S2_BASIN, S2_ACCESS_TOKEN, or S2_STREAM are unset. +func TestS2StorageWriter(t *testing.T) { + basin := os.Getenv("S2_BASIN") + accessToken := os.Getenv("S2_ACCESS_TOKEN") + stream := os.Getenv("S2_STREAM") + if basin == "" || accessToken == "" || stream == "" { + t.Skip("S2_BASIN, S2_ACCESS_TOKEN, and S2_STREAM must be set to run this test") + } + + if _, err := exec.LookPath("docker"); err != nil { + t.Skipf("docker not available: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + c := NewTestContainer(t, headlessImage) + require.NoError(t, c.Start(ctx, ContainerConfig{ + Env: map[string]string{ + "S2_BASIN": basin, + "S2_ACCESS_TOKEN": accessToken, + "S2_STREAM": stream, + }, + }), "failed to start container") + defer c.Stop(ctx) + + require.NoError(t, c.WaitReady(ctx), "api not ready") + + client, err := c.APIClient() + require.NoError(t, err) + + // Note the current S2 stream tail seq before we write anything so we only + // read records produced by this test run. + s2Client := s2.New(accessToken, nil) + streamClient := s2Client.Basin(basin).Stream(s2.StreamName(stream)) + + checkResp, err := streamClient.CheckTail(ctx) + require.NoError(t, err, "check tail before test") + startSeq := checkResp.Tail.SeqNum + + // Start a capture session. + startResp, err := client.StartCaptureSessionWithResponse(ctx, instanceoapi.StartCaptureSessionJSONRequestBody{}) + require.NoError(t, err) + require.Equal(t, http.StatusCreated, startResp.StatusCode(), "start capture session: %s", string(startResp.Body)) + require.NotNil(t, startResp.JSON201) + sessionID := startResp.JSON201.Id + t.Logf("capture session started: %s", sessionID) + + // Let the session run briefly so at least one event is published (the + // session_started system event is emitted on session start). + time.Sleep(500 * time.Millisecond) + + // Stop the capture session. + stopResp, err := client.StopCaptureSessionWithResponse(ctx) + require.NoError(t, err) + require.Equal(t, http.StatusOK, stopResp.StatusCode(), "stop capture session: %s", string(stopResp.Body)) + t.Log("capture session stopped") + + // Give the storage writer time to flush to S2 (batcher linger + network). + time.Sleep(2 * time.Second) + + // Read records written after the pre-test tail and verify at least one + // envelope is present. + readCtx, readCancel := context.WithTimeout(ctx, 10*time.Second) + defer readCancel() + + readSession, err := streamClient.ReadSession(readCtx, &s2.ReadOptions{ + SeqNum: s2.Uint64(startSeq), + }) + require.NoError(t, err, "open S2 read session") + defer readSession.Close() + + var count int + for readSession.Next() { + count++ + } + // EOF is expected once we reach the tail — not an error. + if err := readSession.Err(); err != nil && readCtx.Err() == nil { + t.Fatalf("S2 read session error: %v", err) + } + + assert.Greater(t, count, 0, "expected at least one event record in S2 stream %q", stream) + t.Logf("found %d record(s) in S2 stream after seq %d", count, startSeq) +} diff --git a/server/go.mod b/server/go.mod index 5b2d7ed8..1bdae079 100644 --- a/server/go.mod +++ b/server/go.mod @@ -21,6 +21,7 @@ require ( github.com/m1k1o/neko/server v0.0.0-20251008185748-46e2fc7d3866 github.com/nrednav/cuid2 v1.1.0 github.com/oapi-codegen/runtime v1.2.0 + github.com/s2-streamstore/s2-sdk-go v0.16.1 github.com/samber/lo v1.52.0 github.com/stretchr/testify v1.11.1 github.com/testcontainers/testcontainers-go v0.40.0 @@ -99,6 +100,7 @@ require ( go.opentelemetry.io/proto/otlp v1.9.0 // indirect golang.org/x/crypto v0.43.0 // indirect golang.org/x/mod v0.28.0 // indirect + golang.org/x/net v0.45.0 // indirect golang.org/x/text v0.30.0 // indirect golang.org/x/tools v0.37.0 // indirect google.golang.org/protobuf v1.36.10 // indirect diff --git a/server/go.sum b/server/go.sum index 0f5296e8..b26c03c9 100644 --- a/server/go.sum +++ b/server/go.sum @@ -198,6 +198,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/s2-streamstore/s2-sdk-go v0.16.1 h1:18Qht850wUhIb9JZkMwF5EJWfnmZnjdtW3z8xOuL7Ys= +github.com/s2-streamstore/s2-sdk-go v0.16.1/go.mod h1:1a+v2sGqU+s5neI8XwqRJz78ktStkR+mZH/JEi9HNSo= github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw= github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= diff --git a/server/lib/events/eventsstorage.go b/server/lib/events/eventsstorage.go new file mode 100644 index 00000000..73065eec --- /dev/null +++ b/server/lib/events/eventsstorage.go @@ -0,0 +1,96 @@ +package events + +import ( + "context" + "fmt" + "log/slog" + "sync" + "sync/atomic" +) + +type Storage interface { + Append(ctx context.Context, env Envelope) error + Close(ctx context.Context) error +} + +// StorageWriter reads from the ring buffer and forwards each envelope to +// Storage. Single-use and not thread-safe: call Run once, then after +// it returns call Drain followed by Close. Reads start from the oldest +// available event in the ring, not the current tail. Delivery is +// at-least-once; consumers should dedupe by env.Seq. +type StorageWriter struct { + reader *Reader + storage Storage + log *slog.Logger + once sync.Once + appendErrors atomic.Uint64 // total append failures; best-effort, not retried +} + +// NewStorageWriter creates a writer that reads from es starting at seq 0. +func NewStorageWriter(es *EventStream, storage Storage, log *slog.Logger) *StorageWriter { + return &StorageWriter{ + reader: es.NewReader(0), + storage: storage, + log: log, + } +} + +// Run reads from the ring buffer and appends each envelope to storage until +// ctx is cancelled. Returns the context error on clean shutdown. Must be +// called at most once; returns an error on a second call. +func (w *StorageWriter) Run(ctx context.Context) error { + firstCall := false + w.once.Do(func() { firstCall = true }) + if !firstCall { + return fmt.Errorf("events: StorageWriter.Run called more than once") + } + + for { + res, err := w.reader.Read(ctx) + if err != nil { + return err + } + if err := w.processResult(ctx, res); err != nil { + return err + } + } +} + +// Drain reads any events still in the ring non-blockingly until caught up or +// ctx expires. Call after all publishers have stopped and Run has returned to +// ensure no events are silently skipped on shutdown. +func (w *StorageWriter) Drain(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + w.log.Warn("storage writer: drain deadline exceeded, ring may have unread events") + return ctx.Err() + default: + } + + res, ok := w.reader.TryRead() + if !ok { + return nil + } + if err := w.processResult(ctx, res); err != nil { + return err + } + } +} + +func (w *StorageWriter) processResult(ctx context.Context, res ReadResult) error { + if res.Dropped > 0 { + w.log.Warn("storage writer: dropped events", "count", res.Dropped) + return nil + } + if err := w.storage.Append(ctx, *res.Envelope); err != nil { + total := w.appendErrors.Add(1) + w.log.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err, "total_append_errors", total) + } + return nil +} + +// Close drains in-flight writes and releases backend resources. +func (w *StorageWriter) Close(ctx context.Context) error { + return w.storage.Close(ctx) +} diff --git a/server/lib/events/eventsstorage_writer_test.go b/server/lib/events/eventsstorage_writer_test.go new file mode 100644 index 00000000..ec9631ad --- /dev/null +++ b/server/lib/events/eventsstorage_writer_test.go @@ -0,0 +1,201 @@ +package events + +import ( + "context" + "errors" + "log/slog" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockBackend struct { + mu sync.Mutex + appended []Envelope + err error + errCount int +} + +func (m *mockBackend) Append(_ context.Context, env Envelope) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.err != nil { + m.errCount++ + return m.err + } + m.appended = append(m.appended, env) + return nil +} + +func (m *mockBackend) Close(_ context.Context) error { return nil } + +func (m *mockBackend) envelopes() []Envelope { + m.mu.Lock() + defer m.mu.Unlock() + out := make([]Envelope, len(m.appended)) + copy(out, m.appended) + return out +} + +func (m *mockBackend) errors() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.errCount +} + +func (m *mockBackend) clearErr() { + m.mu.Lock() + defer m.mu.Unlock() + m.err = nil +} + +func newTestStream(t *testing.T, capacity int) *EventStream { + t.Helper() + es, err := NewEventStream(EventStreamConfig{RingCapacity: capacity}) + require.NoError(t, err) + return es +} + +func makeEvent(typ string) Event { + return Event{Type: typ, Category: CategorySystem} +} + +func TestStorageWriter_NormalAppend(t *testing.T) { + es := newTestStream(t, 64) + backend := &mockBackend{} + w := NewStorageWriter(es, backend, slog.Default()) + + ctx, cancel := context.WithCancel(context.Background()) + + done := make(chan error, 1) + go func() { done <- w.Run(ctx) }() + + env1 := es.Publish(Envelope{Event: makeEvent("test.one")}) + env2 := es.Publish(Envelope{Event: makeEvent("test.two")}) + + require.Eventually(t, func() bool { + return len(backend.envelopes()) == 2 + }, time.Second, 5*time.Millisecond) + + cancel() + require.ErrorIs(t, <-done, context.Canceled) + + got := backend.envelopes() + assert.Equal(t, env1.Seq, got[0].Seq) + assert.Equal(t, "test.one", got[0].Event.Type) + assert.Equal(t, env2.Seq, got[1].Seq) + assert.Equal(t, "test.two", got[1].Event.Type) +} + +func TestStorageWriter_DroppedEvents(t *testing.T) { + es := newTestStream(t, 4) + backend := &mockBackend{} + w := NewStorageWriter(es, backend, slog.Default()) + + // Publish 8 events before the writer starts — fills and wraps the ring. + for i := range 8 { + es.Publish(Envelope{Event: makeEvent("drop.test." + string(rune('a'+i)))}) + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- w.Run(ctx) }() + + require.Eventually(t, func() bool { + return len(backend.envelopes()) > 0 + }, time.Second, 5*time.Millisecond) + + cancel() + require.ErrorIs(t, <-done, context.Canceled) + + // With ring capacity 4 and 8 publishes, the writer must have skipped at + // least 4 events via a drop gap — so fewer than 8 envelopes landed. + got := backend.envelopes() + assert.Less(t, len(got), 8, "expected fewer than 8 envelopes due to ring overflow") + for _, env := range got { + assert.NotEmpty(t, env.Event.Type) + } +} + +func TestStorageWriter_AppendError(t *testing.T) { + es := newTestStream(t, 64) + backend := &mockBackend{err: errors.New("storage unavailable")} + w := NewStorageWriter(es, backend, slog.Default()) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- w.Run(ctx) }() + + // Publish an event that will fail. Wait until the writer has attempted it + // (errCount > 0), then clear the error and publish a second event. The + // writer must continue past the error and deliver the second event. + es.Publish(Envelope{Event: makeEvent("will.fail")}) + require.Eventually(t, func() bool { + return backend.errors() > 0 + }, time.Second, 5*time.Millisecond) + + backend.clearErr() + es.Publish(Envelope{Event: makeEvent("will.succeed")}) + require.Eventually(t, func() bool { + return len(backend.envelopes()) == 1 + }, time.Second, 5*time.Millisecond) + + cancel() + require.ErrorIs(t, <-done, context.Canceled) + + got := backend.envelopes() + require.Len(t, got, 1) + assert.Equal(t, "will.succeed", got[0].Event.Type) +} + +func TestStorageWriter_ContextCancelled(t *testing.T) { + es := newTestStream(t, 64) + backend := &mockBackend{} + w := NewStorageWriter(es, backend, slog.Default()) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := w.Run(ctx) + assert.ErrorIs(t, err, context.Canceled) +} + +// TestStorageWriter_DrainFlushesRingAfterRunExits verifies that events +// published before Drain is called are not lost even after Run has returned. +func TestStorageWriter_DrainFlushesRingAfterRunExits(t *testing.T) { + es := newTestStream(t, 64) + backend := &mockBackend{} + w := NewStorageWriter(es, backend, slog.Default()) + + // Publish events before Run starts so the ring is non-empty. + for range 5 { + es.Publish(Envelope{Event: Event{Type: "pre"}}) + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- w.Run(ctx) }() + + // Let Run consume some events, then cancel. + time.Sleep(20 * time.Millisecond) + + // Publish more events that may arrive while Run is winding down. + for range 3 { + es.Publish(Envelope{Event: Event{Type: "post"}}) + } + cancel() + require.ErrorIs(t, <-done, context.Canceled) + + // Drain then Close mirrors the real shutdown sequence. + drainCtx, drainCancel := context.WithTimeout(context.Background(), time.Second) + defer drainCancel() + require.NoError(t, w.Drain(drainCtx)) + require.NoError(t, w.Close(drainCtx)) + + got := backend.envelopes() + // All 8 published events must have been appended across Run + Drain. + assert.Len(t, got, 8) +} diff --git a/server/lib/events/ringbuffer.go b/server/lib/events/ringbuffer.go index e9733309..ee874c3f 100644 --- a/server/lib/events/ringbuffer.go +++ b/server/lib/events/ringbuffer.go @@ -80,6 +80,31 @@ type Reader struct { nextSeq uint64 } +// TryRead returns the next available result without blocking. Returns +// (result, true) if data is available, (ReadResult{}, false) if the reader +// has caught up to the latest published seq. +func (r *Reader) TryRead() (ReadResult, bool) { + r.rb.mu.RLock() + defer r.rb.mu.RUnlock() + + latest := r.rb.latestSeq + oldest := r.rb.oldestSeq() + + if latest == 0 || r.nextSeq > latest { + return ReadResult{}, false + } + + if r.nextSeq < oldest { + dropped := oldest - r.nextSeq + r.nextSeq = oldest + return ReadResult{Dropped: dropped}, true + } + + env := r.rb.buf[r.nextSeq%r.rb.cap] + r.nextSeq++ + return ReadResult{Envelope: &env}, true +} + // Read blocks until the next envelope is available or ctx is cancelled. func (r *Reader) Read(ctx context.Context) (ReadResult, error) { for { diff --git a/server/lib/events/s2storage.go b/server/lib/events/s2storage.go new file mode 100644 index 00000000..dd112798 --- /dev/null +++ b/server/lib/events/s2storage.go @@ -0,0 +1,228 @@ +package events + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "sync" + "sync/atomic" + "time" + + "github.com/s2-streamstore/s2-sdk-go/s2" +) + +type S2Config struct { + // BatcherLinger is how long the batcher waits before flushing (default: 100ms). + BatcherLinger time.Duration + // BatcherMaxRecords is the max records per batch (default: 50). + BatcherMaxRecords int +} + +type s2Producer struct { + p *s2.Producer + wg sync.WaitGroup +} + +func (sp *s2Producer) close(ctx context.Context) error { + done := make(chan struct{}) + go func() { + sp.wg.Wait() + close(done) + }() + var drainErr error + select { + case <-done: + case <-ctx.Done(): + drainErr = ctx.Err() + } + closeDone := make(chan error, 1) + go func() { closeDone <- sp.p.Close() }() + select { + case err := <-closeDone: + return errors.Join(drainErr, err) + case <-ctx.Done(): + return errors.Join(drainErr, ctx.Err()) + } +} + +// s2Storage appends all events to a single fixed stream set at construction time. +type s2Storage struct { + producer s2Producer + sessionCancel context.CancelFunc + shutdownCtx context.Context + shutdownCancel context.CancelFunc + closeOnce sync.Once + ackErrors atomic.Uint64 + log *slog.Logger +} + +// newS2Storage opens an AppendSession that runs under an independent context so +// SIGTERM does not kill it before the batcher flushes. Close cancels that +// context after the producer has drained. +func newS2Storage(ctx context.Context, basin, accessToken, streamName string, cfg S2Config, log *slog.Logger) (*s2Storage, error) { + if basin == "" || accessToken == "" || streamName == "" { + return nil, fmt.Errorf("s2storage: basin, accessToken, and streamName are required") + } + + if err := ctx.Err(); err != nil { + return nil, fmt.Errorf("s2storage: context already cancelled: %w", err) + } + + client := s2.New(accessToken, nil) + stream := client.Basin(basin).Stream(s2.StreamName(streamName)) + + // sessionCtx is independent of the signal context so SIGTERM does not kill + // the session before the batcher has been flushed. Close cancels it after + // the producer drains. + sessionCtx, sessionCancel := context.WithCancel(context.Background()) + session, err := stream.AppendSession(sessionCtx, nil) + if err != nil { + sessionCancel() + return nil, fmt.Errorf("s2storage: open append session: %w", err) + } + + if cfg.BatcherLinger == 0 { + cfg.BatcherLinger = 100 * time.Millisecond + } + if cfg.BatcherMaxRecords == 0 { + cfg.BatcherMaxRecords = 50 + } + batcher := s2.NewBatcher(context.Background(), &s2.BatchingOptions{ + Linger: cfg.BatcherLinger, + MaxRecords: cfg.BatcherMaxRecords, + }) + producer := s2.NewProducer(context.Background(), batcher, session) + + shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) + return &s2Storage{ + producer: s2Producer{p: producer}, + sessionCancel: sessionCancel, + shutdownCtx: shutdownCtx, + shutdownCancel: shutdownCancel, + log: log, + }, nil +} + +func (s *s2Storage) Append(ctx context.Context, env Envelope) error { + if err := ctx.Err(); err != nil { + return err + } + + data, err := json.Marshal(env) + if err != nil { + return fmt.Errorf("s2storage: marshal envelope seq=%d: %w", env.Seq, err) + } + + s.producer.wg.Add(1) + future, err := s.producer.p.Submit(s2.AppendRecord{Body: data}) + if err != nil { + s.producer.wg.Done() + return fmt.Errorf("s2storage: submit seq=%d: %w", env.Seq, err) + } + + go func() { + defer s.producer.wg.Done() + + ticket, err := future.Wait(s.shutdownCtx) + if err != nil { + total := s.ackErrors.Add(1) + s.log.Error("s2storage: wait for submit failed", "seq", env.Seq, "err", err, "total_ack_errors", total) + return + } + if ticket == nil { + return + } + if _, err := ticket.Ack(s.shutdownCtx); err != nil { + total := s.ackErrors.Add(1) + s.log.Error("s2storage: ack failed", "seq", env.Seq, "err", err, "total_ack_errors", total) + } + }() + + return nil +} + +// Close cancels in-flight ack goroutines, waits for them to drain, flushes the +// S2 batcher to the network, then tears down the session. +func (s *s2Storage) Close(ctx context.Context) error { + s.closeOnce.Do(s.shutdownCancel) + err := s.producer.close(ctx) + s.sessionCancel() + return err +} + +// S2StorageWriter reads from an EventStream and forwards each event to S2. +// Construct with NewS2StorageWriter, call Start to begin, Stop to drain and shut down. +type S2StorageWriter struct { + es *EventStream + basin string + accessToken string + streamName string + cfg S2Config + log *slog.Logger + + mu sync.Mutex + started bool + storage *s2Storage + writer *StorageWriter + done chan struct{} +} + +func NewS2StorageWriter(es *EventStream, basin, accessToken, streamName string, cfg S2Config, log *slog.Logger) *S2StorageWriter { + return &S2StorageWriter{ + es: es, + basin: basin, + accessToken: accessToken, + streamName: streamName, + cfg: cfg, + log: log, + } +} + +// Start opens the S2 append session and begins reading from the event stream. +// ctx governs the Run loop — cancel it (e.g. on SIGTERM) to stop reading. +// The session itself outlives ctx and is torn down by Stop after flushing. +func (w *S2StorageWriter) Start(ctx context.Context) error { + w.mu.Lock() + defer w.mu.Unlock() + if w.started { + return fmt.Errorf("s2storagewriter: Start called more than once") + } + storage, err := newS2Storage(ctx, w.basin, w.accessToken, w.streamName, w.cfg, w.log) + if err != nil { + return err + } + w.storage = storage + w.writer = NewStorageWriter(w.es, storage, w.log) + w.done = make(chan struct{}) + w.started = true + go func() { + defer close(w.done) + if err := w.writer.Run(ctx); err != nil && ctx.Err() == nil { + w.log.Error("s2 storage writer failed", "err", err) + } + }() + return nil +} + +// Stop waits for the Run goroutine to exit, drains any remaining ring events, +// then closes the S2 producer. ctx bounds the total shutdown time. +func (w *S2StorageWriter) Stop(ctx context.Context) error { + w.mu.Lock() + if !w.started { + w.mu.Unlock() + return nil + } + w.mu.Unlock() + + select { + case <-w.done: + case <-ctx.Done(): + return ctx.Err() + } + if err := w.writer.Drain(ctx); err != nil { + w.log.Warn("s2 storage writer: drain incomplete", "err", err) + } + return w.storage.Close(ctx) +}