Connection error in OpenAILLMService #1108

Open
KevGTL opened this issue Jan 29, 2025 · 4 comments
Contributor

KevGTL commented Jan 29, 2025

Description

When running pipecat dynamic flows with a LiveKit transport for a voice call over WebRTC, I get errors on LLM calls to OpenAI. It started happening today, but reverting to pipecat-ai 0.0.53 doesn't fix it.

Environment

pipecat-ai version: 0.0.54
python version: 3.11
OS: macOS 15.0.1

Repro steps

It doesn't happen on every single LLM call, but I usually get the error very quickly during a conversation (within the first 3 or 4 messages).
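
For context, the traceback below shows the error surfacing mid-iteration of the streamed chat completion inside pipecat. A minimal sketch of the same call pattern outside pipecat (assuming the openai>=1.x SDK that appears in the trace):

import asyncio

import httpx
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Bonjour"}],
        stream=True,
    )
    try:
        # The RemoteProtocolError is raised mid-iteration, after some chunks
        # have already arrived, matching what happens in _process_context.
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="", flush=True)
    except httpx.RemoteProtocolError as e:
        print(f"\npeer closed connection mid-stream: {e}")

asyncio.run(main())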

Logs

httpcore.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)


The above exception was the direct cause of the following exception:


Traceback (most recent call last):

  File "/server.py", line 95, in <module>
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 8765)))
    │       │   │                             │  │       └ <function Mapping.get at 0x100e8e700>
    │       │   │                             │  └ environ({'NX_CLI_SET': 'true', '_VOLTA_TOOL_RECURSION': '1', 'NX_LOAD_DOT_ENV_FILES': 'true', 'TERM_PROGRAM': 'iTerm.app', 'L...
    │       │   │                             └ <module 'os' (frozen)>
    │       │   └ <fastapi.applications.FastAPI object at 0x16fcbdc50>
    │       └ <function run at 0x101f38d60>
    └ <module 'uvicorn' from '/venv/lib/python3.11/site-packages...

  File "/venv/lib/python3.11/site-packages/uvicorn/main.py", line 579, in run
    server.run()
    │      └ <function Server.run at 0x102604e00>
    └ <uvicorn.server.Server object at 0x17bb3a490>
  File "/venv/lib/python3.11/site-packages/uvicorn/server.py", line 66, in run
    return asyncio.run(self.serve(sockets=sockets))
           │       │   │    │             └ None
           │       │   │    └ <function Server.serve at 0x102604ea0>
           │       │   └ <uvicorn.server.Server object at 0x17bb3a490>
           │       └ <function run at 0x101bf1120>
           └ <module 'asyncio' from '/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyn...

  File "/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           │      │   └ <coroutine object Server.serve at 0x17c3dd300>
           │      └ <function Runner.run at 0x101caff60>
           └ <asyncio.runners.Runner object at 0x17b714850>

  File "/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           │    │     │                  └ <Task pending name='Task-1' coro=<Server.serve() running at /Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platfo...
           │    │     └ <cyfunction Loop.run_until_complete at 0x17c471560>
           │    └ <uvloop.Loop running=True closed=False debug=False>
           └ <asyncio.runners.Runner object at 0x17b714850>

> File "/venv/lib/python3.11/site-packages/pipecat/processors/frame_processor.py", line 308, in __input_frame_task_handler
    await self.process_frame(frame, direction)
          │    │             │      └ <FrameDirection.DOWNSTREAM: 1>
          │    │             └ OpenAILLMContextFrame(id=19, name='OpenAILLMContextFrame#0', pts=None, context=<pipecat.processors.aggregators.openai_llm_con...
          │    └ <function BaseOpenAILLMService.process_frame at 0x17a905760>
          └ <pipecat.services.openai.OpenAILLMService object at 0x17c4c2450>
  File "/venv/lib/python3.11/site-packages/pipecat/services/openai.py", line 312, in process_frame
    await self._process_context(context)
          │    │                └ <pipecat.processors.aggregators.openai_llm_context.OpenAILLMContext object at 0x16fcbd4d0>
          │    └ <function BaseOpenAILLMService._process_context at 0x17a905440>
          └ <pipecat.services.openai.OpenAILLMService object at 0x17c4c2450>
  File "/venv/lib/python3.11/site-packages/pipecat/services/openai.py", line 215, in _process_context
    async for chunk in chunk_stream:
              │        └ <openai.AsyncStream object at 0x317821ad0>
              └ ChatCompletionChunk(id='chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv', choices=[Choice(delta=ChoiceDelta(content=' vous', function_...
  File "/venv/lib/python3.11/site-packages/openai/_streaming.py", line 147, in __aiter__
    async for item in self._iterator:
              │       │    └ <async_generator object AsyncStream.__stream__ at 0x317bb16d0>
              │       └ <openai.AsyncStream object at 0x317821ad0>
              └ ChatCompletionChunk(id='chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv', choices=[Choice(delta=ChoiceDelta(content=' vous', function_...
  File "/venv/lib/python3.11/site-packages/openai/_streaming.py", line 160, in __stream__
    async for sse in iterator:
              │      └ <async_generator object AsyncStream._iter_events at 0x17c6499a0>
              └ ServerSentEvent(event=None, data={"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":17...
  File "/venv/lib/python3.11/site-packages/openai/_streaming.py", line 151, in _iter_events
    async for sse in self._decoder.aiter_bytes(self.response.aiter_bytes()):
              │      │    │        │           │    │        └ <function Response.aiter_bytes at 0x11079a700>
              │      │    │        │           │    └ <Response [200 OK]>
              │      │    │        │           └ <openai.AsyncStream object at 0x317821ad0>
              │      │    │        └ <function SSEDecoder.aiter_bytes at 0x17a05ea20>
              │      │    └ <openai._streaming.SSEDecoder object at 0x317864e50>
              │      └ <openai.AsyncStream object at 0x317821ad0>
              └ ServerSentEvent(event=None, data={"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":17...
  File "/venv/lib/python3.11/site-packages/openai/_streaming.py", line 302, in aiter_bytes
    async for chunk in self._aiter_chunks(iterator):
              │        │    │             └ <async_generator object Response.aiter_bytes at 0x317bb1b90>
              │        │    └ <function SSEDecoder._aiter_chunks at 0x17a05eac0>
              │        └ <openai._streaming.SSEDecoder object at 0x317864e50>
              └ b'data: {"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":1738160520,"model":"gpt-4o-...
  File "/venv/lib/python3.11/site-packages/openai/_streaming.py", line 313, in _aiter_chunks
    async for chunk in iterator:
              │        └ <async_generator object Response.aiter_bytes at 0x317bb1b90>
              └ b'data: {"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":1738160520,"model":"gpt-4o-...
  File "/venv/lib/python3.11/site-packages/httpx/_models.py", line 931, in aiter_bytes
    async for raw_bytes in self.aiter_raw():
              │            │    └ <function Response.aiter_raw at 0x11079a8e0>
              │            └ <Response [200 OK]>
              └ b'data: {"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":1738160520,"model":"gpt-4o-...
  File "/venv/lib/python3.11/site-packages/httpx/_models.py", line 989, in aiter_raw
    async for raw_stream_bytes in self.stream:
              │                   │    └ <httpx._client.BoundAsyncStream object at 0x317ec25d0>
              │                   └ <Response [200 OK]>
              └ b'data: {"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":1738160520,"model":"gpt-4o-...
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 150, in __aiter__
    async for chunk in self._stream:
              │        │    └ <httpx._transports.default.AsyncResponseStream object at 0x16fcbeb90>
              │        └ <httpx._client.BoundAsyncStream object at 0x317ec25d0>
              └ b'data: {"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":1738160520,"model":"gpt-4o-...
  File "/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 256, in __aiter__
    with map_httpcore_exceptions():
         └ <function map_httpcore_exceptions at 0x11079ede0>

  File "/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
    │    │   │     │    │      └ <traceback object at 0x317e73d40>
    │    │   │     │    └ RemoteProtocolError(RemoteProtocolError('peer closed connection without sending complete message body (incomplete chunked rea...
    │    │   │     └ <class 'httpcore.RemoteProtocolError'>
    │    │   └ <method 'throw' of 'generator' objects>
    │    └ <generator object map_httpcore_exceptions at 0x3178b6140>
    └ <contextlib._GeneratorContextManager object at 0x3177d8c50>

  File "/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 89, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
          │          └ 'peer closed connection without sending complete message body (incomplete chunked read)'
          └ <class 'httpx.RemoteProtocolError'>
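
For what it's worth, retrying the request papers over the dropped connection when the call can be restarted from scratch. A minimal sketch (complete_with_retry is my own hypothetical helper, not pipecat API, and it doesn't help once partial output has already been spoken):

import httpx

async def complete_with_retry(client, messages, retries=3):
    """Retry a streamed completion if the peer drops the connection mid-stream."""
    for attempt in range(retries):
        chunks = []
        try:
            stream = await client.chat.completions.create(
                model="gpt-4o", messages=messages, stream=True
            )
            async for chunk in stream:
                chunks.append(chunk)
            return chunks  # full response received
        except httpx.RemoteProtocolError:
            if attempt == retries - 1:
                raise  # out of attempts, re-raise the original error
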
Contributor Author

KevGTL commented Jan 29, 2025

@markbackman Can this be linked to 8c0ecb8?

@markbackman
Contributor

> @markbackman Can this be linked to 8c0ecb8?

I don't think so. We've tested this extensively since that change.

Is this issue with OpenAI or OpenAI Realtime?

Can you repro using one of the dynamic examples?

Contributor Author

KevGTL commented Jan 30, 2025

@markbackman Yes, I just reproduced it with https://github.com/pipecat-ai/pipecat-flows/blob/main/examples/dynamic/insurance_openai.py. Same as before: sometimes it works for a couple of messages, sometimes I get the error right away.

Btw, I had to comment out "handler": collect_age and "transition_callback": handle_age_collection in create_initial_node, otherwise I got TypeError: Object of type function is not JSON serializable when pipecat tried to serialize the node's functions.
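
For reference, commenting those keys out is equivalent to stripping callables before serialization; a hypothetical helper along these lines (drop_callables is my own, not pipecat API) also avoids the TypeError:

import json

def drop_callables(obj):
    """Recursively drop callable values such as "handler" and
    "transition_callback" so a node config survives json.dumps."""
    if isinstance(obj, dict):
        return {k: drop_callables(v) for k, v in obj.items() if not callable(v)}
    if isinstance(obj, list):
        return [drop_callables(v) for v in obj]
    return obj

json.dumps(drop_callables(create_initial_node()))  # no longer raises TypeError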

Full code
 async def run_bot_livekit(self, room):
        (url, token) = await configure_livekit(room["name"])

        self.piplet_call_id = room["sid"]
        logger.info(f"Running new bot for call: piplet_call_id - {room['sid']}")

        transport = LiveKitTransport(
            url=url,
            token=token,
            room_name=room["name"],
            params=LiveKitParams(
                audio_in_channels=1,
                audio_in_enabled=True,
                audio_out_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                vad_enabled=True,
                vad_audio_passthrough=True,
            ),
        )

        llm = OpenAILLMService(api_key=self.openaiApiKey, model="gpt-4o")

        stt = DeepgramSTTService(
            api_key=self.deepgramApiKey,
            audio_passthrough=True,
            live_options=LiveOptions(
                language=Language.FR,
            ),
        )

        tts = CartesiaTTSService(
            api_key=Config.CARTESIA_API_KEY,
            voice_id="34a34fd0-157b-4470-aaea-db59a8efcfb9",
            model="sonic",
            params=CartesiaTTSService.InputParams(language=Language.FR),
        )

        context = OpenAILLMContext()
        context_aggregator = llm.create_context_aggregator(context)

        pipeline = Pipeline(
            [
                transport.input(),  # LiveKit audio input from client
                stt,  # Speech-To-Text
                context_aggregator.user(),
                llm,  # LLM
                tts,  # Text-To-Speech
                transport.output(),  # LiveKit audio output to client
                context_aggregator.assistant(),
            ]
        )

        class InsuranceQuote(TypedDict):
            monthly_premium: float
            coverage_amount: int
            deductible: int

        class AgeCollectionResult(FlowResult):
            age: int

        class MaritalStatusResult(FlowResult):
            marital_status: str

        class QuoteCalculationResult(FlowResult, InsuranceQuote):
            pass

        class CoverageUpdateResult(FlowResult, InsuranceQuote):
            pass

        # Simulated insurance data
        INSURANCE_RATES = {
            "young_single": {"base_rate": 150, "risk_multiplier": 1.5},
            "young_married": {"base_rate": 130, "risk_multiplier": 1.3},
            "adult_single": {"base_rate": 100, "risk_multiplier": 1.0},
            "adult_married": {"base_rate": 90, "risk_multiplier": 0.9},
        }

        # Function handlers
        async def collect_age(args: FlowArgs) -> AgeCollectionResult:
            """Process age collection."""
            age = args["age"]
            logger.debug(f"collect_age handler executing with age: {age}")
            return AgeCollectionResult(age=age)

        async def collect_marital_status(args: FlowArgs) -> MaritalStatusResult:
            """Process marital status collection."""
            status = args["marital_status"]
            logger.debug(
                f"collect_marital_status handler executing with status: {status}"
            )
            return MaritalStatusResult(marital_status=status)

        async def calculate_quote(args: FlowArgs) -> QuoteCalculationResult:
            """Calculate insurance quote based on age and marital status."""
            age = args["age"]
            marital_status = args["marital_status"]
            logger.debug(
                f"calculate_quote handler executing with age: {age}, status: {marital_status}"
            )

            # Determine rate category
            age_category = "young" if age < 25 else "adult"
            rate_key = f"{age_category}_{marital_status}"
            rates = INSURANCE_RATES.get(rate_key, INSURANCE_RATES["adult_single"])

            # Calculate quote
            monthly_premium = rates["base_rate"] * rates["risk_multiplier"]

            return {
                "monthly_premium": monthly_premium,
                "coverage_amount": 250000,
                "deductible": 1000,
            }

        async def update_coverage(args: FlowArgs) -> CoverageUpdateResult:
            """Update coverage options and recalculate premium."""
            coverage_amount = args["coverage_amount"]
            deductible = args["deductible"]
            logger.debug(
                f"update_coverage handler executing with amount: {coverage_amount}, deductible: {deductible}"
            )

            # Calculate adjusted quote
            monthly_premium = (coverage_amount / 250000) * 100
            if deductible > 1000:
                monthly_premium *= 0.9  # 10% discount for higher deductible

            return {
                "monthly_premium": monthly_premium,
                "coverage_amount": coverage_amount,
                "deductible": deductible,
            }

        async def end_quote() -> FlowResult:
            """Handle quote completion."""
            logger.debug("end_quote handler executing")
            return {"status": "completed"}

        # Transition callbacks and handlers
        async def handle_age_collection(args: Dict, flow_manager: FlowManager):
            flow_manager.state["age"] = args["age"]
            await flow_manager.set_node("marital_status", create_marital_status_node())

        async def handle_marital_status_collection(
            args: Dict, flow_manager: FlowManager
        ):
            flow_manager.state["marital_status"] = args["marital_status"]
            await flow_manager.set_node(
                "quote_calculation",
                create_quote_calculation_node(
                    flow_manager.state["age"], flow_manager.state["marital_status"]
                ),
            )

        async def handle_quote_calculation(args: Dict, flow_manager: FlowManager):
            quote = await calculate_quote(args)
            flow_manager.state["quote"] = quote
            await flow_manager.set_node(
                "quote_results", create_quote_results_node(quote)
            )

        async def handle_end_quote(_: Dict, flow_manager: FlowManager):
            await flow_manager.set_node("end", create_end_node())

        # Node configurations
        def create_initial_node() -> NodeConfig:
            """Create the initial node asking for age."""
            return {
                "role_messages": [
                    {
                        "role": "system",
                        "content": """You are a friendly insurance agent. Your responses will be
                            converted to audio, so avoid special characters. Always use
                            the available functions to progress the conversation naturally.""",
                    }
                ],
                "task_messages": [
                    {
                        "role": "system",
                        "content": "Start by asking for the customer's age.",
                    }
                ],
                "functions": [
                    {
                        "type": "function",
                        "function": {
                            "name": "collect_age",
                            # "handler": collect_age,
                            "description": "Record customer's age",
                            "parameters": {
                                "type": "object",
                                "properties": {"age": {"type": "integer"}},
                                "required": ["age"],
                            },
                            # "transition_callback": handle_age_collection,
                        },
                    }
                ],
            }

        def create_marital_status_node() -> NodeConfig:
            """Create node for collecting marital status."""
            return {
                "task_messages": [
                    {
                        "role": "system",
                        "content": "Ask about the customer's marital status for premium calculation.",
                    }
                ],
                "functions": [
                    {
                        "type": "function",
                        "function": {
                            "name": "collect_marital_status",
                            "handler": collect_marital_status,
                            "description": "Record marital status",
                            "parameters": {
                                "type": "object",
                                "properties": {
                                    "marital_status": {
                                        "type": "string",
                                        "enum": ["single", "married"],
                                    }
                                },
                                "required": ["marital_status"],
                            },
                            "transition_callback": handle_marital_status_collection,
                        },
                    }
                ],
            }

        def create_quote_calculation_node(age: int, marital_status: str) -> NodeConfig:
            """Create node for calculating initial quote."""
            return {
                "task_messages": [
                    {
                        "role": "system",
                        "content": (
                            f"Calculate a quote for {age} year old {marital_status} customer. "
                            "First, call calculate_quote with their information. "
                            "Then explain the quote details and ask if they'd like to adjust coverage."
                        ),
                    }
                ],
                "functions": [
                    {
                        "type": "function",
                        "function": {
                            "name": "calculate_quote",
                            "handler": calculate_quote,
                            "description": "Calculate initial insurance quote",
                            "parameters": {
                                "type": "object",
                                "properties": {
                                    "age": {"type": "integer"},
                                    "marital_status": {
                                        "type": "string",
                                        "enum": ["single", "married"],
                                    },
                                },
                                "required": ["age", "marital_status"],
                            },
                            "transition_callback": handle_quote_calculation,
                        },
                    }
                ],
            }

        def create_quote_results_node(
            quote: Union[QuoteCalculationResult, CoverageUpdateResult],
        ) -> NodeConfig:
            """Create node for showing quote and adjustment options."""
            return {
                "task_messages": [
                    {
                        "role": "system",
                        "content": (
                            f"Quote details:\n"
                            f"Monthly Premium: ${quote['monthly_premium']:.2f}\n"
                            f"Coverage Amount: ${quote['coverage_amount']:,}\n"
                            f"Deductible: ${quote['deductible']:,}\n\n"
                            "Explain these quote details to the customer. When they request changes, "
                            "use update_coverage to recalculate their quote. Explain how their "
                            "changes affected the premium and compare it to their previous quote. "
                            "Ask if they'd like to make any other adjustments or if they're ready "
                            "to end the quote process."
                        ),
                    }
                ],
                "functions": [
                    {
                        "type": "function",
                        "function": {
                            "name": "update_coverage",
                            "handler": update_coverage,
                            "description": "Recalculate quote with new coverage options",
                            "parameters": {
                                "type": "object",
                                "properties": {
                                    "coverage_amount": {"type": "integer"},
                                    "deductible": {"type": "integer"},
                                },
                                "required": ["coverage_amount", "deductible"],
                            },
                        },
                    },
                    {
                        "type": "function",
                        "function": {
                            "name": "end_quote",
                            "handler": end_quote,
                            "description": "Complete the quote process",
                            "parameters": {"type": "object", "properties": {}},
                            "transition_callback": handle_end_quote,
                        },
                    },
                ],
            }

        def create_end_node() -> NodeConfig:
            """Create the final node."""
            return {
                "task_messages": [
                    {
                        "role": "system",
                        "content": (
                            "Thank the customer for their time and end the conversation. "
                            "Mention that a representative will contact them about the quote."
                        ),
                    }
                ],
                "functions": [],
                "post_actions": [{"type": "end_conversation"}],
            }

        task = PipelineTask(
            pipeline,
            params=PipelineParams(allow_interruptions=True),
        )

        flow_manager = FlowManager(
            task=task, llm=llm, context_aggregator=context_aggregator, tts=tts
        )

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            # await transport.capture_participant_transcription(participant["id"])
            logger.debug("Initializing flow")
            await flow_manager.initialize()
            logger.debug("Setting initial node")
            await flow_manager.set_node("initial", create_initial_node())

        runner = PipelineRunner(handle_sigint=False)
        await runner.run(task)
Error trace

File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/server.py", line 95, in <module>
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 8765)))
    │       │   │                             │  │       └ <function Mapping.get at 0x102ff6700>
    │       │   │                             │  └ environ({'NX_CLI_SET': 'true', '_VOLTA_TOOL_RECURSION': '1', 'NX_LOAD_DOT_ENV_FILES': 'true', 'TERM_PROGRAM': 'iTerm.app', 'L...
    │       │   │                             └ <module 'os' (frozen)>
    │       │   └ <fastapi.applications.FastAPI object at 0x107e921d0>
    │       └ <function run at 0x105998d60>
    └ <module 'uvicorn' from '/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages...

  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/uvicorn/main.py", line 579, in run
    server.run()
    │      └ <function Server.run at 0x105a04e00>
    └ <uvicorn.server.Server object at 0x1624d5010>
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/uvicorn/server.py", line 66, in run
    return asyncio.run(self.serve(sockets=sockets))
           │       │   │    │             └ None
           │       │   │    └ <function Server.serve at 0x105a04ea0>
           │       │   └ <uvicorn.server.Server object at 0x1624d5010>
           │       └ <function run at 0x103d11120>
           └ <module 'asyncio' from '/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyn...

  File "/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           │      │   └ <coroutine object Server.serve at 0x164024f40>
           │      └ <function Runner.run at 0x103dcff60>
           └ <asyncio.runners.Runner object at 0x164051fd0>

  File "/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           │    │     │                  └ <Task pending name='Task-1' coro=<Server.serve() running at /Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platfo...
           │    │     └ <cyfunction Loop.run_until_complete at 0x16408d560>
           │    └ <uvloop.Loop running=True closed=False debug=False>
           └ <asyncio.runners.Runner object at 0x164051fd0>

> File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/pipecat/utils/asyncio.py", line 32, in run_coroutine
    await coroutine
          └ <coroutine object FrameProcessor.__input_frame_task_handler at 0x1648086d0>
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/pipecat/processors/frame_processor.py", line 322, in __input_frame_task_handler
    await self.process_frame(frame, direction)
          │    │             │      └ <FrameDirection.DOWNSTREAM: 1>
          │    │             └ OpenAILLMContextFrame(id=6245, name='OpenAILLMContextFrame#8', pts=None, context=<pipecat.processors.aggregators.openai_llm_c...
          │    └ <function BaseOpenAILLMService.process_frame at 0x1624bfb00>
          └ <pipecat.services.openai.OpenAILLMService object at 0x1640f76d0>
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/pipecat/services/openai.py", line 312, in process_frame
    await self._process_context(context)
          │    │                └ <pipecat.processors.aggregators.openai_llm_context.OpenAILLMContext object at 0x1641d7b90>
          │    └ <function BaseOpenAILLMService._process_context at 0x1624bf9c0>
          └ <pipecat.services.openai.OpenAILLMService object at 0x1640f76d0>
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/pipecat/services/openai.py", line 215, in _process_context
    async for chunk in chunk_stream:
              │        └ <openai.AsyncStream object at 0x1624d52d0>
              └ ChatCompletionChunk(id='chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA', choices=[Choice(delta=ChoiceDelta(content=' your', function_...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/openai/_streaming.py", line 147, in __aiter__
    async for item in self._iterator:
              │       │    └ <async_generator object AsyncStream.__stream__ at 0x16400b350>
              │       └ <openai.AsyncStream object at 0x1624d52d0>
              └ ChatCompletionChunk(id='chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA', choices=[Choice(delta=ChoiceDelta(content=' your', function_...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/openai/_streaming.py", line 160, in __stream__
    async for sse in iterator:
              │      └ <async_generator object AsyncStream._iter_events at 0x164811d20>
              └ ServerSentEvent(event=None, data={"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":17...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/openai/_streaming.py", line 151, in _iter_events
    async for sse in self._decoder.aiter_bytes(self.response.aiter_bytes()):
              │      │    │        │           │    │        └ <function Response.aiter_bytes at 0x11058c720>
              │      │    │        │           │    └ <Response [200 OK]>
              │      │    │        │           └ <openai.AsyncStream object at 0x1624d52d0>
              │      │    │        └ <function SSEDecoder.aiter_bytes at 0x137e75b20>
              │      │    └ <openai._streaming.SSEDecoder object at 0x164109550>
              │      └ <openai.AsyncStream object at 0x1624d52d0>
              └ ServerSentEvent(event=None, data={"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":17...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/openai/_streaming.py", line 302, in aiter_bytes
    async for chunk in self._aiter_chunks(iterator):
              │        │    │             └ <async_generator object Response.aiter_bytes at 0x16400a640>
              │        │    └ <function SSEDecoder._aiter_chunks at 0x137e75bc0>
              │        └ <openai._streaming.SSEDecoder object at 0x164109550>
              └ b'data: {"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":1738233468,"model":"gpt-4o-...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/openai/_streaming.py", line 313, in _aiter_chunks
    async for chunk in iterator:
              │        └ <async_generator object Response.aiter_bytes at 0x16400a640>
              └ b'data: {"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":1738233468,"model":"gpt-4o-...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/httpx/_models.py", line 931, in aiter_bytes
    async for raw_bytes in self.aiter_raw():
              │            │    └ <function Response.aiter_raw at 0x11058c900>
              │            └ <Response [200 OK]>
              └ b'data: {"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":1738233468,"model":"gpt-4o-...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/httpx/_models.py", line 989, in aiter_raw
    async for raw_stream_bytes in self.stream:
              │                   │    └ <httpx._client.BoundAsyncStream object at 0x1648ad090>
              │                   └ <Response [200 OK]>
              └ b'data: {"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":1738233468,"model":"gpt-4o-...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/httpx/_client.py", line 150, in __aiter__
    async for chunk in self._stream:
              │        │    └ <httpx._transports.default.AsyncResponseStream object at 0x1641c2e50>
              │        └ <httpx._client.BoundAsyncStream object at 0x1648ad090>
              └ b'data: {"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":1738233468,"model":"gpt-4o-...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 256, in __aiter__
    with map_httpcore_exceptions():
         └ <function map_httpcore_exceptions at 0x1105a0e00>

  File "/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
    │    │   │     │    │      └ <traceback object at 0x164818200>
    │    │   │     │    └ RemoteProtocolError(RemoteProtocolError('peer closed connection without sending complete message body (incomplete chunked rea...
    │    │   │     └ <class 'httpcore.RemoteProtocolError'>
    │    │   └ <method 'throw' of 'generator' objects>
    │    └ <generator object map_httpcore_exceptions at 0x16482c440>
    └ <contextlib._GeneratorContextManager object at 0x1648ae2d0>

  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 89, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
          │          └ 'peer closed connection without sending complete message body (incomplete chunked read)'
          └ <class 'httpx.RemoteProtocolError'>

httpx.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)

@markbackman
Contributor

What version of Flows are you using? On main, the examples have been updated to match the code on main; there was a breaking change, which could explain the issue. I have no issue running the code you shared (or the examples in the examples directory on main).
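
You can check the installed version with something like this (assuming the distribution name is pipecat-ai-flows; adjust if yours differs):

from importlib.metadata import version

# Print the installed Flows package version to confirm it matches main.
print(version("pipecat-ai-flows"))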

markbackman self-assigned this Jan 30, 2025