Skip to content

Commit 6a21faa

Browse files
authored
feat(query): add RAP definition cache (#19757)
* feat(query): add RAP definition cache Add a query-node local row access policy definition cache keyed by (tenant, policy_id) to avoid repeated meta-store RPCs during bind. The cache uses DashMap<(Tenant, u64), Arc<OnceCell<_>>> so cache hits stay cheap and concurrent misses on the same key share one in-flight load. Errors are not cached. Also add sqllogictest coverage for attach/detach/re-attach cycles and a periodic background cleanup task to reclaim orphaned entries left on remote nodes after DROP ROW ACCESS POLICY. * chore(test): remove redundant RAP definition cache tests The RAP definition cache is keyed by (tenant, policy_id) where policy_id is a monotonic meta-store sequence number. Cache correctness relies on: 1. Immutability: a given policy_id always maps to the same definition, so a cached entry can never become stale while the policy exists. 2. Invalidation on drop: the DROP ROW ACCESS POLICY interpreter calls invalidate(tenant, policy_id), removing the entry on the node that executes the DDL. 3. Bounded staleness on remote nodes: a background task clears the entire cache every 5 minutes, so remote nodes converge within one cleanup interval after a policy is dropped. Given these properties, the removed tests add no unique coverage: - The inline unit test only verified DashMap::remove semantics. - The sqllogictest (05_0016) duplicated attach/detach/re-attach cycles already covered by 05_0004_ddl_security_policy.test, and its consecutive-query approach cannot observably confirm a cache hit (unlike 05_0011 which uses EXPLAIN to verify result cache behavior).
1 parent 9ec2499 commit 6a21faa

7 files changed

Lines changed: 179 additions & 14 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/ee_features/row_access_policy/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@ edition = { workspace = true }
99

1010
[dependencies]
1111
async-trait = { workspace = true }
12+
dashmap = { workspace = true }
1213
databend-common-base = { workspace = true }
1314
databend-common-exception = { workspace = true }
1415
databend-common-meta-app = { workspace = true }
1516
databend-common-meta-store = { workspace = true }
1617
databend-meta-client = { workspace = true }
18+
log = { workspace = true }
19+
tokio = { workspace = true }
1720

1821
[build-dependencies]
1922

src/query/ee_features/row_access_policy/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
pub mod row_access_policy_cache;
1516
pub mod row_access_policy_handler;
1617

18+
pub use row_access_policy_cache::RowAccessPolicyCacheManager;
1719
pub use row_access_policy_handler::RowAccessPolicyHandler;
1820
pub use row_access_policy_handler::RowAccessPolicyHandlerWrapper;
1921
pub use row_access_policy_handler::get_row_access_policy_handler;
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
use std::time::Duration;
17+
18+
use dashmap::DashMap;
19+
use databend_common_base::base::GlobalInstance;
20+
use databend_common_exception::Result;
21+
use databend_common_meta_app::row_access_policy::RowAccessPolicyMeta;
22+
use databend_common_meta_app::tenant::Tenant;
23+
use databend_common_meta_store::MetaStore;
24+
use databend_meta_client::types::SeqV;
25+
use log::debug;
26+
use log::info;
27+
use tokio::sync::OnceCell;
28+
29+
use crate::row_access_policy_handler::get_row_access_policy_handler;
30+
31+
/// Background cleanup interval for orphaned cache entries.
32+
/// Entries for dropped policies on remote nodes are never accessed again,
33+
/// so periodic full clear is the simplest way to bound memory growth.
34+
const RAP_CACHE_CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
35+
36+
/// Cache key: (tenant, policy_id).
37+
type CacheKey = (Tenant, u64);
38+
39+
/// Each entry is an `Arc<OnceCell<...>>` so that concurrent misses on the same
40+
/// key only trigger one meta-store RPC (singleflight).
41+
///
42+
/// `OnceCell::get_or_try_init` will NOT persist an `Err` — if the first caller
43+
/// fails, the cell stays empty and the next caller retries.
44+
type CacheEntry = Arc<OnceCell<SeqV<RowAccessPolicyMeta>>>;
45+
46+
pub struct RowAccessPolicyCacheManager {
47+
cache: DashMap<CacheKey, CacheEntry>,
48+
}
49+
50+
impl RowAccessPolicyCacheManager {
51+
fn new() -> Self {
52+
Self {
53+
cache: DashMap::new(),
54+
}
55+
}
56+
57+
pub fn init() -> Result<()> {
58+
let manager = Arc::new(Self::new());
59+
GlobalInstance::set(manager.clone());
60+
61+
// Background task to reclaim orphaned entries.
62+
// On remote nodes, entries for dropped policies are never looked up
63+
// again (policy_id is monotonic), so periodic clear is the only way
64+
// to reclaim them.
65+
databend_common_base::runtime::spawn(async move {
66+
loop {
67+
tokio::time::sleep(RAP_CACHE_CLEANUP_INTERVAL).await;
68+
let count = manager.cache.len();
69+
if count > 0 {
70+
manager.cache.clear();
71+
debug!(
72+
"row_access_policy_cache: cleared {} orphaned/stale entries",
73+
count
74+
);
75+
}
76+
}
77+
});
78+
79+
Ok(())
80+
}
81+
82+
pub fn instance() -> Arc<RowAccessPolicyCacheManager> {
83+
GlobalInstance::get()
84+
}
85+
86+
/// Synchronous fast path — returns `Some` only when the definition is
87+
/// already cached. The binder calls this first to avoid `block_on`
88+
/// overhead on cache hits.
89+
pub fn try_get(&self, tenant: &Tenant, policy_id: u64) -> Option<SeqV<RowAccessPolicyMeta>> {
90+
let key = (tenant.clone(), policy_id);
91+
self.cache.get(&key).and_then(|cell| cell.get().cloned())
92+
}
93+
94+
/// Invalidate one cached definition when the underlying RAP is dropped.
95+
pub fn invalidate(&self, tenant: &Tenant, policy_id: u64) {
96+
let key = (tenant.clone(), policy_id);
97+
self.cache.remove(&key);
98+
}
99+
100+
/// Async path — used on cache miss. Guarantees singleflight: concurrent
101+
/// callers for the same `(tenant, policy_id)` share one in-flight RPC.
102+
/// Errors are never cached.
103+
pub async fn get_or_load(
104+
&self,
105+
meta_api: Arc<MetaStore>,
106+
tenant: &Tenant,
107+
policy_id: u64,
108+
) -> Result<SeqV<RowAccessPolicyMeta>> {
109+
let key = (tenant.clone(), policy_id);
110+
111+
// Get-or-insert the OnceCell for this key.
112+
let cell = self
113+
.cache
114+
.entry(key)
115+
.or_insert_with(|| Arc::new(OnceCell::new()))
116+
.value()
117+
.clone();
118+
119+
let result = cell
120+
.get_or_try_init(|| {
121+
let meta_api = meta_api.clone();
122+
let tenant = tenant.clone();
123+
async move {
124+
let handler = get_row_access_policy_handler();
125+
let res = handler
126+
.get_row_access_policy_by_id(meta_api, &tenant, policy_id)
127+
.await;
128+
if res.is_ok() {
129+
info!(
130+
"row_access_policy_cache: loaded policy_id={} from meta store",
131+
policy_id
132+
);
133+
}
134+
res
135+
}
136+
})
137+
.await?;
138+
139+
Ok(result.clone())
140+
}
141+
}

src/query/service/src/global_services.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use databend_common_users::RoleCacheManager;
4141
use databend_common_users::UserApiProvider;
4242
use databend_common_users::builtin::BuiltIn;
4343
use databend_enterprise_resources_management::DummyResourcesManagement;
44+
use databend_enterprise_row_access_policy_feature::RowAccessPolicyCacheManager;
4445
use databend_meta_runtime::DatabendRuntime;
4546
use databend_storages_common_cache::CacheManager;
4647
use databend_storages_common_cache::TempDirManager;
@@ -164,6 +165,7 @@ impl GlobalServices {
164165
.await?;
165166
}
166167
RoleCacheManager::init()?;
168+
RowAccessPolicyCacheManager::init()?;
167169

168170
DataOperator::init(&config.storage, config.spill.storage_params.clone()).await?;
169171
ShareTableConfig::init(

src/query/service/src/interpreters/interpreter_row_access_policy_drop.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use databend_common_meta_app::principal::OwnershipObject;
2323
use databend_common_sql::plans::DropRowAccessPolicyPlan;
2424
use databend_common_users::RoleCacheManager;
2525
use databend_common_users::UserApiProvider;
26+
use databend_enterprise_row_access_policy_feature::RowAccessPolicyCacheManager;
2627
use databend_enterprise_row_access_policy_feature::get_row_access_policy_handler;
2728

2829
use crate::interpreters::Interpreter;
@@ -80,6 +81,7 @@ impl Interpreter for DropRowAccessPolicyInterpreter {
8081
}
8182

8283
if let Some(policy_id) = policy_id {
84+
RowAccessPolicyCacheManager::instance().invalidate(&tenant, policy_id);
8385
let role_api = UserApiProvider::instance().role_api(&tenant);
8486
role_api
8587
.revoke_ownership(&OwnershipObject::RowAccessPolicy { policy_id })

src/query/sql/src/planner/binder/table.rs

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ use databend_common_meta_app::tenant::Tenant;
6363
use databend_common_storage::StageFileInfo;
6464
use databend_common_storage::StageFilesInfo;
6565
use databend_common_users::UserApiProvider;
66-
use databend_enterprise_row_access_policy_feature::get_row_access_policy_handler;
66+
use databend_enterprise_row_access_policy_feature::RowAccessPolicyCacheManager;
6767
use databend_meta_client::types::MetaId;
6868
use databend_storages_common_table_meta::table::ChangeType;
6969
use log::debug;
@@ -502,8 +502,6 @@ impl Binder {
502502
) -> Result<SExpr> {
503503
LicenseManagerSwitch::instance()
504504
.check_enterprise_enabled(self.ctx.get_license_key(), Feature::RowAccessPolicy)?;
505-
let meta_api = UserApiProvider::instance().get_meta_store_client();
506-
let handler = get_row_access_policy_handler();
507505
// Collect arguments in policy.columns_ids order (matches the policy parameter list).
508506
// Previously this iterated `fields` (schema order) with a contains-filter, which
509507
// silently reordered arguments when USING column order differed from schema order.
@@ -521,18 +519,32 @@ impl Binder {
521519
})
522520
.collect();
523521
let policy = policy.policy_id;
522+
let tenant = self.ctx.get_tenant();
523+
let cache = RowAccessPolicyCacheManager::instance();
524524
let start = std::time::Instant::now();
525-
let res = databend_common_base::runtime::block_on(handler.get_row_access_policy_by_id(
526-
meta_api,
527-
&self.ctx.get_tenant(),
528-
policy,
529-
))?;
530-
let fetch_elapsed = start.elapsed();
531-
info!(
532-
"row_access_policy: policy_id={}, fetch_ms={:.3}",
533-
policy,
534-
fetch_elapsed.as_secs_f64() * 1000.0,
535-
);
525+
// Fast sync path: avoid block_on overhead on cache hits.
526+
let res = match cache.try_get(&tenant, policy) {
527+
Some(cached) => {
528+
debug!(
529+
"row_access_policy: policy_id={}, cache_hit, elapsed_ms={:.3}",
530+
policy,
531+
start.elapsed().as_secs_f64() * 1000.0,
532+
);
533+
cached
534+
}
535+
None => {
536+
let meta_api = UserApiProvider::instance().get_meta_store_client();
537+
let loaded = databend_common_base::runtime::block_on(
538+
cache.get_or_load(meta_api, &tenant, policy),
539+
)?;
540+
info!(
541+
"row_access_policy: policy_id={}, cache_miss, fetch_ms={:.3}",
542+
policy,
543+
start.elapsed().as_secs_f64() * 1000.0,
544+
);
545+
loaded
546+
}
547+
};
536548
let body = res.data.body;
537549
let settings = self.ctx.get_settings();
538550
let sql_dialect = settings.get_sql_dialect()?;

0 commit comments

Comments
 (0)