Entlein/adaptive write#37
Conversation
…house Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
…ntext Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
…e context prometheus backends and test out the write clickhouse experiment Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
Signed-off-by: entlein <einentlein@gmail.com>
…config # Conflicts: # k8s/vizier/bootstrap/adaptive_export_deployment.yaml # src/vizier/services/adaptive_export/cmd/main.go # src/vizier/services/adaptive_export/internal/config/config.go # src/vizier/services/adaptive_export/internal/pixie/pixie.go
Signed-off-by: entlein <einentlein@gmail.com>
📝 WalkthroughWalkthroughAdds an adaptive-export operator and ClickHouse schema/DDL/runtime, makes perf-tool exporters pluggable (BigQuery, Parquet→GCS), extends CLI/suites/UI for ClickHouse experiments, updates dependencies and build targets, and adds comprehensive tests. ChangesAdaptive export operator and perf-tool integration
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
|
There was a problem hiding this comment.
Actionable comments posted: 31
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@go.mod`:
- Line 324: The go.mod file contains a replace directive forcing
google.golang.org/protobuf to v1.29.1 (the vulnerable downgrade) while the
require already specifies a safe v1.34.2; remove the replace directive
referencing google.golang.org/protobuf v1.29.1 so the module can use the patched
version and then run module cleanup (e.g., go mod tidy) to update the dependency
graph and ensure go.mod/go.sum reflect the removal.
In `@src/carnot/exec/clickhouse_export_sink_node.cc`:
- Around line 38-39: The sink currently dereferences mapped indices without
guarding for missing columns (see mapping.input_column_index() being passed to
rb.ColumnAt), so update the ClickhouseExportSinkNode code that iterates mappings
to check that the mapped input column actually exists before calling
rb.ColumnAt(mapping.input_column_index()); e.g., verify
mapping.input_column_index() is within rb.NumColumns() (or use rb.HasColumn /
equivalent) and skip/continue for mappings that are out-of-range or otherwise
invalid so the node ignores missing columns instead of crashing.
In `@src/e2e_test/perf_tool/pkg/cluster/context.go`:
- Around line 75-77: The code currently overwrites an empty kubeconfigPath with
clientcmd.RecommendedHomeFile, which can differ from what
NewDefaultClientConfigLoadingRules() actually used (including KUBECONFIG);
instead, when kubeconfigPath is empty, query the loading rules for the real
filename (e.g., call loadingRules.GetDefaultFilename() on the
ClientConfigLoadingRules returned by NewDefaultClientConfigLoadingRules()) and
assign that to kubeconfigPath so the value passed to AddEnv matches the actual
kubeconfig source used by the client config.
In `@src/e2e_test/perf_tool/pkg/exporter/bq_exporter.go`:
- Around line 86-107: ExportResults currently starts BatchInserter.Run(bqCh) in
a goroutine and defers close(bqCh) but returns without waiting, losing buffered
rows and swallowing errors; modify ExportResults to (1) create a done/error
channel or sync.WaitGroup and start inserter.Run(bqCh, ctx) (or wrap Run to
accept ctx) so the inserter can observe context cancellation, (2) close bqCh
only after draining resultCh, then wait for the inserter goroutine to finish
before returning, and (3) surface any insertion errors from
uploadBatch/BatchInserter back to ExportResults (via an error channel or
returned error) instead of only logging them; reference
BQExporter.ExportResults, bq.BatchInserter.Run and uploadBatch to locate where
to add waiting, context handling, and error propagation.
In `@src/e2e_test/perf_tool/pkg/exporter/parquet_exporter_test.go`:
- Around line 304-314: The test opens files in a loop and uses defer f.Close(),
which delays closing until the test function exits; change this so each file is
closed immediately after use: in the loop that iterates over uploadedFiles,
replace defer f.Close() with an explicit f.Close() call (checking/ignoring the
error) once you're done with parquet.OpenFile and the pf.NumRows() assertion, or
alternatively wrap the loop body in an inner anonymous func so the defer
f.Close() is scoped and runs at the end of each iteration; update the code paths
around os.Open, parquet.OpenFile, and pf.NumRows() accordingly.
In `@src/e2e_test/perf_tool/pkg/exporter/parquet_exporter.go`:
- Around line 141-144: The Close method on ParquetGCSExporter can panic if
gcsClient is nil (e.g., when an upload override is used in tests); update
ParquetGCSExporter's Close to check if e.gcsClient != nil before calling
e.gcsClient.Close() and return nil when it's nil, ensuring safe shutdown. Locate
the Close method on type ParquetGCSExporter and add the nil guard so Close only
calls gcsClient.Close() when the client exists.
- Around line 167-173: The loop writes each row with
writer.WriteRows([]parquet.Row{parquetRow}) causing per-call overhead; instead,
build a slice of parquet.Row using buildResultRow for each entry in rows (or
accumulate in fixed-size batches if memory is a concern) and call
writer.WriteRows once (or once per batch) to write them; preserve the existing
error handling (close tmpFile on error and return fmt.Errorf(...)) and update
the error message to reference the batch write failure where writer.WriteRows is
called.
In `@src/e2e_test/perf_tool/pkg/metrics/prometheus_recorder.go`:
- Around line 46-49: The struct field block in prometheus_recorder.go (fields
clusterCtx, ownsClusterCtx, spec, eg on the Prometheus recorder type) is not
gofmt-formatted; run the repository's Go formatter (e.g., gofmt -s -w or
goimports -w) on src/e2e_test/perf_tool/pkg/metrics/prometheus_recorder.go to
fix spacing/indentation so the struct declaration for the Prometheus recorder
(clusterCtx, ownsClusterCtx, spec, eg) conforms to the project's lint/format
rules and CI passes.
In `@src/e2e_test/perf_tool/pkg/suites/suites.go`:
- Around line 101-102: The clickhouseDSN variable currently contains hardcoded
credentials; remove the plaintext DSN and instead read it from a secure source
(e.g., an environment variable or a CLI flag). Replace references to
clickhouseDSN with a value loaded from os.Getenv("CLICKHOUSE_DSN") or a new flag
(e.g., add a flag similar to --prom_recorder_override) and validate it is
non-empty before use; keep clickhouseTable as-is or make it configurable via an
env var/flag if needed, and document placeholder usage so no secrets are
committed.
- Around line 89-91: Remove the leftover commented-out debug duration lines so
only the intended duration assignment remains: delete the "// preDur := 2 *
time.Minute" and "// dur := 5 * time.Minute" comment lines and keep the active
"dur := 30 * time.Minute" assignment in the suites setup (look for the dur
variable in suites.go).
In `@src/e2e_test/perf_tool/pkg/suites/workloads.go`:
- Around line 229-231: The workload currently sets a hardcoded placeholder
SkaffoldPath ("src/e2e_test/clickhouse_read_load/skaffold.yaml") which will
break deployment; update the code in workloads.go where SkaffoldPath is set (the
workload struct initialiser) to either: 1) gate inclusion by checking that the
real skaffold file exists (os.Stat or a helper like fileExists) before adding
this workload to active experiments, or 2) guard with a feature flag/config
(e.g., enableClickHouseReadLoad) and only set SkaffoldPath and wire the workload
when that flag is true; ensure the placeholder path is removed from default
active wiring until the real app and path land.
In `@src/e2e_test/perf_tool/ui/index.html`:
- Around line 428-429: The suite and workload filter values (variables
suiteFilter and workloadFilter) are interpolated directly into SQL and allow
injection; sanitize these inputs before building the SQL by escaping single
quotes and other SQL-breaking characters (e.g., replace each ' with ''), or run
them through a small helper like escapeSqlString(s) and use the escaped result
in the template strings for suiteFilter and workloadFilter; update any code that
constructs those filters to use the escaped values (references: suiteFilter,
workloadFilter) so no raw user/tag value is injected into the SQL.
- Line 107: The button element with id "gcs-load-btn" currently lacks an
explicit type and will default to type="submit"; change it to include
type="button" to prevent unintended form submission. Apply the same change (add
type="button") to the other plain <button class="btn"> elements in this file —
specifically the other buttons near the referenced lines (the other .btn buttons
around lines 120, 121, 131, and 150) so each has type="button".
In `@src/utils/shared/k8s/delete.go`:
- Around line 126-127: The method ObjectDeleter.getAggregatedGroupVersions
currently uses the deprecated non-generic sets API; update its signature return
type from sets.String to sets.Set[string], replace the initialization
sets.NewString() with sets.New[string](), and update the creation that uses
resource.Verbs from sets.NewString(resource.Verbs...) to
sets.New[string](resource.Verbs...); make these changes inside
getAggregatedGroupVersions so all set variables and returns use sets.Set[string]
and the generic sets.New[string](...) constructors.
- Around line 128-133: The code in getDeletableResourceTypes calls
o.dynamicClient.Resource(apiServiceGVR).List(...) and treats any error as fatal,
causing a permissions regression; update the error handling around that List
call (the apiServiceGVR dynamicClient call in delete.go /
getDeletableResourceTypes) to treat authorization failures like
NotFound/NoMatch: if errors.IsNotFound(err) || meta.IsNoMatchError(err) ||
errors.IsForbidden(err) || errors.IsUnauthorized(err) then return the current
out, nil (i.e., continue without the aggregated-resource skip) and emit a debug
log indicating APIService discovery was skipped due to lack of permission;
otherwise, keep returning the error as before.
In `@src/vizier/funcs/md_udtfs/md_udtfs_impl.h`:
- Around line 1158-1167: The code stores cluster_name_ without validation and
later interpolates it into DROP/CREATE SQL, enabling injection; in the
Init/constructor where cluster_name is assigned (the block that sets
cluster_name_ = std::string(cluster_name)), add strict identifier validation
(e.g. reject empty values, reject quotes/semicolons/whitespace and require a
conservative regex like ^[A-Za-z0-9_]+$), and if validation fails return an
error status instead of proceeding; apply the same validation check at the other
initialization sites referenced (the blocks around the DROP/CREATE usage noted
in the comment, e.g., the sections at ~1234-1237 and ~1315-1317) so
cluster_name_ is never interpolated into SQL unless it passes validation.
In `@src/vizier/services/adaptive_export/cmd/main.go`:
- Around line 176-179: The durEnv helper can return non-positive durations which
will cause time.NewTicker(pruneInterval) to panic and can invert attribution
windows; validate the duration returned by durEnv for pruneInterval and the
before/after window values (the variables named pruneInterval, beforeWindow,
afterWindow) and if the value is <= 0, replace it with the safe default used
previously (e.g., the second durEnv argument) or return an error; update the
code around the time.NewTicker(...) call and the logic handling the before/after
windows (the callers that use durEnv at lines creating those variables) to
perform this check before using the duration so negative or zero overrides are
rejected and the default is used instead.
- Around line 177-213: The shutdown path currently cancels the context then
unconditionally sleeps 500ms, which can cut off background work (the ticker
goroutine that calls ctl.PruneExpired and the controller started with ctl.Run)
that may take seconds; replace the naive sleep with coordinated waiting: start
those goroutines under a sync.WaitGroup or an errgroup (use the same ctx passed
to ctl.Run and the ticker), increment the group for the ticker goroutine and the
ctl.Run goroutine, ensure each decrements Done() when finished, and after
calling cancel() wait for the group to complete (and capture/log errors returned
by ctl.Run) instead of sleeping; keep signal handling (sigCh, cancel) but remove
the fixed time.Sleep so shutdown blocks until both the ticker goroutine and
ctl.Run finish.
- Around line 88-89: The code currently uses resolveHostname() which falls back
to os.Hostname(), causing incorrect pod hostnames for node-scoped queries;
change the logic so NODE_NAME (env var) is required: read NODE_NAME and if empty
log an error and exit instead of falling back, update the place where hostname
is assigned and logged (resolveHostname / hostname variable and the
log.WithField("hostname", hostname).Info call) and apply the same mandatory
check/exit behavior to the other block around lines 227-236 that also uses node
hostname resolution.
In `@src/vizier/services/adaptive_export/internal/clickhouse/apply.go`:
- Around line 86-96: Apply currently iterates OperatorOwnedTables and never runs
the database CREATE statement from schema.sql, causing table creation to fail on
a fresh cluster; update Applier.Apply to first obtain and execute the database
DDL (use DDL(...) for the database name or a dedicated function that returns the
"CREATE DATABASE IF NOT EXISTS forensic_db" statement) via a.execute(ctx, ddl)
before looping OperatorOwnedTables, and preserve the existing error-wrapping
style (fmt.Errorf) used in the loop for both fetching the database DDL and
executing it.
In `@src/vizier/services/adaptive_export/internal/clickhouse/ddl_test.go`:
- Around line 19-23: The import block in the test file is not formatted to
satisfy gci; run the project's import organizer (gci) or goimports/gofmt on the
file and fix the import grouping/order and spacing (ensure standard,
third-party, and local imports are grouped correctly), remove any unused imports
(e.g., check "errors" or "strings"), and re-save so the file passes the CI
formatting check referenced at Line 97.
- Around line 68-70: The test calls DDL(name) but ignores the returned error;
change the test to capture both ddl and err from DDL(name), check if err != nil
and fail immediately (e.g., t.Fatalf or t.Fatalf with the error) before
asserting that ddl does not contain "anomaly_hash" or "anomaly_hashes"; locate
the DDL(name) call and the strings.Contains checks in the test and insert the
err check so the test reports extraction failures rather than misleading
hash-column assertions.
In `@src/vizier/services/adaptive_export/internal/clickhouse/insert_test.go`:
- Line 110: The file ends with an extra trailing blank line after the final
closing brace in insert_test.go; remove that blank line (or run gofmt -w on
src/vizier/services/adaptive_export/internal/clickhouse/insert_test.go) so the
file ends immediately after the final closing brace of the test(s) in
insert_test.go, ensuring no extra newline is present to satisfy formatting
checks.
In `@src/vizier/services/adaptive_export/internal/controller/controller_test.go`:
- Around line 319-345: The test TestController_RestartMidStream_Aborts uses
fixed time.Sleep calls after trig.push(canonicalEvent()) which makes it flaky;
replace those sleeps with condition-based waits that poll or wait on
deterministic signals: after trig.push(...) wait until c.Active() == 1 (with a
short timeout) or wait on a channel/notification from the fakeTrigger/fakeSink
indicating the event was received/processed, and in the goroutine that runs
c.Run(ctx) use the existing done channel with a select+timeout to assert abort;
update the two occurrences around trig.push(...) (the Sleep at the first push
and the Sleep before cancel) to use these condition-based waits so the test no
longer depends on wall-clock sleeps.
In `@src/vizier/services/adaptive_export/internal/e2e/e2e_test.go`:
- Line 82: Replace the numeric literal 405 in the HTTP error response with the
standard constant http.StatusMethodNotAllowed; locate the call to http.Error(w,
"method", 405) (inside the request handler) and change the status argument to
http.StatusMethodNotAllowed so the code uses the named HTTP status constant for
clarity and maintainability.
In `@src/vizier/services/adaptive_export/internal/kubescape/extract_test.go`:
- Around line 94-95: The test currently ignores errors from Extract when calling
Extract(canonicalRow()) and Extract(r), which can mask parsing failures; change
both calls to capture the error (e.g., a, err := Extract(canonicalRow()) and b,
err := Extract(r)) and fail the test immediately on error (use t.Fatalf or
t.Fatalf with a clear message) so the reorder-stability test cannot pass with
zero-values when parsing fails; ensure you reference the Extract calls around
canonicalRow() and r in the reorder-stability test and include the error in the
failure message for debugging.
In `@src/vizier/services/adaptive_export/internal/kubescape/extract.go`:
- Around line 30-36: The import block in extract.go is not formatted per gci
rules: keep the standard library imports (encoding/json, errors, fmt) sorted
together, add a separating blank line, then the local package import
(px.dev/pixie/.../anomaly); update the imports block around those symbols so
stdlib and local imports are split by a blank line and re-run gci or gofmt to
satisfy the linter.
In `@src/vizier/services/adaptive_export/internal/pixie/pixie.go`:
- Around line 68-72: The isInternal check in Client.init currently uses
strings.ContainsAny(c.cloudAddr, "cluster.local") which incorrectly matches many
hostnames and disables TLS verification; update Client.init to determine
internal addresses correctly by parsing c.cloudAddr to extract the hostname
(e.g., via net.SplitHostPort or url/hostname parsing) and then use
strings.HasSuffix(hostname, "cluster.local") or an exact match to set
isInternal, so tls.Config{InsecureSkipVerify: isInternal} is only true for
genuine internal cluster addresses; keep the rest of the flow
(credentials.NewTLS, grpc.Dial) unchanged.
In `@src/vizier/services/adaptive_export/internal/pxl/tables_test.go`:
- Around line 23-32: Update the test comment in TestBuiltinTables_Has13 to
clearly state that BuiltinTables currently contains 12 table entries even though
the user-specified protocol list contains 13 protocol identifiers (because
http2_messages.beta is a separate protocol entry but does not add an extra
table), and change the wording to explicitly say "BuiltinTables should have 12
entries; the protocol list contains 13 identifiers but one maps to an existing
table (http2_messages.beta is listed separately from http_events)". Keep the
guard logic unchanged and reference BuiltinTables and TestBuiltinTables_Has13 so
future maintainers understand the difference between protocol identifiers and
table entries.
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse_test.go`:
- Line 55: This test file is failing gofumpt formatting around the top-level var
block ('var ('); run gofumpt to reformat the file (e.g., gofumpt -w <that test
file>) or apply the gofumpt style changes to the 'var (' declaration and
surrounding code, then save and commit the resulting formatting changes so CI
will pass; locate the offending block by searching for the 'var (' token in the
test file (clickhouse_test.go) and ensure the formatted output is committed.
In `@src/vizier/services/adaptive_export/internal/trigger/clickhouse.go`:
- Around line 103-121: The cursor using only event_time (watermark) is unsafe
because advancing watermark to maxSeen can permanently skip later-arriving rows
with the same event_time; update the logic around t.fetchSince, watermark, and
maxSeen to use a stable secondary key (e.g., primary key or id) or switch to an
inclusive query plus downstream dedupe: modify t.fetchSince to accept/return a
composite cursor (event_time + id) or return rows ordered by (event_time, id)
and a (maxTime,maxID) pair, change the poll loop that calls t.fetchSince and the
watermark update to advance to the composite cursor (or use event_time >=
watermark and maintain a dedupe set of last-seen IDs before sending to out), and
ensure kubescape.Extract/out sending still respects deduplication so no event
with equal event_time but later id is lost.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 0355ed64-49e6-4d9d-a791-ada36b3daea6
⛔ Files ignored due to path filters (2)
go.sumis excluded by!**/*.sumsrc/e2e_test/perf_tool/experimentpb/experiment.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (64)
go.modgo_deps.bzlsrc/carnot/exec/clickhouse_export_sink_node.ccsrc/carnot/funcs/metadata/metadata_ops.ccsrc/carnot/funcs/metadata/metadata_ops.hsrc/e2e_test/perf_tool/cmd/BUILD.bazelsrc/e2e_test/perf_tool/cmd/run.gosrc/e2e_test/perf_tool/experimentpb/experiment.protosrc/e2e_test/perf_tool/pkg/cluster/context.gosrc/e2e_test/perf_tool/pkg/exporter/BUILD.bazelsrc/e2e_test/perf_tool/pkg/exporter/bq_exporter.gosrc/e2e_test/perf_tool/pkg/exporter/exporter.gosrc/e2e_test/perf_tool/pkg/exporter/parquet_exporter.gosrc/e2e_test/perf_tool/pkg/exporter/parquet_exporter_test.gosrc/e2e_test/perf_tool/pkg/metrics/prometheus_recorder.gosrc/e2e_test/perf_tool/pkg/metrics/recorder.gosrc/e2e_test/perf_tool/pkg/run/BUILD.bazelsrc/e2e_test/perf_tool/pkg/run/run.gosrc/e2e_test/perf_tool/pkg/suites/BUILD.bazelsrc/e2e_test/perf_tool/pkg/suites/experiments.gosrc/e2e_test/perf_tool/pkg/suites/metrics.gosrc/e2e_test/perf_tool/pkg/suites/scripts/clickhouse_export.pxlsrc/e2e_test/perf_tool/pkg/suites/scripts/clickhouse_read.pxlsrc/e2e_test/perf_tool/pkg/suites/suites.gosrc/e2e_test/perf_tool/pkg/suites/workloads.gosrc/e2e_test/perf_tool/ui/index.htmlsrc/utils/shared/k8s/delete.gosrc/vizier/funcs/md_udtfs/md_udtfs_impl.hsrc/vizier/services/adaptive_export/cmd/BUILD.bazelsrc/vizier/services/adaptive_export/cmd/main.gosrc/vizier/services/adaptive_export/internal/anomaly/BUILD.bazelsrc/vizier/services/adaptive_export/internal/anomaly/hash.gosrc/vizier/services/adaptive_export/internal/anomaly/hash_test.gosrc/vizier/services/adaptive_export/internal/clickhouse/BUILD.bazelsrc/vizier/services/adaptive_export/internal/clickhouse/apply.gosrc/vizier/services/adaptive_export/internal/clickhouse/apply_test.gosrc/vizier/services/adaptive_export/internal/clickhouse/ddl.gosrc/vizier/services/adaptive_export/internal/clickhouse/ddl_test.gosrc/vizier/services/adaptive_export/internal/clickhouse/insert.gosrc/vizier/services/adaptive_export/internal/clickhouse/insert_test.gosrc/vizier/services/adaptive_export/internal/clickhouse/schema.sqlsrc/vizier/services/adaptive_export/internal/config/BUILD.bazelsrc/vizier/services/adaptive_export/internal/config/definition.gosrc/vizier/services/adaptive_export/internal/controller/BUILD.bazelsrc/vizier/services/adaptive_export/internal/controller/controller.gosrc/vizier/services/adaptive_export/internal/controller/controller_test.gosrc/vizier/services/adaptive_export/internal/e2e/BUILD.bazelsrc/vizier/services/adaptive_export/internal/e2e/e2e_test.gosrc/vizier/services/adaptive_export/internal/kubescape/BUILD.bazelsrc/vizier/services/adaptive_export/internal/kubescape/extract.gosrc/vizier/services/adaptive_export/internal/kubescape/extract_test.gosrc/vizier/services/adaptive_export/internal/pixie/BUILD.bazelsrc/vizier/services/adaptive_export/internal/pixie/pixie.gosrc/vizier/services/adaptive_export/internal/pxl/BUILD.bazelsrc/vizier/services/adaptive_export/internal/pxl/pxl.gosrc/vizier/services/adaptive_export/internal/pxl/tables.gosrc/vizier/services/adaptive_export/internal/pxl/tables_test.gosrc/vizier/services/adaptive_export/internal/script/script.gosrc/vizier/services/adaptive_export/internal/sink/BUILD.bazelsrc/vizier/services/adaptive_export/internal/sink/clickhouse.gosrc/vizier/services/adaptive_export/internal/sink/clickhouse_test.gosrc/vizier/services/adaptive_export/internal/trigger/BUILD.bazelsrc/vizier/services/adaptive_export/internal/trigger/clickhouse.gosrc/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go
💤 Files with no reviewable changes (4)
- src/vizier/services/adaptive_export/internal/pixie/BUILD.bazel
- src/vizier/services/adaptive_export/internal/pxl/pxl.go
- src/vizier/services/adaptive_export/internal/config/definition.go
- src/vizier/services/adaptive_export/internal/script/script.go
| gopkg.in/yaml.v2 => gopkg.in/yaml.v2 v2.4.0 | ||
| ) | ||
|
|
||
| replace google.golang.org/protobuf => google.golang.org/protobuf v1.29.1 |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
What is the first patched (non-vulnerable) version of google.golang.org/protobuf for GHSA-8r3f-844c-mc37 / GO-2024-2611?
💡 Result:
The first patched (non-vulnerable) version of google.golang.org/protobuf for GHSA-8r3f-844c-mc37 (also known as GO-2024-2611 and CVE-2024-24786) is v1.33.0.[1][2][3] This version fixes the infinite loop in protojson.Unmarshal when handling certain invalid JSON inputs, particularly with google.protobuf.Any or UnmarshalOptions.DiscardUnknown set.[4] Official advisories confirm affected versions are before v1.33.0, with v1.33.0 as the patched version.[1][5][2]
Citations:
- 1: GHSA-8r3f-844c-mc37
- 2: https://pkg.go.dev/vuln/GO-2024-2611
- 3: https://osv.dev/vulnerability/GO-2024-2611
- 4: https://github.com/protocolbuffers/protobuf-go/releases/tag/v1.33.0
- 5: https://osv.dev/vulnerability/GHSA-8r3f-844c-mc37
🏁 Script executed:
# Read the go.mod file around line 324 to see context and any comments
head -n 330 go.mod | tail -n 20Repository: k8sstormcenter/pixie
Length of output: 1339
🏁 Script executed:
# Search for all protobuf-related entries in go.mod
rg "protobuf" go.mod -n -A 1 -B 1Repository: k8sstormcenter/pixie
Length of output: 1000
🏁 Script executed:
# Check the require section to see what version of protobuf would be used without the replace
rg "google.golang.org/protobuf" go.mod -nRepository: k8sstormcenter/pixie
Length of output: 193
Remove the forced protobuf downgrade to a vulnerable version.
Line 324 pins google.golang.org/protobuf to v1.29.1, which contains the high-severity infinite-loop vulnerability (GHSA-8r3f-844c-mc37 / GO-2024-2611). This replace directive downgrades from the safe v1.34.2 already declared in the require section. Removing this replace allows the patched version to be used.
Proposed fix
-replace google.golang.org/protobuf => google.golang.org/protobuf v1.29.1📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| replace google.golang.org/protobuf => google.golang.org/protobuf v1.29.1 |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@go.mod` at line 324, The go.mod file contains a replace directive forcing
google.golang.org/protobuf to v1.29.1 (the vulnerable downgrade) while the
require already specifies a safe v1.34.2; remove the replace directive
referencing google.golang.org/protobuf v1.29.1 so the module can use the patched
version and then run module cleanup (e.g., go mod tidy) to update the dependency
graph and ensure go.mod/go.sum reflect the removal.
| // TODO(ddelnano): Defend against columns that don't exist. These should be | ||
| // ignored by the Node. |
There was a problem hiding this comment.
Implement the missing-column guard before dereferencing mapped indices.
Line 38 states missing columns should be ignored, but Line 107 still unconditionally calls rb.ColumnAt(mapping.input_column_index()). A stale or partial mapping can crash the sink instead of skipping the column.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/carnot/exec/clickhouse_export_sink_node.cc` around lines 38 - 39, The
sink currently dereferences mapped indices without guarding for missing columns
(see mapping.input_column_index() being passed to rb.ColumnAt), so update the
ClickhouseExportSinkNode code that iterates mappings to check that the mapped
input column actually exists before calling
rb.ColumnAt(mapping.input_column_index()); e.g., verify
mapping.input_column_index() is within rb.NumColumns() (or use rb.HasColumn /
equivalent) and skip/continue for mappings that are out-of-range or otherwise
invalid so the node ignores missing columns instead of crashing.
| if kubeconfigPath == "" { | ||
| kubeconfigPath = clientcmd.RecommendedHomeFile | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
find . -path "*src/e2e_test/perf_tool/pkg/cluster/context.go" -type fRepository: k8sstormcenter/pixie
Length of output: 112
🏁 Script executed:
cat -n src/e2e_test/perf_tool/pkg/cluster/context.go | head -100Repository: k8sstormcenter/pixie
Length of output: 3768
🏁 Script executed:
grep -n "NewNonInteractiveDeferredLoadingClientConfig" src/e2e_test/perf_tool/pkg/cluster/context.goRepository: k8sstormcenter/pixie
Length of output: 158
🏁 Script executed:
grep -n "AddEnv\|configPath" src/e2e_test/perf_tool/pkg/cluster/context.goRepository: k8sstormcenter/pixie
Length of output: 821
🏁 Script executed:
cat -n src/e2e_test/perf_tool/pkg/cluster/context.go | tail -50Repository: k8sstormcenter/pixie
Length of output: 1797
🏁 Script executed:
rg -A 5 -B 5 "NewDefaultClientConfigLoadingRules" --type goRepository: k8sstormcenter/pixie
Length of output: 975
🏁 Script executed:
rg "RecommendedConfigPathEnvVar|RecommendedHomeFile" --type goRepository: k8sstormcenter/pixie
Length of output: 159
🌐 Web query:
clientcmd.NewDefaultClientConfigLoadingRules KUBECONFIG environment variable
💡 Result:
clientcmd.NewDefaultClientConfigLoadingRules creates a ClientConfigLoadingRules struct that defines the default precedence for loading Kubernetes configuration files, with the KUBECONFIG environment variable taking highest priority [1][2][3]. Specifically, it first checks os.Getenv("KUBECONFIG") (const RecommendedConfigPathEnvVar = "KUBECONFIG"); if set and non-empty, it splits the value by path delimiters (e.g., colon on Unix) into a deduplicated list of files and sets those as the Precedence slice, also enabling WarnIfAllMissing=true to warn if all listed files are absent [2]. If KUBECONFIG is unset or empty, Precedence is set to just the default home file (~/.kube/config, const RecommendedHomeFile) [2][3]. It also sets default MigrationRules for legacy ~/.kube/.kubeconfig migration when using the home file [2]. The ExplicitPath field remains empty unless set separately. This matches kubectl's documented loading order: --kubeconfig flag first (explicit), then $KUBECONFIG files (merged), then ~/.kube/config [4]. Example usage: loadingRules:= clientcmd.NewDefaultClientConfigLoadingRules; config, err:= loadingRules.GetStartingConfig [5].
Citations:
- 1: https://godoc.org/k8s.io/client-go/tools/clientcmd
- 2: https://github.com/kubernetes/client-go/blob/master/tools/clientcmd/loader.go
- 3: https://pkg.go.dev/k8s.io/client-go/tools/clientcmd
- 4: https://kubernetes.io/docs/reference/kubectl/generated/kubectl_config/
- 5: https://kubernetes.io/blog/2026/01/19/clientcmd-apiserver-access/
🏁 Script executed:
# Verify that the constant names are correct in the codebase
grep -r "RecommendedConfigPathEnvVar" . --include="*.go" | head -5Repository: k8sstormcenter/pixie
Length of output: 46
🏁 Script executed:
# Check if os package is imported
grep "^import" -A 10 src/e2e_test/perf_tool/pkg/cluster/context.go | head -15Repository: k8sstormcenter/pixie
Length of output: 230
Preserve the actual kubeconfig source when kubeconfigPath is empty.
When kubeconfigPath is empty, NewDefaultClientConfigLoadingRules() (line 64) respects the KUBECONFIG environment variable, potentially loading config from a non-default location. However, lines 75-77 unconditionally set kubeconfigPath to clientcmd.RecommendedHomeFile, causing AddEnv() to expose a different kubeconfig path to spawned commands than what was actually used to load the context.
Proposed fix
@@
if kubeconfigPath == "" {
- kubeconfigPath = clientcmd.RecommendedHomeFile
+ kubeconfigPath = os.Getenv(clientcmd.RecommendedConfigPathEnvVar)
+ if kubeconfigPath == "" {
+ kubeconfigPath = clientcmd.RecommendedHomeFile
+ }
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if kubeconfigPath == "" { | |
| kubeconfigPath = clientcmd.RecommendedHomeFile | |
| } | |
| if kubeconfigPath == "" { | |
| kubeconfigPath = os.Getenv(clientcmd.RecommendedConfigPathEnvVar) | |
| if kubeconfigPath == "" { | |
| kubeconfigPath = clientcmd.RecommendedHomeFile | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/e2e_test/perf_tool/pkg/cluster/context.go` around lines 75 - 77, The code
currently overwrites an empty kubeconfigPath with clientcmd.RecommendedHomeFile,
which can differ from what NewDefaultClientConfigLoadingRules() actually used
(including KUBECONFIG); instead, when kubeconfigPath is empty, query the loading
rules for the real filename (e.g., call loadingRules.GetDefaultFilename() on the
ClientConfigLoadingRules returned by NewDefaultClientConfigLoadingRules()) and
assign that to kubeconfigPath so the value passed to AddEnv matches the actual
kubeconfig source used by the client config.
| // ExportResults consumes metrics from resultCh and inserts them into BigQuery in batches. | ||
| func (e *BQExporter) ExportResults(ctx context.Context, expID uuid.UUID, resultCh <-chan *metrics.ResultRow) error { | ||
| bqCh := make(chan interface{}) | ||
| defer close(bqCh) | ||
|
|
||
| inserter := &bq.BatchInserter{ | ||
| Table: e.resultTable, | ||
| BatchSize: 512, | ||
| PushTimeout: 2 * time.Minute, | ||
| } | ||
| go inserter.Run(bqCh) | ||
|
|
||
| for row := range resultCh { | ||
| bqRow, err := MetricsRowToResultRow(expID, row) | ||
| if err != nil { | ||
| log.WithError(err).Error("Failed to convert result row") | ||
| continue | ||
| } | ||
| bqCh <- bqRow | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for BatchInserter implementation to understand its behavior
ast-grep --pattern $'func ($_ *BatchInserter) Run($_) {
$$$
}'Repository: k8sstormcenter/pixie
Length of output: 1455
🏁 Script executed:
# Find the uploadBatch method to understand what it does
rg -A 15 "func.*\buploadBatch" src/shared/bq/batch_inserter.goRepository: k8sstormcenter/pixie
Length of output: 378
🏁 Script executed:
# Check the full BatchInserter.Run method context including error handling
cat -n src/shared/bq/batch_inserter.go | head -70Repository: k8sstormcenter/pixie
Length of output: 2366
🏁 Script executed:
# Find callers of ExportResults to see if they expect synchronization or error handling
rg -B 3 -A 3 "ExportResults" src/e2e_test/perf_tool/Repository: k8sstormcenter/pixie
Length of output: 6476
🏁 Script executed:
# Verify the bq.Table.Inserter().Put() behavior - does it actually wait for BigQuery?
rg -A 5 "func.*Put" src/shared/bq/ || echo "Checking BigQuery API docs..."Repository: k8sstormcenter/pixie
Length of output: 94
Race condition: ExportResults returns before the BatchInserter goroutine completes.
The function starts inserter.Run(bqCh) as a goroutine but returns immediately after closing bqCh (line 89 defer), without waiting for the inserter to finish. Although uploadBatch() is called after the channel closes, it executes within the goroutine. If the function returns and the process exits before the inserter completes, buffered rows may not be inserted.
Additionally, uploadBatch() logs insertion errors but does not propagate them—errors are silently lost. The ctx parameter is unused; context cancellation has no effect on the inserter.
🛠️ Proposed fix to wait for inserter completion
func (e *BQExporter) ExportResults(ctx context.Context, expID uuid.UUID, resultCh <-chan *metrics.ResultRow) error {
bqCh := make(chan interface{})
- defer close(bqCh)
inserter := &bq.BatchInserter{
Table: e.resultTable,
BatchSize: 512,
PushTimeout: 2 * time.Minute,
}
- go inserter.Run(bqCh)
+ done := make(chan struct{})
+ go func() {
+ inserter.Run(bqCh)
+ close(done)
+ }()
for row := range resultCh {
bqRow, err := MetricsRowToResultRow(expID, row)
if err != nil {
log.WithError(err).Error("Failed to convert result row")
continue
}
- bqCh <- bqRow
+ select {
+ case bqCh <- bqRow:
+ case <-ctx.Done():
+ close(bqCh)
+ <-done
+ return ctx.Err()
+ }
}
+ close(bqCh)
+ <-done
return nil
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/e2e_test/perf_tool/pkg/exporter/bq_exporter.go` around lines 86 - 107,
ExportResults currently starts BatchInserter.Run(bqCh) in a goroutine and defers
close(bqCh) but returns without waiting, losing buffered rows and swallowing
errors; modify ExportResults to (1) create a done/error channel or
sync.WaitGroup and start inserter.Run(bqCh, ctx) (or wrap Run to accept ctx) so
the inserter can observe context cancellation, (2) close bqCh only after
draining resultCh, then wait for the inserter goroutine to finish before
returning, and (3) surface any insertion errors from uploadBatch/BatchInserter
back to ExportResults (via an error channel or returned error) instead of only
logging them; reference BQExporter.ExportResults, bq.BatchInserter.Run and
uploadBatch to locate where to add waiting, context handling, and error
propagation.
| for _, localPath := range uploadedFiles { | ||
| f, err := os.Open(localPath) | ||
| require.NoError(t, err) | ||
| defer f.Close() | ||
|
|
||
| stat, err := f.Stat() | ||
| require.NoError(t, err) | ||
|
|
||
| pf, err := parquet.OpenFile(f, stat.Size()) | ||
| require.NoError(t, err) | ||
| assert.Equal(t, int64(3), pf.NumRows()) |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Defer in loop may delay file handle cleanup.
Using defer f.Close() inside a loop means all file handles remain open until the function returns. For this test with only one iteration, it's benign, but it's a pattern to avoid.
♻️ Suggested fix
for _, localPath := range uploadedFiles {
f, err := os.Open(localPath)
require.NoError(t, err)
- defer f.Close()
stat, err := f.Stat()
require.NoError(t, err)
pf, err := parquet.OpenFile(f, stat.Size())
require.NoError(t, err)
assert.Equal(t, int64(3), pf.NumRows())
+ f.Close()
// Verify schema has tag columns from the union of all rows.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for _, localPath := range uploadedFiles { | |
| f, err := os.Open(localPath) | |
| require.NoError(t, err) | |
| defer f.Close() | |
| stat, err := f.Stat() | |
| require.NoError(t, err) | |
| pf, err := parquet.OpenFile(f, stat.Size()) | |
| require.NoError(t, err) | |
| assert.Equal(t, int64(3), pf.NumRows()) | |
| for _, localPath := range uploadedFiles { | |
| f, err := os.Open(localPath) | |
| require.NoError(t, err) | |
| stat, err := f.Stat() | |
| require.NoError(t, err) | |
| pf, err := parquet.OpenFile(f, stat.Size()) | |
| require.NoError(t, err) | |
| assert.Equal(t, int64(3), pf.NumRows()) | |
| f.Close() |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/e2e_test/perf_tool/pkg/exporter/parquet_exporter_test.go` around lines
304 - 314, The test opens files in a loop and uses defer f.Close(), which delays
closing until the test function exits; change this so each file is closed
immediately after use: in the loop that iterates over uploadedFiles, replace
defer f.Close() with an explicit f.Close() call (checking/ignoring the error)
once you're done with parquet.OpenFile and the pf.NumRows() assertion, or
alternatively wrap the loop body in an inner anonymous func so the defer
f.Close() is scoped and runs at the end of each iteration; update the code paths
around os.Open, parquet.OpenFile, and pf.NumRows() accordingly.
Signed-off-by: entlein <einentlein@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 3
♻️ Duplicate comments (1)
src/vizier/services/adaptive_export/internal/pixie/pixie.go (1)
70-70: 🧹 Nitpick | 🔵 Trivial | ⚡ Quick winSet TLS
MinVersionto enforce minimum protocol security.The static analysis correctly flags the missing
MinVersion. Adding this ensures the client won't negotiate deprecated TLS versions.🔒 Proposed fix to add MinVersion
- tlsConfig := &tls.Config{InsecureSkipVerify: isInternal} + tlsConfig := &tls.Config{ + InsecureSkipVerify: isInternal, + MinVersion: tls.VersionTLS12, + }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/vizier/services/adaptive_export/internal/pixie/pixie.go` at line 70, The tls.Config construction for the Pixie client sets InsecureSkipVerify but omits MinVersion; update the tlsConfig initialization (variable tlsConfig) to include MinVersion: tls.VersionTLS12 (or tls.VersionTLS13 if you require only TLS1.3) so the client will refuse to negotiate deprecated TLS protocols; keep InsecureSkipVerify: isInternal as-is and ensure the change is applied where tlsConfig is used (e.g., the Pixie client TLS setup).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/cmd/main.go`:
- Line 140: The lifecycle step comments in main (around the bootstrap sequence
that builds trigger+sink+controller and the subsequent lifecycle blocks) have
duplicated/incorrect numbering: update the comment preceding the
trigger/sink/controller construction (currently "2.") to "4.", change the later
comments that read "3.", "4.", and "5." to "5.", "6.", and "7." respectively so
the sequence is monotonically increasing; locate these comments near the main
bootstrap function that constructs the trigger/sink/controller and the following
lifecycle blocks and correct their numeric prefixes accordingly.
- Line 282: Remove the dead-code placeholder "var _ = fmt.Sprintf" — delete the
unused fmt.Sprintf reference so the code no longer has an unnecessary blank
identifier assignment; keep the fmt import intact because fmt.Errorf is used
elsewhere (so no other changes to imports are required).
In `@src/vizier/services/adaptive_export/internal/pixie/pixie.go`:
- Around line 68-72: The internal-hostname check in Client.init incorrectly uses
strings.Contains on c.cloudAddr which can match substrings (e.g.,
"cluster.local.attacker.com") and incorrectly set tls.Config.InsecureSkipVerify;
instead parse the host portion (use net.SplitHostPort or net/url/ net.ParseIP
depending on format) from c.cloudAddr, then perform a suffix check against
"cluster.local" (e.g., strings.HasSuffix(host, ".cluster.local") or host ==
"cluster.local") to set isInternal; update Client.init to derive host, compute
isInternal from the suffix-safe check, and then create
tls.Config{InsecureSkipVerify: isInternal} and proceed with credentials.NewTLS
and grpc.Dial as before.
---
Duplicate comments:
In `@src/vizier/services/adaptive_export/internal/pixie/pixie.go`:
- Line 70: The tls.Config construction for the Pixie client sets
InsecureSkipVerify but omits MinVersion; update the tlsConfig initialization
(variable tlsConfig) to include MinVersion: tls.VersionTLS12 (or
tls.VersionTLS13 if you require only TLS1.3) so the client will refuse to
negotiate deprecated TLS protocols; keep InsecureSkipVerify: isInternal as-is
and ensure the change is applied where tlsConfig is used (e.g., the Pixie client
TLS setup).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 0a5ad53f-7732-4ae6-a44a-8927b4ecc794
📒 Files selected for processing (12)
src/vizier/services/adaptive_export/cmd/main.gosrc/vizier/services/adaptive_export/internal/clickhouse/apply.gosrc/vizier/services/adaptive_export/internal/clickhouse/apply_test.gosrc/vizier/services/adaptive_export/internal/clickhouse/ddl_test.gosrc/vizier/services/adaptive_export/internal/clickhouse/insert_test.gosrc/vizier/services/adaptive_export/internal/controller/controller.gosrc/vizier/services/adaptive_export/internal/controller/controller_test.gosrc/vizier/services/adaptive_export/internal/e2e/e2e_test.gosrc/vizier/services/adaptive_export/internal/kubescape/extract.gosrc/vizier/services/adaptive_export/internal/kubescape/extract_test.gosrc/vizier/services/adaptive_export/internal/pixie/pixie.gosrc/vizier/services/adaptive_export/internal/pxl/tables_test.go
Switch the kubescape_logs poll from `event_time > watermark` to `event_time >= watermark` and track a per-boundary fingerprint set (sha256 of EventTime + RuleID + Hostname + JSON content). Rows that land with the boundary timestamp after a poll completed are no longer silently dropped — they are deduped against the fingerprint set on the next poll. Addresses CodeRabbit comment on PR #37 src/vizier/services/adaptive_export/internal/trigger/clickhouse.go:121.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go`:
- Around line 127-146: Add a unit test in clickhouse_test.go that verifies
fingerprint-based deduplication at the watermark boundary: create an
httptest.Server handler that on consecutive polls first returns the same
canonical row twice (to simulate a late duplicate) and then returns a different
row with the same event_time; instantiate the trigger via New(Config{Endpoint:
srv.URL, Hostname: "node-1", PollInterval: 30*time.Millisecond}), Subscribe(ctx)
and assert that the duplicate row is emitted only once while the distinct row
with the same event_time is also emitted; reference the existing TestTrigger_*
tests and use the same canonicalRowJSON, New, Config and Subscribe symbols to
locate where to add the test.
In `@src/vizier/services/adaptive_export/internal/trigger/clickhouse.go`:
- Around line 168-175: The SQL built in ClickHouseHTTP.fetchSince interpolates
t.cfg.Database and t.cfg.Table directly, allowing identifier injection; update
the ClickHouseHTTP constructor (New) to validate or safely quote these
identifiers: validate cfg.Database and cfg.Table against
/^[A-Za-z_][A-Za-z0-9_]*$/ and return an error on mismatch, or normalize and
wrap them in ClickHouse identifier quotes (backticks) so fetchSince uses only
validated/quoted values; keep quoteCH for hostname literal escaping and ensure
any change is applied to the stored t.cfg fields used by fetchSince.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 834318a0-cbd7-43be-828a-f0b703a6837c
📒 Files selected for processing (2)
src/vizier/services/adaptive_export/internal/trigger/clickhouse.gosrc/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go
- Reject Database / Table config values that don't match the plain ClickHouse identifier shape `[A-Za-z_][A-Za-z0-9_]*`. SQL identifiers cannot be parameterised, so the only safe path is validation at Config construction time. Closes the SELECT injection surface flagged by review. - Add a unit test that drives the watermark-boundary dedup path: stub server returns the same canonical row twice across consecutive polls AND a distinct row at the same event_time; assert the duplicate is emitted once and the distinct row is emitted once. Addresses CodeRabbit review on PR #37 src/vizier/services/adaptive_export/internal/trigger/clickhouse.go:168-175 src/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go:127-146.
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go`:
- Around line 33-43: The test is racy because lastQuery is written in the server
goroutine and read later after a fixed sleep; change the httptest.NewServer
handler in clickhouse_test.go to send each received query string into a channel
(instead of writing to lastQuery directly) and have the test receive from that
channel to wait for the second poll before asserting; update references to
calls/lastQuery by reading the second value from the channel (use the existing
atomic calls to keep counting if needed) so the assertion waits
deterministically for the server's second request rather than sleeping.
In `@src/vizier/services/adaptive_export/internal/trigger/clickhouse.go`:
- Around line 214-218: The code currently calls io.ReadAll(resp.Body) and passes
the whole byte slice to parseJSONEachRow, which risks OOM for large ClickHouse
result sets; change to stream and parse directly from the response body
(resp.Body) instead of buffering: add or refactor parseJSONEachRow to accept an
io.Reader (e.g., parseJSONEachRowFromReader or change signature to
parseJSONEachRow(r io.Reader) returning the same values), call it with
resp.Body, and ensure resp.Body is closed and errors are propagated; apply the
same change to the other occurrence referenced (the block around lines 221-237)
so both paths parse JSONEachRow from the reader rather than using io.ReadAll.
- Around line 244-250: The current loop aborts the whole batch on json.Unmarshal
or parseUint64Loose errors, returning an error that includes the raw line and
leaving the watermark unchanged so the bad row is retried forever; change the
handling in the block that uses json.Unmarshal(line, &rr) and
parseUint64Loose(rr.EventTime) to skip malformed rows instead of returning: log
the parse error (do not include the raw line payload in the log returned to
callers), continue to the next row, and ensure normal processing continues so
the watermark can advance for subsequent good rows; do not change the method
signature—just replace the fmt.Errorf returns in that parse path with a
non-fatal log/skip and continue.
- Around line 73-75: The constructor New() currently only calls
url.Parse(cfg.Endpoint) which accepts relative URLs; change the validation to
parse cfg.Endpoint (using url.Parse or url.ParseRequestURI) and then ensure the
parsed URL has a non-empty Scheme of "http" or "https" and a non-empty Host; if
those checks fail return an error like the existing fmt.Errorf("trigger: invalid
Endpoint %q: %w", cfg.Endpoint, err) or a similar descriptive error. Update the
validation near the url.Parse call and keep the error handling path consistent
with the function so callers get a clear failure immediately instead of failing
later in http.NewRequestWithContext.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 6559ef54-31d4-4cdc-a627-bd9d7c4ea30e
📒 Files selected for processing (2)
src/vizier/services/adaptive_export/internal/trigger/clickhouse.gosrc/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go
…e, skip-on-malformed, race-free test - Endpoint validation: require http or https scheme + non-empty Host in New() so a config with a bare path or relative URL fails at construction, not later inside http.NewRequestWithContext. - parseJSONEachRow now takes io.Reader and streams from resp.Body with bufio.Scanner. Memory is bounded at one row regardless of result-set size; the previous io.ReadAll path could OOM on large ClickHouse responses. - Malformed JSON rows and bad event_time values are now logged at Debug and skipped, not fatal. The previous return-on-error caused the watermark to never advance, so a single bad row would block every subsequent poll forever. - Test TestTrigger_Polls_HostnameAndWatermark replaces the shared-variable-plus-fixed-sleep pattern with a channel that the server pushes each query into; the assertion waits deterministically for the second request instead of racing. Addresses CodeRabbit review on PR #37 (4 actionable items at trigger/clickhouse.go lines 73-75, 214-218, 244-250 and trigger/clickhouse_test.go lines 33-43).
… DDL When the operator runs with INSERT-only ClickHouse creds (the safe production split: separate high-priv Job applies DDL once, low-priv Deployment never DDLs), `clickhouse.Apply` cannot succeed and the boot would fatal. Add ADAPTIVE_SKIP_APPLY=true to skip Apply while keeping VerifyPixieSchema (which only requires SELECT on system.columns) so the drift guard still fires. Also documents the recommended production split in the comment block.
Two demo-blocking bugs found while running the live attack against a
sovereignsocdemo cluster:
(1) controller.go — kubescape's event_time as written by the soc
Vector pipeline is unix SECONDS (UInt64 ~1.7e9), not nanos. The
operator was passing it to time.Unix(0, X) which interprets X as
nanos, producing t_start=1969-12-31 / last_seen=1970-01-01 in
every adaptive_attribution row. New eventTimeToTime() auto-detects
the unit by magnitude (seconds < 1e10 < millis < 1e13 < nanos).
(2) cmd/main.go — Pixie's ClickHouse retention plugin can be enabled
with no scripts installed; the plugin then writes nothing. Add
INSTALL_PRESET_SCRIPTS env (default false) that on boot fetches
the plugin's presets and installs the ones not already on the
cluster (matched by script name). Restores internal/script and
the GetPresetScripts / GetClusterScripts / AddDataRetentionScript
methods on the pixie plugin client; both were deleted in rev-2
on the assumption users would author scripts via the Pixie UI.
Production deployments still default to that path; the env flag
is for demo / lab bootstrap.
Single-shot fan-out only captured pixie data from PEM-startup to query-time (~30-60s). The spec requires the operator to write pixie rows for the FULL [event_time-Before, event_time+After] window (default ±5min) on each kubescape anomaly. Loop pushPixieRows every PushRefreshInterval (default 30s) until the attribution row's t_end is in the past. Each pass queries pixie for [lastUpper, now] so already-written rows aren't duplicated. Window extensions from concurrent kubescape events are picked up on the next iteration. Validated end-to-end: a 12-attack run produced 580s span of redis_events (≈ ATTACK ± 5min), all timestamps consistent across kubescape_logs.event_time / adaptive_attribution.t_start / redis_events.time_ within 1s.
There was a problem hiding this comment.
Actionable comments posted: 12
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/cmd/main.go`:
- Around line 403-420: The current purge loop deletes every script returned in
current before re-installing built-ins; change it to only delete scripts that
are known managed/legacy preset names. Build a set/map of preset names from
builtinPresetScripts() (or a maintained list of legacy managed names), then
iterate over current (from GetClusterScripts()) and call
client.DeleteDataRetentionScript(s.ScriptId) only if s.Name is in that preset
set; leave all other scripts untouched. Keep the existing logging around
client.DeleteDataRetentionScript, AddDataRetentionScript, and the installed
counter (functions: DeleteDataRetentionScript, AddDataRetentionScript,
builtinPresetScripts).
- Around line 434-449: The preset generation currently includes dotted table
names in the tables slice (e.g., "http2_messages.beta", "kafka_events.beta")
which px.DataFrame rejects; remove those dotted entries from the tables list (or
filter them out before creating ScriptDefinition) so the loop that builds script
bodies (the tables variable and the for _, t := range tables loop in main.go)
does not emit broken presets; also update the surrounding comment to reflect
that dotted-named tables are intentionally excluded.
In `@src/vizier/services/adaptive_export/internal/controller/controller.go`:
- Around line 314-345: The loop advances lastUpper unconditionally which drops
time slices when a table's QueryFor / querier.Query / sink.WritePixieRows (used
in the PushPixieTables loop) fails; change the logic so each table maintains its
own watermark (e.g., a map[table]string/Time or per-table lastUpper) and only
update that table's cursor when its call to c.sink.WritePixieRows succeeds,
leaving other tables' cursors unchanged and retrying failed slices on subsequent
passes; locate the loop iterating over c.cfg.PushPixieTables and update the
advancement of lastUpper to per-table state tied to the table identifier used in
QueryFor, querier.Query, and WritePixieRows.
- Around line 96-115: The defaulting of PushRefreshInterval inside
Config.defaulted() prevents callers from selecting the documented single-shot
mode (<=0) because defaulted() rewrites 0 to 30s; remove or change that behavior
by not overwriting a zero value in Config.defaulted() and instead apply the 30s
default at the env-parsing/constructor layer (or change PushRefreshInterval to a
pointer/optional type so nil means "use default" while 0 remains a valid
single-shot sentinel). Update code paths that rely on pushPixieRows() reading
<=0 for single-shot to continue working and move the current defaulting logic
out of defaulted() (or represent unset separately) so callers can intentionally
pass 0.
- Around line 283-296: The code reads fields from a shared struct pointer
`current` (from `c.active[initial.AnomalyHash]`) after releasing `c.mu`, causing
a data race with `handle()` which mutates the same struct; fix by copying the
active row while holding `c.mu` (e.g., make a local value copy of `*current`)
and then release the lock and use that copy for checks like `TEnd` and the
fan-out in `pushPixieRows` so reads no longer race with `handle()` mutations.
In `@src/vizier/services/adaptive_export/internal/pixieapi/BUILD.bazel`:
- Around line 1-14: Add the standard Apache 2.0 copyright header to this
BUILD.bazel file to match other files in the PR (e.g., the header present in
src/vizier/services/adaptive_export/internal/pxl/BUILD.bazel); update the top of
the file containing the go_library rule named "pixieapi" (and its metadata like
srcs/importpath/visibility/deps) to prepend the same multi-line copyright
comment block used across the repo.
In `@src/vizier/services/adaptive_export/internal/pixieapi/pixieapi.go`:
- Around line 109-130: The Query method (Adapter.Query) creates a fresh pxapi
client when a.directOpts != nil via pxapi.NewClient (assigned to local c and
then to client) but never closes that client; modify Query to ensure the newly
created client (c) is closed after the request completes (e.g., defer c.Close()
immediately after NewClient succeeds or call c.Close() before returning/after
using client) so the temporary gRPC connection is explicitly released when
direct mode is used.
In `@src/vizier/services/adaptive_export/internal/pxl/queryfor.go`:
- Around line 42-45: The padding logic around pad := now.Sub(sliceStart) +
30*time.Second is redundant for past or present sliceStart; either remove the if
block that forces pad = 30*time.Second or, better, validate the caller and
return an error when sliceStart > now. Update the function that contains this
code (e.g., QueryFor) to check if sliceStart.After(now) and return a descriptive
error (or panic if that fits project conventions); otherwise simply compute pad
:= now.Sub(sliceStart) + 30*time.Second and drop the minimum-padding branch.
- Around line 38-71: Add unit tests for QueryFor that validate the time-window
math, escaping, and conditional namespace/pod filters: write a test file that
calls QueryFor with a known builtin table (use IsBuiltin to pick one or stub it)
and asserts the returned PXL string contains a correctly computed relStart (now
- sliceStart, floored to seconds and min 30s), contains px.int64_to_time(...)
with sliceStart.UnixNano() and sliceEnd.UnixNano(), sets df.namespace and
df.pod, and applies the namespace and pod filters (including the namespaced
podKey when both t.Namespace and t.Pod are set and that escapePxL is applied);
also add a test where QueryFor is called with an unknown table and assert it
returns an error wrapping ErrUnknownTable. Ensure tests reference QueryFor,
escapePxL, IsBuiltin, and ErrUnknownTable so they fail if behavior changes.
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse.go`:
- Around line 251-256: QueryActive currently calls io.ReadAll then
parseActiveRows which buffers the whole response; change QueryActive to stream
and parse line-by-line like trigger's parseJSONEachRow to avoid buffering large
responses. Concretely, update QueryActive to pass resp.Body (an io.Reader) into
a streaming parser instead of calling io.ReadAll, and either refactor
parseActiveRows to accept an io.Reader or add a new parseActiveRowsFromReader
that uses bufio.Scanner or bufio.Reader to read JSON rows line-by-line and
decode/append incrementally; keep existing parseActiveRows semantics for small
inputs (you can call it from the new reader-based implementation for
single-buffer fallback). Ensure function names referenced: QueryActive,
parseActiveRows, parseJSONEachRow.
- Around line 258-263: The quoteCH function repeatedly allocates a
strings.Replacer per call (strings.NewReplacer(`\`, `\\`, `'`, `\'`)), causing
unnecessary allocations; replace that by creating a single shared replacer
(e.g., a package-level variable like chQuoteReplacer :=
strings.NewReplacer(...)) or move quoteCH into a shared utility package and
reuse that replacer from both sink and trigger, then update quoteCH to call
chQuoteReplacer.Replace(s) and return the quoted string.
In `@src/vizier/services/adaptive_export/internal/trigger/clickhouse.go`:
- Around line 284-287: The quoteCH function creates a new strings.Replacer on
every call causing unnecessary allocations; hoist the replacer to a
package-level variable (e.g., var chQuoteReplacer = strings.NewReplacer(`\`,
`\\`, `'`, `\'`)) and have quoteCH call chQuoteReplacer.Replace(s) instead,
leaving the returned "'" + ... + "'" logic intact; this mirrors the optimization
used in queryfor.go and reduces per-row allocations in rowFingerprint while
relying on strings.Replacer being safe for concurrent use.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: b0c60ed1-3f6d-4fb0-9609-b83463169972
📒 Files selected for processing (14)
src/vizier/services/adaptive_export/cmd/BUILD.bazelsrc/vizier/services/adaptive_export/cmd/main.gosrc/vizier/services/adaptive_export/internal/controller/BUILD.bazelsrc/vizier/services/adaptive_export/internal/controller/controller.gosrc/vizier/services/adaptive_export/internal/controller/controller_test.gosrc/vizier/services/adaptive_export/internal/pixie/pixie.gosrc/vizier/services/adaptive_export/internal/pixieapi/BUILD.bazelsrc/vizier/services/adaptive_export/internal/pixieapi/pixieapi.gosrc/vizier/services/adaptive_export/internal/pxl/BUILD.bazelsrc/vizier/services/adaptive_export/internal/pxl/queryfor.gosrc/vizier/services/adaptive_export/internal/sink/clickhouse.gosrc/vizier/services/adaptive_export/internal/sink/clickhouse_test.gosrc/vizier/services/adaptive_export/internal/trigger/clickhouse.gosrc/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go
| // PushRefreshInterval — how often pushPixieRows re-queries pixie | ||
| // while the attribution window is still active. The first query | ||
| // covers [t_start, now]; subsequent queries cover only the new | ||
| // slice [last_upper, now] so we don't duplicate rows. Defaults to | ||
| // 30s. Set to 0 to disable periodic re-fan-out (single-shot mode, | ||
| // which loses pixie traffic that arrives after the kubescape event). | ||
| PushRefreshInterval time.Duration | ||
| } | ||
|
|
||
| func (c *Config) defaulted() Config { | ||
| out := *c | ||
| if out.Before == 0 { | ||
| out.Before = 5 * time.Minute | ||
| } | ||
| if out.After == 0 { | ||
| out.After = 5 * time.Minute | ||
| } | ||
| if out.PushRefreshInterval == 0 { | ||
| out.PushRefreshInterval = 30 * time.Second | ||
| } |
There was a problem hiding this comment.
PushRefreshInterval=0 can never select single-shot mode.
defaulted() rewrites 0 to 30s, but both the Config doc and pushPixieRows() treat <= 0 as “disable periodic re-fan-out”. Callers cannot request the documented behavior, and the only way to reach it today is an undocumented negative value. Move this defaulting to the env-parsing layer or represent “unset” separately.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/controller/controller.go` around
lines 96 - 115, The defaulting of PushRefreshInterval inside Config.defaulted()
prevents callers from selecting the documented single-shot mode (<=0) because
defaulted() rewrites 0 to 30s; remove or change that behavior by not overwriting
a zero value in Config.defaulted() and instead apply the 30s default at the
env-parsing/constructor layer (or change PushRefreshInterval to a
pointer/optional type so nil means "use default" while 0 remains a valid
single-shot sentinel). Update code paths that rely on pushPixieRows() reading
<=0 for single-shot to continue working and move the current defaulting logic
out of defaulted() (or represent unset separately) so callers can intentionally
pass 0.
| func QueryFor(table string, t anomaly.Target, sliceStart, sliceEnd, now time.Time) (string, error) { | ||
| if !IsBuiltin(table) { | ||
| return "", fmt.Errorf("%w: %q", ErrUnknownTable, table) | ||
| } | ||
| pad := now.Sub(sliceStart) + 30*time.Second | ||
| if pad < 30*time.Second { | ||
| pad = 30 * time.Second | ||
| } | ||
| relStart := "-" + strconv.FormatInt(int64(pad/time.Second), 10) + "s" | ||
|
|
||
| var b strings.Builder | ||
| b.WriteString("import px\n") | ||
| b.WriteString("df = px.DataFrame(table='" + table + "', start_time='" + relStart + "')\n") | ||
| b.WriteString("df = df[df.time_ >= px.int64_to_time(" + strconv.FormatInt(sliceStart.UnixNano(), 10) + ")]\n") | ||
| b.WriteString("df = df[df.time_ < px.int64_to_time(" + strconv.FormatInt(sliceEnd.UnixNano(), 10) + ")]\n") | ||
| b.WriteString("df.namespace = px.upid_to_namespace(df.upid)\n") | ||
| // px.upid_to_pod_name returns "<namespace>/<pod>" (carnot: | ||
| // metadata_ops.h UPIDToPodNameUDF::Exec → absl::Substitute("$0/$1", ns, name)), | ||
| // not the bare pod name. Filtering against bare t.Pod would always | ||
| // miss; build the namespaced key when we have both fields. | ||
| b.WriteString("df.pod = px.upid_to_pod_name(df.upid)\n") | ||
| if t.Namespace != "" { | ||
| b.WriteString("df = df[df.namespace == '" + escapePxL(t.Namespace) + "']\n") | ||
| } | ||
| if t.Pod != "" { | ||
| podKey := t.Pod | ||
| if t.Namespace != "" { | ||
| podKey = t.Namespace + "/" + t.Pod | ||
| } | ||
| b.WriteString("df = df[df.pod == '" + escapePxL(podKey) + "']\n") | ||
| } | ||
| b.WriteString("px.display(df, '" + table + "')\n") | ||
| return b.String(), nil | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Consider adding unit tests for QueryFor.
The AI summary notes that tables_test.go covers builtin table metadata but there are no direct tests for QueryFor. Given the time-window math, escaping logic, and conditional namespace/pod filters, unit tests would help catch regressions.
Would you like me to generate a test file skeleton covering the key cases (valid table, unknown table, namespace/pod filtering, time bounds)?
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/pxl/queryfor.go` around lines 38
- 71, Add unit tests for QueryFor that validate the time-window math, escaping,
and conditional namespace/pod filters: write a test file that calls QueryFor
with a known builtin table (use IsBuiltin to pick one or stub it) and asserts
the returned PXL string contains a correctly computed relStart (now -
sliceStart, floored to seconds and min 30s), contains px.int64_to_time(...) with
sliceStart.UnixNano() and sliceEnd.UnixNano(), sets df.namespace and df.pod, and
applies the namespace and pod filters (including the namespaced podKey when both
t.Namespace and t.Pod are set and that escapePxL is applied); also add a test
where QueryFor is called with an unknown table and assert it returns an error
wrapping ErrUnknownTable. Ensure tests reference QueryFor, escapePxL, IsBuiltin,
and ErrUnknownTable so they fail if behavior changes.
| pad := now.Sub(sliceStart) + 30*time.Second | ||
| if pad < 30*time.Second { | ||
| pad = 30 * time.Second | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Redundant minimum-padding check when sliceStart is in the past.
When sliceStart <= now, now.Sub(sliceStart) is non-negative, so pad is already >= 30s after line 42. The condition on lines 43-45 only triggers if sliceStart > now (a future timestamp), which would be a caller bug. Consider either removing the redundant check or explicitly validating/erroring on sliceStart > now.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/pxl/queryfor.go` around lines 42
- 45, The padding logic around pad := now.Sub(sliceStart) + 30*time.Second is
redundant for past or present sliceStart; either remove the if block that forces
pad = 30*time.Second or, better, validate the caller and return an error when
sliceStart > now. Update the function that contains this code (e.g., QueryFor)
to check if sliceStart.After(now) and return a descriptive error (or panic if
that fits project conventions); otherwise simply compute pad :=
now.Sub(sliceStart) + 30*time.Second and drop the minimum-padding branch.
Signed-off-by: entlein <einentlein@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/vizier/services/adaptive_export/cmd/main.go (1)
53-62:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winImport ordering still needs fixing.
The
pxapiimport (line 55) is misplaced betweenclickhouseandcontrollerimports, causing thegcilinter failure. It should be grouped with otherpx.dev/pixie/src/apiimports or sorted alphabetically within thepx.devgroup.🔧 Proposed fix
import ( "context" "fmt" "os" "os/signal" "strconv" "strings" "sync" "syscall" "time" log "github.com/sirupsen/logrus" + "px.dev/pixie/src/api/go/pxapi" "px.dev/pixie/src/vizier/services/adaptive_export/internal/clickhouse" "px.dev/pixie/src/vizier/services/adaptive_export/internal/config" - "px.dev/pixie/src/api/go/pxapi" "px.dev/pixie/src/vizier/services/adaptive_export/internal/controller" "px.dev/pixie/src/vizier/services/adaptive_export/internal/pixie"🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/vizier/services/adaptive_export/cmd/main.go` around lines 53 - 62, Move the misordered import "px.dev/pixie/src/api/go/pxapi" so it is grouped/sorted correctly with the other px.dev imports (either alongside other px.dev/api imports or alphabetically within the px.dev/* group) in the import block of main.go; update the import ordering to match the project's gci style (e.g., place pxapi next to other px.dev API imports or alphabetize within the px.dev group) and run the project's import formatter (gci/goimports) to verify the linter error is resolved.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse.go`:
- Around line 319-328: The error returned from json.Unmarshal includes the full
raw line (string(line)) which may leak sensitive ClickHouse row data; update the
error construction in the json.Unmarshal failure path (where
json.Unmarshal(line, &w) is called) to omit or truncate the line content — e.g.,
change the fmt.Errorf call to only include the unmarshalling error
(fmt.Errorf("sink: parse active row: %w", err)) or a safe truncated preview —
and apply the same safe-logging approach for similar parse failures around
nsFromRaw handling (the block using ts, te, ls, pidI64, nAn and
err1/err2/err3/errPID/errN) so no raw payload is logged.
---
Duplicate comments:
In `@src/vizier/services/adaptive_export/cmd/main.go`:
- Around line 53-62: Move the misordered import "px.dev/pixie/src/api/go/pxapi"
so it is grouped/sorted correctly with the other px.dev imports (either
alongside other px.dev/api imports or alphabetically within the px.dev/* group)
in the import block of main.go; update the import ordering to match the
project's gci style (e.g., place pxapi next to other px.dev API imports or
alphabetize within the px.dev group) and run the project's import formatter
(gci/goimports) to verify the linter error is resolved.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 96eb5715-03d7-4ac4-8b8a-9ea0e78cc175
📒 Files selected for processing (7)
src/vizier/services/adaptive_export/cmd/main.gosrc/vizier/services/adaptive_export/internal/controller/controller.gosrc/vizier/services/adaptive_export/internal/pixieapi/BUILD.bazelsrc/vizier/services/adaptive_export/internal/pixieapi/pixieapi.gosrc/vizier/services/adaptive_export/internal/pxl/queryfor.gosrc/vizier/services/adaptive_export/internal/sink/clickhouse.gosrc/vizier/services/adaptive_export/internal/trigger/clickhouse.go
Signed-off-by: entlein <einentlein@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse.go`:
- Around line 23-25: Update the stale header comment to reflect current
behavior: replace the assertion that the sink does NOT write Pixie observation
rows with a statement that it now does write Pixie observation rows via the
WritePixieRows function (in addition to writing to
forensic_db.adaptive_attribution), so future readers aren't misled; mention
WritePixieRows by name in the comment to tie behavior to the implementation.
- Around line 98-103: cfg.Database is used directly in SQL string interpolation
(where SQL is built around cfg.Database) and must be validated/sanitized to
prevent injection; add a helper like validateDatabaseName(db string) error (or
sanitizeDatabaseName) and call it before any SQL construction that uses
cfg.Database, enforcing a strict whitelist regex such as
^[A-Za-z_][A-Za-z0-9_]*$ (or another narrowly-scoped pattern your DB allows),
returning an error if it fails; then use the validated/sanitized value (or quote
it safely using ClickHouse identifier quoting) when building the SQL in the
functions that construct queries using cfg.Database to ensure only safe
identifiers are interpolated.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 54500d66-fe97-42b9-8381-b9991f9d948b
📒 Files selected for processing (1)
src/vizier/services/adaptive_export/internal/sink/clickhouse.go
Signed-off-by: entlein <einentlein@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse.go`:
- Around line 339-358: The parsing of PID and NAnomalies via nsFromRaw currently
relies on fmt.Sscanf with "%d" which parses into int64 and can fail for values >
INT64_MAX; update nsFromRaw (or add a dedicated parseUintField helper used when
constructing AttributionRow for PID and NAnomalies) to use strconv.ParseUint
with base 10 and 64-bit width, return a uint64 and proper error, then
convert/assign that uint64 directly to AttributionRow.PID and
AttributionRow.NAnomalies (replace usage of pidI64/nAn and uint64(...) casts),
and ensure the error checks in the block that builds AttributionRow handle the
new parse errors the same way.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 219dc661-9ff8-4744-848d-56c4c85edfb1
📒 Files selected for processing (1)
src/vizier/services/adaptive_export/internal/sink/clickhouse.go
Signed-off-by: entlein <einentlein@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse.go`:
- Around line 117-123: The constructor currently only treats cfg.Timeout == 0
specially; ensure cfg.Timeout is validated so negative values cannot disable the
http.Client deadline: in the function that builds and returns
&ClickHouseHTTP{...} check cfg.Timeout and if cfg.Timeout < 0 return an error
(or normalize it to a safe default) instead of proceeding, and keep using
cfg.Timeout when creating http.Client{Timeout: cfg.Timeout}; reference
cfg.Timeout, ClickHouseHTTP and the http.Client{Timeout: cfg.Timeout} creation
to locate and update the logic.
- Around line 101-107: The constructor New currently parses cfg.Endpoint but
only checks scheme and host; extend validation to reject endpoints containing
query or fragment components so downstream concatenation
(s.cfg.Endpoint+"/?"+q.Encode()) cannot produce malformed URLs. After parsing
(where u, err := url.Parse(cfg.Endpoint) is used), add checks that u.RawQuery ==
"" and u.Fragment == "" and return a clear error (e.g., fmt.Errorf("sink:
Endpoint must not contain query or fragment: %q", cfg.Endpoint)) when either is
non-empty; reference the New function, cfg.Endpoint, and the parsed url u when
making this change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: abefaddf-2c30-46ff-8320-a0086aa3a059
📒 Files selected for processing (1)
src/vizier/services/adaptive_export/internal/sink/clickhouse.go
Signed-off-by: entlein <einentlein@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse.go`:
- Around line 194-195: The timestamp serialization is emitting bare "2006-01-02
15:04:05.000000000" strings (see the time.Time case in the serializer in
clickhouse.go) which ClickHouse will interpret in the session timezone and can
silently shift values; fix by either (preferred) declaring adaptive_attribution
columns t_start, t_end, and last_seen as DateTime64(9, 'UTC') in the ClickHouse
schema, or (alternate) change the time.Time serialization in the serializer
function (the time.Time case) to append an explicit timezone marker (e.g.,
"2006-01-02 15:04:05.000000000 UTC") so stored values are unambiguous. Ensure
any deserialization/path that formats time (the other time formatting block
present) is updated consistently if you choose the serialization approach.
- Line 169: The request URL is built by string-concatenating s.cfg.Endpoint +
"/?"+q.Encode() (seen where http.NewRequestWithContext is called), which can
produce a double-slash if Endpoint ends with a slash; update the constructor
New() to normalize Endpoint (strip any trailing slash) or, better, parse
s.cfg.Endpoint into a url.URL once in New() and store it, then compose requests
by setting u.Path (or using u.ResolveReference) and u.RawQuery = q.Encode()
before calling http.NewRequestWithContext; apply this change to the three call
sites that build the request URL so all requests use the normalized/constructed
url rather than string concatenation.
- Around line 183-184: The handlers that build errors using fmt.Errorf("sink:
pixie HTTP %d (%s): %s", ...) must stop reading and returning ClickHouse
response bodies (which can leak Pixie rows); instead discard the response body
and return an error that only includes the HTTP status code and context (e.g.,
"sink: pixie HTTP %d (%s)"). Update the occurrences in
src/vizier/services/adaptive_export/internal/sink/clickhouse.go where body is
read via io.ReadAll(io.LimitReader(...)) and used in the fmt.Errorf call (the
INSERT/JSONEachRow and SELECT error paths) to remove the body content from the
error message, and apply the same redaction pattern to the two other handlers
that currently include the body in their fmt.Errorf calls so all three return
only status and context.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: abf342f7-8995-4b9d-8557-73a60c797908
📒 Files selected for processing (1)
src/vizier/services/adaptive_export/internal/sink/clickhouse.go
Signed-off-by: entlein <einentlein@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/clickhouse/schema.sql`:
- Around line 78-353: The time zone is not pinned for Pixie observation
timestamp columns, so ClickHouse will interpret the sink's UTC-formatted
timestamps in the server/session timezone; update each table definition
(forensic_db.http_events, forensic_db.`http2_messages.beta`,
forensic_db.dns_events, forensic_db.redis_events, forensic_db.mysql_events,
forensic_db.pgsql_events, forensic_db.cql_events, forensic_db.mongodb_events,
forensic_db.`kafka_events.beta`, forensic_db.amqp_events,
forensic_db.mux_events, forensic_db.tls_events) to declare the timestamp columns
time_ and event_time as DateTime64 with an explicit UTC TZ (e.g., change
DateTime64(9) to DateTime64(9, 'UTC') and DateTime64(3) to DateTime64(3, 'UTC'))
so the stored values match the sink's x.UTC() formatting (consistent with
adaptive_attribution).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 1e559583-53a6-4549-8c0c-c26b854ee19f
📒 Files selected for processing (2)
src/vizier/services/adaptive_export/internal/clickhouse/schema.sqlsrc/vizier/services/adaptive_export/internal/sink/clickhouse.go
Signed-off-by: entlein <einentlein@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 3
♻️ Duplicate comments (1)
src/vizier/services/adaptive_export/internal/sink/clickhouse.go (1)
239-242:⚠️ Potential issue | 🟠 Major | ⚡ Quick winRedact response body from
Writeerror path to prevent attribution payload leakage.Unlike
WritePixieRows(line 191) andQueryActive(line 286) which drain and discard the response body on error,Writestill reads and echoes it. ClickHouse error responses may contain the failingJSONEachRowpayload—here, attribution rows withanomaly_hash, namespace, pod, and timestamps that could propagate to operator logs or surfaced errors.🛡️ Proposed fix to redact response body
if resp.StatusCode/100 != 2 { - msg, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) - return fmt.Errorf("sink: HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(msg))) + // Drain (don't echo) — body may carry attribution rows. + _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 4096)) + return fmt.Errorf("sink: HTTP %d", resp.StatusCode) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/vizier/services/adaptive_export/internal/sink/clickhouse.go` around lines 239 - 242, The Write method currently reads and includes the ClickHouse response body in the returned error, risking leakage of attribution payload; change the error path in Write (same pattern used by WritePixieRows and QueryActive) to drain and discard resp.Body (e.g., io.Copy to io.Discard or io.LimitReader + discard) and return a redacted error message that does not contain the response body (e.g., "sink: HTTP %d" with resp.StatusCode). Ensure resp.Body is closed after draining.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/pxl/queryfor_test.go`:
- Around line 19-33: Imports and formatting in queryfor_test.go are violating
gci rules; run the repo's Go formatting/import normalization (e.g.,
gofmt/goimports or the project's gci autofix) and commit the result so imports
are grouped and ordered correctly (standard libs, blank line, external, blank
line, local packages) and spacing matches gofmt; locate the block containing
imports and the vars fixedNow, fixedStart, fixedEnd, and target to verify the
file is normalized and CI passes.
- Around line 174-183: The test TestQueryFor_EscapesBackslashInTarget only
asserts the namespace was escaped; add a second assertion that verifies the pod
value from anomaly.Target.Pod is also escaped in the generated query from
QueryFor (use strings.Contains on q to check for the escaped pod literal, e.g.
the pod comparison expression produced by QueryFor like "df.pod == 'p\\od'" or
the namespaced pod key that QueryFor emits), referencing anomaly.Target.Pod to
locate the test input and QueryFor to locate where the output string is
generated.
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse_test.go`:
- Around line 19-31: The import block in clickhouse_test.go is mis-ordered per
the project's gci rules; run the project's linter fix (run gci to rewrite
imports or the Makefile lint-fix target) to reorder imports into
standard-library, third-party, then internal groups with single blank lines
between groups, and re-run tests/CI; update the import statement in
src/vizier/services/adaptive_export/internal/sink/clickhouse_test.go accordingly
so the gci check passes.
---
Duplicate comments:
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse.go`:
- Around line 239-242: The Write method currently reads and includes the
ClickHouse response body in the returned error, risking leakage of attribution
payload; change the error path in Write (same pattern used by WritePixieRows and
QueryActive) to drain and discard resp.Body (e.g., io.Copy to io.Discard or
io.LimitReader + discard) and return a redacted error message that does not
contain the response body (e.g., "sink: HTTP %d" with resp.StatusCode). Ensure
resp.Body is closed after draining.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 8574daea-06c5-4f54-83ce-19233a357a8f
📒 Files selected for processing (5)
src/vizier/services/adaptive_export/internal/clickhouse/schema.sqlsrc/vizier/services/adaptive_export/internal/pxl/BUILD.bazelsrc/vizier/services/adaptive_export/internal/pxl/queryfor_test.gosrc/vizier/services/adaptive_export/internal/sink/clickhouse.gosrc/vizier/services/adaptive_export/internal/sink/clickhouse_test.go
| import ( | ||
| "bytes" | ||
| "context" | ||
| "fmt" | ||
| "io" | ||
| "net/http" | ||
| "net/http/httptest" | ||
| "strings" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "px.dev/pixie/src/vizier/services/adaptive_export/internal/anomaly" | ||
| ) |
There was a problem hiding this comment.
Run gci to fix import ordering and unblock CI.
Static analysis reports this file is not properly formatted per gci rules at line 454. This typically relates to import group ordering or spacing. Run the project's linter fix command to auto-format.
#!/bin/bash
# Check gci configuration and current violations
if command -v gci &> /dev/null; then
gci diff src/vizier/services/adaptive_export/internal/sink/clickhouse_test.go
fi
# Also check if there's a Makefile target for lint-fix
rg -l "gci|golangci-lint" Makefile .golangci.yml 2>/dev/null | head -5🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse_test.go` around
lines 19 - 31, The import block in clickhouse_test.go is mis-ordered per the
project's gci rules; run the project's linter fix (run gci to rewrite imports or
the Makefile lint-fix target) to reorder imports into standard-library,
third-party, then internal groups with single blank lines between groups, and
re-run tests/CI; update the import statement in
src/vizier/services/adaptive_export/internal/sink/clickhouse_test.go accordingly
so the gci check passes.
Signed-off-by: entlein <einentlein@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/pxl/queryfor_test.go`:
- Around line 192-205: The test TestQueryFor_EveryBuiltinTableEmits currently
iterates Names(BuiltinTables) and will vacuously pass if that slice is empty;
add an explicit guard at the start of the test that computes tables :=
Names(BuiltinTables) and calls t.Fatalf (or t.Fatal) with a clear message if
len(tables) == 0 to fail loudly on registry regressions, then iterate over that
tables slice (use the existing QueryFor, target, fixedStart, fixedEnd, fixedNow
checks).
- Around line 115-127: The test
TestQueryFor_TimeBoundsAreInclusiveLowerExclusiveUpper is brittle because
wantLower/wantUpper hardcode epoch nanos and exact spacing; update the
assertions to be resilient to template formatting by checking for the essential
tokens instead of exact spacing—e.g., build expected nanos from
fixedStart/fixedEnd (or use the existing constants) and assert that q contains
"int64_to_time(<nanos>)" and separately contains the comparison operators with
the field (e.g., "df.time_ >=" for lower and "df.time_ <" for upper), or use
regexp.MustCompile to match patterns like
`df\s*=\s*df\[df\.time_\s*>=\s*px\.int64_to_time\(<nanos>\)\]`/`df\s*=\s*df\[df\.time_\s*<\s*px\.int64_to_time\(<nanos>\)\]`;
adjust the checks in TestQueryFor_TimeBoundsAreInclusiveLowerExclusiveUpper and
remove reliance on exact spacing in wantLower/wantUpper.
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse_test.go`:
- Around line 103-104: The tests currently ignore errors from
New(Config{Endpoint: srv.URL}) which can hide constructor failures; update each
occurrence (the New constructor calls in clickhouse_test.go used to create s) to
capture the returned error and fail the test immediately if non-nil (e.g.,
t.Fatalf or require.NoError) before calling s.Write or other methods so
constructor validation errors surface clearly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: ffd1ceb5-a929-49a4-affc-21a0b9a2ace9
📒 Files selected for processing (2)
src/vizier/services/adaptive_export/internal/pxl/queryfor_test.gosrc/vizier/services/adaptive_export/internal/sink/clickhouse_test.go
Signed-off-by: entlein <einentlein@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse_test.go`:
- Around line 395-436: The test TestParseActiveRowsStream_BoundsMemory claims to
verify peak memory usage but only validates correctness; either rename it to
TestParseActiveRowsStream_StreamsChunkedInput to reflect that it tests chunked
reading and decoding (change the function name and any references), or augment
it to actually measure allocations by sampling runtime.MemStats (or using
testing.AllocsPerRun) around the parseActiveRowsStream(rdr) call and assert
peak/allocs are below a chosen threshold; keep references to
parseActiveRowsStream and the existing chunkedReader setup when adding the
measurement so the logic and assertions remain tied to the same code path.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: a4da1222-0df3-4e3a-82f5-07b53d2ecd4d
📒 Files selected for processing (2)
src/vizier/services/adaptive_export/internal/pxl/queryfor_test.gosrc/vizier/services/adaptive_export/internal/sink/clickhouse_test.go
| // TestParseActiveRowsStream_BoundsMemory — proves the streaming path | ||
| // doesn't allocate proportional to total response size. Builds a | ||
| // 5 MiB synthetic JSONEachRow body fed in 64 KiB chunks, parses, and | ||
| // asserts (a) all rows decoded correctly, (b) peak intermediate | ||
| // allocation is well below the body size (loose bound: parseActiveRows | ||
| // hands one row at a time to the caller; we collect into a slice but | ||
| // never hold the wire representation of more than one line). | ||
| func TestParseActiveRowsStream_BoundsMemory(t *testing.T) { | ||
| const targetRows = 5000 // ~5MiB at ~1KiB/row | ||
| var buf bytes.Buffer | ||
| row := func(i int) string { | ||
| return fmt.Sprintf(`{"anomaly_hash":"%032x","namespace":"redis","pod":"p","comm":"c","pid":%d,"hostname":"h","t_start_ns":%d,"t_end_ns":%d,"last_seen_ns":%d,"last_rule_id":"R0001","n_anomalies":%d,"_pad":"%s"}`+"\n", | ||
| i, i, 1700000000000000000+int64(i), 1700000000000000000+int64(i)+300_000_000_000, 1700000000000000000+int64(i)+150_000_000_000, i, strings.Repeat("x", 800)) | ||
| } | ||
| for i := 0; i < targetRows; i++ { | ||
| buf.WriteString(row(i)) | ||
| } | ||
| body := buf.Bytes() | ||
|
|
||
| const chunkSize = 64 * 1024 | ||
| chunks := make([][]byte, 0, len(body)/chunkSize+1) | ||
| for off := 0; off < len(body); off += chunkSize { | ||
| end := off + chunkSize | ||
| if end > len(body) { | ||
| end = len(body) | ||
| } | ||
| chunks = append(chunks, body[off:end]) | ||
| } | ||
| rdr := &chunkedReader{chunks: chunks, delay: 0} | ||
|
|
||
| rows, err := parseActiveRowsStream(rdr) | ||
| if err != nil { | ||
| t.Fatalf("parseActiveRowsStream: %v", err) | ||
| } | ||
| if len(rows) != targetRows { | ||
| t.Fatalf("parsed %d rows, want %d", len(rows), targetRows) | ||
| } | ||
| // Spot-check round-trip on one row (last element). | ||
| if rows[targetRows-1].PID != uint64(targetRows-1) { | ||
| t.Fatalf("last row PID = %d, want %d", rows[targetRows-1].PID, targetRows-1) | ||
| } | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Test name suggests memory bounds verification, but no memory is measured.
The test verifies streaming parsing correctness (chunked reads, row decoding), but does not actually measure peak memory allocation. The comment claims it "proves the streaming path doesn't allocate proportional to total response size," yet there's no runtime.ReadMemStats or allocation benchmarking.
Consider either:
- Renaming to
TestParseActiveRowsStream_StreamsChunkedInputto reflect what's actually tested, or - Adding allocation measurement if memory bounds are a critical invariant
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse_test.go` around
lines 395 - 436, The test TestParseActiveRowsStream_BoundsMemory claims to
verify peak memory usage but only validates correctness; either rename it to
TestParseActiveRowsStream_StreamsChunkedInput to reflect that it tests chunked
reading and decoding (change the function name and any references), or augment
it to actually measure allocations by sampling runtime.MemStats (or using
testing.AllocsPerRun) around the parseActiveRowsStream(rdr) call and assert
peak/allocs are below a chosen threshold; keep references to
parseActiveRowsStream and the existing chunkedReader setup when adding the
measurement so the logic and assertions remain tied to the same code path.
Summary: pixie aw now creates its own tables, creates default queries with pm 5 min inserts after an anomaly
Test Plan: tested locally with go and tested e2e in lab
Type of change: /kind feature-improvement