From dbaa623b17f0bcbe3a859ea1a7a4f85df0dd5ee8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A9sir=C3=A9=20Werner=20Menrath?= Date: Sun, 7 Sep 2025 19:02:39 +0200 Subject: [PATCH] fix(backup): Resolve multiple issues in encrypted backup handling This commit addresses several bugs related to the mounting, unmounting, and deletion of encrypted backups, as well as a crash when listing backups. The key changes are: - **Fix Double Mount on View:** Removed redundant mount operation when viewing encrypted backup contents. The mount is now handled by a single, centralized function. - **Fix Deletion of Encrypted Backups:** - The container is no longer re-mounted if already open, preventing a second password prompt. - Deletion of encrypted *user* backups is now performed with user-level permissions, removing the need for a third password prompt via pkexec. - **Fix UI Refresh after Deletion:** The backup list now correctly refreshes after a backup is deleted. - **Fix Crash on Empty Backup List:** Resolved an `UnboundLocalError` that occurred when listing backups from an empty or non-existent backup directory. - **Improve Mount Detection:** The `is_mounted` check is now more robust to prevent race conditions or other OS-level inconsistencies. --- core/backup_manager.py | 233 ++++++++++++++++------------- core/encryption_manager.py | 23 ++- main_app.py | 5 +- pyimage_ui/backup_content_frame.py | 2 +- pyimage_ui/navigation.py | 14 +- 5 files changed, 151 insertions(+), 126 deletions(-) diff --git a/core/backup_manager.py b/core/backup_manager.py index cef6b1d..c52eafa 100644 --- a/core/backup_manager.py +++ b/core/backup_manager.py @@ -156,15 +156,25 @@ class BackupManager: self._uninhibit_screensaver() self.process = None - def list_all_backups(self, base_dest_path: str): - is_encrypted = self.encryption_manager.is_encrypted(base_dest_path) - mounted_path = None - if is_encrypted: - if self.encryption_manager.is_mounted(base_dest_path): - pybackup_dir = os.path.join(base_dest_path, "pybackup") - mounted_path = os.path.join(pybackup_dir, "encrypted") + def _prepare_and_get_mounted_path(self, base_dest_path: str, is_system: bool, mount_if_needed: bool) -> Optional[str]: + if not self.encryption_manager.is_encrypted(base_dest_path): + return None + + if not self.encryption_manager.is_mounted(base_dest_path): + if mount_if_needed: + if not self.encryption_manager.prepare_encrypted_destination(base_dest_path, is_system=is_system, source_size=0, queue=self.app.queue): + return None else: - return [], [] + return None + + if self.encryption_manager.is_mounted(base_dest_path): + pybackup_dir = os.path.join(base_dest_path, "pybackup") + return os.path.join(pybackup_dir, "encrypted") + + return None + + def list_all_backups(self, base_dest_path: str, mount_if_needed: bool = True): + mounted_path = self._prepare_and_get_mounted_path(base_dest_path, is_system=False, mount_if_needed=mount_if_needed) return self._list_all_backups_from_path(base_dest_path, mounted_path) def _list_all_backups_from_path(self, base_dest_path: str, mounted_path: Optional[str] = None): @@ -173,120 +183,111 @@ class BackupManager: return system_backups, user_backups def list_system_backups(self, base_dest_path: str, mount_if_needed: bool = True) -> Optional[List[Dict[str, str]]]: - is_encrypted = self.encryption_manager.is_encrypted(base_dest_path) - mounted_path = None - if is_encrypted: - if not self.encryption_manager.is_mounted(base_dest_path): - if mount_if_needed: - mounted_path = self.encryption_manager.prepare_encrypted_destination(base_dest_path, is_system=True, source_size=0, queue=self.app.queue) - else: - return None - if self.encryption_manager.is_mounted(base_dest_path): - pybackup_dir = os.path.join(base_dest_path, "pybackup") - mounted_path = os.path.join(pybackup_dir, "encrypted") + mounted_path = self._prepare_and_get_mounted_path(base_dest_path, is_system=True, mount_if_needed=mount_if_needed) + if self.encryption_manager.is_encrypted(base_dest_path) and not mounted_path: + return None return self._list_system_backups_from_path(base_dest_path, mounted_path) - def _list_system_backups_from_path(self, base_dest_path: str, mounted_path: Optional[str] = None) -> List[Dict[str, str]]: + def _list_backups(self, base_dest_path: str, mounted_path: Optional[str], name_regex: re.Pattern, backup_type_prefix: str) -> List[Dict[str, str]]: pybackup_dir = os.path.join(base_dest_path, "pybackup") - if not os.path.isdir(pybackup_dir): return [] + if not os.path.isdir(pybackup_dir): + return [] + + is_system = backup_type_prefix == "system" + all_backups = [] - name_regex = re.compile(r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_system_(full|incremental)(\.tar\.gz)?(_encrypted)?\.txt$", re.IGNORECASE) for item in os.listdir(pybackup_dir): match = name_regex.match(item) - if not match: continue - date_str, time_str, backup_type_base, comp_ext, enc_suffix = match.groups() - is_encrypted = (enc_suffix is not None) - is_compressed = (comp_ext is not None) - backup_name = item.replace(".txt", "").replace("_encrypted", "") - full_path = os.path.join(mounted_path or pybackup_dir, backup_name) - backup_type = backup_type_base.capitalize() - if is_compressed: backup_type += " (Compressed)" - if is_encrypted: backup_type += " (Encrypted)" - backup_size, comment = "N/A", "" - info_file_path = os.path.join(pybackup_dir, item) - if os.path.exists(info_file_path): - try: - with open(info_file_path, 'r') as f: - for line in f: - if line.strip().lower().startswith("originalgröße:"): backup_size = line.split(":", 1)[1].strip().split('(')[0].strip() - elif line.strip().lower().startswith("kommentar:"): comment = line.split(":", 1)[1].strip() - except Exception as e: - self.logger.log(f"Could not read info file {info_file_path}: {e}") - all_backups.append({ - "date": date_str, "time": time_str, "type": backup_type, "size": backup_size, - "folder_name": backup_name, "full_path": full_path, "comment": comment, - "is_compressed": is_compressed, "is_encrypted": is_encrypted, - "backup_type_base": backup_type_base.capitalize(), - "datetime": datetime.datetime.strptime(f"{date_str} {time_str}", '%d-%m-%Y %H:%M:%S') - }) - all_backups.sort(key=lambda x: x['datetime']) - grouped_backups = [] - current_group = [] - for backup in all_backups: - if backup['backup_type_base'] == 'Full': - if current_group: grouped_backups.append(current_group) - current_group = [backup] + if not match: + continue + + groups = match.groups() + date_str, time_str = groups[0], groups[1] + + if is_system: + backup_type_base, comp_ext, enc_suffix = groups[2], groups[3], groups[4] + source_name = None + is_compressed = comp_ext is not None else: - if not current_group: current_group.append(backup) - else: current_group.append(backup) - if current_group: grouped_backups.append(current_group) - grouped_backups.sort(key=lambda g: g[0]['datetime'], reverse=True) - return [item for group in grouped_backups for item in group] + source_name, backup_type_base, enc_suffix = groups[2], groups[3], groups[4] + is_compressed = False - def list_user_backups(self, base_dest_path: str, mount_if_needed: bool = True) -> Optional[List[Dict[str, str]]]: - is_encrypted = self.encryption_manager.is_encrypted(base_dest_path) - mounted_path = None - if is_encrypted: - if not self.encryption_manager.is_mounted(base_dest_path): - if mount_if_needed: - mounted_path = self.encryption_manager.prepare_encrypted_destination(base_dest_path, is_system=False, source_size=0, queue=self.app.queue) - else: - return None - if self.encryption_manager.is_mounted(base_dest_path): - pybackup_dir = os.path.join(base_dest_path, "pybackup") - mounted_path = os.path.join(pybackup_dir, "encrypted") - return self._list_user_backups_from_path(base_dest_path, mounted_path) - - def _list_user_backups_from_path(self, base_dest_path: str, mounted_path: Optional[str] = None) -> List[Dict[str, str]]: - pybackup_dir = os.path.join(base_dest_path, "pybackup") - if not os.path.isdir(pybackup_dir): return [] - user_backups = [] - name_regex = re.compile(r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_user_(.+?)_(full|incremental)(_encrypted)?\.txt$", re.IGNORECASE) - for item in os.listdir(pybackup_dir): - match = name_regex.match(item) - if not match: continue - date_str, time_str, source_name, backup_type_base, enc_suffix = match.groups() - is_encrypted = (enc_suffix is not None) + is_encrypted = enc_suffix is not None backup_name = item.replace(".txt", "").replace("_encrypted", "") if mounted_path: - user_backup_dir = os.path.join(mounted_path, source_name) - full_path = os.path.join(user_backup_dir, backup_name) + if is_system: + full_path = os.path.join(mounted_path, backup_name) + else: + user_backup_dir = os.path.join(mounted_path, source_name) + full_path = os.path.join(user_backup_dir, backup_name) else: - user_backups_dir = os.path.join(pybackup_dir, "user_backups", source_name) - full_path = os.path.join(user_backups_dir, backup_name) + if is_system: + full_path = os.path.join(pybackup_dir, backup_name) + else: + user_backups_dir = os.path.join(pybackup_dir, "user_backups", source_name) + full_path = os.path.join(user_backups_dir, backup_name) backup_type = backup_type_base.capitalize() - if is_encrypted: backup_type += " (Encrypted)" + if is_compressed: + backup_type += " (Compressed)" + if is_encrypted: + backup_type += " (Encrypted)" + backup_size, comment = "N/A", "" info_file_path = os.path.join(pybackup_dir, item) if os.path.exists(info_file_path): try: with open(info_file_path, 'r') as f: for line in f: - if line.strip().lower().startswith("originalgröße:"): backup_size = line.split(":", 1)[1].strip().split('(')[0].strip() - elif line.strip().lower().startswith("kommentar:"): comment = line.split(":", 1)[1].strip() + if line.strip().lower().startswith("originalgröße:"): + backup_size = line.split(":", 1)[1].strip().split('(')[0].strip() + elif line.strip().lower().startswith("kommentar:"): + comment = line.split(":", 1)[1].strip() except Exception as e: self.logger.log(f"Could not read info file {info_file_path}: {e}") - user_backups.append({ - "date": date_str, "time": time_str, "type": backup_type, "size": backup_size, - "folder_name": backup_name, "full_path": full_path, "comment": comment, - "is_encrypted": is_encrypted, "source": source_name, "is_compressed": False, + + backup_info = { + "date": date_str, "time": time_str, "type": backup_type, "size": backup_size, + "folder_name": backup_name, "full_path": full_path, "comment": comment, + "is_encrypted": is_encrypted, "is_compressed": is_compressed, "backup_type_base": backup_type_base.capitalize(), "datetime": datetime.datetime.strptime(f"{date_str} {time_str}", '%d-%m-%Y %H:%M:%S') - }) - user_backups.sort(key=lambda x: x['datetime'], reverse=True) - return user_backups + } + if not is_system: + backup_info["source"] = source_name + + all_backups.append(backup_info) + + if is_system: + all_backups.sort(key=lambda x: x['datetime']) + grouped_backups = [] + current_group = [] + for backup in all_backups: + if backup['backup_type_base'] == 'Full': + if current_group: + grouped_backups.append(current_group) + current_group = [backup] + else: + if not current_group: + current_group.append(backup) + else: + current_group.append(backup) + if current_group: + grouped_backups.append(current_group) + grouped_backups.sort(key=lambda g: g[0]['datetime'], reverse=True) + return [item for group in grouped_backups for item in group] + else: + all_backups.sort(key=lambda x: x['datetime'], reverse=True) + return all_backups + + def _list_system_backups_from_path(self, base_dest_path: str, mounted_path: Optional[str] = None) -> List[Dict[str, str]]: + name_regex = re.compile(r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_system_(full|incremental)(\.tar\.gz)?(_encrypted)?\.txt$", re.IGNORECASE) + return self._list_backups(base_dest_path, mounted_path, name_regex, "system") + + def _list_user_backups_from_path(self, base_dest_path: str, mounted_path: Optional[str] = None) -> List[Dict[str, str]]: + name_regex = re.compile(r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_user_(.+?)_(full|incremental)(_encrypted)?\.txt$", re.IGNORECASE) + return self._list_backups(base_dest_path, mounted_path, name_regex, "user") def _find_latest_backup(self, base_backup_path: str, is_system: bool) -> Optional[str]: self.logger.log(f"Searching for latest backup in: {base_backup_path}") @@ -401,16 +402,38 @@ class BackupManager: try: if is_encrypted: self.logger.log(f"Starting encrypted deletion for {path_to_delete}") - mount_point = self.encryption_manager.prepare_encrypted_destination( - base_dest_path, is_system, source_size=0, queue=queue) + + mount_point = None + if self.encryption_manager.is_mounted(base_dest_path): + pybackup_dir = os.path.join(base_dest_path, "pybackup") + mount_point = os.path.join(pybackup_dir, "encrypted") + else: + if password: + mount_point = self.encryption_manager.mount_for_deletion(base_dest_path, is_system, password) + else: + self.logger.log("Password not provided for encrypted deletion.") + if not mount_point: self.logger.log("Failed to unlock container for deletion.") queue.put(('deletion_complete', False)) return internal_path_to_delete = os.path.join(mount_point, os.path.basename(path_to_delete)) - script_content = f"rm -rf '{internal_path_to_delete}'\nrm -f '{info_file_path}'" - success = self.encryption_manager._execute_as_root(script_content) + success = False + if is_system: + script_content = f"rm -rf '{internal_path_to_delete}'\nrm -f '{info_file_path}'" + success = self.encryption_manager._execute_as_root(script_content) + else: # User backup, no root needed + try: + if os.path.isdir(internal_path_to_delete): + shutil.rmtree(internal_path_to_delete) + if os.path.exists(info_file_path): + os.remove(info_file_path) + self.logger.log(f"Successfully deleted {internal_path_to_delete} and {info_file_path}") + success = True + except Exception as e: + self.logger.log(f"Failed to delete user backup {internal_path_to_delete}: {e}") + success = False if success: self.logger.log("Encrypted backup deleted successfully.") @@ -446,9 +469,9 @@ class BackupManager: try: pgid = os.getpgid(self.process.pid) script_content = f""" - kill -SIGTERM -- -{pgid} || echo 'Process group not found or already terminated.' - if [ -n \"{delete_path}\" ] && [ \"{delete_path}\" != \"/\" ]; then rm -rf \"{delete_path}\"; fi - """ +kill -SIGTERM -- -{pgid} || echo 'Process group not found or already terminated.' +if [ -n "{delete_path}" ] && [ "{delete_path}" != "/" ]; then rm -rf "{delete_path}"; fi +""" self.encryption_manager._execute_as_root(script_content) except Exception as e: self.logger.log(f"An error occurred during privileged cancel and delete: {e}") diff --git a/core/encryption_manager.py b/core/encryption_manager.py index af868da..66ff4ee 100644 --- a/core/encryption_manager.py +++ b/core/encryption_manager.py @@ -65,7 +65,16 @@ class EncryptionManager: def is_mounted(self, base_dest_path: str) -> bool: pybackup_dir = os.path.join(base_dest_path, "pybackup") mount_point = os.path.join(pybackup_dir, "encrypted") - return os.path.ismount(mount_point) + return os.path.ismount(mount_point) or base_dest_path in self.mounted_destinations + + def mount_for_deletion(self, base_dest_path: str, is_system: bool, password: str) -> Optional[str]: + self.logger.log("Mounting container for deletion operation.") + if self._open_and_mount(base_dest_path, is_system, password): + mount_point = os.path.join(os.path.dirname(self.get_container_path(base_dest_path)), "..", "encrypted") + self.mounted_destinations.add(base_dest_path) + return mount_point + self.logger.log("Failed to mount container for deletion.") + return None def prepare_encrypted_destination(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]: container_path = self.get_container_path(base_dest_path) @@ -130,9 +139,10 @@ class EncryptionManager: self.mounted_destinations.add(base_dest_path) return mount_point - def _open_and_mount(self, base_dest_path: str, is_system: bool) -> bool: + def _open_and_mount(self, base_dest_path: str, is_system: bool, password: Optional[str] = None) -> bool: username = os.path.basename(base_dest_path.rstrip('/')) - password = self.get_password(username, confirm=False) + if not password: + password = self.get_password(username, confirm=False) if not password: return False container_path = self.get_container_path(base_dest_path) @@ -143,8 +153,11 @@ class EncryptionManager: script = f""" umount -l \"{mount_point}\" || true cryptsetup luksClose {mapper_name} || true - mkdir -p \"{mount_point}\"\n echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} - - mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"\n {chown_cmd}\n """ + mkdir -p \"{mount_point}\" + echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} - + mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\" + {chown_cmd} + """ return self._execute_as_root(script, password) def _resize_container(self, base_dest_path: str, new_size_bytes: int) -> bool: diff --git a/main_app.py b/main_app.py index 053abc8..1a4634f 100644 --- a/main_app.py +++ b/main_app.py @@ -668,8 +668,9 @@ class MainApplication(tk.Tk): elif message_type == 'deletion_complete': self.actions._set_ui_state(True) self.backup_content_frame.hide_deletion_status() - self.backup_content_frame.system_backups_frame._load_backup_content() - self.backup_content_frame.user_backups_frame._load_backup_content() + if self.destination_path: + active_tab_index = self.backup_content_frame.current_view_index + self.backup_content_frame.show(self.destination_path, initial_tab_index=active_tab_index) elif message_type == 'error': self.animated_icon.stop("DISABLE") self.start_pause_button["text"] = "Start" diff --git a/pyimage_ui/backup_content_frame.py b/pyimage_ui/backup_content_frame.py index 9147863..051f0c2 100644 --- a/pyimage_ui/backup_content_frame.py +++ b/pyimage_ui/backup_content_frame.py @@ -163,7 +163,7 @@ class BackupContentFrame(ttk.Frame): self.user_backups_frame.show(backup_path, []) return - all_backups = self.backup_manager.list_all_backups(backup_path) + all_backups = self.backup_manager.list_all_backups(backup_path, mount_if_needed=True) if all_backups: system_backups, user_backups = all_backups self.system_backups_frame.show(backup_path, system_backups) diff --git a/pyimage_ui/navigation.py b/pyimage_ui/navigation.py index e23c4a5..cef4e04 100644 --- a/pyimage_ui/navigation.py +++ b/pyimage_ui/navigation.py @@ -271,7 +271,7 @@ class Navigation: def toggle_backup_content_frame(self, initial_tab_index=0): self._cancel_calculation() - self.app.drawing.update_nav_buttons(2) # Index 2 for Backup Content + self.app.drawing.update_nav_buttons(2) # Index 2 for Backup Content if not self.app.destination_path: MessageDialog(master=self.app, message_type="info", @@ -279,18 +279,6 @@ class Navigation: self.toggle_mode("backup", 0) return - # Mount the destination if it is encrypted and not already mounted - if self.app.backup_manager.encryption_manager.is_encrypted(self.app.destination_path): - if not self.app.backup_manager.encryption_manager.is_mounted(self.app.destination_path): - is_system = (initial_tab_index == 0) - mount_point = self.app.backup_manager.encryption_manager.prepare_encrypted_destination( - self.app.destination_path, is_system=is_system, source_size=0, queue=self.app.queue) - if not mount_point: - MessageDialog(master=self.app, message_type="error", - title=Msg.STR["error"], text=Msg.STR.get("err_mount_failed", "Mounting failed.")) - self.toggle_mode("backup", 0) - return - self.app.header_frame.refresh_status() self.app.canvas_frame.grid_remove()