Skip to content

Can not open a ChannelIdStore is the channel_max value has a default value of 0 #9

Description

@Ptitlaby

Hello,

This issue is a mix of both channels_rabbitmq and carehare, but ultimately the related code is in carehare so I created the issue here. Let me know if this is incorrect.

I got blocked today using channels_rabbitmq and carehare today, more specifically when initialising a new AsyncWebsocketConsumer in channels_rabbitmq:

channels=4.0.0
channels-rabbitmq=4.0.1
carehare=1.0.2

I am using channels_rabbitmq.core.RabbitmqChannelLayer as back end for the layer, and my connect() method in my async websocket consumer is as follow :

async def connect(self):
  await self.channel_layer.group_add("my_group", self.channel_name)

This method, called when a websocket connect to my AsyncWebsocketConsumer never returns something from await. Later on, this library is creating a connection using a carehare object:

async def _setup_connection(
    *,
    connection: carehare.Connection,
    queue_name: str,
    remote_capacity: Optional[int],
    expiry: Optional[int],
    groups_exchange: str,
) -> None:
    # Declare "groups" exchange. It may persist; spurious declarations
    # (such as on reconnect) are harmless.
    await connection.exchange_declare(groups_exchange, exchange_type="direct")

The created connection will end up creating a transport and protocol object, and the protocol object is creating a ChannelIdStore at line 321

    @_accept_handshake_frame.register  # type: ignore [no-redef]
    def _(self, frame: pamqp.commands.Connection.Tune) -> None:
        assert pamqp.commands.Connection.Start in self._lifecycle
        assert pamqp.commands.Connection.Tune not in self._lifecycle
        self._lifecycle[pamqp.commands.Connection.Tune] = frame
        self._frame_writer.send_frames(
            [
                pamqp.commands.Connection.TuneOk(
                    channel_max=frame.channel_max,
                    frame_max=frame.frame_max,
                    heartbeat=frame.heartbeat,
                ),
                pamqp.commands.Connection.Open(virtual_host=self._virtual_host),
            ]
        )
        self._heartbeat_sender = _heartbeat.HeartbeatSender(
            self._transport, frame.heartbeat
        )
        self._channel_id_store: ChannelIdStore = ChannelIdStore(frame.channel_max)

The init method of the ChannelIdStore class is as follow

    def __init__(self, channel_max: int):
        self._new: Iterator[int] = iter(range(1, channel_max))
        self._recycled: List[int] = []

However, when channel_max = 0 (which is a default value when I am using channels_rabbitmq), the range(1, 0) returns an empty list, which means that we can never acquire a channel id below:

    def acquire(self) -> int:
        if self._recycled:
            return self._recycled.pop()
        else:
            return next(self._new)

and the whole connection chain get blocked

I am not sure if this is an intended behavior (if yes, where can I override the channel_max value ? It seems to be the one from my frame directly), but if not, I would propose the following change:` for the ChannelIdStore:

    def __init__(self, channel_max: int):
        if not channel_max or channel_max == 0:
          channel_max = 65535
        self._new: Iterator[int] = iter(range(1, channel_max))
        self._recycled: List[int] = []

I am willing to provide more information if required about my setup, I tried to be as relevant as possible,

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions