Files
Py-Backup/core/encryption_manager.py
Désiré Werner Menrath 22144859d8 feat: Implement Hard Reset and Refactor PasswordDialog
This commit introduces a new "Hard Reset" functionality in the settings, allowing users to reset the application to its initial state by deleting the configuration directory.

Key changes include:
- Added a "Hard Reset" button and a dedicated confirmation frame in `settings_frame.py`.
- Implemented the logic to delete the `.config/py_backup` directory and restart the application.
- Enhanced the hard reset process to unmount encrypted drives if they are mounted, prompting for a password if necessary.

To improve modularity and maintainability, the PasswordDialog class has been refactored:
- Moved PasswordDialog from `pyimage_ui/password_dialog.py` to `shared_libs/message.py`.
- Updated all references and imports to the new location.
- Externalized all user-facing strings in PasswordDialog for translation support.

Additionally, several bug fixes and improvements were made:
- Corrected object access hierarchy in `settings_frame.py` and `advanced_settings_frame.py` by passing manager instances directly.
- Handled `FileNotFoundError` in `actions.py` when selecting remote backup destinations, preventing crashes and displaying "N/A" for disk usage.
- Replaced incorrect `calculating_animation` reference with `animated_icon` in `actions.py`.
- Added missing translation keys in `pbp_app_config.py`.
2025-09-11 01:08:38 +02:00

392 lines
16 KiB
Python

import keyring
import keyring.errors
from keyring.backends import SecretService
import os
import shutil
import subprocess
import math
from typing import Optional, Tuple
from core.pbp_app_config import AppConfig, Msg
from shared_libs.message import PasswordDialog
import json
class EncryptionManager:
def __init__(self, logger, app=None):
try:
keyring.set_keyring(SecretService.Keyring())
except Exception as e:
logger.log(f"Failed to set keyring backend to SecretService: {e}")
self.logger = logger
self.app = app
self.service_id = "py-backup-encryption"
self.mounted_destinations = set()
self.password_cache = {}
self.lock_file = AppConfig.LOCK_FILE_PATH
def _write_lock_file(self, data):
with open(self.lock_file, 'w') as f:
json.dump(data, f)
def _read_lock_file(self):
if not self.lock_file.exists():
return []
with open(self.lock_file, 'r') as f:
try:
return json.load(f)
except json.JSONDecodeError:
return []
def add_to_lock_file(self, base_path, mapper_name):
locks = self._read_lock_file()
if not any(lock['base_path'] == base_path for lock in locks):
locks.append({"base_path": base_path, "mapper_name": mapper_name})
self._write_lock_file(locks)
def remove_from_lock_file(self, base_path):
locks = self._read_lock_file()
updated_locks = [
lock for lock in locks if lock['base_path'] != base_path]
self._write_lock_file(updated_locks)
def get_password_from_keyring(self, username: str) -> Optional[str]:
try:
return keyring.get_password(self.service_id, username)
except Exception as e:
self.logger.log(f"Could not get password from keyring: {e}")
return None
def is_key_in_keyring(self, username: str) -> bool:
return self.get_password_from_keyring(username) is not None
def set_password_in_keyring(self, username: str, password: str) -> bool:
try:
keyring.set_password(self.service_id, username, password)
self.logger.log(f"Password for {username} stored in keyring.")
return True
except Exception as e:
self.logger.log(f"Could not set password in keyring: {e}")
return False
def get_password(self, username: str, confirm: bool = True) -> Optional[str]:
if username in self.password_cache:
return self.password_cache[username]
password = self.get_password_from_keyring(username)
if password:
self.password_cache[username] = password
return password
dialog = PasswordDialog(
self.app, title=f"Enter password for {username}", confirm=confirm, translations=Msg.STR)
password, save_to_keyring = dialog.get_password()
if password:
self.password_cache[username] = password
if save_to_keyring:
self.set_password_in_keyring(username, password)
return password
def get_container_path(self, base_dest_path: str) -> str:
"""Returns the path for the LUKS container file itself."""
pybackup_dir = os.path.join(base_dest_path, "pybackup")
return os.path.join(pybackup_dir, "pybackup_encrypted.luks")
def get_key_file_path(self, base_dest_path: str) -> str:
pybackup_dir = os.path.join(base_dest_path, "pybackup")
return os.path.join(pybackup_dir, "luks.keyfile")
def create_and_add_key_file(self, base_dest_path: str, password: str) -> Optional[str]:
key_file_path = self.get_key_file_path(base_dest_path)
container_path = self.get_container_path(base_dest_path)
try:
with open(key_file_path, 'wb') as f:
f.write(os.urandom(4096))
os.chmod(key_file_path, 0o400)
self.logger.log(f"Generated new key file at {key_file_path}")
script = f'cryptsetup luksAddKey "{container_path}" "{key_file_path}"'
if self._execute_as_root(script, password):
self.logger.log(
"Successfully added key file to LUKS container.")
return key_file_path
else:
self.logger.log("Failed to add key file to LUKS container.")
os.remove(key_file_path)
return None
except Exception as e:
self.logger.log(f"Error creating key file: {e}")
if os.path.exists(key_file_path):
os.remove(key_file_path)
return None
def _get_password_or_key_cmd(self, base_dest_path: str, username: str) -> Tuple[str, Optional[str]]:
# 1. Check cache and keyring (without triggering dialog)
password = self.password_cache.get(
username) or self.get_password_from_keyring(username)
if password:
self.logger.log(
"Using password from cache or keyring for LUKS operation.")
self.password_cache[username] = password # ensure it's cached
return "-", password
# 2. Check for key file
key_file_path = self.get_key_file_path(base_dest_path)
if os.path.exists(key_file_path):
self.logger.log(
f"Using key file for LUKS operation: {key_file_path}")
return f'--key-file "{key_file_path}"'
# 3. If nothing found, prompt for password
self.logger.log(
"No password in keyring and no keyfile found. Prompting user.")
# This will now definitely open the dialog
password = self.get_password(username, confirm=False)
if not password:
return "", None
return "-", password
def is_encrypted(self, base_dest_path: str) -> bool:
return os.path.exists(self.get_container_path(base_dest_path))
def get_mount_point(self, base_dest_path: str) -> str:
"""Constructs the unique, static mount point path for a given destination."""
return os.path.join(base_dest_path, "pybackup", "encrypted")
def is_mounted(self, base_dest_path: str) -> bool:
mount_point = self.get_mount_point(base_dest_path)
return os.path.ismount(mount_point) or base_dest_path in self.mounted_destinations
def mount_for_deletion(self, base_dest_path: str, is_system: bool, password: str) -> Optional[str]:
self.logger.log("Mounting container for deletion operation.")
if self._open_and_mount(base_dest_path, is_system, password):
mount_point = self.get_mount_point(base_dest_path)
self.mounted_destinations.add(base_dest_path)
return mount_point
self.logger.log("Failed to mount container for deletion.")
return None
def prepare_encrypted_destination(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
container_path = self.get_container_path(base_dest_path)
if os.path.exists(container_path):
return self._handle_existing_container(base_dest_path, is_system, source_size, queue)
else:
return self._handle_new_container(base_dest_path, is_system, source_size, queue)
def _handle_existing_container(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
self.logger.log("Handling existing container.")
username = os.path.basename(base_dest_path.rstrip('/'))
mount_point = self.get_mount_point(base_dest_path)
if not self.is_mounted(base_dest_path):
if not self._open_and_mount(base_dest_path, is_system):
self.logger.log("Failed to mount container for size check.")
return None
free_space = shutil.disk_usage(mount_point).free
required_space = int(source_size * 1.15)
if required_space > free_space:
self.logger.log(
f"Resize needed. Free: {free_space}, Required: {required_space}")
queue.put(('status_update', "Container zu klein. Vergrößere..."))
current_total = shutil.disk_usage(mount_point).total
needed_additional = required_space - free_space
new_total_size = current_total + needed_additional
container_path = self.get_container_path(base_dest_path)
mapper_name = f"pybackup_luks_{username}"
chown_cmd = self._get_chown_command(mount_point, is_system)
key_or_pass_arg, password = self._get_password_or_key_cmd(
base_dest_path, username)
if not key_or_pass_arg:
return None
luks_open_cmd = f'echo -n "$LUKSPASS" | cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}'
resize_script = f"""
# Unmount cleanly first
umount -l \"{mount_point}\" || true
cryptsetup luksClose {mapper_name} || true
# Resize container file
truncate -s {int(new_total_size)} \"{container_path}\"
# Re-open, check, and resize filesystem
{luks_open_cmd}
e2fsck -fy \"/dev/mapper/{mapper_name}\"
resize2fs \"/dev/mapper/{mapper_name}\"
# Now mount it
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
{chown_cmd}
"""
if not self._execute_as_root(resize_script, password):
self.logger.log("Failed to execute resize and remount script.")
return None
if not self.is_mounted(base_dest_path):
self.logger.log(
"CRITICAL: Mount failed after resize script, but script reported success. Aborting.")
return None
self.mounted_destinations.add(base_dest_path)
return mount_point
def _handle_new_container(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
self.logger.log("Handling new container creation.")
size_gb = math.ceil((source_size * 1.2) / (1024**3)) + 5
username = os.path.basename(base_dest_path.rstrip('/'))
password = self.get_password(username, confirm=True)
if not password:
return None
container_path = self.get_container_path(base_dest_path)
mount_point = self.get_mount_point(base_dest_path)
mapper_name = f"pybackup_luks_{username}"
chown_cmd = self._get_chown_command(mount_point, is_system)
script = f"""
mkdir -p \"{os.path.dirname(container_path)}\"\n mkdir -p \"{mount_point}\"\n truncate -s {int(size_gb)}G \"{container_path}\"\n echo -n \"$LUKSPASS\" | cryptsetup luksFormat \"{container_path}\" -
echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} -
mkfs.ext4 \"/dev/mapper/{mapper_name}\"
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"\n {chown_cmd}\n """
if not self._execute_as_root(script, password):
return None
self.mounted_destinations.add(base_dest_path)
return mount_point
def _open_and_mount(self, base_dest_path: str, is_system: bool, password_override: Optional[str] = None) -> bool:
username = os.path.basename(base_dest_path.rstrip('/'))
key_or_pass_arg, password = "", None
if password_override:
password = password_override
key_or_pass_arg = "-"
else:
key_or_pass_arg, password = self._get_password_or_key_cmd(
base_dest_path, username)
if not key_or_pass_arg:
return False
container_path = self.get_container_path(base_dest_path)
mount_point = self.get_mount_point(base_dest_path)
mapper_name = f"pybackup_luks_{username}"
chown_cmd = self._get_chown_command(mount_point, is_system)
luks_open_cmd = f'echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen \"{container_path}\" {mapper_name} {key_or_pass_arg}'
script = f"""
umount -l \"{mount_point}\" || true
cryptsetup luksClose {mapper_name} || true
mkdir -p \"{mount_point}\"
{luks_open_cmd}
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
{chown_cmd}
"""
if self._execute_as_root(script, password):
self.add_to_lock_file(base_dest_path, mapper_name)
return True
return False
def unmount_and_reset_owner(self, base_dest_path: str, force_unmap=False):
username = os.path.basename(base_dest_path.rstrip('/'))
mapper_name = f"pybackup_luks_{username}"
if not os.path.exists(f"/dev/mapper/{mapper_name}") and not self.is_mounted(base_dest_path):
if not force_unmap:
return
self.logger.log(f"Unmounting and resetting owner for {base_dest_path}")
mount_point = self.get_mount_point(base_dest_path)
script = f"""
chown root:root \"{mount_point}\" || true
umount -l \"{mount_point}\"
cryptsetup luksClose {mapper_name}
"""
password = self.password_cache.get(username)
self._execute_as_root(script, password)
self.remove_from_lock_file(base_dest_path)
if base_dest_path in self.mounted_destinations:
self.mounted_destinations.remove(base_dest_path)
if username in self.password_cache:
del self.password_cache[username]
def unmount_all(self):
self.logger.log(f"Unmounting all: {self.mounted_destinations}")
for path in list(self.mounted_destinations):
self.unmount_and_reset_owner(path, force_unmap=True)
def unmount_all_encrypted_drives(self, password: str) -> Tuple[bool, str]:
for path in list(self.mounted_destinations):
self.unmount_and_reset_owner(path, force_unmap=True)
return True, "Successfully unmounted all drives."
def _get_chown_command(self, mount_point: str, is_system: bool) -> str:
if not is_system:
try:
uid = os.getuid()
gid = os.getgid()
return f"chown {uid}:{gid} \"{mount_point}\""
except Exception as e:
self.logger.log(
f"Could not get current user UID/GID for chown: {e}")
return ""
def _execute_as_root(self, script_content: str, password_for_stdin: Optional[str] = None) -> bool:
try:
if password_for_stdin:
escaped_password = password_for_stdin.replace("'", "'\\\''")
script_content = f"LUKSPASS='{escaped_password}'\n{script_content}"
project_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..'))
runner_script_path = os.path.join(
project_root, 'core', 'privileged_script_runner.sh')
if not os.path.exists(runner_script_path):
self.logger.log(
f"CRITICAL: Privileged script runner not found at {runner_script_path}")
return False
command = ['pkexec', runner_script_path]
log_lines = []
for line in script_content.split('\n'):
if "LUKSPASS=" in line:
log_lines.append("LUKSPASS='[REDACTED]'")
else:
log_lines.append(line)
sanitized_script_content = "\n".join(log_lines)
self.logger.log(
f"Executing privileged command via runner: {runner_script_path}")
self.logger.log(
f"Script content to be piped:\n---\n{sanitized_script_content}\n---")
result = subprocess.run(
command, input=script_content, capture_output=True, text=True, check=False)
if result.returncode == 0:
log_output = f"Privileged script executed successfully."
if result.stdout:
log_output += f"\nStdout:\n{result.stdout}"
if result.stderr:
log_output += f"\nStderr:\n{result.stderr}"
self.logger.log(log_output)
return True
else:
self.logger.log(
f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}")
return False
except Exception as e:
self.logger.log(
f"Failed to set up or execute privileged command: {e}")
return False