394 lines
15 KiB
Python
394 lines
15 KiB
Python
|
|
# coding: utf-8
|
|||
|
|
# -------------------------------------------------------------------
|
|||
|
|
# YakPanel
|
|||
|
|
# -------------------------------------------------------------------
|
|||
|
|
# Copyright (c) 2014-2099 YakPanel(www.yakpanel.com) All rights reserved.
|
|||
|
|
# -------------------------------------------------------------------
|
|||
|
|
# Author: yakpanel
|
|||
|
|
# -------------------------------------------------------------------
|
|||
|
|
|
|||
|
|
# ------------------------------
|
|||
|
|
# server safe app
|
|||
|
|
# ------------------------------
|
|||
|
|
import json
|
|||
|
|
import os
|
|||
|
|
import re
|
|||
|
|
from copy import deepcopy
|
|||
|
|
from typing import Callable
|
|||
|
|
|
|||
|
|
import public
|
|||
|
|
from public.exceptions import HintException
|
|||
|
|
from public.validate import Param
|
|||
|
|
|
|||
|
|
public.sys_path_append("class_v2/")
|
|||
|
|
from ssh_security_v2 import ssh_security
|
|||
|
|
from config_v2 import config
|
|||
|
|
|
|||
|
|
|
|||
|
|
class main:
|
|||
|
|
def __init__(self):
|
|||
|
|
# {name:安全项名称,desc:描述,
|
|||
|
|
# suggest:修复建议,check:检查函数,repair:修复函数,value:获取当前值函数,status:状态}
|
|||
|
|
self.config = [
|
|||
|
|
{
|
|||
|
|
"name": "Default SSH Port",
|
|||
|
|
"desc": public.lang("Modify the default SSH port to improve server security"),
|
|||
|
|
"suggest": public.lang("Use a high port other than 22"),
|
|||
|
|
"check": self.check_ssh_port,
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"name": "Password Complexity Policy",
|
|||
|
|
"desc": public.lang("Enable password complexity check to ensure password security"),
|
|||
|
|
"suggest": public.lang("Use a level greater than 3"),
|
|||
|
|
"check": self.check_ssh_minclass,
|
|||
|
|
"repair": self.repair_ssh_minclass,
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"name": "Password Length Limit",
|
|||
|
|
"desc": public.lang("Set minimum password length requirement"),
|
|||
|
|
"suggest": public.lang("Use a password of 9-20 characters"),
|
|||
|
|
"check": self.check_ssh_security,
|
|||
|
|
"repair": self.repair_ssh_passwd_len,
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"name": "SSH Login Alert",
|
|||
|
|
"desc": public.lang("Send alert notification upon SSH login"),
|
|||
|
|
"suggest": public.lang("Enable SSH login alert"),
|
|||
|
|
"check": self.check_ssh_login_sender,
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"name": "Root Login Settings",
|
|||
|
|
"desc": public.lang("It is recommended to allow key-based login only"),
|
|||
|
|
"suggest": public.lang("Allow only SSH key-based login"),
|
|||
|
|
"check": self.check_ssh_login_root_with_key,
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"name": "SSH Brute-force",
|
|||
|
|
"desc": public.lang("Prevent SSH brute-force attacks"),
|
|||
|
|
"suggest": public.lang("Enable SSH brute-force protection"),
|
|||
|
|
"check": self.check_ssh_fail2ban_brute,
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"name": "Panel Login Alert",
|
|||
|
|
"desc": public.lang("Send alert notification upon panel login"),
|
|||
|
|
"suggest": public.lang("Enable panel login alert"),
|
|||
|
|
"check": self.check_panel_swing,
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"name": "Panel Google Authenticator login",
|
|||
|
|
"desc": public.lang("Enable TOTP for enhanced security"),
|
|||
|
|
"suggest": public.lang("Enable OTP authentication"),
|
|||
|
|
"check": self.check_panel_login_2fa,
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"name": "UnAuth Response Status Code",
|
|||
|
|
"desc": public.lang("Set the HTTP response status code for unauthenticated access"),
|
|||
|
|
"suggest": public.lang("Set 404 as the response code"),
|
|||
|
|
"check": self.check_panel_not_auth_code,
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"name": "Panel SSL",
|
|||
|
|
"desc": public.lang("Enable HTTPS encrypted transmission (after setting will restart the panel)"),
|
|||
|
|
"suggest": public.lang("Enable panel HTTPS"),
|
|||
|
|
"check": self.check_panel_ssl,
|
|||
|
|
}
|
|||
|
|
]
|
|||
|
|
self.ssh_security_obj = ssh_security()
|
|||
|
|
self.config_obj = config()
|
|||
|
|
|
|||
|
|
def get_security_info(self, get=None):
|
|||
|
|
"""
|
|||
|
|
获取安全评分
|
|||
|
|
"""
|
|||
|
|
new_list = deepcopy(self.config)
|
|||
|
|
for idx, module in enumerate(new_list):
|
|||
|
|
if isinstance(module.get("check"), Callable):
|
|||
|
|
try:
|
|||
|
|
module["id"] = int(idx) + 1
|
|||
|
|
check_status = module["check"]()
|
|||
|
|
module["status"] = check_status.get("status", False)
|
|||
|
|
module["value"] = check_status.get("value")
|
|||
|
|
except:
|
|||
|
|
module["status"] = False
|
|||
|
|
module["value"] = None
|
|||
|
|
|
|||
|
|
if "check" in module and isinstance(module["check"], Callable):
|
|||
|
|
del module["check"]
|
|||
|
|
if "repair" in module and isinstance(module["repair"], Callable):
|
|||
|
|
del module["repair"]
|
|||
|
|
if "value" not in module:
|
|||
|
|
module["value"] = None
|
|||
|
|
|
|||
|
|
total_score = 100 # 总分
|
|||
|
|
score = total_score / len(new_list) # 每条的分数
|
|||
|
|
missing_count = 0 # 缺少的条数
|
|||
|
|
for module in new_list:
|
|||
|
|
if module["status"] is False:
|
|||
|
|
missing_count += 1
|
|||
|
|
# 计算总分
|
|||
|
|
security_score = total_score - (missing_count * score)
|
|||
|
|
security_score = round(security_score, 2)
|
|||
|
|
|
|||
|
|
# 计算得分文本
|
|||
|
|
if security_score >= 90:
|
|||
|
|
score_text = public.lang("Secure")
|
|||
|
|
elif security_score >= 70:
|
|||
|
|
score_text = public.lang("Relatively Secure")
|
|||
|
|
elif security_score >= 50:
|
|||
|
|
score_text = public.lang("Average Security")
|
|||
|
|
else:
|
|||
|
|
score_text = public.lang("Insecure")
|
|||
|
|
|
|||
|
|
public.set_module_logs("server_secury", "get_security_info", 1)
|
|||
|
|
return public.success_v2({
|
|||
|
|
"security_data": new_list,
|
|||
|
|
"total_score": total_score,
|
|||
|
|
"score_text": score_text,
|
|||
|
|
"score": int(security_score)
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
def install_fail2ban(self, get):
|
|||
|
|
from panel_plugin_v2 import panelPlugin
|
|||
|
|
public.set_module_logs("server_secury", "install_fail2ban", 1)
|
|||
|
|
return panelPlugin().install_plugin(get)
|
|||
|
|
|
|||
|
|
def repair_security(self, get):
|
|||
|
|
"""
|
|||
|
|
@name 修复安全项
|
|||
|
|
@parma {"name":"","args":{}}
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
get.validate([
|
|||
|
|
Param("name").String().Require(),
|
|||
|
|
Param("args").Dict().Require(),
|
|||
|
|
], [public.validate.trim_filter()])
|
|||
|
|
except Exception as ex:
|
|||
|
|
public.print_log("error info: {}".format(ex))
|
|||
|
|
return public.fail_v2(str(ex))
|
|||
|
|
|
|||
|
|
for security in self.config:
|
|||
|
|
if security.get("name") == get.name and isinstance(security.get("repair"), Callable):
|
|||
|
|
return security["repair"](public.to_dict_obj(get.args))
|
|||
|
|
raise HintException(public.lang(f"Security Repair Item [{get.name}] Not Found!"))
|
|||
|
|
|
|||
|
|
@staticmethod
|
|||
|
|
def _find_pwquality_conf_with_keyword(re_search: str) -> str:
|
|||
|
|
"""
|
|||
|
|
读取ssh密码复杂度配置
|
|||
|
|
@param re_search: 正则表达式
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
if not re_search:
|
|||
|
|
raise HintException("required parameter re_search")
|
|||
|
|
p_file = '/etc/security/pwquality.conf'
|
|||
|
|
p_body = public.readFile(p_file)
|
|||
|
|
if not p_body:
|
|||
|
|
return "" # 无配置文件时
|
|||
|
|
tmp = re.findall(re_search, p_body, re.M)
|
|||
|
|
if not tmp:
|
|||
|
|
return "" # 未设置minclass
|
|||
|
|
find = tmp[0].strip()
|
|||
|
|
return find
|
|||
|
|
except:
|
|||
|
|
return "" # 异常时认为无
|
|||
|
|
|
|||
|
|
# =================== 检查函数 ===================
|
|||
|
|
def check_ssh_port(self) -> dict:
|
|||
|
|
"""
|
|||
|
|
@name 检查SSH端口是否为默认端口22
|
|||
|
|
"""
|
|||
|
|
current_port = public.get_ssh_port()
|
|||
|
|
return {"status": current_port != 22, "value": current_port}
|
|||
|
|
|
|||
|
|
def check_ssh_minclass(self) -> dict:
|
|||
|
|
"""
|
|||
|
|
@name 检查SSH密码复杂度策略
|
|||
|
|
"""
|
|||
|
|
re_pattern = r"\n\s*minclass\s+=\s+(.+)"
|
|||
|
|
find = self._find_pwquality_conf_with_keyword(re_pattern)
|
|||
|
|
if not find:
|
|||
|
|
return {"status": False, "value": None} # 未设置minclass
|
|||
|
|
minclass_value = int(find)
|
|||
|
|
return {"status": minclass_value >= 3, "value": minclass_value}
|
|||
|
|
|
|||
|
|
def check_ssh_security(self) -> dict:
|
|||
|
|
"""
|
|||
|
|
@name 检查SSH密码长度限制
|
|||
|
|
"""
|
|||
|
|
re_pattern = r"\s*minlen\s+=\s+(.+)"
|
|||
|
|
find = self._find_pwquality_conf_with_keyword(re_pattern)
|
|||
|
|
if not find:
|
|||
|
|
return {"status": True, "value": None} # 未设置minlen时认为无风险
|
|||
|
|
minlen_value = int(find)
|
|||
|
|
return {"status": minlen_value >= 9, "value": minlen_value}
|
|||
|
|
|
|||
|
|
def check_panel_swing(self) -> dict:
|
|||
|
|
"""
|
|||
|
|
@name 检查面板登录告警是否开启
|
|||
|
|
"""
|
|||
|
|
tip_files = [
|
|||
|
|
"panel_login_send.pl", "login_send_type.pl", "login_send_mail.pl", "login_send_dingding.pl"
|
|||
|
|
]
|
|||
|
|
enabled_files = []
|
|||
|
|
for fname in tip_files:
|
|||
|
|
filename = "data/" + fname
|
|||
|
|
if os.path.exists(filename):
|
|||
|
|
enabled_files.append(fname)
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
is_enabled = len(enabled_files) > 0
|
|||
|
|
value = None
|
|||
|
|
if not is_enabled:
|
|||
|
|
return {"status": False, "value": value}
|
|||
|
|
|
|||
|
|
task_file_path = "/www/server/panel/data/mod_push_data/task.json"
|
|||
|
|
sender_file_path = "/www/server/panel/data/mod_push_data/sender.json"
|
|||
|
|
task_data = {}
|
|||
|
|
try:
|
|||
|
|
with open(task_file_path, "r") as file:
|
|||
|
|
tasks = json.load(file)
|
|||
|
|
# 读取发送者配置文件
|
|||
|
|
with open(sender_file_path, "r") as file:
|
|||
|
|
senders = json.load(file)
|
|||
|
|
sender_dict = {
|
|||
|
|
sender["id"]: sender for sender in senders
|
|||
|
|
}
|
|||
|
|
# 查找特定的告警任务
|
|||
|
|
for task in tasks:
|
|||
|
|
if task.get("keyword") == "panel_login":
|
|||
|
|
task_data = task
|
|||
|
|
sender_types = set() # 使用集合来保证类型的唯一性
|
|||
|
|
# 对应sender的ID,获取sender_type,并保证唯一性
|
|||
|
|
for sender_id in task.get("sender", []):
|
|||
|
|
if sender_id in sender_dict:
|
|||
|
|
sender_types.add(sender_dict[sender_id]["sender_type"])
|
|||
|
|
# 将唯一的通道类型列表转回列表格式,添加到告警数据中
|
|||
|
|
task_data["channels"] = list(sender_types)
|
|||
|
|
break
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
value = task_data
|
|||
|
|
return {"status": value.get("status", False), "value": value}
|
|||
|
|
|
|||
|
|
def check_ssh_login_sender(self) -> dict:
|
|||
|
|
"""
|
|||
|
|
@name 检查SSH登录告警是否启用
|
|||
|
|
"""
|
|||
|
|
result = self.ssh_security_obj.get_login_send(None)
|
|||
|
|
res = public.find_value_by_key(
|
|||
|
|
result, "result", "error"
|
|||
|
|
)
|
|||
|
|
return {"status": res != "error", "value": res}
|
|||
|
|
|
|||
|
|
def check_ssh_login_root_with_key(self) -> dict:
|
|||
|
|
"""
|
|||
|
|
@name 检查SSH是否仅允许密钥登录root
|
|||
|
|
"""
|
|||
|
|
parsed = self.ssh_security_obj.paser_root_login()
|
|||
|
|
current_policy = None
|
|||
|
|
try:
|
|||
|
|
current_policy = parsed[1]
|
|||
|
|
except Exception as e:
|
|||
|
|
import traceback
|
|||
|
|
public.print_log("error info: {}".format(traceback.format_exc()))
|
|||
|
|
return {"status": current_policy == "without-password", "value": current_policy}
|
|||
|
|
|
|||
|
|
def check_ssh_fail2ban_brute(self) -> dict:
|
|||
|
|
"""
|
|||
|
|
@name 检查SSH防爆破是否启用
|
|||
|
|
"""
|
|||
|
|
from safeModelV2.sshModel import main as sshmod
|
|||
|
|
cfg = sshmod._get_ssh_fail2ban() or {}
|
|||
|
|
current_value = cfg.get("status", 0)
|
|||
|
|
return {"status": current_value == 1, "value": current_value}
|
|||
|
|
|
|||
|
|
def check_panel_login_2fa(self) -> dict:
|
|||
|
|
"""
|
|||
|
|
@name 检查面板登录动态口令认证是否启用
|
|||
|
|
"""
|
|||
|
|
current_value = self.config_obj.check_two_step(None)
|
|||
|
|
res = public.find_value_by_key(
|
|||
|
|
current_value, "result", False
|
|||
|
|
)
|
|||
|
|
return {"status": bool(res), "value": res}
|
|||
|
|
|
|||
|
|
def check_panel_not_auth_code(self) -> dict:
|
|||
|
|
"""
|
|||
|
|
@name 检查面板未登录响应状态码是否设置为 400+
|
|||
|
|
"""
|
|||
|
|
current_code = self.config_obj.get_not_auth_status()
|
|||
|
|
return {"status": current_code != 0, "value": current_code}
|
|||
|
|
|
|||
|
|
def check_panel_ssl(self):
|
|||
|
|
"""
|
|||
|
|
@name 检查面板是否开启SSL
|
|||
|
|
"""
|
|||
|
|
enabled = os.path.exists("data/ssl.pl")
|
|||
|
|
return {"status": bool(enabled), "value": enabled}
|
|||
|
|
|
|||
|
|
# =================== 修复函数 ===================
|
|||
|
|
def repair_ssh_minclass(self, get):
|
|||
|
|
"""
|
|||
|
|
@name 修复SSH密码复杂度
|
|||
|
|
@param {"minclass":9}
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
get.validate([
|
|||
|
|
Param("minclass").Integer(">", 0).Require(),
|
|||
|
|
], [public.validate.trim_filter()])
|
|||
|
|
except Exception as ex:
|
|||
|
|
public.print_log("error info: {}".format(ex))
|
|||
|
|
return public.fail_v2(str(ex))
|
|||
|
|
|
|||
|
|
minclass = int(get.minclass)
|
|||
|
|
file = "/etc/security/pwquality.conf"
|
|||
|
|
result = {
|
|||
|
|
"status": False, "msg": public.lang("Failed to set SSH password complexity, "
|
|||
|
|
"please disable system hardening or set it manually")
|
|||
|
|
}
|
|||
|
|
if not os.path.exists(file):
|
|||
|
|
public.ExecShell("apt install libpam-pwquality -y")
|
|||
|
|
if os.path.exists(file):
|
|||
|
|
f_data = public.readFile(file)
|
|||
|
|
if re.findall("\n\s*minclass\s*=\s*\d*", f_data):
|
|||
|
|
file_result = re.sub("\n\s*minclass\s*=\s*\d*", "\nminclass = {}".format(minclass), f_data)
|
|||
|
|
else:
|
|||
|
|
file_result = f_data + "\nminclass = {}".format(minclass)
|
|||
|
|
public.writeFile(file, file_result)
|
|||
|
|
f_data = public.readFile(file)
|
|||
|
|
if f_data.find("minclass = {}".format(minclass)) != -1:
|
|||
|
|
result["status"] = True
|
|||
|
|
result["msg"] = public.lang("SSH minimum password complexity has been set")
|
|||
|
|
return public.return_message(0 if result["status"] else 1, 0, result["msg"])
|
|||
|
|
|
|||
|
|
def repair_ssh_passwd_len(self, get):
|
|||
|
|
"""
|
|||
|
|
@name SSH密码最小长度设置
|
|||
|
|
@param {"len":9}
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
get.validate([
|
|||
|
|
Param("len").Integer(">", 0).Require(),
|
|||
|
|
], [public.validate.trim_filter()])
|
|||
|
|
except Exception as ex:
|
|||
|
|
public.print_log("error info: {}".format(ex))
|
|||
|
|
return public.fail_v2(str(ex))
|
|||
|
|
|
|||
|
|
pwd_len = int(get.len)
|
|||
|
|
file = "/etc/security/pwquality.conf"
|
|||
|
|
result = {
|
|||
|
|
"status": False, "msg": public.lang("Failed to set SSH minimum password length, please set it manually")
|
|||
|
|
}
|
|||
|
|
if not os.path.exists(file):
|
|||
|
|
public.ExecShell("apt install libpam-pwquality -y")
|
|||
|
|
if os.path.exists(file):
|
|||
|
|
f_data = public.readFile(file)
|
|||
|
|
ssh_minlen = "\n#?\s*minlen\s*=\s*\d*"
|
|||
|
|
file_result = re.sub(ssh_minlen, "\nminlen = {}".format(pwd_len), f_data)
|
|||
|
|
public.writeFile(file, file_result)
|
|||
|
|
f_data = public.readFile(file)
|
|||
|
|
if f_data.find("minlen = {}".format(pwd_len)) != -1:
|
|||
|
|
result["status"] = True
|
|||
|
|
result["msg"] = "SSH minimum password length has been set to {}".format(pwd_len)
|
|||
|
|
return public.return_message(0 if result["status"] else 1, 0, result["msg"])
|