# backend/db/users.py
# Import standard modules
-import bcrypt
import json
import os
# Import local modules
-from backend.db.db import get_db
-from backend.db.db import register_init
+from backend.db.db import get_db, register_init
# ================================
# Create hash password
))
conn.commit()
-
-# -----------------------------
-# Verify Login
-# -----------------------------
-def verify_login(username, password):
- user = get_user_by_username(username)
- if not user:
- return False
-
- if user["status"] != "active":
- return False
-
- if not bcrypt.checkpw(password.encode(), user["password_hash"].encode()):
- return False
-
- return True
from fastapi.responses import FileResponse, RedirectResponse, JSONResponse
import os
# Import local modules
-from backend.security import is_logged_in
+from backend.security import is_logged_in, apply_session
from backend.routes.health import router as health_router
from backend.routes.login import router as login_router
from backend.routes.hosts import router as hosts_router
@app.middleware("http")
async def session_middleware(request: Request, call_next):
path = request.url.path
+ token = request.cookies.get("session")
# Excludes the login methods
if path.startswith("/login") or path.startswith("/api/login"):
if path.startswith("/api"):
if not is_logged_in(request):
return JSONResponse({"error": "Not authenticated"}, status_code=401)
- return await call_next(request)
+
+ response = await call_next(request)
+ # Sliding expiration
+ apply_session(response, username=None, token=token)
+ return response
# Protected HTML pages
if not is_logged_in(request):
return RedirectResponse("/login")
- return await call_next(request)
+ response = await call_next(request)
+ # Sliding expiration
+ apply_session(response, username=None, token=token)
+ return response
# ---------------------------------------------------------
# FRONTEND PATHS (absolute paths inside Docker)
import os
import time
# Import local modules
-from backend.security import signer
-from backend.db.users import verify_login
+from backend.security import verify_login, apply_session
# Import config variables
from backend.config import FRONTEND_DIR, LOGIN_MAX_ATTEMPTS, LOGIN_WINDOW_SECONDS
# reset tentativi su IP
login_attempts.pop(ip, None)
- token = signer.sign(user).decode()
- response.set_cookie(
- "session",
- token,
- httponly=True,
- max_age=86400,
- path="/",
- #secure=True, # solo via HTTPS
- samesite="Strict"
- )
+ apply_session(response, username=user)
return {"status": "ok"}
return {"error": "Wrong credentials"}
# backend/security.py
-# import standard modules
+# Import standard modules
+import bcrypt
import os
from fastapi import Request, HTTPException
from itsdangerous import TimestampSigner
+# Import local modules
+from backend.db.users import get_user_by_username
+from backend.utils import log_event
# Import config variables
from backend.config import FRONTEND_DIR, SECRET_KEY
signer = TimestampSigner(SECRET_KEY)
+# -----------------------------
+# Verify Login
+# -----------------------------
+def verify_login(username, password):
+ user = get_user_by_username(username)
+ if not user:
+ log_event("LOGIN failed - user not found", user=username)
+ return False
+
+ if user["status"] != "active":
+ log_event("LOGIN Failed - user disabled", user=username)
+ return False
+
+ if not bcrypt.checkpw(password.encode(), user["password_hash"].encode()):
+ log_event("LOGIN Failed - password wrong", user=username)
+ return False
+
+ log_event("LOGIN", user=username)
+ return True
+
+# ----------------------------
+# creates or renew the cookie
+# ----------------------------
+def apply_session(response, username: str | None = None, token: str | None = None):
+
+ # First Login
+ if username is not None and token is None:
+ token = signer.sign(username).decode()
+ log_event("SESSION_CREATE", user=username)
+
+ if username is None:
+ username = signer.unsign(token, max_age=86400).decode()
+ log_event("SESSION_UPDATE", user=username)
+
+ if username is None or token is None:
+ log_event("SESSION_ERROR")
+ return
+
+ response.set_cookie(
+ "session",
+ token,
+ httponly=True,
+ max_age=86400,
+ path="/",
+ #secure=True, # solo via HTTPS
+ samesite="Strict"
+ )
+
# -----------------------------
# check session cookie
# -----------------------------
# backend/db/utils.py
# Import standard modules
+from datetime import datetime
import os
# -----------------------------
with open(path, "r") as f:
return f.read().strip()
return None
+
+# -----------------------------
+# Log Event
+# -----------------------------
+def log_event(event: str, **fields):
+ ts = datetime.utcnow().isoformat() + "Z"
+ parts = " ".join(f"{k}={v}" for k, v in fields.items())
+ print(f"INFO: {ts} {event} {parts}")
+