#!/usr/bin/python3 """ Alert On Metrics Project""" import logging import multiprocessing import json import base64 import os import subprocess from time import time, sleep import requests import yaml import traceback from sanic import Sanic, response from library.args import get_service_args from serviceapp import service from library.config import glob_the_configs from library.logger import AlertLogging LOG = AlertLogging('aom') LOG.start() LOG.start_log_file("logs/aom_service.log") LOG.start_debug() APP = Sanic() SERVICE_JOB = multiprocessing.Value('i', 0) NUM_JOBS = multiprocessing.Value('i', 0) LEADERSHIP = multiprocessing.Value('i', 0) LEADER_STATUS = None LEADER_TIME = None CONSUL_URL = None LEADER_OVERRIDE = None HOSTNAME = None SERVICE_CONFIG = None # move to library def dict_compare(d1, d2): """ Function to compare two dictionaries """ d1_keys = set(d1.keys()) d2_keys = set(d2.keys()) intersect_keys = d1_keys.intersection(d2_keys) added = d1_keys - d2_keys removed = d2_keys - d1_keys modified = set(o for o in intersect_keys if d1[o] != d2[o]) return added, removed, modified #, same @APP.route("/") async def index(_): """ Return total number of jobs """ global NUM_JOBS return response.json({"job_count": NUM_JOBS.value}) @APP.route('/healthcheck') async def health(request): """ Flask healthcheck so that consul and friends work, see this as a service Returns: json object of status: ok """ LOG.debug("healthcheck") service_process = multiprocessing.Process(target=start_service, \ args=(LOG, SERVICE_CONFIG['alert_reload_interval']), \ name="service", daemon=False) # TRY TO START SERVICE, IF LEADER AND NOT RUNNING if SERVICE_JOB.value == 0: LOG.info("Starting alerts background job") SERVICE_JOB.value += 1 service_process.start()#start_service(log) return response.json({"status": "ok"}, 200) # @APP.route("/override") # async def override(request): # """ # Sets the LEADER_OVERRIDE global parameter to force an override # """ # global LEADER_OVERRIDE # if request.args.get('enable') == 'true': # LEADER_OVERRIDE = True # elif request.args.get('enable') == 'false': # LEADER_OVERRIDE = False # return response.json({"override": LEADER_OVERRIDE}) # def leader(): # """ # Needs to be implemented that goes out to consul and checks if node is leader, # or if there is no leader volunteers itself. # Returns: # bool of True or False.... once the logic gets worked out # """ # global LEADER_STATUS, LEADER_TIME # # CHECK IF THERE IS AN ARGUMENT FOR OVERRIDING THE CHECK LEADER # if LEADER_OVERRIDE is True: # return True # # CHECK IF LEADER_TIME IS SET AND THAT IT'S LESS THAN 30 SECONDS FROM LAST SET # if LEADER_TIME is None or time() - LEADER_TIME > 60: # LOG.info("Cache has expired or was not set") # box_hostname = os.environ['HOSTNAME'] if HOSTNAME is None else HOSTNAME # LOG.info("Getting Leader Election status") # # RIGHT NOW IN THE CONFIG THIS IS HARD SET TO THE CONSUL1-APP.ENG.OPS.NET # try: # r = requests.get(CONSUL_URL, timeout=60) # assert r.status_code == 200, "Failed to get back a 200 from consul." # LOG.info("Verify that the Value is {}".format(box_hostname)) # # THE VALUE BACK IS A BASE64 ENCODED BYTE, THAT NEEDS TO BE DECODED, # # TURNED TO A STRING, THEN TO A DICT # value = json.loads(base64.b64decode(r.json()[0]['Value']).decode('utf-8')) # # CACHE THE VALUE AND TIMESTAMP # if value['HostName'] == box_hostname: # LEADER_STATUS = True # LEADER_TIME = time() # return True # else: # return False # except TimeoutError: # LOG.error("Timed out connecting to Consul") # return LEADER_STATUS # else: # return LEADER_STATUS def start_service(log, reload_interval): """ Starts the service Args: None Returns: None """ jobs = [] alert_list = [] alert_hash = {} box_hostname = os.environ['HOSTNAME'] if HOSTNAME is None else HOSTNAME production_mode = not "TEST" in os.environ # WAIT FOR LEADER ELECTION TO PASS # while not leader(): # return False # # GLOB ALL THE CONFIG FILES TO BUILD POOL OF ALERTS log.info("Waiting 15s for Consul service to pass") #sleep(15) while True: try: alert_list = glob_the_configs(SERVICE_CONFIG['alert_folder'], \ SERVICE_CONFIG['alert_routing_lookup'], \ 'http://consul.service.consul:8500', box_hostname, log) except Exception: log.error("Failed to load config files: {}".format(traceback.format_exc())) # CREATE THREAD POOL, TO PREVENT RECURSIVE INCLUSION WITH # MULTIPROCESSING, MOVED FUNCTION TO ANOTHER FILE log.info("Found {} alerts".format(len(alert_list))) new_alert_hash = {} for alert_config in alert_list: if alert_config['id'] in new_alert_hash.keys(): log.info("Duplicate alert id found: {}. \ Ignoring one of them.".format(alert_config['id'])) else: new_alert_hash[alert_config['id']] = alert_config added, removed, modified = dict_compare(new_alert_hash, alert_hash) log.info("Added alerts {}".format(added)) log.info("Removed alerts {}".format(removed)) log.info("Modified alerts {}".format(modified)) # PROCESSES TO KILL for al_config in removed.union(modified): position = None # Find if process is currently running for i, job in enumerate(jobs): if job.name == al_config and job.is_alive(): position = i # once found exit loop break # Terminate process and remove it from the list log.info("Stopping config: {}".format(jobs[position].name)) subprocess.call(["/bin/kill", "-9", "{}".format(jobs[position].pid)]) jobs[position].join() NUM_JOBS.value -= 1 log.info("Process stopped succesfully") jobs.pop(position) # PROCESSES TO START alert_configurations = added.union(modified) alert_configurations = sorted(alert_configurations, key=lambda x:len(new_alert_hash[x].get('resolvedDependencies').getDependencies())) for al_config in added.union(modified): if new_alert_hash[al_config].get('query_type') == 'prometheus': p = multiprocessing.Process(target=service.check_prometheus_alert, \ args=(new_alert_hash[al_config], SERVICE_CONFIG, log, production_mode), \ name=al_config, daemon=True) else: p = multiprocessing.Process(target=service.check_kairosdb_alert,\ args=(new_alert_hash[al_config], SERVICE_CONFIG, log, production_mode), \ name=al_config, daemon=True) jobs.append(p) log.info("Starting new config: {}".format(p.name)) p.start() NUM_JOBS.value += 1 # store current list alert_hash = new_alert_hash.copy() log.info("Total number of jobs: {}".format(NUM_JOBS.value)) service.send_stat('total_jobs', NUM_JOBS.value, dict(), statprefix='aom') if added: service.send_stat('new_jobs', len(added), dict(), statprefix='aom') if modified: service.send_stat('modified_jobs', len(modified), dict(), statprefix='aom') if removed: service.send_stat('removed_jobs', len(removed), dict(), statprefix='aom') sleep(reload_interval) #No longer leader killing all processes log.info("No longer leader. Exiting alerts background job") for job in jobs: # job.terminate() causes the server to stop subprocess.call(["/bin/kill", "-9", "{}".format(job.pid)]) NUM_JOBS.value -= 1 SERVICE_JOB.value = 0 return False if __name__ == "__main__": # GET ARGS AND START LOGGING ARGS = get_service_args() logging.setLoggerClass(AlertLogging) LOG.info("Starting Service") # GET SERVICE CONFIG LEADER_OVERRIDE = ARGS['override'] HOSTNAME = ARGS['hostname'] SERVICE_CONFIG = yaml.safe_load(open('service.yaml', 'r').read()) if ARGS['alert_configs'] is not None: SERVICE_CONFIG['alert_folder'] = ARGS['alert_configs'] if ARGS['alert_routing_lookup'] is not None: SERVICE_CONFIG['alert_routing_lookup'] = ARGS['alert_routing_lookup'] # SET CONSUL URL FOR LEADER CHECK CONSUL_URL = SERVICE_CONFIG['consul_url'] # START THE MAIN SERVICE APP.run(host="0.0.0.0", port=ARGS['port'])