Skip to content

Commit 51f9cd7

Browse files
authored
feat: negotiate HTTP arrow features and stringify variant schema (#19780)
1 parent 62d6d1f commit 51f9cd7

12 files changed

Lines changed: 768 additions & 95 deletions

File tree

src/common/io/src/format_settings.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ pub struct OutputFormatSettings {
148148
pub headers: u8,
149149
// used only for compat with old bendSQL driver.
150150
pub http_arrow_use_jsonb: bool,
151+
// used only in http arrow response to gate decimal64 physical type output.
152+
pub http_arrow_use_decimal64: bool,
151153

152154
pub json_compact: bool,
153155
pub json_strings: bool,
@@ -165,6 +167,7 @@ impl Default for OutputFormatSettings {
165167
http_json_result_mode: HttpHandlerDataFormat::Display,
166168
headers: 0,
167169
http_arrow_use_jsonb: false,
170+
http_arrow_use_decimal64: true,
168171
json_compact: false,
169172
json_strings: false,
170173
format_null_as_str: false,

src/query/service/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ storage-hdfs = ["opendal/services-hdfs", "databend-common-storage/storage-hdfs"]
3232
anyhow = { workspace = true }
3333
arrow-array = { workspace = true }
3434
arrow-buffer = { workspace = true }
35+
arrow-cast = { workspace = true }
3536
arrow-flight = { workspace = true }
3637
arrow-ipc = { workspace = true }
3738
arrow-json = { workspace = true }
@@ -185,7 +186,6 @@ uuid = { workspace = true }
185186
walkdir = { workspace = true }
186187

187188
[dev-dependencies]
188-
arrow-cast = { workspace = true }
189189
databend-common-sql-test-support = { workspace = true }
190190
databend-storages-common-pruner = { workspace = true }
191191
geo = { workspace = true }

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 128 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ use super::query::HttpQuery;
6868
use super::query::HttpQueryRequest;
6969
use super::query::HttpQueryResponseInternal;
7070
use super::query::ResponseState;
71+
use super::query::execute_state::ARROW_FEATURE_NEGOTIATION_VERSION;
7172
use super::query::execute_state::LEGACY_ARROW_RESULT_VERSION;
7273
use super::query::execute_state::SERVER_MAX_ARROW_RESULT_VERSION;
7374
use super::query::execute_state::legacy_bendsql_python_arrow_result_version;
@@ -156,6 +157,28 @@ pub struct QueryResponseSettings {
156157
pub http_json_result_mode: String,
157158
#[serde(skip_serializing_if = "Option::is_none")]
158159
pub arrow_result_version: Option<u64>,
160+
#[serde(skip_serializing_if = "Option::is_none")]
161+
pub arrow_features: Option<ArrowFeatures>,
162+
}
163+
164+
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
165+
pub struct ArrowFeatures {
166+
#[serde(skip_serializing_if = "Option::is_none")]
167+
pub decimal64: Option<bool>,
168+
}
169+
170+
impl ArrowFeatures {
171+
pub(crate) fn decimal64_enabled() -> Self {
172+
Self {
173+
decimal64: Some(true),
174+
}
175+
}
176+
177+
fn visible_enabled_only(&self) -> Option<Self> {
178+
self.decimal64
179+
.filter(|enabled| *enabled)
180+
.map(|_| Self::decimal64_enabled())
181+
}
159182
}
160183

161184
#[derive(Serialize, Debug, Clone)]
@@ -211,6 +234,7 @@ impl QueryResponse {
211234
warnings,
212235
response_settings,
213236
arrow_result_version: _,
237+
arrow_features: _,
214238
},
215239
}: HttpQueryResponseInternal,
216240
is_final: bool,
@@ -263,7 +287,8 @@ impl QueryResponse {
263287
|| encoding.arrow_result_version.is_some()
264288
);
265289
let response_settings = response_settings.map(|mut settings| {
266-
settings.arrow_result_version = visible_arrow_result_version(encoding);
290+
settings.arrow_result_version = visible_arrow_result_version(&encoding);
291+
settings.arrow_features = visible_arrow_features(&encoding);
267292
settings
268293
});
269294

@@ -324,7 +349,7 @@ impl QueryResponse {
324349
}
325350
}
326351

327-
fn visible_arrow_result_version(encoding: ResponseEncoding) -> Option<u64> {
352+
fn visible_arrow_result_version(encoding: &ResponseEncoding) -> Option<u64> {
328353
match encoding.format {
329354
QueryResultFormat::Json => None,
330355
QueryResultFormat::Arrow => encoding
@@ -333,6 +358,22 @@ fn visible_arrow_result_version(encoding: ResponseEncoding) -> Option<u64> {
333358
}
334359
}
335360

361+
fn visible_arrow_features(encoding: &ResponseEncoding) -> Option<ArrowFeatures> {
362+
if matches!(encoding.format, QueryResultFormat::Json) {
363+
return None;
364+
}
365+
366+
encoding
367+
.arrow_result_version
368+
.filter(|version| *version >= ARROW_FEATURE_NEGOTIATION_VERSION)
369+
.and_then(|_| {
370+
encoding
371+
.arrow_features
372+
.as_ref()
373+
.and_then(ArrowFeatures::visible_enabled_only)
374+
})
375+
}
376+
336377
fn negotiate_arrow_result_version(
337378
req: &HttpQueryRequest,
338379
query_result_format: QueryResultFormat,
@@ -360,6 +401,28 @@ fn negotiate_arrow_result_version(
360401
)))
361402
}
362403

404+
fn negotiate_arrow_features(
405+
req: &HttpQueryRequest,
406+
arrow_result_version: Option<u64>,
407+
) -> Option<ArrowFeatures> {
408+
if arrow_result_version? < ARROW_FEATURE_NEGOTIATION_VERSION {
409+
return None;
410+
}
411+
412+
Some(
413+
match req
414+
.arrow_features
415+
.as_ref()
416+
.and_then(|features| features.decimal64)
417+
{
418+
Some(false) => ArrowFeatures {
419+
decimal64: Some(false),
420+
},
421+
_ => ArrowFeatures::decimal64_enabled(),
422+
},
423+
)
424+
}
425+
363426
fn require_negotiated_arrow_result_version(
364427
query_id: &str,
365428
query_result_format: QueryResultFormat,
@@ -382,10 +445,11 @@ pub struct StateResponse {
382445
pub stats: QueryStats,
383446
}
384447

385-
#[derive(Debug, Clone, Copy)]
448+
#[derive(Debug, Clone)]
386449
struct ResponseEncoding {
387450
format: QueryResultFormat,
388451
arrow_result_version: Option<u64>,
452+
arrow_features: Option<ArrowFeatures>,
389453
}
390454

391455
impl StateResponse {
@@ -477,6 +541,7 @@ async fn query_final_handler(
477541
response.state.arrow_result_version,
478542
)
479543
.map_err(HttpErrorCode::bad_request)?,
544+
arrow_features: response.state.arrow_features.clone(),
480545
};
481546
// it is safe to set these 2 fields to None, because client now check for null/None first.
482547
response.session = None;
@@ -610,6 +675,7 @@ async fn query_page_handler(
610675
resp.state.arrow_result_version,
611676
)
612677
.map_err(HttpErrorCode::bad_request)?,
678+
arrow_features: resp.state.arrow_features.clone(),
613679
};
614680
query
615681
.update_expire_time(false, resp.is_data_drained())
@@ -699,6 +765,7 @@ pub(crate) async fn query_handler(
699765
resp.state.arrow_result_version,
700766
)
701767
.map_err(HttpErrorCode::bad_request)?,
768+
arrow_features: resp.state.arrow_features.clone(),
702769
};
703770
Ok(QueryResponse::from_internal(
704771
ctx.query_id.clone(),
@@ -713,8 +780,16 @@ pub(crate) async fn query_handler(
713780
let arrow_result_version =
714781
negotiate_arrow_result_version(&req, query_result_format, ctx.user_agent.as_deref())
715782
.map_err(HttpErrorCode::bad_request)?;
783+
let arrow_features = negotiate_arrow_features(&req, arrow_result_version);
716784

717-
match HttpQuery::try_create(ctx, req.clone(), arrow_result_version).await {
785+
match HttpQuery::try_create(
786+
ctx,
787+
req.clone(),
788+
arrow_result_version,
789+
arrow_features.clone(),
790+
)
791+
.await
792+
{
718793
Err(err) => {
719794
let err = err.display_with_sql(&sql);
720795
error!("Failed to start SQL query, error: {:?}", err);
@@ -774,6 +849,7 @@ pub(crate) async fn query_handler(
774849
ResponseEncoding {
775850
format: query_result_format,
776851
arrow_result_version,
852+
arrow_features,
777853
},
778854
)
779855
.into_response())
@@ -1200,3 +1276,51 @@ impl<'a> FromRequest<'a> for QueryResultFormat {
12001276
.unwrap_or(QueryResultFormat::Json))
12011277
}
12021278
}
1279+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::servers::http::v1::query::HttpQueryRequest;

    /// Builds a minimal `select 1` request that carries the given arrow features.
    fn dummy_request(arrow_features: Option<ArrowFeatures>) -> HttpQueryRequest {
        HttpQueryRequest {
            arrow_features,
            arrow_result_version_max: None,
            params: None,
            stage_attachment: None,
            string_fields: true,
            pagination: Default::default(),
            sql: String::from("select 1"),
            session: None,
            session_id: None,
        }
    }

    /// An explicit `decimal64: false` in the request must come back unchanged
    /// from negotiation.
    #[test]
    fn test_negotiate_arrow_features_preserves_explicit_decimal64_disable() {
        let decimal64_off = ArrowFeatures {
            decimal64: Some(false),
        };
        let request = dummy_request(Some(decimal64_off.clone()));

        let negotiated =
            negotiate_arrow_features(&request, Some(ARROW_FEATURE_NEGOTIATION_VERSION));

        assert_eq!(negotiated, Some(decimal64_off));
    }

    /// A disabled feature must not be shown in the visible response features.
    #[test]
    fn test_visible_arrow_features_hides_disabled_entries() {
        let decimal64_off = ArrowFeatures {
            decimal64: Some(false),
        };
        let encoding = ResponseEncoding {
            format: QueryResultFormat::Arrow,
            arrow_result_version: Some(ARROW_FEATURE_NEGOTIATION_VERSION),
            arrow_features: Some(decimal64_off),
        };

        assert!(visible_arrow_features(&encoding).is_none());
    }
}

0 commit comments

Comments
 (0)