QVolution2019.2/sleeper_agents_aom_engine/aom_service.py

#!/usr/bin/python3
""" Alert On Metrics Project"""
import base64
import json
import logging
import multiprocessing
import os
import subprocess
import traceback
from time import time, sleep

import requests
import yaml
from sanic import Sanic, response

from library.args import get_service_args
from serviceapp import service
from library.config import glob_the_configs
from library.logger import AlertLogging
LOG = AlertLogging('aom')
LOG.start()
LOG.start_log_file("logs/aom_service.log")
LOG.start_debug()
APP = Sanic("aom")  # newer Sanic versions require an application name
SERVICE_JOB = multiprocessing.Value('i', 0)
NUM_JOBS = multiprocessing.Value('i', 0)
LEADERSHIP = multiprocessing.Value('i', 0)
LEADER_STATUS = None
LEADER_TIME = None
CONSUL_URL = None
LEADER_OVERRIDE = None
HOSTNAME = None
SERVICE_CONFIG = None
# TODO: move to library
def dict_compare(d1, d2):
    """
    Compare two dictionaries.

    Returns:
        Tuple of (added, removed, modified) key sets: added/removed are
        keys present only in d1/d2 respectively, modified are shared keys
        whose values differ.
    """
    d1_keys = set(d1.keys())
    d2_keys = set(d2.keys())
    intersect_keys = d1_keys.intersection(d2_keys)
    added = d1_keys - d2_keys
    removed = d2_keys - d1_keys
    modified = {key for key in intersect_keys if d1[key] != d2[key]}
    return added, removed, modified
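# Example (illustrative values):
#   dict_compare({'a': 1, 'b': 2, 'c': 3}, {'b': 2, 'c': 4, 'd': 5})
#   -> added={'a'}, removed={'d'}, modified={'c'}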
@APP.route("/")
async def index(_):
"""
Return total number of jobs
"""
global NUM_JOBS
return response.json({"job_count": NUM_JOBS.value})
@APP.route('/healthcheck')
async def health(_):
    """
    Sanic healthcheck so that consul and friends see this as a healthy
    service. Also starts the alerts background job on the first check
    if it is not already running.
    Returns:
        json object of status: ok
    """
    LOG.debug("healthcheck")
    # START THE BACKGROUND SERVICE IF IT IS NOT ALREADY RUNNING
    # (the leader() check that used to gate this is currently disabled)
    if SERVICE_JOB.value == 0:
        LOG.info("Starting alerts background job")
        SERVICE_JOB.value += 1
        service_process = multiprocessing.Process(
            target=start_service,
            args=(LOG, SERVICE_CONFIG['alert_reload_interval']),
            name="service", daemon=False)
        service_process.start()
    return response.json({"status": "ok"}, 200)
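# Example check (illustrative host/port):
#   curl http://localhost:8080/healthcheck
#   -> {"status": "ok"}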
# @APP.route("/override")
# async def override(request):
# """
# Sets the LEADER_OVERRIDE global parameter to force an override
# """
# global LEADER_OVERRIDE
# if request.args.get('enable') == 'true':
# LEADER_OVERRIDE = True
# elif request.args.get('enable') == 'false':
# LEADER_OVERRIDE = False
# return response.json({"override": LEADER_OVERRIDE})
# def leader():
#     """
#     Goes out to consul and checks whether this node is the leader,
#     or volunteers itself if there is no leader.
#     Returns:
#         bool of True or False.... once the logic gets worked out
#     """
#     global LEADER_STATUS, LEADER_TIME
#     # CHECK IF THERE IS AN ARGUMENT FOR OVERRIDING THE LEADER CHECK
#     if LEADER_OVERRIDE is True:
#         return True
#     # CHECK IF LEADER_TIME IS SET AND THAT IT'S LESS THAN 60 SECONDS FROM LAST SET
#     if LEADER_TIME is None or time() - LEADER_TIME > 60:
#         LOG.info("Cache has expired or was not set")
#         box_hostname = os.environ['HOSTNAME'] if HOSTNAME is None else HOSTNAME
#         LOG.info("Getting Leader Election status")
#         # RIGHT NOW IN THE CONFIG THIS IS HARD SET TO THE CONSUL1-APP.ENG.OPS.NET
#         try:
#             r = requests.get(CONSUL_URL, timeout=60)
#             assert r.status_code == 200, "Failed to get back a 200 from consul."
#             LOG.info("Verify that the Value is {}".format(box_hostname))
#             # THE VALUE COMES BACK AS BASE64-ENCODED BYTES THAT NEED TO BE
#             # DECODED, TURNED INTO A STRING, THEN INTO A DICT
#             value = json.loads(base64.b64decode(r.json()[0]['Value']).decode('utf-8'))
#             # CACHE THE VALUE AND TIMESTAMP, WHETHER OR NOT WE ARE THE LEADER
#             LEADER_STATUS = value['HostName'] == box_hostname
#             LEADER_TIME = time()
#             return LEADER_STATUS
#         except requests.exceptions.Timeout:
#             LOG.error("Timed out connecting to Consul")
#             return LEADER_STATUS
#     else:
#         return LEADER_STATUS
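# Illustrative round-trip of the decode step above (the real KV 'Value' is
# whatever the leader-election job writes; the hostname here is made up):
#   raw = base64.b64encode(b'{"HostName": "aom1.example.net"}')
#   json.loads(base64.b64decode(raw).decode('utf-8'))
#   -> {'HostName': 'aom1.example.net'}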
def start_service(log, reload_interval):
    """
    Starts the alert-checking service loop.

    Args:
        log: AlertLogging instance to log through.
        reload_interval: seconds to sleep between config reloads.

    Returns:
        False when the job exits, so it can be restarted.
    """
    jobs = []
    alert_list = []
    alert_hash = {}
    box_hostname = os.environ['HOSTNAME'] if HOSTNAME is None else HOSTNAME
    production_mode = "TEST" not in os.environ
    # WAIT FOR LEADER ELECTION TO PASS (currently disabled, along with the
    # 15s wait for the Consul service check)
    # while not leader():
    #     return False
    # log.info("Waiting 15s for Consul service to pass")
    # sleep(15)
    # GLOB ALL THE CONFIG FILES TO BUILD POOL OF ALERTS
    while True:
        try:
            alert_list = glob_the_configs(SERVICE_CONFIG['alert_folder'],
                                          SERVICE_CONFIG['alert_routing_lookup'],
                                          'http://consul.service.consul:8500',
                                          box_hostname, log)
        except Exception:
            log.error("Failed to load config files: {}".format(traceback.format_exc()))
        # TO PREVENT RECURSIVE INCLUSION WITH MULTIPROCESSING, THE WORKER
        # FUNCTIONS LIVE IN ANOTHER FILE (serviceapp.service)
        log.info("Found {} alerts".format(len(alert_list)))
        new_alert_hash = {}
        for alert_config in alert_list:
            if alert_config['id'] in new_alert_hash:
                log.info("Duplicate alert id found: {}. "
                         "Ignoring one of them.".format(alert_config['id']))
            else:
                new_alert_hash[alert_config['id']] = alert_config
added, removed, modified = dict_compare(new_alert_hash, alert_hash)
log.info("Added alerts {}".format(added))
log.info("Removed alerts {}".format(removed))
log.info("Modified alerts {}".format(modified))
        # PROCESSES TO KILL
        for al_config in removed.union(modified):
            position = None
            # Find whether the process is currently running
            for i, job in enumerate(jobs):
                if job.name == al_config and job.is_alive():
                    position = i
                    # once found, exit loop
                    break
            if position is None:
                # No live process for this config; nothing to stop
                continue
            # Terminate process and remove it from the list
            log.info("Stopping config: {}".format(jobs[position].name))
            subprocess.call(["/bin/kill", "-9", str(jobs[position].pid)])
            jobs[position].join()
            NUM_JOBS.value -= 1
            log.info("Process stopped successfully")
            jobs.pop(position)
        # PROCESSES TO START, ORDERED SO THAT ALERTS WITH FEWER RESOLVED
        # DEPENDENCIES COME UP FIRST
        def dependency_count(config_id):
            deps = new_alert_hash[config_id].get('resolvedDependencies')
            return len(deps.getDependencies()) if deps else 0
        alert_configurations = sorted(added.union(modified), key=dependency_count)
        for al_config in alert_configurations:
            if new_alert_hash[al_config].get('query_type') == 'prometheus':
                target = service.check_prometheus_alert
            else:
                target = service.check_kairosdb_alert
            p = multiprocessing.Process(
                target=target,
                args=(new_alert_hash[al_config], SERVICE_CONFIG, log, production_mode),
                name=al_config, daemon=True)
            jobs.append(p)
            log.info("Starting new config: {}".format(p.name))
            p.start()
            NUM_JOBS.value += 1
        # store current list
        alert_hash = new_alert_hash.copy()
        log.info("Total number of jobs: {}".format(NUM_JOBS.value))
        service.send_stat('total_jobs', NUM_JOBS.value, {}, statprefix='aom')
        if added:
            service.send_stat('new_jobs', len(added), {}, statprefix='aom')
        if modified:
            service.send_stat('modified_jobs', len(modified), {}, statprefix='aom')
        if removed:
            service.send_stat('removed_jobs', len(removed), {}, statprefix='aom')
        sleep(reload_interval)
    # Reached only if the loop above exits, i.e. once leader election is
    # re-enabled and this node loses leadership: kill all child processes
    log.info("No longer leader. Exiting alerts background job")
    for job in jobs:
        # job.terminate() causes the server to stop, so use /bin/kill instead
        subprocess.call(["/bin/kill", "-9", str(job.pid)])
        NUM_JOBS.value -= 1
    SERVICE_JOB.value = 0
    return False
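# Start-order example (illustrative counts): with dependency counts
#   {'service_alert': 2, 'cpu_alert': 0}, 'cpu_alert' starts first because
#   the start loop sorts configs by resolved-dependency count, ascending.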
if __name__ == "__main__":
# GET ARGS AND START LOGGING
ARGS = get_service_args()
logging.setLoggerClass(AlertLogging)
LOG.info("Starting Service")
# GET SERVICE CONFIG
LEADER_OVERRIDE = ARGS['override']
HOSTNAME = ARGS['hostname']
    with open('service.yaml', 'r') as config_file:
        SERVICE_CONFIG = yaml.safe_load(config_file)
if ARGS['alert_configs'] is not None:
SERVICE_CONFIG['alert_folder'] = ARGS['alert_configs']
if ARGS['alert_routing_lookup'] is not None:
SERVICE_CONFIG['alert_routing_lookup'] = ARGS['alert_routing_lookup']
# SET CONSUL URL FOR LEADER CHECK
CONSUL_URL = SERVICE_CONFIG['consul_url']
# START THE MAIN SERVICE
APP.run(host="0.0.0.0", port=ARGS['port'])
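# Typical invocation (illustrative; the actual flag names come from
# library.args.get_service_args and are assumed here):
#   ./aom_service.py --port 8080 --hostname aom1.example.net \
#       --alert-configs ./alerts --alert-routing-lookup ./routing.yaml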