From 4b6062981ae3c64b2fee68dece3f41d2bc50b8b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A9sir=C3=A9=20Werner=20Menrath?= Date: Fri, 5 Sep 2025 00:53:44 +0200 Subject: [PATCH] fix: Resolve issues with encrypted backup view and logic - Fixes a bug where the backup list would not display "orphan" incremental backups in the encrypted view. - Fixes a bug where system backups were incorrectly shown in the user backup list. - Prevents the app from asking for system permissions to lock an encrypted container on exit if it is not currently mounted. - Fixes pathing inconsistencies for the encryption manager to ensure mapper names are created consistently. - Adds debug logging to the backup list functions to help diagnose future issues. --- backup_manager.py | 21 +++++++++- core/encryption_manager.py | 61 +++++++++++++++--------------- main_app.py | 4 +- pyimage_ui/backup_content_frame.py | 6 +-- 4 files changed, 54 insertions(+), 38 deletions(-) diff --git a/backup_manager.py b/backup_manager.py index 706b271..4af871c 100644 --- a/backup_manager.py +++ b/backup_manager.py @@ -586,6 +586,11 @@ set -e """Lists all system backups, handling encrypted containers and sorting into groups.""" if not os.path.isdir(scan_path): return [] + try: + self.logger.log(f"Scanning for backups in {scan_path}. Contents: {os.listdir(scan_path)}") + except Exception as e: + self.logger.log(f"Could not list directory {scan_path}: {e}") + return [] all_backups = [] name_regex = re.compile( @@ -646,8 +651,10 @@ set -e if current_group: grouped_backups.append(current_group) current_group = [backup] - else: # Incremental - if current_group: # Add to the current group if it exists + else: # Incremental + if not current_group: # This is an orphan incremental, start a new group with it + current_group = [backup] + else: current_group.append(backup) if current_group: grouped_backups.append(current_group) @@ -665,8 +672,18 @@ set -e user_backups = [] if not os.path.isdir(base_backup_path): return user_backups + try: + self.logger.log(f"Scanning for user backups in {base_backup_path}. Contents: {os.listdir(base_backup_path)}") + except Exception as e: + self.logger.log(f"Could not list directory {base_backup_path}: {e}") + return user_backups + + system_backup_regex = re.compile( + r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_system_(full|incremental)(\.tar\.gz)?$", re.IGNORECASE) for item in os.listdir(base_backup_path): + if system_backup_regex.match(item): + continue # Skip system backups full_path = os.path.join(base_backup_path, item) if not os.path.isdir(full_path): continue diff --git a/core/encryption_manager.py b/core/encryption_manager.py index a1aaec4..b862980 100644 --- a/core/encryption_manager.py +++ b/core/encryption_manager.py @@ -6,6 +6,7 @@ import shutil import subprocess import tempfile import stat +import re from typing import Optional from pyimage_ui.password_dialog import PasswordDialog @@ -32,7 +33,6 @@ class EncryptionManager: return None def is_key_in_keyring(self, username: str) -> bool: - """Checks if a password for the given username exists in the keyring.""" try: return self.get_password_from_keyring(username) is not None except Exception as e: @@ -77,17 +77,22 @@ class EncryptionManager: return password - def unlock_container(self, base_path: str, password: str) -> Optional[str]: - """Unlocks and mounts an existing LUKS container.""" - self.logger.log(f"Attempting to unlock encrypted container in {base_path}") + def is_container_mounted(self, base_dest_path: str) -> bool: + """Checks if the LUKS container for a given base path is currently mounted.""" + mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}" + mount_point = f"/mnt/{mapper_name}" + return os.path.ismount(mount_point) + + def unlock_container(self, base_dest_path: str, password: str) -> Optional[str]: + self.logger.log(f"Attempting to unlock encrypted container for base path {base_dest_path}") - container_file = "pybackup_encrypted.luks" - container_path = os.path.join(base_path, container_file) + pybackup_dir = os.path.join(base_dest_path, "pybackup") + container_path = os.path.join(pybackup_dir, "pybackup_encrypted.luks") if not os.path.exists(container_path): self.logger.log(f"Encrypted container not found at {container_path}") return None - mapper_name = f"pybackup_{os.path.basename(base_path.rstrip('/'))}" + mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}" mount_point = f"/mnt/{mapper_name}" if os.path.ismount(mount_point): @@ -101,31 +106,26 @@ class EncryptionManager: """ if not self._execute_as_root(script): self.logger.log("Failed to unlock existing encrypted container. Check password or permissions.") - # Clean up failed mount attempt - self.cleanup_encrypted_backup(mapper_name, mount_point) + self.cleanup_encrypted_backup(base_dest_path) return None self.logger.log(f"Encrypted container unlocked and mounted at {mount_point}") return mount_point - def lock_container(self, base_path: str): - """Unmounts and closes the LUKS container for a given base path.""" - mapper_name = f"pybackup_{os.path.basename(base_path.rstrip('/'))}" - mount_point = f"/mnt/{mapper_name}" - self.cleanup_encrypted_backup(mapper_name, mount_point) + def lock_container(self, base_dest_path: str): + self.cleanup_encrypted_backup(base_dest_path) - def setup_encrypted_backup(self, queue, base_path: str, size_gb: int, password: str) -> Optional[str]: - """Sets up a persistent LUKS encrypted container for the backup destination.""" - self.logger.log(f"Setting up encrypted container at {base_path}") + def setup_encrypted_backup(self, queue, base_dest_path: str, size_gb: int, password: str) -> Optional[str]: + self.logger.log(f"Setting up encrypted container at {base_dest_path}") if not shutil.which("cryptsetup"): self.logger.log("Error: cryptsetup is not installed.") queue.put(('error', "cryptsetup is not installed.")) return None - container_file = "pybackup_encrypted.luks" - container_path = os.path.join(base_path, container_file) - mapper_name = f"pybackup_{os.path.basename(base_path.rstrip('/'))}" + pybackup_dir = os.path.join(base_dest_path, "pybackup") + container_path = os.path.join(pybackup_dir, "pybackup_encrypted.luks") + mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}" mount_point = f"/mnt/{mapper_name}" if not password: @@ -133,17 +133,17 @@ class EncryptionManager: queue.put(('error', "No password provided for encryption.")) return None - # If mount point already exists, something is wrong. Clean up first. if os.path.ismount(mount_point): self.logger.log(f"Mount point {mount_point} already exists. Cleaning up before proceeding.") - self.cleanup_encrypted_backup(mapper_name, mount_point) + self.cleanup_encrypted_backup(base_dest_path) if os.path.exists(container_path): self.logger.log(f"Encrypted container {container_path} already exists. Attempting to unlock.") - return self.unlock_container(base_path, password) + return self.unlock_container(base_dest_path, password) else: self.logger.log(f"Creating new encrypted container: {container_path}") script = f""" + mkdir -p {pybackup_dir} fallocate -l {size_gb}G {container_path} echo -n '{password}' | cryptsetup luksFormat {container_path} - mkdir -p {mount_point} @@ -154,10 +154,8 @@ class EncryptionManager: if not self._execute_as_root(script): self.logger.log("Failed to create and setup encrypted container.") - self.cleanup_encrypted_backup(mapper_name, mount_point) - # Also remove the failed container file + self.cleanup_encrypted_backup(base_dest_path) if os.path.exists(container_path): - # This should be done with pkexec as well for safety self._execute_as_root(f"rm -f {container_path}") queue.put(('error', "Failed to setup encrypted container.")) return None @@ -165,8 +163,9 @@ class EncryptionManager: self.logger.log(f"Encrypted container is ready and mounted at {mount_point}") return mount_point - def cleanup_encrypted_backup(self, mapper_name: str, mount_point: str): - """Unmounts and closes the LUKS container.""" + def cleanup_encrypted_backup(self, base_dest_path: str): + mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}" + mount_point = f"/mnt/{mapper_name}" self.logger.log(f"Cleaning up encrypted backup: {mapper_name}") script = f""" umount {mount_point} || echo "Mount point {mount_point} not found or already unmounted." @@ -177,7 +176,6 @@ class EncryptionManager: self.logger.log("Encrypted backup cleanup script failed.") def _execute_as_root(self, script_content: str) -> bool: - """Executes a shell script with root privileges using pkexec.""" script_path = '' try: with tempfile.NamedTemporaryFile(mode='w', delete=False, prefix="pybackup_script_", suffix=".sh", dir="/tmp") as tmp_script: @@ -191,10 +189,11 @@ class EncryptionManager: command = ['pkexec', script_path] + sanitized_script_content = re.sub(r"echo -n '.*?'", "echo -n '[REDACTED]'", script_content) self.logger.log( f"Executing privileged command via script: {script_path}") self.logger.log( - f"Script content:\n---\n{script_content}\n---") + f"Script content:\n---\n{sanitized_script_content}\n---") result = subprocess.run( command, capture_output=True, text=True, check=False) @@ -213,4 +212,4 @@ class EncryptionManager: return False finally: if script_path and os.path.exists(script_path): - os.remove(script_path) \ No newline at end of file + os.remove(script_path) diff --git a/main_app.py b/main_app.py index a6c02fe..5f45b82 100644 --- a/main_app.py +++ b/main_app.py @@ -511,7 +511,9 @@ class MainApplication(tk.Tk): def on_closing(self): if self.destination_path: - self.backup_manager.encryption_manager.lock_container(self.destination_path) + # Only try to lock if the container is actually mounted + if self.backup_manager.encryption_manager.is_container_mounted(self.destination_path): + self.backup_manager.encryption_manager.lock_container(self.destination_path) self.config_manager.set_setting("last_mode", self.mode) diff --git a/pyimage_ui/backup_content_frame.py b/pyimage_ui/backup_content_frame.py index 31269f4..23d9d99 100644 --- a/pyimage_ui/backup_content_frame.py +++ b/pyimage_ui/backup_content_frame.py @@ -102,15 +102,13 @@ class BackupContentFrame(ttk.Frame): MessageDialog(master=self.app, message_type="info", title=Msg.STR["info_menu"], text=Msg.STR["err_no_dest_folder"]) return - pybackup_dir = os.path.join(self.app.destination_path, "pybackup") - if not self.viewing_encrypted: username = os.path.basename(self.app.destination_path.rstrip('/')) password = self.app.backup_manager.encryption_manager.get_password(username, confirm=False) if not password: return - mount_point = self.app.backup_manager.encryption_manager.unlock_container(pybackup_dir, password) + mount_point = self.app.backup_manager.encryption_manager.unlock_container(self.app.destination_path, password) if mount_point: self.viewing_encrypted = True self.toggle_encrypted_button.config(text=Msg.STR["show_normal_backups"]) @@ -119,7 +117,7 @@ class BackupContentFrame(ttk.Frame): else: MessageDialog(master=self.app, message_type="error", title=Msg.STR["error"], text=Msg.STR["err_unlock_failed"]) else: - self.app.backup_manager.encryption_manager.lock_container(pybackup_dir) + self.app.backup_manager.encryption_manager.lock_container(self.app.destination_path) self.viewing_encrypted = False self.toggle_encrypted_button.config(text=Msg.STR["show_encrypted_backups"]) self.show(self.app.destination_path)