Initial YakPanel commit

This commit is contained in:
Niranjan
2026-04-07 02:04:22 +05:30
commit 2826d3e7f3
5359 changed files with 1390724 additions and 0 deletions

View File

@@ -0,0 +1,10 @@
import public
import public.PluginLoader as plugin_loader
from wp_toolkit.core import wpmgr, wp_version, wpfastcgi_cache, wpbackup, wpmigration, wpdeployment, wp_sets, wpmgr_remote
security_m = plugin_loader.get_module('{}/class_v2/wp_toolkit/security.py'.format(public.get_panel_path()))
wp_security = security_m.wp_security

8258
class_v2/wp_toolkit/core.py Normal file

File diff suppressed because it is too large Load Diff

Binary file not shown.

View File

@@ -0,0 +1,225 @@
# WP Toolkit security
# author: hzh<2024-07-02>
import public
from public.validate import Param
# WP安全模块类
class wp_security:
# 开启文件防护
def open_file_protection(self, get):
# 校验参数
try:
get.validate([
Param('paths').Require().String(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
tamper_core_status = public.run_plugin('tamper_core', 'get_service_status', public.to_dict_obj({}))
if not (tamper_core_status["kernel_module_status"] and tamper_core_status["controller_status"]):
return public.return_message(-1, 0, public.lang("Sorry, opening failed, please go to the App Store --Tamper-proof for Enterprise to view details"))
result= public.run_plugin('tamper_core', 'multi_create', public.to_dict_obj({
'paths': get.paths,
}))
if 'status' in result:
return public.return_message(-1,0,result[0]['msg'])
if type(result)==list:
# for i in result:
public.run_plugin('tamper_core', 'assign_rule_to_directory', public.to_dict_obj({
'rule_group_name': 'WordPress Normal',
'path_id': result[0]['pid'],
}))
return public.return_message(0, 0, public.lang("Successfully added file protection"))
#取安全防护配置
def get_security_info(self, get):
# 校验参数
try:
get.validate([
Param('path').Require().String(),
Param('site_name').Require().String(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
return_result={
'hotlink_status':0,
'file_status':0,
'firewall_status':0,
'file_count':0,
'firewall_count':0,
}
if get.path[-1] !='/':
get.path+='/'
#取文件防护配置
try:
tamper_core_status = public.run_plugin('tamper_core', 'get_service_status', public.to_dict_obj({}))
if tamper_core_status["kernel_module_status"] and tamper_core_status["controller_status"]:
result= public.run_plugin('tamper_core', 'get_tamper_paths', public.to_dict_obj({}))
if type(result)==list:
for i in result:
if i['path']==get.path and i['status']==1:
return_result['file_status']=1
today=i['total']['today']
return_result['file_count']=int(today['create'])+int(today['modify'])+int(today['unlink'])+int(today['rename'])+int(today['mkdir'])+int(today['rmdir'])+int(today['chmod'])+int(today['chown'])+int(today['link'])
break
except:pass
#取防火墙配置
result= public.run_plugin('btwaf', 'get_site_config_byname', public.to_dict_obj({'siteName':get.site_name}))
try:
if result['open']:
return_result['firewall_status']=1
except:
pass
result= public.run_plugin('btwaf', 'get_site_config3', public.to_dict_obj({'siteName':get.site_name}))
try:
if type(result['data'])==list:
for i in result['data']:
if i['siteName']!=get.site_name:
continue
if type(i['total'])==list:
for total in i['total']:
return_result['firewall_count']+=int(total['value'])
except:
pass
return public.return_message(0,0,return_result)
#关闭文件保护
def close_file_protection(self, get):
# 校验参数
try:
get.validate([
Param('path_id').Require().String(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
tamper_core_status = public.run_plugin('tamper_core', 'get_service_status', public.to_dict_obj({}))
if not (tamper_core_status["kernel_module_status"] and tamper_core_status["controller_status"]):
return public.return_message(-1, 0, public.lang("Sorry, opening failed, please go to the App Store --Tamper-proof for Enterprise to view details"))
result= public.run_plugin('tamper_core', 'remove_path_config', public.to_dict_obj({
'path_id': get.path_id,
}))
status=0
if not result['status']:
status=-1
return public.return_message(status,0,result['msg'])
# 开启防火墙防护
def open_firewall_protection(self, get):
# 校验参数
try:
get.validate([
Param('site_name').Require().String(),
Param('obj').Require().String(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
result= public.run_plugin('btwaf', 'get_total_all', public.to_dict_obj({
}))
if not result['open']:
result= public.run_plugin('btwaf', 'set_open', public.to_dict_obj({
}))
if not result['status']:
return public.return_message(-1, 0, public.lang("Failed to open firewall"))
result= public.run_plugin('btwaf', 'get_site_config_byname', public.to_dict_obj({
'siteName': get.site_name,
}))
if not result['open']:
result= public.run_plugin('btwaf', 'set_site_obj_open', public.to_dict_obj({
'siteName': get.site_name,
'obj': get.obj,
}))
if not result['status']:
return public.return_message(-1, 0, public.lang("Failed to open firewall"))
return public.return_message(0, 0, public.lang("Successfully opened firewall"))
# 关闭防火墙防护
def close_firewall_protection(self, get):
# 校验参数
try:
get.validate([
Param('site_name').Require().String(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
result= public.run_plugin('btwaf', 'set_site_obj_open', public.to_dict_obj({
'siteName': get.site_name,
'obj': 'open',
}))
if not result['status']:
return public.return_message(-1, 0, public.lang("Failed to close firewall"))
return public.return_message(0, 0, public.lang("Successfully closeed firewall"))
# 获取防火墙防护配置
def get_firewall_info(self, get):
# 校验参数
try:
get.validate([
Param('site_name').Require().String(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
result= public.run_plugin('btwaf', 'get_site_config_byname', public.to_dict_obj({
'siteName':get.site_name,
}))
return public.return_message(0,0,result)
# 获取文件防护配置
def get_file_info(self, get):
# 校验参数
try:
get.validate([
Param('path').Require().String(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
if get.path[-1] !='/':
get.path+='/'
result= public.run_plugin('tamper_core', 'get_tamper_paths', public.to_dict_obj({
'path':get.path,
}))
for i in result:
if i['path']==get.path:
return public.return_message(0,0,i)
return public.return_message(0,0,{})

View File

@@ -0,0 +1,440 @@
# coding: utf-8
# +-------------------------------------------------------------------
# | YakPanel
# +-------------------------------------------------------------------
# | Copyright (c) 2015-2099 YakPanel(https://www.yakpanel.com) All rights reserved.
# +-------------------------------------------------------------------
# | Author: hwliang <hwliang@yakpanel.com>
# +-------------------------------------------------------------------
import sqlite3
import os, time, sys
def ReadFile(filename,mode = 'r'):
"""
读取文件内容
@filename 文件名
return string(bin) 若文件不存在则返回None
"""
import os
if not os.path.exists(filename): return False
fp = None
try:
fp = open(filename, mode)
f_body = fp.read()
except Exception as ex:
if sys.version_info[0] != 2:
try:
fp = open(filename, mode,encoding="utf-8",errors='ignore')
f_body = fp.read()
except:
fp = open(filename, mode,encoding="GBK",errors='ignore')
f_body = fp.read()
else:
return False
finally:
if fp and not fp.closed:
fp.close()
return f_body
def readFile(filename,mode='r'):
'''
@name 读取指定文件数据
@author hwliang<2021-06-09>
@param filename<string> 文件名
@param mode<string> 文件打开模式默认r
@return string or bytes or False 如果返回False则说明读取失败
'''
return ReadFile(filename,mode)
class Sql():
# ------------------------------
# 数据库操作类 For sqlite3
# ------------------------------
__DB_FILE = None # 数据库文件
__DB_CONN = None # 数据库连接对象
__DB_TABLE = "" # 被操作的表名称
__OPT_WHERE = "" # where条件
__OPT_LIMIT = "" # limit条件
__OPT_ORDER = "" # order条件
__OPT_FIELD = "*" # field条件
__OPT_PARAM = () # where值
__LOCK = 'sqlite_lock.pl'
def __init__(self,wordpress_db = 'wordpress_vulnerabilities'):
#获取当前文件的所在目录
self.__DB_FILE = os.path.join(os.path.dirname(__file__), wordpress_db)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_trackback):
self.close()
def __del__(self):
self.close()
def __GetConn(self):
# 取数据库对象
try:
if self.__DB_CONN == None:
self.__DB_CONN = sqlite3.connect(self.__DB_FILE)
self.__DB_CONN.text_factory = str
except Exception as ex:
return "error: " + str(ex)
def dbfile(self, name):
self.__DB_FILE = os.path.join(os.path.dirname(__file__), name)
return self
def table(self, table):
# 设置表名
self.__DB_TABLE = table
return self
def where(self, where, param):
# WHERE条件
if where:
self.__OPT_WHERE = " WHERE " + where
self.__OPT_PARAM = self.__to_tuple(param)
return self
def __to_tuple(self, param):
# 将参数转换为tuple
if type(param) != tuple:
if type(param) == list:
param = tuple(param)
else:
param = (param,)
return param
def order(self, order):
# ORDER条件
if len(order):
self.__OPT_ORDER = " ORDER BY " + order
return self
def limit(self, limit):
# LIMIT条件
if len(limit):
self.__OPT_LIMIT = " LIMIT " + limit
return self
def field(self, field):
# FIELD条件
if len(field):
self.__OPT_FIELD = field
return self
def select(self):
# 查询数据集
self.__GetConn()
try:
self.__get_columns()
sql = "SELECT " + self.__OPT_FIELD + " FROM " + self.__DB_TABLE + self.__OPT_WHERE + self.__OPT_ORDER + self.__OPT_LIMIT
result = self.__DB_CONN.execute(sql, self.__OPT_PARAM)
data = result.fetchall()
# 构造字典系列
if self.__OPT_FIELD != "*":
fields = self.__format_field(self.__OPT_FIELD.split(','))
tmp = []
for row in data:
i = 0
tmp1 = {}
for key in fields:
tmp1[key.strip('`')] = row[i]
i += 1
tmp.append(tmp1)
del (tmp1)
data = tmp
del (tmp)
else:
# 将元组转换成列表
tmp = list(map(list, data))
data = tmp
del (tmp)
self.__close()
return data
except Exception as ex:
return "error: " + str(ex)
def select_by(self):
# 查询数据集
self.__GetConn()
try:
self.__get_columns()
sql = "SELECT " + self.__OPT_FIELD + " FROM " + self.__DB_TABLE + self.__OPT_WHERE + self.__OPT_ORDER + self.__OPT_LIMIT
print(sql)
print(self.__OPT_PARAM)
result = self.__DB_CONN.execute(sql, self.__OPT_PARAM)
data = result.fetchall()
# 构造字典系列
if self.__OPT_FIELD != "*":
fields = self.__format_field(self.__OPT_FIELD.split(','))
tmp = []
for row in data:
i = 0
tmp1 = {}
for key in fields:
tmp1[key.strip('`')] = row[i]
i += 1
tmp.append(tmp1)
del (tmp1)
data = tmp
del (tmp)
else:
# 将元组转换成列表
tmp = list(map(list, data))
data = tmp
del (tmp)
self.__close()
return data
except Exception as ex:
return "error: " + str(ex)
def get(self):
self.__get_columns()
return self.select()
def __format_field(self, field):
import re
fields = []
for key in field:
s_as = re.search(r'\s+as\s+', key, flags=re.IGNORECASE)
if s_as:
as_tip = s_as.group()
key = key.split(as_tip)[1]
fields.append(key)
return fields
def __get_columns(self):
if self.__OPT_FIELD == '*':
tmp_cols = self.query('PRAGMA table_info(' + self.__DB_TABLE + ')', ())
cols = []
for col in tmp_cols:
#bytes 转为str
col = list(map(lambda x: x.decode('utf-8') if type(x) == bytes else x, col))
if len(col) > 2: cols.append('`' + col[1] + '`')
if len(cols) > 0: self.__OPT_FIELD = ','.join(cols)
def getField(self, keyName):
# 取回指定字段
try:
result = self.field(keyName).select()
if len(result) != 0:
return result[0][keyName]
return result
except:
return None
def setField(self, keyName, keyValue):
# 更新指定字段
return self.save(keyName, (keyValue,))
def find(self):
# 取一行数据
try:
result = self.limit("1").select()
if len(result) == 1:
return result[0]
return result
except:
return None
def count(self):
# 取行数
key = "COUNT(*)"
data = self.field(key).select()
try:
return int(data[0][key])
except:
return 0
def add(self, keys, param):
# 插入数据
self.write_lock()
self.__GetConn()
self.__DB_CONN.text_factory = str
try:
values = ""
for key in keys.split(','):
values += "?,"
values = values[0:len(values) - 1]
sql = "INSERT INTO " + self.__DB_TABLE + "(" + keys + ") " + "VALUES(" + values + ")"
result = self.__DB_CONN.execute(sql, self.__to_tuple(param))
id = result.lastrowid
self.__close()
self.__DB_CONN.commit()
self.rm_lock()
return id
except Exception as ex:
return "error: " + str(ex)
# 插入数据
def insert(self, pdata):
if not pdata: return False
keys, param = self.__format_pdata(pdata)
return self.add(keys, param)
# 更新数据
def update(self, pdata):
if not pdata: return False
keys, param = self.__format_pdata(pdata)
return self.save(keys, param)
# 构造数据
def __format_pdata(self, pdata):
keys = pdata.keys()
keys_str = ','.join(keys)
param = []
for k in keys: param.append(pdata[k])
return keys_str, tuple(param)
def addAll(self, keys, param):
# 插入数据
#self.write_lock()
self.__GetConn()
self.__DB_CONN.text_factory = str
try:
values = ""
for key in keys.split(','):
values += "?,"
values = values[0:len(values) - 1]
sql = "INSERT INTO " + self.__DB_TABLE + "(" + keys + ") " + "VALUES(" + values + ")"
result = self.__DB_CONN.execute(sql, self.__to_tuple(param))
#self.rm_lock()
return True
except Exception as ex:
return "error: " + str(ex)
def commit(self):
self.__close()
self.__DB_CONN.commit()
def save(self, keys, param):
# 更新数据
self.write_lock()
self.__GetConn()
self.__DB_CONN.text_factory = str
try:
opt = ""
for key in keys.split(','):
opt += key + "=?,"
opt = opt[0:len(opt) - 1]
sql = "UPDATE " + self.__DB_TABLE + " SET " + opt + self.__OPT_WHERE
# 处理拼接WHERE与UPDATE参数
tmp = list(self.__to_tuple(param))
for arg in self.__OPT_PARAM:
tmp.append(arg)
self.__OPT_PARAM = tuple(tmp)
result = self.__DB_CONN.execute(sql, self.__OPT_PARAM)
self.__close()
self.__DB_CONN.commit()
self.rm_lock()
return result.rowcount
except Exception as ex:
return "error: " + str(ex)
def delete(self, id=None):
# 删除数据
self.write_lock()
self.__GetConn()
try:
if id:
self.__OPT_WHERE = " WHERE id=?"
self.__OPT_PARAM = (id,)
sql = "DELETE FROM " + self.__DB_TABLE + self.__OPT_WHERE
result = self.__DB_CONN.execute(sql, self.__OPT_PARAM)
self.__close()
self.__DB_CONN.commit()
self.rm_lock()
return result.rowcount
except Exception as ex:
return "error: " + str(ex)
def execute(self, sql, param=()):
# 执行SQL语句返回受影响行
self.write_lock()
self.__GetConn()
try:
result = self.__DB_CONN.execute(sql, self.__to_tuple(param))
self.__DB_CONN.commit()
self.rm_lock()
return result.rowcount
except Exception as ex:
return "error: " + str(ex)
# 是否有锁
def is_lock(self):
n = 0
while os.path.exists(self.__LOCK):
n += 1
if n > 100:
self.rm_lock()
break
time.sleep(0.01)
# 写锁
def write_lock(self):
self.is_lock()
with open(self.__LOCK, 'wb+') as f:
f.close()
# 解锁
def rm_lock(self):
if os.path.exists(self.__LOCK):
os.remove(self.__LOCK)
def query(self, sql, param=()):
# 执行SQL语句返回数据集
self.__GetConn()
try:
result = self.__DB_CONN.execute(sql, self.__to_tuple(param))
# 将元组转换成列表
data = list(map(list, result))
return data
except Exception as ex:
return "error: " + str(ex)
def create(self, name):
# 创建数据表
self.write_lock()
self.__GetConn()
script = readFile('data/' + name + '.sql')
result = self.__DB_CONN.executescript(script)
self.__DB_CONN.commit()
self.rm_lock()
return result.rowcount
def fofile(self, filename):
# 执行脚本
self.write_lock()
self.__GetConn()
script = readFile(filename)
result = self.__DB_CONN.executescript(script)
self.__DB_CONN.commit()
self.rm_lock()
return result.rowcount
def __close(self):
# 清理条件属性
self.__OPT_WHERE = ""
self.__OPT_FIELD = "*"
self.__OPT_ORDER = ""
self.__OPT_LIMIT = ""
self.__OPT_PARAM = ()
def close(self):
# 释放资源
try:
self.__DB_CONN.close()
self.__DB_CONN = None
except:
pass

Binary file not shown.

View File

@@ -0,0 +1,906 @@
# coding: utf-8
# +-------------------------------------------------------------------
# | YakPanel
# +-------------------------------------------------------------------
# | Copyright (c) 2024-2099 YakPanel(https://www.yakpanel.com) All rights reserved.
# +-------------------------------------------------------------------
# | Author: lkq <lkq@yakpanel.com>
# +-------------------------------------------------------------------
# | Wordpress 安全扫描
# +--------------------------------------------------------------------
import json, os, time
import requests,re,zipfile
proxies = {}
import public
import sys,os
if "/www/server/panel/class_v2/wp_toolkit/" not in sys.path:
sys.path.insert(1, "/www/server/panel/class_v2/wp_toolkit/")
#进入到
from wp_toolkit import totle_db
class wordpress_scan:
wordpress_diff_path = "/www/wordpress_diff_path"
#默认插件的头部信息
plugin_default_headers = {
"Name": "Plugin Name",
"PluginURI": "Plugin URI",
"Version": "Version",
"Description": "Description",
"Author": "Author",
"AuthorURI": "Author URI",
"TextDomain": "Text Domain",
"DomainPath": "Domain Path",
"Network": "Network",
"RequiresWP": "Requires at least",
"RequiresPHP": "Requires PHP",
"UpdateURI": "Update URI",
"RequiresPlugins": "Requires Plugins",
"_sitewide": "Site Wide Only"
}
#默认主题的头部信息
theme_default_headers = {
"Name": "Theme Name",
"Title": "Theme Name",
"Version": "Version",
"Author": "Author",
"AuthorURI": "Author URI",
"UpdateURI": "Update URI",
"Template": "Theme Name",
"Stylesheet": "Theme Name",
}
def check_dir(self):
'''
@name 检查需要的目录是否存在
@auther lkq
@time 2024-10-08
@msg 检查需要的目录是否存在
'''
if not os.path.exists(self.wordpress_diff_path):
os.makedirs(self.wordpress_diff_path)
if not os.path.exists(self.wordpress_diff_path + "/plugin/"):
os.makedirs(self.wordpress_diff_path + "/plugin/")
def M(self, table, db="wordpress_plugin"):
'''
@name 获取数据库对象
@param table 表名
@param db 数据库名
'''
with totle_db.Sql(db) as sql:
return sql.table(table)
def get_wordpress_version(self, path):
'''
@name 获取WordPress版本
@param path WordPress路径
@return dict
'''
wp_version_file = path + "/wp-includes/version.php"
version = {"version": "", "locale": ""}
if not os.path.exists(wp_version_file): return version
with open(wp_version_file, 'r', encoding='utf-8') as file:
file_data = file.read()
match = re.search(r"\$wp_version\s*=\s*\'(.*)\';", file_data)
if match:
version["version"] = match.group(1)
# $wp_local_package
match = re.search(r"\$wp_local_package\s*=\s*\'(.*)\';", file_data)
if match:
version["locale"] = match.group(1)
return version
def get_plugin_data(self, plugin_file, default_headers, context=''):
'''
@参考:/wp-admin/includes/plugin.php get_plugin_data 代码
@name 获取插件信息
@param plugin_file 插件文件
@return dict
@auther lkq
@time 2024-10-08
'''
# 读取文件内容
if not os.path.exists(plugin_file): return {}
# 定义8KB大小
max_length = 8 * 1024 # 8 KB
try:
# 读取文件的前8KB
with open(plugin_file, 'r', encoding='utf-8') as file:
file_data = file.read(max_length)
except Exception as e:
return {}
# 替换CR为LF
file_data = file_data.replace('\r', '\n')
# 处理额外的headers
extra_headers = {}
if context:
extra_context_headers = []
# 假设有一个函数可以获取额外的headers
# extra_context_headers = get_extra_headers(context)
extra_headers = dict.fromkeys(extra_context_headers, '') # 假设额外的headers
all_headers = {**extra_headers, **default_headers}
# 检索所有headers
for field, regex in all_headers.items():
if field.startswith('_'): # 跳过以_开头的内部字段
continue
match = re.search(f'{regex}:(.*)$', file_data, re.IGNORECASE | re.MULTILINE)
if match:
all_headers[field] = match.group(1).strip()
else:
all_headers[field] = ''
if all_headers.get("Network") and not all_headers['Network'] and all_headers['_sitewide']:
all_headers['Network'] = all_headers['_sitewide']
if all_headers.get("Network"):
all_headers['Network'] = 'true' == all_headers['Network'].lower()
if all_headers.get("_sitewide"):
del all_headers['_sitewide']
if all_headers.get("TextDomain") and not all_headers['TextDomain']:
plugin_slug = os.path.dirname(os.path.basename(plugin_file))
if '.' != plugin_slug and '/' not in plugin_slug:
all_headers['TextDomain'] = plugin_slug
all_headers['Title'] = all_headers['Name']
all_headers['AuthorName'] = all_headers['Author']
# 返回插件的信息
return all_headers
def Md5(self,strings):
"""
@name 生成MD5
@author hwliang <hwliang@yakpanel.com>
@param strings 要被处理的字符串
@return string(32)
"""
if type(strings) != bytes:
strings = strings.encode()
import hashlib
m = hashlib.md5()
m.update(strings)
return m.hexdigest()
def FileMd5(self,filename):
"""
@name 生成文件的MD5
@author hwliang <hwliang@yakpanel.com>
@param filename 文件名
@return string(32) or False
"""
if not os.path.isfile(filename): return False
import hashlib
my_hash = hashlib.md5()
f = open(filename, 'rb')
while True:
b = f.read(8096)
if not b:
break
my_hash.update(b)
f.close()
return my_hash.hexdigest()
def get_plugin(self, path,one=''):
'''
@name 获取WordPress插件信息
@param path 插件路径
@return dict
@auther lkq
@time 2024-10-08
'''
plugin_path = path + "/wp-content/plugins"
if not os.path.exists(plugin_path): return {}
tmp_list = []
for file in os.listdir(plugin_path):
if one:
if file!=one:continue
plugin_file = os.path.join(plugin_path, file)
# if os.path.isfile(plugin_file) and plugin_file.endswith(".php"):
# tmp_list.append(file)
if os.path.isdir(plugin_file):
# 读取文件夹中的第一层文件
for file2 in os.listdir(plugin_file):
plugin_file2 = os.path.join(plugin_file, file2)
if os.path.isfile(plugin_file2) and plugin_file2.endswith(".php"): tmp_list.append(
file + "/" + file2)
if len(tmp_list) == 0: return {}
result = {}
for i in tmp_list:
plugin_file = plugin_path + "/" + i
# 判断文件是否可读
if not os.access(plugin_file, os.R_OK): continue
plugin_data = self.get_plugin_data(plugin_file, self.plugin_default_headers)
if not plugin_data: continue
if plugin_data["Name"] == "": continue
#如果 name 中没/ 的话
if "/" not in i:
#则判断一下
if 'wordpress.org/plugins/' in plugin_data["PluginURI"]:
plugin_data["PluginURI"] = plugin_data["PluginURI"].replace('http://wordpress.org/plugins/', '').replace("http://wordpress.org/plugins/","")
#去掉最后的/
if plugin_data["PluginURI"][-1]=="/":
plugin_data["PluginURI"]=plugin_data["PluginURI"][:-1]
i=plugin_data["PluginURI"]
else:
continue
result[i] = plugin_data
return result
def get_themes(self, path):
'''
@name 获取WordPress主题信息
@param path 主题路径
@return dict
@auther lkq
@time 2024-10-08
'''
themes_path = path + "/wp-content/themes"
# 循环目录
if not os.path.exists(themes_path): return {}
tmp_list = []
for file in os.listdir(themes_path):
plugin_file = os.path.join(themes_path, file)
if os.path.isdir(plugin_file):
if os.path.exists(plugin_file + "/style.css"):
tmp_list.append(file)
if len(tmp_list) == 0: return {}
result = {}
for i in tmp_list:
plugin_file = themes_path + "/" + i + "/style.css"
# 判断文件是否可读
if not os.access(plugin_file, os.R_OK): continue
plugin_data = self.get_plugin_data(plugin_file, self.theme_default_headers)
if not plugin_data: continue
if plugin_data["Name"] == "": continue
result[i] = plugin_data
return result
def get_plugins_update(self, path):
'''
@name 获取插件更新
@param path WordPress路径
@auther lkq
@time 2024-10-10
@msg 获取插件更新
'''
plugin_info = self.get_plugin(path)
active = []
url = 'http://api.wordpress.org/plugins/update-check/1.1/'
for i in plugin_info: active.append(i)
plugins = {"plugins": json.dumps({"plugins": plugin_info, "active": active}), "locale": "%5B%22zh_CN%22%5D",
"all": "true", "translations": ""}
headers = {
'User-Agent': "WordPress/6.6.2; http://wp471.com",
'Content-Type': 'application/x-www-form-urlencoded',
}
response = requests.post(url, headers=headers, data=plugins, proxies=proxies)
print(response.text)
def get_themes_update(self, path):
'''
@name 获取主题更新
@param path WordPress路径
@auther lkq
@time 2024-10-10
@msg 获取主题更新
'''
plugin_info = self.get_themes(path)
active = []
url = 'http://api.wordpress.org/themes/update-check/1.1/'
for i in plugin_info: active.append(i)
plugins = {"themes": json.dumps({"themes": plugin_info, "active": active}), "locale": "", "all": "true",
"translations": ""}
headers = {
'User-Agent': "WordPress/6.6.2; http://wp471.com",
'Content-Type': 'application/x-www-form-urlencoded',
}
response = requests.post(url, headers=headers, data=plugins, proxies=proxies)
print(response.text)
def get_wordpress_update(self, path):
'''
@name 获取WordPress更新
@param path WordPress路径
@auther lkq
@time 2024-10-10
@msg 获取WordPress更新
'''
version = self.get_wordpress_version(path)
if version["version"] == "":
return
url = 'http://api.wordpress.org/core/version-check/1.7/?version=' + version["version"] + '&locale=' + version[
"locale"]
headers = {
'User-Agent': "WordPress/6.6.2; http://wp471.com",
}
response = requests.get(url, headers=headers, proxies=proxies)
print(response.text)
def compare_versions(self,version1, version2):
'''
@name 对比版本号
@param version1 版本1
@param version2 版本2
@return int 0 相等 1 大于 -1 小于
'''
# 分割版本号为整数列表
v1 = [int(num) if num.strip() != '' else 0 for num in version1.split('.')]
v2 = [int(num) if num.strip() != '' else 0 for num in version2.split('.')]
# 逐个比较版本号的每个部分
for num1, num2 in zip(v1, v2):
if num1 > num2:
return 1 # version1 > version2
elif num1 < num2:
return -1 # version1 < version2
# 如果所有部分都相同,比较长度(处理像'1.0'和'1.0.0'这样的情况)
if len(v1) > len(v2):
return 1 if any(num > 0 for num in v1[len(v2):]) else 0
elif len(v1) < len(v2):
return -1 if any(num > 0 for num in v2[len(v1):]) else 0
# 如果完全相同
return 0
def let_identify(self,version,vlun_infos):
'''
@name 对比版本号判断是否存在漏洞
@param version 当前版本
@param vlun_infos 漏洞信息
@return list
'''
for i in vlun_infos:
i["vlun_status"] = False
#如果是小于等于的话
if i["let"]=="<=":
if self.compare_versions(version,i["vlun_version"])<=0:
i["vlun_status"]=True
#小于
if i["let"]=="<":
if self.compare_versions(version,i["vlun_version"])<0:
i["vlun_status"]=True
if i['let']=='-':
#从某个版本开始、到某个版本结束
version_list=i["vlun_version"].split("-")
if len(version_list)!=2:continue
if self.compare_versions(version,version_list[0])>=0 and self.compare_versions(version,version_list[1])<=0:
i["vlun_status"]=True
return vlun_infos
def scan(self,path):
'''
@name 扫描WordPress
@param path WordPress路径
@return dict
@auther lkq
@time 2024-10-10
@msg 通过扫描WordPress的版本、插件、主题来判断是否存在漏洞
'''
vlun_list = []
#判断文件是否存在
import os
if not os.path.exists(path):
return vlun_list
result = {}
result["version"] = self.get_wordpress_version(path)
result["plugins"] = self.get_plugin(path)
result["themes"] = self.get_themes(path)
#扫描插件是否存在漏洞
for i in result["plugins"]:
plguin=i.split("/")[0]
Name=result["plugins"][i]["Name"]
#检查插件是否下架了
if self.M("plugin_error","plugin_error").where("slug=?",(plguin,)).count()>0:
error_status = self.M("plugin_error","plugin_error").where("slug=?",(plguin,)).field("error,name,slug,description,closed_date,reason,status").find()
if type(error_status)!=dict:continue
if error_status["status"]==0:
vlun = {"name": "", "vlun_info": "", "css": "", "type": "plugin_closed", "load_version": "","cve": "","time":"","status":0}
#时间格式转为时间戳
if len(error_status["closed_date"])<10:
error_status["closed_date"]=int(time.time())
else:
try:
error_status["closed_date"]=int(time.mktime(time.strptime(error_status["closed_date"], "%Y-%m-%d %H:%M:%S")))
except:
error_status["closed_date"]=int(time.time())
vlun["slug"]=plguin
vlun["name"]=Name
vlun["vlun_info"]=error_status["description"]
vlun["css"]="10"
vlun["load_version"]=result["plugins"][i]["Version"]
vlun["time"]=error_status["closed_date"]
vlun_list.append(vlun)
# continue
#检查插件这个插件是否好久没有更新了
if self.M("wordpress_not_update","wordpress_not_update").where("slug=?",(plguin,)).count()>0:
error_status = self.M("wordpress_not_update", "wordpress_not_update").where("slug=?", (plguin,)).field(
"last_time,status").find()
vlun = {"name": "", "vlun_info": "", "css": "", "type": "plugin_not_update", "load_version": "", "cve": "",
"time": "", "status": 0}
if type(error_status)!=dict:continue
#当前时间减去最后更新时间 计算出年数
if len(str(error_status["last_time"])) > 6:
#当前时间减去最后更新时间 计算出年数
year = int((time.time() - error_status["last_time"]) / 60 / 60 / 24 / 365)
if year >= 10:
vlun["css"] = 10
elif year >= 5:
vlun["css"] = 8
elif year >= 3:
vlun["css"] = 6
vlun["slug"] = plguin
vlun["name"] = Name
vlun["vlun_info"] = "The plugin has not been updated for more than {} years".format(year)
vlun["load_version"] = result["plugins"][i]["Version"]
vlun["time"] = int(time.time())
vlun_list.append(vlun)
if result["plugins"][i]["Version"]=="":continue
#检查插件是否存在漏洞
if self.M("wordpress_vulnerabilities","wordpress_vulnerabilities").where("plugin=?",(plguin,)).count()>0:
vlun_infos=self.M("wordpress_vulnerabilities","wordpress_vulnerabilities").where("plugin=? and types='plugin'",(plguin)).select()
vlun_infos=self.let_identify(result["plugins"][i]["Version"],vlun_infos)
for j2 in vlun_infos:
if j2["vlun_status"]:
vlun = {"name": "", "vlun_info": "", "css": "", "type": "plugin", "load_version": "","cve": "","time":""}
vlun["load_version"]=result["plugins"][i]["Version"]
vlun["cve"]=j2["cve"]
vlun["slug"]=plguin
vlun["name"] = Name
vlun["vlun_info"]=j2["msg"]
vlun["css"]=j2["css"]
vlun["time"] = j2["data_time"]
vlun_list.append(vlun)
#扫描主题是否存在漏洞
for i in result["themes"]:
plguin = i.split("/")[0]
if result["themes"][i]["Version"] == "": continue
Name = result["themes"][i]["Name"]
# 检查插件是否存在漏洞
if self.M("wordpress_vulnerabilities","wordpress_vulnerabilities").where("plugin=? and types='theme'", (plguin,)).count() > 0:
vlun_infos = self.M("wordpress_vulnerabilities","wordpress_vulnerabilities").where("plugin=? and types='theme'", (plguin)).select()
vlun_infos = self.let_identify(result["themes"][i]["Version"], vlun_infos)
for j2 in vlun_infos:
if j2["vlun_status"]:
vlun = {"name": "", "vlun_info": "", "css": "", "type": "theme", "load_version": "","cve": "","time":""}
vlun["load_version"] = result["themes"][i]["Version"]
vlun["cve"] = j2["cve"]
vlun["slug"] = plguin
vlun["name"] = Name
vlun["vlun_info"] = j2["msg"]
vlun["css"] = j2["css"]
vlun["time"] = j2["data_time"]
vlun_list.append(vlun)
#检查WordPress是否存在漏洞 #使用date_time 排序取最新的15个版本
if len(result["version"]["version"])>=1 and self.M("wordpress_vulnerabilities", "wordpress_vulnerabilities").where("types=?",("core")).count() > 0:
#取10个版本
vlun_infos = self.M("wordpress_vulnerabilities","wordpress_vulnerabilities").where("types=?",("core")).order("data_time desc").limit("15").select()
vlun_infos = self.let_identify(result["version"]["version"], vlun_infos)
for j2 in vlun_infos:
if j2["vlun_status"]:
vlun = {"name": "", "vlun_info": "", "css": "", "type": "core", "load_version": "","cve": "","time":""}
vlun["load_version"] = result["version"]["version"]
vlun["cve"] = j2["cve"]
vlun["slug"] = "WordPress"
vlun["name"] = "WordPress"
vlun["vlun_info"] = j2["msg"]
vlun["css"] = j2["css"]
vlun["time"]=j2["data_time"]
vlun_list.append(vlun)
#忽略
ignore_path = "/www/server/panel/data/wordpress_ignore_vuln.json"
if os.path.exists(ignore_path):
try:
ignore_infos = json.loads(public.readFile(ignore_path))
except:
ignore_infos = {}
else:
ignore_infos = {}
if path in ignore_infos:
for i in ignore_infos[path]:
for i2 in vlun_list:
if i["type"]=="plugin_closed":
if i["slug"]==i2["slug"]:
vlun_list.remove(i2)
if i["type"]=="plugin_not_update":
if i["slug"]==i2["slug"]:
vlun_list.remove(i2)
if i["slug"]==i2["slug"] and i["name"]==i2["name"] and i["vlun_info"]==i2["vlun_info"] and i["css"]==i2["css"] and i["type"]==i2["type"] and i["cve"]==i2["cve"] and i["time"]==i2["time"]:
vlun_list.remove(i2)
#更新到文件中
wordpress_scan_path = "/www/server/panel/data/wordpress_wp_scan.json"
status= {"last_time": int(time.time()), "vulnerabilities": len(vlun_list), "status": True}
import os
if os.path.exists(wordpress_scan_path):
try:
wordpress_scan_info = json.loads(public.readFile(wordpress_scan_path))
except:
wordpress_scan_info = {}
else:
wordpress_scan_info = {}
if path not in wordpress_scan_info:
wordpress_scan_info[path]=status
else:
wordpress_scan_info[path]["last_time"]=int(time.time())
wordpress_scan_info[path]["vulnerabilities"]=len(vlun_list)
public.WriteFile(wordpress_scan_path,json.dumps(wordpress_scan_info))
return vlun_list
def ignore_vuln(self,get):
'''
@name 增加忽略漏洞
@param slug 插件slug
@param path 插件路径
@auther lkq
@time 2024-10-10
@msg 增加忽略漏洞
'''
if 'path' not in get: return public.return_message(-1,0,public.lang("Parameter error"))
if 'name' not in get: return public.return_message(-1,0,public.lang("Parameter error"))
if 'vlun_info' not in get: return public.return_message(-1,0,public.lang("Parameter error"))
if 'css' not in get: return public.return_message(-1,0,public.lang("Parameter error"))
if 'type' not in get: return public.return_message(-1,0,public.lang("Parameter error"))
if 'cve' not in get: return public.return_message(-1,0,public.lang("Parameter error"))
if 'time' not in get: return public.return_message(-1, 0, public.lang("Parameter error"))
if 'slug' not in get: return public.return_message(-1, 0, public.lang("Parameter error"))
if 'ignore_type' not in get: return public.return_message(-1, 0, public.lang("Parameter error"))
path = get['path']
name = get['name']
vlun_info = get['vlun_info']
css = float(get['css'])
type = get['type']
cve = get['cve']
time = int(get['time'])
slug = get['slug']
ignore_type=get['ignore_type']
#忽略的列表路径path
ignore_path ="/www/server/panel/data/wordpress_ignore_vuln.json"
if os.path.exists(ignore_path):
try:
ignore_infos=json.loads(public.readFile(ignore_path))
except:
ignore_infos={}
else:
ignore_infos={}
if ignore_type=="add":
if path not in ignore_infos:
ignore_infos[path]=[]
ignore={"name":name,"vlun_info":vlun_info,"css":css,"type":type,"cve":cve,"time":time,"slug":slug}
if ignore not in ignore_infos[path]:
ignore_infos[path].append(ignore)
public.writeFile(ignore_path,json.dumps(ignore_infos))
return public.return_message(0,0,public.lang("Added successfully"))
if ignore_type=="del":
if path not in ignore_infos:
return public.return_message(-1,0,public.lang("No data found"))
ignore={"name":name,"vlun_info":vlun_info,"css":css,"type":type,"cve":cve,"time":time,"slug":slug}
if ignore in ignore_infos[path]:
ignore_infos[path].remove(ignore)
public.writeFile(ignore_path,json.dumps(ignore_infos))
return public.return_message(0,0,public.lang("Deleted successfully"))
def get_ignore_vuln(self,get):
'''
@name 获取忽略的漏洞
@param path 插件路径
@auther lkq
@time 2024-10-10
@msg 获取忽略的漏洞
'''
#如果不传递参数就返回所有
ignore_path = "/www/server/panel/data/wordpress_ignore_vuln.json"
if os.path.exists(ignore_path):
try:
ignore_infos = json.loads(public.readFile(ignore_path))
except:
ignore_infos = {}
else:
ignore_infos = {}
if 'path' in get:
path = get['path']
if path not in ignore_infos:
return public.return_message(0,0,[])
else:
return public.return_message(0,0,ignore_infos[path])
else:
return public.return_message(0, 0, [])
def download_file_with_progress(self,url, filename,slug,re=False):
'''
@name 下载插件的文件
@param url 下载地址
@param filename 文件名
@param slug 插件slug
@param re 是否重试
'''
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
}
with requests.get(url, stream=True, headers=header, proxies=proxies, timeout=20) as response:
#判断状态码
if response.status_code!=200:
# print("Download failed, status code:",response.status_code," URL:",url)
if not re:
self.download_file_with_progress("https://downloads.wordpress.org/plugin/"+slug+".zip", filename,slug,re=True)
return
total_length = response.headers.get('content-length')
if total_length is None: # 无法获取文件大小
response.raise_for_status()
with open(filename, 'wb') as file:
for chunk in response.iter_content(chunk_size=65536):
if chunk: # 过滤掉保活新块
file.write(chunk)
file.flush()
else:
total_length = int(total_length)
if total_length > 1024 * 1024 * 20:
print("\nSkip files larger than 20M ", slug)
return
downloaded = 0
with open(filename, 'wb') as file:
for chunk in response.iter_content(chunk_size=65536):
file.write(chunk)
file.flush()
downloaded += len(chunk)
# 计算下载进度
done = int(50 * downloaded / total_length)
if total_length < 1024 * 1024:
total_length_mb = total_length / 1024
# 已下载大小转为KB
downloaded_mb = downloaded / 1024
infos = "{:.2f}KB/{:.2f}KB slug:{}".format(downloaded_mb, total_length_mb,slug)
else:
total_length_mb = total_length / (1024 * 1024)
downloaded_mb = downloaded / (1024 * 1024)
infos = "{:.2f}MB/{:.2f}MB slug:{}".format(downloaded_mb, total_length_mb,slug)
print("\r[{}{}] {:.2f}% file_size:{} ".format('' * done, '.' * (50 - done),100 * downloaded / total_length,infos,), end='')
# 判断文件是否下载完整
if total_length != os.path.getsize(filename):
os.remove(filename)
print("\nThe downloaded file is incomplete and has been deleted. The file is currently being re downloaded", filename)
self.download_file_with_progress(url, filename,slug)
def zip_file_plugin_data(self,file_data, default_headers, context=''):
'''
@参考:/wp-admin/includes/plugin.php get_plugin_data 代码
@name 通过ZIP文件获取插件信息
@param plugin_file 插件文件
@return dict
@auther lkq
@time 2024-10-08
'''
file_data = file_data.replace('\r', '\n')
# 处理额外的headers
extra_headers = {}
if context:
extra_context_headers = []
extra_headers = dict.fromkeys(extra_context_headers, '') # 假设额外的headers
all_headers = {**extra_headers, **default_headers}
# 检索所有headers
for field, regex in all_headers.items():
if field.startswith('_'): # 跳过以_开头的内部字段
continue
match = re.search(f'{regex}:(.*)$', file_data, re.IGNORECASE | re.MULTILINE)
if match:
all_headers[field] = match.group(1).strip()
else:
all_headers[field] = ''
if all_headers.get("Network") and not all_headers['Network'] and all_headers['_sitewide']:
all_headers['Network'] = all_headers['_sitewide']
if all_headers.get("Network"):
all_headers['Network'] = 'true' == all_headers['Network'].lower()
if all_headers.get("_sitewide"):
del all_headers['_sitewide']
all_headers['Title'] = all_headers['Name']
all_headers['AuthorName'] = all_headers['Author']
# 返回插件的信息
return all_headers
def check_plugin(self,path,plugin_info):
'''
@name 检查所有的插件是否被修改过、或者新增了文件
@param path WordPress路径
@param plugin_info 插件信息
@return dict
@auther lkq
@time 2024-10-10
'''
self.check_dir()
for i in plugin_info:
slug=i.split("/")[0]
version=plugin_info[i]["Version"]
if version == "": continue
plugin_file = self.wordpress_diff_path + "/plugin/" + slug+"."+version+".zip"
if not os.path.exists(plugin_file):
self.download_file_with_progress("https://downloads.wordpress.org/plugin/"+slug+"."+version+".zip", plugin_file,slug)
else:
try:
zipfile.ZipFile(plugin_file)
print("压缩包文件已经存在、且可正常读取文件、正在跳过", plugin_file)
except:
print("The zip file cannot be opened, it is being deleted and re downloaded",slug+"."+version+".zip")
os.remove(plugin_file)
self.download_file_with_progress("https://downloads.wordpress.org/plugin/"+slug+"."+version+".zip", plugin_file,slug)
plugin_file_list={}
#获取所有插件的文件
for i in plugin_info:
slug = i.split("/")[0]
version=plugin_info[i]["Version"]
if version == "": continue
plugin_file = self.wordpress_diff_path + "/plugin/" + slug + "." + version + ".zip"
if not os.path.exists(plugin_file): continue
plugin_file_list[slug] = {}
plugin_path=path+"/wp-content/plugins/"+slug
#判断文件是否存在
if not os.path.exists(plugin_path): continue
#遍历目录下所有的PHP文件
for root, dirs, files in os.walk(plugin_path):
for file in files:
if file.endswith('.php'):
file_path = os.path.join(root, file)
plugin_file_list[slug][slug+"/"+file_path.replace(plugin_path + "/", "")] = self.FileMd5(file_path)
#读取压缩包中的文件版本是否和本地的版本一致
with zipfile.ZipFile(plugin_file, 'r') as zip_file:
#查找i 的文件
if i in zip_file.namelist():
#读取文件内容
with zip_file.open(i) as file:
file_data = file.read()
# #获取插件信息
plugin_data = self.zip_file_plugin_data(file_data.decode("utf-8"), self.plugin_default_headers)
if not plugin_data: continue
if plugin_data["Name"] == "": continue
#判断版本是否一致
if plugin_data["Version"]!=version:
print("版本不一致、跳过")
continue
#对比MD5
for file_name in plugin_file_list[slug]:
if file_name in zip_file.namelist():
with zip_file.open(file_name) as file:
file_data = file.read()
if self.Md5(file_data)!=plugin_file_list[slug][file_name]:
print("文件已经被修改、MD5于云端文件不一致",file_name)
else:
print("文件异常、原版压缩包中不存在该文件",file_name)
def check_all_plugin(self,path):
'''
@name 检查所有的插件是否被修改过、或者新增了文件
:param path:
:return:
'''
return self.check_plugin(path,self.get_plugin(path))
def one_check_plugin(self,path,slug):
'''
@name 检查单个插件是否被修改过、或者新增了文件
@param path WordPress路径
@return dict
@auther lkq
@time 2024-10-10
'''
plugin_file_path = path + "/wp-content/plugins/" + slug
if not os.path.exists(plugin_file_path):
return []
#获取所在插件的信息
plugin_info = self.get_plugin(path,slug)
if len(plugin_info)==0:
return []
self.check_plugin(path, plugin_info)
def get_vlu_time(self):
'''
@name 获取漏洞库更新时间
@param get:
:param get:
:return:
'''
date_time=self.M("wordpress_vulnerabilities", "wordpress_vulnerabilities").order("data_time desc").limit("1").field("data_time").find()
#转为2024-10-10
if date_time:
date_time=time.strftime("%Y-%m-%d", time.localtime(date_time["data_time"]))
return date_time
else:
#今天的日期
return time.strftime("%Y-%m-%d", time.localtime(time.time()))
def auto_scan(self):
'''
@name 自动扫描 每天扫描一次 每个网站延迟1S
'''
site_infos=public.M("sites").where("project_type=?",("WP2")).select()
#如果没有站点的话
if len(site_infos)==0:
return
#自动扫描的配置文件
wordpress_scan_path="/www/server/panel/data/wordpress_wp_scan.json"
if not os.path.exists(wordpress_scan_path):
wordpress_wp_scan={}
else:
try:
wordpress_wp_scan=json.loads(public.ReadFile(wordpress_scan_path))
except:
wordpress_wp_scan={}
for i in site_infos:
if i["path"] not in wordpress_wp_scan:
wordpress_wp_scan[i["path"]]={"last_time":0,"vulnerabilities":0,"status":True}
if not i["status"]:
continue
#获取上次扫描的时间
last_time=wordpress_wp_scan[i["path"]]["last_time"]
#判断有没有超过一天
if time.time()-last_time<43200:
continue
#获取站点的路径
path=i["path"]
time.sleep(1)
#扫描站点
try:
vlun_list=self.scan(path)
except:
continue
wordpress_wp_scan[i["path"]]["last_time"]=int(time.time())
wordpress_wp_scan[i["path"]]["vulnerabilities"]=len(vlun_list)
public.WriteFile(wordpress_scan_path,json.dumps(wordpress_wp_scan))
def set_auth_scan(self,path):
'''
@name 停止扫描
@param path WordPress路径
@auther lkq
@time 2024-10-10
@msg 停止扫描
'''
wordpress_scan_path = "/www/server/panel/data/wordpress_wp_scan.json"
flag=False
if os.path.exists(wordpress_scan_path):
try:
wordpress_scan_info = json.loads(public.readFile(wordpress_scan_path))
except:
wordpress_scan_info = {}
else:
wordpress_scan_info = {}
if path in wordpress_scan_info:
if wordpress_scan_info[path]["status"]:
wordpress_scan_info[path]["status"]=False
else:
wordpress_scan_info[path]["status"]=True
flag=True
else:
wordpress_scan_info[path]={"last_time":0,"vulnerabilities":0,"status":False}
public.WriteFile(wordpress_scan_path,json.dumps(wordpress_scan_info))
if flag:
return public.return_message(0,0,public.lang("Started successfully"))
return public.return_message(0,0,public.lang("Stopped successfully"))
def get_auth_scan_status(self,path):
'''
@name 获取扫描状态
@param path WordPress路径
@auther lkq
@time 2024-10-10
@msg 获取扫描状态
'''
wordpress_scan_path = "/www/server/panel/data/wordpress_wp_scan.json"
if os.path.exists(wordpress_scan_path):
try:
wordpress_scan_info = json.loads(public.readFile(wordpress_scan_path))
except:
wordpress_scan_info = {}
else:
wordpress_scan_info = {}
if path in wordpress_scan_info:
return public.return_message(0,0,wordpress_scan_info[path]["status"])
return public.return_message(0,0,True)

Binary file not shown.