Compare commits
10 Commits
7f8c87c7bc
...
f2bce26ecb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f2bce26ecb | ||
|
|
582ee7a0d2 | ||
|
|
2914c64fda | ||
|
|
c6b648195f | ||
|
|
ac7b583f76 | ||
|
|
8a9cb5a82d | ||
|
|
4b846fd9ec | ||
|
|
1da2d9fe9c | ||
|
|
dbff6792fa | ||
|
|
f5c6c37868 |
@@ -1,24 +1,25 @@
|
|||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
|
import math
|
||||||
|
|
||||||
class Bucket:
|
class Bucket:
|
||||||
def __init__(self, interval):
|
def __init__(self, interval):
|
||||||
random.seed(time.time())
|
random.seed(time.time())
|
||||||
self.interval = interval
|
self.interval = interval
|
||||||
self.name = self.ms_to_bucket(Bucket.now_ms())
|
self.name = self.ms_to_bucket(Bucket.now_ms())
|
||||||
|
self.chooser = BucketChooserProportionalLogRandom()
|
||||||
self.content = []
|
self.content = []
|
||||||
|
|
||||||
def push(self, v):
|
def push(self, v):
|
||||||
if v:
|
if v:
|
||||||
self.content.append(v)
|
self._push(v)
|
||||||
|
|
||||||
def pick_n(self, n):
|
def _push(self, v):
|
||||||
if not self.content:
|
for i in self.content:
|
||||||
return []
|
if i[0] == v:
|
||||||
result = []
|
i[1] += 1
|
||||||
for i in range(0, n):
|
return
|
||||||
result.append(self.content[random.randint(0, len(self.content)-1)])
|
self.content.append([v, 1])
|
||||||
return list(set(result))
|
|
||||||
|
|
||||||
def now_ms():
|
def now_ms():
|
||||||
return int(1000 * time.time())
|
return int(1000 * time.time())
|
||||||
@@ -26,3 +27,34 @@ class Bucket:
|
|||||||
def ms_to_bucket(self, ms):
|
def ms_to_bucket(self, ms):
|
||||||
return int(int(ms // self.interval) * self.interval)
|
return int(int(ms // self.interval) * self.interval)
|
||||||
|
|
||||||
|
def pick_n(self, n):
|
||||||
|
if not self.content:
|
||||||
|
return []
|
||||||
|
result = set()
|
||||||
|
for i in range(n):
|
||||||
|
result.add(self.chooser.choose(self.content))
|
||||||
|
return list(result)
|
||||||
|
|
||||||
|
class BucketChooserProportionalLogRandom:
|
||||||
|
def choose(self, content):
|
||||||
|
content = self.xform(content)
|
||||||
|
assert(content)
|
||||||
|
idx = random.randint(0, sum([i[1] for i in content])-1)
|
||||||
|
while content:
|
||||||
|
candidate = content.pop()
|
||||||
|
idx -= candidate[1]
|
||||||
|
if idx <= 0:
|
||||||
|
return candidate[0]
|
||||||
|
raise Exception("how?!")
|
||||||
|
|
||||||
|
def xform(self, content):
|
||||||
|
return [
|
||||||
|
(
|
||||||
|
i[0],
|
||||||
|
1 + int(100 * math.log2(i[1])),
|
||||||
|
) for i in content
|
||||||
|
]
|
||||||
|
|
||||||
|
class BucketChooserProportionalRandom(BucketChooserProportionalLogRandom):
|
||||||
|
def xform(self, content):
|
||||||
|
return [(i[0], 1+int(100*i[1])) for i in content]
|
||||||
|
|||||||
16
src/main.py
16
src/main.py
@@ -15,15 +15,23 @@ def main():
|
|||||||
ap.add_argument("--max-keys-down", default=2, type=int)
|
ap.add_argument("--max-keys-down", default=2, type=int)
|
||||||
ap.add_argument("--startup", default=3, type=int)
|
ap.add_argument("--startup", default=3, type=int)
|
||||||
ap.add_argument("--controller-setup", default=False, action="store_true")
|
ap.add_argument("--controller-setup", default=False, action="store_true")
|
||||||
|
ap.add_argument("--simulated-annealing-initial-health", default=50, type=int)
|
||||||
|
ap.add_argument("--simulated-annealing-nothing-penalty", default=100, type=int)
|
||||||
|
ap.add_argument("--simulated-annealing-decay-rate", default=5, type=int)
|
||||||
ap.add_argument("--translation", default=json.dumps({
|
ap.add_argument("--translation", default=json.dumps({
|
||||||
"up": {"key": "w", "weight": 20},
|
"up": {"key": "w", "weight": 20, "hold": False},
|
||||||
"down": {"key": "s", "weight": 1},
|
"down": {"key": "s", "weight": 1, "hold": True},
|
||||||
"left": {"key": "a", "weight": 1},
|
"left": {"key": "a", "weight": 1, "hold": True},
|
||||||
"right": {"key": "d", "weight": 20},
|
"right": {"key": "d", "weight": 20, "hold": True},
|
||||||
}))
|
}))
|
||||||
args = ap.parse_args()
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
writer.LineChooserSimulatedAnnealing.initial_health = args.simulated_annealing_initial_health
|
||||||
|
writer.LineChooserSimulatedAnnealing.nothing_penalty = args.simulated_annealing_nothing_penalty
|
||||||
|
writer.LineChooserSimulatedAnnealing.decay_rate = args.simulated_annealing_decay_rate
|
||||||
|
|
||||||
w_translation = json.loads(args.translation) if args.translation else None
|
w_translation = json.loads(args.translation) if args.translation else None
|
||||||
|
|
||||||
w = writer.Writer(writer.PyAutoGUIWriter(w_translation))
|
w = writer.Writer(writer.PyAutoGUIWriter(w_translation))
|
||||||
if args.stdout:
|
if args.stdout:
|
||||||
w = writer.Writer(writer.StdoutWriter())
|
w = writer.Writer(writer.StdoutWriter())
|
||||||
|
|||||||
@@ -1,10 +1,13 @@
|
|||||||
import threading
|
import threading
|
||||||
|
import math
|
||||||
|
import json
|
||||||
import queue
|
import queue
|
||||||
import time
|
import time
|
||||||
import sys
|
import sys
|
||||||
import select
|
import select
|
||||||
import random
|
import random
|
||||||
import log
|
import log
|
||||||
|
import bucket
|
||||||
|
|
||||||
__interval__ = .1
|
__interval__ = .1
|
||||||
|
|
||||||
@@ -67,45 +70,26 @@ class StdinReader:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
class RandomReader:
|
class RandomReader:
|
||||||
def __init__(self, keys={"a":{"weight":1}, "b":{"weight":1}}):
|
def __init__(self, translation):
|
||||||
self.keys = keys
|
self.translation = translation
|
||||||
self.pool = RandomPool(keys)
|
self.pool = bucket.Bucket(1)
|
||||||
|
self.pool.chooser = bucket.BucketChooserProportionalRandom()
|
||||||
|
|
||||||
|
total_weight = float(sum([i["weight"] for i in translation.values()]))
|
||||||
|
for v in translation.values():
|
||||||
|
v["weight"] = float(v["weight"]) / total_weight
|
||||||
|
for v in translation.values():
|
||||||
|
v["weight"] = int(1000 * math.sqrt(v["weight"]))
|
||||||
|
print(1, json.dumps(translation, indent=" "))
|
||||||
|
for k,v in translation.items():
|
||||||
|
for i in range(v["weight"]):
|
||||||
|
self.pool.push(k)
|
||||||
|
|
||||||
def read(self):
|
def read(self):
|
||||||
return self.pool.pop()
|
result = self.pool.pick_n(1)
|
||||||
|
if not result:
|
||||||
class RandomPool:
|
return None
|
||||||
def __init__(self, values):
|
return result[0]
|
||||||
self.values = {k:1000*v["weight"] for k,v in values.items()}
|
|
||||||
self.total = sum(self.values.values())
|
|
||||||
self.consumed = set()
|
|
||||||
|
|
||||||
def reset(self):
|
|
||||||
self.consumed = set()
|
|
||||||
|
|
||||||
def pop(self):
|
|
||||||
k = self._pop()
|
|
||||||
for i in range(0, 3-1):
|
|
||||||
if k in self.consumed:
|
|
||||||
k = self._pop()
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
self.consumed.add(k)
|
|
||||||
if self.should_reset():
|
|
||||||
self.reset()
|
|
||||||
return k
|
|
||||||
|
|
||||||
def _pop(self):
|
|
||||||
idx = random.randint(0, self.total-1)
|
|
||||||
idx_offset = 0
|
|
||||||
for k in sorted(self.values.keys()):
|
|
||||||
if self.values[k] > idx+idx_offset:
|
|
||||||
return k
|
|
||||||
idx_offset -= self.values[k]
|
|
||||||
raise Exception(":(")
|
|
||||||
|
|
||||||
def should_reset(self):
|
|
||||||
return len(self.consumed) >= len(self.values)/2
|
|
||||||
|
|
||||||
class FileReader:
|
class FileReader:
|
||||||
def __init__(self, path):
|
def __init__(self, path):
|
||||||
|
|||||||
45
src/test_bucket.py
Normal file
45
src/test_bucket.py
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
import unittest
|
||||||
|
|
||||||
|
import bucket
|
||||||
|
|
||||||
|
class TestBucket(unittest.TestCase):
|
||||||
|
def test(self):
|
||||||
|
bkt = bucket.Bucket(1)
|
||||||
|
for i in range(50):
|
||||||
|
bkt.push(1)
|
||||||
|
bkt.push(2)
|
||||||
|
|
||||||
|
self.assertEqual(2, len(bkt.content))
|
||||||
|
for i in range(1000):
|
||||||
|
result = bkt.pick_n(2)
|
||||||
|
self.assertTrue(result)
|
||||||
|
self.assertTrue(len(result) <= 2)
|
||||||
|
|
||||||
|
result = bkt.pick_n(2)
|
||||||
|
while len(result) != 2:
|
||||||
|
result = bkt.pick_n(2)
|
||||||
|
|
||||||
|
result = bkt.pick_n(2)
|
||||||
|
while len(result) != 1:
|
||||||
|
result = bkt.pick_n(2)
|
||||||
|
|
||||||
|
class TestBucketChooserProportionalLogRandom(unittest.TestCase):
|
||||||
|
def test_xform(self):
|
||||||
|
chooser = bucket.BucketChooserProportionalLogRandom()
|
||||||
|
self.assertFalse(bool(chooser.xform([])))
|
||||||
|
self.assertEqual(1, len(chooser.xform([ (1,1) ])))
|
||||||
|
self.assertEqual(2, len(chooser.xform([ (1,1) ])[0]))
|
||||||
|
|
||||||
|
class TestBucketChoosers(unittest.TestCase):
|
||||||
|
def test_choosers(self):
|
||||||
|
for chooser in [
|
||||||
|
bucket.BucketChooserProportionalLogRandom(),
|
||||||
|
bucket.BucketChooserProportionalRandom(),
|
||||||
|
]:
|
||||||
|
with self.assertRaises(AssertionError):
|
||||||
|
self.assertRaises(chooser.choose([]))
|
||||||
|
self.assertTrue(chooser.choose([(1,1)]))
|
||||||
|
self.assertEqual(1, chooser.choose([(1,1)]))
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
60
src/test_writer.py
Normal file
60
src/test_writer.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
import unittest
|
||||||
|
|
||||||
|
import writer
|
||||||
|
|
||||||
|
class TestLineChooser(unittest.TestCase):
|
||||||
|
def test_simulated_annealing(self):
|
||||||
|
chooser = writer.LineChooserSimulatedAnnealing()
|
||||||
|
chooser.now = lambda *args: 100
|
||||||
|
|
||||||
|
for name, c in ({
|
||||||
|
"fresh something": [
|
||||||
|
writer.LineChooserSimulatedAnnealing.initial_health,
|
||||||
|
chooser.now(),
|
||||||
|
],
|
||||||
|
"old something": [
|
||||||
|
writer.LineChooserSimulatedAnnealing.initial_health - writer.LineChooserSimulatedAnnealing.decay_rate * 5,
|
||||||
|
chooser.now() - 5,
|
||||||
|
],
|
||||||
|
"fresh nothing": [
|
||||||
|
writer.LineChooserSimulatedAnnealing.initial_health - writer.LineChooserSimulatedAnnealing.nothing_penalty,
|
||||||
|
chooser.now(),
|
||||||
|
],
|
||||||
|
}).items():
|
||||||
|
self.assertEqual(
|
||||||
|
c[0],
|
||||||
|
chooser.health(self.new_line(True, c[1], not "nothing" in name)),
|
||||||
|
name,
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_latest_sticky(self):
|
||||||
|
stale_something = self.new_line(False, 1, 1)
|
||||||
|
stale_nothing = self.new_line(False, 1, None)
|
||||||
|
old_something = self.new_line(True, 2, 2)
|
||||||
|
old_nothing = self.new_line(True, 2, None)
|
||||||
|
new_something = self.new_line(True, 3, 3)
|
||||||
|
new_nothing = self.new_line(True, 3, None)
|
||||||
|
|
||||||
|
chooser = writer.LineChooserLatestSticky()
|
||||||
|
|
||||||
|
for name, c in ({
|
||||||
|
"nothing over stale": [new_nothing, stale_something],
|
||||||
|
"slightly old over nothing": [old_something, new_nothing],
|
||||||
|
"new and nonzero": [new_something, old_something],
|
||||||
|
"new over nothing": [new_something, old_nothing],
|
||||||
|
"new over stale nothing": [new_something, stale_nothing],
|
||||||
|
}).items():
|
||||||
|
self.assertEqual(
|
||||||
|
c[0],
|
||||||
|
chooser.choose(c[0], c[1]),
|
||||||
|
name,
|
||||||
|
)
|
||||||
|
|
||||||
|
def new_line(self, is_recent, t, v):
|
||||||
|
result = writer.Line(v)
|
||||||
|
result.t = t
|
||||||
|
result.is_recent = lambda *args: is_recent
|
||||||
|
return result
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -2,17 +2,19 @@ import time
|
|||||||
import json
|
import json
|
||||||
import pyautogui
|
import pyautogui
|
||||||
import log
|
import log
|
||||||
|
import random
|
||||||
|
|
||||||
class Writer:
|
class Writer:
|
||||||
def __init__(self, writer):
|
def __init__(self, writer):
|
||||||
self.writer = writer
|
self.writer = writer
|
||||||
self.previous = Line([])
|
self.previous = Line([])
|
||||||
|
self.chooser = LineChooserLatestSticky()
|
||||||
|
self.chooser = LineChooserSimulatedAnnealing()
|
||||||
|
|
||||||
def write(self, v):
|
def write(self, v):
|
||||||
latest = Line(v)
|
latest = Line(v)
|
||||||
chosen = Line.choose(self.previous, latest)
|
chosen = self.chooser.choose(self.previous, latest)
|
||||||
if chosen.v != self.previous.v:
|
self.writer.write(chosen)
|
||||||
self.writer.write(chosen)
|
|
||||||
self.previous = chosen
|
self.previous = chosen
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
@@ -28,14 +30,19 @@ class MultiWriter:
|
|||||||
w.write(v)
|
w.write(v)
|
||||||
|
|
||||||
class StdoutWriter:
|
class StdoutWriter:
|
||||||
|
def __init__(self):
|
||||||
|
self.last = ""
|
||||||
def write(self, v):
|
def write(self, v):
|
||||||
|
if str(v) == self.last:
|
||||||
|
return
|
||||||
|
self.last = str(v)
|
||||||
print(v)
|
print(v)
|
||||||
|
|
||||||
class PyAutoGUIWriter:
|
class PyAutoGUIWriter:
|
||||||
def __init__(self, optional_translation):
|
def __init__(self, translation):
|
||||||
pyautogui.FAILSAFE = False
|
pyautogui.FAILSAFE = False
|
||||||
self.keys_down = set()
|
self.keys_down = set()
|
||||||
self.translation = optional_translation
|
self.translation = translation
|
||||||
log.info(json.dumps(self.translation, indent=" "))
|
log.info(json.dumps(self.translation, indent=" "))
|
||||||
|
|
||||||
def write(self, v):
|
def write(self, v):
|
||||||
@@ -45,24 +52,31 @@ class PyAutoGUIWriter:
|
|||||||
if got:
|
if got:
|
||||||
to_push.add(got)
|
to_push.add(got)
|
||||||
for key in [k for k in self.keys_down]:
|
for key in [k for k in self.keys_down]:
|
||||||
if not key in to_push:
|
if not key in [i[0] for i in to_push]:
|
||||||
self.release(key)
|
self.release(key)
|
||||||
for key in to_push:
|
for key in to_push:
|
||||||
self.push(key)
|
self.push(key[0], key[1])
|
||||||
log.info("PUSHING", self.keys_down)
|
log.info("PUSHING", self.keys_down)
|
||||||
|
|
||||||
# https://pyautogui.readthedocs.io/en/latest/keyboard.html#keyboard-keys
|
# https://pyautogui.readthedocs.io/en/latest/keyboard.html#keyboard-keys
|
||||||
def translate(self, v):
|
def translate(self, v):
|
||||||
result = self.translation.get(v, {}).get("key")
|
result = self.translation.get(v, {})
|
||||||
if result:
|
key = result.get("key")
|
||||||
return result
|
hold = result.get("hold")
|
||||||
|
if key:
|
||||||
|
return (key, hold)
|
||||||
|
|
||||||
def push(self, k):
|
def push(self, k, hold):
|
||||||
self.keys_down.add(k)
|
self.keys_down.add(k)
|
||||||
pyautogui.keyDown(k)
|
pyautogui.keyDown(k)
|
||||||
|
if not hold:
|
||||||
|
self._release(k)
|
||||||
|
|
||||||
def release(self, k):
|
def release(self, k):
|
||||||
self.keys_down.remove(k)
|
self.keys_down.remove(k)
|
||||||
|
self._release(k)
|
||||||
|
|
||||||
|
def _release(self, k):
|
||||||
pyautogui.keyUp(k)
|
pyautogui.keyUp(k)
|
||||||
|
|
||||||
class Line:
|
class Line:
|
||||||
@@ -73,13 +87,47 @@ class Line:
|
|||||||
def __str__(self):
|
def __str__(self):
|
||||||
return json.dumps({"t":self.t, "v": self.v})
|
return json.dumps({"t":self.t, "v": self.v})
|
||||||
|
|
||||||
def choose(a, b):
|
def is_nothing(self):
|
||||||
|
return not self.v
|
||||||
|
|
||||||
|
def is_recent(self):
|
||||||
|
return time.time() - self.t < 5
|
||||||
|
|
||||||
|
class LineChooser:
|
||||||
|
def choose(self, a, b):
|
||||||
if not a:
|
if not a:
|
||||||
return b
|
return b
|
||||||
if not b:
|
if not b:
|
||||||
return a
|
return a
|
||||||
latest = max([a,b], key=lambda x:x.t)
|
latest = max([a,b], key=lambda x:x.t)
|
||||||
oldest = min([a,b], key=lambda x:x.t)
|
oldest = min([a,b], key=lambda x:x.t)
|
||||||
if not latest.v and time.time() - oldest.t < 1:
|
return self._choose(latest, oldest)
|
||||||
|
|
||||||
|
def now(self):
|
||||||
|
return time.time()
|
||||||
|
|
||||||
|
class LineChooserLatestSticky(LineChooser):
|
||||||
|
def _choose(self, latest, oldest):
|
||||||
|
if latest.is_nothing() and oldest.is_recent():
|
||||||
return oldest
|
return oldest
|
||||||
return latest
|
return latest
|
||||||
|
|
||||||
|
class LineChooserSimulatedAnnealing(LineChooser):
|
||||||
|
initial_health = 50
|
||||||
|
nothing_penalty = 100
|
||||||
|
decay_rate = 5
|
||||||
|
|
||||||
|
def _choose(self, latest, oldest):
|
||||||
|
oldest_health = self.health(oldest)
|
||||||
|
seed = random.randint(0, 100)
|
||||||
|
if seed < oldest_health:
|
||||||
|
print("LineChooserSimulatedAnnealing retaining old because", seed, "<", oldest_health)
|
||||||
|
return oldest
|
||||||
|
return latest
|
||||||
|
|
||||||
|
def health(self, line):
|
||||||
|
health = LineChooserSimulatedAnnealing.initial_health
|
||||||
|
health = health - LineChooserSimulatedAnnealing.decay_rate * (self.now() - line.t)
|
||||||
|
if line.is_nothing():
|
||||||
|
health -= LineChooserSimulatedAnnealing.nothing_penalty
|
||||||
|
return health
|
||||||
|
|||||||
Reference in New Issue
Block a user