Skip to content

Commit

Permalink
Merge pull request #599 from pipecat-ai/mb/remove-metrics-from-transport
Browse files Browse the repository at this point in the history
Move metrics from transport to rtvi
  • Loading branch information
markbackman authored Oct 16, 2024
2 parents 8d9a748 + 5760fad commit 2aee8a1
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 84 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed

- Metrics messages have moved out of the transport's base output and into RTVI.

## [0.0.44] - 2024-10-15

### Added
Expand Down
61 changes: 60 additions & 1 deletion src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,17 @@

import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Literal, Optional, Union
from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Literal,
Mapping,
Optional,
Union,
)

from loguru import logger
from pydantic import BaseModel, Field, PrivateAttr, ValidationError
Expand All @@ -24,6 +34,7 @@
InterimTranscriptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
MetricsFrame,
StartFrame,
SystemFrame,
TextFrame,
Expand All @@ -35,6 +46,12 @@
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
Expand Down Expand Up @@ -343,6 +360,12 @@ class RTVIBotStoppedSpeakingMessage(BaseModel):
type: Literal["bot-stopped-speaking"] = "bot-stopped-speaking"


class RTVIMetricsMessage(BaseModel):
    """RTVI message envelope used to report pipeline metrics.

    ``label`` and ``type`` are fixed literals ("rtvi-ai" and "metrics");
    only ``data`` varies from one message to the next.
    """

    label: Literal["rtvi-ai"] = "rtvi-ai"
    type: Literal["metrics"] = "metrics"
    # Metrics payload. RTVIMetricsProcessor fills it with lists keyed by
    # "ttfb", "processing", "tokens" and "characters".
    data: Mapping[str, Any]


class RTVIProcessorParams(BaseModel):
send_bot_ready: bool = True

Expand Down Expand Up @@ -509,6 +532,42 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await self._push_transport_message_urgent(message)


class RTVIMetricsProcessor(RTVIFrameProcessor):
    """Pass frames through and report ``MetricsFrame`` contents as an RTVI message."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Forward ``frame`` unchanged, then emit a metrics message if it is a ``MetricsFrame``."""
        await super().process_frame(frame, direction)

        # The frame is always pushed onward before any metrics handling happens.
        await self.push_frame(frame, direction)

        if not isinstance(frame, MetricsFrame):
            return
        await self._handle_metrics(frame)

    async def _handle_metrics(self, frame: MetricsFrame):
        """Group the entries of ``frame.data`` by kind and push them as one urgent message.

        Entries of an unrecognised type are ignored. A message is sent even
        when no entry matched, in which case its ``data`` is an empty dict.
        """
        grouped = {}
        for entry in frame.data:
            if isinstance(entry, TTFBMetricsData):
                bucket, payload = "ttfb", entry
            elif isinstance(entry, ProcessingMetricsData):
                bucket, payload = "processing", entry
            elif isinstance(entry, LLMUsageMetricsData):
                # Token usage is reported from the nested ``value`` model,
                # not from the metrics wrapper itself.
                bucket, payload = "tokens", entry.value
            elif isinstance(entry, TTSUsageMetricsData):
                bucket, payload = "characters", entry
            else:
                continue
            grouped.setdefault(bucket, []).append(payload.model_dump(exclude_none=True))

        await self._push_transport_message_urgent(RTVIMetricsMessage(data=grouped))


class RTVIProcessor(FrameProcessor):
def __init__(
self,
Expand Down
25 changes: 8 additions & 17 deletions src/pipecat/transports/base_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,34 @@

import asyncio
import itertools
import time
import sys
import time
from typing import List

from loguru import logger
from PIL import Image
from typing import List

from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
BotSpeakingFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
MetricsFrame,
EndFrame,
Frame,
OutputAudioRawFrame,
OutputImageRawFrame,
SpriteFrame,
StartFrame,
EndFrame,
Frame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
TTSStartedFrame,
TTSStoppedFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams

from loguru import logger

from pipecat.utils.time import nanoseconds_to_seconds


Expand Down Expand Up @@ -141,9 +138,6 @@ async def cancel(self, frame: CancelFrame):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
    """Send a transport message frame; this base implementation does nothing."""

async def send_metrics(self, frame: MetricsFrame):
    """Send the metrics carried by ``frame``; this base implementation does nothing."""

async def write_frame_to_camera(self, frame: OutputImageRawFrame):
    """Write an image frame to the camera output; this base implementation does nothing."""

Expand Down Expand Up @@ -173,9 +167,6 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
elif isinstance(frame, (StartInterruptionFrame, StopInterruptionFrame)):
await self.push_frame(frame, direction)
await self._handle_interruptions(frame)
elif isinstance(frame, MetricsFrame):
await self.push_frame(frame, direction)
await self.send_metrics(frame)
elif isinstance(frame, TransportMessageUrgentFrame):
await self.send_message(frame)
elif isinstance(frame, SystemFrame):
Expand Down
32 changes: 0 additions & 32 deletions src/pipecat/transports/services/daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
Frame,
InputAudioRawFrame,
InterimTranscriptionFrame,
MetricsFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
SpriteFrame,
Expand All @@ -39,12 +38,6 @@
UserImageRawFrame,
UserImageRequestFrame,
)
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.transcriptions.language import Language
from pipecat.transports.base_input import BaseInputTransport
Expand Down Expand Up @@ -759,31 +752,6 @@ async def cleanup(self):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
    """Enqueue ``frame`` on the internal messages queue."""
    queue = self._messages_queue
    await queue.put(frame)

async def send_metrics(self, frame: MetricsFrame):
    """Group the metrics in ``frame.data`` by kind and enqueue them as an "rtvi-ai" message.

    Entries of an unrecognised type are ignored. The message is enqueued
    even when nothing matched, with an empty ``data`` dict.
    """
    # Checked in order; the first matching type decides the bucket.
    kinds = (
        (TTFBMetricsData, "ttfb"),
        (ProcessingMetricsData, "processing"),
        (LLMUsageMetricsData, "tokens"),
        (TTSUsageMetricsData, "characters"),
    )

    collected = {}
    for item in frame.data:
        for data_type, key in kinds:
            if not isinstance(item, data_type):
                continue
            # Token usage is dumped from the nested ``value`` model.
            source = item.value if data_type is LLMUsageMetricsData else item
            collected.setdefault(key, []).append(source.model_dump(exclude_none=True))
            break

    payload = {"label": "rtvi-ai", "type": "metrics", "data": collected}
    await self._messages_queue.put(DailyTransportMessageFrame(message=payload))

async def write_raw_audio_frames(self, frames: bytes):
    """Hand the raw audio bytes to the underlying client for output."""
    client = self._client
    await client.write_raw_audio_frames(frames)

Expand Down
37 changes: 3 additions & 34 deletions src/pipecat/transports/services/livekit.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,25 @@

import numpy as np
from loguru import logger
from pydantic import BaseModel
from scipy import signal

from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
MetricsFrame,
OutputAudioRawFrame,
StartFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.vad.vad_analyzer import VADAnalyzer
from pydantic import BaseModel
from scipy import signal

try:
from livekit import rtc
Expand Down Expand Up @@ -450,31 +444,6 @@ async def send_message(self, frame: TransportMessageFrame | TransportMessageUrge
else:
await self._client.send_data(frame.message.encode())

async def send_metrics(self, frame: MetricsFrame):
    """Group the metrics in ``frame.data`` by kind and send them as a "pipecat-metrics" message.

    Entries of an unrecognised type are ignored. The message is sent even
    when nothing matched, with an empty ``metrics`` dict.
    """
    by_kind = {}
    for datum in frame.data:
        if isinstance(datum, TTFBMetricsData):
            key, model = "ttfb", datum
        elif isinstance(datum, ProcessingMetricsData):
            key, model = "processing", datum
        elif isinstance(datum, LLMUsageMetricsData):
            # Token usage is dumped from the nested ``value`` model.
            key, model = "tokens", datum.value
        elif isinstance(datum, TTSUsageMetricsData):
            key, model = "characters", datum
        else:
            continue
        by_kind.setdefault(key, []).append(model.model_dump(exclude_none=True))

    message = LiveKitTransportMessageFrame(
        message={"type": "pipecat-metrics", "metrics": by_kind}
    )
    # NOTE(review): str() of a dict yields its Python repr (single-quoted),
    # not JSON -- confirm that receivers really expect this wire format.
    encoded = str(message.message).encode()
    await self._client.send_data(encoded)

async def write_raw_audio_frames(self, frames: bytes):
livekit_audio = self._convert_pipecat_audio_to_livekit(frames)
await self._client.publish_audio(livekit_audio)
Expand Down

0 comments on commit 2aee8a1

Please sign in to comment.