Files
yakpanel-core/class_v2/safeModelV2/firewallModel.py
2026-04-07 02:04:22 +05:30

4169 lines
181 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#coding: utf-8
#-------------------------------------------------------------------
# YakPanel
#-------------------------------------------------------------------
# Copyright (c) 2015-2099 YakPanel(www.yakpanel.com) All rights reserved.
#-------------------------------------------------------------------
# Author: hwliang <hwl@yakpanel.com>
#-------------------------------------------------------------------
# 系统防火墙
#------------------------------
import sys, os, json, re, time, sqlite3
import contextlib
import traceback
from xml.etree.ElementTree import ElementTree, Element
from safeModelV2.base import safeBase
from flask import send_file, abort
os.chdir("/www/server/panel")
sys.path.append("class/")
import public
from public import dict_obj
from public.validate import Param
class main(safeBase):
__isFirewalld = False
__isUfw = False
__firewall_obj = None
_add_sid = 0
_ip_list = []
_port_list = []
_ufw_default = '/etc/default/ufw'
_ufw_sysctl = '/etc/ufw/sysctl.conf'
_ufw_before = '/etc/ufw/before.rules'
_trans_status = "/www/server/panel/plugin/firewall/status.json"
_rule_path = "/www/server/panel/plugin/firewall/"
old_ips_path = "/www/server/panel/plugin/firewall/ips.txt"
_ips_path = "/www/server/panel/plugin/firewall/GeoLite2-Country.json"
_country_path = "/www/server/panel/plugin/firewall/country.txt"
_white_list_file = "/www/server/panel/plugin/firewall/whitelist.txt" # 证书验证IP
_white_list = []
_host_ip = None
_firewall_create_tip = '{}/data/firewall_sqlite.pl'.format(public.get_panel_path())
def __init__(self):
super().__init__()
self.__firewall_obj = firewalld()
if os.path.exists('/usr/sbin/firewalld') and os.path.exists('/usr/bin/yum'):
self.__isFirewalld = True
if os.path.exists('/usr/sbin/ufw') and os.path.exists('/usr/bin/apt-get'):
self.__isUfw = True
self.get_old_rule()
if not os.path.exists(self._trans_status):
ret = {"status": "close"}
public.writeFile(self._trans_status, json.dumps(ret))
Sqlite()
# if not os.path.exists(self._firewall_create_tip):
# Sqlite()
# public.writeFile(self._firewall_create_tip, '')
def get_old_rule(self):
"""
@兼容防火墙插件规则
"""
self._rule_path = self._rule_path.replace('plugin', 'data')
if not os.path.exists(self._rule_path): os.makedirs(self._rule_path)
if os.path.exists(self._trans_status):
n_path = self._trans_status.replace('plugin', 'data')
if not os.path.exists(n_path):
public.writeFile(n_path, public.readFile(self._trans_status))
if os.path.exists(self._ips_path):
n_path = self._ips_path.replace('plugin', 'data')
if not os.path.exists(n_path):
public.writeFile(n_path, public.readFile(self._ips_path))
if os.path.exists(self._white_list_file):
n_path = self._white_list_file.replace('plugin', 'data')
if not os.path.exists(n_path):
public.writeFile(n_path, public.readFile(self._white_list_file))
if os.path.exists(self._country_path):
n_path = self._country_path.replace('plugin', 'data')
if not os.path.exists(n_path):
public.writeFile(n_path, public.readFile(self._country_path))
self._white_list_file = self._white_list_file.replace('plugin', 'data')
self._country_path = self._country_path.replace('plugin', 'data')
self._ips_path = self._ips_path.replace('plugin', 'data')
self._trans_status = self._trans_status.replace('plugin', 'data')
def install_sys_firewall(self, get):
"""
@安装系统防火墙
"""
res = public.install_sys_firewall()
if res:
return public.return_message(0, 0, public.lang("Successful installation"))
return public.return_message(-1, 0, public.lang("installation failed"))
def get_firewall_info(self, get):
"""
@name 获取防火墙统计
"""
data = {}
data['port'] = public.M('firewall_new').count()
data['ip'] = public.M('firewall_ip').count()
data['trans'] = public.M('firewall_trans').count()
data['country'] = public.M('firewall_country').count()
isPing = True
try:
file = '/etc/sysctl.conf'
conf = public.readFile(file)
rep = r"#*net\.ipv4\.icmp_echo_ignore_all\s*=\s*([0-9]+)\n"
tmp = re.search(rep, conf).groups(0)[0]
if tmp == '1': isPing = False
except:
isPing = True
data['ping'] = isPing
data['status'] = self.get_firewall_status()
return public.return_message(0, 0, data)
# 服务状态获取
def get_firewall_status(self):
if self.__isUfw:
res = public.ExecShell("systemctl is-active ufw")[0]
if res == "active":
return public.return_message(0, 0, True)
res = public.ExecShell("systemctl list-units | grep ufw")[0]
if res.find('active running') != -1:
return public.return_message(0, 0, True)
res = public.ExecShell('/lib/ufw/ufw-init status')[0]
if res.find("Firewall is not running") != -1:
return public.return_message(-1, 0, True)
res = public.ExecShell('ufw status verbose')[0]
if res.find('inactive') != -1:
return public.return_message(-1, 0, True)
return public.return_message(0, 0, True)
if self.__isFirewalld:
res = public.ExecShell("ps -ef|grep firewalld|grep -v grep")[0]
if res:
return public.return_message(0, 0, True)
res = public.ExecShell("systemctl is-active firewalld")[0]
if res == "active":
return public.return_message(0, 0, True)
res = public.ExecShell("systemctl list-units | grep firewalld")[0]
if res.find('active running') != -1:
return public.return_message(0, 0, True)
return public.return_message(-1, 0, True)
else:
res = public.ExecShell("/etc/init.d/iptables status")[0]
if res.find('not running') != -1:
return public.return_message(-1, 0, True)
res = public.ExecShell("systemctl is-active iptables")[0]
if res == "active":
return public.return_message(0, 0, True)
return public.return_message(0, 0, True)
def SetPing(self, get):
if "status" not in get:
return public.return_message(-1, 0, 'params status not found')
if get.status == '1':
get.status = '0'
else:
get.status = '1'
filename = '/etc/sysctl.conf'
conf = public.readFile(filename)
if not conf or not isinstance(conf, str):
return public.return_message(-1, 0, '/etc/sysctl.conf File Read ExceptionUnable to set ping ban')
if conf.find('net.ipv4.icmp_echo') != -1:
rep = r"net\.ipv4\.icmp_echo.*"
conf = re.sub(rep, 'net.ipv4.icmp_echo_ignore_all=' + get.status + "\n", conf)
else:
conf += "\nnet.ipv4.icmp_echo_ignore_all=" + get.status + "\n"
if public.writeFile(filename, conf):
public.ExecShell('sysctl -p')
return public.return_message(0, 0, public.lang("SUCCESS"))
else:
return public.return_message(-1, 0,
'<a style="color:red;">Error: Setting failed, sysctl.conf is not writable!</a><br>'
'1. If [System Hardening] is installed, please close it first<br>'
'2. If Cloud Lock is installed, please turn off the [System Hardening] function<br>'
'3. If a security dog is installed, please turn off the [System Protection] function<br>'
'4. If you use other security software, please uninstall it first<br>'
)
# 服务状态控制 dict_obj.
def firewall_admin(self, get: dict_obj):
order = ['reload', 'restart', 'stop', 'start']
if not get.status in order:
return public.returnMsg(False, public.lang("unknown control command!"))
names = ["reload", "restart", "stop", "start"]
result = dict(zip(order, names))
cache_key = "firewall_info"
from YakPanel import cache
data = cache.get(cache_key)
if data:
cache.delete(cache_key)
public.ExecShell("systemctl reload BT-FirewallServices")
if self.__isUfw:
if get.status == "stop":
public.ExecShell('/usr/sbin/ufw disable')
elif get.status == "start":
public.ExecShell('echo y|/usr/sbin/ufw enable')
elif get.status == "reload":
public.ExecShell('/usr/sbin/ufw reload')
elif get.status == "restart":
public.ExecShell('/usr/sbin/ufw disable && /usr/sbin/ufw enable')
# ufw防火墙启动时重载一次sysctl
# 解决deian系统启动ufw后禁ping失1效 by wzz/2023-06-14
filename = '/etc/sysctl.conf'
conf = public.readFile(filename)
if conf.find('net.ipv4.icmp_echo') != -1:
public.ExecShell("sysctl -p")
public.WriteLog("system firewall", "firewall {}".format(result[get.status]))
return public.return_message(0, 0, public.lang("firewall has {}", result[get.status]))
if self.__isFirewalld:
public.ExecShell('systemctl {} firewalld'.format(get.status))
public.WriteLog("system firewall", "firewall {}".format(result[get.status]))
return public.return_message(0, 0, public.lang("firewall has {}", result[get.status]))
else:
public.ExecShell('service iptables {}'.format(get.status))
public.WriteLog("system firewall", "firewall {}".format(result[get.status]))
return public.return_message(0, 0, public.lang("fire/wall has {}", result[get.status]))
# 重载防火墙配置
def FirewallReload(self):
if self.__isUfw:
public.ExecShell('/usr/sbin/ufw reload') # 兼容安装了多个防火墙的情况 hezhihong # return
# 解决deian系统启动ufw后禁ping失1效 by wzz/2023-06-14
filename = '/etc/sysctl.conf'
conf = public.readFile(filename)
if conf.find('net.ipv4.icmp_echo') != -1:
public.ExecShell("sysctl -p")
if self.__isFirewalld:
public.ExecShell('firewall-cmd --reload')
else:
public.ExecShell('/etc/init.d/iptables save')
public.ExecShell('/etc/init.d/iptables restart')
#端口扫描
def CheckPort(self, port, protocol):
import socket
localIP = '127.0.0.1'
temp = {}
temp['port'] = port
temp['local'] = True
try:
if 'tcp' in protocol.lower():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.01)
s.connect((localIP, port))
s.close()
if 'udp' in protocol.lower():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0.01)
s.sendto(b'', (localIP, port))
s.close()
except:
temp['local'] = False
result = 0
if temp['local']: result += 2
return result
# 查询入栈规则
def get_rules_list(self, args):
# 分页校验参数
try:
args.validate([
Param('limit').Integer(),
Param('p').Integer(),
Param('query').String().Xss(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
if self.__isFirewalld:
self.__firewall_obj = firewalld()
self.GetList()
else:
self.get_ufw_list()
try:
p = 1
limit = 15
if 'p' in args: p = args.p
if 'limit' in args: limit = args.limit
where = '1=1'
sql = public.M('firewall_new')
if hasattr(args, 'query'):
where = " ports like '%{search}%' or brief like '%{search}%' or address like '%{search}%'".format(
search=args.query
)
count = sql.where(where, ()).count()
data = public.get_page(count, int(p), int(limit))
data['data'] = sql.where(where, ()).limit('{},{}'.format(data['shift'], data['row'])).order(
'addtime desc'
).select()
res_data = data['data']
for i in range(len(res_data)):
if not 'ports' in res_data[i]:
res_data[i]['status'] = -1
continue
d = res_data[i]
_port = d['ports']
_protocol = d['protocol']
if _port.find(':') != -1 or _port.find('.') != -1 or _port.find('-') != -1:
d['status'] = -1
else:
d['status'] = self.CheckPort(int(_port), _protocol)
for i in res_data:
if 'brief' in i:
i['brief'] = public.xsssec(i['brief'])
return public.return_message(0, 0, res_data)
except Exception as e:
return public.return_message(-1, 0, [])
def check_firewall_rule(self, args):
"""
@检测防火墙规则
"""
port = args['port']
find = public.M('firewall_new').where('ports=?', (str(port), )).find()
if find:
return True
return False
# 端口检查
def check_port(self, port_list):
rep1 = r"^\d{1,5}(:\d{1,5})?$"
# rep1 = r'^[0-9]|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5]$'
for port in port_list:
if port.find('-') != -1:
ports = port.split('-')
if not re.search(rep1, ports[0]):
return public.returnMsg(False, public.lang("Port range is incorrect!"))
if not re.search(rep1, ports[1]):
return public.returnMsg(False, public.lang("Port range is incorrect!"))
elif port.find(':') != -1:
ports = port.split(':')
if not re.search(rep1, ports[0]):
return public.returnMsg(False, public.lang("Port range is incorrect!"))
if not re.search(rep1, ports[1]):
return public.returnMsg(False, public.lang("Port range is incorrect!"))
else:
if not re.search(rep1, port):
return public.returnMsg(False, public.lang("Port range is incorrect!"))
def parse_ip_interval(self, ip_str):
"""解析区间IP
author: lx
date: 2022/10/25
Returns:
list : IP列表
"""
ips = []
try:
rep2 = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(\/\d{1,2})?$"
searchor = re.compile(rep2)
if ip_str.find("-") != -1:
pre_ip, end_ip = ip_str.split("-")
if searchor.search(pre_ip) and searchor.search(end_ip):
ips.append(pre_ip)
pinx = pre_ip.rfind(".") + 1
einx = end_ip.rfind(".") + 1
pre = pre_ip[0:pinx]
end = end_ip[0:einx]
if pre == end:
start_num = int(pre_ip[pinx:])
end_num = min(int(end_ip[einx:]), 255)
for i in range(start_num + 1, end_num):
new_ip = pre + str(i)
if searchor.search(new_ip):
ips.append(new_ip)
end_ip = end + str(end_num)
ips.append(end_ip)
except:
pass
return ips
# 判断是否为ipv6网段
@staticmethod
def is_ipv6_network_segment_or_ipv6_address(ip_datas: str) -> bool:
from ipaddress import IPv6Network, IPv6Address
try:
tmp_data = IPv6Network(ip_datas)
except:
try:
tmp_data = IPv6Address(ip_datas)
except:
return False
return True
# 添加入栈规则
def create_rules2(self, get):
'''
get 里面 有 protocol port type address brief 五个参数
protocol == ['tcp','udp']
port = 端口
types == [accept、drop] # 放行和禁止
address 地址允许放行的ip如果全部就是0.0.0.0/0;另外可以包含“,"或者"-"
表示区间IP
brief 备注说明
'''
protocol = get.protocol
ports = get.ports.strip()
types = get.types
address = get.source.strip()
port_list = ports.split(',')
result = self.check_port(port_list) # 检测端口
if result: return result
allow_ips = []
if address:
sources = [
sip.strip() for sip in address.split(",") if sip.strip()
]
rep2 = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(\/\d{1,2})?$"
_ips = []
for source_ip in sources:
if source_ip.find("-") != -1:
_ips += self.parse_ip_interval(source_ip)
else:
_ips.append(source_ip)
for source_ip in _ips:
if not re.search(rep2, source_ip) and not self.is_ipv6_network_segment_or_ipv6_address(source_ip):
return public.returnMsg(False, public.lang("IP address you've input is illegal!"))
allow_ips.append(source_ip)
if not allow_ips:
allow_ips.append("")
for source_ip in allow_ips:
if self.__isUfw:
for port in port_list:
if port.find('-') != -1:
port = port.replace('-', ':')
self.add_ufw_rule(source_ip, protocol, port, types)
else:
if self.__isFirewalld:
for port in port_list:
if port.find(':') != -1:
port = port.replace(':', '-')
self.add_firewall_rule(source_ip, protocol, port, types)
else:
for port in port_list:
self.add_iptables_rule(source_ip, protocol, port, types)
# 添加入栈规则
def create_rules(self, get):
'''
get 里面 有 protocol ports types address brief 五个参数
protocol == ['tcp','udp']
port = 端口
types == [accept、drop] # 放行和禁止
address 地址允许放行的ip如果全部就是0.0.0.0/0;另外可以包含“,"或者"-"
表示区间IP
brief 备注说明
'''
# 校验参数
try:
get.validate([
Param('ports').Require().Number(">=", 1).Number("<=", 65535),
Param('address').Require().Ip(),
Param('types').Require().String('in', ['accept', 'drop']),
Param('protocol').Require().String('in', ['tcp', 'udp']),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
protocol = get.protocol
ports = get.ports.strip()
types = get.types
address = get.source.strip()
brief = get.brief.strip()
port_list = ports.split(',')
is_add = 2 if 'add' not in get else get.add
domain_total = '' if ("domain" not in get or not get.domain) else get.domain.strip()
domain = '' if ("domain" not in get or not get.domain) else get.domain.strip() + '|' + address
result = self.check_port(port_list) # 检测端口
if result:
return public.return_message(0, 0, result)
allow_ips = []
if address:
sources = [sip.strip() for sip in address.split(",") if sip.strip()]
rep2 = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(\/\d{1,2})?$"
_ips = []
for source_ip in sources:
if source_ip.find("-") != -1:
_ips += self.parse_ip_interval(source_ip)
else:
_ips.append(source_ip)
for source_ip in _ips:
if not re.search(rep2, source_ip) and not self.is_ipv6_network_segment_or_ipv6_address(source_ip):
return public.return_message(-1, 0, public.lang("IP address you've input is illegal!"))
query_result = public.M('firewall_new').where(
'ports=? and address=? and protocol=? and types=?',
(ports, source_ip, protocol, types,)
).count()
if query_result > 0:
continue
allow_ips.append(source_ip)
if not allow_ips:
allow_ips.append("")
# 忽略的列表
ignore_list = []
addtime = time.strftime('%Y-%m-%d %X', time.localtime())
for source_ip in allow_ips:
for port in port_list:
if is_add == 1: continue
# 检测端口是否已经添加过
query_result = public.M('firewall_new').where(
'ports=? and address=? and protocol=? and types=?',
(port, source_ip, protocol, types,)
).find()
firewall_rules = self.get_sys_firewall_rules()
if query_result:
new_query_result = {"ports": query_result["ports"], "address": query_result["address"],
"protocol": query_result["protocol"], "types": query_result["types"]}
for firewall_rule in firewall_rules:
if new_query_result == firewall_rule:
ignore_list.append(port)
break
self._add_firewall_rules(source_ip, protocol, port, types)
if not query_result:
try:
self._add_sid = public.M('firewall_new').add(
'ports,brief,protocol,address,types,addtime,domain,sid',
(port, public.xsssec(brief), protocol, source_ip, types, addtime, domain, 0)
)
except Exception as e:
if "table firewall_new has no column named" in str(e):
public.M('firewall_new').execute(
'ALTER TABLE "firewall_domain" ADD "domain_total" TEXT DEFAULT ""')
public.M('firewall_new').execute('ALTER TABLE "firewall_new" ADD "domain" TEXT DEFAULT ""')
public.M('firewall_new').execute('ALTER TABLE "firewall_new" ADD "sid" INT DEFAULT 0')
self._add_sid = public.M('firewall_new').add(
'ports,brief,protocol,address,types,addtime,domain,sid',
(port, public.xsssec(brief), protocol, source_ip, types, addtime, domain, 0)
)
if domain:
domain_sid = public.M('firewall_domain').add(
'types,domain,port,address,brief,addtime,sid,protocol,domain_total',
(types, domain, ports, address, public.xsssec(brief), addtime, self._add_sid, protocol,
domain_total)
)
public.M('firewall_new').where("id=?", (self._add_sid,)).save('sid', domain_sid)
if len(allow_ips) > 0:
self.FirewallReload()
if not get.source.strip():
log_ip = "All IPs"
else:
log_ip = get.source.strip()
strategy = ''
if types == 'accept':
strategy = "accept"
elif types == 'drop':
strategy = "drop"
public.WriteLog("system firewall", "Add port rules: Protocol:{}, Port:{}, Policy:{}, IP:{}".format(protocol, ports, strategy, log_ip))
# 如果有忽略的端口,返回忽略的端口
if ignore_list:
return public.return_message(0, 0, public.lang("Added successfully, {} The same rule exists for the port and has been skipped", ', '.join(ignore_list)))
return public.return_message(0, 0, public.lang("Added successfully!"))
# 删除入栈规则
def remove_rules(self, get):
'''
get 里面有 id protocol port type address 五个参数
protocol == ['tcp','udp']
port = 端口
types == [accept、drop] # 放行和禁止
address 地址允许放行的ip
'''
# 检测是否开启防火墙 hezhihong
if not self.get_firewall_status():
return public.return_message(-1, 0, public.lang("Please enable the firewall before proceeding."))
# {
# "id": 13,
# "protocol": "tcp",
# "ports": "3309",
# "address": "192.168.69.148",
# "types": "accept",
# "source": "192.168.69.148"
# }
# 校验参数
try:
get.validate([
Param('id').Require().Integer(),
Param('ports').Require().Number(">=", 1).Number("<=", 65535),
Param('address').Require().Ip(),
Param('source').Require().Ip(),
Param('types').Require().String('in', ['accept', 'drop']),
Param('protocol').Require().String('in', ['tcp', 'udp']),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
id = get.id
address = get.address
protocol = get.protocol
ports = get.ports
types = get.types
self._del_firewall_rules(address, protocol, ports, types)
public.M('firewall_new').where("id=?", (id, )).delete()
self.FirewallReload()
if not get.address:
log_ip = "All IPs"
else:
log_ip = get.address
if types == 'accept':
strategy = "accept"
elif types == 'drop':
strategy = "drop"
public.WriteLog("system firewall", "Delete port rules: Protocol:{}, Port:{}, Policy:{}, IP:{}".format(get.protocol, get.ports, strategy, log_ip))
return public.return_message(-1, 0, public.lang("Delete successfully!"))
# 修改入栈规则
def modify_rules(self, get, addtime=None):
'''
get 里面有 id protocol port type address 五个参数
protocol == ['tcp','udp']
port = 端口
types==['reject','accept'] # 放行和禁止
address 地址允许放行的ip如果全部就是0.0.0.0/0
'''
# 检测是否开启防火墙 hezhihong
if not self.get_firewall_status():
return public.return_message(-1, 0, public.lang("Please enable the firewall before proceeding."))
# 校验参数
try:
get.validate([
Param('id').Require().Integer(),
Param('ports').Require().Number(">=", 1).Number("<=", 65535),
Param('address').Require().Ip(),
Param('source').Require().Ip(),
Param('types').Require().String('in', ['accept', 'drop']),
Param('protocol').Require().String('in', ['tcp', 'udp']),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
id = get.id
protocol = get.protocol
ports = get.ports.strip()
types = get.types
address = get.source.strip()
brief = get.brief.strip()
domain = '' if 'domain' not in get else get.domain
domain_total = domain.split('|')[0]
sid = 0 if 'sid' not in get else get.sid
if address:
rep = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(\/\d{1,2})?$"
if not re.search(rep, get.source) and self.is_ipv6_network_segment_or_ipv6_address(get.source):
return public.returnMsg(False, public.lang("IP address you've input is illegal!"))
data = public.M('firewall_new').where('id=?', (id, )).field(
'id,address,protocol,ports,types,brief,addtime,domain'
).find()
if data and type(data) == dict:
_address = data.get("address", "")
_protocol = data.get("protocol", "")
_port = data.get("ports", "")
_type = data.get("types", "")
else:
_address = _protocol = _port = _type = ""
self._modify_firewall_rules(_address, _protocol, _port, _type, address, protocol, ports, types)
addtime = time.strftime('%Y-%m-%d %X', time.localtime())
public.M('firewall_new').where('id=?', id).update(
{
'address': address,
'protocol': protocol,
'ports': ports,
'types': types,
'brief': brief,
'addtime': addtime,
'sid': sid,
'domain': domain
}
)
if domain:
public.M('firewall_domain').where("id=?", (sid,)).save(
'sid,types,brief,protocol,domain_total',
(id, types, brief, protocol, domain_total)
)
with contextlib.suppress(Exception):
if int(ports) == 22: self.delete_service()
self.FirewallReload()
if not address:
log_ip = "All IPs"
else:
log_ip = address
if get.types == 'accept':
strategy = "accept"
elif get.types == 'drop':
strategy = "drop"
public.WriteLog("system firewall", "Modify port rules: Protocol:{}, Port:{}, Strategy:{}, IP:{}".format(get.protocol, get.ports.strip(), get.types, log_ip))
return public.return_message(-1, 0, public.lang("operate successfully"))
# firewall端口规则添加
def add_firewall_rule(self, address, protocol, ports, types):
if not address:
if protocol.find('/') != -1:
if types == "accept":
public.ExecShell(
'firewall-cmd --permanent --zone=public --add-port=' +
ports + '/tcp')
public.ExecShell(
'firewall-cmd --permanent --zone=public --add-port=' +
ports + '/udp')
else:
public.ExecShell(
'firewall-cmd --permanent --add-rich-rule="rule family=ipv4 port protocol="tcp" port="%s" drop"'
% ports
)
public.ExecShell(
'firewall-cmd --permanent --add-rich-rule="rule family=ipv4 port protocol="udp" port="%s" drop"'
% ports
)
else:
if types == "accept":
public.ExecShell(
'firewall-cmd --permanent --zone=public --add-port=' +
ports + '/' + protocol + '')
else:
public.ExecShell(
'firewall-cmd --permanent --add-rich-rule="rule family=ipv4 port protocol="%s" port="%s" drop"'
% (protocol, ports))
return True
if self.is_ipv6_network_segment_or_ipv6_address(address):
if protocol.find('/') != -1:
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv6 source address="%s" port protocol="tcp" port="%s" %s"'
% (address, ports, types))
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv6 source address="%s" port protocol="udp" port="%s" %s"'
% (address, ports, types))
else:
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv6 source address="%s" port protocol="%s" port="%s" %s"'
% (address, protocol, ports, types))
else:
if protocol.find('/') != -1:
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv4 source address="%s" port protocol="tcp" port="%s" %s"'
% (address, ports, types))
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv4 source address="%s" port protocol="udp" port="%s" %s"'
% (address, ports, types))
else:
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv4 source address="%s" port protocol="%s" port="%s" %s"'
% (address, protocol, ports, types))
return True
# firewall端口规则删除
def del_firewall_rule(self, address, protocol, ports, types):
if not address:
if protocol.find('/') != -1:
if types == "accept":
public.ExecShell(
'firewall-cmd --permanent --zone=public --remove-port='
+ ports + '/tcp')
public.ExecShell(
'firewall-cmd --permanent --zone=public --remove-port='
+ ports + '/udp')
else:
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family=ipv4 port protocol="tcp" port="%s" drop"'
% ports
)
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family=ipv4 port protocol="udp" port="%s" drop"'
% ports
)
else:
if types == "accept":
public.ExecShell(
'firewall-cmd --permanent --zone=public --remove-port='
+ ports + '/' + protocol + '')
else:
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family=ipv4 port protocol="%s" port="%s" drop"'
% (protocol, ports))
self.update_panel_data(ports)
return True
if self.is_ipv6_network_segment_or_ipv6_address(address):
if protocol.find('/') != -1:
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family="ipv6" source address="%s" port protocol="tcp" port="%s" %s"'
% (address, ports, types))
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family="ipv6" source address="%s" port protocol="udp" port="%s" %s"'
% (address, ports, types))
else:
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family="ipv6" source address="%s" port protocol="%s" port="%s" %s"'
% (address, protocol, ports, types))
else:
if protocol.find('/') != -1:
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family="ipv4" source address="%s" port protocol="tcp" port="%s" %s"'
% (address, ports, types))
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family="ipv4" source address="%s" port protocol="udp" port="%s" %s"'
% (address, ports, types))
else:
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family="ipv4" source address="%s" port protocol="%s" port="%s" %s"'
% (address, protocol, ports, types))
return True
# firewall端口规则编辑
def edit_firewall_rule(self, _address, _protocol, _port, _type, address,
protocol, ports, types):
if not _address:
if _protocol.find('/') != -1:
if _type == "accept":
public.ExecShell(
'firewall-cmd --permanent --zone=public --remove-port='
+ _port + '/tcp')
public.ExecShell(
'firewall-cmd --permanent --zone=public --remove-port='
+ _port + '/udp')
else:
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family=ipv4 port protocol="tcp" port="%s" drop"'
% ports
)
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family=ipv4 port protocol="udp" port="%s" drop"'
% ports
)
else:
if _type == "accept":
public.ExecShell(
'firewall-cmd --permanent --zone=public --remove-port='
+ _port + '/' + _protocol + '')
else:
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family=ipv4 port protocol="%s" port="%s" drop"'
% (protocol, ports))
else:
if self.is_ipv6_network_segment_or_ipv6_address(address):
if _protocol.find('/') != -1:
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family="ipv6" source address="%s" port protocol="tcp" port="%s" %s"'
% (_address, _port, _type))
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family="ipv6" source address="%s" port protocol="udp" port="%s" %s"'
% (_address, _port, _type))
else:
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family="ipv6" source address="%s" port protocol="%s" port="%s" %s"'
% (_address, _protocol, _port, _type))
else:
if _protocol.find('/') != -1:
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family="ipv4" source address="%s" port protocol="tcp" port="%s" %s"'
% (_address, _port, _type))
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family="ipv4" source address="%s" port protocol="udp" port="%s" %s"'
% (_address, _port, _type))
else:
public.ExecShell(
'firewall-cmd --permanent '
'--remove-rich-rule="rule family="ipv4" source address="%s" port protocol="%s" port="%s" %s"'
% (_address, _protocol, _port, _type))
if not address:
if protocol.find('/') != -1:
if types == "accept":
public.ExecShell(
'firewall-cmd --permanent --zone=public --add-port=' +
ports + '/tcp')
public.ExecShell(
'firewall-cmd --permanent --zone=public --add-port=' +
ports + '/udp')
else:
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv4 port protocol="tcp" port="%s" drop"'
% ports
)
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv4 port protocol="udp" port="%s" drop"'
% ports
)
else:
if types == "accept":
public.ExecShell(
'firewall-cmd --permanent --zone=public --add-port=' +
ports + '/' + protocol + '')
else:
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv4 port protocol="%s" port="%s" drop"'
% (protocol, ports))
else:
if self.is_ipv6_network_segment_or_ipv6_address(address):
if protocol.find('/') != -1:
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv6 source address="%s" port protocol="tcp" port="%s" %s"'
% (address, ports, types))
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv6 source address="%s" port protocol="udp" port="%s" %s"'
% (address, ports, types))
else:
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv6 source address="%s" port protocol="%s" port="%s" %s"'
% (address, protocol, ports, types))
else:
if protocol.find('/') != -1:
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv4 source address="%s" port protocol="tcp" port="%s" %s"'
% (address, ports, types))
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv4 source address="%s" port protocol="udp" port="%s" %s"'
% (address, ports, types))
else:
public.ExecShell(
'firewall-cmd --permanent '
'--add-rich-rule="rule family=ipv4 source address="%s" port protocol="%s" port="%s" %s"'
% (address, protocol, ports, types))
return True
# ufw 端口规则添加
def add_ufw_rule(self, address, protocol, ports, types):
rule = "allow" if types == "accept" else "deny"
if address == "":
if protocol.find('/') != -1:
# public.ExecShell('ufw ' + rule + ' ' + ports + '/tcp')
# public.ExecShell('ufw ' + rule + ' ' + ports + '/udp')
public.ExecShell('ufw ' + rule + ' ' + ports)
else:
public.ExecShell('ufw ' + rule + ' ' + ports + '/' + protocol + '')
else:
if protocol.find('/') != -1:
# public.ExecShell('ufw ' + rule + ' proto tcp from ' + address + ' to any port ' + ports + '')
# public.ExecShell('ufw ' + rule + ' proto udp from ' + address + ' to any port ' + ports + '')
public.ExecShell('ufw ' + rule + ' from ' + address + ' to any port ' + ports + '')
else:
public.ExecShell(
'ufw ' + rule + ' proto ' + protocol + ' from ' + address + ' to any port ' + ports + '')
# ufw 端口规则删除
def del_ufw_rule(self, address, protocol, ports, types):
rule = "allow" if types == "accept" else "deny"
if address == "":
if protocol.find('/') != -1:
public.ExecShell('ufw delete ' + rule + ' ' + ports + '/tcp')
public.ExecShell('ufw delete ' + rule + ' ' + ports + '/udp')
public.ExecShell('ufw delete ' + rule + ' ' + ports)
else:
public.ExecShell('ufw delete ' + rule + ' ' + ports + '/' + protocol + '')
else:
if protocol.find('/') != -1:
public.ExecShell('ufw delete ' + rule + ' proto tcp from ' + address + ' to any port ' + ports + '')
public.ExecShell('ufw delete ' + rule + ' proto udp from ' + address + ' to any port ' + ports + '')
public.ExecShell('ufw delete ' + rule + ' from ' + address + ' to any port ' + ports)
else:
public.ExecShell(
'ufw delete ' + rule + ' proto ' + protocol + ' from ' + address + ' to any port ' + ports + ''
)
self.update_panel_data(ports)
# ufw 端口规则修改
def edit_ufw_rule(self, _address, _protocol, _port, _type, address,
protocol, ports, types):
_rule = "allow" if _type == "accept" else "deny"
rules = "allow" if types == "accept" else "deny"
if _address == "":
if _protocol.find('/') != -1:
public.ExecShell('ufw delete ' + _rule + ' ' + _port + '/tcp')
public.ExecShell('ufw delete ' + _rule + ' ' + _port + '/udp')
public.ExecShell('ufw delete ' + _rule + ' ' + _port)
else:
public.ExecShell('ufw delete ' + _rule + ' ' + _port + '/' + _protocol + '')
else:
if _protocol.find('/') != -1:
public.ExecShell('ufw delete ' + _rule + ' proto tcp from ' + _address + ' to any port ' + _port + '')
public.ExecShell('ufw delete ' + _rule + ' proto udp from ' + _address + ' to any port ' + _port + '')
public.ExecShell('ufw delete ' + _rule + ' from ' + _address + ' to any port ' + _port)
else:
public.ExecShell(
'ufw delete ' + _rule + ' proto ' + _protocol + ' from ' + _address + ' to any port ' + _port + ''
)
if address == "":
if protocol.find('/') != -1:
# public.ExecShell('ufw ' + rules + ' ' + ports + '/tcp')
# public.ExecShell('ufw ' + rules + ' ' + ports + '/udp')
public.ExecShell('ufw ' + rules + ' ' + ports)
else:
public.ExecShell('ufw ' + rules + ' ' + ports + '/' + protocol + '')
else:
if protocol.find('/') != -1:
# public.ExecShell('ufw ' + rules + ' proto tcp from ' + address + ' to any port ' + ports + '')
# public.ExecShell('ufw ' + rules + ' proto udp from ' + address + ' to any port ' + ports + '')
public.ExecShell('ufw ' + rules + ' from ' + address + ' to any port ' + ports)
else:
public.ExecShell(
'ufw ' + rules + ' proto ' + protocol + ' from ' + address + ' to any port ' + ports + ''
)
# iptables端口规则添加
def add_iptables_rule(self, address, protocol, ports, types):
rule = "ACCEPT" if types == "accept" else "DROP"
if not address:
if protocol.find('/') != -1:
public.ExecShell(
'iptables -I INPUT -p tcp -m state --state NEW -m tcp --dport '
+ ports + ' -j ' + rule + '')
public.ExecShell(
'iptables -I INPUT -p tcp -m state --state NEW -m udp --dport '
+ ports + ' -j ' + rule + '')
else:
public.ExecShell(
'iptables -I INPUT -p tcp -m state --state NEW -m ' +
protocol + ' --dport ' + ports + ' -j ' + rule + '')
else:
if protocol.find('/') != -1:
public.ExecShell('iptables -I INPUT -s ' + address +
' -p tcp --dport ' + ports + ' -j ' + rule +
'')
public.ExecShell('iptables -I INPUT -s ' + address +
' -p udp --dport ' + ports + ' -j ' + rule +
'')
else:
public.ExecShell('iptables -I INPUT -s ' + address + ' -p ' +
protocol + ' --dport ' + ports + ' -j ' +
rule + '')
return True
# iptables端口规则删除
def del_iptables_rule(self, address, protocol, ports, types):
rule = "ACCEPT" if types == "accept" else "DROP"
if not address:
if protocol.find('/') != -1:
public.ExecShell(
'iptables -D INPUT -p tcp -m state --state NEW -m tcp --dport '
+ ports + ' -j ' + rule + '')
public.ExecShell(
'iptables -D INPUT -p tcp -m state --state NEW -m udp --dport '
+ ports + ' -j ' + rule + '')
else:
public.ExecShell(
'iptables -D INPUT -p tcp -m state --state NEW -m ' +
protocol + ' --dport ' + ports + ' -j ' + rule + '')
else:
if protocol.find('/') != -1:
public.ExecShell('iptables -D INPUT -s ' + address +
' -p tcp --dport ' + ports + ' -j ' + rule +
'')
public.ExecShell('iptables -D INPUT -s ' + address +
' -p udp --dport ' + ports + ' -j ' + rule +
'')
else:
public.ExecShell('iptables -D INPUT -s ' + address + ' -p ' +
protocol + ' --dport ' + ports + ' -j ' +
rule + '')
return True
# iptables端口规则编辑
def edit_iptables_rule(self, _address, _protocol, _port, _type, address,
protocol, ports, types):
rule1 = "ACCEPT" if _type == "accept" else "DROP"
rule2 = "ACCEPT" if types == "accept" else "DROP"
if not _address:
if _protocol.find('/') != -1:
public.ExecShell(
'iptables -D INPUT -p tcp -m state --state NEW -m tcp --dport '
+ _port + ' -j ' + rule1 + '')
public.ExecShell(
'iptables -D INPUT -p tcp -m state --state NEW -m udp --dport '
+ _port + ' -j ' + rule1 + '')
else:
public.ExecShell(
'iptables -D INPUT -p tcp -m state --state NEW -m ' +
_protocol + ' --dport ' + _port + ' -j ' + rule1 + '')
else:
if _protocol.find('/') != -1:
public.ExecShell('iptables -D INPUT -s ' + _address +
' -p tcp --dport ' + _port + ' -j ' + rule1 +
'')
public.ExecShell('iptables -D INPUT -s ' + _address +
' -p udp --dport ' + _port + ' -j ' + rule1 +
'')
else:
public.ExecShell('iptables -D INPUT -s ' + _address + ' -p ' +
_protocol + ' --dport ' + _port + ' -j ' +
rule1 + '')
if not address:
if protocol.find('/') != -1:
public.ExecShell(
'iptables -I INPUT -p tcp -m state --state NEW -m tcp --dport '
+ ports + ' -j ' + rule2 + '')
public.ExecShell(
'iptables -I INPUT -p tcp -m state --state NEW -m udp --dport '
+ ports + ' -j ' + rule2 + '')
else:
public.ExecShell(
'iptables -I INPUT -p tcp -m state --state NEW -m ' +
protocol + ' --dport ' + ports + ' -j ' + rule2 + '')
else:
if protocol.find('/') != -1:
public.ExecShell('iptables -I INPUT -s ' + address +
' -p tcp --dport ' + ports + ' -j ' + rule2 +
'')
public.ExecShell('iptables -I INPUT -s ' + address +
' -p udp --dport ' + ports + ' -j ' + rule2 +
'')
else:
public.ExecShell('iptables -I INPUT -s ' + address + ' -p ' +
protocol + ' --dport ' + ports + ' -j ' +
rule2 + '')
return True
# 修改面板数据
def update_panel_data(self, ports):
res = public.M('firewall').where("port=?", (ports, )).delete()
# 查询IP规则
def get_ip_rules_list(self, args):
# 分页校验参数
try:
args.validate([
Param('limit').Integer(),
Param('p').Integer(),
Param('query').String().Xss(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
p = 1
limit = 15
if 'p' in args: p = args.p
if 'limit' in args: limit = args.limit
where = '1=1'
sql = public.M('firewall_ip')
if hasattr(args, 'query'):
where = " address like '%{search}%' or brief like '%{search}%' ".format(
search=args.query)
count = sql.where(where, ()).count()
data = public.get_page(count, int(p), int(limit))
data['data'] = sql.where(where, ()).limit('{},{}'.format(
data['shift'], data['row'])).order('addtime desc').select()
data['data'] = public.return_area(data['data'], 'address')
# return data
return public.return_message(0, 0, data)
def check_a_ip(self, address):
"""
@name 检测A记录是否为域名
@author hezhihong
"""
if address:
if public.is_ipv4(address) or public.is_ipv6(address):
return address
if address[-1] == '.':
address = address[:-1]
if public.is_domain(address): return self.get_a_ip(address)
return address
def get_a_ip(self, hostname):
'''
@name 检测主机名是否有A记录
@author hezhihong
:param hostname:
:return:
'''
if not self.install_dnspython():
return public.returnMsg(False, public.lang("Please install the dnspython module first"))
import dns.resolver
# 尝试3次
a_ip = []
for i in range(3):
try:
resolver = dns.resolver.Resolver()
resolver.timeout = 1
try:
result = resolver.query(hostname, 'A')
except:
result = resolver.resolve(hostname, 'A')
for i in result.response.answer:
for j in i.items:
try:
A_ip = str(j).strip()
if A_ip[-1] == '.':
A_ip = A_ip[:-1]
except:
pass
if A_ip not in a_ip:
a_ip.append(A_ip)
except:
pass
# 去除域名
if len(a_ip) > 1:
for i2 in a_ip:
if public.is_ipv4(i2) or public.is_ipv6(i2):
continue
if public.is_domain(i2):
a_ip.remove(i2)
return a_ip
def install_dnspython(self):
"""
@name 安装dnspython模块
@author hezhihong
"""
# 检测dns解析
try:
import dns.resolver
return True
except:
if os.path.exists('/www/server/panel/pyenv'):
public.ExecShell('/www/server/panel/pyenv/bin/pip install dnspython')
else:
public.ExecShell('pip3 install dnspython')
try:
import dns.resolver
return True
except:
return False
def del_domain_ip(self, args):
"""
@name 删除域名设置
@author hezhihong
"""
if 'id' not in args or not args.id or 'sid' not in args:
return public.returnMsg(False, public.lang("Parameter error"))
domain_id = int(args.sid)
# 删除IP规则
if domain_id > 0:
# 删除域名解析
public.M('firewall_domain').where('id=?', (str(domain_id),)).delete()
# 删除端口规则
if 'ports' in args:
self.remove_rules(args)
# 删除IP规则
else:
self.remove_ip_rules(args)
# 当没有域名解析时,删除计划任务
if not public.M('firewall_domain').count():
pdata = public.M('crontab').where('name=?', '[Do not delete] System firewall domain name resolution detection task').select()
if pdata:
for i in pdata:
args = {"id": i['id']}
import crontab
crontab.crontab().DelCrontab(args)
# return public.returnMsg(True, public.lang("successfully deleted"))
return public.return_message(0, 0, public.lang("successfully deleted"))
def add_crontab(self):
"""
@name 构造日志切割任务
@author hezhihong
"""
python_path = ''
try:
python_path = public.ExecShell('which btpython')[0].strip("\n")
except:
try:
python_path = public.ExecShell('which python')[0].strip("\n")
except:
pass
if not python_path: return False
if not public.M('crontab').where('name=?', ('[Do not delete] System firewall domain name resolution detection task',)).count():
cmd = '{} {}'.format(python_path, '/www/server/panel/script/firewall_domain.py')
args = {"name": "[Do not delete] System firewall domain name resolution detection task", "type": 'minute-n', "where1": '5', "hour": '',
"minute": '', "sName": "",
"sType": 'toShell', "notice": '', "notice_channel": '', "save": '', "save_local": '1',
"backupTo": '', "sBody": cmd,
"urladdress": ''}
import crontab
res = crontab.crontab().AddCrontab(args)
if res and "id" in res.keys():
return True
return False
return True
def __check_auth(self):
try:
from pluginAuth import Plugin
plugin_obj = Plugin(False)
plugin_list = plugin_obj.get_plugin_list()
if int(plugin_list['ltd']) > time.time():
return True
return False
except:
return False
def set_domain_ip2(self, args):
"""
@name 设置域名规则
@author hezhihong
"""
pay = self.__check_auth()
if not pay: return public.returnMsg(False, public.lang("Current features are exclusive to the professional version"))
if not args.domain: return public.returnMsg(False, public.lang("Please enter domain name"))
ports = ''
if 'ports' in args and args.ports: ports = args.ports
ip = args.source
# 添加计划任务
self.add_crontab()
# 添加端口规则
# {"protocol":"tcp","ports":"819","choose":"point","address":"125.93.252.236","types":"accept","brief":"","source":"125.93.252.236"}
args.address = ip
args.source = ip
if ports:
if public.is_ipv6(ip):
return public.returnMsg(False, public.lang("The domain name is resolved to an IPv6 address and port rules are not supported."))
self.create_rules2(args)
# 添加IP规则
else:
# return 333
self.create_ip_rules(args)
return public.returnMsg(True, public.lang("Domain name {} resolution added successfully", args.domain))
def set_domain_ip(self, args):
"""
@name 设置域名规则
@author hezhihong
"""
pay = self.__check_auth()
if not pay: return public.returnMsg(False, public.lang("Current features are exclusive to the professional version"))
if not args.domain: return public.returnMsg(False, public.lang("Please enter domain name"))
ports = ''
if 'ports' in args and args.ports: ports = args.ports
protocol = '' if 'protocol' not in args else args.protocol
a_ip = self.get_a_ip(args.domain)
# return a_ip
if a_ip and len(a_ip) < 2 and public.is_domain(a_ip[0]):
# return 111
a_ip = [self.check_a_ip(a_ip[0])]
# return a_ip
if not a_ip:
return public.returnMsg(False, public.lang("The domain name resolution has not been resolved or the resolution has not taken effect. If it has been resolved, please try again after 10 minutes."))
if public.M('firewall_domain').where("domain=? and types=? and port=? and protocol=?",
(args.domain, args.types, ports, protocol,)).count():
return public.returnMsg(False, public.lang("Domain name {} already exists", args.domain))
# 添加计划任务
self.add_crontab()
# 添加端口规则
# {"protocol":"tcp","ports":"819","choose":"point","address":"125.93.252.236","types":"accept","brief":"","source":"125.93.252.236"}
for ip in a_ip:
args.address = ip
args.source = ip
if ports:
if public.is_ipv6(ip):
return public.returnMsg(False, public.lang("The domain name is resolved to an IPv6 address and port rules are not supported."))
self.create_rules(args)
# 添加IP规则
else:
# return 333
self.create_ip_rules(args)
return public.returnMsg(True, public.lang("Domain name {} resolution added successfully", args.domain))
def modify_domain_ip(self, args):
"""
@name 修改域名规则(当修改为指定域名或从指定域名修改为其他时,需要调用此方法)
@name hezhihong
"""
pay = self.__check_auth()
if not pay: return public.returnMsg(False, public.lang("Current features are exclusive to the professional version"))
# 检测是否开启防火墙 hezhihong
if not self.get_firewall_status():
return public.returnMsg(False, public.lang("Please enable the firewall before proceeding."))
modify_args = public.dict_obj()
modify_args.id = args.id
modify_args.types = args.types
modify_args.brief = args.brief
modify_args.address = args.address
modify_args.sid = 0 if 'sid' not in args else args.sid
ports = '' if 'ports' not in args else args.ports
domain = '' if 'domain' not in args else args.domain
if ports: modify_args.ports = ports
choose = '' if 'choose' not in args else args.choose
pdata = {}
if int(args.sid) > 0:
pdata = public.M('firewall_domain').where('id=?', (args.sid,)).find()
# 修改端口规则
if ports:
modify_args.protocol = args.protocol
# 已经指定域名
if int(args.sid) > 0:
# 当修改为指定域名时
if choose == 'domain':
# 当修改为不同域名时
if domain != pdata['domain']:
self.del_domain_ip(args)
self.set_domain_ip(args)
# 当修改为相同域名时
else:
pdata['protocol'] = args.protocol
pdata['types'] = args.types
pdata['brief'] = public.xsssec(args.brief)
addtime = time.strftime('%Y-%m-%d %X', time.localtime())
pdata['addtime'] = addtime
public.M('firewall_domain').where('id=?', pdata['id']).update(pdata)
self.modify_rules(args)
return public.returnMsg(True, public.lang("Successfully modified"))
else:
args.domain = ''
self.del_domain_ip(args)
self.create_rules(args)
return public.returnMsg(True, public.lang("Successfully modified"))
# 当未指定域名时
else:
# 修改为指定域名
if domain:
self.remove_rules(args)
self.set_domain_ip(args)
return public.returnMsg(True, public.lang("Successfully modified"))
# 修改IP规则
else:
if int(args.sid) > 0:
modify_args.address = pdata['address']
modify_args.domain = pdata['domain']
return self.modify_ip_rules(modify_args)
# IP地址检测
def check_ip(self, address_list):
rep = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(\/\d{1,2})?$"
for address in address_list:
address = address.strip()
address = address.split('/')[0]
if address.find('-') != -1:
addresses = address.split('-')
if addresses[0] >= addresses[1]:
return public.returnMsg(False, public.lang("IP address you've input is illegal!"))
s_ips = addresses[0].split(".")
e_ips = addresses[1].split(".")
head_s_ip = s_ips[0] + "." + s_ips[1] + "." + s_ips[2] + "."
head_e_ip = e_ips[0] + "." + e_ips[1] + "." + e_ips[2] + "."
if head_s_ip != head_e_ip:
return public.returnMsg(False, public.lang("IP address you've input is illegal!"))
if not re.search(rep, addresses[0]):
return public.returnMsg(False, public.lang("IP address you've input is illegal!"))
if not re.search(rep, addresses[1]):
return public.returnMsg(False, public.lang("IP address you've input is illegal!"))
else:
if not re.search(rep, address) and not public.is_ipv6(address):
return public.returnMsg(False, public.lang("IP address you've input is illegal!"))
# 获取IP范围
def get_ip(self, address):
result = []
arrys = address.split("-")
s_ips = arrys[0].split(".")
e_ips = arrys[1].split(".")
head_s_ip = s_ips[0] + "." + s_ips[1] + "." + s_ips[2] + "."
region = int(e_ips[-1]) - int(s_ips[-1])
for num in range(0, region + 1):
result.append(head_s_ip + str(num + int(s_ips[-1])))
return result
def handle_firewall_ip(self, address, types):
ip_list = self.get_ip(address)
if isinstance(ip_list, dict):
return
public.ExecShell(
'firewall-cmd --permanent --zone=public --new-ipset=' + address +
' --type=hash:net')
xml_path = "/etc/firewalld/ipsets/%s.xml" % address
import os
if not os.path.exists(xml_path):
return
tree = ElementTree()
tree.parse(xml_path)
root = tree.getroot()
for ip in ip_list:
entry = Element("entry")
entry.text = ip
root.append(entry)
self.format(root)
tree.write(xml_path, 'utf-8', xml_declaration=True)
# public.ExecShell('firewall-cmd --permanent --zone=public --add-rich-rule=\'rule source ipset="'+ address +'" accept\'')
public.ExecShell(
'firewall-cmd --permanent --zone=public --add-rich-rule=\'rule source ipset="'
+ address + '" ' + types + '\'')
def handle_ufw_ip(self, address, types):
ip_list = self.get_ip(address)
if isinstance(ip_list, dict):
return
public.ExecShell('ipset create ' + address + ' hash:net')
for ip in ip_list:
public.ExecShell('ipset add ' + address + ' ' + ip)
public.ExecShell('iptables -I INPUT -m set --match-set ' + address +
' src -j ' + types.upper())
# 检查IP地址是否在范围内
def ip_in_range(self, ip, ip_range):
try:
import ipaddress
# 2024/1/3 下午 7:59 兼容192.168.0.0/24这种形式
if ip_range.find('/') != -1:
return ipaddress.ip_address(ip) in ipaddress.ip_network(ip_range)
ip_range = ip_range.split('-')
if len(ip_range) == 1: # 如果只有一个IP地址
return ipaddress.ip_address(ip) == ipaddress.ip_address(ip_range[0])
else: # 如果是一个IP范围
start_ip, end_ip = ip_range
ip_networks = ipaddress.summarize_address_range(ipaddress.ip_address(start_ip),
ipaddress.ip_address(end_ip))
return any(ipaddress.ip_address(ip) in net for net in ip_networks)
except:
return True
# 添加IP规则
def create_ip_rules(self, get):
# {
# "protocol": "tcp",
# "ports": "3309",
# "choose": "point",
# "address": "192.168.69.148",
# "types": "accept",
# "brief": "",
# "source": "192.168.69.148"
# }
# 校验参数
try:
get.validate([
Param('ports').Number(">=", 1).Number("<=", 65535),
Param('source').Ip(),
Param('address').Ip(),
Param('types').String('in', ['accept', 'drop']),
Param('protocol').String('in', ['tcp', 'udp']),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
from flask import request
user_ip = request.remote_addr
_address = get.address.strip()
original_types = get.types
brief = get.brief
domain_total = '' if ('domain' not in get or not get.domain) else get.domain.strip()
domain = '' if ('domain' not in get or not get.domain) else get.domain.strip() + '|' + _address
address_list = _address.split(',')
result = self.check_ip(address_list)
if result:
return public.return_message(-1, 0, result['msg'])
# 先处理用户的IP地址
old_login_ip = public.M('firewall_ip').where("brief=?", ("IP that allows users to log in",)).field('id, address').select()
for ip_range in address_list:
if self.ip_in_range(user_ip, ip_range.strip()):
if old_login_ip and old_login_ip[0][' address'] != user_ip:
address = old_login_ip[0][' address']
public.M('firewall_ip').where("address=?", (address,)).delete()
self.update_panel_data(address) # 删除面板自带防火墙的表数据
self.add_rule(user_ip, "accept", "IP that allows users to log in", domain, domain_total)
self.add_rule(ip_range.strip(), original_types, brief, domain, domain_total)
self.FirewallReload()
public.WriteLog("system firewall", "Add IP rules: IP: {}, policy: {}".format(_address, original_types))
return public.return_message(0, 0, public.lang("Added successfully!"))
# 添加单个IP规则
def add_rule(self, address, types, brief, domain, domain_total):
if public.M('firewall_ip').where("address=? and types=? and domain=?", (address, types, domain)).count() > 0:
return
if self.__isUfw:
_rule = "allow" if types == "accept" else "deny"
if address.find('-') != -1:
self.handle_ufw_ip(address, types)
else:
is_debian = True if public.get_os_version().lower().find("debian") != -1 else False
if not is_debian:
if _rule == "allow":
if public.is_ipv6(address):
public.ExecShell('ufw ' + _rule + ' from ' + address + ' to any')
else:
public.ExecShell('ufw insert 1 ' + _rule + ' from ' + address + ' to any')
else:
public.ExecShell('ufw ' + _rule + ' from ' + address + ' to any')
else:
public.ExecShell('iptables -I INPUT -s ' + address + ' -j ' + types.upper())
else:
if self.__isFirewalld:
if address.find('-') != -1:
self.handle_firewall_ip(address, types)
else:
if types == "accept":
public.ExecShell('firewall-cmd --permanent --add-source=' + address + ' --zone=trusted')
else:
if public.is_ipv6(address):
public.ExecShell(
'firewall-cmd --permanent --add-rich-rule=\'rule family=ipv6 source address="' + address + '" ' + types + '\'')
else:
public.ExecShell(
'firewall-cmd --permanent --add-rich-rule=\'rule family=ipv4 source address="' + address + '" ' + types + '\'')
else:
if address.find('-') != -1:
self.handle_ufw_ip(address, types)
else:
public.ExecShell('iptables -I INPUT -s ' + address + ' -j ' + types.upper())
addtime = time.strftime('%Y-%m-%d %X', time.localtime())
self._add_sid = public.M('firewall_ip').add('address,types,brief,addtime,domain,sid',
(address, types, public.xsssec(brief), addtime, domain, 0,))
if domain:
domain_sid = public.M('firewall_domain').add(
'types,domain,port,address,brief,addtime,sid,protocol,domain_total', (
types, domain, '', address, public.xsssec(brief), addtime, self._add_sid, '', domain_total))
public.M('firewall_ip').where("id=?", (self._add_sid,)).save('sid', domain_sid)
# 删除All IPs规则
def remove_all_ip_rules(self, get):
ip_list = public.M('firewall_ip').select()
for ip in ip_list:
id = ip["id"]
address = ip["address"]
types = ip["types"]
if self.__isUfw:
_rule = "allow" if types == "accept" else "deny"
if address.find('-') != -1:
public.ExecShell('iptables -D INPUT -m set --match-set ' +
address + ' src -j ' + types.upper())
public.ExecShell('ipset destroy ' + address)
else:
is_debian = True if public.get_os_version().lower().find(
"debian") != -1 else False
if not is_debian:
public.ExecShell('ufw delete ' + _rule + ' from ' + address + ' to any')
else:
public.ExecShell("iptables -D INPUT -s " + address +
" -j " + types.upper())
else:
if self.__isFirewalld:
if address.find('-') != -1:
public.ExecShell(
'firewall-cmd --permanent --zone=public --remove-rich-rule=\'rule source ipset="'
+ address + '" ' + types + '\'')
public.ExecShell(
'firewall-cmd --permanent --zone=public --delete-ipset='
+ address)
else:
public.ExecShell(
'firewall-cmd --permanent --remove-source=' +
address + ' --zone=trusted')
if public.is_ipv6(address):
public.ExecShell(
'firewall-cmd --permanent --remove-rich-rule=\'rule family=ipv6 source address="'
+ address + '" ' + types + '\'')
else:
public.ExecShell(
'firewall-cmd --permanent --remove-rich-rule=\'rule family=ipv4 source address="'
+ address + '" ' + types + '\'')
else:
if address.find('-') != -1:
public.ExecShell(
'iptables -D INPUT -m set --match-set ' + address +
' src -j ' + types.upper())
public.ExecShell('ipset destroy ' + address)
else:
public.ExecShell('iptables -D INPUT -s ' + address +
' -j ' + types.upper())
public.M('firewall_ip').where("id=?", (id, )).delete()
self.update_panel_data(address) # 删除面板自带防火墙的表数据
self.FirewallReload()
return public.returnMsg(True, public.lang("All IP rules have been removed."))
# 删除IP规则
def remove_ip_rules(self, get):
# {"id": 4, "types": "accept", "address": "192.168.168.162", "brief": "123", "addtime": "2024-04-12 17:19:04",
# "sid": 0, "domain": "", "area": {"info": "Intranet"}}
# 校验参数
try:
get.validate([
Param('id').Require().Integer(),
# Param('ports').Require(),
Param('address').Require().Ip(),
Param('types').Require().String('in', ['accept', 'drop']),
# Param('area').Require().Dict().Xss(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
id = get.id
address = get.address
types = get.types
if self.__isUfw:
_rule = "allow" if types == "accept" else "deny"
if address.find('-') != -1:
public.ExecShell('iptables -D INPUT -m set --match-set ' +
address + ' src -j ' + types.upper())
public.ExecShell('ipset destroy ' + address)
else:
is_debian = True if public.get_os_version().lower().find(
"debian") != -1 else False
if not is_debian:
public.ExecShell('ufw delete ' + _rule + ' from ' + address + ' to any')
else:
public.ExecShell('ufw delete ' + _rule + ' from ' + address + ' to any')
public.ExecShell("iptables -D INPUT -s " + address +
" -j " + types.upper())
else:
if self.__isFirewalld:
if address.find('-') != -1:
public.ExecShell(
'firewall-cmd --permanent --zone=public --remove-rich-rule=\'rule source ipset="'
+ address + '" ' + types + '\'')
public.ExecShell(
'firewall-cmd --permanent --zone=public --delete-ipset='
+ address)
else:
public.ExecShell(
'firewall-cmd --permanent --remove-source=' + address +
' --zone=trusted')
if public.is_ipv6(address):
public.ExecShell(
'firewall-cmd --permanent --remove-rich-rule=\'rule family=ipv6 source address="'
+ address + '" ' + types + '\'')
else:
public.ExecShell(
'firewall-cmd --permanent --remove-rich-rule=\'rule family=ipv4 source address="'
+ address + '" ' + types + '\'')
else:
if address.find('-') != -1:
public.ExecShell('iptables -D INPUT -m set --match-set ' +
address + ' src -j ' + types.upper())
public.ExecShell('ipset destroy ' + address)
else:
public.ExecShell('iptables -D INPUT -s ' + address +
' -j ' + types.upper())
public.M('firewall_ip').where("id=?", (id, )).delete()
self.update_panel_data(address) # 删除面板自带防火墙的表数据
self.FirewallReload()
strategy= ''
if get.types == 'accept':
strategy = "accept"
elif get.types == 'drop':
strategy = "drop"
public.WriteLog("system firewall", "Delete IP rules: IP:{}, policy:{}".format(get.address, strategy))
return public.return_message(0, 0, public.lang("Delete successfully!"))
# 修改IP规则
def modify_ip_rules(self, get):
# 校验参数
try:
get.validate([
Param('id').Require().Integer(),
# Param('ports').Require(),
# Param('source').Require().Ip(),
Param('address').Require().Ip(),
Param('types').Require().String('in', ['accept', 'drop']),
# Param('protocol').Require().String('in', ['tcp', 'udp']),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
id = get.id
address = get.address.strip()
types = get.types
brief = get.brief
result = self.check_ip([address])
sid = 0 if 'sid' not in get else get.sid
domain = '' if 'domain' not in get else get.domain
domain_total = domain.split('|')[0]
# return 22
if result:
return public.return_message(0, 0, result)
data = public.M('firewall_ip').where(
'id=?', (id, )).field('id,address,types,brief,addtime').find()
_address = data.get("address", "")
_type = data.get("types", "")
if self.__isUfw:
rule1 = "allow" if _type == "accept" else "deny"
if _address.find('-') != -1:
public.ExecShell('iptables -D INPUT -m set --match-set ' +
_address + ' src -j ' + _type.upper())
public.ExecShell('ipset destroy ' + _address)
else:
is_debian = True if public.get_os_version().lower().find(
"debian") != -1 else False
if not is_debian:
public.ExecShell('ufw delete ' + rule1 + ' from ' + address + ' to any')
else:
cmd = "iptables -D INPUT -s " + address + " -j " + _type.upper(
)
public.ExecShell(cmd)
rule2 = "allow" if types == "accept" else "deny"
if address.find('-') != -1:
self.handle_ufw_ip(address, types)
else:
is_debian = True if public.get_os_version().lower().find(
"debian") != -1 else False
if not is_debian:
if rule2 == "allow":
if public.is_ipv6(address):
public.ExecShell('ufw ' + rule2 + ' from ' + address + ' to any')
else:
public.ExecShell('ufw insert 1 ' + rule2 + ' from ' + address + ' to any')
else:
public.ExecShell('ufw ' + rule2 + ' from ' + address + ' to any')
else:
public.ExecShell('iptables -I INPUT -s ' + address +
' -j ' + types.upper())
else:
if self.__isFirewalld:
if _address.find('-') != -1:
public.ExecShell(
'firewall-cmd --permanent --zone=public --remove-rich-rule=\'rule source ipset="'
+ _address + '" ' + _type + '\'')
public.ExecShell(
'firewall-cmd --permanent --zone=public --delete-ipset='
+ _address)
else:
public.ExecShell(
'firewall-cmd --permanent --remove-source=' +
_address + ' --zone=trusted')
if public.is_ipv6(address):
public.ExecShell(
'firewall-cmd --permanent --remove-rich-rule=\'rule family=ipv6 source address="'
+ _address + '" ' + _type + '\'')
else:
public.ExecShell(
'firewall-cmd --permanent --remove-rich-rule=\'rule family=ipv4 source address="'
+ _address + '" ' + _type + '\'')
if address.find('-') != -1:
brief = address
self.handle_firewall_ip(address, types)
else:
if types == "accept":
public.ExecShell(
'firewall-cmd --permanent --add-source=' +
address + ' --zone=trusted')
else:
if public.is_ipv6(address):
public.ExecShell(
'firewall-cmd --permanent --add-rich-rule=\'rule family=ipv6 source address="'
+ address + '" ' + types + '\'')
else:
public.ExecShell(
'firewall-cmd --permanent --add-rich-rule=\'rule family=ipv4 source address="'
+ address + '" ' + types + '\'')
else:
if _address.find('-') != -1:
public.ExecShell('iptables -D INPUT -m set --match-set ' +
_address + ' src -j ' + types.upper())
public.ExecShell('ipset destroy ' + _address)
else:
public.ExecShell('iptables -D INPUT -s ' + _address +
' -j ' + _type.upper())
if address.find('-') != -1:
self.handle_ufw_ip(address, types)
else:
public.ExecShell('iptables -I INPUT -s ' + address +
' -j ' + types.upper())
addtime = time.strftime('%Y-%m-%d %X', time.localtime())
public.M('firewall_ip').where('id=?', id).update(
{'address': address, 'types': types, 'brief': brief, 'addtime': addtime, 'sid': sid, 'domain': domain})
if domain:
public.M('firewall_domain').where('id=?', (sid,)).save('sid,types,brief,domain_total',
(id, types, brief, domain_total))
self.FirewallReload()
old_strategy = ''
if types == 'accept':
old_strategy = "accept"
elif types == 'drop':
old_strategy = "drop"
if get.types == 'accept':
strategy = "accept"
elif get.types == 'drop':
strategy = "drop"
public.WriteLog("system firewall", "Modify rules, IP:{}, Strategy:{} -> IP:{}, Strategy:{}".format(_address, old_strategy, get.address.strip(), strategy))
return public.return_message(0, 0, public.lang("Successful operation"))
# 查看端口转发状态
def trans_status(self):
content = dict()
with open(self._trans_status, 'r') as fr:
content = json.loads(fr.read())
if content["status"] == "open":
return True
self.open_forward()
content["status"] = "open"
with open(self._trans_status, 'w') as fw:
fw.write(json.dumps(content))
return True
# 查询端口转发
def get_forward_list(self, args):
# 分页校验参数
try:
args.validate([
Param('limit').Integer(),
Param('p').Integer(),
Param('query').String().Xss(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
result = self.trans_status()
p = 1
limit = 15
if 'p' in args: p = args.p
if 'limit' in args: limit = args.limit
where = '1=1'
sql = public.M('firewall_trans')
if hasattr(args, 'query'):
where = " start_port like '%{search}%'".format(search=args.query)
count = sql.where(where, ()).count()
data = public.get_page(count, int(p), int(limit))
data['data'] = sql.where(where, ()).limit('{},{}'.format(
data['shift'], data['row'])).order('addtime desc').select()
return public.return_message(0, 0, data)
# 添加端口转发
def create_forward(self, get):
# {"protocol":"tcp","s_ports":"1234","d_address":"192.168.198.199","d_ports":"2345"}
# 校验参数
try:
get.validate([
# Param('id').Require().Integer().Xss(),
Param('s_ports').Require().Number(">=", 1).Number("<=", 65535),
Param('d_ports').Require().Number(">=", 1).Number("<=", 65535),
Param('d_address').Require().Ip(),
Param('protocol').Require().String('in', ['tcp', 'udp']),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
s_port = get.s_ports.strip() # 起始端口
d_port = get.d_ports.strip() # 目的端口
d_ip = get.d_address.strip() # 目的ip
protocol = get.protocol
rep1 = r"^\d{1,5}(:\d{1,5})?$"
if not re.search(rep1, s_port):
return public.return_message(-1, 0, public.lang("Port range is incorrect!"))
if not re.search(rep1, d_port):
return public.return_message(-1, 0, public.lang("Port range is incorrect!"))
rep = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(\/\d{1,2})?$"
if d_ip:
if not re.search(rep, get.d_address) and not public.is_ipv6(get.d_address):
return public.return_message(-1, 0, public.lang("IP address you've input is illegal!"))
if d_ip in ["127.0.0.1", "localhost"]:
d_ip = ""
if public.M('firewall_trans').where("start_port=?", (s_port, )).count() > 0:
return public.return_message(-1, 0, public.lang("This port already exists, please do not add it again!"))
if self.__isUfw:
content = self.ufw_handle_add(s_port, d_port, d_ip, protocol)
self.save_profile(self._ufw_before, content)
else:
if self.__isFirewalld:
self.firewall_handle_add(s_port, d_port, d_ip, protocol)
else:
self.iptables_handle_add(s_port, d_port, d_ip, protocol)
addtime = time.strftime('%Y-%m-%d %X', time.localtime())
public.M('firewall_trans').add(
'start_port, ended_ip, ended_port, protocol, addtime',
(s_port, d_ip, d_port, protocol, addtime))
self.FirewallReload()
public.WriteLog("system firewall", "Add port forwarding rules: Start port: {}, Destination port: {}, Destination IP: {}".format(s_port, d_port, d_ip))
return public.return_message(0, 0, public.lang("Added successfully!"))
# 删除端口转发
def remove_forward(self, get):
# 校验参数
try:
get.validate([
Param('id').Require().Integer(),
Param('s_ports').Require().Number(">=", 1).Number("<=", 65535),
Param('d_ports').Require().Number(">=", 1).Number("<=", 65535),
# Param('d_address').Require().Ip(),
Param('protocol').Require().String('in', ['tcp', 'udp']),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
id = get.id
s_port = get.s_port
d_port = get.d_port
d_ip = get.d_ip
protocol = get.protocol
if self.__isUfw:
content = self.ufw_handle_del(s_port, d_port, d_ip, protocol)
self.save_profile(self._ufw_before, content)
else:
if self.__isFirewalld:
self.firewall_handle_del(s_port, d_port, d_ip, protocol)
else:
self.iptables_handle_del(s_port, d_port, d_ip, protocol)
public.M('firewall_trans').where("id=?", (id, )).delete()
self.FirewallReload()
public.WriteLog("system firewall", "Delete port forwarding rules: Start port: {}, Destination port: {}, Destination IP: {}".format(s_port, d_port, d_ip))
return public.return_message(0, 0, public.lang("Delete successfully!"))
# 修改端口转发
def modify_forward(self, get):
# 校验参数
try:
get.validate([
Param('id').Require().Integer(),
Param('s_ports').Require().Number(">=", 1).Number("<=", 65535),
Param('d_ports').Require().Number(">=", 1).Number("<=", 65535),
Param('d_address').Require().Ip(),
Param('protocol').Require().String('in', ['tcp', 'udp']),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
id = get.id
s_port = get.s_ports.strip()
d_port = get.d_ports.strip()
d_ip = get.d_address.strip()
pool = get.protocol
rep1 = r"^\d{1,5}(:\d{1,5})?$"
if not re.search(rep1, s_port):
return public.return_message(-1, 0, public.lang("Port range is incorrect!"))
if not re.search(rep1, d_port):
return public.return_message(-1, 0, public.lang("Port range is incorrect!"))
data = public.M('firewall_trans').where('id=?', (id, )).field(
'id,start_port,ended_ip,ended_port,protocol,addtime').find()
start_port = data.get("start_port", "")
ended_ip = data.get("ended_ip", "")
ended_port = data.get("ended_port", "")
protocol = data.get("protocol", "")
if d_ip:
rep = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(\/\d{1,2})?$"
if not re.search(rep, get.d_address) and not public.is_ipv6(get.d_address):
return public.return_message(-1, 0, public.lang("IP address you've input is illegal!"))
if d_ip in ["127.0.0.1", "localhost"]:
d_ip = ""
if self.__isUfw:
content = self.ufw_handle_update(start_port, ended_ip, ended_port,
protocol, s_port, d_ip, d_port,
pool)
self.save_profile(self._ufw_before, content)
else:
if self.__isFirewalld:
self.firewall_handle_update(start_port, ended_ip, ended_port,
protocol, s_port, d_ip, d_port,
pool)
else:
self.iptables_handle_update(start_port, ended_ip, ended_port,
protocol, s_port, d_ip, d_port,
pool)
addtime = time.strftime('%Y-%m-%d %X', time.localtime())
public.M('firewall_trans').where('id=?', id).update(
{'start_port': s_port, "ended_ip": d_ip, "ended_port": d_port, "protocol": pool})
self.FirewallReload()
public.WriteLog("system firewall", "Modify port forwarding rules: Start port: {}, Destination port: {}, Destination IP: {} -> Start port: {}, Destination port: {}, Destination IP: {}".format(start_port, ended_port, ended_ip, s_port, d_port, d_ip))
return public.return_message(0, 0, public.lang("Successful operation"))
# 处理ufw的端口转发添加
def ufw_handle_add(self, s_port, d_port, d_ip, protocol):
content = self.get_profile(self._ufw_before)
if content.find('*nat') == -1:
content = "*nat\n" + ":PREROUTING ACCEPT [0:0]\n" + ":POSTROUTING ACCEPT [0:0]\n" + "COMMIT\n" + content
array = content.split('\n')
result = array.index(":POSTROUTING ACCEPT [0:0]")
if d_ip == "":
if protocol.find('/') != -1:
_string = "-A PREROUTING -p tcp --dport {1} -j REDIRECT --to-port {2}\n".format(
s_port, d_port)
_string = _string + "-A PREROUTING -p udp --dport {1} -j REDIRECT --to-port {2}".format(
s_port, d_port)
else:
_string = "-A PREROUTING -p {0} --dport {1} -j REDIRECT --to-port {2}".format(
protocol, s_port, d_port)
else:
_string = "-A PREROUTING -p {0} --dport {1} -j DNAT --to-destination {2}:{3}\n".format(
protocol, s_port, d_ip,
d_port) + "-A POSTROUTING -d {0} -j MASQUERADE".format(d_ip)
array.insert(result + 1, _string)
return '\n'.join(array)
# 处理ufw的端口转发删除
def ufw_handle_del(self, s_port, d_port, d_ip, protocol):
content = self.get_profile(self._ufw_before)
if d_ip == "":
_string = "-A PREROUTING -p {0} --dport {1} -j REDIRECT --to-port {2}\n".format(
protocol, s_port, d_port)
else:
_string = "-A PREROUTING -p {0} --dport {1} -j DNAT --to-destination {2}:{3}\n".format(
protocol, s_port, d_ip,
d_port) + "-A POSTROUTING -d {0} -j MASQUERADE\n".format(d_ip)
content = content.replace(_string, "")
return content
# 处理ufw的端口转发修改
def ufw_handle_update(self, start_port, ended_ip, ended_port, protocol, s_port, d_ip, d_port, pool):
content = self.get_profile(self._ufw_before)
if ended_ip == "":
s_string = "-A PREROUTING -p {0} --dport {1} -j REDIRECT --to-port {2}\n".format(
protocol, start_port, ended_port)
else:
s_string = "-A PREROUTING -p {0} --dport {1} -j DNAT --to-destination {2}:{3}\n".format(
protocol, start_port, ended_ip, ended_port
) + "-A POSTROUTING -d {0} -j MASQUERADE\n".format(ended_ip)
if d_ip == "":
d_string = "-A PREROUTING -p {0} --dport {1} -j REDIRECT --to-port {2}\n".format(
pool, s_port, d_port)
else:
d_string = "-A PREROUTING -p {0} --dport {1} -j DNAT --to-destination {2}:{3}\n".format(
pool, s_port, d_ip,
d_port) + "-A POSTROUTING -d {0} -j MASQUERADE\n".format(d_ip)
content = content.replace(s_string, d_string)
return content
# 处理firewall的端口转发添加
def firewall_handle_add(self, s_port, d_port, d_ip, protocol):
if protocol.find('/') != -1:
public.ExecShell(
"firewall-cmd --permanent --zone=public --add-forward-port=port="
+ s_port + ":proto=tcp:toaddr=" + d_ip + ":toport=" + d_port +
"")
public.ExecShell(
"firewall-cmd --permanent --zone=public --add-forward-port=port="
+ s_port + ":proto=udp:toaddr=" + d_ip + ":toport=" + d_port +
"")
else:
cmd = "firewall-cmd --permanent --zone=public --add-forward-port=port=" + s_port + ":proto=" + protocol + ":toaddr=" + d_ip + ":toport=" + d_port + ""
public.ExecShell(cmd)
# 处理firewall的端口转发删除
def firewall_handle_del(self, s_port, d_port, d_ip, protocol):
if protocol.find('/') != -1:
public.ExecShell(
"firewall-cmd --permanent --zone=public --remove-forward-port=port="
+ s_port + ":proto=tcp:toaddr=" + d_ip + ":toport=" + d_port +
"")
public.ExecShell(
"firewall-cmd --permanent --zone=public --remove-forward-port=port="
+ s_port + ":proto=udp:toaddr=" + d_ip + ":toport=" + d_port +
"")
else:
public.ExecShell(
"firewall-cmd --permanent --zone=public --remove-forward-port=port="
+ s_port + ":proto=" + protocol + ":toaddr=" + d_ip +
":toport=" + d_port + "")
# 处理firewall的端口转发修改
def firewall_handle_update(self, start_port, ended_ip, ended_port,
protocol, s_port, d_ip, d_port, pool):
if protocol.find('/') != -1:
public.ExecShell(
"firewall-cmd --permanent --zone=public --remove-forward-port=port="
+ start_port + ":proto=tcp:toaddr=" + ended_ip + ":toport=" +
ended_port + "")
public.ExecShell(
"firewall-cmd --permanent --zone=public --remove-forward-port=port="
+ start_port + ":proto=udp:toaddr=" + ended_ip + ":toport=" +
ended_port + "")
else:
public.ExecShell(
"firewall-cmd --permanent --zone=public --remove-forward-port=port="
+ start_port + ":proto=" + protocol + ":toaddr=" + ended_ip +
":toport=" + ended_port + "")
if pool.find('/') != -1:
public.ExecShell(
"firewall-cmd --permanent --zone=public --add-forward-port=port="
+ s_port + ":proto=tcp:toaddr=" + d_ip + ":toport=" + d_port +
"")
public.ExecShell(
"firewall-cmd --permanent --zone=public --add-forward-port=port="
+ s_port + ":proto=udp:toaddr=" + d_ip + ":toport=" + d_port +
"")
else:
public.ExecShell(
"firewall-cmd --permanent --zone=public --add-forward-port=port="
+ s_port + ":proto=" + pool + ":toaddr=" + d_ip + ":toport=" +
d_port + "")
# 处理iptables的端口转发添加
def iptables_handle_add(self, s_port, d_port, d_ip, protocol):
if d_ip == "":
if protocol.find('/') != -1:
public.ExecShell(
"iptables -t nat -A PREROUTING -p tcp --dport " + s_port +
" -j REDIRECT --to-port " + d_port + '')
public.ExecShell(
"iptables -t nat -A PREROUTING -p udp --dport " + s_port +
" -j REDIRECT --to-port " + d_port + '')
else:
public.ExecShell("iptables -t nat -A PREROUTING -p " +
protocol + " --dport " + s_port +
" -j REDIRECT --to-port " + d_port + '')
else:
if protocol.find('/') != -1:
public.ExecShell(
"iptables -t nat -A PREROUTING -p tcp --dport " + s_port +
" -j DNAT --to-destination " + d_ip + ":" + d_port + '')
public.ExecShell(
"iptables -t nat -A PREROUTING -p udp --dport " + s_port +
" -j DNAT --to-destination " + d_ip + ":" + d_port + '')
public.ExecShell(
"iptables -t nat -A POSTROUTING -j MASQUERADE")
else:
public.ExecShell(
"iptables -t nat -A PREROUTING -p " + protocol + " --dport " + s_port + " -j DNAT --to-destination " + d_ip + ":" + d_port + '')
public.ExecShell("iptables -t nat -A POSTROUTING -j MASQUERADE")
return True
# 处理iptables的端口转发删除
def iptables_handle_del(self, s_port, d_port, d_ip, protocol):
if d_ip == "":
if protocol.find('/') != -1:
public.ExecShell(
"iptables -t nat -D PREROUTING -p tcp --dport " + s_port +
" -j REDIRECT --to-port " + d_port + '')
public.ExecShell(
"iptables -t nat -D PREROUTING -p udp --dport " + s_port +
" -j REDIRECT --to-port " + d_port + '')
else:
public.ExecShell("iptables -t nat -D PREROUTING -p " +
protocol + " --dport " + s_port +
" -j REDIRECT --to-port " + d_port + '')
else:
if protocol.find('/') != -1:
public.ExecShell(
"iptables -t nat -D PREROUTING -p tcp --dport " + s_port + " -j DNAT --to-destination " + d_ip + ":" + d_port + '')
public.ExecShell(
"iptables -t nat -D PREROUTING -p udp --dport " + s_port + " -j DNAT --to-destination " + d_ip + ":" + d_port + '')
public.ExecShell("iptables -t nat -D POSTROUTING -j MASQUERADE")
else:
public.ExecShell(
"iptables -t nat -D PREROUTING -p " + protocol + " --dport " + s_port + " -j DNAT --to-destination " + d_ip + ":" + d_port + '')
public.ExecShell("iptables -t nat -D POSTROUTING -j MASQUERADE")
return True
# 处理iptables的端口转发删除
def iptables_handle_update(self, start_port, ended_ip, ended_port,
protocol, s_port, d_ip, d_port, pool):
if ended_ip == "":
if protocol.find('/') != -1:
public.ExecShell(
"iptables -t nat -D PREROUTING -p tcp --dport " + s_port +
" -j REDIRECT --to-port " + d_port + '')
public.ExecShell(
"iptables -t nat -D PREROUTING -p udp --dport " + s_port +
" -j REDIRECT --to-port " + d_port + '')
else:
public.ExecShell("iptables -t nat -D PREROUTING -p " +
protocol + " --dport " + s_port +
" -j REDIRECT --to-port " + d_port + '')
else:
if protocol.find('/') != -1:
public.ExecShell(
"iptables -t nat -D PREROUTING -p tcp --dport " + s_port + " -j DNAT --to-destination " + d_ip + ":" + d_port + '')
public.ExecShell(
"iptables -t nat -D PREROUTING -p udp --dport " + s_port + " -j DNAT --to-destination " + d_ip + ":" + d_port + '')
public.ExecShell("iptables -t nat -D POSTROUTING -j MASQUERADE")
else:
public.ExecShell(
"iptables -t nat -D PREROUTING -p " + protocol + " --dport " + s_port + " -j DNAT --to-destination " + d_ip + ":" + d_port + '')
public.ExecShell("iptables -t nat -D POSTROUTING -j MASQUERADE")
if d_ip == "":
if pool.find('/') != -1:
public.ExecShell(
"iptables -t nat -A PREROUTING -p tcp --dport " + s_port +
" -j REDIRECT --to-port " + d_port + '')
public.ExecShell(
"iptables -t nat -A PREROUTING -p udp --dport " + s_port +
" -j REDIRECT --to-port " + d_port + '')
else:
public.ExecShell("iptables -t nat -A PREROUTING -p " +
protocol + " --dport " + s_port +
" -j REDIRECT --to-port " + d_port + '')
else:
if pool.find('/') != -1:
public.ExecShell(
"iptables -t nat -A PREROUTING -p tcp --dport " + s_port + " -j DNAT --to-destination " + d_ip + ":" + d_port + '')
public.ExecShell(
"iptables -t nat -A PREROUTING -p udp --dport " + s_port + " -j DNAT --to-destination " + d_ip + ":" + d_port + '')
public.ExecShell("iptables -t nat -A POSTROUTING -j MASQUERADE")
else:
public.ExecShell(
"iptables -t nat -A PREROUTING -p " + protocol + " --dport " + s_port + " -j DNAT --to-destination " + d_ip + ":" + d_port + '')
public.ExecShell("iptables -t nat -A POSTROUTING -j MASQUERADE")
return True
# 开启端口转发
def open_forward(self):
if self.__isUfw:
content1 = self.get_profile(self._ufw_default)
content2 = self.get_profile(self._ufw_sysctl)
content1 = content1.replace('DEFAULT_FORWARD_POLICY="DROP"',
'DEFAULT_FORWARD_POLICY="ACCEPT"')
content2 = content2.replace('#net/ipv4/ip_forward=1',
'net/ipv4/ip_forward=1')
self.save_profile(self._ufw_default, content1)
self.save_profile(self._ufw_sysctl, content2)
self.FirewallReload()
return True
if self.__isFirewalld:
public.ExecShell(
'echo "\nnet.ipv4.ip_forward=1" >> /etc/sysctl.conf')
public.ExecShell('firewall-cmd --add-masquerade --permanent')
self.FirewallReload()
else:
public.ExecShell(
'echo "\nnet.ipv4.ip_forward=1" >> /etc/sysctl.conf')
public.ExecShell('sysctl -p /etc/sysctl.conf')
self.FirewallReload()
return True
# 开启或关闭端口转发
def open_close_forward(self, get):
if not get.status in ["open", "close"]:
return public.returnMsg(False, public.lang("Unknown control command!"))
if self.__isUfw:
content1 = self.get_profile(self._ufw_default)
content2 = self.get_profile(self._ufw_sysctl)
if get.status == 'open':
content1 = content1.replace('DEFAULT_FORWARD_POLICY="DROP"',
'DEFAULT_FORWARD_POLICY="ACCEPT"')
content2 = content2.replace('#net/ipv4/ip_forward=1',
'net/ipv4/ip_forward=1')
else:
content1 = content1.replace('DEFAULT_FORWARD_POLICY="ACCEPT"',
'DEFAULT_FORWARD_POLICY="DROP"')
content2 = content2.replace('net/ipv4/ip_forward=1',
'#net/ipv4/ip_forward=1')
self.save_profile(self._ufw_default, content1)
self.save_profile(self._ufw_sysctl, content2)
self.FirewallReload()
return public.returnMsg(True, 'Enable' if get.status == "open" else "Disable")
if self.__isFirewalld:
if get.status == 'open':
public.ExecShell('firewall-cmd --add-masquerade --permanent')
else:
public.ExecShell(
'firewall-cmd --remove-masquerade --permanent')
self.FirewallReload()
else:
public.ExecShell(
'echo "net.ipv4.ip_forward=1" >> /etc/sysctl.conf')
public.ExecShell('sysctl -p /etc/sysctl.conf')
return public.returnMsg(True, public.lang("Turn off port forwarding"))
def get_host_ip(self):
"""
查询本机ip地址
:return:
"""
if self._host_ip is None:
try:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
self._host_ip = s.getsockname()[0]
finally:
s.close()
return self._host_ip
def load_white_list(self):
try:
if not self._white_list:
ip_data = self.get_profile(self._white_list_file)
white_list_ips = json.loads(ip_data)
white_list = []
for ip_obj in white_list_ips:
white_list += ip_obj["ips"]
self._white_list = white_list
return self._white_list
except Exception as e:
public.WriteLog("firewall", "Failed to load whitelist")
return []
def ip_in_network(self, ip, ip_make_net):
"""判断 IP 是否属于网段"""
# 解析网段
network_ip, prefix = ip_make_net.split("/")
prefix = int(prefix)
# 将 IP 和网段转换为整数
ip_int = int.from_bytes(map(int, ip.split(".")), "big")
network_int = int.from_bytes(map(int, network_ip.split(".")), "big")
# 计算掩码
mask = (0xFFFFFFFF << (32 - prefix)) & 0xFFFFFFFF
# 判断 IP 是否属于网段
return (ip_int & mask) == (network_int & mask)
def verify_ip(self, ip_entry):
"""检查规则IP是否和内网IP重叠"""
try:
ip_start = ip_entry.split(".")
if ip_start[0] in ('127', '10'):
return False
if (ip_start[0] == "172" and ip_start[1] == "16") or (ip_start[0] == "192" and ip_start[1] == "168"):
return False
release_ips = [self.get_host_ip()]
white_list = self.load_white_list()
release_ips += white_list
for rip in release_ips:
if self.ip_in_network(rip, ip_entry):
return False
return True
except:
return False
def handle_firewall_country(self, brief, ip_list, types, port_list, is_update=False):
_ipset = "in_bt_country_{}".format(brief)
firewall_ipset_path = "/tmp/firewall_{}.txt".format(brief)
if is_update or not os.path.exists(firewall_ipset_path):
tmp_file = open(firewall_ipset_path, 'w')
_string = ""
for ip in ip_list:
if self.verify_ip(ip):
_string = _string + "add {} {}\n".format(_ipset, ip)
tmp_file.write(_string)
tmp_file.close()
public.ExecShell(
'ipset create {} hash:net maxelem 1000000; ipset restore -f {}'.format(_ipset, firewall_ipset_path))
else:
public.ExecShell(
'ipset create {} hash:net maxelem 1000000; ipset restore -f {}'.format(_ipset, firewall_ipset_path))
if port_list:
for port in port_list:
public.ExecShell('iptables -I IN_BT_Country -m set --match-set {} src -p tcp --destination-port {} -j {}'.format(_ipset,port,types.upper()))
else:
public.ExecShell('iptables -I IN_BT_Country -m set --match-set {} src -j {}'.format(_ipset, types.upper()))
# 查询区域规则
def get_country_list(self, args):
# 分页校验参数
try:
args.validate([
Param('limit').Integer(),
Param('p').Integer(),
Param('query').String().Xss(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
p = 1
limit = 15
if 'p' in args: p = args.p
if 'limit' in args: limit = args.limit
where = '1=1'
sql = public.M('firewall_country')
if hasattr(args, 'query'):
where = " country like '%{search}%' or brief like '%{search}%'".format(search=args.query)
count = sql.where(where, ()).count()
data = public.get_page(count, int(p), int(limit))
data['data'] = sql.where(where, ()).limit('{},{}'.format(data['shift'], data['row'])).order(
'addtime desc'
).select()
return public.return_message(0, 0, data)
def create_countrys(self, get):
try:
get.country = get.get("country", None)
if not hasattr(get, 'country'):
return public.return_message(-1, 0, public.lang("Please enter the country name!"))
input_country = get.country
countrys = self.get_countrys(None)["message"]
countrys = countrys[1:]
countrys = [
{'CH': x['CH'], 'brief': x['brief'].strip()} for x in countrys
]
# 2024/1/6 下午 5:00 获取防火墙状态,如果没有启动则启动防火墙
if not self.get_firewall_status():
get.status = "start"
self.firewall_admin(get)
content = self.get_profile(self._ips_path)
try:
content = json.loads(content)
except:
return public.return_message(-1, 0, 'Failed to get region list!')
# {"US":[],"XX":[]}
ip_dict = {
x['brief'].strip(): x['ips'] for x in content
}
if "Except China" in input_country:
input_country = [i['CH'] for i in countrys if not "China" in i['CH']]
countrys_dict = {i['CH']: i['brief'] for i in countrys}
for i in input_country:
get.brief = countrys_dict.get(i, None)
get.country = i
ip_list = ip_dict.get(get.brief)
if ip_list:
self.create_country(get, True, ip_list)
else:
countrys_dict = {i['CH']: i['brief'] for i in countrys}
if isinstance(input_country, str):
input_country = [input_country]
for i in input_country:
get.brief = countrys_dict.get(i, None)
get.country = i
ip_list = ip_dict.get(get.brief)
if ip_list is None:
continue
self.create_country(get, True, ip_list)
get.status = "restart"
self.firewall_admin(get)
return public.return_message(0, 0, public.lang("Added successfully"))
except Exception as e:
public.print_log(public.get_error_info())
print(traceback.format_exc())
return public.return_message(-1, 0, public.lang(str(e)))
# 添加区域规则
def create_country(self, get, is_mutil=False, _ips_paths=None):
# {"country": "US Virgin Islands", "types": "drop", "choose": "all", "ports": "", "brief": "VI"} 暂时只有封锁和地区可选
# 校验参数
try:
get.validate([
Param('country').Require().Xss(),
Param('types').Require().String('in', ['accept', 'drop']),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
get.is_update = get.get("is_update", False)
brief = get.brief
types = get.types # types in [accept, drop]
ports = get.ports
country = get.country
rep = r"^\d{1,5}(:\d{1,5})?$"
port_list = []
# 检测该区域是否已添加过全部端口规则 hezhihong status must in ['start', 'stop']
add_list = public.M('firewall_country').where("country=?", (country,)).field('ports').select()
for add in add_list:
if not add['ports']:
return public.return_message(-1, 0, public.lang("This area has already been added, please do not add it again!"))
if ports:
port_list = ports.split(',')
for port in port_list:
if not re.search(rep, port):
return public.return_message(-1, 0, public.lang("Port range is incorrect!"))
if public.M('firewall_country').where(
"country=? and ports=?", (country, port)
).count() > 0:
return public.return_message(-1, 0, public.lang("This area has already been added, please do not add it again!"))
cache_key = "firewall_info"
from YakPanel import cache
data = cache.get(cache_key)
if data: cache.delete(cache_key)
self.get_os_info()
ip_list = []
if _ips_paths is None:
content = self.get_profile(self._ips_path)
try:
result = json.loads(content)
except:
return public.return_message(-1, 0, 'Failed to get region list!')
for r in result:
if brief == r["brief"]:
ip_list = r["ips"]
break
if not ip_list:
return public.return_message(-1, 0, "Please enter the correct region name!")
else:
ip_list = _ips_paths
self.handle_firewall_country(brief, ip_list, types, port_list, get.is_update)
addtime = time.strftime('%Y-%m-%d %X', time.localtime())
if port_list:
for port in port_list:
public.M('firewall_country').add('country,types,brief,ports,addtime',
(country, types, brief, port, addtime))
else:
public.M('firewall_country').add('country,types,brief,ports,addtime', (country, types, brief, '', addtime))
if is_mutil is False:
get.status = "restart"
self.firewall_admin(get)
if not get.ports:
log_port = "All Ports"
else:
log_port = get.ports
if get.types == 'accept':
strategy = "accept"
else:
strategy = "drop"
public.WriteLog("system firewall", "Add regional rules: Region:{}, Policy:{}, Port:{}".format(get.country, strategy, log_port))
return public.return_message(0, 0, public.lang("Added successfully!"))
# 删除区域规则
def remove_country(self, get):
# 校验参数
try:
get.validate([
Param('id').Require().Integer(),
Param('country').Require().Xss(),
Param('types').Require().String('in', ['accept', 'drop']),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
id = get.id
types = get.types
brief = get.brief
ports = get.ports
get.reload = get.get("reload/d", 1)
_ipset = "in_bt_country_{}".format(brief)
cache_key = "firewall_info"
from YakPanel import cache
data = cache.get(cache_key)
if data: cache.delete(cache_key)
public.M('firewall_country').where("id=?", (id,)).delete()
if not ports:
public.ExecShell('iptables -D IN_BT_Country -m set --match-set ' + _ipset + ' src -j ' + types.upper())
else:
public.ExecShell('iptables -D IN_BT_Country -m set --match-set ' + _ipset + ' src -p tcp --destination-port ' + ports + ' -j ' + types.upper())
if not get.ports:
log_port = "All Ports"
else:
log_port = get.ports
if get.types == 'accept':
strategy = "accept"
elif get.types == 'drop':
strategy = 'drop'
public.WriteLog("system firewall", "Delete zone rules: Region:{}, Policy:{}, Port:{}".format(get.country, strategy, log_port))
return public.return_message(0, 0, public.lang("Delete successfully!"))
# 2024/6/5 下午5:48 批量删除区域规则
def batch_remove_country(self, get):
'''
@name 批量删除区域规则
@author wzz <2024/6/5 下午5:49>
@param "data":{"参数名":""} <数据类型> 参数描述
@return dict{"status":True/False,"msg":"提示信息"}
'''
get.remove_country = get.get("remove_country", [])
if not isinstance(get.remove_country, list):
return public.return_message(-1, 0, "params remove_country is not json")
if len(get.remove_country) == 0:
return public.return_message(-1, 0, "Please select the rules to be deleted")
get.reload = 0
result_list = []
for r in get.remove_country:
args = public.to_dict_obj({
"id": r["id"],
"types": r["types"],
"brief": r["brief"],
"ports": r["ports"],
"country": r["country"],
"reload": 0,
})
result = self.remove_country(args)
if not result["status"]:
result_list.append({"country": r["country"], "status": False})
continue
result_list.append({"country": r["country"], "status": True})
return public.return_message(0, 0, result_list)
# 编辑区域规则
def modify_country(self, get):
# {"id":17,"country":"Vanuatu","types":"drop","choose":"all","ports":"","brief":"VU "}
# 校验参数
try:
get.validate([
Param('id').Require().Integer(),
Param('country').Require().Xss(),
Param('types').Require().String('in', ['accept', 'drop']),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
id = get.id
rep = "^\d{1,5}(:\d{1,5})?$"
if "ports" in get and get.ports:
port_list = get.ports.split(',')
for port in port_list:
if not re.search(rep, port):
return public.return_message(-1, 0, 'PORT_CHECK_RANGE')
if public.M('firewall_country').where("country=? and ports=?", (get.country, port)).count() > 0:
return public.return_message(-1, 0, 'This area has already been added, please do not add it again!')
data = public.M('firewall_country').where('id=?', (id,)).field('id,country,types,brief,ports,addtime').find()
ori_get = public.dict_obj()
ori_get.id = id
ori_get.types = data.get("types", "")
ori_get.brief = data.get("brief", "")
ori_get.country = data.get("country", "")
ori_get.ports = data.get("ports", "")
ori_get.not_reload = "true"
rm_res = self.remove_country(ori_get)
if rm_res["status"] == 0:
create_res = self.create_country(get)
if create_res["status"] == 0:
return public.return_message(0, 0, public.lang("Successful operation"))
return public.return_message(0, 0, public.lang("operation failed"))
# 获取服务端列表centos
def GetList(self):
try:
result, arry = self.__Obj.GetAcceptPortList()
addtime = time.strftime('%Y-%m-%d %X', time.localtime())
for i in range(len(result)):
if "address" not in result[i].keys(): continue
tmp = self.check_db_exists(result[i]['ports'],
result[i]['address'],
result[i]['types'])
protocol = result[i]['protocol']
ports = result[i]['ports']
types = result[i]['types']
address = result[i]['address']
if not tmp:
if ports:
public.M('firewall_new').add(
'ports,protocol,address,types,brief,addtime',
(ports, protocol, address, types, '', addtime))
else:
public.M('firewall_ip').add(
'address,types,brief,addtime',
(address, types, '', addtime))
for i in range(len(arry)):
if arry[i]['port']:
tmp = self.check_trans_data(arry[i]['port'])
protocol = arry[i]['protocol']
s_port = arry[i]['port']
d_port = arry[i]['to-port']
address = arry[i]['address']
if not tmp:
public.M('firewall_trans').add(
'start_port,ended_ip,ended_port,protocol,addtime',
(s_port, address, d_port, protocol, addtime))
except Exception as e:
file = open('error.txt', 'w')
return public.returnMsg(False, e)
# 获取服务端列表ufw
def get_ufw_list(self):
data = public.M('firewall').field('id,port,ps,addtime').select()
if type(data) != list: return
try:
for dt in data:
port = dt['port']
brief = dt['ps']
addtime = dt['addtime']
if port.find('.') != -1:
tmp = self.check_db_exists('', port, 'drop')
if not tmp:
public.M('firewall_ip').add('address,types,brief,addtime',
(port, 'drop', '', addtime))
else:
tmp = self.check_db_exists(port, '', 'accept')
if not tmp:
public.M('firewall_new').add(
'ports,brief,protocol,address,types,addtime',
(port, brief, 'tcp/udp', '', 'accept', addtime))
except:
pass
# 检查数据库是否存在
def check_db_exists(self, ports, address, types):
if ports:
data = public.M('firewall_new').field(
'id,ports,protocol,address,types,brief,addtime').select()
for dt in data:
if dt['ports'] == ports: return dt
return False
else:
data = public.M('firewall_ip').field(
'id,address,types,brief,addtime').select()
for dt in data:
if dt["address"] == address and dt["types"] == types: return dt
return False
def check_trans_data(self, ports):
data = public.M('firewall_trans').field(
'id,start_port,ended_ip,ended_port,protocol,addtime').select()
for dt in data:
if dt['start_port'] == ports: return dt
return False
# 规则导出:服务器
def export_rules(self, get):
get.rule_name = get.get("rule_name", None)
if get.rule_name is None:
return public.return_message(-1, 0, 'parmas rule_name not found')
if get.rule_name == "port_rule":
filename = self._rule_path + "port.json"
data_list = public.M('firewall_new').order("id desc").select()
elif get.rule_name == "ip_rule":
filename = self._rule_path + "ip.json"
data_list = public.M('firewall_ip').order("id desc").select()
elif get.rule_name == "trans_rule":
filename = self._rule_path + "forward.json"
data_list = public.M('firewall_trans').order("id desc").select()
elif get.rule_name == "country_rule":
filename = self._rule_path + "country.json"
data_list = public.M('firewall_country').order("id desc").select()
else:
return public.return_message(-1, 0, 'unknow type: {}'.format(get.rule_name))
write_string = ""
if data_list:
for i in data_list:
for v in i.keys():
if v == 'domain': i[v] = i[v].replace('|', '#')
write_string += str(i[v]) + "|"
write_string += '\n'
public.writeFile(filename, write_string)
public.WriteLog("system firewall", "Export port rules")
return public.return_message(0, 0, filename)
# 规则导出:本地
def get_file(self, args):
filename = args.filename
mimetype = "application/octet-stream"
if not os.path.exists(filename):
return abort(404)
return send_file(filename,
mimetype=mimetype,
as_attachment=True,
attachment_filename=os.path.basename(filename),
cache_timeout=0)
# 规则导入json
def import_rules(self, get):
try:
rule_name = get.rule_name # 规则名:[port_rule, ip_rule, trans_rule, country_rule]
file_name = get.file_name # 文件命:[port.json, ip.json, trans.json, country.json]
file_path = "{0}{1}".format(self._rule_path, file_name)
data_list = self.get_profile(file_path)
pay = self.__check_auth()
not_pay_list = []
tmp_data = []
result = {"status": False, "msg": "import rules fail"}
# |分隔符格式文件导入 hezhihong
if data_list and isinstance(data_list, str):
if data_list.find('|') != -1:
data_list = data_list.split('\n')
for data in data_list:
if not data: continue
split_data = data.split('|')
data_dict = {}
data_dict['id'] = split_data[0]
if rule_name == 'port_rule':
if not pay and split_data[7].find('#') != -1:
not_pay_list.append(data)
continue
data_dict['protocol'] = split_data[1]
data_dict['ports'] = split_data[2]
data_dict['types'] = split_data[3]
data_dict['address'] = split_data[4]
data_dict['brief'] = split_data[5]
data_dict['addtime'] = split_data[6]
data_dict['domain'] = split_data[7]
elif rule_name == 'ip_rule':
if not pay and split_data[6].find('#') != -1:
not_pay_list.append(data)
continue
data_dict['types'] = split_data[1]
data_dict['address'] = split_data[2]
data_dict['brief'] = split_data[3]
data_dict['addtime'] = split_data[4]
data_dict['domain'] = split_data[6]
elif rule_name == 'trans_rule':
data_dict['start_port'] = split_data[1]
data_dict['ended_ip'] = split_data[2]
data_dict['ended_port'] = split_data[3]
data_dict['protocol'] = split_data[4]
data_dict['addtime'] = split_data[5]
elif rule_name == 'country_rule':
data_dict['types'] = split_data[1]
data_dict['country'] = split_data[2]
data_dict['brief'] = split_data[3]
data_dict['addtime'] = split_data[4]
data_dict['ports'] = split_data[5]
tmp_data.append(data_dict)
data_list = tmp_data
# 一行一条规则格式文件导入 hezhihong
if data_list and isinstance(data_list, str):
data_list = data_list.strip()
if isinstance(data_list, str) and data_list.find('\n') != -1:
data_list = data_list.split('\n')
try:
data_list.remove('')
except:
pass
if isinstance(data_list, str):
try:
data_list = json.loads(data_list)
except:
if os.path.exists(file_path):
os.remove(file_path)
return public.return_message(-1, 0, public.lang("The file content is incorrect!"))
if data_list:
if isinstance(data_list, dict):
data_list = [data_list]
if not isinstance(data_list, list):
if os.path.exists(file_path):
os.remove(file_path)
return public.return_message(-1, 0, public.lang("The file content is incorrect!"))
if len(data_list) == 0:
return public.return_message(-1, 0, public.lang("The file is empty!"))
result = self.hand_import_rules(rule_name, data_list)
os.remove(file_path)
if not_pay_list:
not_pay_list = ("<br/>" + "-" * 20 + "<br/>").join(not_pay_list)
st = 0 if result["status"] else -1
return public.return_message(st, 0,
"{}<br/>The designated domain name function is exclusive to the Enterprise Edition, and the following rules are not imported:<br/>{}".format(result["msg"], not_pay_list)
)
public.WriteLog("system firewall", "Import port rules")
st = 0 if result["status"] else -1
return public.return_message(st, 0, result["msg"])
except Exception:
public.print_log(public.get_error_info())
return public.return_message(-1, 0, public.lang("The import failed. The format of the rules is wrong. Please try again according to the format of the export rules!"))
# 处理规则导入读取json文件内容
def hand_import_rules(self, rule_name, data_list):
table_head = []
try:
if rule_name == "port_rule":
table_head = ["id", "protocol", "ports", "types", "address", "brief", "addtime", "domain", ]
for data in data_list:
#兼容一行一条规则格式文件导入 hezhihong
try:
data = json.loads(data)
except:
pass
res = all([field in data.keys() for field in table_head])
if not res or len(table_head) != len(data.keys()):
return {"status": False, "msg": "The data format is incorrect!"}
get = public.dict_obj()
get.protocol = data["protocol"]
get.ports = data["ports"]
get.types = data["types"]
get.source = data["address"]
get.brief = data["brief"]
# 兼容域名导入 hezhihong
if 'domain' in data.keys() and data['domain']:
get.domain = data['domain'].split('#')[0]
get.source = data['domain'].split('#')[1]
result = self.create_rules(get)
if not result["status"]:
continue
elif rule_name == "ip_rule":
table_head = ["id", "types", "address", "brief", "addtime"]
for data in data_list:
# 兼容一行一条规则格式文件导入 hezhihong
try:
data = json.loads(data)
except:
pass
res = all([field in data.keys() for field in table_head])
if not res:
return {"status": False, "msg": "The data format is incorrect!"}
get = public.dict_obj()
get.types = data["types"]
get.address = data["address"]
get.brief = data["brief"]
# 兼容域名导入 hezhihong
if 'domain' in data.keys() and data['domain']:
get.domain = data['domain'].split('#')[0]
get.source = data['domain'].split('#')[1]
result = self.create_ip_rules(get)
if result["status"] == -1:
continue
elif rule_name == "trans_rule":
table_head = [
"id", "start_port", "ended_ip", "ended_port", "protocol",
"addtime"
]
for data in data_list:
# 兼容一行一条规则格式文件导入 hezhihong
try:
data = json.loads(data)
except:
pass
res = all([field in data.keys() for field in table_head])
if not res:
return {"status": False, "msg": "The data format is incorrect!"}
get = public.dict_obj()
get.s_ports = data["start_port"]
get.d_address = data["ended_ip"]
get.d_ports = data["ended_port"]
get.protocol = data["protocol"]
result = self.create_forward(get)
if not result["status"]:
continue
elif rule_name == "country_rule":
table_head = ["id", "types", "country", "brief", "addtime", "ports"]
for data in data_list:
# 兼容一行一条规则格式文件导入 hezhihong
try:
data = json.loads(data)
except:
pass
res = all([field in data.keys() for field in table_head])
if not res:
return {"status": False, "msg": "The data format is incorrect!"}
get = public.dict_obj()
get.types = data["types"]
get.ports = data["ports"]
get.brief = data["brief"]
get.country = data["country"]
result = self.create_country(get)
if not result["status"]:
continue
except:
return {"status": False, "msg": "Import failed!"}
return {"status": True, "msg": "Imported successfully!"}
def get_countrys(self, get):
try:
content = self.get_profile(self._country_path)
result = json.loads(content)
result = sorted(result, key=lambda x: x['CH'], reverse=True)
if isinstance(result, list):
result.insert(0, {"CH": "Except China", "brief": "OTHER"})
return public.return_message(0, 0, result)
except:
return public.return_message(0, 0, [])
# 读取配置文件
def get_profile(self, path):
if not os.path.exists(path):
b_path = os.path.dirname(path)
if not os.path.exists(b_path): os.makedirs(b_path)
if path in [self._ips_path, self._country_path, self._white_list_file]:
public.downloadFile('{}/install/lib/{}'.format(public.get_url(), os.path.basename(path)), path)
content = ""
with open(path, "r", encoding='utf-8', errors='ignore') as fr:
content = fr.read()
return content
# 保存配置文件
def save_profile(self, path, data):
with open(path, "w") as fw:
fw.write(data)
# 读取配置文件
def update_profile(self, path):
import files
f = files.files()
return f.GetFileBody(path)
# 获取端口规则列表
def get_port_rules(self, get):
rule_list = public.M('firewall_new').order("id desc").select()
return public.return_message(0, 0, rule_list)
# 整理配置文件格式
def format(self, em, level=0):
i = "\n" + level * " "
if len(em):
if not em.text or not em.text.strip():
em.text = i + " "
for e in em:
self.format(e, level + 1)
if not e.tail or not e.tail.strip():
e.tail = i
if level and (not em.tail or not em.tail.strip()):
em.tail = i
def check_table(self):
if public.M('sqlite_master').where('type=? AND name=?',
('table', 'firewall_new')).count():
if public.M('sqlite_master').where(
'type=? AND name=?', ('table', 'firewall_ip')).count():
if public.M('sqlite_master').where(
'type=? AND name=?',
('table', 'firewall_trans')).count():
if public.M('sqlite_master').where(
'type=? AND name=?',
('table', 'firewall_country')).count():
return True
return Sqlite()
def delete_service(self):
if self.__isUfw:
public.ExecShell('ufw delete allow ssh')
else:
if self.__isFirewalld:
public.ExecShell(
'firewall-cmd --zone=public --remove-service=ssh --permanent'
)
else:
pass
return True
# 获取系统类型(具体到哪个版本)
def get_os_info(self):
tmp = {"osname": "", "version": ""}
if os.path.exists('/etc/redhat-release'):
sys_info = public.ReadFile('/etc/redhat-release')
elif os.path.exists('/usr/bin/yum'):
sys_info = public.ReadFile('/etc/issue')
elif os.path.exists('/etc/issue'):
sys_info = public.ReadFile('/etc/issue')
try:
tmp['osname'] = sys_info.split()[0]
tmp['version'] = re.search(r'\d+(\.\d*)*', sys_info).group()
except:
os_result = public.ExecShell(". /etc/os-release && echo $ID")[0]
if "amzn" == os_result:
tmp['osname'] = 'CentOS'
tmp['version'] = '8'
if tmp["osname"] == "CentOS":
if tmp["version"].startswith("8"):
content = self.get_profile("/etc/firewalld/firewalld.conf")
content = content.replace("FirewallBackend=nftables", "FirewallBackend=iptables")
self.save_profile("/etc/firewalld/firewalld.conf", content)
public.ExecShell("systemctl restart firewalld")
return True
# 新加代码----- start
def sync_must_ports(self, get):
'''
同步必须放行的端口
@param get:
@return:
'''
# 检查必传参数
try:
public.exists_args('ports', get)
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(0, 0, str(ex))
protocol = "tcp"
ports = get.ports.strip()
print(ports)
if not ports: return public.returnMsg(False, public.lang("Port cannot be empty!"))
port_list = ports.split(",") if ports.find(",") != -1 else [ports]
types = "accept"
check_result = self.check_port(port_list)
if check_result: return check_result
firewall_type = 'iptables'
if self.__isFirewalld: firewall_type = 'firewalld'
if self.__isUfw: firewall_type = 'ufw'
try:
# 2024/1/6 下午 5:00 获取防火墙状态,如果没有启动则启动防火墙
if not self.get_firewall_status():
get = public.dict_obj()
get.status = 1
# get.status = 'start'
self.firewall_admin(get)
for port in port_list:
if firewall_type == 'firewalld':
if port.find(':') != -1: port = port.replace(':', '-')
self.add_firewall_rule("", protocol, port, types)
elif firewall_type == 'ufw':
if port.find('-') != -1: port = port.replace('-', ':')
self.add_ufw_rule("", protocol, port, types)
else:
self.add_iptables_rule("", protocol, port, types)
query_result = public.M('firewall_new').where(
'ports=? and address=? and protocol=? and types=?',
(port, "", protocol, types)
).find()
if query_result: continue
addtime = time.strftime('%Y-%m-%d %X', time.localtime())
self._add_sid = public.M('firewall_new').add(
'ports,brief,protocol,address,types,addtime,domain,sid',
(port, "", protocol, "", types, addtime, "", 0)
)
return public.return_message(0, 0, public.lang("Added successfully!"))
except Exception:
print(traceback.format_exc())
return public.return_message(0, 0, public.lang("Failed to add"))
def _get_webserver(self):
'''
获取web服务器类型
@return:
'''
webserver = ''
if os.path.exists('/www/server/nginx/sbin/nginx'):
webserver = 'nginx'
elif os.path.exists('/www/server/apache/bin/httpd'):
webserver = 'apache'
elif os.path.exists('/usr/local/lsws/bin/lswsctrl'):
webserver = 'lswsctrl'
return webserver
def get_port_info(self, get):
'''
获取面板防火墙关键服务端口放行状态信息
判断服务是否存在,能读取文件就读取文件,配置文件不大不会影响性能,这种方式能最大缩短接口响应时间,公网测试80ms
@param get:
@return:
'''
ports_list = []
result_list = [{"name": "FTP passive port", "status": 0, "port": "39000-40000"}]
webserver = self._get_webserver()
if webserver in ['nginx', 'apache', 'lswsctrl']:
result_list.append({"name": "website port", "status": 0, "port": "80"})
ports_list.append("80")
port_443, _ = public.ExecShell("fuser -n tcp 443")
if port_443:
result_list.append({"name": "HTTPS port", "status": 0, "port": "443"})
ports_list.append("443")
_panel_port_file = '/www/server/panel/data/port.pl'
panel_port = public.readFile(_panel_port_file).strip(" ").strip("\n")
cmd = "cat /www/server/pure-ftpd/etc/pure-ftpd.conf |grep Bind|awk -F ',' '{print $2}'"
ftp_port = public.ExecShell(cmd)[0].strip(" ").strip("\n").strip("\r")
cmd = "cat /etc/ssh/sshd_config |grep -E '^Port'|awk '{print $2}'|awk 'NR == 1'"
ssh_port = public.ExecShell(cmd)[0].strip(" ").strip("\n")
if ssh_port == "": ssh_port = "22"
ports_list.append(panel_port)
result_list.append({"name": "panel", "status": 0, "port": panel_port})
ports_list.append(ftp_port)
result_list.append({"name": "FTP active port", "status": 0, "port": ftp_port})
ports_list.append(ssh_port)
result_list.append({"name": "SSH", "status": 0, "port": ssh_port})
ports_list.append("39000-40000")
if self.__isUfw:
list1 = self._get_ufw_port_status(ports_list, result_list)
return public.return_message(0, 0, list1)
if self.__isFirewalld:
list1 = self._get_firewall_port_status(ports_list, result_list)
return public.return_message(0, 0, list1)
return public.return_message(0, 0, {})
def _get_firewall_port_status(self, ports_list, result_list):
'''
获取firewalld防火墙端口状态
@param ports_list:
@param result_list:
@return:
'''
with contextlib.suppress(Exception):
if self.__firewall_obj is None:
return result_list
_firewalld_ports, _ = self.__firewall_obj.GetAcceptPortList()
for firewalld_port in _firewalld_ports:
if firewalld_port['ports'] in ports_list:
for result in result_list:
if result['port'] == firewalld_port['ports']:
result['status'] = 1
break
return result_list
def _get_ufw_port_status(self, ports_list, result_list):
'''
获取ufw防火墙端口状态
@param ports_list:
@param result_list:
@return:
'''
with contextlib.suppress(Exception):
rules_result = self._get_ufw_port_info()
ports_set = set(ports_list) # 将要查找的端口列表转换成集合,以便进行高效查找
for rule in rules_result:
if 'tcp' in rule['protocol'] and rule['ports'] in ports_set:
for result in result_list:
if result['port'] == rule['ports']:
result['status'] = 1
ports_set.remove(rule['ports'])
break
if not ports_set: break
return result_list
def _get_ufw_port_info(self):
'''
获取ufw防火墙端口信息
@return:
'''
with open('/etc/ufw/user.rules', 'r') as f:
content = f.read()
start_index = content.find('### RULES ###')
end_index = content.find('### END RULES ###')
result = content[start_index + 15:end_index]
sys_rules = [rule for rule in result.split('\n') if rule != '' and '###' in rule]
# 将sys_rules列表中的每个元素拆分出来,并且去掉空格,元素为字符串,例如:'### tuple ### allow tcp 20 0.0.0.0/0 any 0.0.0.0/0 in'
# 拆分后的列表元素为:{'protocol': 'tcp', 'ports': '20', 'types': 'allow', 'address': '0.0.0.0'}
rules = []
for rule in sys_rules:
rule = rule.split(' ')
rule = [i for i in rule if i != '']
rules.append({
'protocol': rule[4] if rule[4] != 'any' else 'tcp/udp',
'ports': rule[5] if rule[5].find(':') == -1 else rule[5].replace(':', '-'),
'types': 'accept' if rule[3] == 'allow' else 'drop',
'address': rule[8] if rule[8] != '0.0.0.0/0' else ''
})
unique_set = set(tuple(sorted(item.items())) for item in rules)
rules = [dict(item) for item in unique_set]
return rules
@staticmethod
def get_listening_processes(get):
'''
获取指定端口的进程信息
@param get:
@return:
'''
# 校验参数
try:
get.validate([
Param('port').Require().Number(">=", 1).Number("<=", 65535).Xss(),
], [
public.validate.trim_filter(),
])
except Exception as ex:
public.print_log("error info: {}".format(ex))
return public.return_message(-1, 0, str(ex))
if get.port.find('-') != -1 or get.port.find(':') != -1:
return public.return_message(-1, 0, public.lang("Range ports not supported"))
process_name = ''
process_pid = ''
process_cmd = ''
cmd = "lsof -F c -i:{}".format(get.port)
info_list = public.ExecShell(cmd)[0].strip("\n").split("\n")
if len(info_list) >= 2:
process_name = info_list[1][1:]
process_pid = info_list[0][1:]
cmd_ps = "ps aux|grep {}|grep -v grep".format(process_pid)
cmd_awk = "|awk '{print $11,$12,$13,$14}'"
process_cmd = public.ExecShell(cmd_ps + cmd_awk)[0].split("\n")[0].strip(" ")
data = {
"process_name": process_name,
"process_pid": process_pid,
"process_cmd": process_cmd
}
return public.return_message(0, 0, data)
def get_diff_panel_firewall_rules(self, get):
'''
对比面板防火墙规则数据库和防火墙配置文件,取出差异的规则
@return:
'''
# 获取面板防火墙规则数据库
panel_firewall_rules = self.get_panel_firewall_rules()
# 获取防火墙配置文件
firewall_rules = self.get_sys_firewall_rules()
# 取出差异的规则
diff_rules = self._get_diff_rules(panel_firewall_rules, firewall_rules)
return public.return_message(0, 0, diff_rules)
def get_panel_firewall_rules(self):
'''
获取面板防火墙规则数据库
@return:
'''
all_ports = public.M('firewall_new').field('protocol,ports,types,address').order('addtime desc').select()
unique_set = set(tuple(sorted(item.items())) for item in all_ports)
new_ports = [dict(item) for item in unique_set]
return new_ports
def get_sys_firewall_rules(self):
'''
获取防火墙配置文件
@return:
'''
if self.__isUfw:
return self._get_ufw_port_info()
if self.__isFirewalld:
if self.__firewall_obj is None:
return []
return self.__firewall_obj.recombine_rules()
return []
def _diff_dict_list(self, list1, list2):
'''
比较两个dict类型的list,返回list1中有,而list2中没有的元素
@param list1:
@param list2:
@return:
'''
list1_not_in_list2 = []
for item1 in list1:
found = False
try:
for item2 in list2:
# 对比'protocol,ports,types,address'是否相同
if all(item1[key] == item2[key] for key in ('protocol', 'ports', 'types', 'address')):
found = True
break
except KeyError:
continue
if not found: list1_not_in_list2.append(item1)
return list1_not_in_list2
def _get_diff_rules(self, panel_firewall_rules, firewall_rules):
'''
取出差异的规则
@param panel_firewall_rules:
@param firewall_rules:
@return:
'''
firewall_diff_rules_name = 'firewall_diff_rules'
firewall_diff_rules = {}
if os.path.isfile("config/{}.json".format(firewall_diff_rules_name)):
firewall_diff_rules = public.read_config(firewall_diff_rules_name)
if not firewall_diff_rules:
data = {
"panel_not_in_sys_fw_diff_list": self._diff_dict_list(panel_firewall_rules, firewall_rules),
"sys_not_in_panel_fw_diff_list": self._diff_dict_list(firewall_rules, panel_firewall_rules),
"panel_exclude": [],
"sys_exclude": []
}
public.save_config(firewall_diff_rules_name, data)
return data
panel_not_in_sys_fw_diff_list = self._diff_dict_list(panel_firewall_rules, firewall_rules)
sys_not_in_panel_fw_diff_list = self._diff_dict_list(firewall_rules, panel_firewall_rules)
# 如果firewall_diff_rules的panel_exclude和sys_exclude中有值,则将其从panel_not_in_sys_fw_diff_list和sys_not_in_panel_fw_diff_list中去掉
if firewall_diff_rules['panel_exclude']:
for key in firewall_diff_rules['panel_exclude']:
if key in panel_not_in_sys_fw_diff_list:
panel_not_in_sys_fw_diff_list.remove(key)
if firewall_diff_rules['sys_exclude']:
for key in firewall_diff_rules['sys_exclude']:
# panel_not_in_sys_fw_diff_list是list如果key在panel_not_in_sys_fw_diff_list中,则将其从panel_not_in_sys_fw_diff_list中去掉
if key in sys_not_in_panel_fw_diff_list:
sys_not_in_panel_fw_diff_list.remove(key)
# 将差异规则写入文件并返回
firewall_diff_rules['panel_not_in_sys_fw_diff_list'] = panel_not_in_sys_fw_diff_list
firewall_diff_rules['sys_not_in_panel_fw_diff_list'] = sys_not_in_panel_fw_diff_list
public.save_config(firewall_diff_rules_name, firewall_diff_rules)
return firewall_diff_rules
def exclude_diff_rules(self, get):
'''
排除firewall_diff_rules的规则并写入配置文件
@param get:
@return:
'''
try:
panel_excludes = get.panel_exclude if "panel_exclude" in get.get_items().keys() else {}
sys_excludes = get.sys_exclude if "sys_exclude" in get.get_items().keys() else {}
status = get.status if "status" in get.get_items().keys() else {}
if status == 'add':
return self._add_exclude(panel_excludes, sys_excludes)
elif status == 'del':
return self._del_exclude(panel_excludes, sys_excludes)
except Exception as e:
return public.return_message(0, 0, public.lang("Ignore rule failed,{}!", e))
def _add_exclude(self, panel_excludes, sys_excludes):
'''
添加排除规则
@param panel_exclude:
@param sys_exclude:
@return:
'''
firewall_diff_rules_name = 'firewall_diff_rules'
firewall_diff_rules = {}
if os.path.isfile("config/{}.json".format(firewall_diff_rules_name)):
firewall_diff_rules = public.read_config(firewall_diff_rules_name)
if not firewall_diff_rules['panel_exclude']:
firewall_diff_rules['panel_exclude'] = panel_excludes
else:
for exclude in panel_excludes:
if exclude not in firewall_diff_rules['panel_exclude']:
firewall_diff_rules['panel_exclude'].append(exclude)
if not firewall_diff_rules['sys_exclude']:
firewall_diff_rules['sys_exclude'] = sys_excludes
else:
for exclude in sys_excludes:
if exclude not in firewall_diff_rules['sys_exclude']:
firewall_diff_rules['sys_exclude'].append(exclude)
public.save_config(firewall_diff_rules_name, firewall_diff_rules)
return public.returnMsg(True, public.lang("Ignore rules successfully!"))
def _del_exclude(self, panel_excludes, sys_excludes):
'''
删除排除规则
@param panel_excludes:
@param sys_excludes:
@return:
'''
firewall_diff_rules_name = 'firewall_diff_rules'
firewall_diff_rules = {}
if os.path.isfile("config/{}.json".format(firewall_diff_rules_name)):
firewall_diff_rules = public.read_config(firewall_diff_rules_name)
new_panel_exclude = []
new_sys_exclude = []
for exclude in firewall_diff_rules['panel_exclude']:
if exclude not in panel_excludes:
new_panel_exclude.append(exclude)
for exclude in firewall_diff_rules['sys_exclude']:
if exclude not in sys_excludes:
new_sys_exclude.append(exclude)
firewall_diff_rules['panel_exclude'] = new_panel_exclude
firewall_diff_rules['sys_exclude'] = new_sys_exclude
public.save_config(firewall_diff_rules_name, firewall_diff_rules)
return public.return_message(0, 0, public.lang("Cancel ignore rule successfully"))
def _add_firewall_rules(self, source_ip, protocol, port, types):
'''
添加防火墙规则
@param source_ip:
@param protocol:
@param port:
@param types:
@return:
'''
if self.__isUfw:
if port.find('-') != -1:
port = port.replace('-', ':')
self.add_ufw_rule(source_ip, protocol, port, types)
elif self.__isFirewalld:
if port.find(':') != -1:
port = port.replace(':', '-')
self.add_firewall_rule(source_ip, protocol, port, types)
else:
self.add_iptables_rule(source_ip, protocol, port, types)
def _del_firewall_rules(self, source_ip, protocol, port, types):
'''
删除防火墙规则
@param source_ip:
@param protocol:
@param port:
@param types:
@return:
'''
if self.__isUfw:
self.del_ufw_rule(source_ip, protocol, port, types)
elif self.__isFirewalld:
self.del_firewall_rule(source_ip, protocol, port, types)
else:
self.del_iptables_rule(source_ip, protocol, port, types)
def _modify_firewall_rules(self, address, protocol, port, type, source_ip, source_protocol, ports, types):
'''
修改防火墙规则1
@param address:
@param protocol:
@param port:
@param type:
@param source_ip:
@param source_protocol:
@param ports:
@param types:
@return:
'''
if self.__isUfw:
self.edit_ufw_rule(address, protocol, port, type, source_ip, source_protocol, ports, types)
elif self.__isFirewalld:
self.edit_firewall_rule(address, protocol, port, type, source_ip, source_protocol, ports, types)
else:
self.edit_iptables_rule(address, protocol, port, type, source_ip, source_protocol, ports, types)
# 新加代码----- end
# 端口防扫描 --- start
def _get_server_lists_scan(self):
"""
@name 获取服务器常用端口
@return:
"""
return {
"sshd": "{}".format(public.get_sshd_port()),
"mysql": "{}".format(public.get_mysql_info()["port"]),
"ftpd": "21",
"dovecot": "110,143",
"postfix": "25,465,587",
}
def get_anti_scan_logs(self, get):
"""
@name 获取防扫描日志
@param get:
@return:
"""
get = public.dict_obj()
server_lists = self._get_server_lists_scan()
result_dict = {
"currently_failed": 0,
"total_failed": 0,
"currently_banned": 0,
"total_banned": 0,
"banned_ip_list": []
}
import PluginLoader
for key in server_lists:
get.mode = key
logs_result = PluginLoader.plugin_run('fail2ban', 'get_status', get)
if type(logs_result['msg']) is dict:
result_dict["currently_failed"] += int(logs_result["msg"]["currently_failed"])
result_dict["total_failed"] += int(logs_result["msg"]["total_failed"])
result_dict["currently_banned"] += int(logs_result["msg"]["currently_banned"])
result_dict["total_banned"] += int(logs_result["msg"]["total_banned"])
result_dict["banned_ip_list"] += logs_result["msg"]["banned_ip_list"]
return public.return_message(0, 0, result_dict)
def get_anti_scan_status(self, get):
"""
@name 获取端口防扫描
@return:
"""
plugin_path = "/www/server/panel/plugin/fail2ban"
result_data = {"status": 0, "installed": 1}
if not os.path.exists("{}".format(plugin_path)):
result_data['installed'] = 0
return public.return_message(0, 0, result_data)
sock = "{}/fail2ban.sock".format(plugin_path)
if not os.path.exists(sock):
return public.return_message(0, 0, result_data)
server_lists = self._get_server_lists_scan()
s_file = '{}/plugin/fail2ban/config.json'.format(public.get_panel_path())
if os.path.exists(s_file):
try:
data = json.loads(public.readFile(s_file))
if len(data) == 0:
return public.return_message(0, 0, result_data)
for key in server_lists:
if key in data:
if data[key]['act'] != 'true':
result_data['status'] = 0
return public.return_message(0, 0, result_data)
result_data['status'] = 1
return public.return_message(0, 0, result_data)
except:
pass
return public.return_message(0, 0, result_data)
def set_anti_scan_status(self, get):
"""
@name 设置常用端口防扫描
@param get:
@return:
"""
scan_status = get.status if "status" in get else 0
param_dict = {
'type': 'edit',
'act': 'true' if scan_status == 1 else 'false',
'maxretry': '30',
'findtime': '300',
'bantime': '600',
'port': '',
'mode': ''
}
server_lists = self._get_server_lists_scan()
_set_up_path = "/www/server/panel/plugin/fail2ban"
_config = _set_up_path + "/config.json"
if not os.path.exists(_set_up_path + "/fail2ban_main.py"):
return public.return_message(0, 0, public.lang("fail2ban plugin is not installed"))
if os.path.exists(_config):
try:
_conf_data = json.loads(public.ReadFile(_config))
except:
_conf_data = {}
else:
_conf_data = {}
import PluginLoader
# if scan_status == "1" and PluginLoader.plugin_run('fail2ban', 'get_fail2ban_status', get) is False:
# get.type = "start"
# PluginLoader.plugin_run('fail2ban', 'set_fail2ban_status', get)
for key in server_lists:
tmp = param_dict.copy()
tmp["port"] = server_lists[key]
tmp["mode"] = key
if key not in _conf_data:
tmp["type"] = "add"
else:
tmp["maxretry"] = _conf_data[key]["maxretry"]
tmp["findtime"] = _conf_data[key]["findtime"]
tmp["bantime"] = _conf_data[key]["bantime"]
tmp = public.to_dict_obj(tmp)
PluginLoader.plugin_run('fail2ban', 'set_anti', tmp)
del tmp
# if scan_status == "0":
# get.type = "stop"
# PluginLoader.plugin_run('fail2ban', 'set_fail2ban_status', get)
public.WriteLog("Port Scanning Prevention", "[Security]-[System Firewall]-[Set Port Scanning Prevention]")
return public.return_message(0, 0, public.lang("Setup successful!"))
def del_ban_ip(self, get):
"""
删除封锁IP
@param get:
@return:
"""
get.ip = get.ip
import PluginLoader
server_lists = self._get_server_lists_scan()
for key in server_lists:
get.mode = key
PluginLoader.plugin_run('fail2ban', 'ban_ip_release', get)
return public.return_message(0, 0, public.lang("Unlocked successful!"))
# 端口防扫描 --- end111
class firewalld:
__TREE = None
__ROOT = None
__CONF_FILE = '/etc/firewalld/zones/public.xml'
# 初始化配置文件XML对象
def __init__(self):
if self.__TREE:
return
if not os.path.exists(self.__CONF_FILE):
return
self.__TREE = ElementTree()
try:
self.__TREE.parse(self.__CONF_FILE)
self.__ROOT = self.__TREE.getroot()
except:
return
# 获取规则列表
def GetAcceptPortList(self):
try:
mlist = self.__ROOT.getchildren()
except:
mlist = []
data, arry = [], []
if len(mlist) < 1:
return data, arry
data, arry = [], []
for p in mlist:
tmp = {}
if p.tag == 'port':
tmp["protocol"] = p.attrib['protocol']
tmp['ports'] = p.attrib['port']
tmp['types'] = 'accept'
tmp['address'] = ''
elif p.tag == 'forward-port':
tmp["protocol"] = p.attrib['protocol']
tmp["port"] = p.attrib['port']
tmp["address"] = p.attrib.get('to-addr', '')
tmp["to-port"] = p.attrib['to-port']
arry.append(tmp)
continue
elif p.tag == 'rule':
tmp["types"] = 'accept'
tmp['ports'] = ''
tmp['protocol'] = ''
ch = p.getchildren()
for c in ch:
if c.tag == 'port':
tmp['protocol'] = c.attrib['protocol']
tmp['ports'] = c.attrib['port']
elif c.tag == 'drop':
tmp['types'] = 'drop'
elif c.tag == 'reject':
tmp['types'] = 'reject'
elif c.tag == 'source':
if "address" in c.attrib.keys():
tmp['address'] = c.attrib['address']
if "address" not in tmp:
tmp['address'] = ''
else:
continue
if tmp:
data.append(tmp)
return data, arry
def recombine_rules(self):
'''
重组防火墙规则,将tcp和udp端口相同的规则合并111111
@return:
'''
firewalld_rules = self.GetAcceptPortList()[0]
tcp_rules = []
udp_rules = []
for rule in firewalld_rules:
if rule['protocol'] == 'tcp':
tcp_rules.append(rule)
elif rule['protocol'] == 'udp':
udp_rules.append(rule)
result_rules = []
for tcp_rule in tcp_rules:
for udp_rule in udp_rules:
if tcp_rule['ports'] == udp_rule['ports']:
if tcp_rule['types'] == udp_rule['types']:
if tcp_rule['address'] == udp_rule['address']:
if tcp_rule['protocol'] != udp_rule['protocol']:
tcp_rule['protocol'] = 'tcp/udp'
udp_rules.remove(udp_rule)
break
result_rules.append(tcp_rule)
result_rules.extend(udp_rules)
return result_rules
class Sqlite():
db_file = None # 数据库文件
connection = None # 数据库连接对象
def __init__(self):
self.db_file = "/www/server/panel/data/default.db"
# self.db_file = "/www/server/panel/data/db/firewall.db"
self.create_table()
# 获取数据库对象
def GetConn(self):
try:
if self.connection is None:
self.connection = sqlite3.connect(self.db_file)
self.connection.text_factory = str
except Exception as ex:
return "error: " + str(ex)
def create_table(self):
# 创建firewall_new表记录端口规则
public.M(None).db('firewall_new').execute('''CREATE TABLE IF NOT EXISTS "firewall_new" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"protocol" TEXT DEFAULT '',
"ports" TEXT,
"types" TEXT,
"address" TEXT DEFAULT '',
"brief" TEXT DEFAULT '',
"addtime" TEXT DEFAULT '');''')
public.M(None).db('firewall_new').execute('CREATE INDEX firewall_new_port ON firewall_new (ports);')
if public.M('firewall_new').count() < 1:
# 写入默认数据
if not public.M('firewall_new').where('ports=?', ('80',)).count():
public.M('firewall_new').add(
'ports,brief,addtime,protocol,types',
('80', 'Website default port', '0000-00-00 00:00:00', 'tcp', 'accept')
)
if not public.M('firewall_new').where('ports=?', ('21',)).count():
public.M('firewall_new').add(
'ports,brief,addtime,protocol,types',
('21', 'FTP service', '0000-00-00 00:00:00', 'tcp', 'accept')
)
if not public.M('firewall_new').where('ports=?', ('22',)).count():
public.M('firewall_new').add(
'ports,brief,addtime,protocol,types',
('22', 'SSH remote service', '0000-00-00 00:00:00', 'tcp', 'accept')
)
try:
_panel_port_file = '/www/server/panel/data/port.pl'
panel_port = public.readFile(_panel_port_file).strip(" ").strip("\n")
except Exception:
panel_port = '8888'
if not public.M('firewall_new').where('ports=?', (panel_port,)).count():
public.M('firewall_new').add(
'ports,brief,addtime,protocol,types',
(panel_port, 'panel', '0000-00-00 00:00:00', 'tcp', 'accept')
)
# 创建firewall_ip表记录IP规则屏蔽或放行
if not public.M('sqlite_master').where(
'type=? AND name=?', ('table', 'firewall_ip')).count():
public.M('').execute('''CREATE TABLE "firewall_ip" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"types" TEXT,
"address" TEXT DEFAULT '',
"brief" TEXT DEFAULT '',
"addtime" TEXT DEFAULT '',
"sid" INTEGER DEFAULT 0,
"domain" TEXT DEFAULT '',
"chain" TEXT DEFAULT ''
);''')
public.M('').execute(
'CREATE INDEX firewall_ip_addr ON firewall_ip (address);')
# 创建firewall_trans表记录端口转发记录
if not public.M('sqlite_master').where(
'type=? AND name=?', ('table', 'firewall_trans')).count():
public.M('').execute('''CREATE TABLE firewall_trans (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"start_port" TEXT,
"ended_ip" TEXT,
"ended_port" TEXT,
"protocol" TEXT DEFAULT '',
"addtime" TEXT DEFAULT '');''')
public.M('').execute(
'CREATE INDEX firewall_trans_port ON firewall_trans (start_port);'
)
# 创建firewall_country表记录IP规则屏蔽或放行
if not public.M('sqlite_master').where(
'type=? AND name=?', ('table', 'firewall_country')).count():
public.M('').execute('''CREATE TABLE "firewall_country" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"types" TEXT,
"country" TEXT DEFAULT '',
"brief" TEXT DEFAULT '',
"addtime" TEXT DEFAULT '');''')
public.M('').execute('CREATE INDEX firewall_country_name ON firewall_country (country);')
# 创建firewall_domain表记录域名规则屏蔽或放行
if not public.M('sqlite_master').where('type=? AND name=?', ('table', 'firewall_domain')).count():
public.M('').execute('''CREATE TABLE "firewall_domain" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"types" TEXT,
"domain" TEXT,
"domain_total" TEXT,
"port" TEXT,
"sid" int DEFAULT 0,
"address" TEXT DEFAULT '',
"brief" TEXT DEFAULT '',
"protocol" TEXT DEFAULT '',
"addtime" TEXT DEFAULT '');''')
public.M('').execute('CREATE INDEX firewall_domain_addr ON firewall_domain (domain);')
public.M(None).db('firewall_malicious_ip').execute('CREATE INDEX firewall_malicious_ip_address ON firewall_malicious_ip (address);')
# 修复之前已经创建的 firewall_domain 表无 domain_total 字段的问题
create_table_str = public.M('firewall_new').table('sqlite_master').where(
'type=? AND name=?', ('table', 'firewall_new')).getField('sql')
if 'domain_total' not in create_table_str:
public.M('firewall_new').execute('ALTER TABLE "firewall_domain" ADD "domain_total" TEXT DEFAULT ""')
# 修复之前已经创建的 firewall_new 表无 domain 字段的问题
create_table_str = public.M('firewall_new').table('sqlite_master').where(
'type=? AND name=?', ('table', 'firewall_new')).getField('sql')
if 'domain' not in create_table_str:
public.M('firewall_new').execute('ALTER TABLE "firewall_new" ADD "domain" TEXT DEFAULT ""')
if 'sid' not in create_table_str:
public.M('firewall_new').execute('ALTER TABLE "firewall_new" ADD "sid" int DEFAULT 0')
if 'chain' not in create_table_str:
public.M('firewall_new').execute('ALTER TABLE "firewall_new" ADD "chain" TEXT DEFAULT ""')
# 修复之前已经创建的 firewall_ip 表无 domain 字段的问题
create_table_str = public.M('firewall_ip').table('sqlite_master').where(
'type=? AND name=?', ('table', 'firewall_ip')).getField('sql')
if 'sid' not in create_table_str:
public.M('firewall_ip').execute('ALTER TABLE "firewall_ip" ADD "sid" int DEFAULT 0')
if 'domain' not in create_table_str:
public.M('firewall_ip').execute('ALTER TABLE "firewall_ip" ADD "domain" TEXT DEFAULT ""')
if 'chain' not in create_table_str:
public.M('firewall_ip').execute('ALTER TABLE "firewall_ip" ADD "chain" TEXT DEFAULT ""')
# 修复之前已经创建的 firewall_country 表无 ports 字段的问题
create_table_str = public.M('firewall_country').table('sqlite_master').where(
'type=? AND name=?', ('table', 'firewall_country')).getField('sql')
if 'ports' not in create_table_str:
public.M('firewall_country').execute('ALTER TABLE "firewall_country" ADD "ports" TEXT DEFAULT ""')
def create_trigger(self, sql):
self.GetConn()
self.connection.text_factory = str
try:
result = self.connection.execute(sql)
id = result.lastrowid
self.connection.commit()
return id
except Exception as ex:
return "error: " + str(ex)
sql = """
CREATE TRIGGER update_port AFTER DELETE ON firewall
when old.port!=''
BEGIN
delete from firewall_new where ports = old.port;
delete from firewall_ip where address = old.port;
END;
"""
s = Sqlite()
s.create_trigger(sql)