Python 监控 Oozie 组件的任务失败、任务未执行，并报警到钉钉
import pymysql
import sys
import os
import requests
import hmac
import hashlib
import base64
import urllib.parse
import time
import datetime
import logging
# The log format must stay the bare message: the log file doubles as the
# alert de-duplication record, and check_log compares whole lines against
# job ids. Adding a timestamp/level prefix would break that lookup (which is
# also why DATEFMT has no effect on log lines).
FORMAT = "%(message)s"
DATEFMT = "%Y-%m-%d %H:%M:%S"
# One log file per hour, e.g. "2024_01_31_07".
FILE_FORMAT = "%Y_%m_%d_%H"
# Optional directory for the hourly log files; empty means the current
# working directory (the original behaviour).
LOG_DIR = os.environ.get('OOZIE_MONITOR_LOG_DIR', '')
if LOG_DIR:
    os.makedirs(LOG_DIR, exist_ok=True)
oozie_log = os.path.join(LOG_DIR, time.strftime(FILE_FORMAT))
logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt=DATEFMT, filename=oozie_log)
logger = logging.getLogger(__name__)
class OozieJob:
    """Checks against the Oozie MySQL metadata tables that raise DingTalk alerts.

    All methods are static; the class only groups the individual checks.
    """

    @staticmethod
    def conn_oozie_mysql():
        """Connect to the Oozie metadata database.

        The defaults below may be overridden with the OOZIE_DB_HOST,
        OOZIE_DB_PORT, OOZIE_DB_USER, OOZIE_DB_PASSWORD and OOZIE_DB_NAME
        environment variables so credentials need not live in the source.

        Returns:
            An open pymysql connection, or None if connecting failed. On
            failure the error is logged and a DingTalk alert is sent, so
            callers must check for None before using the result.
        """
        try:
            mysql_config = {
                'host': os.environ.get('OOZIE_DB_HOST', '127.0.0.1'),
                'user': os.environ.get('OOZIE_DB_USER', 'oozie'),
                'password': os.environ.get('OOZIE_DB_PASSWORD', 'oozie'),
                'database': os.environ.get('OOZIE_DB_NAME', 'oozie'),
                'port': int(os.environ.get('OOZIE_DB_PORT', '3306')),
            }
            return pymysql.connect(**mysql_config)
        except Exception as err:
            logger.info(f'db connect error ====> {err}')
            db_msg = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "Oozie 数据库连接失败",
                    # err_time uses the same human-readable format as the
                    # msg_time field of the other alerts.
                    "text": f'title: Oozie 数据库连接失败 \n\n job_name: conn_oozie_mysql \n\n err_time: {time.strftime(DATEFMT)} \n\n err_msg: {err}'
                }
            }
            send_ding_msg(db_msg)
            return None

    @staticmethod
    def fail_job(db):
        """Alert on workflows that were KILLED during the last 3 minutes.

        After an alert is sent, the job id is written to the hourly log file.
        Later runs that find the id there skip it, so each job is reported
        at most once per log file.

        Args:
            db: an open DB-API connection to the Oozie database.
        """
        sql = "select id,app_name,user_name,end_time,start_time,status from WF_JOBS where status='KILLED' and end_time between date_add(now(),interval-3 minute) and now()"
        with db.cursor() as cur:
            cur.execute(sql)
            rows = cur.fetchall()
        for job_id, app_name, _user_name, end_time, start_time, status in rows:
            job_id = str(job_id)
            # check_log is the de-duplication helper defined in this module.
            if check_log(job_id, oozie_log):
                continue
            fail_msg = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "Oozie任务失败",
                    "text": f"title: Oozie任务失败 \n\n job_name: {app_name},\n\n job_id: {job_id},\n\n start_time: {start_time},\n\n end_time: {end_time},\n\n job_stauts: {status}\n\n"
                }
            }
            send_ding_msg(fail_msg)
            logger.info(job_id)

    @staticmethod
    def not_exec_job(db):
        """Alert when workflows that succeeded yesterday have not run today.

        Compares the distinct app names that SUCCEEDED yesterday with the
        distinct app names started today (any status). Alerts when any of
        yesterday's apps is absent today, or when nothing has started today.

        Args:
            db: an open DB-API connection to the Oozie database.
        """
        sql_ys_job = "SELECT DISTINCT(app_name) as app_name FROM WF_JOBS WHERE LEFT(start_time, 10) = LEFT(DATE_ADD(NOW(), INTERVAL -1 DAY), 10) AND STATUS = 'SUCCEEDED'"
        sql_td_job = "SELECT DISTINCT(app_name) as app_name FROM WF_JOBS WHERE LEFT(start_time, 10) = LEFT(DATE_ADD(NOW(), INTERVAL 0 DAY), 10) "
        with db.cursor() as cur:
            cur.execute(sql_ys_job)
            ys_job = cur.fetchall()
            cur.execute(sql_td_job)
            td_job = cur.fetchall()
        # Compare names rather than only counts: a job that is new today would
        # otherwise hide one of yesterday's jobs that has not started yet.
        missing = sorted({row[0] for row in ys_job} - {row[0] for row in td_job}, key=str)
        if missing or not td_job:
            missing_names = ', '.join(str(name) for name in missing)
            not_msg = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "Oozie任务执行异常",
                    "text": f"title: Oozie任务执行异常 \n\n yesterday_job_counts: {len(ys_job)},\n\n today_job_counts: {len(td_job)}, \n\n job_diff: {len(td_job) - len(ys_job)},\n\n missing_jobs: {missing_names},\n\n msg_time: {time.strftime(DATEFMT)},\n\n "
                }
            }
            send_ding_msg(not_msg)
            logger.info('oozie job exec error')

    @staticmethod
    def delay_job(db):
        """Alert on workflows RUNNING for more than 60 minutes.

        'error_log_track_workflow' is excluded. Alerts are de-duplicated
        through the hourly log file in the same way as fail_job.

        Args:
            db: an open DB-API connection to the Oozie database.
        """
        sql = "select id,app_name,user_name,start_time,TIMESTAMPDIFF(MINUTE, start_time, now()) as diff from WF_JOBS where status='RUNNING' and TIMESTAMPDIFF(MINUTE, start_time, now()) >60 and app_name <> 'error_log_track_workflow'"
        with db.cursor() as cur:
            cur.execute(sql)
            rows = cur.fetchall()
        for job_id, app_name, _user_name, start_time, diff in rows:
            job_id = str(job_id)
            if check_log(job_id, oozie_log):
                continue
            delay_msg = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "Oozie任务延迟告警",
                    "text": f"title: Oozie任务延迟告警 \n\n job_name: {app_name},\n\n job_id: {job_id},\n\n start_time: {start_time},\n\n running_time: {diff},\n\n job_stauts: RUNNING\n\n"
                }
            }
            send_ding_msg(delay_msg)
            logger.info(job_id)
def check_log(job_id, send_log):
    """Return True if *job_id* already appears as a whole line in *send_log*.

    The log file is the alert de-duplication record: after an alert is sent,
    the job id is logged on its own line.

    Args:
        job_id: the job id to look for. It is compared as a string, so
            non-string ids from the database still match.
        send_log: path of the log file to search.

    Returns:
        True if a line equals the id, otherwise False. A missing file counts
        as "not sent yet" and returns False.
    """
    target = str(job_id)
    try:
        # errors='replace' so a stray non-UTF-8 byte in the log (for example
        # from a logged exception message) cannot crash the lookup.
        with open(send_log, 'r', encoding='utf-8', errors='replace') as log_file:
            return any(line.rstrip('\n') == target for line in log_file)
    except FileNotFoundError:
        return False


# The OozieJob checks call the helper by this name; keep both names working.
check_send_log = check_log
def get_ding_sign(timestamp, secret):
    """Compute the DingTalk robot request signature.

    Args:
        timestamp: the millisecond timestamp that is sent along with the request.
        secret: the robot's signing secret.

    Returns:
        The URL-quoted Base64 encoding of HMAC-SHA256 over
        ``"<timestamp>\\n<secret>"``, keyed with *secret* (UTF-8 encoded).
    """
    key = secret.encode('utf-8')
    payload = '{}\n{}'.format(timestamp, secret).encode('utf-8')
    digest = hmac.new(key, payload, hashlib.sha256).digest()
    return urllib.parse.quote_plus(base64.b64encode(digest))
def send_ding_msg(msg):
    """Post *msg* to the DingTalk robot webhook as JSON, with a signed URL.

    The webhook URL and signing secret default to the placeholders below and
    can be overridden with the DING_WEBHOOK_URL and DING_SECRET environment
    variables. Delivery is best-effort: errors are printed, not raised, so a
    failed notification never stops the monitor.

    Args:
        msg: a DingTalk message payload dict (e.g. msgtype "markdown").
    """
    url = os.environ.get('DING_WEBHOOK_URL', 'https://oapi.dingtalk.com/robot/send?access_token=token(自己的哦)')
    secret = os.environ.get('DING_SECRET', 'secret(自己的哦)')
    timestamp = str(round(time.time() * 1000))
    sign = get_ding_sign(timestamp=timestamp, secret=secret)
    # The webhook URL already has '?access_token=...', so the extra
    # parameters are joined with a literal '&'.
    url = f'{url}&timestamp={timestamp}&sign={sign}'
    try:
        # Without a timeout a hung connection would block the monitor run.
        req = requests.post(url, json=msg, timeout=10)
        print(msg)
        result = req.json()
        print(req.status_code)
        if result.get('errcode') != 0:
            print(f'notify dingtalk error: {result.get("errcode")} {result.get("errmsg")}')
    except Exception as err:
        # Deliberate best-effort: report and carry on.
        print(err)
if __name__ == '__main__':
    # Entry point: run the Oozie checks once against the metadata database.
    start_time = time.strftime(DATEFMT)
    logger.info(f'==== oozie monitor start ====> {start_time}')
    oz_cli = OozieJob()
    db = oz_cli.conn_oozie_mysql()
    if db is None:
        # conn_oozie_mysql has already logged the error and sent an alert;
        # the checks below cannot run without a connection.
        sys.exit(1)
    try:
        oz_cli.fail_job(db)
        time.sleep(0.1)
        oz_cli.delay_job(db)
        time.sleep(0.2)
        # The "jobs not executed today" check only runs during hour 7.
        if datetime.datetime.now().hour == 7:
            oz_cli.not_exec_job(db)
    finally:
        db.close()
    end_time = time.strftime(DATEFMT)
    logger.info(f'==== oozie monitor end ====> {end_time}')
Java 版：发送报警到钉钉
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.squareup.okhttp.*;
import org.apache.commons.net.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
 * Minimal DingTalk robot client: posts a text message to a signed webhook.
 */
public class Demo {

    /** Reused for every message instead of building a new client per call. */
    private static final OkHttpClient CLIENT = new OkHttpClient();

    private static final MediaType JSON_MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8");

    public static void main(String[] args) {
        post2DingDing("测试2");
    }

    /**
     * Sends {@code message} as a DingTalk text message that @-mentions everyone.
     *
     * @param message text to send; it is stored in a JSON string field, so quotes,
     *                backslashes and newlines are escaped rather than breaking the payload
     * @return the raw response body, or {@code null} if the request failed or had no body
     */
    public static String post2DingDing(String message) {
        String secret = "secret自己的哦";
        String url = "https://oapi.dingtalk.com/robot/send?access_token=token自己的哦";
        long timestamp = System.currentTimeMillis();

        // Build the payload as JSON objects. Formatting the raw message into a
        // JSON template produced invalid JSON whenever it contained quotes,
        // backslashes or newlines.
        JSONObject at = new JSONObject();
        at.put("isAtAll", true);
        JSONObject text = new JSONObject();
        text.put("content", " " + message + " ");
        JSONObject json = new JSONObject();
        json.put("at", at);
        json.put("text", text);
        json.put("msgtype", "text");

        RequestBody requestBody = RequestBody.create(JSON_MEDIA_TYPE, json.toJSONString());
        Request request = new Request.Builder().url(getSign(timestamp, secret, url)).post(requestBody).build();
        try {
            Response response = CLIENT.newCall(request).execute();
            ResponseBody body = response.body();
            if (body != null) {
                try {
                    return body.string();
                } finally {
                    // Release the underlying connection explicitly.
                    body.close();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Appends DingTalk's signature query parameters to the webhook URL.
     *
     * <p>The signature is Base64(HmacSHA256(key = secret, data = timestamp + "\n" + secret)),
     * URL-encoded as UTF-8.
     *
     * @param timestamp current time in milliseconds, also sent as the {@code timestamp} parameter
     * @param secret    robot signing secret
     * @param url       webhook URL that already contains {@code ?access_token=...}
     * @return {@code url + "&timestamp=...&sign=..."}, or an empty string if signing failed
     */
    public static String getSign(long timestamp, String secret, String url) {
        String stringToSign = timestamp + "\n" + secret;
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
            String encoded = new String(Base64.encodeBase64(signData), StandardCharsets.UTF_8);
            // The URL already carries '?access_token=...', so the extra
            // parameters are joined with a literal '&'.
            return url + "&timestamp=" + timestamp + "&sign=" + URLEncoder.encode(encoded, "UTF-8");
        } catch (NoSuchAlgorithmException | InvalidKeyException | UnsupportedEncodingException e) {
            e.printStackTrace();
            return "";
        }
    }
}
|