# leaky-bucket/poc/py/stream_to_state.py
#
# 165 lines
# 3.9 KiB
# Python

import signal
from os import getpid, kill
import time
import argparse
def main():
    """Parse the command-line options and start the processing loop.

    The parsed ``-n``, ``-m``, ``-r``, ``-t`` and ``-p`` options are handed
    to ``with_`` as its ``N``, ``M``, ``R``, ``T`` and ``p`` arguments.
    """
    opts = get_args()
    settings = {
        "N": opts.n,
        "M": opts.m,
        "R": opts.r,
        "T": opts.t,
        "p": opts.p,
    }
    with_(**settings)
def get_args(argv=None):
    """Build the command-line parser and parse the options.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line. With the default ``None`` argparse falls
            back to ``sys.argv[1:]``, which is what a zero-argument call
            has always done.

    Returns:
        An ``argparse.Namespace`` with integer attributes ``n``, ``m``,
        ``r``, ``t`` and a string attribute ``p``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("-n", type=int, help="queue capacity", default=3)
    ap.add_argument("-m", type=int, help="queue fill line", default=3)
    ap.add_argument("-r", type=int, help="drain rate per second", default=2)
    ap.add_argument("-t", type=int, help="threshold for state", default=2)
    ap.add_argument("-p", type=str, help="path to write out to", default="/tmp/cbappend.both.txt")
    return ap.parse_args(argv)
def with_(N, M, R, T, p):
    """Feed lines read from stdin into per-key buckets, forever.

    Every truthy value returned by ``readline()`` is used as a key: the
    first time a key is seen a ``Bucket(N, M, R, T, ...)`` is created for
    it, and each occurrence pushes once into that bucket. After every read
    attempt (successful or not) all known buckets are popped so they keep
    draining. State changes are written through ``CBAppend`` objects that
    both append to the file at ``p``.

    This function never returns.
    """
    sink = CBFork(CBAppend(p), CBAppend(p))
    buckets = {}
    while True:
        key = readline()
        if key:
            bucket = buckets.get(key)
            if bucket is None:
                bucket = buckets[key] = Bucket(N, M, R, T, sink.cb(key))
            bucket.push()
        # TODO no /state
        for known in buckets.values():
            known.pop()
def readline():
    """Read one line from stdin, giving up after one second.

    A ``SIGALRM`` is scheduled before blocking in ``input()``; its handler
    raises ``TimeoutError`` so the blocked read is aborted explicitly
    instead of relying on a side effect of re-entering ``input()`` from
    the handler. The pending alarm is always cancelled before returning so
    it cannot fire later while unrelated code is running.

    The ``SIGALRM`` handler stays installed after the call. ``SIGALRM`` and
    ``signal.alarm`` are only available on Unix, and ``signal.signal`` must
    be called from the main thread.

    Returns:
        The line returned by ``input()``, or ``None`` when no line arrived
        within the timeout or stdin could not be read (end of file, I/O
        error, closed or undecodable stream).
    """
    def _on_alarm(signum, frame):
        # Raising here propagates into the interrupted input() call.
        raise TimeoutError("no input within timeout")

    timeout = 1
    signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(timeout)
    try:
        return input()
    except TimeoutError:
        return None
    except (EOFError, OSError, ValueError):
        # Best effort, as before: an unreadable stdin is reported as
        # "nothing read" rather than crashing the caller's loop.
        return None
    finally:
        # Cancel any alarm still pending (e.g. when a line arrived early).
        signal.alarm(0)
class State:
    """Value object passed to callbacks when a bucket changes state.

    ``Bucket`` constructs it with ``active`` set to whether the fill level
    is above the threshold and ``f`` set to the computed "filledness"
    (a float clamped to at most 1.0).
    """

    def __init__(self, active, f):
        """Store ``active`` and ``f`` unchanged as attributes."""
        self.active = active
        self.f = f

    def __repr__(self):
        # Gives readable output where a State is formatted into a message
        # (e.g. the "state is now ..." print in CBSignals).
        return f"{type(self).__name__}(active={self.active!r}, f={self.f!r})"
class Bucket:
    """Leaky bucket that fires a callback when its level crosses a threshold.

    The fill level ``q`` rises by one per accepted ``push()`` and drains
    continuously at ``R`` units per second. Whenever ``q > T`` changes
    between true and false, ``CB`` is called with a ``State``.

    Args:
        N: Capacity; a push that would raise ``q`` above ``N`` is rejected.
        M: Fill line used to scale the reported filledness between ``T``
            and ``M``.
        R: Drain rate in units per second.
        T: Threshold; the bucket is "active" while ``q > T``.
        CB: Callable invoked with a ``State`` on every state change.
    """

    def __init__(self, N, M, R, T, CB):
        self.q = 0.0
        self.N = N
        self.M = M
        self.R = R
        self.T = T
        self.CB = CB
        # Start the drain clock at construction time.
        self.__last_pop = self.__now()
        self.__last_state = False

    def push(self):
        """Drain, then try to add one unit.

        Returns:
            ``True`` if the unit was added, ``False`` if it would have
            exceeded the capacity ``N``.
        """
        result = self.__push_c(1)
        self.__cb()
        return result

    def pop(self):
        """Drain the bucket for the time elapsed since the last drain.

        Returns:
            Always ``None`` (the internal drain step returns nothing).
        """
        result = self.__pop()
        self.__cb()
        return result

    def __cb(self):
        """Invoke ``CB`` if ``q > T`` differs from the last reported state."""
        new_state = self.q > self.T
        if new_state == self.__last_state:
            return
        self.__last_state = new_state
        excess = max(self.q - self.T, 0)
        # Guard against a zero or negative span when M <= T.
        span = max(self.M - self.T, 1)
        # Truncate to two decimals, then cap at 1.0.
        filledness = min(int(100 * (excess / span)) / 100.0, 1.0)
        self.CB(State(new_state, filledness))

    def state(self):
        """Return the last state reported to the callback (initially False)."""
        return self.__last_state

    def __push_c(self, c):
        """Drain, then add ``c`` units unless that would exceed ``N``."""
        self.__pop()
        if self.q + c > self.N:
            return False
        self.q += c
        return True

    def __pop(self):
        """Remove ``elapsed seconds * R`` units, never going below zero."""
        now = self.__now()
        # R is a rate per second, so the drained amount is elapsed * R
        # (dividing by R made a higher rate drain more slowly).
        drained = min((now - self.__last_pop) * self.R, self.q)
        self.q -= drained
        self.__last_pop = now

    def __now(self):
        # Monotonic clock: elapsed time cannot go negative when the wall
        # clock is adjusted.
        return time.monotonic()
class CBSignals:
    """Callback factory that reports state changes by signalling a process.

    Args:
        pid: Process id the signals are sent to.
        signal_triggered: Signal sent when a state is active.
        signal_released: Signal sent when a state is not active.
    """

    def __init__(self, pid, signal_triggered, signal_released):
        self.__pid = pid
        self.__signal_triggered = signal_triggered
        self.__signal_released = signal_released

    def cb(self, payload):
        """Install logging handlers for both signals and return a notifier.

        ``payload`` is accepted for interface parity with the other
        callback factories but is not used. The returned callable takes a
        state object, prints it, and sends the matching signal to the
        configured pid.
        """
        def log_signal(signum, frame):
            print("SIGNAL:", signum)

        # Handlers go in first, triggered before released.
        for signum in (self.__signal_triggered, self.__signal_released):
            signal.signal(signum, log_signal)

        def notify(state):
            print(f"state is now {state}")
            if state.active:
                outgoing = self.__signal_triggered
            else:
                outgoing = self.__signal_released
            kill(self.__pid, outgoing)

        return notify
class CBAppend:
    """Callback factory that appends one text line per state change.

    Args:
        path: File that the generated callbacks append to.
    """

    def __init__(self, path):
        self.__path = path

    def cb(self, payload):
        """Return a callable that logs a state for ``payload``.

        Each call appends ``"<payload> <state.f>"`` followed by a newline
        to the file; the line is prefixed with ``/`` when ``state.active``
        is false.
        """
        def append_line(state):
            with open(self.__path, "a") as out:
                line = f"{'/' if not state.active else ''}{payload} {state.f}\n"
                out.write(line)

        return append_line
class CBFork:
    """Callback factory that routes a state to one of two other factories.

    Args:
        triggered: Factory whose callbacks receive active states.
        released: Factory whose callbacks receive inactive states.
    """

    def __init__(self, triggered, released):
        self.__triggered = triggered
        self.__released = released

    def cb(self, payload):
        """Create both downstream callbacks for ``payload`` and a dispatcher.

        The returned callable forwards the state to the "triggered"
        callback when ``state.active`` is true and to the "released"
        callback otherwise, returning whatever that callback returns.
        """
        on_triggered = self.__triggered.cb(payload)
        on_released = self.__released.cb(payload)

        def dispatch(state):
            chosen = on_triggered if state.active else on_released
            return chosen(state)

        return dispatch
# Start the processing loop only when run as a script, not when imported.
if __name__ == "__main__":
    main()