rust-whisper/whisper-2023/hotwords.py

116 lines
2.8 KiB
Python

import speech_recognition as sr
import time
import threading
import queue
import signal
import sys
from os import environ
def main():
    """Build the four pipeline threads, start them, and wait for all to end.

    Queue wiring (each stage's output queue is the next stage's input):

    * Manager -> Reader: carries the stop request (capacity 1).
    * Reader -> Parser: carries recorded clips (capacity 10).
    * Parser -> Reactor: carries recognised text (capacity 10).
    """
    manager_to_reader_q = queue.Queue(maxsize=1)
    reader_to_parser_q = queue.Queue(maxsize=10)
    parser_to_reactor_q = queue.Queue(maxsize=10)
    threads = [
        Manager(manager_to_reader_q),
        Reader(manager_to_reader_q, reader_to_parser_q),
        Parser(reader_to_parser_q, parser_to_reactor_q),
        Reactor(parser_to_reactor_q),
    ]
    # Plain loops: start()/join() are called for their side effects only,
    # so there is no reason to build throwaway lists of None.
    for t in threads:
        t.start()
    for t in threads:
        t.join()
def log(*args):
    """Write a diagnostic line to standard error, prefixed with ``>``.

    The prefix and the given arguments are handed to ``print``, so they are
    separated by single spaces and terminated by a newline.
    """
    fields = (">",) + args
    print(*fields, file=sys.stderr)
class Manager(threading.Thread):
    """Thread that converts a SIGINT into a stop message on ``outq``.

    Constructing a Manager installs a SIGINT handler via ``signal.signal``,
    which Python only permits from the main thread.
    """

    def __init__(self, outq):
        """Store ``outq`` and install the SIGINT handler.

        Args:
            outq: Queue on which ``run`` puts a single ``None`` after the
                handler has fired.
        """
        super().__init__()
        self.outq = outq
        # One-slot hand-off from the signal handler to run().
        inq = queue.Queue(maxsize=1)

        def catcher(sig, frame):
            # Must never block: run() consumes only one item, so a blocking
            # put() on this one-slot queue would hang the main thread
            # inside the handler on a repeated Ctrl-C.
            try:
                inq.put_nowait(None)
            except queue.Full:
                pass  # a stop request is already pending

        self.inq = inq
        signal.signal(signal.SIGINT, catcher)

    def run(self):
        """Wait for the handler to post to ``inq``, then put ``None`` on ``outq``."""
        log("Manager.run: start")
        self.inq.get()
        self.outq.put(None)
        log("Manager.run: stop")
class Reader(threading.Thread):
    """Thread that records clips from a microphone and puts them on ``outq``.

    The microphone is selected by name from the ``MIC_NAME`` environment
    variable (comma-separated alternatives, default ``pulse_monitor``).
    A ``None`` is always put on ``outq`` when the thread ends, whether it
    stopped on request or failed.
    """

    def __init__(self, inq, outq):
        """Store the queues and read the wanted microphone name(s).

        Args:
            inq: Queue polled for a stop request; any queued item means stop.
            outq: Queue receiving recorded clips, then a final ``None``.
        """
        super().__init__()
        # NOTE: ``name`` is also threading.Thread's thread-name attribute, so
        # the thread ends up named after MIC_NAME. Kept for compatibility.
        self.name = environ.get("MIC_NAME", "pulse_monitor")
        self.inq = inq
        self.outq = outq

    def run(self):
        """Record clips until a stop request is seen, then send ``None``.

        A stop request is only noticed between calls to ``_run``. Errors
        raised while recording a clip are logged and the loop continues.
        Errors while selecting or opening the microphone propagate, but
        only after the ``None`` sentinel has been sent.
        """
        log("Reader.run: start")
        try:
            available = sr.Microphone.list_microphone_names()
            idx = self._find_device_index(available, self.name.split(","))
            with sr.Microphone(device_index=idx) as mic:
                while not self.should_stop():
                    try:
                        self.outq.put(self._run(mic))
                    except Exception as e:
                        # Presumably includes the library's wait-timeout
                        # error when nothing is heard -- TODO confirm.
                        log("Reader.run: error:", e)
        finally:
            # Always send the sentinel, otherwise the consumer of outq
            # would block forever after a start-up failure.
            self.outq.put(None)
            log("Reader.run: stop")

    @staticmethod
    def _find_device_index(available, wanted):
        """Return the index of the first name in ``available`` found in ``wanted``.

        Raises:
            IndexError: If none of the available names is wanted.
        """
        for idx, mic_name in enumerate(available):
            if mic_name in wanted:
                return idx
        raise IndexError(
            "no microphone named any of {!r}; available: {!r}".format(
                wanted, available
            )
        )

    def should_stop(self):
        """Return True once anything has been queued on ``inq``."""
        return not self.inq.empty()

    def _run(self, mic):
        """Record one clip from ``mic`` and return it.

        ``MIC_TIMEOUT`` (an integer, default 5) is passed to ``listen`` as
        both ``timeout`` and ``phrase_time_limit``.
        """
        mic_timeout = int(environ.get("MIC_TIMEOUT", 5))
        r = sr.Recognizer()
        return r.listen(
            mic,
            timeout=mic_timeout,
            phrase_time_limit=mic_timeout,
        )
class Parser(threading.Thread):
    """Thread that transcribes clips from ``inq`` and puts the text on ``outq``.

    A ``None`` on ``inq`` ends the thread, which then puts ``None`` on
    ``outq``.
    """

    def __init__(self, inq, outq):
        """Store the queues.

        Args:
            inq: Queue of recorded clips, terminated by ``None``.
            outq: Queue receiving non-empty transcriptions, then ``None``.
        """
        super().__init__()
        self.inq = inq
        self.outq = outq
        # Created on first use and then shared by every clip.
        self._recognizer = None

    def run(self):
        """Transcribe clips until the ``None`` sentinel arrives.

        Errors raised while fetching or transcribing a clip are logged and
        the loop continues with the next clip.
        """
        log("Parser.run: start")
        while True:
            try:
                clip = self.inq.get()
                if clip is None:
                    break
                text = self._run(clip)
                # Drop empty transcriptions: a falsy item on outq could be
                # mistaken downstream for the end-of-stream marker.
                if text:
                    self.outq.put(text)
            except Exception as e:
                log("Parser.run: error:", e)
        self.outq.put(None)
        log("Parser.run: stop")

    def _run(self, clip):
        """Return the Whisper transcription of ``clip`` (language "english")."""
        # Reuse one Recognizer instead of building one per clip.
        # NOTE(review): some SpeechRecognition releases appear to cache the
        # loaded Whisper model on the Recognizer instance, in which case
        # this avoids reloading it per clip -- confirm for the installed
        # version.
        if self._recognizer is None:
            self._recognizer = sr.Recognizer()
        return self._recognizer.recognize_whisper(clip, language="english")
class Reactor(threading.Thread):
    """Thread that prints each item from ``inq`` to standard output.

    A ``None`` on ``inq`` ends the thread.
    """

    def __init__(self, inq):
        """Store the input queue.

        Args:
            inq: Queue of items to print, terminated by ``None``.
        """
        super().__init__()
        self.inq = inq

    def run(self):
        """Print items until the ``None`` sentinel arrives."""
        log("Reactor.run: start")
        while True:
            got = self.inq.get()
            # Only None means stop. Treating every falsy value as the
            # sentinel would let an empty string end this thread early
            # while producers keep filling the queue.
            if got is None:
                break
            if not got:
                continue  # nothing worth printing
            print(got)
        log("Reactor.run: stop")
# Run the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()