"""YakPanel - File manager API""" import os import shutil import stat import zipfile from datetime import datetime, timezone from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form from fastapi.responses import FileResponse from pydantic import BaseModel, Field from app.core.config import get_runtime_config from app.core.utils import read_file, write_file, path_safe_check from app.api.auth import get_current_user from app.models.user import User router = APIRouter(prefix="/files", tags=["files"]) def _resolve_path(path: str) -> str: """ Resolve API path to an OS path. On Linux/macOS: path is an absolute POSIX path from filesystem root (/) so admins can browse the whole server (typical expectation for a full-server admin file manager). On Windows (dev): paths stay sandboxed under www_root / setup_path. """ if ".." in path: raise HTTPException(status_code=400, detail="Path traversal not allowed") raw = path.strip().replace("\\", "/") if os.name == "nt": cfg = get_runtime_config() www_root = os.path.abspath(cfg["www_root"]) setup_path = os.path.abspath(cfg["setup_path"]) allowed = [www_root, setup_path] norm_path = raw.strip("/") if not norm_path or norm_path in ("www", "www/wwwroot", "wwwroot"): full = www_root elif norm_path.startswith("www/wwwroot/"): full = os.path.abspath(os.path.join(www_root, norm_path[12:])) else: full = os.path.abspath(os.path.join(www_root, norm_path)) if not any( full == r or (full + os.sep).startswith(r + os.sep) for r in allowed ): raise HTTPException(status_code=403, detail="Path not allowed") return full # POSIX: absolute paths from / if not raw or raw == "/": return "/" if not raw.startswith("/"): raw = "/" + raw full = os.path.normpath(raw) if not full.startswith("/"): raise HTTPException(status_code=400, detail="Invalid path") return full def _stat_entry(path: str, name: str) -> dict | None: item_path = os.path.join(path, name) try: st = os.stat(item_path, follow_symlinks=False) except OSError: return None is_dir = os.path.isdir(item_path) owner = str(st.st_uid) group = str(st.st_gid) try: import pwd import grp owner = pwd.getpwuid(st.st_uid).pw_name group = grp.getgrgid(st.st_gid).gr_name except (ImportError, KeyError, OSError): pass try: sym = stat.filemode(st.st_mode) except Exception: sym = "" return { "name": name, "is_dir": is_dir, "size": st.st_size if not is_dir else 0, "mtime": datetime.fromtimestamp(st.st_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), "mtime_ts": int(st.st_mtime), "mode": format(st.st_mode & 0o777, "o"), "mode_symbolic": sym, "owner": owner, "group": group, } @router.get("/list") async def files_list( path: str = "/", current_user: User = Depends(get_current_user), ): """List directory contents""" try: full = _resolve_path(path) except HTTPException: raise if not os.path.isdir(full): raise HTTPException(status_code=404, detail="Not a directory") items = [] try: names = os.listdir(full) except PermissionError: raise HTTPException(status_code=403, detail="Permission denied") except FileNotFoundError: raise HTTPException(status_code=404, detail="Not found") except OSError as e: raise HTTPException(status_code=500, detail=str(e)) for name in sorted(names, key=str.lower): row = _stat_entry(full, name) if row: items.append(row) display_path = full if full == "/" else full.rstrip("/") return {"path": display_path, "items": items} @router.get("/dir-size") async def files_dir_size( path: str, current_user: User = Depends(get_current_user), ): """Return total byte size of directory tree (may be slow on large trees).""" try: full = _resolve_path(path) except HTTPException: raise if not os.path.isdir(full): raise HTTPException(status_code=400, detail="Not a directory") total = 0 try: for root, _dirs, files in os.walk(full): for fn in files: fp = os.path.join(root, fn) try: total += os.path.getsize(fp) except OSError: pass except PermissionError: raise HTTPException(status_code=403, detail="Permission denied") return {"size": total} @router.get("/search") async def files_search( q: str, path: str = "/", max_results: int = 200, current_user: User = Depends(get_current_user), ): """Find files/folders whose name contains q (case-insensitive), walking from path.""" if not q or not q.strip(): return {"path": path, "results": []} try: root = _resolve_path(path) except HTTPException: raise if not os.path.isdir(root): raise HTTPException(status_code=400, detail="Not a directory") qn = q.strip().lower() results: list[dict] = [] skip_prefixes = tuple() if os.name != "nt" and root in ("/", "//"): skip_prefixes = ("/proc", "/sys", "/dev") def should_skip(p: str) -> bool: ap = os.path.abspath(p) return any(ap == sp or ap.startswith(sp + os.sep) for sp in skip_prefixes) try: for dirpath, dirnames, filenames in os.walk(root, topdown=True): if len(results) >= max_results: break if should_skip(dirpath): dirnames[:] = [] continue try: for dn in list(dirnames): if len(results) >= max_results: break if qn in dn.lower(): rel = os.path.join(dirpath, dn) results.append({"path": rel.replace("\\", "/"), "name": dn, "is_dir": True}) for fn in filenames: if len(results) >= max_results: break if qn in fn.lower(): rel = os.path.join(dirpath, fn) results.append({"path": rel.replace("\\", "/"), "name": fn, "is_dir": False}) except OSError: continue except PermissionError: raise HTTPException(status_code=403, detail="Permission denied") return {"path": path, "query": q, "results": results[:max_results]} @router.get("/read") async def files_read( path: str, current_user: User = Depends(get_current_user), ): """Read file content""" try: full = _resolve_path(path) except HTTPException: raise if not os.path.isfile(full): raise HTTPException(status_code=404, detail="Not a file") content = read_file(full) if content is None: raise HTTPException(status_code=500, detail="Failed to read file") return {"path": path, "content": content} @router.get("/download") async def files_download( path: str, current_user: User = Depends(get_current_user), ): """Download file""" try: full = _resolve_path(path) except HTTPException: raise if not os.path.isfile(full): raise HTTPException(status_code=404, detail="Not a file") return FileResponse(full, filename=os.path.basename(full)) @router.post("/upload") async def files_upload( path: str = Form(...), file: UploadFile = File(...), current_user: User = Depends(get_current_user), ): """Upload file to directory""" try: full = _resolve_path(path) except HTTPException: raise if not os.path.isdir(full): raise HTTPException(status_code=400, detail="Path must be a directory") filename = file.filename or "upload" if ".." in filename or "/" in filename or "\\" in filename: raise HTTPException(status_code=400, detail="Invalid filename") dest = os.path.join(full, filename) content = await file.read() if not write_file(dest, content, "wb"): raise HTTPException(status_code=500, detail="Failed to write file") return {"status": True, "msg": "Uploaded", "path": path + "/" + filename} class MkdirRequest(BaseModel): path: str name: str class RenameRequest(BaseModel): path: str old_name: str new_name: str class DeleteRequest(BaseModel): path: str name: str is_dir: bool @router.post("/mkdir") async def files_mkdir( body: MkdirRequest, current_user: User = Depends(get_current_user), ): """Create directory""" try: parent = _resolve_path(body.path) except HTTPException: raise if not os.path.isdir(parent): raise HTTPException(status_code=400, detail="Parent must be a directory") if not body.name or ".." in body.name or "/" in body.name or "\\" in body.name: raise HTTPException(status_code=400, detail="Invalid directory name") if not path_safe_check(body.name): raise HTTPException(status_code=400, detail="Invalid directory name") full = os.path.join(parent, body.name) if os.path.exists(full): raise HTTPException(status_code=400, detail="Already exists") try: os.makedirs(full, 0o755) except OSError as e: raise HTTPException(status_code=500, detail=str(e)) return {"status": True, "msg": "Created", "path": body.path.rstrip("/") + "/" + body.name} @router.post("/rename") async def files_rename( body: RenameRequest, current_user: User = Depends(get_current_user), ): """Rename file or directory""" try: parent = _resolve_path(body.path) except HTTPException: raise if not body.old_name or not body.new_name: raise HTTPException(status_code=400, detail="Names required") for n in (body.old_name, body.new_name): if ".." in n or "/" in n or "\\" in n or not path_safe_check(n): raise HTTPException(status_code=400, detail="Invalid name") old_full = os.path.join(parent, body.old_name) new_full = os.path.join(parent, body.new_name) if not os.path.exists(old_full): raise HTTPException(status_code=404, detail="Not found") if os.path.exists(new_full): raise HTTPException(status_code=400, detail="Target already exists") try: os.rename(old_full, new_full) except OSError as e: raise HTTPException(status_code=500, detail=str(e)) return {"status": True, "msg": "Renamed"} @router.post("/delete") async def files_delete( body: DeleteRequest, current_user: User = Depends(get_current_user), ): """Delete file or directory""" try: parent = _resolve_path(body.path) except HTTPException: raise if not body.name or ".." in body.name or "/" in body.name or "\\" in body.name: raise HTTPException(status_code=400, detail="Invalid name") full = os.path.join(parent, body.name) if not os.path.exists(full): raise HTTPException(status_code=404, detail="Not found") try: if body.is_dir: import shutil shutil.rmtree(full) else: os.remove(full) except OSError as e: raise HTTPException(status_code=500, detail=str(e)) return {"status": True, "msg": "Deleted"} class WriteFileRequest(BaseModel): path: str content: str @router.post("/write") async def files_write( body: WriteFileRequest, current_user: User = Depends(get_current_user), ): """Write text file content""" try: full = _resolve_path(body.path) except HTTPException: raise if os.path.isdir(full): raise HTTPException(status_code=400, detail="Cannot write to directory") if not write_file(full, body.content): raise HTTPException(status_code=500, detail="Failed to write file") return {"status": True, "msg": "Saved"} class TouchRequest(BaseModel): path: str name: str class ChmodRequest(BaseModel): file_path: str mode: str = Field(description="Octal mode e.g. 0644 or 755") recursive: bool = False class CopyRequest(BaseModel): path: str name: str dest_path: str dest_name: str | None = None class MoveRequest(BaseModel): path: str name: str dest_path: str dest_name: str | None = None class CompressRequest(BaseModel): path: str names: list[str] archive_name: str @router.post("/touch") async def files_touch( body: TouchRequest, current_user: User = Depends(get_current_user), ): """Create an empty file in directory path.""" try: parent = _resolve_path(body.path) except HTTPException: raise if not body.name or ".." in body.name or "/" in body.name or "\\" in body.name: raise HTTPException(status_code=400, detail="Invalid name") if not path_safe_check(body.name): raise HTTPException(status_code=400, detail="Invalid name") full = os.path.join(parent, body.name) if os.path.exists(full): raise HTTPException(status_code=400, detail="Already exists") try: open(full, "a", encoding="utf-8").close() except OSError as e: raise HTTPException(status_code=500, detail=str(e)) return {"status": True, "msg": "Created"} @router.post("/chmod") async def files_chmod( body: ChmodRequest, current_user: User = Depends(get_current_user), ): """chmod a file or directory. mode is octal string (644, 0755).""" try: full = _resolve_path(body.file_path) except HTTPException: raise if not os.path.exists(full): raise HTTPException(status_code=404, detail="Not found") try: mode = int(body.mode.strip(), 8) except ValueError: raise HTTPException(status_code=400, detail="Invalid mode") if mode < 0 or mode > 0o7777: raise HTTPException(status_code=400, detail="Invalid mode") def _chmod_one(p: str) -> None: os.chmod(p, mode) try: if body.recursive and os.path.isdir(full): for root, dirs, files in os.walk(full): for d in dirs: _chmod_one(os.path.join(root, d)) for f in files: _chmod_one(os.path.join(root, f)) _chmod_one(full) else: _chmod_one(full) except PermissionError: raise HTTPException(status_code=403, detail="Permission denied") except OSError as e: raise HTTPException(status_code=500, detail=str(e)) return {"status": True, "msg": "Permissions updated"} @router.post("/copy") async def files_copy( body: CopyRequest, current_user: User = Depends(get_current_user), ): """Copy file or directory into dest_path (optionally new name).""" try: src_parent = _resolve_path(body.path) dest_parent = _resolve_path(body.dest_path) except HTTPException: raise if not body.name or ".." in body.name: raise HTTPException(status_code=400, detail="Invalid name") src = os.path.join(src_parent, body.name) dest_name = body.dest_name or body.name if ".." in dest_name or "/" in dest_name or "\\" in dest_name: raise HTTPException(status_code=400, detail="Invalid destination name") dest = os.path.join(dest_parent, dest_name) if not os.path.exists(src): raise HTTPException(status_code=404, detail="Source not found") if os.path.exists(dest): raise HTTPException(status_code=400, detail="Destination already exists") if not os.path.isdir(dest_parent): raise HTTPException(status_code=400, detail="Destination parent must be a directory") try: if os.path.isdir(src): shutil.copytree(src, dest, symlinks=True) else: shutil.copy2(src, dest) except OSError as e: raise HTTPException(status_code=500, detail=str(e)) return {"status": True, "msg": "Copied"} @router.post("/move") async def files_move( body: MoveRequest, current_user: User = Depends(get_current_user), ): """Move (rename) file or directory to another folder.""" try: src_parent = _resolve_path(body.path) dest_parent = _resolve_path(body.dest_path) except HTTPException: raise if not body.name or ".." in body.name: raise HTTPException(status_code=400, detail="Invalid name") src = os.path.join(src_parent, body.name) dest_name = body.dest_name or body.name if ".." in dest_name or "/" in dest_name or "\\" in dest_name: raise HTTPException(status_code=400, detail="Invalid destination name") dest = os.path.join(dest_parent, dest_name) if not os.path.exists(src): raise HTTPException(status_code=404, detail="Source not found") if os.path.exists(dest): raise HTTPException(status_code=400, detail="Destination already exists") if not os.path.isdir(dest_parent): raise HTTPException(status_code=400, detail="Destination parent must be a directory") try: shutil.move(src, dest) except OSError as e: raise HTTPException(status_code=500, detail=str(e)) return {"status": True, "msg": "Moved"} @router.post("/compress") async def files_compress( body: CompressRequest, current_user: User = Depends(get_current_user), ): """Create a zip in path containing named files/folders.""" try: parent = _resolve_path(body.path) except HTTPException: raise if not os.path.isdir(parent): raise HTTPException(status_code=400, detail="Not a directory") if not body.names: raise HTTPException(status_code=400, detail="Nothing to compress") name = (body.archive_name or "archive").strip() if not name.lower().endswith(".zip"): name += ".zip" if ".." in name or "/" in name or "\\" in name: raise HTTPException(status_code=400, detail="Invalid archive name") zip_path = os.path.join(parent, name) if os.path.exists(zip_path): raise HTTPException(status_code=400, detail="Archive already exists") try: with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for entry in body.names: if not entry or ".." in entry or "/" in entry or "\\" in entry: continue src = os.path.join(parent, entry) if not os.path.exists(src): continue if os.path.isfile(src): zf.write(src, arcname=entry.replace("\\", "/")) elif os.path.isdir(src): for root, _dirs, files in os.walk(src): for f in files: fp = os.path.join(root, f) zn = os.path.relpath(fp, parent).replace("\\", "/") zf.write(fp, zn) except OSError as e: if os.path.exists(zip_path): try: os.remove(zip_path) except OSError: pass raise HTTPException(status_code=500, detail=str(e)) return {"status": True, "msg": "Compressed", "archive": name}