|
1 | 1 | #!/usr/bin/env bash |
2 | 2 | set -euo pipefail |
3 | 3 |
|
4 | | -# ingest_wait.sh — waits for the asap-demo Arroyo pipeline to reach RUNNING |
5 | | -# state, then sleeps to allow sketches to accumulate before verifying that the |
6 | | -# query engine has ingested data. |
| 4 | +# ingest_wait.sh — waits for the query engine to accumulate sketches before |
| 5 | +# benchmarking. With the precompute engine, the query engine begins computing |
| 6 | +# immediately on startup, so we just sleep and then verify data is present. |
7 | 7 |
|
8 | | -ARROYO_URL="http://localhost:5115/api/v1/pipelines" |
9 | 8 | QE_URL="http://localhost:8088/api/v1/query" |
10 | | -PIPELINE_NAME="asap-demo" |
11 | | -MAX_PIPELINE_WAIT=600 # seconds — Arroyo must compile Rust UDFs; allow extra time |
12 | | -ACCUMULATE_SLEEP=90 # seconds after pipeline is running |
13 | | -SLEEP=5 |
| 9 | +ACCUMULATE_SLEEP=90 # seconds for sketches to accumulate |
14 | 10 |
|
15 | | -# ── 1. Wait for asap-demo pipeline to reach RUNNING ───────────────────────── |
16 | | -echo "[ingest_wait] Waiting for Arroyo pipeline '${PIPELINE_NAME}' to reach RUNNING state ..." |
17 | | -elapsed=0 |
18 | | -while true; do |
19 | | - state=$(curl -sf --max-time 10 "${ARROYO_URL}" 2>/dev/null \ |
20 | | - | python3 -c " |
21 | | -import sys, json |
22 | | -try: |
23 | | - data = json.load(sys.stdin) |
24 | | - # 'data' key may be null when no pipelines exist; use 'or []' to handle that |
25 | | - pipelines = data if isinstance(data, list) else (data.get('data') or []) |
26 | | - for p in pipelines: |
27 | | - name = str(p.get('name') or p.get('id') or '') |
28 | | - # Normalise hyphens/underscores so 'asap-demo' matches 'asap_demo' |
29 | | - if '${PIPELINE_NAME}'.replace('-', '_') in name.replace('-', '_'): |
30 | | - state = p.get('state') |
31 | | - stop = p.get('stop', '') |
32 | | - # Arroyo signals a running pipeline via state=null and stop='none' |
33 | | - if state is None and stop == 'none': |
34 | | - print('Running') |
35 | | - elif state is not None: |
36 | | - print(str(state)) |
37 | | - else: |
38 | | - print('stopped') |
39 | | - break |
40 | | - else: |
41 | | - # No matching pipeline found yet — print nothing so caller retries |
42 | | - pass |
43 | | -except Exception: |
44 | | - pass |
45 | | -" 2>/dev/null || true) |
46 | | - |
47 | | - if [ "${state}" = "Running" ] || [ "${state}" = "RUNNING" ] || [ "${state}" = "running" ]; then |
48 | | - echo "[ingest_wait] Pipeline '${PIPELINE_NAME}' is RUNNING (${elapsed}s elapsed)" |
49 | | - break |
50 | | - fi |
51 | | - |
52 | | - if [ "${elapsed}" -ge "${MAX_PIPELINE_WAIT}" ]; then |
53 | | - echo "[ingest_wait] ERROR: Pipeline '${PIPELINE_NAME}' did not reach RUNNING within ${MAX_PIPELINE_WAIT}s (last state: '${state:-unknown}')" >&2 |
54 | | - # Dump pipeline list for diagnosis |
55 | | - echo "[ingest_wait] Current Arroyo pipeline list:" >&2 |
56 | | - curl -sf --max-time 10 "${ARROYO_URL}" 2>/dev/null | python3 -m json.tool 2>/dev/null >&2 || true |
57 | | - exit 1 |
58 | | - fi |
59 | | - |
60 | | - echo "[ingest_wait] Pipeline state: '${state:-not found}' — retrying in ${SLEEP}s (${elapsed}s elapsed) ..." |
61 | | - sleep "${SLEEP}" |
62 | | - elapsed=$(( elapsed + SLEEP )) |
63 | | -done |
64 | | - |
65 | | -# ── 2. Allow sketches to accumulate ───────────────────────────────────────── |
66 | | -echo "[ingest_wait] Pipeline running. Sleeping ${ACCUMULATE_SLEEP}s for sketches to accumulate ..." |
| 11 | +# ── 1. Allow sketches to accumulate ───────────────────────────────────────── |
| 12 | +echo "[ingest_wait] Sleeping ${ACCUMULATE_SLEEP}s for sketches to accumulate ..." |
67 | 13 | sleep "${ACCUMULATE_SLEEP}" |
68 | 14 |
|
69 | | -# ── 3. Verify query engine has data ───────────────────────────────────────── |
| 15 | +# ── 2. Verify query engine has data ───────────────────────────────────────── |
70 | 16 | echo "[ingest_wait] Verifying query engine has data ..." |
71 | 17 | response=$(curl -sf --max-time 10 \ |
72 | 18 | "${QE_URL}?query=avg%28sensor_reading%29" 2>/dev/null || true) |
|
0 commit comments