Diffstat (limited to 'playbooks/roles/backup/templates/volumes/scripts/backup.py')
| -rw-r--r-- | playbooks/roles/backup/templates/volumes/scripts/backup.py | 708 |
1 file changed, 708 insertions, 0 deletions
diff --git a/playbooks/roles/backup/templates/volumes/scripts/backup.py b/playbooks/roles/backup/templates/volumes/scripts/backup.py
new file mode 100644
index 0000000..8b71e0f
--- /dev/null
+++ b/playbooks/roles/backup/templates/volumes/scripts/backup.py
@@ -0,0 +1,708 @@
+#!/usr/bin/env python3
+"""
+Homelab Backup Script
+Performs two-stage backup: file copy and SQLite database backup
+Uses parallel processing for top-level subdirectories
+"""
+
+import os
+import sys
+import logging
+import argparse
+import shutil
+import sqlite3
+import gzip
+import tarfile
+import stat
+import time
+import subprocess
+from datetime import datetime
+from pathlib import Path
+from typing import List, Set, Tuple, Dict
+from concurrent.futures import ThreadPoolExecutor, as_completed, Future
+import threading
+import json
+import urllib.request
+import urllib.parse
+from queue import Queue, Empty
+
+# Thread-safe logging
+logging_lock = threading.Lock()
+
+def setup_logging(log_level: str = "INFO") -> logging.Logger:
+    """Setup logging configuration"""
+    logger = logging.getLogger("backup")
+    logger.setLevel(getattr(logging, log_level.upper()))
+
+    # Console handler
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(logging.INFO)
+    console_formatter = logging.Formatter(
+        '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] - %(message)s'
+    )
+    console_handler.setFormatter(console_formatter)
+    logger.addHandler(console_handler)
+
+    # File handler
+    log_dir = Path("/scripts/logs")
+    log_dir.mkdir(exist_ok=True)
+    log_file = log_dir / f"backup-{datetime.now().strftime('%Y%m%d')}.log"
+    file_handler = logging.FileHandler(log_file)
+    file_handler.setLevel(logging.DEBUG)
+    file_formatter = logging.Formatter(
+        '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] - %(message)s'
+    )
+    file_handler.setFormatter(file_formatter)
+    logger.addHandler(file_handler)
+
+    return logger
+
+class TaskMonitor:
+    """Monitors and reports on running tasks"""
+
+    def __init__(self, logger: logging.Logger):
+        self.logger = logger
+        self.active_tasks = {}
+        self.task_lock = threading.Lock()
+        # Private name so the Event does not shadow the stop_monitoring() method
+        self._stop_event = threading.Event()
+        self.monitor_thread = None
+
+    def start_task(self, task_id: str, description: str):
+        """Register a new task as started"""
+        with self.task_lock:
+            self.active_tasks[task_id] = {
+                'description': description,
+                'start_time': datetime.now(),
+                'thread_name': threading.current_thread().name
+            }
+            self.logger.info(f"🚀 Started: {description} [{task_id}]")
+
+    def finish_task(self, task_id: str):
+        """Mark a task as completed"""
+        with self.task_lock:
+            if task_id in self.active_tasks:
+                task = self.active_tasks.pop(task_id)
+                duration = (datetime.now() - task['start_time']).total_seconds()
+                self.logger.info(f"✅ Completed: {task['description']} [{task_id}] ({duration:.1f}s)")
+
+    def start_monitoring(self):
+        """Start the periodic status monitoring"""
+        self._stop_event.clear()
+        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
+        self.monitor_thread.start()
+        self.logger.info("📊 Task monitoring started")
+
+    def stop_monitoring(self):
+        """Stop the periodic status monitoring"""
+        self._stop_event.set()
+        if self.monitor_thread:
+            self.monitor_thread.join()
+        self.logger.info("📊 Task monitoring stopped")
+
+    def _monitor_loop(self):
+        """Periodic monitoring loop"""
+        while not self._stop_event.wait(5.0):  # Check every 5 seconds
+            with self.task_lock:
+                if self.active_tasks:
+                    now = datetime.now()
+                    active_list = []
+                    for task_id, task in self.active_tasks.items():
+                        duration = (now - task['start_time']).total_seconds()
+                        active_list.append(f"{task['description']} ({duration:.0f}s)")
+
+                    self.logger.info(f"🔄 Active tasks ({len(self.active_tasks)}): {', '.join(active_list)}")
+
+
+class BackupManager:
+    """Main backup management class with parallel processing"""
+
+    def __init__(self, source_path: str, work_dir: str, backup_dir: str,
+                 blocklist: List[str], logger: logging.Logger, max_workers: int = 4):
+        self.source_path = Path(source_path)
+        self.work_dir = Path(work_dir)
+        self.backup_dir = Path(backup_dir)
+        self.blocklist = set(blocklist)
+        self.logger = logger
+        self.max_workers = max_workers
+
+        # Thread-safe progress tracking
+        self.total_files = 0
+        self.copied_files = 0
+        self.skipped_files = 0
+        self.sqlite_files = set()
+        self.progress_lock = threading.Lock()
+
+        # Task monitoring
+        self.task_monitor = TaskMonitor(logger)
+
+        # Ensure directories exist
+        self.work_dir.mkdir(parents=True, exist_ok=True)
+        self.backup_dir.mkdir(parents=True, exist_ok=True)
+
+        # Clean work directory at start
+        self._clean_work_dir()
+
+    def _clean_work_dir(self):
+        """Clean the work directory before starting"""
+        if self.work_dir.exists():
+            for item in self.work_dir.iterdir():
+                if item.is_dir():
+                    shutil.rmtree(item)
+                else:
+                    item.unlink()
+            self.logger.info(f"Cleaned work directory: {self.work_dir}")
+
+    def _is_blocklisted(self, path: Path) -> bool:
+        """Check if a path is in the blocklist"""
+        path_str = str(path)
+        for blocked in self.blocklist:
+            if blocked in path_str or path.name == blocked:
+                return True
+        return False
+
+    def _update_progress(self, files_copied: int, files_skipped: int, sqlite_found: Set[Path]):
+        """Thread-safe progress update"""
+        with self.progress_lock:
+            self.copied_files += files_copied
+            self.skipped_files += files_skipped
+            self.sqlite_files.update(sqlite_found)
+
+            total_processed = self.copied_files + self.skipped_files
+            if self.total_files > 0 and total_processed % 100 == 0:  # Log every 100 files
+                progress = (total_processed / self.total_files) * 100
+                self.logger.info(f"Progress: {self.copied_files} copied, {self.skipped_files} skipped, {total_processed}/{self.total_files} total ({progress:.1f}%)")
+
+    def _count_files_in_subdirectory(self, subdir: Path) -> int:
+        """Count files in a subdirectory for progress tracking"""
+        count = 0
+        try:
+            for root, dirs, files in os.walk(subdir):
+                root_path = Path(root)
+                if self._is_blocklisted(root_path):
+                    dirs.clear()
+                    continue
+                dirs[:] = [d for d in dirs if not self._is_blocklisted(root_path / d)]
+                count += len(files)
+        except Exception as e:
+            self.logger.error(f"Error counting files in {subdir}: {e}")
+        return count
+
+    def _should_skip_file(self, file_path: Path) -> Tuple[bool, str]:
+        """Check if a file should be skipped and return reason"""
+        try:
+            file_stat = file_path.stat()
+
+            # Skip sockets
+            if stat.S_ISSOCK(file_stat.st_mode):
+                return True, "socket"
+
+            # Skip named pipes (FIFOs)
+            if stat.S_ISFIFO(file_stat.st_mode):
+                return True, "pipe"
+
+            # Skip character/block devices
+            if stat.S_ISCHR(file_stat.st_mode) or stat.S_ISBLK(file_stat.st_mode):
+                return True, "device"
+
+            # Only process regular files and symlinks
+            if not (stat.S_ISREG(file_stat.st_mode) or stat.S_ISLNK(file_stat.st_mode)):
+                return True, "special"
+
+        except (OSError, PermissionError) as e:
+            return True, f"access_error: {e}"
+
+        return False, ""
+
+    def _process_subdirectory(self, subdir: Path) -> Tuple[int, int, Set[Path]]:
+        """Process a single subdirectory (runs in parallel)"""
+        task_id = f"subdir_{subdir.name}"
+        self.task_monitor.start_task(task_id, f"Processing {subdir.name}")
+
+        copied_files = 0
+        skipped_files = 0
+        sqlite_files = set()
+
+        try:
+            # Create destination directory structure
+            rel_path = subdir.relative_to(self.source_path)
+            dest_dir = self.work_dir / rel_path
+            dest_dir.mkdir(parents=True, exist_ok=True)
+
+            # Walk through subdirectory
+            for root, dirs, files in os.walk(subdir):
+                root_path = Path(root)
+
+                # Skip blocklisted directories
+                if self._is_blocklisted(root_path):
+                    dirs.clear()
+                    continue
+
+                # Filter out blocklisted subdirectories
+                dirs[:] = [d for d in dirs if not self._is_blocklisted(root_path / d)]
+
+                # Create relative path structure
+                rel_root = root_path.relative_to(self.source_path)
+                dest_root = self.work_dir / rel_root
+                dest_root.mkdir(parents=True, exist_ok=True)
+
+                # Process files
+                for file in files:
+                    source_file = root_path / file
+                    dest_file = dest_root / file
+
+                    # Check if file should be skipped
+                    should_skip, skip_reason = self._should_skip_file(source_file)
+                    if should_skip:
+                        skipped_files += 1
+                        if skip_reason not in ["socket", "pipe"]:  # Only log unusual skips
+                            self.logger.debug(f"Skipped {source_file}: {skip_reason}")
+                        continue
+
+                    try:
+                        # Use copy2 to preserve metadata (timestamps, permissions)
+                        shutil.copy2(source_file, dest_file)
+
+                        # Also try to preserve extended attributes and ACLs if possible
+                        try:
+                            # Copy stat info again to ensure everything is preserved
+                            shutil.copystat(source_file, dest_file)
+                        except (OSError, AttributeError):
+                            # Some filesystems don't support all metadata
+                            pass
+
+                        copied_files += 1
+
+                        # Track SQLite databases
+                        if file.endswith('.db') or file.endswith('.sqlite') or file.endswith('.sqlite3'):
+                            sqlite_files.add(dest_file)
+
+                    except Exception as e:
+                        skipped_files += 1
+                        self.logger.error(f"Failed to copy {source_file}: {e}")
+
+        except Exception as e:
+            self.logger.error(f"Error processing {subdir}: {e}")
+        finally:
+            self.task_monitor.finish_task(task_id)
+
+        return copied_files, skipped_files, sqlite_files
+
+    def _copy_files_parallel(self) -> Set[Path]:
+        """Stage 1: Copy files in parallel by subdirectory"""
+        self.task_monitor.start_task("stage1", "File copy stage")
+        self.logger.info("Stage 1: Starting parallel file copy operation")
+
+        if not self.source_path.exists():
+            raise FileNotFoundError(f"Source path does not exist: {self.source_path}")
+
+        # Get top-level subdirectories (and files)
+        subdirs = []
+        top_level_files = []
+
+        for item in self.source_path.iterdir():
+            if item.is_dir() and not self._is_blocklisted(item):
+                subdirs.append(item)
+            elif item.is_file():
+                top_level_files.append(item)
+
+        self.logger.info(f"Found {len(subdirs)} subdirectories and {len(top_level_files)} top-level files")
+
+        # Count total files for progress tracking
+        self.task_monitor.start_task("counting", "Counting files for progress tracking")
+        self.total_files = len(top_level_files)
+
+        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+            # Submit counting tasks
+            count_futures = {executor.submit(self._count_files_in_subdirectory, subdir): subdir
+                             for subdir in subdirs}
+
+            for future in as_completed(count_futures):
+                try:
+                    count = future.result()
+                    self.total_files += count
+                except Exception as e:
+                    subdir = count_futures[future]
+                    self.logger.error(f"Error counting files in {subdir}: {e}")
+
+        self.task_monitor.finish_task("counting")
+        self.logger.info(f"Total files to process: {self.total_files}")
+
+        # Process top-level files first
+        if top_level_files:
+            self.task_monitor.start_task("toplevel", f"Processing {len(top_level_files)} top-level files")
+            skipped_toplevel = 0
+
+            for file in top_level_files:
+                should_skip, skip_reason = self._should_skip_file(file)
+                if should_skip:
+                    skipped_toplevel += 1
+                    if skip_reason not in ["socket", "pipe"]:
+                        self.logger.debug(f"Skipped top-level {file}: {skip_reason}")
+                    continue
+
+                try:
+                    dest_file = self.work_dir / file.name
+                    # Use copy2 to preserve metadata (timestamps, permissions)
+                    shutil.copy2(file, dest_file)
+
+                    # Also try to preserve extended attributes and ACLs if possible
+                    try:
+                        shutil.copystat(file, dest_file)
+                    except (OSError, AttributeError):
+                        # Some filesystems don't support all metadata
+                        pass
+
+                    sqlite_found = set()
+                    if file.name.endswith('.db') or file.name.endswith('.sqlite') or file.name.endswith('.sqlite3'):
+                        sqlite_found.add(dest_file)
+
+                    self._update_progress(1, 0, sqlite_found)
+
+                except Exception as e:
+                    skipped_toplevel += 1
+                    self.logger.error(f"Failed to copy top-level file {file}: {e}")
+
+            if skipped_toplevel > 0:
+                self._update_progress(0, skipped_toplevel, set())
+
+            self.task_monitor.finish_task("toplevel")
+
+        # Process subdirectories in parallel
+        if subdirs:
+            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+                # Submit all subdirectory processing tasks
+                future_to_subdir = {executor.submit(self._process_subdirectory, subdir): subdir
+                                    for subdir in subdirs}
+
+                # Collect results as they complete
+                for future in as_completed(future_to_subdir):
+                    subdir = future_to_subdir[future]
+                    try:
+                        copied_count, skipped_count, sqlite_found = future.result()
+                        self._update_progress(copied_count, skipped_count, sqlite_found)
+                    except Exception as e:
+                        self.logger.error(f"Error processing subdirectory {subdir}: {e}")
+
+        self.task_monitor.finish_task("stage1")
+        self.logger.info(f"Stage 1 complete: Copied {self.copied_files} files, skipped {self.skipped_files} files, found {len(self.sqlite_files)} SQLite databases")
+        return self.sqlite_files
+
+    def _backup_sqlite_databases(self, sqlite_files: Set[Path]):
+        """Stage 2: Create live backups of SQLite databases"""
+        if not sqlite_files:
+            return
+
+        self.task_monitor.start_task("stage2", "SQLite database backup stage")
+        self.logger.info("Stage 2: Starting SQLite database backup")
+
+        # Process SQLite backups in parallel too
+        def backup_single_db(db_file: Path) -> bool:
+            db_task_id = f"sqlite_{db_file.name}"
+            self.task_monitor.start_task(db_task_id, f"Backing up {db_file.name}")
+
+            try:
+                # Find original database path
+                rel_path = db_file.relative_to(self.work_dir)
+                original_db = self.source_path / rel_path
+
+                if not original_db.exists():
+                    self.logger.warning(f"Original database not found: {original_db}")
+                    return False
+
+                self.logger.info(f"Creating live backup of: {original_db}")
+
+                # Use SQLite's backup API for live backup
+                self._sqlite_backup(str(original_db), str(db_file))
+                return True
+
+            except Exception as e:
+                self.logger.error(f"Failed to backup SQLite database {db_file}: {e}")
+                return False
+            finally:
+                self.task_monitor.finish_task(db_task_id)
+
+        with ThreadPoolExecutor(max_workers=min(len(sqlite_files), self.max_workers)) as executor:
+            backup_futures = {executor.submit(backup_single_db, db_file): db_file
+                              for db_file in sqlite_files}
+
+            successful_backups = 0
+            for future in as_completed(backup_futures):
+                if future.result():
+                    successful_backups += 1
+
+        self.logger.info(f"SQLite backup complete: {successful_backups}/{len(sqlite_files)} databases backed up successfully")
+
+        self.task_monitor.finish_task("stage2")
+
+    def _sqlite_backup(self, source_db: str, dest_db: str):
+        """Perform SQLite backup using the backup API"""
+        try:
+            # Connect to source and destination databases
+            source_conn = sqlite3.connect(source_db)
+            dest_conn = sqlite3.connect(dest_db)
+
+            # Perform backup
+            source_conn.backup(dest_conn)
+
+            # Close connections
+            source_conn.close()
+            dest_conn.close()
+
+            self.logger.debug(f"SQLite backup completed: {source_db} -> {dest_db}")
+
+        except sqlite3.Error as e:
+            self.logger.error(f"SQLite backup failed for {source_db}: {e}")
+            raise
+
+    def _create_borg_backup(self) -> str:
+        """Create Borg backup with deduplication and compression"""
+        self.task_monitor.start_task("borg", "Creating Borg backup")
+
+        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+        archive_name = f"backup_{timestamp}"
+
+        # Get Borg configuration from environment
+        borg_repo = os.getenv('BORG_REPO', '/backups/borg-repo')
+        borg_passphrase = os.getenv('BORG_PASSPHRASE', '')
+        compression = os.getenv('BORG_COMPRESSION', 'zstd,3')
+
+        self.logger.info(f"Creating Borg backup: {archive_name} to repo {borg_repo}")
+
+        try:
+            # Set environment for Borg
+            borg_env = os.environ.copy()
+            borg_env['BORG_REPO'] = borg_repo
+            borg_env['BORG_PASSPHRASE'] = borg_passphrase
+            borg_env['BORG_RELOCATED_REPO_ACCESS_IS_OK'] = 'yes'
+            borg_env['BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'] = 'yes'
+            # SSH configuration to accept unknown hosts
+            borg_env['BORG_RSH'] = 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
+
+            # Initialize repository if it doesn't exist
+            self._ensure_borg_repo_exists(borg_env)
+
+            # Create the backup
+            borg_create_cmd = [
+                'borg', 'create',
+                '--verbose',
+                '--stats',
+                '--show-rc',
+                '--compression', compression,
+                '--exclude-caches',
+                f'{borg_repo}::{archive_name}',
+                str(self.work_dir)
+            ]
+
+            self.logger.info(f"Running: {' '.join(borg_create_cmd[:-1])} <work_dir>")
+
+            result = subprocess.run(
+                borg_create_cmd,
+                env=borg_env,
+                capture_output=True,
+                text=True,
+                timeout=3600  # 1 hour timeout
+            )
+
+            if result.returncode != 0:
+                self.logger.error(f"Borg create failed with return code {result.returncode}")
+                self.logger.error(f"STDERR: {result.stderr}")
+                raise subprocess.CalledProcessError(result.returncode, borg_create_cmd)
+
+            # Log Borg statistics
+            if result.stdout:
+                self.logger.info("Borg backup statistics:")
+                for line in result.stdout.split('\n'):
+                    if line.strip() and ('Archive name:' in line or 'Time' in line or 'Original size:' in line or 'Compressed size:' in line or 'Deduplicated size:' in line):
+                        self.logger.info(f"  {line.strip()}")
+
+            # Get repository info for size reporting
+            try:
+                info_result = subprocess.run(
+                    ['borg', 'info', '--json', borg_repo],
+                    env=borg_env,
+                    capture_output=True,
+                    text=True,
+                    timeout=60
+                )
+                if info_result.returncode == 0:
+                    import json
+                    repo_info = json.loads(info_result.stdout)
+                    if 'cache' in repo_info:
+                        total_size = repo_info['cache'].get('stats', {}).get('total_size', 0)
+                        size_mb = total_size / (1024 * 1024)
+                        self.logger.info(f"Repository total size: {size_mb:.1f} MB")
+            except Exception as e:
+                self.logger.debug(f"Could not get repository size: {e}")
+
+            self.logger.info(f"Borg backup created successfully: {archive_name}")
+            return f"{borg_repo}::{archive_name}"
+
+        except subprocess.TimeoutExpired:
+            self.logger.error("Borg backup timed out after 1 hour")
+            raise
+        except Exception as e:
+            self.logger.error(f"Failed to create Borg backup: {e}")
+            raise
+        finally:
+            self.task_monitor.finish_task("borg")
+
+    def _ensure_borg_repo_exists(self, borg_env: dict):
+        """Ensure Borg repository exists, create if necessary"""
+        borg_repo = borg_env['BORG_REPO']
+
+        # Check if repository exists
+        result = subprocess.run(
+            ['borg', 'info', borg_repo],
+            env=borg_env,
+            capture_output=True,
+            text=True,
+            timeout=30
+        )
+
+        if result.returncode == 0:
+            self.logger.debug(f"Borg repository exists: {borg_repo}")
+            return
+
+        # Repository doesn't exist, create it
+        self.logger.info(f"Initializing new Borg repository: {borg_repo}")
+
+        # Determine repository type based on passphrase
+        if borg_env.get('BORG_PASSPHRASE'):
+            encryption_mode = 'repokey'
+        else:
+            encryption_mode = 'none'
+            self.logger.warning("No passphrase set - creating unencrypted repository")
+
+        init_cmd = [
+            'borg', 'init',
+            '--encryption', encryption_mode,
+            borg_repo
+        ]
+
+        init_result = subprocess.run(
+            init_cmd,
+            env=borg_env,
+            capture_output=True,
+            text=True,
+            timeout=60
+        )
+
+        if init_result.returncode != 0:
+            self.logger.error(f"Failed to initialize Borg repository: {init_result.stderr}")
+            raise subprocess.CalledProcessError(init_result.returncode, init_cmd)
+
+        self.logger.info(f"Borg repository initialized successfully with {encryption_mode} encryption")
+
+    def _send_notification(self, success: bool, message: str):
+        """Send notification via ntfy if configured"""
+        ntfy_topic = os.getenv('NTFY_TOPIC')
+        if not ntfy_topic:
+            return
+
+        try:
+            # Use ASCII-safe status indicators
+            status = "SUCCESS" if success else "FAILED"
+            title = f"Homelab Backup {status}"
+
+            url = f'https://ntfy.sh/{ntfy_topic}'
+            data = message.encode('utf-8')
+
+            req = urllib.request.Request(url, data=data, method='POST')
+            req.add_header('Title', title.encode('ascii', 'ignore').decode('ascii'))
+            req.add_header('Content-Type', 'text/plain; charset=utf-8')
+
+            with urllib.request.urlopen(req, timeout=10) as response:
+                if response.status == 200:
+                    self.logger.debug("Notification sent successfully")
+                else:
+                    self.logger.warning(f"Notification returned status {response.status}")
+
+        except Exception as e:
+            self.logger.error(f"Failed to send notification: {e}")
+
+    def run_backup(self) -> bool:
+        """Run the complete backup process"""
+        start_time = datetime.now()
+        self.logger.info("=== Starting parallel backup process ===")
+
+        # Start task monitoring
+        self.task_monitor.start_monitoring()
+
+        try:
+            # Stage 1: Copy files in parallel
+            sqlite_files = self._copy_files_parallel()
+
+            # Stage 2: Backup SQLite databases in parallel
+            self._backup_sqlite_databases(sqlite_files)
+
+            # Create Borg backup
+            archive_path = self._create_borg_backup()
+
+            # Calculate duration
+            duration = datetime.now() - start_time
+
+            message = f"Parallel backup completed successfully in {duration.total_seconds():.1f} seconds. Files: {self.copied_files} copied, {self.skipped_files} skipped, SQLite DBs: {len(self.sqlite_files)}. Archive: {Path(archive_path).name}"
+            self.logger.info(message)
+            self._send_notification(True, message)
+
+            return True
+
+        except Exception as e:
+            error_msg = f"Backup failed: {str(e)}"
+            self.logger.error(error_msg)
+            self._send_notification(False, error_msg)
+            return False
+
+        finally:
+            # Stop task monitoring
+            self.task_monitor.stop_monitoring()
+
+
+def main():
+    """Main function"""
+    parser = argparse.ArgumentParser(description='Parallel Homelab Backup Script')
+    parser.add_argument('source_path', help='Source directory to backup')
+    parser.add_argument('--work-dir', default='/work', help='Working directory for staging')
+    parser.add_argument('--backup-dir', default='/backups', help='Final backup destination')
+    parser.add_argument('--max-workers', type=int, default=4, help='Maximum number of parallel workers')
+    parser.add_argument('--blocklist', nargs='*', default=[
+        'node_modules', '.git', '__pycache__', '.cache', 'tmp', 'temp',
+        'logs', '.logs', 'cache', '.npm', '.yarn', 'dist', 'build'
+    ], help='Directories to exclude from backup')
+    parser.add_argument('--log-level', default='INFO', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'])
+
+    args = parser.parse_args()
+
+    # Handle blocklist - it might come as a single space-separated string from Ansible template
+    blocklist = args.blocklist
+    if blocklist and len(blocklist) == 1 and ' ' in blocklist[0]:
+        # Split the single string into multiple items
+        blocklist = blocklist[0].split()
+    elif not blocklist:
+        # Use default blocklist if none provided
+        blocklist = [
+            'node_modules', '.git', '__pycache__', '.cache', 'tmp', 'temp',
+            'logs', '.logs', 'cache', '.npm', '.yarn', 'dist', 'build'
+        ]
+
+    # Setup logging
+    logger = setup_logging(args.log_level)
+
+    logger.info(f"Using blocklist: {blocklist}")
+
+    # Create backup manager and run
+    backup_manager = BackupManager(
+        source_path=args.source_path,
+        work_dir=args.work_dir,
+        backup_dir=args.backup_dir,
+        blocklist=blocklist,
+        logger=logger,
+        max_workers=args.max_workers
+    )
+
+    success = backup_manager.run_backup()
+    sys.exit(0 if success else 1)
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
