rust-whisper/whisper-2023/hotwords.py

291 lines
8.8 KiB
Python

import speech_recognition as sr
import time
import threading
import queue
import signal
import sys
from os import environ
from os import kill
import requests
import yaml
def main():
managerToParserQ = queue.Queue(maxsize=1)
readerToParserQ = queue.Queue(maxsize=10)
parserToReactorQ = queue.Queue(maxsize=10)
reactorToActorQ = queue.Queue(maxsize=10)
threads = [
Manager(managerToParserQ),
Reader(managerToParserQ, readerToParserQ),
Parser(readerToParserQ, parserToReactorQ),
Reactor(parserToReactorQ, reactorToActorQ),
Actor(reactorToActorQ),
]
[t.start() for t in threads]
[t.join() for t in threads]
def log(*args):
print(">", *args, file=sys.stderr)
class Piper(threading.Thread):
def __init__(self, inq, outq):
threading.Thread.__init__(self)
self.inq = inq
self.outq = outq
def run(self):
while True:
got = self.inq.get()
if got is None:
break
self._run(got)
self.outq.put(None)
class Manager(threading.Thread):
def __init__(self, outq):
threading.Thread.__init__(self)
self.outq = outq
inq = queue.Queue(maxsize=1)
def catcher(sig, frame):
inq.put(None)
self.inq = inq
signal.signal(signal.SIGINT, catcher)
def run(self):
log("Manager.run: start")
self.inq.get()
self.outq.put(None)
log("Manager.run: stop")
class Reader(threading.Thread):
def __init__(self, inq, outq):
threading.Thread.__init__(self)
self.name = environ.get("MIC_NAME", "pulse_monitor")
if not self.name:
for index, name in enumerate(sr.Microphone.list_microphone_names()):
print("[{0}] Microphone with name \"{1}\" found for `Microphone(device_index={0})`".format(index, name))
exit()
self.inq = inq
self.outq = outq
def run(self):
log("Reader.run: start")
idx = [
idx for idx,v in enumerate(
sr.Microphone.list_microphone_names(),
) if v in self.name.split(",")
][0]
with sr.Microphone(device_index=idx) as mic:
while not self.should_stop():
try:
self.outq.put(self._run(mic))
except Exception as e:
if not "timed out" in str(e):
log("Reader.run: error:", e)
self.outq.put(None)
log("Reader.run: stop")
def should_stop(self):
return not self.inq.empty()
def _run(self, mic):
mic_timeout = int(environ.get("MIC_TIMEOUT", 5))
r = sr.Recognizer()
return r.listen(
mic,
timeout=mic_timeout,
phrase_time_limit=mic_timeout,
)
class Parser(threading.Thread):
def __init__(self, inq, outq):
threading.Thread.__init__(self)
self.inq = inq
self.outq = outq
def run(self):
log("Parser.run: start")
while True:
try:
clip = self.inq.get()
if not clip:
break
self.outq.put(self._run(clip).strip())
except Exception as e:
log("Parser.run: error:", e)
self.outq.put(None)
log("Parser.run: stop")
def _run(self, clip):
r = sr.Recognizer()
return r.recognize_whisper(clip, language="english")
def load_dot_notation(v, s):
items = s.replace("[]", ".[]").split(".")
return _load_dot_notation(v, items)
def _load_dot_notation(v, items):
for i in range(len(items)):
k = items[i]
if not k:
continue
if k == "[]":
if isinstance(v, list):
result = []
for j in v:
subresult = _load_dot_notation(j, items[i+1:])
if isinstance(subresult, list):
result.extend(subresult)
else:
result.append(subresult)
return result
else:
result = []
for j in v.values():
subresult = _load_dot_notation(j, items[i+1:])
if isinstance(subresult, list):
result.extend(subresult)
else:
result.append(subresult)
return result
else:
if isinstance(v, list):
v = v[int(k)]
else:
v = v[k]
return v
def test_load_dot_notation():
assert("a" == load_dot_notation("a", "."))
assert(["a"] == load_dot_notation(["a"], "."))
assert("b" == load_dot_notation({"a":"b"}, ".a"))
assert("c" == load_dot_notation({"a":{"b":"c"}}, ".a.b"))
assert("c" == load_dot_notation({"a":{"b":["c"]}}, ".a.b.0"))
assert(["c","d"] == load_dot_notation({"a":{"b":"c"}, "a2":{"b":"d"}}, ".[].b"))
assert(["c","d"] == load_dot_notation({"a":{"b":["c"], "b2":["d"]}}, ".a.[].0"))
assert(["c","d"] == load_dot_notation({"a":{"b":["c"], "b2":["d"]}}, ".a[].0"))
assert(["c","d"] == load_dot_notation(["c", "d"], "."))
assert(["c","d"] == load_dot_notation(["c", "d"], "[]"))
test_load_dot_notation()
class Reactor(threading.Thread):
def __init__(self, inq, outq):
threading.Thread.__init__(self)
self.inq = inq
self.outq = outq
self.load_hotwords = Reactor.new_load_hotwords()
log(f"hotwords: {self.load_hotwords()}")
def new_load_hotwords():
p = environ.get("HOTWORDS", None)
if not p:
def load_nothing():
return []
return load_nothing
try:
if "@" in p:
def load_hotwords_in_yaml_file():
with open(p.split("@")[0], "r") as f:
v = yaml.safe_load(f)
v = load_dot_notation(v, p.split("@")[-1])
return ["".join(i.strip().lower().split()) for i in v if i]
load_hotwords_in_yaml_file()
return load_hotwords_in_yaml_file
else:
def load_hotwords_in_file():
with open(p, "r") as f:
return ["".join(i.strip().lower().split()) for i in f.readlines()]
load_hotwords_in_file()
return load_hotwords_in_file
except Exception as e:
log(f"$HOTWORDS {p} is not a file: {e}")
hotwords = ["".join(i.lower().strip().split()) for i in p.split("\/\/")]
log(f'$HOTWORDS: {hotwords}')
def load_hotwords_as_literal():
return hotwords
return load_hotwords_as_literal
def run(self):
log("Reactor.run: start")
while True:
text = self.inq.get()
if text is None:
break
self.handle(text)
self.outq.put(None)
log("Reactor.run: stop")
def handle(self, text):
hotwords = self.load_hotwords()
if environ.get("DEBUG", None):
log(f"seeking {hotwords} in {text}")
if not hotwords:
self.outq.put(("", text))
return
cleantext = "".join([i for i in "".join(text.lower().split()) if i.isalpha()])
for i in hotwords:
if i in cleantext:
#log(f"Reactor.handle: found hotword '{i}' in '{text}' as '{cleantext}'")
self.outq.put((i, text))
class Actor(threading.Thread):
def __init__(self, inq):
threading.Thread.__init__(self)
self.inq = inq
self.handle = self.handle_stderr
if environ.get("STDOUT", "") == "true":
self.handle = self.handle_stdout
elif environ.get("SIGUSR2", ""):
self.pid = int(environ["SIGUSR2"])
self.handle = self.handle_signal
elif environ.get("URL", ""):
self.url = environ["URL"]
self.handle = self.handle_url
self.headers = [i.split("=")[:2] for i in environ.get("HEADERS", "").split("//") if i]
self.body = environ.get("BODY", '{"hotword":"{{hotword}}","context":"{{context}}"}')
log(self.headers)
def run(self):
log("Actor.run: start")
while True:
got = self.inq.get()
if got is None:
break
self.handle(got[0], got[1])
log("Actor.run: stop")
def handle_stderr(self, hotword, context):
log(f"'{hotword}' in '{context}'")
def handle_stdout(self, hotword, context):
log(context)
print(hotword)
def handle_signal(self, hotword, context):
self.handle_stderr(hotword, context)
kill(self.pid, signal.SIGUSR2)
def handle_url(self, hotword, context):
self.handle_stderr(hotword, context)
try:
headers = {}
for i in self.headers:
key = i[0]
value = i[1]
value = value.replace("{{hotword}}", hotword)
value = value.replace("{{context}}", context)
headers[key] = value
body = self.body
body = body.replace("{{hotword}}", hotword)
body = body.replace("{{context}}", context)
if environ.get("DEBUG", "") :
log("POST", self.url, headers, body)
requests.post(self.url, headers=headers, data=body)
except Exception as e:
log("Actor.handle_url:", e)
if __name__ == "__main__":
main()