add py-ratio1024-log2 but log looks notsogood so its just ratio

master
Bel LaPointe 2022-04-11 19:11:04 -06:00
parent e161cacf00
commit 1c41c2a717
8 changed files with 248 additions and 0 deletions

View File

@ -0,0 +1,189 @@
import signal
import time
import argparse
from math import log
def main():
args = get_args()
with_(
P=args.p,
D=args.d,
M=args.m,
)
def get_args():
ap = argparse.ArgumentParser()
ap.add_argument("-p", type=str, help="path to write out to", default="/tmp/cbappend.both.txt")
ap.add_argument("-d", type=int, help="milliseconds to retain", default=1000)
ap.add_argument("-m", type=float, help="noise threshold", default=.25)
return ap.parse_args()
def with_(P, D, M):
triggered = CBAppend(P)
released = CBAppend(P)
cb = CBFork(triggered, released)
window = Window(D, M)
while True:
got = readline()
if got:
window.push(got)
else:
window.pop()
print(window.report())
def readline():
def __input(*args):
return input()
def _input(*args):
try:
foo = __input()
except Exception as e:
foo = None
return foo
timeout = 1
signal.signal(signal.SIGALRM, __input)
signal.alarm(timeout)
foo = _input()
return foo
class State():
def __init__(self, active, f):
self.active = active
self.f = f
class Window():
def __init__(self, D, M):
self.D = D
self.M = M
self.w = []
self.n = 1024
def report(self):
scaled_rates = self.report_scaled_rates()
ttl = sum([scaled_rates[k] for k in scaled_rates])
results = {}
for key in scaled_rates:
results[key] = int(100.0 * scaled_rates[key] / ttl) / 100
return {k:results[k] for k in results if results[k] > self.M}
def report_scaled_rates(self):
self.__pop()
cnt = len(self.w)
keys = list(set([i[0] for i in self.w]))
scaled_rates = {}
for key in keys:
count = len([True for i in self.w if i[0] == key])
#scaled_rate = int(log((count / cnt) * 1024.0, 2)+.5)
scaled_rate = int((count / cnt) * 1024.0 + .5)
scaled_rates[key] = scaled_rate
return scaled_rates
def push(self, k):
self.__pop()
self.__push(k)
def pop(self):
self.__pop()
def __push(self, k):
self.w.append((k, self.__now()))
if len(self.w) > self.n:
self.w = self.w[len(self.w)-self.n:]
def __pop(self):
now = self.__now()
self.w = [
i for i in self.w if now - i[1] < self.D
]
def __now(self):
return time.time()*1000
class Bucket():
def __init__(self, N, M, R, T, CB):
self.q = 0.0
self.N = N
self.M = M
self.R = R
self.T = T
self.CB = CB
self.__last_pop = 0
self.__last_state = False
def push(self):
result = self.__push_c(1)
self.__cb()
return result
def pop(self):
result = self.__pop()
self.__cb()
return result
def __cb(self):
new_state = self.q > self.T
if new_state == self.__last_state:
return
self.__last_state = new_state
filledness = int(
100*(
max(
[self.q-self.T, 0]
)/max(
[self.M-self.T, 1]
)
)
)/100.0
if filledness > 1.0:
filledness = 1.0
self.CB(State(new_state, filledness))
def state(self):
return self.__last_state
def __push_c(self, c):
self.__pop()
if self.q+c > self.N:
return False
self.q += c
return True
def __pop(self):
now = self.__now()
remove_up_to = (now - self.__last_pop) / self.R
if remove_up_to > self.q:
remove_up_to = self.q
self.q -= remove_up_to
self.__last_pop = now
def __now(self):
return time.time()
class CBAppend():
def __init__(self, path):
self.__path = path
def cb(self, payload):
def cb(state):
with open(self.__path, "a") as f:
f.write(f"{'/' if not state.active else ''}{payload} {state.f}\n")
return cb
class CBFork():
def __init__(self, triggered, released):
self.__triggered = triggered
self.__released = released
def cb(self, payload):
cb_triggered = self.__triggered.cb(payload)
cb_released = self.__released.cb(payload)
def cb(state):
if state.active:
return cb_triggered(state)
return cb_released(state)
return cb
if __name__ == "__main__":
main()

View File

@ -0,0 +1 @@
pyautogui

View File

@ -0,0 +1,27 @@
import argparse
import os
from time import sleep
def main():
args = get_args()
buckets = {}
with open(args.p, "r") as f:
f.seek(0, os.SEEK_END)
while True:
line = f.readline().strip()
if not line:
sleep(0.25)
continue
key = line.strip("/").split()[0]
if not key in buckets:
buckets[key] = False
buckets[key] = False if line[0] == "/" else float(line.split()[-1])
print("[", " ".join(sorted([f"{int(i)}={buckets[i]}" for i in buckets if buckets[i]])), "]")
def get_args():
ap = argparse.ArgumentParser()
ap.add_argument("-p", type=str, help="path to write out to", default="/tmp/cbappend.both.txt")
return ap.parse_args()
if __name__ == "__main__":
main()

31
poc/py-rps/test.sh Normal file
View File

@ -0,0 +1,31 @@
#! /bin/bash
set -e
set -o pipefail
peek() {
while read -r line; do
echo $line
echo $line >&2
done
}
cleanup() {
kill -9 $(jobs -p) || true
}
trap cleanup EXIT
python3 ./state_to_buttons.py &
python3 ./testdata/rand_0_n_stream.py \
-n 6 \
-b-min 1 \
-b-max 10 \
-d-min 100 \
-d-max 3000 \
-between 100 \
| peek \
| python3 ./stream_to_state.py \
-n 20 \
-m 15 \
-r 15 \
-t 15