"""YakPanel - Utility functions (ported from legacy panel public module)""" import os import re import hashlib import asyncio import subprocess import html from typing import Tuple, Optional regex_safe_path = re.compile(r"^[\w\s./\-]*$") # systemd often sets PATH to venv-only; subprocess shells then miss /usr/bin (dnf, apt-get, …). _SYSTEM_PATH = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" def ensure_system_path(env: dict[str, str]) -> None: """Append standard locations to PATH if /usr/bin is missing.""" cur = (env.get("PATH") or "").strip() if "/usr/bin" in cur: return env["PATH"] = f"{cur}:{_SYSTEM_PATH}" if cur else _SYSTEM_PATH def environment_with_system_path(base: Optional[dict[str, str]] = None) -> dict[str, str]: """Copy of process env (or base) with PATH guaranteed to include system bin dirs.""" env = dict(base) if base is not None else os.environ.copy() ensure_system_path(env) return env def md5(strings: str | bytes) -> str: """Generate MD5 hash""" if isinstance(strings, str): strings = strings.encode("utf-8") return hashlib.md5(strings).hexdigest() def read_file(filename: str, mode: str = "r") -> str | bytes | None: """Read file contents""" if not os.path.exists(filename): return None try: with open(filename, mode, encoding="utf-8" if "b" not in mode else None) as f: return f.read() except Exception: try: with open(filename, mode) as f: return f.read() except Exception: return None def write_file(filename: str, content: str | bytes, mode: str = "w+") -> bool: """Write content to file""" try: os.makedirs(os.path.dirname(filename) or ".", exist_ok=True) with open(filename, mode, encoding="utf-8" if "b" not in mode else None) as f: f.write(content) return True except Exception: return False def xss_decode(text: str) -> str: """Decode XSS-encoded text""" try: cs = {""": '"', """: '"', "'": "'", "'": "'"} for k, v in cs.items(): text = text.replace(k, v) return html.unescape(text) except Exception: return text def path_safe_check(path: str, force: bool = True) -> bool: """Validate path for security (no traversal, no dangerous chars)""" if len(path) > 256: return False checks = ["..", "./", "\\", "%", "$", "^", "&", "*", "~", '"', "'", ";", "|", "{", "}", "`"] for c in checks: if c in path: return False if force and not regex_safe_path.match(path): return False return True async def exec_shell( cmd: str, timeout: Optional[float] = None, cwd: Optional[str] = None, ) -> Tuple[str, str]: """Execute shell command asynchronously. Returns (stdout, stderr).""" proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, env=environment_with_system_path(), ) try: stdout, stderr = await asyncio.wait_for( proc.communicate(), timeout=timeout or 300, ) except asyncio.TimeoutError: proc.kill() await proc.wait() return "", "Timed out" out = stdout.decode("utf-8", errors="replace") if stdout else "" err = stderr.decode("utf-8", errors="replace") if stderr else "" return out, err def exec_shell_sync(cmd: str, timeout: Optional[float] = None, cwd: Optional[str] = None) -> Tuple[str, str]: """Execute shell command synchronously. Returns (stdout, stderr).""" try: result = subprocess.run( cmd, shell=True, capture_output=True, timeout=timeout or 300, cwd=cwd, env=environment_with_system_path(), ) out = result.stdout.decode("utf-8", errors="replace") if result.stdout else "" err = result.stderr.decode("utf-8", errors="replace") if result.stderr else "" return out, err except subprocess.TimeoutExpired: return "", "Timed out" except Exception as e: return "", str(e)