Skip to content

Commit 84b5ef8

Browse files
committed
feat(query): add billing usage daily table function
1 parent ceaf0b7 commit 84b5ef8

15 files changed

Lines changed: 1006 additions & 21 deletions

File tree

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
syntax = "proto3";
2+
3+
package billingproto;
4+
5+
message BillingError {
6+
string kind = 1;
7+
string message = 2;
8+
int32 code = 3;
9+
}
10+
11+
message GetBillingUsageDailyRequest {
12+
string tenant_id = 1;
13+
string billing_month = 2; // YYYY-MM
14+
string sql_user = 3; // audit only, empty means absent
15+
string query_id = 4; // audit only, empty means absent
16+
}
17+
18+
message BillingUsageDailyRow {
19+
// Billing date in YYYY-MM-DD format.
20+
string usage_date = 1;
21+
22+
// Top-level billing category, aligned with Snowflake naming where practical.
23+
// Examples: compute, storage, cloud services.
24+
string usage_type = 2;
25+
26+
// More specific service category.
27+
// Examples: WAREHOUSE_METERING, STORAGE, CLOUD_SERVICES.
28+
string service_type = 3;
29+
30+
// Logical billable resource name when applicable, such as warehouse name.
31+
// Empty string means absent.
32+
string resource_name = 4;
33+
34+
// Billable usage quantity encoded as a decimal string to avoid float precision loss.
35+
string usage = 5;
36+
37+
// Unit for usage.
38+
// Examples: second, byte, request.
39+
string usage_unit = 6;
40+
41+
// Reserved billing unit price field.
42+
// Empty string means the rate is intentionally undisclosed.
43+
string rate = 7;
44+
45+
// Billing dimension unit for the reserved rate field.
46+
// Examples: second, tb_day, k_request.
47+
// Returned when the billing dimension is known.
48+
string rate_unit = 8;
49+
50+
// Final billed amount encoded as a decimal string.
51+
string usage_in_currency = 9;
52+
53+
// Settlement currency, for example USD.
54+
string currency = 10;
55+
56+
// Resource tags when the underlying usage is associated with a tagged resource.
57+
map<string, string> tags = 11;
58+
59+
// JSON object string for category-specific or future-compatible extension fields.
60+
// Examples:
61+
// {"cluster_name":"cl-00000","max_clusters":1,"size":"XSmall"}
62+
// {}
63+
string details = 12;
64+
}
65+
66+
message GetBillingUsageDailyResponse {
67+
repeated BillingUsageDailyRow rows = 1;
68+
optional BillingError error = 2;
69+
}
70+
71+
service BillingService {
72+
rpc GetBillingUsageDaily(GetBillingUsageDailyRequest) returns (GetBillingUsageDailyResponse);
73+
}

src/common/cloud_control/proto/task.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,4 +246,4 @@ service TaskService {
246246
rpc ShowTaskRuns(ShowTaskRunsRequest) returns (ShowTaskRunsResponse);
247247
rpc GetTaskDependents(GetTaskDependentsRequest) returns (GetTaskDependentsResponse);
248248
rpc EnableTaskDependents(EnableTaskDependentsRequest) returns (EnableTaskDependentsResponse);
249-
}
249+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use databend_common_exception::Result;
18+
use tonic::Request;
19+
use tonic::transport::Channel;
20+
21+
use crate::pb::GetBillingUsageDailyRequest;
22+
use crate::pb::GetBillingUsageDailyResponse;
23+
use crate::pb::billing_service_client::BillingServiceClient;
24+
use crate::task_client::MAX_DECODING_SIZE;
25+
use crate::task_client::MAX_ENCODING_SIZE;
26+
27+
pub struct BillingClient {
28+
pub client: BillingServiceClient<Channel>,
29+
}
30+
31+
impl BillingClient {
32+
pub async fn new(channel: Channel) -> Result<Arc<BillingClient>> {
33+
let client = BillingServiceClient::new(channel)
34+
.max_decoding_message_size(MAX_DECODING_SIZE)
35+
.max_encoding_message_size(MAX_ENCODING_SIZE);
36+
Ok(Arc::new(BillingClient { client }))
37+
}
38+
39+
pub async fn get_billing_usage_daily(
40+
&self,
41+
req: Request<GetBillingUsageDailyRequest>,
42+
) -> Result<GetBillingUsageDailyResponse> {
43+
let mut client = self.client.clone();
44+
let resp = client.get_billing_usage_daily(req).await?;
45+
Ok(resp.into_inner())
46+
}
47+
}

src/common/cloud_control/src/cloud_api.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ use databend_common_base::base::GlobalInstance;
1919
use databend_common_exception::ErrorCode;
2020
use databend_common_exception::Result;
2121

22+
use crate::billing_client::BillingClient;
2223
use crate::notification_client::NotificationClient;
2324
use crate::task_client::TaskClient;
2425
use crate::worker_client::WorkerClient;
2526

2627
pub const CLOUD_REQUEST_TIMEOUT_SEC: u64 = 5; // 5 seconds
2728

2829
pub struct CloudControlApiProvider {
30+
pub billing_client: Arc<BillingClient>,
2931
pub task_client: Arc<TaskClient>,
3032
pub notification_client: Arc<NotificationClient>,
3133
pub worker_client: Arc<WorkerClient>,
@@ -42,10 +44,12 @@ impl CloudControlApiProvider {
4244

4345
let endpoint = Self::get_endpoint(endpoint, timeout).await?;
4446
let channel = endpoint.connect_lazy();
47+
let billing_client = BillingClient::new(channel.clone()).await?;
4548
let task_client = TaskClient::new(channel.clone()).await?;
4649
let notification_client = NotificationClient::new(channel.clone()).await?;
4750
let worker_client = WorkerClient::new(channel).await?;
4851
Ok(Arc::new(CloudControlApiProvider {
52+
billing_client,
4953
task_client,
5054
notification_client,
5155
worker_client,
@@ -85,6 +89,10 @@ impl CloudControlApiProvider {
8589
self.task_client.clone()
8690
}
8791

92+
pub fn get_billing_client(&self) -> Arc<BillingClient> {
93+
self.billing_client.clone()
94+
}
95+
8896
pub fn get_notification_client(&self) -> Arc<NotificationClient> {
8997
self.notification_client.clone()
9098
}

src/common/cloud_control/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
pub mod billing_client;
1516
pub mod client_config;
1617
pub mod cloud_api;
1718
pub mod notification_client;
@@ -24,6 +25,7 @@ pub mod worker_client;
2425
#[allow(clippy::large_enum_variant)]
2526
/// ProtoBuf generated files.
2627
pub mod pb {
28+
tonic::include_proto!("billingproto");
2729
// taskproto is proto package name.
2830
tonic::include_proto!("taskproto");
2931
tonic::include_proto!("notificationproto");
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::BTreeMap;
16+
17+
use databend_common_base::runtime;
18+
use databend_common_cloud_control::billing_client::BillingClient;
19+
use databend_common_cloud_control::pb::BillingUsageDailyRow;
20+
use databend_common_cloud_control::pb::GetBillingUsageDailyRequest;
21+
use databend_common_cloud_control::pb::GetBillingUsageDailyResponse;
22+
use databend_common_cloud_control::pb::billing_service_server::BillingService;
23+
use databend_common_cloud_control::pb::billing_service_server::BillingServiceServer;
24+
use hyper_util::rt::TokioIo;
25+
use tonic::Request;
26+
use tonic::Response;
27+
use tonic::Status;
28+
use tonic::codegen::tokio_stream;
29+
use tonic::transport::Endpoint;
30+
use tonic::transport::Server;
31+
use tonic::transport::Uri;
32+
use tower::service_fn;
33+
34+
#[derive(Default)]
35+
pub struct MockBillingService {}
36+
37+
#[tonic::async_trait]
38+
impl BillingService for MockBillingService {
39+
async fn get_billing_usage_daily(
40+
&self,
41+
request: Request<GetBillingUsageDailyRequest>,
42+
) -> std::result::Result<Response<GetBillingUsageDailyResponse>, Status> {
43+
Ok(Response::new(GetBillingUsageDailyResponse {
44+
rows: vec![BillingUsageDailyRow {
45+
usage_date: request.into_inner().billing_month,
46+
usage_type: "compute".to_string(),
47+
service_type: "WAREHOUSE_METERING".to_string(),
48+
resource_name: "default".to_string(),
49+
usage: "2653".to_string(),
50+
usage_unit: "second".to_string(),
51+
rate: "".to_string(),
52+
rate_unit: "second".to_string(),
53+
usage_in_currency: "0.737".to_string(),
54+
currency: "USD".to_string(),
55+
tags: BTreeMap::from([("env".to_string(), "test".to_string())]),
56+
details: "{\"cluster_name\":\"cl-00000\"}".to_string(),
57+
}],
58+
error: None,
59+
}))
60+
}
61+
}
62+
63+
#[tokio::test(flavor = "current_thread")]
64+
async fn test_billing_client_success_cases() -> anyhow::Result<()> {
65+
let (client, server) = tokio::io::duplex(1024);
66+
let client = TokioIo::new(client);
67+
68+
runtime::spawn(async move {
69+
Server::builder()
70+
.add_service(BillingServiceServer::new(MockBillingService::default()))
71+
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
72+
.await
73+
});
74+
75+
let mut client_io = Some(client);
76+
let channel = Endpoint::try_from("http://[::]:0")
77+
.unwrap()
78+
.connect_with_connector(service_fn(move |_: Uri| {
79+
let client = client_io.take();
80+
81+
async move {
82+
if let Some(client) = client {
83+
Ok(client)
84+
} else {
85+
Err(std::io::Error::other("Client already taken"))
86+
}
87+
}
88+
}))
89+
.await
90+
.unwrap();
91+
92+
let client = BillingClient::new(channel).await?;
93+
94+
let resp = client
95+
.get_billing_usage_daily(Request::new(GetBillingUsageDailyRequest {
96+
tenant_id: "tenant".to_string(),
97+
billing_month: "2026-03".to_string(),
98+
sql_user: "root".to_string(),
99+
query_id: "query-1".to_string(),
100+
}))
101+
.await?;
102+
assert_eq!(resp.rows.len(), 1);
103+
assert_eq!(resp.rows[0].usage_date, "2026-03");
104+
assert_eq!(resp.rows[0].usage_type, "compute");
105+
assert_eq!(resp.rows[0].resource_name, "default");
106+
107+
Ok(())
108+
}

src/common/cloud_control/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod billing_client;
1516
mod task_client;

src/meta/api/src/api_impl/garbage_collection_api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ where
139139

140140
// Check if we should update based on monotonic property
141141
let should_update = match current_retention {
142-
None => true, // Never set before, always update
142+
None => true, // Never set before, always update
143143
Some(existing) => new_timestamp > existing.time, // Only update if new timestamp is greater
144144
};
145145

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -339,11 +339,11 @@ impl ReclusterTableInterpreter {
339339
// Adjust number of partitions according to the block size thresholds
340340
if total_partitions < block_thresholds.block_per_segment
341341
&& block_thresholds.check_perfect_segment(
342-
block_thresholds.block_per_segment, // this effectively by-pass the total_blocks criteria
343-
total_rows,
344-
total_bytes,
345-
total_compressed,
346-
)
342+
block_thresholds.block_per_segment, // this effectively by-pass the total_blocks criteria
343+
total_rows,
344+
total_bytes,
345+
total_compressed,
346+
)
347347
{
348348
total_partitions = block_thresholds.block_per_segment;
349349
}

src/query/service/src/table_functions/table_function_factory.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ use databend_common_storages_iceberg::IcebergInspectTable;
3939
use databend_common_storages_stream::stream_status_table_func::StreamStatusTable;
4040
use databend_meta_client::types::MetaId;
4141
#[cfg(feature = "task-support")]
42+
use databend_query_task_support::table_functions::BillingUsageDailyTable;
43+
#[cfg(feature = "task-support")]
4244
use databend_query_task_support::table_functions::TaskDependentsEnableTable;
4345
#[cfg(feature = "task-support")]
4446
use databend_query_task_support::table_functions::TaskDependentsTable;
@@ -344,6 +346,11 @@ impl TableFunctionFactory {
344346
"task_history".to_string(),
345347
(next_id(), Arc::new(TaskHistoryTable::create)),
346348
);
349+
350+
creators.insert(
351+
"billing_usage_daily".to_string(),
352+
(next_id(), Arc::new(BillingUsageDailyTable::create)),
353+
);
347354
}
348355

349356
creators.insert(

0 commit comments

Comments
 (0)