Files

81 lines
2.6 KiB
Python
Raw Permalink Normal View History

2026-04-07 02:04:22 +05:30
"""YakPanel - Logs viewer API"""
import os
from fastapi import APIRouter, Depends, HTTPException, Query
from app.core.config import get_runtime_config
from app.core.utils import read_file
from app.api.auth import get_current_user
from app.models.user import User
# Router for the log-viewer endpoints: routes registered on it get the
# "/logs" prefix and the "logs" tag.
router = APIRouter(prefix="/logs", tags=["logs"])
def _resolve_log_path(path: str) -> str:
    """Resolve a client-supplied path to an absolute path inside ``www_logs``.

    The path is treated as relative to the ``www_logs`` directory from the
    runtime config, regardless of leading slashes. Backslashes are accepted
    as separators. An empty path (or just "/") resolves to the log root.

    Symlinks are resolved before the containment check, so a link that lives
    inside the log root but points elsewhere is rejected rather than followed.

    Args:
        path: Path as received from the request.

    Returns:
        The resolved absolute path, guaranteed to be the log root or a
        descendant of it.

    Raises:
        HTTPException: 400 if any path component is "..", 403 if the
            resolved path falls outside the log root.
    """
    # Normalise separators first so "..\\x" is checked the same way as "../x".
    normalized = path.strip().replace("\\", "/")
    # Reject ".." only as a whole component; a plain substring test would
    # also refuse legitimate names such as "access..log".
    if ".." in normalized.split("/"):
        raise HTTPException(status_code=400, detail="Path traversal not allowed")

    cfg = get_runtime_config()
    # realpath (not abspath) so the comparison below is made on the real
    # on-disk locations of both the root and the target.
    logs_root = os.path.realpath(cfg["www_logs"])
    relative = normalized.lstrip("/")
    if not relative:
        return logs_root

    full = os.path.realpath(os.path.join(logs_root, relative))
    # Avoid building "//" when the root itself already ends with a separator
    # (e.g. the filesystem root), which would make every child fail the test.
    root_prefix = logs_root if logs_root.endswith(os.sep) else logs_root + os.sep
    if not (full == logs_root or full.startswith(root_prefix)):
        raise HTTPException(status_code=403, detail="Path not allowed")
    return full
@router.get("/list")
async def logs_list(
    path: str = "/",
    current_user: User = Depends(get_current_user),
):
    """Return the entries of a directory located under www_logs.

    Entries are sorted by name. Each item carries its ``name``, an ``is_dir``
    flag and a ``size`` (0 for anything that is not a regular file). Entries
    whose ``os.stat`` call fails are left out of the result.

    Args:
        path: Directory to list, relative to the log root.
        current_user: Injected via ``get_current_user``; not used in the body.

    Returns:
        A dict with ``path`` (the requested path without trailing slashes,
        or "/" if nothing remains) and ``items`` (the entry list).

    Raises:
        HTTPException: 400 if the resolved path is not a directory; errors
            raised by ``_resolve_log_path`` propagate unchanged.
    """
    directory = _resolve_log_path(path)
    if not os.path.isdir(directory):
        raise HTTPException(status_code=400, detail="Not a directory")

    entries = []
    for entry_name in sorted(os.listdir(directory)):
        entry_path = os.path.join(directory, entry_name)
        try:
            info = os.stat(entry_path)
        except OSError:
            # Entry could not be stat'ed; omit it instead of failing the listing.
            continue
        entries.append({
            "name": entry_name,
            "is_dir": os.path.isdir(entry_path),
            "size": info.st_size if os.path.isfile(entry_path) else 0,
        })

    shown_path = path.rstrip("/")
    if not shown_path:
        shown_path = "/"
    return {"path": shown_path, "items": entries}
@router.get("/read")
async def logs_read(
    path: str,
    tail: int = Query(default=1000, ge=1, le=100000),
    current_user: User = Depends(get_current_user),
):
    """Return the last ``tail`` lines of a log file located under www_logs.

    Args:
        path: File to read, relative to the log root.
        tail: Maximum number of trailing lines to return (declared as
            1..100000 on the query parameter).
        current_user: Injected via ``get_current_user``; not used in the body.

    Returns:
        A dict with ``path`` (as requested), ``content`` (the selected lines
        joined with "\\n") and ``total_lines`` (line count of the whole file).

    Raises:
        HTTPException: 404 if the resolved path is not a regular file, 500 if
            ``read_file`` returns ``None``; errors raised by
            ``_resolve_log_path`` propagate unchanged.
    """
    full = _resolve_log_path(path)
    if not os.path.isfile(full):
        raise HTTPException(status_code=404, detail="Not a file")

    content = read_file(full)
    if content is None:
        raise HTTPException(status_code=500, detail="Failed to read file")
    if isinstance(content, bytes):
        # errors="replace" substitutes U+FFFD for undecodable bytes, so
        # invalid UTF-8 never raises here and needs no error branch.
        content = content.decode("utf-8", errors="replace")

    # Split once and reuse the result for both the tail and the total count.
    all_lines = content.splitlines()
    return {
        "path": path,
        # A negative-start slice already yields the whole list when the file
        # has fewer than `tail` lines.
        "content": "\n".join(all_lines[-tail:]),
        "total_lines": len(all_lines),
    }