"""YakPanel - Utility functions (ported from legacy panel public module)""" import os import re import hashlib import asyncio import subprocess import html from typing import Tuple, Optional regex_safe_path = re.compile(r"^[\w\s./\-]*$") # systemd often sets PATH to venv-only; subprocess shells then miss /usr/bin (dnf, apt-get, …). _SYSTEM_PATH = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" def ensure_system_path(env: dict[str, str]) -> None: """Append standard locations to PATH if /usr/bin is missing.""" cur = (env.get("PATH") or "").strip() if "/usr/bin" in cur: return env["PATH"] = f"{cur}:{_SYSTEM_PATH}" if cur else _SYSTEM_PATH def environment_with_system_path(base: Optional[dict[str, str]] = None) -> dict[str, str]: """Copy of process env (or base) with PATH guaranteed to include system bin dirs.""" env = dict(base) if base is not None else os.environ.copy() ensure_system_path(env) return env def md5(strings: str | bytes) -> str: """Generate MD5 hash""" if isinstance(strings, str): strings = strings.encode("utf-8") return hashlib.md5(strings).hexdigest() def read_file(filename: str, mode: str = "r") -> str | bytes | None: """Read file contents""" if not os.path.exists(filename): return None try: with open(filename, mode, encoding="utf-8" if "b" not in mode else None) as f: return f.read() except Exception: try: with open(filename, mode) as f: return f.read() except Exception: return None def write_file(filename: str, content: str | bytes, mode: str = "w+") -> bool: """Write content to file""" try: os.makedirs(os.path.dirname(filename) or ".", exist_ok=True) with open(filename, mode, encoding="utf-8" if "b" not in mode else None) as f: f.write(content) return True except Exception: return False def xss_decode(text: str) -> str: """Decode XSS-encoded text""" try: cs = {""": '"', """: '"', "'": "'", "'": "'"} for k, v in cs.items(): text = text.replace(k, v) return html.unescape(text) except Exception: return text def path_safe_check(path: str, force: bool = True) -> bool: """Validate path for security (no traversal, no dangerous chars)""" if len(path) > 256: return False checks = ["..", "./", "\\", "%", "$", "^", "&", "*", "~", '"', "'", ";", "|", "{", "}", "`"] for c in checks: if c in path: return False if force and not regex_safe_path.match(path): return False return True async def exec_shell( cmd: str, timeout: Optional[float] = None, cwd: Optional[str] = None, ) -> Tuple[str, str]: """Execute shell command asynchronously. Returns (stdout, stderr).""" proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, env=environment_with_system_path(), ) try: stdout, stderr = await asyncio.wait_for( proc.communicate(), timeout=timeout or 300, ) except asyncio.TimeoutError: proc.kill() await proc.wait() return "", "Timed out" out = stdout.decode("utf-8", errors="replace") if stdout else "" err = stderr.decode("utf-8", errors="replace") if stderr else "" return out, err def exec_shell_sync(cmd: str, timeout: Optional[float] = None, cwd: Optional[str] = None) -> Tuple[str, str]: """Execute shell command synchronously. Returns (stdout, stderr).""" try: result = subprocess.run( cmd, shell=True, capture_output=True, timeout=timeout or 300, cwd=cwd, env=environment_with_system_path(), ) out = result.stdout.decode("utf-8", errors="replace") if result.stdout else "" err = result.stderr.decode("utf-8", errors="replace") if result.stderr else "" return out, err except subprocess.TimeoutExpired: return "", "Timed out" except Exception as e: return "", str(e) def nginx_test_and_reload(nginx_bin: str, timeout: float = 60.0) -> Tuple[bool, str]: """Run ``nginx -t`` then ``nginx -s reload``. Returns (success, error_message).""" if not nginx_bin or not os.path.isfile(nginx_bin): return True, "" env = environment_with_system_path() try: t = subprocess.run( [nginx_bin, "-t"], capture_output=True, text=True, timeout=timeout, env=env, ) except (FileNotFoundError, OSError, subprocess.TimeoutExpired) as e: return False, str(e) if t.returncode != 0: err = (t.stderr or t.stdout or "").strip() return False, err or f"nginx -t exited {t.returncode}" try: r = subprocess.run( [nginx_bin, "-s", "reload"], capture_output=True, text=True, timeout=timeout, env=env, ) except (FileNotFoundError, OSError, subprocess.TimeoutExpired) as e: return False, str(e) if r.returncode != 0: err = (r.stderr or r.stdout or "").strip() return False, err or f"nginx -s reload exited {r.returncode}" return True, "" def nginx_binary_candidates() -> list[str]: """Nginx binaries to operate on: panel-bundled first, then common system paths (deduped by realpath).""" from app.core.config import get_runtime_config cfg = get_runtime_config() seen: set[str] = set() binaries: list[str] = [] panel_ngx = os.path.join(cfg.get("setup_path") or "", "nginx", "sbin", "nginx") if os.path.isfile(panel_ngx): binaries.append(panel_ngx) try: seen.add(os.path.realpath(panel_ngx)) except OSError: seen.add(panel_ngx) for alt in ("/usr/sbin/nginx", "/usr/bin/nginx", "/usr/local/nginx/sbin/nginx"): if not os.path.isfile(alt): continue try: rp = os.path.realpath(alt) except OSError: rp = alt if rp in seen: continue binaries.append(alt) seen.add(rp) return binaries def nginx_reload_all_known(timeout: float = 60.0) -> Tuple[bool, str]: """ Test and reload panel nginx (setup_path/nginx/sbin/nginx) and distinct system nginx binaries so vhost changes apply regardless of which daemon serves sites. """ binaries = nginx_binary_candidates() if not binaries: return True, "" errs: list[str] = [] ok_any = False for ngx in binaries: ok, err = nginx_test_and_reload(ngx, timeout=timeout) if ok: ok_any = True else: errs.append(f"{ngx}: {err}") if ok_any: return True, "" return False, "; ".join(errs) if errs else "nginx reload failed for all candidates"