#!/usr/bin/env python3 """ Homelab Backup Script Performs two-stage backup: file copy and SQLite database backup Uses parallel processing for top-level subdirectories """ import os import sys import logging import argparse import shutil import sqlite3 import gzip import tarfile import stat import time import subprocess from datetime import datetime from pathlib import Path from typing import List, Set, Tuple, Dict from concurrent.futures import ThreadPoolExecutor, as_completed, Future import threading import json import urllib.request import urllib.parse from queue import Queue, Empty # Thread-safe logging logging_lock = threading.Lock() def setup_logging(log_level: str = "INFO") -> logging.Logger: """Setup logging configuration""" logger = logging.getLogger("backup") logger.setLevel(getattr(logging, log_level.upper())) # Console handler console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] - %(message)s' ) console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) # File handler log_dir = Path("/scripts/logs") log_dir.mkdir(exist_ok=True) log_file = log_dir / f"backup-{datetime.now().strftime('%Y%m%d')}.log" file_handler = logging.FileHandler(log_file) file_handler.setLevel(logging.DEBUG) file_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] - %(message)s' ) file_handler.setFormatter(file_formatter) logger.addHandler(file_handler) return logger class TaskMonitor: """Monitors and reports on running tasks""" def __init__(self, logger: logging.Logger): self.logger = logger self.active_tasks = {} self.task_lock = threading.Lock() self.stop_monitoring = threading.Event() self.monitor_thread = None def start_task(self, task_id: str, description: str): """Register a new task as started""" with self.task_lock: self.active_tasks[task_id] = { 'description': description, 'start_time': datetime.now(), 'thread_name': threading.current_thread().name } self.logger.info(f"🚀 Started: {description} [{task_id}]") def finish_task(self, task_id: str): """Mark a task as completed""" with self.task_lock: if task_id in self.active_tasks: task = self.active_tasks.pop(task_id) duration = (datetime.now() - task['start_time']).total_seconds() self.logger.info(f"✅ Completed: {task['description']} [{task_id}] ({duration:.1f}s)") def start_monitoring(self): """Start the periodic status monitoring""" self.stop_monitoring.clear() self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True) self.monitor_thread.start() self.logger.info("📊 Task monitoring started") def stop_monitoring(self): """Stop the periodic status monitoring""" self.stop_monitoring.set() if self.monitor_thread: self.monitor_thread.join() self.logger.info("📊 Task monitoring stopped") def _monitor_loop(self): """Periodic monitoring loop""" while not self.stop_monitoring.wait(5.0): # Check every 5 seconds with self.task_lock: if self.active_tasks: now = datetime.now() active_list = [] for task_id, task in self.active_tasks.items(): duration = (now - task['start_time']).total_seconds() active_list.append(f"{task['description']} ({duration:.0f}s)") self.logger.info(f"🔄 Active tasks ({len(self.active_tasks)}): {', '.join(active_list)}") class BackupManager: """Main backup management class with parallel processing""" def __init__(self, source_path: str, work_dir: str, backup_dir: str, blocklist: List[str], logger: logging.Logger, max_workers: int = 4): self.source_path = Path(source_path) self.work_dir = Path(work_dir) self.backup_dir = Path(backup_dir) self.blocklist = set(blocklist) self.logger = logger self.max_workers = max_workers # Thread-safe progress tracking self.total_files = 0 self.copied_files = 0 self.skipped_files = 0 self.sqlite_files = set() self.progress_lock = threading.Lock() # Task monitoring self.task_monitor = TaskMonitor(logger) # Ensure directories exist self.work_dir.mkdir(parents=True, exist_ok=True) self.backup_dir.mkdir(parents=True, exist_ok=True) # Clean work directory at start self._clean_work_dir() def _clean_work_dir(self): """Clean the work directory before starting""" if self.work_dir.exists(): for item in self.work_dir.iterdir(): if item.is_dir(): shutil.rmtree(item) else: item.unlink() self.logger.info(f"Cleaned work directory: {self.work_dir}") def _is_blocklisted(self, path: Path) -> bool: """Check if a path is in the blocklist""" path_str = str(path) for blocked in self.blocklist: if blocked in path_str or path.name == blocked: return True return False def _update_progress(self, files_copied: int, files_skipped: int, sqlite_found: Set[Path]): """Thread-safe progress update""" with self.progress_lock: self.copied_files += files_copied self.skipped_files += files_skipped self.sqlite_files.update(sqlite_found) total_processed = self.copied_files + self.skipped_files if self.total_files > 0 and total_processed % 100 == 0: # Log every 100 files progress = (total_processed / self.total_files) * 100 self.logger.info(f"Progress: {self.copied_files} copied, {self.skipped_files} skipped, {total_processed}/{self.total_files} total ({progress:.1f}%)") def _count_files_in_subdirectory(self, subdir: Path) -> int: """Count files in a subdirectory for progress tracking""" count = 0 try: for root, dirs, files in os.walk(subdir): root_path = Path(root) if self._is_blocklisted(root_path): dirs.clear() continue dirs[:] = [d for d in dirs if not self._is_blocklisted(root_path / d)] count += len(files) except Exception as e: self.logger.error(f"Error counting files in {subdir}: {e}") return count def _should_skip_file(self, file_path: Path) -> Tuple[bool, str]: """Check if a file should be skipped and return reason""" try: file_stat = file_path.stat() # Skip sockets if stat.S_ISSOCK(file_stat.st_mode): return True, "socket" # Skip named pipes (FIFOs) if stat.S_ISFIFO(file_stat.st_mode): return True, "pipe" # Skip character/block devices if stat.S_ISCHR(file_stat.st_mode) or stat.S_ISBLK(file_stat.st_mode): return True, "device" # Only process regular files and symlinks if not (stat.S_ISREG(file_stat.st_mode) or stat.S_ISLNK(file_stat.st_mode)): return True, "special" except (OSError, PermissionError) as e: return True, f"access_error: {e}" return False, "" def _process_subdirectory(self, subdir: Path) -> Tuple[int, int, Set[Path]]: """Process a single subdirectory (runs in parallel)""" task_id = f"subdir_{subdir.name}" self.task_monitor.start_task(task_id, f"Processing {subdir.name}") copied_files = 0 skipped_files = 0 sqlite_files = set() try: # Create destination directory structure rel_path = subdir.relative_to(self.source_path) dest_dir = self.work_dir / rel_path dest_dir.mkdir(parents=True, exist_ok=True) # Walk through subdirectory for root, dirs, files in os.walk(subdir): root_path = Path(root) # Skip blocklisted directories if self._is_blocklisted(root_path): dirs.clear() continue # Filter out blocklisted subdirectories dirs[:] = [d for d in dirs if not self._is_blocklisted(root_path / d)] # Create relative path structure rel_root = root_path.relative_to(self.source_path) dest_root = self.work_dir / rel_root dest_root.mkdir(parents=True, exist_ok=True) # Process files for file in files: source_file = root_path / file dest_file = dest_root / file # Check if file should be skipped should_skip, skip_reason = self._should_skip_file(source_file) if should_skip: skipped_files += 1 if skip_reason not in ["socket", "pipe"]: # Only log unusual skips self.logger.debug(f"Skipped {source_file}: {skip_reason}") continue try: # Use copy2 to preserve metadata (timestamps, permissions) shutil.copy2(source_file, dest_file) # Also try to preserve extended attributes and ACLs if possible try: # Copy stat info again to ensure everything is preserved shutil.copystat(source_file, dest_file) except (OSError, AttributeError): # Some filesystems don't support all metadata pass copied_files += 1 # Track SQLite databases if file.endswith('.db') or file.endswith('.sqlite') or file.endswith('.sqlite3'): sqlite_files.add(dest_file) except Exception as e: skipped_files += 1 self.logger.error(f"Failed to copy {source_file}: {e}") except Exception as e: self.logger.error(f"Error processing {subdir}: {e}") finally: self.task_monitor.finish_task(task_id) return copied_files, skipped_files, sqlite_files def _copy_files_parallel(self) -> Set[Path]: """Stage 1: Copy files in parallel by subdirectory""" self.task_monitor.start_task("stage1", "File copy stage") self.logger.info("Stage 1: Starting parallel file copy operation") if not self.source_path.exists(): raise FileNotFoundError(f"Source path does not exist: {self.source_path}") # Get top-level subdirectories (and files) subdirs = [] top_level_files = [] for item in self.source_path.iterdir(): if item.is_dir() and not self._is_blocklisted(item): subdirs.append(item) elif item.is_file(): top_level_files.append(item) self.logger.info(f"Found {len(subdirs)} subdirectories and {len(top_level_files)} top-level files") # Count total files for progress tracking self.task_monitor.start_task("counting", "Counting files for progress tracking") self.total_files = len(top_level_files) with ThreadPoolExecutor(max_workers=self.max_workers) as executor: # Submit counting tasks count_futures = {executor.submit(self._count_files_in_subdirectory, subdir): subdir for subdir in subdirs} for future in as_completed(count_futures): try: count = future.result() self.total_files += count except Exception as e: subdir = count_futures[future] self.logger.error(f"Error counting files in {subdir}: {e}") self.task_monitor.finish_task("counting") self.logger.info(f"Total files to process: {self.total_files}") # Process top-level files first if top_level_files: self.task_monitor.start_task("toplevel", f"Processing {len(top_level_files)} top-level files") skipped_toplevel = 0 for file in top_level_files: should_skip, skip_reason = self._should_skip_file(file) if should_skip: skipped_toplevel += 1 if skip_reason not in ["socket", "pipe"]: self.logger.debug(f"Skipped top-level {file}: {skip_reason}") continue try: dest_file = self.work_dir / file.name # Use copy2 to preserve metadata (timestamps, permissions) shutil.copy2(file, dest_file) # Also try to preserve extended attributes and ACLs if possible try: shutil.copystat(file, dest_file) except (OSError, AttributeError): # Some filesystems don't support all metadata pass sqlite_found = set() if file.name.endswith('.db') or file.name.endswith('.sqlite') or file.name.endswith('.sqlite3'): sqlite_found.add(dest_file) self._update_progress(1, 0, sqlite_found) except Exception as e: skipped_toplevel += 1 self.logger.error(f"Failed to copy top-level file {file}: {e}") if skipped_toplevel > 0: self._update_progress(0, skipped_toplevel, set()) self.task_monitor.finish_task("toplevel") # Process subdirectories in parallel if subdirs: with ThreadPoolExecutor(max_workers=self.max_workers) as executor: # Submit all subdirectory processing tasks future_to_subdir = {executor.submit(self._process_subdirectory, subdir): subdir for subdir in subdirs} # Collect results as they complete for future in as_completed(future_to_subdir): subdir = future_to_subdir[future] try: copied_count, skipped_count, sqlite_found = future.result() self._update_progress(copied_count, skipped_count, sqlite_found) except Exception as e: self.logger.error(f"Error processing subdirectory {subdir}: {e}") self.task_monitor.finish_task("stage1") self.logger.info(f"Stage 1 complete: Copied {self.copied_files} files, skipped {self.skipped_files} files, found {len(self.sqlite_files)} SQLite databases") return self.sqlite_files def _backup_sqlite_databases(self, sqlite_files: Set[Path]): """Stage 2: Create live backups of SQLite databases""" if not sqlite_files: return self.task_monitor.start_task("stage2", "SQLite database backup stage") self.logger.info("Stage 2: Starting SQLite database backup") # Process SQLite backups in parallel too def backup_single_db(db_file: Path) -> bool: db_task_id = f"sqlite_{db_file.name}" self.task_monitor.start_task(db_task_id, f"Backing up {db_file.name}") try: # Find original database path rel_path = db_file.relative_to(self.work_dir) original_db = self.source_path / rel_path if not original_db.exists(): self.logger.warning(f"Original database not found: {original_db}") return False self.logger.info(f"Creating live backup of: {original_db}") # Use SQLite's backup API for live backup self._sqlite_backup(str(original_db), str(db_file)) return True except Exception as e: self.logger.error(f"Failed to backup SQLite database {db_file}: {e}") return False finally: self.task_monitor.finish_task(db_task_id) with ThreadPoolExecutor(max_workers=min(len(sqlite_files), self.max_workers)) as executor: backup_futures = {executor.submit(backup_single_db, db_file): db_file for db_file in sqlite_files} successful_backups = 0 for future in as_completed(backup_futures): if future.result(): successful_backups += 1 self.logger.info(f"SQLite backup complete: {successful_backups}/{len(sqlite_files)} databases backed up successfully") self.task_monitor.finish_task("stage2") def _sqlite_backup(self, source_db: str, dest_db: str): """Perform SQLite backup using the backup API""" try: # Connect to source and destination databases source_conn = sqlite3.connect(source_db) dest_conn = sqlite3.connect(dest_db) # Perform backup source_conn.backup(dest_conn) # Close connections source_conn.close() dest_conn.close() self.logger.debug(f"SQLite backup completed: {source_db} -> {dest_db}") except sqlite3.Error as e: self.logger.error(f"SQLite backup failed for {source_db}: {e}") raise def _create_borg_backup(self) -> str: """Create Borg backup with deduplication and compression""" self.task_monitor.start_task("borg", "Creating Borg backup") timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') archive_name = f"backup_{timestamp}" # Get Borg configuration from environment borg_repo = os.getenv('BORG_REPO', '/backups/borg-repo') borg_passphrase = os.getenv('BORG_PASSPHRASE', '') compression = os.getenv('BORG_COMPRESSION', 'zstd,3') self.logger.info(f"Creating Borg backup: {archive_name} to repo {borg_repo}") try: # Set environment for Borg borg_env = os.environ.copy() borg_env['BORG_REPO'] = borg_repo borg_env['BORG_PASSPHRASE'] = borg_passphrase borg_env['BORG_RELOCATED_REPO_ACCESS_IS_OK'] = 'yes' borg_env['BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'] = 'yes' # SSH configuration to accept unknown hosts borg_env['BORG_RSH'] = 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null' # Initialize repository if it doesn't exist self._ensure_borg_repo_exists(borg_env) # Create the backup borg_create_cmd = [ 'borg', 'create', '--verbose', '--stats', '--show-rc', '--compression', compression, '--exclude-caches', f'{borg_repo}::{archive_name}', str(self.work_dir) ] self.logger.info(f"Running: {' '.join(borg_create_cmd[:-1])} ") result = subprocess.run( borg_create_cmd, env=borg_env, capture_output=True, text=True, timeout=3600 # 1 hour timeout ) if result.returncode != 0: self.logger.error(f"Borg create failed with return code {result.returncode}") self.logger.error(f"STDERR: {result.stderr}") raise subprocess.CalledProcessError(result.returncode, borg_create_cmd) # Log Borg statistics if result.stdout: self.logger.info("Borg backup statistics:") for line in result.stdout.split('\n'): if line.strip() and ('Archive name:' in line or 'Time' in line or 'Original size:' in line or 'Compressed size:' in line or 'Deduplicated size:' in line): self.logger.info(f" {line.strip()}") # Get repository info for size reporting try: info_result = subprocess.run( ['borg', 'info', '--json', borg_repo], env=borg_env, capture_output=True, text=True, timeout=60 ) if info_result.returncode == 0: import json repo_info = json.loads(info_result.stdout) if 'cache' in repo_info: total_size = repo_info['cache'].get('stats', {}).get('total_size', 0) size_mb = total_size / (1024 * 1024) self.logger.info(f"Repository total size: {size_mb:.1f} MB") except Exception as e: self.logger.debug(f"Could not get repository size: {e}") self.logger.info(f"Borg backup created successfully: {archive_name}") return f"{borg_repo}::{archive_name}" except subprocess.TimeoutExpired: self.logger.error("Borg backup timed out after 1 hour") raise except Exception as e: self.logger.error(f"Failed to create Borg backup: {e}") raise finally: self.task_monitor.finish_task("borg") def _ensure_borg_repo_exists(self, borg_env: dict): """Ensure Borg repository exists, create if necessary""" borg_repo = borg_env['BORG_REPO'] # Check if repository exists result = subprocess.run( ['borg', 'info', borg_repo], env=borg_env, capture_output=True, text=True, timeout=30 ) if result.returncode == 0: self.logger.debug(f"Borg repository exists: {borg_repo}") return # Repository doesn't exist, create it self.logger.info(f"Initializing new Borg repository: {borg_repo}") # Determine repository type based on passphrase if borg_env.get('BORG_PASSPHRASE'): encryption_mode = 'repokey' else: encryption_mode = 'none' self.logger.warning("No passphrase set - creating unencrypted repository") init_cmd = [ 'borg', 'init', '--encryption', encryption_mode, borg_repo ] init_result = subprocess.run( init_cmd, env=borg_env, capture_output=True, text=True, timeout=60 ) if init_result.returncode != 0: self.logger.error(f"Failed to initialize Borg repository: {init_result.stderr}") raise subprocess.CalledProcessError(init_result.returncode, init_cmd) self.logger.info(f"Borg repository initialized successfully with {encryption_mode} encryption") def _send_notification(self, success: bool, message: str): """Send notification via ntfy if configured""" ntfy_topic = os.getenv('NTFY_TOPIC') if not ntfy_topic: return try: # Use ASCII-safe status indicators status = "SUCCESS" if success else "FAILED" title = f"Homelab Backup {status}" url = f'https://ntfy.sh/{ntfy_topic}' data = message.encode('utf-8') req = urllib.request.Request(url, data=data, method='POST') req.add_header('Title', title.encode('ascii', 'ignore').decode('ascii')) req.add_header('Content-Type', 'text/plain; charset=utf-8') with urllib.request.urlopen(req, timeout=10) as response: if response.status == 200: self.logger.debug("Notification sent successfully") else: self.logger.warning(f"Notification returned status {response.status}") except Exception as e: self.logger.error(f"Failed to send notification: {e}") def run_backup(self) -> bool: """Run the complete backup process""" start_time = datetime.now() self.logger.info("=== Starting parallel backup process ===") # Start task monitoring self.task_monitor.start_monitoring() try: # Stage 1: Copy files in parallel sqlite_files = self._copy_files_parallel() # Stage 2: Backup SQLite databases in parallel self._backup_sqlite_databases(sqlite_files) # Create Borg backup archive_path = self._create_borg_backup() # Calculate duration duration = datetime.now() - start_time message = f"Parallel backup completed successfully in {duration.total_seconds():.1f} seconds. Files: {self.copied_files} copied, {self.skipped_files} skipped, SQLite DBs: {len(self.sqlite_files)}. Archive: {Path(archive_path).name}" self.logger.info(message) self._send_notification(True, message) return True except Exception as e: error_msg = f"Backup failed: {str(e)}" self.logger.error(error_msg) self._send_notification(False, error_msg) return False finally: # Stop task monitoring self.task_monitor.stop_monitoring() def main(): """Main function""" parser = argparse.ArgumentParser(description='Parallel Homelab Backup Script') parser.add_argument('source_path', help='Source directory to backup') parser.add_argument('--work-dir', default='/work', help='Working directory for staging') parser.add_argument('--backup-dir', default='/backups', help='Final backup destination') parser.add_argument('--max-workers', type=int, default=4, help='Maximum number of parallel workers') parser.add_argument('--blocklist', nargs='*', default=[ 'node_modules', '.git', '__pycache__', '.cache', 'tmp', 'temp', 'logs', '.logs', 'cache', '.npm', '.yarn', 'dist', 'build' ], help='Directories to exclude from backup') parser.add_argument('--log-level', default='INFO', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR']) args = parser.parse_args() # Handle blocklist - it might come as a single space-separated string from Ansible template blocklist = args.blocklist if blocklist and len(blocklist) == 1 and ' ' in blocklist[0]: # Split the single string into multiple items blocklist = blocklist[0].split() elif not blocklist: # Use default blocklist if none provided blocklist = [ 'node_modules', '.git', '__pycache__', '.cache', 'tmp', 'temp', 'logs', '.logs', 'cache', '.npm', '.yarn', 'dist', 'build' ] # Setup logging logger = setup_logging(args.log_level) logger.info(f"Using blocklist: {blocklist}") # Create backup manager and run backup_manager = BackupManager( source_path=args.source_path, work_dir=args.work_dir, backup_dir=args.backup_dir, blocklist=blocklist, logger=logger, max_workers=args.max_workers ) success = backup_manager.run_backup() sys.exit(0 if success else 1) if __name__ == '__main__': main()