Files

330 lines
13 KiB
Python
Raw Permalink Normal View History

2026-04-07 02:04:22 +05:30
import os,sys,re,time,subprocess
panelPath = '/www/server/panel/'
os.chdir(panelPath)
sys.path.insert(0, panelPath + "class/")
import public
from datetime import date, datetime
is_openssl = True
try:
import OpenSSL
except:
is_openssl = False
class ssl_info:
def __init__(self) -> None:
pass
def create_key(self,bits=2048):
"""
@name 创建RSA密钥
@param bits 密钥长度
"""
if is_openssl:
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
private_key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
return private_key
else:
tmp_pk_file = "/tmp/private_key_{}.pem".format(int(time.time()))
cmd = ["openssl", "genpkey", "-algorithm", "RSA", "-out", tmp_pk_file, "-pkeyopt", f"rsa_keygen_bits:{bits}"]
subprocess.run(cmd, check=True)
with open(tmp_pk_file, "r") as f:
private_key = f.read()
try:
os.remove("private_key.pem")
except:
pass
return private_key
def load_ssl_info_by_data(self, pem_data: str):
if not isinstance(pem_data, (str, bytes)):
return None
if is_openssl:
return self.__get_cert_info(pem_data)
else:
result = {}
pem_file = "/tmp/fullchain_{}.pem".format(int(time.time()))
public.writeFile(pem_file, pem_data)
res = public.ExecShell("openssl x509 -in {} -noout -text".format(pem_file))[0]
issuer_match = re.search(r"Issuer: (.*)", res)
if issuer_match:
data = {}
issuer = issuer_match.group(1)
for key,val in re.findall(r"(\w+)\s*=([^,]+)", issuer):
data[key] = val.strip()
result["issuer"] = data['CN']
if "CN" in data:
s = data['CN'].encode().decode('unicode_escape')
result["issuer"] = bytes(s, 'latin1').decode('utf-8')
result["issuer_O"] = data['O']
if "O" in data:
s = data['O'].encode().decode('unicode_escape')
result["issuer_O"] = bytes(s, 'latin1').decode('utf-8')
validity_match = re.search(r"Not After\s*:\s*(.*)", res)
if validity_match:
not_after = validity_match.group(1)
dt_after = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
result['notAfter'] = dt_after.strftime("%Y-%m-%d %H:%M:%S")
result['endtime'] = (dt_after - datetime.now()).days
else:
result['endtime'] = 0
validity_match = re.search(r"Not Before\s*:\s*(.*)", res)
if validity_match:
not_befoer = validity_match.group(1)
dt_befoer = datetime.strptime(not_befoer, "%b %d %H:%M:%S %Y %Z")
result['notBefore'] = dt_befoer.strftime("%Y-%m-%d %H:%M:%S")
subject_match = re.search(r"Subject: (.*)", res)
if subject_match:
subject = subject_match.group(1)
for key,val in re.findall(r"(\w+)\s*=([^,]+)", subject):
if key == 'CN':
s = val.encode().decode('unicode_escape')
result["subject"] = bytes(s, 'latin1').decode('utf-8').strip()
# 取可选名称
result['dns'] = []
dns_match = re.findall(r"DNS:([^\s,]+)", res)
for dns in dns_match:
result['dns'].append(dns)
if os.path.exists(pem_file):
os.remove(pem_file)
return result
def load_ssl_info(self,pem_file):
"""
@name 获取证书详情
"""
if not os.path.exists(pem_file):
return None
pem_data = public.readFile(pem_file)
if not pem_data:
return None
return self.load_ssl_info_by_data(pem_data)
def __get_cert_info(self,pem_data):
"""
@name 通过python的openssl模块获取证书信息
@param pem_data 证书内容
"""
result = {}
try:
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem_data)
except: # 证书格式可能是错的,就没有办法读取证书内容
return None
issuer = x509.get_issuer()
result['issuer'] = ''
if hasattr(issuer, 'CN'):
result['issuer'] = issuer.CN
if not result['issuer']:
is_key = [b'0', '0']
issue_comp = issuer.get_components()
if len(issue_comp) == 1:
is_key = [b'CN', 'CN']
for iss in issue_comp:
if iss[0] in is_key:
result['issuer'] = iss[1].decode()
break
if hasattr(issuer, 'O'):
result['issuer_O'] = issuer.O
# 取到期时间
result['notAfter'] = self.strf_date(
bytes.decode(x509.get_notAfter())[:-1])
# 取申请时间
result['notBefore'] = self.strf_date(
bytes.decode(x509.get_notBefore())[:-1])
# 取可选名称
result['dns'] = []
for i in range(x509.get_extension_count()):
s_name = x509.get_extension(i)
if s_name.get_short_name() in [b'subjectAltName', 'subjectAltName']:
s_dns = str(s_name).split(',')
for d in s_dns:
result['dns'].append(d.split(':')[1])
subject = x509.get_subject().get_components()
# 取主要认证名称
if len(subject) == 1:
result['subject'] = subject[0][1].decode()
else:
if not result['dns']:
for sub in subject:
if sub[0] == b'CN':
result['subject'] = sub[1].decode()
break
if 'subject' in result:
result['dns'].append(result['subject'])
else:
result['subject'] = result['dns'][0]
result['endtime'] = int(int(time.mktime(time.strptime(result['notAfter'], "%Y-%m-%d")) - time.time()) / 86400)
return result
# 转换时间
def strf_date(self, sdate):
return time.strftime('%Y-%m-%d', time.strptime(sdate, '%Y%m%d%H%M%S'))
#转换时间
def strfToTime(self,sdate):
import time
return time.strftime('%Y-%m-%d',time.strptime(sdate,'%b %d %H:%M:%S %Y %Z'))
def dump_pkcs12_new(self, key_pem=None, cert_pem=None, ca_pem=None, friendly_name=""):
try:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.x509 import load_pem_x509_certificate
private_key = self.analysis_private_key(key_pem)
cert = self.analysis_certificate(cert_pem)
if not private_key or not cert:
return None
# 加载CA证书
cas = None
if ca_pem:
cas = [load_pem_x509_certificate(ca_pem.encode(), default_backend())]
# 将证书和私钥组合成PKCS12格式的文件
from cryptography.hazmat.primitives.serialization.pkcs12 import serialize_key_and_certificates
p12 = serialize_key_and_certificates(
name=friendly_name.encode() if friendly_name else None,
key=private_key,
cert=cert,
encryption_algorithm=serialization.NoEncryption(),
cas=cas
)
return p12
except:
return None
def analysis_private_key(self, key_pem, password=None):
"""
解析私钥
:param key_pem: 私钥内容
:param password: 私钥密码
:return: 私钥对象
"""
try:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
private_key = serialization.load_pem_private_key(
key_pem.encode(),
password=password,
backend=default_backend()
)
return private_key
except:
return None
def analysis_certificate(self, cert_pem):
"""
解析证书
:param cert_pem: 证书内容
:return: 有效返回True无效返回False
"""
try:
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_pem_x509_certificate
cert = load_pem_x509_certificate(cert_pem.encode(), default_backend())
return cert
except Exception as e:
public.print_log(public.get_error_info())
return None
def verify_certificate_and_key_match(self, key_pem, cert_pem, password=None):
"""
验证证书和私钥是否匹配(通过签名验证)
:param key_pem: 私钥内容
:param cert_pem: 证书内容
:param password: 私钥密码
:return: 验证成功返回True失败返回False
"""
try:
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, ec
message = b"test message"
private_key = self.analysis_private_key(key_pem, password)
cert = self.analysis_certificate(cert_pem)
if not private_key:
return False, "Key error, please check if it is the correct PEM format private key"
if not cert:
return False, "Certificate error, please check if it is the correct PEM format certificate"
# 使用私钥对消息进行签名
try:
signature = private_key.sign(
message,
padding.PKCS1v15(),
hashes.SHA256()
)
except Exception as e:
return True, str(e)
# 使用证书中的公钥验证签名
public_key = cert.public_key()
try:
if isinstance(public_key, ec.EllipticCurvePublicKey):
public_key.verify(
signature,
message,
ec.ECDSA(hashes.SHA256())
)
else:
public_key.verify(
signature,
message,
padding.PKCS1v15(),
hashes.SHA256()
)
return True, "Verification successful"
except Exception as e:
return False, "The key and certificate do not match."
except Exception as e:
public.print_log(public.get_error_info())
return True, str(e)
def verify_certificate_chain(self, cert_pem):
"""
验证证书链是否完整
:param cert_pem: 证书链内容
:return: 验证成功返回True失败返回False
"""
try:
from cryptography.hazmat.primitives.asymmetric import padding, ec
cert_chain = [(i + "-----END CERTIFICATE-----").strip() for i in
cert_pem.strip().split("-----END CERTIFICATE-----") if i]
for i in range(len(cert_chain) - 1):
cert = self.analysis_certificate(cert_chain[i])
issuer_cert = self.analysis_certificate(cert_chain[i + 1])
# 验证当前证书的签名是否由上一级签发
try:
issuer_public_key = issuer_cert.public_key()
# 检查 issuer_public_key 是否为 ECC 公钥或 RSA 公钥
if isinstance(issuer_public_key, ec.EllipticCurvePublicKey):
# 使用 ECDSA 验证 ECC 证书签名
issuer_public_key.verify(
cert.signature,
cert.tbs_certificate_bytes,
ec.ECDSA(cert.signature_hash_algorithm),
)
else:
# 使用 PKCS1v15 验证 RSA 证书签名
issuer_public_key.verify(
cert.signature,
cert.tbs_certificate_bytes,
padding.PKCS1v15(),
cert.signature_hash_algorithm,
)
except Exception as e:
return False, "Certificate chain verification failed, please check if the certificate chain is complete."
return True, "Certificate chain verification successful."
except Exception as e:
public.print_log(public.get_error_info())
return True, str(e)