]> git.giorgioravera.it Git - network-manager.git/commitdiff
added backup zip creation
authorGiorgio Ravera <giorgio.ravera@gmail.com>
Wed, 27 May 2026 21:44:15 +0000 (23:44 +0200)
committerGiorgio Ravera <giorgio.ravera@gmail.com>
Wed, 27 May 2026 21:44:15 +0000 (23:44 +0200)
backend/backup.py
backend/settings/config.py

index b6f7bf4a2a4f987b1f801183e10fe3c512c07438..64ab6b27423353542d9cad2d67100cf248d0f1fb 100644 (file)
@@ -7,6 +7,7 @@ import json
 import os
 import time
 from typing import List, Dict, Any, Optional
+import zipfile
 
 # Import local modules
 from backend.db.hosts import get_hosts, add_host, reset_hosts_db
@@ -23,6 +24,16 @@ logger = get_logger(__name__)
 # Timestamp used for backup file naming
 TIMESTAMP = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
 
+# Backup files to include in the archive (must match metadata structure)
+backup_files = [
+    config.BACKUP_METADATA_FILE,
+    config.BACKUP_HOSTS_FILE,
+    config.BACKUP_ALIASES_FILE,
+]
+
+ # Set to True to remove individual backup files after creating the archive (optional, can be set to False for debugging)
+remove_backup_files = True
+
 # ---------------------------------------------------------
 # Internal: Calculate file checksum
 # ---------------------------------------------------------
@@ -33,6 +44,73 @@ def file_checksum(path: str) -> str:
             h.update(chunk)
     return h.hexdigest()
 
+# ---------------------------------------------------------
+# Create Backup Archive (ZIP)
+# ---------------------------------------------------------
+def create_backup_archive(timestamp: Optional[str] = None) -> Dict[str, Any]:
+
+    # Initialization
+    start_ns = time.monotonic_ns()
+    count = 0
+    errors: List[str] = []
+    ts = timestamp or TIMESTAMP
+
+    try:
+        # File paths
+        backup_dir = settings.BACKUP_PATH
+        zip_name = f"backup_{ts}.zip"
+        zip_path = os.path.join(backup_dir, zip_name)
+
+        # Create ZIP
+        with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
+
+            for fname in backup_files:
+                fpath = os.path.join(backup_dir, fname)
+
+                if not os.path.isfile(fpath):
+                    raise FileNotFoundError(f"Missing file for archive: {fname}")
+
+                # arcname evita path assoluti dentro lo zip
+                z.write(fpath, arcname=fname)
+
+                # increment count of included files
+                count += 1
+
+        # Calcolo SHA256 dello zip
+        archive_sha256 = file_checksum(zip_path)
+
+        if remove_backup_files == True:
+            # Remove files after archiving (optional, can be commented out if you want to keep them)
+            for fname in backup_files:
+                fpath = os.path.join(backup_dir, fname)
+                if os.path.isfile(fpath):
+                    os.remove(fpath)
+
+    except Exception as e:
+        logger.exception("create_backup_archive failed: %s", str(e).strip())
+        errors.append(str(e))
+
+    took_ms = (time.monotonic_ns() - start_ns) / 1_000_000
+
+    if errors:
+        result: Dict[str, Any] = {
+            "status": "failure",
+            "file": zip_path,
+            "errors": errors,
+            "took_ms": took_ms,
+        }
+    else:
+        result: Dict[str, Any] = {
+            "status": "success",
+            "file": zip_path,
+            "count": count,
+            "sha256": archive_sha256,
+            "took_ms": took_ms,
+        }
+
+    return result
+
+
 # ---------------------------------------------------------
 # Save Hosts DB
 # ---------------------------------------------------------
@@ -44,6 +122,7 @@ def store_hosts(timestamp: Optional[str] = None) -> Dict[str, Any]:
     count_stored = 0
     count_loaded = 0
     errors: List[str] = []
+    ts = timestamp or TIMESTAMP
 
     try:
         # Get Hosts List
@@ -52,7 +131,7 @@ def store_hosts(timestamp: Optional[str] = None) -> Dict[str, Any]:
 
         with open(path, "w", encoding="utf-8") as f:
             data = {
-                "generated_at": timestamp or TIMESTAMP,
+                "generated_at": ts,
                 "count": count_loaded,
                 "hosts": hosts,
             }
@@ -144,6 +223,7 @@ def store_aliases(timestamp: Optional[str] = None) -> Dict[str, Any]:
     count_stored = 0
     count_loaded = 0
     errors: List[str] = []
+    ts = timestamp or TIMESTAMP
 
     try:
         # Get Aliases List
@@ -154,7 +234,7 @@ def store_aliases(timestamp: Optional[str] = None) -> Dict[str, Any]:
         path = os.path.join(settings.BACKUP_PATH, config.BACKUP_ALIASES_FILE)
         with open(path, "w", encoding="utf-8") as f:
             data = {
-                "generated_at": timestamp or TIMESTAMP,
+                "generated_at": ts,
                 "count": count_loaded,
                 "aliases": aliases,
             }
@@ -244,11 +324,12 @@ def store_metadata(timestamp: Optional[str] = None) -> Dict[str, Any]:
     start_ns = time.monotonic_ns()
     path = os.path.join(settings.BACKUP_PATH, config.BACKUP_METADATA_FILE)
     errors: List[str] = []
+    ts = timestamp or TIMESTAMP
 
     try:
         with open(path, "w", encoding="utf-8") as f:
             data = {
-                "generated_at": timestamp or TIMESTAMP,
+                "generated_at": ts,
                 "backup_version": config.BACKUP_VERSION,
                 "db_structure_version": config.BACKUP_DB_STRUCTURE_VERSION,
                 "file_count": 2,
@@ -370,33 +451,35 @@ def backup() -> Dict[str, Any]:
     # Timestamp used for backup file naming
     timestamp = TIMESTAMP
 
+    # Hosts
     hosts_result = store_hosts(timestamp)
+    # Aliases
     aliases_result = store_aliases(timestamp)
+    # Metadata (must be last to ensure it reflects the actual state of files)
     metadata_result = store_metadata(timestamp)
+    # Create ZIP archive only if all individual file operations succeeded
+    archive_result = create_backup_archive(timestamp)
+    # Process errors from individual operations
     errors = ((metadata_result.get("errors") or [])
             + (hosts_result.get("errors") or [])
             + (aliases_result.get("errors") or [])
+            + (archive_result.get("errors") or [])
     )
 
-    # Collect errors and results
-    result = {
-        "metadata": metadata_result,
-        "hosts": hosts_result,
-        "aliases": aliases_result,
-    }
-
     # Compute summary
-    operations = [metadata_result, hosts_result, aliases_result]
+    operations = [metadata_result, hosts_result, aliases_result, archive_result]
     summary = {
         "total": len(operations),
         "success": sum(1 for op in operations if op.get("status") == "success"),
         "failed": sum(1 for op in operations if op.get("status") == "failure"),
     }
 
+    # Collect errors and results
     result = {
         "metadata": metadata_result,
         "hosts": hosts_result,
         "aliases": aliases_result,
+        "archive": archive_result,
         "summary": summary,
     }
 
index 6fe32ceb0fd68906f3346b768e147f0e37f5e713..49897ffc6a36a0e9428f087889b79cbb3b4902ed 100644 (file)
@@ -13,4 +13,4 @@ BACKUP_VERSION = "1.0"
 BACKUP_DB_STRUCTURE_VERSION = "1.0"
 BACKUP_METADATA_FILE = "metadata.json"
 BACKUP_HOSTS_FILE = "hosts.json"
-BACKUP_ALIASES_FILE = "aliases.json"
\ No newline at end of file
+BACKUP_ALIASES_FILE = "aliases.json"