Skip to content

Commit efc97be

Browse files
committed
Add precompute engine design doc
1 parent 427ea5c commit efc97be

1 file changed

Lines changed: 393 additions & 0 deletions

File tree

Lines changed: 393 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,393 @@
1+
# Precompute Engine Design Document
2+
3+
## 1. Overview
4+
5+
### Why this PR is needed
6+
7+
ASAPQuery already has a query path over precomputed summaries, but before this PR
8+
there was no standalone runtime inside `asap-query-engine` that could continuously:
9+
10+
- accept raw metric samples,
11+
- turn them into windowed precomputed outputs, and
12+
- write those outputs into the same store that the query engine reads.
13+
14+
PR #228 fills that gap by introducing a first working version of a **precompute
15+
engine**. The engine runs as a separate binary, accepts Prometheus remote write
16+
traffic, partitions incoming series across workers, computes windowed
17+
accumulators, and stores the results for later query-time retrieval.
18+
19+
This PR is primarily about establishing the end-to-end execution path and the
20+
core abstractions:
21+
22+
- ingest endpoint,
23+
- worker sharding model,
24+
- window management,
25+
- accumulator construction and update,
26+
- output sink abstraction, and
27+
- integration with the existing store and query engine.
28+
29+
### Requirements
30+
31+
The implementation in this PR is driven by the following requirements:
32+
33+
1. ASAPQuery needs a native precompute path inside the Rust query engine codebase.
34+
2. The system must ingest a high volume of time-series samples without forcing
35+
cross-worker coordination on every sample.
36+
3. Samples for the same series must be processed consistently by the same worker
37+
so per-series state can stay local.
38+
4. The engine must support windowed precomputation for the aggregation
39+
configurations already defined in `StreamingConfig`.
40+
5. The output must be written in the same `PrecomputedOutput` form already
41+
consumed by the store and query engine.
42+
6. The design must stay simple enough to validate correctness end to end before
43+
adding more advanced features such as richer late-data policies or multi-stage
44+
aggregation.
45+
46+
### Scope of this PR
47+
48+
This PR delivers a pragmatic v1:
49+
50+
- single-process, multi-worker execution,
51+
- Prometheus remote write ingest,
52+
- store-backed output,
53+
- watermark-based window closing,
54+
- bounded per-series buffering,
55+
- best-effort handling of out-of-order data via a lateness threshold,
56+
- optional raw passthrough mode.
57+
58+
It does **not** try to solve every future concern yet. In particular, it does
59+
not add multi-stage aggregation, explicit late-data re-emission policies,
60+
cross-worker merge coordination, or pane-based sliding-window optimization.
61+
62+
## 2. Architecture
63+
64+
### High-level data flow
65+
66+
```text
67+
Prometheus Remote Write
68+
|
69+
v
70+
POST /api/v1/write (Axum)
71+
|
72+
v
73+
decode_prometheus_remote_write()
74+
|
75+
v
76+
group samples by series key
77+
|
78+
v
79+
SeriesRouter (xxhash(series_key) % num_workers)
80+
|
81+
+-------------------+-------------------+-------------------+
82+
| | | |
83+
v v v v
84+
Worker 0 Worker 1 Worker 2 Worker N-1
85+
| | | |
86+
| per-series buffer + per-aggregation active windows |
87+
+-------------------+-------------------+-------------------+
88+
|
89+
v
90+
OutputSink::emit_batch()
91+
|
92+
v
93+
Store
94+
|
95+
v
96+
Query Engine
97+
```
98+
99+
### Main components
100+
101+
#### `PrecomputeEngine` (`mod.rs`)
102+
103+
`PrecomputeEngine` is the top-level orchestrator. It:
104+
105+
- loads aggregation configs from `StreamingConfig`,
106+
- creates one bounded MPSC channel per worker,
107+
- builds a `SeriesRouter`,
108+
- spawns worker tasks,
109+
- starts the ingest HTTP server, and
110+
- starts a periodic flush loop.
111+
112+
The engine keeps the worker model intentionally simple: workers are symmetric,
113+
and routing is deterministic.
114+
115+
#### `SeriesRouter` (`series_router.rs`)
116+
117+
The router computes:
118+
119+
```text
120+
worker_idx = xxhash64(series_key) % num_workers
121+
```
122+
123+
This guarantees that all samples for one exact series key go to the same worker.
124+
That is the main design decision that keeps worker-local state lock-free.
125+
126+
#### `Worker` (`worker.rs`)
127+
128+
Each worker owns a shard of the series space. For each series it stores:
129+
130+
- a `SeriesBuffer`,
131+
- the previous watermark seen for that series,
132+
- one `AggregationState` per matching aggregation config.
133+
134+
Each `AggregationState` contains:
135+
136+
- the copied `AggregationConfig`,
137+
- a `WindowManager`,
138+
- a map of active window accumulators.
139+
140+
Workers receive `Samples`, `Flush`, and `Shutdown` messages. On samples, the
141+
worker inserts data into the series buffer, applies lateness filtering, updates
142+
active window accumulators, detects newly closed windows, and emits completed
143+
accumulators to the sink.
144+
145+
#### `SeriesBuffer` (`series_buffer.rs`)
146+
147+
The buffer stores timestamped samples per series in timestamp order and tracks a
148+
monotonic watermark. It is bounded by `max_buffer_per_series`, which prevents a
149+
single hot or stalled series from growing unbounded in memory.
150+
151+
#### `WindowManager` (`window_manager.rs`)
152+
153+
`WindowManager` encapsulates window boundary logic:
154+
155+
- map a timestamp to an aligned window start,
156+
- decide which windows became closed after watermark advancement,
157+
- return `[window_start, window_end)` bounds.
158+
159+
The current implementation supports both tumbling and slide-aligned window
160+
closure logic. Window close is driven by event-time watermark progression, not
161+
wall-clock time.
162+
163+
#### `AccumulatorUpdater` factory (`accumulator_factory.rs`)
164+
165+
Workers do not hardcode sketch logic. Instead, they construct accumulator
166+
updaters from the aggregation config. This keeps the precompute engine generic
167+
across supported aggregation types and lets it emit the same accumulator objects
168+
already used elsewhere in ASAPQuery.
169+
170+
#### `OutputSink` (`output_sink.rs`)
171+
172+
`OutputSink` separates computation from persistence. This PR ships three useful
173+
implementations:
174+
175+
- `StoreOutputSink` for normal precompute writes,
176+
- `RawPassthroughSink` for writing raw samples as `SumAccumulator`s,
177+
- `NoopOutputSink` for tests.
178+
179+
### Execution model
180+
181+
The execution model is:
182+
183+
1. Decode one remote-write request.
184+
2. Group samples by exact series key.
185+
3. Route each grouped batch to one worker.
186+
4. Process series state only on that worker.
187+
5. Emit completed windows in batches to the sink.
188+
189+
This design avoids per-sample cross-worker synchronization and keeps the first
190+
version operationally understandable.
191+
192+
## 3. Key Features Derived From the Requirements
193+
194+
### Deterministic per-series routing
195+
196+
Requirement: samples for one series must share local state.
197+
198+
Derived feature: the hash-based router always sends the same series key to the
199+
same worker. This means:
200+
201+
- no shared mutable state across workers for a given series,
202+
- no locking around per-series accumulators,
203+
- predictable ownership of series-local watermarks and buffers.
204+
205+
### Config-driven aggregation matching
206+
207+
Requirement: reuse aggregation definitions already present in the system.
208+
209+
Derived feature: each worker matches a series against the loaded
210+
`AggregationConfig`s and creates aggregation state only for the configs relevant
211+
to that series. The engine therefore stays driven by `StreamingConfig` instead
212+
of inventing a separate configuration model.
213+
214+
### Windowed precomputation with watermark closure
215+
216+
Requirement: emit queryable precomputed windows rather than raw streams only.
217+
218+
Derived feature: each aggregation uses a `WindowManager` to:
219+
220+
- align samples to windows,
221+
- detect when watermark movement closes a window,
222+
- emit `PrecomputedOutput` records with exact window bounds.
223+
224+
This gives the query engine stable window ranges to read later.
225+
226+
### Bounded memory for series-local state
227+
228+
Requirement: the engine must remain safe under continuous ingestion.
229+
230+
Derived feature: each series uses a bounded `SeriesBuffer`, and each worker uses
231+
bounded channels from the router. This does not solve every overload scenario,
232+
but it prevents the obvious unbounded growth cases in the v1 design.
233+
234+
### Optional raw passthrough mode
235+
236+
Requirement: support bring-up, debugging, and staged rollout.
237+
238+
Derived feature: when `pass_raw_samples=true`, the worker bypasses windowed
239+
aggregation and emits one `SumAccumulator` per sample. This is useful for
240+
testing the ingest-to-store plumbing independently from sketch behavior.
241+
242+
### Direct integration with the existing store and query engine
243+
244+
Requirement: the precompute path must fit ASAPQuery's existing runtime.
245+
246+
Derived feature: the standalone `precompute_engine` binary can be launched with:
247+
248+
- a `StreamingConfig`,
249+
- a store implementation,
250+
- an optional query HTTP server in the same process.
251+
252+
That makes the PR immediately testable end to end.
253+
254+
## 4. System Implementation Corner Cases
255+
256+
### Late and out-of-order samples
257+
258+
The current policy is intentionally simple:
259+
260+
- if `timestamp < watermark - allowed_lateness_ms`, the sample is dropped;
261+
- otherwise it is accepted.
262+
263+
This means the PR chooses predictability over replay complexity. There is no
264+
secondary path yet for re-opening or patching already emitted windows.
265+
266+
### Idle series and flush behavior
267+
268+
The engine has a periodic flush loop, but the current implementation does **not**
269+
advance watermarks on its own. As a result, a flush only emits windows that have
270+
become closable due to prior event-time progress. If a series stops receiving
271+
samples before a later sample advances the watermark, the worker does not invent
272+
time progress just because wall-clock time passed.
273+
274+
This is an important behavior boundary for this PR.
275+
276+
### Sliding-window semantics in v1
277+
278+
`WindowManager` understands slide intervals, and tests cover slide-aligned
279+
window closing. However, this PR keeps the worker update path simple: samples
280+
are placed into the accumulator keyed by `window_start_for(ts)`, and the design
281+
does not yet implement the more advanced pane-sharing or multi-window fan-out
282+
approach described in earlier discussion branches.
283+
284+
So the current PR establishes the reusable windowing abstraction first, while
285+
leaving richer sliding-window execution strategies for follow-up work.
286+
287+
### Cross-series aggregation across workers
288+
289+
Routing is based on the full series key, not on the final grouping key. That
290+
keeps ingestion simple, but it also means different source series that
291+
contribute to the same logical grouped result may be processed on different
292+
workers. This PR does not introduce a second-tier reduce stage; it relies on the
293+
existing downstream model of storing precomputed outputs and reading them later.
294+
295+
### Series-key parsing assumptions
296+
297+
Grouping-label extraction currently parses series keys in the expected Prometheus
298+
text form:
299+
300+
```text
301+
metric_name{label1="value1",label2="value2"}
302+
```
303+
304+
Missing grouping labels are converted to empty strings. This keeps the worker
305+
path robust, but it is worth documenting because output keys depend on this
306+
parsing behavior.
307+
308+
### Raw mode loses label-group semantics
309+
310+
In raw passthrough mode, the engine emits one point output per sample with
311+
`key=None`. That is acceptable for the intended debugging and plumbing use case,
312+
but it is deliberately not equivalent to fully configured grouped aggregation.
313+
314+
## 5. Examples
315+
316+
### Example 1: basic tumbling-window flow
317+
318+
Assume:
319+
320+
- metric: `fake_metric`
321+
- window size: 60 seconds
322+
- slide interval: 0 (tumbling)
323+
- one sample arrives at `t=12_000 ms`
324+
325+
The worker computes:
326+
327+
- `window_start = 0`
328+
- `window_end = 60_000`
329+
330+
The sample updates the active accumulator for window `[0, 60_000)`. Once the
331+
watermark later reaches at least `60_000`, the worker emits:
332+
333+
- `PrecomputedOutput(start=0, end=60_000, aggregation_id=...)`
334+
- the finished accumulator for that window
335+
336+
### Example 2: out-of-order sample handling
337+
338+
Assume:
339+
340+
- current series watermark is `100_000 ms`
341+
- `allowed_lateness_ms = 5_000`
342+
343+
Then:
344+
345+
- sample at `97_000 ms` is accepted,
346+
- sample at `94_999 ms` is dropped.
347+
348+
This keeps the lateness rule easy to reason about.
349+
350+
### Example 3: deterministic sharding
351+
352+
Assume two incoming series:
353+
354+
- `cpu_usage{host="a",job="node"}`
355+
- `cpu_usage{host="b",job="node"}`
356+
357+
The router hashes each full series key independently. Each series is assigned to
358+
one worker, and every later batch for that same series goes back to that same
359+
worker. The benefit is that each worker can maintain series-local state without
360+
coordination.
361+
362+
### Example 4: raw passthrough mode
363+
364+
If `pass_raw_samples=true` and a sample arrives:
365+
366+
```text
367+
series_key = fake_metric{instance="i1"}
368+
timestamp = 25_000
369+
value = 42.0
370+
```
371+
372+
The worker emits one point output immediately:
373+
374+
- `PrecomputedOutput(start=25_000, end=25_000, key=None, aggregation_id=raw_mode_aggregation_id)`
375+
- `SumAccumulator::with_sum(42.0)`
376+
377+
This mode is useful when validating the ingest path independently from
378+
windowed aggregation correctness.
379+
380+
## 6. Summary
381+
382+
PR #228 introduces the first integrated precompute engine inside
383+
`asap-query-engine`. The design deliberately favors a clear and testable v1:
384+
385+
- one process,
386+
- deterministic worker sharding,
387+
- config-driven accumulator creation,
388+
- watermark-based window emission,
389+
- direct store integration.
390+
391+
That foundation is the reason this PR is needed. It creates the runtime path
392+
that later PRs can extend with more sophisticated window execution, richer late
393+
data handling, and more advanced cross-worker aggregation strategies.

0 commit comments

Comments
 (0)