Skip to content

fix(langchain): make RunnableRails.transform/atransform proper (async) iterators#2090

Open
nac7 wants to merge 1 commit into
NVIDIA-NeMo:developfrom
nac7:fix/runnable-rails-atransform-async-iterator
Open

fix(langchain): make RunnableRails.transform/atransform proper (async) iterators#2090
nac7 wants to merge 1 commit into
NVIDIA-NeMo:developfrom
nac7:fix/runnable-rails-atransform-async-iterator

Conversation

@nac7

@nac7 nac7 commented Jun 27, 2026

Copy link
Copy Markdown

Summary

Fixes #1692.

RunnableRails.transform and RunnableRails.atransform were aliased to invoke/ainvoke, which violates the LangChain Runnable streaming protocol:

  • transform must consume an iterator of inputs and return an iterator of outputs.
  • atransform must consume an async iterator of inputs and must itself be an async iterator.

Because atransform was an async def that returned a value, calling it produced a coroutine rather than an async iterator. As a result, placing RunnableRails inside a RunnableSequence and streaming it raised:

TypeError: 'async for' requires an object with __aiter__ method, got coroutine

Reproduction (from the issue)

import asyncio
from langchain_core.runnables import RunnableLambda
from nemoguardrails import RailsConfig
from nemoguardrails.integrations.langchain.runnable_rails import RunnableRails

async def reproduce_bug():
    config = RailsConfig.from_content(yaml_content="models: []")
    rails = RunnableRails(config, passthrough=True)
    pipeline = RunnableLambda(lambda x: x) | rails
    async for _ in pipeline.astream("hello"):  # TypeError before this fix
        pass

asyncio.run(reproduce_bug())

Fix

Defer to the base Runnable.transform/atransform implementations, which buffer the input stream and delegate to the already-correct stream/astream methods. Guardrails therefore run once over the fully assembled input, with no per-item re-invocation.

Note: this intentionally avoids the per-item invoke/ainvoke approach that led the earlier attempts (#1702, #1763) to be closed — stream/astream already handle a single assembled input correctly.

Tests

Added regression tests in tests/integrations/langchain/runnable_rails/test_streaming.py:

  • RunnableRails nested in a RunnableSequence is streamable via astream (the exact issue scenario) and stream.
  • atransform returns an async iterator (not a coroutine).
  • transform consumes an iterator and yields outputs.

All four fail on the previous code and pass with this fix. Full runnable_rails suite: 165 passed, 2 skipped.

…) iterators

RunnableRails.transform and atransform were aliased to invoke/ainvoke, which
violates the LangChain Runnable streaming protocol: transform must consume an
iterator of inputs and return an iterator, and atransform must be an async
iterator. Because atransform returned a coroutine, nesting RunnableRails in a
RunnableSequence and streaming it raised:

    TypeError: 'async for' requires an object with __aiter__ method, got coroutine

Defer to the base Runnable implementations, which buffer the input stream and
delegate to the already-correct stream/astream methods, so guardrails run once
over the fully assembled input (no per-item re-invocation).

Fixes NVIDIA-NeMo#1692
@github-actions github-actions Bot added status: needs triage New issues that have not yet been reviewed or categorized. size: M labels Jun 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size: M status: needs triage New issues that have not yet been reviewed or categorized.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

bug: RunnableRails.atransform is not returning an async interator

1 participant