grpc aio server interceptor error with stream_stream calls #3066

Open
@tonyay163

Description

Describe your environment

python = "^3.10"
grpcio = "1.64.3"
opentelemetry-distro = "0.48b0"
opentelemetry-instrumentation-grpc = "0.48b0"
protobuf = "5.28.1"

What happened?

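Using the aio server interceptor (aio_server_interceptor) with a stream_stream call raises a TypeError on the server when the handler is a coroutine that sends responses through context.write() instead of yielding them: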

  File "[PYTHON]/lib/python3.10/site-packages/opentelemetry/instrumentation/grpc/_aio_server.py", line 144, in _stream_interceptor
    raise error
  File "[PYTHON]/lib/python3.10/site-packages/opentelemetry/instrumentation/grpc/_aio_server.py", line 135, in _stream_interceptor
    async for response in behavior(
TypeError: 'async for' requires an object with __aiter__ method, got coroutine
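The traceback suggests the interceptor iterates over the handler's return value with `async for`, which only works if the handler is an async generator. Below is a minimal sketch of that failure mode in plain asyncio, with no gRPC or OpenTelemetry involved. `FakeContext`, `iterate`, and both handlers are hypothetical names made up for illustration; this is not the code in `_aio_server.py`.

```python
import asyncio


class FakeContext:
    """Hypothetical stand-in for grpc.aio.ServicerContext (illustration only)."""

    def __init__(self):
        self.written = []

    async def write(self, message):
        self.written.append(message)


async def requests():
    yield "request"


async def generator_handler(request_iterator, context):
    # Async-generator style: responses are yielded.
    async for _ in request_iterator:
        yield "response"


async def coroutine_handler(request_iterator, context):
    # Coroutine style: no `yield`; responses go through context.write().
    async for _ in request_iterator:
        await context.write("response")


async def iterate(handler):
    # Mimics the `async for response in behavior(...)` pattern in the traceback.
    result = handler(requests(), FakeContext())
    try:
        return [response async for response in result]
    except TypeError as exc:
        if asyncio.iscoroutine(result):
            result.close()  # avoid a "coroutine was never awaited" warning
        return f"TypeError: {exc}"


print(asyncio.run(iterate(generator_handler)))
print(asyncio.run(iterate(coroutine_handler)))
```

The first call collects the yielded responses. The second should report the same kind of TypeError as the traceback, because calling a coroutine function returns a coroutine, which has no `__aiter__`.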

Steps to Reproduce

Use the generated gRPC classes (phone_pb2, phone_pb2_grpc) from the async_streaming example: https://github.com/grpc/grpc/tree/e9c16ace65211b0f0d07ff2fdc7e657865fe8350/examples/python/async_streaming

import asyncio
import logging
from typing import AsyncIterable

import grpc
from opentelemetry.instrumentation.grpc import aio_server_interceptor

import phone_pb2
import phone_pb2_grpc


def create_state_response(
    call_state: phone_pb2.CallState.State,
) -> phone_pb2.StreamCallResponse:
    response = phone_pb2.StreamCallResponse()
    response.call_state.state = call_state
    return response


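class Phone(phone_pb2_grpc.PhoneServicer):
    # Coroutine-style handler: there is no `yield`, so calling StreamCall()
    # returns a coroutine (not an async generator) and responses are sent
    # explicitly with context.write().
    async def StreamCall(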
        self,
        request_iterator: AsyncIterable[phone_pb2.StreamCallRequest],
        context: grpc.aio.ServicerContext,
    ):
        async for _ in request_iterator:
            await context.write(
                phone_pb2.StreamCallResponse()
            )


async def serve(address: str) -> None:
    server = grpc.aio.server(interceptors=[aio_server_interceptor()])
    phone_pb2_grpc.add_PhoneServicer_to_server(Phone(), server)
    server.add_insecure_port(address)
    await server.start()

    # Make a call to the server
    async def call_request_stream():
        yield phone_pb2.StreamCallRequest()
    async with grpc.aio.insecure_channel(address) as channel:
        stub = phone_pb2_grpc.PhoneStub(channel)
        async for _ in stub.StreamCall(call_request_stream()):
            pass

    await server.stop(grace=None)
    await server.wait_for_termination()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(serve("[::]:50051"))
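One possible direction for a fix is to wrap the handler according to its style: iterate it if it is an async generator function, otherwise await it. The sketch below shows that idea in plain asyncio. `wrap_stream_behavior`, `on_error`, `FakeContext`, and `drive` are hypothetical names, and this is an assumption about what a fix could look like, not the instrumentation's actual code.

```python
import asyncio
import inspect


def wrap_stream_behavior(behavior, on_error):
    """Return a wrapper of the same style (async generator or coroutine) as `behavior`.

    Hypothetical helper; `on_error` stands in for whatever span bookkeeping
    an interceptor would do when the handler raises.
    """
    if inspect.isasyncgenfunction(behavior):

        async def generator_wrapper(request_or_iterator, context):
            try:
                async for response in behavior(request_or_iterator, context):
                    yield response
            except Exception as error:
                on_error(error)
                raise

        return generator_wrapper

    async def coroutine_wrapper(request_or_iterator, context):
        # The handler writes through context.write(), so it is only awaited.
        try:
            return await behavior(request_or_iterator, context)
        except Exception as error:
            on_error(error)
            raise

    return coroutine_wrapper


class FakeContext:
    """Hypothetical stand-in for grpc.aio.ServicerContext (illustration only)."""

    def __init__(self):
        self.written = []

    async def write(self, message):
        self.written.append(message)


async def requests():
    yield "request"


async def coroutine_handler(request_iterator, context):
    async for _ in request_iterator:
        await context.write("written")


async def generator_handler(request_iterator, context):
    async for _ in request_iterator:
        yield "yielded"


async def drive(handler):
    # Drives the wrapped handler according to its style and returns all responses.
    context = FakeContext()
    wrapped = wrap_stream_behavior(handler, on_error=print)
    if inspect.isasyncgenfunction(wrapped):
        return [response async for response in wrapped(requests(), context)]
    await wrapped(requests(), context)
    return context.written


print(asyncio.run(drive(coroutine_handler)))
print(asyncio.run(drive(generator_handler)))
```

On the user side, rewriting StreamCall to yield its responses instead of calling context.write() would likely avoid the error as well, assuming the interceptor handles async-generator handlers correctly.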

Expected Result

The stream_stream call completes without an error.

Actual Result

The interceptor raises the TypeError shown under "What happened?".

Additional context

No response

Would you like to implement a fix?

None


Labels: bug (Something isn't working)