Files
Py-Backup/core/encryption_manager.py
Désiré Werner Menrath dbaa623b17 fix(backup): Resolve multiple issues in encrypted backup handling
This commit addresses several bugs related to the mounting, unmounting, and deletion of encrypted backups, as well as a crash when listing backups.

The key changes are:
- **Fix Double Mount on View:** Removed redundant mount operation when viewing encrypted backup contents. The mount is now handled by a single, centralized function.
- **Fix Deletion of Encrypted Backups:**
    - The container is no longer re-mounted if already open, preventing a second password prompt.
    - Deletion of encrypted *user* backups is now performed with user-level permissions, removing the need for a third password prompt via pkexec.
- **Fix UI Refresh after Deletion:** The backup list now correctly refreshes after a backup is deleted.
- **Fix Crash on Empty Backup List:** Resolved an `UnboundLocalError` that occurred when listing backups from an empty or non-existent backup directory.
- **Improve Mount Detection:** The `is_mounted` check is now more robust to prevent race conditions or other OS-level inconsistencies.
2025-09-07 19:02:39 +02:00

252 lines
11 KiB
Python

import keyring
import keyring.errors
from keyring.backends import SecretService
import os
import shutil
import subprocess
import tempfile
import stat
import re
import math
from typing import Optional, List
from core.pbp_app_config import AppConfig
from pyimage_ui.password_dialog import PasswordDialog
class EncryptionManager:
def __init__(self, logger, app=None):
try:
keyring.set_keyring(SecretService.Keyring())
except Exception as e:
logger.log(f"Failed to set keyring backend to SecretService: {e}")
self.logger = logger
self.app = app
self.service_id = "py-backup-encryption"
self.mounted_destinations = set()
def get_password_from_keyring(self, username: str) -> Optional[str]:
try:
return keyring.get_password(self.service_id, username)
except Exception as e:
self.logger.log(f"Could not get password from keyring: {e}")
return None
def is_key_in_keyring(self, username: str) -> bool:
return self.get_password_from_keyring(username) is not None
def set_password_in_keyring(self, username: str, password: str) -> bool:
try:
keyring.set_password(self.service_id, username, password)
self.logger.log(f"Password for {username} stored in keyring.")
return True
except Exception as e:
self.logger.log(f"Could not set password in keyring: {e}")
return False
def get_password(self, username: str, confirm: bool = True) -> Optional[str]:
password = self.get_password_from_keyring(username)
if password:
return password
dialog = PasswordDialog(self.app, title=f"Enter password for {username}", confirm=confirm)
password, save_to_keyring = dialog.get_password()
if password and save_to_keyring:
self.set_password_in_keyring(username, password)
return password
def get_container_path(self, base_dest_path: str) -> str:
pybackup_dir = os.path.join(base_dest_path, "pybackup")
user_encrypt_dir = os.path.join(pybackup_dir, "user_encrypt")
return os.path.join(user_encrypt_dir, "pybackup_luks.img")
def is_encrypted(self, base_dest_path: str) -> bool:
return os.path.exists(self.get_container_path(base_dest_path))
def is_mounted(self, base_dest_path: str) -> bool:
pybackup_dir = os.path.join(base_dest_path, "pybackup")
mount_point = os.path.join(pybackup_dir, "encrypted")
return os.path.ismount(mount_point) or base_dest_path in self.mounted_destinations
def mount_for_deletion(self, base_dest_path: str, is_system: bool, password: str) -> Optional[str]:
self.logger.log("Mounting container for deletion operation.")
if self._open_and_mount(base_dest_path, is_system, password):
mount_point = os.path.join(os.path.dirname(self.get_container_path(base_dest_path)), "..", "encrypted")
self.mounted_destinations.add(base_dest_path)
return mount_point
self.logger.log("Failed to mount container for deletion.")
return None
def prepare_encrypted_destination(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
container_path = self.get_container_path(base_dest_path)
if os.path.exists(container_path):
return self._handle_existing_container(base_dest_path, is_system, source_size, queue)
else:
return self._handle_new_container(base_dest_path, is_system, source_size, queue)
def _handle_existing_container(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
self.logger.log("Handling existing container.")
mount_point = os.path.join(os.path.dirname(self.get_container_path(base_dest_path)), "..", "encrypted")
if not self._open_and_mount(base_dest_path, is_system):
self.logger.log("Failed to mount container for size check.")
return None
free_space = shutil.disk_usage(mount_point).free
required_space = int(source_size * 1.15)
if required_space > free_space:
self.logger.log(f"Resize needed. Free: {free_space}, Required: {required_space}")
queue.put(('status_update', "Container zu klein. Vergrößere..."))
current_total = shutil.disk_usage(mount_point).total
needed_additional = required_space - free_space
new_total_size = current_total + needed_additional
self.unmount_and_reset_owner(base_dest_path)
if not self._resize_container(base_dest_path, int(new_total_size)):
self.logger.log("Failed to resize container.")
return None
if not self._open_and_mount(base_dest_path, is_system):
self.logger.log("Failed to remount after resize.")
return None
self.mounted_destinations.add(base_dest_path)
return mount_point
def _handle_new_container(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
self.logger.log("Handling new container creation.")
size_gb = math.ceil((source_size * 1.2) / (1024**3)) + 5
username = os.path.basename(base_dest_path.rstrip('/'))
password = self.get_password(username, confirm=True)
if not password: return None
container_path = self.get_container_path(base_dest_path)
mount_point = os.path.join(os.path.dirname(container_path), "..", "encrypted")
mapper_name = f"pybackup_luks_{username}"
chown_cmd = self._get_chown_command(mount_point, is_system)
script = f"""
mkdir -p \"{os.path.dirname(container_path)}\"\n mkdir -p \"{mount_point}\"\n truncate -s {int(size_gb)}G \"{container_path}\"\n echo -n \"$LUKSPASS\" | cryptsetup luksFormat \"{container_path}\" -
echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} -
mkfs.ext4 \"/dev/mapper/{mapper_name}\"
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"\n {chown_cmd}\n """
if not self._execute_as_root(script, password):
return None
self.mounted_destinations.add(base_dest_path)
return mount_point
def _open_and_mount(self, base_dest_path: str, is_system: bool, password: Optional[str] = None) -> bool:
username = os.path.basename(base_dest_path.rstrip('/'))
if not password:
password = self.get_password(username, confirm=False)
if not password: return False
container_path = self.get_container_path(base_dest_path)
mount_point = os.path.join(os.path.dirname(container_path), "..", "encrypted")
mapper_name = f"pybackup_luks_{username}"
chown_cmd = self._get_chown_command(mount_point, is_system)
script = f"""
umount -l \"{mount_point}\" || true
cryptsetup luksClose {mapper_name} || true
mkdir -p \"{mount_point}\"
echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} -
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
{chown_cmd}
"""
return self._execute_as_root(script, password)
def _resize_container(self, base_dest_path: str, new_size_bytes: int) -> bool:
username = os.path.basename(base_dest_path.rstrip('/'))
password = self.get_password(username, confirm=False)
if not password: return False
container_path = self.get_container_path(base_dest_path)
mapper_name = f"pybackup_luks_{username}"
script = f"""
truncate -s {new_size_bytes} \"{container_path}\"\n echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} -
e2fsck -fy \"/dev/mapper/{mapper_name}\"\n resize2fs \"/dev/mapper/{mapper_name}\"\n cryptsetup luksClose {mapper_name}
"""
return self._execute_as_root(script, password)
def unmount_and_reset_owner(self, base_dest_path: str, force_unmap=False):
mapper_name = f"pybackup_luks_{os.path.basename(base_dest_path.rstrip('/'))}"
if not os.path.exists(f"/dev/mapper/{mapper_name}") and not self.is_mounted(base_dest_path):
if not force_unmap: return
self.logger.log(f"Unmounting and resetting owner for {base_dest_path}")
container_path = self.get_container_path(base_dest_path)
mount_point = os.path.join(os.path.dirname(container_path), "..", "encrypted")
script = f"""
chown root:root \"{mount_point}\" || true
umount -l \"{mount_point}\" || true
cryptsetup luksClose {mapper_name} || true
"""
self._execute_as_root(script)
if base_dest_path in self.mounted_destinations:
self.mounted_destinations.remove(base_dest_path)
def unmount_all(self):
self.logger.log(f"Unmounting all: {self.mounted_destinations}")
for path in list(self.mounted_destinations):
self.unmount_and_reset_owner(path, force_unmap=True)
def _get_chown_command(self, mount_point: str, is_system: bool) -> str:
if not is_system:
try:
uid = os.getuid()
gid = os.getgid()
return f"chown {uid}:{gid} \"{mount_point}\""
except Exception as e:
self.logger.log(f"Could not get current user UID/GID for chown: {e}")
return ""
def _execute_as_root(self, script_content: str, password_for_stdin: Optional[str] = None) -> bool:
script_path = ''
try:
if password_for_stdin:
escaped_password = password_for_stdin.replace("'", "'\\''")
script_content = f"LUKSPASS='{escaped_password}'\n{script_content}"
with tempfile.NamedTemporaryFile(mode='w', delete=False, prefix="pybackup_script_", suffix=".sh", dir="/tmp") as tmp_script:
tmp_script.write("#!/bin/bash\n" + script_content)
script_path = tmp_script.name
os.chmod(script_path, stat.S_IRWXU)
command = ['pkexec', script_path]
log_lines = []
for line in script_content.split('\n'):
if "LUKSPASS=" in line:
log_lines.append("LUKSPASS='[REDACTED]'")
else:
log_lines.append(line)
sanitized_script_content = "\n".join(log_lines)
self.logger.log(f"Executing privileged command via script: {script_path}")
self.logger.log(f"Script content:\n---\n{sanitized_script_content}\n---")
result = subprocess.run(command, capture_output=True, text=True, check=False)
if result.returncode == 0:
self.logger.log(f"Privileged script executed successfully. Output:\n{result.stdout}")
return True
else:
self.logger.log(f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}")
return False
except Exception as e:
self.logger.log(f"Failed to set up or execute privileged command: {e}")
return False
finally:
if script_path and os.path.exists(script_path):
try:
os.remove(script_path)
except OSError as e:
self.logger.log(f"Error removing temp script {script_path}: {e}")