Files

294 lines
9.8 KiB
Python
Raw Permalink Normal View History

2026-04-07 02:04:22 +05:30
import json
import os
import shutil
import tarfile
import time
from hashlib import md5
from typing import Optional, List, Union, Dict, Any
from .util import DB, ExecShell, write_file, write_log, read_file
class VersionTool:
_config_file = "/www/server/panel/data/version_config.json"
def __init__(self):
self._config: Optional[Dict[str, List[Dict[str, Any]]]] = None
self._pack_class = BasePack
self.pack_path = "/www/backup/versions"
if not os.path.isdir(self.pack_path):
os.makedirs(self.pack_path)
@property
def config(self) -> Dict[str, List[Dict[str, Any]]]:
if self._config is not None:
return self._config
data = {}
try:
res = read_file(self._config_file)
if isinstance(res, str):
data = json.loads(res)
except (json.JSONDecoder, TypeError, ValueError):
pass
self._config = data
return self._config
def save_config(self):
if self._config is not None:
write_file(self._config_file, json.dumps(self._config))
def add_to_config(self, data: dict):
project_name = data.get("project_name")
self._config = None
if project_name not in self.config:
self.config[project_name] = []
self.config[project_name].append(data)
self.save_config()
def set_pack_class(self, pack_cls):
self._pack_class = pack_cls
def version_list(self, project_name: str):
if project_name in self.config:
return self.config[project_name]
return []
def get_version_info(self, project_name: str, version: str) -> Optional[dict]:
if project_name in self.config:
for i in self.config[project_name]:
if i.get("version") == version:
return i
return None
# 把某个路径下的文件打包并发布为一个版本
def publish_by_src_path(self,
project_name: str, # 名称
src_path: str, # 源路径
version: str, # 版本号
ps: Optional[str] = None, # 备注
other: Optional[dict] = None, # 其他信息
sync: bool = False, # 是否同步执行
):
if project_name in self.config:
for i in self.config[project_name]:
if i["version"] == version:
return "The current version already exists"
if not os.path.isdir(src_path):
return "The source path does not exist"
if ps is None:
ps = ''
if other is None:
other = {}
zip_name = "{}_{}.tar.gz".format(
os.path.basename(src_path), time.strftime('%Y%m%d_%H%M%S', time.localtime())
)
data = {
"project_name": project_name,
"version": version,
"ps": ps,
"other": other,
"zip_name": zip_name,
"backup_time": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
}
return self._pack_class(src_path, self.pack_path, zip_name, sync=sync, vt=self, data=data)(**other)
def recover(self,
project_name: str, # 名称
version: str, # 版本
target_path: str, # 目标路径
run_path=None
):
if not run_path:
run_path = target_path
if project_name not in self.config:
return 'The project does not exist'
target = None
for i in self.config[project_name]:
if i["version"] == version:
target = i
break
if target is None:
return 'Version does not exist'
file = os.path.join(self.pack_path, target["zip_name"])
if not os.path.exists(file):
return 'Version file missing'
tmp_path = '/tmp/version_{}'.format(int(time.time()))
tar = tarfile.open(file, mode='r')
tar.extractall(tmp_path)
user_data = None
if os.path.exists(target_path):
ExecShell("chattr -i -R {}/".format(target_path))
user_data = read_file(run_path + "/.user.ini")
ExecShell("rm -rf {}".format(target_path))
os.makedirs(target_path)
if not os.path.exists(target_path):
os.makedirs(target_path)
ExecShell(r"\cp -rf {}/* {}".format(tmp_path, target_path))
if user_data:
write_file(target_path + "/.user.ini", run_path)
ExecShell("chattr +i {}/.user.ini".format(run_path))
ExecShell("rm -rf {}".format(tmp_path))
return True
def publish_by_file(self,
project_name: str, # 名称
src_file: str, # 源路径
version: str, # 版本号
ps: Optional[str] = None, # 备注
other: Optional[dict] = None, # 其他信息
):
if project_name in self.config:
for i in self.config[project_name]:
if i["version"] == version:
return "The current version already exists"
if not os.path.isfile(src_file):
return "The source path does not exist"
if ps is None:
ps = ''
if other is None:
other = {}
zip_name = os.path.basename(src_file)
data = {
"project_name": project_name,
"version": version,
"ps": ps,
"other": other,
"zip_name": zip_name,
"backup_time": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
}
try:
shutil.copy(src_file, self.pack_path + "/" + zip_name)
except:
return "File save failed"
self.add_to_config(data)
return None
def remove(self,
project_name: str, # 名称
version: str, # 版本
) -> Optional[str]:
if project_name not in self.config:
return 'The project does not exist'
target = None
for i in self.config[project_name]:
if i["version"] == version:
target = i
break
if target is None:
return 'Version does not exist'
file = os.path.join(self.pack_path, target["zip_name"])
if os.path.isfile(file):
os.remove(file)
self.config[project_name].remove(target)
self.save_config()
return None
def set_ps(self, name: str, version: str, ps: str):
[i.update({'ps': ps}) for i in self.config[name] if i["version"] == version]
self.save_config()
return True
class BasePack:
exec_log_file = "/tmp/project_pack.log"
def __init__(self, src_path, target_path, zip_name, sync=False, vt: VersionTool = None, data: dict = None):
self.src_path = src_path
self.target_path = target_path
self.zip_name = zip_name
self.sync = sync
self.v = vt
self._add_data = data
def save_config(self):
self.v.add_to_config(self._add_data)
def __call__(self, *args, **kwargs) -> Optional[str]:
if not os.path.exists(self.src_path):
return "The source path does not exist"
target_path = "/www/backup/versions"
if not os.path.isdir(target_path):
os.makedirs(target_path)
if self.sync:
return self._sync_backup(self.src_path, target_path, self.zip_name)
else:
return self._async_backup(self.src_path, target_path, self.zip_name)
def _sync_backup(self, src: str, target_path: str, zip_name: str) -> Optional[str]:
try:
write_file(self.exec_log_file, "")
execStr = ("cd {} && "
"tar -zcvf '{}' --exclude=.user.ini ./ 2>&1 > {} \n"
"echo '---The packaging execution is complete---' >> {}"
).format(src, os.path.join(target_path, zip_name), self.exec_log_file, self.exec_log_file)
ExecShell(execStr)
self.save_config()
except:
return "The packaging execution failed"
def _async_backup(self, src: str, target_path: str, zip_name: str):
import threading
hash_name = self._hash_src_name(src)
backup_tip_path = "/tmp/mod_version_tip"
if os.path.exists(backup_tip_path):
os.makedirs(backup_tip_path)
tip_file = os.path.join(backup_tip_path, hash_name)
if os.path.isfile(tip_file):
mtime = os.stat(tip_file).st_mtime
if time.time() - mtime > 60 * 20: # 20 分钟未执行,认为出现在不可抗力,导致备份失败,允许再次备份
os.remove(tip_file)
else:
return "Packing is in progress, please do not proceed"
write_file(tip_file, "")
def _back_p():
try:
write_file(self.exec_log_file, "")
execStr = ("cd {} && "
"tar -zcvf '{}' --exclude=.user.ini ./ 2>&1 > {} \n"
"echo '---Backup execution completed---' >> {}"
).format(src, os.path.join(target_path, zip_name), self.exec_log_file, self.exec_log_file)
ExecShell(execStr)
self.save_config()
except:
pass
finally:
if os.path.exists(tip_file):
os.remove(tip_file)
t = threading.Thread(target=_back_p)
t.start()
@staticmethod
def _hash_src_name(name: Union[str, bytes]) -> str:
if isinstance(name, str):
name = name.encode('utf-8')
md5_obj = md5()
md5_obj.update(name)
return md5_obj.hexdigest()