-
Notifications
You must be signed in to change notification settings - Fork 64
[kernel-1116] browser telemetry add s2 storage #239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
archandatta
merged 24 commits into
main
from
archand/kernel-1116/browser-telemetry/add-s2
May 13, 2026
Merged
Changes from all commits
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
0529bd8
feat: add EventsStorage interface, writer, and S2 backend for browser…
archandatta 6e50eb4
feat: wire S2 storage writer into main with ordered shutdown
archandatta 7868205
feat: pass S2_BASIN, S2_ACCESS_TOKEN, and S2_STREAM through superviso…
archandatta bbf4a35
fix: update config tests to expect S2 batcher defaults
archandatta c9a6445
chore: promote s2-sdk-go to direct dependency
archandatta f42f992
review: rename Storage/StorageWriter, use time.Duration for batcher l…
archandatta 92a96b6
fix: remove dead defaults, bound ack goroutine contexts, embed s2Prod…
archandatta 64e42f0
fix: return err directly from Run, inject logger, enforce single-use …
archandatta 4d63909
fix: include dropped seq range in storage writer log
archandatta 9df3e6c
feat: count append errors on StorageWriter for ops visibility
archandatta cc1d744
fix: drain ring after HTTP shutdown to prevent data loss on exit
archandatta fc0c7db
review: redact S2AccessToken in config logs, bound Close with context…
archandatta 040f361
review: update eventsstorage comments and test error handling
archandatta c6467b3
review: remove DroppedFrom/DroppedTo fields and trim redundant comments
archandatta ab5d8ba
fix: pass context.Background() to NewS2Storage so S2 pipeline outlive…
archandatta 42f1c47
fix: apply S2Config documented defaults when zero values are passed
archandatta 3f9550f
fix: always call p.Close() in s2Producer.close even when ack drain ti…
archandatta c9e555a
review: bound p.Close() with ctx, give Close its own deadline, collap…
archandatta a1742b3
review: bound AppendSession setup with a 30s signal-aware context; ba…
archandatta 5383db9
review: collapse s2 storage into S2StorageWriter with Start/Stop life…
archandatta d174e12
fix: give AppendSession its own context so SIGTERM doesn't kill the s…
archandatta 750e244
fix: add ctx guard to newS2Storage; fix misleading comments on sessio…
archandatta b478192
review: add comment for s2writer shutdown
archandatta 0f67535
Merge branch 'main' into archand/kernel-1116/browser-telemetry/add-s2
archandatta File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 1 addition & 1 deletion
2
images/chromium-headful/supervisor/services/kernel-images-api.conf
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 1 addition & 1 deletion
2
images/chromium-headless/image/supervisor/services/kernel-images-api.conf
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| package e2e | ||
|
|
||
| import ( | ||
| "context" | ||
| "net/http" | ||
| "os" | ||
| "os/exec" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/s2-streamstore/s2-sdk-go/s2" | ||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
|
|
||
| instanceoapi "github.com/kernel/kernel-images/server/lib/oapi" | ||
| ) | ||
|
|
||
| // TestS2StorageWriter starts a headless container with S2 credentials, runs a | ||
| // capture session, and verifies that events land in the configured S2 stream. | ||
| // | ||
| // Skips automatically when S2_BASIN, S2_ACCESS_TOKEN, or S2_STREAM are unset. | ||
| func TestS2StorageWriter(t *testing.T) { | ||
| basin := os.Getenv("S2_BASIN") | ||
| accessToken := os.Getenv("S2_ACCESS_TOKEN") | ||
| stream := os.Getenv("S2_STREAM") | ||
| if basin == "" || accessToken == "" || stream == "" { | ||
| t.Skip("S2_BASIN, S2_ACCESS_TOKEN, and S2_STREAM must be set to run this test") | ||
| } | ||
|
|
||
| if _, err := exec.LookPath("docker"); err != nil { | ||
| t.Skipf("docker not available: %v", err) | ||
| } | ||
|
|
||
| ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) | ||
| defer cancel() | ||
|
|
||
| c := NewTestContainer(t, headlessImage) | ||
| require.NoError(t, c.Start(ctx, ContainerConfig{ | ||
| Env: map[string]string{ | ||
| "S2_BASIN": basin, | ||
| "S2_ACCESS_TOKEN": accessToken, | ||
| "S2_STREAM": stream, | ||
| }, | ||
| }), "failed to start container") | ||
| defer c.Stop(ctx) | ||
|
|
||
| require.NoError(t, c.WaitReady(ctx), "api not ready") | ||
|
|
||
| client, err := c.APIClient() | ||
| require.NoError(t, err) | ||
|
|
||
| // Note the current S2 stream tail seq before we write anything so we only | ||
| // read records produced by this test run. | ||
| s2Client := s2.New(accessToken, nil) | ||
| streamClient := s2Client.Basin(basin).Stream(s2.StreamName(stream)) | ||
|
|
||
| checkResp, err := streamClient.CheckTail(ctx) | ||
| require.NoError(t, err, "check tail before test") | ||
| startSeq := checkResp.Tail.SeqNum | ||
|
|
||
| // Start a capture session. | ||
| startResp, err := client.StartCaptureSessionWithResponse(ctx, instanceoapi.StartCaptureSessionJSONRequestBody{}) | ||
| require.NoError(t, err) | ||
| require.Equal(t, http.StatusCreated, startResp.StatusCode(), "start capture session: %s", string(startResp.Body)) | ||
| require.NotNil(t, startResp.JSON201) | ||
| sessionID := startResp.JSON201.Id | ||
| t.Logf("capture session started: %s", sessionID) | ||
|
|
||
| // Let the session run briefly so at least one event is published (the | ||
| // session_started system event is emitted on session start). | ||
| time.Sleep(500 * time.Millisecond) | ||
|
|
||
| // Stop the capture session. | ||
| stopResp, err := client.StopCaptureSessionWithResponse(ctx) | ||
| require.NoError(t, err) | ||
| require.Equal(t, http.StatusOK, stopResp.StatusCode(), "stop capture session: %s", string(stopResp.Body)) | ||
| t.Log("capture session stopped") | ||
|
|
||
| // Give the storage writer time to flush to S2 (batcher linger + network). | ||
| time.Sleep(2 * time.Second) | ||
|
|
||
| // Read records written after the pre-test tail and verify at least one | ||
| // envelope is present. | ||
| readCtx, readCancel := context.WithTimeout(ctx, 10*time.Second) | ||
| defer readCancel() | ||
|
|
||
| readSession, err := streamClient.ReadSession(readCtx, &s2.ReadOptions{ | ||
| SeqNum: s2.Uint64(startSeq), | ||
| }) | ||
| require.NoError(t, err, "open S2 read session") | ||
| defer readSession.Close() | ||
|
|
||
| var count int | ||
| for readSession.Next() { | ||
| count++ | ||
| } | ||
| // EOF is expected once we reach the tail — not an error. | ||
| if err := readSession.Err(); err != nil && readCtx.Err() == nil { | ||
| t.Fatalf("S2 read session error: %v", err) | ||
| } | ||
|
|
||
| assert.Greater(t, count, 0, "expected at least one event record in S2 stream %q", stream) | ||
| t.Logf("found %d record(s) in S2 stream after seq %d", count, startSeq) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| package events | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "log/slog" | ||
| "sync" | ||
| "sync/atomic" | ||
| ) | ||
|
|
||
| type Storage interface { | ||
| Append(ctx context.Context, env Envelope) error | ||
| Close(ctx context.Context) error | ||
| } | ||
|
|
||
| // StorageWriter reads from the ring buffer and forwards each envelope to | ||
| // Storage. Single-use and not thread-safe: call Run once, then after | ||
| // it returns call Drain followed by Close. Reads start from the oldest | ||
| // available event in the ring, not the current tail. Delivery is | ||
| // at-least-once; consumers should dedupe by env.Seq. | ||
| type StorageWriter struct { | ||
| reader *Reader | ||
| storage Storage | ||
| log *slog.Logger | ||
| once sync.Once | ||
| appendErrors atomic.Uint64 // total append failures; best-effort, not retried | ||
| } | ||
|
|
||
| // NewStorageWriter creates a writer that reads from es starting at seq 0. | ||
| func NewStorageWriter(es *EventStream, storage Storage, log *slog.Logger) *StorageWriter { | ||
| return &StorageWriter{ | ||
| reader: es.NewReader(0), | ||
| storage: storage, | ||
| log: log, | ||
| } | ||
| } | ||
|
|
||
| // Run reads from the ring buffer and appends each envelope to storage until | ||
| // ctx is cancelled. Returns the context error on clean shutdown. Must be | ||
| // called at most once; returns an error on a second call. | ||
| func (w *StorageWriter) Run(ctx context.Context) error { | ||
| firstCall := false | ||
| w.once.Do(func() { firstCall = true }) | ||
| if !firstCall { | ||
| return fmt.Errorf("events: StorageWriter.Run called more than once") | ||
| } | ||
|
|
||
| for { | ||
| res, err := w.reader.Read(ctx) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if err := w.processResult(ctx, res); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Drain reads any events still in the ring non-blockingly until caught up or | ||
| // ctx expires. Call after all publishers have stopped and Run has returned to | ||
| // ensure no events are silently skipped on shutdown. | ||
| func (w *StorageWriter) Drain(ctx context.Context) error { | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| w.log.Warn("storage writer: drain deadline exceeded, ring may have unread events") | ||
| return ctx.Err() | ||
| default: | ||
| } | ||
|
|
||
| res, ok := w.reader.TryRead() | ||
| if !ok { | ||
| return nil | ||
| } | ||
| if err := w.processResult(ctx, res); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (w *StorageWriter) processResult(ctx context.Context, res ReadResult) error { | ||
| if res.Dropped > 0 { | ||
| w.log.Warn("storage writer: dropped events", "count", res.Dropped) | ||
| return nil | ||
| } | ||
| if err := w.storage.Append(ctx, *res.Envelope); err != nil { | ||
| total := w.appendErrors.Add(1) | ||
| w.log.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err, "total_append_errors", total) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Close drains in-flight writes and releases backend resources. | ||
| func (w *StorageWriter) Close(ctx context.Context) error { | ||
| return w.storage.Close(ctx) | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should have a comment here explaining why this shutdown happens after the other shutdown()s (because the other things above could produce events into the stream, and we don't want to drop those events, so we wait for them to complete shutdown)