From 221060af7a0cc417be96e6df9922641b6409b28e Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 19 Jun 2026 19:55:00 +0000 Subject: [PATCH 1/4] feat(eap-items): double-write arrays into typed map columns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Populate the typed `Map(String, Array(T))` columns added in #8056 (`attributes_array_string`, `attributes_array_int`, `attributes_array_float`, `attributes_array_bool`) from `EAPItemsProcessor`, alongside the existing `attributes_array` JSON column. This is a double write: the legacy JSON column keeps being written so the read path is unaffected until it is migrated to the typed columns. Each array attribute is split by element type into the matching typed bucket. Homogeneous arrays — the common case — populate exactly one bucket. Integer elements are also written as floats, mirroring the scalar `insert_int` double-write, because the read path resolves numeric attributes to the float columns (`_VALUE_TYPE_TO_COLUMN`). The new columns are appended to `EAPItemRow` and `COLUMN_NAMES` (which is sent explicitly on `INSERT ... FORMAT RowBinary`) in matching order. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_017PBiwoiUxf5TAnzWmbZebu --- rust_snuba/src/processors/eap_items.rs | 196 ++++++++++++++++++++++++- 1 file changed, 190 insertions(+), 6 deletions(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index ebfe8ba25dd..9416464309a 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -415,6 +415,22 @@ struct AttributeMap { #[serde(skip_serializing_if = "HashMap::is_empty")] attributes_array: HashMap>, + // Typed map columns for array-valued attributes (migration 0059). Arrays are + // double-written here alongside the legacy `attributes_array` JSON column so + // the read path can filter/aggregate on values and enumerate keys via + // `mapKeys(...)` like the scalar attribute maps. + #[serde(skip_serializing_if = "HashMap::is_empty")] + attributes_array_string: HashMap>, + + #[serde(skip_serializing_if = "HashMap::is_empty")] + attributes_array_int: HashMap>, + + #[serde(skip_serializing_if = "HashMap::is_empty")] + attributes_array_float: HashMap>, + + #[serde(skip_serializing_if = "HashMap::is_empty")] + attributes_array_bool: HashMap>, + #( #[serde(skip_serializing_if = "HashMap::is_empty")] attributes_string_~N: HashMap, @@ -461,14 +477,39 @@ impl AttributeMap { } pub fn insert_array(&mut self, k: String, v: ArrayValue) { + // Legacy JSON representation, preserving mixed-type ordering. let mut values: Vec = Vec::default(); + // Typed buckets, one per element type, for the `attributes_array_*` + // columns. Homogeneous arrays — the common case — populate exactly one + // bucket; mixed arrays are split across buckets by element type. + let mut strings: Vec = Vec::default(); + let mut ints: Vec = Vec::default(); + let mut floats: Vec = Vec::default(); + let mut bools: Vec = Vec::default(); + for value in v.values { match value.value { - Some(Value::StringValue(string)) => values.push(EAPValue::String(string)), - Some(Value::DoubleValue(double)) => values.push(EAPValue::Double(double)), - Some(Value::IntValue(int)) => values.push(EAPValue::Int(int)), - Some(Value::BoolValue(bool)) => values.push(EAPValue::Bool(bool)), + Some(Value::StringValue(string)) => { + strings.push(string.clone()); + values.push(EAPValue::String(string)); + } + Some(Value::DoubleValue(double)) => { + floats.push(double); + values.push(EAPValue::Double(double)); + } + Some(Value::IntValue(int)) => { + // Double-write ints as floats too, mirroring the scalar + // `insert_int`: the read path resolves numeric attributes to + // the float columns. + ints.push(int); + floats.push(int as f64); + values.push(EAPValue::Int(int)); + } + Some(Value::BoolValue(bool)) => { + bools.push(bool); + values.push(EAPValue::Bool(bool)); + } Some(Value::BytesValue(_)) => (), Some(Value::KvlistValue(_)) => (), Some(Value::ArrayValue(_)) => (), @@ -476,6 +517,19 @@ impl AttributeMap { } } + if !strings.is_empty() { + self.attributes_array_string.insert(k.clone(), strings); + } + if !ints.is_empty() { + self.attributes_array_int.insert(k.clone(), ints); + } + if !floats.is_empty() { + self.attributes_array_float.insert(k.clone(), floats); + } + if !bools.is_empty() { + self.attributes_array_bool.insert(k.clone(), bools); + } + self.attributes_array.insert(k, values); } } @@ -510,6 +564,11 @@ pub struct EAPItemRow { )* attributes_array: String, + + attributes_array_string: Vec<(String, Vec)>, + attributes_array_int: Vec<(String, Vec)>, + attributes_array_float: Vec<(String, Vec)>, + attributes_array_bool: Vec<(String, Vec)>, } } @@ -551,6 +610,10 @@ impl EAPItemRow { concat!("attributes_float_", stringify!(N)), )* "attributes_array", + "attributes_array_string", + "attributes_array_int", + "attributes_array_float", + "attributes_array_bool", ]; } } @@ -586,6 +649,18 @@ impl TryFrom for EAPItemRow { attributes_float_~N: item.attributes.attributes_float_~N.into_iter().collect(), )* attributes_array, + attributes_array_string: item + .attributes + .attributes_array_string + .into_iter() + .collect(), + attributes_array_int: item.attributes.attributes_array_int.into_iter().collect(), + attributes_array_float: item + .attributes + .attributes_array_float + .into_iter() + .collect(), + attributes_array_bool: item.attributes.attributes_array_bool.into_iter().collect(), }); } } @@ -1004,8 +1079,9 @@ mod tests { #[test] fn test_column_names_match_struct_layout() { let names = EAPItemRow::COLUMN_NAMES; - // 12 scalars + indexed_name + attributes_bool + attributes_int + 80 buckets + attributes_array - assert_eq!(names.len(), 96); + // 12 scalars + indexed_name + attributes_bool + attributes_int + 80 + // buckets + attributes_array + 4 typed array maps + assert_eq!(names.len(), 100); assert_eq!(names[0], "organization_id"); assert_eq!(names[5], "item_id"); assert_eq!(names[6], "indexed_name"); @@ -1021,6 +1097,11 @@ mod tests { assert_eq!(names[93], "attributes_string_39"); assert_eq!(names[94], "attributes_float_39"); assert_eq!(names[95], "attributes_array"); + // Typed array map columns follow the JSON column. + assert_eq!(names[96], "attributes_array_string"); + assert_eq!(names[97], "attributes_array_int"); + assert_eq!(names[98], "attributes_array_float"); + assert_eq!(names[99], "attributes_array_bool"); } #[test] @@ -1239,6 +1320,109 @@ mod tests { // Array attributes are serialized as JSON string in the attributes_array column assert!(row.attributes_array.contains("my_array")); assert!(row.attributes_array.contains("elem")); + + // ...and double-written to the typed `attributes_array_string` column. + assert!(row + .attributes_array_string + .iter() + .any(|(k, v)| k == "my_array" && v == &vec!["elem".to_string()])); + } + + #[test] + fn test_array_typed_columns_double_write() { + let item_id = Uuid::new_v4(); + let mut trace_item = generate_trace_item(item_id); + + // Homogeneous arrays of each element type. + trace_item.attributes.insert( + "strs".to_string(), + AnyValue { + value: Some(Value::ArrayValue(ArrayValue { + values: vec![ + AnyValue { + value: Some(Value::StringValue("a".to_string())), + }, + AnyValue { + value: Some(Value::StringValue("b".to_string())), + }, + ], + })), + }, + ); + trace_item.attributes.insert( + "ints".to_string(), + AnyValue { + value: Some(Value::ArrayValue(ArrayValue { + values: vec![ + AnyValue { + value: Some(Value::IntValue(1)), + }, + AnyValue { + value: Some(Value::IntValue(2)), + }, + ], + })), + }, + ); + trace_item.attributes.insert( + "floats".to_string(), + AnyValue { + value: Some(Value::ArrayValue(ArrayValue { + values: vec![AnyValue { + value: Some(Value::DoubleValue(1.5)), + }], + })), + }, + ); + trace_item.attributes.insert( + "bools".to_string(), + AnyValue { + value: Some(Value::ArrayValue(ArrayValue { + values: vec![AnyValue { + value: Some(Value::BoolValue(true)), + }], + })), + }, + ); + + let item = EAPItem::try_from(trace_item).unwrap(); + let attrs = &item.attributes; + + assert_eq!( + attrs.attributes_array_string.get("strs").unwrap(), + &vec!["a".to_string(), "b".to_string()] + ); + assert_eq!( + attrs.attributes_array_int.get("ints").unwrap(), + &vec![1i64, 2] + ); + // Ints are double-written as floats too, mirroring scalar `insert_int`. + assert_eq!( + attrs.attributes_array_float.get("ints").unwrap(), + &vec![1.0f64, 2.0] + ); + assert_eq!( + attrs.attributes_array_float.get("floats").unwrap(), + &vec![1.5f64] + ); + assert_eq!( + attrs.attributes_array_bool.get("bools").unwrap(), + &vec![true] + ); + + // Non-numeric typed maps are not cross-populated. + assert!(attrs.attributes_array_int.get("strs").is_none()); + assert!(attrs.attributes_array_bool.get("ints").is_none()); + + // The legacy JSON column is still populated (double write). + assert_eq!( + attrs.attributes_array.get("strs").unwrap()[0], + EAPValue::String("a".to_string()) + ); + assert_eq!( + attrs.attributes_array.get("ints").unwrap()[0], + EAPValue::Int(1) + ); } #[test] From 2a1347fd305fff97a2302bafd955651f7a12d1e4 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 19 Jun 2026 19:59:27 +0000 Subject: [PATCH 2/4] fix(eap-items): use !contains_key instead of get().is_none() in test Satisfies clippy::unnecessary_get_then_check on the CI toolchain. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_017PBiwoiUxf5TAnzWmbZebu --- rust_snuba/src/processors/eap_items.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index 9416464309a..bcd29a8d7d9 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -1411,8 +1411,8 @@ mod tests { ); // Non-numeric typed maps are not cross-populated. - assert!(attrs.attributes_array_int.get("strs").is_none()); - assert!(attrs.attributes_array_bool.get("ints").is_none()); + assert!(!attrs.attributes_array_int.contains_key("strs")); + assert!(!attrs.attributes_array_bool.contains_key("ints")); // The legacy JSON column is still populated (double write). assert_eq!( From 85f5f3a0d4832a61be1e7399245e2382cc9de552 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 19:10:25 +0000 Subject: [PATCH 3/4] ref(eap-items): derive typed array columns without extra clones Address review feedback: - Drop the int->float double-write for arrays. Integer array elements now go only to `attributes_array_int`, not also to `attributes_array_float`. - Stop cloning array values (and keys) when populating the typed columns. Instead of building parallel typed maps in `insert_array` (which cloned every string and every key), `insert_array` is back to only building the `attributes_array` JSON representation. The typed `Map(String, Array(T))` columns are derived in `TryFrom for EAPItemRow` by consuming that representation and moving the values into the typed buckets. A `push_typed_array` helper moves the attribute key into the single bucket a homogeneous array touches (the common case) and only clones it for the extra buckets a rare mixed-type array spans. This keeps the production RowBinary path (eap_items always runs with --use-row-binary) writing both the legacy JSON column and the typed columns, with no per-element clones. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_017PBiwoiUxf5TAnzWmbZebu --- rust_snuba/src/processors/eap_items.rs | 193 ++++++++++++------------- 1 file changed, 93 insertions(+), 100 deletions(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index bcd29a8d7d9..722bf632f6e 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -387,6 +387,29 @@ fn read_item_id(from: Vec) -> anyhow::Result { Ok(u128::from_le_bytes(bytes)) } +/// Append `(key, values)` to a typed `Map(String, Array(T))` column, skipping +/// empty arrays. `remaining` is the number of typed buckets this attribute key +/// still has to be written to; the key is moved into the last one and cloned +/// for any earlier buckets. Homogeneous arrays — the common case — touch a +/// single bucket and move the key with no clone. +fn push_typed_array( + dest: &mut Vec<(String, Vec)>, + key: &mut String, + remaining: &mut usize, + values: Vec, +) { + if values.is_empty() { + return; + } + *remaining -= 1; + let key = if *remaining == 0 { + std::mem::take(key) + } else { + key.clone() + }; + dest.push((key, values)); +} + macro_rules! seq_attrs { ($($tt:tt)*) => { seq!(N in 0..40 { @@ -415,22 +438,6 @@ struct AttributeMap { #[serde(skip_serializing_if = "HashMap::is_empty")] attributes_array: HashMap>, - // Typed map columns for array-valued attributes (migration 0059). Arrays are - // double-written here alongside the legacy `attributes_array` JSON column so - // the read path can filter/aggregate on values and enumerate keys via - // `mapKeys(...)` like the scalar attribute maps. - #[serde(skip_serializing_if = "HashMap::is_empty")] - attributes_array_string: HashMap>, - - #[serde(skip_serializing_if = "HashMap::is_empty")] - attributes_array_int: HashMap>, - - #[serde(skip_serializing_if = "HashMap::is_empty")] - attributes_array_float: HashMap>, - - #[serde(skip_serializing_if = "HashMap::is_empty")] - attributes_array_bool: HashMap>, - #( #[serde(skip_serializing_if = "HashMap::is_empty")] attributes_string_~N: HashMap, @@ -477,39 +484,14 @@ impl AttributeMap { } pub fn insert_array(&mut self, k: String, v: ArrayValue) { - // Legacy JSON representation, preserving mixed-type ordering. let mut values: Vec = Vec::default(); - // Typed buckets, one per element type, for the `attributes_array_*` - // columns. Homogeneous arrays — the common case — populate exactly one - // bucket; mixed arrays are split across buckets by element type. - let mut strings: Vec = Vec::default(); - let mut ints: Vec = Vec::default(); - let mut floats: Vec = Vec::default(); - let mut bools: Vec = Vec::default(); - for value in v.values { match value.value { - Some(Value::StringValue(string)) => { - strings.push(string.clone()); - values.push(EAPValue::String(string)); - } - Some(Value::DoubleValue(double)) => { - floats.push(double); - values.push(EAPValue::Double(double)); - } - Some(Value::IntValue(int)) => { - // Double-write ints as floats too, mirroring the scalar - // `insert_int`: the read path resolves numeric attributes to - // the float columns. - ints.push(int); - floats.push(int as f64); - values.push(EAPValue::Int(int)); - } - Some(Value::BoolValue(bool)) => { - bools.push(bool); - values.push(EAPValue::Bool(bool)); - } + Some(Value::StringValue(string)) => values.push(EAPValue::String(string)), + Some(Value::DoubleValue(double)) => values.push(EAPValue::Double(double)), + Some(Value::IntValue(int)) => values.push(EAPValue::Int(int)), + Some(Value::BoolValue(bool)) => values.push(EAPValue::Bool(bool)), Some(Value::BytesValue(_)) => (), Some(Value::KvlistValue(_)) => (), Some(Value::ArrayValue(_)) => (), @@ -517,19 +499,6 @@ impl AttributeMap { } } - if !strings.is_empty() { - self.attributes_array_string.insert(k.clone(), strings); - } - if !ints.is_empty() { - self.attributes_array_int.insert(k.clone(), ints); - } - if !floats.is_empty() { - self.attributes_array_float.insert(k.clone(), floats); - } - if !bools.is_empty() { - self.attributes_array_bool.insert(k.clone(), bools); - } - self.attributes_array.insert(k, values); } } @@ -625,6 +594,50 @@ impl TryFrom for EAPItemRow { fn try_from(item: EAPItem) -> Result { let attributes_array = serde_json::to_string(&item.attributes.attributes_array)?; + // Derive the typed `Map(String, Array(T))` columns from the same array + // attributes, double-writing alongside the `attributes_array` JSON column + // (serialized just above) so the read path can filter/aggregate on values + // and enumerate keys via `mapKeys(...)`. We consume the `EAPValue` + // representation here and move its values into the typed buckets rather + // than cloning them. Arrays are typically homogeneous, so a key usually + // lands in a single bucket and is moved, not cloned. + let mut attributes_array_string: Vec<(String, Vec)> = Vec::new(); + let mut attributes_array_int: Vec<(String, Vec)> = Vec::new(); + let mut attributes_array_float: Vec<(String, Vec)> = Vec::new(); + let mut attributes_array_bool: Vec<(String, Vec)> = Vec::new(); + for (mut key, values) in item.attributes.attributes_array { + let mut strings: Vec = Vec::new(); + let mut ints: Vec = Vec::new(); + let mut floats: Vec = Vec::new(); + let mut bools: Vec = Vec::new(); + for value in values { + match value { + EAPValue::String(s) => strings.push(s), + EAPValue::Int(i) => ints.push(i), + EAPValue::Double(d) => floats.push(d), + EAPValue::Bool(b) => bools.push(b), + } + } + let mut remaining = usize::from(!strings.is_empty()) + + usize::from(!ints.is_empty()) + + usize::from(!floats.is_empty()) + + usize::from(!bools.is_empty()); + push_typed_array( + &mut attributes_array_string, + &mut key, + &mut remaining, + strings, + ); + push_typed_array(&mut attributes_array_int, &mut key, &mut remaining, ints); + push_typed_array( + &mut attributes_array_float, + &mut key, + &mut remaining, + floats, + ); + push_typed_array(&mut attributes_array_bool, &mut key, &mut remaining, bools); + } + // `return` is needed because `seq_attrs!` expands with a trailing semicolon, // which makes the struct expression a statement rather than a tail expression. seq_attrs! { @@ -649,18 +662,10 @@ impl TryFrom for EAPItemRow { attributes_float_~N: item.attributes.attributes_float_~N.into_iter().collect(), )* attributes_array, - attributes_array_string: item - .attributes - .attributes_array_string - .into_iter() - .collect(), - attributes_array_int: item.attributes.attributes_array_int.into_iter().collect(), - attributes_array_float: item - .attributes - .attributes_array_float - .into_iter() - .collect(), - attributes_array_bool: item.attributes.attributes_array_bool.into_iter().collect(), + attributes_array_string, + attributes_array_int, + attributes_array_float, + attributes_array_bool, }); } } @@ -1385,44 +1390,32 @@ mod tests { }, ); - let item = EAPItem::try_from(trace_item).unwrap(); - let attrs = &item.attributes; + let row = EAPItemRow::try_from(EAPItem::try_from(trace_item).unwrap()).unwrap(); + + fn get<'a, T>(col: &'a [(String, T)], key: &str) -> Option<&'a T> { + col.iter().find(|(k, _)| k == key).map(|(_, v)| v) + } assert_eq!( - attrs.attributes_array_string.get("strs").unwrap(), - &vec!["a".to_string(), "b".to_string()] - ); - assert_eq!( - attrs.attributes_array_int.get("ints").unwrap(), - &vec![1i64, 2] - ); - // Ints are double-written as floats too, mirroring scalar `insert_int`. - assert_eq!( - attrs.attributes_array_float.get("ints").unwrap(), - &vec![1.0f64, 2.0] - ); - assert_eq!( - attrs.attributes_array_float.get("floats").unwrap(), - &vec![1.5f64] + get(&row.attributes_array_string, "strs"), + Some(&vec!["a".to_string(), "b".to_string()]) ); + assert_eq!(get(&row.attributes_array_int, "ints"), Some(&vec![1i64, 2])); assert_eq!( - attrs.attributes_array_bool.get("bools").unwrap(), - &vec![true] + get(&row.attributes_array_float, "floats"), + Some(&vec![1.5f64]) ); + assert_eq!(get(&row.attributes_array_bool, "bools"), Some(&vec![true])); - // Non-numeric typed maps are not cross-populated. - assert!(!attrs.attributes_array_int.contains_key("strs")); - assert!(!attrs.attributes_array_bool.contains_key("ints")); + // Ints are NOT double-written to the float column. + assert_eq!(get(&row.attributes_array_float, "ints"), None); + // Typed maps are not cross-populated across element types. + assert_eq!(get(&row.attributes_array_int, "strs"), None); + assert_eq!(get(&row.attributes_array_bool, "ints"), None); // The legacy JSON column is still populated (double write). - assert_eq!( - attrs.attributes_array.get("strs").unwrap()[0], - EAPValue::String("a".to_string()) - ); - assert_eq!( - attrs.attributes_array.get("ints").unwrap()[0], - EAPValue::Int(1) - ); + assert!(row.attributes_array.contains("strs")); + assert!(row.attributes_array.contains("ints")); } #[test] From b51275f8c427ccd586da9f29566c1e27959e2e31 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 19:43:00 +0000 Subject: [PATCH 4/4] ref(eap-items): write typed array columns on both paths, insert-as-read The typed Map(String, Array(T)) columns were only being derived in the RowBinary EAPItemRow conversion, so the JSON (JSONEachRow) path stopped double-writing them. Move the typed maps back onto AttributeMap, which both write paths serialize, so the double-write happens on both the JSON and RowBinary paths. Simplify the write: insert_array now appends each element to its typed column as the value is read (entry API), instead of buffering into per-type vectors and deriving the columns afterward. The EAPItemRow conversion is a plain move of the maps again, and the push_typed_array helper is gone. The JSON<->RowBinary equivalence test now also asserts the typed array columns match across both paths. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_017PBiwoiUxf5TAnzWmbZebu --- rust_snuba/src/processors/eap_items.rs | 190 +++++++++++++------------ 1 file changed, 101 insertions(+), 89 deletions(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index 722bf632f6e..768a697084f 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -387,29 +387,6 @@ fn read_item_id(from: Vec) -> anyhow::Result { Ok(u128::from_le_bytes(bytes)) } -/// Append `(key, values)` to a typed `Map(String, Array(T))` column, skipping -/// empty arrays. `remaining` is the number of typed buckets this attribute key -/// still has to be written to; the key is moved into the last one and cloned -/// for any earlier buckets. Homogeneous arrays — the common case — touch a -/// single bucket and move the key with no clone. -fn push_typed_array( - dest: &mut Vec<(String, Vec)>, - key: &mut String, - remaining: &mut usize, - values: Vec, -) { - if values.is_empty() { - return; - } - *remaining -= 1; - let key = if *remaining == 0 { - std::mem::take(key) - } else { - key.clone() - }; - dest.push((key, values)); -} - macro_rules! seq_attrs { ($($tt:tt)*) => { seq!(N in 0..40 { @@ -438,6 +415,22 @@ struct AttributeMap { #[serde(skip_serializing_if = "HashMap::is_empty")] attributes_array: HashMap>, + // Typed map columns for array-valued attributes (migration 0059). Arrays are + // double-written here, alongside the legacy `attributes_array` JSON column, on + // both the JSON and RowBinary paths so the read path can filter/aggregate on + // values and enumerate keys via `mapKeys(...)` like the scalar attribute maps. + #[serde(skip_serializing_if = "HashMap::is_empty")] + attributes_array_string: HashMap>, + + #[serde(skip_serializing_if = "HashMap::is_empty")] + attributes_array_int: HashMap>, + + #[serde(skip_serializing_if = "HashMap::is_empty")] + attributes_array_float: HashMap>, + + #[serde(skip_serializing_if = "HashMap::is_empty")] + attributes_array_bool: HashMap>, + #( #[serde(skip_serializing_if = "HashMap::is_empty")] attributes_string_~N: HashMap, @@ -484,14 +477,42 @@ impl AttributeMap { } pub fn insert_array(&mut self, k: String, v: ArrayValue) { + // Each element is appended to its typed `Map(String, Array(T))` column as + // it is read, and also to the legacy `attributes_array` JSON column. A + // homogeneous array (the common case) lands in a single typed column; + // mixed arrays are split across columns by element type. let mut values: Vec = Vec::default(); for value in v.values { match value.value { - Some(Value::StringValue(string)) => values.push(EAPValue::String(string)), - Some(Value::DoubleValue(double)) => values.push(EAPValue::Double(double)), - Some(Value::IntValue(int)) => values.push(EAPValue::Int(int)), - Some(Value::BoolValue(bool)) => values.push(EAPValue::Bool(bool)), + Some(Value::StringValue(string)) => { + self.attributes_array_string + .entry(k.clone()) + .or_default() + .push(string.clone()); + values.push(EAPValue::String(string)); + } + Some(Value::DoubleValue(double)) => { + self.attributes_array_float + .entry(k.clone()) + .or_default() + .push(double); + values.push(EAPValue::Double(double)); + } + Some(Value::IntValue(int)) => { + self.attributes_array_int + .entry(k.clone()) + .or_default() + .push(int); + values.push(EAPValue::Int(int)); + } + Some(Value::BoolValue(bool)) => { + self.attributes_array_bool + .entry(k.clone()) + .or_default() + .push(bool); + values.push(EAPValue::Bool(bool)); + } Some(Value::BytesValue(_)) => (), Some(Value::KvlistValue(_)) => (), Some(Value::ArrayValue(_)) => (), @@ -594,50 +615,6 @@ impl TryFrom for EAPItemRow { fn try_from(item: EAPItem) -> Result { let attributes_array = serde_json::to_string(&item.attributes.attributes_array)?; - // Derive the typed `Map(String, Array(T))` columns from the same array - // attributes, double-writing alongside the `attributes_array` JSON column - // (serialized just above) so the read path can filter/aggregate on values - // and enumerate keys via `mapKeys(...)`. We consume the `EAPValue` - // representation here and move its values into the typed buckets rather - // than cloning them. Arrays are typically homogeneous, so a key usually - // lands in a single bucket and is moved, not cloned. - let mut attributes_array_string: Vec<(String, Vec)> = Vec::new(); - let mut attributes_array_int: Vec<(String, Vec)> = Vec::new(); - let mut attributes_array_float: Vec<(String, Vec)> = Vec::new(); - let mut attributes_array_bool: Vec<(String, Vec)> = Vec::new(); - for (mut key, values) in item.attributes.attributes_array { - let mut strings: Vec = Vec::new(); - let mut ints: Vec = Vec::new(); - let mut floats: Vec = Vec::new(); - let mut bools: Vec = Vec::new(); - for value in values { - match value { - EAPValue::String(s) => strings.push(s), - EAPValue::Int(i) => ints.push(i), - EAPValue::Double(d) => floats.push(d), - EAPValue::Bool(b) => bools.push(b), - } - } - let mut remaining = usize::from(!strings.is_empty()) - + usize::from(!ints.is_empty()) - + usize::from(!floats.is_empty()) - + usize::from(!bools.is_empty()); - push_typed_array( - &mut attributes_array_string, - &mut key, - &mut remaining, - strings, - ); - push_typed_array(&mut attributes_array_int, &mut key, &mut remaining, ints); - push_typed_array( - &mut attributes_array_float, - &mut key, - &mut remaining, - floats, - ); - push_typed_array(&mut attributes_array_bool, &mut key, &mut remaining, bools); - } - // `return` is needed because `seq_attrs!` expands with a trailing semicolon, // which makes the struct expression a statement rather than a tail expression. seq_attrs! { @@ -662,10 +639,18 @@ impl TryFrom for EAPItemRow { attributes_float_~N: item.attributes.attributes_float_~N.into_iter().collect(), )* attributes_array, - attributes_array_string, - attributes_array_int, - attributes_array_float, - attributes_array_bool, + attributes_array_string: item + .attributes + .attributes_array_string + .into_iter() + .collect(), + attributes_array_int: item.attributes.attributes_array_int.into_iter().collect(), + attributes_array_float: item + .attributes + .attributes_array_float + .into_iter() + .collect(), + attributes_array_bool: item.attributes.attributes_array_bool.into_iter().collect(), }); } } @@ -1390,32 +1375,30 @@ mod tests { }, ); - let row = EAPItemRow::try_from(EAPItem::try_from(trace_item).unwrap()).unwrap(); - - fn get<'a, T>(col: &'a [(String, T)], key: &str) -> Option<&'a T> { - col.iter().find(|(k, _)| k == key).map(|(_, v)| v) - } + // The typed maps live on AttributeMap, which both the JSON and RowBinary + // paths serialize, so checking them here covers both write paths. + let attrs = EAPItem::try_from(trace_item).unwrap().attributes; assert_eq!( - get(&row.attributes_array_string, "strs"), + attrs.attributes_array_string.get("strs"), Some(&vec!["a".to_string(), "b".to_string()]) ); - assert_eq!(get(&row.attributes_array_int, "ints"), Some(&vec![1i64, 2])); + assert_eq!(attrs.attributes_array_int.get("ints"), Some(&vec![1i64, 2])); assert_eq!( - get(&row.attributes_array_float, "floats"), + attrs.attributes_array_float.get("floats"), Some(&vec![1.5f64]) ); - assert_eq!(get(&row.attributes_array_bool, "bools"), Some(&vec![true])); + assert_eq!(attrs.attributes_array_bool.get("bools"), Some(&vec![true])); // Ints are NOT double-written to the float column. - assert_eq!(get(&row.attributes_array_float, "ints"), None); + assert_eq!(attrs.attributes_array_float.get("ints"), None); // Typed maps are not cross-populated across element types. - assert_eq!(get(&row.attributes_array_int, "strs"), None); - assert_eq!(get(&row.attributes_array_bool, "ints"), None); + assert_eq!(attrs.attributes_array_int.get("strs"), None); + assert_eq!(attrs.attributes_array_bool.get("ints"), None); // The legacy JSON column is still populated (double write). - assert!(row.attributes_array.contains("strs")); - assert!(row.attributes_array.contains("ints")); + assert!(attrs.attributes_array.contains_key("strs")); + assert!(attrs.attributes_array.contains_key("ints")); } #[test] @@ -1778,6 +1761,35 @@ mod tests { ); } + // Compare the typed array columns. Both paths double-write them, and + // array_attr = [String "a", Int 1] splits across the string and int maps. + let json_arr_string: HashMap> = json_row + .get("attributes_array_string") + .map(|v| serde_json::from_value(v.clone()).unwrap()) + .unwrap_or_default(); + let rb_arr_string: HashMap> = + rb_row.attributes_array_string.iter().cloned().collect(); + assert_eq!( + json_arr_string.get("array_attr"), + Some(&vec!["a".to_string()]) + ); + assert_eq!( + json_arr_string, rb_arr_string, + "attributes_array_string mismatch between JSON and RowBinary" + ); + + let json_arr_int: HashMap> = json_row + .get("attributes_array_int") + .map(|v| serde_json::from_value(v.clone()).unwrap()) + .unwrap_or_default(); + let rb_arr_int: HashMap> = + rb_row.attributes_array_int.iter().cloned().collect(); + assert_eq!(json_arr_int.get("array_attr"), Some(&vec![1i64])); + assert_eq!( + json_arr_int, rb_arr_int, + "attributes_array_int mismatch between JSON and RowBinary" + ); + // Compare cogs_data assert_eq!( json_batch.cogs_data, rb_batch.cogs_data,