COPY backend backend
COPY frontend frontend
COPY entrypoint.py entrypoint.py
+COPY log log
+COPY settings settings
RUN chmod 755 entrypoint.py
# ---------- STAGE 2: DISTROLESS ----------
COPY --from=builder /app/backend backend
COPY --from=builder /app/frontend frontend
COPY --from=builder /app/entrypoint.py entrypoint.py
+COPY --from=builder /app/log log
+COPY --from=builder /app/settings settings
# Ensure Python sees the installed packages
ENV PYTHONPATH="/usr/local/lib/python3.12/site-packages"
+++ /dev/null
-# backend/config.py
-
-# Import standard modules
-import datetime
-import os
-import secrets
-# Import local modules
-from backend.utils import load_hash
-
-# Software Version
-BASE_VERSION = "0.1.0"
-DEVEL = os.getenv("DEV", "0") == "1"
-if DEVEL:
- timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M")
- APP_VERSION = f"{BASE_VERSION}-dev-{timestamp}"
-else:
- APP_VERSION = BASE_VERSION
-
-# Base Image / Docker Image
-BASEIMG_NAME = os.getenv("BASEIMG_NAME", "unknown")
-BASEIMG_VERSION = os.getenv("BASEIMG_VERSION", "unknown")
-
-# Frontend related settings
-FRONTEND_DIR = "/app/frontend"
-
-# Database related settings
-DB_FILE = os.getenv("DB_FILE", "/data/database.db")
-DB_RESET = os.getenv("DB_RESET", "0") == "1"
-
-# Hosts related settings
-DOMAIN = os.getenv("DOMAIN", "example.com")
-PUBLIC_IP = os.getenv("PUBLIC_IP", "127.0.0.1")
-
-# Web server related settings
-HTTP_PORT = os.getenv("HTTP_PORT", "8000")
-SECRET_KEY = os.getenv("SESSION_SECRET")
-if not SECRET_KEY:
- SECRET_KEY = load_hash("SECRET_KEY_FILE") or secrets.token_urlsafe(64)
-LOGIN_MAX_ATTEMPTS = int(os.getenv("LOGIN_MAX_ATTEMPTS", "5"))
-LOGIN_WINDOW_SECONDS = int(os.getenv("LOGIN_WINDOW_SECONDS", "600"))
-
-# User related settings
-ADMIN_USER = os.getenv("ADMIN_USER", "admin")
-ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin")
-ADMIN_HASH = os.getenv("ADMIN_HASH") or load_hash("ADMIN_HASH_FILE")
# Import standard modules
import os
import sqlite3
-# Import local modules
-from backend.config import DB_FILE
+# Import Settings
+from settings.settings import settings
+# Import Log
+from log.log import get_logger
_connection = None
_init_functions = []
def get_db():
global _connection
if _connection is None:
- os.makedirs(os.path.dirname(DB_FILE) or ".", exist_ok=True)
- _connection = sqlite3.connect(DB_FILE, check_same_thread=False)
+ os.makedirs(os.path.dirname(settings.DB_FILE) or ".", exist_ok=True)
+ _connection = sqlite3.connect(settings.DB_FILE, check_same_thread=False)
_connection.row_factory = sqlite3.Row
_connection.execute("PRAGMA foreign_keys = ON;")
return _connection
# Init Database
# -----------------------------
def init_db():
- print(f"INFO: Starting DB Initialization.")
+ logger = get_logger(__name__)
+ logger.info("Starting DB Initialization")
conn = get_db()
cur = conn.cursor()
conn.commit()
conn.close()
- print(f"INFO: DB Initialization Completed.")
\ No newline at end of file
+ logger.info("DB Initialization Completed")
# Import standard modules
import ipaddress
+import logging
import os
# Import local modules
-from backend.db.db import get_db
-from backend.db.db import register_init
+from backend.db.db import get_db, register_init
+# Import Settings
+from settings.settings import settings
+# Import Log
+from log.log import get_logger
# -----------------------------
# SELECT ALL HOSTS
# -----------------------------
@register_init
def init_db_hosts_table(cur):
- from backend.config import DOMAIN
- from backend.config import PUBLIC_IP
+ logger = get_logger(__name__)
- # GLOBAL SETTINGS
+ # SETTINGS TABLE
cur.execute("""
CREATE TABLE settings (
key TEXT PRIMARY KEY,
value TEXT
);
""")
- cur.execute("INSERT INTO settings (key, value) VALUES (?, ?)", ("domain", DOMAIN))
- cur.execute("INSERT INTO settings (key, value) VALUES (?, ?)", ("external_ipv4", PUBLIC_IP))
+ cur.execute("INSERT INTO settings (key, value) VALUES (?, ?)", ("domain", settings.DOMAIN))
+ cur.execute("INSERT INTO settings (key, value) VALUES (?, ?)", ("external_ipv4", settings.PUBLIC_IP))
- # HOSTS
+ # HOSTS TABLE
cur.execute("""
CREATE TABLE hosts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
""")
cur.execute("CREATE INDEX idx_hosts_name ON hosts(name);")
- # ALIASES
+ # ALIASES TABLE
cur.execute("""
CREATE TABLE aliases (
id INTEGER PRIMARY KEY AUTOINCREMENT,
""")
cur.execute("CREATE INDEX idx_aliases_host ON aliases(host_id);")
- # TXT RECORDS
+ # TXT TABLE
cur.execute("""
CREATE TABLE txt_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
""")
cur.execute("CREATE INDEX idx_txt_host ON txt_records(host_id);")
- print(f"INFO: - HOSTS DB: Database initialized successfully for {DOMAIN}.")
- print(f"INFO: - HOSTS DB: Public IP: {PUBLIC_IP}.")
+ logger.info("HOSTS DB: Database initialized successfully for %s", settings.DOMAIN)
+ logger.info("HOSTS DB: Public IP: %s", settings.PUBLIC_IP)
# backend/db/users.py
# Import standard modules
+import bcrypt
import json
+import logging
import os
# Import local modules
from backend.db.db import get_db, register_init
+# Import Settings
+from settings.settings import settings
+# Import Log
+from log.log import get_logger
# ================================
# Create hash password
# -----------------------------
@register_init
def init_db_users_table(cur):
- from backend.config import ADMIN_USER
- from backend.config import ADMIN_PASSWORD
- from backend.config import ADMIN_HASH
+ logger = get_logger(__name__)
+ # USERS TABLE
cur.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
""")
cur.execute("CREATE INDEX idx_users_username ON users(username);")
# Insert default admin user
- if not ADMIN_HASH:
- ADMIN_HASH = hash_password(ADMIN_PASSWORD)
+ if not settings.ADMIN_PASSWORD_HASH:
+ settings.ADMIN_PASSWORD_HASH = hash_password(settings.ADMIN_PASSWORD)
else:
- ADMIN_PASSWORD = "(hidden)"
+ settings.ADMIN_PASSWORD = "(hidden)"
cur.execute("""
INSERT INTO users (
username, password_hash, email, is_admin, modules, status,
created_at, updated_at, password_changed_at
) VALUES (?, ?, ?, ?, ?, ?, strftime('%s','now'), strftime('%s','now'), strftime('%s','now'));
""", (
- ADMIN_USER,
- ADMIN_HASH,
+ settings.ADMIN_USER,
+ settings.ADMIN_PASSWORD_HASH,
"admin@example.com",
1,
'["dns","dhcp"]',
"active"
))
- print(f"INFO: - USERS DB: Admin user: {ADMIN_USER} with password {ADMIN_PASSWORD} - {ADMIN_HASH}.")
+ logger.info("USERS DB: Admin user: %s with password %s - %s" ,
+ settings.ADMIN_USER, settings.ADMIN_PASSWORD, settings.ADMIN_PASSWORD_HASH)
# -----------------------------
# Create User
# backend/main.py
# import standard modules
-from fastapi import FastAPI, Request, Response
+from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
-from fastapi.responses import FileResponse, RedirectResponse, JSONResponse
+from fastapi.responses import FileResponse, RedirectResponse, JSONResponse, Response
+import logging
import os
-# Import local modules
-from backend.security import is_logged_in, apply_session
+# Import Routers
+from backend.routes.about import router as about_router
from backend.routes.health import router as health_router
from backend.routes.login import router as login_router
from backend.routes.hosts import router as hosts_router
-# Import config variables
-from backend.config import FRONTEND_DIR, HTTP_PORT
+# Import Security
+from backend.security import is_logged_in, apply_session
+# Import Settings
+from settings.settings import settings
+# Import Logging
+from log.log import setup_logging, get_logger
+
+# ------------------------------------------------------------------------------
+# Logging setup
+# ------------------------------------------------------------------------------
+setup_logging(settings.LOG_LEVEL, settings.LOG_TO_FILE, settings.LOG_FILE)
+logger = get_logger(__name__)
+
+# ------------------------------------------------------------------------------
+# App init
+# ------------------------------------------------------------------------------
+app = FastAPI(
+ title=settings.APP_NAME,
+ version=settings.APP_VERSION,
+)
-# Start FastAPI app
-app = FastAPI()
+# ------------------------------------------------------------------------------
+# Routers
+# ------------------------------------------------------------------------------
+app.include_router(about_router)
app.include_router(health_router)
app.include_router(login_router)
app.include_router(hosts_router)
-# Allow frontend JS to call the API
+# ------------------------------------------------------------------------------
+# Startup log
+# ------------------------------------------------------------------------------
+@app.on_event("startup")
+async def startup_log():
+ logger = get_logger(__name__)
+
+ safe_secret = "****" if settings.SECRET_KEY else "undefined"
+ safe_admin_pwd = "****" if settings.ADMIN_PASSWORD else "undefined"
+ safe_admin_hash = "****" if settings.ADMIN_PASSWORD_HASH else "undefined"
+
+ logger.info(
+ "%s starting | app_version=%s | baseimg_version=%s",
+ settings.APP_NAME, settings.APP_VERSION, settings.BASEIMG_VERSION
+ )
+ logger.info(
+ "App settings: frontend=%s | port=%d | secret=%s",
+ settings.FRONTEND_DIR, settings.HTTP_PORT, safe_secret
+ )
+ logger.info(
+ "Database: file=%s | reset=%s",
+ settings.DB_FILE, settings.DB_RESET
+ )
+ logger.info(
+ "Log: level=%s, to_file=%s, file=%s",
+ settings.LOG_LEVEL, settings.LOG_TO_FILE, settings.LOG_FILE
+ )
+ logger.info(
+ "Users: admin=%s | password=%s | hash=%s | hash_file=%s",
+ settings.ADMIN_USER, safe_admin_pwd, safe_admin_hash, settings.ADMIN_PASSWORD_HASH_FILE
+ )
+
+# ------------------------------------------------------------------------------
+# CORS
+# ------------------------------------------------------------------------------
+cors_origins = [
+ f"http://localhost:{settings.HTTP_PORT}",
+ f"http://127.0.0.1:{settings.HTTP_PORT}",
+]
+
app.add_middleware(
CORSMiddleware,
- allow_origins=[
- f"http://localhost:{HTTP_PORT}",
- f"http://127.0.0.1:{HTTP_PORT}",
- ],
- allow_methods=["GET", "POST", "PUT", "DELETE"],
- allow_headers=["Content-Type"],
+ allow_origins=cors_origins,
+ allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
+ allow_headers=["Content-Type", "Authorization"],
+ allow_credentials=True,
)
-# ---------------------------------------------------------
-# Middleware to manage Login
-# ---------------------------------------------------------
-
+# ------------------------------------------------------------------------------
+# Session / Auth middleware
+# ------------------------------------------------------------------------------
@app.middleware("http")
async def session_middleware(request: Request, call_next):
path = request.url.path
if (
path.startswith("/css") or
path.startswith("/js") or
- path.endswith(".js") or
- path.endswith(".css") or
- path.endswith(".png") or
- path.endswith(".jpg") or
- path.endswith(".ico")
+ path.startswith("/static") or
+ path.endswith((".js", ".css", ".png", ".jpg", ".jpeg", ".ico", ".svg", ".map"))
):
return await call_next(request)
if path.startswith("/api"):
if not is_logged_in(request):
return JSONResponse({"error": "Not authenticated"}, status_code=401)
-
response = await call_next(request)
# Sliding expiration
apply_session(response, username=None, token=token)
apply_session(response, username=None, token=token)
return response
-# ---------------------------------------------------------
-# FRONTEND PATHS (absolute paths inside Docker)
-# ---------------------------------------------------------
-
+# ------------------------------------------------------------------------------
+# FRONTEND (FileResponse): pages and assets
+# ------------------------------------------------------------------------------
# Homepage
@app.get("/")
def home(request: Request):
- return FileResponse(os.path.join(FRONTEND_DIR, "hosts.html"))
+ return FileResponse(os.path.join(settings.FRONTEND_DIR, "hosts.html"))
# CSS variables
@app.get("/css/variables.css")
-def home(request: Request):
- return FileResponse(os.path.join(FRONTEND_DIR, "css/variables.css"))
+def css_variables(request: Request):
+ return FileResponse(os.path.join(settings.FRONTEND_DIR, "css/variables.css"))
-# CSS Layoyt
+# CSS Layout
@app.get("/css/layout.css")
-def home(request: Request):
- return FileResponse(os.path.join(FRONTEND_DIR, "css/layout.css"))
+def css_layout(request: Request):
+ return FileResponse(os.path.join(settings.FRONTEND_DIR, "css/layout.css"))
+
--- /dev/null
+# backend/about.py
+
+# Import standard modules
+from fastapi import APIRouter
+# Import Settings
+from settings.settings import settings
+
+# Create Router
+router = APIRouter()
+
+# ---------------------------------------------------------
+# API ENDPOINTS
+# ---------------------------------------------------------
+@router.get("/about")
+def about():
+ return {
+ "app": {
+ "version": settings.APP_VERSION
+ },
+ "baseimg": {
+ "name": settings.BASEIMG_NAME,
+ "version": settings.BASEIMG_VERSION
+ },
+ "domain": settings.DOMAIN,
+ "admin_hash_loaded": settings.ADMIN_PASSWORD_HASH is not None,
+ }
import sqlite3
import time
import os
-# Import config variables
-from backend.config import APP_VERSION, BASEIMG_NAME, BASEIMG_VERSION, DB_FILE
+# Import Settings
+from settings.settings import settings
# Create Router
router = APIRouter()
# ---------------------------------------------------------
# API ENDPOINTS
# ---------------------------------------------------------
-
@router.get("/api/health", tags=["health"])
def health():
start = time.time()
db_size = None
try:
- conn = sqlite3.connect(DB_FILE)
+ conn = sqlite3.connect(settings.DB_FILE)
cursor = conn.cursor()
cursor.execute("select sqlite_version()")
conn.close()
- db_size = round(os.path.getsize(DB_FILE) / (1024 * 1024), 2)
+ db_size = round(os.path.getsize(settings.DB_FILE) / (1024 * 1024), 2)
except Exception as e:
db_status = "error"
return {
"status": "ok" if db_status == "ok" else "degraded",
- "app": {
- "version": APP_VERSION
- },
- "baseimg": {
- "name": BASEIMG_NAME,
- "version": BASEIMG_VERSION
- },
"latency_ms": latency,
"database": {
"status": db_status,
"size_mb": db_size
}
}
-
update_host,
delete_host
)
-# Import config variables
-from backend.config import FRONTEND_DIR
+# Import Settings
+from settings.settings import settings
# Create Router
router = APIRouter()
# ---------------------------------------------------------
# FRONTEND PATHS (absolute paths inside Docker)
# ---------------------------------------------------------
-
# Hosts page
@router.get("/hosts")
def hosts(request: Request):
- return FileResponse(os.path.join(FRONTEND_DIR, "hosts.html"))
+ return FileResponse(os.path.join(settings.FRONTEND_DIR, "hosts.html"))
# Serve hosts.css
@router.get("/css/hosts.css")
def css_hosts():
- return FileResponse(os.path.join(FRONTEND_DIR, "css/hosts.css"))
+ return FileResponse(os.path.join(settings.FRONTEND_DIR, "css/hosts.css"))
# Serve hosts.js
@router.get("/js/hosts.js")
def css_hosts():
- return FileResponse(os.path.join(FRONTEND_DIR, "js/hosts.js"))
+ return FileResponse(os.path.join(settings.FRONTEND_DIR, "js/hosts.js"))
# ---------------------------------------------------------
# API ENDPOINTS
# ---------------------------------------------------------
-
@router.get("/api/hosts")
def api_get_hosts(request: Request):
return get_hosts()
import time
# Import local modules
from backend.security import verify_login, apply_session
-# Import config variables
-from backend.config import FRONTEND_DIR, LOGIN_MAX_ATTEMPTS, LOGIN_WINDOW_SECONDS
+# Import Settings
+from settings.settings import settings
# Create Router
router = APIRouter()
now = time.time()
attempts = login_attempts.get(ip, [])
# tieni solo tentativi negli ultimi LOGIN_WINDOW_SECONDS secondi
- attempts = [t for t in attempts if now - t < LOGIN_WINDOW_SECONDS]
+ attempts = [t for t in attempts if now - t < settings.LOGIN_WINDOW_SECONDS]
- if len(attempts) >= LOGIN_MAX_ATTEMPTS:
+ if len(attempts) >= settings.LOGIN_MAX_ATTEMPTS:
raise HTTPException(status_code=429, detail="Too many login attempts")
# registra nuovo tentativo
# ---------------------------------------------------------
# FRONTEND PATHS (absolute paths inside Docker)
# ---------------------------------------------------------
-
# Login page
@router.get("/login")
def login_page(request: Request):
- return FileResponse(os.path.join(FRONTEND_DIR, "login.html"))
+ return FileResponse(os.path.join(settings.FRONTEND_DIR, "login.html"))
# Serve login.css
@router.get("/css/login.css")
def css_login():
- return FileResponse(os.path.join(FRONTEND_DIR, "css/login.css"))
+ return FileResponse(os.path.join(settings.FRONTEND_DIR, "css/login.css"))
# Serve login.js
@router.get("/js/login.js")
def css_login():
- return FileResponse(os.path.join(FRONTEND_DIR, "js/login.js"))
+ return FileResponse(os.path.join(settings.FRONTEND_DIR, "js/login.js"))
# Serve session.js
@router.get("/js/session.js")
def css_login():
- return FileResponse(os.path.join(FRONTEND_DIR, "js/session.js"))
+ return FileResponse(os.path.join(settings.FRONTEND_DIR, "js/session.js"))
# ---------------------------------------------------------
# API ENDPOINTS
# ---------------------------------------------------------
-
@router.post("/api/login")
def api_login(request: Request, data: dict, response: Response):
ip = request.client.host
from itsdangerous import TimestampSigner
# Import local modules
from backend.db.users import get_user_by_username
-from backend.utils import log_event
-# Import config variables
-from backend.config import FRONTEND_DIR, SECRET_KEY
+# Import Settings
+from settings.settings import settings
+# Import Log
+from log.log import get_logger
-signer = TimestampSigner(SECRET_KEY)
+signer = TimestampSigner(settings.SECRET_KEY)
# -----------------------------
# Verify Login
# -----------------------------
def verify_login(username, password):
+ logger = get_logger(__name__)
user = get_user_by_username(username)
if not user:
- log_event("LOGIN failed - user not found", user=username)
+ logger.error("LOGIN failed - user %s not found", username)
return False
if user["status"] != "active":
- log_event("LOGIN Failed - user disabled", user=username)
+ logger.error("LOGIN Failed - user %s disabled", username)
return False
if not bcrypt.checkpw(password.encode(), user["password_hash"].encode()):
- log_event("LOGIN Failed - password wrong", user=username)
+ logger.error("LOGIN Failed - password wrong for user %s", username)
return False
- log_event("LOGIN", user=username)
+ logger.info("LOGIN user %s", username)
return True
# ----------------------------
# creates or renew the cookie
# ----------------------------
def apply_session(response, username: str | None = None, token: str | None = None):
+ logger = get_logger(__name__)
# First Login
if username is not None and token is None:
token = signer.sign(username).decode()
- log_event("SESSION_CREATE", user=username)
+ logger.info("SESSION_CREATE - %s", username)
if username is None:
username = signer.unsign(token, max_age=86400).decode()
- log_event("SESSION_UPDATE", user=username)
+ logger.info("SESSION_UPDATE - %s", username)
if username is None or token is None:
- log_event("SESSION_ERROR")
+ logger.error("SESSION_ERROR")
return
response.set_cookie(
# backend/db/utils.py
# Import standard modules
-from datetime import datetime
import os
# -----------------------------
# Load hash from file
# -----------------------------
-def load_hash(hash_file: str):
- path = os.environ.get(hash_file)
+def load_hash(path: str):
if path and os.path.exists(path):
with open(path, "r") as f:
return f.read().strip()
return None
-
-# -----------------------------
-# Log Event
-# -----------------------------
-def log_event(event: str, **fields):
- ts = datetime.utcnow().isoformat() + "Z"
- parts = " ".join(f"{k}={v}" for k, v in fields.items())
- print(f"INFO: {ts} {event} {parts}")
-
#!/usr/local/bin/python3
# Import standard modules
+import logging
import os
import sys
+import argparse
# Import local modules
from backend.db.db import init_db
import backend.db.hosts
import backend.db.users
+# Import Settings
+from settings.settings import settings
+# Import Log
+from log.log import setup_logging, get_logger
# ================================
-# Variables
+# Parse CLI arguments
# ================================
-BASEIMG_NAME = "network-manager-distroless"
-BASEIMG_VERSION = "0.2"
-
-from backend.config import DB_FILE
-from backend.config import DB_RESET
+def parse_args():
+ parser = argparse.ArgumentParser(add_help=False)
+ parser.add_argument("--reset", action="store_true")
+ parser.add_argument("--domain")
+ parser.add_argument("--public-ip")
+ parser.add_argument("cmd", nargs=argparse.REMAINDER)
+ return parser.parse_args()
# ================================
# Create DB if needed
# ================================
-def docker_create_db():
+def docker_create_db(logger):
# Reset database if requested
- if DB_RESET and os.path.exists(DB_FILE):
- print("INFO: Removing existing database.")
- os.remove(DB_FILE)
+ if settings.DB_RESET and os.path.exists(settings.DB_FILE):
+ logger.info("Removing existing database: %s", settings.DB_FILE)
+ os.remove(settings.DB_FILE)
# Skip creation if DB already exists
- if os.path.exists(DB_FILE):
- print("INFO: Database already exists. Nothing to do.")
+ if os.path.exists(settings.DB_FILE):
+ logger.info("Database already exists. Nothing to do.")
return
- print(f"INFO: Creating database: {DB_FILE}.")
+ logger.info("Creating database: %s", settings.DB_FILE)
# Ensure directory exists
- os.makedirs(os.path.dirname(DB_FILE) or ".", exist_ok=True)
+ os.makedirs(os.path.dirname(settings.DB_FILE) or ".", exist_ok=True)
- # Initialize all registered DB tables
+ # Initialize DB tables
init_db()
# ================================
# Entry Point
# ================================
+def main():
+ # Enable logging
+ setup_logging()
+ logger = get_logger("baseimg")
+
+ # Log startup docker image
+ logger.info("Starting docker image %s version %s", settings.BASEIMG_NAME, settings.BASEIMG_VERSION)
+
+ # Parse arguments
+ args = parse_args()
+
+ # Apply arguments into settings
+ if args.reset:
+ settings.DB_RESET = True
+ if args.domain:
+ settings.DOMAIN = args.domain
+ if args.public_ip:
+ settings.PUBLIC_IP = args.public_ip
+
+ # Create or update database
+ docker_create_db(logger)
+
+ # If no command provided -> error
+ if not args.cmd:
+ logger.error("No command provided. Exiting.")
+ sys.exit(1)
+
+ cmd = args.cmd[0]
+ rest = args.cmd[1:]
+
+ logger.info("Docker image initialization completed — executing: %s %s", cmd, " ".join(rest))
+
+ try:
+ os.execvp(cmd, [cmd, *rest])
+ except FileNotFoundError:
+ logger.critical("Command not found: %s", cmd)
+ sys.exit(1)
-# Force flush
-sys.stdout.reconfigure(line_buffering=True)
-
-print(f"INFO: Starting {BASEIMG_NAME} docker image version {BASEIMG_VERSION}.")
-os.environ["BASEIMG_NAME"] = BASEIMG_NAME
-os.environ["BASEIMG_VERSION"] = BASEIMG_VERSION
-
-# Parse arguments
-args = sys.argv[1:]
-i = 0
-while i < len(args):
- if args[i] == "--reset":
- DB_RESET = True
- i += 1
- elif args[i] == "--domain" and i + 1 < len(args):
- DOMAIN = args[i + 1]
- i += 2
- elif args[i] == "--public-ip" and i + 1 < len(args):
- PUBLIC_IP = args[i + 1]
- i += 2
- elif args[i] == "--":
- args = args[i + 1:]
- break
- else:
- break
-
-# Create DB
-docker_create_db()
-
-# Continue to CMD
-if not args:
- print("ERROR: No command provided to exec.")
- sys.exit(1)
-
-os.execvp(args[0], args)
+if __name__ == "__main__":
+ main()
--- /dev/null
+# log/log.py
+
+import logging
+import logging.config
+import os
+import sys
+
+# Module-level flag to prevent re-initialization
+_INITIALIZED = False
+
+# ---------------------------------------------------------
+# Build a full dictConfig for logging
+# ---------------------------------------------------------
+def build_log_config(level: str = "INFO", to_file: bool = False, file: str | None = None) -> dict:
+ """
+ Returns a complete dictConfig for the logging system, including formatters,
+ console/file handlers, and specific loggers (uvicorn, fastapi, etc).
+ """
+ level = (level or "INFO").upper()
+
+ formatters = {
+ "detailed": {
+ "format": "%(asctime)s %(levelname)s [%(name)s] %(message)s",
+ "datefmt": "%Y-%m-%dT%H:%M:%S%z",
+ },
+ "access": {
+ "format": '%(asctime)s %(levelname)s [%(name)s] '
+ '%(client_addr)s - "%(request_line)s" %(status_code)s',
+ "datefmt": "%Y-%m-%dT%H:%M:%S%z",
+ },
+ }
+
+ handlers = {
+ "console": {
+ "class": "logging.StreamHandler",
+ "level": level,
+ "formatter": "detailed",
+ "stream": "ext://sys.stdout",
+ }
+ }
+
+ # Select active handler based on console
+ active_handlers = ["console"]
+
+ if to_file:
+ if file is not None:
+ # Ensure the log directory exists and add a rotating file handler
+ log_dir = os.path.dirname(file) or "."
+ os.makedirs(log_dir, exist_ok=True)
+ handlers["file"] = {
+ "class": "logging.handlers.RotatingFileHandler",
+ "level": level,
+ "formatter": "detailed",
+ "filename": file,
+ "maxBytes": 5 * 1024 * 1024,
+ "backupCount": 5,
+ "encoding": "utf-8",
+ }
+ # Add active handler based on file
+ active_handlers.append("file")
+
+ return {
+ "version": 1,
+ "disable_existing_loggers": False,
+ "formatters": formatters,
+ "handlers": handlers,
+ "root": {
+ "level": level,
+ "handlers": active_handlers,
+ },
+ # Modules
+ "loggers": {
+ # Uvicorn core
+ "uvicorn": {
+ "level": level,
+ "handlers": active_handlers,
+ "propagate": False,
+ },
+ # Uvicorn internal error
+ "uvicorn.error": {
+ "level": level,
+ "handlers": active_handlers,
+ "propagate": False,
+ },
+ # Uvicorn HTTP access
+ "uvicorn.access": {
+ "level": level,
+ "handlers": active_handlers,
+ "propagate": False,
+ },
+ # FastAPI
+ "fastapi": {
+ "level": level,
+ "handlers": active_handlers,
+ "propagate": False,
+ },
+ },
+ }
+
+# ---------------------------------------------------------
+# Initialize logging once (singleton guard)
+# ---------------------------------------------------------
+def setup_logging(level: str = "INFO", to_file: bool = False, file: str | None = None) -> None:
+ """
+ Initializes the logging system only once. Subsequent calls are no-ops.
+ Useful to prevent duplicated handlers or reconfiguration side effects.
+ """
+ global _INITIALIZED
+
+ if _INITIALIZED:
+ return
+
+ config = build_log_config(level=level, to_file=to_file, file=file)
+ logging.config.dictConfig(config)
+
+ logging.getLogger(__name__).info(
+ "Logging configured (level=%s, to_file=%s, file=%s)",
+ level.upper(), to_file, file
+ )
+
+ _INITIALIZED = True
+
+# ---------------------------------------------------------
+# Get a configured logger for the given module/name
+# ---------------------------------------------------------
+def get_logger(name: str = None) -> logging.Logger:
+ """
+ Returns a logger instance configured via the module setup. If setup was not
+ called yet, it falls back to the standard logging defaults.
+ """
+ if not name:
+ name = __name__
+ return logging.getLogger(name)
--- /dev/null
+# backend/config.py
+
+# ---------------------------------------------------------
+# BASEIMG
+# ---------------------------------------------------------
+BASEIMG_NAME = "network-manager-distroless"
+BASEIMG_VERSION = "0.3"
+
+# ---------------------------------------------------------
+# APP
+# ---------------------------------------------------------
+APP_NAME = "network-manager"
+APP_VERSION = "0.2.0"
--- /dev/null
+# backend/default.py
+
+# ---------------------------------------------------------
+# Frontend
+# ---------------------------------------------------------
+FRONTEND_DIR = "/app/frontend"
+
+# ---------------------------------------------------------
+# Database
+# ---------------------------------------------------------
+DB_FILE = "/data/database.db"
+DB_RESET = False
+
+# ---------------------------------------------------------
+# Log
+# ---------------------------------------------------------
+LOG_LEVEL = "INFO"
+LOG_TO_FILE = False
+LOG_FILE = "/data/app.log"
+
+# ---------------------------------------------------------
+# Host
+# ---------------------------------------------------------
+DOMAIN = "example.com"
+PUBLIC_IP = "127.0.0.1"
+
+# ---------------------------------------------------------
+# Web
+# ---------------------------------------------------------
+HTTP_PORT = "8000"
+LOGIN_MAX_ATTEMPTS = "5"
+LOGIN_WINDOW_SECONDS = "600"
+
+# ---------------------------------------------------------
+# Admin
+# ---------------------------------------------------------
+ADMIN_USER = "admin"
+ADMIN_PASSWORD = "admin"
+ADMIN_PASSWORD_HASH_FILE = "/run/secrets/admin_password_hash"
--- /dev/null
+# backend/settings.py
+
+from __future__ import annotations
+
+# import standard modules
+import os
+import secrets
+import datetime
+from pathlib import Path
+from typing import Optional
+from pydantic import BaseModel, Field, field_validator
+# Import Parameters
+from . import config, default
+
+# ---------------------------------------------------------
+# Convert value to boolean
+# ---------------------------------------------------------
+def _to_bool(val) -> bool:
+ if isinstance(val, bool):
+ return val
+ if val is None:
+ return False
+ return str(val).strip().lower() in {"1", "true", "yes", "on"}
+
+# ---------------------------------------------------------
+# Read text from file if it exists
+# ---------------------------------------------------------
+def _read_text_if_exists(path: Optional[str]) -> Optional[str]:
+ if not path:
+ return None
+ p = Path(path)
+ if p.exists() and p.is_file():
+ try:
+ return p.read_text(encoding="utf-8").strip()
+ except Exception:
+ return None
+ return None
+
+# ---------------------------------------------------------
+# Settings Model
+# ---------------------------------------------------------
+class Settings(BaseModel):
+ # Naming
+ APP_NAME: str = Field(default_factory=lambda: config.APP_NAME)
+
+ # Versioning
+ APP_VERSION: str = Field(default_factory=lambda: config.APP_VERSION)
+ DEVEL: bool = Field(default_factory=lambda: _to_bool(os.getenv("DEV", False)))
+
+ # Base Image / Docker Image
+ BASEIMG_NAME: str = Field(default_factory=lambda: config.BASEIMG_NAME)
+ BASEIMG_VERSION: str = Field(default_factory=lambda: config.BASEIMG_VERSION)
+
+ # Frontend
+ FRONTEND_DIR: str = Field(default_factory=lambda: os.getenv("FRONTEND_DIR", default.FRONTEND_DIR))
+
+ # Database
+ DB_FILE: str = Field(default_factory=lambda: os.getenv("DB_FILE", default.DB_FILE))
+ DB_RESET: bool = Field(default_factory=lambda: _to_bool(os.getenv("DB_RESET", default.DB_RESET)))
+
+ # Log
+ LOG_LEVEL: str = Field(default_factory=lambda: os.getenv("LOG_LEVEL", default.LOG_LEVEL))
+ LOG_TO_FILE: bool = Field(default_factory=lambda: _to_bool(os.getenv("LOG_TO_FILE", default.LOG_TO_FILE)))
+ LOG_FILE: str = Field(default_factory=lambda: os.getenv("LOG_FILE", default.LOG_FILE))
+
+ # Hosts
+ DOMAIN: str = Field(default_factory=lambda: os.getenv("DOMAIN", default.DOMAIN))
+ PUBLIC_IP: str = Field(default_factory=lambda: os.getenv("PUBLIC_IP", default.DOMAIN))
+
+ # Web
+ HTTP_PORT: int = Field(default_factory=lambda: int(os.getenv("HTTP_PORT", default.HTTP_PORT)))
+ SECRET_KEY: str = Field(default_factory=lambda: (
+ (os.getenv("SESSION_SECRET") or _read_text_if_exists(os.getenv("SECRET_KEY_FILE")) or secrets.token_urlsafe(64)).strip()
+ ))
+ LOGIN_MAX_ATTEMPTS: int = Field(default_factory=lambda: int(os.getenv("LOGIN_MAX_ATTEMPTS", default.LOGIN_MAX_ATTEMPTS)))
+ LOGIN_WINDOW_SECONDS: int = Field(default_factory=lambda: int(os.getenv("LOGIN_WINDOW_SECONDS", default.LOGIN_WINDOW_SECONDS)))
+
+ # Admin
+ ADMIN_USER: str = Field(default_factory=lambda: os.getenv("ADMIN_USER", default.ADMIN_USER))
+ ADMIN_PASSWORD: str = Field(default_factory=lambda: os.getenv("ADMIN_PASSWORD", default.ADMIN_PASSWORD))
+ ADMIN_PASSWORD_HASH_FILE: str = Field(default_factory=lambda: os.getenv("ADMIN_PASSWORD_HASH_FILE", default.ADMIN_PASSWORD_HASH_FILE))
+ ADMIN_PASSWORD_HASH: Optional[str] = Field(default_factory=lambda: (
+ (os.getenv("ADMIN_PASSWORD_HASH") or _read_text_if_exists(os.getenv("ADMIN_PASSWORD_HASH_FILE", default.ADMIN_PASSWORD_HASH_FILE)) or None)
+ ))
+
+ def model_post_init(self, __context) -> None:
+ if self.DEVEL:
+ ts = datetime.datetime.now().strftime("%Y%m%d-%H%M")
+ object.__setattr__(self, "APP_VERSION", f"{self.APP_VERSION}-dev-{ts}")
+ else:
+ object.__setattr__(self, "APP_VERSION", self.APP_VERSION)
+
+# ---------------------------------------------------------
+# Singleton
+# ---------------------------------------------------------
+settings = Settings()