test bucket

master
Bel LaPointe 2022-09-20 15:03:38 -06:00
parent f5c6c37868
commit dbff6792fa
2 changed files with 73 additions and 12 deletions

View File

@ -1,11 +1,13 @@
import random
import time
import math
class Bucket:
def __init__(self, interval):
random.seed(time.time())
self.interval = interval
self.name = self.ms_to_bucket(Bucket.now_ms())
self.chooser = BucketChooserProportionalLogRandom()
self.content = []
def push(self, v):
@ -19,21 +21,36 @@ class Bucket:
return
self.content.append([v, 1])
def pick_n(self, n):
if not self.content:
return []
content = [() for i in content] # TODO log2
result = []
for i in range(0, n):
idx = random.randint(0, len(self.content)-1)
result.append(
self.content[idx][0],
)
return list(set(result))
def now_ms():
return int(1000 * time.time())
def ms_to_bucket(self, ms):
return int(int(ms // self.interval) * self.interval)
def pick_n(self, n):
if not self.content:
return []
result = set()
for i in range(n):
result.add(self.chooser.choose(self.content))
return list(result)
class BucketChooserProportionalLogRandom:
def choose(self, content):
content = self.xform(content)
assert(content)
idx = random.randint(0, sum([i[1] for i in content])-1)
while content:
candidate = content.pop()
idx -= candidate[1]
if idx <= 0:
return candidate[0]
raise Exception("how?!")
def xform(self, content):
return [
(
i[0],
1 + int(100 * math.log2(i[1])),
) for i in content
]

44
src/test_bucket.py Normal file
View File

@ -0,0 +1,44 @@
import unittest
import bucket
class TestBucket(unittest.TestCase):
def test(self):
bkt = bucket.Bucket(1)
for i in range(50):
bkt.push(1)
bkt.push(2)
self.assertEqual(2, len(bkt.content))
for i in range(1000):
result = bkt.pick_n(2)
self.assertTrue(result)
self.assertTrue(len(result) <= 2)
result = bkt.pick_n(2)
while len(result) != 2:
result = bkt.pick_n(2)
result = bkt.pick_n(2)
while len(result) != 1:
result = bkt.pick_n(2)
class TestBucketChooserProportionalLogRandom(unittest.TestCase):
def test_xform(self):
chooser = bucket.BucketChooserProportionalLogRandom()
self.assertFalse(bool(chooser.xform([])))
self.assertEqual(1, len(chooser.xform([ (1,1) ])))
self.assertEqual(2, len(chooser.xform([ (1,1) ])[0]))
class TestBucketChoosers(unittest.TestCase):
def test_choosers(self):
for chooser in [
bucket.BucketChooserProportionalLogRandom(),
]:
with self.assertRaises(AssertionError):
self.assertRaises(chooser.choose([]))
self.assertTrue(chooser.choose([(1,1)]))
self.assertEqual(1, chooser.choose([(1,1)]))
if __name__ == "__main__":
unittest.main()