Skip to content

Commit c33fb5a

Browse files
mode updates
1 parent fd52156 commit c33fb5a

13 files changed

Lines changed: 49 additions & 464 deletions

asap-tools/experiments/CONFIG_PARAMETERS_REFERENCE.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ These parameters must be provided for all experiment scripts:
136136

137137
#### `streaming.engine` (string, optional)
138138
- **Description**: Which streaming engine to use
139-
- **Default**: `"flink"`
140-
- **Choices**: `"flink"`, `"arroyo"`
141-
- **Example**: `"arroyo"`
139+
- **Default**: `"precompute"`
140+
- **Choices**: `"flink"`, `"arroyo"`, `"precompute"`
141+
- **Example**: `"precompute"`
142142
- **Usage**: Selects streaming processing framework
143143

144144
#### `streaming.flink_input_format` (string, optional)

asap-tools/experiments/HYDRA_CONFIG_USAGE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ flow.no_teardown: true/false
5252
flow.steady_state_wait: 300 # seconds
5353

5454
# Streaming engine
55-
streaming.engine: flink|arroyo
55+
streaming.engine: flink|arroyo|precompute
5656
streaming.flink_input_format: json|avro-json|avro-binary
5757
streaming.flink_output_format: json|byte
5858
streaming.enable_object_reuse: true/false

asap-tools/experiments/config/config.yaml

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,7 @@ fake_exporter_language: "rust" # choices: ["python", "rust"]
7878
# Cluster data exporter configuration
7979
cluster_data_directory: "/data/cluster_traces" # Path to directory containing Google/Alibaba cluster trace data
8080

81-
# Query engine language
82-
query_engine_language: "rust" # choices: ["python", "rust"]
83-
84-
# Query language (SQL vs PROMQL) - only used by Rust query engine
81+
# Query language (SQL vs PROMQL)
8582
query_language: "PROMQL" # choices: ["SQL", "PROMQL"]
8683

8784
# Query engine options
@@ -120,7 +117,7 @@ prometheus_client:
120117

121118
# Container deployment settings
122119
use_container:
123-
query_engine: true # QueryEngineService - containerized query engine
120+
query_engine: true # QueryEngineRustService - containerized query engine
124121
arroyo: true # ArroyoService - containerized Arroyo streaming engine
125122
controller: true # ControllerService - containerized controller
126123
fake_exporter: true # ExporterServiceFactory - containerized fake exporters

asap-tools/experiments/constants.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414
FLINK_OUTPUT_TOPIC = "flink_output"
1515
KAFKA_BROKER = "localhost:9092"
1616

17-
QUERY_ENGINE_PY_PROCESS_KEYWORD = "main_query_engine.py"
1817
QUERY_ENGINE_RS_PROCESS_KEYWORD = "query_engine_rust"
19-
QUERY_ENGINE_PY_CONTAINER_NAME = "sketchdb-queryengine"
2018
QUERY_ENGINE_RS_CONTAINER_NAME = "sketchdb-queryengine-rust"
2119

2220
ARROYO_IMAGE = "ghcr.io/projectasap/asap-arroyo:v0.1.0"

asap-tools/experiments/experiment_only_ingest_path.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -242,12 +242,16 @@ def main(cfg: DictConfig):
242242
"controller_client_configs",
243243
f"{experiment_mode}.yaml",
244244
)
245+
prometheus_url = (
246+
f"http://localhost:{prometheus_service.get_query_endpoint_port()}"
247+
)
245248
controller_service.start(
246249
controller_input_file=controller_client_config,
247250
prometheus_scrape_interval=prometheus_scrape_interval,
248251
streaming_engine=args.streaming_engine,
249252
controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR,
250253
punting=args.controller_punting,
254+
prometheus_url=prometheus_url,
251255
)
252256
sync.rsync_controller_config_remote_to_local(
253257
provider,
@@ -257,19 +261,21 @@ def main(cfg: DictConfig):
257261
)
258262

259263
# Start Kafka
260-
kafka_service.start()
261-
kafka_service.wait_until_ready()
262-
kafka_service.delete_topics()
263-
kafka_service.create_topics()
264+
if args.streaming_engine != "precompute":
265+
kafka_service.start()
266+
kafka_service.wait_until_ready()
267+
kafka_service.delete_topics()
268+
kafka_service.create_topics()
264269

265270
# Start Arroyo
266-
arroyo_service.stop()
267-
time.sleep(10)
268-
arroyo_service.start(
269-
experiment_output_dir=experiment_output_dir,
270-
remote_write_base_port=args.remote_write_base_port,
271-
parallelism=args.parallelism,
272-
)
271+
if args.streaming_engine != "precompute":
272+
arroyo_service.stop()
273+
time.sleep(10)
274+
arroyo_service.start(
275+
experiment_output_dir=experiment_output_dir,
276+
remote_write_base_port=args.remote_write_base_port,
277+
parallelism=args.parallelism,
278+
)
273279

274280
# Start fake exporter if configured
275281
if config.check_exporter_and_queries_exist("fake_exporter", cfg.experiment_params):
@@ -317,7 +323,7 @@ def main(cfg: DictConfig):
317323
prometheus_service.start(experiment_output_dir)
318324

319325
# Start V2-specific: Run ArroyoSketch pipeline
320-
if is_v2:
326+
if is_v2 and args.streaming_engine != "precompute":
321327
print("Starting ArroyoSketch pipeline...")
322328
arroyosketch_pipeline_id = arroyo_service.run_arroyosketch(
323329
experiment_name=args.experiment_name,
@@ -405,8 +411,9 @@ def main(cfg: DictConfig):
405411
if arroyosketch_pipeline_id:
406412
arroyo_service.stop_arroyosketch(arroyosketch_pipeline_id)
407413
arroyo_service.stop()
408-
kafka_service.delete_topics()
409-
kafka_service.stop()
414+
if args.streaming_engine != "precompute":
415+
kafka_service.delete_topics()
416+
kafka_service.stop()
410417
controller_service.stop()
411418

412419
# Stop core services

asap-tools/experiments/experiment_run_e2e.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from experiment_utils.services import (
1313
KafkaService,
1414
FlinkService,
15-
QueryEngineServiceFactory,
15+
QueryEngineRustService,
1616
ExporterServiceFactory,
1717
PrometheusKafkaAdapterService,
1818
ArroyoService,
@@ -135,9 +135,7 @@ def main(cfg: DictConfig):
135135
# Initialize services
136136
kafka_service = KafkaService(provider, args.node_offset, num_tries=KAFKA_NUM_TRIES)
137137
flink_service = FlinkService(provider, args.node_offset)
138-
# Initialize query engine service based on language
139-
query_engine_service = QueryEngineServiceFactory.create_query_engine_service(
140-
args.query_engine_language,
138+
query_engine_service = QueryEngineRustService(
141139
provider,
142140
use_container=args.use_container_query_engine,
143141
node_offset=args.node_offset,

asap-tools/experiments/experiment_run_grafana_demo.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from experiment_utils.providers.factory import create_provider
1212
from experiment_utils.services import (
1313
KafkaService,
14-
QueryEngineServiceFactory,
14+
QueryEngineRustService,
1515
ExporterServiceFactory,
1616
ArroyoService,
1717
ArroyoThroughputMonitor,
@@ -120,9 +120,7 @@ def main(cfg: DictConfig):
120120

121121
# Initialize services
122122
kafka_service = KafkaService(provider, args.node_offset, num_tries=KAFKA_NUM_TRIES)
123-
# Initialize query engine service based on language
124-
query_engine_service = QueryEngineServiceFactory.create_query_engine_service(
125-
args.query_engine_language,
123+
query_engine_service = QueryEngineRustService(
126124
provider,
127125
use_container=args.use_container_query_engine,
128126
node_offset=args.node_offset,
@@ -289,23 +287,28 @@ def main(cfg: DictConfig):
289287

290288
# copy_controller_client_config(args.controller_client_config, local_experiment_dir)
291289
if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME:
290+
prometheus_url = (
291+
f"http://localhost:{prometheus_service.get_query_endpoint_port()}"
292+
)
292293
controller_service.start(
293294
controller_input_file=controller_client_config,
294295
prometheus_scrape_interval=prometheus_scrape_interval,
295296
streaming_engine=args.streaming_engine,
296297
controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR,
297298
punting=args.controller_punting,
299+
prometheus_url=prometheus_url,
298300
)
299301
sync.rsync_controller_config_remote_to_local(
300302
provider,
301303
CONTROLLER_REMOTE_OUTPUT_DIR,
302304
CONTROLLER_LOCAL_OUTPUT_DIR,
303305
node_offset=args.node_offset,
304306
)
305-
kafka_service.start()
306-
kafka_service.wait_until_ready()
307-
kafka_service.delete_topics()
308-
kafka_service.create_topics()
307+
if args.streaming_engine != "precompute":
308+
kafka_service.start()
309+
kafka_service.wait_until_ready()
310+
kafka_service.delete_topics()
311+
kafka_service.create_topics()
309312

310313
if config.check_exporter_and_queries_exist("fake_exporter", cfg.experiment_params):
311314
# this DOES NOT block

asap-tools/experiments/experiment_teardown_everything.py

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from experiment_utils.services import (
1919
KafkaService,
2020
FlinkService,
21-
QueryEngineServiceFactory,
21+
QueryEngineRustService,
2222
ExporterServiceFactory,
2323
PrometheusKafkaAdapterService,
2424
ArroyoService,
@@ -72,22 +72,11 @@ def main(cfg: DictConfig):
7272
kafka_service = KafkaService(provider, args.node_offset, num_tries=KAFKA_NUM_TRIES)
7373
flink_service = FlinkService(provider, args.node_offset)
7474

75-
# Initialize both query engine languages
76-
query_engine_service_rust = QueryEngineServiceFactory.create_query_engine_service(
77-
"rust", provider, use_container=True, node_offset=args.node_offset
78-
)
79-
query_engine_service_python = QueryEngineServiceFactory.create_query_engine_service(
80-
"python", provider, use_container=True, node_offset=args.node_offset
81-
)
82-
query_engine_service_rust_native = (
83-
QueryEngineServiceFactory.create_query_engine_service(
84-
"rust", provider, use_container=False, node_offset=args.node_offset
85-
)
75+
query_engine_service_container = QueryEngineRustService(
76+
provider, use_container=True, node_offset=args.node_offset
8677
)
87-
query_engine_service_python_native = (
88-
QueryEngineServiceFactory.create_query_engine_service(
89-
"python", provider, use_container=False, node_offset=args.node_offset
90-
)
78+
query_engine_service_native = QueryEngineRustService(
79+
provider, use_container=False, node_offset=args.node_offset
9180
)
9281

9382
system_exporters_service = SystemExportersService(
@@ -176,10 +165,8 @@ def main(cfg: DictConfig):
176165
("Prometheus Client (container)", prometheus_client_service_container),
177166
("Prometheus Client (native)", prometheus_client_service_native),
178167
("Remote Monitor", remote_monitor_service),
179-
("Query Engine Rust (container)", query_engine_service_rust),
180-
("Query Engine Python (container)", query_engine_service_python),
181-
("Query Engine Rust (native)", query_engine_service_rust_native),
182-
("Query Engine Python (native)", query_engine_service_python_native),
168+
("Query Engine (container)", query_engine_service_container),
169+
("Query Engine (native)", query_engine_service_native),
183170
("Kafka", kafka_service),
184171
("Prometheus-Kafka Adapter", prometheus_kafka_adapter_service),
185172
("System Exporters", system_exporters_service),

asap-tools/experiments/experiment_utils/config.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -413,9 +413,6 @@ def __init__(self, cfg: DictConfig):
413413
# Fake exporter language
414414
self.fake_exporter_language = cfg.fake_exporter_language
415415

416-
# Query engine language
417-
self.query_engine_language = cfg.query_engine_language
418-
419416
# Query language (SQL vs PROMQL) - only used by Rust query engine
420417
self.query_language = cfg.query_language
421418

@@ -511,17 +508,6 @@ def validate_config(cfg: DictConfig, script_name: str = "experiment_run_e2e"):
511508
f"Valid options: {valid_policies}"
512509
)
513510

514-
# Validate Python query engine only supports no_cleanup
515-
if (
516-
hasattr(cfg, "query_engine_language")
517-
and cfg.query_engine_language == "python"
518-
and policy != "no_cleanup"
519-
):
520-
raise ValueError(
521-
f"aggregate_cleanup.policy='{policy}' is not supported by the Python query engine. "
522-
"Either use query_engine_language='rust' or set aggregate_cleanup.policy='no_cleanup'"
523-
)
524-
525511

526512
def generate_and_copy_prometheus_config(
527513
num_nodes_in_experiment,

asap-tools/experiments/experiment_utils/services/__init__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
from .kafka import KafkaService
1010
from .flink import FlinkService
1111
from .query_engine import (
12-
QueryEngineService,
1312
QueryEngineRustService,
14-
QueryEngineServiceFactory,
1513
)
1614
from .monitoring import MonitoringService
1715
from .fake_exporters import (
@@ -131,9 +129,7 @@ def create_prometheus_service(cfg, provider, num_nodes: int, node_offset: int):
131129
"DockerServiceBase",
132130
"KafkaService",
133131
"FlinkService",
134-
"QueryEngineService",
135132
"QueryEngineRustService",
136-
"QueryEngineServiceFactory",
137133
"MonitoringService",
138134
"ExporterServiceFactory",
139135
"PythonExporterService",

0 commit comments

Comments
 (0)