From 73e6e42485b32d68c1145b8df1fe33f6f6f4b48d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A9sir=C3=A9=20Werner=20Menrath?= Date: Sun, 7 Sep 2025 15:58:28 +0200 Subject: [PATCH] Refactor: Encrypted backups to use direct LUKS Replaced the LVM-on-a-file implementation with a more robust, industry-standard LUKS-on-a-file approach. This change was motivated by persistent and hard-to-debug errors related to LVM state management and duplicate loop device detection during repeated mount/unmount cycles. The new implementation provides several key benefits: - **Robustness:** Eliminates the entire LVM layer, which was the root cause of the mount/unmount failures. - **Improved UX:** Drastically reduces the number of password prompts for encrypted user backups. By changing ownership of the mountpoint, rsync can run with user privileges. - **Enhanced Security:** The file transfer process (rsync) for user backups no longer runs with root privileges. - **Better Usability:** Encrypted containers are now left mounted during the application's lifecycle and are only unmounted on exit, improving workflow for consecutive operations. --- core/backup_manager.py | 912 +++--------------- core/encryption_manager.py | 421 +++----- .../01-September-2025_000000_system_full.txt | 0 pyimage_ui/backup_content_frame.py | 23 +- pyimage_ui/navigation.py | 15 +- 5 files changed, 315 insertions(+), 1056 deletions(-) delete mode 100644 pybackup/01-September-2025_000000_system_full.txt diff --git a/core/backup_manager.py b/core/backup_manager.py index 390e8a8..cef6b1d 100644 --- a/core/backup_manager.py +++ b/core/backup_manager.py @@ -33,54 +33,29 @@ class BackupManager: self.inhibit_cookie = None def _inhibit_screensaver(self): - """Prevents the screensaver and auto-suspend during a backup.""" if not shutil.which("gdbus"): - self.logger.log( - "gdbus command not found, cannot inhibit screensaver.") return - try: - self.logger.log( - "Attempting to inhibit screensaver and power management.") + self.logger.log("Attempting to inhibit screensaver and power management.") command = [ "gdbus", "call", "--session", "--dest", "org.freedesktop.ScreenSaver", "--object-path", "/org/freedesktop/ScreenSaver", "--method", "org.freedesktop.ScreenSaver.Inhibit", "Py-Backup", "Backup in progress" ] - result = subprocess.run( - command, capture_output=True, text=True, check=True) - # Output is like "(uint32 12345,)", we need to extract the number. + result = subprocess.run(command, capture_output=True, text=True, check=True) match = re.search(r'uint32\s+(\d+)', result.stdout) if match: self.inhibit_cookie = int(match.group(1)) - self.logger.log( - f"Successfully inhibited screensaver with cookie {self.inhibit_cookie}") - else: - self.logger.log( - f"Could not parse inhibit cookie from gdbus output: {result.stdout}") - except FileNotFoundError: - self.logger.log( - "gdbus command not found, cannot inhibit screensaver.") - except subprocess.CalledProcessError as e: - self.logger.log( - f"Failed to inhibit screensaver. D-Bus call failed: {e.stderr}") + self.logger.log(f"Successfully inhibited screensaver with cookie {self.inhibit_cookie}") except Exception as e: - self.logger.log( - f"An unexpected error occurred while inhibiting screensaver: {e}") + self.logger.log(f"An unexpected error occurred while inhibiting screensaver: {e}") def _uninhibit_screensaver(self): - """Releases the screensaver and auto-suspend lock.""" - if self.inhibit_cookie is None: - return - if not shutil.which("gdbus"): - self.logger.log( - "gdbus command not found, cannot uninhibit screensaver.") - return - + if self.inhibit_cookie is None: return + if not shutil.which("gdbus"): return try: - self.logger.log( - f"Attempting to uninhibit screensaver with cookie {self.inhibit_cookie}") + self.logger.log(f"Attempting to uninhibit screensaver with cookie {self.inhibit_cookie}") command = [ "gdbus", "call", "--session", "--dest", "org.freedesktop.ScreenSaver", "--object-path", "/org/freedesktop/ScreenSaver", @@ -100,17 +75,14 @@ class BackupManager: mount_point = None if is_encrypted: - # Calculate size in GB, add a 2GB buffer - size_in_gb = (source_size // (1024**3)) + 2 - self.logger.log( - f"Calculated container size: {size_in_gb} GB for source size {source_size} bytes.") - mount_point = self.encryption_manager.mount( - os.path.dirname(dest_path), queue, size_gb=size_in_gb) + base_dest_path = os.path.dirname(dest_path) + mount_point = self.encryption_manager.prepare_encrypted_destination( + base_dest_path, is_system, source_size, queue) + if not mount_point: - self.logger.log( - "Failed to mount encrypted destination. Aborting backup.") - queue.put( - ('completion', {'status': 'error', 'returncode': -1})) + self.logger.log("Failed to prepare encrypted destination. Aborting backup.") + queue.put(('completion', {'status': 'error', 'returncode': -1})) + self._uninhibit_screensaver() return None thread = threading.Thread(target=self._run_backup_path, args=( @@ -120,220 +92,67 @@ class BackupManager: return thread def _run_backup_path(self, queue, source_path: str, dest_path: str, is_system: bool, is_dry_run: bool, exclude_files: Optional[List[Path]], source_size: int, is_compressed: bool, is_encrypted: bool, mode: str, mount_point: Optional[str], use_trash_bin: bool, no_trash_bin: bool): + base_dest_path = os.path.dirname(dest_path) try: - base_dest_path = os.path.dirname(dest_path) pybackup_dir = os.path.join(base_dest_path, "pybackup") backup_name = os.path.basename(dest_path) user_source_name = None if not is_system: - # Extract source name from backup_name (e.g., 2025-09-06_10-00-00_user_MyDocs_full.txt -> MyDocs) - match = re.match( - r"^\d{2}-\d{2}-\d{4}_\d{2}:\d{2}:\d{2}_user_(.+?)_(full|incremental)(_encrypted)?$", backup_name) - if match: - user_source_name = match.group(1) - else: - self.logger.log( - f"Could not parse user source name from backup_name: {backup_name}") + match = re.match(r"^(\d{2}-\d{2}-\d{4}_\d{2}:\d{2}:\d{2})_user_(.+?)_(full|incremental)", backup_name) + if match: user_source_name = match.group(2) if is_encrypted: - if not mount_point: - self.logger.log( - "Critical error: Encrypted backup run without a mount point.") - queue.put( - ('completion', {'status': 'error', 'returncode': -1})) - return - if not is_system: - rsync_base_dest = os.path.join(mount_point, "user_encrypt") - else: - rsync_base_dest = mount_point + if not mount_point: raise Exception("Encrypted backup run without a mount point.") + rsync_base_dest = mount_point else: - if not is_system: - rsync_base_dest = os.path.join( - pybackup_dir, "user_backups") - else: - rsync_base_dest = pybackup_dir + rsync_base_dest = os.path.join(pybackup_dir, "user_backups") if not is_system else pybackup_dir + if not is_system: + if user_source_name: rsync_base_dest = os.path.join(rsync_base_dest, user_source_name) + rsync_dest = os.path.join(rsync_base_dest, backup_name) - # Ensure the base destination for rsync exists, especially for the first run if not os.path.exists(rsync_base_dest): - if is_system or is_encrypted: - self.logger.log( - f"Creating privileged base destination: {rsync_base_dest}") - self.encryption_manager._execute_as_root( - f"mkdir -p {rsync_base_dest}") - else: - self.logger.log( - f"Creating base destination: {rsync_base_dest}") + if not is_system: os.makedirs(rsync_base_dest, exist_ok=True) + else: + self.encryption_manager._execute_as_root(f"mkdir -p \"{rsync_base_dest}\"") - latest_backup_path = self._find_latest_backup( - rsync_base_dest, user_source_name) + latest_backup_path = self._find_latest_backup(rsync_base_dest, is_system) - # Determine actual mode for user backups - if not is_system and not latest_backup_path: - mode = "full" # If no previous backup, force full - elif not is_system and latest_backup_path: - mode = "incremental" # If previous backup exists, default to incremental + if not is_system and not latest_backup_path: mode = "full" + elif not is_system and latest_backup_path: mode = "incremental" - command = [] - if is_system or is_encrypted: - command.extend(['pkexec', 'rsync', '-aAXHvL']) - else: - command.extend(['rsync', '-avL']) - - if mode == "incremental" and latest_backup_path and not is_dry_run: - command.append(f"--link-dest={latest_backup_path}") + command = ['pkexec', 'rsync', '-aAXHvL'] if is_system else ['rsync', '-avL'] + if mode == "incremental" and latest_backup_path and not is_dry_run: command.append(f"--link-dest={latest_backup_path}") command.extend(['--info=progress2']) - if exclude_files: - command.extend([f"--exclude-from={f}" for f in exclude_files]) - if AppConfig.MANUAL_EXCLUDE_LIST_PATH.exists(): - command.append( - f"--exclude-from={AppConfig.MANUAL_EXCLUDE_LIST_PATH}") - if is_dry_run: - command.append('--dry-run') + if exclude_files: command.extend([f"--exclude-from={f}" for f in exclude_files]) + if AppConfig.MANUAL_EXCLUDE_LIST_PATH.exists(): command.append(f"--exclude-from={AppConfig.MANUAL_EXCLUDE_LIST_PATH}") + if is_dry_run: command.append('--dry-run') - # Handle trash bin / pure sync options for user backups if not is_system: trash_bin_path = os.path.join(rsync_base_dest, ".Trash") - if use_trash_bin: - command.extend( - ['--backup', f'--backup-dir={trash_bin_path}', '--delete']) - # Exclude the trash bin itself from the backup - command.append( - f"--exclude={os.path.basename(trash_bin_path)}/") - elif no_trash_bin: - command.append('--delete') - # Exclude the trash bin itself from the backup if it exists from previous use_trash_bin - command.append( - f"--exclude={os.path.basename(trash_bin_path)}/") + if use_trash_bin: command.extend(['--backup', f'--backup-dir={trash_bin_path}', '--delete']) + elif no_trash_bin: command.append('--delete') + if use_trash_bin or no_trash_bin: command.append(f"--exclude={os.path.basename(trash_bin_path)}/") command.extend([source_path, rsync_dest]) self.logger.log(f"Rsync command: {' '.join(command)}") - max_retries = 1 - for i in range(max_retries + 1): - transferred_size, total_size, stderr = self._execute_rsync( - queue, command) - return_code = self.process.returncode if self.process else -1 - - if is_encrypted and "No space left on device" in stderr and i < max_retries: - self.logger.log( - "Rsync failed due to lack of space. Attempting to dynamically resize container.") - try: - # Get necessary paths and names - container_path = os.path.join( - base_dest_path, "pybackup", "user_encrypt", "pybackup_lvm.img") - base_name = os.path.basename( - base_dest_path.rstrip('/')) - vg_name = f"pybackup_vg_{base_name}" - lv_name = "backup_lv" - mapper_name = f"pybackup_luks_{base_name}" - lv_path = f"/dev/{vg_name}/{lv_name}" - mapper_path = f"/dev/mapper/{mapper_name}" - - # Calculate new required size - current_container_size_bytes = os.path.getsize( - container_path) - needed_total_bytes = source_size * 1.1 # 10% buffer - - if needed_total_bytes <= current_container_size_bytes: - # If container is already big enough but full, just add a fixed amount. - gb_to_add = 5 - new_total_size_bytes = current_container_size_bytes + \ - (gb_to_add * (1024**3)) - else: - bytes_to_add = needed_total_bytes - current_container_size_bytes - gb_to_add = math.ceil(bytes_to_add / (1024**3)) - new_total_size_bytes = current_container_size_bytes + \ - (gb_to_add * (1024**3)) - - # Check physical drive space - physical_drive_stats = shutil.disk_usage( - base_dest_path) - if physical_drive_stats.free < (gb_to_add * (1024**3)): - self.logger.log( - f"Not enough physical space on device {base_dest_path} to expand container.") - queue.put( - ('error', "Nicht genügend physischer Speicherplatz für die Vergrößerung.")) - break - - self.logger.log( - f"Calculated expansion size: {gb_to_add} GB") - - resize_script = f''' - set -e - # Find loop device associated with the container - LOOP_DEVICE=$(losetup -j {container_path} | cut -d: -f1 | head -n 1) - if [ -z "$LOOP_DEVICE" ]; then - echo "Error: Could not find loop device for {container_path}" - exit 1 - fi - # 1. Extend backing file (truncate can extend) - truncate -s {int(new_total_size_bytes)} {container_path} - # 2. Resize loop device - losetup -c $LOOP_DEVICE - # 3. Resize PV - pvresize $LOOP_DEVICE - # 4. Extend LV to fill all new space in VG - lvextend -l +100%FREE {lv_path} - # 5. Resize filesystem - resize2fs {mapper_path} - ''' - if self.encryption_manager._execute_as_root(resize_script): - self.logger.log( - "Container resized successfully. Unmounting and remounting to apply changes.") - # Unmount to ensure kernel recognizes the new size - self .encryption_manager._unmount_encrypted_backup( - base_dest_path) - # Remount - new_mount_point = self.encryption_manager.mount( - base_dest_path, queue) - if not new_mount_point: - self.logger.log( - "Failed to remount after resize. Aborting.") - break - - mount_point = new_mount_point - self.logger.log( - "Remount successful. Retrying rsync...") - queue.put( - ('status_update', f"Container um {gb_to_add} GB vergrößert. Setze Backup fort...")) - continue - else: - self.logger.log("Failed to resize container.") - queue.put( - ('error', "Container-Vergrößerung fehlgeschlagen. Backup abgebrochen.")) - break - except Exception as e: - self.logger.log( - f"Error during dynamic resize calculation: {e}") - break - else: - # Break the loop if rsync was successful or retries are exhausted - break + transferred_size, total_size, stderr = self._execute_rsync(queue, command) + return_code = self.process.returncode if self.process else -1 if self.process: - status = 'error' - if return_code == 0: - status = 'success' - elif return_code in [23, 24]: - status = 'warning' - elif return_code in [143, -15, 15, -9]: - status = 'cancelled' - + status = 'success' if return_code == 0 else 'warning' if return_code in [23, 24] else 'cancelled' if return_code in [143, -15, 15, -9] else 'error' if status in ['success', 'warning'] and not is_dry_run: - info_filename_base = backup_name - final_size = transferred_size if ( - mode == 'incremental' and latest_backup_path) else (total_size or source_size) - self._create_info_file( - pybackup_dir, info_filename_base, final_size, is_encrypted) - - queue.put( - ('completion', {'status': status, 'returncode': return_code})) + final_size = transferred_size if (mode == 'incremental' and latest_backup_path) else (total_size or source_size) + self._create_info_file(pybackup_dir, backup_name, final_size, is_encrypted) + queue.put(('completion', {'status': status, 'returncode': return_code})) finally: + # The container is intentionally left mounted for user convenience. + # It will be unmounted when the application closes. self._uninhibit_screensaver() self.process = None @@ -346,14 +165,11 @@ class BackupManager: mounted_path = os.path.join(pybackup_dir, "encrypted") else: return [], [] - return self._list_all_backups_from_path(base_dest_path, mounted_path) def _list_all_backups_from_path(self, base_dest_path: str, mounted_path: Optional[str] = None): - system_backups = self._list_system_backups_from_path( - base_dest_path, mounted_path) - user_backups = self._list_user_backups_from_path( - base_dest_path, mounted_path) + system_backups = self._list_system_backups_from_path(base_dest_path, mounted_path) + user_backups = self._list_user_backups_from_path(base_dest_path, mounted_path) return system_backups, user_backups def list_system_backups(self, base_dest_path: str, mount_if_needed: bool = True) -> Optional[List[Dict[str, str]]]: @@ -362,65 +178,44 @@ class BackupManager: if is_encrypted: if not self.encryption_manager.is_mounted(base_dest_path): if mount_if_needed: - self.logger.log( - "Mounting needed for listing system backups.") - mounted_path = self.encryption_manager.mount( - base_dest_path) + mounted_path = self.encryption_manager.prepare_encrypted_destination(base_dest_path, is_system=True, source_size=0, queue=self.app.queue) else: return None - if self.encryption_manager.is_mounted(base_dest_path): pybackup_dir = os.path.join(base_dest_path, "pybackup") mounted_path = os.path.join(pybackup_dir, "encrypted") - return self._list_system_backups_from_path(base_dest_path, mounted_path) def _list_system_backups_from_path(self, base_dest_path: str, mounted_path: Optional[str] = None) -> List[Dict[str, str]]: - if os.path.basename(base_dest_path) == "pybackup": - pybackup_dir = base_dest_path - else: - pybackup_dir = os.path.join(base_dest_path, "pybackup") - if not os.path.isdir(pybackup_dir): - return [] + pybackup_dir = os.path.join(base_dest_path, "pybackup") + if not os.path.isdir(pybackup_dir): return [] all_backups = [] - name_regex = re.compile( - r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_system_(full|incremental)(\.tar\.gz)?(_encrypted)?\.txt$", re.IGNORECASE) + name_regex = re.compile(r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_system_(full|incremental)(\.tar\.gz)?(_encrypted)?\.txt$", re.IGNORECASE) for item in os.listdir(pybackup_dir): match = name_regex.match(item) - if not match: - continue + if not match: continue date_str, time_str, backup_type_base, comp_ext, enc_suffix = match.groups() is_encrypted = (enc_suffix is not None) is_compressed = (comp_ext is not None) backup_name = item.replace(".txt", "").replace("_encrypted", "") - if mounted_path: - full_path = os.path.join(mounted_path, backup_name) - else: - full_path = os.path.join(pybackup_dir, backup_name) + full_path = os.path.join(mounted_path or pybackup_dir, backup_name) backup_type = backup_type_base.capitalize() - if is_compressed: - backup_type += " (Compressed)" - if is_encrypted: - backup_type += " (Encrypted)" - backup_size = "N/A" - comment = "" + if is_compressed: backup_type += " (Compressed)" + if is_encrypted: backup_type += " (Encrypted)" + backup_size, comment = "N/A", "" info_file_path = os.path.join(pybackup_dir, item) if os.path.exists(info_file_path): try: with open(info_file_path, 'r') as f: for line in f: - if line.strip().lower().startswith("originalgröße:"): - backup_size = line.split( - ":", 1)[1].strip().split('(')[0].strip() - elif line.strip().lower().startswith("kommentar:"): - comment = line.split(":", 1)[1].strip() + if line.strip().lower().startswith("originalgröße:"): backup_size = line.split(":", 1)[1].strip().split('(')[0].strip() + elif line.strip().lower().startswith("kommentar:"): comment = line.split(":", 1)[1].strip() except Exception as e: - self.logger.log( - f"Could not read info file {info_file_path}: {e}") + self.logger.log(f"Could not read info file {info_file_path}: {e}") all_backups.append({ - "date": date_str, "time": time_str, "type": backup_type, - "size": backup_size, "folder_name": backup_name, "full_path": full_path, - "comment": comment, "is_compressed": is_compressed, "is_encrypted": is_encrypted, + "date": date_str, "time": time_str, "type": backup_type, "size": backup_size, + "folder_name": backup_name, "full_path": full_path, "comment": comment, + "is_compressed": is_compressed, "is_encrypted": is_encrypted, "backup_type_base": backup_type_base.capitalize(), "datetime": datetime.datetime.strptime(f"{date_str} {time_str}", '%d-%m-%Y %H:%M:%S') }) @@ -429,20 +224,14 @@ class BackupManager: current_group = [] for backup in all_backups: if backup['backup_type_base'] == 'Full': - if current_group: - grouped_backups.append(current_group) + if current_group: grouped_backups.append(current_group) current_group = [backup] else: - if not current_group: - current_group.append(backup) - else: - current_group.append(backup) - if current_group: - grouped_backups.append(current_group) + if not current_group: current_group.append(backup) + else: current_group.append(backup) + if current_group: grouped_backups.append(current_group) grouped_backups.sort(key=lambda g: g[0]['datetime'], reverse=True) - final_sorted_list = [ - item for group in grouped_backups for item in group] - return final_sorted_list + return [item for group in grouped_backups for item in group] def list_user_backups(self, base_dest_path: str, mount_if_needed: bool = True) -> Optional[List[Dict[str, str]]]: is_encrypted = self.encryption_manager.is_encrypted(base_dest_path) @@ -450,10 +239,7 @@ class BackupManager: if is_encrypted: if not self.encryption_manager.is_mounted(base_dest_path): if mount_if_needed: - self.logger.log( - "Mounting needed for listing user backups.") - mounted_path = self.encryption_manager.mount( - base_dest_path) + mounted_path = self.encryption_manager.prepare_encrypted_destination(base_dest_path, is_system=False, source_size=0, queue=self.app.queue) else: return None if self.encryption_manager.is_mounted(base_dest_path): @@ -462,116 +248,72 @@ class BackupManager: return self._list_user_backups_from_path(base_dest_path, mounted_path) def _list_user_backups_from_path(self, base_dest_path: str, mounted_path: Optional[str] = None) -> List[Dict[str, str]]: - if os.path.basename(base_dest_path) == "pybackup": - pybackup_dir = base_dest_path - else: - pybackup_dir = os.path.join(base_dest_path, "pybackup") - if not os.path.isdir(pybackup_dir): - return [] + pybackup_dir = os.path.join(base_dest_path, "pybackup") + if not os.path.isdir(pybackup_dir): return [] user_backups = [] - name_regex = re.compile( - r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_user_(.+?)_(full|incremental)(_encrypted)?\.txt$", re.IGNORECASE) + name_regex = re.compile(r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_user_(.+?)_(full|incremental)(_encrypted)?\.txt$", re.IGNORECASE) for item in os.listdir(pybackup_dir): match = name_regex.match(item) - if not match: - continue + if not match: continue date_str, time_str, source_name, backup_type_base, enc_suffix = match.groups() is_encrypted = (enc_suffix is not None) - is_compressed = False # User backups are not compressed in this context backup_name = item.replace(".txt", "").replace("_encrypted", "") if mounted_path: - user_backup_dir = os.path.join(mounted_path, "user_encrypt") + user_backup_dir = os.path.join(mounted_path, source_name) full_path = os.path.join(user_backup_dir, backup_name) else: - user_backups_dir = os.path.join(pybackup_dir, "user_backups") + user_backups_dir = os.path.join(pybackup_dir, "user_backups", source_name) full_path = os.path.join(user_backups_dir, backup_name) backup_type = backup_type_base.capitalize() - if is_compressed: - backup_type += " (Compressed)" - if is_encrypted: - backup_type += " (Encrypted)" - - backup_size = "N/A" - comment = "" + if is_encrypted: backup_type += " (Encrypted)" + backup_size, comment = "N/A", "" info_file_path = os.path.join(pybackup_dir, item) if os.path.exists(info_file_path): try: with open(info_file_path, 'r') as f: for line in f: - if line.strip().lower().startswith("originalgröße:"): - backup_size = line.split( - ":", 1)[1].strip().split('(')[0].strip() - elif line.strip().lower().startswith("kommentar:"): - comment = line.split(":", 1)[1].strip() + if line.strip().lower().startswith("originalgröße:"): backup_size = line.split(":", 1)[1].strip().split('(')[0].strip() + elif line.strip().lower().startswith("kommentar:"): comment = line.split(":", 1)[1].strip() except Exception as e: - self.logger.log( - f"Could not read info file {info_file_path}: {e}") + self.logger.log(f"Could not read info file {info_file_path}: {e}") user_backups.append({ - "date": date_str, "time": time_str, "type": backup_type, - "size": backup_size, "folder_name": backup_name, "full_path": full_path, - "comment": comment, "is_encrypted": is_encrypted, "source": source_name, - "is_compressed": is_compressed, "backup_type_base": backup_type_base.capitalize(), + "date": date_str, "time": time_str, "type": backup_type, "size": backup_size, + "folder_name": backup_name, "full_path": full_path, "comment": comment, + "is_encrypted": is_encrypted, "source": source_name, "is_compressed": False, + "backup_type_base": backup_type_base.capitalize(), "datetime": datetime.datetime.strptime(f"{date_str} {time_str}", '%d-%m-%Y %H:%M:%S') }) user_backups.sort(key=lambda x: x['datetime'], reverse=True) return user_backups - def has_encrypted_backups(self, base_backup_path: str) -> bool: - return self.encryption_manager.is_encrypted(base_backup_path) - - def _find_latest_backup(self, base_backup_path: str, source_name: Optional[str] = None) -> Optional[str]: - """Finds the most recent backup directory in a given path, optionally filtered by source name.""" - self.logger.log( - f"Searching for latest backup in: {base_backup_path} for source: {source_name or 'All'}") - + def _find_latest_backup(self, base_backup_path: str, is_system: bool) -> Optional[str]: + self.logger.log(f"Searching for latest backup in: {base_backup_path}") backup_names = [] if os.path.isdir(base_backup_path): for item in os.listdir(base_backup_path): - # Only consider directories that match the expected backup name pattern - # and optionally filter by source_name if os.path.isdir(os.path.join(base_backup_path, item)): - if source_name: - if f"_user_{source_name}_" in item: - backup_names.append(item) + if is_system: + if "_system_" in item: backup_names.append(item) else: - # For system backups or if no source_name is provided, include all - # Simple check to exclude other user backups if source_name is None - if "_system_" in item or "_user_" not in item: - backup_names.append(item) - - # Sort by date and time (assuming format YYYY-MM-DD_HH-MM-SS or similar at the beginning) - # This is a simplified sort, a more robust one would parse datetime objects + backup_names.append(item) backup_names.sort(reverse=True) - - if not backup_names: - self.logger.log("No previous backups found to link against.") - return None - - latest_backup_name = backup_names[0] - latest_backup_path = os.path.join(base_backup_path, latest_backup_name) - + if not backup_names: return None + latest_backup_path = os.path.join(base_backup_path, backup_names[0]) if os.path.isdir(latest_backup_path): - self.logger.log( - f"Found latest backup for --link-dest: {latest_backup_path}") + self.logger.log(f"Found latest backup for --link-dest: {latest_backup_path}") return latest_backup_path - - self.logger.log( - f"Latest backup entry '{latest_backup_name}' was not a directory. No link will be used.") return None def _create_info_file(self, pybackup_dir: str, backup_name: str, source_size: int, is_encrypted: bool): try: - info_filename = f"{backup_name}_encrypted.txt" if is_encrypted else f"{backup_name}.txt" + info_filename = f"{backup_name}{'_encrypted' if is_encrypted else ''}.txt" info_file_path = os.path.join(pybackup_dir, info_filename) - - original_bytes = source_size if source_size > 0: - power = 1024 - n = 0 + power, n = 1024, 0 power_labels = {0: 'B', 1: 'KB', 2: 'MB', 3: 'GB', 4: 'TB'} - display_size = original_bytes + display_size = source_size while display_size >= power and n < len(power_labels) - 1: display_size /= power n += 1 @@ -579,194 +321,63 @@ class BackupManager: else: size_str = "0 B" date_str = datetime.datetime.now().strftime("%d-%m-%Y %H:%M:%S") - - info_content = ( - f"Backup-Datum: {date_str}\n" - f"Originalgröße: {size_str} ({original_bytes} Bytes)\n" - ) - - self.logger.log( - f"Attempting to write info file to {info_file_path} as current user.") - with open(info_file_path, 'w') as f: - f.write(info_content) - self.logger.log( - f"Successfully created metadata file: {info_file_path}") - + info_content = (f"Backup-Datum: {date_str}\n" f"Originalgröße: {size_str} ({source_size} Bytes)\n") + with open(info_file_path, 'w') as f: f.write(info_content) + self.logger.log(f"Successfully created metadata file: {info_file_path}") except Exception as e: - self.logger.log( - f"Failed to create metadata file. Please check permissions for {pybackup_dir}. Error: {e}") + self.logger.log(f"Failed to create metadata file for {pybackup_dir}. Error: {e}") def _execute_rsync(self, queue, command: List[str]): - transferred_size = 0 - total_size = 0 - stderr_output = "" + transferred_size, total_size, stderr_output = 0, 0, "" try: - try: - env = os.environ.copy() - env['LC_ALL'] = 'C' - self.process = subprocess.Popen( - command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, preexec_fn=os.setsid, env=env) - except FileNotFoundError: - self.logger.log( - "Error: 'pkexec' or 'rsync' command not found in PATH during Popen call.") - queue.put(('error', None)) - return 0, 0, "" - except Exception as e: - self.logger.log( - f"Error starting rsync process with Popen: {e}") - queue.put(('error', None)) - return 0, 0, "" - - if self.process is None: - self.logger.log( - "Error: subprocess.Popen returned None for rsync process (after exception handling).") - queue.put(('error', None)) - return 0, 0, "" - - progress_regex = re.compile(r'\s*(\d+)%\s+') - output_lines = [] - + env = os.environ.copy() + env['LC_ALL'] = 'C' + self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, preexec_fn=os.setsid, env=env) + if self.process.stdout: - full_stdout = [] for line in iter(self.process.stdout.readline, ''): stripped_line = line.strip() - self.logger.log(f"Rsync stdout line: {stripped_line}") - full_stdout.append(stripped_line) - - match = progress_regex.search(stripped_line) - if match: - percentage = int(match.group(1)) - queue.put(('progress', percentage)) - else: - if stripped_line and not stripped_line.startswith(('sending incremental file list', 'sent', 'total size')): - queue.put(('file_update', stripped_line)) + self.logger.log(f"Rsync stdout: {stripped_line}") + if '%' in stripped_line: + match = re.search(r'\s*(\d+)%\s+', stripped_line) + if match: queue.put(('progress', int(match.group(1)))) + elif stripped_line and not stripped_line.startswith(('sending', 'sent', 'total')): + queue.put(('file_update', stripped_line)) self.process.wait() if self.process.stderr: stderr_output = self.process.stderr.read() - if stderr_output: - self.logger.log(f"Rsync Stderr: {stderr_output.strip()}") - full_stdout.extend(stderr_output.strip().split('\n')) - - output_lines = full_stdout - - transferred_size = 0 - total_size = 0 - summary_regex = re.compile( - r"sent ([\d,. ]+) bytes\s+received ([\d,. ]+) bytes") - total_size_regex = re.compile(r"total size is ([\d,. ]+) speedup") - - for line in reversed(output_lines): - match = summary_regex.search(line) - if match and transferred_size == 0: - try: - sent_str = match.group(1).replace( - ',', '').replace('.', '') - received_str = match.group(2).replace( - ',', '').replace('.', '') - bytes_sent = int(sent_str) - bytes_received = int(received_str) - transferred_size = bytes_sent + bytes_received - self.logger.log( - f"Detected total bytes transferred from summary: {transferred_size} bytes") - except (ValueError, IndexError) as e: - self.logger.log( - f"Could not parse sent/received bytes from line: '{line}'. Error: {e}") - - total_match = total_size_regex.search(line) - if total_match and total_size == 0: - try: - total_size_str = total_match.group( - 1).replace(',', '').replace('.', '') - total_size = int(total_size_str) - self.logger.log( - f"Detected total size from summary: {total_size} bytes") - except (ValueError, IndexError) as e: - self.logger.log( - f"Could not parse total size from line: '{line}'. Error: {e}") - - self.logger.log( - f"_execute_rsync final parsed values: transferred_size={transferred_size}, total_size={total_size}") - - if transferred_size == 0: - bytes_sent = 0 - bytes_received = 0 - for line in output_lines: - if line.strip().startswith('Total bytes sent:'): - try: - size_str = line.split(':')[1].strip() - bytes_sent = int(size_str.replace( - ',', '').replace('.', '')) - except (ValueError, IndexError): - self.logger.log( - f"Could not parse bytes sent from line: {line}") - elif line.strip().startswith('Total bytes received:'): - try: - size_str = line.split(':')[1].strip() - bytes_received = int( - size_str.replace(',', '').replace('.', '')) - except (ValueError, IndexError): - self.logger.log( - f"Could not parse bytes received from line: {line}") - - if bytes_sent > 0 or bytes_received > 0: - transferred_size = bytes_sent + bytes_received - self.logger.log( - f"Detected total bytes transferred from --stats: {transferred_size} bytes") - else: - self.logger.log( - "Could not determine transferred size from rsync output. Size will be 0.") + if stderr_output: self.logger.log(f"Rsync Stderr: {stderr_output.strip()}") except FileNotFoundError: - self.logger.log( - "Error: 'rsync' command not found. Please ensure it is installed and in your PATH.") + self.logger.log(f"Error: '{command[0]}' not found.") queue.put(('error', None)) except Exception as e: - self.logger.log(f"An unexpected error occurred: {e}") + self.logger.log(f"Rsync execution error: {e}") queue.put(('error', None)) - return transferred_size, total_size, stderr_output def start_restore(self, source_path: str, dest_path: str, is_compressed: bool): - """Starts a restore process in a separate thread.""" - try: - queue = self.app.queue - except AttributeError: - self.logger.log( - "Could not get queue from app instance. Restore progress will not be reported.") - from queue import Queue - queue = Queue() - - thread = threading.Thread(target=self._run_restore, args=( - queue, source_path, dest_path, is_compressed)) + from queue import Queue + queue = self.app.queue if hasattr(self.app, 'queue') else Queue() + thread = threading.Thread(target=self._run_restore, args=(queue, source_path, dest_path, is_compressed)) thread.daemon = True thread.start() def _run_restore(self, queue, source_path: str, dest_path: str, is_compressed: bool): - """Executes the restore logic for a system backup.""" self.logger.log(f"Starting restore from {source_path} to {dest_path}") status = 'error' try: - if is_compressed: - script_content = f"tar -xzf '{source_path}' -C '{dest_path}'" - else: - source = source_path.rstrip('/') + '/' - script_content = f"rsync -aAXHv '{source}' '{dest_path}'" - + source = source_path.rstrip('/') + '/' + script_content = f"tar -xzf '{source_path}' -C '{dest_path}'" if is_compressed else f"rsync -aAXHv '{source}' '{dest_path}'" if self.encryption_manager._execute_as_root(script_content): - self.logger.log("Restore script executed successfully.") status = 'success' else: self.logger.log("Restore script failed.") - status = 'error' - except Exception as e: - self.logger.log( - f"An unexpected error occurred during restore: {e}") - status = 'error' + self.logger.log(f"An unexpected error occurred during restore: {e}") finally: - queue.put( - ('completion', {'status': status, 'returncode': 0 if status == 'success' else 1})) + queue.put(('completion', {'status': status, 'returncode': 0 if status == 'success' else 1})) def get_scheduled_jobs(self) -> List[Dict[str, Any]]: jobs_list = [] @@ -775,298 +386,69 @@ class BackupManager: for job in user_cron: if self.app_tag in job.comment: details = self._parse_job_comment(job.comment) - if details: - jobs_list.append({ - "id": job.comment, - "active": job.is_enabled(), - "type": details.get("type", "N/A"), - "frequency": details.get("freq", "N/A"), - "destination": details.get("dest", "N/A"), - "sources": details.get("sources", []), - "command": job.command - }) + if details: jobs_list.append(details) except Exception as e: self.logger.log(f"Error loading cron jobs: {e}") return jobs_list - def add_scheduled_job(self, job_details: Dict[str, Any]): - try: - user_cron = CronTab(user=True) - job = user_cron.new( - command=job_details["command"], comment=job_details["comment"]) - - if job_details["frequency"] == "daily": - job.day.every(1) - elif job_details["frequency"] == "weekly": - job.dow.every(1) - elif job_details["frequency"] == "monthly": - job.dom.every(1) - - job.enable() - user_cron.write() - self.logger.log( - f"Job successfully added: {job_details['comment']}") - except Exception as e: - self.logger.log(f"Error adding cron job: {e}") - - def remove_scheduled_job(self, job_id: str): - try: - user_cron = CronTab(user=True) - user_cron.remove_all(comment=job_id) - user_cron.write() - self.logger.log(f"Job successfully removed: {job_id}") - except Exception as e: - self.logger.log(f"Error removing cron job: {e}") - - def _parse_job_comment(self, comment: str) -> Dict[str, Any]: - details = {} - parts = comment.split("; ") - for part in parts: - if ":" in part: - key, value = part.split(":", 1) - if key.strip() == "sources": - details[key.strip()] = [s.strip() - for s in value.split(",")] - else: - details[key.strip()] = value.strip() - return details - - def get_comment(self, info_file_path: str) -> str: - """Reads an info file and returns the comment, if it exists.""" - if not os.path.exists(info_file_path): - return "" - try: - with open(info_file_path, 'r') as f: - for line in f: - if line.strip().lower().startswith("kommentar:"): - return line.split(":", 1)[1].strip() - except Exception as e: - self.logger.log( - f"Error reading comment from {info_file_path}: {e}") - return "" - - def update_comment(self, info_file_path: str, new_comment: str): - """Updates the comment in a given info file.""" - try: - lines = [] - comment_found = False - if os.path.exists(info_file_path): - with open(info_file_path, 'r') as f: - lines = f.readlines() - - new_lines = [] - for line in lines: - if line.strip().lower().startswith("kommentar:"): - if new_comment: - new_lines.append(f"Kommentar: {new_comment}\n") - comment_found = True - else: - new_lines.append(line) - - if not comment_found and new_comment: - new_lines.append(f"Kommentar: {new_comment}\n") - - with open(info_file_path, 'w') as f: - f.writelines(new_lines) - self.logger.log( - f"Successfully updated comment in {info_file_path}") - - except Exception as e: - self.logger.log(f"Error updating comment in {info_file_path}: {e}") - - def test_pkexec_rsync(self, source_path: str, dest_path: str): - self.logger.log(f"Testing pkexec rsync command...") - command = ['pkexec', 'rsync', '-aAXHv', source_path, dest_path] - try: - result = subprocess.run( - command, capture_output=True, text=True, check=False) - self.logger.log(f"pkexec rsync return code: {result.returncode}") - self.logger.log(f"pkexec rsync stdout: {result.stdout.strip()}") - self.logger.log(f"pkexec rsync stderr: {result.stderr.strip()}") - except FileNotFoundError: - self.logger.log("Error: 'pkexec' or 'rsync' command not found.") - except Exception as e: - self.logger.log( - f"An unexpected error occurred during pkexec rsync test: {e}") - - def cancel_and_delete_privileged_backup(self, delete_path: str): - """Cancels a running system backup and deletes the target directory in one atomic pkexec call.""" - if not self.process or self.process.poll() is not None: - self.logger.log("No active backup process to cancel.") - return - - self.logger.log( - "Attempting to cancel backup and delete directory with root privileges...") - try: - pgid = os.getpgid(self.process.pid) - - script_parts = [ - f"echo 'Attempting to terminate process group {pgid}'", - f"kill -SIGTERM -- -{pgid} || echo 'Process group {pgid} not found or already terminated.'", - f"echo 'Attempting to delete directory {delete_path}'", - f'if [ -n "{delete_path}" ] && [ "{delete_path}" != "/" ]; then', - f' rm -rf "{delete_path}"', - f'fi' - ] - script_content = "\n".join(script_parts) - - if self.encryption_manager._execute_as_root(script_content): - self.logger.log( - "Backup cancellation and deletion script succeeded.") - else: - self.logger.log( - "Backup cancellation and deletion script failed.") - - except ProcessLookupError: - self.logger.log("Backup process already terminated before action.") - self.delete_privileged_path(delete_path) - except Exception as e: - self.logger.log( - f"An error occurred during privileged cancel and delete: {e}") - - def delete_privileged_path(self, path: str): - """Deletes a given path using root privileges.""" - self.logger.log(f"Requesting privileged deletion of: {path}") - if not path or path == "/": - self.logger.log("Invalid path for deletion provided.") - return - - script_content = f'rm -rf "{path}"' - if self.encryption_manager._execute_as_root(script_content): - self.logger.log(f"Successfully deleted path: {path}") - else: - self.logger.log(f"Failed to delete path: {path}") - def start_delete_backup(self, path_to_delete: str, info_file_path: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str] = None): - """Starts a threaded backup deletion.""" thread = threading.Thread(target=self._run_delete, args=( path_to_delete, info_file_path, is_encrypted, is_system, base_dest_path, queue, password)) thread.daemon = True thread.start() def _run_delete(self, path_to_delete: str, info_file_path: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str]): - """Runs the deletion and puts a message on the queue when done.""" try: if is_encrypted: - self.logger.log( - f"Starting encrypted deletion for {path_to_delete}") - if not password: - self.logger.log( - "Password not provided for encrypted deletion.") - queue.put(('deletion_complete', False)) - return - - mount_point = self.encryption_manager.mount( - base_dest_path, queue) + self.logger.log(f"Starting encrypted deletion for {path_to_delete}") + mount_point = self.encryption_manager.prepare_encrypted_destination( + base_dest_path, is_system, source_size=0, queue=queue) if not mount_point: self.logger.log("Failed to unlock container for deletion.") queue.put(('deletion_complete', False)) return - self.logger.log( - f"Container unlocked. Deleting {path_to_delete} and {info_file_path}") - script_content = f""" -rm -rf '{path_to_delete}' -rm -f '{info_file_path}' -""" - success = self.encryption_manager._execute_as_root( - script_content) + internal_path_to_delete = os.path.join(mount_point, os.path.basename(path_to_delete)) + script_content = f"rm -rf '{internal_path_to_delete}'\nrm -f '{info_file_path}'" + success = self.encryption_manager._execute_as_root(script_content) if success: self.logger.log("Encrypted backup deleted successfully.") queue.put(('deletion_complete', True)) else: - self.logger.log( - "Failed to delete files within encrypted container.") + self.logger.log("Failed to delete files within encrypted container.") queue.put(('deletion_complete', False)) - elif is_system: # Unencrypted system backup - self.logger.log( - f"Starting unencrypted system deletion for {path_to_delete}") - script_content = f""" -rm -rf '{path_to_delete}' -rm -f '{info_file_path}' -""" + elif is_system: + script_content = f"rm -rf '{path_to_delete}'\nrm -f '{info_file_path}'" if self.encryption_manager._execute_as_root(script_content): - self.logger.log( - f"Successfully deleted {path_to_delete} and {info_file_path}") + self.logger.log(f"Successfully deleted {path_to_delete} and {info_file_path}") queue.put(('deletion_complete', True)) else: self.logger.log(f"Failed to delete {path_to_delete}") queue.put(('deletion_complete', False)) - - else: # Unencrypted user backup - self.logger.log( - f"Starting unencrypted user deletion for {path_to_delete}") + else: try: - if os.path.isdir(path_to_delete): - shutil.rmtree(path_to_delete) - if os.path.exists(info_file_path): - os.remove(info_file_path) - self.logger.log( - f"Successfully deleted {path_to_delete} and {info_file_path}") + if os.path.isdir(path_to_delete): shutil.rmtree(path_to_delete) + if os.path.exists(info_file_path): os.remove(info_file_path) + self.logger.log(f"Successfully deleted {path_to_delete} and {info_file_path}") queue.put(('deletion_complete', True)) except Exception as e: - self.logger.log( - f"Failed to delete unencrypted user backup {path_to_delete}: {e}") + self.logger.log(f"Failed to delete unencrypted user backup {path_to_delete}: {e}") queue.put(('deletion_complete', False)) - except Exception as e: self.logger.log(f"Error during threaded deletion: {e}") queue.put(('deletion_complete', False)) - def _compress_and_cleanup(self, dest_path: str, is_system: bool) -> bool: - """Compresses the backup directory and cleans up the original.""" - self.logger.log(f"Starting compression for: {dest_path}") - parent_dir = os.path.dirname(dest_path) - archive_name = os.path.basename(dest_path) + ".tar.gz" - archive_path = os.path.join(parent_dir, archive_name) - - tar_command = f"tar -czf '{archive_path}' -C '{parent_dir}' '{os.path.basename(dest_path)}'" - rm_command = f"rm -rf '{dest_path}'" - - script_content = f""" -#!/bin/bash -set -e - -{tar_command} - echo \"tar command finished with exit code $?.\" - -{rm_command} - echo \"rm command finished with exit code $?.\" -""" - - if is_system or is_encrypted: - self.logger.log("Executing compression and cleanup as root.") - if self.encryption_manager._execute_as_root(script_content): - self.logger.log( - "Compression and cleanup script executed successfully.") - return True - else: - self.logger.log("Compression and cleanup script failed.") - return False - else: - try: - self.logger.log(f"Executing local command: {tar_command}") - tar_result = subprocess.run( - tar_command, shell=True, capture_output=True, text=True, check=True) - self.logger.log( - f"tar command successful. Output: {tar_result.stdout}") - - self.logger.log(f"Executing local command: {rm_command}") - rm_result = subprocess.run( - rm_command, shell=True, capture_output=True, text=True, check=True) - self.logger.log( - f"rm command successful. Output: {rm_result.stdout}") - - return True - except subprocess.CalledProcessError as e: - self.logger.log( - f"A command failed during local compression/cleanup. Return code: {e.returncode}") - self.logger.log(f"Stdout: {e.stdout}") - self.logger.log(f"Stderr: {e.stderr}") - return False - except Exception as e: - self.logger.log( - f"An unexpected error occurred during local compression/cleanup: {e}") - return False + def cancel_and_delete_privileged_backup(self, delete_path: str): + if not self.process or self.process.poll() is not None: return + self.logger.log("Attempting to cancel backup and delete directory with root privileges...") + try: + pgid = os.getpgid(self.process.pid) + script_content = f""" + kill -SIGTERM -- -{pgid} || echo 'Process group not found or already terminated.' + if [ -n \"{delete_path}\" ] && [ \"{delete_path}\" != \"/\" ]; then rm -rf \"{delete_path}\"; fi + """ + self.encryption_manager._execute_as_root(script_content) + except Exception as e: + self.logger.log(f"An error occurred during privileged cancel and delete: {e}") diff --git a/core/encryption_manager.py b/core/encryption_manager.py index a33e06a..af868da 100644 --- a/core/encryption_manager.py +++ b/core/encryption_manager.py @@ -7,6 +7,7 @@ import subprocess import tempfile import stat import re +import math from typing import Optional, List from core.pbp_app_config import AppConfig @@ -22,32 +23,17 @@ class EncryptionManager: self.logger = logger self.app = app self.service_id = "py-backup-encryption" - self.session_password = None self.mounted_destinations = set() - self.auth_method = None - self.is_mounting = False - - def get_key_file_path(self, base_dest_path: str) -> str: - """Generates the standard path for the key file for a given destination.""" - key_filename = f"keyfile_{os.path.basename(base_dest_path.rstrip('/'))}.key" - return os.path.join(AppConfig.CONFIG_DIR, key_filename) def get_password_from_keyring(self, username: str) -> Optional[str]: try: return keyring.get_password(self.service_id, username) - except keyring.errors.InitError as e: - self.logger.log(f"Could not initialize keyring: {e}") - return None except Exception as e: self.logger.log(f"Could not get password from keyring: {e}") return None def is_key_in_keyring(self, username: str) -> bool: - try: - return self.get_password_from_keyring(username) is not None - except Exception as e: - self.logger.log(f"Could not check password in keyring: {e}") - return False + return self.get_password_from_keyring(username) is not None def set_password_in_keyring(self, username: str, password: str) -> bool: try: @@ -59,288 +45,195 @@ class EncryptionManager: return False def get_password(self, username: str, confirm: bool = True) -> Optional[str]: - if self.session_password: - return self.session_password - password = self.get_password_from_keyring(username) if password: - self.session_password = password return password - - dialog = PasswordDialog( - self.app, title=f"Enter password for {username}", confirm=confirm) + dialog = PasswordDialog(self.app, title=f"Enter password for {username}", confirm=confirm) password, save_to_keyring = dialog.get_password() if password and save_to_keyring: self.set_password_in_keyring(username, password) - - if password: - self.session_password = password - return password - def is_encrypted(self, base_dest_path: str) -> bool: - if os.path.basename(base_dest_path) == "pybackup": - pybackup_dir = base_dest_path - else: - pybackup_dir = os.path.join(base_dest_path, "pybackup") + def get_container_path(self, base_dest_path: str) -> str: + pybackup_dir = os.path.join(base_dest_path, "pybackup") user_encrypt_dir = os.path.join(pybackup_dir, "user_encrypt") - container_path = os.path.join(user_encrypt_dir, "pybackup_lvm.img") - return os.path.exists(container_path) + return os.path.join(user_encrypt_dir, "pybackup_luks.img") + + def is_encrypted(self, base_dest_path: str) -> bool: + return os.path.exists(self.get_container_path(base_dest_path)) def is_mounted(self, base_dest_path: str) -> bool: - if os.path.basename(base_dest_path) == "pybackup": - pybackup_dir = base_dest_path - else: - pybackup_dir = os.path.join(base_dest_path, "pybackup") + pybackup_dir = os.path.join(base_dest_path, "pybackup") mount_point = os.path.join(pybackup_dir, "encrypted") return os.path.ismount(mount_point) - def mount(self, base_dest_path: str, queue=None, size_gb: int = 0) -> Optional[str]: - if self.is_mounting: - self.logger.log("Mount process already in progress. Aborting new request.") - return None - - self.is_mounting = True - try: - if self.is_mounted(base_dest_path): - self.mounted_destinations.add(base_dest_path) - pybackup_dir = os.path.join(base_dest_path, "pybackup") - return os.path.join(pybackup_dir, "encrypted") - - username = os.path.basename(base_dest_path.rstrip('/')) - - # Use a dummy queue if none is provided - if queue is None: - from queue import Queue - queue = Queue() - - # 1. Try keyring - password = self.get_password_from_keyring(username) - if password: - self.logger.log("Found password in keyring. Attempting to mount.") - mount_point = self._setup_encrypted_backup(queue, base_dest_path, size_gb, password=password) - if mount_point: - self.auth_method = "keyring" - self.mounted_destinations.add(base_dest_path) - return mount_point - else: - # If mounting with keyring key fails, stop here and report error. - self.logger.log("Mounting with keyring password failed. Aborting mount attempt.") - return None - - # 2. Try key file - key_file_path = self.get_key_file_path(base_dest_path) - if os.path.exists(key_file_path): - self.logger.log(f"Found key file at {key_file_path}. Attempting to mount.") - mount_point = self._setup_encrypted_backup(queue, base_dest_path, size_gb, key_file=key_file_path) - if mount_point: - self.auth_method = "keyfile" - self.mounted_destinations.add(base_dest_path) - return mount_point - - # 3. Prompt for password - self.logger.log("No password in keyring or key file found. Prompting user.") - password = self.get_password(username, confirm=False) - if not password: - self.logger.log("No password provided, cannot mount container.") - self.auth_method = None - return None - - mount_point = self._setup_encrypted_backup(queue, base_dest_path, size_gb, password=password) - if mount_point: - self.auth_method = "password" - self.mounted_destinations.add(base_dest_path) - return mount_point - finally: - self.is_mounting = False - - def unmount(self, base_dest_path: str): - if base_dest_path in self.mounted_destinations: - self._unmount_encrypted_backup(base_dest_path) - self.mounted_destinations.remove(base_dest_path) - - def unmount_all(self): - self.logger.log(f"Unmounting all mounted backup destinations: {self.mounted_destinations}") - # Create a copy for safe iteration - for path in list(self.mounted_destinations): - self.unmount(path) - - def _unmount_encrypted_backup(self, base_dest_path: str): - """ Gently unmounts the container without destroying LVM structures. """ - pybackup_dir = os.path.join(base_dest_path, "pybackup") - user_encrypt_dir = os.path.join(pybackup_dir, "user_encrypt") - mount_point = os.path.join(pybackup_dir, "encrypted") - container_path = os.path.join(user_encrypt_dir, "pybackup_lvm.img") - base_name = os.path.basename(base_dest_path.rstrip('/')) - vg_name = f"pybackup_vg_{base_name}" - mapper_name = f"pybackup_luks_{base_name}" - self.logger.log(f"Unmounting encrypted LVM backup for {base_dest_path}") - find_loop_device_cmd = f"losetup -j {container_path} | cut -d: -f1" - script = f""" - LOOP_DEVICE=$({find_loop_device_cmd} | head -n 1) - if mountpoint -q {mount_point}; then - umount {mount_point} || echo "Umount failed, continuing..." - fi - if [ -e /dev/mapper/{mapper_name} ]; then - cryptsetup luksClose {mapper_name} || echo "luksClose failed, continuing..." - fi - if vgdisplay {vg_name} >/dev/null 2>&1; then - vgchange -an {vg_name} || echo "Could not deactivate volume group {vg_name}." - fi - if [ -n "$LOOP_DEVICE" ] && losetup $LOOP_DEVICE >/dev/null 2>&1; then - losetup -d $LOOP_DEVICE || echo "Could not detach loop device $LOOP_DEVICE." - fi - """ - if not self._execute_as_root(script): - self.logger.log("Encrypted LVM backup unmount script failed.") - - def _setup_encrypted_backup(self, queue, base_dest_path: str, size_gb: int, password: Optional[str] = None, key_file: Optional[str] = None) -> Optional[str]: - self.logger.log(f"Setting up LVM-based encrypted container at {base_dest_path}") - - for tool in ["cryptsetup", "losetup", "pvcreate", "vgcreate", "lvcreate", "lvextend", "resize2fs"]: - if not shutil.which(tool): - self.logger.log(f"Error: Required tool '{tool}' is not installed.") - queue.put(('error', f"Required tool '{tool}' is not installed.")) - return None - - pybackup_dir = os.path.join(base_dest_path, "pybackup") - user_encrypt_dir = os.path.join(pybackup_dir, "user_encrypt") - encrypted_dir = os.path.join(pybackup_dir, "encrypted") - container_path = os.path.join(user_encrypt_dir, "pybackup_lvm.img") - mount_point = encrypted_dir - - base_name = os.path.basename(base_dest_path.rstrip('/')) - vg_name = f"pybackup_vg_{base_name}" - lv_name = "backup_lv" - mapper_name = f"pybackup_luks_{base_name}" - lv_path = f"/dev/{vg_name}/{lv_name}" - - if not password and not key_file: - self.logger.log("No password or key file provided for encryption.") - queue.put(('error', "No password or key file provided for encryption.")) - return None - - if os.path.ismount(mount_point): - self.logger.log(f"Mount point {mount_point} already in use. Assuming it's correctly mounted.") - return mount_point - + def prepare_encrypted_destination(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]: + container_path = self.get_container_path(base_dest_path) if os.path.exists(container_path): - auth_part = f"echo -n '{password}' | cryptsetup luksOpen {lv_path} {mapper_name} -" if password else f"cryptsetup luksOpen {lv_path} {mapper_name} --key-file {key_file}" - script = f""" - mkdir -p {user_encrypt_dir} - LOOP_DEVICE=$(losetup -f --show {container_path}) - pvscan --cache - vgchange -ay {vg_name} - {auth_part} - mount /dev/mapper/{mapper_name} {mount_point} - echo "LOOP_DEVICE_PATH=$LOOP_DEVICE" - """ - if not self._execute_as_root(script): - self.logger.log("Failed to unlock existing LVM container.") - self._destroy_encrypted_structures(base_dest_path) - return None - self.logger.log(f"Encrypted LVM container unlocked and mounted at {mount_point}") - return mount_point + return self._handle_existing_container(base_dest_path, is_system, source_size, queue) else: - format_auth_part = f"echo -n '{password}' | cryptsetup luksFormat {lv_path} -" if password else f"cryptsetup luksFormat {lv_path} --key-file {key_file}" - open_auth_part = f"echo -n '{password}' | cryptsetup luksOpen {lv_path} {mapper_name} -" if password else f"cryptsetup luksOpen {lv_path} {mapper_name} --key-file {key_file}" - script = f""" - mkdir -p {encrypted_dir} - mkdir -p {user_encrypt_dir} - fallocate -l {size_gb}G {container_path} - LOOP_DEVICE=$(losetup -f --show {container_path}) - pvcreate $LOOP_DEVICE - vgcreate {vg_name} $LOOP_DEVICE - lvcreate -n {lv_name} -l 100%FREE {vg_name} - {format_auth_part} - {open_auth_part} - mkfs.ext4 /dev/mapper/{mapper_name} - mount /dev/mapper/{mapper_name} {mount_point} - echo "LOOP_DEVICE_PATH=$LOOP_DEVICE" - """ - if not self._execute_as_root(script): - self.logger.log("Failed to create and setup LVM-based encrypted container.") - self._destroy_encrypted_structures(base_dest_path) - if os.path.exists(container_path): - self._execute_as_root(f"rm -f {container_path}") - queue.put(('error', "Failed to setup LVM-based encrypted container.")) - return None - self.logger.log(f"Encrypted LVM container is ready and mounted at {mount_point}") - return mount_point + return self._handle_new_container(base_dest_path, is_system, source_size, queue) - def _destroy_encrypted_structures(self, base_dest_path: str): - pybackup_dir = os.path.join(base_dest_path, "pybackup") - user_encrypt_dir = os.path.join(pybackup_dir, "user_encrypt") - mount_point = os.path.join(pybackup_dir, "encrypted") - container_path = os.path.join(user_encrypt_dir, "pybackup_lvm.img") - base_name = os.path.basename(base_dest_path.rstrip('/')) - vg_name = f"pybackup_vg_{base_name}" - mapper_name = f"pybackup_luks_{base_name}" - self.logger.log(f"Cleaning up encrypted LVM backup for {base_dest_path}") - find_loop_device_cmd = f"losetup -j {container_path} | cut -d: -f1" - lv_name = "backup_lv" - lv_path = f"/dev/{vg_name}/{lv_name}" + def _handle_existing_container(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]: + self.logger.log("Handling existing container.") + mount_point = os.path.join(os.path.dirname(self.get_container_path(base_dest_path)), "..", "encrypted") + + if not self._open_and_mount(base_dest_path, is_system): + self.logger.log("Failed to mount container for size check.") + return None + + free_space = shutil.disk_usage(mount_point).free + required_space = int(source_size * 1.15) + + if required_space > free_space: + self.logger.log(f"Resize needed. Free: {free_space}, Required: {required_space}") + queue.put(('status_update', "Container zu klein. Vergrößere...")) + + current_total = shutil.disk_usage(mount_point).total + needed_additional = required_space - free_space + new_total_size = current_total + needed_additional + + self.unmount_and_reset_owner(base_dest_path) + + if not self._resize_container(base_dest_path, int(new_total_size)): + self.logger.log("Failed to resize container.") + return None + + if not self._open_and_mount(base_dest_path, is_system): + self.logger.log("Failed to remount after resize.") + return None + + self.mounted_destinations.add(base_dest_path) + return mount_point + + def _handle_new_container(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]: + self.logger.log("Handling new container creation.") + size_gb = math.ceil((source_size * 1.2) / (1024**3)) + 5 + + username = os.path.basename(base_dest_path.rstrip('/')) + password = self.get_password(username, confirm=True) + if not password: return None + + container_path = self.get_container_path(base_dest_path) + mount_point = os.path.join(os.path.dirname(container_path), "..", "encrypted") + mapper_name = f"pybackup_luks_{username}" + chown_cmd = self._get_chown_command(mount_point, is_system) script = f""" - set -x # Log executed commands - LOOP_DEVICE=$({find_loop_device_cmd} | head -n 1) - if mountpoint -q {mount_point}; then - umount {mount_point} || echo "Umount failed, continuing cleanup..." - fi - if [ -e /dev/mapper/{mapper_name} ]; then - cryptsetup luksClose {mapper_name} || echo "luksClose failed, continuing cleanup..." - fi - # Deactivate and remove all LVM structures associated with the VG - if vgdisplay {vg_name} >/dev/null 2>&1; then - lvchange -an {lv_path} >/dev/null 2>&1 || echo "lvchange failed, continuing..." - vgchange -an {vg_name} || echo "vgchange -an failed, continuing cleanup..." - lvremove -f {vg_name} || echo "lvremove failed, continuing cleanup..." - vgremove -f {vg_name} || echo "vgremove failed, continuing cleanup..." - fi - if [ -n "$LOOP_DEVICE" ] && losetup $LOOP_DEVICE >/dev/null 2>&1; then - pvremove -f $LOOP_DEVICE || echo "pvremove failed, continuing cleanup..." - losetup -d $LOOP_DEVICE || echo "losetup -d failed, continuing cleanup..." - fi - """ - if not self._execute_as_root(script): - self.logger.log("Encrypted LVM backup cleanup script failed.") + mkdir -p \"{os.path.dirname(container_path)}\"\n mkdir -p \"{mount_point}\"\n truncate -s {int(size_gb)}G \"{container_path}\"\n echo -n \"$LUKSPASS\" | cryptsetup luksFormat \"{container_path}\" - + echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} - + mkfs.ext4 \"/dev/mapper/{mapper_name}\" + mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"\n {chown_cmd}\n """ + if not self._execute_as_root(script, password): + return None + + self.mounted_destinations.add(base_dest_path) + return mount_point - def _execute_as_root(self, script_content: str) -> bool: + def _open_and_mount(self, base_dest_path: str, is_system: bool) -> bool: + username = os.path.basename(base_dest_path.rstrip('/')) + password = self.get_password(username, confirm=False) + if not password: return False + + container_path = self.get_container_path(base_dest_path) + mount_point = os.path.join(os.path.dirname(container_path), "..", "encrypted") + mapper_name = f"pybackup_luks_{username}" + chown_cmd = self._get_chown_command(mount_point, is_system) + + script = f""" + umount -l \"{mount_point}\" || true + cryptsetup luksClose {mapper_name} || true + mkdir -p \"{mount_point}\"\n echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} - + mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"\n {chown_cmd}\n """ + return self._execute_as_root(script, password) + + def _resize_container(self, base_dest_path: str, new_size_bytes: int) -> bool: + username = os.path.basename(base_dest_path.rstrip('/')) + password = self.get_password(username, confirm=False) + if not password: return False + + container_path = self.get_container_path(base_dest_path) + mapper_name = f"pybackup_luks_{username}" + + script = f""" + truncate -s {new_size_bytes} \"{container_path}\"\n echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} - + e2fsck -fy \"/dev/mapper/{mapper_name}\"\n resize2fs \"/dev/mapper/{mapper_name}\"\n cryptsetup luksClose {mapper_name} + """ + return self._execute_as_root(script, password) + + def unmount_and_reset_owner(self, base_dest_path: str, force_unmap=False): + mapper_name = f"pybackup_luks_{os.path.basename(base_dest_path.rstrip('/'))}" + if not os.path.exists(f"/dev/mapper/{mapper_name}") and not self.is_mounted(base_dest_path): + if not force_unmap: return + + self.logger.log(f"Unmounting and resetting owner for {base_dest_path}") + container_path = self.get_container_path(base_dest_path) + mount_point = os.path.join(os.path.dirname(container_path), "..", "encrypted") + + script = f""" + chown root:root \"{mount_point}\" || true + umount -l \"{mount_point}\" || true + cryptsetup luksClose {mapper_name} || true + """ + self._execute_as_root(script) + + if base_dest_path in self.mounted_destinations: + self.mounted_destinations.remove(base_dest_path) + + def unmount_all(self): + self.logger.log(f"Unmounting all: {self.mounted_destinations}") + for path in list(self.mounted_destinations): + self.unmount_and_reset_owner(path, force_unmap=True) + + def _get_chown_command(self, mount_point: str, is_system: bool) -> str: + if not is_system: + try: + uid = os.getuid() + gid = os.getgid() + return f"chown {uid}:{gid} \"{mount_point}\"" + except Exception as e: + self.logger.log(f"Could not get current user UID/GID for chown: {e}") + return "" + + def _execute_as_root(self, script_content: str, password_for_stdin: Optional[str] = None) -> bool: script_path = '' try: + if password_for_stdin: + escaped_password = password_for_stdin.replace("'", "'\\''") + script_content = f"LUKSPASS='{escaped_password}'\n{script_content}" + with tempfile.NamedTemporaryFile(mode='w', delete=False, prefix="pybackup_script_", suffix=".sh", dir="/tmp") as tmp_script: - tmp_script.write("#!/bin/bash\n\n") - tmp_script.write("set -e\n\n") - tmp_script.write(script_content) + tmp_script.write("#!/bin/bash\n" + script_content) script_path = tmp_script.name - os.chmod(script_path, stat.S_IRWXU | stat.S_IRGRP | - stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) - + os.chmod(script_path, stat.S_IRWXU) command = ['pkexec', script_path] - sanitized_script_content = re.sub( - r"echo -n \'.*?\'", "echo -n \'[REDACTED]\'", script_content) - self.logger.log( - f"Executing privileged command via script: {script_path}") - self.logger.log( - f"Script content:\n---\n{sanitized_script_content}\n---") + log_lines = [] + for line in script_content.split('\n'): + if "LUKSPASS=" in line: + log_lines.append("LUKSPASS='[REDACTED]'") + else: + log_lines.append(line) + sanitized_script_content = "\n".join(log_lines) - result = subprocess.run( - command, capture_output=True, text=True, check=False) + self.logger.log(f"Executing privileged command via script: {script_path}") + self.logger.log(f"Script content:\n---\n{sanitized_script_content}\n---") + + result = subprocess.run(command, capture_output=True, text=True, check=False) if result.returncode == 0: - self.logger.log( - f"Privileged script executed successfully. Output:\n{result.stdout}") + self.logger.log(f"Privileged script executed successfully. Output:\n{result.stdout}") return True else: - self.logger.log( - f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}") + self.logger.log(f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}") return False except Exception as e: - self.logger.log( - f"Failed to set up or execute privileged command: {e}") + self.logger.log(f"Failed to set up or execute privileged command: {e}") return False finally: if script_path and os.path.exists(script_path): - os.remove(script_path) \ No newline at end of file + try: + os.remove(script_path) + except OSError as e: + self.logger.log(f"Error removing temp script {script_path}: {e}") \ No newline at end of file diff --git a/pybackup/01-September-2025_000000_system_full.txt b/pybackup/01-September-2025_000000_system_full.txt deleted file mode 100644 index e69de29..0000000 diff --git a/pyimage_ui/backup_content_frame.py b/pyimage_ui/backup_content_frame.py index e0260b0..9147863 100644 --- a/pyimage_ui/backup_content_frame.py +++ b/pyimage_ui/backup_content_frame.py @@ -141,7 +141,7 @@ class BackupContentFrame(ttk.Frame): button.configure(style="Gray.Toolbutton") self.nav_progress_bars[i].pack_forget() - def show(self, backup_path): + def show(self, backup_path, initial_tab_index=0): app_logger.log( f"BackupContentFrame: show called with path {backup_path}") self.grid(row=2, column=0, sticky="nsew") @@ -151,21 +151,7 @@ class BackupContentFrame(ttk.Frame): # Check if the destination is encrypted and trigger mount if necessary is_encrypted = self.backup_manager.encryption_manager.is_encrypted( backup_path) - if is_encrypted and not self.backup_manager.encryption_manager.is_mounted(backup_path): - app_logger.log( - "Encrypted destination is not mounted. Attempting to mount.") - mount_point = self.backup_manager.encryption_manager.mount( - backup_path) - if not mount_point: - app_logger.log("Mount failed. Cannot display backup content.") - MessageDialog( - message_type="error", title=Msg.STR["error"], text=Msg.STR["err_unlock_failed"]) - # Clear views and return if mount fails - self.system_backups_frame.show(backup_path, []) - self.user_backups_frame.show(backup_path, []) - return - # Refresh header status after successful mount - self.app.header_frame.refresh_status() + self.viewing_encrypted = is_encrypted # Set this flag for remembering the view pybackup_dir = os.path.join(backup_path, "pybackup") @@ -187,9 +173,8 @@ class BackupContentFrame(ttk.Frame): self.system_backups_frame.show(backup_path, []) self.user_backups_frame.show(backup_path, []) - config_key = "last_encrypted_backup_content_view" if self.viewing_encrypted else "last_backup_content_view" - last_view = self.app.config_manager.get_setting(config_key, 0) - self._switch_view(last_view) + # Use the passed index to switch to the correct view + self._switch_view(initial_tab_index) def hide(self): self.grid_remove() diff --git a/pyimage_ui/navigation.py b/pyimage_ui/navigation.py index c1676f8..e23c4a5 100644 --- a/pyimage_ui/navigation.py +++ b/pyimage_ui/navigation.py @@ -269,10 +269,9 @@ class Navigation: self.app.top_bar.grid() self._update_task_bar_visibility("settings") - def toggle_backup_content_frame(self, active_index=None): + def toggle_backup_content_frame(self, initial_tab_index=0): self._cancel_calculation() - if active_index is not None: - self.app.drawing.update_nav_buttons(active_index) + self.app.drawing.update_nav_buttons(2) # Index 2 for Backup Content if not self.app.destination_path: MessageDialog(master=self.app, message_type="info", @@ -283,7 +282,9 @@ class Navigation: # Mount the destination if it is encrypted and not already mounted if self.app.backup_manager.encryption_manager.is_encrypted(self.app.destination_path): if not self.app.backup_manager.encryption_manager.is_mounted(self.app.destination_path): - mount_point = self.app.backup_manager.encryption_manager.mount(self.app.destination_path) + is_system = (initial_tab_index == 0) + mount_point = self.app.backup_manager.encryption_manager.prepare_encrypted_destination( + self.app.destination_path, is_system=is_system, source_size=0, queue=self.app.queue) if not mount_point: MessageDialog(master=self.app, message_type="error", title=Msg.STR["error"], text=Msg.STR.get("err_mount_failed", "Mounting failed.")) @@ -301,8 +302,6 @@ class Navigation: self.app.target_size_frame.grid_remove() self.app.restore_size_frame_before.grid_remove() self.app.restore_size_frame_after.grid_remove() - self.app.backup_content_frame.show(self.app.destination_path) + self.app.backup_content_frame.show(self.app.destination_path, initial_tab_index) self.app.top_bar.grid() - self._update_task_bar_visibility("scheduler") - - + self._update_task_bar_visibility("scheduler") \ No newline at end of file