Files
yakpanel-core/mod/base/msg/util.py
2026-04-07 02:04:22 +05:30

140 lines
3.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sys
from typing import Optional, List, Tuple
from mod.base.push_mod import BaseTask, WxAccountMsgBase, WxAccountMsg, get_push_public_data
if "/www/server/panel/class" not in sys.path:
sys.path.insert(0, "/www/server/panel/class")
import public
PANEL_PATH = "/www/server/panel"
public_http_post = public.httpPost
def write_push_log(
module_name: str,
status: bool,
title: str,
user: Optional[List[str]] = None):
"""
记录 告警推送情况
@param module_name: 通道方式
@param status: 是否成功
@param title: 标题
@param user: 推送到的用户,可以为空,如:钉钉 不需要
@return:
"""
if status:
status_str = '<span style="color:#20a53a;"> Success </span>'
else:
status_str = '<span style="color:red;">Fail</span>'
if not user:
user_str = '[ default ]'
else:
user_str = '[ {} ]'.format(",".join(user))
log = 'Title: [{}], Notification: [{}], Result: [{}], Addressee: {}'.format(title, module_name, status_str, user_str)
public.WriteLog('Alarm notification', log)
return True
def write_mail_push_log(
title: str,
error_user: List[str],
success_user: List[str],
):
"""
记录 告警推送情况
@param title: 标题
@param error_user: 失败的用户
@param success_user: 成功的用户
@return:
"""
e_fmt = '<span style="color:#20a53a;">{}</span>'
s_fmt = '<span style="color:red;">{}</span>'
error_user_msg = ",".join([e_fmt.format(i) for i in error_user])
success_user = ",".join([s_fmt.format(i) for i in success_user])
log = 'Title: [{}], notification method: [Email], send failed recipients: {}, send successful recipients: {}'.format(
title, error_user_msg, success_user
)
public.WriteLog('Alarm notification', log)
return True
def write_file(filename: str, s_body: str, mode='w+') -> bool:
"""
写入文件内容
@filename 文件名
@s_body 欲写入的内容
return bool 若文件不存在则尝试自动创建
"""
try:
fp = open(filename, mode=mode)
fp.write(s_body)
fp.close()
return True
except:
try:
fp = open(filename, mode=mode, encoding="utf-8")
fp.write(s_body)
fp.close()
return True
except:
return False
def read_file(filename, mode='r') -> Optional[str]:
"""
读取文件内容
@filename 文件名
return string(bin) 若文件不存在则返回None
"""
import os
if not os.path.exists(filename):
return None
fp = None
try:
fp = open(filename, mode=mode)
f_body = fp.read()
except:
return None
finally:
if fp and not fp.closed:
fp.close()
return f_body
class _TestMsgTask(BaseTask):
"""
用来测试的短息
"""
@staticmethod
def the_push_public_data():
return get_push_public_data()
def get_keywords(self, task_data: dict) -> str:
pass
def to_sms_msg(self, push_data: dict, push_public_data: dict) -> Tuple[str, dict]:
raise NotImplementedError()
def to_wx_account_msg(self, push_data: dict, push_public_data: dict) -> WxAccountMsg:
msg = WxAccountMsg.new_msg()
msg.thing_type = self.title
msg.msg = "The message channel was configured successfully"
return msg
def get_test_msg(title: str, task_name="Message channel configuration reminders") -> _TestMsgTask:
"""
用来测试的短息
"""
t = _TestMsgTask()
t.title = title
t.template_name = task_name
return t