Compare commits
10 Commits
7f8c87c7bc
...
f2bce26ecb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f2bce26ecb | ||
|
|
582ee7a0d2 | ||
|
|
2914c64fda | ||
|
|
c6b648195f | ||
|
|
ac7b583f76 | ||
|
|
8a9cb5a82d | ||
|
|
4b846fd9ec | ||
|
|
1da2d9fe9c | ||
|
|
dbff6792fa | ||
|
|
f5c6c37868 |
@@ -1,24 +1,25 @@
|
||||
import random
|
||||
import time
|
||||
import math
|
||||
|
||||
class Bucket:
|
||||
def __init__(self, interval):
|
||||
random.seed(time.time())
|
||||
self.interval = interval
|
||||
self.name = self.ms_to_bucket(Bucket.now_ms())
|
||||
self.chooser = BucketChooserProportionalLogRandom()
|
||||
self.content = []
|
||||
|
||||
def push(self, v):
|
||||
if v:
|
||||
self.content.append(v)
|
||||
self._push(v)
|
||||
|
||||
def pick_n(self, n):
|
||||
if not self.content:
|
||||
return []
|
||||
result = []
|
||||
for i in range(0, n):
|
||||
result.append(self.content[random.randint(0, len(self.content)-1)])
|
||||
return list(set(result))
|
||||
def _push(self, v):
|
||||
for i in self.content:
|
||||
if i[0] == v:
|
||||
i[1] += 1
|
||||
return
|
||||
self.content.append([v, 1])
|
||||
|
||||
def now_ms():
|
||||
return int(1000 * time.time())
|
||||
@@ -26,3 +27,34 @@ class Bucket:
|
||||
def ms_to_bucket(self, ms):
|
||||
return int(int(ms // self.interval) * self.interval)
|
||||
|
||||
def pick_n(self, n):
|
||||
if not self.content:
|
||||
return []
|
||||
result = set()
|
||||
for i in range(n):
|
||||
result.add(self.chooser.choose(self.content))
|
||||
return list(result)
|
||||
|
||||
class BucketChooserProportionalLogRandom:
|
||||
def choose(self, content):
|
||||
content = self.xform(content)
|
||||
assert(content)
|
||||
idx = random.randint(0, sum([i[1] for i in content])-1)
|
||||
while content:
|
||||
candidate = content.pop()
|
||||
idx -= candidate[1]
|
||||
if idx <= 0:
|
||||
return candidate[0]
|
||||
raise Exception("how?!")
|
||||
|
||||
def xform(self, content):
|
||||
return [
|
||||
(
|
||||
i[0],
|
||||
1 + int(100 * math.log2(i[1])),
|
||||
) for i in content
|
||||
]
|
||||
|
||||
class BucketChooserProportionalRandom(BucketChooserProportionalLogRandom):
|
||||
def xform(self, content):
|
||||
return [(i[0], 1+int(100*i[1])) for i in content]
|
||||
|
||||
16
src/main.py
16
src/main.py
@@ -15,15 +15,23 @@ def main():
|
||||
ap.add_argument("--max-keys-down", default=2, type=int)
|
||||
ap.add_argument("--startup", default=3, type=int)
|
||||
ap.add_argument("--controller-setup", default=False, action="store_true")
|
||||
ap.add_argument("--simulated-annealing-initial-health", default=50, type=int)
|
||||
ap.add_argument("--simulated-annealing-nothing-penalty", default=100, type=int)
|
||||
ap.add_argument("--simulated-annealing-decay-rate", default=5, type=int)
|
||||
ap.add_argument("--translation", default=json.dumps({
|
||||
"up": {"key": "w", "weight": 20},
|
||||
"down": {"key": "s", "weight": 1},
|
||||
"left": {"key": "a", "weight": 1},
|
||||
"right": {"key": "d", "weight": 20},
|
||||
"up": {"key": "w", "weight": 20, "hold": False},
|
||||
"down": {"key": "s", "weight": 1, "hold": True},
|
||||
"left": {"key": "a", "weight": 1, "hold": True},
|
||||
"right": {"key": "d", "weight": 20, "hold": True},
|
||||
}))
|
||||
args = ap.parse_args()
|
||||
|
||||
writer.LineChooserSimulatedAnnealing.initial_health = args.simulated_annealing_initial_health
|
||||
writer.LineChooserSimulatedAnnealing.nothing_penalty = args.simulated_annealing_nothing_penalty
|
||||
writer.LineChooserSimulatedAnnealing.decay_rate = args.simulated_annealing_decay_rate
|
||||
|
||||
w_translation = json.loads(args.translation) if args.translation else None
|
||||
|
||||
w = writer.Writer(writer.PyAutoGUIWriter(w_translation))
|
||||
if args.stdout:
|
||||
w = writer.Writer(writer.StdoutWriter())
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
import threading
|
||||
import math
|
||||
import json
|
||||
import queue
|
||||
import time
|
||||
import sys
|
||||
import select
|
||||
import random
|
||||
import log
|
||||
import bucket
|
||||
|
||||
__interval__ = .1
|
||||
|
||||
@@ -67,45 +70,26 @@ class StdinReader:
|
||||
return None
|
||||
|
||||
class RandomReader:
|
||||
def __init__(self, keys={"a":{"weight":1}, "b":{"weight":1}}):
|
||||
self.keys = keys
|
||||
self.pool = RandomPool(keys)
|
||||
def __init__(self, translation):
|
||||
self.translation = translation
|
||||
self.pool = bucket.Bucket(1)
|
||||
self.pool.chooser = bucket.BucketChooserProportionalRandom()
|
||||
|
||||
total_weight = float(sum([i["weight"] for i in translation.values()]))
|
||||
for v in translation.values():
|
||||
v["weight"] = float(v["weight"]) / total_weight
|
||||
for v in translation.values():
|
||||
v["weight"] = int(1000 * math.sqrt(v["weight"]))
|
||||
print(1, json.dumps(translation, indent=" "))
|
||||
for k,v in translation.items():
|
||||
for i in range(v["weight"]):
|
||||
self.pool.push(k)
|
||||
|
||||
def read(self):
|
||||
return self.pool.pop()
|
||||
|
||||
class RandomPool:
|
||||
def __init__(self, values):
|
||||
self.values = {k:1000*v["weight"] for k,v in values.items()}
|
||||
self.total = sum(self.values.values())
|
||||
self.consumed = set()
|
||||
|
||||
def reset(self):
|
||||
self.consumed = set()
|
||||
|
||||
def pop(self):
|
||||
k = self._pop()
|
||||
for i in range(0, 3-1):
|
||||
if k in self.consumed:
|
||||
k = self._pop()
|
||||
else:
|
||||
break
|
||||
self.consumed.add(k)
|
||||
if self.should_reset():
|
||||
self.reset()
|
||||
return k
|
||||
|
||||
def _pop(self):
|
||||
idx = random.randint(0, self.total-1)
|
||||
idx_offset = 0
|
||||
for k in sorted(self.values.keys()):
|
||||
if self.values[k] > idx+idx_offset:
|
||||
return k
|
||||
idx_offset -= self.values[k]
|
||||
raise Exception(":(")
|
||||
|
||||
def should_reset(self):
|
||||
return len(self.consumed) >= len(self.values)/2
|
||||
result = self.pool.pick_n(1)
|
||||
if not result:
|
||||
return None
|
||||
return result[0]
|
||||
|
||||
class FileReader:
|
||||
def __init__(self, path):
|
||||
|
||||
45
src/test_bucket.py
Normal file
45
src/test_bucket.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import unittest
|
||||
|
||||
import bucket
|
||||
|
||||
class TestBucket(unittest.TestCase):
|
||||
def test(self):
|
||||
bkt = bucket.Bucket(1)
|
||||
for i in range(50):
|
||||
bkt.push(1)
|
||||
bkt.push(2)
|
||||
|
||||
self.assertEqual(2, len(bkt.content))
|
||||
for i in range(1000):
|
||||
result = bkt.pick_n(2)
|
||||
self.assertTrue(result)
|
||||
self.assertTrue(len(result) <= 2)
|
||||
|
||||
result = bkt.pick_n(2)
|
||||
while len(result) != 2:
|
||||
result = bkt.pick_n(2)
|
||||
|
||||
result = bkt.pick_n(2)
|
||||
while len(result) != 1:
|
||||
result = bkt.pick_n(2)
|
||||
|
||||
class TestBucketChooserProportionalLogRandom(unittest.TestCase):
|
||||
def test_xform(self):
|
||||
chooser = bucket.BucketChooserProportionalLogRandom()
|
||||
self.assertFalse(bool(chooser.xform([])))
|
||||
self.assertEqual(1, len(chooser.xform([ (1,1) ])))
|
||||
self.assertEqual(2, len(chooser.xform([ (1,1) ])[0]))
|
||||
|
||||
class TestBucketChoosers(unittest.TestCase):
|
||||
def test_choosers(self):
|
||||
for chooser in [
|
||||
bucket.BucketChooserProportionalLogRandom(),
|
||||
bucket.BucketChooserProportionalRandom(),
|
||||
]:
|
||||
with self.assertRaises(AssertionError):
|
||||
self.assertRaises(chooser.choose([]))
|
||||
self.assertTrue(chooser.choose([(1,1)]))
|
||||
self.assertEqual(1, chooser.choose([(1,1)]))
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
60
src/test_writer.py
Normal file
60
src/test_writer.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import unittest
|
||||
|
||||
import writer
|
||||
|
||||
class TestLineChooser(unittest.TestCase):
|
||||
def test_simulated_annealing(self):
|
||||
chooser = writer.LineChooserSimulatedAnnealing()
|
||||
chooser.now = lambda *args: 100
|
||||
|
||||
for name, c in ({
|
||||
"fresh something": [
|
||||
writer.LineChooserSimulatedAnnealing.initial_health,
|
||||
chooser.now(),
|
||||
],
|
||||
"old something": [
|
||||
writer.LineChooserSimulatedAnnealing.initial_health - writer.LineChooserSimulatedAnnealing.decay_rate * 5,
|
||||
chooser.now() - 5,
|
||||
],
|
||||
"fresh nothing": [
|
||||
writer.LineChooserSimulatedAnnealing.initial_health - writer.LineChooserSimulatedAnnealing.nothing_penalty,
|
||||
chooser.now(),
|
||||
],
|
||||
}).items():
|
||||
self.assertEqual(
|
||||
c[0],
|
||||
chooser.health(self.new_line(True, c[1], not "nothing" in name)),
|
||||
name,
|
||||
)
|
||||
|
||||
def test_latest_sticky(self):
|
||||
stale_something = self.new_line(False, 1, 1)
|
||||
stale_nothing = self.new_line(False, 1, None)
|
||||
old_something = self.new_line(True, 2, 2)
|
||||
old_nothing = self.new_line(True, 2, None)
|
||||
new_something = self.new_line(True, 3, 3)
|
||||
new_nothing = self.new_line(True, 3, None)
|
||||
|
||||
chooser = writer.LineChooserLatestSticky()
|
||||
|
||||
for name, c in ({
|
||||
"nothing over stale": [new_nothing, stale_something],
|
||||
"slightly old over nothing": [old_something, new_nothing],
|
||||
"new and nonzero": [new_something, old_something],
|
||||
"new over nothing": [new_something, old_nothing],
|
||||
"new over stale nothing": [new_something, stale_nothing],
|
||||
}).items():
|
||||
self.assertEqual(
|
||||
c[0],
|
||||
chooser.choose(c[0], c[1]),
|
||||
name,
|
||||
)
|
||||
|
||||
def new_line(self, is_recent, t, v):
|
||||
result = writer.Line(v)
|
||||
result.t = t
|
||||
result.is_recent = lambda *args: is_recent
|
||||
return result
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -2,17 +2,19 @@ import time
|
||||
import json
|
||||
import pyautogui
|
||||
import log
|
||||
import random
|
||||
|
||||
class Writer:
|
||||
def __init__(self, writer):
|
||||
self.writer = writer
|
||||
self.previous = Line([])
|
||||
self.chooser = LineChooserLatestSticky()
|
||||
self.chooser = LineChooserSimulatedAnnealing()
|
||||
|
||||
def write(self, v):
|
||||
latest = Line(v)
|
||||
chosen = Line.choose(self.previous, latest)
|
||||
if chosen.v != self.previous.v:
|
||||
self.writer.write(chosen)
|
||||
chosen = self.chooser.choose(self.previous, latest)
|
||||
self.writer.write(chosen)
|
||||
self.previous = chosen
|
||||
|
||||
def close(self):
|
||||
@@ -28,14 +30,19 @@ class MultiWriter:
|
||||
w.write(v)
|
||||
|
||||
class StdoutWriter:
|
||||
def __init__(self):
|
||||
self.last = ""
|
||||
def write(self, v):
|
||||
if str(v) == self.last:
|
||||
return
|
||||
self.last = str(v)
|
||||
print(v)
|
||||
|
||||
class PyAutoGUIWriter:
|
||||
def __init__(self, optional_translation):
|
||||
def __init__(self, translation):
|
||||
pyautogui.FAILSAFE = False
|
||||
self.keys_down = set()
|
||||
self.translation = optional_translation
|
||||
self.translation = translation
|
||||
log.info(json.dumps(self.translation, indent=" "))
|
||||
|
||||
def write(self, v):
|
||||
@@ -45,24 +52,31 @@ class PyAutoGUIWriter:
|
||||
if got:
|
||||
to_push.add(got)
|
||||
for key in [k for k in self.keys_down]:
|
||||
if not key in to_push:
|
||||
if not key in [i[0] for i in to_push]:
|
||||
self.release(key)
|
||||
for key in to_push:
|
||||
self.push(key)
|
||||
self.push(key[0], key[1])
|
||||
log.info("PUSHING", self.keys_down)
|
||||
|
||||
# https://pyautogui.readthedocs.io/en/latest/keyboard.html#keyboard-keys
|
||||
def translate(self, v):
|
||||
result = self.translation.get(v, {}).get("key")
|
||||
if result:
|
||||
return result
|
||||
result = self.translation.get(v, {})
|
||||
key = result.get("key")
|
||||
hold = result.get("hold")
|
||||
if key:
|
||||
return (key, hold)
|
||||
|
||||
def push(self, k):
|
||||
def push(self, k, hold):
|
||||
self.keys_down.add(k)
|
||||
pyautogui.keyDown(k)
|
||||
if not hold:
|
||||
self._release(k)
|
||||
|
||||
def release(self, k):
|
||||
self.keys_down.remove(k)
|
||||
self._release(k)
|
||||
|
||||
def _release(self, k):
|
||||
pyautogui.keyUp(k)
|
||||
|
||||
class Line:
|
||||
@@ -73,13 +87,47 @@ class Line:
|
||||
def __str__(self):
|
||||
return json.dumps({"t":self.t, "v": self.v})
|
||||
|
||||
def choose(a, b):
|
||||
def is_nothing(self):
|
||||
return not self.v
|
||||
|
||||
def is_recent(self):
|
||||
return time.time() - self.t < 5
|
||||
|
||||
class LineChooser:
|
||||
def choose(self, a, b):
|
||||
if not a:
|
||||
return b
|
||||
if not b:
|
||||
return a
|
||||
latest = max([a,b], key=lambda x:x.t)
|
||||
oldest = min([a,b], key=lambda x:x.t)
|
||||
if not latest.v and time.time() - oldest.t < 1:
|
||||
return self._choose(latest, oldest)
|
||||
|
||||
def now(self):
|
||||
return time.time()
|
||||
|
||||
class LineChooserLatestSticky(LineChooser):
|
||||
def _choose(self, latest, oldest):
|
||||
if latest.is_nothing() and oldest.is_recent():
|
||||
return oldest
|
||||
return latest
|
||||
|
||||
class LineChooserSimulatedAnnealing(LineChooser):
|
||||
initial_health = 50
|
||||
nothing_penalty = 100
|
||||
decay_rate = 5
|
||||
|
||||
def _choose(self, latest, oldest):
|
||||
oldest_health = self.health(oldest)
|
||||
seed = random.randint(0, 100)
|
||||
if seed < oldest_health:
|
||||
print("LineChooserSimulatedAnnealing retaining old because", seed, "<", oldest_health)
|
||||
return oldest
|
||||
return latest
|
||||
|
||||
def health(self, line):
|
||||
health = LineChooserSimulatedAnnealing.initial_health
|
||||
health = health - LineChooserSimulatedAnnealing.decay_rate * (self.now() - line.t)
|
||||
if line.is_nothing():
|
||||
health -= LineChooserSimulatedAnnealing.nothing_penalty
|
||||
return health
|
||||
|
||||
Reference in New Issue
Block a user