Files
yakpanel-core/YakTask/brain.py
2026-04-07 02:04:22 +05:30

414 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding: utf-8
# -------------------------------------------------------------------
# YakPanel
# -------------------------------------------------------------------
# Copyright (c) 2014-2099 YakPanel(www.yakpanel.com) All rights reserved.
# -------------------------------------------------------------------
# Author: yakpanel
# -------------------------------------------------------------------
# ------------------------------
# task app
# ------------------------------
import os
import sys
from typing import Callable, Type, TypeVar, List, Dict
sys.path.insert(0, "/www/server/panel/class/")
import time
import threading
import heapq
import psutil
from dataclasses import dataclass, field, fields
from concurrent.futures import ThreadPoolExecutor, Future
from YakTask.conf import logger, CHILD_PID_PATH
T = TypeVar("T", bound="TaskInfo")
CPU_COUNT = psutil.cpu_count()
@dataclass(order=True)
class TaskInfo:
# dont init next_run
next_run: float = field(init=False)
task_id: str
# dont compare
func: Callable = field(compare=False)
interval: int | float
is_core: bool
loop: bool = True
@classmethod
def from_dict(cls: Type[T], data: dict, next_run: float) -> T:
class_fields = {f.name for f in fields(cls)}
filtered_data = {k: v for k, v in data.items() if k in class_fields}
instance = cls(**filtered_data)
instance.next_run = next_run
return instance
# Task scheduler whose class-level constants below act as tuning knobs for
# the worker pool, the periodic checker and the task timeouts.
class SimpleBrain:
MAXFACTOR = 5.0 # upper bound for delay_factor
MEM_LIMIT_MB = 100 # default process memory limit, in MB
MAX_WORKER = 16 # maximum number of worker threads
DEFAULT_WORKER = max(4, CPU_COUNT) # default number of worker threads
CHECKER_INTERVAL = 10 # checker interval, in seconds
TIMEOUT_FACTOR = 2 # task timeout multiplier (applied to the task interval)
TASK_MAX_TIMEOUT = 6 * 3600.0 # maximum task timeout: 6 hours
# Instances may only carry the attributes named here.
__slots__ = (
"cpu_max", "workers", "executor", "task_queue", "queue_lock",
"task_status", "status_lock", "core_status_lock", "core_tasks", "delay_factor",
"shutdown", "mem_limit", "pool_full_count",
)
def __init__(self, cpu_max: float = 30.0, workers: int = None, mem_limit: int = None):
    """Initialise scheduler state, create the thread pool and start the checker.

    Args:
        cpu_max: CPU usage (percent) above which the checker raises
            ``delay_factor``.
        workers: requested pool size; a falsy value selects
            ``DEFAULT_WORKER``. The result is capped at ``MAX_WORKER``.
        mem_limit: memory threshold in MB; a falsy value selects
            ``MEM_LIMIT_MB``.
    """
    requested_workers = workers if workers else self.DEFAULT_WORKER
    self.cpu_max = cpu_max
    self.workers = min(self.MAX_WORKER, requested_workers)
    self.mem_limit = mem_limit if mem_limit else self.MEM_LIMIT_MB
    logger.debug(f"max_workers = {self.workers}, mem_limit = {self.mem_limit}MB")

    # Locks, one per piece of shared state.
    self.queue_lock = threading.RLock()
    self.status_lock = threading.RLock()
    self.core_status_lock = threading.RLock()

    # Heap of pending tasks, ordered by TaskInfo comparison.
    self.task_queue: List[TaskInfo] = []
    # task_id -> (thread, (func, interval)) for core tasks.
    self.core_tasks: Dict[str, tuple[threading.Thread, tuple[Callable, int]]] = {}
    # task_id -> (start_time, deadline_ts, future) for submitted pool tasks.
    self.task_status: Dict[str, tuple[float, float, Future]] = {}

    self.shutdown = False
    self.delay_factor = 0.0
    self.pool_full_count = 0

    self.executor = ThreadPoolExecutor(max_workers=self.workers)
    self._start_checker()
    logger.debug("SimpleBrain initialized")
def _auto_pool(self):
    """Grow the pool when it has been saturated and the CPU has headroom.

    The pool is not resized in place: the brain shuts down, launches a new
    ``Yak-Task`` process with a larger ``--max-workers`` value and exits.
    """
    cpu = psutil.cpu_percent(interval=0.1)

    # Scale up only when the pool stayed full for about a minute of checker
    # ticks, the CPU is not overloaded and the hard cap is not reached yet.
    saturated = self.pool_full_count >= (60 // self.CHECKER_INTERVAL)
    should_grow = saturated and cpu < 80 and self.workers < self.MAX_WORKER
    new_size = min(self.workers + 2, self.MAX_WORKER) if should_grow else None
    # (no scale-down path is implemented)

    if not new_size or new_size == self.workers:
        return

    logger.warning(f"Resize Task Pool from {self.workers} to {new_size}...")
    self._shutdown()
    os.system(
        f"nohup /www/server/panel/Yak-Task --max-workers {new_size} >> /www/server/panel/logs/task.log 2>&1 &"
    )
    os._exit(0)
@property
def queue_size(self) -> int:
    """Number of tasks currently held in the pending queue."""
    with self.queue_lock:
        pending_count = len(self.task_queue)
    return pending_count
@property
def get_current_process_memory(self):
    """Resident memory (RSS) of the current process in MB, rounded to 2 decimals."""
    rss_bytes = psutil.Process().memory_info().rss
    return round(rss_bytes / 1024 / 1024, 2)
def __mem_limit(self, current_mem: float = None):
    """Restart the task process when its memory use reaches ``mem_limit``.

    Args:
        current_mem: memory use in MB; when falsy it is measured here.
    """
    usage_mb = current_mem if current_mem else self.get_current_process_memory
    if usage_mb >= self.mem_limit:
        logger.warning("Memory exceed limit...Restart brain task")
        self._shutdown()
        # Launch a replacement process in the background, then exit this one.
        restart_cmd = "nohup /www/server/panel/Yak-Task >> /www/server/panel/logs/task.log 2>&1 &"
        os.system(restart_cmd)
        os._exit(0)
def __core_alive(self):
    """Restart every registered core task whose thread is no longer alive."""
    with self.core_status_lock:
        # Iterate over a snapshot because entries are replaced while looping.
        for task_id, (worker, args) in list(self.core_tasks.items()):
            if worker.is_alive():
                continue
            logger.warning(f"Core task [{task_id}] is dead. Restarting...")
            replacement = threading.Thread(
                target=self._core_task_runner,
                args=args,
                daemon=True,
                name=f"CoreTask-{task_id}",
            )
            replacement.start()
            self.core_tasks[task_id] = (replacement, args)
def __normal_task_process(self, now_time: float) -> int:
    """Prune stale entries from ``task_status``.

    - Entries with a deadline (``deadline_ts != -1``) are removed once
      ``now_time`` is past the deadline.
    - Entries without a deadline (``deadline_ts == -1``, i.e. loop tasks) are
      removed when their future is not running.
    - Malformed entries are removed.

    Removing an entry also cleans the child process recorded in the task's
    pid file.

    Args:
        now_time: current wall-clock timestamp (``time.time()`` scale).

    Returns:
        Number of entries left in ``task_status``.
    """

    def _discard(tid_str: str):
        self.clean_child(os.path.join(CHILD_PID_PATH, f"{tid_str}.pid"))
        self.task_status.pop(tid_str, None)

    with self.status_lock:
        for tid, info in list(self.task_status.items()):
            if not info or len(info) != 3:
                stale = True
            else:
                _started, deadline_ts, future = info
                if deadline_ts != -1:
                    # Task with a deadline: timed-out entries are dropped silently.
                    stale = now_time > deadline_ts
                else:
                    # Loop task: dropped when its future is no longer running.
                    stale = bool(future) and not future.running()
            if stale:
                _discard(tid)
        return len(self.task_status)
# Background monitoring loop, run in the "BrainChecker" thread until
# self.shutdown is set. It samples CPU usage, periodically checks memory,
# core-task liveness and pool load, periodically triggers the pool resize
# check, and maintains self.delay_factor from the CPU reading.
# NOTE(review): indentation was lost in the reviewed copy, so the nesting of
# the trailing time.sleep(5) (except handler vs. loop body) cannot be
# confirmed from here.
def __checker(self):
resize_check_interval = 60 # check once per minute whether the pool needs to grow
last_check_time = time.monotonic()
last_resize_check = time.monotonic()
while not self.shutdown:
try:
# Blocks for 0.2 s while sampling CPU usage.
cpu = psutil.cpu_percent(interval=0.2)
now = time.monotonic()
# every 10 s
now_time = time.time()
if now - last_check_time > self.CHECKER_INTERVAL:
last_check_time = now
mem = self.get_current_process_memory
self.__mem_limit(mem) # memory threshold check
self.__core_alive()
running_tasks = self.__normal_task_process(now_time)
# Pool occupancy as a percentage of the configured worker count.
rate = (running_tasks / self.workers) * 100
logger.debug(
f"Mem: {mem} MB, CPU: {cpu}%, "
f"Pool: {running_tasks}/{self.workers} ({rate:.1f}%)"
)
if rate >= 90.0: # count a high-load tick
self.pool_full_count += 1
else: # reset the counter
self.pool_full_count = 0
# every 1 min
if now - last_resize_check > resize_check_interval:
last_resize_check = now
self._auto_pool()
# Above cpu_max the delay factor grows linearly with the excess CPU usage,
# reaching MAXFACTOR at 100%; otherwise it decays by 0.1 per pass down to 0.
if cpu > self.cpu_max:
self.delay_factor = min(
self.MAXFACTOR,
self.MAXFACTOR * (cpu - self.cpu_max) / (100.0 - self.cpu_max)
)
elif self.delay_factor > 0:
self.delay_factor = max(0.0, self.delay_factor - 0.1)
except Exception as e:
logger.error(f"cpu mem checker error: {e}")
time.sleep(5)
def _start_checker(self):
    """Start the monitoring loop in a daemon thread named ``BrainChecker``."""
    checker_thread = threading.Thread(
        target=self.__checker,
        daemon=True,
        name="BrainChecker",
    )
    checker_thread.start()
def _callback(self, task_id: str, future: Future):
    """Done-callback for a submitted task: log the outcome and clear its status.

    Errors raised by the task are logged, never re-raised. The status entry
    is removed only when it still belongs to ``future``: an entry can be
    dropped by the timeout check and re-created by a later submission of the
    same ``task_id`` while the old future is still running, and that newer
    record must not be deleted by the old future's callback.

    Args:
        task_id: id under which the task was recorded in ``task_status``.
        future: the completed future.
    """
    try:
        future.result()
    except Exception as e:
        logger.error(f"task error [{task_id}]: {str(e)}")
    finally:
        with self.status_lock:
            info = self.task_status.get(task_id)
            owns_entry = not (info and len(info) == 3 and info[2] is not future)
            if owns_entry:
                info = self.task_status.pop(task_id, None)
            else:
                # Entry was written by a newer submission; leave it alone.
                info = None
        # No ``return`` inside ``finally``: it would silently discard any
        # exception still propagating out of the ``try`` block.
        if not info:
            logger.info(f"task done [{task_id}]")
        else:
            start_time = info[0]
            logger.debug(
                f"task done [{task_id}] use time: {round(time.time() - start_time, 2)}s"
            )
def _run_func_safe(self, func: Callable):
    """Call ``func`` and log the traceback instead of letting an exception escape."""
    try:
        func()
    except Exception:
        from traceback import format_exc
        logger.error(format_exc())
def _core_task_runner(self, funcs: Callable | List[Callable], interval: float):
    """Run one or more core functions forever, sleeping between passes.

    Args:
        funcs: a single callable or a list of callables, run in order.
        interval: seconds to sleep after each pass over ``funcs``.
    """
    if not isinstance(funcs, list):
        funcs = [funcs]
    while True:
        for job in funcs:
            # Exceptions are logged by the wrapper so the loop keeps going.
            self._run_func_safe(job)
        time.sleep(interval)
def _submit_task(self, tasks: List[TaskInfo]):
    """Submit each task to the thread pool and record it in ``task_status``.

    Args:
        tasks: tasks to submit; a failing submission is logged and skipped.
    """
    for task in tasks:
        start_time = time.time()
        if task.loop:
            deadline_ts = -1  # no deadline
        else:
            # Timeout = task interval * TIMEOUT_FACTOR, at least 1 s and at
            # most TASK_MAX_TIMEOUT.
            scaled = max(1.0, float(task.interval) * self.TIMEOUT_FACTOR)
            deadline_ts = start_time + min(self.TASK_MAX_TIMEOUT, scaled)

        try:
            with self.status_lock:
                future = self.executor.submit(task.func)
                self.task_status[task.task_id] = (start_time, deadline_ts, future)
            future.add_done_callback(
                lambda f, tid=task.task_id: self._callback(tid, f)
            )
        except Exception as e:
            logger.error(f"Failed to submit task [{task.task_id}]: {e}")
            # Always make sure no status entry is left behind.
            with self.status_lock:
                self.task_status.pop(task.task_id, None)
# ========================== Public =======================================
def register_task(self, **kwargs):
    """Register a task; registering an already known ``task_id`` is a no-op.

    Core tasks (``is_core`` truthy) get their own daemon thread running
    ``_core_task_runner``. Any other task is pushed onto the heap that the
    scheduling loop consumes.

    Keyword Args:
        task_id: unique task identifier (required).
        func: callable to run (a list of callables is accepted for core tasks).
        interval: seconds between runs.
        is_core: whether to run as a core task; treated as False if omitted.
        loop: optional, forwarded to ``TaskInfo``.

    Raises:
        KeyError: if ``task_id`` (or ``func``/``interval`` for a core task)
            is missing.
        TypeError: if a non-core task lacks a required ``TaskInfo`` field.
    """
    task_id = kwargs["task_id"]

    if kwargs.get("is_core"):
        # The duplicate check and the insert happen under the same lock the
        # liveness checker uses, so two concurrent registrations cannot both
        # start a thread for the same id.
        with self.core_status_lock:
            if task_id in self.core_tasks:
                return
            args = (kwargs["func"], kwargs["interval"])
            core_thread = threading.Thread(
                target=self._core_task_runner,
                args=args,
                daemon=True,
                name=f"CoreTask-{task_id}",
            )
            core_thread.start()
            self.core_tasks[task_id] = (core_thread, args)
        logger.debug(
            f"registe core task [{task_id}], interval [{kwargs['interval']}]"
        )
        return

    # Not a core task: goes to the heap and is executed by the pool.
    with self.queue_lock:
        if any(t.task_id == task_id for t in self.task_queue):
            return
        # An omitted ``is_core`` already means "not core" above; pass that on
        # explicitly because the TaskInfo field has no default.
        task = TaskInfo.from_dict(
            {"is_core": False, **kwargs}, next_run=time.monotonic()
        )
        heapq.heappush(self.task_queue, task)
    logger.debug(
        f"register normal task [{task_id}], interval [{task.interval}s]"
    )
@staticmethod
def clean_child(pid_path: str = None):
# 清理进程, 防泄漏
def _clean_single(path):
if not os.path.exists(path) or not path.endswith(".pid"):
return
try:
with open(path, "r") as pf:
pid = int(pf.read().strip())
psutil.Process(pid).kill()
except (psutil.NoSuchProcess, psutil.AccessDenied, FileNotFoundError, ValueError):
pass
except Exception:
pass
finally:
try:
os.remove(path)
except Exception:
pass
if not pid_path:
# 清理所有子进程
if not os.path.exists(CHILD_PID_PATH):
return
for pid_file in os.listdir(CHILD_PID_PATH):
_clean_single(os.path.join(CHILD_PID_PATH, pid_file))
else: # 清理指定子进程
_clean_single(pid_path)
def run(self):
    """Main scheduling loop; runs until ``self.shutdown`` is set.

    Each pass takes at most one due task from the heap, decides whether to
    submit it or postpone it, pushes it back with its new ``next_run`` and
    then sleeps until the next task is due (between 0.01 s and 1 s).
    """
    self.clean_child()
    while not self.shutdown:
        now = time.monotonic()
        ready_tasks = []
        with self.queue_lock:
            # At most one due task is taken per pass.
            if self.task_queue and self.task_queue[0].next_run <= now:
                ready_tasks.append(heapq.heappop(self.task_queue))
            if self.task_queue:
                next_run_time = self.task_queue[0].next_run
            else:
                next_run_time = now + 1.0

        submit, requeue = [], []
        for task in ready_tasks:
            with self.status_lock:
                is_running = task.task_id in self.task_status
            if is_running:
                # Previous run not finished: skip this round.
                logger.debug(f"still running, {task.task_id}")
                task.next_run = now + task.interval
            elif self.delay_factor > 0:
                # CPU overloaded: back off, scaled by the delay factor.
                logger.debug(f"cpu overload, requeue [{task.task_id}]")
                task.next_run = now + 1.5 * (1.0 + self.delay_factor)
            else:
                # Schedule the next run and mark the task for submission.
                task.next_run = now + task.interval
                submit.append(task)
            # Every task taken from the heap goes back onto it.
            requeue.append(task)

        if requeue:
            with self.queue_lock:
                for task in requeue:
                    heapq.heappush(self.task_queue, task)
                next_run_time = self.task_queue[0].next_run

        if submit:
            self._submit_task(submit)
            time.sleep(1)  # spread out tasks that share the same interval

        time.sleep(max(0.01, min(1.0, next_run_time - now)))
def _shutdown(self):
    """Stop scheduling and release resources.

    Sets the shutdown flag, empties the pending queue, the status table and
    the core-task table, shuts the executor down without waiting, and cleans
    all recorded child processes.
    """
    self.shutdown = True
    logger.warning("shutdown...")

    # Count first, then empty, each container under its own lock.
    with self.queue_lock:
        pending = len(self.task_queue)
        del self.task_queue[:]
    with self.status_lock:
        running = len(self.task_status)
        self.task_status.clear()
    with self.core_status_lock:
        self.core_tasks.clear()

    logger.warning(
        f"shutdown complete, {running} tasks cancelled, {pending} tasks discarded"
    )

    # Do not block on tasks that are still executing.
    self.executor.shutdown(wait=False)
    self.clean_child()
    logger.warning("released")