100 lines
3.3 KiB
Python
Executable File
100 lines
3.3 KiB
Python
Executable File
import unittest
|
|
|
|
import service
|
|
|
|
class TestMockRedis(unittest.TestCase):
    """Tests for the client returned by service and for the dict-backed TestRedis."""

    def test_base(self):
        # Imported locally so the redis package is only needed by this test.
        from redis import Redis

        r = service.get_redis_client()
        # Dedicated assertions report the offending value on failure, unlike
        # assertTrue(not ...) / assertEqual(..., None).
        self.assertNotIsInstance(r, Redis)
        self.assertIsNone(r.get("a"))
        self.assertIsNone(r.set("a", "b"))
        self.assertEqual(r.call("KEYS", "b*"), [])

    def test_test(self):
        # TestRedis keeps its data in a module-level dict, so a value written
        # through set() must be visible to get() and to a KEYS prefix query.
        r = TestRedis()
        r.set("a", "b")
        self.assertEqual(r.get("a"), "b")
        self.assertEqual(r.call("KEYS", "a*"), ["a"])
|
|
|
|
# Backing store shared by every TestRedis instance. It stays None until the
# first instance is constructed, then holds a plain dict.
DB = None


class TestRedis(service.MockRedis):
    """Dict-backed replacement for the Redis client used by these tests.

    All instances read and write the same module-level ``DB`` dict, so data
    written through one instance is visible through any other.
    """

    def __init__(self):
        # Create the shared dict only once; later instances reuse it.
        global DB
        if DB is None:
            DB = {}

    def get(self, key):
        """Return the value stored under *key*, or None if it is absent."""
        return DB.get(key)

    def delete(self, key):
        """Remove *key* if present; a missing key is ignored. Returns None."""
        DB.pop(key, None)

    def set(self, key, value):
        """Store *value* under *key*, overwriting any previous value."""
        DB[key] = value

    def call(self, cmd, arg):
        """Run *cmd* against the store; only ``"KEYS"`` is supported.

        For ``"KEYS"``, *arg* is treated as a prefix pattern: every ``*`` at
        either end is stripped and the keys starting with the remainder are
        returned as a list. Any other command returns None.
        """
        # Compare by value: identity ("is") on a str literal only works by
        # accident of interning and is a SyntaxWarning on Python 3.8+.
        if cmd != "KEYS":
            return None
        prefix = arg.strip("*")
        return [key for key in DB if key.startswith(prefix)]
|
|
|
|
class TestSetFiring(unittest.TestCase):
    """Checks that list_firing reflects the most recent set_firing call."""

    def test_base(self):
        def mock_get_redis_client():
            return TestRedis()

        # Point the service at the dict-backed client for this test only.
        # The original factory is restored on cleanup so the patch does not
        # leak into tests that run afterwards.
        original_get_redis_client = service.get_redis_client
        self.addCleanup(
            setattr, service, "get_redis_client", original_get_redis_client
        )
        service.get_redis_client = mock_get_redis_client

        # Two firing entries that differ only in their "dc" tag.
        service.set_firing("TestSetFiring", [
            [0, 0, 0, {"dc": "there"}],
            [0, 0, 0, {"dc": "here"}],
        ])
        self.assertEqual(len(service.list_firing("TestSetFiring")), 2)

        # Setting a shorter list must leave exactly one entry listed.
        service.set_firing("TestSetFiring", [
            [0, 0, 0, {"dc": "here"}],
        ])
        self.assertEqual(len(service.list_firing("TestSetFiring")), 1)
|
|
|
|
class MockResolveDep:
    """Test double that hands back a fixed dependency list.

    The object passed to the constructor is stored as-is (no copy) and
    returned unchanged by ``getDependencies``.
    """

    def __init__(self, l):
        # Keep a reference to the caller's object rather than copying it.
        self.l = l

    def getDependencies(self):
        """Return the object supplied at construction time."""
        dependencies = self.l
        return dependencies
|
|
|
|
class TestIsSuppressed(unittest.TestCase):
    """Walks is_suppressed through a sequence of dependency firing states."""

    def test_base(self):
        def mock_get_redis_client():
            return TestRedis()

        # Point the service at the dict-backed client for this test only.
        # The original factory is restored on cleanup so the patch does not
        # leak into tests that run afterwards.
        original_get_redis_client = service.get_redis_client
        self.addCleanup(
            setattr, service, "get_redis_client", original_get_redis_client
        )
        service.get_redis_client = mock_get_redis_client

        alert_config = {
            'id': "TestIsSuppressed",
            'resolvedDependencies': MockResolveDep(["TestIsSuppressedD", "b", "c"]),
            'suppressed_occurrences_threshold': 2,
        }
        alert_tags = {"dc": "z", "x": "y"}

        # dependency fires one alert, suppress in effect
        service.set_firing("TestIsSuppressedD", [[0, 0, 0, alert_tags]])
        service.clear_suppressed(alert_config, alert_tags)
        self.assertTrue(service.is_suppressed(alert_config, alert_tags))

        # dependency still firing alert, suppress stops
        # NOTE(review): presumably this is the second occurrence reaching
        # suppressed_occurrences_threshold -- confirm against service.
        service.set_firing("TestIsSuppressedD", [[0, 0, 0, alert_tags]])
        service.clear_suppressed(alert_config, alert_tags)
        self.assertFalse(service.is_suppressed(alert_config, alert_tags))

        # dependency in different dc fires alert, suppress in DC1 stops, suppress in DC2 starts
        new_alert_tags = {"dc": "w"}
        service.set_firing("TestIsSuppressedD", [[0, 0, 0, new_alert_tags]])
        service.clear_suppressed(alert_config, new_alert_tags)
        self.assertFalse(service.is_suppressed(alert_config, alert_tags))
        self.assertTrue(service.is_suppressed(alert_config, new_alert_tags))

        # dependencies clear everywhere, suppress stops everywhere
        service.set_firing("TestIsSuppressedD", [])
        # NOTE(review): an empty list is passed here where the calls above
        # pass a tags dict -- confirm clear_suppressed accepts this.
        service.clear_suppressed(alert_config, [])
        self.assertFalse(service.is_suppressed(alert_config, alert_tags))
        self.assertFalse(service.is_suppressed(alert_config, new_alert_tags))
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: discover and run every TestCase in this module.
    unittest.main()
|