Files
2026-04-07 13:30:25 +05:30

116 lines
4.0 KiB
Python

"""YakPanel - Firewall API"""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel
from app.core.database import get_db
from app.core.utils import exec_shell_sync
from app.api.auth import get_current_user
from app.models.user import User
from app.models.firewall import FirewallRule
router = APIRouter(prefix="/firewall", tags=["firewall"])
class CreateFirewallRuleRequest(BaseModel):
port: str
protocol: str = "tcp"
action: str = "accept"
ps: str = ""
@router.get("/status")
async def firewall_backend_status(current_user: User = Depends(get_current_user)):
"""UFW and firewalld presence/state for the Security UI (read-only)."""
ufw_out, _ = exec_shell_sync("ufw status 2>/dev/null", timeout=5)
ufw_text = (ufw_out or "").strip()
ufw_detected = bool(ufw_text) and "Status:" in ufw_text
ufw_active: bool | None = None
if ufw_detected:
if "Status: active" in ufw_text:
ufw_active = True
elif "Status: inactive" in ufw_text:
ufw_active = False
fw_state_out, _ = exec_shell_sync("firewall-cmd --state 2>/dev/null", timeout=5)
fw_line = (fw_state_out or "").strip().lower()
firewalld_running = fw_line == "running"
firewalld_detected = fw_line in ("running", "not running")
return {
"ufw": {
"detected": ufw_detected,
"active": ufw_active,
"summary_line": ufw_text.split("\n")[0] if ufw_text else "",
},
"firewalld": {
"detected": firewalld_detected,
"running": firewalld_running,
"state": fw_line or None,
},
}
@router.get("/list")
async def firewall_list(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""List firewall rules"""
result = await db.execute(select(FirewallRule).order_by(FirewallRule.id))
rows = result.scalars().all()
return [{"id": r.id, "port": r.port, "protocol": r.protocol, "action": r.action, "ps": r.ps} for r in rows]
@router.post("/create")
async def firewall_create(
body: CreateFirewallRuleRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Add firewall rule (stored in panel; use Apply to UFW to sync)"""
if not body.port or len(body.port) > 32:
raise HTTPException(status_code=400, detail="Invalid port")
rule = FirewallRule(port=body.port, protocol=body.protocol, action=body.action, ps=body.ps)
db.add(rule)
await db.commit()
return {"status": True, "msg": "Rule added", "id": rule.id}
@router.delete("/{rule_id}")
async def firewall_delete(
rule_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Delete firewall rule"""
result = await db.execute(select(FirewallRule).where(FirewallRule.id == rule_id))
rule = result.scalar_one_or_none()
if not rule:
raise HTTPException(status_code=404, detail="Rule not found")
await db.delete(rule)
await db.commit()
return {"status": True, "msg": "Rule deleted"}
@router.post("/apply")
async def firewall_apply(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Apply firewall rules to ufw (runs ufw allow/deny for each rule)"""
result = await db.execute(select(FirewallRule).order_by(FirewallRule.id))
rules = result.scalars().all()
errors = []
for r in rules:
port_proto = f"{r.port}/{r.protocol}"
action = "allow" if r.action == "accept" else "deny"
cmd = f"ufw {action} {port_proto}"
out, err = exec_shell_sync(cmd, timeout=10)
if err and "error" in err.lower() and "already" not in err.lower():
errors.append(f"{port_proto}: {err.strip()}")
if errors:
raise HTTPException(status_code=500, detail="; ".join(errors))
return {"status": True, "msg": "Rules applied", "count": len(rules)}