From 527d14e0d2d92bb581de52d31b2196cb7c472688 Mon Sep 17 00:00:00 2001 From: Giorgio Ravera Date: Fri, 29 May 2026 23:54:27 +0200 Subject: [PATCH] Review database --- backend/db/aliases.py | 5 +-- backend/db/config.py | 10 +++++- backend/db/hosts.py | 27 +++++++++------ backend/db/leases.py | 81 +++++++++++++++++++++---------------------- backend/db/users.py | 40 +++++++++++---------- 5 files changed, 89 insertions(+), 74 deletions(-) diff --git a/backend/db/aliases.py b/backend/db/aliases.py index d0b30ed..1c803ff 100644 --- a/backend/db/aliases.py +++ b/backend/db/aliases.py @@ -107,7 +107,7 @@ def add_alias(data: Dict[str, Any]) -> int: except sqlite3.IntegrityError: conn.rollback() - return -1 + raise ValueError("Alias already exists or unique constraint failed") except Exception as err: conn.rollback() @@ -127,7 +127,8 @@ def update_alias(alias_id: int, data: Dict[str, Any]) -> bool: cur = conn.execute( """ UPDATE aliases - SET name=?, target=?, description=?, ssl_enabled=?, visibility=?, last_updated=CURRENT_TIMESTAMP + SET name=?, target=?, description=?, ssl_enabled=?, visibility=?, + last_updated=strftime('%Y-%m-%dT%H:%M:%SZ','now') WHERE id=? """, ( diff --git a/backend/db/config.py b/backend/db/config.py index 1de99ac..4507c74 100644 --- a/backend/db/config.py +++ b/backend/db/config.py @@ -2,6 +2,7 @@ # Import local modules from backend.db.db import get_db, register_init +from backend.utils import to_bool # Import Settings from backend.settings.settings import settings @@ -11,12 +12,19 @@ from backend.log.log import get_logger # Logger initialization logger = get_logger(__name__) +# --------------------------------------------------------- +# Internal: wrapper to to_bool +# --------------------------------------------------------- +def _to_bool(v): + result = to_bool(v) + return result if result is not None else False + # --------------------------------------------------------- # Type mapping for config keys # --------------------------------------------------------- CONFIG_TYPES = { "LOG_LEVEL": str, - "LOG_TO_FILE": lambda v: v.lower() in ("1", "true", "yes"), + "LOG_TO_FILE": _to_bool, "EXTERNAL_NAME": str, "LOGIN_MAX_ATTEMPTS": int, "LOGIN_WINDOW_SECONDS": int, diff --git a/backend/db/hosts.py b/backend/db/hosts.py index 224d3a4..937dd70 100644 --- a/backend/db/hosts.py +++ b/backend/db/hosts.py @@ -48,8 +48,10 @@ def validate_data(data: Dict[str, Any]) -> Dict[str, Any]: # Check MAC mac = data.get("mac") - if mac and not MAC_RE.match(mac): - raise ValueError(f"Invalid MAC address: {mac}") + if mac: + mac = mac.strip() + if not MAC_RE.match(mac): + raise ValueError(f"Invalid MAC address: {mac}") # Check description description = data.get("description") @@ -65,7 +67,7 @@ def validate_data(data: Dict[str, Any]) -> Dict[str, Any]: "name": normalize(name), "ipv4": normalize(ipv4), "ipv6": normalize(ipv6), - "mac": normalize(mac), + "mac": normalize(mac).lower() if mac else None, "description": normalize(description), "ssl_enabled": ssl_enabled, "visibility": visibility, @@ -89,16 +91,18 @@ def ipv4_sort_key(h: Dict[str, Any]): # ----------------------------- def get_hosts(filter_devices: bool = False) -> List[Dict[str, Any]]: conn = get_db() - if (filter_devices != True): - cur = conn.execute("SELECT * FROM hosts") - else: - cur = conn.execute("SELECT id, ipv4, mac, name, description FROM hosts WHERE ipv4 IS NOT NULL") + query = ( + "SELECT * FROM hosts" + if not filter_devices + else "SELECT id, ipv4, mac, name, description FROM hosts WHERE ipv4 IS NOT NULL" + ) + cur = conn.execute(query) rows = [] for r in cur.fetchall(): item = dict(r) - if (filter_devices == True): - item["id"] = f"s-{item['id']}" + if filter_devices: + item["id"] = f"s-{item['id']}" # Frontend requires this format rows.append(item) rows.sort(key=ipv4_sort_key) return rows @@ -151,7 +155,7 @@ def add_host(data: Dict[str, Any]) -> int: except sqlite3.IntegrityError: conn.rollback() - return -1 + raise ValueError("Host already exists or unique constraint failed") except Exception as err: conn.rollback() @@ -171,7 +175,8 @@ def update_host(host_id: int, data: Dict[str, Any]) -> bool: cur = conn.execute( """ UPDATE hosts - SET name=?, ipv4=?, ipv6=?, mac=?, description=?, ssl_enabled=?, visibility=?, last_updated=CURRENT_TIMESTAMP + SET name=?, ipv4=?, ipv6=?, mac=?, description=?, ssl_enabled=?, visibility=?, + last_updated=strftime('%Y-%m-%dT%H:%M:%SZ','now') WHERE id=? """, ( diff --git a/backend/db/leases.py b/backend/db/leases.py index 6e82ea8..b4bc3f3 100644 --- a/backend/db/leases.py +++ b/backend/db/leases.py @@ -13,21 +13,21 @@ from backend.settings.settings import settings # Import Logging from backend.log.log import get_logger +ALIASES_MAP = { + "client_id": "client-id", + "valid_lifetime": "valid-lft", + "subnet_id": "subnet-id", + "fqdn_fwd": "fqdn-fwd", + "fqdn_rev": "fqdn-rev", + "user_context": "user-context", + "pool_id": "pool-id", +} + # ----------------------------- # Normalizes column names to expected keys # ----------------------------- def _norm(col: str) -> str: - col = (col or "").strip() - aliases = { - "client_id": "client-id", - "valid_lifetime": "valid-lft", - "subnet_id": "subnet-id", - "fqdn_fwd": "fqdn-fwd", - "fqdn_rev": "fqdn-rev", - "user_context": "user-context", - "pool_id": "pool-id", - } - return aliases.get(col, col) + return ALIASES_MAP.get((col or "").strip(), col) # Logger initialization logger = get_logger(__name__) @@ -49,40 +49,37 @@ def get_leases(filter_devices: bool = False) -> List[Dict[str, Any]]: if not reader.fieldnames: return [] - if(filter_devices != True): - for raw in reader: - rec = { _norm(k): (v if v is not None else "") for k, v in raw.items() } + for raw in reader: + rec = {_norm(k): (v if v is not None else "") for k, v in raw.items()} + + base = { + "ipv4": rec.get("address", "").strip() or None, + "mac": rec.get("hwaddr", "").strip().lower() or None, + "name": rec.get("hostname", "").strip() or None, + "dhcp_state": rec.get("state", "").strip() or None, + } + if not filter_devices: item = { - "id": index, - "ipv4": rec.get("address", "").strip() or None, - "mac": rec.get("hwaddr", "").strip().lower() or None, - "client_id": rec.get("client-id", "").strip() or None, + "id": index, + **base, + "client_id": rec.get("client-id", "").strip() or None, "valid_lifetime": to_int(rec.get("valid-lft", "")), - "expire": rec.get("expire", "").strip() or None, - "subnet_id": to_int(rec.get("subnet-id", "")), - "fqdn_fwd": to_bool(rec.get("fqdn-fwd", "")), - "fqdn_rev": to_bool(rec.get("fqdn-rev", "")), - "name": rec.get("hostname", "").strip() or None, - "dhcp_state": rec.get("state", "").strip() or None, - "user_context": rec.get("user-context", "").strip() or None, # spesso JSON serializzato - "pool_id": to_int(rec.get("pool-id", "")), + "expire": rec.get("expire", "").strip() or None, + "subnet_id": to_int(rec.get("subnet-id", "")), + "fqdn_fwd": to_bool(rec.get("fqdn-fwd", "")), + "fqdn_rev": to_bool(rec.get("fqdn-rev", "")), + "user_context": rec.get("user-context", "").strip() or None, + "pool_id": to_int(rec.get("pool-id", "")), } - leases.append(item) - index += 1 - else: - for raw in reader: - rec = { _norm(k): (v if v is not None else "") for k, v in raw.items() } - + else: item = { - "id": f"d-{index}", - "ipv4": rec.get("address", "").strip() or None, - "mac": rec.get("hwaddr", "").strip().lower() or None, - "name": rec.get("hostname", "").strip() or None, - "dhcp_state": rec.get("state", "").strip() or None, + "id": f"d-{index}", # Frontend requires this format + **base, } - leases.append(item) - index += 1 + + leases.append(item) + index += 1 return leases @@ -130,7 +127,7 @@ def delete_lease(lease_id: int): if not path.exists(): raise FileNotFoundError(f"File not found: {path}") - with path.open("r") as f: + with path.open("r", encoding="utf-8", newline="") as f: lines = f.readlines() # file empty or only header @@ -146,8 +143,8 @@ def delete_lease(lease_id: int): raise ValueError(f"Lease index out of range: {lease_id}") # delete the line - deleted_line = data_lines.pop(index) + data_lines.pop(index) # Rewrite the file without the deleted line - with path.open("w") as f: + with path.open("w", encoding="utf-8", newline="") as f: f.writelines([header] + data_lines) diff --git a/backend/db/users.py b/backend/db/users.py index 56879df..ee86b3b 100644 --- a/backend/db/users.py +++ b/backend/db/users.py @@ -3,8 +3,6 @@ # Import standard modules import bcrypt import json -import logging -import os # Import local modules from backend.db.db import get_db, register_init @@ -32,7 +30,8 @@ def get_user_by_username(username): conn = get_db() cur = conn.cursor() cur.execute("SELECT * FROM users WHERE username = ?", (username,)) - return cur.fetchone() + row = cur.fetchone() + return dict(row) if row else None # ----------------------------- # Create User @@ -41,20 +40,25 @@ def create_user(username, password_hash, email=None, is_admin=0, modules=None): conn = get_db() cur = conn.cursor() - cur.execute(""" - INSERT INTO users ( - username, password_hash, email, is_admin, modules, status, - created_at, updated_at, password_changed_at - ) VALUES (?, ?, ?, ?, ?, 'active', strftime('%s','now'), strftime('%s','now'), strftime('%s','now')); - """, ( - username, - password_hash, - email, - is_admin, - json.dumps(modules or []) - )) - - conn.commit() + try: + cur.execute(""" + INSERT INTO users ( + username, password_hash, email, is_admin, modules, status, + created_at, updated_at, password_changed_at + ) VALUES (?, ?, ?, ?, ?, 'active', strftime('%s','now'), strftime('%s','now'), strftime('%s','now')); + """, ( + username, + password_hash, + email, + is_admin, + json.dumps(modules or []) + )) + conn.commit() + return cur.lastrowid + except Exception as err: + conn.rollback() + logger.error(f"USERS DB: Error creating user - {err}") + raise # ----------------------------- # Create Users Table @@ -103,6 +107,6 @@ def init_db_users_defaults(cur): password_hash, "admin@example.com", 1, - '["dns","dhcp"]', + json.dumps(["dns", "dhcp"]), "active" )) -- 2.47.3