Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 5.15.2 (Unreleased)

### Bugs Fixed

- Fixed a bug in the pyAMQP transport where decoding an incoming performative whose trailing null fields were omitted by the sender (permitted by AMQP 1.0 section 1.4) raised `IndexError`/`TypeError`. The decoded field list is now padded to the performative's full field count so omitted trailing fields read back as `None`.

## 5.15.1 (2025-11-11)

### Bugs Fixed
Expand Down
36 changes: 36 additions & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@


from .message import Message, Header, Properties
from . import performatives

if TYPE_CHECKING:
from .message import MessageDict
Expand Down Expand Up @@ -327,6 +328,36 @@ def decode_payload(buffer: memoryview) -> Message:
return Message(**message_properties)


# Number of fields encoded on the wire for each performative, keyed by its
# frame-type code. The AMQP 1.0 spec (section 1.4) lets a sender omit trailing
# null fields, so an incoming performative list can be shorter than the full
# field count. Padding the decoded list up to this count keeps positional
# (frame[N]) access and namedtuple unpacking safe; the missing trailing fields
# read back as None, which is the spec-defined meaning of an omitted field.
# The transfer performative (code 20) carries a trailing payload that is not a
# wire field, so its _definition uses a None sentinel for that slot, which is
# excluded from the count here.
_PERFORMATIVE_FIELD_COUNT: Dict[int, int] = {
performative._code: sum(1 for field in performative._definition if field is not None)
for performative in (
performatives.OpenFrame,
performatives.BeginFrame,
performatives.AttachFrame,
performatives.FlowFrame,
performatives.TransferFrame,
performatives.DispositionFrame,
performatives.DetachFrame,
performatives.EndFrame,
performatives.CloseFrame,
performatives.SASLMechanism,
performatives.SASLInit,
performatives.SASLChallenge,
performatives.SASLResponse,
performatives.SASLOutcome,
)
}


def decode_frame(data: memoryview) -> Tuple[int, List[Any]]:
# Ignore the first two bytes, they will always be the constructors for
# described type then ulong.
Expand All @@ -343,6 +374,11 @@ def decode_frame(data: memoryview) -> Tuple[int, List[Any]]:
fields: List[Optional[memoryview]] = [None] * count
for i in range(count):
buffer, fields[i] = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:])
# A sender may omit trailing null fields, so pad the decoded list up to the
# performative's full field count before any positional access or unpacking.
full_field_count = _PERFORMATIVE_FIELD_COUNT.get(frame_type)
if full_field_count is not None and count < full_field_count:
fields.extend([None] * (full_field_count - count))
if frame_type == 20:
fields.append(buffer)
return frame_type, fields
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
# Licensed under the MIT License.
# ------------------------------------

VERSION = "5.15.1"
VERSION = "5.15.2"
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
from azure.eventhub._pyamqp._decode import _decode_decimal128
from azure.eventhub._pyamqp._decode import _decode_decimal128, decode_frame, _PERFORMATIVE_FIELD_COUNT
from azure.eventhub._pyamqp import performatives
from decimal import Decimal


Expand All @@ -18,3 +19,62 @@ def test_decimal_decode(value, expected):
assert output[1] == expected


def _list8_frame(code, count, encoded_fields, payload=b""):
# Build a described performative frame using a list8 (0xc0) body:
# described-type ctor (0x00), ulong ctor (0x53), descriptor code, list8 (0xc0),
# size, count, then the encoded field bytes and any trailing payload.
header = bytes([0x00, 0x53, code, 0xC0, len(encoded_fields) + 1, count])
return memoryview(header + encoded_fields + payload)


# A sender may omit trailing null fields (AMQP 1.0 section 1.4), so an incoming
# performative list can be shorter than the full field count. The decoder must
# pad it back to the full count so positional access and namedtuple unpacking
# stay safe and omitted fields read back as None.
def test_short_open_is_padded_to_full_field_count():
# Open with only container_id set ("x"), 1 field on the wire out of 10.
frame = _list8_frame(performatives.OpenFrame._code, 1, bytes([0xA1, 0x01, 0x78]))
frame_type, fields = decode_frame(frame)
assert frame_type == performatives.OpenFrame._code
assert len(fields) == 10
# Unpacking and fixed-index access must not raise on the omitted fields.
open_frame = performatives.OpenFrame(*fields)
assert open_frame.container_id == b"x"
assert open_frame.properties is None
assert fields[9] is None


def test_short_transfer_pads_fields_and_preserves_payload():
# Transfer with only handle (0) set, plus a message payload. The payload is
# appended after the fields and must survive the padding.
frame = _list8_frame(performatives.TransferFrame._code, 1, bytes([0x52, 0x00]), payload=b"\xde\xad")
frame_type, fields = decode_frame(frame)
assert frame_type == performatives.TransferFrame._code
# 11 wire fields padded out, then the trailing payload appended (12 total).
assert len(fields) == 12
transfer = performatives.TransferFrame(*fields)
assert transfer.handle == 0
assert transfer.batchable is None
assert bytes(transfer.payload) == b"\xde\xad"


@pytest.mark.parametrize(
"frame_cls,expected_count",
[
(performatives.OpenFrame, 10),
(performatives.BeginFrame, 8),
(performatives.AttachFrame, 14),
(performatives.FlowFrame, 11),
(performatives.TransferFrame, 11),
(performatives.DispositionFrame, 6),
(performatives.DetachFrame, 3),
(performatives.EndFrame, 1),
(performatives.CloseFrame, 1),
],
)
def test_performative_field_count_matches_spec(frame_cls, expected_count):
# The padding target is the number of wire fields defined for each
# performative (the trailing transfer payload slot is excluded).
assert _PERFORMATIVE_FIELD_COUNT[frame_cls._code] == expected_count


1 change: 1 addition & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Bugs Fixed

- Read `com.microsoft:max-message-batch-size` vendor property from the AMQP sender link to correctly limit batch size on Premium large-message entities, where `max-message-size` can be up to 100 MB but the batch limit is 1 MB.
- Fixed a bug in the pyAMQP transport where decoding an incoming performative whose trailing null fields were omitted by the sender (permitted by AMQP 1.0 section 1.4) raised `IndexError`/`TypeError`. The decoded field list is now padded to the performative's full field count so omitted trailing fields read back as `None`.

## 7.14.3 (2025-11-11)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@


from .message import Message, Header, Properties
from . import performatives

if TYPE_CHECKING:
from .message import MessageDict
Expand Down Expand Up @@ -327,6 +328,36 @@ def decode_payload(buffer: memoryview) -> Message:
return Message(**message_properties)


# Number of fields encoded on the wire for each performative, keyed by its
# frame-type code. The AMQP 1.0 spec (section 1.4) lets a sender omit trailing
# null fields, so an incoming performative list can be shorter than the full
# field count. Padding the decoded list up to this count keeps positional
# (frame[N]) access and namedtuple unpacking safe; the missing trailing fields
# read back as None, which is the spec-defined meaning of an omitted field.
# The transfer performative (code 20) carries a trailing payload that is not a
# wire field, so its _definition uses a None sentinel for that slot, which is
# excluded from the count here.
_PERFORMATIVE_FIELD_COUNT: Dict[int, int] = {
performative._code: sum(1 for field in performative._definition if field is not None)
for performative in (
performatives.OpenFrame,
performatives.BeginFrame,
performatives.AttachFrame,
performatives.FlowFrame,
performatives.TransferFrame,
performatives.DispositionFrame,
performatives.DetachFrame,
performatives.EndFrame,
performatives.CloseFrame,
performatives.SASLMechanism,
performatives.SASLInit,
performatives.SASLChallenge,
performatives.SASLResponse,
performatives.SASLOutcome,
)
}


def decode_frame(data: memoryview) -> Tuple[int, List[Any]]:
# Ignore the first two bytes, they will always be the constructors for
# described type then ulong.
Expand All @@ -343,6 +374,11 @@ def decode_frame(data: memoryview) -> Tuple[int, List[Any]]:
fields: List[Optional[memoryview]] = [None] * count
for i in range(count):
buffer, fields[i] = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:])
# A sender may omit trailing null fields, so pad the decoded list up to the
# performative's full field count before any positional access or unpacking.
full_field_count = _PERFORMATIVE_FIELD_COUNT.get(frame_type)
if full_field_count is not None and count < full_field_count:
fields.extend([None] * (full_field_count - count))
if frame_type == 20:
fields.append(buffer)
return frame_type, fields
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import pytest
from azure.servicebus._pyamqp._decode import decode_frame, _PERFORMATIVE_FIELD_COUNT
from azure.servicebus._pyamqp import performatives


def _list8_frame(code, count, encoded_fields, payload=b""):
# Build a described performative frame using a list8 (0xc0) body:
# described-type ctor (0x00), ulong ctor (0x53), descriptor code, list8 (0xc0),
# size, count, then the encoded field bytes and any trailing payload.
header = bytes([0x00, 0x53, code, 0xC0, len(encoded_fields) + 1, count])
return memoryview(header + encoded_fields + payload)


# A sender may omit trailing null fields (AMQP 1.0 section 1.4), so an incoming
# performative list can be shorter than the full field count. The decoder must
# pad it back to the full count so positional access and namedtuple unpacking
# stay safe and omitted fields read back as None.
def test_short_open_is_padded_to_full_field_count():
# Open with only container_id set ("x"), 1 field on the wire out of 10.
frame = _list8_frame(performatives.OpenFrame._code, 1, bytes([0xA1, 0x01, 0x78]))
frame_type, fields = decode_frame(frame)
assert frame_type == performatives.OpenFrame._code
assert len(fields) == 10
# Unpacking and fixed-index access must not raise on the omitted fields.
open_frame = performatives.OpenFrame(*fields)
assert open_frame.container_id == b"x"
assert open_frame.properties is None
assert fields[9] is None


def test_short_transfer_pads_fields_and_preserves_payload():
# Transfer with only handle (0) set, plus a message payload. The payload is
# appended after the fields and must survive the padding.
frame = _list8_frame(performatives.TransferFrame._code, 1, bytes([0x52, 0x00]), payload=b"\xde\xad")
frame_type, fields = decode_frame(frame)
assert frame_type == performatives.TransferFrame._code
# 11 wire fields padded out, then the trailing payload appended (12 total).
assert len(fields) == 12
transfer = performatives.TransferFrame(*fields)
assert transfer.handle == 0
assert transfer.batchable is None
assert bytes(transfer.payload) == b"\xde\xad"


@pytest.mark.parametrize(
"frame_cls,expected_count",
[
(performatives.OpenFrame, 10),
(performatives.BeginFrame, 8),
(performatives.AttachFrame, 14),
(performatives.FlowFrame, 11),
(performatives.TransferFrame, 11),
(performatives.DispositionFrame, 6),
(performatives.DetachFrame, 3),
(performatives.EndFrame, 1),
(performatives.CloseFrame, 1),
],
)
def test_performative_field_count_matches_spec(frame_cls, expected_count):
# The padding target is the number of wire fields defined for each
# performative (the trailing transfer payload slot is excluded).
assert _PERFORMATIVE_FIELD_COUNT[frame_cls._code] == expected_count