Compare commits

...

10 Commits

Author SHA1 Message Date
Bel LaPointe
f2bce26ecb almost, but cant press multi keys atm 2022-09-20 16:19:26 -06:00
Bel LaPointe
582ee7a0d2 simulated annealing CLI changeable 2022-09-20 15:56:54 -06:00
Bel LaPointe
2914c64fda simulated annealing output chooser 2022-09-20 15:52:41 -06:00
Bel LaPointe
c6b648195f test line chooser sticky 2022-09-20 15:39:43 -06:00
Bel LaPointe
ac7b583f76 objectify 2022-09-20 15:29:48 -06:00
Bel LaPointe
8a9cb5a82d ok thats pretty good but what can simulated annealing do 2022-09-20 15:22:59 -06:00
Bel LaPointe
4b846fd9ec reader users bucket with non log chooser 2022-09-20 15:16:13 -06:00
Bel LaPointe
1da2d9fe9c one more 2022-09-20 15:04:37 -06:00
Bel LaPointe
dbff6792fa test bucket 2022-09-20 15:03:38 -06:00
bel
f5c6c37868 todo 2022-09-18 19:42:58 -06:00
6 changed files with 239 additions and 62 deletions

View File

@@ -1,24 +1,25 @@
import random
import time
import math
class Bucket:
def __init__(self, interval):
random.seed(time.time())
self.interval = interval
self.name = self.ms_to_bucket(Bucket.now_ms())
self.chooser = BucketChooserProportionalLogRandom()
self.content = []
def push(self, v):
if v:
self.content.append(v)
self._push(v)
def pick_n(self, n):
if not self.content:
return []
result = []
for i in range(0, n):
result.append(self.content[random.randint(0, len(self.content)-1)])
return list(set(result))
def _push(self, v):
for i in self.content:
if i[0] == v:
i[1] += 1
return
self.content.append([v, 1])
def now_ms():
return int(1000 * time.time())
@@ -26,3 +27,34 @@ class Bucket:
def ms_to_bucket(self, ms):
return int(int(ms // self.interval) * self.interval)
def pick_n(self, n):
if not self.content:
return []
result = set()
for i in range(n):
result.add(self.chooser.choose(self.content))
return list(result)
class BucketChooserProportionalLogRandom:
def choose(self, content):
content = self.xform(content)
assert(content)
idx = random.randint(0, sum([i[1] for i in content])-1)
while content:
candidate = content.pop()
idx -= candidate[1]
if idx <= 0:
return candidate[0]
raise Exception("how?!")
def xform(self, content):
return [
(
i[0],
1 + int(100 * math.log2(i[1])),
) for i in content
]
class BucketChooserProportionalRandom(BucketChooserProportionalLogRandom):
def xform(self, content):
return [(i[0], 1+int(100*i[1])) for i in content]

View File

@@ -15,15 +15,23 @@ def main():
ap.add_argument("--max-keys-down", default=2, type=int)
ap.add_argument("--startup", default=3, type=int)
ap.add_argument("--controller-setup", default=False, action="store_true")
ap.add_argument("--simulated-annealing-initial-health", default=50, type=int)
ap.add_argument("--simulated-annealing-nothing-penalty", default=100, type=int)
ap.add_argument("--simulated-annealing-decay-rate", default=5, type=int)
ap.add_argument("--translation", default=json.dumps({
"up": {"key": "w", "weight": 20},
"down": {"key": "s", "weight": 1},
"left": {"key": "a", "weight": 1},
"right": {"key": "d", "weight": 20},
"up": {"key": "w", "weight": 20, "hold": False},
"down": {"key": "s", "weight": 1, "hold": True},
"left": {"key": "a", "weight": 1, "hold": True},
"right": {"key": "d", "weight": 20, "hold": True},
}))
args = ap.parse_args()
writer.LineChooserSimulatedAnnealing.initial_health = args.simulated_annealing_initial_health
writer.LineChooserSimulatedAnnealing.nothing_penalty = args.simulated_annealing_nothing_penalty
writer.LineChooserSimulatedAnnealing.decay_rate = args.simulated_annealing_decay_rate
w_translation = json.loads(args.translation) if args.translation else None
w = writer.Writer(writer.PyAutoGUIWriter(w_translation))
if args.stdout:
w = writer.Writer(writer.StdoutWriter())

View File

@@ -1,10 +1,13 @@
import threading
import math
import json
import queue
import time
import sys
import select
import random
import log
import bucket
__interval__ = .1
@@ -67,45 +70,26 @@ class StdinReader:
return None
class RandomReader:
def __init__(self, keys={"a":{"weight":1}, "b":{"weight":1}}):
self.keys = keys
self.pool = RandomPool(keys)
def __init__(self, translation):
self.translation = translation
self.pool = bucket.Bucket(1)
self.pool.chooser = bucket.BucketChooserProportionalRandom()
total_weight = float(sum([i["weight"] for i in translation.values()]))
for v in translation.values():
v["weight"] = float(v["weight"]) / total_weight
for v in translation.values():
v["weight"] = int(1000 * math.sqrt(v["weight"]))
print(1, json.dumps(translation, indent=" "))
for k,v in translation.items():
for i in range(v["weight"]):
self.pool.push(k)
def read(self):
return self.pool.pop()
class RandomPool:
def __init__(self, values):
self.values = {k:1000*v["weight"] for k,v in values.items()}
self.total = sum(self.values.values())
self.consumed = set()
def reset(self):
self.consumed = set()
def pop(self):
k = self._pop()
for i in range(0, 3-1):
if k in self.consumed:
k = self._pop()
else:
break
self.consumed.add(k)
if self.should_reset():
self.reset()
return k
def _pop(self):
idx = random.randint(0, self.total-1)
idx_offset = 0
for k in sorted(self.values.keys()):
if self.values[k] > idx+idx_offset:
return k
idx_offset -= self.values[k]
raise Exception(":(")
def should_reset(self):
return len(self.consumed) >= len(self.values)/2
result = self.pool.pick_n(1)
if not result:
return None
return result[0]
class FileReader:
def __init__(self, path):

45
src/test_bucket.py Normal file
View File

@@ -0,0 +1,45 @@
import unittest
import bucket
class TestBucket(unittest.TestCase):
def test(self):
bkt = bucket.Bucket(1)
for i in range(50):
bkt.push(1)
bkt.push(2)
self.assertEqual(2, len(bkt.content))
for i in range(1000):
result = bkt.pick_n(2)
self.assertTrue(result)
self.assertTrue(len(result) <= 2)
result = bkt.pick_n(2)
while len(result) != 2:
result = bkt.pick_n(2)
result = bkt.pick_n(2)
while len(result) != 1:
result = bkt.pick_n(2)
class TestBucketChooserProportionalLogRandom(unittest.TestCase):
def test_xform(self):
chooser = bucket.BucketChooserProportionalLogRandom()
self.assertFalse(bool(chooser.xform([])))
self.assertEqual(1, len(chooser.xform([ (1,1) ])))
self.assertEqual(2, len(chooser.xform([ (1,1) ])[0]))
class TestBucketChoosers(unittest.TestCase):
def test_choosers(self):
for chooser in [
bucket.BucketChooserProportionalLogRandom(),
bucket.BucketChooserProportionalRandom(),
]:
with self.assertRaises(AssertionError):
self.assertRaises(chooser.choose([]))
self.assertTrue(chooser.choose([(1,1)]))
self.assertEqual(1, chooser.choose([(1,1)]))
if __name__ == "__main__":
unittest.main()

60
src/test_writer.py Normal file
View File

@@ -0,0 +1,60 @@
import unittest
import writer
class TestLineChooser(unittest.TestCase):
def test_simulated_annealing(self):
chooser = writer.LineChooserSimulatedAnnealing()
chooser.now = lambda *args: 100
for name, c in ({
"fresh something": [
writer.LineChooserSimulatedAnnealing.initial_health,
chooser.now(),
],
"old something": [
writer.LineChooserSimulatedAnnealing.initial_health - writer.LineChooserSimulatedAnnealing.decay_rate * 5,
chooser.now() - 5,
],
"fresh nothing": [
writer.LineChooserSimulatedAnnealing.initial_health - writer.LineChooserSimulatedAnnealing.nothing_penalty,
chooser.now(),
],
}).items():
self.assertEqual(
c[0],
chooser.health(self.new_line(True, c[1], not "nothing" in name)),
name,
)
def test_latest_sticky(self):
stale_something = self.new_line(False, 1, 1)
stale_nothing = self.new_line(False, 1, None)
old_something = self.new_line(True, 2, 2)
old_nothing = self.new_line(True, 2, None)
new_something = self.new_line(True, 3, 3)
new_nothing = self.new_line(True, 3, None)
chooser = writer.LineChooserLatestSticky()
for name, c in ({
"nothing over stale": [new_nothing, stale_something],
"slightly old over nothing": [old_something, new_nothing],
"new and nonzero": [new_something, old_something],
"new over nothing": [new_something, old_nothing],
"new over stale nothing": [new_something, stale_nothing],
}).items():
self.assertEqual(
c[0],
chooser.choose(c[0], c[1]),
name,
)
def new_line(self, is_recent, t, v):
result = writer.Line(v)
result.t = t
result.is_recent = lambda *args: is_recent
return result
if __name__ == "__main__":
unittest.main()

View File

@@ -2,16 +2,18 @@ import time
import json
import pyautogui
import log
import random
class Writer:
def __init__(self, writer):
self.writer = writer
self.previous = Line([])
self.chooser = LineChooserLatestSticky()
self.chooser = LineChooserSimulatedAnnealing()
def write(self, v):
latest = Line(v)
chosen = Line.choose(self.previous, latest)
if chosen.v != self.previous.v:
chosen = self.chooser.choose(self.previous, latest)
self.writer.write(chosen)
self.previous = chosen
@@ -28,14 +30,19 @@ class MultiWriter:
w.write(v)
class StdoutWriter:
def __init__(self):
self.last = ""
def write(self, v):
if str(v) == self.last:
return
self.last = str(v)
print(v)
class PyAutoGUIWriter:
def __init__(self, optional_translation):
def __init__(self, translation):
pyautogui.FAILSAFE = False
self.keys_down = set()
self.translation = optional_translation
self.translation = translation
log.info(json.dumps(self.translation, indent=" "))
def write(self, v):
@@ -45,24 +52,31 @@ class PyAutoGUIWriter:
if got:
to_push.add(got)
for key in [k for k in self.keys_down]:
if not key in to_push:
if not key in [i[0] for i in to_push]:
self.release(key)
for key in to_push:
self.push(key)
self.push(key[0], key[1])
log.info("PUSHING", self.keys_down)
# https://pyautogui.readthedocs.io/en/latest/keyboard.html#keyboard-keys
def translate(self, v):
result = self.translation.get(v, {}).get("key")
if result:
return result
result = self.translation.get(v, {})
key = result.get("key")
hold = result.get("hold")
if key:
return (key, hold)
def push(self, k):
def push(self, k, hold):
self.keys_down.add(k)
pyautogui.keyDown(k)
if not hold:
self._release(k)
def release(self, k):
self.keys_down.remove(k)
self._release(k)
def _release(self, k):
pyautogui.keyUp(k)
class Line:
@@ -73,13 +87,47 @@ class Line:
def __str__(self):
return json.dumps({"t":self.t, "v": self.v})
def choose(a, b):
def is_nothing(self):
return not self.v
def is_recent(self):
return time.time() - self.t < 5
class LineChooser:
def choose(self, a, b):
if not a:
return b
if not b:
return a
latest = max([a,b], key=lambda x:x.t)
oldest = min([a,b], key=lambda x:x.t)
if not latest.v and time.time() - oldest.t < 1:
return self._choose(latest, oldest)
def now(self):
return time.time()
class LineChooserLatestSticky(LineChooser):
def _choose(self, latest, oldest):
if latest.is_nothing() and oldest.is_recent():
return oldest
return latest
class LineChooserSimulatedAnnealing(LineChooser):
initial_health = 50
nothing_penalty = 100
decay_rate = 5
def _choose(self, latest, oldest):
oldest_health = self.health(oldest)
seed = random.randint(0, 100)
if seed < oldest_health:
print("LineChooserSimulatedAnnealing retaining old because", seed, "<", oldest_health)
return oldest
return latest
def health(self, line):
health = LineChooserSimulatedAnnealing.initial_health
health = health - LineChooserSimulatedAnnealing.decay_rate * (self.now() - line.t)
if line.is_nothing():
health -= LineChooserSimulatedAnnealing.nothing_penalty
return health