# Import standard modules
import ipaddress
-import logging
-import os
import re
import sqlite3
+from typing import Any, Dict, List, Optional
# Import local modules
from backend.db.db import get_db, register_init
# Import Logging
-from backend.log.log import setup_logging, get_logger
+from backend.log.log import get_logger
# Logger initialization
logger = get_logger(__name__)
# -----------------------------
# Check Data Input
# -----------------------------
-def validate_data(data: dict) -> dict:
+def validate_data(data: Dict[str, Any]) -> Dict[str, Any]:
# Check name
if "name" not in data:
raise ValueError("Missing required field: name")
# -----------------------------
# SELECT ALL ALIASES
# -----------------------------
-def get_aliases():
+def get_aliases() -> List[Dict[str, Any]]:
conn = get_db()
cur = conn.execute("SELECT * FROM aliases ORDER BY target")
rows = [dict(r) for r in cur.fetchall()]
# -----------------------------
# SELECT SINGLE ALIAS
# -----------------------------
-def get_alias(alias_id: int):
+def get_alias(alias_id: int) -> Optional[Dict[str, Any]]:
conn = get_db()
cur = conn.execute("SELECT * FROM aliases WHERE id = ?", (alias_id,))
row = cur.fetchone()
# -----------------------------
# ADD ALIAS
# -----------------------------
-def add_alias(data: dict):
+def add_alias(data: Dict[str, Any]) -> int:
# Validate input
cleaned = validate_data(data)
conn = get_db()
try:
cur = conn.execute(
- "INSERT INTO aliases (name, target, note, ssl_enabled, visibility) VALUES (?, ?, ?, ?, ?)",
+ """
+ INSERT INTO aliases (name, target, note, ssl_enabled, visibility)
+ VALUES (?, ?, ?, ?, ?)
+ """,
(
cleaned["name"],
cleaned["target"],
cleaned["note"],
cleaned["ssl_enabled"],
cleaned["visibility"],
- )
+ ),
)
conn.commit()
return cur.lastrowid
- except sqlite3.IntegrityError as e:
+ except sqlite3.IntegrityError:
conn.rollback()
return -1
- except Exception as e:
+ except Exception as err:
conn.rollback()
+ logger.error(f"ALIASES DB: Error adding alias - {err}")
raise
# -----------------------------
# UPDATE ALIAS
# -----------------------------
-def update_alias(alias_id: int, data: dict) -> bool:
+def update_alias(alias_id: int, data: Dict[str, Any]) -> bool:
# Validate input
cleaned = validate_data(data)
cleaned["ssl_enabled"],
cleaned["visibility"],
alias_id,
- )
+ ),
)
conn.commit()
return cur.rowcount > 0
- except Exception:
+ except Exception as err:
conn.rollback()
+ logger.error(f"ALIASES DB: Error updating alias - {err}")
raise
# -----------------------------
conn = get_db()
try:
- cur = conn.execute(
- "DELETE FROM aliases WHERE id = ?",
- (alias_id,)
- )
+ cur = conn.execute("DELETE FROM aliases WHERE id = ?", (alias_id,))
conn.commit()
-
return cur.rowcount > 0
- except Exception:
+ except Exception as err:
conn.rollback()
+ logger.error(f"ALIASES DB: Error deleting alias - {err}")
raise
# -----------------------------
# Initialize Aliases DB Table
# -----------------------------
@register_init
-def init_db_alias_table(cur):
+def init_db_alias_table(cur: sqlite3.Cursor) -> None:
# ALIASES TABLE
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE aliases (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
visibility INTEGER NOT NULL DEFAULT 0,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
- """)
+ """
+ )
cur.execute("CREATE INDEX idx_aliases_name ON aliases(name);")
logger.info("ALIASES DB: Database initialized successfully")
+
+# -----------------------------
+# Reset Aliases DB Table
+# -----------------------------
+def reset_aliases_db() -> None:
+ conn = get_db()
+ try:
+ conn.execute("DELETE FROM aliases;")
+ conn.execute("DELETE FROM sqlite_sequence WHERE name='aliases';")
+ conn.commit()
+
+ except Exception as err:
+ conn.rollback()
+ logger.error(f"ALIASES DB: Error resetting tables - {err}")
+ raise
# Import standard modules
import ipaddress
-import logging
-import os
import re
import sqlite3
+from typing import Any, Dict, List, Optional
# Import local modules
from backend.db.db import get_db, register_init
# Import Logging
-from backend.log.log import setup_logging, get_logger
+from backend.log.log import get_logger
# Logger initialization
logger = get_logger(__name__)
# -----------------------------
# Check Data Input
# -----------------------------
-def validate_data(data: dict) -> dict:
+def validate_data(data: Dict[str, Any]) -> Dict[str, Any]:
# Check name
if "name" not in data:
raise ValueError("Missing required field: name")
except ValueError:
raise ValueError(f"Invalid IPv6 address: {ipv6}")
+ # Check MAC
mac = data.get("mac")
if mac and not MAC_RE.match(mac):
raise ValueError(f"Invalid MAC address: {mac}")
# -----------------------------
# Sorting hosts
# -----------------------------
-def ipv4_sort_key(h: dict):
- v = (h.get('ipv4') or '').strip()
+def ipv4_sort_key(h: Dict[str, Any]):
+ v = (h.get("ipv4") or "").strip()
if not v:
# no ip at the end
return (1, 0)
try:
return (0, int(ipaddress.IPv4Address(v)))
except ValueError:
- return (0, float('inf'))
+ return (0, float("inf"))
# -----------------------------
# SELECT ALL HOSTS
# -----------------------------
-def get_hosts():
+def get_hosts() -> List[Dict[str, Any]]:
conn = get_db()
cur = conn.execute("SELECT * FROM hosts")
rows = [dict(r) for r in cur.fetchall()]
# -----------------------------
# SELECT SINGLE HOST
# -----------------------------
-def get_host(host_id: int):
+def get_host(host_id: int) -> Optional[Dict[str, Any]]:
conn = get_db()
cur = conn.execute("SELECT * FROM hosts WHERE id = ?", (host_id,))
row = cur.fetchone()
# -----------------------------
# ADD HOST
# -----------------------------
-def add_host(data: dict):
+def add_host(data: Dict[str, Any]) -> int:
# Validate input
cleaned = validate_data(data)
conn = get_db()
try:
cur = conn.execute(
- "INSERT INTO hosts (name, ipv4, ipv6, mac, note, ssl_enabled, visibility) VALUES (?, ?, ?, ?, ?, ?, ?)",
+ """
+ INSERT INTO hosts (name, ipv4, ipv6, mac, note, ssl_enabled, visibility)
+ VALUES (?, ?, ?, ?, ?, ?, ?)
+ """,
(
cleaned["name"],
cleaned["ipv4"],
cleaned["note"],
cleaned["ssl_enabled"],
cleaned["visibility"],
- )
+ ),
)
conn.commit()
return cur.lastrowid
- except sqlite3.IntegrityError as e:
+ except sqlite3.IntegrityError:
conn.rollback()
return -1
- except Exception as e:
+ except Exception as err:
conn.rollback()
+ logger.error(f"HOSTS DB: Error adding host - {err}")
raise
# -----------------------------
# UPDATE HOST
# -----------------------------
-def update_host(host_id: int, data: dict) -> bool:
+def update_host(host_id: int, data: Dict[str, Any]) -> bool:
# Validate input
cleaned = validate_data(data)
cleaned["ssl_enabled"],
cleaned["visibility"],
host_id,
- )
+ ),
)
conn.commit()
return cur.rowcount > 0
- except Exception:
+ except Exception as err:
conn.rollback()
+ logger.error(f"HOSTS DB: Error updating host - {err}")
raise
# -----------------------------
conn = get_db()
try:
- cur = conn.execute(
- "DELETE FROM hosts WHERE id = ?",
- (host_id,)
- )
+ cur = conn.execute("DELETE FROM hosts WHERE id = ?", (host_id,))
conn.commit()
-
return cur.rowcount > 0
- except Exception:
+ except Exception as err:
conn.rollback()
+ logger.error(f"HOSTS DB: Error deleting host - {err}")
raise
# -----------------------------
# Initialize Hosts DB Table
# -----------------------------
@register_init
-def init_db_hosts_table(cur):
+def init_db_hosts_table(cur: sqlite3.Cursor) -> None:
# HOSTS TABLE
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE hosts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
visibility INTEGER NOT NULL DEFAULT 0,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
- """)
+ """
+ )
cur.execute("CREATE INDEX idx_hosts_name ON hosts(name);")
# TXT TABLE
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE txt_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
host_id INTEGER,
FOREIGN KEY (host_id) REFERENCES hosts(id)
);
- """)
+ """
+ )
cur.execute("CREATE INDEX idx_txt_host ON txt_records(host_id);")
logger.info("HOSTS DB: Tables initialized successfully")
+
+# -----------------------------
+# Reset Hosts DB Table
+# -----------------------------
+def reset_hosts_db() -> None:
+ conn = get_db()
+ try:
+ conn.execute("DELETE FROM hosts;")
+ conn.execute("DELETE FROM sqlite_sequence WHERE name='hosts';")
+ conn.commit()
+
+ except Exception as err:
+ conn.rollback()
+ logger.error(f"HOSTS DB: Error resetting tables - {err}")
+ raise
import os
import ipaddress
import time
+from typing import Iterable, List, Tuple, Dict, Any
# Import local modules
-from backend.db.hosts import get_hosts
-from backend.db.aliases import get_aliases
+from backend.db.hosts import get_hosts, add_host, reset_hosts_db
+from backend.db.aliases import get_aliases, add_alias, reset_aliases_db
# Import Settings & Logging
from backend.settings.settings import settings
-from backend.log.log import setup_logging, get_logger
+from backend.log.log import get_logger
# Logger initialization
logger = get_logger(__name__)
for a in aliases:
f.write(json.dumps(a, ensure_ascii=False) + "\n")
+# ---------------------------------------------------------
+# Internal: load NDJSON utility
+# ---------------------------------------------------------
+def _load_ndjson(path: str) -> Tuple[List[Dict[str, Any]], List[str]]:
+ records: List[Dict[str, Any]] = []
+ errors: List[str] = []
+
+ if not os.path.exists(path):
+ errors.append(f"File not found: {path}")
+ return records, errors
+
+ with open(path, "r", encoding="utf-8") as f:
+ for lineno, line in enumerate(f, start=1):
+ line = line.strip()
+ if not line:
+ continue
+ try:
+ obj = json.loads(line)
+ if isinstance(obj, dict):
+ records.append(obj)
+ else:
+ errors.append(f"{os.path.basename(path)}:{lineno} -> JSON is not an object")
+ except json.JSONDecodeError as e:
+ errors.append(f"{os.path.basename(path)}:{lineno} -> JSON decode error: {str(e)}")
+
+ return records, errors
+
+# ---------------------------------------------------------
+# Restore Hosts DB
+# ---------------------------------------------------------
+def restore_hosts() -> Dict[str, Any]:
+
+ # Initialization
+ start_ns = time.monotonic_ns()
+ src_path = os.path.join(settings.DATA_PATH, "hosts.json")
+ restored = 0
+
+ # load records from NDJSON file
+ records, load_errors = _load_ndjson(src_path)
+
+ try:
+ for r in records:
+ add_host(r)
+ restored += 1
+
+ except Exception as e:
+ logger.exception("restore_hosts failed applying records: %s", str(e).strip())
+ raise
+
+ took_ms = (time.monotonic_ns() - start_ns) / 1_000_000
+ return {
+ "file": src_path,
+ "count_loaded": len(records),
+ "count_restored": restored,
+ "load_errors": load_errors,
+ "took_ms": took_ms,
+ }
+
+# ---------------------------------------------------------
+# Restore Aliases DB
+# ---------------------------------------------------------
+def restore_aliases() -> Dict[str, Any]:
+
+ # Initialization
+ start_ns = time.monotonic_ns()
+ src_path = os.path.join(settings.DATA_PATH, "aliases.json")
+ restored = 0
+
+ # load records from NDJSON file
+ records, load_errors = _load_ndjson(src_path)
+
+ try:
+ for r in records:
+ add_alias(r)
+ restored += 1
+
+ except Exception as e:
+ logger.exception("restore_aliases failed applying records: %s", str(e).strip())
+ raise
+
+ took_ms = (time.monotonic_ns() - start_ns) / 1_000_000
+ return {
+ "file": src_path,
+ "count_loaded": len(records),
+ "count_restored": restored,
+ "load_errors": load_errors,
+ "took_ms": took_ms,
+ }
+
# ---------------------------------------------------------
# API ENDPOINTS
# ---------------------------------------------------------
})
async def api_backup(request: Request):
- # Inizializzazioni
+ # Initialization
start_ns = time.monotonic_ns()
try:
"took_ms": took_ms,
},
)
+
+# ---------------------------------------------------------
+# API: Restore from backup
+# ---------------------------------------------------------
+@router.get("/api/restore", status_code=status.HTTP_200_OK, responses={
+ 200: {"description": "Restore executed successfully"},
+ 400: {"description": "Invalid backup files"},
+ 500: {"description": "Internal server error"},
+})
+async def api_restore(request: Request):
+ start_ns = time.monotonic_ns()
+
+ try:
+ # 1) Restore hosts
+ reset_hosts_db()
+ hosts_result = restore_hosts()
+
+ # 2) Restore aliases
+ reset_aliases_db()
+ aliases_result = restore_aliases()
+
+ # Se uno dei file ha errori di parsing, segnaliamolo come 400
+ load_errors = (hosts_result.get("load_errors") or []) + (aliases_result.get("load_errors") or [])
+ if load_errors:
+ # Non blocchiamo l'operazione se comunque abbiamo applicato record;
+ # ma comunichiamo che ci sono righe scartate.
+ logger.warning("Restore completed with parsing issues: %d errors", len(load_errors))
+
+ took_ms = (time.monotonic_ns() - start_ns) / 1_000_000
+ return {
+ "code": "RESTORE_OK",
+ "status": "success",
+ "message": "RESTORE executed successfully",
+ "took_ms": took_ms,
+ "results": {
+ "hosts": hosts_result,
+ "aliases": aliases_result,
+ },
+ }
+
+ except HTTPException:
+ raise
+
+ except Exception as err:
+ took_ms = (time.monotonic_ns() - start_ns) / 1_000_000
+ logger.exception("Error executing restore: %s", str(err).strip())
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail={
+ "code": "RESTORE_ERROR",
+ "status": "failure",
+ "message": "Internal error executing restore",
+ "took_ms": took_ms,
+ },
+ )