except sqlite3.IntegrityError:
conn.rollback()
- return -1
+ raise ValueError("Alias already exists or unique constraint failed")
except Exception as err:
conn.rollback()
cur = conn.execute(
"""
UPDATE aliases
- SET name=?, target=?, description=?, ssl_enabled=?, visibility=?, last_updated=CURRENT_TIMESTAMP
+ SET name=?, target=?, description=?, ssl_enabled=?, visibility=?,
+ last_updated=strftime('%Y-%m-%dT%H:%M:%SZ','now')
WHERE id=?
""",
(
# Import local modules
from backend.db.db import get_db, register_init
+from backend.utils import to_bool
# Import Settings
from backend.settings.settings import settings
# Logger initialization
logger = get_logger(__name__)
+# ---------------------------------------------------------
+# Internal: wrapper to to_bool
+# ---------------------------------------------------------
+def _to_bool(v):
+ result = to_bool(v)
+ return result if result is not None else False
+
# ---------------------------------------------------------
# Type mapping for config keys
# ---------------------------------------------------------
CONFIG_TYPES = {
"LOG_LEVEL": str,
- "LOG_TO_FILE": lambda v: v.lower() in ("1", "true", "yes"),
+ "LOG_TO_FILE": _to_bool,
"EXTERNAL_NAME": str,
"LOGIN_MAX_ATTEMPTS": int,
"LOGIN_WINDOW_SECONDS": int,
# Check MAC
mac = data.get("mac")
- if mac and not MAC_RE.match(mac):
- raise ValueError(f"Invalid MAC address: {mac}")
+ if mac:
+ mac = mac.strip()
+ if not MAC_RE.match(mac):
+ raise ValueError(f"Invalid MAC address: {mac}")
# Check description
description = data.get("description")
"name": normalize(name),
"ipv4": normalize(ipv4),
"ipv6": normalize(ipv6),
- "mac": normalize(mac),
+ "mac": normalize(mac).lower() if mac else None,
"description": normalize(description),
"ssl_enabled": ssl_enabled,
"visibility": visibility,
# -----------------------------
def get_hosts(filter_devices: bool = False) -> List[Dict[str, Any]]:
conn = get_db()
- if (filter_devices != True):
- cur = conn.execute("SELECT * FROM hosts")
- else:
- cur = conn.execute("SELECT id, ipv4, mac, name, description FROM hosts WHERE ipv4 IS NOT NULL")
+ query = (
+ "SELECT * FROM hosts"
+ if not filter_devices
+ else "SELECT id, ipv4, mac, name, description FROM hosts WHERE ipv4 IS NOT NULL"
+ )
+ cur = conn.execute(query)
rows = []
for r in cur.fetchall():
item = dict(r)
- if (filter_devices == True):
- item["id"] = f"s-{item['id']}"
+ if filter_devices:
+ item["id"] = f"s-{item['id']}" # Frontend requires this format
rows.append(item)
rows.sort(key=ipv4_sort_key)
return rows
except sqlite3.IntegrityError:
conn.rollback()
- return -1
+ raise ValueError("Host already exists or unique constraint failed")
except Exception as err:
conn.rollback()
cur = conn.execute(
"""
UPDATE hosts
- SET name=?, ipv4=?, ipv6=?, mac=?, description=?, ssl_enabled=?, visibility=?, last_updated=CURRENT_TIMESTAMP
+ SET name=?, ipv4=?, ipv6=?, mac=?, description=?, ssl_enabled=?, visibility=?,
+ last_updated=strftime('%Y-%m-%dT%H:%M:%SZ','now')
WHERE id=?
""",
(
# Import Logging
from backend.log.log import get_logger
+ALIASES_MAP = {
+ "client_id": "client-id",
+ "valid_lifetime": "valid-lft",
+ "subnet_id": "subnet-id",
+ "fqdn_fwd": "fqdn-fwd",
+ "fqdn_rev": "fqdn-rev",
+ "user_context": "user-context",
+ "pool_id": "pool-id",
+}
+
# -----------------------------
# Normalizes column names to expected keys
# -----------------------------
def _norm(col: str) -> str:
- col = (col or "").strip()
- aliases = {
- "client_id": "client-id",
- "valid_lifetime": "valid-lft",
- "subnet_id": "subnet-id",
- "fqdn_fwd": "fqdn-fwd",
- "fqdn_rev": "fqdn-rev",
- "user_context": "user-context",
- "pool_id": "pool-id",
- }
- return aliases.get(col, col)
+ return ALIASES_MAP.get((col or "").strip(), col)
# Logger initialization
logger = get_logger(__name__)
if not reader.fieldnames:
return []
- if(filter_devices != True):
- for raw in reader:
- rec = { _norm(k): (v if v is not None else "") for k, v in raw.items() }
+ for raw in reader:
+ rec = {_norm(k): (v if v is not None else "") for k, v in raw.items()}
+
+ base = {
+ "ipv4": rec.get("address", "").strip() or None,
+ "mac": rec.get("hwaddr", "").strip().lower() or None,
+ "name": rec.get("hostname", "").strip() or None,
+ "dhcp_state": rec.get("state", "").strip() or None,
+ }
+ if not filter_devices:
item = {
- "id": index,
- "ipv4": rec.get("address", "").strip() or None,
- "mac": rec.get("hwaddr", "").strip().lower() or None,
- "client_id": rec.get("client-id", "").strip() or None,
+ "id": index,
+ **base,
+ "client_id": rec.get("client-id", "").strip() or None,
"valid_lifetime": to_int(rec.get("valid-lft", "")),
- "expire": rec.get("expire", "").strip() or None,
- "subnet_id": to_int(rec.get("subnet-id", "")),
- "fqdn_fwd": to_bool(rec.get("fqdn-fwd", "")),
- "fqdn_rev": to_bool(rec.get("fqdn-rev", "")),
- "name": rec.get("hostname", "").strip() or None,
- "dhcp_state": rec.get("state", "").strip() or None,
- "user_context": rec.get("user-context", "").strip() or None, # spesso JSON serializzato
- "pool_id": to_int(rec.get("pool-id", "")),
+ "expire": rec.get("expire", "").strip() or None,
+ "subnet_id": to_int(rec.get("subnet-id", "")),
+ "fqdn_fwd": to_bool(rec.get("fqdn-fwd", "")),
+ "fqdn_rev": to_bool(rec.get("fqdn-rev", "")),
+ "user_context": rec.get("user-context", "").strip() or None,
+ "pool_id": to_int(rec.get("pool-id", "")),
}
- leases.append(item)
- index += 1
- else:
- for raw in reader:
- rec = { _norm(k): (v if v is not None else "") for k, v in raw.items() }
-
+ else:
item = {
- "id": f"d-{index}",
- "ipv4": rec.get("address", "").strip() or None,
- "mac": rec.get("hwaddr", "").strip().lower() or None,
- "name": rec.get("hostname", "").strip() or None,
- "dhcp_state": rec.get("state", "").strip() or None,
+ "id": f"d-{index}", # Frontend requires this format
+ **base,
}
- leases.append(item)
- index += 1
+
+ leases.append(item)
+ index += 1
return leases
if not path.exists():
raise FileNotFoundError(f"File not found: {path}")
- with path.open("r") as f:
+ with path.open("r", encoding="utf-8", newline="") as f:
lines = f.readlines()
# file empty or only header
raise ValueError(f"Lease index out of range: {lease_id}")
# delete the line
- deleted_line = data_lines.pop(index)
+ data_lines.pop(index)
# Rewrite the file without the deleted line
- with path.open("w") as f:
+ with path.open("w", encoding="utf-8", newline="") as f:
f.writelines([header] + data_lines)
# Import standard modules
import bcrypt
import json
-import logging
-import os
# Import local modules
from backend.db.db import get_db, register_init
conn = get_db()
cur = conn.cursor()
cur.execute("SELECT * FROM users WHERE username = ?", (username,))
- return cur.fetchone()
+ row = cur.fetchone()
+ return dict(row) if row else None
# -----------------------------
# Create User
conn = get_db()
cur = conn.cursor()
- cur.execute("""
- INSERT INTO users (
- username, password_hash, email, is_admin, modules, status,
- created_at, updated_at, password_changed_at
- ) VALUES (?, ?, ?, ?, ?, 'active', strftime('%s','now'), strftime('%s','now'), strftime('%s','now'));
- """, (
- username,
- password_hash,
- email,
- is_admin,
- json.dumps(modules or [])
- ))
-
- conn.commit()
+ try:
+ cur.execute("""
+ INSERT INTO users (
+ username, password_hash, email, is_admin, modules, status,
+ created_at, updated_at, password_changed_at
+ ) VALUES (?, ?, ?, ?, ?, 'active', strftime('%s','now'), strftime('%s','now'), strftime('%s','now'));
+ """, (
+ username,
+ password_hash,
+ email,
+ is_admin,
+ json.dumps(modules or [])
+ ))
+ conn.commit()
+ return cur.lastrowid
+ except Exception as err:
+ conn.rollback()
+ logger.error(f"USERS DB: Error creating user - {err}")
+ raise
# -----------------------------
# Create Users Table
password_hash,
"admin@example.com",
1,
- '["dns","dhcp"]',
+ json.dumps(["dns", "dhcp"]),
"active"
))