import os from alert_config_list import Alert_Config_List from alert_config import Alert_Config from job_list import Job_List from job import Job from process_factory import Process_Factory from time import sleep from config import glob_the_configs from serviceapp import service class Service() : def __init__(self, logger, reload_interval, hostname, config): self.alert_config_list = Alert_Config_List() self.job_list = Job_List() self.logger = logger self.info = self.logger.info self.error = self.logger.error self.reload_interval = reload_interval self.box_hostname = os.environ['HOSTNAME'] if hostname is None else hostname self.production = not "TEST" in os.environ self.config = config def start(self) : self.info("Waiting 15s for Consul service to pass") sleep(15) while self.is_running() : new_alert_config_list = self.get_new_alert_config_list() self.purge_stale(new_alert_config_list) self.create_upserted(new_alert_config_list) self.alert_config_list = new_alert_config_list total_jobs = len(self.job_list) self.info("Total running jobs: {}".format(total_jobs)) service.send_stat('total_jobs', total_jobs, dict(), statprefix='aom') sleep(self.reload_interval) self.info("Exiting alerts") self.purge_stale(Alert_Config_List()) def is_running(self) : return True def get_new_alert_config_list(self) : try : yaml_configs = self.parse_alert_config_files() alert_configs = [Alert_Config(i) for i in yaml_configs] return Alert_Config_List(alert_configs) except Exception as e : self.error("Failed to load config files: {}".format(e)) return [] def parse_alert_config_files(self) : path = self.config['alert_folder'] routing = self.config['alert_routing_config'] consul = 'http://consul.service.consul:8500' return glob_the_configs(path, routing, consul, self.box_hostname, self.logger) def purge_stale(self, new_alert_config_list) : _, removed_ids, modified_ids = self.alert_config_list.compare(new_alert_config_list) stale_ids = removed_ids.union(modified_ids) for stale_id in stale_ids : self.job_list.kill(stale_id) service.send_stat('removed_jobs', len(removed_ids), dict(), statprefix='aom') self.info("Removed alert_configs: {}".format(removed_ids)) def create_upserted(self, new_alert_config_list) : added_ids, _, modified_ids = self.alert_config_list.compare(new_alert_config_list) upserted_ids = added_ids.union(modified_ids) for id in upserted_ids : p = self.spawn_process(new_alert_config_list[id]) j = Job(id, p) self.job_list.add(j) service.send_stat('new_jobs', len(added_ids), dict(), statprefix='aom') service.send_stat('modified_jobs', len(modified_ids), dict(), statprefix='aom') self.info("Added alert_configs: {}".format(added_ids)) self.info("Modified alert_configs: {}".format(added_ids)) def spawn_process(self, alert_config) : process_factory = Process_Factory(self.config, self.logger, self.production) process = process_factory.build(alert_config) process.start() return process