Fix/inference logging client arrow binary type #374

Open
ramanujam-meesho wants to merge 8 commits into develop from fix/inference-logging-client-arrow-binary-type

@ramanujam-meesho ramanujam-meesho commented May 22, 2026

🔁 Pull Request Template – BharatMLStack

Please fill out the following sections to help us review your changes efficiently.

Context:

Give a brief overview of the motivation behind this change. Include any relevant discussion links (Slack, documents, tickets, etc.) that help reviewers understand the background and the issue being addressed.

Describe your changes:

Mention the changes made in the codebase.

Testing:

Please describe how you tested the code. If manual tests were performed, please explain how. If automatic tests were added or existing ones cover the change, please explain how you ran them.

Monitoring:

Explain how this change will be tracked after deployment. Indicate whether current dashboards, alerts, and logs are enough, or if additional instrumentation is required.

Rollback plan

Explain rollback plan in case of issues.

Checklist before requesting a review

  • I have reviewed my own changes?
  • Relevant or critical functionality is covered by tests?
  • Monitoring needs have been evaluated?
  • Any necessary documentation updates have been considered?

📂 Modules Affected

  • horizon (Real-time systems / networking)
  • online-feature-store (Feature serving infra)
  • trufflebox-ui (Admin panel / UI)
  • infra (Docker, CI/CD, GCP/AWS setup)
  • docs (Documentation updates)
  • Other: ___________

✅ Type of Change

  • Feature addition
  • Bug fix
  • Infra / build system change
  • Performance improvement
  • Refactor
  • Documentation
  • Other: ___________

📊 Benchmark / Metrics (if applicable)

Summary by CodeRabbit

Release Notes

  • New Features

    • Added decode_mplog_proto_dataframe for Spark-based decoding with custom caller-provided schemas.
    • Added decode_mplog_proto_csv for CSV-based decoding and output transformation.
    • Support for v2 raw-proto wire format in addition to existing JSON envelope encoding.
  • Improvements

    • Enhanced metadata parsing robustness in existing decoder.
    • Extended PySpark version compatibility (dependency loosened from the exact pin 3.3.0 to the range >=3.3,<4).

Review Change Stack

a0d00kc and others added 7 commits February 20, 2026 00:35
add inference logging reader client
The features and metadata columns can contain raw binary data stored as
StringType. PyArrow fails UTF-8 validation on these during Arrow-to-Pandas
serialization in mapInPandas, throwing ArrowException before _decode_batch
code even runs.

- Cast features and metadata to BinaryType before mapInPandas so Arrow
  serializes them as raw bytes without UTF-8 validation
- Handle bytes/bytearray input in _decode_batch for features_data
- Handle bytes/bytearray input in _extract_metadata_byte for metadata

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add _split_raw_proto_entities and _is_raw_proto_wire_format helpers
  to auto-detect and decode the v2 binary framing format [{<proto>}, ...]
- Cast both features and metadata columns to BinaryType before mapInPandas
  to prevent ArrowException on non-UTF-8 binary payloads
- Handle bytes input in _extract_metadata_byte from BinaryType cast
- Stringify parent_entity values to prevent ArrowTypeError
- Port v0.3.7 additions: decode_mplog_proto_dataframe, decode_mplog_proto_csv,
  _normalize_schema

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

coderabbitai Bot commented May 22, 2026

Walkthrough

The PR expands MPLog proto decoding capabilities by introducing v2 raw-proto wire format support, two new schema-driven public functions for DataFrame and CSV decoding, improving metadata robustness for binary inputs, and broadening pyspark dependency compatibility from an exact pin to a version range.

Changes

Proto Decoding Enhancements and New APIs

| Layer / File(s) | Summary |
| --- | --- |
| Version and public API exports: `py-sdk/inference_logging_client/inference_logging_client/__init__.py`, `py-sdk/inference_logging_client/pyproject.toml` | Version bumped to 0.3.9 across both module init and project config. `__all__` exports expanded to include `decode_mplog_proto_dataframe` and `decode_mplog_proto_csv`. pyspark dependency loosened from exact pin (3.3.0) to a compatible range (>=3.3,<4). |
| Raw-proto wire format detection and splitting: `py-sdk/inference_logging_client/inference_logging_client/__init__.py` | Helper functions added to detect v2 raw-proto framing format and split framed entity byte chunks for downstream decoding. |
| Metadata robustness and type casting: `py-sdk/inference_logging_client/inference_logging_client/__init__.py` | `_extract_metadata_byte` extended to handle bytes/bytearray metadata by UTF-8 decoding with fallback. Spark BinaryType casting added upfront to prevent Arrow UTF-8 interpretation. Row-metadata passthrough updated to use `_safe_get` for consistent null handling. |
| Enhanced decode_mplog_dataframe with dual encoding support: `py-sdk/inference_logging_client/inference_logging_client/__init__.py` | `decode_mplog_dataframe` updated to support auto-detection and decoding of both JSON-envelope and v2 raw-proto wire formats. Entity expansion logic handles raw-proto chunk splitting, optional zstd decompression per chunk, and per-entity proto-feature decoding with typed output conversions. |
| Schema normalization and new proto-decoding functions: `py-sdk/inference_logging_client/inference_logging_client/__init__.py` | `_normalize_schema` helper added to validate and convert multiple schema shapes into ordered FeatureInfo lists. `decode_mplog_proto_dataframe` introduced for distributed Spark DataFrame decoding using caller-provided PROTO schema with dual-framing support and per-entity row expansion. `decode_mplog_proto_csv` introduced for pure-Python row-by-row CSV decoding with optional zstd decompression and feature filtering. |
🚥 Pre-merge checks | ✅ 1
✅ Passed checks (1 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Dynamic Configuration Validation | ✅ Passed | No changes to application-dyn-*.yml files detected. PR only modifies Python source files in py-sdk/inference_logging_client/. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Loosen the pyspark dependency from `==3.3.0` to `>=3.3,<4` so the
client co-installs cleanly with packages targeting newer PySpark
minors (e.g. meeupp-spark-lib-python 1.2.2 → pyspark~=3.5.3),
which previously failed pip resolution. PySpark 3.x is API-stable
for the surface this client uses (Spark SQL DataFrame /
mapInPandas / Arrow types).

Bump to 0.3.9 because 0.3.8 is already on PyPI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

 dependencies = [
-    "pyspark==3.3.0",
+    "pyspark>=3.3,<4",

Semgrep identified a blocking 🔴 issue in your code:
Dependency "$MATCH" uses a range operator. Pin to exact version with == or use a lockfile (e.g. uv.lock, pdm.lock, poetry.lock). Range pins allow auto-upgrades to compromised versions in CI.

Why this might be safe to ignore:

This finding is in a project dependency specification (pyproject.toml) for the pyspark version constraint. Using range operators such as '>=' in library dependencies is standard practice: it keeps the package compatible with newer versions while enforcing a minimum requirement. It is a dependency management choice rather than a security vulnerability. The rule's concern about auto-upgrades to compromised versions in CI can be mitigated with a lockfile, which this project can adopt separately without changing the base dependency specification.

To resolve this comment:

🔧 No guidance has been designated for this issue. Fix according to your organization's approved methods.

💬 Ignore this finding

Reply with Semgrep commands to ignore this finding.

  • /fp <comment> for false positive
  • /ar <comment> for acceptable risk
  • /other <comment> for all other reasons

Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by pyproject-dependency-range-pin.

You can view more details about this finding in the Semgrep AppSec Platform.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
py-sdk/inference_logging_client/inference_logging_client/__init__.py (1)

422-434: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Honor mp_config_id_column instead of hardcoding mp_config_id.

These paths still build pass-through metadata from the literal "mp_config_id". With a custom mp_config_id_column, decode_mplog_dataframe() can still decode schemas, but the configured column disappears from the output; decode_mplog_proto_dataframe() does the same, and decode_mplog_proto_csv() currently ignores the parameter entirely. Please thread mp_config_id_column through the metadata lists and final column ordering.

Also applies to: 663-675, 805-817, 1119-1131
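
One way the requested change could look is sketched below. This is a minimal illustration, not the client's actual code: the helper name `build_row_metadata_columns` and the example pass-through column names are hypothetical, and only the `mp_config_id_column` parameter and its `"mp_config_id"` default are taken from the comment above.

```python
from typing import List, Sequence


def build_row_metadata_columns(
    passthrough_columns: Sequence[str],
    mp_config_id_column: str = "mp_config_id",
) -> List[str]:
    """Hypothetical helper: order pass-through metadata columns.

    The configured config-id column is placed first and is never
    replaced by the literal "mp_config_id".
    """
    columns = [mp_config_id_column]
    for name in passthrough_columns:
        # Skip duplicates so the config-id column appears exactly once.
        if name not in columns:
            columns.append(name)
    return columns


# Default behaviour is unchanged when the caller does not override the name.
default_cols = build_row_metadata_columns(["tracking_id", "user_id"])

# A custom column name is preserved in the output ordering.
custom_cols = build_row_metadata_columns(
    ["tracking_id", "model_cfg", "user_id"], mp_config_id_column="model_cfg"
)
```

Building the list in one place and passing `mp_config_id_column` into it would let all three decode functions share the same ordering logic instead of repeating the literal.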

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 422 - 434, The metadata lists and final column ordering currently hardcode
"mp_config_id" instead of using the mp_config_id_column parameter; update each
place that builds row_metadata_columns (and equivalent lists at the other noted
regions) to insert the mp_config_id_column variable rather than the literal
string, and thread mp_config_id_column through calls and returns in
decode_mplog_dataframe(), decode_mplog_proto_dataframe(), and
decode_mplog_proto_csv() so the configured column name is preserved in the
output ordering; ensure defaulting behavior remains the same (use the parameter
value when provided) and adjust any downstream references that expect
"mp_config_id" to use the variable name.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ad2b705c-a1dc-4d59-b667-957e4305ff5b

📥 Commits

Reviewing files that changed from the base of the PR and between 37b045d and e265188.

📒 Files selected for processing (2)
  • py-sdk/inference_logging_client/inference_logging_client/__init__.py
  • py-sdk/inference_logging_client/pyproject.toml

Comment on lines +104 to +143
def _split_raw_proto_entities(raw: bytes) -> list:
    """Split v2 raw-proto wire format into per-entity byte chunks.

    Wire format: [{<proto bytes>}, {<proto bytes>}, ...]
    Separator between entities is b'},{' or b'}, {'.
    """
    if len(raw) < 4:
        return [raw] if raw else []

    if raw[0:1] == b'[' and raw[1:2] == b'{':
        inner = raw[2:]
        if inner.endswith(b'}]'):
            inner = inner[:-2]
        elif inner.endswith(b'}'):
            inner = inner[:-1]

        chunks = []
        start = 0
        i = 0
        while i < len(inner):
            if inner[i:i + 1] == b'}':
                rest = inner[i + 1:i + 4]
                if rest.startswith(b', {') or rest.startswith(b',{'):
                    chunks.append(inner[start:i])
                    skip = 3 if rest.startswith(b', ') else 2
                    start = i + 1 + skip
                    i = start
                    continue
            i += 1
        if start < len(inner):
            chunks.append(inner[start:])
        return [c for c in chunks if c]

    if raw[0:1] == b'{':
        inner = raw[1:]
        if inner.endswith(b'}'):
            inner = inner[:-1]
        return [inner] if inner else []

    return [raw]

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Use an unambiguous parser for raw-proto framing.

Both helpers treat opaque proto bytes as if sentinel bytes were reserved: _is_raw_proto_wire_format() rejects any payload whose first entity byte is 0x22, and _split_raw_proto_entities() splits on b'},{' / b'}, {' even though those byte sequences are legal inside a protobuf body. That will misclassify or corrupt valid rows as soon as the payload happens to contain those bytes.

Also applies to: 146-152
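
For illustration, a length-prefixed framing parser along the lines suggested here might look like the sketch below. It assumes the writer emits each entity as a varint length followed by that many payload bytes, which is not the `[{...}, {...}]` framing the current v2 format uses, so the producer would have to change as well. All function names (`encode_varint`, `read_varint`, `frame_messages`, `split_length_delimited`) are hypothetical.

```python
from typing import Iterable, List, Tuple


def encode_varint(value: int) -> bytes:
    """Encode a non-negative int as a base-128 varint (protobuf style)."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Read a varint starting at pos; return (value, next position)."""
    result = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            return result, pos
        shift += 7
        if shift >= 64:
            raise ValueError("varint too long")


def frame_messages(messages: Iterable[bytes]) -> bytes:
    """Assumed writer side: prefix every message with its length."""
    return b"".join(encode_varint(len(m)) + m for m in messages)


def split_length_delimited(raw: bytes) -> List[bytes]:
    """Split a buffer of varint-length-prefixed messages into chunks."""
    chunks = []
    pos = 0
    while pos < len(raw):
        length, pos = read_varint(raw, pos)
        end = pos + length
        if end > len(raw):
            raise ValueError("truncated message body")
        chunks.append(raw[pos:end])
        pos = end
    return chunks


# Payloads containing the brace separators or a leading 0x22 round-trip intact.
payloads = [b"a},{b", b'"\x00}, {', b"x" * 300]
roundtrip = split_length_delimited(frame_messages(payloads))
```

Because the parser never inspects payload bytes, sequences such as `b'},{'` inside a message cannot be mistaken for delimiters.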

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 104 - 143, _split_raw_proto_entities and the related
_is_raw_proto_wire_format are incorrectly using literal byte-sequence sentinels
(like b'},{' / b'}, {') to split protobuf blobs, which corrupts valid proto
payloads; replace this fragile logic with a deterministic protobuf framing
parser: detect and parse length-delimited (varint-prefixed) protobuf messages by
reading a varint length then that many bytes in a loop, and only fall back to
single-message parsing if varint framing fails; update both
_split_raw_proto_entities and _is_raw_proto_wire_format to use this
varint-length framing (or use the protobuf library's ParseDelimitedFromString
helper) instead of searching for brace separators so opaque proto bytes are
never interpreted as delimiters.

Comment on lines +847 to +851
# Cast features to BinaryType for safe Arrow serialization
from pyspark.sql import functions as _F
from pyspark.sql.types import BinaryType as _BinaryType
if not isinstance(input_field_map.get(features_column), _BinaryType):
    df = df.withColumn(features_column, _F.col(features_column).cast(_BinaryType()))

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate the target file and inspect the referenced line ranges
FILE="py-sdk/inference_logging_client/inference_logging_client/__init__.py"

echo "=== File existence ==="
ls -la "$FILE" || true

echo "=== Snippet around lines 820-900 ==="
sed -n '820,910p' "$FILE"

echo "=== Snippet around lines 950-1030 ==="
sed -n '950,1030p' "$FILE"

# Find the decode functions and mapInPandas handling
echo "=== Find decode_mplog_proto_dataframe ==="
rg -n "def decode_mplog_proto_dataframe|decode_mplog_proto_dataframe\(" "$FILE" -S || true

echo "=== Find decode_mplog_dataframe ==="
rg -n "def decode_mplog_dataframe|decode_mplog_dataframe\(" "$FILE" -S || true

echo "=== Search for JSON-envelope branch / envelope decoding ==="
rg -n "json|envelope|JSON|decode.*envelope|mplog_proto|mapInPandas" "$FILE" -S || true

echo "=== Inspect broader region where mapInPandas is used ==="
rg -n "mapInPandas" "$FILE" -S || true

Repository: Meesho/BharatMLStack

Length of output: 12381


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="py-sdk/inference_logging_client/inference_logging_client/__init__.py"

echo "=== decode_mplog_dataframe JSON/raw handling (around 430-620) ==="
sed -n '400,660p' "$FILE"

echo "=== decode_mplog_proto_dataframe JSON/raw handling (around 720-1040) ==="
sed -n '720,1045p' "$FILE"

echo "=== Search for bytes/bytearray -> UTF-8 decoding ==="
rg -n "(bytearray|bytes).*utf-8|utf-8.*(bytearray|bytes)|decode\\(.*utf-8\\)|decode.*'utf-8'|decode.*\"utf-8\"" "$FILE" -S || true

Repository: Meesho/BharatMLStack

Length of output: 28374


Don't skip JSON-envelope rows after casting features to BinaryType.

In decode_mplog_proto_dataframe(), features_column is cast to BinaryType before mapInPandas(), but the JSON-envelope branch immediately does if not isinstance(features_data, str): continue, so bytes/bytearray (likely produced by the Arrow→pandas boundary) are discarded and the JSON-envelope format becomes unreachable. Decode bytes/bytearray/memoryview to UTF-8 before json.loads, same as decode_mplog_dataframe().
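
A minimal sketch of the suggested conversion follows. The helper name `coerce_to_text` is hypothetical, and returning `None` for non-UTF-8 input is an assumption about how the caller would want to skip such rows, not something taken from the PR.

```python
import json
from typing import Any, Optional


def coerce_to_text(value: Any) -> Optional[str]:
    """Hypothetical helper: return a str for str or bytes-like input.

    Returns None when the value is neither, or is not valid UTF-8,
    so the caller can skip the row instead of raising.
    """
    if isinstance(value, str):
        return value
    if isinstance(value, (bytes, bytearray, memoryview)):
        try:
            return bytes(value).decode("utf-8")
        except UnicodeDecodeError:
            return None
    return None


# Values as they might arrive after the BinaryType cast.
from_bytes = coerce_to_text(b'{"entities": ["a"]}')
from_view = coerce_to_text(memoryview(bytearray(b'{"k": 1}')))
not_utf8 = coerce_to_text(b"\xff\xfe\x00")

parsed = json.loads(from_bytes) if from_bytes is not None else None
```

Inside the row loop, the existing `isinstance(features_data, str)` check could then become a `None` check on the coerced value, so JSON-envelope rows delivered as bytes are parsed rather than dropped.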

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 847 - 851, In decode_mplog_proto_dataframe(), when handling the
JSON-envelope branch inside the mapInPandas row loop (the check using
features_data), do not skip bytes-like values produced by Arrow after casting
features_column to BinaryType; instead detect bytes/bytearray/memoryview and
decode them to UTF-8 to obtain a str before calling json.loads (same approach as
in decode_mplog_dataframe()). Update the branch that currently does `if not
isinstance(features_data, str): continue` to accept and convert bytes-like
inputs so JSON-envelope rows are parsed correctly.

7 participants