#!/usr/bin/python # coding: utf-8 # Date 2025/11/04 # ------------------------------------------------------------------- # YakPanel # ------------------------------------------------------------------- # Copyright (c) 2015-2099 YakPanel(https://www.yakpanel.com) All rights reserved. # ------------------------------------------------------------------- # Author: wpl # 网站安全基础扫描模块 # ------------------------------------------------------------------- import re import sys, json, os, public, hashlib, requests, time import urllib.parse from YakPanel import cache import PluginLoader class main: __count = 0 __shell = "/www/server/panel/data/webbasic_shell_check.txt" session = requests.Session() send_time = "" # 记录上一次发送ws时间 web_name = "" # 当前检测的网站名 scan_type = "basicvulscan" web_scan_num = 0 bar = 0 # 新增:全局进度属性,按站点×模块综合计算 _total_units = 0 _done_units = 0 _module_count_per_site = 0 # 新增:标记是否处于全站扫描上下文,避免跨次扫描累加 _in_all_scan = False # 添加计数器 risk_count = { "warning": 0, # 告警(0) "low": 0, # 低危 (1) "middle": 0, # 中危 (2) "high": 0 # 高危 (3) } web_count_list = [] def GetWebInfo(self, get): ''' @name 获取网站信息 @author wpl<2025-11-4> @param name 网站名称 @return dict 网站信息 ''' webinfo = public.M('sites').where('project_type=? and name=?', ('PHP', get.name)).count() if not webinfo: return False webinfo = public.M('sites').where('project_type=? and name=?', ('PHP', get.name)).select() return webinfo[0] def GetAllSite(self, get): ''' @name 获取所有网站信息 @author wpl<2025-11-4> @return list 所有网站信息 ''' webinfo = public.M('sites').where('project_type=?', ('PHP',)).select() return webinfo def WebConfigSecurity(self, webinfo, get): ''' @name 网站配置安全性检测 @author wpl<2025-11-4> @param webinfo 网站信息 @param get 请求参数 @return list 检测结果 ''' if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "Scanning website configuration security for %s" % get.name, "type": "webscan", "bar": self.bar })) result = [] # Nginx版本泄露检测 if public.get_webserver() == 'nginx': nginx_path = '/www/server/nginx/conf/nginx.conf' if os.path.exists(nginx_path): nginx_info = public.ReadFile(nginx_path) if not 'server_tokens off' in nginx_info: result.append({ "name": public.lang("{} website has Nginx version disclosure",get.name) , "info": public.lang("Nginx version disclosure may expose sensitive server information and pose security risks"), "repair": public.lang("Open the nginx.conf file for {} website and add 'server_tokens off;' inside the http { } block;", get.name), "dangerous": 1, "type": "webscan" }) # PHP版本泄露检测 phpversion = public.get_site_php_version(get.name) phpini = '/www/server/php/%s/etc/php.ini' % phpversion if os.path.exists(phpini): php_info = public.ReadFile(phpini) if not 'expose_php = Off' in php_info: result.append({ "name": public.lang("{} website has PHP version disclosure",get.name), "info": public.lang("PHP version disclosure may expose sensitive server information and pose security risks"), "repair": public.lang("Open the php.ini file for {} website and set 'expose_php = Off'",get.name), "dangerous": 1, "type": "webscan" }) # 防火墙检测 # 跳过ols if public.get_webserver() != 'openlitespeed': if not os.path.exists("/www/server/btwaf/"): result.append({ "name": public.lang("Firewall not installed for {} website",get.name), "info": public.lang("Not having a firewall installed may expose sensitive server information and pose security risks"), "repair": public.lang("Install or enable the Nginx WAF"), "dangerous": 0, "type": "webscan" }) # 防跨站攻击检测 web_infos = public.M('sites').where("name=?", (get.name, )).select() for web in web_infos: run_path = self.GetSiteRunPath(web["name"], web["path"]) if not run_path: continue path = web["path"] + run_path user_ini_file = path + '/.user.ini' if not os.path.exists(user_ini_file): continue user_ini_conf = public.readFile(user_ini_file) if "open_basedir" not in user_ini_conf: result.append({ "name": public.lang("Cross-site attack protection not enabled for {} website",get.name), "info": public.lang("Without cross-site attack protection, sensitive server data may be exposed via directory traversal"), "repair": public.lang("Enable open_basedir protection in the website directory to prevent attackers from reading sensitive files across directories"), "dangerous": 0, "type": "webscan" }) # SSL证书安全性检测 self.WebSSLSecurity(webinfo, get, result) if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "Completed scanning website configuration security for %s" % get.name, "type": "webscan", "results": result, "bar": self.bar })) return result def _read_recent_logs(self, log_path, lines=10000): """ 读取最近N行日志并进行初步过滤 @param log_path 日志文件路径 @param lines 读取行数 @return 过滤后的日志内容 """ try: # 只读取状态码为200,301,302,403,404,500的请求,过滤掉静态资源请求 cmd = f"tail -n {lines} '{log_path}' | grep -E ' (200|301|302|403|404|500) ' | grep -v -E '\\.(css|js|png|jpg|jpeg|gif|ico|woff|woff2|ttf|svg)( |\\?|$)'" result = public.ExecShell(cmd)[0] # 过滤掉空行 result = [line for line in result.split('\n') if line.strip()] return result except Exception as e: # 如果grep失败,直接读取原始日志 cmd = f"tail -n {lines} '{log_path}'" return public.ExecShell(cmd)[0] def _analyze_attack_distribution(self, log_content, security_patterns): """ 分析各类攻击的数量和分布 @param log_content 日志内容 @param security_patterns 安全检测规则 @return 攻击统计信息 """ attack_stats = {} for attack_type, pattern_info in security_patterns.items(): attack_stats[attack_type] = { 'count': 0, 'sample_ips': [], 'sample_urls': [] } for line in log_content.split('\n'): if line.strip(): ip = None req_url = None ref_url = None try: parts = line.split() if parts: ip = parts[0] except: pass try: m_req = re.search(r"\"?(GET|POST|HEAD|PUT|DELETE|OPTIONS|PATCH)\s+(.*?)\s+HTTP\/[0-9.]+\"?", line) if m_req: req_url = m_req.group(2) except: pass try: qs = re.findall(r'"([^\"]*)"', line) if len(qs) >= 2: ref_full = qs[1] if ref_full and ref_full != '-': try: parsed = urllib.parse.urlparse(ref_full) ref_url = (parsed.path or '/') + (('?' + parsed.query) if parsed.query else '') except: ref_url = ref_full except: pass def field_matches(s): if not s: return False s_low = s.lower() try: dec = urllib.parse.unquote(s_low) except: dec = s_low for p in pattern_info['patterns']: pl = p.lower() if pl in s_low or (dec and pl in dec): return True return False matched = False if field_matches(req_url): matched = True url = req_url elif field_matches(ref_url): matched = True url = ref_url if matched: attack_stats[attack_type]['count'] += 1 try: if ip and ip not in attack_stats[attack_type]['sample_ips']: attack_stats[attack_type]['sample_ips'].append(ip) except: pass try: if url and url not in attack_stats[attack_type]['sample_urls']: attack_stats[attack_type]['sample_urls'].append(url) except: pass return attack_stats def _analyze_ip_frequency(self, log_content): """ 分析IP访问频率,返回访问次数统计 @param log_content 日志内容 @return IP访问频率统计 """ ip_stats = {} for line in log_content.split('\n'): if line.strip(): try: ip = line.split()[0] ip_stats[ip] = ip_stats.get(ip, 0) + 1 except: continue # 返回访问次数排序的结果 return sorted(ip_stats.items(), key=lambda x: x[1], reverse=True) def _analyze_url_attacks(self, log_content, security_patterns): """ 分析被攻击的URL统计 @param log_content 日志内容 @param security_patterns 安全检测规则 @return 被攻击URL统计 """ url_attacks = {} # 收集所有攻击模式 all_patterns = [] for pattern_info in security_patterns.values(): all_patterns.extend(pattern_info['patterns']) for line in log_content.split('\n'): if line.strip(): try: parts = line.split() if len(parts) >= 7: url = parts[6] line_lower = line.lower() # 检查是否包含攻击模式 for pattern in all_patterns: if pattern.lower() in line_lower: url_attacks[url] = url_attacks.get(url, 0) + 1 break except: continue # 返回攻击次数排序的结果 return sorted(url_attacks.items(), key=lambda x: x[1], reverse=True) def WebSSLSecurity(self, webinfo, get, result): ''' @name SSL证书安全性检测 @author wpl<2025-11-4> @param webinfo 网站信息 @param get 请求参数 @param result 结果列表 ''' if public.get_webserver() == 'nginx': conf_file = '/www/server/panel/vhost/nginx/{}.conf'.format(webinfo['name']) if os.path.exists(conf_file): conf_info = public.ReadFile(conf_file) keyText = 'ssl_certificate' if conf_info.find(keyText) == -1: result.append({ "name": public.lang("{} website has SSL disabled",webinfo['name']), "info": public.lang("SSL is not deployed may expose sensitive server information and pose security risks"), "repair": public.lang("Deploy SSL in Website → SSL"), "dangerous": 0, "type": "webscan" }) # 暂未实现 TLSv1 检测 # if 'TLSv1 ' in conf_info: # result.append({ # "name": "%s website has insecure TLSv1 protocol enabled" % webinfo['name'], # "info": "Enabling the insecure TLSv1 protocol may expose sensitive server information and pose security risks", # "repair": "In Website Settings, go to Advanced Settings → TLS Settings and disable TLSv1", # "dangerous": 2, # "type": "webscan" # }) def WebFileLeakDetection(self, webinfo, get): ''' @name 文件泄露检测 @author wpl<2025-11-4> @param webinfo 网站信息 @param get 请求参数 @return list 检测结果 ''' if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "Scanning %s for file leaks" % get.name, "type": "fileleak", "bar": self.bar })) result = [] site_path = webinfo['path'] # 检测敏感文件 sensitive_files = ['.env', '.git', '.svn', '.DS_Store'] for filename in sensitive_files: file_path = os.path.join(site_path, filename) if os.path.exists(file_path): result.append({ "name": public.lang("Sensitive file detected on {} website",webinfo['name']), "info": public.lang("Sensitive file 【{}】exposed — may leak server-sensitive information and pose security risks",filename), "repair": public.lang("Delete or move the sensitive file outside the website root directory"), "dangerous": 2, "type": "fileleak", "file_path": file_path }) # 检测SQL文件 只检测网站根目录下的SQL文件 # 只检测网站根目录下的SQL文件 try: files = os.listdir(site_path) for file in files: if file.endswith('.sql'): file_path = os.path.join(site_path, file) result.append({ "name": public.lang("SQL database file detected on {} website",webinfo['name']), "info": public.lang("SQL database file 【{}】exposed — may leak server-sensitive information and pose security risks",file), "repair": public.lang("Delete or move the SQL file outside the website root directory"), "dangerous": 3, "type": "fileleak", "file_path": file_path }) except Exception as e: # 如果无法访问目录,记录错误但不中断扫描 pass if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "File leak scan for %s completed, found %d issues" % (get.name, len(result)), "results": result, "type": "fileleak", "bar": self.bar })) return result def WebRootTrojanDetection(self, webinfo, get): ''' @name 木马检测 @author wpl<2025-11-5> @param webinfo 网站信息 @param get 请求参数 @return list 检测结果 ''' if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "Scanning %s root directory for trojan files" % get.name, "type": "webshell", "bar": self.bar })) result = [] self.__count = 0 # 仅列出网站根目录中的文件 base_path = webinfo.get('path') if isinstance(webinfo, dict) else None if not base_path or not os.path.isdir(base_path): return result try: entries = os.listdir(base_path) except Exception: entries = [] file_list = [] for name in entries: fp = os.path.join(base_path, name) # 仅扫描php文件 if os.path.isfile(fp) and name.lower().endswith('.php'): file_list.append(fp) if not file_list: if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "No files to scan found in %s root directory" % get.name, "type": "webshell", "bar": self.bar })) return result self.__count = len(file_list) # 本地正则匹配检测 rules = [ "@\\$\\_=", "eval\\(('|\")\\?>", "php_valueauto_append_file", "eval\\(gzinflate\\(", "eval\\(str_rot13\\(", "base64\\_decode\\(\\$\\_", "eval\\(gzuncompress\\(", "phpjm\\.net", "assert\\(('|\"|\\s*)\\$", "require_once\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "gzinflate\\(base64_decode\\(", "echo\\(file_get_contents\\(('|\")\\$_(POST|GET|REQUEST|COOKIE)", "c99shell", "cmd\\.php", "call_user_func\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "str_rot13", "webshell", "EgY_SpIdEr", "tools88\\.com", "SECFORCE", "eval\\(base64_decode\\(", "include\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "array_map[\\s]{0,20}\\(.{1,5}(eval|assert|ass\\\\x65rt).{1,20}\\$_(GET|POST|REQUEST).{0,15}", "call_user_func[\\s]{0,25}\\(.{0,25}\\$_(GET|POST|REQUEST).{0,15}", "gzdeflate|gzcompress|gzencode", "require_once\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "include_once\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "call_user_func\\((\"|')assert(\"|')", "php_valueauto_prepend_file", "SetHandlerapplication\\/x-httpd-php", "fputs\\(fopen\\((.+),('|'\")w('|'\")\\),('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)\\[", "file_put_contents\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)\\[([^\\]]+)\\],('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "\\$_(POST|GET|REQUEST|COOKIE)\\[([^\\]]+)\\]\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)\\[", "require\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "assert\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "eval\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "base64_decode\\(gzuncompress\\(", "gzuncompress\\(base64_decode\\(", "ies\",gzuncompress\\(\\$", "eval\\(gzdecode\\(", "preg_replace\\(\"\\/\\.\\*\\/e\"", "Scanners", "phpspy", "cha88\\.cn", "chr\\((\\d)+\\)\\.chr\\((\\d)+\\)", "\\$\\_=\\$\\_", "\\$(\\w)+\\(\\${", "\\(array\\)\\$_(POST|GET|REQUEST|COOKIE)", "\\$(\\w)+\\(\"\\/(\\S)+\\/e", "\"e\"\\.\"v\"\\.\"a\"\\.\"l\"", "\"e\"\\.\"v\"\\.\"a\"\\.\"l\"", "'e'\\.'v'\\.'a'\\.'l'", "@preg\\_replace\\((\")*\\/(\\S)*\\/e(\")*,\\$_POST\\[\\S*\\]", "\\${'\\_'", "@\\$\\_\\(\\$\\_", "\\$\\_=\"\"" ] patterns = [re.compile(p, re.IGNORECASE) for p in rules] shell_files = [] for fp in file_list: try: with open(fp, 'rb') as f: data = f.read() try: text = data.decode('utf-8', errors='ignore') except Exception: text = data.decode('latin-1', errors='ignore') for pat in patterns: if pat.search(text): shell_files.append(fp) break except Exception: continue for shell_file in shell_files: result.append({ "name": public.lang("Trojan file detected in {} website root directory",webinfo.get('name', get.name)), "info": public.lang("Trojan file 【{}】detected — may expose sensitive server information and pose security risks",shell_file), "repair": public.lang("Delete the trojan file or perform a security audit"), "dangerous": 3, "type": "webshell", "file_path": shell_file }) if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "Trojan scan for %s root directory completed, found %d trojans" % (get.name, len(result)), "results": result, "type": "webshell", "bar": self.bar })) return result def WebBackupFileDetection(self, webinfo, get): ''' @name 备份文件检测 @author wpl<2025-11-4> @param webinfo 网站信息 @param get 请求参数 @return list 检测结果 ''' if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "Scanning %s for backup files" % get.name, "type": "backup", "bar": self.bar })) result = [] site_path = webinfo['path'] # 备份文件扩展名 backup_extensions = ['.bak', '.backup', '.zip', '.rar', '.tar', '.gz', '.7z'] # 只扫描站点根目录是否存在备份文件(不遍历子目录) if os.path.exists(site_path): try: files = os.listdir(site_path) for file in files: file_path = os.path.join(site_path, file) # 只检查文件,跳过目录 if os.path.isfile(file_path): file_lower = file.lower() for ext in backup_extensions: if file_lower.endswith(ext): result.append({ "name": public.lang("Backup file detected on {} website", webinfo['name']), "info": public.lang("Backup file 【{}】exposed — may leak sensitive server information and pose security risks",file), "repair": public.lang("Delete the backup file or move it to a secure location"), "dangerous": 2, "type": "backup", "file_path": file_path }) break except Exception as e: # 如果无法访问目录,记录错误但不中断扫描 pass if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "Backup file scan for %s completed, found %d backup files" % (get.name, len(result)), "results": result, "type": "backup", "bar": self.bar })) return result def WebWeakPasswordDetection(self, webinfo, get): ''' @name 弱口令检测(数据库与FTP) @author wpl<2025-11-4> @param webinfo 网站信息 @param get 请求参数 @return list 检测结果 ''' if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "Scanning %s for weak passwords" % get.name, "type": "weakpass", "bar": self.bar })) result = [] # 读取弱口令字典 weekpassfile = "/www/server/panel/config/weak_pass.txt" pass_list = [] if os.path.exists(weekpassfile): try: pass_info = public.ReadFile(weekpassfile) pass_list = [p.strip() for p in pass_info.split('\n') if p.strip()] except: pass # 获取站点ID web_id = None try: if isinstance(webinfo, dict): web_id = webinfo.get('id') except: web_id = None # 数据库弱口令检测 if pass_list and web_id: try: database = public.M('databases').where("pid=?", (web_id,)).select() if isinstance(database, list): for dbinfo in database: pwd = dbinfo.get('password') if not pwd: continue if pwd in pass_list: dbname = dbinfo.get('name', '') # 密码脱敏 if hasattr(self, 'short_passwd'): masked = self.short_passwd(pwd) else: plen = len(pwd) masked = (pwd[:2] + "**" + pwd[-2:]) if plen > 4 else ((pwd[:1] + "****" + pwd[-1]) if 1 < plen <= 4 else "******") result.append({ "name": public.lang("{} website database uses a weak password",webinfo.get('name', '')), "info": public.lang("Website %s database 【{}】uses a weak password: {}",webinfo.get('name', ''), dbname, masked), "repair": public.lang("It is recommended to change the database user password in the panel to prevent brute-force attacks and data theft"), "dangerous": 1, "type": "weakpass" }) except: pass # FTP弱口令检测 if pass_list and web_id: try: ftps = public.M('ftps').where("pid=?", (web_id,)).select() if isinstance(ftps, list): for ftpinfo in ftps: pwd = ftpinfo.get('password') if not pwd: continue if pwd in pass_list: ftpname = ftpinfo.get('name', '') if hasattr(self, 'short_passwd'): masked = self.short_passwd(pwd) else: plen = len(pwd) masked = (pwd[:2] + "**" + pwd[-2:]) if plen > 4 else ((pwd[:1] + "****" + pwd[-1]) if 1 < plen <= 4 else "******") result.append({ "name": public.lang("{} website FTP account uses a weak password",webinfo.get('name', '')), "info": public.lang("Website {} FTP account 【{}】uses a weak password: {}",webinfo.get('name', ''), ftpname, masked), "repair": public.lang("Change the weak password immediately to prevent attackers from brute-forcing FTP credentials and tampering with website files"), "dangerous": 2, "type": "weakpass" }) except: pass if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "Weak password scan for %s completed, found %d issues" % (get.name, len(result)), "results": result, "type": "weakpass", "bar": self.bar })) return result def WebLogDetection(self, webinfo, get): ''' @name 网站日志检测 @author wpl<2025-11-4> @param webinfo 网站信息 @param get 请求参数 @return list 检测结果 ''' if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "Scanning %s website logs" % get.name, "type": "weblog", "bar": self.bar })) result = [] # 安全检测规则定义 security_patterns = { 'xss': { 'patterns': ['javascript:', 'data:', 'alert(', 'onerror=', 'onload=', 'onclick=', '%3Cscript', '%3Csvg/', '%3Ciframe/', '