Files

371 lines
12 KiB
Python
Raw Permalink Normal View History

2026-04-07 02:04:22 +05:30
#coding: utf-8
#-------------------------------------------------------------------
# YakPanel
#-------------------------------------------------------------------
# Copyright (c) 2015-2099 YakPanel(www.yakpanel.com) All rights reserved.
#-------------------------------------------------------------------
# Author: cjxin <bt_ahong@yakpanel.com>
#-------------------------------------------------------------------
#------------------------------
# 面板日志类
#------------------------------
import os,re,json,time
from logsModel.base import logsBase
import public,db
from html import unescape,escape
from public.validate import Param
class main(logsBase):
def __init__(self):
super().__init__()
public.get_client_info_db_obj()
def get_logs_info(self,args):
'''
@name 获取分类日志信息
'''
data = public.M('logs').query('''
select type,count(id) as 'count' from logs
group by type
order by count(id) desc
''')
result = []
for arrs in data:
item = {}
if not arrs: continue
item['count'] = arrs[1]
item['type'] = arrs[0]
result.append(item)
public.set_module_logs('get_logs_info','get_logs_info')
return public.return_message(0, 0, result)
def get_logs_bytype(self,args):
"""
@name 根据类型获取日志
@param args.type 日志类型
"""
p,limit = 1,20
if 'p' in args: p = int(args.p)
if 'limit' in args: limit = int(args.limit)
stype = args.stype
search = '[' + str(args.search) + ']'
where = "type=? and log like ? "
count = public.M('logs').where(where,(stype,'%'+search+'%')).count()
data = public.get_page(count,p,limit)
data['data'] = public.M('logs').where(where,(stype,'%'+search+'%')).limit('{},{}'.format(data['shift'], data['row'])).order('id desc').select()
return public.return_message(0, 0, data)
def __get_panel_dirs(self):
'''
@name 获取面板日志目录
'''
dirs = []
for filename in os.listdir('{}/logs/request'.format(public.get_panel_path())):
if filename.find('.json') != -1:
dirs.append(filename)
dirs = sorted(dirs,reverse=True)
return dirs
def get_panel_log(self,get):
"""
@name 获取面板日志
"""
p,limit,search = 1,20,''
if 'p' in get: p = int(get.p)
if 'limit' in get: limit = int(get.limit)
if 'search' in get: search = get.search
find_idx = 0
log_list = []
dirs = self.__get_panel_dirs()
for filename in dirs:
log_path = '{}/logs/request/{}'.format(public.get_panel_path(),filename)
if not os.path.exists(log_path): #文件不存在
continue
if len(log_list) >= limit:
break
p_num = 0 #分页计数器
next_file = False
while not next_file:
if len(log_list) >= limit:
break
p_num += 1
result = self.GetNumLines(log_path,10001,p_num).split('\r\n')
if len(result) < 10000:
next_file = True
result.reverse()
for _line in result:
if not _line: continue
if len(log_list) >= limit:
break
try:
if self.find_line_str(_line,search):
find_idx += 1
if find_idx > (p-1) * limit:
info = json.loads(unescape(_line))
for key in info:
if isinstance(info[key],str):
info[key] = escape(info[key])
info['address'] = info['ip'].split(':')[0]
log_list.append(info)
except:pass
# return public.return_area(log_list,'address')
return public.return_message(0, 0, log_list)
def get_panel_error_logs(self,get):
'''
@name 获取面板错误日志
'''
search = ''
if 'search' in get:
search = get.search
filename = '{}/logs/error.log'.format(public.get_panel_path())
if not os.path.exists(filename):
return public.return_message(-1, 0, public.lang("No error log"))
res = {}
res['data'] = public.xssdecode(self.GetNumLines(filename,2000,1,search))
res['data'].reverse()
return public.return_message(0, 0, res)
def __get_ftp_log_files(self,path):
"""
@name 获取FTP日志文件列表
@param path 日志文件路径
@return list
"""
file_list = []
if os.path.exists(path):
for filename in os.listdir(path):
if filename.find('.log') == -1: continue
file_list.append('{}/{}'.format(path,filename))
file_list = sorted(file_list,reverse=True)
return file_list
def get_ftp_logs(self,get):
"""
@name 获取ftp日志
"""
p,limit,search,username = 1,500,'',''
if 'p' in get: p = int(get.p)
if 'limit' in get: limit = int(get.limit)
if 'search' in get: search = get.search
if 'username' in get: username = get.username
find_idx = 0
ip_list = []
log_list = []
dirs = self.__get_ftp_log_files('{}/ftpServer/Logs'.format(public.get_soft_path()))
for log_path in dirs:
if not os.path.exists(log_path): continue
if len(log_list) >= limit: break
p_num = 0 #分页计数器
next_file = False
while not next_file:
if len(log_list) >= limit:
break
p_num += 1
result = self.GetNumLines(log_path,10001,p_num).split('\r\n')
if len(result) < 10000:
next_file = True
result.reverse()
for _line in result:
if not _line.strip(): continue
if len(log_list) >= limit:
break
try:
if self.find_line_str(_line,search):
#根据用户名查找
if username and not re.search(r'-\s+({})\s+\('.format(username),_line):
continue
find_idx += 1
if find_idx > (p-1) * limit:
#获取ip归属地
for _ip in public.get_line_ips(_line):
if not _ip in ip_list: ip_list.append(_ip)
info = escape(_line)
log_list.append(info)
except:pass
# return self.return_line_area(log_list,ip_list)
return public.return_message(0, 0, log_list)
#取慢日志
def get_slow_logs(self,get):
'''
@name 获取慢日志
@get.search 搜索关键字
'''
search,p,limit = '',1,1000
if 'search' in get: search = get.search
if 'limit' in get: limit = get.limit
my_info = public.get_mysql_info()
if not my_info['datadir']:
return public.return_message(-1, 0, public.lang("MySQL is not installed!"))
path = my_info['datadir'] + '/mysql-slow.log'
if not os.path.exists(path):
return public.return_message(-1, 0, public.lang("Log file does not exist!"))
# mysql慢日志有顺序问题,倒序显示不利于排查问题
return public.return_message(0, 0, public.xsssec(public.GetNumLines(path, limit)))
def IP_geolocation(self, get):
'''
@name 列出所有IP及其归属地
@return list {ip: {ip: ip_address, operation_num: 12 ,info: 归属地}, ...]
'''
result = dict()
data = public.M('logs').query('''
select * from logs
''')
for arrs in data:
if not arrs: continue
end = 0
# 获得IP的尾后索引
for ch in arrs[2]:
if ch.isnumeric() or ch == '.':
end += 1
else:
break
ip_addr = arrs[2][0:end]
if ip_addr:
if result.get(ip_addr) != None:
result[ip_addr]["operation_num"] = result[ip_addr]["operation_num"] + 1
else:
result[ip_addr] = {"ip":ip_addr,"operation_num":1, "info":None}
return_list = []
for k in result:
info = public.get_free_ip_info(k)
result[k]["info"] = info["info"]
return_list.append(result[k])
return public.return_message(0, 0, return_list)
def get_error_logs_by_search(self, args):
'''
@name 根据搜索内容, 获取运行日志中的内容
@args.search 匹配内容
@return 匹配该内容的所有日志
'''
log_file_path = "{}/logs/error.log".format(public.get_panel_path())
#return log_file_path
data = public.readFile(log_file_path)
if not data:
return public.return_message(0, 0, None)
data = data.split('\n')
result = []
for line in data:
if args.search == None:
result.append(line)
elif args.search in line:
result.append(line)
return public.return_message(0, 0, result)
def get_panel_login_log(self, get):
"""
@name 获取面板登录日志
@param get
search : 关键字
login_type: 登陆状态
page : 页码
limit : 每页显示数量
"""
try:
get.validate([
Param("p").Integer(),
Param("limit").Integer(),
Param("search").String(),
], [public.validate.trim_filter()])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.fail_v2(str(ex))
query_conditions = []
query_params = []
# 处理 login_type 条件
if hasattr(get, "login_type"):
login_type = get.login_type
if isinstance(login_type, bytes):
login_type = login_type.decode('utf-8').strip()
elif isinstance(login_type, str):
login_type = login_type.strip()
if login_type:
query_conditions.append("login_type = ?")
query_params.append(login_type)
# search
if hasattr(get, "search"):
search = get.search.strip()
# 处理字节类型的 search
if isinstance(search, bytes):
search = search.decode('utf-8').strip()
elif isinstance(search, str):
search = search.strip()
if search:
query_conditions.append("(remote_addr LIKE ? OR user_agent LIKE ?)")
search_params = "%{}%".format(search)
query_params.extend([search_params, search_params])
# 构建查询
query_string = " AND ".join(query_conditions) if query_conditions else "1=1"
# 分页处理
page = int(get.p) if hasattr(get, 'p') and str(get.p).isdigit() else 1
limit = int(get.limit) if hasattr(get, 'limit') and str(get.limit).isdigit() else 10
offset = (page - 1) * limit
# 执行查询 按 login_time 降序排序
import db
with db.Sql() as db_obj:
db_obj._Sql__DB_FILE = "data/db/client_info.db"
db_obj.table("client_info")
data = db_obj.where(query_string, tuple(query_params)) \
.field("id,remote_addr,remote_port,user_agent,login_time,login_type") \
.order("login_time DESC") \
.limit(str(offset) + ',' + str(limit)) \
.select()
# 获取总数
total = db_obj.where(query_string, tuple(query_params)).count()
return public.success_v2({
"data": public.return_area(data, "remote_addr"),
"total": total
})