Files
yakpanel-core/class_v2/ssh_security_v2.py

1332 lines
54 KiB
Python
Raw Permalink Normal View History

2026-04-07 02:04:22 +05:30
# coding: utf-8
# -------------------------------------------------------------------
# YakPanel
# -------------------------------------------------------------------
# Copyright (c) 2015-2017 YakPanel(www.yakpanel.com) All rights reserved.
# -------------------------------------------------------------------
# Author: lkqiang <lkq@yakpanel.com>
# -------------------------------------------------------------------
# SSH security class
# ------------------------------
import public, os, re, send_mail, json
from public.validate import Param
class ssh_security:
# Key types accepted by set_sshkey() and recognised by get_ssh_key_type().
__type_list = ['ed25519', 'ecdsa', 'rsa', 'dsa']
# File in which set_sshkey() stores the most recently generated key type.
__key_type_file = '{}/data/ssh_key_type.pl'.format(public.get_panel_path())
# Private-key paths that download_key() searches as a fallback.
__key_files = ['/root/.ssh/id_ed25519', '/root/.ssh/id_ecdsa', '/root/.ssh/id_rsa', '/root/.ssh/id_rsa_bt']
# Key type -> private-key path, used by _get_key(), get_key() and download_key().
__type_files = {
"ed25519": "/root/.ssh/id_ed25519",
"ecdsa": "/root/.ssh/id_ecdsa",
"rsa": "/root/.ssh/id_rsa",
"dsa": "/root/.ssh/id_dsa"
}
# Flag file whose existence is tested by get_login_record().
open_ssh_login = public.get_panel_path() + '/data/open_ssh_login.pl'
# sshd configuration file read and rewritten by the set_*/stop_* methods.
__SSH_CONFIG = '/etc/ssh/sshd_config'
# In-memory copy of the IP whitelist; populated in __init__ from __ClIENT_IP.
__ip_data = None
# JSON file holding the list of whitelisted login IPs.
__ClIENT_IP = '/www/server/panel/data/host_login_ip.json'
# Interpreter name; _check_pyenv() switches it to 'btpython' when the panel pyenv exists.
__pyenv = 'python'
# Baseline checks evaluated by check_san_baseline() / san_ssh_security().
# Each entry names the file to inspect, the regex "rule" with its expected
# value ("number" range or "string" whitelist) and a "repair_loophole"
# regex/replacement pair.
# NOTE(review): entry "6" advertises "Port 60151" in "repair" but its
# "repair_loophole" writes "Port 65531", and its rule regex has no leading
# "\n" unlike the other entries - confirm which values are intended.
__REPAIR = {"1": {"id": 1,
"type": "file",
"harm": "High",
"repaired": "1",
"level": "3",
"name": "Make sure SSH MaxAuthTries is set between 3-6",
"file": "/etc/ssh/sshd_config",
"Suggestions": "Remove the MaxAuthTries comment symbol # in /etc/ssh/sshd_config, set the maximum number of failed password attempts 3-6 recommended 4",
"repair": "MaxAuthTries 4",
"rule": [{"re": "\nMaxAuthTries\\s*(\\d+)", "check": {"type": "number", "max": 7, "min": 3}}],
"repair_loophole": [{"re": "\n?#?MaxAuthTries\\s*(\\d+)", "check": "\nMaxAuthTries 4"}]},
"2": {"id": 2,
"repaired": "1",
"type": "file",
"harm": "High",
"level": "3",
"name": "SSHD Mandatory use of V2 security protocol",
"file": "/etc/ssh/sshd_config",
"Suggestions": "Set parameters in the /etc/ssh/sshd_config file as follows",
"repair": "Protocol 2",
"rule": [{"re": "\nProtocol\\s*(\\d+)",
"check": {"type": "number", "max": 3, "min": 1}}],
"repair_loophole": [{"re": "\n?#?Protocol\\s*(\\d+)", "check": "\nProtocol 2"}]},
"3": {"id": 3,
"repaired": "1",
"type": "file",
"harm": "High",
"level": "3",
"name": "Set SSH idle exit time",
"file": "/etc/ssh/sshd_config",
"Suggestions": "Set ClientAliveInterval to 300 to 900 in /etc/ssh/sshd_config, which is 5-15 minutes, and set ClientAliveCountMax to 0-3",
"repair": "ClientAliveInterval 600 ClientAliveCountMax 2",
"rule": [{"re": "\nClientAliveInterval\\s*(\\d+)",
"check": {"type": "number", "max": 900, "min": 300}}],
"repair_loophole": [
{"re": "\n?#?ClientAliveInterval\\s*(\\d+)", "check": "\nClientAliveInterval 600"}]},
"4": {"id": 4,
"repaired": "1",
"type": "file",
"harm": "High",
"level": "3",
"name": "Make sure SSH LogLevel is set to INFO",
"file": "/etc/ssh/sshd_config",
"Suggestions": "Set parameters in the /etc/ssh/sshd_config file as follows (uncomment)",
"repair": "LogLevel INFO",
"rule": [{"re": "\nLogLevel\\s*(\\w+)", "check": {"type": "string", "value": ["INFO"]}}],
"repair_loophole": [{"re": "\n?#?LogLevel\\s*(\\w+)", "check": "\nLogLevel INFO"}]},
"5": {"id": 5,
"repaired": "1",
"type": "file",
"harm": "High",
"level": "3",
"name": "Disable SSH users with empty passwords from logging in",
"file": "/etc/ssh/sshd_config",
"Suggestions": "Configure PermitEmptyPasswords to no in /etc/ssh/sshd_config",
"repair": "PermitEmptyPasswords no",
"rule": [
{"re": "\nPermitEmptyPasswords\\s*(\\w+)", "check": {"type": "string", "value": ["no"]}}],
"repair_loophole": [
{"re": "\n?#?PermitEmptyPasswords\\s*(\\w+)", "check": "\nPermitEmptyPasswords no"}]},
"6": {"id": 6,
"repaired": "1",
"type": "file",
"name": "SSH uses the default port 22",
"harm": "High",
"level": "3",
"file": "/etc/ssh/sshd_config",
"Suggestions": "Set Port to 6000 to 65535 in / etc / ssh / sshd_config",
"repair": "Port 60151",
"rule": [{"re": "Port\\s*(\\d+)", "check": {"type": "number", "max": 65535, "min": 22}}],
"repair_loophole": [{"re": "\n?#?Port\\s*(\\d+)", "check": "\nPort 65531"}]}}
# PermitRootLogin values accepted by set_root(), mapped to the description
# that get_config() returns and set_root() writes to the log.
__root_login_types = {'yes': 'yes - keys and passwords', 'no': 'no - no login',
'without-password': 'without-password - only key login',
'forced-commands-only': 'forced-commands-only - can only execute commands'}
def __init__(self):
    """Prepare storage and helpers used by the SSH security features.

    Creates the ``ssh_login_record`` table (and an index on ``addr``) when it
    is missing, makes sure the IP whitelist file exists, instantiates the
    mail sender and loads the whitelist into memory.
    """
    table_exists = public.M('sqlite_master').where(
        'type=? AND name=?', ('table', 'ssh_login_record')).count()
    if not table_exists:
        public.M('').execute('''CREATE TABLE ssh_login_record
            (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                addr TEXT,
                server_ip TEXT,
                user_agent TEXT,
                ssh_user TEXT,
                login_time INTEGER DEFAULT 0,
                close_time INTEGER DEFAULT 0,
                video_addr TEXT
            );''')
        # SQLite keeps tables and indexes in a single namespace, so the index
        # cannot reuse the table's name (the original statement did).
        public.M('').execute('CREATE INDEX ssh_login_record_addr ON ssh_login_record (addr);')

    if not os.path.exists(self.__ClIENT_IP):
        public.WriteFile(self.__ClIENT_IP, json.dumps([]))

    self.__mail = send_mail.send_mail()
    self.__mail_config = self.__mail.get_settings()
    self._check_pyenv()

    # An unreadable or corrupt whitelist file falls back to an empty list.
    try:
        self.__ip_data = json.loads(public.ReadFile(self.__ClIENT_IP))
    except Exception:
        self.__ip_data = []
def _check_pyenv(self):
    """Use ``btpython`` as the interpreter name when the panel pyenv directory exists."""
    panel_pyenv_present = os.path.exists('/www/server/panel/pyenv')
    if panel_pyenv_present:
        self.__pyenv = 'btpython'
def get_ssh_key_type(self):
    '''
    Return the SSH key type recorded for the panel.

    The value stored in the key-type file is returned when that file exists
    and names one of the supported types; otherwise ``'rsa'`` is returned.
    @author hwliang
    :return: key type name
    '''
    fallback = 'rsa'
    if os.path.exists(self.__key_type_file):
        stored = public.ReadFile(self.__key_type_file)
        if stored in self.__type_list:
            return stored
    return fallback
def return_python(self):
    """Return the first existing interpreter path, or plain ``'python'`` when none is found."""
    candidates = (
        '/www/server/panel/pyenv/bin/python',
        '/usr/bin/python',
        '/usr/bin/python3',
    )
    for interpreter in candidates:
        if os.path.exists(interpreter):
            return interpreter
    return 'python'
def return_profile(self):
    """Return the shell profile file that login hooks are written to.

    Prefers ``/root/.bash_profile``, then ``/etc/profile``. When neither
    exists an empty ``/root/.bash_profile`` is created and returned (the
    original created a misspelled ``/root/.bash_profil``).
    """
    for candidate in ('/root/.bash_profile', '/etc/profile'):
        if os.path.exists(candidate):
            return candidate
    fallback = '/root/.bash_profile'
    # Create the file so that callers can read it straight away.
    with open(fallback, mode="w", encoding="utf-8"):
        pass
    return fallback
def return_bashrc(self):
    """Return the bashrc file to use, creating an empty ``/root/.bashrc`` when none exists."""
    known_locations = ('/root/.bashrc', '/etc/bashrc', '/etc/bash.bashrc')
    first_present = next((path for path in known_locations if os.path.exists(path)), None)
    if first_present is not None:
        return first_present
    handle = open('/root/.bashrc', mode="w", encoding="utf-8")
    handle.close()
    return '/root/.bashrc'
def check_files(self):
    """Rewrite the whitelist file as an empty JSON list when its content cannot be parsed."""
    try:
        raw = public.ReadFile(self.__ClIENT_IP)
        json.loads(raw)
    except:
        public.WriteFile(self.__ClIENT_IP, json.dumps([]))
def get_ssh_port(self):
    """Return the SSH port configured in sshd_config as a string.

    Only active (uncommented) ``Port`` directives are considered; the
    original pattern also accepted ``#Port N`` lines and required a trailing
    newline, so it could report a port sshd is not using. Falls back to
    ``'22'`` when the file is unreadable or has no active directive.
    """
    conf = public.readFile(self.__SSH_CONFIG) or ''
    # sshd_config keywords are case-insensitive; leading blanks are allowed.
    active = re.search(r"^[ \t]*Port[ \t]+([0-9]+)", conf, re.M | re.I)
    if active:
        return active.group(1)
    return '22'
# Main baseline check routine
def check_san_baseline(self, base_json):
"""
Evaluate one baseline entry (see the __REPAIR table) against the file it names.

For entries of type 'file' the file content is matched against each rule's
regex. A 'number' rule passes only when the captured integer lies strictly
between 'min' and 'max' (both bounds themselves fail); a 'string' rule
passes when the captured text is one of the listed values. A rule with no
match, or an empty capture, fails.

NOTE(review): leading whitespace was lost in this listing, so the exact
nesting of the `else` branch and of the final `return True` cannot be
confirmed here - verify against the indented source before editing.

@param base_json: dict with 'type', 'file', 'rule' and optionally 'check_file'
@return: True when the check passes, False when it fails
"""
if base_json['type'] == 'file':
if 'check_file' in base_json:
if not os.path.exists(base_json['check_file']):
return False
else:
if os.path.exists(base_json['file']):
ret = public.ReadFile(base_json['file'])
for i in base_json['rule']:
# findall returns the captured group of every match; only the first is used
valuse = re.findall(i['re'], ret)
if i['check']['type'] == 'number':
if not valuse: return False
if not valuse[0]: return False
valuse = int(valuse[0])
# strict comparison on both sides: min and max are excluded
if valuse > i['check']['min'] and valuse < i['check']['max']:
return True
else:
return False
elif i['check']['type'] == 'string':
if not valuse: return False
if not valuse[0]: return False
valuse = valuse[0]
if valuse in i['check']['value']:
return True
else:
return False
return True
def san_ssh_security(self, get):
    """Run the six baseline checks and return a score plus the failed entries.

    The score starts at 100 and loses 10 points for every failed check.
    """
    failed = [
        self.__REPAIR[key]
        for key in ('1', '2', '3', '4', '5', '6')
        if not self.check_san_baseline(self.__REPAIR[key])
    ]
    score = 100
    if failed:
        score = score - (len(failed) * 10)
    return {"num": score, "result": failed}
################## SSH login alarm settings ####################################
def send_mail_data(self, title, body, login_ip="", type=None):
    """Send an SSH login alarm through the configured push channel.

    The task-based push (``push_by_task_keyword``) is tried first; when it
    is unavailable or reports no delivery, the channel named in
    ``ssh_send_type.pl`` is used instead.

    @param title: alarm title (used by the mail channel)
    @param body: alarm text; a body containing "backdoor user" is sent
                 without a login IP
    @param login_ip: IP of the login. Now optional because the callers in
                 this class invoke the method with only title and body,
                 which used to raise TypeError.
    @param type: unused, kept for interface compatibility
    @return: None, or False when the configured channel cannot be initialised
    """
    from panel_msg.collector import SitePushMsgCollect
    # NOTE(review): the result is never used below; the call is kept in case
    # it has side effects - confirm and remove if it does not.
    msg = SitePushMsgCollect.ssh_login(body)

    is_backdoor_alarm = body.find("backdoor user") != -1
    shown_ip = login_ip if login_ip else "unknown ip"
    push_data = {
        "login_ip": "" if is_backdoor_alarm else shown_ip,
        "msg_list": ['>Send content:' + body]
    }

    # Preferred path: task-based push.
    try:
        import sys
        if "/www/server/panel" not in sys.path:
            sys.path.insert(0, "/www/server/panel")
        from mod.base.push_mod import push_by_task_keyword
        res = push_by_task_keyword("ssh_login", "ssh_login", push_data=push_data)
        if res:
            return
    except Exception:
        # Best effort: fall through to the legacy channel below.
        pass

    # Legacy path: channel chosen through set_login_send().
    try:
        login_send_type_conf = "/www/server/panel/data/ssh_send_type.pl"
        if not os.path.exists(login_send_type_conf):
            return
        login_type = public.readFile(login_send_type_conf).strip()
        if not login_type:
            login_type = "mail"
        channel = public.init_msg(login_type.strip())
        if not channel:
            return False
        if login_type == "mail":
            data = {}
            data['title'] = title
            data['msg'] = body
            channel.push_data(data)
        elif login_type == "wx_account":
            from push.site_push import ToWechatAccountMsg
            if is_backdoor_alarm:
                msg = ToWechatAccountMsg.ssh_login("")
            else:
                msg = ToWechatAccountMsg.ssh_login(shown_ip)
            channel.send_msg(msg)
        else:
            msg = public.get_push_info("SSH logon alarm", ['>Send content:' + body])
            msg['push_type'] = "SSH logon alarm"
            channel.push_data(msg)
    except Exception:
        # Alarm delivery must never break the login flow.
        pass
# Detect accounts other than root that have UID 0 and GID 0
def check_user(self):
    """Look for non-root accounts with UID 0 and GID 0 in /etc/passwd.

    When such accounts exist an alarm is sent on a background thread.

    @return: True when at least one such account was found, else False
    """
    suspects = []
    cfile = '/etc/passwd'
    if os.path.exists(cfile):
        with open(cfile, 'r') as passwd:
            for line in passwd:
                fields = line.strip().split(":")
                # Skip blank or malformed lines instead of raising IndexError.
                if len(fields) < 4:
                    continue
                if fields[2] == '0' and fields[3] == '0' and fields[0] != 'root':
                    suspects.append(fields[0])
    if not suspects:
        return False

    # Separate the names; the original concatenated them into one word.
    names = ','.join(suspects)
    local_ip = public.GetLocalIp()
    # The empty login IP is passed explicitly: send_mail_data requires it.
    public.run_thread(self.send_mail_data,
                      args=(local_ip + ' There is a backdoor user in the server',
                            local_ip + ' There is a backdoor user in the server ' + names + ' please check/etc/passwd',
                            ''))
    return True
# Root login records
# Return the whitelisted login IPs
def return_ip(self, get):
    """Return the in-memory IP whitelist after validating the whitelist file."""
    self.check_files()
    whitelist = self.__ip_data
    return public.return_message(0, 0, whitelist)
# 添加IP白名单
def add_return_ip(self, get):
    """Add ``get.ip`` to the login IP whitelist and persist the list.

    @param get: request object carrying a required ``ip`` field
    @return: status message; an error when validation fails or the IP is
             already whitelisted
    """
    # Validate parameters
    try:
        rules = [Param('ip').Require().String().Ip()]
        filters = [public.validate.trim_filter()]
        get.validate(rules, filters)
    except Exception as ex:
        public.print_log("error info: {}".format(ex))
        return public.return_message(-1, 0, str(ex))

    self.check_files()
    if get.ip.strip() not in self.__ip_data:
        self.__ip_data.append(get.ip.strip())
        public.writeFile(self.__ClIENT_IP, json.dumps(self.__ip_data))
        return public.return_message(0, 0, public.lang("Added successfully"))
    return public.return_message(-1, 0, public.lang("Already exists"))
def del_return_ip(self, get):
    """Remove ``get.ip`` from the login IP whitelist and persist the list.

    @param get: request object carrying a required ``ip`` field
    @return: status message; an error when validation fails or the IP is
             not in the whitelist
    """
    # Validate parameters
    try:
        rules = [Param('ip').Require().Ip()]
        filters = [public.validate.trim_filter()]
        get.validate(rules, filters)
    except Exception as ex:
        public.print_log("error info: {}".format(ex))
        return public.return_message(-1, 0, str(ex))

    self.check_files()
    if get.ip.strip() not in self.__ip_data:
        return public.return_message(-1, 0, public.lang("IP does not exist"))
    self.__ip_data.remove(get.ip.strip())
    public.writeFile(self.__ClIENT_IP, json.dumps(self.__ip_data))
    return public.return_message(0, 0, public.lang("Successfully deleted"))
# Collect the IPs found in the 50 most recent login records
def login_last(self):
    """Add every IPv4 address found in ``last -n 50`` to the whitelist.

    The whitelist file is rewritten when the command output contains at
    least one address.

    @return: the (possibly extended) in-memory whitelist
    """
    self.check_files()
    shell_out = public.ExecShell('last -n 50')
    found = re.findall(
        r"(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)",
        shell_out[0])
    # The original compared the list itself with 1, which raises TypeError
    # on Python 3; the intent is "at least one address was found".
    if found:
        # dict.fromkeys de-duplicates while keeping first-seen order.
        for ip in dict.fromkeys(found):
            if ip not in self.__ip_data:
                self.__ip_data.append(ip)
        public.writeFile(self.__ClIENT_IP, json.dumps(self.__ip_data))
    return self.__ip_data
# Get the IP of the current root login session
def get_ip(self):
    """Return the IPv4 addresses found in the fifth column of ``who am i``."""
    ipv4_pattern = r"(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"
    shell_out = public.ExecShell(''' who am i |awk ' {print $5 }' ''')
    return re.findall(ipv4_pattern, shell_out[0])
def get_logs(self, get):
    """Return one page of 'SSH security' log entries with pagination markup.

    @param get: request object; optional ``p`` (page number), ``p_size`` and
                ``tojs`` fields
    @return: message wrapping ``{'page': ..., 'data': [...]}``
    """
    # Validate pagination parameters
    try:
        rules = [
            Param('p_size').Integer(),
            Param('p').Integer(),
        ]
        get.validate(rules, [public.validate.trim_filter()])
    except Exception as ex:
        public.print_log("error info: {}".format(ex))
        return public.return_message(-1, 0, str(ex))

    import page as page_module
    pager = page_module.Page()

    total = public.M('logs').where('type=?', ('SSH security',)).count()
    info = {'count': total, 'row': 10, 'p': 1}
    if hasattr(get, 'p'):
        info['p'] = int(get['p'])
    info['uri'] = get
    info['return_js'] = ''
    if hasattr(get, 'tojs'):
        info['return_js'] = get.tojs

    # Build the pagination markup first: it sets SHIFT/ROW used by the query
    data = {}
    data['page'] = pager.GetPage(info, '1,2,3,4,5,8')
    window = str(pager.SHIFT) + ',' + str(pager.ROW)
    query = public.M('logs').where('type=?', (u'SSH security',)).order('id desc')
    data['data'] = query.limit(window).field('log,addtime').select()
    return public.return_message(0, 0, data)
def get_server_ip(self):
    """Return the stripped content of the panel's iplist.txt, or ``'127.0.0.1'`` when it is absent."""
    ip_list_file = '/www/server/panel/data/iplist.txt'
    if not os.path.exists(ip_list_file):
        return '127.0.0.1'
    content = public.ReadFile(ip_list_file)
    return content.strip()
# Runs when a login happens
def login(self):
    """Record the current root login and raise an alarm for unknown IPs.

    @return: True when an alarm was sent (login IP not whitelisted);
             False when the IP is whitelisted, could not be determined, was
             already logged in the same second, or an error occurred
    """
    self.check_files()
    self.check_user()
    self.__ip_data = json.loads(public.ReadFile(self.__ClIENT_IP))
    ip = self.get_ip()
    # get_ip() returns an empty list when no address is found; the original
    # indexed it unconditionally and raised IndexError.
    if not ip or not ip[0]:
        return False
    login_ip = ip[0]
    try:
        import time
        mDate = time.strftime('%Y-%m-%d %X', time.localtime())
        # The original where clause was 'type=? addtime', which is not valid
        # SQL; the intent is to skip a login already logged at this second.
        # NOTE(review): assumes logs.addtime uses this same format - confirm.
        already_logged = public.M('logs').where(
            'type=? AND addtime=?', ('SSH security', mDate,)).count()
        if already_logged:
            return False

        local_ip = public.GetLocalIp()
        if login_ip in self.__ip_data:
            public.WriteLog('SSH security',
                            'The server {} login IP is {}, login user is root'.format(local_ip, login_ip))
            return False

        # Pass the login IP: send_mail_data requires it and the original call
        # omitted it, so the TypeError was swallowed and nothing was logged.
        self.send_mail_data('Server {} login alarm'.format(local_ip),
                            'There is a login alarm on the server {}, the login IP is {}, the login user is root'.format(
                                local_ip, login_ip),
                            login_ip)
        public.WriteLog('SSH security',
                        'There is a login alarm on the server {}, the login IP is {}, login user is root'.format(
                            local_ip, login_ip))
        return True
    except Exception:
        # Best effort: the login hook must never break the user's shell.
        return False
# 修复bashrc文件
def repair_bashrc(self):
    """Strip the login hook command from the bashrc file.

    When the hook is present it is removed; afterwards any remaining
    occurrence of the interpreter path (left over by older versions) is
    removed as well.
    """
    hook_script = ' /www/server/panel/class/ssh_security.py'
    content = public.ReadFile(self.return_bashrc())
    if not re.search(self.return_python() + hook_script, content):
        return
    cleaned = content.replace(self.return_python() + hook_script + ' login', '')
    public.WriteFile(self.return_bashrc(), cleaned)
    # Leftover fragments from earlier versions
    leftover = public.ReadFile(self.return_bashrc())
    if re.search(self.return_python(), leftover):
        public.WriteFile(self.return_bashrc(), leftover.replace(self.return_python(), ''))
# 开启监控
def start_jian(self, get):
    """Install the login-monitoring hook into the shell profile.

    @return: success message when the hook was appended, failure message
             when the profile already contains it
    """
    self.repair_bashrc()
    profile_text = public.ReadFile(self.return_profile())
    hook_pattern = self.return_python() + ' /www/server/panel/class/ssh_security.py'
    if re.search(hook_pattern, profile_text):
        return public.returnMsg(False, public.lang("Open failed"))
    # Three-line snippet: define the command, run it detached, disown it
    snippet = ('shell="%s /www/server/panel/class/ssh_security.py login"\n'
               'nohup `${shell}` &>/dev/null &\n'
               'disown $!') % (self.return_python())
    public.WriteFile(self.return_profile(), profile_text.strip() + '\n' + snippet)
    return public.returnMsg(True, public.lang("Open successfully"))
# 关闭监控
def stop_jian(self, get):
    """Remove the login-monitoring hook from the shell profile.

    @return: the same success message whether or not the hook was present
    """
    hook_script = ' /www/server/panel/class/ssh_security.py'
    profile_text = public.ReadFile(self.return_profile())
    if not re.search(self.return_python() + hook_script, profile_text):
        return public.returnMsg(True, public.lang("Closed successfully"))

    # Drop the three lines written by start_jian()
    fragments = (
        '''shell="%s /www/server/panel/class/ssh_security.py login"''' % (self.return_python()),
        '''nohup `${shell}` &>/dev/null &''',
        '''disown $!''',
    )
    for fragment in fragments:
        profile_text = profile_text.replace(fragment, '')
    public.WriteFile(self.return_profile(), profile_text)

    # Check whether a hook command is still left behind
    if re.search(self.return_python() + hook_script, profile_text):
        public.WriteFile(self.return_profile(),
                         profile_text.replace(self.return_python() + hook_script + ' login', ''))

    # Leftover fragments from earlier versions
    leftover = public.ReadFile(self.return_profile())
    if re.search(self.return_python(), leftover):
        public.WriteFile(self.return_profile(), leftover.replace(self.return_python(), ''))
    return public.returnMsg(True, public.lang("Closed successfully"))
# 监控状态
def get_jian(self, get):
    """Report whether the shell profile contains the login-monitoring hook."""
    profile_text = public.ReadFile(self.return_profile())
    hook_found = re.search('/www/server/panel/class/ssh_security.py login', profile_text)
    if hook_found:
        status = True
    else:
        status = False
    return public.returnMsg(status, public.lang("1"))
def set_password(self, get):
    '''
    Enable SSH password login.

    Rewrites every (possibly commented) PasswordAuthentication line to
    "yes", or appends one when none exists, then restarts sshd.
    get: no parameters required
    '''
    directive = r'\n#?PasswordAuthentication\s\w+'
    enabled_line = '\nPasswordAuthentication yes'
    conf = public.readFile(self.__SSH_CONFIG)
    if not conf:
        return public.return_message(-1, 0, public.lang(
            "ERROR: sshd config configuration file does not exist, cannot continue!"))
    if re.findall(directive, conf):
        updated = re.sub(directive, enabled_line, conf)
    else:
        updated = conf + enabled_line
    self.wirte(self.__SSH_CONFIG, updated)
    self.restart_ssh()
    public.WriteLog('SSH management', 'Enable password login')
    return public.return_message(0, 0, public.lang("Open successfully"))
def set_sshkey(self, get):
    '''
    Generate a new SSH key pair and enable key authentication.

    Parameters: ssh=yes|no (whether password login stays enabled) and
    type=<key type>, which must be one of the supported key types.
    (The previous docstring had the two parameters swapped.)

    The old key of the same type is removed from authorized_keys and from
    disk, a new pair is generated, its public key is appended to
    authorized_keys and sshd_config is updated before sshd is restarted.
    '''
    # Validate parameters
    try:
        get.validate([
            Param('ssh').Require().String('in', ['yes', 'no']).Xss(),
            Param('type').Require().Xss(),
        ], [
            public.validate.trim_filter(),
        ])
    except Exception as ex:
        public.print_log("error info: {}".format(ex))
        return public.return_message(-1, 0, str(ex))

    ssh = get.ssh
    s_type = get.type
    # The whitelist check also keeps s_type safe for the shell commands below.
    if s_type not in self.__type_list:
        return public.return_message(-1, 0, public.lang("Wrong encryption method"))

    authorized_keys = '/root/.ssh/authorized_keys'
    pub_file = '/root/.ssh/id_{}.pub'.format(s_type)
    key_file = '/root/.ssh/id_{}'.format(s_type)

    # Drop the old public key from authorized_keys once, while the .pub file
    # still exists. The original ran the sed command once per existing file,
    # i.e. again after the .pub file had already been deleted.
    if os.path.exists(pub_file):
        public.ExecShell(r'sed -i "\~$(cat %s)~d" %s' % (pub_file, authorized_keys))
    for stale in (pub_file, key_file):
        if os.path.exists(stale):
            os.remove(stale)

    os.system("ssh-keygen -t {s_type} -P '' -f /root/.ssh/id_{s_type} |echo y".format(s_type=s_type))
    if not os.path.exists(pub_file):
        public.WriteLog('SSH management', 'Failed to set SSH key authentication')
        return public.return_message(-1, 0, public.lang("Open failed"))

    public.ExecShell('cat %s >> %s && chmod 600 %s' % (pub_file, authorized_keys, authorized_keys))

    rec = r'\n#?RSAAuthentication\s\w+'
    rec2 = r'\n#?PubkeyAuthentication\s\w+'
    conf = public.readFile(self.__SSH_CONFIG)
    if not conf:
        return public.return_message(-1, 0, public.lang("ERROR: sshd config configuration file does not exist"))
    # Append the directives when missing, then force both to "yes".
    if not re.findall(rec, conf):
        conf = conf + '\nRSAAuthentication yes'
    if not re.findall(rec2, conf):
        conf = conf + '\nPubkeyAuthentication yes'
    conf = re.sub(rec, '\nRSAAuthentication yes', conf)
    file_result = re.sub(rec2, '\nPubkeyAuthentication yes', conf)

    if ssh == 'no':
        ssh_password = r'\n#?PasswordAuthentication\s\w+'
        if not re.findall(ssh_password, file_result):
            file_result = file_result + '\nPasswordAuthentication no'
        else:
            file_result = re.sub(ssh_password, '\nPasswordAuthentication no', file_result)

    self.wirte(self.__SSH_CONFIG, file_result)
    public.writeFile(self.__key_type_file, s_type)
    self.restart_ssh()
    public.WriteLog('SSH management', 'Set up SSH key authentication and successfully generate the key')
    return public.return_message(0, 0, public.lang("Open successfully"))
# Message push channel helpers (the original note here read "get SSH info")
def get_msg_push_list(self, get):
    """
    @name Get the message channel configuration list
    @auther: cjxin
    @date: 2022-08-16
    @return: message wrapping a dict of channel name -> channel description

    The body was a line-for-line copy of _get_msg_push_list(); it now
    delegates to that method and only wraps the result.
    """
    return public.return_message(0, 0, self._get_msg_push_list(get))
def _get_msg_push_list(self, get):
    """
    @name Get the message channel configuration list
    @auther: cjxin
    @date: 2022-08-16
    @return: dict of channel name -> channel description, where 'setup'
             tells whether the channel module could be initialised and
             'info' carries its version info
    """
    cpath = 'data/msg.json'
    # Download the channel list when forced or when it is not cached yet
    try:
        refresh_needed = 'force' in get or not os.path.exists(cpath)
        if refresh_needed:
            public.downloadFile('{}/linux/panel/msg/msg.json'.format("https://node.yakpanel.com"), cpath)
    except:
        pass

    channels = {}
    if not os.path.exists(cpath):
        return channels

    for item in json.loads(public.readFile(cpath)):
        item['setup'] = False
        item['info'] = False
        name = item['name']
        try:
            handler = public.init_msg(item['name'])
            if handler:
                item['setup'] = True
                item['info'] = handler.get_version_info(None)
        except:
            print(public.get_error_info())
        channels[name] = item
    return channels
# 取消告警
def clear_login_send(self, get):
    """Cancel the SSH login alarm.

    Deletes the file naming the alarm channel (when present) and removes
    the login hook from the shell profile.

    @param get: request object carrying a required ``type`` field
    """
    # Validate parameters
    try:
        get.validate([
            Param('type').Require().String().Xss(),
        ], [
            public.validate.trim_filter(),
        ])
    except Exception as ex:
        public.print_log("error info: {}".format(ex))
        return public.return_message(-1, 0, str(ex))

    login_send_type_conf = "/www/server/panel/data/ssh_send_type.pl"
    # Cancelling an alarm that was never set must not raise FileNotFoundError.
    if os.path.exists(login_send_type_conf):
        os.remove(login_send_type_conf)
    self.stop_jian(get)
    return public.return_message(0, 0, public.lang("Successfully cancel the login alarm"))
# 设置告警
def set_login_send(self, get):
    """Select the channel used for SSH login alarms and enable monitoring.

    @param get: request object carrying a required ``type`` field naming
                the message channel
    @return: status message; an error when the channel is unknown or its
             module is not installed
    """
    # Validate parameters
    try:
        rules = [Param('type').Require().String().Xss()]
        get.validate(rules, [public.validate.trim_filter()])
    except Exception as ex:
        public.print_log("error info: {}".format(ex))
        return public.return_message(-1, 0, str(ex))

    login_send_type_conf = "/www/server/panel/data/ssh_send_type.pl"
    chosen = get.type.strip()
    known_channels = self._get_msg_push_list(get)
    if chosen not in known_channels.keys():
        return public.return_message(-1, 0, public.lang("This send type is not supported"))

    from panelMessage import panelMessage
    channel_module = panelMessage().init_msg_module(chosen)
    if not channel_module:
        return public.return_message(-1, 0, public.lang("The message channel is not installed"))

    public.writeFile(login_send_type_conf, chosen)
    self.start_jian(get)
    return public.return_message(0, 0, public.lang("Successfully set"))
# 查看告警
def get_login_send(self, get):
    """Return the configured alarm channel name, or ``"error"`` when none is configured."""
    # Only the currently configured channel is returned
    login_send_type_conf = "/www/server/panel/data/ssh_send_type.pl"
    if not os.path.exists(login_send_type_conf):
        send_type = "error"
    else:
        send_type = public.readFile(login_send_type_conf).strip()
    return public.return_message(0, 0, send_type)
def GetSshInfo(self, get):
    """Return True when sshd appears to be running, False otherwise.

    Uses the PID in /run/sshd.pid when that file exists; otherwise asks the
    service manager and treats output longer than three characters from the
    "dead/stopped" grep as "not running".
    """
    pid_file = '/run/sshd.pid'
    if os.path.exists(pid_file):
        return public.pid_exists(int(public.readFile(pid_file)))

    import system
    version = system.system().GetSystemVersion()
    if os.path.exists('/usr/bin/apt-get'):
        # apt-based systems: the unit is named sshd or ssh
        if os.path.exists('/etc/init.d/sshd'):
            probe = "service sshd status | grep -P '(dead|stop)'|grep -v grep"
        else:
            probe = "service ssh status | grep -P '(dead|stop)'|grep -v grep"
    elif version.find(' 7.') != -1 or version.find(' 8.') != -1 or version.find('Fedora') != -1:
        probe = "systemctl status sshd.service | grep 'dead'|grep -v grep"
    else:
        probe = "/etc/init.d/sshd status | grep -e 'stopped' -e '已停'|grep -v grep"

    probe_output = public.ExecShell(probe)
    # Any meaningful grep output means a "stopped" marker was found
    return not (len(probe_output[0]) > 3)
def stop_key(self, get):
    '''
    Disable SSH key login.

    Sets RSAAuthentication and PubkeyAuthentication to "no"; when sshd is
    running, password login is enabled as well before sshd is restarted.
    No parameters required.
    '''
    sshd_running = self.GetSshInfo(get)
    rsa_directive = r'\n\s*#?\s*RSAAuthentication\s+\w+'
    pubkey_directive = r'\n\s*#?\s*PubkeyAuthentication\s+\w+'
    conf = public.readFile(self.__SSH_CONFIG)
    if not conf:
        return public.return_message(-1, 0, public.lang("Error: sshd config configuration file does not exist"))

    without_rsa = re.sub(rsa_directive, '\nRSAAuthentication no', conf)
    updated = re.sub(pubkey_directive, '\nPubkeyAuthentication no', without_rsa)
    self.wirte(self.__SSH_CONFIG, updated)

    if sshd_running:
        self.set_password(get)
    self.restart_ssh()
    public.WriteLog('SSH management', 'Disable SSH key login')
    return public.return_message(0, 0, public.lang("Disable successfully"))
def get_config(self, get):
    '''
    Return the current SSH settings read from sshd_config.

    No parameters required. The result contains rsa_auth, pubkey, password,
    root_is_login, root_login_type, root_login_types, port and key_type.
    '''
    result = {}
    conf = public.readFile(self.__SSH_CONFIG)
    if not conf:
        return public.return_message(-1, 0, public.lang("Error: sshd config does not exist"))
    flags = re.M | re.I

    # ======== Refactored on 2022-10-12, author: hwliang ========
    # RSA public-key authentication: reported as enabled unless explicitly "no"
    rsa_values = re.findall(r'^\s*RSAAuthentication\s*(yes|no)', conf, flags)
    rsa_off = bool(rsa_values) and rsa_values[0].lower() == 'no'
    result['rsa_auth'] = 'no' if rsa_off else 'yes'

    # Public-key authentication: reported as disabled unless a key is
    # available and the directive is explicitly "yes"
    result['pubkey'] = 'no'
    if self._get_key(get)['msg']:
        pubkey_values = re.findall(r'^\s*PubkeyAuthentication\s*(yes|no)', conf, flags)
        if pubkey_values and pubkey_values[0].lower() == 'yes':
            result['pubkey'] = 'yes'

    # Password login: reported as enabled unless explicitly "no"
    password_values = re.findall(r'^\s*PasswordAuthentication\s*([\w\-]+)', conf, flags)
    password_off = bool(password_values) and password_values[0].lower() == 'no'
    result['password'] = 'no' if password_off else 'yes'

    # Root login: "yes" unless the directive carries another value, in which
    # case that value (no / without-password / forced-commands-only) is reported
    result['root_is_login'] = 'yes'
    result['root_login_type'] = 'yes'
    root_values = re.findall(r'^\s*PermitRootLogin\s*([\w\-]+)', conf, flags)
    if root_values and root_values[0].lower() != 'yes':
        result['root_is_login'] = 'no'
        result['root_login_type'] = root_values[0].lower()

    result['root_login_types'] = self.__root_login_types
    result['port'] = public.get_sshd_port()
    result['key_type'] = public.ReadFile(self.__key_type_file)
    return public.return_message(0, 0, result)
def set_root(self, get):
    '''
    Set the PermitRootLogin mode in sshd_config and restart sshd.

    get.p_type: one of yes, no, without-password, forced-commands-only
                (defaults to "yes" when omitted)
    '''
    # Validate parameters
    try:
        get.validate([
            Param('p_type').String('in', ['yes', 'no', 'without-password', 'forced-commands-only']).Xss(),
        ], [
            public.validate.trim_filter(),
        ])
    except Exception as ex:
        public.print_log("error info: {}".format(ex))
        return public.return_message(-1, 0, str(ex))

    p_type = 'yes'
    if 'p_type' in get: p_type = get.p_type
    if p_type not in self.__root_login_types.keys():
        return public.return_message(-1, 0, public.lang("Parameter passing error"))

    conf = public.readFile(self.__SSH_CONFIG)
    # Guard added for consistency with the sibling methods: an unreadable
    # config used to reach re.search() as a non-string and raise TypeError.
    if not conf:
        return public.return_message(-1, 0, public.lang("Error: sshd config configuration file does not exist"))

    directive = r'^\s*#?\s*PermitRootLogin\s*([\w\-]+)'
    current = re.search(directive, conf, re.M)
    new_line = 'PermitRootLogin {}'.format(p_type)
    if not current:
        file_result = conf + '\n{}'.format(new_line)
    else:
        file_result = conf.replace(current.group(), new_line)
    self.wirte(self.__SSH_CONFIG, file_result)
    self.restart_ssh()
    msg = public.lang('Set the root login method as: {}', self.__root_login_types[p_type])
    public.WriteLog('SSH management', msg)
    return public.return_message(0, 0, msg)
def set_root_password(self, get):
    """
    @name Set the password of a system user
    @param get: request object with ``username`` and ``password``
    @return: status message

    Fixes over the previous version:
    - the credential is shell-quoted before it reaches the shell, so the
      password can no longer inject commands;
    - passwords containing line breaks are rejected, because chpasswd reads
      one "user:password" pair per line;
    - the hyphen inside the character classes is escaped (")-_" used to be a
      range that also matched digits and upper-case letters);
    - the defaulted local ``username`` is used instead of ``get.username``;
    - the "unknown user" error no longer claims the user already exists.
    """
    import shlex

    password = get.password if "password" in get else ""
    username = get.username if "username" in get else ""
    if not password:
        return public.return_message(-1, 0, public.lang("The password cannot be empty"))
    if len(password) < 8:
        return public.return_message(-1, 0, public.lang("The password cannot be less than 8 bits long"))
    if '\n' in password or '\r' in password:
        return public.return_message(-1, 0, public.lang("The password cannot contain line breaks"))
    if username not in self.get_sys_user(get)['msg']:
        return public.return_message(-1, 0, public.lang("The username does not exist"))

    symbols = r'!@#$%^&*()\-_+='
    has_letter = bool(re.search('[a-zA-Z' + symbols + ']', password))
    has_digit_or_symbol = bool(re.search('[0-9' + symbols + ']', password))
    if not has_letter or not has_digit_or_symbol:
        return public.return_message(-1, 0, public.lang(
            "The password must contain letters and numbers or symbols"))

    # printf '%s\n' is used instead of echo so that backslashes in the
    # password are passed through unchanged.
    credential = shlex.quote("%s:%s" % (username, password))
    cmd_result, cmd_err = public.ExecShell("printf '%s\\n' {} | chpasswd".format(credential))
    if cmd_err:
        return public.return_message(-1, 0, public.lang("Setup failure"))
    public.WriteLog("SSH", "[Security] - [SSH] - [Set %s password]" % username)
    return public.return_message(0, 0, public.lang("successfully set"))
def get_sys_user(self, get):
    """Return every user name listed in /etc/passwd.

    @param get: unused
    @return: message whose payload is the list of names, in file order
    """
    with open('/etc/passwd') as passwd:
        names = [entry.split(':', 1)[0] for entry in passwd.readlines()]
    return public.returnMsg(True, names)
def stop_root(self, get):
    '''
    Forbid root login over SSH.

    Rewrites the active PermitRootLogin line to "no", or appends one when
    none exists, then restarts sshd.
    get: no parameters required
    '''
    directive = r'\n\s*PermitRootLogin\s+\w+'
    deny_line = '\nPermitRootLogin no'
    conf = public.readFile(self.__SSH_CONFIG)
    if re.findall(directive, conf):
        updated = re.sub(directive, deny_line, conf)
    else:
        updated = conf + deny_line
    self.wirte(self.__SSH_CONFIG, updated)
    self.restart_ssh()
    public.WriteLog('SSH management', 'Set the root login method to: no')
    return public.returnMsg(True, public.lang("Disable successfully"))
def stop_password(self, get):
    '''
    Disable SSH password login.

    Rewrites every (possibly commented) PasswordAuthentication line to
    "no". When the config has no such line one is appended; previously
    nothing was changed in that case, yet success was reported. An
    unreadable config now yields an error instead of a TypeError.
    No parameters required.
    '''
    conf = public.readFile(self.__SSH_CONFIG)
    if not conf:
        return public.returnMsg(False, public.lang(
            "ERROR: sshd config configuration file does not exist, cannot continue!"))
    directive = r'\n#?PasswordAuthentication\s\w+'
    disabled_line = '\nPasswordAuthentication no'
    if re.search(directive, conf):
        file_result = re.sub(directive, disabled_line, conf)
    else:
        file_result = conf + disabled_line
    self.wirte(self.__SSH_CONFIG, file_result)
    self.restart_ssh()
    public.WriteLog('SSH management', 'Disable password access')
    return public.returnMsg(True, public.lang("Closed successfully"))
def _get_key(self, get):
    '''
    Return the content of the key file for the configured key type
    (legacy returnMsg format). No parameters required.
    '''
    key_type = self.get_ssh_key_type()
    if key_type not in self.__type_files.keys():
        return public.returnMsg(True, public.lang(""))
    key_content = public.readFile(self.__type_files[key_type])
    return public.returnMsg(True, key_content)
def get_key(self, get):
    '''
    Return the content of the key file for the configured key type
    (return_message format). No parameters required.
    '''
    key_type = self.get_ssh_key_type()
    if key_type not in self.__type_files.keys():
        return public.return_message(0, 0, public.lang(""))
    key_content = public.readFile(self.__type_files[key_type])
    return public.return_message(0, 0, key_content)
def download_key(self, get):
'''
@name Download the key file

Picks the key file for the configured key type, or the first existing
entry of the fallback key-file list, and returns it as a Flask file
response named "<host>_<key file name>". Returns an error message when no
key file is found.

NOTE(review): leading whitespace was lost in this listing, so it cannot be
confirmed here whether the `else` fallback pairs with the key-type test or
with the existence test - verify against the indented source.
'''
download_file = ''
key_type = self.get_ssh_key_type()
if key_type in self.__type_files.keys():
if os.path.exists(self.__type_files[key_type]):
download_file = self.__type_files[key_type]
else:
# fallback: take the first key file that exists on disk
for file in self.__key_files:
if not os.path.exists(file): continue
download_file = file
break
if not download_file: return public.returnMsg(False, public.lang("Key file not found!"))
from flask import send_file
filename = "{}_{}".format(public.GetHost(), os.path.basename(download_file))
return send_file(download_file, download_name=filename)
def wirte(self, file, ret):
    """Write ``ret`` to ``file`` via public.writeFile and return its result."""
    return public.writeFile(file, ret)
def restart_ssh(self):
    '''
    Restart the SSH daemon with the service manager that fits the host.

    - no /etc/redhat-release: ``service ssh restart``
    - RedHat-family release 7.x / 8.x, or any release that has no
      /etc/init.d/sshd script: ``systemctl restart sshd.service``
    - otherwise: ``/etc/init.d/sshd restart``

    Takes no request parameters and returns nothing.
    '''
    act = 'restart'
    release_file = '/etc/redhat-release'
    if not os.path.exists(release_file):
        public.ExecShell('service ssh ' + act)
        return

    version = public.readFile(release_file)
    if not isinstance(version, str):
        # Treat an unreadable release file as "version unknown" instead of crashing.
        version = ''

    is_el7_or_el8 = ' 7.' in version or ' 8.' in version
    # Releases whose string lacks " 7." / " 8." (e.g. 9.x) used to fall through
    # to the init script even when it does not exist; use systemctl there.
    if is_el7_or_el8 or not os.path.exists('/etc/init.d/sshd'):
        public.ExecShell("systemctl " + act + " sshd.service")
    else:
        public.ExecShell("/etc/init.d/sshd " + act)
# Check whether DingTalk is configured
def check_dingding(self, get):
    '''
    Check whether a DingTalk URL is configured.

    @param get: request object (not read by this method)
    @return: True when /www/server/panel/data/dingding.json exists, parses as
             JSON and holds a non-empty "dingding_url"; False in every other
             case (previously an empty URL yielded None)
    '''
    config_path = '/www/server/panel/data/dingding.json'
    if not os.path.exists(config_path):
        return False
    raw_config = public.ReadFile(config_path)
    if not raw_config:
        return False
    try:
        return bool(json.loads(raw_config)['dingding_url'])
    except Exception:
        # Malformed JSON or a missing key simply means "not configured".
        return False
# Enable SSH two-factor authentication
def start_auth_method(self, get):
    '''
    Enable SSH two-factor authentication.

    @param get: request object (not read by this method)
    @return: result of ssh_authentication.start_ssh_authentication_two_factors()
    '''
    from ssh_authentication import ssh_authentication as two_factor_cls
    return two_factor_cls().start_ssh_authentication_two_factors()
# Disable SSH two-factor authentication
def stop_auth_method(self, get):
    '''
    Disable SSH two-factor authentication.

    @param get: request object (not read by this method)
    @return: result of ssh_authentication.close_ssh_authentication_two_factors()
    '''
    from ssh_authentication import ssh_authentication as two_factor_cls
    return two_factor_cls().close_ssh_authentication_two_factors()
# Get the SSH two-factor authentication status
def get_auth_method(self, get):
    '''
    Get the SSH two-factor authentication status.

    @param get: request object (not read by this method)
    @return: result of ssh_authentication.check_ssh_authentication_two_factors()
    '''
    from ssh_authentication import ssh_authentication as two_factor_cls
    return two_factor_cls().check_ssh_authentication_two_factors()
# Check whether the .so file exists
def check_so_file(self, get):
    '''
    Check whether the .so file exists.

    @param get: request object (not read by this method)
    @return: result of ssh_authentication.is_check_so()
    '''
    from ssh_authentication import ssh_authentication as two_factor_cls
    return two_factor_cls().is_check_so()
# Download the .so file
def get_so_file(self, get):
    '''
    Download the .so file.

    @param get: request object (not read by this method)
    @return: result of ssh_authentication.download_so()
    '''
    from ssh_authentication import ssh_authentication as two_factor_cls
    return two_factor_cls().download_so()
# Get the PIN
def get_pin(self, get):
    '''
    Get the PIN from the ssh_authentication helper.

    @param get: request object (not read by this method)
    @return: public.returnMsg(True, <value of ssh_authentication.get_pin()>)
    '''
    from ssh_authentication import ssh_authentication as two_factor_cls
    pin = two_factor_cls().get_pin()
    return public.returnMsg(True, pin)
def get_login_record(self, get):
    '''
    Report whether the login-record flag file (open_ssh_login) exists.

    @param get: request object (not read by this method)
    @return: public.returnMsg with status True when the flag file exists,
             False otherwise; the message is empty in both cases
    '''
    flag_present = os.path.exists(self.open_ssh_login)
    return public.returnMsg(flag_present, public.lang(""))
def start_login_record(self, get):
    '''
    Create the login-record flag file (open_ssh_login) if it is missing.

    @param get: request object (not read by this method)
    @return: public.returnMsg(True, "") whether or not the file already existed
    '''
    if not os.path.exists(self.open_ssh_login):
        public.writeFile(self.open_ssh_login, "True")
    return public.returnMsg(True, public.lang(""))
def stop_login_record(self, get):
    '''
    Remove the login-record flag file (open_ssh_login) if it exists.

    @param get: request object (not read by this method)
    @return: public.returnMsg(True, "") whether or not the file existed
    '''
    if os.path.exists(self.open_ssh_login):
        os.remove(self.open_ssh_login)
    return public.returnMsg(True, public.lang(""))
# Get the login record list
def get_record_list(self, get):
    '''
    Return one page of rows from the ssh_login_record table, newest id first.

    @param get: request object; optional fields read here are
                limit (rows per page, default 12), p (page number, default 1)
                and tojs (passed through as return_js)
    @return: dict with "page" (output of page.Page().GetPage) and "data"
             (the selected rows)
    '''
    rows_per_page = int(get.limit.strip()) if 'limit' in get else 12

    import page as page_module
    pager = page_module.Page()
    total = public.M('ssh_login_record').order("id desc").count()

    info = {'count': total, 'row': rows_per_page, 'p': 1}
    if hasattr(get, 'p'):
        info['p'] = int(get['p'])
    info['uri'] = get
    info['return_js'] = get.tojs if hasattr(get, 'tojs') else ''

    # GetPage must run first: SHIFT and ROW are read from the pager afterwards.
    data = {'page': pager.GetPage(info, '1,2,3,4,5,8')}
    window = str(pager.SHIFT) + ',' + str(pager.ROW)
    data['data'] = public.M('ssh_login_record').order('id desc').limit(window).select()
    return data
def get_file_json(self, get):
    '''
    Load a JSON file.

    @param get: request object; get.path is the file to read
    @return: the parsed JSON value, or '' when get.path does not exist
    '''
    if not os.path.exists(get.path):
        return ''
    return json.loads(public.ReadFile(get.path))
@staticmethod
def is_redhat():
if os.path.exists("/usr/bin/yum") and os.path.exists("/usr/bin/rpm"):
return True
return False
@staticmethod
def _get_other_conf_list():
    '''
    Collect the drop-in config files under /etc/ssh/sshd_config.d.

    Only regular files whose name ends with ".conf" and whose content could
    be read as a string are included.

    @return: list of {"data": content, "path": full path, "name": file name}
             sorted by file name; empty when the directory does not exist
    '''
    entries = []
    if not os.path.isdir("/etc/ssh/sshd_config.d"):
        return entries
    for name in os.listdir("/etc/ssh/sshd_config.d"):
        full_path = "/etc/ssh/sshd_config.d/{}".format(name)
        if not (name.endswith(".conf") and os.path.isfile(full_path)):
            continue
        content = public.readFile(full_path)
        if not isinstance(content, str):
            continue
        entries.append({"data": content, "path": full_path, "name": name})
    # The files are ordered alphabetically by name.
    entries.sort(key=lambda entry: entry["name"])
    return entries
def paser_root_login(self, conf_data: str = None) -> tuple:
    '''
    Work out whether root may log in over SSH and which PermitRootLogin
    mode is in effect.

    The main sshd_config text is searched first, then the files returned by
    _get_other_conf_list(); the first uncommented PermitRootLogin line wins.
    When no directive is found the mode defaults to 'yes' on RedHat-like
    hosts (see is_redhat) and to 'without-password' elsewhere.

    @param conf_data: sshd_config content; read from disk when None
    @return: (can_login, login_type). can_login is 'yes' for the modes
             'yes', 'without-password' and 'prohibit-password', and 'no'
             for anything else. It is always derived from the mode that was
             actually found, so ``PermitRootLogin no`` reports 'no' on
             RedHat-like hosts as well.
    '''
    on_redhat = self.is_redhat()
    login_type = 'yes' if on_redhat else 'without-password'

    if conf_data is None:
        conf_data = public.readFile(self.__SSH_CONFIG)
    if not isinstance(conf_data, str):
        # Config unreadable: keep the historical platform fallback unchanged.
        return ('yes' if on_redhat else 'no'), login_type

    sources = [conf_data] + [item["data"] for item in self._get_other_conf_list()]
    # sshd keywords are case-insensitive, hence re.I.
    directive = re.compile(r"^\s*PermitRootLogin\s*(?P<target>[\w\-]+)", re.M | re.I)
    for text in sources:
        found = directive.search(text)
        if found:
            login_type = found.group("target")
            break

    # 'prohibit-password' is accepted alongside its alias 'without-password'.
    allowed_modes = ('yes', 'without-password', 'prohibit-password')
    can_login = 'yes' if login_type in allowed_modes else 'no'
    return can_login, login_type
def get_sshd_anti_logs(self, get):
    """
    @name Get the SSH anti-brute-force statistics from the fail2ban plugin
    @param get: request object (not read by this method)
    @return: public.fail_v2(...) when the plugin reports status False,
             otherwise public.success_v2 with the failed/banned counters and
             the banned IP list
    """
    query = public.to_dict_obj({"mode": "sshd"})
    logs_result = public.run_plugin("fail2ban", "get_status", query)
    if logs_result.get("status") is False:
        return public.fail_v2(logs_result.get("msg", "Failed to get SSH anti-brute-force logs"))

    stats = public.find_value_by_key(logs_result, "msg", {})
    summary = {}
    # Counters default to the string "0" when the plugin omits them.
    for counter in ("currently_failed", "total_failed", "currently_banned", "total_banned"):
        summary[counter] = stats.get(counter, "0")
    summary["banned_ip_list"] = stats.get("banned_ip_list", [])
    return public.success_v2(summary)
def get_anti_conf(self, get):
    """
    @name Get the SSH anti-brute-force configuration
    @param get: request object (not read by this method)
    @return: public.fail_v2(...) when the fail2ban plugin is not installed;
             otherwise public.success_v2 with maxretry / findtime / bantime.
             Built-in defaults are used when the plugin config file, its
             "sshd" section or an individual key is missing.
    """
    result_data = {
        'maxretry': '30',
        'findtime': '300',
        'bantime': '600'
    }
    _set_up_path = "/www/server/panel/plugin/fail2ban"
    _config = _set_up_path + "/config.json"
    if not os.path.exists(_set_up_path + "/fail2ban_main.py"):
        return public.fail_v2(public.lang("Not Install Fail2ban Plugin!"))
    if not os.path.exists(_config):
        # Same envelope as every other success path (was the bare dict).
        return public.success_v2(result_data)
    _conf_data = json.loads(public.ReadFile(_config))
    if "sshd" not in _conf_data:
        return public.success_v2(result_data)
    sshd_conf = _conf_data["sshd"]
    for key in ('maxretry', 'findtime', 'bantime'):
        result_data[key] = sshd_conf.get(key, result_data[key])
    return public.success_v2(result_data)
def set_anti_conf(self, get):
    """
    @name Configure SSH anti-brute-force protection through the fail2ban plugin
    @param get: act is the on/off switch; maxretry, findtime and bantime
                override the stored settings when present
    @return: public.fail_v2(...) on validation failure or when the plugin is
             not installed; otherwise public.return_message built from the
             plugin's status and msg
    """
    try:
        rules = [
            Param("act").String(),
            Param("maxretry").Integer(),
            Param("findtime").Integer(),
            Param("bantime").Integer(),
        ]
        get.validate(rules, [public.validate.trim_filter()])
    except Exception as ex:
        public.print_log("error info: {}".format(ex))
        return public.fail_v2(str(ex))

    param_dict = {
        'type': 'edit',
        'act': 'true',
        'maxretry': '30',
        'findtime': '300',
        'bantime': '600',
        'port': "{}".format(public.get_sshd_port()),
        'mode': 'sshd'
    }
    plugin_dir = "/www/server/panel/plugin/fail2ban"
    config_file = plugin_dir + "/config.json"
    if not os.path.exists(plugin_dir + "/fail2ban_main.py"):
        return public.fail_v2(public.lang("Not Install Fail2ban Plugin!"))

    tunables = ("maxretry", "findtime", "bantime")
    # Start from the stored "sshd" settings; switch to "add" when there are none.
    if os.path.exists(config_file):
        stored = json.loads(public.ReadFile(config_file))
        if "sshd" in stored:
            for key in tunables:
                param_dict[key] = stored["sshd"][key]
        else:
            param_dict["type"] = "add"
    else:
        param_dict["type"] = "add"

    # Values supplied in the request take precedence over stored ones.
    for key in tunables + ("act",):
        if key in get:
            param_dict[key] = getattr(get, key)

    res = public.run_plugin("fail2ban", "set_anti", public.to_dict_obj(param_dict))
    public.WriteLog(
        "SSH Manager",
        f"【Security】-【Server Security】-【Set SSH anti-brute-force status: {res.get('msg', 'fail')}"
    )
    status_code = 0 if res.get("status", False) else -1
    return public.return_message(status_code, 0, res.get("msg", "fail"))
def del_ban_ip(self, get):
    """
    Release an IP through the fail2ban plugin's ban_ip_release action.

    @param get: request object; get.ip is the address to release, and
                get.mode is set to "sshd" here before the plugin call
    @return: public.return_message with status 0 when the plugin reports a
             truthy "status", -1 otherwise, plus the plugin's msg
    """
    get.mode = "sshd"
    get.ip = get.ip
    import PluginLoader
    outcome = PluginLoader.plugin_run('fail2ban', 'ban_ip_release', get)
    status_code = -1
    if outcome.get("status", False):
        status_code = 0
    return public.return_message(status_code, 0, outcome.get("msg", "fail"))
if __name__ == '__main__':
    import sys

    # The first command-line argument selects the action; 'login' is the only
    # one handled. Running without arguments is a no-op instead of raising
    # IndexError.
    action = sys.argv[1] if len(sys.argv) > 1 else ''
    if action == 'login':
        try:
            ssh_security().login()
        except BaseException:
            # Deliberately swallow everything, exactly as the original bare
            # `except:` did, so this script never fails its caller.
            pass