import secrets
import os
import ipaddress
+import time
# Import local models
from backend.db import (
delete_host
)
+# Read Variables
+SECRET_KEY = os.getenv("SESSION_SECRET", secrets.token_urlsafe(64))
+HTTP_PORT = os.getenv("HTTP_PORT", "8000")
+LOGIN_MAX_ATTEMPTS = int(os.getenv("LOGIN_MAX_ATTEMPTS", "5"))
+LOGIN_WINDOW_SECONDS = int(os.getenv("LOGIN_WINDOW_SECONDS", "600"))
+
+# IP → lista timestamp tentativi
+login_attempts = {}
+
+# Start FastAPI app
app = FastAPI()
# Allow frontend JS to call the API
app.add_middleware(
CORSMiddleware,
- allow_origins=["*"],
- allow_methods=["*"],
- allow_headers=["*"],
+ allow_origins=[
+ f"http://localhost:{HTTP_PORT}",
+ f"http://127.0.0.1:{HTTP_PORT}",
+ ],
+ allow_methods=["GET", "POST", "PUT", "DELETE"],
+ allow_headers=["Content-Type"],
)
# Token signer for session management
-SECRET_KEY = os.getenv("SESSION_SECRET", secrets.token_urlsafe(64))
signer = TimestampSigner(SECRET_KEY)
def require_login(request: Request):
except:
raise HTTPException(status_code=401, detail="Invalid session")
+def check_rate_limit(ip: str):
+ now = time.time()
+
+ attempts = login_attempts.get(ip, [])
+ # tieni solo tentativi negli ultimi LOGIN_WINDOW_SECONDS secondi
+ attempts = [t for t in attempts if now - t < LOGIN_WINDOW_SECONDS]
+
+ if len(attempts) >= LOGIN_MAX_ATTEMPTS:
+ raise HTTPException(status_code=429, detail="Too many login attempts")
+
+ # registra nuovo tentativo
+ attempts.append(now)
+ login_attempts[ip] = attempts
+
# ---------------------------------------------------------
# FRONTEND PATHS (absolute paths inside Docker)
# ---------------------------------------------------------
# ---------------------------------------------------------
@app.post("/api/login")
-def api_login(data: dict, response: Response):
+def api_login(request: Request, data: dict, response: Response):
+ ip = request.client.host
+ check_rate_limit(ip)
+
user = data.get("username")
pwd = data.get("password")
if user == "admin" and pwd == "admin":
+ # reset tentativi su IP
+ login_attempts.pop(ip, None)
+
token = signer.sign(user).decode()
response.set_cookie(
"session",
token,
httponly=True,
max_age=86400,
- path="/"
+ path="/",
+ #secure=True, # solo via HTTPS
+ samesite="Strict" # riduce CSRF
)
return {"status": "ok"}
return {"error": "Invalid credentials"}