]> git.giorgioravera.it Git - network-manager.git/commitdiff
Review database
authorGiorgio Ravera <giorgio.ravera@gmail.com>
Fri, 29 May 2026 21:54:27 +0000 (23:54 +0200)
committerGiorgio Ravera <giorgio.ravera@gmail.com>
Fri, 29 May 2026 21:54:27 +0000 (23:54 +0200)
backend/db/aliases.py
backend/db/config.py
backend/db/hosts.py
backend/db/leases.py
backend/db/users.py

index d0b30ed51b7d3dfc77b6b9a7770b252441fb72d7..1c803ff1e391c9beb09d255c39d74bb8a33434c4 100644 (file)
@@ -107,7 +107,7 @@ def add_alias(data: Dict[str, Any]) -> int:
 
     except sqlite3.IntegrityError:
         conn.rollback()
-        return -1
+        raise ValueError("Alias already exists or unique constraint failed")
 
     except Exception as err:
         conn.rollback()
@@ -127,7 +127,8 @@ def update_alias(alias_id: int, data: Dict[str, Any]) -> bool:
         cur = conn.execute(
             """
             UPDATE aliases
-            SET name=?, target=?, description=?, ssl_enabled=?, visibility=?, last_updated=CURRENT_TIMESTAMP
+            SET name=?, target=?, description=?, ssl_enabled=?, visibility=?,
+                last_updated=strftime('%Y-%m-%dT%H:%M:%SZ','now')
             WHERE id=?
             """,
             (
index 1de99ac480cdf30d60ff0a3468c610916838b3b2..4507c74b10e11ffe9649d8dc93a50248a0047891 100644 (file)
@@ -2,6 +2,7 @@
 
 # Import local modules
 from backend.db.db import get_db, register_init
+from backend.utils import to_bool
 
 # Import Settings
 from backend.settings.settings import settings
@@ -11,12 +12,19 @@ from backend.log.log import get_logger
 # Logger initialization
 logger = get_logger(__name__)
 
+# ---------------------------------------------------------
+# Internal: wrapper to to_bool
+# ---------------------------------------------------------
+def _to_bool(v):
+    result = to_bool(v)
+    return result if result is not None else False
+
 # ---------------------------------------------------------
 # Type mapping for config keys
 # ---------------------------------------------------------
 CONFIG_TYPES = {
     "LOG_LEVEL": str,
-    "LOG_TO_FILE": lambda v: v.lower() in ("1", "true", "yes"),
+    "LOG_TO_FILE": _to_bool,
     "EXTERNAL_NAME": str,
     "LOGIN_MAX_ATTEMPTS": int,
     "LOGIN_WINDOW_SECONDS": int,
index 224d3a463a2b35485266118061cf87f5889b0a84..937dd70e60664f00077cc6b935b0ed19911fa486 100644 (file)
@@ -48,8 +48,10 @@ def validate_data(data: Dict[str, Any]) -> Dict[str, Any]:
 
     # Check MAC
     mac = data.get("mac")
-    if mac and not MAC_RE.match(mac):
-        raise ValueError(f"Invalid MAC address: {mac}")
+    if mac:
+        mac = mac.strip()
+        if not MAC_RE.match(mac):
+            raise ValueError(f"Invalid MAC address: {mac}")
 
     # Check description
     description = data.get("description")
@@ -65,7 +67,7 @@ def validate_data(data: Dict[str, Any]) -> Dict[str, Any]:
         "name": normalize(name),
         "ipv4": normalize(ipv4),
         "ipv6": normalize(ipv6),
-        "mac": normalize(mac),
+        "mac": normalize(mac).lower() if mac else None,
         "description": normalize(description),
         "ssl_enabled": ssl_enabled,
         "visibility": visibility,
@@ -89,16 +91,18 @@ def ipv4_sort_key(h: Dict[str, Any]):
 # -----------------------------
 def get_hosts(filter_devices: bool = False) -> List[Dict[str, Any]]:
     conn = get_db()
-    if (filter_devices != True):
-        cur = conn.execute("SELECT * FROM hosts")
-    else:
-        cur = conn.execute("SELECT id, ipv4, mac, name, description FROM hosts WHERE ipv4 IS NOT NULL")
+    query = (
+        "SELECT * FROM hosts"
+        if not filter_devices
+        else "SELECT id, ipv4, mac, name, description FROM hosts WHERE ipv4 IS NOT NULL"
+    )
+    cur = conn.execute(query)
 
     rows = []
     for r in cur.fetchall():
         item = dict(r)
-        if (filter_devices == True):
-            item["id"] = f"s-{item['id']}"
+        if filter_devices:
+            item["id"] = f"s-{item['id']}"  # Frontend requires this format
         rows.append(item)
     rows.sort(key=ipv4_sort_key)
     return rows
@@ -151,7 +155,7 @@ def add_host(data: Dict[str, Any]) -> int:
 
     except sqlite3.IntegrityError:
         conn.rollback()
-        return -1
+        raise ValueError("Host already exists or unique constraint failed")
 
     except Exception as err:
         conn.rollback()
@@ -171,7 +175,8 @@ def update_host(host_id: int, data: Dict[str, Any]) -> bool:
         cur = conn.execute(
             """
             UPDATE hosts
-            SET name=?, ipv4=?, ipv6=?, mac=?, description=?, ssl_enabled=?, visibility=?, last_updated=CURRENT_TIMESTAMP
+            SET name=?, ipv4=?, ipv6=?, mac=?, description=?, ssl_enabled=?, visibility=?,
+                last_updated=strftime('%Y-%m-%dT%H:%M:%SZ','now')
             WHERE id=?
             """,
             (
index 6e82ea88ed100c201f1182d1d9e05b9c8cfd4d3c..b4bc3f384ca14f4f4ad419d5baa78d835c9d1a92 100644 (file)
@@ -13,21 +13,21 @@ from backend.settings.settings import settings
 # Import Logging
 from backend.log.log import get_logger
 
+ALIASES_MAP = {
+    "client_id": "client-id",
+    "valid_lifetime": "valid-lft",
+    "subnet_id": "subnet-id",
+    "fqdn_fwd": "fqdn-fwd",
+    "fqdn_rev": "fqdn-rev",
+    "user_context": "user-context",
+    "pool_id": "pool-id",
+}
+
 # -----------------------------
 # Normalizes column names to expected keys
 # -----------------------------
 def _norm(col: str) -> str:
-    col = (col or "").strip()
-    aliases = {
-        "client_id": "client-id",
-        "valid_lifetime": "valid-lft",
-        "subnet_id": "subnet-id",
-        "fqdn_fwd": "fqdn-fwd",
-        "fqdn_rev": "fqdn-rev",
-        "user_context": "user-context",
-        "pool_id": "pool-id",
-    }
-    return aliases.get(col, col)
+    return ALIASES_MAP.get((col or "").strip(), col)
 
 # Logger initialization
 logger = get_logger(__name__)
@@ -49,40 +49,37 @@ def get_leases(filter_devices: bool = False) -> List[Dict[str, Any]]:
         if not reader.fieldnames:
             return []
 
-        if(filter_devices != True):
-            for raw in reader:
-                rec = { _norm(k): (v if v is not None else "") for k, v in raw.items() }
+        for raw in reader:
+            rec = {_norm(k): (v if v is not None else "") for k, v in raw.items()}
+
+            base = {
+                "ipv4": rec.get("address", "").strip() or None,
+                "mac": rec.get("hwaddr", "").strip().lower() or None,
+                "name": rec.get("hostname", "").strip() or None,
+                "dhcp_state": rec.get("state", "").strip() or None,
+            }
 
+            if not filter_devices:
                 item = {
-                    "id":             index,
-                    "ipv4":           rec.get("address", "").strip() or None,
-                    "mac":            rec.get("hwaddr", "").strip().lower() or None,
-                    "client_id":      rec.get("client-id", "").strip() or None,
+                    "id": index,
+                    **base,
+                    "client_id": rec.get("client-id", "").strip() or None,
                     "valid_lifetime": to_int(rec.get("valid-lft", "")),
-                    "expire":         rec.get("expire", "").strip() or None,
-                    "subnet_id":      to_int(rec.get("subnet-id", "")),
-                    "fqdn_fwd":       to_bool(rec.get("fqdn-fwd", "")),
-                    "fqdn_rev":       to_bool(rec.get("fqdn-rev", "")),
-                    "name":           rec.get("hostname", "").strip() or None,
-                    "dhcp_state":     rec.get("state", "").strip() or None,
-                    "user_context":   rec.get("user-context", "").strip() or None,  # spesso JSON serializzato
-                    "pool_id":        to_int(rec.get("pool-id", "")),
+                    "expire": rec.get("expire", "").strip() or None,
+                    "subnet_id": to_int(rec.get("subnet-id", "")),
+                    "fqdn_fwd": to_bool(rec.get("fqdn-fwd", "")),
+                    "fqdn_rev": to_bool(rec.get("fqdn-rev", "")),
+                    "user_context": rec.get("user-context", "").strip() or None,
+                    "pool_id": to_int(rec.get("pool-id", "")),
                 }
-                leases.append(item)
-                index += 1
-        else:
-            for raw in reader:
-                rec = { _norm(k): (v if v is not None else "") for k, v in raw.items() }
-
+            else:
                 item = {
-                    "id":             f"d-{index}",
-                    "ipv4":           rec.get("address", "").strip() or None,
-                    "mac":            rec.get("hwaddr", "").strip().lower() or None,
-                    "name":           rec.get("hostname", "").strip() or None,
-                    "dhcp_state":     rec.get("state", "").strip() or None,
+                    "id": f"d-{index}",  # Frontend requires this format
+                    **base,
                 }
-                leases.append(item)
-                index += 1
+
+            leases.append(item)
+            index += 1
 
     return leases
 
@@ -130,7 +127,7 @@ def delete_lease(lease_id: int):
     if not path.exists():
         raise FileNotFoundError(f"File not found: {path}")
 
-    with path.open("r") as f:
+    with path.open("r", encoding="utf-8", newline="") as f:
         lines = f.readlines()
 
     # file empty or only header
@@ -146,8 +143,8 @@ def delete_lease(lease_id: int):
         raise ValueError(f"Lease index out of range: {lease_id}")
 
     # delete the line
-    deleted_line = data_lines.pop(index)
+    data_lines.pop(index)
 
     # Rewrite the file without the deleted line
-    with path.open("w") as f:
+    with path.open("w", encoding="utf-8", newline="") as f:
         f.writelines([header] + data_lines)
index 56879df9877574946671b96370f18781a5c1b573..ee86b3b36841baa3ace047efefa49cac388e438d 100644 (file)
@@ -3,8 +3,6 @@
 # Import standard modules
 import bcrypt
 import json
-import logging
-import os
 
 # Import local modules
 from backend.db.db import get_db, register_init
@@ -32,7 +30,8 @@ def get_user_by_username(username):
     conn = get_db()
     cur = conn.cursor()
     cur.execute("SELECT * FROM users WHERE username = ?", (username,))
-    return cur.fetchone()
+    row = cur.fetchone()
+    return dict(row) if row else None
 
 # -----------------------------
 # Create User
@@ -41,20 +40,25 @@ def create_user(username, password_hash, email=None, is_admin=0, modules=None):
     conn = get_db()
     cur = conn.cursor()
 
-    cur.execute("""
-        INSERT INTO users (
-            username, password_hash, email, is_admin, modules, status,
-            created_at, updated_at, password_changed_at
-        ) VALUES (?, ?, ?, ?, ?, 'active', strftime('%s','now'), strftime('%s','now'), strftime('%s','now'));
-    """, (
-        username,
-        password_hash,
-        email,
-        is_admin,
-        json.dumps(modules or [])
-    ))
-
-    conn.commit()
+    try:
+        cur.execute("""
+            INSERT INTO users (
+                username, password_hash, email, is_admin, modules, status,
+                created_at, updated_at, password_changed_at
+            ) VALUES (?, ?, ?, ?, ?, 'active', strftime('%s','now'), strftime('%s','now'), strftime('%s','now'));
+        """, (
+            username,
+            password_hash,
+            email,
+            is_admin,
+            json.dumps(modules or [])
+        ))
+        conn.commit()
+        return cur.lastrowid
+    except Exception as err:
+        conn.rollback()
+        logger.error(f"USERS DB: Error creating user - {err}")
+        raise
 
 # -----------------------------
 # Create Users Table
@@ -103,6 +107,6 @@ def init_db_users_defaults(cur):
         password_hash,
         "admin@example.com",
         1,
-        '["dns","dhcp"]',
+        json.dumps(["dns", "dhcp"]),
         "active"
     ))