Files
yakpanel-core/mod/base/web_conf/referer.py
2026-04-07 02:04:22 +05:30

364 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import json
from dataclasses import dataclass
from typing import Tuple, Optional, Union, Dict
from .util import webserver, check_server_config, DB, \
write_file, read_file, GET_CLASS, service_reload, pre_re_key
from mod.base import json_response
@dataclass
class _RefererConf:
    """Hotlink-protection settings of a single site.

    Every field is stored as a string, including the two on/off switches,
    which carry the literal text "true" or "false".
    """

    # Site name; it is interpolated into the vhost and settings file names.
    name: str
    # Comma-separated list of URL suffixes (file extensions) to protect.
    fix: str
    # Comma-separated list of referer domains that are allowed.
    domains: str
    # "true" when protection is enabled, "false" when it is disabled.
    status: str
    # Either an HTTP status code or a URI path starting with "/".
    return_rule: str
    # "true"/"false" switch controlling how an empty Referer is treated.
    http_status: str

    def __str__(self):
        """Return the settings as a JSON object string.

        The text is produced by ``json.dumps`` so that it can always be parsed
        back with ``json.loads``, even when a value contains quotes or
        backslashes.
        """
        return json.dumps(
            {
                "name": self.name,
                "fix": self.fix,
                "domains": self.domains,
                "status": self.status,
                "http_status": self.http_status,
                "return_rule": self.return_rule,
            },
            ensure_ascii=False,
        )
class RealReferer:
    """Apply hotlink-protection (Referer check) rules to a site's vhost files.

    The rules are written into both the nginx and the Apache vhost file of the
    site, and a JSON copy of the settings is kept under ``_referer_conf_dir``.
    """

    _referer_conf_dir = '/www/server/panel/vhost/config'  # hotlink-protection settings
    _ng_referer_conf_format = r''' #SECURITY-START 防盗链配置
location ~ .*\.(%s)$ {
expires 30d;
access_log /dev/null;
valid_referers %s;
if ($invalid_referer){
%s;
}
}
#SECURITY-END'''

    # Start of an nginx hotlink block: optional SECURITY-START comment followed
    # by a "location ~ .*\.(...)$ {" whose body mentions valid_referers.
    _ng_security_rep = re.compile(
        r"(\s*#\s*SECURITY-START.*\n)?\s*location\s+~\s+\.\*\\\.\(.*(\|.*)?\)\$\s*\{[^}]*valid_referers"
    )
    # Optional SECURITY-END comment line directly after the location block.
    _ng_security_end_rep = re.compile(r"\s*#\s*SECURITY-END[^\n]*\n?")
    # A whole Apache hotlink block, SECURITY-START through the SECURITY-END
    # line. Non-greedy so that several blocks in one file match separately.
    _ap_security_rep = re.compile(r"#\s*SECURITY-START[\s\S]*?#\s*SECURITY-END[^\n]*\n")

    def __init__(self, config_prefix: str):
        """Remember the vhost file-name prefix and make sure the settings dir exists."""
        os.makedirs(self._referer_conf_dir, exist_ok=True)
        self.config_prefix: str = config_prefix
        self._webserver = None

    @property
    def webserver(self) -> str:
        """Name of the active web server, looked up once and then cached."""
        if self._webserver is None:
            self._webserver = webserver()
        return self._webserver

    def _config_path(self, site_name: str) -> str:
        """Return the path of the JSON settings file for ``site_name``."""
        return "{}/{}{}_door_chain.json".format(self._referer_conf_dir, self.config_prefix, site_name)

    @staticmethod
    def _conf_to_dict(conf: "_RefererConf") -> dict:
        """Return the fields of a settings object as a plain dict."""
        from dataclasses import asdict
        return asdict(conf)

    def get_config(self, site_name: str) -> "Optional[_RefererConf]":
        """Load the saved settings of a site.

        Returns None when the file is missing, is not valid JSON, does not
        hold a JSON object, or holds keys that do not match the settings
        fields.
        """
        try:
            config = json.loads(read_file(self._config_path(site_name)))
        except (json.JSONDecodeError, TypeError, ValueError):
            return None
        if not isinstance(config, dict):
            return None
        try:
            return _RefererConf(**config)
        except TypeError:
            # Missing or unexpected keys: treat as "no usable settings".
            return None

    def save_config(self, site_name: str, data: "Union[dict, str, _RefererConf]") -> bool:
        """Write the settings of a site to its JSON file.

        A dict or settings object is serialised as a JSON object so that
        ``get_config`` can read it back; any other value is written as given.
        Returns the result of ``write_file``.
        """
        if isinstance(data, _RefererConf):
            content = json.dumps(self._conf_to_dict(data))
        elif isinstance(data, dict):
            content = json.dumps(data)
        else:
            content = data
        return write_file(self._config_path(site_name), content)

    # Validate the request arguments: return a settings object when they are
    # acceptable, otherwise an error message.
    @staticmethod
    def check_args(get: "Union[Dict, GET_CLASS]") -> "Union[_RefererConf, str]":
        """Build a settings object from request arguments.

        ``status`` defaults to "true" and ``http_status`` to "false" when
        absent; ``name``, ``fix``, ``domains`` and ``return_rule`` are
        required. Returns the settings object, or an error message string
        when an argument is missing or invalid.
        """
        res = {}
        try:
            if isinstance(get, GET_CLASS):
                res["status"] = get.status.strip() if hasattr(get, "status") else "true"
                res["http_status"] = get.http_status.strip() if hasattr(get, "http_status") else "false"
                for key in ("name", "fix", "domains", "return_rule"):
                    res[key] = getattr(get, key).strip()
            else:
                res["status"] = get["status"].strip() if "status" in get else "true"
                res["http_status"] = get["http_status"].strip() if "http_status" in get else "false"
                for key in ("name", "fix", "domains", "return_rule"):
                    res[key] = get[key].strip()
        except (AttributeError, KeyError, TypeError):
            # Missing argument, or a value that is not a string.
            return "Parameter error"

        rconf = _RefererConf(**res)
        switches = ("true", "false")
        if rconf.status not in switches or rconf.http_status not in switches:
            return "状态参数只能使用【true,false】"
        if rconf.return_rule not in ('404', '403', '200', '301', '302', '401') \
                and not rconf.return_rule.startswith("/"):
            return "响应资源应使用URI路径或HTTP状态码/test.png 或 404"
        if len(rconf.domains) < 3:
            return "防盗链域名不能为空"
        if len(rconf.fix) < 2:
            return 'URL后缀不能为空!'
        return rconf

    def set_referer_security(self, rc: "_RefererConf") -> Tuple[bool, str]:
        """Apply the settings to both vhost files, reload, and save them.

        Both the nginx and the Apache file are updated; an error only aborts
        the operation when it comes from the web server that is in use.
        Returns ``(success, message)``.
        """
        error_msg = self._set_nginx_referer_security(rc)
        if error_msg and self.webserver == "nginx":
            return False, error_msg
        error_msg = self._set_apache_referer_security(rc)
        if error_msg and self.webserver == "apache":
            return False, error_msg
        service_reload()
        self.save_config(rc.name, rc)
        return True, "设置成功"

    def _build_nginx_referer_security_conf(self, rc: "_RefererConf") -> str:
        """Render the nginx location block for the given settings."""
        if rc.return_rule.startswith("/"):
            return_rule = "rewrite /.* {} break".format(rc.return_rule)
        else:
            return_rule = 'return {}'.format(rc.return_rule)
        valid_args_list = []
        if rc.http_status == "true":
            valid_args_list.extend(("none", "blocked"))
        valid_args_list.extend(domain.strip() for domain in rc.domains.split(","))
        # pre_re_key comes from .util; presumably it escapes regex
        # metacharacters in each suffix - confirm against its definition.
        location_args = "|".join(pre_re_key(suffix.strip()) for suffix in rc.fix.split(","))
        return self._ng_referer_conf_format % (location_args, " ".join(valid_args_list), return_rule)

    def _set_nginx_referer_security(self, rc: "_RefererConf") -> Optional[str]:
        """Add, replace or remove the hotlink block in the nginx vhost file.

        Returns None on success (or when there is nothing to remove) and an
        error message otherwise. When nginx is the active server and the
        configuration check fails, the original file content is restored.
        """
        ng_file = '/www/server/panel/vhost/nginx/{}{}.conf'.format(self.config_prefix, rc.name)
        ng_conf = read_file(ng_file)
        if not isinstance(ng_conf, str):
            return "nginx配置文件丢失无法设置"

        start_idx, end_idx = self._get_nginx_referer_security_idx(ng_conf)
        if rc.status == "true":
            block = self._build_nginx_referer_security_conf(rc)
            if start_idx is not None:
                # Replace the existing block in place.
                new_conf = ng_conf[:start_idx] + "\n" + block + "\n" + ng_conf[end_idx:]
            else:
                rep_redirect_include = re.compile(r"\sinclude +.*/redirect/.*\*\.conf;", re.M)
                redirect_include_res = rep_redirect_include.search(ng_conf)
                if redirect_include_res:
                    # Insert right after the redirect include line.
                    cut = redirect_include_res.end()
                    new_conf = ng_conf[:cut] + "\n" + block + ng_conf[cut:]
                elif "#SSL-END" in ng_conf:
                    new_conf = ng_conf.replace("#SSL-END", "#SSL-END\n" + block)
                else:
                    return "添加配置失败无法定位SSL相关配置的位置"
        else:
            if start_idx is None:
                return None
            new_conf = ng_conf[:start_idx] + "\n" + ng_conf[end_idx:]

        write_file(ng_file, new_conf)
        if self.webserver == "nginx" and check_server_config() is not None:
            write_file(ng_file, ng_conf)  # roll back
            return "配置失败"
        return None

    @staticmethod
    def _get_nginx_referer_security_idx(ng_conf: str) -> Tuple[Optional[int], Optional[int]]:
        """Locate the hotlink location block inside an nginx configuration.

        Returns ``(start, end)`` such that ``ng_conf[start:end]`` covers the
        optional SECURITY-START comment, the whole location block (nested
        braces included) and an optional SECURITY-END comment line. Returns
        ``(None, None)`` when no block is found or its braces are unbalanced.
        """
        res = RealReferer._ng_security_rep.search(ng_conf)
        if res is None:
            return None, None
        start_idx = res.start()
        # Position just after the opening brace of the location block.
        pos = ng_conf.find("{", start_idx) + 1
        depth = 1
        while depth > 0:
            next_open = ng_conf.find("{", pos)
            next_close = ng_conf.find("}", pos)
            if next_close == -1:
                # Unbalanced braces: the block cannot be delimited.
                return None, None
            if next_open != -1 and next_open < next_close:
                depth += 1
                pos = next_open + 1
            else:
                depth -= 1
                pos = next_close + 1

        end_comment = RealReferer._ng_security_end_rep.match(ng_conf, pos)
        end_idx = end_comment.end() if end_comment is not None else pos
        return start_idx, end_idx

    @staticmethod
    def _build_apache_referer_security_conf(rc: "_RefererConf") -> str:
        """Render the Apache rewrite rules for the given settings."""
        r_conf_list = ["#SECURITY-START 防盗链配置"]
        cond_format = " RewriteCond %{{HTTP_REFERER}} !{} [NC]"
        # NOTE(review): "!^$" exempts requests with an empty Referer, and it is
        # added when http_status is "false", whereas the nginx rules exempt
        # empty referers ("none blocked") when http_status is "true". The two
        # servers look inverted relative to each other - confirm the intent.
        if rc.http_status == "false":
            r_conf_list.append(cond_format.format("^$"))
        r_conf_list.extend(cond_format.format(domain.strip()) for domain in rc.domains.split(","))

        suffixes = "|".join(suffix.strip() for suffix in rc.fix.split(","))
        if rc.return_rule.startswith("/"):
            target = rc.return_rule
        else:
            target = "/{s}.html [R={s},NC,L]".format(s=rc.return_rule)
        r_conf_list.append(" RewriteRule .({}) {} ".format(suffixes, target))
        r_conf_list.append(" #SECURITY-END")
        return "\n".join(r_conf_list)

    # Locate positions with a regex and insert the rules there; use_start
    # selects whether they go before or after each match.
    def _add_apache_referer_security_by_rep_idx(self,
                                                rep: re.Pattern,
                                                use_start: bool,
                                                ap_conf, ap_file, r_conf) -> bool:
        """Insert ``r_conf`` next to every match of ``rep`` and write the file.

        Returns False when ``rep`` matches nothing, or when Apache is the
        active server and the configuration check fails (the original content
        is restored in that case); True otherwise.
        """
        snippet = "\n" + r_conf + "\n"
        tmp_conf_list = []
        last_idx = 0
        for tmp in rep.finditer(ap_conf):
            tmp_conf_list.append(ap_conf[last_idx:tmp.start()])
            if use_start:
                tmp_conf_list.extend((snippet, tmp.group()))
            else:
                tmp_conf_list.extend((tmp.group(), snippet))
            last_idx = tmp.end()
        if last_idx == 0:
            return False
        tmp_conf_list.append(ap_conf[last_idx:])
        write_file(ap_file, "".join(tmp_conf_list))
        if self.webserver == "apache" and check_server_config() is not None:
            write_file(ap_file, ap_conf)  # roll back
            return False
        return True

    def _write_apache_conf(self, ap_file: str, old_conf: str, new_conf: str) -> Optional[str]:
        """Write ``new_conf``; restore ``old_conf`` if the Apache check fails."""
        write_file(ap_file, new_conf)
        if self.webserver == "apache" and check_server_config() is not None:
            write_file(ap_file, old_conf)
            return "配置修改失败"
        return None

    def _set_apache_referer_security(self, rc: "_RefererConf") -> Optional[str]:
        """Add, replace or remove the hotlink rules in the Apache vhost file.

        Returns None on success (or when there is nothing to remove) and an
        error message otherwise.
        """
        ap_file = '/www/server/panel/vhost/apache/{}{}.conf'.format(self.config_prefix, rc.name)
        ap_conf = read_file(ap_file)
        if not isinstance(ap_conf, str):
            return "apache配置文件丢失无法设置"

        has_block = self._ap_security_rep.search(ap_conf) is not None
        if rc.status != "true":
            if not has_block:
                return None
            # Drop every existing block.
            return self._write_apache_conf(ap_file, ap_conf, self._ap_security_rep.sub("", ap_conf))

        r_conf = self._build_apache_referer_security_conf(rc)
        if has_block:
            replacement = "\n" + r_conf + "\n"
            # A callable replacement keeps backslashes in the rules literal.
            new_conf = self._ap_security_rep.sub(lambda _match: replacement, ap_conf)
            return self._write_apache_conf(ap_file, ap_conf, new_conf)

        # No block yet: try the insertion anchors in order of preference.
        anchors = (
            (re.compile(r"IncludeOptional +.*/redirect/.*\*\.conf.*\n", re.M), False),
            (re.compile(r"CustomLog .*\n"), False),
            (re.compile(r"\n\s*#DENY FILES"), True),
        )
        for rep, use_start in anchors:
            if self._add_apache_referer_security_by_rep_idx(rep, use_start, ap_conf, ap_file, r_conf):
                return None
        return "设置添加失败"

    def get_referer_security(self, site_name) -> Optional[dict]:
        """Return the saved settings of a site as a dict, or None if absent."""
        conf = self.get_config(site_name)
        if conf is None:
            return None
        return self._conf_to_dict(conf)

    def remove_site_referer_info(self, site_name):
        """Delete the saved settings file of a site, if it exists."""
        try:
            os.remove(self._config_path(site_name))
        except FileNotFoundError:
            pass

    # Read the referer settings back from the web server configuration.
    # Not implemented for now; of little practical value.
    def _get_referer_security_by_conf(self, site_name):
        """Placeholder for parsing the settings out of the vhost files."""
        raise NotImplementedError("reading referer settings from the vhost files is not implemented")
class Referer:
    """Request-facing wrapper that answers with ``json_response`` objects.

    All real work is delegated to a ``RealReferer`` built with the same
    configuration prefix.
    """

    def __init__(self, config_prefix: str):
        """Store the prefix and create the underlying ``RealReferer``."""
        self.config_prefix: str = config_prefix
        self._r = RealReferer(self.config_prefix)

    def get_referer_security(self, get):
        """Return the hotlink settings of ``get.site_name``.

        When nothing has been saved for the site, a default (disabled)
        configuration is returned whose domain list is filled from the
        site's domains in the database.
        """
        try:
            site_name = get.site_name.strip()
        except AttributeError:
            return json_response(status=False, msg="Parameter error")

        stored = self._r.get_referer_security(site_name)
        if stored is not None:
            return json_response(status=True, data=stored)

        # Nothing saved yet: look up the site's domains to build defaults.
        site_row = DB("sites").where("name=?", (site_name,)).field('id').find()
        if not isinstance(site_row, dict):
            return json_response(status=False, msg="Site query error")
        domain_rows = DB("domain").where("pid=?", (site_row["id"],)).field('name').select()
        if not isinstance(domain_rows, list):
            return json_response(status=False, msg="Site query error")

        fallback = {
            "name": site_name,
            "fix": "jpg,jpeg,gif,png,js,css",
            "domains": ",".join(row["name"] for row in domain_rows),
            "status": "false",
            "return_rule": "404",
            "http_status": "false",
        }
        return json_response(status=True, data=fallback)

    def set_referer_security(self, get):
        """Validate the request arguments and apply the hotlink settings."""
        checked = self._r.check_args(get)
        if isinstance(checked, str):
            # check_args reports problems as a plain message string.
            return json_response(status=False, msg=checked)
        ok, message = self._r.set_referer_security(checked)
        return json_response(status=ok, msg=message)