add cli args
parent
7d16adc1ff
commit
1455a93b40
|
|
@ -2,9 +2,10 @@ import random
|
|||
import time
|
||||
|
||||
class Bucket:
|
||||
def __init__(self):
|
||||
def __init__(self, interval):
|
||||
random.seed(time.time())
|
||||
self.name = Bucket.ms_to_bucket(Bucket.now_ms())
|
||||
self.interval = interval
|
||||
self.name = self.ms_to_bucket(Bucket.now_ms())
|
||||
self.content = []
|
||||
|
||||
def push(self, v):
|
||||
|
|
@ -22,9 +23,6 @@ class Bucket:
|
|||
def now_ms():
|
||||
return int(1000 * time.time())
|
||||
|
||||
def ms_to_bucket(ms):
|
||||
return int(int(ms // Bucket.interval()) * Bucket.interval())
|
||||
|
||||
def interval():
|
||||
return 100
|
||||
def ms_to_bucket(self, ms):
|
||||
return int(int(ms // self.interval) * self.interval)
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,4 @@
|
|||
from sys import stderr
|
||||
|
||||
def info(*args):
|
||||
print(*args, file=stderr)
|
||||
53
src/main.py
53
src/main.py
|
|
@ -1,30 +1,65 @@
|
|||
import reader
|
||||
import writer
|
||||
import bucket
|
||||
import log
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import time
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--stdin", default=False, action="store_true")
|
||||
ap.add_argument("--interval", default=250, type=int)
|
||||
ap.add_argument("--max-keys-down", default=2, type=int)
|
||||
ap.add_argument("--startup", default=3, type=int)
|
||||
ap.add_argument("--controller-setup", default=False, action="store_true")
|
||||
ap.add_argument("--translation", default=json.dumps({
|
||||
"up": {"key": "w", "weight": 20},
|
||||
"down": {"key": "s", "weight": 1},
|
||||
"left": {"key": "a", "weight": 1},
|
||||
"right": {"key": "d", "weight": 20},
|
||||
}))
|
||||
args = ap.parse_args()
|
||||
|
||||
w_translation = json.loads(args.translation) if args.translation else None
|
||||
w = writer.Writer(writer.PyAutoGUIWriter(w_translation))
|
||||
|
||||
if args.controller_setup:
|
||||
stdin = reader.StdinReader()
|
||||
for k in w_translation.keys():
|
||||
log.info(f"===configuring '{k}' in===")
|
||||
for i in range(3, 0, -1):
|
||||
log.info(f" {i}")
|
||||
time.sleep(1)
|
||||
w.write(k)
|
||||
time.sleep(1)
|
||||
w.write([])
|
||||
time.sleep(1)
|
||||
log.info(f"/configured '{k}', [ENTER] to continue")
|
||||
while not stdin.read():
|
||||
time.sleep(.1)
|
||||
exit(0)
|
||||
|
||||
r = reader.Reader(reader.StdinReader())
|
||||
if not args.stdin:
|
||||
r = reader.Reader(reader.RandomReader())
|
||||
w = writer.Writer(writer.MultiWriter(
|
||||
#writer.StdoutWriter(),
|
||||
writer.PyAutoGUIWriter(),
|
||||
))
|
||||
bkt = bucket.Bucket()
|
||||
r = reader.Reader(reader.RandomReader(w_translation))
|
||||
bkt = bucket.Bucket(args.interval)
|
||||
|
||||
log.info("starting in")
|
||||
for i in range(args.startup, 0, -1):
|
||||
log.info("...", i)
|
||||
time.sleep(1)
|
||||
log.info("go")
|
||||
|
||||
for line in r.read():
|
||||
latest_bkt = bucket.Bucket()
|
||||
latest_bkt = bucket.Bucket(args.interval)
|
||||
if latest_bkt.name != bkt.name:
|
||||
picked = bkt.pick_n(2)
|
||||
picked = bkt.pick_n(args.max_keys_down)
|
||||
w.write(picked)
|
||||
bkt = latest_bkt
|
||||
bkt.push(line)
|
||||
print("closing writer")
|
||||
log.info("closing writer")
|
||||
w.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import time
|
|||
import sys
|
||||
import select
|
||||
import random
|
||||
import log
|
||||
|
||||
__interval__ = .1
|
||||
|
||||
|
|
@ -30,7 +31,8 @@ class Reader:
|
|||
self.q.put_nowait(self.foo())
|
||||
except queue.Full:
|
||||
pass
|
||||
time.sleep(__interval__/2.0)
|
||||
except StopIteration:
|
||||
self.stop()
|
||||
|
||||
reader = async_reader(q, self.reader.read)
|
||||
reader.start()
|
||||
|
|
@ -49,28 +51,42 @@ class Reader:
|
|||
reader.join()
|
||||
|
||||
class StdinReader:
|
||||
def __init__(self):
|
||||
self.__closed__ = False
|
||||
|
||||
def read(self):
|
||||
if self.__closed__:
|
||||
raise StopIteration
|
||||
try:
|
||||
if select.select([sys.stdin,],[],[],__interval__/2.0)[0]:
|
||||
line = sys.stdin.readline()
|
||||
self.__closed__ = not line
|
||||
return line
|
||||
except Exception as e:
|
||||
pass
|
||||
return None
|
||||
|
||||
class RandomReader:
|
||||
def __init__(self, keys={"a":{"weight":1}, "b":{"weight":1}}):
|
||||
self.keys = keys
|
||||
self.weighted_keys = []
|
||||
for k in self.keys:
|
||||
weight = int(self.keys[k]["weight"] * 1000)
|
||||
self.weighted_keys.extend([k] * weight)
|
||||
log.info("weighted keys", self.weighted_keys)
|
||||
|
||||
def read(self):
|
||||
seed = random.randint(0, 99)
|
||||
if seed < 10:
|
||||
return "a"
|
||||
elif seed < 50:
|
||||
return "B"
|
||||
else:
|
||||
return "b"
|
||||
return self.weighted_keys[
|
||||
random.randint(0, len(self.weighted_keys)-1)
|
||||
]
|
||||
|
||||
class FileReader:
|
||||
def __init__(self, path):
|
||||
self.f = open(path, "r")
|
||||
self.__closed__ = False
|
||||
|
||||
def read(self):
|
||||
return self.f.readline()
|
||||
if self.__closed__:
|
||||
raise StopIteration
|
||||
result = self.f.readline()
|
||||
result = not self.__closed__
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import time
|
||||
import json
|
||||
import pyautogui
|
||||
import log
|
||||
|
||||
class Writer:
|
||||
def __init__(self, writer):
|
||||
|
|
@ -16,6 +17,7 @@ class Writer:
|
|||
|
||||
def close(self):
|
||||
self.writer.write(Line([]))
|
||||
time.sleep(.2)
|
||||
|
||||
class MultiWriter:
|
||||
def __init__(self, *writers):
|
||||
|
|
@ -30,14 +32,11 @@ class StdoutWriter:
|
|||
print(v)
|
||||
|
||||
class PyAutoGUIWriter:
|
||||
translation = {
|
||||
"a": "f24",
|
||||
"b": "f23",
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, optional_translation):
|
||||
pyautogui.FAILSAFE = False
|
||||
self.keys_down = set()
|
||||
print(json.dumps(PyAutoGUIWriter.translation, indent=" "))
|
||||
self.translation = optional_translation
|
||||
log.info(json.dumps(self.translation, indent=" "))
|
||||
|
||||
def write(self, v):
|
||||
to_push = set()
|
||||
|
|
@ -50,21 +49,21 @@ class PyAutoGUIWriter:
|
|||
self.release(key)
|
||||
for key in to_push:
|
||||
self.push(key)
|
||||
print("PUSHING", self.keys_down)
|
||||
log.info("PUSHING", self.keys_down)
|
||||
|
||||
# https://pyautogui.readthedocs.io/en/latest/keyboard.html#keyboard-keys
|
||||
def translate(self, v):
|
||||
result = PyAutoGUIWriter.translation.get(v, None)
|
||||
result = self.translation.get(v, {}).get("key")
|
||||
if result:
|
||||
return result
|
||||
|
||||
def push(self, k):
|
||||
self.keys_down.add(k)
|
||||
#pyautogui.keyDown(k)
|
||||
pyautogui.keyDown(k)
|
||||
|
||||
def release(self, k):
|
||||
self.keys_down.remove(k)
|
||||
#pyautogui.keyUp(k)
|
||||
pyautogui.keyUp(k)
|
||||
|
||||
class Line:
|
||||
def __init__(self, v):
|
||||
|
|
|
|||
Loading…
Reference in New Issue