"""Hotword listener built as a pipeline of threads connected by queues.

Pipeline (each arrow is a bounded ``queue.Queue``; ``None`` is the shutdown
sentinel that every stage forwards downstream)::

    Manager --(stop)--> Reader --> Parser --> Reactor --> Actor

Environment variables read by this module:

* ``MIC_NAME``    comma-separated microphone names (default ``pulse_monitor``);
                  set to an empty string to list microphones and exit.
* ``MIC_TIMEOUT`` seconds used for both listen timeout and phrase limit
                  (default 5).
* ``MODEL``       model name interpolated into ``./models/ggml-<MODEL>.bin``
                  (default ``tiny.en``).
* ``HOTWORDS``    ``file``, ``yamlfile@dot.path`` or a literal list separated
                  by the four characters ``\\/\\/``.
* ``STDOUT`` / ``SIGUSR2`` / ``URL`` / ``HEADERS`` / ``BODY`` select and
                  configure what the Actor does on a match.
* ``URL_TIMEOUT`` seconds to wait for the webhook POST (default 10).
* ``DEBUG``       any non-empty value enables extra logging.
"""

import os
import queue
import re
import signal
import subprocess
import sys
import threading
import time

import requests
import speech_recognition as sr
import yaml

# Scratch file handed to the external transcriber.
WAV_PATH = "/tmp/whisper-cpp.wav"

# Matches a "[...]" or "(...)" annotation in a transcript.
_ANNOTATION_RE = re.compile(r"\[[^\]]*\]|\([^)]*\)")

# Separator between literal hotwords in $HOTWORDS: backslash, slash,
# backslash, slash. (The original non-raw "\/\/" evaluated to these same four
# characters but triggered an invalid-escape warning.)
_HOTWORD_LITERAL_SEP = r"\/\/"


def log(*args):
    """Print ``args`` to stderr, prefixed with ``>``."""
    print(">", *args, file=sys.stderr)


def _debug_enabled():
    """Return True when $DEBUG is set to a non-empty value."""
    return bool(os.environ.get("DEBUG", None))


def strip_annotations(text):
    """Remove ``>>`` markers and bracketed/parenthesised annotations.

    Only the annotation spans themselves are removed; speech before and after
    an annotation is kept.
    """
    text = text.replace(">>", "")
    return _ANNOTATION_RE.sub("", text).strip()


def _normalize_hotword(raw):
    """Lower-case ``raw`` and remove all whitespace."""
    return "".join(raw.strip().lower().split())


def _normalize_hotwords(raws):
    """Normalize each entry and drop the ones that end up empty.

    An empty hotword must never be kept: ``"" in text`` is always true, so it
    would match every transcript.
    """
    normalized = (_normalize_hotword(raw) for raw in raws)
    return [word for word in normalized if word]


class Piper(threading.Thread):
    """Generic stage: feed every item from ``inq`` to ``_run`` until ``None``.

    Subclasses must provide ``_run``. When the ``None`` sentinel arrives it is
    forwarded to ``outq`` and the thread ends.
    """

    def __init__(self, inq, outq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.outq = outq

    def run(self):
        while True:
            got = self.inq.get()
            if got is None:
                break
            self._run(got)
        self.outq.put(None)

    def _run(self, item):
        """Process one item; must be overridden."""
        raise NotImplementedError


class Manager(threading.Thread):
    """Turns SIGINT into a ``None`` sentinel on ``outq``.

    The signal handler is installed in ``__init__``; Python only allows
    ``signal.signal`` from the main thread, so construct this there.
    """

    def __init__(self, outq):
        threading.Thread.__init__(self)
        self.outq = outq
        inq = queue.Queue()

        def catcher(sig, frame):
            inq.put(None)

        self.inq = inq
        signal.signal(signal.SIGINT, catcher)

    def run(self):
        log("Manager.run: start")
        self.inq.get()  # blocks until SIGINT
        self.outq.put(None)
        log("Manager.run: stop")


class Reader(threading.Thread):
    """Records audio clips from a microphone and puts them on ``outq``.

    ``inq`` is only polled for emptiness: any item on it means "stop".
    """

    def __init__(self, inq, outq):
        threading.Thread.__init__(self)
        # NOTE: assigning ``name`` also sets this thread's name.
        self.name = os.environ.get("MIC_NAME", "pulse_monitor")
        if not self.name:
            # MIC_NAME explicitly empty: list the devices and quit.
            for index, name in enumerate(sr.Microphone.list_microphone_names()):
                print(
                    f'[{index}] Microphone with name "{name}" found for '
                    f"`Microphone(device_index={index})`"
                )
            sys.exit()
        self.inq = inq
        self.outq = outq

    def run(self):
        log("Reader.run: start")
        try:
            wanted = self.name.split(",")
            available = sr.Microphone.list_microphone_names()
            idx = next(
                (i for i, mic_name in enumerate(available) if mic_name in wanted),
                None,
            )
            if idx is None:
                raise LookupError(f"no microphone named any of {wanted}")
            with sr.Microphone(device_index=idx) as mic:
                while not self.should_stop():
                    try:
                        self.outq.put(self._run(mic))
                    except Exception as e:
                        # Listen timeouts are expected while nobody speaks.
                        if "timed out" not in str(e):
                            log("Reader.run: error:", e)
        except Exception as e:
            log("Reader.run panic:", e)
            log("microphones:", sr.Microphone.list_microphone_names())
        finally:
            # Always let downstream stages shut down.
            self.outq.put(None)
        log("Reader.run: stop")

    def should_stop(self):
        """Return True once anything has been put on ``inq``."""
        return not self.inq.empty()

    def _run(self, mic):
        """Listen on ``mic`` once and return the recorded clip."""
        mic_timeout = int(os.environ.get("MIC_TIMEOUT", 5))
        recognizer = sr.Recognizer()
        return recognizer.listen(
            mic,
            timeout=mic_timeout,
            phrase_time_limit=mic_timeout,
        )


class Parser(threading.Thread):
    """Transcribes clips from ``inq`` and puts the text on ``outq``."""

    def __init__(self, inq, outq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.outq = outq

    def run(self):
        log("Parser.run: start")
        while True:
            try:
                clip = self.inq.get()
                backlog = self.inq.qsize()
                if backlog:
                    log("Parser.run backlog", backlog)
                if clip is None:
                    break
                self.outq.put(self._run(clip))
            except Exception as e:
                log("Parser.run: error:", e)
        self.outq.put(None)
        log("Parser.run: stop")

    def _run(self, clip):
        """Write ``clip`` as 16 kHz WAV, run ``./main`` on it, return the text.

        ``./main`` is presumably the whisper.cpp CLI (judging by the scratch
        file name and the ggml model path) -- TODO confirm.
        """
        wav = clip.get_wav_data(convert_rate=16000)
        with open(WAV_PATH, "wb") as f:
            f.write(wav)
        model = os.environ.get("MODEL", "tiny.en")
        # Argument list instead of a split string so a value containing
        # whitespace cannot break the command apart.
        cmd = [
            "./main",
            "-m", f"./models/ggml-{model}.bin",
            "-t", "4",
            "-f", WAV_PATH,
            "--no-timestamps",
        ]
        proc = subprocess.run(cmd, capture_output=True)
        if proc.returncode != 0:
            log(
                f"Parser._run: {cmd[0]} exited with {proc.returncode}:",
                proc.stderr.decode(errors="replace").strip(),
            )
        result = proc.stdout.decode(errors="replace").strip()
        if _debug_enabled():
            log("raw transcript:", result)
        result = strip_annotations(result)
        if _debug_enabled():
            log("annotation-free transcript:", result)
        return result
        # Alternative kept from the original author (in-process transcription):
        #   sr.Recognizer().recognize_whisper(
        #       clip, language="english", model=os.environ.get("MODEL", "small.en"))
        # Author's note: tiny.en=32x, base.en=16x, small.en=6x, medium.en=x2


def load_dot_notation(v, s):
    """Look up the dot-notation path ``s`` inside the nested value ``v``.

    Path segments are separated by ``.``; a numeric segment indexes a list,
    any other segment is a dict key, and ``[]`` fans out over every element
    of a list (or every value of a dict), flattening one level of lists.
    Empty segments are ignored, so ``"."`` returns ``v`` itself.
    """
    items = s.replace("[]", ".[]").split(".")
    return _load_dot_notation(v, items)


def _load_dot_notation(v, items):
    """Resolve the already-split path ``items`` against ``v``."""
    for i, key in enumerate(items):
        if not key:
            continue
        if key == "[]":
            children = v if isinstance(v, list) else v.values()
            rest = items[i + 1:]
            result = []
            for child in children:
                sub = _load_dot_notation(child, rest)
                if isinstance(sub, list):
                    result.extend(sub)
                else:
                    result.append(sub)
            return result
        v = v[int(key)] if isinstance(v, list) else v[key]
    return v


def test_load_dot_notation():
    """Self-check for ``load_dot_notation``; raises ``Exception`` on mismatch."""
    cases = [
        ("a", "a", "."),
        (["a"], ["a"], "."),
        ("b", {"a": "b"}, ".a"),
        ("c", {"a": {"b": "c"}}, ".a.b"),
        ("c", {"a": {"b": ["c"]}}, ".a.b.0"),
        (["c", "d"], {"a": {"b": "c"}, "a2": {"b": "d"}}, ".[].b"),
        (["c", "d"], {"a": {"b": ["c"], "b2": ["d"]}}, ".a.[].0"),
        (["c", "d"], {"a": {"b": ["c"], "b2": ["d"]}}, ".a[].0"),
        (["c", "d"], ["c", "d"], "."),
        (["c", "d"], ["c", "d"], "[]"),
    ]
    for expected, value, path in cases:
        got = load_dot_notation(value, path)
        if got != expected:
            raise Exception(
                f"load_dot_notation({value!r}, {path!r}) returned {got!r}, "
                f"expected {expected!r}"
            )


# Runs at import time, as in the original module.
test_load_dot_notation()


class Reactor(threading.Thread):
    """Scans transcripts from ``inq`` for hotwords.

    Each match is put on ``outq`` as a ``(hotword, transcript)`` tuple.
    """

    def __init__(self, inq, outq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.outq = outq
        self.load_hotwords = Reactor.new_load_hotwords()
        log(f"hotwords: {self.load_hotwords()}")

    @staticmethod
    def new_load_hotwords():
        """Return a zero-argument callable that yields the current hotwords.

        $HOTWORDS is interpreted, in order, as:

        1. unset/empty -> always ``[]``;
        2. ``path@dot.path`` -> YAML file, re-read on every call;
        3. ``path`` -> text file with one hotword per line, re-read on
           every call;
        4. otherwise (the file could not be loaded) -> a literal list.
        """
        p = os.environ.get("HOTWORDS", None)
        if not p:
            def load_nothing():
                return []
            return load_nothing
        try:
            if "@" in p:
                def load_hotwords_in_yaml_file():
                    with open(p.split("@")[0], "r") as f:
                        v = yaml.safe_load(f)
                    v = load_dot_notation(v, p.split("@")[-1])
                    return _normalize_hotwords(i for i in v if i)
                load_hotwords_in_yaml_file()  # fail now if it cannot load
                return load_hotwords_in_yaml_file
            else:
                def load_hotwords_in_file():
                    with open(p, "r") as f:
                        return _normalize_hotwords(f.readlines())
                load_hotwords_in_file()  # fail now if it cannot load
                return load_hotwords_in_file
        except Exception as e:
            log(f"$HOTWORDS {p} is not a file: {e}")
        hotwords = _normalize_hotwords(p.split(_HOTWORD_LITERAL_SEP))
        log(f'$HOTWORDS: {hotwords}')

        def load_hotwords_as_literal():
            return hotwords
        return load_hotwords_as_literal

    def run(self):
        log("Reactor.run: start")
        while True:
            text = self.inq.get()
            if text is None:
                break
            try:
                self.handle(text)
            except Exception as e:
                # e.g. the hotwords file vanished; keep the pipeline alive.
                log("Reactor.run: error:", e)
        self.outq.put(None)
        log("Reactor.run: stop")

    def handle(self, text):
        """Emit ``(hotword, text)`` for every hotword contained in ``text``.

        With no hotwords available the transcript itself is printed to stdout
        (when $HOTWORDS is unset) or logged (when it is set but yielded
        nothing).
        """
        hotwords = self.load_hotwords()
        if _debug_enabled():
            log(f"seeking {hotwords} in {text}")
        if not hotwords:
            if not os.environ.get("HOTWORDS", None):
                # Flush so a consumer on a pipe sees the transcript promptly.
                print(text, flush=True)
            else:
                log(text)
            return
        # Compare on letters only: no case, whitespace, digits or punctuation.
        cleantext = "".join(ch for ch in text.lower() if ch.isalpha())
        for hotword in hotwords:
            if hotword and hotword in cleantext:
                #log(f"Reactor.handle: found hotword '{hotword}' in '{text}' as '{cleantext}'")
                self.outq.put((hotword, text))


class Actor(threading.Thread):
    """Performs the configured action for each ``(hotword, context)`` match.

    The action is chosen once, at construction, by the first of these that
    applies: ``STDOUT=true`` (print the hotword), ``SIGUSR2=<pid>`` (send
    SIGUSR2 to that pid), ``URL=<url>`` (HTTP POST); otherwise the match is
    only logged to stderr.
    """

    def __init__(self, inq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.handle = self.handle_stderr
        if os.environ.get("STDOUT", "") == "true":
            self.handle = self.handle_stdout
        elif os.environ.get("SIGUSR2", ""):
            self.pid = int(os.environ["SIGUSR2"])
            self.handle = self.handle_signal
        elif os.environ.get("URL", ""):
            self.url = os.environ["URL"]
            self.handle = self.handle_url
            self.headers = []
            for pair in os.environ.get("HEADERS", "").split("//"):
                if not pair:
                    continue
                if "=" not in pair:
                    log(f"Actor: ignoring malformed header {pair!r}")
                    continue
                # Split on the first "=" only so values may contain "=".
                self.headers.append(pair.split("=", 1))
            self.body = os.environ.get(
                "BODY", '{"hotword":"{{hotword}}","context":"{{context}}"}'
            )
            self.timeout = float(os.environ.get("URL_TIMEOUT", "10"))
            log(self.headers)

    def run(self):
        log("Actor.run: start")
        while True:
            got = self.inq.get()
            if got is None:
                break
            try:
                self.handle(got[0], got[1])
            except Exception as e:
                # e.g. the signalled pid no longer exists; keep running.
                log("Actor.run: error:", e)
        log("Actor.run: stop")

    def handle_stderr(self, hotword, context):
        """Log the match to stderr."""
        log(f"'{hotword}' in '{context}'")

    def handle_stdout(self, hotword, context):
        """Log the transcript and print the hotword to stdout."""
        log(context)
        # Flush so a consumer on a pipe sees the hotword promptly.
        print(hotword, flush=True)

    def handle_signal(self, hotword, context):
        """Log the match and send SIGUSR2 to the configured pid."""
        self.handle_stderr(hotword, context)
        os.kill(self.pid, signal.SIGUSR2)

    def handle_url(self, hotword, context):
        """Log the match and POST it to the configured URL.

        ``{{hotword}}`` and ``{{context}}`` are substituted verbatim into the
        header values and the body. NOTE(review): nothing is escaped, so a
        transcript containing ``"`` produces invalid JSON with the default
        body template.
        """
        self.handle_stderr(hotword, context)
        try:
            headers = {}
            for key, value in self.headers:
                value = value.replace("{{hotword}}", hotword)
                value = value.replace("{{context}}", context)
                headers[key] = value
            body = self.body
            body = body.replace("{{hotword}}", hotword)
            body = body.replace("{{context}}", context)
            if os.environ.get("DEBUG", ""):
                log("POST", self.url, headers, body)
            # Bounded wait so an unresponsive server cannot stall this thread.
            requests.post(
                self.url, headers=headers, data=body, timeout=self.timeout
            )
        except Exception as e:
            log("Actor.handle_url:", e)


def main():
    """Wire the stages together, start them and wait for all to finish."""
    manager_to_reader_q = queue.Queue(maxsize=1)
    reader_to_parser_q = queue.Queue(maxsize=10)
    parser_to_reactor_q = queue.Queue(maxsize=10)
    reactor_to_actor_q = queue.Queue(maxsize=10)
    threads = [
        Manager(manager_to_reader_q),
        Reader(manager_to_reader_q, reader_to_parser_q),
        Parser(reader_to_parser_q, parser_to_reactor_q),
        Reactor(parser_to_reactor_q, reactor_to_actor_q),
        Actor(reactor_to_actor_q),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == "__main__":
    main()