Files
yakpanel-core/class_v2/safeModelV2/serversafeModel.py
2026-04-07 02:04:22 +05:30

394 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding: utf-8
# -------------------------------------------------------------------
# YakPanel
# -------------------------------------------------------------------
# Copyright (c) 2014-2099 YakPanel(www.yakpanel.com) All rights reserved.
# -------------------------------------------------------------------
# Author: yakpanel
# -------------------------------------------------------------------
# ------------------------------
# server safe app
# ------------------------------
import json
import os
import re
from copy import deepcopy
from typing import Callable
import public
from public.exceptions import HintException
from public.validate import Param
public.sys_path_append("class_v2/")
from ssh_security_v2 import ssh_security
from config_v2 import config
class main:
    """Server security baseline: evaluates hardening items, scores them and repairs some.

    ``self.config`` is the ordered catalogue of security items. Each entry is a
    dict with ``name``, ``desc``, ``suggest``, a ``check`` callable returning
    ``{"status": ..., "value": ...}`` and, for some items, a ``repair`` callable
    that takes the repair arguments object.
    """

    # libpwquality configuration file that the password checks read and the
    # repair functions rewrite.
    _PWQUALITY_CONF = "/etc/security/pwquality.conf"

    def __init__(self):
        # Entry layout: {name: item name, desc: description,
        # suggest: repair suggestion, check: check function, repair: repair function}
        self.config = [
            {
                "name": "Default SSH Port",
                "desc": public.lang("Modify the default SSH port to improve server security"),
                "suggest": public.lang("Use a high port other than 22"),
                "check": self.check_ssh_port,
            },
            {
                "name": "Password Complexity Policy",
                "desc": public.lang("Enable password complexity check to ensure password security"),
                "suggest": public.lang("Use a level greater than 3"),
                "check": self.check_ssh_minclass,
                "repair": self.repair_ssh_minclass,
            },
            {
                "name": "Password Length Limit",
                "desc": public.lang("Set minimum password length requirement"),
                "suggest": public.lang("Use a password of 9-20 characters"),
                "check": self.check_ssh_security,
                "repair": self.repair_ssh_passwd_len,
            },
            {
                "name": "SSH Login Alert",
                "desc": public.lang("Send alert notification upon SSH login"),
                "suggest": public.lang("Enable SSH login alert"),
                "check": self.check_ssh_login_sender,
            },
            {
                "name": "Root Login Settings",
                "desc": public.lang("It is recommended to allow key-based login only"),
                "suggest": public.lang("Allow only SSH key-based login"),
                "check": self.check_ssh_login_root_with_key,
            },
            {
                "name": "SSH Brute-force",
                "desc": public.lang("Prevent SSH brute-force attacks"),
                "suggest": public.lang("Enable SSH brute-force protection"),
                "check": self.check_ssh_fail2ban_brute,
            },
            {
                "name": "Panel Login Alert",
                "desc": public.lang("Send alert notification upon panel login"),
                "suggest": public.lang("Enable panel login alert"),
                "check": self.check_panel_swing,
            },
            {
                "name": "Panel Google Authenticator login",
                "desc": public.lang("Enable TOTP for enhanced security"),
                "suggest": public.lang("Enable OTP authentication"),
                "check": self.check_panel_login_2fa,
            },
            {
                "name": "UnAuth Response Status Code",
                "desc": public.lang("Set the HTTP response status code for unauthenticated access"),
                "suggest": public.lang("Set 404 as the response code"),
                "check": self.check_panel_not_auth_code,
            },
            {
                "name": "Panel SSL",
                "desc": public.lang("Enable HTTPS encrypted transmission (after setting will restart the panel)"),
                "suggest": public.lang("Enable panel HTTPS"),
                "check": self.check_panel_ssl,
            },
        ]
        self.ssh_security_obj = ssh_security()
        self.config_obj = config()

    def get_security_info(self, get=None):
        """
        Run every configured check and return the item list with an overall score.

        @param get: request object; not used
        @return: public.success_v2 payload with ``security_data`` (items without
                 their callables, plus ``id``/``status``/``value``),
                 ``total_score``, ``score_text`` and ``score``
        """
        items = []
        for idx, module in enumerate(self.config, start=1):
            # Shallow per-entry copy without the callables. Deep-copying the
            # catalogue would also deep-copy the bound methods and, through
            # them, this whole instance.
            item = {key: val for key, val in module.items() if key not in ("check", "repair")}
            item["id"] = idx
            item["status"] = False
            item["value"] = None
            check = module.get("check")
            if callable(check):
                try:
                    outcome = check()
                    item["status"] = outcome.get("status", False)
                    item["value"] = outcome.get("value")
                except Exception as ex:
                    # Best effort: a failing check counts as "not secure"
                    # instead of breaking the whole report.
                    public.print_log("error info: {}".format(ex))
                    item["status"] = False
                    item["value"] = None
            items.append(item)

        total_score = 100  # maximum score
        per_item = total_score / len(items) if items else 0  # weight of one item
        # Any falsy status (False, None, 0) counts as a failed item.
        missing_count = sum(1 for item in items if not item["status"])
        security_score = round(total_score - missing_count * per_item, 2)

        # Map the numeric score to its label.
        if security_score >= 90:
            score_text = public.lang("Secure")
        elif security_score >= 70:
            score_text = public.lang("Relatively Secure")
        elif security_score >= 50:
            score_text = public.lang("Average Security")
        else:
            score_text = public.lang("Insecure")
        public.set_module_logs("server_secury", "get_security_info", 1)
        return public.success_v2({
            "security_data": items,
            "total_score": total_score,
            "score_text": score_text,
            "score": int(security_score)
        })

    def install_fail2ban(self, get):
        """
        Install a plugin through panelPlugin.install_plugin, forwarding ``get`` unchanged.
        """
        from panel_plugin_v2 import panelPlugin
        public.set_module_logs("server_secury", "install_fail2ban", 1)
        return panelPlugin().install_plugin(get)

    def repair_security(self, get):
        """
        @name Repair a security item
        @param get: request carrying {"name": "", "args": {}}
        @return: result of the matching item's repair function, or a
                 public.fail_v2 message when validation fails
        @raise HintException: no repairable item has the requested name
        """
        try:
            get.validate([
                Param("name").String().Require(),
                Param("args").Dict().Require(),
            ], [public.validate.trim_filter()])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.fail_v2(str(ex))
        for security in self.config:
            repair = security.get("repair")
            if security.get("name") == get.name and callable(repair):
                return repair(public.to_dict_obj(get.args))
        raise HintException(public.lang(f"Security Repair Item [{get.name}] Not Found!"))

    @staticmethod
    def _find_pwquality_conf_with_keyword(re_search: str) -> str:
        """
        Search the pwquality configuration file with a regular expression.

        @param re_search: regular expression, applied with re.M
        @return: first match (or first group) stripped; "" when the pattern is
                 empty, the file is missing/empty, nothing matches, or any
                 error occurs
        """
        if not re_search:
            return ""
        try:
            p_body = public.readFile('/etc/security/pwquality.conf')
            if not p_body:
                return ""  # no configuration file
            tmp = re.findall(re_search, p_body, re.M)
            if not tmp:
                return ""  # option not set
            return tmp[0].strip()
        except Exception:
            return ""  # treat any failure as "not configured"

    @staticmethod
    def _parse_pwquality_int(body, key, allow_commented=False):
        """
        Extract the integer value of a ``key = N`` line from pwquality.conf text.

        @param body: file content; anything that is not a str yields None
        @param key: option name, e.g. "minlen"
        @param allow_commented: also accept a line commented out with a single "#"
        @return: int value of the first matching line, or None when absent
        """
        if not isinstance(body, str):
            return None
        comment = r"#?[ \t]*" if allow_commented else ""
        # Anchored at line start and limited to digits, so trailing comments
        # and missing spaces around "=" do not break parsing.
        pattern = r"^[ \t]*{}{}[ \t]*=[ \t]*(\d+)".format(comment, re.escape(key))
        found = re.search(pattern, body, re.M)
        return int(found.group(1)) if found else None

    @staticmethod
    def _upsert_conf_line(body, pattern, line):
        """
        Replace every match of ``pattern`` in ``body`` with ``"\\n" + line``;
        append ``"\\n" + line`` when nothing matches.

        @return: the new file content
        """
        replacement = "\n" + line
        # A callable replacement is inserted verbatim (no escape processing).
        new_body, count = re.subn(pattern, lambda _match: replacement, body)
        if count == 0:
            new_body = body + replacement
        return new_body

    def _apply_pwquality_setting(self, pattern, line):
        """
        Write ``line`` into the pwquality configuration and verify it landed.

        @param pattern: regex matching the existing line(s) to replace
        @param line: the ``key = value`` line to store
        @return: True when the re-read file contains ``line``
        """
        conf = self._PWQUALITY_CONF
        if not os.path.exists(conf):
            # NOTE(review): apt-only install attempt; other package managers
            # are not handled here.
            public.ExecShell("apt install libpam-pwquality -y")
        if not os.path.exists(conf):
            return False
        body = public.readFile(conf)
        if not isinstance(body, str):
            # Unreadable file: do not overwrite it with a one-line config.
            return False
        public.writeFile(conf, self._upsert_conf_line(body, pattern, line))
        written = public.readFile(conf)
        return isinstance(written, str) and line in written

    # =================== Check functions ===================
    def check_ssh_port(self) -> dict:
        """
        @name Check whether SSH still uses the default port 22
        """
        current_port = public.get_ssh_port()
        # Compare as text so both 22 and "22" count as the default port.
        return {"status": str(current_port).strip() != "22", "value": current_port}

    def check_ssh_minclass(self) -> dict:
        """
        @name Check the password complexity policy (pwquality minclass)
        @return: status is True when an active minclass line is >= 3
        """
        body = public.readFile(self._PWQUALITY_CONF)
        minclass_value = self._parse_pwquality_int(body, "minclass")
        if minclass_value is None:
            return {"status": False, "value": None}  # minclass not set
        return {"status": minclass_value >= 3, "value": minclass_value}

    def check_ssh_security(self) -> dict:
        """
        @name Check the minimum password length (pwquality minlen)
        @return: status is True when minlen is >= 9 or no minlen line exists
        """
        body = public.readFile(self._PWQUALITY_CONF)
        minlen_value = self._parse_pwquality_int(body, "minlen")
        if minlen_value is None:
            # Fall back to a commented-out line: the previous unanchored regex
            # also matched it, so untouched files keep reporting that value.
            minlen_value = self._parse_pwquality_int(body, "minlen", allow_commented=True)
        if minlen_value is None:
            return {"status": True, "value": None}  # no minlen line: treated as no risk
        return {"status": minlen_value >= 9, "value": minlen_value}

    def check_panel_swing(self) -> dict:
        """
        @name Check whether the panel login alert is enabled
        @return: status False / value None when no marker file exists;
                 otherwise value is the "panel_login" push task (with a
                 "channels" list added) and status is that task's "status"
        """
        tip_files = (
            "panel_login_send.pl", "login_send_type.pl", "login_send_mail.pl", "login_send_dingding.pl"
        )
        # One marker file under data/ is enough to consider the alert configured.
        if not any(os.path.exists("data/" + fname) for fname in tip_files):
            return {"status": False, "value": None}

        task_file_path = "/www/server/panel/data/mod_push_data/task.json"
        sender_file_path = "/www/server/panel/data/mod_push_data/sender.json"
        task_data = {}
        try:
            with open(task_file_path, "r", encoding="utf-8") as file:
                tasks = json.load(file)
            # Load the sender configuration.
            with open(sender_file_path, "r", encoding="utf-8") as file:
                senders = json.load(file)
            sender_dict = {sender["id"]: sender for sender in senders}
            # Locate the panel-login alert task.
            for task in tasks:
                if task.get("keyword") != "panel_login":
                    continue
                task_data = task
                # A set keeps each channel type only once.
                sender_types = set()
                for sender_id in task.get("sender", []):
                    if sender_id in sender_dict:
                        sender_types.add(sender_dict[sender_id]["sender_type"])
                task_data["channels"] = list(sender_types)
                break
        except (OSError, ValueError, KeyError, TypeError, AttributeError):
            # Best effort: missing or malformed push data leaves whatever was
            # collected so far (possibly nothing).
            pass
        return {"status": task_data.get("status", False), "value": task_data}

    def check_ssh_login_sender(self) -> dict:
        """
        @name Check whether the SSH login alert is enabled
        """
        result = self.ssh_security_obj.get_login_send(None)
        res = public.find_value_by_key(
            result, "result", "error"
        )
        return {"status": res != "error", "value": res}

    def check_ssh_login_root_with_key(self) -> dict:
        """
        @name Check whether root may log in over SSH with a key only
        @return: status is True when the parsed policy is "without-password"
        """
        parsed = self.ssh_security_obj.paser_root_login()
        current_policy = None
        try:
            current_policy = parsed[1]
        except Exception:
            import traceback
            public.print_log("error info: {}".format(traceback.format_exc()))
        return {"status": current_policy == "without-password", "value": current_policy}

    def check_ssh_fail2ban_brute(self) -> dict:
        """
        @name Check whether SSH brute-force protection is enabled
        """
        from safeModelV2.sshModel import main as sshmod
        cfg = sshmod._get_ssh_fail2ban() or {}
        current_value = cfg.get("status", 0)
        return {"status": current_value == 1, "value": current_value}

    def check_panel_login_2fa(self) -> dict:
        """
        @name Check whether panel one-time-password login is enabled
        """
        current_value = self.config_obj.check_two_step(None)
        res = public.find_value_by_key(
            current_value, "result", False
        )
        return {"status": bool(res), "value": res}

    def check_panel_not_auth_code(self) -> dict:
        """
        @name Check the panel's unauthenticated response status code
        @return: status is True when the configured code is not 0
        """
        current_code = self.config_obj.get_not_auth_status()
        return {"status": current_code != 0, "value": current_code}

    def check_panel_ssl(self):
        """
        @name Check whether panel SSL is enabled (marker file data/ssl.pl exists)
        """
        enabled = os.path.exists("data/ssl.pl")
        return {"status": bool(enabled), "value": enabled}

    # =================== Repair functions ===================
    def repair_ssh_minclass(self, get):
        """
        @name Repair the password complexity policy
        @param get: arguments object carrying {"minclass": 9}
        @return: public.return_message result (status 0 on success, 1 on
                 failure), or a public.fail_v2 message when validation fails
        """
        try:
            get.validate([
                Param("minclass").Integer(">", 0).Require(),
            ], [public.validate.trim_filter()])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.fail_v2(str(ex))
        minclass = int(get.minclass)
        done = self._apply_pwquality_setting(
            r"\n\s*minclass\s*=\s*\d*", "minclass = {}".format(minclass)
        )
        if done:
            msg = public.lang("SSH minimum password complexity has been set")
        else:
            msg = public.lang("Failed to set SSH password complexity, "
                              "please disable system hardening or set it manually")
        return public.return_message(0 if done else 1, 0, msg)

    def repair_ssh_passwd_len(self, get):
        """
        @name Set the minimum password length
        @param get: arguments object carrying {"len": 9}
        @return: public.return_message result (status 0 on success, 1 on
                 failure), or a public.fail_v2 message when validation fails
        """
        try:
            get.validate([
                Param("len").Integer(">", 0).Require(),
            ], [public.validate.trim_filter()])
        except Exception as ex:
            public.print_log("error info: {}".format(ex))
            return public.fail_v2(str(ex))
        pwd_len = int(get.len)
        # The pattern also matches a commented-out line so that it gets
        # activated; when no minlen line exists at all the setting is appended.
        done = self._apply_pwquality_setting(
            r"\n#?\s*minlen\s*=\s*\d*", "minlen = {}".format(pwd_len)
        )
        if done:
            msg = "SSH minimum password length has been set to {}".format(pwd_len)
        else:
            msg = public.lang("Failed to set SSH minimum password length, please set it manually")
        return public.return_message(0 if done else 1, 0, msg)