diff --git a/core/backup_manager.py b/core/backup_manager.py
index deb570a..1fb5640 100644
--- a/core/backup_manager.py
+++ b/core/backup_manager.py
@@ -77,26 +76,39 @@ class BackupManager:
         finally:
             self.inhibit_cookie = None
 
+    def _get_profile_path(self, base_dest_path: str, is_system: bool, source_name: str, is_encrypted: bool) -> str:
+        """Helper function to construct the path to a specific backup profile directory."""
+        pybackup_dir = os.path.join(base_dest_path, "pybackup")
+        if is_encrypted:
+            base_data_dir = self.encryption_manager.get_mount_point(
+                base_dest_path)
+        else:
+            base_data_dir = os.path.join(pybackup_dir, "unencrypted")
+
+        if is_system:
+            return os.path.join(base_data_dir, "system")
+        else:
+            return os.path.join(base_data_dir, "user", source_name)
+
     def check_for_full_backup(self, dest_path: str, source_name: str, is_encrypted: bool) -> bool:
-        """Checks if a full backup already exists for a given source in the new flat structure."""
+        """Checks if a full backup already exists for a given source."""
         self.logger.log(
             f"Checking for existing full backup for source '{source_name}' in '{dest_path}' (Encrypted: {is_encrypted})")
-        pybackup_dir = os.path.join(dest_path, "pybackup")
-        scan_dir = self.encryption_manager.get_mount_point(
-            dest_path) if is_encrypted else pybackup_dir
+        is_system = source_name == 'system'
+        profile_path = self._get_profile_path(
+            dest_path, is_system, source_name, is_encrypted)
 
-        if not scan_dir or not os.path.isdir(scan_dir):
+        if not os.path.isdir(profile_path):
             self.logger.log(
-                f"Scan directory '{scan_dir}' does not exist. No full backup found.")
+                f"Profile directory '{profile_path}' does not exist. No full backup found.")
             return False
 
         enc_suffix = "enc" if is_encrypted else "plain"
-        # Pattern matches: 20250908-133000_system_full_plain
         pattern = re.compile(
             rf"\d{{8}}-\d{{6}}_{re.escape(source_name)}_full_{enc_suffix}")
 
-        for dirname in os.listdir(scan_dir):
+        for dirname in os.listdir(profile_path):
             if pattern.match(dirname):
                 self.logger.log(
                     f"Found existing full backup directory: {dirname}")
@@ -106,31 +118,6 @@ class BackupManager:
             f"No existing full backup found for source '{source_name}'.")
         return False
 
-    def _find_latest_backup(self, rsync_base_dir: str, is_system: bool, source_name: str) -> Optional[str]:
-        self.logger.log(
-            f"Searching for latest full backup for source '{source_name}' in: {rsync_base_dir}")
-        full_backups = []
-        if os.path.isdir(rsync_base_dir):
-            # Pattern matches any full backup for the given source name, encrypted or not
-            pattern = re.compile(
-                rf"^(\d{{8}}-\d{{6}})_{re.escape(source_name)}_full_(plain|enc)$")
-            for item in os.listdir(rsync_base_dir):
-                item_path = os.path.join(rsync_base_dir, item)
-                if os.path.isdir(item_path) and pattern.match(item):
-                    full_backups.append(item)
-
-        if not full_backups:
-            self.logger.log("No full backups found.")
-            return None
-
-        full_backups.sort(reverse=True)
-        latest_backup_dir = full_backups[0]
-        latest_backup_path = os.path.join(rsync_base_dir, latest_backup_dir)
-
-        self.logger.log(
-            f"Found latest full backup for --link-dest: {latest_backup_path}")
-        return latest_backup_path
-
     def start_backup(self, queue, source_path: str, dest_path: str, is_system: bool, source_name: str, is_dry_run: bool = False, exclude_files: Optional[List[Path]] = None, source_size: int = 0, is_compressed: bool = False, is_encrypted: bool = False, mode: str = "incremental", use_trash_bin: bool = False, no_trash_bin: bool = False):
         self.is_system_process = is_system
         self._inhibit_screensaver()
@@ -149,24 +136,20 @@ class BackupManager:
             return None
 
         thread = threading.Thread(target=self._run_backup_path, args=(
-            queue, source_path, dest_path, is_system, source_name, is_dry_run, exclude_files, source_size, is_compressed, is_encrypted, mode, mount_point, use_trash_bin, no_trash_bin))
+            queue, source_path, dest_path, is_system, source_name, is_dry_run, exclude_files, source_size, is_compressed, is_encrypted, mode, use_trash_bin, no_trash_bin))
         thread.daemon = True
         thread.start()
         return thread
 
-    def _run_backup_path(self, queue, source_path: str, dest_path: str, is_system: bool, source_name: str, is_dry_run: bool, exclude_files: Optional[List[Path]], source_size: int, is_compressed: bool, is_encrypted: bool, mode: str, mount_point: Optional[str], use_trash_bin: bool, no_trash_bin: bool):
-        base_dest_path = dest_path  # The user-selected destination path
-        rsync_dest = None  # Initialize to None
-
+    def _run_backup_path(self, queue, source_path: str, dest_path: str, is_system: bool, source_name: str, is_dry_run: bool, exclude_files: Optional[List[Path]], source_size: int, is_compressed: bool, is_encrypted: bool, mode: str, use_trash_bin: bool, no_trash_bin: bool):
+        base_dest_path = dest_path
         try:
-            pybackup_dir = os.path.join(base_dest_path, "pybackup")
-            if not os.path.isdir(pybackup_dir):
-                os.makedirs(pybackup_dir, exist_ok=True)
-
-            rsync_base_dir = mount_point if is_encrypted else pybackup_dir
+            profile_path = self._get_profile_path(
+                base_dest_path, is_system, source_name, is_encrypted)
+            os.makedirs(profile_path, exist_ok=True)
 
             latest_full_backup_path = self._find_latest_backup(
-                rsync_base_dir, is_system, source_name)
+                profile_path, source_name)
 
             if mode == "incremental" and not latest_full_backup_path:
                 self.logger.log(
@@ -177,14 +160,11 @@ class BackupManager:
             timestamp = now.strftime("%Y%m%d-%H%M%S")
             encryption_suffix = "enc" if is_encrypted else "plain"
             backup_dir_name = f"{timestamp}_{source_name}_{mode}_{encryption_suffix}"
-            rsync_dest = os.path.join(rsync_base_dir, backup_dir_name)
+            rsync_dest = os.path.join(profile_path, backup_dir_name)
 
-            # Send the determined path back to the main thread via the queue
             queue.put(('current_path', rsync_dest))
 
-            # --- Rsync command construction ---
-            rsync_command_parts = [
-                'rsync', '-aAXHv'] if is_system else ['rsync', '-aL']
+            rsync_command_parts = ['rsync', '-aAXHv'] if is_system else ['rsync', '-aL']
             if mode == "incremental" and latest_full_backup_path and not is_dry_run:
                 rsync_command_parts.append(
                     f"--link-dest='{latest_full_backup_path}'")
@@ -200,7 +180,8 @@ class BackupManager:
                 rsync_command_parts.append('--dry-run')
 
             if not is_system:
-                trash_bin_path = os.path.join(rsync_base_dir, ".Trash")
+                user_base_dir = os.path.dirname(profile_path)
+                trash_bin_path = os.path.join(user_base_dir, ".Trash")
                 if use_trash_bin:
                     rsync_command_parts.extend(
                         ['--backup', f'--backup-dir=\'{trash_bin_path}\'', '--delete'])
@@ -210,15 +191,15 @@ class BackupManager:
                     rsync_command_parts.append(
                         f"--exclude='{os.path.basename(trash_bin_path)}/'")
 
-            rsync_command_parts.extend([f"'{source_path}'", f"'{rsync_dest}'"])
-
             if is_system:
-                # Restore the working single-password solution
+                # For system backup, build a shell command string with quoted paths.
+                rsync_command_parts.extend([f"'{source_path}'", f"'{rsync_dest}'"])
                 rsync_cmd_str = ' '.join(rsync_command_parts)
-                # Important: Use single quotes around paths to handle spaces, and use -p with mkdir.
                 full_system_cmd = f"mkdir -p '{rsync_dest}' && {rsync_cmd_str}"
                 command = ['pkexec', 'bash', '-c', full_system_cmd]
             else:
+                # For user backup, pass a list of args to Popen without extra quotes.
+                rsync_command_parts.extend([source_path, rsync_dest])
                 os.makedirs(rsync_dest, exist_ok=True)
                 command = rsync_command_parts
@@ -232,7 +213,6 @@ class BackupManager:
                 status = 'success' if return_code == 0 else 'warning' if return_code in [
                     23, 24] else 'cancelled' if return_code in [143, -15, 15, -9] else 'error'
                 if status in ['success', 'warning'] and not is_dry_run:
-                    # After a successful backup, get the true size of the destination directory
                    final_size = self._get_directory_size(rsync_dest)
                     self._create_info_json(
                         base_dest_path=base_dest_path,
@@ -257,18 +237,16 @@ class BackupManager:
             self.process = None
 
     def _get_directory_size(self, path: str) -> int:
-        """Calculates the total disk space used by a directory using `du`. """
         if not os.path.isdir(path):
             return 0
         try:
-            # Use `du -sb` to get the real disk usage in bytes, correctly handling hard links.
-            result = subprocess.run(["du", "-sb", path], capture_output=True, text=True, check=True)
-            # Output is like "12345\t/path/to/dir", so we split and take the first part.
+            result = subprocess.run(
+                ["du", "-sb", path], capture_output=True, text=True, check=True)
             size_in_bytes = int(result.stdout.split()[0])
             return size_in_bytes
         except (subprocess.CalledProcessError, FileNotFoundError, ValueError, IndexError) as e:
-            self.logger.log(f"Could not calculate directory size for {path} using du: {e}")
-            # Fallback to a simpler, less accurate method if du fails
+            self.logger.log(
+                f"Could not calculate directory size for {path} using du: {e}")
             total_size = 0
             try:
                 for dirpath, dirnames, filenames in os.walk(path):
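Note on the `du -sb` sizing retained above — a minimal standalone sketch of the same parsing, assuming GNU coreutils `du` (the sample path and output are illustrative):

    import subprocess

    def dir_size_bytes(path: str) -> int:
        # `du -sb` prints "<bytes>\t<path>"; hard-linked files are counted
        # once, so backups made with --link-dest are not double-counted.
        result = subprocess.run(["du", "-sb", path],
                                capture_output=True, text=True, check=True)
        return int(result.stdout.split()[0])

    print(dir_size_bytes("/tmp/pybackup"))  # e.g. 49152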
@@ -278,115 +256,96 @@ class BackupManager:
                         total_size += os.path.getsize(fp)
                 return total_size
             except Exception as fallback_e:
-                self.logger.log(f"Fallback size calculation also failed for {path}: {fallback_e}")
+                self.logger.log(
+                    f"Fallback size calculation also failed for {path}: {fallback_e}")
                 return 0
 
-    def _prepare_and_get_mounted_path(self, base_dest_path: str, is_system: bool, mount_if_needed: bool) -> Optional[str]:
-        if not self.encryption_manager.is_encrypted(base_dest_path):
-            return None
-
-        if not self.encryption_manager.is_mounted(base_dest_path):
-            if mount_if_needed:
-                if not self.encryption_manager.prepare_encrypted_destination(base_dest_path, is_system=is_system, source_size=0, queue=self.app.queue):
-                    return None
-            else:
-                return None
-
-        if self.encryption_manager.is_mounted(base_dest_path):
-            pybackup_dir = os.path.join(base_dest_path, "pybackup")
-            return os.path.join(pybackup_dir, "encrypted")
-
-        return None
-
     def list_all_backups(self, base_dest_path: str, mount_if_needed: bool = True):
-        is_encrypted_dest = self.encryption_manager.is_encrypted(
-            base_dest_path)
-        scan_dir = self.encryption_manager.get_mount_point(
-            base_dest_path) if is_encrypted_dest else os.path.join(base_dest_path, "pybackup")
-
-        if not scan_dir or not os.path.isdir(scan_dir):
-            # Try to mount if it wasn't already
-            if is_encrypted_dest and mount_if_needed:
-                if not self.encryption_manager.prepare_encrypted_destination(base_dest_path, is_system=False, source_size=0, queue=self.app.queue):
-                    return [], []  # Mount failed
-                scan_dir = self.encryption_manager.get_mount_point(
-                    base_dest_path)
-                if not scan_dir or not os.path.isdir(scan_dir):
-                    return [], []
-            else:
-                return [], []
-
-        metadata_dir = os.path.join(base_dest_path, "pybackup", "metadata")
+        pybackup_dir = os.path.join(base_dest_path, "pybackup")
+        metadata_dir = os.path.join(pybackup_dir, "metadata")
         if not os.path.isdir(metadata_dir):
             return [], []
 
         all_backups = []
-        for backup_dir_name in os.listdir(scan_dir):
-            backup_dir_path = os.path.join(scan_dir, backup_dir_name)
-            if not os.path.isdir(backup_dir_path):
+        for info_file_name in os.listdir(metadata_dir):
+            if not info_file_name.endswith(".json"):
                 continue
 
-            info_file_path = os.path.join(
-                metadata_dir, f"{backup_dir_name}.json")
-            if os.path.isfile(info_file_path):
-                try:
-                    with open(info_file_path, 'r') as f:
-                        info_data = json.load(f)
+            info_file_path = os.path.join(metadata_dir, info_file_name)
+            try:
+                with open(info_file_path, 'r') as f:
+                    info_data = json.load(f)
 
-                    dt_obj = datetime.datetime.fromisoformat(
-                        info_data["creation_date"])
+                is_encrypted = info_data.get("is_encrypted", False)
+                is_system = info_data.get("backup_type") == "system"
+                source_name = info_data.get("source_name", "N/A")
+                backup_dir_name = info_file_name.replace(".json", "")
 
-                    backup_type_display = info_data["mode"].capitalize()
-                    if info_data.get("is_compressed", False):
-                        backup_type_display += " (Compressed)"
-                    if info_data.get("is_encrypted", False):
-                        backup_type_display += " (Encrypted)"
+                profile_path = self._get_profile_path(
+                    base_dest_path, is_system, source_name, is_encrypted)
+                full_path = os.path.join(profile_path, backup_dir_name)
 
-                    backup_info = {
-                        "date": dt_obj.strftime('%d-%m-%Y'),
-                        "time": dt_obj.strftime('%H:%M:%S'),
-                        "type": backup_type_display,
-                        "size": info_data.get("size_readable", "N/A"),
-                        "folder_name": backup_dir_name,
-                        "full_path": backup_dir_path,
-                        "info_file_path": info_file_path,
-                        "comment": info_data.get("comment", ""),
-                        "is_encrypted": info_data.get("is_encrypted", False),
-                        "is_compressed": info_data.get("is_compressed", False),
-                        "backup_type_base": info_data["mode"].capitalize(),
-                        "datetime": dt_obj,
-                        "source": info_data.get("source_name", "N/A"),
-                        "is_system": info_data.get("backup_type") == "system"
-                    }
-                    all_backups.append(backup_info)
-                except (IOError, json.JSONDecodeError, KeyError) as e:
-                    self.logger.log(
-                        f"Could not read or parse info file {info_file_path}: {e}")
+                if not os.path.isdir(full_path):
+                    if not is_encrypted:
+                        self.logger.log(
+                            f"Metadata file found for {backup_dir_name} but data directory not found at {full_path}. Skipping.")
+                        continue
+                    if not self.encryption_manager.is_mounted(base_dest_path):
+                        self.logger.log(
+                            f"Mounting {base_dest_path} to check for encrypted backup data...")
+                        self.encryption_manager.prepare_encrypted_destination(
+                            base_dest_path, is_system, 0, self.app.queue if self.app else None)
+                    if not os.path.isdir(full_path):
+                        self.logger.log(
+                            f"Data directory {full_path} still not found after mount attempt. Skipping.")
+                        continue
+
+                dt_obj = datetime.datetime.fromisoformat(
+                    info_data["creation_date"])
+                backup_type_display = info_data["mode"].capitalize()
+                if is_encrypted:
+                    backup_type_display += " (Encrypted)"
+
+                backup_info = {
+                    "date": dt_obj.strftime('%d-%m-%Y'),
+                    "time": dt_obj.strftime('%H:%M:%S'),
+                    "type": backup_type_display,
+                    "size": info_data.get("size_readable", "N/A"),
+                    "folder_name": backup_dir_name,
+                    "full_path": full_path,
+                    "info_file_path": info_file_path,
+                    "comment": info_data.get("comment", ""),
+                    "is_encrypted": is_encrypted,
+                    "backup_type_base": info_data["mode"].capitalize(),
+                    "datetime": dt_obj,
+                    "source": source_name,
+                    "is_system": is_system
+                }
+                all_backups.append(backup_info)
+            except (IOError, json.JSONDecodeError, KeyError) as e:
+                self.logger.log(
+                    f"Could not read or parse info file {info_file_path}: {e}")
 
-        # Separate into system and user backups for correct sorting and display
         system_backups = sorted(
             [b for b in all_backups if b["is_system"]], key=lambda x: x['datetime'], reverse=True)
         user_backups = sorted([b for b in all_backups if not b["is_system"]],
                               key=lambda x: x['datetime'], reverse=True)
 
-        # Further group system backups by full/inc chains
         grouped_system_backups = []
         temp_group = []
-        # Sort from oldest to newest for grouping
         for backup in reversed(system_backups):
             if backup['backup_type_base'] == 'Full':
                 if temp_group:
                     grouped_system_backups.append(temp_group)
                 temp_group = [backup]
             else:
-                if not temp_group:  # Orphaned incremental
+                if not temp_group:
                     grouped_system_backups.append([backup])
                 else:
                     temp_group.append(backup)
         if temp_group:
             grouped_system_backups.append(temp_group)
 
-        # Sort groups by the date of the first element (the full backup), descending
         grouped_system_backups.sort(
             key=lambda g: g[0]['datetime'], reverse=True)
         final_system_list = [
@@ -394,16 +353,15 @@ class BackupManager:
 
         return final_system_list, user_backups
 
-    def _find_latest_backup(self, rsync_base_dir: str, is_system: bool, source_name: str) -> Optional[str]:
+    def _find_latest_backup(self, profile_path: str, source_name: str) -> Optional[str]:
         self.logger.log(
-            f"Searching for latest full backup for source '{source_name}' in: {rsync_base_dir}")
+            f"Searching for latest full backup for source '{source_name}' in: {profile_path}")
         full_backups = []
-        if os.path.isdir(rsync_base_dir):
-            # Pattern matches any full backup for the given source name, encrypted or not
+        if os.path.isdir(profile_path):
             pattern = re.compile(
                 rf"^(\d{{8}}-\d{{6}})_{re.escape(source_name)}_full_(plain|enc)$")
-            for item in os.listdir(rsync_base_dir):
-                item_path = os.path.join(rsync_base_dir, item)
+            for item in os.listdir(profile_path):
+                item_path = os.path.join(profile_path, item)
                 if os.path.isdir(item_path) and pattern.match(item):
                     full_backups.append(item)
 
@@ -413,22 +371,21 @@ class BackupManager:
 
         full_backups.sort(reverse=True)
         latest_backup_dir = full_backups[0]
-        latest_backup_path = os.path.join(rsync_base_dir, latest_backup_dir)
+        latest_backup_path = os.path.join(profile_path, latest_backup_dir)
 
         self.logger.log(
             f"Found latest full backup for --link-dest: {latest_backup_path}")
         return latest_backup_path
 
     def _create_info_json(self, base_dest_path: str, backup_dir_name: str, source_name: str, backup_type: str, mode: str, size_bytes: int, is_encrypted: bool, based_on: Optional[str] = None, comment: str = ""):
-        """Creates a backup_info.json file inside the user-writable metadata directory."""
         try:
-            # All metadata files go into a single, flat metadata directory for simplicity
-            metadata_path = os.path.join(base_dest_path, "pybackup", "metadata")
+            metadata_path = os.path.join(
+                base_dest_path, "pybackup", "metadata")
             os.makedirs(metadata_path, exist_ok=True)
-            info_file_path = os.path.join(metadata_path, f"{backup_dir_name}.json")
-
-            # Format size for human-readable display
+            info_file_path = os.path.join(
+                metadata_path, f"{backup_dir_name}.json")
+
             if size_bytes > 0:
                 power = 1024
                 n = 0
@@ -456,12 +413,12 @@ class BackupManager:
 
             with open(info_file_path, 'w') as f:
                 json.dump(info_data, f, indent=4)
-            self.logger.log(f"Successfully created metadata file: {info_file_path}")
+            self.logger.log(
+                f"Successfully created metadata file: {info_file_path}")
         except Exception as e:
             self.logger.log(f"Failed to create metadata file. Error: {e}")
 
     def get_comment(self, info_file_path: str) -> str:
-        """Reads the comment from a backup_info.json file."""
         try:
             with open(info_file_path, 'r') as f:
                 data = json.load(f)
@@ -470,7 +427,6 @@ class BackupManager:
             return ""
 
     def update_comment(self, info_file_path: str, new_comment: str):
-        """Updates the comment in a backup_info.json file."""
         try:
             with open(info_file_path, 'r') as f:
                 data = json.load(f)
@@ -479,9 +435,11 @@ class BackupManager:
 
             with open(info_file_path, 'w') as f:
                 json.dump(data, f, indent=4)
-            self.logger.log(f"Successfully updated comment in {info_file_path}")
+            self.logger.log(
+                f"Successfully updated comment in {info_file_path}")
         except (IOError, json.JSONDecodeError) as e:
-            self.logger.log(f"Failed to update comment in {info_file_path}: {e}")
+            self.logger.log(
+                f"Failed to update comment in {info_file_path}: {e}")
 
     def _execute_rsync(self, queue, command: List[str]):
         transferred_size, total_size, stderr_output = 0, 0, ""
@@ -562,18 +520,16 @@ class BackupManager:
     def _run_delete(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str]):
         try:
-            # Determine metadata file path before any deletion
             backup_dir_name = os.path.basename(path_to_delete.rstrip('/'))
             metadata_file_path = os.path.join(
                 base_dest_path, "pybackup", "metadata", f"{backup_dir_name}.json")
 
-            # Delete the backup data directory
             if is_encrypted:
                 self.logger.log(
                     f"Starting encrypted deletion for {path_to_delete}")
                 mount_point = self.encryption_manager.get_mount_point(
                     base_dest_path)
-                if not mount_point:
+                if not mount_point or not self.encryption_manager.is_mounted(base_dest_path):
                     if password:
                         mount_point = self.encryption_manager.mount_for_deletion(
                             base_dest_path, is_system, password)
@@ -587,13 +543,13 @@ class BackupManager:
                     return
 
                 internal_path_to_delete = os.path.join(
-                    mount_point, backup_dir_name)
+                    mount_point, os.path.basename(os.path.dirname(path_to_delete)), backup_dir_name)
                 success = False
                 if is_system:
                     script_content = f"rm -rf '{internal_path_to_delete}'"
                     success = self.encryption_manager._execute_as_root(
                         script_content)
-                else:  # User backup, no root needed
+                else:
                     try:
                         if os.path.isdir(internal_path_to_delete):
                             shutil.rmtree(internal_path_to_delete)
@@ -609,199 +565,15 @@ class BackupManager:
                     self.logger.log(
                         "Failed to delete files within encrypted container.")
                     queue.put(('deletion_complete', False))
-                    return  # Stop if data deletion failed
+                    return
 
             elif is_system:
                 script_content = f"rm -rf '{path_to_delete}'"
                 if not self.encryption_manager._execute_as_root(script_content):
                     self.logger.log(f"Failed to delete {path_to_delete}")
                     queue.put(('deletion_complete', False))
-                    return  # Stop if data deletion failed
-            else:  # Unencrypted user backup
-                try:
-                    if os.path.isdir(path_to_delete):
-                        shutil.rmtree(path_to_delete)
-                    self.logger.log(f"Successfully deleted {path_to_delete}")
-                except Exception as e:
-                    self.logger.log(
-                        f"Failed to delete unencrypted user backup {path_to_delete}: {e}")
-                    queue.put(('deletion_complete', False))
-                    return  # Stop if data deletion failed
-
-            # Finally, delete the metadata file (with user permissions)
-            try:
-                if os.path.exists(metadata_file_path):
-                    os.remove(metadata_file_path)
-                self.logger.log(
-                    f"Successfully deleted metadata file {metadata_file_path}")
-                queue.put(('deletion_complete', True))
-            except Exception as e:
-                self.logger.log(
-                    f"Failed to delete metadata file {metadata_file_path}: {e}")
-                queue.put(('deletion_complete', False))
-
-        except Exception as e:
-            self.logger.log(f"Error during threaded deletion: {e}")
-            queue.put(('deletion_complete', False))
-
-    def get_comment(self, info_file_path: str) -> str:
-        """Reads the comment from a backup_info.json file."""
-        try:
-            with open(info_file_path, 'r') as f:
-                data = json.load(f)
-            return data.get("comment", "")
-        except (IOError, json.JSONDecodeError):
-            return ""
-
-    def update_comment(self, info_file_path: str, new_comment: str):
-        """Updates the comment in a backup_info.json file."""
-        try:
-            with open(info_file_path, 'r') as f:
-                data = json.load(f)
-
-            data["comment"] = new_comment
-
-            with open(info_file_path, 'w') as f:
-                json.dump(data, f, indent=4)
-            self.logger.log(f"Successfully updated comment in {info_file_path}")
-        except (IOError, json.JSONDecodeError) as e:
-            self.logger.log(f"Failed to update comment in {info_file_path}: {e}")
-
-    def _execute_rsync(self, queue, command: List[str]):
-        transferred_size, total_size, stderr_output = 0, 0, ""
-        try:
-            env = os.environ.copy()
-            env['LC_ALL'] = 'C'
-            self.process = subprocess.Popen(
-                command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, preexec_fn=os.setsid, env=env)
-
-            if self.process.stdout:
-                for line in iter(self.process.stdout.readline, ''):
-                    stripped_line = line.strip()
-                    self.logger.log(f"Rsync stdout: {stripped_line}")
-                    if '%' in stripped_line:
-                        match = re.search(r'\s*(\d+)%\s+', stripped_line)
-                        if match:
-                            queue.put(('progress', int(match.group(1))))
-                    elif stripped_line and not stripped_line.startswith(('sending', 'sent', 'total')):
-                        queue.put(('file_update', stripped_line))
-
-            self.process.wait()
-            if self.process.stderr:
-                stderr_output = self.process.stderr.read()
-                if stderr_output:
-                    self.logger.log(f"Rsync Stderr: {stderr_output.strip()}")
-
-        except FileNotFoundError:
-            self.logger.log(f"Error: '{command[0]}' not found.")
-            queue.put(('error', None))
-        except Exception as e:
-            self.logger.log(f"Rsync execution error: {e}")
-            queue.put(('error', None))
-        return transferred_size, total_size, stderr_output
-
-    def start_restore(self, source_path: str, dest_path: str, is_compressed: bool):
-        from queue import Queue
-        queue = self.app.queue if hasattr(self.app, 'queue') else Queue()
-        thread = threading.Thread(target=self._run_restore, args=(
-            queue, source_path, dest_path, is_compressed))
-        thread.daemon = True
-        thread.start()
-
-    def _run_restore(self, queue, source_path: str, dest_path: str, is_compressed: bool):
-        self.logger.log(f"Starting restore from {source_path} to {dest_path}")
-        status = 'error'
-        try:
-            source = source_path.rstrip('/') + '/'
-            script_content = f"tar -xzf '{source_path}' -C '{dest_path}'" if is_compressed else f"rsync -aAXHv '{source}' '{dest_path}'"
-            if self.encryption_manager._execute_as_root(script_content):
-                status = 'success'
-            else:
-                self.logger.log("Restore script failed.")
-        except Exception as e:
-            self.logger.log(
-                f"An unexpected error occurred during restore: {e}")
-        finally:
-            queue.put(
-                ('completion', {'status': status, 'returncode': 0 if status == 'success' else 1}))
-
-    def get_scheduled_jobs(self) -> List[Dict[str, Any]]:
-        jobs_list = []
-        try:
-            user_cron = CronTab(user=True)
-            for job in user_cron:
-                if self.app_tag in job.comment:
-                    details = self._parse_job_comment(job.comment)
-                    if details:
-                        jobs_list.append(details)
-        except Exception as e:
-            self.logger.log(f"Error loading cron jobs: {e}")
-        return jobs_list
-
-    def start_delete_backup(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str] = None):
-        thread = threading.Thread(target=self._run_delete, args=(
-            path_to_delete, is_encrypted, is_system, base_dest_path, queue, password))
-        thread.daemon = True
-        thread.start()
-
-    def _run_delete(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str]):
-        try:
-            # Determine metadata file path before any deletion
-            backup_dir_name = os.path.basename(path_to_delete.rstrip('/'))
-            metadata_file_path = os.path.join(
-                base_dest_path, "pybackup", "metadata", f"{backup_dir_name}.json")
-
-            # Delete the backup data directory
-            if is_encrypted:
-                self.logger.log(
-                    f"Starting encrypted deletion for {path_to_delete}")
-                mount_point = self.encryption_manager.get_mount_point(
-                    base_dest_path)
-                if not mount_point:
-                    if password:
-                        mount_point = self.encryption_manager.mount_for_deletion(
-                            base_dest_path, is_system, password)
-                    else:
-                        self.logger.log(
-                            "Password not provided for encrypted deletion.")
-
-                if not mount_point:
-                    self.logger.log("Failed to unlock container for deletion.")
-                    queue.put(('deletion_complete', False))
-                    return
-
-                internal_path_to_delete = os.path.join(
-                    mount_point, backup_dir_name)
-                success = False
-                if is_system:
-                    script_content = f"rm -rf '{internal_path_to_delete}'"
-                    success = self.encryption_manager._execute_as_root(
-                        script_content)
-                else:  # User backup, no root needed
-                    try:
-                        if os.path.isdir(internal_path_to_delete):
-                            shutil.rmtree(internal_path_to_delete)
-                        self.logger.log(
-                            f"Successfully deleted {internal_path_to_delete}")
-                        success = True
-                    except Exception as e:
-                        self.logger.log(
-                            f"Failed to delete user backup {internal_path_to_delete}: {e}")
-                        success = False
-
-                if not success:
-                    self.logger.log(
-                        "Failed to delete files within encrypted container.")
-                    queue.put(('deletion_complete', False))
-                    return  # Stop if data deletion failed
-
-            elif is_system:
-                script_content = f"rm -rf '{path_to_delete}'"
-                if not self.encryption_manager._execute_as_root(script_content):
-                    self.logger.log(f"Failed to delete {path_to_delete}")
-                    queue.put(('deletion_complete', False))
-                    return  # Stop if data deletion failed
-            else:  # Unencrypted user backup
+                    return
+            else:
                 try:
                     if os.path.isdir(path_to_delete):
                         shutil.rmtree(path_to_delete)
@@ -810,19 +582,20 @@ class BackupManager:
                     self.logger.log(
                         f"Failed to delete unencrypted user backup {path_to_delete}: {e}")
                     queue.put(('deletion_complete', False))
-                    return  # Stop if data deletion failed
+                    return
 
-            # Finally, delete the metadata file (with user permissions)
-            try:
-                if os.path.exists(metadata_file_path):
+            if os.path.exists(metadata_file_path):
+                try:
                     os.remove(metadata_file_path)
                     self.logger.log(
                         f"Successfully deleted metadata file {metadata_file_path}")
+                    queue.put(('deletion_complete', True))
+                except Exception as e:
+                    self.logger.log(
+                        f"Failed to delete metadata file {metadata_file_path}: {e}")
+                    queue.put(('deletion_complete', False))
+            else:
                 queue.put(('deletion_complete', True))
-            except Exception as e:
-                self.logger.log(
-                    f"Failed to delete metadata file {metadata_file_path}: {e}")
-                queue.put(('deletion_complete', False))
 
         except Exception as e:
             self.logger.log(f"Error during threaded deletion: {e}")
@@ -849,10 +622,9 @@ if [ -n "{delete_path}" ] && [ "{delete_path}" != "/" ]; then rm -rf "{delete_pa
             self.logger.log(
                 f"Attempting to cancel backup process with PID: {self.process.pid}")
             try:
-                # Terminate the entire process group to stop rsync
                 os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
                 self.logger.log("Successfully sent SIGTERM to process group.")
             except ProcessLookupError:
                 self.logger.log("Process already finished.")
             except Exception as e:
-                self.logger.log(f"Error cancelling process: {e}")
+                self.logger.log(f"Error cancelling process: {e}")
\ No newline at end of file
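Note on the new on-disk layout: `_get_profile_path` gives every source its own directory, and `list_all_backups` is now driven entirely by the JSON files under `pybackup/metadata`. A sketch of the resulting tree, assuming an unencrypted destination mounted at /media/usb (the "documents" source is illustrative; directory names follow the 20250908-133000_system_full_plain pattern matched by `_find_latest_backup`):

    /media/usb/pybackup/
        metadata/
            20250908-133000_system_full_plain.json
        unencrypted/
            system/
                20250908-133000_system_full_plain/
            user/
                .Trash/
                documents/
                    20250908-140000_documents_full_plain/

For encrypted destinations the data root is the mount point returned by `get_mount_point()` instead of `pybackup/unencrypted`; `.Trash` sits next to the per-source profile directories, which is why the rsync command excludes it by basename.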
self.logger.log("Successfully sent SIGTERM to process group.") except ProcessLookupError: self.logger.log("Process already finished.") except Exception as e: - self.logger.log(f"Error cancelling process: {e}") + self.logger.log(f"Error cancelling process: {e}") \ No newline at end of file diff --git a/core/encryption_manager.py b/core/encryption_manager.py index 0512e31..6e3b8d8 100644 --- a/core/encryption_manager.py +++ b/core/encryption_manager.py @@ -89,9 +89,9 @@ class EncryptionManager: return password def get_container_path(self, base_dest_path: str) -> str: + """Returns the path for the LUKS container file itself.""" pybackup_dir = os.path.join(base_dest_path, "pybackup") - user_encrypt_dir = os.path.join(pybackup_dir, "user_encrypt") - return os.path.join(user_encrypt_dir, "pybackup_luks.img") + return os.path.join(pybackup_dir, "pybackup_encrypted.luks") def get_key_file_path(self, base_dest_path: str) -> str: pybackup_dir = os.path.join(base_dest_path, "pybackup") @@ -122,25 +122,33 @@ class EncryptionManager: return None def _get_password_or_key_cmd(self, base_dest_path: str, username: str) -> Tuple[str, Optional[str]]: + # 1. Check cache and keyring (without triggering dialog) + password = self.password_cache.get(username) or self.get_password_from_keyring(username) + if password: + self.logger.log("Using password from cache or keyring for LUKS operation.") + self.password_cache[username] = password # ensure it's cached + return "-", password + + # 2. Check for key file key_file_path = self.get_key_file_path(base_dest_path) if os.path.exists(key_file_path): self.logger.log( f"Using key file for LUKS operation: {key_file_path}") - return f'--key-file "{key_file_path}"', None - else: - password = self.get_password(username, confirm=False) - if not password: - return "", None - return "-", password + return f'--key-file "{key_file_path}"' + + # 3. If nothing found, prompt for password + self.logger.log("No password in keyring and no keyfile found. 
Prompting user.") + password = self.get_password(username, confirm=False) # This will now definitely open the dialog + if not password: + return "", None + return "-", password def is_encrypted(self, base_dest_path: str) -> bool: return os.path.exists(self.get_container_path(base_dest_path)) def get_mount_point(self, base_dest_path: str) -> str: """Constructs the unique, static mount point path for a given destination.""" - username = os.path.basename(base_dest_path.rstrip('/')) - mapper_name = f"pybackup_luks_{username}" - return os.path.join("/mnt", mapper_name) + return os.path.join(base_dest_path, "pybackup", "encrypted") def is_mounted(self, base_dest_path: str) -> bool: mount_point = self.get_mount_point(base_dest_path) diff --git a/main_app.py b/main_app.py index 4eb0a24..113c03c 100644 --- a/main_app.py +++ b/main_app.py @@ -352,19 +352,7 @@ class MainApplication(tk.Tk): if hasattr(self, 'header_frame'): self.header_frame.refresh_status() - container_path = os.path.join( - backup_dest_path, "pybackup_encrypted.luks") - if os.path.exists(container_path): - username = os.path.basename(backup_dest_path.rstrip('/')) - password = self.backup_manager.encryption_manager.get_password_from_keyring( - username) - if password: - self.backup_manager.encryption_manager.unlock_container( - backup_dest_path, password) - app_logger.log( - "Automatically unlocked encrypted container.") - if hasattr(self, 'header_frame'): - self.header_frame.refresh_status() + restore_src_path = self.config_manager.get_setting( "restore_source_path") diff --git a/pyimage_ui/header_frame.py b/pyimage_ui/header_frame.py index f0dccae..adab4dd 100644 --- a/pyimage_ui/header_frame.py +++ b/pyimage_ui/header_frame.py @@ -3,6 +3,7 @@ import os from core.pbp_app_config import Msg from shared_libs.common_tools import IconManager +from shared_libs.logger import app_logger class HeaderFrame(tk.Frame): def __init__(self, container, image_manager, encryption_manager, app, **kwargs): @@ -68,14 +69,23 @@ class HeaderFrame(tk.Frame): def refresh_status(self): """Checks the keyring status based on the current destination and updates the label.""" + app_logger.log("HeaderFrame: Refreshing status...") dest_path = self.app.destination_path + app_logger.log(f"HeaderFrame: Destination path is '{dest_path}'") + if not dest_path or not self.encryption_manager.is_encrypted(dest_path): + app_logger.log("HeaderFrame: No destination path or not encrypted. Clearing status.") self.keyring_status_label.config(text="") # Clear status if not encrypted return - + + app_logger.log("HeaderFrame: Destination is encrypted.") username = os.path.basename(dest_path.rstrip('/')) + app_logger.log(f"HeaderFrame: Username is '{username}'") - if self.encryption_manager.is_mounted(dest_path): + is_mounted = self.encryption_manager.is_mounted(dest_path) + app_logger.log(f"HeaderFrame: Is mounted? 
{is_mounted}") + + if is_mounted: status_text = "Key: In Use" auth_method = getattr(self.encryption_manager, 'auth_method', None) if auth_method == 'keyring': @@ -86,18 +96,25 @@ class HeaderFrame(tk.Frame): text=status_text, fg="#2E8B57" # SeaGreen ) - elif self.encryption_manager.is_key_in_keyring(username): - self.keyring_status_label.config( - text="Key: Available (Keyring)", - fg="#FFD700" # Gold - ) - elif os.path.exists(self.encryption_manager.get_key_file_path(dest_path)): - self.keyring_status_label.config( - text="Key: Available (Keyfile)", - fg="#FFD700" # Gold - ) else: - self.keyring_status_label.config( - text="Key: Not Available", - fg="#A9A9A9" # DarkGray - ) + key_in_keyring = self.encryption_manager.is_key_in_keyring(username) + app_logger.log(f"HeaderFrame: Key in keyring? {key_in_keyring}") + key_file_exists = os.path.exists(self.encryption_manager.get_key_file_path(dest_path)) + app_logger.log(f"HeaderFrame: Key file exists? {key_file_exists}") + + if key_in_keyring: + self.keyring_status_label.config( + text="Key: Available (Keyring)", + fg="#FFD700" # Gold + ) + elif key_file_exists: + self.keyring_status_label.config( + text="Key: Available (Keyfile)", + fg="#FFD700" # Gold + ) + else: + self.keyring_status_label.config( + text="Key: Not Available", + fg="#A9A9A9" # DarkGray + ) + app_logger.log("HeaderFrame: Status refresh complete.") \ No newline at end of file