Compare commits

..

10 Commits

Author SHA1 Message Date
Bel LaPointe
f2bce26ecb almost, but cant press multi keys atm 2022-09-20 16:19:26 -06:00
Bel LaPointe
582ee7a0d2 simulated annealing CLI changeable 2022-09-20 15:56:54 -06:00
Bel LaPointe
2914c64fda simulated annealing output chooser 2022-09-20 15:52:41 -06:00
Bel LaPointe
c6b648195f test line chooser sticky 2022-09-20 15:39:43 -06:00
Bel LaPointe
ac7b583f76 objectify 2022-09-20 15:29:48 -06:00
Bel LaPointe
8a9cb5a82d ok thats pretty good but what can simulated annealing do 2022-09-20 15:22:59 -06:00
Bel LaPointe
4b846fd9ec reader users bucket with non log chooser 2022-09-20 15:16:13 -06:00
Bel LaPointe
1da2d9fe9c one more 2022-09-20 15:04:37 -06:00
Bel LaPointe
dbff6792fa test bucket 2022-09-20 15:03:38 -06:00
bel
f5c6c37868 todo 2022-09-18 19:42:58 -06:00
6 changed files with 239 additions and 62 deletions

View File

@@ -1,24 +1,25 @@
import random import random
import time import time
import math
class Bucket: class Bucket:
def __init__(self, interval): def __init__(self, interval):
random.seed(time.time()) random.seed(time.time())
self.interval = interval self.interval = interval
self.name = self.ms_to_bucket(Bucket.now_ms()) self.name = self.ms_to_bucket(Bucket.now_ms())
self.chooser = BucketChooserProportionalLogRandom()
self.content = [] self.content = []
def push(self, v): def push(self, v):
if v: if v:
self.content.append(v) self._push(v)
def pick_n(self, n): def _push(self, v):
if not self.content: for i in self.content:
return [] if i[0] == v:
result = [] i[1] += 1
for i in range(0, n): return
result.append(self.content[random.randint(0, len(self.content)-1)]) self.content.append([v, 1])
return list(set(result))
def now_ms(): def now_ms():
return int(1000 * time.time()) return int(1000 * time.time())
@@ -26,3 +27,34 @@ class Bucket:
def ms_to_bucket(self, ms): def ms_to_bucket(self, ms):
return int(int(ms // self.interval) * self.interval) return int(int(ms // self.interval) * self.interval)
def pick_n(self, n):
if not self.content:
return []
result = set()
for i in range(n):
result.add(self.chooser.choose(self.content))
return list(result)
class BucketChooserProportionalLogRandom:
def choose(self, content):
content = self.xform(content)
assert(content)
idx = random.randint(0, sum([i[1] for i in content])-1)
while content:
candidate = content.pop()
idx -= candidate[1]
if idx <= 0:
return candidate[0]
raise Exception("how?!")
def xform(self, content):
return [
(
i[0],
1 + int(100 * math.log2(i[1])),
) for i in content
]
class BucketChooserProportionalRandom(BucketChooserProportionalLogRandom):
def xform(self, content):
return [(i[0], 1+int(100*i[1])) for i in content]

View File

@@ -15,15 +15,23 @@ def main():
ap.add_argument("--max-keys-down", default=2, type=int) ap.add_argument("--max-keys-down", default=2, type=int)
ap.add_argument("--startup", default=3, type=int) ap.add_argument("--startup", default=3, type=int)
ap.add_argument("--controller-setup", default=False, action="store_true") ap.add_argument("--controller-setup", default=False, action="store_true")
ap.add_argument("--simulated-annealing-initial-health", default=50, type=int)
ap.add_argument("--simulated-annealing-nothing-penalty", default=100, type=int)
ap.add_argument("--simulated-annealing-decay-rate", default=5, type=int)
ap.add_argument("--translation", default=json.dumps({ ap.add_argument("--translation", default=json.dumps({
"up": {"key": "w", "weight": 20}, "up": {"key": "w", "weight": 20, "hold": False},
"down": {"key": "s", "weight": 1}, "down": {"key": "s", "weight": 1, "hold": True},
"left": {"key": "a", "weight": 1}, "left": {"key": "a", "weight": 1, "hold": True},
"right": {"key": "d", "weight": 20}, "right": {"key": "d", "weight": 20, "hold": True},
})) }))
args = ap.parse_args() args = ap.parse_args()
writer.LineChooserSimulatedAnnealing.initial_health = args.simulated_annealing_initial_health
writer.LineChooserSimulatedAnnealing.nothing_penalty = args.simulated_annealing_nothing_penalty
writer.LineChooserSimulatedAnnealing.decay_rate = args.simulated_annealing_decay_rate
w_translation = json.loads(args.translation) if args.translation else None w_translation = json.loads(args.translation) if args.translation else None
w = writer.Writer(writer.PyAutoGUIWriter(w_translation)) w = writer.Writer(writer.PyAutoGUIWriter(w_translation))
if args.stdout: if args.stdout:
w = writer.Writer(writer.StdoutWriter()) w = writer.Writer(writer.StdoutWriter())

View File

@@ -1,10 +1,13 @@
import threading import threading
import math
import json
import queue import queue
import time import time
import sys import sys
import select import select
import random import random
import log import log
import bucket
__interval__ = .1 __interval__ = .1
@@ -67,45 +70,26 @@ class StdinReader:
return None return None
class RandomReader: class RandomReader:
def __init__(self, keys={"a":{"weight":1}, "b":{"weight":1}}): def __init__(self, translation):
self.keys = keys self.translation = translation
self.pool = RandomPool(keys) self.pool = bucket.Bucket(1)
self.pool.chooser = bucket.BucketChooserProportionalRandom()
total_weight = float(sum([i["weight"] for i in translation.values()]))
for v in translation.values():
v["weight"] = float(v["weight"]) / total_weight
for v in translation.values():
v["weight"] = int(1000 * math.sqrt(v["weight"]))
print(1, json.dumps(translation, indent=" "))
for k,v in translation.items():
for i in range(v["weight"]):
self.pool.push(k)
def read(self): def read(self):
return self.pool.pop() result = self.pool.pick_n(1)
if not result:
class RandomPool: return None
def __init__(self, values): return result[0]
self.values = {k:1000*v["weight"] for k,v in values.items()}
self.total = sum(self.values.values())
self.consumed = set()
def reset(self):
self.consumed = set()
def pop(self):
k = self._pop()
for i in range(0, 3-1):
if k in self.consumed:
k = self._pop()
else:
break
self.consumed.add(k)
if self.should_reset():
self.reset()
return k
def _pop(self):
idx = random.randint(0, self.total-1)
idx_offset = 0
for k in sorted(self.values.keys()):
if self.values[k] > idx+idx_offset:
return k
idx_offset -= self.values[k]
raise Exception(":(")
def should_reset(self):
return len(self.consumed) >= len(self.values)/2
class FileReader: class FileReader:
def __init__(self, path): def __init__(self, path):

45
src/test_bucket.py Normal file
View File

@@ -0,0 +1,45 @@
import unittest
import bucket
class TestBucket(unittest.TestCase):
def test(self):
bkt = bucket.Bucket(1)
for i in range(50):
bkt.push(1)
bkt.push(2)
self.assertEqual(2, len(bkt.content))
for i in range(1000):
result = bkt.pick_n(2)
self.assertTrue(result)
self.assertTrue(len(result) <= 2)
result = bkt.pick_n(2)
while len(result) != 2:
result = bkt.pick_n(2)
result = bkt.pick_n(2)
while len(result) != 1:
result = bkt.pick_n(2)
class TestBucketChooserProportionalLogRandom(unittest.TestCase):
def test_xform(self):
chooser = bucket.BucketChooserProportionalLogRandom()
self.assertFalse(bool(chooser.xform([])))
self.assertEqual(1, len(chooser.xform([ (1,1) ])))
self.assertEqual(2, len(chooser.xform([ (1,1) ])[0]))
class TestBucketChoosers(unittest.TestCase):
def test_choosers(self):
for chooser in [
bucket.BucketChooserProportionalLogRandom(),
bucket.BucketChooserProportionalRandom(),
]:
with self.assertRaises(AssertionError):
self.assertRaises(chooser.choose([]))
self.assertTrue(chooser.choose([(1,1)]))
self.assertEqual(1, chooser.choose([(1,1)]))
if __name__ == "__main__":
unittest.main()

60
src/test_writer.py Normal file
View File

@@ -0,0 +1,60 @@
import unittest
import writer
class TestLineChooser(unittest.TestCase):
def test_simulated_annealing(self):
chooser = writer.LineChooserSimulatedAnnealing()
chooser.now = lambda *args: 100
for name, c in ({
"fresh something": [
writer.LineChooserSimulatedAnnealing.initial_health,
chooser.now(),
],
"old something": [
writer.LineChooserSimulatedAnnealing.initial_health - writer.LineChooserSimulatedAnnealing.decay_rate * 5,
chooser.now() - 5,
],
"fresh nothing": [
writer.LineChooserSimulatedAnnealing.initial_health - writer.LineChooserSimulatedAnnealing.nothing_penalty,
chooser.now(),
],
}).items():
self.assertEqual(
c[0],
chooser.health(self.new_line(True, c[1], not "nothing" in name)),
name,
)
def test_latest_sticky(self):
stale_something = self.new_line(False, 1, 1)
stale_nothing = self.new_line(False, 1, None)
old_something = self.new_line(True, 2, 2)
old_nothing = self.new_line(True, 2, None)
new_something = self.new_line(True, 3, 3)
new_nothing = self.new_line(True, 3, None)
chooser = writer.LineChooserLatestSticky()
for name, c in ({
"nothing over stale": [new_nothing, stale_something],
"slightly old over nothing": [old_something, new_nothing],
"new and nonzero": [new_something, old_something],
"new over nothing": [new_something, old_nothing],
"new over stale nothing": [new_something, stale_nothing],
}).items():
self.assertEqual(
c[0],
chooser.choose(c[0], c[1]),
name,
)
def new_line(self, is_recent, t, v):
result = writer.Line(v)
result.t = t
result.is_recent = lambda *args: is_recent
return result
if __name__ == "__main__":
unittest.main()

View File

@@ -2,17 +2,19 @@ import time
import json import json
import pyautogui import pyautogui
import log import log
import random
class Writer: class Writer:
def __init__(self, writer): def __init__(self, writer):
self.writer = writer self.writer = writer
self.previous = Line([]) self.previous = Line([])
self.chooser = LineChooserLatestSticky()
self.chooser = LineChooserSimulatedAnnealing()
def write(self, v): def write(self, v):
latest = Line(v) latest = Line(v)
chosen = Line.choose(self.previous, latest) chosen = self.chooser.choose(self.previous, latest)
if chosen.v != self.previous.v: self.writer.write(chosen)
self.writer.write(chosen)
self.previous = chosen self.previous = chosen
def close(self): def close(self):
@@ -28,14 +30,19 @@ class MultiWriter:
w.write(v) w.write(v)
class StdoutWriter: class StdoutWriter:
def __init__(self):
self.last = ""
def write(self, v): def write(self, v):
if str(v) == self.last:
return
self.last = str(v)
print(v) print(v)
class PyAutoGUIWriter: class PyAutoGUIWriter:
def __init__(self, optional_translation): def __init__(self, translation):
pyautogui.FAILSAFE = False pyautogui.FAILSAFE = False
self.keys_down = set() self.keys_down = set()
self.translation = optional_translation self.translation = translation
log.info(json.dumps(self.translation, indent=" ")) log.info(json.dumps(self.translation, indent=" "))
def write(self, v): def write(self, v):
@@ -45,24 +52,31 @@ class PyAutoGUIWriter:
if got: if got:
to_push.add(got) to_push.add(got)
for key in [k for k in self.keys_down]: for key in [k for k in self.keys_down]:
if not key in to_push: if not key in [i[0] for i in to_push]:
self.release(key) self.release(key)
for key in to_push: for key in to_push:
self.push(key) self.push(key[0], key[1])
log.info("PUSHING", self.keys_down) log.info("PUSHING", self.keys_down)
# https://pyautogui.readthedocs.io/en/latest/keyboard.html#keyboard-keys # https://pyautogui.readthedocs.io/en/latest/keyboard.html#keyboard-keys
def translate(self, v): def translate(self, v):
result = self.translation.get(v, {}).get("key") result = self.translation.get(v, {})
if result: key = result.get("key")
return result hold = result.get("hold")
if key:
return (key, hold)
def push(self, k): def push(self, k, hold):
self.keys_down.add(k) self.keys_down.add(k)
pyautogui.keyDown(k) pyautogui.keyDown(k)
if not hold:
self._release(k)
def release(self, k): def release(self, k):
self.keys_down.remove(k) self.keys_down.remove(k)
self._release(k)
def _release(self, k):
pyautogui.keyUp(k) pyautogui.keyUp(k)
class Line: class Line:
@@ -73,13 +87,47 @@ class Line:
def __str__(self): def __str__(self):
return json.dumps({"t":self.t, "v": self.v}) return json.dumps({"t":self.t, "v": self.v})
def choose(a, b): def is_nothing(self):
return not self.v
def is_recent(self):
return time.time() - self.t < 5
class LineChooser:
def choose(self, a, b):
if not a: if not a:
return b return b
if not b: if not b:
return a return a
latest = max([a,b], key=lambda x:x.t) latest = max([a,b], key=lambda x:x.t)
oldest = min([a,b], key=lambda x:x.t) oldest = min([a,b], key=lambda x:x.t)
if not latest.v and time.time() - oldest.t < 1: return self._choose(latest, oldest)
def now(self):
return time.time()
class LineChooserLatestSticky(LineChooser):
def _choose(self, latest, oldest):
if latest.is_nothing() and oldest.is_recent():
return oldest return oldest
return latest return latest
class LineChooserSimulatedAnnealing(LineChooser):
initial_health = 50
nothing_penalty = 100
decay_rate = 5
def _choose(self, latest, oldest):
oldest_health = self.health(oldest)
seed = random.randint(0, 100)
if seed < oldest_health:
print("LineChooserSimulatedAnnealing retaining old because", seed, "<", oldest_health)
return oldest
return latest
def health(self, line):
health = LineChooserSimulatedAnnealing.initial_health
health = health - LineChooserSimulatedAnnealing.decay_rate * (self.now() - line.t)
if line.is_nothing():
health -= LineChooserSimulatedAnnealing.nothing_penalty
return health