Diffstat (limited to 'playbooks')
| -rw-r--r-- | playbooks/backup.yml | 7 |
| -rw-r--r-- | playbooks/roles/backup/README.md | 6 |
| -rw-r--r-- | playbooks/roles/backup/tasks/main.yml | 8 |
| -rw-r--r-- | playbooks/roles/backup/templates/stacks/docker-compose.yml | 54 |
| -rw-r--r-- | playbooks/roles/backup/templates/volumes/backups/.gitkeep | 0 |
| -rw-r--r-- | playbooks/roles/backup/templates/volumes/scripts/backup.py | 708 |
| -rw-r--r-- | playbooks/roles/backup/templates/volumes/scripts/cleanup.py | 280 |
| -rw-r--r-- | playbooks/roles/backup/templates/volumes/scripts/test_backup.py | 211 |
| -rwxr-xr-x | playbooks/roles/backup/templates/volumes/ssh/id_ed25519 | 1 |
| -rw-r--r-- | playbooks/roles/backup/templates/volumes/work/.gitkeep | 0 |
| -rw-r--r-- | playbooks/roles/ci/templates/stacks/docker-compose.yml | 2 |
| -rw-r--r-- | playbooks/roles/traefik/templates/stacks/traefik.yml | 2 |
12 files changed, 1277 insertions, 2 deletions
diff --git a/playbooks/backup.yml b/playbooks/backup.yml new file mode 100644 index 0000000..9a2cce7 --- /dev/null +++ b/playbooks/backup.yml @@ -0,0 +1,7 @@ +--- + +- name: backup setup + hosts: backup + become: true + roles: + - backup diff --git a/playbooks/roles/backup/README.md b/playbooks/roles/backup/README.md new file mode 100644 index 0000000..a153923 --- /dev/null +++ b/playbooks/roles/backup/README.md @@ -0,0 +1,6 @@ +this is AI slop but maybe it works? :P + +restore: +``` +tar -xzf backup_20241028_143022.tar.gz --preserve-permissions +``` diff --git a/playbooks/roles/backup/tasks/main.yml b/playbooks/roles/backup/tasks/main.yml new file mode 100644 index 0000000..5a7e38f --- /dev/null +++ b/playbooks/roles/backup/tasks/main.yml @@ -0,0 +1,8 @@ +--- + +- name: Deploy backup + ansible.builtin.import_tasks: manage-docker-swarm-service.yml + vars: + service_name: backup + template_render_dir: "../templates" + service_destination_dir: "{{ backup_base }}"
\ No newline at end of file diff --git a/playbooks/roles/backup/templates/stacks/docker-compose.yml b/playbooks/roles/backup/templates/stacks/docker-compose.yml new file mode 100644 index 0000000..9089a8a --- /dev/null +++ b/playbooks/roles/backup/templates/stacks/docker-compose.yml @@ -0,0 +1,54 @@ +services: + backup: + image: python:3.11-alpine + volumes: + - "{{ swarm_base }}:/mnt/source:ro" + - "{{ backup_base }}/volumes/backups:/backups" + - "{{ backup_base }}/volumes/work:/work" + - "{{ backup_base }}/volumes/scripts:/scripts" +{% if borg_repo.startswith('ssh://') %} + - "{{ backup_base }}/volumes/ssh:/root/.ssh:ro" +{% endif %} + environment: + - TZ={{ timezone }} + - DEPLOYMENT_TIME={{ deployment_time }} + - BACKUP_RETENTION_DAYS={{ backup_retention_days | default(14) }} + - NTFY_TOPIC={{ ntfy_topic }} + - PYTHONUNBUFFERED=1 + - BORG_REPO={{ borg_repo }} + - BORG_PASSPHRASE={{ borg_passphrase }} + - BORG_COMPRESSION={{ borg_compression }} + - BORG_KEEP_DAILY={{ borg_keep_daily }} + - BORG_KEEP_WEEKLY={{ borg_keep_weekly }} + - BORG_KEEP_MONTHLY={{ borg_keep_monthly }} + command: > + sh -c " + apk add --no-cache borgbackup openssh-client && + chmod +x /scripts/*.py && + mkdir -p /scripts/logs && +{% if homelab_build %} + python3 /scripts/backup.py /mnt/source --blocklist {{ blocklist }} --work-dir /work --backup-dir /backups +{% else %} + echo '0 2 * * * cd /scripts && python3 backup.py /mnt/source --blocklist {{ blocklist }} --work-dir /work --backup-dir /backups >> logs/backup-$$(date +\\%Y\\%m\\%d).log 2>&1' > /etc/crontabs/root && + echo '0 4 * * * cd /scripts && python3 cleanup.py /backups --blocklist {{ blocklist }} --retention-days ${BACKUP_RETENTION_DAYS} >> logs/cleanup-$$(date +\\%Y\\%m\\%d).log 2>&1' >> /etc/crontabs/root && + crond -f +{% endif %} + " + networks: + - proxy + deploy: + mode: replicated + replicas: 1 + update_config: + parallelism: 1 + failure_action: rollback + order: start-first + delay: 5s + monitor: 30s + placement: + constraints: + - node.role == manager + +networks: + proxy: + external: true diff --git a/playbooks/roles/backup/templates/volumes/backups/.gitkeep b/playbooks/roles/backup/templates/volumes/backups/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/playbooks/roles/backup/templates/volumes/backups/.gitkeep diff --git a/playbooks/roles/backup/templates/volumes/scripts/backup.py b/playbooks/roles/backup/templates/volumes/scripts/backup.py new file mode 100644 index 0000000..8b71e0f --- /dev/null +++ b/playbooks/roles/backup/templates/volumes/scripts/backup.py @@ -0,0 +1,708 @@ +#!/usr/bin/env python3 +""" +Homelab Backup Script +Performs two-stage backup: file copy and SQLite database backup +Uses parallel processing for top-level subdirectories +""" + +import os +import sys +import logging +import argparse +import shutil +import sqlite3 +import gzip +import tarfile +import stat +import time +import subprocess +from datetime import datetime +from pathlib import Path +from typing import List, Set, Tuple, Dict +from concurrent.futures import ThreadPoolExecutor, as_completed, Future +import threading +import json +import urllib.request +import urllib.parse +from queue import Queue, Empty + +# Thread-safe logging +logging_lock = threading.Lock() + +def setup_logging(log_level: str = "INFO") -> logging.Logger: + """Setup logging configuration""" + logger = logging.getLogger("backup") + logger.setLevel(getattr(logging, log_level.upper())) + + # Console handler + console_handler = logging.StreamHandler() + 
console_handler.setLevel(logging.INFO) + console_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] - %(message)s' + ) + console_handler.setFormatter(console_formatter) + logger.addHandler(console_handler) + + # File handler + log_dir = Path("/scripts/logs") + log_dir.mkdir(exist_ok=True) + log_file = log_dir / f"backup-{datetime.now().strftime('%Y%m%d')}.log" + file_handler = logging.FileHandler(log_file) + file_handler.setLevel(logging.DEBUG) + file_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] - %(message)s' + ) + file_handler.setFormatter(file_formatter) + logger.addHandler(file_handler) + + return logger + +class TaskMonitor: + """Monitors and reports on running tasks""" + + def __init__(self, logger: logging.Logger): + self.logger = logger + self.active_tasks = {} + self.task_lock = threading.Lock() + self.stop_monitoring = threading.Event() + self.monitor_thread = None + + def start_task(self, task_id: str, description: str): + """Register a new task as started""" + with self.task_lock: + self.active_tasks[task_id] = { + 'description': description, + 'start_time': datetime.now(), + 'thread_name': threading.current_thread().name + } + self.logger.info(f"š Started: {description} [{task_id}]") + + def finish_task(self, task_id: str): + """Mark a task as completed""" + with self.task_lock: + if task_id in self.active_tasks: + task = self.active_tasks.pop(task_id) + duration = (datetime.now() - task['start_time']).total_seconds() + self.logger.info(f"ā
Completed: {task['description']} [{task_id}] ({duration:.1f}s)") + + def start_monitoring(self): + """Start the periodic status monitoring""" + self.stop_monitoring.clear() + self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True) + self.monitor_thread.start() + self.logger.info("š Task monitoring started") + + def stop_monitoring(self): + """Stop the periodic status monitoring""" + self.stop_monitoring.set() + if self.monitor_thread: + self.monitor_thread.join() + self.logger.info("š Task monitoring stopped") + + def _monitor_loop(self): + """Periodic monitoring loop""" + while not self.stop_monitoring.wait(5.0): # Check every 5 seconds + with self.task_lock: + if self.active_tasks: + now = datetime.now() + active_list = [] + for task_id, task in self.active_tasks.items(): + duration = (now - task['start_time']).total_seconds() + active_list.append(f"{task['description']} ({duration:.0f}s)") + + self.logger.info(f"š Active tasks ({len(self.active_tasks)}): {', '.join(active_list)}") + + +class BackupManager: + """Main backup management class with parallel processing""" + + def __init__(self, source_path: str, work_dir: str, backup_dir: str, + blocklist: List[str], logger: logging.Logger, max_workers: int = 4): + self.source_path = Path(source_path) + self.work_dir = Path(work_dir) + self.backup_dir = Path(backup_dir) + self.blocklist = set(blocklist) + self.logger = logger + self.max_workers = max_workers + + # Thread-safe progress tracking + self.total_files = 0 + self.copied_files = 0 + self.skipped_files = 0 + self.sqlite_files = set() + self.progress_lock = threading.Lock() + + # Task monitoring + self.task_monitor = TaskMonitor(logger) + + # Ensure directories exist + self.work_dir.mkdir(parents=True, exist_ok=True) + self.backup_dir.mkdir(parents=True, exist_ok=True) + + # Clean work directory at start + self._clean_work_dir() + + def _clean_work_dir(self): + """Clean the work directory before starting""" + if self.work_dir.exists(): + for item in self.work_dir.iterdir(): + if item.is_dir(): + shutil.rmtree(item) + else: + item.unlink() + self.logger.info(f"Cleaned work directory: {self.work_dir}") + + def _is_blocklisted(self, path: Path) -> bool: + """Check if a path is in the blocklist""" + path_str = str(path) + for blocked in self.blocklist: + if blocked in path_str or path.name == blocked: + return True + return False + + def _update_progress(self, files_copied: int, files_skipped: int, sqlite_found: Set[Path]): + """Thread-safe progress update""" + with self.progress_lock: + self.copied_files += files_copied + self.skipped_files += files_skipped + self.sqlite_files.update(sqlite_found) + + total_processed = self.copied_files + self.skipped_files + if self.total_files > 0 and total_processed % 100 == 0: # Log every 100 files + progress = (total_processed / self.total_files) * 100 + self.logger.info(f"Progress: {self.copied_files} copied, {self.skipped_files} skipped, {total_processed}/{self.total_files} total ({progress:.1f}%)") + + def _count_files_in_subdirectory(self, subdir: Path) -> int: + """Count files in a subdirectory for progress tracking""" + count = 0 + try: + for root, dirs, files in os.walk(subdir): + root_path = Path(root) + if self._is_blocklisted(root_path): + dirs.clear() + continue + dirs[:] = [d for d in dirs if not self._is_blocklisted(root_path / d)] + count += len(files) + except Exception as e: + self.logger.error(f"Error counting files in {subdir}: {e}") + return count + + def _should_skip_file(self, file_path: Path) -> 
Tuple[bool, str]: + """Check if a file should be skipped and return reason""" + try: + file_stat = file_path.stat() + + # Skip sockets + if stat.S_ISSOCK(file_stat.st_mode): + return True, "socket" + + # Skip named pipes (FIFOs) + if stat.S_ISFIFO(file_stat.st_mode): + return True, "pipe" + + # Skip character/block devices + if stat.S_ISCHR(file_stat.st_mode) or stat.S_ISBLK(file_stat.st_mode): + return True, "device" + + # Only process regular files and symlinks + if not (stat.S_ISREG(file_stat.st_mode) or stat.S_ISLNK(file_stat.st_mode)): + return True, "special" + + except (OSError, PermissionError) as e: + return True, f"access_error: {e}" + + return False, "" + + def _process_subdirectory(self, subdir: Path) -> Tuple[int, int, Set[Path]]: + """Process a single subdirectory (runs in parallel)""" + task_id = f"subdir_{subdir.name}" + self.task_monitor.start_task(task_id, f"Processing {subdir.name}") + + copied_files = 0 + skipped_files = 0 + sqlite_files = set() + + try: + # Create destination directory structure + rel_path = subdir.relative_to(self.source_path) + dest_dir = self.work_dir / rel_path + dest_dir.mkdir(parents=True, exist_ok=True) + + # Walk through subdirectory + for root, dirs, files in os.walk(subdir): + root_path = Path(root) + + # Skip blocklisted directories + if self._is_blocklisted(root_path): + dirs.clear() + continue + + # Filter out blocklisted subdirectories + dirs[:] = [d for d in dirs if not self._is_blocklisted(root_path / d)] + + # Create relative path structure + rel_root = root_path.relative_to(self.source_path) + dest_root = self.work_dir / rel_root + dest_root.mkdir(parents=True, exist_ok=True) + + # Process files + for file in files: + source_file = root_path / file + dest_file = dest_root / file + + # Check if file should be skipped + should_skip, skip_reason = self._should_skip_file(source_file) + if should_skip: + skipped_files += 1 + if skip_reason not in ["socket", "pipe"]: # Only log unusual skips + self.logger.debug(f"Skipped {source_file}: {skip_reason}") + continue + + try: + # Use copy2 to preserve metadata (timestamps, permissions) + shutil.copy2(source_file, dest_file) + + # Also try to preserve extended attributes and ACLs if possible + try: + # Copy stat info again to ensure everything is preserved + shutil.copystat(source_file, dest_file) + except (OSError, AttributeError): + # Some filesystems don't support all metadata + pass + + copied_files += 1 + + # Track SQLite databases + if file.endswith('.db') or file.endswith('.sqlite') or file.endswith('.sqlite3'): + sqlite_files.add(dest_file) + + except Exception as e: + skipped_files += 1 + self.logger.error(f"Failed to copy {source_file}: {e}") + + except Exception as e: + self.logger.error(f"Error processing {subdir}: {e}") + finally: + self.task_monitor.finish_task(task_id) + + return copied_files, skipped_files, sqlite_files + + def _copy_files_parallel(self) -> Set[Path]: + """Stage 1: Copy files in parallel by subdirectory""" + self.task_monitor.start_task("stage1", "File copy stage") + self.logger.info("Stage 1: Starting parallel file copy operation") + + if not self.source_path.exists(): + raise FileNotFoundError(f"Source path does not exist: {self.source_path}") + + # Get top-level subdirectories (and files) + subdirs = [] + top_level_files = [] + + for item in self.source_path.iterdir(): + if item.is_dir() and not self._is_blocklisted(item): + subdirs.append(item) + elif item.is_file(): + top_level_files.append(item) + + self.logger.info(f"Found {len(subdirs)} subdirectories and 
{len(top_level_files)} top-level files") + + # Count total files for progress tracking + self.task_monitor.start_task("counting", "Counting files for progress tracking") + self.total_files = len(top_level_files) + + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + # Submit counting tasks + count_futures = {executor.submit(self._count_files_in_subdirectory, subdir): subdir + for subdir in subdirs} + + for future in as_completed(count_futures): + try: + count = future.result() + self.total_files += count + except Exception as e: + subdir = count_futures[future] + self.logger.error(f"Error counting files in {subdir}: {e}") + + self.task_monitor.finish_task("counting") + self.logger.info(f"Total files to process: {self.total_files}") + + # Process top-level files first + if top_level_files: + self.task_monitor.start_task("toplevel", f"Processing {len(top_level_files)} top-level files") + skipped_toplevel = 0 + + for file in top_level_files: + should_skip, skip_reason = self._should_skip_file(file) + if should_skip: + skipped_toplevel += 1 + if skip_reason not in ["socket", "pipe"]: + self.logger.debug(f"Skipped top-level {file}: {skip_reason}") + continue + + try: + dest_file = self.work_dir / file.name + # Use copy2 to preserve metadata (timestamps, permissions) + shutil.copy2(file, dest_file) + + # Also try to preserve extended attributes and ACLs if possible + try: + shutil.copystat(file, dest_file) + except (OSError, AttributeError): + # Some filesystems don't support all metadata + pass + + sqlite_found = set() + if file.name.endswith('.db') or file.name.endswith('.sqlite') or file.name.endswith('.sqlite3'): + sqlite_found.add(dest_file) + + self._update_progress(1, 0, sqlite_found) + + except Exception as e: + skipped_toplevel += 1 + self.logger.error(f"Failed to copy top-level file {file}: {e}") + + if skipped_toplevel > 0: + self._update_progress(0, skipped_toplevel, set()) + + self.task_monitor.finish_task("toplevel") + + # Process subdirectories in parallel + if subdirs: + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + # Submit all subdirectory processing tasks + future_to_subdir = {executor.submit(self._process_subdirectory, subdir): subdir + for subdir in subdirs} + + # Collect results as they complete + for future in as_completed(future_to_subdir): + subdir = future_to_subdir[future] + try: + copied_count, skipped_count, sqlite_found = future.result() + self._update_progress(copied_count, skipped_count, sqlite_found) + except Exception as e: + self.logger.error(f"Error processing subdirectory {subdir}: {e}") + + self.task_monitor.finish_task("stage1") + self.logger.info(f"Stage 1 complete: Copied {self.copied_files} files, skipped {self.skipped_files} files, found {len(self.sqlite_files)} SQLite databases") + return self.sqlite_files + + def _backup_sqlite_databases(self, sqlite_files: Set[Path]): + """Stage 2: Create live backups of SQLite databases""" + if not sqlite_files: + return + + self.task_monitor.start_task("stage2", "SQLite database backup stage") + self.logger.info("Stage 2: Starting SQLite database backup") + + # Process SQLite backups in parallel too + def backup_single_db(db_file: Path) -> bool: + db_task_id = f"sqlite_{db_file.name}" + self.task_monitor.start_task(db_task_id, f"Backing up {db_file.name}") + + try: + # Find original database path + rel_path = db_file.relative_to(self.work_dir) + original_db = self.source_path / rel_path + + if not original_db.exists(): + self.logger.warning(f"Original database not found: 
{original_db}") + return False + + self.logger.info(f"Creating live backup of: {original_db}") + + # Use SQLite's backup API for live backup + self._sqlite_backup(str(original_db), str(db_file)) + return True + + except Exception as e: + self.logger.error(f"Failed to backup SQLite database {db_file}: {e}") + return False + finally: + self.task_monitor.finish_task(db_task_id) + + with ThreadPoolExecutor(max_workers=min(len(sqlite_files), self.max_workers)) as executor: + backup_futures = {executor.submit(backup_single_db, db_file): db_file + for db_file in sqlite_files} + + successful_backups = 0 + for future in as_completed(backup_futures): + if future.result(): + successful_backups += 1 + + self.logger.info(f"SQLite backup complete: {successful_backups}/{len(sqlite_files)} databases backed up successfully") + + self.task_monitor.finish_task("stage2") + + def _sqlite_backup(self, source_db: str, dest_db: str): + """Perform SQLite backup using the backup API""" + try: + # Connect to source and destination databases + source_conn = sqlite3.connect(source_db) + dest_conn = sqlite3.connect(dest_db) + + # Perform backup + source_conn.backup(dest_conn) + + # Close connections + source_conn.close() + dest_conn.close() + + self.logger.debug(f"SQLite backup completed: {source_db} -> {dest_db}") + + except sqlite3.Error as e: + self.logger.error(f"SQLite backup failed for {source_db}: {e}") + raise + + def _create_borg_backup(self) -> str: + """Create Borg backup with deduplication and compression""" + self.task_monitor.start_task("borg", "Creating Borg backup") + + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + archive_name = f"backup_{timestamp}" + + # Get Borg configuration from environment + borg_repo = os.getenv('BORG_REPO', '/backups/borg-repo') + borg_passphrase = os.getenv('BORG_PASSPHRASE', '') + compression = os.getenv('BORG_COMPRESSION', 'zstd,3') + + self.logger.info(f"Creating Borg backup: {archive_name} to repo {borg_repo}") + + try: + # Set environment for Borg + borg_env = os.environ.copy() + borg_env['BORG_REPO'] = borg_repo + borg_env['BORG_PASSPHRASE'] = borg_passphrase + borg_env['BORG_RELOCATED_REPO_ACCESS_IS_OK'] = 'yes' + borg_env['BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'] = 'yes' + # SSH configuration to accept unknown hosts + borg_env['BORG_RSH'] = 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null' + + # Initialize repository if it doesn't exist + self._ensure_borg_repo_exists(borg_env) + + # Create the backup + borg_create_cmd = [ + 'borg', 'create', + '--verbose', + '--stats', + '--show-rc', + '--compression', compression, + '--exclude-caches', + f'{borg_repo}::{archive_name}', + str(self.work_dir) + ] + + self.logger.info(f"Running: {' '.join(borg_create_cmd[:-1])} <work_dir>") + + result = subprocess.run( + borg_create_cmd, + env=borg_env, + capture_output=True, + text=True, + timeout=3600 # 1 hour timeout + ) + + if result.returncode != 0: + self.logger.error(f"Borg create failed with return code {result.returncode}") + self.logger.error(f"STDERR: {result.stderr}") + raise subprocess.CalledProcessError(result.returncode, borg_create_cmd) + + # Log Borg statistics + if result.stdout: + self.logger.info("Borg backup statistics:") + for line in result.stdout.split('\n'): + if line.strip() and ('Archive name:' in line or 'Time' in line or 'Original size:' in line or 'Compressed size:' in line or 'Deduplicated size:' in line): + self.logger.info(f" {line.strip()}") + + # Get repository info for size reporting + try: + info_result = subprocess.run( + 
['borg', 'info', '--json', borg_repo], + env=borg_env, + capture_output=True, + text=True, + timeout=60 + ) + if info_result.returncode == 0: + import json + repo_info = json.loads(info_result.stdout) + if 'cache' in repo_info: + total_size = repo_info['cache'].get('stats', {}).get('total_size', 0) + size_mb = total_size / (1024 * 1024) + self.logger.info(f"Repository total size: {size_mb:.1f} MB") + except Exception as e: + self.logger.debug(f"Could not get repository size: {e}") + + self.logger.info(f"Borg backup created successfully: {archive_name}") + return f"{borg_repo}::{archive_name}" + + except subprocess.TimeoutExpired: + self.logger.error("Borg backup timed out after 1 hour") + raise + except Exception as e: + self.logger.error(f"Failed to create Borg backup: {e}") + raise + finally: + self.task_monitor.finish_task("borg") + + def _ensure_borg_repo_exists(self, borg_env: dict): + """Ensure Borg repository exists, create if necessary""" + borg_repo = borg_env['BORG_REPO'] + + # Check if repository exists + result = subprocess.run( + ['borg', 'info', borg_repo], + env=borg_env, + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode == 0: + self.logger.debug(f"Borg repository exists: {borg_repo}") + return + + # Repository doesn't exist, create it + self.logger.info(f"Initializing new Borg repository: {borg_repo}") + + # Determine repository type based on passphrase + if borg_env.get('BORG_PASSPHRASE'): + encryption_mode = 'repokey' + else: + encryption_mode = 'none' + self.logger.warning("No passphrase set - creating unencrypted repository") + + init_cmd = [ + 'borg', 'init', + '--encryption', encryption_mode, + borg_repo + ] + + init_result = subprocess.run( + init_cmd, + env=borg_env, + capture_output=True, + text=True, + timeout=60 + ) + + if init_result.returncode != 0: + self.logger.error(f"Failed to initialize Borg repository: {init_result.stderr}") + raise subprocess.CalledProcessError(init_result.returncode, init_cmd) + + self.logger.info(f"Borg repository initialized successfully with {encryption_mode} encryption") + + def _send_notification(self, success: bool, message: str): + """Send notification via ntfy if configured""" + ntfy_topic = os.getenv('NTFY_TOPIC') + if not ntfy_topic: + return + + try: + # Use ASCII-safe status indicators + status = "SUCCESS" if success else "FAILED" + title = f"Homelab Backup {status}" + + url = f'https://ntfy.sh/{ntfy_topic}' + data = message.encode('utf-8') + + req = urllib.request.Request(url, data=data, method='POST') + req.add_header('Title', title.encode('ascii', 'ignore').decode('ascii')) + req.add_header('Content-Type', 'text/plain; charset=utf-8') + + with urllib.request.urlopen(req, timeout=10) as response: + if response.status == 200: + self.logger.debug("Notification sent successfully") + else: + self.logger.warning(f"Notification returned status {response.status}") + + except Exception as e: + self.logger.error(f"Failed to send notification: {e}") + + def run_backup(self) -> bool: + """Run the complete backup process""" + start_time = datetime.now() + self.logger.info("=== Starting parallel backup process ===") + + # Start task monitoring + self.task_monitor.start_monitoring() + + try: + # Stage 1: Copy files in parallel + sqlite_files = self._copy_files_parallel() + + # Stage 2: Backup SQLite databases in parallel + self._backup_sqlite_databases(sqlite_files) + + # Create Borg backup + archive_path = self._create_borg_backup() + + # Calculate duration + duration = datetime.now() - start_time + + 
message = f"Parallel backup completed successfully in {duration.total_seconds():.1f} seconds. Files: {self.copied_files} copied, {self.skipped_files} skipped, SQLite DBs: {len(self.sqlite_files)}. Archive: {Path(archive_path).name}" + self.logger.info(message) + self._send_notification(True, message) + + return True + + except Exception as e: + error_msg = f"Backup failed: {str(e)}" + self.logger.error(error_msg) + self._send_notification(False, error_msg) + return False + + finally: + # Stop task monitoring + self.task_monitor.stop_monitoring() + + +def main(): + """Main function""" + parser = argparse.ArgumentParser(description='Parallel Homelab Backup Script') + parser.add_argument('source_path', help='Source directory to backup') + parser.add_argument('--work-dir', default='/work', help='Working directory for staging') + parser.add_argument('--backup-dir', default='/backups', help='Final backup destination') + parser.add_argument('--max-workers', type=int, default=4, help='Maximum number of parallel workers') + parser.add_argument('--blocklist', nargs='*', default=[ + 'node_modules', '.git', '__pycache__', '.cache', 'tmp', 'temp', + 'logs', '.logs', 'cache', '.npm', '.yarn', 'dist', 'build' + ], help='Directories to exclude from backup') + parser.add_argument('--log-level', default='INFO', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR']) + + args = parser.parse_args() + + # Handle blocklist - it might come as a single space-separated string from Ansible template + blocklist = args.blocklist + if blocklist and len(blocklist) == 1 and ' ' in blocklist[0]: + # Split the single string into multiple items + blocklist = blocklist[0].split() + elif not blocklist: + # Use default blocklist if none provided + blocklist = [ + 'node_modules', '.git', '__pycache__', '.cache', 'tmp', 'temp', + 'logs', '.logs', 'cache', '.npm', '.yarn', 'dist', 'build' + ] + + # Setup logging + logger = setup_logging(args.log_level) + + logger.info(f"Using blocklist: {blocklist}") + + # Create backup manager and run + backup_manager = BackupManager( + source_path=args.source_path, + work_dir=args.work_dir, + backup_dir=args.backup_dir, + blocklist=blocklist, + logger=logger, + max_workers=args.max_workers + ) + + success = backup_manager.run_backup() + sys.exit(0 if success else 1) + + +if __name__ == '__main__': + main()
\ No newline at end of file diff --git a/playbooks/roles/backup/templates/volumes/scripts/cleanup.py b/playbooks/roles/backup/templates/volumes/scripts/cleanup.py new file mode 100644 index 0000000..6d3bcae --- /dev/null +++ b/playbooks/roles/backup/templates/volumes/scripts/cleanup.py @@ -0,0 +1,280 @@ +#!/usr/bin/env python3 +""" +Backup Cleanup Script +Manages backup retention by removing old Borg backup archives +""" + +import os +import sys +import logging +import argparse +import subprocess +import json +from datetime import datetime, timedelta +from pathlib import Path +from typing import List +import urllib.request +import urllib.parse + + +def setup_logging(log_level: str = "INFO") -> logging.Logger: + """Setup logging configuration""" + logger = logging.getLogger("cleanup") + logger.setLevel(getattr(logging, log_level.upper())) + + # Console handler + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + console_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + console_handler.setFormatter(console_formatter) + logger.addHandler(console_handler) + + # File handler + log_dir = Path("/scripts/logs") + log_dir.mkdir(exist_ok=True) + log_file = log_dir / f"cleanup-{datetime.now().strftime('%Y%m%d')}.log" + file_handler = logging.FileHandler(log_file) + file_handler.setLevel(logging.DEBUG) + file_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + file_handler.setFormatter(file_formatter) + logger.addHandler(file_handler) + + return logger + + +class BorgBackupCleanup: + """Borg backup cleanup manager""" + + def __init__(self, logger: logging.Logger): + self.logger = logger + # Get Borg configuration from environment + self.borg_repo = os.getenv('BORG_REPO', '/backups/borg-repo') + self.borg_passphrase = os.getenv('BORG_PASSPHRASE', '') + self.keep_daily = int(os.getenv('BORG_KEEP_DAILY', '7')) + self.keep_weekly = int(os.getenv('BORG_KEEP_WEEKLY', '4')) + self.keep_monthly = int(os.getenv('BORG_KEEP_MONTHLY', '6')) + + def _send_notification(self, success: bool, message: str): + """Send notification via ntfy if configured""" + ntfy_topic = os.getenv('NTFY_TOPIC') + if not ntfy_topic: + return + + try: + # Use ASCII-safe status indicators + status = "CLEANUP" if success else "CLEANUP FAILED" + title = f"Homelab Backup {status}" + + url = f'https://ntfy.sh/{ntfy_topic}' + data = message.encode('utf-8') + + req = urllib.request.Request(url, data=data, method='POST') + req.add_header('Title', title.encode('ascii', 'ignore').decode('ascii')) + req.add_header('Content-Type', 'text/plain; charset=utf-8') + + with urllib.request.urlopen(req, timeout=10) as response: + if response.status == 200: + self.logger.debug("Notification sent successfully") + else: + self.logger.warning(f"Notification returned status {response.status}") + + except Exception as e: + self.logger.error(f"Failed to send notification: {e}") + + def _get_borg_env(self) -> dict: + """Get environment for Borg commands""" + borg_env = os.environ.copy() + borg_env['BORG_REPO'] = self.borg_repo + borg_env['BORG_PASSPHRASE'] = self.borg_passphrase + borg_env['BORG_RELOCATED_REPO_ACCESS_IS_OK'] = 'yes' + borg_env['BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'] = 'yes' + # SSH configuration to accept unknown hosts + borg_env['BORG_RSH'] = 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null' + return borg_env + + def cleanup_old_backups(self) -> bool: + """Remove old Borg backup archives using retention 
policy""" + self.logger.info(f"Starting Borg cleanup with retention: {self.keep_daily}d/{self.keep_weekly}w/{self.keep_monthly}m") + + try: + borg_env = self._get_borg_env() + + # Use Borg's built-in pruning with retention policy + prune_cmd = [ + 'borg', 'prune', + '--verbose', + '--stats', + '--show-rc', + '--keep-daily', str(self.keep_daily), + '--keep-weekly', str(self.keep_weekly), + '--keep-monthly', str(self.keep_monthly), + self.borg_repo + ] + + self.logger.info(f"Running: {' '.join(prune_cmd)}") + + result = subprocess.run( + prune_cmd, + env=borg_env, + capture_output=True, + text=True, + timeout=300 # 5 minutes timeout + ) + + if result.returncode != 0: + self.logger.error(f"Borg prune failed with return code {result.returncode}") + self.logger.error(f"STDERR: {result.stderr}") + raise subprocess.CalledProcessError(result.returncode, prune_cmd) + + # Log pruning statistics + if result.stdout: + self.logger.info("Borg prune statistics:") + for line in result.stdout.split('\n'): + if line.strip() and ('Deleted' in line or 'Kept' in line or 'Repository size' in line): + self.logger.info(f" {line.strip()}") + + # Run compact to reclaim space + self.logger.info("Running Borg compact to reclaim space...") + compact_cmd = ['borg', 'compact', '--verbose', self.borg_repo] + + compact_result = subprocess.run( + compact_cmd, + env=borg_env, + capture_output=True, + text=True, + timeout=600 # 10 minutes timeout + ) + + if compact_result.returncode == 0: + self.logger.info("Borg compact completed successfully") + else: + self.logger.warning(f"Borg compact completed with warnings: {compact_result.stderr}") + + message = f"Borg cleanup complete: kept {self.keep_daily}d/{self.keep_weekly}w/{self.keep_monthly}m" + self.logger.info(message) + self._send_notification(True, message) + + return True + + except subprocess.TimeoutExpired: + self.logger.error("Borg cleanup timed out") + self._send_notification(False, "Borg cleanup timed out") + return False + except Exception as e: + error_msg = f"Borg cleanup failed: {str(e)}" + self.logger.error(error_msg) + self._send_notification(False, error_msg) + return False + + def get_backup_stats(self): + """Get statistics about current Borg repository""" + try: + borg_env = self._get_borg_env() + + # Get repository info + info_cmd = ['borg', 'info', '--json', self.borg_repo] + info_result = subprocess.run( + info_cmd, + env=borg_env, + capture_output=True, + text=True, + timeout=60 + ) + + if info_result.returncode != 0: + self.logger.warning(f"Could not get repository info: {info_result.stderr}") + return + + repo_info = json.loads(info_result.stdout) + + # Get archive list + list_cmd = ['borg', 'list', '--json', self.borg_repo] + list_result = subprocess.run( + list_cmd, + env=borg_env, + capture_output=True, + text=True, + timeout=60 + ) + + if list_result.returncode != 0: + self.logger.warning(f"Could not get archive list: {list_result.stderr}") + return + + archive_info = json.loads(list_result.stdout) + archives = archive_info.get('archives', []) + + # Display statistics + self.logger.info(f"Borg repository statistics:") + self.logger.info(f" Repository: {self.borg_repo}") + self.logger.info(f" Total archives: {len(archives)}") + + if 'cache' in repo_info: + cache_stats = repo_info['cache']['stats'] + total_size_mb = cache_stats.get('total_size', 0) / (1024 * 1024) + total_csize_mb = cache_stats.get('total_csize', 0) / (1024 * 1024) + unique_csize_mb = cache_stats.get('unique_csize', 0) / (1024 * 1024) + + self.logger.info(f" Original size: 
{total_size_mb:.1f} MB") + self.logger.info(f" Compressed size: {total_csize_mb:.1f} MB") + self.logger.info(f" Deduplicated size: {unique_csize_mb:.1f} MB") + + if total_size_mb > 0: + compression_ratio = (1 - total_csize_mb / total_size_mb) * 100 + dedup_ratio = (1 - unique_csize_mb / total_csize_mb) * 100 if total_csize_mb > 0 else 0 + self.logger.info(f" Compression ratio: {compression_ratio:.1f}%") + self.logger.info(f" Deduplication ratio: {dedup_ratio:.1f}%") + + if archives: + oldest_archive = min(archives, key=lambda a: a['time']) + newest_archive = max(archives, key=lambda a: a['time']) + + oldest_time = datetime.fromisoformat(oldest_archive['time'].replace('Z', '+00:00')) + newest_time = datetime.fromisoformat(newest_archive['time'].replace('Z', '+00:00')) + + self.logger.info(f" Oldest archive: {oldest_archive['name']} ({oldest_time.strftime('%Y-%m-%d %H:%M')})") + self.logger.info(f" Newest archive: {newest_archive['name']} ({newest_time.strftime('%Y-%m-%d %H:%M')})") + + except Exception as e: + self.logger.error(f"Failed to get backup statistics: {e}") + + +def main(): + """Main function""" + parser = argparse.ArgumentParser(description='Borg Backup Cleanup Script') + parser.add_argument('backup_dir', nargs='?', help='Ignored for compatibility - Borg repo from env') + parser.add_argument('--retention-days', type=int, + help='Ignored for compatibility - uses Borg retention policy from env') + parser.add_argument('--log-level', default='INFO', + choices=['DEBUG', 'INFO', 'WARNING', 'ERROR']) + parser.add_argument('--stats-only', action='store_true', + help='Only show backup statistics, do not perform cleanup') + parser.add_argument('--blocklist', nargs='*', + help='Ignored parameter for compatibility with backup script') + + args = parser.parse_args() + + # Setup logging + logger = setup_logging(args.log_level) + + # Create cleanup manager + cleanup_manager = BorgBackupCleanup(logger=logger) + + # Show current statistics + cleanup_manager.get_backup_stats() + + if args.stats_only: + logger.info("Stats-only mode, no cleanup performed") + sys.exit(0) + + # Perform cleanup + success = cleanup_manager.cleanup_old_backups() + sys.exit(0 if success else 1) + + +if __name__ == '__main__': + main()
\ No newline at end of file diff --git a/playbooks/roles/backup/templates/volumes/scripts/test_backup.py b/playbooks/roles/backup/templates/volumes/scripts/test_backup.py new file mode 100644 index 0000000..acd5994 --- /dev/null +++ b/playbooks/roles/backup/templates/volumes/scripts/test_backup.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +""" +Backup Test Script +Creates test data and validates backup functionality +""" + +import os +import sys +import tempfile +import shutil +import sqlite3 +from pathlib import Path +import subprocess + + +def create_test_data(test_dir: Path) -> Path: + """Create test directory structure with SQLite databases""" + print(f"Creating test data in: {test_dir}") + + # Create directory structure + (test_dir / "app1" / "data").mkdir(parents=True) + (test_dir / "app2" / "logs").mkdir(parents=True) + (test_dir / "shared" / "configs").mkdir(parents=True) + (test_dir / "node_modules").mkdir(parents=True) # Should be blocked + + # Create regular files + (test_dir / "app1" / "config.json").write_text('{"name": "app1"}') + (test_dir / "app2" / "settings.yaml").write_text('debug: true') + (test_dir / "shared" / "common.txt").write_text('shared data') + (test_dir / "node_modules" / "package.json").write_text('{}') # Should be ignored + + # Create SQLite databases + db1_path = test_dir / "app1" / "data" / "app1.db" + db2_path = test_dir / "app2" / "app2.sqlite" + + # Create database 1 + conn1 = sqlite3.connect(db1_path) + conn1.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)") + conn1.execute("INSERT INTO users (name) VALUES ('Alice'), ('Bob')") + conn1.commit() + conn1.close() + + # Create database 2 + conn2 = sqlite3.connect(db2_path) + conn2.execute("CREATE TABLE logs (id INTEGER PRIMARY KEY, message TEXT, timestamp TEXT)") + conn2.execute("INSERT INTO logs (message, timestamp) VALUES ('Test log', '2024-01-01')") + conn2.commit() + conn2.close() + + print(f"Test data created:") + print(f" - Directory structure: app1/, app2/, shared/, node_modules/") + print(f" - SQLite databases: {db1_path}, {db2_path}") + print(f" - Regular files: config.json, settings.yaml, common.txt") + + return test_dir + + +def run_backup_test(): + """Run a complete backup test""" + print("=== Backup Test Starting ===") + + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Create test directories + source_dir = temp_path / "source" + work_dir = temp_path / "work" + backup_dir = temp_path / "backups" + + source_dir.mkdir() + work_dir.mkdir() + backup_dir.mkdir() + + # Create test data + create_test_data(source_dir) + + # Run backup script + backup_script = Path(__file__).parent / "backup.py" + + if not backup_script.exists(): + print(f"ERROR: Backup script not found at {backup_script}") + return False + + print(f"\nRunning backup script...") + cmd = [ + sys.executable, str(backup_script), + str(source_dir), + "--work-dir", str(work_dir), + "--backup-dir", str(backup_dir), + "--log-level", "INFO" + ] + + try: + result = subprocess.run(cmd, capture_output=True, text=True) + + print(f"Backup script exit code: {result.returncode}") + if result.stdout: + print("STDOUT:") + print(result.stdout) + if result.stderr: + print("STDERR:") + print(result.stderr) + + if result.returncode != 0: + print("ERROR: Backup script failed") + return False + + # Validate results + return validate_backup_results(source_dir, work_dir, backup_dir) + + except Exception as e: + print(f"ERROR: Failed to run backup script: {e}") + return False + + +def 
validate_backup_results(source_dir: Path, work_dir: Path, backup_dir: Path) -> bool: + """Validate backup results""" + print("\n=== Validating Backup Results ===") + + success = True + + # Check if archive was created + archives = list(backup_dir.glob("backup_*.tar.gz")) + if not archives: + print("ERROR: No backup archive found") + return False + + archive = archives[0] + print(f"ā Backup archive created: {archive.name}") + + # Check work directory contents + expected_files = [ + "app1/config.json", + "app1/data/app1.db", + "app2/settings.yaml", + "app2/app2.sqlite", + "shared/common.txt" + ] + + for expected_file in expected_files: + work_file = work_dir / expected_file + if work_file.exists(): + print(f"ā Found expected file: {expected_file}") + else: + print(f"ā Missing expected file: {expected_file}") + success = False + + # Check that blocklisted directory was excluded + node_modules = work_dir / "node_modules" + if node_modules.exists(): + print("ā Blocklisted directory 'node_modules' was not excluded") + success = False + else: + print("ā Blocklisted directory 'node_modules' was correctly excluded") + + # Validate SQLite databases + db_files = [ + work_dir / "app1/data/app1.db", + work_dir / "app2/app2.sqlite" + ] + + for db_file in db_files: + if db_file.exists(): + try: + conn = sqlite3.connect(db_file) + # Try to query the database + cursor = conn.cursor() + if "app1.db" in str(db_file): + cursor.execute("SELECT COUNT(*) FROM users") + count = cursor.fetchone()[0] + if count == 2: + print(f"ā SQLite database {db_file.name} has correct data") + else: + print(f"ā SQLite database {db_file.name} has incorrect data count: {count}") + success = False + else: + cursor.execute("SELECT COUNT(*) FROM logs") + count = cursor.fetchone()[0] + if count == 1: + print(f"ā SQLite database {db_file.name} has correct data") + else: + print(f"ā SQLite database {db_file.name} has incorrect data count: {count}") + success = False + conn.close() + except Exception as e: + print(f"ā Failed to validate SQLite database {db_file}: {e}") + success = False + else: + print(f"ā SQLite database not found: {db_file}") + success = False + + return success + + +def main(): + """Main function""" + print("Backup Test Script") + print("=" * 50) + + success = run_backup_test() + + if success: + print("\nš All tests passed!") + sys.exit(0) + else: + print("\nā Some tests failed!") + sys.exit(1) + + +if __name__ == '__main__': + main()
\ No newline at end of file diff --git a/playbooks/roles/backup/templates/volumes/ssh/id_ed25519 b/playbooks/roles/backup/templates/volumes/ssh/id_ed25519 new file mode 100755 index 0000000..61a4643 --- /dev/null +++ b/playbooks/roles/backup/templates/volumes/ssh/id_ed25519 @@ -0,0 +1 @@ +{{ backups_borg_key }} diff --git a/playbooks/roles/backup/templates/volumes/work/.gitkeep b/playbooks/roles/backup/templates/volumes/work/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/playbooks/roles/backup/templates/volumes/work/.gitkeep diff --git a/playbooks/roles/ci/templates/stacks/docker-compose.yml b/playbooks/roles/ci/templates/stacks/docker-compose.yml index 2a77205..a33e87f 100644 --- a/playbooks/roles/ci/templates/stacks/docker-compose.yml +++ b/playbooks/roles/ci/templates/stacks/docker-compose.yml @@ -5,7 +5,7 @@ services: image: oci.liz.coffee/emprespresso/ci_worker:release volumes: - /var/run/docker.sock:/var/run/docker.sock:ro - - {{ ci_base }}/volumes/laminar:/var/lib/laminar/ + - "{{ ci_base }}/volumes/laminar:/var/lib/laminar/" - /var/lib/laminar/cfg # don't overwrite cfg jobs & scripts healthcheck: test: ["CMD-SHELL", "/usr/bin/laminarc show-jobs"] diff --git a/playbooks/roles/traefik/templates/stacks/traefik.yml b/playbooks/roles/traefik/templates/stacks/traefik.yml index 8caa379..00da98d 100644 --- a/playbooks/roles/traefik/templates/stacks/traefik.yml +++ b/playbooks/roles/traefik/templates/stacks/traefik.yml @@ -41,7 +41,7 @@ providers: certificatesResolvers: letsencrypt: acme: - email: {{ certs_email }} + email: "{{ certs_email }}" storage: /certs/acme.json caServer: https://acme-v02.api.letsencrypt.org/directory # caServer: https://acme-staging-v02.api.letsencrypt.org/directory # staging |
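Note: the README above documents a tar-based restore, while the backup script in this commit writes archives to a Borg repository (BORG_REPO / BORG_PASSPHRASE from the compose template). A restore from that repository might look roughly like the following sketch; the repository URL and archive name are illustrative, not values from this commit:

```
# Point at the repository configured for the stack (illustrative values)
export BORG_REPO='ssh://user@backup-host/./borg-repo'
export BORG_PASSPHRASE='...'

# List available archives, then extract one into the current directory
borg list "$BORG_REPO"
borg extract --verbose "$BORG_REPO::backup_20241028_143022"
```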
