Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 195 additions & 6 deletions rust_snuba/src/processors/eap_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,22 @@ struct AttributeMap {
#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes_array: HashMap<String, Vec<EAPValue>>,

// Typed map columns for array-valued attributes (migration 0059). Arrays are
// double-written here, alongside the legacy `attributes_array` JSON column, on
// both the JSON and RowBinary paths so the read path can filter/aggregate on
// values and enumerate keys via `mapKeys(...)` like the scalar attribute maps.
#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes_array_string: HashMap<String, Vec<String>>,

#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes_array_int: HashMap<String, Vec<i64>>,

#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes_array_float: HashMap<String, Vec<f64>>,

#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes_array_bool: HashMap<String, Vec<bool>>,

#(
#[serde(skip_serializing_if = "HashMap::is_empty")]
attributes_string_~N: HashMap<String, String>,
Expand Down Expand Up @@ -461,14 +477,42 @@ impl AttributeMap {
}

pub fn insert_array(&mut self, k: String, v: ArrayValue) {
// Each element is appended to its typed `Map(String, Array(T))` column as
// it is read, and also to the legacy `attributes_array` JSON column. A
// homogeneous array (the common case) lands in a single typed column;
// mixed arrays are split across columns by element type.
let mut values: Vec<EAPValue> = Vec::default();

for value in v.values {
match value.value {
Some(Value::StringValue(string)) => values.push(EAPValue::String(string)),
Some(Value::DoubleValue(double)) => values.push(EAPValue::Double(double)),
Some(Value::IntValue(int)) => values.push(EAPValue::Int(int)),
Some(Value::BoolValue(bool)) => values.push(EAPValue::Bool(bool)),
Some(Value::StringValue(string)) => {
self.attributes_array_string
.entry(k.clone())
.or_default()
.push(string.clone());
values.push(EAPValue::String(string));
}
Some(Value::DoubleValue(double)) => {
self.attributes_array_float
.entry(k.clone())
.or_default()
.push(double);
values.push(EAPValue::Double(double));
}
Some(Value::IntValue(int)) => {
self.attributes_array_int
.entry(k.clone())
.or_default()
.push(int);
values.push(EAPValue::Int(int));
Comment on lines +502 to +507

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Integer arrays are only written to attributes_array_int, not the float column, making them unqueryable after the planned read-path migration, which is expected to read from the float column.
Severity: HIGH

Suggested Fix

Modify the insert_array function for Value::IntValue. When an integer array is processed, it should be written to both attributes_array_int and attributes_array_float to mirror the behavior of scalar integers. This ensures that the data will be queryable when the read path is migrated to use the new typed array columns.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: rust_snuba/src/processors/eap_items.rs#L502-L507

Potential issue: The `insert_array` method for integer values writes data exclusively to
the `attributes_array_int` column. This is inconsistent with the existing behavior for
scalar integers, which are double-written to both integer and float columns
(`attributes_float`). The read path currently resolves numeric types to float columns to
enable querying. When the read path is migrated for arrays in a future change, it will
likely look for integer arrays in `attributes_array_float`. Since the data written by
this PR will only be in `attributes_array_int`, queries for this data will return empty
results, leading to silent data inaccessibility.

Also affects:

  • rust_snuba/src/processors/eap_items.rs:1393~1394

Did we get this right? 👍 / 👎 to inform future reviews.

}
Some(Value::BoolValue(bool)) => {
self.attributes_array_bool
.entry(k.clone())
.or_default()
.push(bool);
values.push(EAPValue::Bool(bool));
}
Some(Value::BytesValue(_)) => (),
Some(Value::KvlistValue(_)) => (),
Some(Value::ArrayValue(_)) => (),
Expand Down Expand Up @@ -510,6 +554,11 @@ pub struct EAPItemRow {
)*

attributes_array: String,

attributes_array_string: Vec<(String, Vec<String>)>,
attributes_array_int: Vec<(String, Vec<i64>)>,
attributes_array_float: Vec<(String, Vec<f64>)>,
attributes_array_bool: Vec<(String, Vec<bool>)>,
}
}

Expand Down Expand Up @@ -551,6 +600,10 @@ impl EAPItemRow {
concat!("attributes_float_", stringify!(N)),
)*
"attributes_array",
"attributes_array_string",
"attributes_array_int",
"attributes_array_float",
"attributes_array_bool",
];
}
}
Expand Down Expand Up @@ -586,6 +639,18 @@ impl TryFrom<EAPItem> for EAPItemRow {
attributes_float_~N: item.attributes.attributes_float_~N.into_iter().collect(),
)*
attributes_array,
attributes_array_string: item
.attributes
.attributes_array_string
.into_iter()
.collect(),
attributes_array_int: item.attributes.attributes_array_int.into_iter().collect(),
attributes_array_float: item
.attributes
.attributes_array_float
.into_iter()
.collect(),
attributes_array_bool: item.attributes.attributes_array_bool.into_iter().collect(),
});
}
}
Expand Down Expand Up @@ -1004,8 +1069,9 @@ mod tests {
#[test]
fn test_column_names_match_struct_layout() {
let names = EAPItemRow::COLUMN_NAMES;
// 12 scalars + indexed_name + attributes_bool + attributes_int + 80 buckets + attributes_array
assert_eq!(names.len(), 96);
// 12 scalars + indexed_name + attributes_bool + attributes_int + 80
// buckets + attributes_array + 4 typed array maps
assert_eq!(names.len(), 100);
assert_eq!(names[0], "organization_id");
assert_eq!(names[5], "item_id");
assert_eq!(names[6], "indexed_name");
Expand All @@ -1021,6 +1087,11 @@ mod tests {
assert_eq!(names[93], "attributes_string_39");
assert_eq!(names[94], "attributes_float_39");
assert_eq!(names[95], "attributes_array");
// Typed array map columns follow the JSON column.
assert_eq!(names[96], "attributes_array_string");
assert_eq!(names[97], "attributes_array_int");
assert_eq!(names[98], "attributes_array_float");
assert_eq!(names[99], "attributes_array_bool");
}

#[test]
Expand Down Expand Up @@ -1239,6 +1310,95 @@ mod tests {
// Array attributes are serialized as JSON string in the attributes_array column
assert!(row.attributes_array.contains("my_array"));
assert!(row.attributes_array.contains("elem"));

// ...and double-written to the typed `attributes_array_string` column.
assert!(row
.attributes_array_string
.iter()
.any(|(k, v)| k == "my_array" && v == &vec!["elem".to_string()]));
}

#[test]
fn test_array_typed_columns_double_write() {
let item_id = Uuid::new_v4();
let mut trace_item = generate_trace_item(item_id);

// Homogeneous arrays of each element type.
trace_item.attributes.insert(
"strs".to_string(),
AnyValue {
value: Some(Value::ArrayValue(ArrayValue {
values: vec![
AnyValue {
value: Some(Value::StringValue("a".to_string())),
},
AnyValue {
value: Some(Value::StringValue("b".to_string())),
},
],
})),
},
);
trace_item.attributes.insert(
"ints".to_string(),
AnyValue {
value: Some(Value::ArrayValue(ArrayValue {
values: vec![
AnyValue {
value: Some(Value::IntValue(1)),
},
AnyValue {
value: Some(Value::IntValue(2)),
},
],
})),
},
);
trace_item.attributes.insert(
"floats".to_string(),
AnyValue {
value: Some(Value::ArrayValue(ArrayValue {
values: vec![AnyValue {
value: Some(Value::DoubleValue(1.5)),
}],
})),
},
);
trace_item.attributes.insert(
"bools".to_string(),
AnyValue {
value: Some(Value::ArrayValue(ArrayValue {
values: vec![AnyValue {
value: Some(Value::BoolValue(true)),
}],
})),
},
);

// The typed maps live on AttributeMap, which both the JSON and RowBinary
// paths serialize, so checking them here covers both write paths.
let attrs = EAPItem::try_from(trace_item).unwrap().attributes;

assert_eq!(
attrs.attributes_array_string.get("strs"),
Some(&vec!["a".to_string(), "b".to_string()])
);
assert_eq!(attrs.attributes_array_int.get("ints"), Some(&vec![1i64, 2]));
assert_eq!(
attrs.attributes_array_float.get("floats"),
Some(&vec![1.5f64])
);
assert_eq!(attrs.attributes_array_bool.get("bools"), Some(&vec![true]));

// Ints are NOT double-written to the float column.
assert_eq!(attrs.attributes_array_float.get("ints"), None);
// Typed maps are not cross-populated across element types.
assert_eq!(attrs.attributes_array_int.get("strs"), None);
assert_eq!(attrs.attributes_array_bool.get("ints"), None);

// The legacy JSON column is still populated (double write).
assert!(attrs.attributes_array.contains_key("strs"));
assert!(attrs.attributes_array.contains_key("ints"));
}

#[test]
Expand Down Expand Up @@ -1601,6 +1761,35 @@ mod tests {
);
}

// Compare the typed array columns. Both paths double-write them, and
// array_attr = [String "a", Int 1] splits across the string and int maps.
let json_arr_string: HashMap<String, Vec<String>> = json_row
.get("attributes_array_string")
.map(|v| serde_json::from_value(v.clone()).unwrap())
.unwrap_or_default();
let rb_arr_string: HashMap<String, Vec<String>> =
rb_row.attributes_array_string.iter().cloned().collect();
assert_eq!(
json_arr_string.get("array_attr"),
Some(&vec!["a".to_string()])
);
assert_eq!(
json_arr_string, rb_arr_string,
"attributes_array_string mismatch between JSON and RowBinary"
);

let json_arr_int: HashMap<String, Vec<i64>> = json_row
.get("attributes_array_int")
.map(|v| serde_json::from_value(v.clone()).unwrap())
.unwrap_or_default();
let rb_arr_int: HashMap<String, Vec<i64>> =
rb_row.attributes_array_int.iter().cloned().collect();
assert_eq!(json_arr_int.get("array_attr"), Some(&vec![1i64]));
assert_eq!(
json_arr_int, rb_arr_int,
"attributes_array_int mismatch between JSON and RowBinary"
);

// Compare cogs_data
assert_eq!(
json_batch.cogs_data, rb_batch.cogs_data,
Expand Down
Loading