Fix/inference logging client arrow binary type#374
Conversation
add inference logging reader client
The features and metadata columns can contain raw binary data stored as StringType. PyArrow fails UTF-8 validation on these during Arrow-to-Pandas serialization in mapInPandas, throwing ArrowException before _decode_batch code even runs. - Cast features and metadata to BinaryType before mapInPandas so Arrow serializes them as raw bytes without UTF-8 validation - Handle bytes/bytearray input in _decode_batch for features_data - Handle bytes/bytearray input in _extract_metadata_byte for metadata Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add _split_raw_proto_entities and _is_raw_proto_wire_format helpers
to auto-detect and decode the v2 binary framing format [{<proto>}, ...]
- Cast both features and metadata columns to BinaryType before mapInPandas
to prevent ArrowException on non-UTF-8 binary payloads
- Handle bytes input in _extract_metadata_byte from BinaryType cast
- Stringify parent_entity values to prevent ArrowTypeError
- Port v0.3.7 additions: decode_mplog_proto_dataframe, decode_mplog_proto_csv,
_normalize_schema
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
WalkthroughThe PR expands MPLog proto decoding capabilities by introducing v2 raw-proto wire format support, two new schema-driven public functions for DataFrame and CSV decoding, improving metadata robustness for binary inputs, and broadening pyspark dependency compatibility from an exact pin to a version range. ChangesProto Decoding Enhancements and New APIs
🚥 Pre-merge checks | ✅ 1✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
Loosen the pyspark dependency from `==3.3.0` to `>=3.3,<4` so the client co-installs cleanly with packages targeting newer PySpark minors (e.g. meeupp-spark-lib-python 1.2.2 → pyspark~=3.5.3), which previously failed pip resolution. PySpark 3.x is API-stable for the surface this client uses (Spark SQL DataFrame / mapInPandas / Arrow types). Bump to 0.3.9 because 0.3.8 is already on PyPI. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
|
||
| dependencies = [ | ||
| "pyspark==3.3.0", | ||
| "pyspark>=3.3,<4", |
There was a problem hiding this comment.
Semgrep identified a blocking 🔴 issue in your code:
Dependency "$MATCH" uses a range operator. Pin to exact version with == or use a lockfile (e.g. uv.lock, pdm.lock, poetry.lock). Range pins allow auto-upgrades to compromised versions in CI.
Why this might be safe to ignore:
This finding is in a project dependency specification (pyproject.toml) for pandas version constraint. Using range operators like '>=' in library dependencies is a standard practice to ensure compatibility with newer versions while maintaining minimum requirements. This is not a security vulnerability but rather a dependency management choice. The rule's concern about auto-upgrades to compromised versions in CI is mitigated by using lockfiles, which this project can implement separately without changing the base dependency specification.
To resolve this comment:
🔧 No guidance has been designated for this issue. Fix according to your organization's approved methods.
💬 Ignore this finding
Reply with Semgrep commands to ignore this finding.
/fp <comment>for false positive/ar <comment>for acceptable risk/other <comment>for all other reasons
Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by pyproject-dependency-range-pin.
You can view more details about this finding in the Semgrep AppSec Platform.
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
py-sdk/inference_logging_client/inference_logging_client/__init__.py (1)
422-434:⚠️ Potential issue | 🟠 Major | ⚡ Quick winHonor
mp_config_id_columninstead of hardcodingmp_config_id.These paths still build pass-through metadata from the literal
"mp_config_id". With a custommp_config_id_column,decode_mplog_dataframe()can still decode schemas, but the configured column disappears from the output;decode_mplog_proto_dataframe()does the same, anddecode_mplog_proto_csv()currently ignores the parameter entirely. Please threadmp_config_id_columnthrough the metadata lists and final column ordering.Also applies to: 663-675, 805-817, 1119-1131
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around lines 422 - 434, The metadata lists and final column ordering currently hardcode "mp_config_id" instead of using the mp_config_id_column parameter; update each place that builds row_metadata_columns (and equivalent lists at the other noted regions) to insert the mp_config_id_column variable rather than the literal string, and thread mp_config_id_column through calls and returns in decode_mplog_dataframe(), decode_mplog_proto_dataframe(), and decode_mplog_proto_csv() so the configured column name is preserved in the output ordering; ensure defaulting behavior remains the same (use the parameter value when provided) and adjust any downstream references that expect "mp_config_id" to use the variable name.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 104-143: _split_raw_proto_entities and the related
_is_raw_proto_wire_format are incorrectly using literal byte-sequence sentinels
(like b'},{' / b'}, {') to split protobuf blobs, which corrupts valid proto
payloads; replace this fragile logic with a deterministic protobuf framing
parser: detect and parse length-delimited (varint-prefixed) protobuf messages by
reading a varint length then that many bytes in a loop, and only fall back to
single-message parsing if varint framing fails; update both
_split_raw_proto_entities and _is_raw_proto_wire_format to use this
varint-length framing (or use the protobuf library's ParseDelimitedFromString
helper) instead of searching for brace separators so opaque proto bytes are
never interpreted as delimiters.
- Around line 847-851: In decode_mplog_proto_dataframe(), when handling the
JSON-envelope branch inside the mapInPandas row loop (the check using
features_data), do not skip bytes-like values produced by Arrow after casting
features_column to BinaryType; instead detect bytes/bytearray/memoryview and
decode them to UTF-8 to obtain a str before calling json.loads (same approach as
in decode_mplog_dataframe()). Update the branch that currently does `if not
isinstance(features_data, str): continue` to accept and convert bytes-like
inputs so JSON-envelope rows are parsed correctly.
---
Outside diff comments:
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 422-434: The metadata lists and final column ordering currently
hardcode "mp_config_id" instead of using the mp_config_id_column parameter;
update each place that builds row_metadata_columns (and equivalent lists at the
other noted regions) to insert the mp_config_id_column variable rather than the
literal string, and thread mp_config_id_column through calls and returns in
decode_mplog_dataframe(), decode_mplog_proto_dataframe(), and
decode_mplog_proto_csv() so the configured column name is preserved in the
output ordering; ensure defaulting behavior remains the same (use the parameter
value when provided) and adjust any downstream references that expect
"mp_config_id" to use the variable name.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ad2b705c-a1dc-4d59-b667-957e4305ff5b
📒 Files selected for processing (2)
py-sdk/inference_logging_client/inference_logging_client/__init__.pypy-sdk/inference_logging_client/pyproject.toml
| def _split_raw_proto_entities(raw: bytes) -> list: | ||
| """Split v2 raw-proto wire format into per-entity byte chunks. | ||
|
|
||
| Wire format: [{<proto bytes>}, {<proto bytes>}, ...] | ||
| Separator between entities is b'},{' or b'}, {'. | ||
| """ | ||
| if len(raw) < 4: | ||
| return [raw] if raw else [] | ||
|
|
||
| if raw[0:1] == b'[' and raw[1:2] == b'{': | ||
| inner = raw[2:] | ||
| if inner.endswith(b'}]'): | ||
| inner = inner[:-2] | ||
| elif inner.endswith(b'}'): | ||
| inner = inner[:-1] | ||
|
|
||
| chunks = [] | ||
| start = 0 | ||
| i = 0 | ||
| while i < len(inner): | ||
| if inner[i:i + 1] == b'}': | ||
| rest = inner[i + 1:i + 4] | ||
| if rest.startswith(b', {') or rest.startswith(b',{'): | ||
| chunks.append(inner[start:i]) | ||
| skip = 3 if rest.startswith(b', ') else 2 | ||
| start = i + 1 + skip | ||
| i = start | ||
| continue | ||
| i += 1 | ||
| if start < len(inner): | ||
| chunks.append(inner[start:]) | ||
| return [c for c in chunks if c] | ||
|
|
||
| if raw[0:1] == b'{': | ||
| inner = raw[1:] | ||
| if inner.endswith(b'}'): | ||
| inner = inner[:-1] | ||
| return [inner] if inner else [] | ||
|
|
||
| return [raw] |
There was a problem hiding this comment.
Use an unambiguous parser for raw-proto framing.
Both helpers treat opaque proto bytes as if sentinel bytes were reserved: _is_raw_proto_wire_format() rejects any payload whose first entity byte is 0x22, and _split_raw_proto_entities() splits on b'},{' / b'}, {' even though those byte sequences are legal inside a protobuf body. That will misclassify or corrupt valid rows as soon as the payload happens to contain those bytes.
Also applies to: 146-152
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 104 - 143, _split_raw_proto_entities and the related
_is_raw_proto_wire_format are incorrectly using literal byte-sequence sentinels
(like b'},{' / b'}, {') to split protobuf blobs, which corrupts valid proto
payloads; replace this fragile logic with a deterministic protobuf framing
parser: detect and parse length-delimited (varint-prefixed) protobuf messages by
reading a varint length then that many bytes in a loop, and only fall back to
single-message parsing if varint framing fails; update both
_split_raw_proto_entities and _is_raw_proto_wire_format to use this
varint-length framing (or use the protobuf library's ParseDelimitedFromString
helper) instead of searching for brace separators so opaque proto bytes are
never interpreted as delimiters.
| # Cast features to BinaryType for safe Arrow serialization | ||
| from pyspark.sql import functions as _F | ||
| from pyspark.sql.types import BinaryType as _BinaryType | ||
| if not isinstance(input_field_map.get(features_column), _BinaryType): | ||
| df = df.withColumn(features_column, _F.col(features_column).cast(_BinaryType())) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate the target file and inspect the referenced line ranges
FILE="py-sdk/inference_logging_client/inference_logging_client/__init__.py"
echo "=== File existence ==="
ls -la "$FILE" || true
echo "=== Snippet around lines 820-900 ==="
sed -n '820,910p' "$FILE"
echo "=== Snippet around lines 950-1030 ==="
sed -n '950,1030p' "$FILE"
# Find the decode functions and mapInPandas handling
echo "=== Find decode_mplog_proto_dataframe ==="
rg -n "def decode_mplog_proto_dataframe|decode_mplog_proto_dataframe\(" "$FILE" -S || true
echo "=== Find decode_mplog_dataframe ==="
rg -n "def decode_mplog_dataframe|decode_mplog_dataframe\(" "$FILE" -S || true
echo "=== Search for JSON-envelope branch / envelope decoding ==="
rg -n "json|envelope|JSON|decode.*envelope|mplog_proto|mapInPandas" "$FILE" -S || true
echo "=== Inspect broader region where mapInPandas is used ==="
rg -n "mapInPandas" "$FILE" -S || trueRepository: Meesho/BharatMLStack
Length of output: 12381
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="py-sdk/inference_logging_client/inference_logging_client/__init__.py"
echo "=== decode_mplog_dataframe JSON/raw handling (around 430-620) ==="
sed -n '400,660p' "$FILE"
echo "=== decode_mplog_proto_dataframe JSON/raw handling (around 720-1040) ==="
sed -n '720,1045p' "$FILE"
echo "=== Search for bytes/bytearray -> UTF-8 decoding ==="
rg -n "(bytearray|bytes).*utf-8|utf-8.*(bytearray|bytes)|decode\\(.*utf-8\\)|decode.*'utf-8'|decode.*\"utf-8\"" "$FILE" -S || trueRepository: Meesho/BharatMLStack
Length of output: 28374
Don't skip JSON-envelope rows after casting features to BinaryType.
In decode_mplog_proto_dataframe(), features_column is cast to BinaryType before mapInPandas(), but the JSON-envelope branch immediately does if not isinstance(features_data, str): continue, so bytes/bytearray (likely produced by the Arrow→pandas boundary) are discarded and the JSON-envelope format becomes unreachable. Decode bytes/bytearray/memoryview to UTF-8 before json.loads, same as decode_mplog_dataframe().
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 847 - 851, In decode_mplog_proto_dataframe(), when handling the
JSON-envelope branch inside the mapInPandas row loop (the check using
features_data), do not skip bytes-like values produced by Arrow after casting
features_column to BinaryType; instead detect bytes/bytearray/memoryview and
decode them to UTF-8 to obtain a str before calling json.loads (same approach as
in decode_mplog_dataframe()). Update the branch that currently does `if not
isinstance(features_data, str): continue` to accept and convert bytes-like
inputs so JSON-envelope rows are parsed correctly.
🔁 Pull Request Template – BharatMLStack
Context:
Give a brief overview of the motivation behind this change. Include any relevant discussion links (Slack, documents, tickets, etc.) that help reviewers understand the background and the issue being addressed.
Describe your changes:
Mention the changes made in the codebase.
Testing:
Please describe how you tested the code. If manual tests were performed - please explain how. If automatic tests were added or existing ones cover the change - please explain how did you run them.
Monitoring:
Explain how this change will be tracked after deployment. Indicate whether current dashboards, alerts, and logs are enough, or if additional instrumentation is required.
Rollback plan
Explain rollback plan in case of issues.
Checklist before requesting a review
📂 Modules Affected
horizon(Real-time systems / networking)online-feature-store(Feature serving infra)trufflebox-ui(Admin panel / UI)infra(Docker, CI/CD, GCP/AWS setup)docs(Documentation updates)___________✅ Type of Change
___________📊 Benchmark / Metrics (if applicable)
Summary by CodeRabbit
Release Notes
New Features
decode_mplog_proto_dataframefor Spark-based decoding with custom caller-provided schemas.decode_mplog_proto_csvfor CSV-based decoding and output transformation.Improvements