-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtranscribe.py
287 lines (235 loc) · 9.89 KB
/
transcribe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
"""
Audio Stream Transcriber
-----------------------
A service that streams audio from Broadcastify, processes it using OpenAI's Whisper model,
and saves transcriptions to a file.
Requirements:
- See requirements.txt for dependencies
- Environment variables:
BROADCASTIFY_FEED_ID: ID of the Broadcastify feed
BROADCASTIFY_USERNAME: Broadcastify username
BROADCASTIFY_PASSWORD: Broadcastify password
WHISPER_MODEL_SIZE: Size of Whisper model (optional, defaults to 'medium')
"""
import gc
import io
import logging
import os
import signal
import sys
from datetime import datetime
from queue import Empty, Full, Queue
from threading import Thread

import ffmpeg
import numpy as np
import pytz
import requests
import torch
import whisper
from dotenv import load_dotenv
class ConfigurationError(Exception):
    """Signal that the service's configuration is missing or invalid.

    Raised while reading settings (for example, environment variables) so the
    entry point can report a configuration problem distinctly from other errors.
    """
class AudioStreamTranscriber:
    """Stream audio from a Broadcastify feed and transcribe it with Whisper.

    The calling thread reads the MP3 stream and groups it into byte buffers of
    ``BYTES_PER_SECOND * BUFFER_DURATION`` bytes, which are handed to a bounded
    queue. A daemon worker thread decodes each buffer to mono float32 PCM at
    ``SAMPLE_RATE`` with ffmpeg, runs it through Whisper, and appends any
    non-empty text to ``OUTPUT_PATH`` with a timestamp.
    """

    # Constants
    BUFFER_DURATION = 10  # seconds
    SAMPLE_RATE = 16000  # Hz (Whisper's expected sample rate)
    BYTES_PER_SECOND = 16 * 1024  # 16KB per second for 128kbps MP3
    CHUNK_SIZE = 1024  # Chunk size for reading stream
    QUEUE_SIZE = 3  # Maximum number of buffers waiting to be transcribed

    # Default paths that can be overridden by environment variables.
    # These are evaluated at import time, i.e. before load_dotenv() runs, so
    # _load_config re-reads OUTPUT_PATH once the .env file has been loaded.
    OUTPUT_PATH = os.getenv('OUTPUT_PATH', os.path.join(os.getcwd(), 'data', 'transcribe.txt'))
    LOG_PATH = os.getenv('LOG_PATH', os.path.join(os.getcwd(), 'data', 'transcribe.log'))

    def __init__(self):
        """Load configuration, configure PyTorch, load the model, and set up the queue.

        Raises:
            ConfigurationError: if required settings are missing or invalid.
        """
        self._load_config()
        self._setup_torch()
        self._setup_model()
        self._setup_processing()

    def _load_config(self):
        """Load and validate configuration from the environment (and a .env file).

        Raises:
            ConfigurationError: if BROADCASTIFY_FEED_ID, BROADCASTIFY_USERNAME or
                BROADCASTIFY_PASSWORD is unset/empty, or if
                WHISPER_NO_SPEECH_THRESHOLD is not a number.
        """
        load_dotenv()

        # Required environment variables
        self.feed_id = os.getenv('BROADCASTIFY_FEED_ID')
        self.username = os.getenv('BROADCASTIFY_USERNAME')
        self.password = os.getenv('BROADCASTIFY_PASSWORD')
        if not all([self.feed_id, self.username, self.password]):
            raise ConfigurationError(
                "Missing required environment variables: "
                "BROADCASTIFY_FEED_ID, BROADCASTIFY_USERNAME, or BROADCASTIFY_PASSWORD"
            )

        # Optional environment variables with defaults
        raw_threshold = os.getenv('WHISPER_NO_SPEECH_THRESHOLD', '0.6')
        try:
            self.no_speech_threshold = float(raw_threshold)
        except ValueError as err:
            raise ConfigurationError(
                f"Invalid WHISPER_NO_SPEECH_THRESHOLD: {raw_threshold!r} is not a number"
            ) from err

        # The class attribute was computed before load_dotenv() ran; re-read it
        # so an OUTPUT_PATH defined only in .env is honoured. The instance
        # attribute shadows the class default for this transcriber.
        self.OUTPUT_PATH = os.getenv('OUTPUT_PATH', self.OUTPUT_PATH)
        # abspath() avoids dirname() == '' for a bare filename.
        os.makedirs(os.path.dirname(os.path.abspath(self.OUTPUT_PATH)), exist_ok=True)

        self.url = f"https://audio.broadcastify.com/{self.feed_id}.mp3"
        self.timezone = pytz.timezone('America/Chicago')
        self.buffer_size = self.BYTES_PER_SECOND * self.BUFFER_DURATION

    def _setup_torch(self):
        """Configure global PyTorch settings and pick the inference device."""
        # NOTE(review): replaces a private torch attribute with an identity
        # function; its purpose is not evident from this file — confirm it is
        # still needed.
        torch._utils._load_global_deps = lambda obj, *args, **kwargs: obj
        # Only affects the calling thread: grad mode is thread-local, so the
        # worker thread additionally uses torch.no_grad() around inference.
        torch.set_grad_enabled(False)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        if self.device == "cuda":
            # Let cuDNN autotune its convolution algorithms.
            torch.backends.cudnn.benchmark = True

    def _setup_model(self):
        """Load the Whisper model named by WHISPER_MODEL_SIZE (default 'medium').

        Raises:
            ConfigurationError: if the name is not one the installed Whisper
                package can load.
        """
        model_size = os.getenv('WHISPER_MODEL_SIZE', 'medium').lower()
        # Ask the installed package which names it knows instead of keeping a
        # hard-coded list that goes stale as new checkpoints are released.
        valid_models = whisper.available_models()
        if model_size not in valid_models:
            raise ConfigurationError(
                f"Invalid WHISPER_MODEL_SIZE. Must be one of: {', '.join(valid_models)}"
            )
        self.model = whisper.load_model(model_size, device=self.device)

    def _setup_processing(self):
        """Create the bounded work queue, the run flag, and ffmpeg output options."""
        self.audio_queue = Queue(maxsize=self.QUEUE_SIZE)
        self.running = True
        # Raw little-endian float32, mono, at Whisper's sample rate.
        self.ffmpeg_args = {
            'format': 'f32le',
            'acodec': 'pcm_f32le',
            'ac': 1,
            'ar': self.SAMPLE_RATE,
        }

    def get_formatted_time(self):
        """Return the current time in ``self.timezone`` as 'YYYY-MM-DD HH:MM:SS TZ'."""
        return datetime.now(self.timezone).strftime("%Y-%m-%d %H:%M:%S %Z")

    def decode_mp3_to_pcm(self, mp3_data):
        """Decode an MP3 byte buffer to a 1-D float32 NumPy array using ffmpeg.

        Returns:
            The decoded samples (possibly empty), or None if ffmpeg could not
            be run at all.
        """
        try:
            process = (
                ffmpeg
                # Force the demuxer: the buffer is an arbitrary slice of the
                # stream, which can make format probing unreliable.
                .input('pipe:0', format='mp3')
                .output('pipe:1', **self.ffmpeg_args)
                .overwrite_output()
                .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
            )
            stdout, stderr = process.communicate(input=mp3_data)
            if process.returncode != 0:
                # Keep whatever was decoded (partial output may still be
                # usable) but make the failure visible in the log.
                tail = (stderr or b"").decode('utf-8', errors='replace').strip().splitlines()[-1:]
                logging.warning(
                    "ffmpeg exited with code %s: %s",
                    process.returncode,
                    tail[0] if tail else "<no stderr output>",
                )
            # copy() gives a writable array, which torch.from_numpy expects.
            return np.frombuffer(stdout, dtype=np.float32).copy()
        except Exception as e:
            logging.error(f"Decoding error: {e}")
            return None

    def process_audio_chunk(self, audio_data):
        """Decode one MP3 buffer and transcribe it.

        Returns:
            The transcribed text, or None if decoding produced no audio or
            transcription failed.
        """
        try:
            pcm_data = self.decode_mp3_to_pcm(audio_data)
            if pcm_data is None or len(pcm_data) == 0:
                return None
            audio_tensor = torch.from_numpy(pcm_data).to(self.device)
            # This runs on the worker thread; grad mode is thread-local, so the
            # set_grad_enabled(False) call made during setup does not apply here.
            with torch.no_grad():
                result = self.model.transcribe(
                    audio_tensor,
                    language="en",
                    task="transcribe",
                    initial_prompt="This is police and emergency dispatch radio communication.",
                    no_speech_threshold=self.no_speech_threshold,
                    # FP16 is only supported on CUDA; on CPU Whisper would warn
                    # and fall back to FP32 anyway.
                    fp16=self.device == "cuda",
                )
            return result["text"]
        except Exception as e:
            logging.exception("Processing error: %s", e)
            return None
        finally:
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()

    def transcription_worker(self):
        """Consume buffers from the queue and write transcriptions until stopped.

        Polls with a 1 s timeout so a cleared ``self.running`` is noticed
        promptly. ``task_done()`` is called for every item taken, even if
        processing raises.
        """
        while self.running:
            try:
                audio_data = self.audio_queue.get(timeout=1)
            except Empty:
                # Nothing queued yet; loop to re-check self.running.
                continue
            try:
                transcription = self.process_audio_chunk(audio_data)
                if transcription:
                    self.write_transcription(transcription)
            except Exception:
                # Keep the worker alive, but don't hide the failure.
                logging.exception("Transcription worker error")
            finally:
                self.audio_queue.task_done()

    def write_transcription(self, transcription):
        """Append ``[timestamp] text`` to OUTPUT_PATH; blank text is ignored."""
        # Whisper output typically carries a leading space; strip it so the
        # line doesn't contain a double space after the timestamp.
        text = (transcription or "").strip()
        if not text:
            return
        timestamp = self.get_formatted_time()
        try:
            with open(self.OUTPUT_PATH, "a", encoding='utf-8') as f:
                f.write(f"[{timestamp}] {text}\n")
            logging.info("Successfully wrote transcription to file")
        except OSError as e:
            logging.error(f"Error writing transcription: {e}")

    def stream_and_transcribe(self):
        """Start the worker thread, then stream the feed into the queue until it ends.

        Connection and streaming errors are logged rather than raised; the
        session and worker are always cleaned up.
        """
        session = None
        worker_thread = None
        try:
            logging.info("Starting streaming and transcription service")
            logging.info(f"Stream URL: {self.url}")
            logging.info(f"Transcriptions will be saved in: {os.path.abspath(self.OUTPUT_PATH)}")

            worker_thread = Thread(target=self.transcription_worker, daemon=True)
            worker_thread.start()

            session = requests.Session()
            with session.get(
                self.url,
                auth=(self.username, self.password),
                stream=True,
                timeout=30,
            ) as response:
                if response.status_code != 200:
                    raise ConnectionError(f"Failed to connect to stream. Status code: {response.status_code}")
                self._process_stream(response)
        except KeyboardInterrupt:
            logging.info("\nStopping stream...")
        except Exception as e:
            logging.error(f"Streaming error: {e}")
        finally:
            self._cleanup(session, worker_thread)

    def _process_stream(self, response):
        """Accumulate streamed bytes and enqueue each full buffer for transcription.

        A buffer that cannot be enqueued within 1 s (queue full) is dropped.
        Bytes left over when the stream ends (less than one buffer) are not
        enqueued.
        """
        buffer = bytearray()
        for chunk in response.iter_content(chunk_size=self.CHUNK_SIZE):
            if not self.running:
                break
            if not chunk:
                continue
            buffer.extend(chunk)
            if len(buffer) < self.buffer_size:
                continue
            try:
                self.audio_queue.put(bytes(buffer), timeout=1)
            except Full:
                logging.warning("Queue full, skipping chunk")
            buffer.clear()

    def _cleanup(self, session, worker_thread):
        """Stop the worker, close the HTTP session, and wait up to 5 s for the worker.

        Either argument may be None if startup failed before it was created.
        """
        self.running = False
        if session is not None:
            session.close()
        if worker_thread is not None:
            worker_thread.join(timeout=5)
def setup_logging():
    """Configure root logging to LOG_PATH and stdout, creating directories first.

    A ``.env`` file is loaded before the paths are resolved, so LOG_PATH and
    OUTPUT_PATH values defined there are honoured. The
    ``AudioStreamTranscriber.LOG_PATH``/``OUTPUT_PATH`` class attributes are
    computed at import time, before any ``.env`` is read, and serve only as
    fallbacks here.
    """
    load_dotenv()
    log_path = os.getenv('LOG_PATH', AudioStreamTranscriber.LOG_PATH)
    output_path = os.getenv('OUTPUT_PATH', AudioStreamTranscriber.OUTPUT_PATH)

    # Create directories for the log and output files if they don't exist.
    # abspath() matters for bare filenames: their dirname is '', and
    # os.makedirs('') raises.
    for path in (log_path, output_path):
        os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_path, encoding='utf-8'),
            logging.StreamHandler(sys.stdout),
        ],
    )
def signal_handler(signum, frame):
    """Log a shutdown notice, then exit the process with status 0.

    Intended to be registered via ``signal.signal``; *signum* and *frame* are
    required by that handler protocol but are not used here.
    """
    logging.info("Received signal to terminate. Shutting down...")
    raise SystemExit(0)
def main():
    """Run the transcription service.

    Installs signal handlers, configures logging, then builds and runs an
    ``AudioStreamTranscriber``. Exits with status 1 on a configuration error or
    any other unexpected exception.
    """
    # Ctrl+C, Ctrl+Z and SIGTERM (e.g. `docker stop`, systemd) are routed to
    # signal_handler, which raises SystemExit so cleanup code in `finally`
    # blocks still runs. SIGTSTP does not exist on Windows, hence getattr.
    for sig_name in ('SIGINT', 'SIGTSTP', 'SIGTERM'):
        sig = getattr(signal, sig_name, None)
        if sig is not None:
            signal.signal(sig, signal_handler)

    setup_logging()
    try:
        transcriber = AudioStreamTranscriber()
        transcriber.stream_and_transcribe()
    except ConfigurationError as e:
        logging.error(f"Configuration error: {e}")
        logging.error("Please check your environment variables and try again.")
        sys.exit(1)
    except Exception as e:
        # Include the traceback: an unexpected failure is otherwise hard to diagnose.
        logging.exception("Unexpected error: %s", e)
        sys.exit(1)
if __name__ == "__main__":
main()