Files
yakpanel-core/mod/project/docker/apphub/tool.py
2026-04-07 02:04:22 +05:30

191 lines
6.0 KiB
Python

import os
import json
import shutil
import datetime
import tarfile
import zipfile
import public
class BaseCompressHandler:
"""压缩文件处理基类"""
def __init__(self):
pass
def get_files(self, sfile):
"""获取压缩包内文件列表"""
pass
def get_file_info(self, sfile,filename):
"""获取压缩包内文件信息"""
pass
def check_file_exists(self, file_path):
"""检查文件是否存在"""
if not os.path.exists(file_path):
return False
return True
class GzHandler(BaseCompressHandler):
"""tar.gz压缩文件处理类"""
def get_filename(self, item):
"""获取压缩包文件名"""
filename = item.name
try:
filename = item.name.encode('cp437').decode('gbk')
except:
pass
if item.isdir():
filename += '/'
return filename
def check_file_type(self, file_path):
"""检查文件是否为tar文件"""
if not tarfile.is_tarfile(file_path):
if file_path[-3:] == ".gz":
return False, 'This is not tar.gz archive file, the gz archive file does not support preview, only decompression'
return False, 'Not a valid tar.gz archive file'
return True, ''
def get_files(self, sfile):
"""获取压缩包内文件列表"""
if not self.check_file_exists(sfile):
return public.returnMsg(False, 'FILE_NOT_EXISTS')
is_valid, message = self.check_file_type(sfile)
if not is_valid:
return public.returnMsg(False, message)
zip_file = tarfile.open(sfile)
data = {}
for item in zip_file.getmembers():
file_name = self.get_filename(item)
temp_list = file_name.split("/")
sub_data = data
for name in temp_list:
if not name: continue
if name not in sub_data:
if file_name.endswith(name) and not ".{}".format(name) in file_name:
sub_data[name] = {
'file_size': item.size,
'filename': name,
'fullpath': file_name,
'date_time': public.format_date(times=item.mtime),
'is_dir': 1 if item.isdir() else 0
}
else:
sub_data[name] = {}
sub_data = sub_data[name]
zip_file.close()
return data
def get_file_info(self, sfile, filename):
"""获取压缩包内文件信息"""
if not self.check_file_exists(sfile):
return public.returnMsg(False, 'FILE_NOT_EXISTS')
tmp_path = '{}/tmp/{}'.format(public.get_panel_path(), public.md5(sfile + filename))
result = {}
result['status'] = True
result['data'] = ''
with tarfile.open(sfile, 'r') as zip_file:
try:
zip_file.extract(filename, tmp_path)
result['data'] = public.readFile('{}/{}'.format(tmp_path, filename))
except:
pass
public.ExecShell("rm -rf {}".format(tmp_path))
return result
class ZipHandler(BaseCompressHandler):
"""zip压缩文件处理类"""
def check_file_type(self, sfile, is_close=False):
"""检查文件是否为zip文件"""
zip_file = None
try:
zip_file = zipfile.ZipFile(sfile)
except:
pass
if is_close and zip_file:
zip_file.close()
return zip_file
def get_filename(self, item):
"""获取压缩包文件名"""
path = item.filename
try:
path_name = path.encode('cp437').decode('utf-8')
except:
try:
path_name = path.encode('cp437').decode('gbk')
path_name = path_name.encode('utf-8').decode('utf-8')
except:
path_name = path
return path_name
def get_files(self, sfile):
"""获取压缩包内文件列表"""
if not self.check_file_exists(sfile):
return public.returnMsg(False, 'FILE_NOT_EXISTS')
zip_file = self.check_file_type(sfile)
if not zip_file:
return public.returnMsg(False, 'NOT_ZIP_FILE')
data = {}
for item in zip_file.infolist():
file_name = self.get_filename(item)
temp_list = file_name.lstrip("./").split("/")
sub_data = data
for name in temp_list:
if not name: continue
if name not in sub_data:
if file_name.endswith(name):
sub_data[name] = {
'file_size': item.file_size,
'compress_size': item.compress_size,
'compress_type': item.compress_type,
'filename': name,
'fullpath': file_name,
'date_time': datetime.datetime(*item.date_time).strftime("%Y-%m-%d %H:%M:%S"),
'is_dir': 1 if item.is_dir() else 0
}
else:
sub_data[name] = {}
sub_data = sub_data[name]
zip_file.close()
return data
def get_file_info(self, sfile,filename):
"""获取压缩包内文件信息"""
if not self.check_file_exists(sfile):
return public.returnMsg(False, 'FILE_NOT_EXISTS')
result = {}
result['status'] = True
result['data'] = ''
with zipfile.ZipFile(sfile, 'r') as zip_file:
for item in zip_file.infolist():
z_filename = self.get_filename(item)
if z_filename == filename:
buff = zip_file.read(item.filename)
encoding, srcBody = public.decode_data(buff)
result['encoding'] = encoding
result['data'] = srcBody
break
return result