Files
yakpanel-core/class/ssl_info.py
2026-04-07 02:04:22 +05:30

330 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os,sys,re,time,subprocess
panelPath = '/www/server/panel/'
os.chdir(panelPath)
sys.path.insert(0, panelPath + "class/")
import public
from datetime import date, datetime
is_openssl = True
try:
import OpenSSL
except:
is_openssl = False
class ssl_info:
def __init__(self) -> None:
pass
def create_key(self,bits=2048):
"""
@name 创建RSA密钥
@param bits 密钥长度
"""
if is_openssl:
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
private_key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
return private_key
else:
tmp_pk_file = "/tmp/private_key_{}.pem".format(int(time.time()))
cmd = ["openssl", "genpkey", "-algorithm", "RSA", "-out", tmp_pk_file, "-pkeyopt", f"rsa_keygen_bits:{bits}"]
subprocess.run(cmd, check=True)
with open(tmp_pk_file, "r") as f:
private_key = f.read()
try:
os.remove("private_key.pem")
except:
pass
return private_key
def load_ssl_info_by_data(self, pem_data: str):
if not isinstance(pem_data, (str, bytes)):
return None
if is_openssl:
return self.__get_cert_info(pem_data)
else:
result = {}
pem_file = "/tmp/fullchain_{}.pem".format(int(time.time()))
public.writeFile(pem_file, pem_data)
res = public.ExecShell("openssl x509 -in {} -noout -text".format(pem_file))[0]
issuer_match = re.search(r"Issuer: (.*)", res)
if issuer_match:
data = {}
issuer = issuer_match.group(1)
for key,val in re.findall(r"(\w+)\s*=([^,]+)", issuer):
data[key] = val.strip()
result["issuer"] = data['CN']
if "CN" in data:
s = data['CN'].encode().decode('unicode_escape')
result["issuer"] = bytes(s, 'latin1').decode('utf-8')
result["issuer_O"] = data['O']
if "O" in data:
s = data['O'].encode().decode('unicode_escape')
result["issuer_O"] = bytes(s, 'latin1').decode('utf-8')
validity_match = re.search(r"Not After\s*:\s*(.*)", res)
if validity_match:
not_after = validity_match.group(1)
dt_after = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
result['notAfter'] = dt_after.strftime("%Y-%m-%d %H:%M:%S")
result['endtime'] = (dt_after - datetime.now()).days
else:
result['endtime'] = 0
validity_match = re.search(r"Not Before\s*:\s*(.*)", res)
if validity_match:
not_befoer = validity_match.group(1)
dt_befoer = datetime.strptime(not_befoer, "%b %d %H:%M:%S %Y %Z")
result['notBefore'] = dt_befoer.strftime("%Y-%m-%d %H:%M:%S")
subject_match = re.search(r"Subject: (.*)", res)
if subject_match:
subject = subject_match.group(1)
for key,val in re.findall(r"(\w+)\s*=([^,]+)", subject):
if key == 'CN':
s = val.encode().decode('unicode_escape')
result["subject"] = bytes(s, 'latin1').decode('utf-8').strip()
# 取可选名称
result['dns'] = []
dns_match = re.findall(r"DNS:([^\s,]+)", res)
for dns in dns_match:
result['dns'].append(dns)
if os.path.exists(pem_file):
os.remove(pem_file)
return result
def load_ssl_info(self,pem_file):
"""
@name 获取证书详情
"""
if not os.path.exists(pem_file):
return None
pem_data = public.readFile(pem_file)
if not pem_data:
return None
return self.load_ssl_info_by_data(pem_data)
def __get_cert_info(self,pem_data):
"""
@name 通过python的openssl模块获取证书信息
@param pem_data 证书内容
"""
result = {}
try:
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem_data)
except: # 证书格式可能是错的,就没有办法读取证书内容
return None
issuer = x509.get_issuer()
result['issuer'] = ''
if hasattr(issuer, 'CN'):
result['issuer'] = issuer.CN
if not result['issuer']:
is_key = [b'0', '0']
issue_comp = issuer.get_components()
if len(issue_comp) == 1:
is_key = [b'CN', 'CN']
for iss in issue_comp:
if iss[0] in is_key:
result['issuer'] = iss[1].decode()
break
if hasattr(issuer, 'O'):
result['issuer_O'] = issuer.O
# 取到期时间
result['notAfter'] = self.strf_date(
bytes.decode(x509.get_notAfter())[:-1])
# 取申请时间
result['notBefore'] = self.strf_date(
bytes.decode(x509.get_notBefore())[:-1])
# 取可选名称
result['dns'] = []
for i in range(x509.get_extension_count()):
s_name = x509.get_extension(i)
if s_name.get_short_name() in [b'subjectAltName', 'subjectAltName']:
s_dns = str(s_name).split(',')
for d in s_dns:
result['dns'].append(d.split(':')[1])
subject = x509.get_subject().get_components()
# 取主要认证名称
if len(subject) == 1:
result['subject'] = subject[0][1].decode()
else:
if not result['dns']:
for sub in subject:
if sub[0] == b'CN':
result['subject'] = sub[1].decode()
break
if 'subject' in result:
result['dns'].append(result['subject'])
else:
result['subject'] = result['dns'][0]
result['endtime'] = int(int(time.mktime(time.strptime(result['notAfter'], "%Y-%m-%d")) - time.time()) / 86400)
return result
# 转换时间
def strf_date(self, sdate):
return time.strftime('%Y-%m-%d', time.strptime(sdate, '%Y%m%d%H%M%S'))
#转换时间
def strfToTime(self,sdate):
import time
return time.strftime('%Y-%m-%d',time.strptime(sdate,'%b %d %H:%M:%S %Y %Z'))
def dump_pkcs12_new(self, key_pem=None, cert_pem=None, ca_pem=None, friendly_name=""):
try:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.x509 import load_pem_x509_certificate
private_key = self.analysis_private_key(key_pem)
cert = self.analysis_certificate(cert_pem)
if not private_key or not cert:
return None
# 加载CA证书
cas = None
if ca_pem:
cas = [load_pem_x509_certificate(ca_pem.encode(), default_backend())]
# 将证书和私钥组合成PKCS12格式的文件
from cryptography.hazmat.primitives.serialization.pkcs12 import serialize_key_and_certificates
p12 = serialize_key_and_certificates(
name=friendly_name.encode() if friendly_name else None,
key=private_key,
cert=cert,
encryption_algorithm=serialization.NoEncryption(),
cas=cas
)
return p12
except:
return None
def analysis_private_key(self, key_pem, password=None):
"""
解析私钥
:param key_pem: 私钥内容
:param password: 私钥密码
:return: 私钥对象
"""
try:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
private_key = serialization.load_pem_private_key(
key_pem.encode(),
password=password,
backend=default_backend()
)
return private_key
except:
return None
def analysis_certificate(self, cert_pem):
"""
解析证书
:param cert_pem: 证书内容
:return: 有效返回True无效返回False
"""
try:
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_pem_x509_certificate
cert = load_pem_x509_certificate(cert_pem.encode(), default_backend())
return cert
except Exception as e:
public.print_log(public.get_error_info())
return None
def verify_certificate_and_key_match(self, key_pem, cert_pem, password=None):
"""
验证证书和私钥是否匹配(通过签名验证)
:param key_pem: 私钥内容
:param cert_pem: 证书内容
:param password: 私钥密码
:return: 验证成功返回True失败返回False
"""
try:
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, ec
message = b"test message"
private_key = self.analysis_private_key(key_pem, password)
cert = self.analysis_certificate(cert_pem)
if not private_key:
return False, "Key error, please check if it is the correct PEM format private key"
if not cert:
return False, "Certificate error, please check if it is the correct PEM format certificate"
# 使用私钥对消息进行签名
try:
signature = private_key.sign(
message,
padding.PKCS1v15(),
hashes.SHA256()
)
except Exception as e:
return True, str(e)
# 使用证书中的公钥验证签名
public_key = cert.public_key()
try:
if isinstance(public_key, ec.EllipticCurvePublicKey):
public_key.verify(
signature,
message,
ec.ECDSA(hashes.SHA256())
)
else:
public_key.verify(
signature,
message,
padding.PKCS1v15(),
hashes.SHA256()
)
return True, "Verification successful"
except Exception as e:
return False, "The key and certificate do not match."
except Exception as e:
public.print_log(public.get_error_info())
return True, str(e)
def verify_certificate_chain(self, cert_pem):
"""
验证证书链是否完整
:param cert_pem: 证书链内容
:return: 验证成功返回True失败返回False
"""
try:
from cryptography.hazmat.primitives.asymmetric import padding, ec
cert_chain = [(i + "-----END CERTIFICATE-----").strip() for i in
cert_pem.strip().split("-----END CERTIFICATE-----") if i]
for i in range(len(cert_chain) - 1):
cert = self.analysis_certificate(cert_chain[i])
issuer_cert = self.analysis_certificate(cert_chain[i + 1])
# 验证当前证书的签名是否由上一级签发
try:
issuer_public_key = issuer_cert.public_key()
# 检查 issuer_public_key 是否为 ECC 公钥或 RSA 公钥
if isinstance(issuer_public_key, ec.EllipticCurvePublicKey):
# 使用 ECDSA 验证 ECC 证书签名
issuer_public_key.verify(
cert.signature,
cert.tbs_certificate_bytes,
ec.ECDSA(cert.signature_hash_algorithm),
)
else:
# 使用 PKCS1v15 验证 RSA 证书签名
issuer_public_key.verify(
cert.signature,
cert.tbs_certificate_bytes,
padding.PKCS1v15(),
cert.signature_hash_algorithm,
)
except Exception as e:
return False, "Certificate chain verification failed, please check if the certificate chain is complete."
return True, "Certificate chain verification successful."
except Exception as e:
public.print_log(public.get_error_info())
return True, str(e)