"""Per-key fill/drain buckets driven by lines read from standard input.

Every distinct non-empty input line gets its own :class:`Bucket`.  A line
arriving pushes into its bucket, and every loop iteration drains all buckets.
Whenever a bucket's level crosses its fill line (in either direction) a
callback appends a record to the output file.
"""

import argparse
import signal
import time
from os import getpid, kill


def main():
    """Parse the command line and run the read/drain loop."""
    args = get_args()
    with_(
        N=args.n,
        M=args.m,
        R=args.r,
        p=args.p,
    )


def get_args():
    """Build the argument parser and return the parsed ``sys.argv`` options."""
    ap = argparse.ArgumentParser()
    ap.add_argument("-n", type=float, help="queue capacity", default=3)
    ap.add_argument("-m", type=float, help="queue fill line", default=3)
    ap.add_argument("-r", type=float, help="drain rate per second", default=2)
    ap.add_argument(
        "-p",
        type=str,
        help="path to write out to",
        default="/tmp/cbappend.both.txt",
    )
    return ap.parse_args()


def with_(N, M, R, p):
    """Run forever: push each line read into its bucket, then drain them all.

    Args:
        N: Capacity handed to every :class:`Bucket`.
        M: Fill line handed to every :class:`Bucket`.
        R: Drain divisor handed to every :class:`Bucket`.
        p: Path of the file that state transitions are appended to.

    This function never returns.
    """
    triggered = CBAppend(p)
    released = CBAppend(p)
    cb = CBFork(triggered, released)
    buckets = {}
    while True:
        got = readline()
        if got:
            if got not in buckets:
                buckets[got] = Bucket(N, M, R, cb.cb(got))
            buckets[got].push()
        # TODO no /state
        # Drain on every iteration, including read timeouts, so that idle
        # buckets can fall back below their fill line.
        # NOTE(review): once stdin hits EOF, readline() returns None at once,
        # so this loop spins without blocking -- confirm that is acceptable.
        for bucket in buckets.values():
            bucket.pop()


def readline():
    """Read one line from standard input, giving up after one second.

    Returns:
        The line without its trailing newline, or ``None`` when no line
        arrived in time, standard input is at end-of-file, or the read failed.
    """

    def _on_timeout(signum, frame):
        # Raising from the handler aborts the blocked input() call.  The
        # handler must not read stdin itself: that would block again or
        # consume (and lose) a line.
        raise TimeoutError("no input within the timeout")

    timeout = 1
    signal.signal(signal.SIGALRM, _on_timeout)
    signal.alarm(timeout)
    try:
        return input()
    except (EOFError, OSError, UnicodeError):
        # TimeoutError is an OSError subclass, so a timeout lands here too.
        return None
    finally:
        # Always cancel the pending alarm so it cannot fire later, outside
        # of the read it was meant to guard.
        signal.alarm(0)


class State():
    """Snapshot passed to callbacks when a bucket changes state.

    Attributes are ``active`` (whether the level is at or above the fill
    line) and ``f`` (the filledness fraction computed by :class:`Bucket`).
    """

    def __init__(self, active, f):
        self.active = active
        self.f = f

    def __repr__(self):
        return f"{type(self).__name__}(active={self.active!r}, f={self.f!r})"


class Bucket():
    """A level ``q`` that rises on push, drains over time, and reports crossings.

    ``CB`` is called with a :class:`State` each time ``q >= M`` changes truth
    value.
    """

    def __init__(self, N, M, R, CB):
        """Create an empty bucket.

        Args:
            N: Capacity; a push that would take ``q`` above it is rejected.
            M: Fill line; the bucket is "active" while ``q >= M``.
            R: Drain divisor; each drain removes ``elapsed_seconds / R``.
            CB: Callable invoked with a :class:`State` on each transition.
        """
        self.q = 0.0
        self.N = N
        self.M = M
        self.R = R
        self.CB = CB
        self.__last_pop = 0
        self.__last_state = False
        self.__last_push = None

    def push(self):
        """Drain, then try to add to the level; fire the callback if needed.

        Returns:
            ``True`` if the push was accepted, ``False`` if it would have
            exceeded the capacity.
        """
        result = self.__push()
        self.__cb()
        return result

    def pop(self):
        """Drain the level for the elapsed time; fire the callback if needed."""
        result = self.__pop()
        self.__cb()
        return result

    def __cb(self):
        """Invoke ``CB`` when the active/inactive state has changed."""
        new_state = self.q >= self.M
        if new_state == self.__last_state:
            return
        self.__last_state = new_state
        overshoot = max(self.q - self.M, 0)
        # The denominator is floored at 1, which also covers N == M.
        span = max(self.N - self.M, 1)
        # Truncate to two decimals, then cap at 1.0.
        filledness = min(int(100 * (overshoot / span)) / 100.0, 1.0)
        self.CB(State(new_state, filledness))

    def state(self):
        """Return the most recently reported active flag."""
        return self.__last_state

    def __push(self):
        """Drain, then add up to 1 to the level unless capacity is exceeded."""
        self.__pop()
        c = 1
        now = self.__now()
        # Compare against None explicitly: a timestamp of 0.0 is still a
        # valid "previous push" marker.
        if self.__last_push is not None:
            # Pushes closer together than one second add only the time
            # elapsed since the previous accepted push.
            c = min(now - self.__last_push, 1)
        if self.q + c > self.N:
            return False
        self.__last_push = now
        self.q += c
        return True

    def __pop(self):
        """Remove ``elapsed / R`` from the level, never going below zero."""
        now = self.__now()
        # NOTE(review): elapsed time is *divided* by R, so a larger R drains
        # more slowly, while the CLI help calls R a "drain rate per second".
        # Confirm which meaning is intended before changing the arithmetic.
        remove_up_to = min((now - self.__last_pop) / self.R, self.q)
        self.q -= remove_up_to
        self.__last_pop = now

    def __now(self):
        """Return the current reading of the clock used for intervals."""
        # Only differences between readings are used, so a monotonic clock
        # is the right tool: it cannot jump backwards when the wall clock is
        # adjusted.
        return time.monotonic()


class CBSignals():
    """Callback factory that reports state changes by sending signals."""

    def __init__(self, pid, signal_triggered, signal_released):
        """Remember the target pid and the two signal numbers to send."""
        self.__pid = pid
        self.__signal_triggered = signal_triggered
        self.__signal_released = signal_released

    def cb(self, payload):
        """Install printing handlers for both signals and return the callback.

        ``payload`` is accepted for interface parity with the other callback
        factories; it is not used here.
        """

        def cb(state):
            print(f"state is now {state}")
            kill(
                self.__pid,
                self.__signal_triggered if state.active else self.__signal_released,
            )

        def handler(s, f):
            print("SIGNAL:", s)

        signal.signal(self.__signal_triggered, handler)
        signal.signal(self.__signal_released, handler)
        return cb


class CBAppend():
    """Callback factory that appends one line per state change to a file."""

    def __init__(self, path):
        """Remember the path of the file to append to."""
        self.__path = path

    def cb(self, payload):
        """Return a callback that writes ``payload`` and the state's ``f``.

        The written line is prefixed with ``/`` when the state is inactive.
        """

        def cb(state):
            prefix = "" if state.active else "/"
            with open(self.__path, "a") as f:
                f.write(f"{prefix}{payload} {state.f}\n")

        return cb


class CBFork():
    """Callback factory that routes active and inactive states separately."""

    def __init__(self, triggered, released):
        """Store the factories used for active and inactive states."""
        self.__triggered = triggered
        self.__released = released

    def cb(self, payload):
        """Build both underlying callbacks and return a dispatching callback."""
        cb_triggered = self.__triggered.cb(payload)
        cb_released = self.__released.cb(payload)

        def cb(state):
            if state.active:
                return cb_triggered(state)
            return cb_released(state)

        return cb


if __name__ == "__main__":
    main()