-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmicListener.py
123 lines (95 loc) · 4.23 KB
/
micListener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import sounddevice as sd # needed to control the microphone
import soundfile as sf # needed to create the audio files
#import numpy # needed to create the numpy array of the wav files
import queue # needed for making the queue that handles real time audio
import sys # needed for file status
from openai import OpenAI # needed for calling OpenAI Audio API
import yaml # needed for config
import pika # needed to send messages out via RabbitMQ
import threading # needed for multi threads
from gpiozero import Button, LED # needed for button control
import time # needed for sleep
class MicListener:
    """
    Class that handles listening to the microphone while the button is held,
    transcribing the recorded audio and publishing the text to RabbitMQ
    """

    def __init__(self):
        """
        Initialization method

        Sets up the button pins, the audio block queue, the OpenAI client
        (API key read from ./configs/billing.yaml) and the RabbitMQ channel
        with the 'userOutput' queue declared.
        """
        self.recordStatus = False  # boolean for if the audio is being saved
        self.led = LED("BOARD8")
        self.led.off()  # just using this to turn the pin into a ground
        self.button = Button("BOARD10")  # the actual button pin
        self.queue = queue.Queue()  # audio blocks handed over by callback()

        # setup microphone
        self.deviceInfo = sd.query_devices(kind='input')

        # load config settings
        with open("./configs/billing.yaml", "r") as ymlfile:
            config = yaml.safe_load(ymlfile)

        # load openAI keys into client
        self.client = OpenAI(api_key=config["openai"]["API_KEY"])

        # set up RabbitMQ
        # increase heartbeat to deal with weird dropouts
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost', heartbeat=3600)
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='userOutput')

    def callback(self, indata, frames, time, status):
        """
        This is called (from a separate thread) for each audio block.

        Args:
            indata: the audio block; a copy of it is pushed onto the queue
            frames: unused
            time: unused (shadows the time module inside this method only)
            status: printed to stderr when truthy
        """
        if status:
            print(status, file=sys.stderr)
        # queue a copy so the stored block does not alias the caller's buffer
        self.queue.put(indata.copy())

    def transcribeAudio(self):
        """
        Transcribes the recorded audio into a text string and returns it

        Returns:
            str: The text representing all speech recorded by the audio file,
                or "Error, recording was too short" when request.wav holds
                0.1 seconds of audio or less
        """
        # check the file length to make sure audio file is long enough;
        # the context manager closes the handle instead of leaking it
        with sf.SoundFile("request.wav") as recording:
            runtime = recording.frames / recording.samplerate

        if runtime <= 0.1:
            return "Error, recording was too short"

        # keep the file open only for the duration of the API call
        with open("request.wav", "rb") as audio_file:
            transcript = self.client.audio.transcriptions.create(
                model="whisper-1", file=audio_file, response_format="text"
            )
        return str(transcript)

    def _discardQueuedAudio(self):
        """
        Empties the audio queue without blocking
        """
        while True:
            try:
                self.queue.get_nowait()
            except queue.Empty:
                return

    def piListener(self):
        """
        records mic while button is pressed, and stops while released

        Audio blocks are written to request.wav while the button is held.
        The method returns once the button is released after a recording
        has started. Blocks that arrive while the button is up are dropped.
        """
        print("press button to record")
        sampleRate = int(self.deviceInfo['default_samplerate'])

        # blocks left over from the tail of a previous recording must not be
        # written at the start of this one
        self._discardQueuedAudio()

        with sf.SoundFile(file="request.wav", mode='w', samplerate=sampleRate, channels=1, subtype='PCM_16') as file:
            with sd.InputStream(samplerate=sampleRate, channels=1, callback=self.callback):
                while True:
                    # read the button once per pass so every branch below
                    # acts on the same state
                    pressed = self.button.is_pressed
                    if pressed:
                        self.recordStatus = True
                        file.write(self.queue.get())
                    elif self.recordStatus:
                        self.recordStatus = False
                        print("Finished recording")
                        # the enclosing with-blocks close the stream and file
                        break
                    else:
                        # idle: wait briefly for a block and drop it rather
                        # than spinning on an empty queue
                        try:
                            self.queue.get(timeout=0.05)
                        except queue.Empty:
                            continue

    def publishText(self, text):
        """
        pushes text out to the message queue

        Args:
            text (str): the text to add to the message queue
        """
        self.channel.basic_publish(exchange='', routing_key='userOutput', body=text)
def main():
    """
    Runs the record -> transcribe -> publish loop until interrupted

    Each pass records one button press, transcribes it and publishes the
    text. Results starting with "Error" are printed instead of published.
    Ctrl-C stops the loop, and the RabbitMQ connection is closed on exit.
    """
    print("Running Mic Listener")
    micListener = MicListener()
    try:
        while True:
            micListener.piListener()
            text = micListener.transcribeAudio()
            # transcribeAudio reports a failed recording as an "Error..." string
            if not text.startswith("Error"):
                micListener.publishText(text=text)
            else:
                print(text)
    except KeyboardInterrupt:
        print("Stopping Mic Listener")
    finally:
        # release the RabbitMQ connection opened in MicListener.__init__
        if micListener.connection.is_open:
            micListener.connection.close()


if __name__ == "__main__":
    main()