
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys, getopt
import os
import re
import subprocess
import time
# Path to the mysqldumpslow tool used to analyze the MySQL slow log
MYSQLDUMPSLOW = ""
# Temporary directory for the analysis artifacts
TMP_FOLDER = "/tmp"
# File holding the chunk read from the log on each run, so that an
# oversized log file cannot slow down processing
SLOW_LOG_BAK = "slow_query_log.bak"
# File recording the byte offset reached in the log on the previous run
LAST_POSITION_FILE = 'slow_query.ln'
# Temporary file for intermediate analysis results
SLOW_ANALYSIS = 'slow_analysis'
# Maximum number of lines to read per run
MAX_READ_LINE = 5000
# Maximum number of bytes to read per run
MAX_READ_SIZE = 10 * 1024 * 1024
def is_blank(string):
if string is None or len(string) == 0:
return True
return False
# Get the local hostname
def hostname():
    os_name = os.name
    if os_name == 'nt':
        hostname = os.getenv('computername')
        return hostname.strip()
    elif os_name == 'posix':
        host = os.popen('echo $HOSTNAME')
        try:
            hostname = host.read()
            return hostname.strip()
        finally:
            host.close()
    else:
        return 'Unknown hostname'
# Path to the MySQL slow query log (overridden by the -l option in main())
SLOW_LOG_FILE = ""
#SLOW_LOG_FILE = "/data/mysqldata/3306/logs/3306-slow.log"
#SLOW_LOG_FILE = "/data/mysql/data/%s-slow.log" % hostname()
#SLOW_LOG_FILE = "/var/log/mysql/mysql-slow_bak.log"
#SLOW_LOG_FILE = "/mnt/Windows/F/h0068019-slow.log"
#SLOW_LOG_FILE = "/tmp/h0068014-slow.log"
def usage():
    print "\nUsage check_slow_query:"
    print "\t-f directory containing the mysqldumpslow executable"
#    print "\t-w slow query count threshold for the WARNING state"
#    print "\t-c slow query count threshold for the CRITICAL state"
    print "\t-l path to the MySQL slow query log file"
    print "\t-h print this help message"
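# Example invocation (illustrative; both paths are assumptions that depend
# on where MySQL is installed on the monitored host):
#   python check_slow_query.py -f /usr/local/mysql/bin -l /var/log/mysql/mysql-slow.log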
def get_options():
    path = None
    slowLogPath = None
    try:
        # -h takes no argument, so it must not be followed by ':' in the optstring
        options, args = getopt.getopt(sys.argv[1:], "hf:l:", ["help", "path=", "logSlowPath="])
        for name, value in options:
            if name in ('-h', '--help'):
                usage()
                sys.exit(0)
            if name in ('-f', '--path'):
                path = value
            if name in ('-l', '--logSlowPath'):
                slowLogPath = value
    except getopt.GetoptError:
        print "Invalid option!"
        usage()
    return path, slowLogPath
# Extract the value between the given delimiters in a "key=value"-style string
def get_value(content, start, end = ""):
    start_pos = content.find(start)
    start_len = len(start)
    if end == "":
        end_pos = len(content)
    else:
        # Search for the end delimiter after the start delimiter; searching
        # from the beginning can match an earlier occurrence (e.g. the first
        # space in "Count: 2 ...") and yield an empty value
        end_pos = content.find(end, start_pos + start_len)
    return content[start_pos + start_len : end_pos]
# Parse the summary line of a mysqldumpslow block, e.g.
# "Count: 2  Time=1.50s (3s)  Lock=0.00s (0s)  Rows=10.0 (20), app[app]@[10.0.0.1]",
# returning the average query time and the client host
def get_basic_info(line):
    host = " "
    query_time = ""
    items = re.split(" |, ", line)
    for item in items:
        if re.match("Time=", item):
            query_time = get_value(item, "=", "s")
        elif re.match(".+?@.+?", item):
            host = get_value(item, "@")
            host = host.replace("[", "")
            host = host.replace("]", "")
    return query_time, host
# Extract the client host (the bracketed value) from a "# User@Host:" line
def get_host(line):
    items = line.split(" ")
    for item in items:
        if re.match(r"^\[", item):
            return item[1:len(item) - 1]
# Extract the value of Query_time from a "# Query_time: ..." line
def get_query_time(line):
    match = re.search(r"Query_time:\s*(\S+)", line)
    if match:
        return match.group(1)
    return ""
# Drop USE and SET statements so that only the actual query is kept
def filter_sql(sql):
    items = sql.split(";")
    ret = []
    for item in items:
        if not re.match(r"^\s*(USE|use|SET|set)", item):
            ret.append(item)
    return "".join(ret)
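# A block in the output of `mysqldumpslow -a` looks roughly like the
# following (illustrative; the exact layout varies with the MySQL and
# mysqldumpslow version):
#
#   Count: 2  Time=1.50s (3s)  Lock=0.00s (0s)  Rows=10.0 (20), app[app]@[10.0.0.1]
#     SELECT * FROM orders WHERE id = 42
#
# get_output() walks one such block line by line and collects the count,
# query time, client host, timestamp, and the statement text.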
def get_output(block):
    if len(block) == 0:
        return None
    ts = time.time()
    query_time = 0
    host = " "
    sql = []
    values = {}
    count = 1
    for line in block.split("\n"):
        if re.match("^Count", line):
            query_time, host = get_basic_info(line)
            count = get_value(line, ": ", " ")
        elif re.match("^ # User@Host", line):
            host = get_host(line)
        elif re.match("^ (?:# )?Time", line):
            # Parse the timestamp up to the fractional seconds,
            # e.g. "# Time: 2017-01-01T12:00:00.123456Z"
            start_pos = line.find(": ")
            end_pos = line.find(".")
            if start_pos != -1 and end_pos != -1:
                ts = time.mktime(time.strptime(line[(start_pos + 2): end_pos], '%Y-%m-%dT%H:%M:%S'))
        elif re.match("^ # Query_time", line):
            # Overrides the query_time extracted by get_basic_info
            query_time = get_query_time(line)
        elif re.match(r"^ \w+", line):
            sql.append(line)
    stmt = filter_sql("".join(sql))
    values["count"] = count
    values["stmt"] = stmt
    values["query_time"] = query_time
    values["host"] = host
    values["ts"] = ts
    return values
def read_file(fp, last_pos):
    count = 0
    current_size = 0
    lines = []
    fp.seek(last_pos)
    for line in fp:
        # Read at most MAX_READ_LINE lines or MAX_READ_SIZE bytes per run
        if count < MAX_READ_LINE and current_size < MAX_READ_SIZE:
            lines.append(line)
            count += 1
            current_size += len(line)
        # Once past the limit, keep reading until the next "# Time:" record
        # boundary so that the last record in this chunk stays complete
        elif not re.match("^# Time:", line):
            lines.append(line)
            count += 1
            current_size += len(line)
        else:
            break
    pos_file = open("./%s" % LAST_POSITION_FILE, "w")
    pos_file.write("%d" % (current_size + last_pos))  # record the byte offset processed so far
    pos_file.close()
    return lines
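# The checkpoint written above is a single integer byte offset; for example,
# a run that stopped 10 MiB into the log leaves "10485760" in ./slow_query.ln,
# and the next run resumes reading from that offset.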
def split_file():
    if os.path.exists("./%s" % LAST_POSITION_FILE):
        last_position = int(open("./%s" % LAST_POSITION_FILE, "r").readline())
    else:
        last_position = 0
    try:
        fp = open(SLOW_LOG_FILE, "rb")
        total_size = os.path.getsize(SLOW_LOG_FILE)
        if last_position >= total_size:
            # Nothing new to process since the last run
            fp.close()
            sys.exit(0)
    except IOError:
        print "File %s does not exist" % SLOW_LOG_FILE
        sys.exit(3)
    # Cut off the next chunk, starting from where the previous run stopped
    lines = read_file(fp, last_position)
    fp.close()
    slow_log_bak = "%s/%s" % (TMP_FOLDER, SLOW_LOG_BAK)
    slow_bak_file = open(slow_log_bak, "w")
    slow_bak_file.writelines(lines)
    slow_bak_file.close()
    return slow_log_bak
# Replace quoted string literals with '?'
def remove_quoted_text(content):
    regex = re.compile("(?:'.*?'|\".*?\")([^\w]|$)", re.S)
    return regex.sub(r"?\1", content)
# Replace numeric literals (decimal and hexadecimal) with '?'
def remove_numbers(content):
    pattern = r"([^\w_])-?(?:(?:(?:0x|X)[0-9a-fA-F]+)|(?:[0-9]+(?:\.[0-9]+)?))"
    return re.sub(pattern, r'\1?', content)
# Collapse runs of whitespace and tighten the spacing around brackets and commas
def remove_spaces(content):
    regex = re.compile(r'\s\s+', re.S)
    content = regex.sub(" ", content)
    regex = re.compile(r'\s+(\)|,)', re.S)
    content = regex.sub(r' \1', content)
    regex = re.compile(r'(\(|,)\s+', re.S)
    content = regex.sub(r'\1', content)
    return content.strip()
# Keep only the first value tuple of a multi-row INSERT ... VALUES statement
def remove_multiple_insert_value(content):
    if not content.startswith("INSERT"):
        return content
    values_pos = content.find("VALUES")
    if values_pos == -1:
        return content
    right_bracket_pos = content.find(")", values_pos)
    if right_bracket_pos == -1:
        return content
    return content[ : right_bracket_pos + 1]
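# For example, "INSERT INTO t VALUES (?, ?), (?, ?)" is truncated to
# "INSERT INTO t VALUES (?, ?)". (By the time this runs inside normalize(),
# the literal values have already been replaced with '?'.)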
# Normalize a statement so that structurally identical queries compare equal
def normalize(content):
    content = remove_quoted_text(content)
    content = remove_numbers(content)
    content = remove_spaces(content)
    content = remove_multiple_insert_value(content)
    return content
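# Illustrative normalization of a (hypothetical) statement:
#   in:  SELECT *  FROM users WHERE id = 42 AND name = 'bob'
#   out: SELECT * FROM users WHERE id = ? AND name = ?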
# Aggregate the per-block results by (host, normalized statement), tracking
# the count, max/min/total query time, and the most recent timestamp
def post_process(outputs):
    result = {}
    for item in outputs:
        stmt = normalize(item["stmt"])
        host = item["host"]
        query_time = float(item["query_time"])
        ts = item["ts"]
        key = "%s___%s" % (host, stmt)
        if key not in result:
            result[key] = {"count": item["count"], "max_query_time": query_time,
                           "total_query_time": query_time, "min_query_time": query_time, "ts": ts}
        else:
            data = result[key]
            data["count"] = int(data["count"]) + int(item["count"])
            data["max_query_time"] = max(query_time, data["max_query_time"])
            data["min_query_time"] = min(query_time, data["min_query_time"])
            data["total_query_time"] = data["total_query_time"] + query_time
            data["ts"] = max(ts, data["ts"])
    return result
def analyze(path, slow_log):
    mysqldumpslow = "%s/%s" % (path, "mysqldumpslow")
    # -a tells mysqldumpslow not to abstract numbers and strings;
    # normalization is handled by this script instead
    args = [mysqldumpslow, "-a", slow_log]
    process = subprocess.Popen(args, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
    stdout, stderr = process.communicate()
    outputs = []
    # mysqldumpslow separates entries with blank lines
    for block in stdout.split("\n\n"):
        result = get_output(block)
        if result is not None:
            outputs.append(result)
    post_result = post_process(outputs)
    return post_result
# Serialize the aggregated results into the monitoring output format
def parse_output(outputs):
    strs = []
    sumcount = 0
    for (key, value) in outputs.items():
        host_stmt = key.split("___")
        host = host_stmt[0]
        stmt = host_stmt[1]
        max_query_time = value["max_query_time"]
        avg_query_time = float(value["total_query_time"]) / int(value["count"])
        min_query_time = value["min_query_time"]
        count = value["count"]
        ts = value["ts"]
        sumcount += int(count)
        content = "%d<<:>>%s<<:>>%d<<:>>%.3f<<:>>%.3f<<:>>%.3f<<:>>%s<<###>>" % (
            ts, stmt, int(count), max_query_time, min_query_time, avg_query_time, host)
        strs.append(content)
    return "%d<<COUNT>>|MySQL_Slow_Query<<###>>%s" % (sumcount, ''.join(strs))
def monitor(path):
slow_log = split_file()
outputs = analyze(path, slow_log)
if len(outputs) != 0:
return parse_output(outputs)
else:
return ""
def main():
    path, slowLogPath = get_options()
#    if is_blank(warning) or is_blank(critical):
#        print "Thresholds for the monitor states must be provided."
#        usage()
#        sys.exit(3)
    if is_blank(path):
        path = '/usr/local/mysql/bin'
    if is_blank(slowLogPath):
        print "The path to the slow query log must be provided."
        usage()
        sys.exit(3)
    global SLOW_LOG_FILE
    SLOW_LOG_FILE = slowLogPath
    print monitor(path)
if __name__ == '__main__':
main()