Unable to process audio-video modality properly #355

@niranjanakella

Description

The code samples and information regarding the audio-video modality are very limited @freddyaboulton. I am trying to build an AWS Bedrock flow with it, and I did try to wrap my head around the Gemini audio-video chat example you have, but I am clearly unable to understand the flow. Below is my code sample; I am just not able to make it work at all. I also strongly feel that audio-video should work with open LLMs and vLLM locally, if possible with examples (see the P.S. at the end for what I mean).

import asyncio
import base64
import json
import os
from io import BytesIO

import boto3
import gradio as gr
import numpy as np
from dotenv import load_dotenv
from fastapi import FastAPI
from fastrtc import (
    AdditionalOutputs,
    AsyncAudioVideoStreamHandler,
    Stream,
    get_stt_model,
    get_twilio_turn_credentials,
    wait_for_item,
)
from gradio.utils import get_space
from numpy.typing import NDArray
from PIL import Image

load_dotenv()
stt_model = get_stt_model()

# AWS clients; credentials and region come from the environment
polly = boto3.client("polly")
brk = boto3.client("bedrock-runtime")

def encode_image(data: np.ndarray) -> str:
    # Accepts np.ndarray (H, W, 3), returns base64 JPEG string
    with BytesIO() as output_bytes:
        pil_image = Image.fromarray(data.astype(np.uint8))
        pil_image.save(output_bytes, "JPEG")
        bytes_data = output_bytes.getvalue()
    return base64.b64encode(bytes_data).decode("utf-8")

class LLMVoiceVideoHandler(AsyncAudioVideoStreamHandler):
    def __init__(self):
        super().__init__(
            expected_layout="mono",
            output_sample_rate=16000,
            input_sample_rate=16000,
        )
        self.latest_video_frame = None
        self.chatbot = []
        self.audio_queue = asyncio.Queue()
        self.video_queue = asyncio.Queue()
        self.tts_queue = asyncio.Queue()
        self.quit = asyncio.Event()
        self.processing = False

    def copy(self):
        return LLMVoiceVideoHandler()

    async def start_up(self):
        # Play a short greeting, then loop: pull audio, transcribe, query Bedrock, speak
        polly_response = polly.synthesize_speech(
            Text="Hello Welcome to the World",
            OutputFormat="pcm",
            VoiceId="Joanna",
        )
        stream = polly_response["AudioStream"]

        # Polly PCM output is 16 kHz, 16-bit, mono, matching output_sample_rate
        audio_array = np.frombuffer(stream.read(), dtype=np.int16).reshape(1, -1)
        self.tts_queue.put_nowait((self.output_sample_rate, audio_array))
    
        while not self.quit.is_set():
            # Wait for audio (simulate pause detection by chunk size or time)
            audio_chunk = await self.audio_queue.get()
            if not self.processing:
                self.processing = True

                sr, audio_np = audio_chunk
                audio = (sr, audio_np)

                # Transcribe. Note: each queue item is a single short frame, so
                # without buffering the STT model sees too little audio and returns ""
                text = stt_model.stt(audio)
                if not text:
                    self.processing = False
                    continue

                print("transcribed", text)
                self.chatbot.append({"role": "user", "content": text})
                await self.tts_queue.put(AdditionalOutputs(self.chatbot))
                # Prepare Bedrock (Anthropic messages API) payload
                messages = [{"role": d["role"], "content": d["content"]} for d in self.chatbot]
                payload = {
                    "messages": messages,
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 4096,
                    "temperature": 0.8,
                    "top_k": 250,
                    "top_p": 0.999,
                }
                # Attach the latest video frame to the newest user message, if available
                if self.latest_video_frame is not None:
                    payload["messages"][-1] = {
                        "role": "user",
                        "content": [
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": "image/jpeg",
                                    "data": encode_image(self.latest_video_frame),
                                },
                            },
                            {"type": "text", "text": text},
                        ],
                    }

                response = brk.invoke_model(
                    modelId="anthropic.claude-3-haiku-20240307-v1:0",
                    body=json.dumps(payload),
                    contentType="application/json"
                )
                response_body = json.loads(response.get('body').read())
                response_text = response_body['content'][-1]['text']
                self.chatbot.append({"role": "assistant", "content": response_text})
                # Synthesize the reply with Polly and stream it out in chunks
                polly_response = polly.synthesize_speech(
                    Text=response_text,
                    OutputFormat="pcm",
                    VoiceId="Joanna",
                )
                stream = polly_response["AudioStream"]
                chunk_size = 10000  # bytes per read; must stay even for int16 samples
                await self.tts_queue.put(AdditionalOutputs(self.chatbot))
                while True:
                    chunk = stream.read(chunk_size)
                    if not chunk:
                        break
                    audio_array = np.frombuffer(chunk, dtype=np.int16).reshape(1, -1)
                    await self.tts_queue.put((self.output_sample_rate, audio_array))
                self.processing = False

    # Audio handling
    async def receive(self, frame: tuple[int, NDArray[np.int16 | np.float32]]):
        # Each incoming frame is only a small chunk of audio; queue it for start_up
        self.audio_queue.put_nowait(frame)

    async def emit(self):
        # Items are already (sample_rate, array) tuples or AdditionalOutputs, so
        # return them as-is; wrapping them again in (output_sample_rate, item) breaks playback
        return await wait_for_item(self.tts_queue, 0.01)

    # Video handling
    async def video_receive(self, frame: np.ndarray):
        self.video_queue.put_nowait(frame)
        # Store the latest video frame
        self.latest_video_frame = frame

    async def video_emit(self):
        frame = await wait_for_item(self.video_queue, 0.01)
        if frame is not None:
            return frame
        else:
            # Return a blank frame (e.g., 100x100 black image)
            return np.zeros((100, 100, 3), dtype=np.uint8)

    async def shutdown(self) -> None:
        # Signal the start_up loop to exit; clearing the event right after would undo that
        self.quit.set()
            
chatbot = gr.Chatbot(type="messages")
stream = Stream(
    modality="audio-video",
    mode="send-receive",
    handler=LLMVoiceVideoHandler(),
    additional_outputs=[chatbot],  # surface the chat history in the UI
    additional_outputs_handler=lambda old, new: new,
    rtc_configuration=get_twilio_turn_credentials() if get_space() else None,
    concurrency_limit=5 if get_space() else None,
    time_limit=90 if get_space() else None,
    ui_args={"title": "LLM Voice+Video Chat (Bedrock, Polly, WebRTC ⚡️)"},
)

# Optionally mount the stream UI on a FastAPI app
# (because I don't want to build the UI manually)
# app = FastAPI()
# app = gr.mount_gradio_app(app, stream.ui, path="/")


if __name__ == "__main__":
    # os.environ["GRADIO_SSR_MODE"] = "false"
    # stream.fastphone(host="0.0.0.0", port=7860)
    stream.ui.launch(server_port=7860)

I tried my level best but am unable to make it work. The audio transcription always outputs an empty string because each audio chunk that reaches the handler is far too small to transcribe on its own. It worked well with the audio-only modality, where ReplyOnPause collects a full utterance before invoking my handler, but with audio-video I cannot figure out the equivalent flow. My current best guess at a fix is sketched below. And trust me when I say this is for a bigger agentic project, not just some side project.
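If I understand the frame sizes correctly, the fix would be to buffer frames in receive and only hand audio to STT after a pause. Here is a minimal, untested sketch of what I mean, subclassing the handler above; the energy threshold and pause length are numbers I made up, not anything from fastrtc:

PAUSE_SECONDS = 0.5      # assumed: this much trailing silence ends an utterance
ENERGY_THRESHOLD = 300   # assumed: mean |amplitude| below this counts as silence (int16)

class BufferedHandler(LLMVoiceVideoHandler):
    def __init__(self):
        super().__init__()
        self.buffer: list[np.ndarray] = []
        self.silence_samples = 0
        self.has_speech = False

    def copy(self):
        return BufferedHandler()

    async def receive(self, frame: tuple[int, NDArray[np.int16 | np.float32]]):
        sr, array = frame
        array = array.reshape(-1)  # flatten (1, n) mono frames
        self.buffer.append(array)
        if np.abs(array).mean() < ENERGY_THRESHOLD:
            self.silence_samples += array.shape[0]
        else:
            self.silence_samples = 0
            self.has_speech = True
        # After speech followed by enough silence, queue the whole utterance
        if self.has_speech and self.silence_samples >= int(PAUSE_SECONDS * sr):
            self.audio_queue.put_nowait((sr, np.concatenate(self.buffer)))
            self.buffer = []
            self.silence_samples = 0
            self.has_speech = False

Is this the intended pattern, or is there built-in pause detection for AsyncAudioVideoStreamHandler that I am missing?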

@mhart @freddyaboulton Please kindly help me out.
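
P.S. To be concrete about the open-model request: since vLLM exposes an OpenAI-compatible endpoint, I imagine the Bedrock call above could be swapped for something like the following (the model name, port, and helper are just examples, and the image part would need to move to the OpenAI-style image_url format rather than Anthropic's source format):

from openai import OpenAI

# Local vLLM server started with e.g.: vllm serve Qwen/Qwen2-VL-7B-Instruct
client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

def chat_local(messages: list[dict]) -> str:
    response = client.chat.completions.create(
        model="Qwen/Qwen2-VL-7B-Instruct",  # example vision-capable open model
        messages=messages,
        max_tokens=1024,
        temperature=0.8,
    )
    return response.choices[0].message.content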
