Files
yakpanel-core/class/ssh_authentication.py
2026-04-07 02:04:22 +05:30

337 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding: utf-8
# +-------------------------------------------------------------------
# | version :1.0
# +-------------------------------------------------------------------
# | Author: liangkaiqiang <1249648969@yakpanel.com>
# +-------------------------------------------------------------------
# | SSH 双因子认证
# +--------------------------------------------------------------------
import public,re,os
import platform,time
class ssh_authentication:
__SSH_CONFIG='/etc/ssh/sshd_config'
__PAM_CONFIG='/etc/pam.d/sshd'
__python_pam='/usr/pam_python_so'
__config_pl='/www/server/panel/data/pam_btssh_authentication.pl'
def __init__(self):
'''检查pam_python目录是否存在'''
if not os.path.exists(self.__python_pam):
public.ExecShell("mkdir -p " + self.__python_pam)
public.ExecShell("chmod 600 " + self.__python_pam)
if not os.path.exists(self.__config_pl):
public.ExecShell("echo '%s' >>%s"%(public.GetRandomString(32),self.__config_pl))
public.ExecShell("chmod 600 " + self.__config_pl)
def wirte(self, file, ret):
result = public.writeFile(file, ret)
return result
#重启SSH
def restart_ssh(self):
act = 'restart'
if os.path.exists('/etc/redhat-release'):
version = public.readFile('/etc/redhat-release')
if isinstance(version, str):
if version.find(' 7.') != -1 or version.find(' 8.') != -1:
public.ExecShell("systemctl " + act + " sshd.service")
else:
public.ExecShell("/etc/init.d/sshd " + act)
else:
public.ExecShell("/etc/init.d/sshd " + act)
else:
public.ExecShell("/etc/init.d/sshd " + act)
#查找PAM目录
def get_pam_dir(self):
#Centos 系列
if os.path.exists('/etc/redhat-release'):
version = public.readFile('/etc/redhat-release')
if isinstance(version, str):
if version.find(' 7.') != -1:
return 'auth requisite %s/pam_btssh_authentication.so'%(self.__python_pam)
elif version.find(' 8.') != -1:
return 'auth requisite %s/pam_btssh_authentication.so'%(self.__python_pam)
else:
return False
#Ubuntu
elif os.path.exists('/etc/lsb-release'):
version = public.readFile('/etc/lsb-release')
if isinstance(version, str):
if version.find('16.') != -1:
return 'auth requisite %s/pam_btssh_authentication.so'%(self.__python_pam)
elif version.find('20.') != -1:
return 'auth requisite %s/pam_btssh_authentication.so'%(self.__python_pam)
elif version.find('18.') != -1:
return 'auth requisite %s/pam_btssh_authentication.so'%(self.__python_pam)
else:
return False
#debian
elif os.path.exists('/etc/debian_version'):
version = public.readFile('/etc/debian_version')
if isinstance(version, str):
if version.find('9.') != -1:
return 'auth requisite %s/pam_btssh_authentication.so'%(self.__python_pam)
elif version.find('10.') != -1:
return 'auth requisite %s/pam_btssh_authentication.so'%(self.__python_pam)
else:
return False
return False
#判断PAMSO文件是否存在
def isPamSoExists(self):
check2=self.get_pam_dir()
if not check2: return False
check=check2.split()
if len(check)<3: return False
if os.path.exists(check[2]):
#判断文件大小
if os.path.getsize(check[2])<10240:
self.install_pam_python(check)
return self.isPamSoExists()
return check2
else:
self.install_pam_python(check)
return self.isPamSoExists()
#安装pam_python
def install_pam_python(self,check):
so_path=check[2]
so_name=check[2].split('/')[-1]
public.ExecShell('/usr/local/curl/bin/curl -k -o %s https://www.yakpanel.com/btwaf_rule/pam_python_so/%s'%(so_path,so_name))
public.ExecShell("chmod 600 " + so_path)
return True
#开启双因子认证
def start_ssh_authentication(self):
check=self.isPamSoExists()
if not check:return False
if os.path.exists(self.__PAM_CONFIG):
auth_data=public.readFile(self.__PAM_CONFIG)
if isinstance(auth_data, str):
if auth_data.find("\n"+check) != -1:
return True
else:
auth_data=auth_data+"\n"+check
public.writeFile(self.__PAM_CONFIG,auth_data)
return True
return False
#关闭双因子认证
def stop_ssh_authentication(self):
check=self.isPamSoExists()
if not check:return False
if os.path.exists(self.__PAM_CONFIG):
auth_data=public.readFile(self.__PAM_CONFIG)
if isinstance(auth_data, str):
if auth_data.find("\n"+check) != -1:
auth_data=auth_data.replace("\n"+check,'')
public.writeFile(self.__PAM_CONFIG,auth_data)
return True
else:
return False
return False
#检查是否开启双因子认证
def check_ssh_authentication(self):
check=self.isPamSoExists()
if not check:return False
if os.path.exists(self.__PAM_CONFIG):
auth_data=public.readFile(self.__PAM_CONFIG)
if isinstance(auth_data, str):
if auth_data.find("\n"+check) != -1:
return True
else:
return False
return False
#设置SSH应答模式
def set_ssh_login_user(self):
ssh_password = '\nChallengeResponseAuthentication\\s\\w+'
file = public.readFile(self.__SSH_CONFIG)
if isinstance(file, str):
if len(re.findall(ssh_password, file)) == 0:
file_result = file + '\nChallengeResponseAuthentication yes'
else:
file_result = re.sub(ssh_password, '\nChallengeResponseAuthentication yes', file)
self.wirte(self.__SSH_CONFIG, file_result)
self.restart_ssh()
return public.returnMsg(True, public.lang("Successfully opened"))
#关闭SSH应答模式
def close_ssh_login_user(self):
file = public.readFile(self.__SSH_CONFIG)
ssh_password = '\nChallengeResponseAuthentication\\s\\w+'
if isinstance(file, str):
file_result = re.sub(ssh_password, '\nChallengeResponseAuthentication no', file)
self.wirte(self.__SSH_CONFIG, file_result)
self.restart_ssh()
return public.returnMsg(True, public.lang("Closed successfully"))
#查看SSH应答模式
def check_ssh_login_user(self):
file = public.readFile(self.__SSH_CONFIG)
ssh_password = '\nChallengeResponseAuthentication\\s\\w+'
if isinstance(file, str):
ret = re.findall(ssh_password, file)
if not ret:
return False
else:
if ret[-1].split()[-1] == 'yes':
return True
else:
return False
return False
#关闭密码访问
def stop_password(self):
'''
关闭密码访问
无参数传递
'''
file = public.readFile(self.__SSH_CONFIG)
if isinstance(file, str):
if file.find('PasswordAuthentication') != -1:
file_result = file.replace('\nPasswordAuthentication yes', '\nPasswordAuthentication no')
self.wirte(self.__SSH_CONFIG, file_result)
self.restart_ssh()
return public.returnMsg(True, public.lang("Closed successfully"))
else:
return public.returnMsg(False, public.lang("No password authentication"))
return public.returnMsg(False, public.lang("No password authentication"))
#开启密码登录
def start_password(self):
'''
开启密码登陆
get: 无需传递参数
'''
file = public.readFile(self.__SSH_CONFIG)
if isinstance(file, str):
if file.find('PasswordAuthentication') != -1:
file_result = file.replace('\nPasswordAuthentication no', '\nPasswordAuthentication yes')
self.wirte(self.__SSH_CONFIG, file_result)
self.restart_ssh()
return public.returnMsg(True, public.lang("Turn on password authentication successfully"))
else:
file_result = file + '\nPasswordAuthentication yes'
self.wirte(self.__SSH_CONFIG, file_result)
self.restart_ssh()
return public.returnMsg(True, public.lang("Turn on password authentication successfully"))
return public.returnMsg(False, public.lang("No password authentication"))
#查看密码登录状态
def check_password(self):
'''
查看密码登录状态
无参数传递
'''
file = public.readFile(self.__SSH_CONFIG)
ssh_password = '\nPasswordAuthentication\\s\\w+'
if isinstance(file, str):
ret = re.findall(ssh_password, file)
if not ret:
return False
else:
if ret[-1].split()[-1] == 'yes':
return True
else:
return False
return False
#开启SSH 双因子认证
def start_ssh_authentication_two_factors(self):
if not self.get_pam_dir():return public.returnMsg(False, public.lang("The system is not supported"))
check=self.isPamSoExists()
if not check:return 'False'
if not self.check_ssh_login_user():
self.set_ssh_login_user()
if not self.check_ssh_authentication():
self.start_ssh_authentication()
#如果开启的话,就关闭密码认证
# if self.check_password():
# self.stop_password()
#检查是否开启双因子认证
if self.check_ssh_authentication() and self.check_ssh_login_user():
return public.returnMsg(True, public.lang("Successfully opened"))
return public.returnMsg(True, public.lang("Failed to open"))
#关闭SSH 双因子认证
def close_ssh_authentication_two_factors(self):
if not self.get_pam_dir():return public.returnMsg(False, public.lang("The system is not supported"))
check=self.isPamSoExists()
if not check:return False
if self.check_ssh_authentication():
self.stop_ssh_authentication()
#检查是否关闭双因子认证
#如果是关闭的SSH那么就开启
# if not self.check_password():
# self.start_password()
if not self.check_ssh_authentication():
return public.returnMsg(True, public.lang("Closed"))
if self.stop_ssh_authentication():
return public.returnMsg(True, public.lang("Closed"))
#检查是否开启双因子认证
def check_ssh_authentication_two_factors(self):
if not self.get_pam_dir():return public.returnMsg(False, public.lang("The system is not supported"))
check=self.isPamSoExists()
if not check:return False
if not self.check_ssh_login_user():
return public.returnMsg(False, public.lang("Inactive"))
if not self.check_ssh_authentication():
return public.returnMsg(False, public.lang("Inactive"))
return public.returnMsg(True, public.lang("Activated"))
def is_check_so(self):
'''判断SO文件是否存在'''
if not self.get_pam_dir():return public.returnMsg(False, public.lang("The system is not supported"))
config_data=self.get_pam_dir()
if not config_data:return False
config_data2=config_data.split()
ret={}
ret['so_path']=config_data2[2].split('/')[-1]
if os.path.exists(config_data2[2]):
ret['so_status']=True
else:
ret['so_status']=False
return public.returnMsg(True,ret)
def download_so(self):
'''下载so文件'''
if not self.get_pam_dir():return public.returnMsg(False, public.lang("The system is not supported"))
config_data=self.get_pam_dir()
if not config_data:return False
config_data=config_data.split()
self.install_pam_python(config_data)
#判断下载的文件大小
if os.path.exists(config_data[2]) :
if os.path.getsize(config_data[2])>10240:
return public.returnMsg(True, public.lang("Download the file successfully"))
return public.returnMsg(False, public.lang("download failed"))
#获取Linux系统的主机名
def get_pin(self):
import platform,time
data=platform.uname()
tme_data=time.strftime('%Y-%m-%d%H:%M',time.localtime(time.time()))
#获取秒
tis_data=time.strftime('%S',time.localtime(time.time()))
ip_list=public.ReadFile('/www/server/panel/data/pam_btssh_authentication.pl')
ret={}
if isinstance(ip_list,str):
info=data[0]+data[1]+data[2]+tme_data+ip_list
md5_info=public.Md5(info)
ret['pin']=md5_info[:6]
ret['time']=60-int(tis_data)
return ret
else:
ret['pin']='error'
ret['time']=60
return ret