QVolution2019.2/AoM_Service/library/service.py

81 lines
3.2 KiB
Python
Executable File

import os
from alert_config_list import Alert_Config_List
from alert_config import Alert_Config
from job_list import Job_List
from job import Job
from process_factory import Process_Factory
from time import sleep
from config import glob_the_configs
from serviceapp import service
class Service:
    """Supervisor loop that keeps one running job per alert config.

    On every cycle the service reloads the alert configs, kills the jobs
    whose config was removed or modified, and spawns a process for every
    config that was added or modified.
    """

    def __init__(self, logger, reload_interval, hostname, config):
        """Set up an empty service.

        Args:
            logger: Object exposing ``info`` and ``error`` logging methods;
                it is also handed to the config loader and process factory.
            reload_interval: Value passed to ``time.sleep`` between two
                reload cycles (i.e. seconds).
            hostname: Name of this box. When ``None`` the ``HOSTNAME``
                environment variable is used instead.
            config: Mapping read for the ``'alert_folder'`` and
                ``'alert_routing_config'`` keys; also passed to the
                process factory.

        Raises:
            KeyError: If ``hostname`` is ``None`` and ``HOSTNAME`` is not
                set in the environment.
        """
        self.alert_config_list = Alert_Config_List()
        self.job_list = Job_List()
        self.logger = logger
        # Shortcuts so the rest of the class can log with self.info / self.error.
        self.info = self.logger.info
        self.error = self.logger.error
        self.reload_interval = reload_interval
        self.box_hostname = os.environ['HOSTNAME'] if hostname is None else hostname
        # The mere presence of a TEST environment variable disables production mode.
        self.production = "TEST" not in os.environ
        self.config = config

    def start(self):
        """Run the reload loop until ``is_running()`` returns a falsy value.

        Each cycle loads the current alert configs, purges stale jobs,
        creates jobs for added/modified configs, reports the job count and
        sleeps for ``reload_interval``. When the loop ends, the current
        configs are compared against an empty list so that the remaining
        jobs are purged.
        """
        self.info("Waiting 15s for Consul service to pass")
        sleep(15)
        while self.is_running():
            new_alert_config_list = self.get_new_alert_config_list()
            # Both steps diff against the *previous* list, so the stored
            # list must only be replaced after both have run.
            self.purge_stale(new_alert_config_list)
            self.create_upserted(new_alert_config_list)
            self.alert_config_list = new_alert_config_list
            total_jobs = len(self.job_list)
            self.info("Total running jobs: {}".format(total_jobs))
            service.send_stat('total_jobs', total_jobs, dict(), statprefix='aom')
            sleep(self.reload_interval)
        self.info("Exiting alerts")
        self.purge_stale(Alert_Config_List())

    def is_running(self):
        """Return whether the reload loop should keep going (always ``True`` here)."""
        return True

    def get_new_alert_config_list(self):
        """Load the alert configs and wrap them in an ``Alert_Config_List``.

        Returns:
            The freshly loaded ``Alert_Config_List``. If loading fails for
            any reason the error is logged and an *empty*
            ``Alert_Config_List`` is returned.
        """
        try:
            yaml_configs = self.parse_alert_config_files()
            alert_configs = [Alert_Config(i) for i in yaml_configs]
            return Alert_Config_List(alert_configs)
        except Exception as e:
            self.error("Failed to load config files: {}".format(e))
            # Return the same type as the success path: start() stores this
            # value in self.alert_config_list and calls .compare() on it in
            # the next cycle, which a plain list does not provide.
            # NOTE(review): an empty list presumably makes this cycle purge
            # every running job - confirm that is the desired failure mode.
            return Alert_Config_List()

    def parse_alert_config_files(self):
        """Return the raw configs produced by ``glob_the_configs``.

        Raises:
            KeyError: If ``'alert_folder'`` or ``'alert_routing_config'``
                is missing from ``self.config``.
        """
        path = self.config['alert_folder']
        routing = self.config['alert_routing_config']
        consul = 'http://consul.service.consul:8500'
        return glob_the_configs(path, routing, consul, self.box_hostname, self.logger)

    def purge_stale(self, new_alert_config_list):
        """Kill every job whose config was removed or modified.

        Args:
            new_alert_config_list: List to diff the current configs against.
        """
        _, removed_ids, modified_ids = self.alert_config_list.compare(new_alert_config_list)
        # Modified configs are killed here and re-created by create_upserted().
        stale_ids = removed_ids.union(modified_ids)
        for stale_id in stale_ids:
            self.job_list.kill(stale_id)
        service.send_stat('removed_jobs', len(removed_ids), dict(), statprefix='aom')
        self.info("Removed alert_configs: {}".format(removed_ids))

    def create_upserted(self, new_alert_config_list):
        """Spawn a process and register a job for every added or modified config.

        Args:
            new_alert_config_list: List to diff the current configs against;
                it is also indexed by id to fetch each config to spawn.
        """
        added_ids, _, modified_ids = self.alert_config_list.compare(new_alert_config_list)
        upserted_ids = added_ids.union(modified_ids)
        for job_id in upserted_ids:
            process = self.spawn_process(new_alert_config_list[job_id])
            self.job_list.add(Job(job_id, process))
        service.send_stat('new_jobs', len(added_ids), dict(), statprefix='aom')
        service.send_stat('modified_jobs', len(modified_ids), dict(), statprefix='aom')
        self.info("Added alert_configs: {}".format(added_ids))
        self.info("Modified alert_configs: {}".format(modified_ids))

    def spawn_process(self, alert_config):
        """Build a process for ``alert_config``, start it and return it."""
        process_factory = Process_Factory(self.config, self.logger, self.production)
        process = process_factory.build(alert_config)
        process.start()
        return process