Compare commits

...

12 Commits

Author SHA1 Message Date
Bel LaPointe
711035dadf gr try adjust test.sh 2022-04-13 06:59:22 -06:00
bel
5dca6be2b3 add DRY mode, dropping drian didnt help flap so much 2022-04-12 21:58:52 -06:00
bel
534bb42c06 some buttons in FCEUX but prety flappy 2022-04-12 21:51:15 -06:00
bel
90667cba52 test doesnt spam keycodes 2022-04-12 21:04:23 -06:00
bel
721a8e7e38 what happened 2022-04-12 20:32:52 -06:00
bel
b842227eb8 bad buttons 2022-04-12 20:31:19 -06:00
bel
9672390b55 Merge branch 'master' of https://gogs.inhome.blapointe.com/bel/leaky-bucket 2022-04-12 19:39:13 -06:00
bel
b91c73c5bb gr 2022-04-12 19:39:09 -06:00
Bel LaPointe
0dd99501e5 wip 2022-04-12 18:30:16 -06:00
Bel LaPointe
40343490f2 need to normal stream if i want leaky bucket to work nicely but hmmm 2022-04-12 15:51:15 -06:00
bel
896040391d buttons doesnt work with famicon at all, even with ez button 2022-04-12 07:51:17 -06:00
bel
5ec7cf4801 famicom doesnt capture key presses 2022-04-12 07:49:00 -06:00
5 changed files with 129 additions and 45 deletions

View File

@@ -18,6 +18,7 @@ def up(keycode):
_display.sync() _display.sync()
def __init_keys__(): def __init_keys__():
return [i for i in range(10, 20)] #1..0
import subprocess import subprocess
_p = subprocess.run( _p = subprocess.run(
"xmodmap -pke".split(), "xmodmap -pke".split(),
@@ -26,11 +27,40 @@ def __init_keys__():
assert(_p.returncode == 0) assert(_p.returncode == 0)
stdout = _p.stdout stdout = _p.stdout
result = [] result = []
for line in stdout.split("\n".encode()): allowed = ["F"+str(i) for i in range(13, 25)]
unassigned = []
# already assigned
for line in stdout.split("\n".encode())[1:]:
if line: if line:
words = line.split() words = line.split()
key = int(words[1]) key = int(words[1])
if len(words) < 4: if len(words) < 4:
unassigned.append(key)
elif words[3].decode() in allowed:
allowed.remove(words[3].decode())
result.append(key) result.append(key)
# not assigned
for key in unassigned:
if not allowed:
break
word = allowed.pop()
if word:
assert(subprocess.run([
"xmodmap", "-e", f"keycode {key} = {word}",
]).returncode == 0)
result.append(key)
print("unassigned", unassigned)
print("allowed", allowed)
print("result", result)
return result return result
keys = __init_keys__() keys = __init_keys__()
if __name__ == "__main__":
import time
for key in keys:
print("pushing", key, "in...")
for i in range(3):
print("", 3-i, "...")
time.sleep(1)
tap(53)
exit()

View File

@@ -1,36 +1,79 @@
from Xlib.display import Display try:
from Xlib.ext.xtest import fake_input from Xlib.display import Display
from Xlib import X from Xlib.ext.xtest import fake_input
from os import environ from Xlib import X
_display = Display(environ['DISPLAY'])
except Exception:
pass
_display = Display(environ['DISPLAY']) from os import environ
def tap(keycode): def tap(keycode):
down(keycode) down(keycode)
time.sleep(0.05)
up(keycode) up(keycode)
def down(keycode): def down(keycode):
if environ.get("DRY"):
return
fake_input(_display, X.KeyPress, keycode) fake_input(_display, X.KeyPress, keycode)
_display.sync() _display.sync()
def up(keycode): def up(keycode):
if environ.get("DRY"):
return
fake_input(_display, X.KeyRelease, keycode) fake_input(_display, X.KeyRelease, keycode)
_display.sync() _display.sync()
def __init_keys__(): def __init_keys__():
import subprocess if environ.get("DRY"):
return {int(i):str(i) for i in range(12)}
import subprocess
_p = subprocess.run( _p = subprocess.run(
"xmodmap -pke".split(), "xmodmap -pke".split(),
capture_output=True, capture_output=True,
) )
assert(_p.returncode == 0) assert(_p.returncode == 0)
stdout = _p.stdout stdout = _p.stdout
result = [] allowed = ["F"+str(i) for i in range(13, 25)]
for line in stdout.split("\n".encode()): result = {}
unassigned = []
# already assigned
for line in stdout.split("\n".encode())[1:]:
if line: if line:
words = line.split() words = line.split()
key = int(words[1]) key = int(words[1])
if len(words) < 4: if len(words) < 4:
result.append(key) unassigned.append(key)
elif words[3].decode() in allowed:
allowed.remove(words[3].decode())
result[key] = words[3].decode()
# not assigned
for key in unassigned:
if not allowed:
break
word = allowed.pop()
if word:
assert(subprocess.run([
"xmodmap", "-e", f"keycode {key} = {word}",
]).returncode == 0)
result[key] = word
print("unassigned", unassigned)
print("allowed", allowed)
print("result", result)
return result return result
keys = __init_keys__() keys = __init_keys__()
if __name__ == "__main__":
import time
for key in keys:
print("key", key, "in...")
n = 2
for i in range(n):
print(" ", n-i, "...")
time.sleep(1)
down(key)
time.sleep(0.1)
up(key)
print(" /key", key)
time.sleep(3)

View File

@@ -2,7 +2,8 @@ import argparse
import os import os
from time import sleep from time import sleep
#import buttons import buttons
print(buttons.keys)
def main(): def main():
args = get_args() args = get_args()
@@ -19,13 +20,15 @@ def main():
buckets[key] = [False, len(buckets)] buckets[key] = [False, len(buckets)]
buckets[key][0] = False if line[0] == "/" else float(line.split()[-1]) buckets[key][0] = False if line[0] == "/" else float(line.split()[-1])
for key in buckets: for key in buckets:
#keycode = buttons.keys[bucket[key][1]] keyindex = buckets[key][1]
keycode = list(sorted(buttons.keys.keys()))[keyindex]
keyname = buttons.keys[keycode]
if buckets[key][0]: if buckets[key][0]:
print(f"+{key}", end=" ") #, button[key]) print(f"+{key}({keyname})", end=" ")
#buttons.down(keycode) buttons.down(keycode)
else: else:
print(f"-{key}", end=" ") #, button[key]) print(f"-{key}({keyname})", end=" ")
#buttons.up(keycode) buttons.up(keycode)
print() print()
def get_args(): def get_args():

View File

@@ -9,20 +9,18 @@ def main():
N=args.n, N=args.n,
M=args.m, M=args.m,
R=args.r, R=args.r,
T=args.t,
p=args.p, p=args.p,
) )
def get_args(): def get_args():
ap = argparse.ArgumentParser() ap = argparse.ArgumentParser()
ap.add_argument("-n", type=int, help="queue capacity", default=3) ap.add_argument("-n", type=float, help="queue capacity", default=3)
ap.add_argument("-m", type=int, help="queue fill line", default=3) ap.add_argument("-m", type=float, help="queue fill line", default=3)
ap.add_argument("-r", type=int, help="drain rate per second", default=2) ap.add_argument("-r", type=float, help="drain rate per second", default=2)
ap.add_argument("-t", type=int, help="threshold for state", default=2)
ap.add_argument("-p", type=str, help="path to write out to", default="/tmp/cbappend.both.txt") ap.add_argument("-p", type=str, help="path to write out to", default="/tmp/cbappend.both.txt")
return ap.parse_args() return ap.parse_args()
def with_(N, M, R, T, p): def with_(N, M, R, p):
triggered = CBAppend(p) triggered = CBAppend(p)
released = CBAppend(p) released = CBAppend(p)
cb = CBFork(triggered, released) cb = CBFork(triggered, released)
@@ -33,7 +31,7 @@ def with_(N, M, R, T, p):
got = readline() got = readline()
if got: if got:
if not got in buckets: if not got in buckets:
buckets[got] = Bucket(N, M, R, T, cb.cb(got)) buckets[got] = Bucket(N, M, R, cb.cb(got))
buckets[got].push() buckets[got].push()
# TODO no /state # TODO no /state
[buckets[i].pop() for i in buckets] [buckets[i].pop() for i in buckets]
@@ -59,18 +57,18 @@ class State():
self.f = f self.f = f
class Bucket(): class Bucket():
def __init__(self, N, M, R, T, CB): def __init__(self, N, M, R, CB):
self.q = 0.0 self.q = 0.0
self.N = N self.N = N
self.M = M self.M = M
self.R = R self.R = R
self.T = T
self.CB = CB self.CB = CB
self.__last_pop = 0 self.__last_pop = 0
self.__last_state = False self.__last_state = False
self.__last_push = None
def push(self): def push(self):
result = self.__push_c(1) result = self.__push()
self.__cb() self.__cb()
return result return result
@@ -80,16 +78,16 @@ class Bucket():
return result return result
def __cb(self): def __cb(self):
new_state = self.q > self.T new_state = self.q >= self.M
if new_state == self.__last_state: if new_state == self.__last_state:
return return
self.__last_state = new_state self.__last_state = new_state
filledness = int( filledness = int(
100*( 100*(
max( max(
[self.q-self.T, 0] [self.q-self.M, 0]
)/max( )/max(
[self.M-self.T, 1] [self.N-self.M, 1]
) )
) )
)/100.0 )/100.0
@@ -100,10 +98,15 @@ class Bucket():
def state(self): def state(self):
return self.__last_state return self.__last_state
def __push_c(self, c): def __push(self):
self.__pop() self.__pop()
c = 1
now = self.__now()
if self.__last_push:
c = min([now - self.__last_push, 1])
if self.q+c > self.N: if self.q+c > self.N:
return False return False
self.__last_push = now
self.q += c self.q += c
return True return True

View File

@@ -16,18 +16,23 @@ trap cleanup EXIT
python3 ./state_to_buttons.py & python3 ./state_to_buttons.py &
python3 ./testdata/rand_0_n_weighted_stream.py \ (
-n 6 \ python3 ./testdata/rand_0_n_weighted_stream.py \
-b-min 1 \ -n 6 \
-b-max 10 \ -n 4 \
-d-min 100 \ -b-min 1 \
-d-max 3000 \ -b-max 10 \
-between 100 \ -d-min 100 \
-b-min 10 \ -d-max 1000 \
-b-max 100 \ -between 10 \
-d-min 100 \ -w 2 \
-d-max 3000 \ | peek \
-between 10 \ | python3 ./stream_to_state.py \
-w 3 \ -n 1 \
| peek \ -m .25 \
| python3 ./stream_to_state.py -r .1
) &
while true; do
read -s var
done