Compare commits

..

12 Commits

Author SHA1 Message Date
Bel LaPointe
711035dadf gr try adjust test.sh 2022-04-13 06:59:22 -06:00
bel
5dca6be2b3 add DRY mode, dropping drian didnt help flap so much 2022-04-12 21:58:52 -06:00
bel
534bb42c06 some buttons in FCEUX but prety flappy 2022-04-12 21:51:15 -06:00
bel
90667cba52 test doesnt spam keycodes 2022-04-12 21:04:23 -06:00
bel
721a8e7e38 what happened 2022-04-12 20:32:52 -06:00
bel
b842227eb8 bad buttons 2022-04-12 20:31:19 -06:00
bel
9672390b55 Merge branch 'master' of https://gogs.inhome.blapointe.com/bel/leaky-bucket 2022-04-12 19:39:13 -06:00
bel
b91c73c5bb gr 2022-04-12 19:39:09 -06:00
Bel LaPointe
0dd99501e5 wip 2022-04-12 18:30:16 -06:00
Bel LaPointe
40343490f2 need to normal stream if i want leaky bucket to work nicely but hmmm 2022-04-12 15:51:15 -06:00
bel
896040391d buttons doesnt work with famicon at all, even with ez button 2022-04-12 07:51:17 -06:00
bel
5ec7cf4801 famicom doesnt capture key presses 2022-04-12 07:49:00 -06:00
5 changed files with 129 additions and 45 deletions

View File

@@ -18,6 +18,7 @@ def up(keycode):
_display.sync()
def __init_keys__():
return [i for i in range(10, 20)] #1..0
import subprocess
_p = subprocess.run(
"xmodmap -pke".split(),
@@ -26,11 +27,40 @@ def __init_keys__():
assert(_p.returncode == 0)
stdout = _p.stdout
result = []
for line in stdout.split("\n".encode()):
allowed = ["F"+str(i) for i in range(13, 25)]
unassigned = []
# already assigned
for line in stdout.split("\n".encode())[1:]:
if line:
words = line.split()
key = int(words[1])
if len(words) < 4:
unassigned.append(key)
elif words[3].decode() in allowed:
allowed.remove(words[3].decode())
result.append(key)
# not assigned
for key in unassigned:
if not allowed:
break
word = allowed.pop()
if word:
assert(subprocess.run([
"xmodmap", "-e", f"keycode {key} = {word}",
]).returncode == 0)
result.append(key)
print("unassigned", unassigned)
print("allowed", allowed)
print("result", result)
return result
keys = __init_keys__()
if __name__ == "__main__":
import time
for key in keys:
print("pushing", key, "in...")
for i in range(3):
print("", 3-i, "...")
time.sleep(1)
tap(53)
exit()

View File

@@ -1,23 +1,33 @@
from Xlib.display import Display
from Xlib.ext.xtest import fake_input
from Xlib import X
from os import environ
try:
from Xlib.display import Display
from Xlib.ext.xtest import fake_input
from Xlib import X
_display = Display(environ['DISPLAY'])
except Exception:
pass
_display = Display(environ['DISPLAY'])
from os import environ
def tap(keycode):
down(keycode)
time.sleep(0.05)
up(keycode)
def down(keycode):
if environ.get("DRY"):
return
fake_input(_display, X.KeyPress, keycode)
_display.sync()
def up(keycode):
if environ.get("DRY"):
return
fake_input(_display, X.KeyRelease, keycode)
_display.sync()
def __init_keys__():
if environ.get("DRY"):
return {int(i):str(i) for i in range(12)}
import subprocess
_p = subprocess.run(
"xmodmap -pke".split(),
@@ -25,12 +35,45 @@ def __init_keys__():
)
assert(_p.returncode == 0)
stdout = _p.stdout
result = []
for line in stdout.split("\n".encode()):
allowed = ["F"+str(i) for i in range(13, 25)]
result = {}
unassigned = []
# already assigned
for line in stdout.split("\n".encode())[1:]:
if line:
words = line.split()
key = int(words[1])
if len(words) < 4:
result.append(key)
unassigned.append(key)
elif words[3].decode() in allowed:
allowed.remove(words[3].decode())
result[key] = words[3].decode()
# not assigned
for key in unassigned:
if not allowed:
break
word = allowed.pop()
if word:
assert(subprocess.run([
"xmodmap", "-e", f"keycode {key} = {word}",
]).returncode == 0)
result[key] = word
print("unassigned", unassigned)
print("allowed", allowed)
print("result", result)
return result
keys = __init_keys__()
if __name__ == "__main__":
import time
for key in keys:
print("key", key, "in...")
n = 2
for i in range(n):
print(" ", n-i, "...")
time.sleep(1)
down(key)
time.sleep(0.1)
up(key)
print(" /key", key)
time.sleep(3)

View File

@@ -2,7 +2,8 @@ import argparse
import os
from time import sleep
#import buttons
import buttons
print(buttons.keys)
def main():
args = get_args()
@@ -19,13 +20,15 @@ def main():
buckets[key] = [False, len(buckets)]
buckets[key][0] = False if line[0] == "/" else float(line.split()[-1])
for key in buckets:
#keycode = buttons.keys[bucket[key][1]]
keyindex = buckets[key][1]
keycode = list(sorted(buttons.keys.keys()))[keyindex]
keyname = buttons.keys[keycode]
if buckets[key][0]:
print(f"+{key}", end=" ") #, button[key])
#buttons.down(keycode)
print(f"+{key}({keyname})", end=" ")
buttons.down(keycode)
else:
print(f"-{key}", end=" ") #, button[key])
#buttons.up(keycode)
print(f"-{key}({keyname})", end=" ")
buttons.up(keycode)
print()
def get_args():

View File

@@ -9,20 +9,18 @@ def main():
N=args.n,
M=args.m,
R=args.r,
T=args.t,
p=args.p,
)
def get_args():
ap = argparse.ArgumentParser()
ap.add_argument("-n", type=int, help="queue capacity", default=3)
ap.add_argument("-m", type=int, help="queue fill line", default=3)
ap.add_argument("-r", type=int, help="drain rate per second", default=2)
ap.add_argument("-t", type=int, help="threshold for state", default=2)
ap.add_argument("-n", type=float, help="queue capacity", default=3)
ap.add_argument("-m", type=float, help="queue fill line", default=3)
ap.add_argument("-r", type=float, help="drain rate per second", default=2)
ap.add_argument("-p", type=str, help="path to write out to", default="/tmp/cbappend.both.txt")
return ap.parse_args()
def with_(N, M, R, T, p):
def with_(N, M, R, p):
triggered = CBAppend(p)
released = CBAppend(p)
cb = CBFork(triggered, released)
@@ -33,7 +31,7 @@ def with_(N, M, R, T, p):
got = readline()
if got:
if not got in buckets:
buckets[got] = Bucket(N, M, R, T, cb.cb(got))
buckets[got] = Bucket(N, M, R, cb.cb(got))
buckets[got].push()
# TODO no /state
[buckets[i].pop() for i in buckets]
@@ -59,18 +57,18 @@ class State():
self.f = f
class Bucket():
def __init__(self, N, M, R, T, CB):
def __init__(self, N, M, R, CB):
self.q = 0.0
self.N = N
self.M = M
self.R = R
self.T = T
self.CB = CB
self.__last_pop = 0
self.__last_state = False
self.__last_push = None
def push(self):
result = self.__push_c(1)
result = self.__push()
self.__cb()
return result
@@ -80,16 +78,16 @@ class Bucket():
return result
def __cb(self):
new_state = self.q > self.T
new_state = self.q >= self.M
if new_state == self.__last_state:
return
self.__last_state = new_state
filledness = int(
100*(
max(
[self.q-self.T, 0]
[self.q-self.M, 0]
)/max(
[self.M-self.T, 1]
[self.N-self.M, 1]
)
)
)/100.0
@@ -100,10 +98,15 @@ class Bucket():
def state(self):
return self.__last_state
def __push_c(self, c):
def __push(self):
self.__pop()
c = 1
now = self.__now()
if self.__last_push:
c = min([now - self.__last_push, 1])
if self.q+c > self.N:
return False
self.__last_push = now
self.q += c
return True

View File

@@ -16,18 +16,23 @@ trap cleanup EXIT
python3 ./state_to_buttons.py &
python3 ./testdata/rand_0_n_weighted_stream.py \
(
python3 ./testdata/rand_0_n_weighted_stream.py \
-n 6 \
-n 4 \
-b-min 1 \
-b-max 10 \
-d-min 100 \
-d-max 3000 \
-between 100 \
-b-min 10 \
-b-max 100 \
-d-min 100 \
-d-max 3000 \
-d-max 1000 \
-between 10 \
-w 3 \
| peek \
| python3 ./stream_to_state.py
-w 2 \
| peek \
| python3 ./stream_to_state.py \
-n 1 \
-m .25 \
-r .1
) &
while true; do
read -s var
done