-
-
Notifications
You must be signed in to change notification settings - Fork 63
feat(clickhouse): Add clickhouse-connect (HTTP) driver behind a runtime config [EAP-497] #8060
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
747936a
925c465
15c08f2
2cfaab9
0635e19
740897e
1e8affa
2bf6855
0500c2f
e124ee5
5bf8da9
a87be31
d8cd5e4
778ccc9
8ecbb7f
fac0556
30915ab
d88912b
1c28b61
69af580
830b6f1
348135c
b144516
4d788a1
3f7d05c
c485a37
58ac330
d080300
9a71608
fe24fb7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,3 +18,4 @@ gocd/templates/vendor/ | |
| gocd/generated-pipelines/ | ||
| Brewfile.lock.json | ||
| .zed/ | ||
| dump.rdb | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,7 +7,13 @@ | |
|
|
||
| from snuba import settings | ||
| from snuba.clickhouse.native import ClickhousePool | ||
| from snuba.clusters.cluster import ClickhouseClientSettings, ClickhouseCluster | ||
| from snuba.clusters.cluster import ( | ||
| ClickhouseClientSettings, | ||
| ClickhouseCluster, | ||
| ClickhouseNode, | ||
| connection_cache, | ||
| use_clickhouse_connect_driver, | ||
| ) | ||
| from snuba.datasets.storage import ReadableTableStorage | ||
| from snuba.datasets.storages.factory import get_storage | ||
| from snuba.datasets.storages.storage_key import StorageKey | ||
|
|
@@ -43,7 +49,7 @@ def is_valid_node(host: str, port: int, cluster: ClickhouseCluster, storage_name | |
| }, | ||
| ) | ||
|
|
||
| return any(node.host_name == host and node.port == port for node in nodes) | ||
| return any(node.host_name == host and node.native_port == port for node in nodes) | ||
|
|
||
|
|
||
| def _get_storage(storage_name: str) -> ReadableTableStorage: | ||
|
|
@@ -71,7 +77,7 @@ def _validate_node( | |
| "host": clickhouse_host, | ||
| "port": clickhouse_port, | ||
| "query_host": cluster.get_query_node().host_name, | ||
| "query_port": cluster.get_query_node().port, | ||
| "query_port": cluster.get_query_node().native_port, | ||
| }, | ||
| ) | ||
|
|
||
|
|
@@ -89,24 +95,38 @@ def _build_validated_pool( | |
| password: str, | ||
| client_settings: ClickhouseClientSettings, | ||
| ) -> ClickhousePool: | ||
| # Single chokepoint for admin ClickhousePool construction. ClickhousePool | ||
| # ships the user/password in the first hello packet of the native protocol, | ||
| # so an unvalidated host means credentials reach whatever listener answers. | ||
| # All admin helpers must go through here — never call ClickhousePool | ||
| # directly from this module. The regression test | ||
| # test_no_direct_clickhouse_pool_construction_in_admin enforces this. | ||
| # Single chokepoint for admin ClickhousePool acquisition. A pool ships the | ||
| # user/password to the node (the native protocol's first hello packet, or | ||
| # the HTTP auth header), so an unvalidated host means credentials reach | ||
| # whatever listener answers. All admin helpers must go through here — never | ||
| # acquire a pool from the connection cache directly in this module. The | ||
| # regression test test_no_direct_clickhouse_pool_construction_in_admin | ||
| # enforces this. | ||
| _validate_node(clickhouse_host, clickhouse_port, cluster, storage_name) | ||
| return ClickhousePool( | ||
| clickhouse_host, | ||
| clickhouse_port, | ||
| # Go through the shared connection cache so the driver (native vs | ||
| # clickhouse-connect/HTTP) is selected by the runtime config, behind the | ||
| # abstract ClickhousePool type, just like the cluster's own connections. | ||
| return connection_cache.get_node_connection( | ||
| client_settings, | ||
| ClickhouseNode(clickhouse_host, clickhouse_port, http_port=cluster.get_http_port()), | ||
| username, | ||
| password, | ||
| database, | ||
| max_pool_size=2, | ||
| client_settings=client_settings.value.settings, | ||
| secure=False, | ||
| ca_certs=None, | ||
| verify=False, | ||
| ) | ||
|
|
||
|
|
||
| def _driver_cache_token() -> str: | ||
| # Part of the admin connection cache keys so that flipping the | ||
| # use_clickhouse_connect_driver runtime flag re-resolves admin connections | ||
| # to the new driver, instead of returning a pool pinned to whichever driver | ||
| # was active when the entry was first cached. This keeps admin traffic | ||
| # switchable at runtime, like the cluster query/reader paths. | ||
| return "connect" if use_clickhouse_connect_driver() else "native" | ||
|
|
||
|
|
||
| def get_ro_node_connection( | ||
| clickhouse_host: str, | ||
| clickhouse_port: int, | ||
|
|
@@ -115,7 +135,7 @@ def get_ro_node_connection( | |
| ) -> ClickhousePool: | ||
| storage = _get_storage(storage_name) | ||
|
|
||
| key = f"{storage.get_storage_key()}-{clickhouse_host}" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Admin pool size limit removed (Medium Severity): Admin ClickHouse connections no longer cap the native pool at two connections. Reviewed by Cursor Bugbot for commit 9a71608. Configure here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is intentional, not an oversight. Earlier in the PR the admin helper built its own […]. Admin is low-traffic (human operators running system queries / tracing), so sharing the standard sizing is acceptable, and pool size is now runtime-tunable via […]. Generated by Claude Code |
||
| key = f"{storage.get_storage_key()}-{clickhouse_host}-{_driver_cache_token()}" | ||
| if key in NODE_CONNECTIONS: | ||
| return NODE_CONNECTIONS[key] | ||
|
|
||
|
|
@@ -162,8 +182,9 @@ def get_ro_node_connection( | |
| def get_ro_query_node_connection( | ||
| storage_name: str, client_settings: ClickhouseClientSettings | ||
| ) -> ClickhousePool: | ||
| if storage_name in CLUSTER_CONNECTIONS: | ||
| return CLUSTER_CONNECTIONS[storage_name] | ||
| key = f"{storage_name}-{_driver_cache_token()}" | ||
| if key in CLUSTER_CONNECTIONS: | ||
| return CLUSTER_CONNECTIONS[key] | ||
|
|
||
| storage = _get_storage(storage_name) | ||
| cluster = storage.get_cluster() | ||
|
|
@@ -172,7 +193,7 @@ def get_ro_query_node_connection( | |
| connection_id.hostname, connection_id.tcp_port, storage_name, client_settings | ||
| ) | ||
|
|
||
| CLUSTER_CONNECTIONS[storage_name] = connection | ||
| CLUSTER_CONNECTIONS[key] = connection | ||
| return connection | ||
|
|
||
|
|
||
|
|
@@ -184,7 +205,7 @@ def get_sudo_node_connection( | |
| ) -> ClickhousePool: | ||
| storage = _get_storage(storage_name) | ||
|
|
||
| key = f"{storage.get_storage_key()}-{clickhouse_host}-sudo" | ||
| key = f"{storage.get_storage_key()}-{clickhouse_host}-sudo-{_driver_cache_token()}" | ||
| if key in NODE_CONNECTIONS: | ||
| return NODE_CONNECTIONS[key] | ||
|
|
||
|
|
@@ -216,7 +237,7 @@ def get_clusterless_node_connection( | |
| cluster = storage.get_cluster() | ||
| database = cluster.get_database() | ||
|
|
||
| key = f"{storage.get_storage_key()}-{clickhouse_host}-clusterless-{database}" | ||
| key = f"{storage.get_storage_key()}-{clickhouse_host}-clusterless-{database}-{_driver_cache_token()}" | ||
| if key in NODE_CONNECTIONS: | ||
| return NODE_CONNECTIONS[key] | ||
|
|
||
|
|
@@ -245,7 +266,7 @@ def get_ro_clusterless_node_connection( | |
| cluster = storage.get_cluster() | ||
| database = cluster.get_database() | ||
|
|
||
| key = f"{storage.get_storage_key()}-{clickhouse_host}-clusterless-ro-{database}" | ||
| key = f"{storage.get_storage_key()}-{clickhouse_host}-clusterless-ro-{database}-{_driver_cache_token()}" | ||
| if key in NODE_CONNECTIONS: | ||
| return NODE_CONNECTIONS[key] | ||
|
|
||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.