import os
import time
from typing import List, Dict, Any, Optional
+import zipfile
# Import local modules
from backend.db.hosts import get_hosts, add_host, reset_hosts_db
# Timestamp used for backup file naming
TIMESTAMP = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
+# Backup files to include in the archive (must match metadata structure)
+backup_files = [
+ config.BACKUP_METADATA_FILE,
+ config.BACKUP_HOSTS_FILE,
+ config.BACKUP_ALIASES_FILE,
+]
+
+ # Set to True to remove individual backup files after creating the archive (optional, can be set to False for debugging)
+remove_backup_files = True
+
# ---------------------------------------------------------
# Internal: Calculate file checksum
# ---------------------------------------------------------
h.update(chunk)
return h.hexdigest()
+# ---------------------------------------------------------
+# Create Backup Archive (ZIP)
+# ---------------------------------------------------------
+def create_backup_archive(timestamp: Optional[str] = None) -> Dict[str, Any]:
+
+ # Initialization
+ start_ns = time.monotonic_ns()
+ count = 0
+ errors: List[str] = []
+ ts = timestamp or TIMESTAMP
+
+ try:
+ # File paths
+ backup_dir = settings.BACKUP_PATH
+ zip_name = f"backup_{ts}.zip"
+ zip_path = os.path.join(backup_dir, zip_name)
+
+ # Create ZIP
+ with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
+
+ for fname in backup_files:
+ fpath = os.path.join(backup_dir, fname)
+
+ if not os.path.isfile(fpath):
+ raise FileNotFoundError(f"Missing file for archive: {fname}")
+
+ # arcname evita path assoluti dentro lo zip
+ z.write(fpath, arcname=fname)
+
+ # increment count of included files
+ count += 1
+
+ # Calcolo SHA256 dello zip
+ archive_sha256 = file_checksum(zip_path)
+
+ if remove_backup_files == True:
+ # Remove files after archiving (optional, can be commented out if you want to keep them)
+ for fname in backup_files:
+ fpath = os.path.join(backup_dir, fname)
+ if os.path.isfile(fpath):
+ os.remove(fpath)
+
+ except Exception as e:
+ logger.exception("create_backup_archive failed: %s", str(e).strip())
+ errors.append(str(e))
+
+ took_ms = (time.monotonic_ns() - start_ns) / 1_000_000
+
+ if errors:
+ result: Dict[str, Any] = {
+ "status": "failure",
+ "file": zip_path,
+ "errors": errors,
+ "took_ms": took_ms,
+ }
+ else:
+ result: Dict[str, Any] = {
+ "status": "success",
+ "file": zip_path,
+ "count": count,
+ "sha256": archive_sha256,
+ "took_ms": took_ms,
+ }
+
+ return result
+
+
# ---------------------------------------------------------
# Save Hosts DB
# ---------------------------------------------------------
count_stored = 0
count_loaded = 0
errors: List[str] = []
+ ts = timestamp or TIMESTAMP
try:
# Get Hosts List
with open(path, "w", encoding="utf-8") as f:
data = {
- "generated_at": timestamp or TIMESTAMP,
+ "generated_at": ts,
"count": count_loaded,
"hosts": hosts,
}
count_stored = 0
count_loaded = 0
errors: List[str] = []
+ ts = timestamp or TIMESTAMP
try:
# Get Aliases List
path = os.path.join(settings.BACKUP_PATH, config.BACKUP_ALIASES_FILE)
with open(path, "w", encoding="utf-8") as f:
data = {
- "generated_at": timestamp or TIMESTAMP,
+ "generated_at": ts,
"count": count_loaded,
"aliases": aliases,
}
start_ns = time.monotonic_ns()
path = os.path.join(settings.BACKUP_PATH, config.BACKUP_METADATA_FILE)
errors: List[str] = []
+ ts = timestamp or TIMESTAMP
try:
with open(path, "w", encoding="utf-8") as f:
data = {
- "generated_at": timestamp or TIMESTAMP,
+ "generated_at": ts,
"backup_version": config.BACKUP_VERSION,
"db_structure_version": config.BACKUP_DB_STRUCTURE_VERSION,
"file_count": 2,
# Timestamp used for backup file naming
timestamp = TIMESTAMP
+ # Hosts
hosts_result = store_hosts(timestamp)
+ # Aliases
aliases_result = store_aliases(timestamp)
+ # Metadata (must be last to ensure it reflects the actual state of files)
metadata_result = store_metadata(timestamp)
+ # Create ZIP archive only if all individual file operations succeeded
+ archive_result = create_backup_archive(timestamp)
+ # Process errors from individual operations
errors = ((metadata_result.get("errors") or [])
+ (hosts_result.get("errors") or [])
+ (aliases_result.get("errors") or [])
+ + (archive_result.get("errors") or [])
)
- # Collect errors and results
- result = {
- "metadata": metadata_result,
- "hosts": hosts_result,
- "aliases": aliases_result,
- }
-
# Compute summary
- operations = [metadata_result, hosts_result, aliases_result]
+ operations = [metadata_result, hosts_result, aliases_result, archive_result]
summary = {
"total": len(operations),
"success": sum(1 for op in operations if op.get("status") == "success"),
"failed": sum(1 for op in operations if op.get("status") == "failure"),
}
+ # Collect errors and results
result = {
"metadata": metadata_result,
"hosts": hosts_result,
"aliases": aliases_result,
+ "archive": archive_result,
"summary": summary,
}