Files

449 lines
18 KiB
Python
Raw Permalink Normal View History

2026-04-07 02:04:22 +05:30
# coding: utf-8
# +-------------------------------------------------------------------
# | YakPanel
# +-------------------------------------------------------------------
# | Copyright (c) 2015-2099 YakPanel(www.yakpanel.com) All rights reserved.
# +-------------------------------------------------------------------
# | Author: hwliang <hwl@yakpanel.com>
# +-------------------------------------------------------------------
import json
import os
import sys
import time
from datetime import datetime
import public
from YakPanel import (
session,
request,
redirect,
g,
abort,
Response,
)
# Panel installation directory, resolved once at import time.
PANEL_PATH = public.get_panel_path()
# Must run before the import below: presumably this puts "class_v2/" on the
# module search path so that `theme_config` can be found — TODO confirm
# against public.sys_path_append.
public.sys_path_append("class_v2/")
from theme_config import ThemeConfigManager
class panelSetup:
    """Per-request bootstrap that fills the request-scoped ``g`` object.

    ``init`` is the entry point. It rejects crawler user agents, records
    version/title/debug information on ``g`` and ``session``, loads the theme
    configuration and registers optional extra static assets.

    NOTE(review): the text this class was restored from had lost its
    indentation; nesting was reconstructed from the statement syntax. The
    one ambiguous spot is marked inline.
    """

    def __init_panel_theme(self) -> None:
        """Load the panel theme configuration into ``g.panel_theme``."""
        # Initialise the theme config manager (per the original author's note
        # it checks for the config file and creates it when missing).
        theme_manager = ThemeConfigManager()
        # Fetch the theme configuration payload.
        theme_manager_data = theme_manager.get_config()
        # Only the "data" member is exposed; fall back to an empty dict.
        g.panel_theme = theme_manager_data.get("data", {})

    def init(self):
        """Prepare ``g`` and ``session`` for the current request.

        Returns:
            ``None`` on success, or the result of ``abort(403)`` when the
            User-Agent header looks like a crawler.
        """
        # Relative paths such as 'data/debug.pl' below rely on the working
        # directory being the panel directory.
        panel_path = public.get_panel_path()
        if os.getcwd() != panel_path:
            os.chdir(panel_path)

        g.ua = request.headers.get('User-Agent', '')
        if g.ua:
            # Compare case-insensitively for BOTH markers. The previous code
            # tested 'bot' against the raw header, so "GoogleBot"/"BOT" were
            # let through while "Spider" was blocked.
            ua = g.ua.lower()
            if 'spider' in ua or 'bot' in ua:
                return abort(403)

        g.version = '8.3.0'
        g.title = public.GetConfigValue('title')
        g.uri = request.path
        g.debug = os.path.exists('data/debug.pl')
        g.pyversion = sys.version_info[0]
        session['version'] = g.version

        # Initialise the panel theme configuration.
        self.__init_panel_theme()

        if not public.get_improvement():
            session['is_flush_soft_list'] = 1

        if request.method == 'GET':
            if not g.debug:
                g.cdn_url = public.get_cdn_url()
                if not g.cdn_url:
                    g.cdn_url = '/static'
                else:
                    # Protocol-relative CDN URL, versioned by panel release.
                    g.cdn_url = '//' + g.cdn_url + '/' + g.version
            else:
                # Debug mode always serves local static files.
                g.cdn_url = '/static'
            # NOTE(review): placed inside the GET branch when the nesting was
            # reconstructed — confirm the title is meant to be GET-only.
            session['title'] = g.title

        # Flag files toggle the file / database recycle bins.
        g.recycle_bin_open = 0
        if os.path.exists("data/recycle_bin.pl"):
            g.recycle_bin_open = 1

        g.recycle_bin_db_open = 0
        if os.path.exists("data/recycle_bin_db.pl"):
            g.recycle_bin_db_open = 1

        g.is_aes = False
        self.other_import()
        return None

    def other_import(self):
        """Register optional extra CSS/JS assets named by ``data/o.pl``.

        Sets ``g.o`` to the file content and fills ``g.other_css`` /
        ``g.other_js`` with the static URLs of the matching files that exist
        under ``YakPanel/static/other/``.
        """
        g.o = public.readFile('data/o.pl')
        g.other_css = []
        g.other_js = []
        if g.o:
            s_path = 'YakPanel/static/other/{}'

            css_name = "css/{}.css".format(g.o)
            css_file = s_path.format(css_name)
            if os.path.exists(css_file):
                g.other_css.append('/static/other/{}'.format(css_name))

            js_name = "js/{}.js".format(g.o)
            js_file = s_path.format(js_name)
            if os.path.exists(js_file):
                g.other_js.append('/static/other/{}'.format(js_name))
class panelAdmin(panelSetup):
    """Request gatekeeper: authentication plus session bootstrap.

    ``local`` runs the whole pipeline. Every step returns a falsy value to
    continue, or a response object (redirect / JSON) that must be sent back
    to the client instead of handling the request.

    NOTE(review): the text this class was restored from had lost its
    indentation; nesting was reconstructed from the statement syntax.
    Ambiguous spots are marked inline.
    """

    # Installation root probed for web servers and phpMyAdmin.
    setupPath = '/www/server'

    # Local request
    def local(self):
        """Run the pre-request checks in order.

        Returns:
            The first truthy response produced by a step, otherwise ``None``.
        """
        result = panelSetup().init()
        if result:
            return result
        result = self.check_login()
        if result:
            return result
        result = self.setSession()
        if result:
            return result
        result = self.checkClose()
        if result:
            return result
        result = self.checkWebType()
        if result:
            return result
        # These two only populate the session; their return value is unused.
        self.checkConfig()
        self.GetOS()

    @staticmethod
    def __set_session_munes():
        """Publish the router menus on ``g`` and in the session."""
        g.menus = public.get_menus_for_session_router()
        # 'yaer' (sic) is kept as-is: the attribute name may be read elsewhere.
        g.yaer = datetime.now().year
        session['menus'] = g.menus

    # Set the basic session values
    def setSession(self):
        """Fill the session with UI strings, paths and locale information.

        Returns:
            Always ``False`` (nothing to send back to the client).
        """
        session["top_tips"] = public.lang(
            "The current IE browser version is too low to display some features, please use another browser."
            " Or if you use a browser developed by a Chinese company, please switch to Extreme Mode!"
        )
        session["bt_help"] = public.lang("For Support|Suggestions, please visit the YakPanel Forum")
        session["download"] = public.lang("Downloading:")

        if request.method == 'GET':
            self.__set_session_munes()

        # One-time defaults, written the first time a session is seen.
        if 'brand' not in session:
            session['brand'] = public.GetConfigValue('brand')
            session['product'] = public.GetConfigValue('product')
            session['rootPath'] = '/www'
            session['download_url'] = 'https://node.yakpanel.com'
            session['setupPath'] = session['rootPath'] + '/server'
            session['logsPath'] = '/www/wwwlogs'
            session['yaer'] = datetime.now().year

        # NOTE(review): reconstructed at method level (it replaced a
        # commented-out method-level 'menu' block) — confirm it was not meant
        # to live inside the 'brand' branch above.
        if session.get('uid', None) == 1:
            self.__set_session_munes()

        if 'lan' not in session:
            session['lan'] = public.GetLanguage()
        if 'home' not in session:
            session['home'] = public.OfficialApiBase()
        return False

    # Check the web server type
    def checkWebType(self):
        """Detect the installed web server and cache it in the session.

        Stores ``webserver`` (openlitespeed / apache / nginx), and, when the
        corresponding files exist, ``webversion`` and ``phpmyadminDir``.

        Returns:
            Always ``False``.
        """
        if 'webserver' not in session:
            if os.path.exists('/usr/local/lsws/bin/lswsctrl'):
                session['webserver'] = 'openlitespeed'
            elif os.path.exists(self.setupPath + '/apache/bin/apachectl'):
                session['webserver'] = 'apache'
            else:
                # nginx is the fallback when neither binary is found.
                session['webserver'] = 'nginx'

        if 'webversion' not in session:
            version_file = self.setupPath + '/' + session['webserver'] + '/version.pl'
            if os.path.exists(version_file):
                session['webversion'] = public.ReadFile(version_file).strip()

        if 'phpmyadminDir' not in session:
            filename = self.setupPath + '/data/phpmyadminDirName.pl'
            if os.path.exists(filename):
                session['phpmyadminDir'] = public.ReadFile(filename).strip()
        return False

    # Check whether the panel is closed
    def checkClose(self):
        """Redirect to ``/close`` when the ``data/close.pl`` flag file exists.

        Returns:
            A redirect response, or ``None`` when the panel is open.
        """
        if os.path.exists('data/close.pl'):
            return redirect('/close')

    # Send the client to the login page
    def to_login(self, url, msg=" Login has expired, please log in again"):
        """Clear the session and tell the client to log in again.

        Args:
            url: Login URL to redirect to.
            msg: Message included in the JSON payload for AJAX callers.

        Returns:
            A JSON ``Response`` (status ``-1``, with ``msg`` and ``redirect``)
            when the request carries an ``X-Http-Token`` header, otherwise a
            plain redirect to ``url``.
        """
        session.clear()
        x_http_token = request.headers.get('X-Http-Token', '')
        if x_http_token:
            # AJAX request: answer with the panel's generic response envelope.
            res = {
                "status": -1,
                "timestamp": int(time.time()),
                "message": {
                    "msg": msg,
                    "redirect": url
                }
            }
            return Response(json.dumps(res), content_type='application/json')
        return redirect(url)

    # Check login (older variant, redirects directly instead of to_login)
    def check_login1(self):
        """Validate the login state, redirecting to the admin path on failure.

        Returns:
            A response object when the request must not proceed, otherwise
            ``None``.
        """
        try:
            api_check = True
            g.api_request = False
            if 'login' not in session:
                # No session login: fall back to API-key authentication.
                api_check = self.get_sk()
                if api_check:
                    if not isinstance(api_check, dict):
                        if public.get_admin_path() == '/login':
                            return redirect('/login?err=1')
                    return api_check
                g.api_request = True
            else:
                if session['login'] == False:
                    session.clear()
                    return redirect(public.get_admin_path())

                if 'tmp_login_expire' in session:
                    s_file = 'data/session/{}'.format(session['tmp_login_id'])
                    if session['tmp_login_expire'] < time.time():
                        session.clear()
                        if os.path.exists(s_file):
                            os.remove(s_file)
                        return redirect(public.get_admin_path())
                    if not os.path.exists(s_file):
                        session.clear()
                        return redirect(public.get_admin_path())

                if not public.check_client_hash():
                    session.clear()
                    return redirect(public.get_admin_path())

            if api_check:
                # Session login only: enforce the idle timeout (0 = not set).
                now_time = time.time()
                session_timeout = session.get('session_timeout', 0)
                if session_timeout < now_time and session_timeout != 0:
                    session.clear()
                    return redirect(public.get_admin_path())

            # NOTE(review): reconstructed outside the `if api_check` branch —
            # confirm against the upstream file.
            login_token = session.get('login_token', '')
            if login_token:
                if login_token != public.get_login_token_auth():
                    session.clear()
                    return redirect(public.get_admin_path())

            # Mark the new session expiry time.
            self.check_session()
        except Exception:
            # `Exception` rather than a bare except, so SystemExit and
            # KeyboardInterrupt are not turned into a login redirect.
            public.print_log(public.get_error_info())
            session.clear()
            public.print_error()
            return redirect('/login?id=2')

    # Check login
    def check_login(self):
        """Validate the login state for the current request.

        Requests without a session login are authenticated through
        ``get_sk``; session logins are checked for temporary-login expiry,
        client hash, idle timeout and login token.

        Returns:
            A response object (from ``get_sk`` or ``to_login``) when the
            request must not proceed, otherwise ``None``.
        """
        try:
            api_check = True
            g.api_request = False
            if 'login' not in session:
                # No session login: fall back to API-key authentication.
                api_check = self.get_sk()
                if api_check:
                    return api_check
                g.api_request = True
            else:
                if session['login'] is False:
                    return self.to_login(public.get_admin_path())

                if 'tmp_login_expire' in session:
                    s_file = 'data/session/{}'.format(session['tmp_login_id'])
                    if session['tmp_login_expire'] < time.time():
                        if os.path.exists(s_file):
                            os.remove(s_file)
                        return self.to_login(public.get_admin_path(),
                                             'The temporary login has expired, please log in again')
                    if not os.path.exists(s_file):
                        return self.to_login(public.get_admin_path(),
                                             'The temporary login has expired, please log in again')

                # Client hash check -- do not remove.
                if not public.check_client_hash():
                    return self.to_login(public.get_admin_path(), 'Client verification failed, please log in again')

            if api_check:
                # Session login only: enforce the idle timeout (0 = not set).
                now_time = time.time()
                session_timeout = session.get('session_timeout', 0)
                if session_timeout < now_time and session_timeout != 0:
                    return self.to_login(public.get_admin_path(), "Login session has expired, please log in again")

            # NOTE(review): reconstructed outside the `if api_check` branch —
            # confirm against the upstream file.
            login_token = session.get('login_token', '')
            if login_token and login_token != public.get_login_token_auth():
                return self.to_login(public.get_admin_path(), 'Login verification failed, please log in again')

            # Mark the new session expiry time.
            self.check_session()
        except Exception:
            # `Exception` rather than a bare except, so SystemExit and
            # KeyboardInterrupt are not turned into a login redirect.
            public.print_log(public.get_error_info())
            public.print_error()
            public.print_log("except Login has expired, please log in again")
            return self.to_login('/login', ' Login has expired, please log in again')

    def check_session(self):
        """Push the session expiry forward, except for whitelisted URIs."""
        # NOTE(review): g.uri is assigned from request.path in
        # panelSetup.init; if that path carries no query string (as with
        # Werkzeug requests) the second entry can never match — confirm.
        white_list = ['/favicon.ico', '/system?action=GetNetWork']
        if g.uri in white_list:
            return
        session['session_timeout'] = time.time() + public.get_session_timeout()

    # Get sk (API secret-key authentication)
    def get_sk(self):
        """Authenticate the request through the panel API configuration.

        Two modes are handled: plain API calls (``request_token`` +
        ``request_time``) and bound-app calls (``client_bind_token`` with an
        AES-encrypted ``form_data``).

        Returns:
            ``False`` when the token matches (request is authenticated),
            otherwise a response object built by the ``public`` helpers.
        """
        save_path = '/www/server/panel/config/api.json'
        if not os.path.exists(save_path):
            return public.redirect_to_login(None)

        try:
            api_config = json.loads(public.ReadFile(save_path))
        except Exception:
            # Unreadable/corrupt API config: drop it and require a login.
            os.remove(save_path)
            return public.redirect_to_login(None)

        if not api_config['open']:
            return public.redirect_to_login(None)

        from YakPanel import get_input
        get = get_input()
        client_ip = public.GetClientIp()

        if 'client_bind_token' not in get:
            if 'request_token' not in get or 'request_time' not in get:
                return public.redirect_to_login(None)

            # Failure counter key for this client in API mode.
            num_key = client_ip + '_api'
            if not public.get_error_num(num_key, 20):
                return public.returnJson(False, '20 consecutive verification failures, prohibited for 1 hour')

            if not public.is_api_limit_ip(api_config['limit_addr'], client_ip):
                public.set_error_num(num_key)
                return public.returnJson(False, '%s[' % public.lang(
                    "IP validation failed, your access IP is") + client_ip + ']')
        else:
            # Failure counter key for this client in bound-app mode.
            num_key = client_ip + '_app'
            if not public.get_error_num(num_key, 20):
                return public.returnJson(False, '20 consecutive verification failures, prohibited for 1 hour')

            a_file = '/dev/shm/' + get.client_bind_token
            # The token becomes part of a file path: validate before use.
            if not public.path_safe_check(get.client_bind_token):
                public.set_error_num(num_key)
                return public.returnJson(False, 'illegal request')

            if not os.path.exists(a_file):
                import panelApi
                if not panelApi.panelApi().get_app_find(get.client_bind_token):
                    public.set_error_num(num_key)
                    return public.returnJson(False, 'Unbound device')
                # Marker file so later requests skip the lookup above.
                public.writeFile(a_file, '')

            if 'key' not in api_config:
                public.set_error_num(num_key)
                return public.returnJson(False, 'Key verification failed')
            if 'form_data' not in get:
                public.set_error_num(num_key)
                return public.returnJson(False, 'No form_data data found')

            g.form_data = json.loads(public.aes_decrypt(get.form_data, api_config['key']))

            # Re-read the input after g.form_data has been set.
            get = get_input()
            if 'request_token' not in get or 'request_time' not in get:
                return public.error_not_login('/login')

            g.is_aes = True
            g.aes_key = api_config['key']

        request_token = public.md5(get.request_time + api_config['token'])
        # NOTE(review): plain `==` is not a constant-time comparison; consider
        # hmac.compare_digest once the operand types are confirmed.
        if get.request_token == request_token:
            # Success: the third argument presumably resets the failure
            # counter — TODO confirm against public.set_error_num.
            public.set_error_num(num_key, True)
            return False

        public.set_error_num(num_key)
        return public.returnJson(False, 'Secret key verification failed')

    # Check the system configuration
    def checkConfig(self):
        """Cache the panel config row and the local address in the session.

        Returns:
            Always ``False``.
        """
        if 'config' not in session:
            session['config'] = public.M('config').where("id=?", ('1',)).field(
                'webserver,sites_path,backup_path,status,mysql_root').find()
            # 4.29: the query result may be an empty list, which cannot take
            # the 'email' assignment below; normalise it to a dict.
            if not session['config']:
                session['config'] = {}
            if session['config'] and 'email' not in session['config']:
                session['config']['email'] = public.M(
                    'users').where("id=?", ('1',)).getField('email')

        if 'address' not in session:
            session['address'] = public.GetLocalIp()
        return False

    # Get the operating system type
    def GetOS(self):
        """Detect the OS family once and store it as ``session['server_os']``.

        The stored dict has ``x`` ('RHEL' or 'Debian') and ``osname``; it is
        left empty when none of the probed files exist.

        Returns:
            Always ``False``.
        """
        if 'server_os' not in session:
            tmp = {}
            issue_file = '/etc/issue'
            redhat_release = '/etc/redhat-release'
            if os.path.exists(redhat_release):
                tmp['x'] = 'RHEL'
                tmp['osname'] = self.get_osname(redhat_release)
            elif os.path.exists('/usr/bin/yum'):
                tmp['x'] = 'RHEL'
                tmp['osname'] = self.get_osname(issue_file)
            elif os.path.exists(issue_file):
                tmp['x'] = 'Debian'
                tmp['osname'] = self.get_osname(issue_file)
            session['server_os'] = tmp
        return False

    def get_osname(self, i_file):
        '''
            @name Get the system name from the given file
            @author hwliang<2021-04-07>
            @param i_file<string> Full path of the file to read
            @return string First whitespace-separated word of the file,
                           or '' when the file is missing or empty
        '''
        if not os.path.exists(i_file):
            return ''
        issue_str = public.ReadFile(i_file).strip()
        if issue_str:
            return issue_str.split()[0]
        return ''