81 lines
3.2 KiB
Python
Executable File
81 lines
3.2 KiB
Python
Executable File
import os
|
|
from alert_config_list import Alert_Config_List
|
|
from alert_config import Alert_Config
|
|
from job_list import Job_List
|
|
from job import Job
|
|
from process_factory import Process_Factory
|
|
from time import sleep
|
|
from config import glob_the_configs
|
|
from serviceapp import service
|
|
|
|
class Service:
    """Keep one running job per alert config, reloading the configs periodically.

    Each cycle of ``start()`` loads a fresh ``Alert_Config_List``, diffs it
    against the list from the previous cycle, kills the jobs whose config was
    removed or modified, and spawns a process for every added or modified
    config.  Job counts are reported through ``service.send_stat`` under the
    ``aom`` stat prefix.
    """

    # Seconds start() sleeps before the first config load.
    STARTUP_DELAY_SECONDS = 15
    # Consul endpoint handed to glob_the_configs().
    CONSUL_URL = 'http://consul.service.consul:8500'

    def __init__(self, logger, reload_interval, hostname, config):
        """Set up empty config/job lists and remember the runtime settings.

        Args:
            logger: Object exposing ``info`` and ``error`` callables; also
                passed on to the config loader and the process factory.
            reload_interval: Seconds to sleep between two reload cycles.
            hostname: Name of this box, or ``None`` to read it from the
                ``HOSTNAME`` environment variable.
            config: Mapping that provides at least ``alert_folder`` and
                ``alert_routing_config``; also passed to the process factory.

        Raises:
            KeyError: If ``hostname`` is ``None`` and ``HOSTNAME`` is not set.
        """
        self.alert_config_list = Alert_Config_List()
        self.job_list = Job_List()
        self.logger = logger
        # Shortcuts so the methods below can log with self.info / self.error.
        self.info = self.logger.info
        self.error = self.logger.error
        self.reload_interval = reload_interval
        self.box_hostname = os.environ['HOSTNAME'] if hostname is None else hostname
        # Production mode unless a TEST variable exists in the environment.
        self.production = "TEST" not in os.environ
        self.config = config

    def start(self):
        """Run the reload loop until ``is_running()`` returns a false value.

        After the loop ends, every remaining job is purged by diffing against
        an empty ``Alert_Config_List``.
        """
        self.info("Waiting {}s for Consul service to pass".format(
            self.STARTUP_DELAY_SECONDS))
        sleep(self.STARTUP_DELAY_SECONDS)
        while self.is_running():
            new_alert_config_list = self.get_new_alert_config_list()
            # Kill before spawning so a modified config never has two jobs.
            self.purge_stale(new_alert_config_list)
            self.create_upserted(new_alert_config_list)
            self.alert_config_list = new_alert_config_list
            total_jobs = len(self.job_list)
            self.info("Total running jobs: {}".format(total_jobs))
            service.send_stat('total_jobs', total_jobs, {}, statprefix='aom')
            sleep(self.reload_interval)
        self.info("Exiting alerts")
        self.purge_stale(Alert_Config_List())

    def is_running(self):
        """Return True while the reload loop should keep going (always True here)."""
        return True

    def get_new_alert_config_list(self):
        """Load the alert config files into a new ``Alert_Config_List``.

        Returns:
            The freshly loaded list.  If loading or parsing raises, the error
            is logged and an *empty* ``Alert_Config_List`` is returned, so the
            caller always receives an object of the type it stores and later
            calls ``compare`` on (a plain ``[]`` would break the next cycle).
        """
        try:
            yaml_configs = self.parse_alert_config_files()
            return Alert_Config_List([Alert_Config(c) for c in yaml_configs])
        except Exception as e:
            self.error("Failed to load config files: {}".format(e))
            # NOTE(review): an empty list presumably makes the caller treat
            # every running job as removed; consider keeping the last good
            # configuration instead if that is not desired.
            return Alert_Config_List()

    def parse_alert_config_files(self):
        """Return whatever ``glob_the_configs`` yields for the configured folder.

        Raises:
            KeyError: If ``alert_folder`` or ``alert_routing_config`` is
                missing from ``self.config``.
        """
        path = self.config['alert_folder']
        routing = self.config['alert_routing_config']
        return glob_the_configs(
            path, routing, self.CONSUL_URL, self.box_hostname, self.logger)

    def purge_stale(self, new_alert_config_list):
        """Kill the jobs whose config was removed or modified.

        ``self.alert_config_list.compare`` is expected to return three sets
        of ids: (added, removed, modified).
        """
        _, removed_ids, modified_ids = self.alert_config_list.compare(
            new_alert_config_list)
        for stale_id in removed_ids.union(modified_ids):
            self.job_list.kill(stale_id)
        # Only true removals are counted here; modified jobs are reported by
        # create_upserted().
        service.send_stat('removed_jobs', len(removed_ids), {}, statprefix='aom')
        self.info("Removed alert_configs: {}".format(removed_ids))

    def create_upserted(self, new_alert_config_list):
        """Spawn and register a job for every added or modified config."""
        added_ids, _, modified_ids = self.alert_config_list.compare(
            new_alert_config_list)
        for alert_id in added_ids.union(modified_ids):
            process = self.spawn_process(new_alert_config_list[alert_id])
            self.job_list.add(Job(alert_id, process))
        service.send_stat('new_jobs', len(added_ids), {}, statprefix='aom')
        service.send_stat('modified_jobs', len(modified_ids), {}, statprefix='aom')
        self.info("Added alert_configs: {}".format(added_ids))
        self.info("Modified alert_configs: {}".format(modified_ids))

    def spawn_process(self, alert_config):
        """Build a process for ``alert_config``, start it and return it."""
        process_factory = Process_Factory(self.config, self.logger, self.production)
        process = process_factory.build(alert_config)
        process.start()
        return process