Skip to content

Commit 9ee8f6d

Browse files
fixed some issues from review
1 parent 1f0dec7 commit 9ee8f6d

5 files changed

Lines changed: 22 additions & 42 deletions

File tree

asap-tools/experiments/experiment_only_ingest_path.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -289,16 +289,7 @@ def main(cfg: DictConfig):
289289
f"http://localhost:{prometheus_service.get_query_endpoint_port()}"
290290
)
291291

292-
print("Waiting for Prometheus to become ready...")
293-
prometheus_ready_timeout = 60
294-
prometheus_ready_start = time.time()
295-
while not prometheus_service.is_healthy():
296-
if time.time() - prometheus_ready_start > prometheus_ready_timeout:
297-
raise RuntimeError(
298-
f"Prometheus did not become ready within {prometheus_ready_timeout}s"
299-
)
300-
time.sleep(2)
301-
print("Prometheus is ready.")
292+
prometheus_service.wait_until_ready()
302293

303294
label_discovery_wait = prometheus_scrape_interval * 2
304295
print(

asap-tools/experiments/experiment_run_e2e.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -342,17 +342,7 @@ def main(cfg: DictConfig):
342342
# Poll until Prometheus is actually accepting connections before sleeping
343343
# for scrape data. Prometheus takes a few seconds to bind the port after
344344
# its process starts, so a fixed sleep alone can race.
345-
print("Waiting for Prometheus to become ready...")
346-
prometheus_ready_timeout = 60
347-
prometheus_ready_start = time.time()
348-
while not prometheus_service.is_healthy():
349-
if time.time() - prometheus_ready_start > prometheus_ready_timeout:
350-
raise RuntimeError(
351-
"Prometheus did not become ready within "
352-
f"{prometheus_ready_timeout}s"
353-
)
354-
time.sleep(2)
355-
print("Prometheus is ready.")
345+
prometheus_service.wait_until_ready()
356346

357347
# Wait for two scrape intervals so Prometheus has series to return.
358348
label_discovery_wait = prometheus_scrape_interval * 2
@@ -484,9 +474,7 @@ def main(cfg: DictConfig):
484474
pipeline_id=arroyosketch_pipeline_id,
485475
experiment_output_dir=experiment_output_dir,
486476
)
487-
elif args.streaming_engine == "precompute":
488-
pass # precompute engine handles sketch computation internally; no external pipeline needed
489-
else:
477+
elif args.streaming_engine not in ("precompute",):
490478
raise ValueError(
491479
"Invalid streaming engine: {}. Supported engines are 'flink', 'arroyo', and 'precompute'".format(
492480
args.streaming_engine

asap-tools/experiments/experiment_run_grafana_demo.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -350,16 +350,7 @@ def main(cfg: DictConfig):
350350
f"http://localhost:{prometheus_service.get_query_endpoint_port()}"
351351
)
352352

353-
print("Waiting for Prometheus to become ready...")
354-
prometheus_ready_timeout = 60
355-
prometheus_ready_start = time.time()
356-
while not prometheus_service.is_healthy():
357-
if time.time() - prometheus_ready_start > prometheus_ready_timeout:
358-
raise RuntimeError(
359-
f"Prometheus did not become ready within {prometheus_ready_timeout}s"
360-
)
361-
time.sleep(2)
362-
print("Prometheus is ready.")
353+
prometheus_service.wait_until_ready()
363354

364355
label_discovery_wait = prometheus_scrape_interval * 2
365356
print(
@@ -448,9 +439,7 @@ def main(cfg: DictConfig):
448439
pipeline_id=arroyosketch_pipeline_id,
449440
experiment_output_dir=experiment_output_dir,
450441
)
451-
elif args.streaming_engine == "precompute":
452-
pass # precompute engine handles sketch computation internally; no external pipeline needed
453-
else:
442+
elif args.streaming_engine not in ("precompute",):
454443
raise ValueError(
455444
"Invalid streaming engine: {}. Supported engines are 'flink', 'arroyo', and 'precompute'".format(
456445
args.streaming_engine

asap-tools/experiments/experiment_utils/services/base.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,18 @@ def is_healthy(self) -> bool:
5757
"""
5858
return True
5959

60+
def wait_until_ready(self, timeout: int = 60) -> None:
61+
"""Block until is_healthy() returns True, or raise RuntimeError on timeout."""
62+
print(f"Waiting for {self.__class__.__name__} to become ready...")
63+
start = time.time()
64+
while not self.is_healthy():
65+
if time.time() - start > timeout:
66+
raise RuntimeError(
67+
f"{self.__class__.__name__} did not become ready within {timeout}s"
68+
)
69+
time.sleep(2)
70+
print(f"{self.__class__.__name__} is ready.")
71+
6072
def restart(self, **kwargs) -> None:
6173
"""
6274
Restart the service. Default implementation stops then starts.

asap-tools/experiments/experiment_utils/services/query_engine.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import yaml
99

1010
import constants
11+
import utils
1112
from .base import BaseService
1213
from experiment_utils.providers.base import InfrastructureProvider
1314

@@ -126,8 +127,9 @@ def _build_engine_config(
126127
"forward_unsupported_queries": forward_unsupported_queries,
127128
}
128129

129-
# Ingest config depends on the streaming engine
130-
if streaming_engine == "arroyo":
130+
# Ingest config depends on the streaming engine.
131+
# Both flink and arroyo produce to the same Kafka topic.
132+
if streaming_engine in ("arroyo", "flink"):
131133
ingest: dict = {
132134
"type": "kafka",
133135
"broker": kafka_broker,
@@ -143,7 +145,7 @@ def _build_engine_config(
143145
else:
144146
raise ValueError(
145147
f"streaming_engine='{streaming_engine}' is not supported by the Rust query engine. "
146-
"Use 'arroyo' or 'precompute'."
148+
"Use 'flink', 'arroyo', or 'precompute'."
147149
)
148150

149151
return {
@@ -179,8 +181,6 @@ def _write_engine_config_to_remote(
179181
local_path: Local path to write the YAML file to
180182
remote_path: Absolute path on the remote node where the file should land
181183
"""
182-
import utils # top-level module in the experiments/ tree
183-
184184
os.makedirs(os.path.dirname(local_path), exist_ok=True)
185185
config_yaml = yaml.dump(
186186
config_dict, default_flow_style=False, allow_unicode=True

0 commit comments

Comments
 (0)