From 798134dd20b7bacf200c3a4ac10e3fc5e3f48c35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A9sir=C3=A9=20Werner=20Menrath?= Date: Mon, 8 Sep 2025 21:02:03 +0200 Subject: [PATCH] new method to detect backup --- core/backup_manager.py | 859 +++++++++++++++++------- core/encryption_manager.py | 25 +- core/pbp_app_config.py | 4 +- main_app.py | 3 + pyimage_ui/actions.py | 118 ++-- pyimage_ui/user_backup_content_frame.py | 28 +- 6 files changed, 689 insertions(+), 348 deletions(-) diff --git a/core/backup_manager.py b/core/backup_manager.py index c52eafa..deb570a 100644 --- a/core/backup_manager.py +++ b/core/backup_manager.py @@ -6,6 +6,7 @@ import signal import datetime import math import shutil +import json from typing import Optional, List, Dict, Any from pathlib import Path from crontab import CronTab @@ -36,26 +37,33 @@ class BackupManager: if not shutil.which("gdbus"): return try: - self.logger.log("Attempting to inhibit screensaver and power management.") + self.logger.log( + "Attempting to inhibit screensaver and power management.") command = [ "gdbus", "call", "--session", "--dest", "org.freedesktop.ScreenSaver", "--object-path", "/org/freedesktop/ScreenSaver", "--method", "org.freedesktop.ScreenSaver.Inhibit", "Py-Backup", "Backup in progress" ] - result = subprocess.run(command, capture_output=True, text=True, check=True) + result = subprocess.run( + command, capture_output=True, text=True, check=True) match = re.search(r'uint32\s+(\d+)', result.stdout) if match: self.inhibit_cookie = int(match.group(1)) - self.logger.log(f"Successfully inhibited screensaver with cookie {self.inhibit_cookie}") + self.logger.log( + f"Successfully inhibited screensaver with cookie {self.inhibit_cookie}") except Exception as e: - self.logger.log(f"An unexpected error occurred while inhibiting screensaver: {e}") + self.logger.log( + f"An unexpected error occurred while inhibiting screensaver: {e}") def _uninhibit_screensaver(self): - if self.inhibit_cookie is None: return - if not shutil.which("gdbus"): return + if self.inhibit_cookie is None: + return + if not shutil.which("gdbus"): + return try: - self.logger.log(f"Attempting to uninhibit screensaver with cookie {self.inhibit_cookie}") + self.logger.log( + f"Attempting to uninhibit screensaver with cookie {self.inhibit_cookie}") command = [ "gdbus", "call", "--session", "--dest", "org.freedesktop.ScreenSaver", "--object-path", "/org/freedesktop/ScreenSaver", @@ -69,93 +77,210 @@ class BackupManager: finally: self.inhibit_cookie = None - def start_backup(self, queue, source_path: str, dest_path: str, is_system: bool, is_dry_run: bool = False, exclude_files: Optional[List[Path]] = None, source_size: int = 0, is_compressed: bool = False, is_encrypted: bool = False, mode: str = "incremental", use_trash_bin: bool = False, no_trash_bin: bool = False): + def check_for_full_backup(self, dest_path: str, source_name: str, is_encrypted: bool) -> bool: + """Checks if a full backup already exists for a given source in the new flat structure.""" + self.logger.log( + f"Checking for existing full backup for source '{source_name}' in '{dest_path}' (Encrypted: {is_encrypted})") + + pybackup_dir = os.path.join(dest_path, "pybackup") + scan_dir = self.encryption_manager.get_mount_point( + dest_path) if is_encrypted else pybackup_dir + + if not scan_dir or not os.path.isdir(scan_dir): + self.logger.log( + f"Scan directory '{scan_dir}' does not exist. 
No full backup found.") + return False + + enc_suffix = "enc" if is_encrypted else "plain" + # Pattern matches: 20250908-133000_system_full_plain + pattern = re.compile( + rf"\d{{8}}-\d{{6}}_{re.escape(source_name)}_full_{enc_suffix}") + + for dirname in os.listdir(scan_dir): + if pattern.match(dirname): + self.logger.log( + f"Found existing full backup directory: {dirname}") + return True + + self.logger.log( + f"No existing full backup found for source '{source_name}'.") + return False + + def _find_latest_backup(self, rsync_base_dir: str, is_system: bool, source_name: str) -> Optional[str]: + self.logger.log( + f"Searching for latest full backup for source '{source_name}' in: {rsync_base_dir}") + full_backups = [] + if os.path.isdir(rsync_base_dir): + # Pattern matches any full backup for the given source name, encrypted or not + pattern = re.compile( + rf"^(\d{{8}}-\d{{6}})_{re.escape(source_name)}_full_(plain|enc)$") + for item in os.listdir(rsync_base_dir): + item_path = os.path.join(rsync_base_dir, item) + if os.path.isdir(item_path) and pattern.match(item): + full_backups.append(item) + + if not full_backups: + self.logger.log("No full backups found.") + return None + + full_backups.sort(reverse=True) + latest_backup_dir = full_backups[0] + latest_backup_path = os.path.join(rsync_base_dir, latest_backup_dir) + + self.logger.log( + f"Found latest full backup for --link-dest: {latest_backup_path}") + return latest_backup_path + + def start_backup(self, queue, source_path: str, dest_path: str, is_system: bool, source_name: str, is_dry_run: bool = False, exclude_files: Optional[List[Path]] = None, source_size: int = 0, is_compressed: bool = False, is_encrypted: bool = False, mode: str = "incremental", use_trash_bin: bool = False, no_trash_bin: bool = False): self.is_system_process = is_system self._inhibit_screensaver() mount_point = None if is_encrypted: - base_dest_path = os.path.dirname(dest_path) mount_point = self.encryption_manager.prepare_encrypted_destination( - base_dest_path, is_system, source_size, queue) - + dest_path, is_system, source_size, queue) + if not mount_point: - self.logger.log("Failed to prepare encrypted destination. Aborting backup.") - queue.put(('completion', {'status': 'error', 'returncode': -1})) + self.logger.log( + "Failed to prepare encrypted destination. 
Aborting backup.") + queue.put( + ('completion', {'status': 'error', 'returncode': -1})) self._uninhibit_screensaver() return None thread = threading.Thread(target=self._run_backup_path, args=( - queue, source_path, dest_path, is_system, is_dry_run, exclude_files, source_size, is_compressed, is_encrypted, mode, mount_point, use_trash_bin, no_trash_bin)) + queue, source_path, dest_path, is_system, source_name, is_dry_run, exclude_files, source_size, is_compressed, is_encrypted, mode, mount_point, use_trash_bin, no_trash_bin)) thread.daemon = True thread.start() return thread - def _run_backup_path(self, queue, source_path: str, dest_path: str, is_system: bool, is_dry_run: bool, exclude_files: Optional[List[Path]], source_size: int, is_compressed: bool, is_encrypted: bool, mode: str, mount_point: Optional[str], use_trash_bin: bool, no_trash_bin: bool): - base_dest_path = os.path.dirname(dest_path) + def _run_backup_path(self, queue, source_path: str, dest_path: str, is_system: bool, source_name: str, is_dry_run: bool, exclude_files: Optional[List[Path]], source_size: int, is_compressed: bool, is_encrypted: bool, mode: str, mount_point: Optional[str], use_trash_bin: bool, no_trash_bin: bool): + base_dest_path = dest_path # The user-selected destination path + rsync_dest = None # Initialize to None + try: pybackup_dir = os.path.join(base_dest_path, "pybackup") - backup_name = os.path.basename(dest_path) - user_source_name = None - if not is_system: - match = re.match(r"^(\d{2}-\d{2}-\d{4}_\d{2}:\d{2}:\d{2})_user_(.+?)_(full|incremental)", backup_name) - if match: user_source_name = match.group(2) + if not os.path.isdir(pybackup_dir): + os.makedirs(pybackup_dir, exist_ok=True) - if is_encrypted: - if not mount_point: raise Exception("Encrypted backup run without a mount point.") - rsync_base_dest = mount_point + rsync_base_dir = mount_point if is_encrypted else pybackup_dir + + latest_full_backup_path = self._find_latest_backup( + rsync_base_dir, is_system, source_name) + + if mode == "incremental" and not latest_full_backup_path: + self.logger.log( + f"Mode is incremental, but no full backup found for source '{source_name}'. 
Forcing full backup.") + mode = "full" + + now = datetime.datetime.now() + timestamp = now.strftime("%Y%m%d-%H%M%S") + encryption_suffix = "enc" if is_encrypted else "plain" + backup_dir_name = f"{timestamp}_{source_name}_{mode}_{encryption_suffix}" + rsync_dest = os.path.join(rsync_base_dir, backup_dir_name) + + # Send the determined path back to the main thread via the queue + queue.put(('current_path', rsync_dest)) + + # --- Rsync command construction --- + rsync_command_parts = [ + 'rsync', '-aAXHv'] if is_system else ['rsync', '-aL'] + if mode == "incremental" and latest_full_backup_path and not is_dry_run: + rsync_command_parts.append( + f"--link-dest='{latest_full_backup_path}'") + + rsync_command_parts.extend(['--info=progress2']) + if exclude_files: + rsync_command_parts.extend( + [f"--exclude-from='{f}'" for f in exclude_files]) + if AppConfig.MANUAL_EXCLUDE_LIST_PATH.exists(): + rsync_command_parts.append( + f"--exclude-from='{AppConfig.MANUAL_EXCLUDE_LIST_PATH}'") + if is_dry_run: + rsync_command_parts.append('--dry-run') + + if not is_system: + trash_bin_path = os.path.join(rsync_base_dir, ".Trash") + if use_trash_bin: + rsync_command_parts.extend( + ['--backup', f'--backup-dir=\'{trash_bin_path}\'', '--delete']) + elif no_trash_bin: + rsync_command_parts.append('--delete') + if use_trash_bin or no_trash_bin: + rsync_command_parts.append( + f"--exclude='{os.path.basename(trash_bin_path)}/'") + + rsync_command_parts.extend([f"'{source_path}'", f"'{rsync_dest}'"]) + + if is_system: + # Restore the working single-password solution + rsync_cmd_str = ' '.join(rsync_command_parts) + # Important: Use single quotes around paths to handle spaces, and use -p with mkdir. + full_system_cmd = f"mkdir -p '{rsync_dest}' && {rsync_cmd_str}" + command = ['pkexec', 'bash', '-c', full_system_cmd] else: - rsync_base_dest = os.path.join(pybackup_dir, "user_backups") if not is_system else pybackup_dir + os.makedirs(rsync_dest, exist_ok=True) + command = rsync_command_parts - if not is_system: - if user_source_name: rsync_base_dest = os.path.join(rsync_base_dest, user_source_name) - - rsync_dest = os.path.join(rsync_base_dest, backup_name) + self.logger.log(f"Executing command: {' '.join(command)}") - if not os.path.exists(rsync_base_dest): - if not is_system: - os.makedirs(rsync_base_dest, exist_ok=True) - else: - self.encryption_manager._execute_as_root(f"mkdir -p \"{rsync_base_dest}\"") - - latest_backup_path = self._find_latest_backup(rsync_base_dest, is_system) - - if not is_system and not latest_backup_path: mode = "full" - elif not is_system and latest_backup_path: mode = "incremental" - - command = ['pkexec', 'rsync', '-aAXHvL'] if is_system else ['rsync', '-avL'] - if mode == "incremental" and latest_backup_path and not is_dry_run: command.append(f"--link-dest={latest_backup_path}") - - command.extend(['--info=progress2']) - if exclude_files: command.extend([f"--exclude-from={f}" for f in exclude_files]) - if AppConfig.MANUAL_EXCLUDE_LIST_PATH.exists(): command.append(f"--exclude-from={AppConfig.MANUAL_EXCLUDE_LIST_PATH}") - if is_dry_run: command.append('--dry-run') - - if not is_system: - trash_bin_path = os.path.join(rsync_base_dest, ".Trash") - if use_trash_bin: command.extend(['--backup', f'--backup-dir={trash_bin_path}', '--delete']) - elif no_trash_bin: command.append('--delete') - if use_trash_bin or no_trash_bin: command.append(f"--exclude={os.path.basename(trash_bin_path)}/") - - command.extend([source_path, rsync_dest]) - self.logger.log(f"Rsync command: {' '.join(command)}") - - 
transferred_size, total_size, stderr = self._execute_rsync(queue, command) + transferred_size, total_size, stderr = self._execute_rsync( + queue, command) return_code = self.process.returncode if self.process else -1 if self.process: - status = 'success' if return_code == 0 else 'warning' if return_code in [23, 24] else 'cancelled' if return_code in [143, -15, 15, -9] else 'error' + status = 'success' if return_code == 0 else 'warning' if return_code in [ + 23, 24] else 'cancelled' if return_code in [143, -15, 15, -9] else 'error' if status in ['success', 'warning'] and not is_dry_run: - final_size = transferred_size if (mode == 'incremental' and latest_backup_path) else (total_size or source_size) - self._create_info_file(pybackup_dir, backup_name, final_size, is_encrypted) - queue.put(('completion', {'status': status, 'returncode': return_code})) + # After a successful backup, get the true size of the destination directory + final_size = self._get_directory_size(rsync_dest) + self._create_info_json( + base_dest_path=base_dest_path, + backup_dir_name=backup_dir_name, + source_name=source_name, + backup_type="system" if is_system else "user", + mode=mode, + size_bytes=final_size, + is_encrypted=is_encrypted, + based_on=os.path.basename( + latest_full_backup_path) if latest_full_backup_path and mode == 'incremental' else None + ) + queue.put( + ('completion', {'status': status, 'returncode': return_code})) + + except Exception as e: + self.logger.log(f"Exception in _run_backup_path: {e}") + queue.put(('completion', {'status': 'error', 'returncode': -1})) finally: - # The container is intentionally left mounted for user convenience. - # It will be unmounted when the application closes. self._uninhibit_screensaver() self.process = None + def _get_directory_size(self, path: str) -> int: + """Calculates the total disk space used by a directory using `du`. """ + if not os.path.isdir(path): + return 0 + try: + # Use `du -sb` to get the real disk usage in bytes, correctly handling hard links. + result = subprocess.run(["du", "-sb", path], capture_output=True, text=True, check=True) + # Output is like "12345\t/path/to/dir", so we split and take the first part. 
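            # Note: because `du` counts each hard-linked inode only once, an
            # incremental snapshot created with --link-dest is charged only for
            # the data it adds on top of the full backup it links into.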
+ size_in_bytes = int(result.stdout.split()[0]) + return size_in_bytes + except (subprocess.CalledProcessError, FileNotFoundError, ValueError, IndexError) as e: + self.logger.log(f"Could not calculate directory size for {path} using du: {e}") + # Fallback to a simpler, less accurate method if du fails + total_size = 0 + try: + for dirpath, dirnames, filenames in os.walk(path): + for f in filenames: + fp = os.path.join(dirpath, f) + if not os.path.islink(fp): + total_size += os.path.getsize(fp) + return total_size + except Exception as fallback_e: + self.logger.log(f"Fallback size calculation also failed for {path}: {fallback_e}") + return 0 + def _prepare_and_get_mounted_path(self, base_dest_path: str, is_system: bool, mount_if_needed: bool) -> Optional[str]: if not self.encryption_manager.is_encrypted(base_dest_path): return None @@ -170,185 +295,218 @@ class BackupManager: if self.encryption_manager.is_mounted(base_dest_path): pybackup_dir = os.path.join(base_dest_path, "pybackup") return os.path.join(pybackup_dir, "encrypted") - + return None def list_all_backups(self, base_dest_path: str, mount_if_needed: bool = True): - mounted_path = self._prepare_and_get_mounted_path(base_dest_path, is_system=False, mount_if_needed=mount_if_needed) - return self._list_all_backups_from_path(base_dest_path, mounted_path) + is_encrypted_dest = self.encryption_manager.is_encrypted( + base_dest_path) + scan_dir = self.encryption_manager.get_mount_point( + base_dest_path) if is_encrypted_dest else os.path.join(base_dest_path, "pybackup") - def _list_all_backups_from_path(self, base_dest_path: str, mounted_path: Optional[str] = None): - system_backups = self._list_system_backups_from_path(base_dest_path, mounted_path) - user_backups = self._list_user_backups_from_path(base_dest_path, mounted_path) - return system_backups, user_backups + if not scan_dir or not os.path.isdir(scan_dir): + # Try to mount if it wasn't already + if is_encrypted_dest and mount_if_needed: + if not self.encryption_manager.prepare_encrypted_destination(base_dest_path, is_system=False, source_size=0, queue=self.app.queue): + return [], [] # Mount failed + scan_dir = self.encryption_manager.get_mount_point( + base_dest_path) + if not scan_dir or not os.path.isdir(scan_dir): + return [], [] + else: + return [], [] - def list_system_backups(self, base_dest_path: str, mount_if_needed: bool = True) -> Optional[List[Dict[str, str]]]: - mounted_path = self._prepare_and_get_mounted_path(base_dest_path, is_system=True, mount_if_needed=mount_if_needed) - if self.encryption_manager.is_encrypted(base_dest_path) and not mounted_path: - return None - return self._list_system_backups_from_path(base_dest_path, mounted_path) - - def _list_backups(self, base_dest_path: str, mounted_path: Optional[str], name_regex: re.Pattern, backup_type_prefix: str) -> List[Dict[str, str]]: - pybackup_dir = os.path.join(base_dest_path, "pybackup") - if not os.path.isdir(pybackup_dir): - return [] - - is_system = backup_type_prefix == "system" + metadata_dir = os.path.join(base_dest_path, "pybackup", "metadata") + if not os.path.isdir(metadata_dir): + return [], [] all_backups = [] - for item in os.listdir(pybackup_dir): - match = name_regex.match(item) - if not match: + for backup_dir_name in os.listdir(scan_dir): + backup_dir_path = os.path.join(scan_dir, backup_dir_name) + if not os.path.isdir(backup_dir_path): continue - groups = match.groups() - date_str, time_str = groups[0], groups[1] - - if is_system: - backup_type_base, comp_ext, enc_suffix = groups[2], 
groups[3], groups[4] - source_name = None - is_compressed = comp_ext is not None - else: - source_name, backup_type_base, enc_suffix = groups[2], groups[3], groups[4] - is_compressed = False - - is_encrypted = enc_suffix is not None - backup_name = item.replace(".txt", "").replace("_encrypted", "") - - if mounted_path: - if is_system: - full_path = os.path.join(mounted_path, backup_name) - else: - user_backup_dir = os.path.join(mounted_path, source_name) - full_path = os.path.join(user_backup_dir, backup_name) - else: - if is_system: - full_path = os.path.join(pybackup_dir, backup_name) - else: - user_backups_dir = os.path.join(pybackup_dir, "user_backups", source_name) - full_path = os.path.join(user_backups_dir, backup_name) - - backup_type = backup_type_base.capitalize() - if is_compressed: - backup_type += " (Compressed)" - if is_encrypted: - backup_type += " (Encrypted)" - - backup_size, comment = "N/A", "" - info_file_path = os.path.join(pybackup_dir, item) - if os.path.exists(info_file_path): + info_file_path = os.path.join( + metadata_dir, f"{backup_dir_name}.json") + if os.path.isfile(info_file_path): try: with open(info_file_path, 'r') as f: - for line in f: - if line.strip().lower().startswith("originalgröße:"): - backup_size = line.split(":", 1)[1].strip().split('(')[0].strip() - elif line.strip().lower().startswith("kommentar:"): - comment = line.split(":", 1)[1].strip() - except Exception as e: - self.logger.log(f"Could not read info file {info_file_path}: {e}") + info_data = json.load(f) - backup_info = { - "date": date_str, "time": time_str, "type": backup_type, "size": backup_size, - "folder_name": backup_name, "full_path": full_path, "comment": comment, - "is_encrypted": is_encrypted, "is_compressed": is_compressed, - "backup_type_base": backup_type_base.capitalize(), - "datetime": datetime.datetime.strptime(f"{date_str} {time_str}", '%d-%m-%Y %H:%M:%S') - } - if not is_system: - backup_info["source"] = source_name - - all_backups.append(backup_info) + dt_obj = datetime.datetime.fromisoformat( + info_data["creation_date"]) - if is_system: - all_backups.sort(key=lambda x: x['datetime']) - grouped_backups = [] - current_group = [] - for backup in all_backups: - if backup['backup_type_base'] == 'Full': - if current_group: - grouped_backups.append(current_group) - current_group = [backup] + backup_type_display = info_data["mode"].capitalize() + if info_data.get("is_compressed", False): + backup_type_display += " (Compressed)" + if info_data.get("is_encrypted", False): + backup_type_display += " (Encrypted)" + + backup_info = { + "date": dt_obj.strftime('%d-%m-%Y'), + "time": dt_obj.strftime('%H:%M:%S'), + "type": backup_type_display, + "size": info_data.get("size_readable", "N/A"), + "folder_name": backup_dir_name, + "full_path": backup_dir_path, + "info_file_path": info_file_path, + "comment": info_data.get("comment", ""), + "is_encrypted": info_data.get("is_encrypted", False), + "is_compressed": info_data.get("is_compressed", False), + "backup_type_base": info_data["mode"].capitalize(), + "datetime": dt_obj, + "source": info_data.get("source_name", "N/A"), + "is_system": info_data.get("backup_type") == "system" + } + all_backups.append(backup_info) + except (IOError, json.JSONDecodeError, KeyError) as e: + self.logger.log( + f"Could not read or parse info file {info_file_path}: {e}") + + # Separate into system and user backups for correct sorting and display + system_backups = sorted( + [b for b in all_backups if b["is_system"]], key=lambda x: x['datetime'], reverse=True) + 
user_backups = sorted([b for b in all_backups if not b["is_system"]], + key=lambda x: x['datetime'], reverse=True) + + # Further group system backups by full/inc chains + grouped_system_backups = [] + temp_group = [] + # Sort from oldest to newest for grouping + for backup in reversed(system_backups): + if backup['backup_type_base'] == 'Full': + if temp_group: + grouped_system_backups.append(temp_group) + temp_group = [backup] + else: + if not temp_group: # Orphaned incremental + grouped_system_backups.append([backup]) else: - if not current_group: - current_group.append(backup) - else: - current_group.append(backup) - if current_group: - grouped_backups.append(current_group) - grouped_backups.sort(key=lambda g: g[0]['datetime'], reverse=True) - return [item for group in grouped_backups for item in group] - else: - all_backups.sort(key=lambda x: x['datetime'], reverse=True) - return all_backups + temp_group.append(backup) + if temp_group: + grouped_system_backups.append(temp_group) - def _list_system_backups_from_path(self, base_dest_path: str, mounted_path: Optional[str] = None) -> List[Dict[str, str]]: - name_regex = re.compile(r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_system_(full|incremental)(\.tar\.gz)?(_encrypted)?\.txt$", re.IGNORECASE) - return self._list_backups(base_dest_path, mounted_path, name_regex, "system") + # Sort groups by the date of the first element (the full backup), descending + grouped_system_backups.sort( + key=lambda g: g[0]['datetime'], reverse=True) + final_system_list = [ + item for group in grouped_system_backups for item in group] - def _list_user_backups_from_path(self, base_dest_path: str, mounted_path: Optional[str] = None) -> List[Dict[str, str]]: - name_regex = re.compile(r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_user_(.+?)_(full|incremental)(_encrypted)?\.txt$", re.IGNORECASE) - return self._list_backups(base_dest_path, mounted_path, name_regex, "user") + return final_system_list, user_backups - def _find_latest_backup(self, base_backup_path: str, is_system: bool) -> Optional[str]: - self.logger.log(f"Searching for latest backup in: {base_backup_path}") - backup_names = [] - if os.path.isdir(base_backup_path): - for item in os.listdir(base_backup_path): - if os.path.isdir(os.path.join(base_backup_path, item)): - if is_system: - if "_system_" in item: backup_names.append(item) - else: - backup_names.append(item) - backup_names.sort(reverse=True) - if not backup_names: return None - latest_backup_path = os.path.join(base_backup_path, backup_names[0]) - if os.path.isdir(latest_backup_path): - self.logger.log(f"Found latest backup for --link-dest: {latest_backup_path}") - return latest_backup_path - return None + def _find_latest_backup(self, rsync_base_dir: str, is_system: bool, source_name: str) -> Optional[str]: + self.logger.log( + f"Searching for latest full backup for source '{source_name}' in: {rsync_base_dir}") + full_backups = [] + if os.path.isdir(rsync_base_dir): + # Pattern matches any full backup for the given source name, encrypted or not + pattern = re.compile( + rf"^(\d{{8}}-\d{{6}})_{re.escape(source_name)}_full_(plain|enc)$") + for item in os.listdir(rsync_base_dir): + item_path = os.path.join(rsync_base_dir, item) + if os.path.isdir(item_path) and pattern.match(item): + full_backups.append(item) - def _create_info_file(self, pybackup_dir: str, backup_name: str, source_size: int, is_encrypted: bool): + if not full_backups: + self.logger.log("No full backups found.") + return None + + full_backups.sort(reverse=True) + latest_backup_dir = 
full_backups[0] + latest_backup_path = os.path.join(rsync_base_dir, latest_backup_dir) + + self.logger.log( + f"Found latest full backup for --link-dest: {latest_backup_path}") + return latest_backup_path + + def _create_info_json(self, base_dest_path: str, backup_dir_name: str, source_name: str, backup_type: str, mode: str, size_bytes: int, is_encrypted: bool, based_on: Optional[str] = None, comment: str = ""): + """Creates a backup_info.json file inside the user-writable metadata directory.""" try: - info_filename = f"{backup_name}{'_encrypted' if is_encrypted else ''}.txt" - info_file_path = os.path.join(pybackup_dir, info_filename) - if source_size > 0: - power, n = 1024, 0 + # All metadata files go into a single, flat metadata directory for simplicity + metadata_path = os.path.join(base_dest_path, "pybackup", "metadata") + os.makedirs(metadata_path, exist_ok=True) + + info_file_path = os.path.join(metadata_path, f"{backup_dir_name}.json") + + # Format size for human-readable display + if size_bytes > 0: + power = 1024 + n = 0 power_labels = {0: 'B', 1: 'KB', 2: 'MB', 3: 'GB', 4: 'TB'} - display_size = source_size + display_size = size_bytes while display_size >= power and n < len(power_labels) - 1: display_size /= power n += 1 size_str = f"{display_size:.2f} {power_labels[n]}" else: size_str = "0 B" - date_str = datetime.datetime.now().strftime("%d-%m-%Y %H:%M:%S") - info_content = (f"Backup-Datum: {date_str}\n" f"Originalgröße: {size_str} ({source_size} Bytes)\n") - with open(info_file_path, 'w') as f: f.write(info_content) + + info_data = { + "creation_date": datetime.datetime.now().isoformat(), + "backup_type": backup_type, + "source_name": source_name, + "mode": mode, + "size_bytes": size_bytes, + "size_readable": size_str, + "is_encrypted": is_encrypted, + "based_on": based_on, + "comment": comment + } + + with open(info_file_path, 'w') as f: + json.dump(info_data, f, indent=4) + self.logger.log(f"Successfully created metadata file: {info_file_path}") except Exception as e: - self.logger.log(f"Failed to create metadata file for {pybackup_dir}. Error: {e}") + self.logger.log(f"Failed to create metadata file. 
Error: {e}") + + def get_comment(self, info_file_path: str) -> str: + """Reads the comment from a backup_info.json file.""" + try: + with open(info_file_path, 'r') as f: + data = json.load(f) + return data.get("comment", "") + except (IOError, json.JSONDecodeError): + return "" + + def update_comment(self, info_file_path: str, new_comment: str): + """Updates the comment in a backup_info.json file.""" + try: + with open(info_file_path, 'r') as f: + data = json.load(f) + + data["comment"] = new_comment + + with open(info_file_path, 'w') as f: + json.dump(data, f, indent=4) + self.logger.log(f"Successfully updated comment in {info_file_path}") + except (IOError, json.JSONDecodeError) as e: + self.logger.log(f"Failed to update comment in {info_file_path}: {e}") def _execute_rsync(self, queue, command: List[str]): transferred_size, total_size, stderr_output = 0, 0, "" try: env = os.environ.copy() env['LC_ALL'] = 'C' - self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, preexec_fn=os.setsid, env=env) - + self.process = subprocess.Popen( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, preexec_fn=os.setsid, env=env) + if self.process.stdout: for line in iter(self.process.stdout.readline, ''): stripped_line = line.strip() self.logger.log(f"Rsync stdout: {stripped_line}") if '%' in stripped_line: match = re.search(r'\s*(\d+)%\s+', stripped_line) - if match: queue.put(('progress', int(match.group(1)))) + if match: + queue.put(('progress', int(match.group(1)))) elif stripped_line and not stripped_line.startswith(('sending', 'sent', 'total')): queue.put(('file_update', stripped_line)) self.process.wait() if self.process.stderr: stderr_output = self.process.stderr.read() - if stderr_output: self.logger.log(f"Rsync Stderr: {stderr_output.strip()}") + if stderr_output: + self.logger.log(f"Rsync Stderr: {stderr_output.strip()}") except FileNotFoundError: self.logger.log(f"Error: '{command[0]}' not found.") @@ -361,7 +519,8 @@ class BackupManager: def start_restore(self, source_path: str, dest_path: str, is_compressed: bool): from queue import Queue queue = self.app.queue if hasattr(self.app, 'queue') else Queue() - thread = threading.Thread(target=self._run_restore, args=(queue, source_path, dest_path, is_compressed)) + thread = threading.Thread(target=self._run_restore, args=( + queue, source_path, dest_path, is_compressed)) thread.daemon = True thread.start() @@ -376,9 +535,11 @@ class BackupManager: else: self.logger.log("Restore script failed.") except Exception as e: - self.logger.log(f"An unexpected error occurred during restore: {e}") + self.logger.log( + f"An unexpected error occurred during restore: {e}") finally: - queue.put(('completion', {'status': status, 'returncode': 0 if status == 'success' else 1})) + queue.put( + ('completion', {'status': status, 'returncode': 0 if status == 'success' else 1})) def get_scheduled_jobs(self) -> List[Dict[str, Any]]: jobs_list = [] @@ -387,85 +548,291 @@ class BackupManager: for job in user_cron: if self.app_tag in job.comment: details = self._parse_job_comment(job.comment) - if details: jobs_list.append(details) + if details: + jobs_list.append(details) except Exception as e: self.logger.log(f"Error loading cron jobs: {e}") return jobs_list - def start_delete_backup(self, path_to_delete: str, info_file_path: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str] = None): + def start_delete_backup(self, path_to_delete: str, 
is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str] = None): thread = threading.Thread(target=self._run_delete, args=( - path_to_delete, info_file_path, is_encrypted, is_system, base_dest_path, queue, password)) + path_to_delete, is_encrypted, is_system, base_dest_path, queue, password)) thread.daemon = True thread.start() - def _run_delete(self, path_to_delete: str, info_file_path: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str]): + def _run_delete(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str]): try: + # Determine metadata file path before any deletion + backup_dir_name = os.path.basename(path_to_delete.rstrip('/')) + metadata_file_path = os.path.join( + base_dest_path, "pybackup", "metadata", f"{backup_dir_name}.json") + + # Delete the backup data directory if is_encrypted: - self.logger.log(f"Starting encrypted deletion for {path_to_delete}") - - mount_point = None - if self.encryption_manager.is_mounted(base_dest_path): - pybackup_dir = os.path.join(base_dest_path, "pybackup") - mount_point = os.path.join(pybackup_dir, "encrypted") - else: + self.logger.log( + f"Starting encrypted deletion for {path_to_delete}") + mount_point = self.encryption_manager.get_mount_point( + base_dest_path) + if not mount_point: if password: - mount_point = self.encryption_manager.mount_for_deletion(base_dest_path, is_system, password) + mount_point = self.encryption_manager.mount_for_deletion( + base_dest_path, is_system, password) else: - self.logger.log("Password not provided for encrypted deletion.") + self.logger.log( + "Password not provided for encrypted deletion.") if not mount_point: self.logger.log("Failed to unlock container for deletion.") queue.put(('deletion_complete', False)) return - internal_path_to_delete = os.path.join(mount_point, os.path.basename(path_to_delete)) + internal_path_to_delete = os.path.join( + mount_point, backup_dir_name) success = False if is_system: - script_content = f"rm -rf '{internal_path_to_delete}'\nrm -f '{info_file_path}'" - success = self.encryption_manager._execute_as_root(script_content) + script_content = f"rm -rf '{internal_path_to_delete}'" + success = self.encryption_manager._execute_as_root( + script_content) else: # User backup, no root needed try: if os.path.isdir(internal_path_to_delete): shutil.rmtree(internal_path_to_delete) - if os.path.exists(info_file_path): - os.remove(info_file_path) - self.logger.log(f"Successfully deleted {internal_path_to_delete} and {info_file_path}") + self.logger.log( + f"Successfully deleted {internal_path_to_delete}") success = True except Exception as e: - self.logger.log(f"Failed to delete user backup {internal_path_to_delete}: {e}") + self.logger.log( + f"Failed to delete user backup {internal_path_to_delete}: {e}") success = False - if success: - self.logger.log("Encrypted backup deleted successfully.") - queue.put(('deletion_complete', True)) - else: - self.logger.log("Failed to delete files within encrypted container.") + if not success: + self.logger.log( + "Failed to delete files within encrypted container.") queue.put(('deletion_complete', False)) + return # Stop if data deletion failed elif is_system: - script_content = f"rm -rf '{path_to_delete}'\nrm -f '{info_file_path}'" - if self.encryption_manager._execute_as_root(script_content): - self.logger.log(f"Successfully deleted {path_to_delete} and {info_file_path}") - queue.put(('deletion_complete', True)) - 
else: + script_content = f"rm -rf '{path_to_delete}'" + if not self.encryption_manager._execute_as_root(script_content): self.logger.log(f"Failed to delete {path_to_delete}") queue.put(('deletion_complete', False)) - else: + return # Stop if data deletion failed + else: # Unencrypted user backup try: - if os.path.isdir(path_to_delete): shutil.rmtree(path_to_delete) - if os.path.exists(info_file_path): os.remove(info_file_path) - self.logger.log(f"Successfully deleted {path_to_delete} and {info_file_path}") - queue.put(('deletion_complete', True)) + if os.path.isdir(path_to_delete): + shutil.rmtree(path_to_delete) + self.logger.log(f"Successfully deleted {path_to_delete}") except Exception as e: - self.logger.log(f"Failed to delete unencrypted user backup {path_to_delete}: {e}") + self.logger.log( + f"Failed to delete unencrypted user backup {path_to_delete}: {e}") queue.put(('deletion_complete', False)) + return # Stop if data deletion failed + + # Finally, delete the metadata file (with user permissions) + try: + if os.path.exists(metadata_file_path): + os.remove(metadata_file_path) + self.logger.log( + f"Successfully deleted metadata file {metadata_file_path}") + queue.put(('deletion_complete', True)) + except Exception as e: + self.logger.log( + f"Failed to delete metadata file {metadata_file_path}: {e}") + queue.put(('deletion_complete', False)) + + except Exception as e: + self.logger.log(f"Error during threaded deletion: {e}") + queue.put(('deletion_complete', False)) + + def get_comment(self, info_file_path: str) -> str: + """Reads the comment from a backup_info.json file.""" + try: + with open(info_file_path, 'r') as f: + data = json.load(f) + return data.get("comment", "") + except (IOError, json.JSONDecodeError): + return "" + + def update_comment(self, info_file_path: str, new_comment: str): + """Updates the comment in a backup_info.json file.""" + try: + with open(info_file_path, 'r') as f: + data = json.load(f) + + data["comment"] = new_comment + + with open(info_file_path, 'w') as f: + json.dump(data, f, indent=4) + self.logger.log(f"Successfully updated comment in {info_file_path}") + except (IOError, json.JSONDecodeError) as e: + self.logger.log(f"Failed to update comment in {info_file_path}: {e}") + + def _execute_rsync(self, queue, command: List[str]): + transferred_size, total_size, stderr_output = 0, 0, "" + try: + env = os.environ.copy() + env['LC_ALL'] = 'C' + self.process = subprocess.Popen( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, preexec_fn=os.setsid, env=env) + + if self.process.stdout: + for line in iter(self.process.stdout.readline, ''): + stripped_line = line.strip() + self.logger.log(f"Rsync stdout: {stripped_line}") + if '%' in stripped_line: + match = re.search(r'\s*(\d+)%\s+', stripped_line) + if match: + queue.put(('progress', int(match.group(1)))) + elif stripped_line and not stripped_line.startswith(('sending', 'sent', 'total')): + queue.put(('file_update', stripped_line)) + + self.process.wait() + if self.process.stderr: + stderr_output = self.process.stderr.read() + if stderr_output: + self.logger.log(f"Rsync Stderr: {stderr_output.strip()}") + + except FileNotFoundError: + self.logger.log(f"Error: '{command[0]}' not found.") + queue.put(('error', None)) + except Exception as e: + self.logger.log(f"Rsync execution error: {e}") + queue.put(('error', None)) + return transferred_size, total_size, stderr_output + + def start_restore(self, source_path: str, dest_path: str, is_compressed: bool): + from queue import 
Queue + queue = self.app.queue if hasattr(self.app, 'queue') else Queue() + thread = threading.Thread(target=self._run_restore, args=( + queue, source_path, dest_path, is_compressed)) + thread.daemon = True + thread.start() + + def _run_restore(self, queue, source_path: str, dest_path: str, is_compressed: bool): + self.logger.log(f"Starting restore from {source_path} to {dest_path}") + status = 'error' + try: + source = source_path.rstrip('/') + '/' + script_content = f"tar -xzf '{source_path}' -C '{dest_path}'" if is_compressed else f"rsync -aAXHv '{source}' '{dest_path}'" + if self.encryption_manager._execute_as_root(script_content): + status = 'success' + else: + self.logger.log("Restore script failed.") + except Exception as e: + self.logger.log( + f"An unexpected error occurred during restore: {e}") + finally: + queue.put( + ('completion', {'status': status, 'returncode': 0 if status == 'success' else 1})) + + def get_scheduled_jobs(self) -> List[Dict[str, Any]]: + jobs_list = [] + try: + user_cron = CronTab(user=True) + for job in user_cron: + if self.app_tag in job.comment: + details = self._parse_job_comment(job.comment) + if details: + jobs_list.append(details) + except Exception as e: + self.logger.log(f"Error loading cron jobs: {e}") + return jobs_list + + def start_delete_backup(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str] = None): + thread = threading.Thread(target=self._run_delete, args=( + path_to_delete, is_encrypted, is_system, base_dest_path, queue, password)) + thread.daemon = True + thread.start() + + def _run_delete(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str]): + try: + # Determine metadata file path before any deletion + backup_dir_name = os.path.basename(path_to_delete.rstrip('/')) + metadata_file_path = os.path.join( + base_dest_path, "pybackup", "metadata", f"{backup_dir_name}.json") + + # Delete the backup data directory + if is_encrypted: + self.logger.log( + f"Starting encrypted deletion for {path_to_delete}") + mount_point = self.encryption_manager.get_mount_point( + base_dest_path) + if not mount_point: + if password: + mount_point = self.encryption_manager.mount_for_deletion( + base_dest_path, is_system, password) + else: + self.logger.log( + "Password not provided for encrypted deletion.") + + if not mount_point: + self.logger.log("Failed to unlock container for deletion.") + queue.put(('deletion_complete', False)) + return + + internal_path_to_delete = os.path.join( + mount_point, backup_dir_name) + success = False + if is_system: + script_content = f"rm -rf '{internal_path_to_delete}'" + success = self.encryption_manager._execute_as_root( + script_content) + else: # User backup, no root needed + try: + if os.path.isdir(internal_path_to_delete): + shutil.rmtree(internal_path_to_delete) + self.logger.log( + f"Successfully deleted {internal_path_to_delete}") + success = True + except Exception as e: + self.logger.log( + f"Failed to delete user backup {internal_path_to_delete}: {e}") + success = False + + if not success: + self.logger.log( + "Failed to delete files within encrypted container.") + queue.put(('deletion_complete', False)) + return # Stop if data deletion failed + + elif is_system: + script_content = f"rm -rf '{path_to_delete}'" + if not self.encryption_manager._execute_as_root(script_content): + self.logger.log(f"Failed to delete {path_to_delete}") + queue.put(('deletion_complete', False)) + return # Stop 
if data deletion failed + else: # Unencrypted user backup + try: + if os.path.isdir(path_to_delete): + shutil.rmtree(path_to_delete) + self.logger.log(f"Successfully deleted {path_to_delete}") + except Exception as e: + self.logger.log( + f"Failed to delete unencrypted user backup {path_to_delete}: {e}") + queue.put(('deletion_complete', False)) + return # Stop if data deletion failed + + # Finally, delete the metadata file (with user permissions) + try: + if os.path.exists(metadata_file_path): + os.remove(metadata_file_path) + self.logger.log( + f"Successfully deleted metadata file {metadata_file_path}") + queue.put(('deletion_complete', True)) + except Exception as e: + self.logger.log( + f"Failed to delete metadata file {metadata_file_path}: {e}") + queue.put(('deletion_complete', False)) + except Exception as e: self.logger.log(f"Error during threaded deletion: {e}") queue.put(('deletion_complete', False)) def cancel_and_delete_privileged_backup(self, delete_path: str): - if not self.process or self.process.poll() is not None: return - self.logger.log("Attempting to cancel backup and delete directory with root privileges...") + if not self.process or self.process.poll() is not None: + return + self.logger.log( + "Attempting to cancel backup and delete directory with root privileges...") try: pgid = os.getpgid(self.process.pid) script_content = f""" @@ -474,4 +841,18 @@ if [ -n "{delete_path}" ] && [ "{delete_path}" != "/" ]; then rm -rf "{delete_pa """ self.encryption_manager._execute_as_root(script_content) except Exception as e: - self.logger.log(f"An error occurred during privileged cancel and delete: {e}") + self.logger.log( + f"An error occurred during privileged cancel and delete: {e}") + + def cancel_backup(self): + if self.process and self.process.poll() is None: + self.logger.log( + f"Attempting to cancel backup process with PID: {self.process.pid}") + try: + # Terminate the entire process group to stop rsync + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + self.logger.log("Successfully sent SIGTERM to process group.") + except ProcessLookupError: + self.logger.log("Process already finished.") + except Exception as e: + self.logger.log(f"Error cancelling process: {e}") diff --git a/core/encryption_manager.py b/core/encryption_manager.py index 7a36e87..0512e31 100644 --- a/core/encryption_manager.py +++ b/core/encryption_manager.py @@ -136,16 +136,20 @@ class EncryptionManager: def is_encrypted(self, base_dest_path: str) -> bool: return os.path.exists(self.get_container_path(base_dest_path)) + def get_mount_point(self, base_dest_path: str) -> str: + """Constructs the unique, static mount point path for a given destination.""" + username = os.path.basename(base_dest_path.rstrip('/')) + mapper_name = f"pybackup_luks_{username}" + return os.path.join("/mnt", mapper_name) + def is_mounted(self, base_dest_path: str) -> bool: - pybackup_dir = os.path.join(base_dest_path, "pybackup") - mount_point = os.path.join(pybackup_dir, "encrypted") + mount_point = self.get_mount_point(base_dest_path) return os.path.ismount(mount_point) or base_dest_path in self.mounted_destinations def mount_for_deletion(self, base_dest_path: str, is_system: bool, password: str) -> Optional[str]: self.logger.log("Mounting container for deletion operation.") if self._open_and_mount(base_dest_path, is_system, password): - mount_point = os.path.join(os.path.dirname( - self.get_container_path(base_dest_path)), "..", "encrypted") + mount_point = self.get_mount_point(base_dest_path) 
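            # get_mount_point() returns the static /mnt/pybackup_luks_<name>
            # location, replacing the old relative "../encrypted" construction,
            # so mounting, listing and deletion all resolve the same path.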
self.mounted_destinations.add(base_dest_path) return mount_point self.logger.log("Failed to mount container for deletion.") @@ -162,8 +166,7 @@ class EncryptionManager: self.logger.log("Handling existing container.") username = os.path.basename(base_dest_path.rstrip('/')) - mount_point = os.path.join(os.path.dirname( - self.get_container_path(base_dest_path)), "..", "encrypted") + mount_point = self.get_mount_point(base_dest_path) if not self.is_mounted(base_dest_path): if not self._open_and_mount(base_dest_path, is_system): @@ -232,8 +235,7 @@ mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\" return None container_path = self.get_container_path(base_dest_path) - mount_point = os.path.join(os.path.dirname( - container_path), "..", "encrypted") + mount_point = self.get_mount_point(base_dest_path) mapper_name = f"pybackup_luks_{username}" chown_cmd = self._get_chown_command(mount_point, is_system) @@ -263,8 +265,7 @@ mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\" return False container_path = self.get_container_path(base_dest_path) - mount_point = os.path.join(os.path.dirname( - container_path), "..", "encrypted") + mount_point = self.get_mount_point(base_dest_path) mapper_name = f"pybackup_luks_{username}" chown_cmd = self._get_chown_command(mount_point, is_system) @@ -291,9 +292,7 @@ mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\" return self.logger.log(f"Unmounting and resetting owner for {base_dest_path}") - container_path = self.get_container_path(base_dest_path) - mount_point = os.path.join(os.path.dirname( - container_path), "..", "encrypted") + mount_point = self.get_mount_point(base_dest_path) script = f""" chown root:root \"{mount_point}\" || true diff --git a/core/pbp_app_config.py b/core/pbp_app_config.py index a82e3aa..5f8f063 100644 --- a/core/pbp_app_config.py +++ b/core/pbp_app_config.py @@ -361,6 +361,6 @@ class Msg: "keyfile_settings": _("Keyfile Settings"), # New "backup_defaults_title": _("Backup Defaults"), # New "automation_settings_title": _("Automation Settings"), # New - "create_add_key_file": _("Create/Add Key File"), # New - "key_file_not_created": _("Key file not created."), # New + "create_add_key_file": _("Create/Add Key File"), # New + "key_file_not_created": _("Key file not created."), # New } diff --git a/main_app.py b/main_app.py index 67b620f..4eb0a24 100644 --- a/main_app.py +++ b/main_app.py @@ -678,6 +678,9 @@ class MainApplication(tk.Tk): self.task_progress.stop() elif message_type == 'cancel_button_state': self.start_pause_button.config(state=value) + elif message_type == 'current_path': + self.current_backup_path = value + app_logger.log(f"Set current backup path to: {value}") elif message_type == 'deletion_complete': self.actions._set_ui_state(True) self.backup_content_frame.hide_deletion_status() diff --git a/pyimage_ui/actions.py b/pyimage_ui/actions.py index 036b418..4e468c7 100644 --- a/pyimage_ui/actions.py +++ b/pyimage_ui/actions.py @@ -27,41 +27,46 @@ class Actions: self.app.inkrementell_var.set(True) def _update_backup_type_controls(self): - # Only applies to system backups in backup mode - if self.app.mode != 'backup' or self.app.left_canvas_data.get('folder') != "Computer": - self._set_backup_type("full") # Default for user backups - self.app.full_backup_cb.config(state='disabled') - self.app.incremental_cb.config(state='disabled') - return - else: - # Re-enable if we switch back to system backup + source_name = self.app.left_canvas_data.get('folder') + is_system_backup = (source_name == "Computer") + + # Re-enable 
controls for user backups, disable for system unless conditions are met + if not is_system_backup: + self.app.full_backup_cb.config(state='normal') + self.app.incremental_cb.config(state='normal') + else: # System backup self.app.full_backup_cb.config(state='normal') self.app.incremental_cb.config(state='normal') - # If controls are forced by advanced settings, do nothing - if self.app.full_backup_cb.cget('state') == 'disabled' and self.app.incremental_cb.cget('state') == 'disabled': + # Handle forced settings from advanced config, which have top priority + if self.app.config_manager.get_setting("force_full_backup", False): + self._set_backup_type("full") + self.app.full_backup_cb.config(state='disabled') + self.app.incremental_cb.config(state='disabled') + return + if self.app.config_manager.get_setting("force_incremental_backup", False): + self._set_backup_type("incremental") + self.app.full_backup_cb.config(state='disabled') + self.app.incremental_cb.config(state='disabled') return - full_backup_exists = False - if self.app.destination_path and os.path.isdir(self.app.destination_path): - pybackup_dir = os.path.join(self.app.destination_path, "pybackup") - if not os.path.isdir(pybackup_dir): - self._set_backup_type("full") - return + # Default to Full if no destination is set + if not self.app.destination_path or not os.path.isdir(self.app.destination_path): + self._set_backup_type("full") + return - is_encrypted_backup = self.app.encrypted_var.get() + is_encrypted = self.app.encrypted_var.get() + + # Use the new detection logic for both user and system backups + # Note: For system backups, source_name is "Computer". We might need a more specific profile name. + # For now, we adapt it to the check_for_full_backup function's expectation. + profile_name = "system" if is_system_backup else source_name - system_backups = self.app.backup_manager.list_system_backups( - self.app.destination_path, mount_if_needed=False) - - if system_backups is None: # Encrypted, but not inspected - full_backup_exists = True # Assume one exists to be safe - else: - for backup in system_backups: - # Match the encryption status and check if it's a full backup - if backup.get('is_encrypted') == is_encrypted_backup and backup.get('backup_type_base') == 'Full': - full_backup_exists = True - break + full_backup_exists = self.app.backup_manager.check_for_full_backup( + dest_path=self.app.destination_path, + source_name=profile_name, # Using a profile name now + is_encrypted=is_encrypted + ) if full_backup_exists: self._set_backup_type("incremental") @@ -643,32 +648,7 @@ class Actions: return is_encrypted = self.app.encrypted_var.get() - password = None - if is_encrypted: - username = os.path.basename(base_dest.rstrip('/')) - password = self.app.backup_manager.encryption_manager.get_password( - username, confirm=True) - if not password: - app_logger.log( - "Encryption enabled, but no password provided. Aborting backup.") - self.app.backup_is_running = False - self.app.start_pause_button["text"] = Msg.STR["start"] - self._set_ui_state(True) - return - - try: - locale.setlocale(locale.LC_TIME, 'de_DE.UTF-8') - except locale.Error: - app_logger.log( - "Could not set locale to de_DE.UTF-8. 
Using default.") - - now = datetime.datetime.now() - date_str = now.strftime("%d-%m-%Y") - time_str = now.strftime("%H:%M:%S") - folder_name = f"{date_str}_{time_str}_system_{mode}" - # The backup_manager will add /pybackup/ - final_dest = os.path.join(base_dest, folder_name) - self.app.current_backup_path = final_dest + # Password handling is now managed within the backup manager if needed for mounting source_size_bytes = self.app.left_canvas_data.get('total_bytes', 0) @@ -686,8 +666,9 @@ class Actions: self.app.backup_manager.start_backup( queue=self.app.queue, source_path="/", - dest_path=final_dest, + dest_path=base_dest, # Pass the base destination path is_system=True, + source_name="system", is_dry_run=is_dry_run, exclude_files=exclude_file_paths, source_size=source_size_bytes, @@ -710,30 +691,10 @@ class Actions: return is_encrypted = self.app.encrypted_var.get() - password = None - if is_encrypted: - username = os.path.basename(base_dest.rstrip('/')) - password = self.app.backup_manager.encryption_manager.get_password( - username, confirm=True) - if not password: - app_logger.log( - "Encryption enabled, but no password provided. Aborting backup.") - self.app.backup_is_running = False - self.app.start_pause_button["text"] = Msg.STR["start"] - self._set_ui_state(True) - return + # Password handling is now managed within the backup manager if needed for mounting - # Determine mode for user backup based on UI selection mode = "full" if self.app.vollbackup_var.get() else "incremental" - now = datetime.datetime.now() - date_str = now.strftime("%d-%m-%Y") - time_str = now.strftime("%H:%M:%S") - folder_name = f"{date_str}_{time_str}_user_{source_name}_{mode}" - - final_dest = os.path.join(base_dest, folder_name) - self.app.current_backup_path = final_dest - is_dry_run = self.app.testlauf_var.get() is_compressed = self.app.compressed_var.get() use_trash_bin = self.app.config_manager.get_setting( @@ -744,10 +705,11 @@ class Actions: self.app.backup_manager.start_backup( queue=self.app.queue, source_path=source_path, - dest_path=final_dest, + dest_path=base_dest, # Pass the base destination path is_system=False, + source_name=source_name, is_dry_run=is_dry_run, - exclude_files=None, + exclude_files=None, # User backups don't use the global exclude list here source_size=source_size_bytes, is_compressed=is_compressed, is_encrypted=is_encrypted, diff --git a/pyimage_ui/user_backup_content_frame.py b/pyimage_ui/user_backup_content_frame.py index ccb8cc4..f97184d 100644 --- a/pyimage_ui/user_backup_content_frame.py +++ b/pyimage_ui/user_backup_content_frame.py @@ -94,15 +94,15 @@ class UserBackupContentFrame(ttk.Frame): if not selected_backup: return - is_encrypted = selected_backup.get('is_encrypted', False) - info_file_name = f"{selected_item_id}{'_encrypted' if is_encrypted else ''}.txt" - info_file_path = os.path.join( - self.backup_path, "pybackup", info_file_name) - - if not os.path.exists(info_file_path): - self.backup_manager.update_comment(info_file_path, "") + # Use the direct path to the info file, which we added to the backup dict + info_file_path = selected_backup.get('info_file_path') + if not info_file_path or not os.path.isfile(info_file_path): + MessageDialog(self, message_type="error", title="Error", text=f"Metadata file not found: {info_file_path}") + return CommentEditorDialog(self, info_file_path, self.backup_manager) + + # Refresh the view to show the new comment self.parent_view.show(self.backup_path) def _restore_selected(self): @@ -129,28 +129,24 @@ class 
UserBackupContentFrame(ttk.Frame): password = None if is_encrypted: - username = os.path.basename(self.backup_path.rstrip('/')) - # Get password in the UI thread before starting the background task - password = self.backup_manager.encryption_manager.get_password( - username, confirm=False) + # For encrypted backups, the base_dest_path is the path to the container's parent directory + # We assume the logic to get the username/keyring entry is handled by the encryption manager + password = self.backup_manager.encryption_manager.get_password(confirm=False) if not password: self.actions.logger.log( "Password entry cancelled, aborting deletion.") return - info_file_to_delete = os.path.join( - self.backup_path, "pybackup", f"{selected_item_id}{'_encrypted' if is_encrypted else ''}.txt") - self.actions._set_ui_state(False) self.parent_view.show_deletion_status( Msg.STR["deleting_backup_in_progress"]) + # The info_file_path is no longer needed as it's inside the folder_to_delete self.backup_manager.start_delete_backup( path_to_delete=folder_to_delete, - info_file_path=info_file_to_delete, is_encrypted=is_encrypted, is_system=False, - base_dest_path=self.backup_path, + base_dest_path=self.backup_path, # This is the root destination folder password=password, queue=self.winfo_toplevel().queue )
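
Illustrative sketch (not part of the patch): how the new flat naming scheme,
the detection pattern from check_for_full_backup(), and the metadata written
by _create_info_json() fit together. The destination path and the trimmed
field set below are hypothetical.

    import datetime
    import json
    import os
    import re

    dest = "/media/backup-disk"  # hypothetical user-selected destination
    stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    dir_name = f"{stamp}_system_full_plain"  # <timestamp>_<source>_<mode>_<plain|enc>

    # Data lands flat under <dest>/pybackup/ (or the LUKS mount point);
    # metadata always stays outside the container in <dest>/pybackup/metadata/.
    data_dir = os.path.join(dest, "pybackup", dir_name)
    meta_file = os.path.join(dest, "pybackup", "metadata", f"{dir_name}.json")

    # The same shape check_for_full_backup() scans for before allowing an
    # incremental run:
    assert re.match(r"\d{8}-\d{6}_system_full_plain", dir_name)

    # An incremental run would then hard-link against the newest full backup:
    link_dest_flag = f"--link-dest='{data_dir}'"

    # A trimmed-down record of what _create_info_json() writes:
    info = {
        "creation_date": datetime.datetime.now().isoformat(),
        "backup_type": "system",
        "source_name": "system",
        "mode": "full",
        "is_encrypted": False,
        "based_on": None,
        "comment": "",
    }
    print(json.dumps(info, indent=4))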