Files
Py-Backup/core/encryption_manager.py
Désiré Werner Menrath 73e6e42485 Refactor: Encrypted backups to use direct LUKS
Replaced the LVM-on-a-file implementation with a more robust, industry-standard LUKS-on-a-file approach.

This change was motivated by persistent and hard-to-debug errors related to LVM state management and duplicate loop device detection during repeated mount/unmount cycles.

The new implementation provides several key benefits:
- **Robustness:** Eliminates the entire LVM layer, which was the root cause of the mount/unmount failures.
- **Improved UX:** Drastically reduces the number of password prompts for encrypted user backups. By changing ownership of the mountpoint, rsync can run with user privileges.
- **Enhanced Security:** The file transfer process (rsync) for user backups no longer runs with root privileges.
- **Better Usability:** Encrypted containers are now left mounted during the application's lifecycle and are only unmounted on exit, improving workflow for consecutive operations.
2025-09-07 15:58:28 +02:00

239 lines
11 KiB
Python

import math
import os
import re
import shlex
import shutil
import stat
import subprocess
import tempfile
from typing import Optional, List

import keyring
import keyring.errors
from keyring.backends import SecretService

from core.pbp_app_config import AppConfig
from pyimage_ui.password_dialog import PasswordDialog
class EncryptionManager:
    """Create, mount, resize and unmount LUKS containers used as backup targets.

    A container is a single image file located at
    ``<base_dest_path>/pybackup/user_encrypt/pybackup_luks.img``. It is opened
    with ``cryptsetup`` under a device-mapper name derived from the last path
    component of ``base_dest_path`` and mounted at
    ``<base_dest_path>/pybackup/encrypted``. Privileged steps are written to a
    temporary bash script that is executed through ``pkexec``.
    """

    def __init__(self, logger, app=None):
        """Select the SecretService keyring backend and initialise state.

        Args:
            logger: Object exposing a ``log(message)`` method.
            app: Parent passed to ``PasswordDialog`` when a password has to
                be requested from the user.
        """
        try:
            keyring.set_keyring(SecretService.Keyring())
        except Exception as e:
            # Best effort: keep whatever backend keyring selected by default.
            logger.log(f"Failed to set keyring backend to SecretService: {e}")
        self.logger = logger
        self.app = app
        self.service_id = "py-backup-encryption"
        # Destinations mounted through this instance; used by unmount_all().
        self.mounted_destinations = set()

    # ------------------------------------------------------------------
    # Password handling
    # ------------------------------------------------------------------

    def get_password_from_keyring(self, username: str) -> Optional[str]:
        """Return the stored password for ``username``, or ``None``.

        Any keyring error is logged and reported as ``None``.
        """
        try:
            return keyring.get_password(self.service_id, username)
        except Exception as e:
            self.logger.log(f"Could not get password from keyring: {e}")
            return None

    def is_key_in_keyring(self, username: str) -> bool:
        """Return ``True`` when the keyring holds a password for ``username``."""
        return self.get_password_from_keyring(username) is not None

    def set_password_in_keyring(self, username: str, password: str) -> bool:
        """Store ``password`` for ``username``; return whether it succeeded."""
        try:
            keyring.set_password(self.service_id, username, password)
            self.logger.log(f"Password for {username} stored in keyring.")
            return True
        except Exception as e:
            self.logger.log(f"Could not set password in keyring: {e}")
            return False

    def get_password(self, username: str, confirm: bool = True) -> Optional[str]:
        """Return the password for ``username``, asking the user if needed.

        The keyring is consulted first. If it has no (non-empty) entry, a
        ``PasswordDialog`` is shown; the entered password is saved to the
        keyring only when the dialog reports that the user asked for it.

        Args:
            username: Keyring account name.
            confirm: Forwarded to ``PasswordDialog`` as ``confirm``.

        Returns:
            The password, or whatever falsy value the dialog returned.
        """
        password = self.get_password_from_keyring(username)
        if password:
            return password
        dialog = PasswordDialog(self.app, title=f"Enter password for {username}", confirm=confirm)
        password, save_to_keyring = dialog.get_password()
        if password and save_to_keyring:
            self.set_password_in_keyring(username, password)
        return password

    # ------------------------------------------------------------------
    # Path and name helpers
    # ------------------------------------------------------------------

    def get_container_path(self, base_dest_path: str) -> str:
        """Return the path of the LUKS image file below ``base_dest_path``."""
        pybackup_dir = os.path.join(base_dest_path, "pybackup")
        user_encrypt_dir = os.path.join(pybackup_dir, "user_encrypt")
        return os.path.join(user_encrypt_dir, "pybackup_luks.img")

    def _get_mount_point(self, base_dest_path: str) -> str:
        """Return the mount point handed to callers and to the root scripts.

        The path is intentionally left un-normalised
        (``.../user_encrypt/../encrypted``) so the string returned by
        ``prepare_encrypted_destination`` stays exactly what it was before
        this helper existed. It names the same directory ``is_mounted``
        checks.
        """
        container_dir = os.path.dirname(self.get_container_path(base_dest_path))
        return os.path.join(container_dir, "..", "encrypted")

    @staticmethod
    def _get_username(base_dest_path: str) -> str:
        """Return the last path component, used as the keyring account name."""
        return os.path.basename(base_dest_path.rstrip('/'))

    @staticmethod
    def _get_mapper_name(base_dest_path: str) -> str:
        """Return a shell-safe device-mapper name for ``base_dest_path``.

        Characters outside ``[A-Za-z0-9_.@+-]`` are replaced with ``_`` so the
        name is a single shell word and a plain ``/dev/mapper`` entry. Names
        that were already safe are unchanged.
        """
        username = os.path.basename(base_dest_path.rstrip('/'))
        return "pybackup_luks_" + re.sub(r"[^A-Za-z0-9_.@+-]", "_", username)

    def is_encrypted(self, base_dest_path: str) -> bool:
        """Return ``True`` when a container image exists for the destination."""
        return os.path.exists(self.get_container_path(base_dest_path))

    def is_mounted(self, base_dest_path: str) -> bool:
        """Return ``True`` when ``<base>/pybackup/encrypted`` is a mount point."""
        pybackup_dir = os.path.join(base_dest_path, "pybackup")
        mount_point = os.path.join(pybackup_dir, "encrypted")
        return os.path.ismount(mount_point)

    # ------------------------------------------------------------------
    # Container lifecycle
    # ------------------------------------------------------------------

    def prepare_encrypted_destination(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
        """Mount (creating or growing if necessary) the encrypted destination.

        Args:
            base_dest_path: Backup destination directory.
            is_system: When false, the mount point is chowned to the current
                user so the copy can run unprivileged.
            source_size: Size in bytes of the data about to be backed up.
            queue: Object with ``put``; receives ``('status_update', text)``
                when the container has to be enlarged.

        Returns:
            The mount point path, or ``None`` on failure or cancellation.
        """
        container_path = self.get_container_path(base_dest_path)
        if os.path.exists(container_path):
            return self._handle_existing_container(base_dest_path, is_system, source_size, queue)
        return self._handle_new_container(base_dest_path, is_system, source_size, queue)

    def _handle_existing_container(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
        """Mount an existing container and enlarge it when space is short.

        Required space is ``source_size`` plus 15 %. When the mounted
        filesystem has less free space than that, the container is unmounted,
        grown by the shortfall and mounted again.
        """
        self.logger.log("Handling existing container.")
        container_path = self.get_container_path(base_dest_path)
        mount_point = self._get_mount_point(base_dest_path)

        try:
            # Size of the image file itself. The filesystem total reported by
            # disk_usage() is smaller (LUKS header, ext4 metadata), so using
            # it as the base could make ``truncate`` *shrink* the image.
            container_size = os.path.getsize(container_path)
        except OSError as e:
            self.logger.log(f"Could not determine container size: {e}")
            return None

        if not self._open_and_mount(base_dest_path, is_system):
            self.logger.log("Failed to mount container for size check.")
            return None

        free_space = shutil.disk_usage(mount_point).free
        required_space = int(source_size * 1.15)

        if required_space > free_space:
            self.logger.log(f"Resize needed. Free: {free_space}, Required: {required_space}")
            queue.put(('status_update', "Container zu klein. Vergrößere..."))
            needed_additional = required_space - free_space
            new_total_size = container_size + needed_additional

            self.unmount_and_reset_owner(base_dest_path)
            if not self._resize_container(base_dest_path, int(new_total_size)):
                self.logger.log("Failed to resize container.")
                return None
            if not self._open_and_mount(base_dest_path, is_system):
                self.logger.log("Failed to remount after resize.")
                return None

        self.mounted_destinations.add(base_dest_path)
        return mount_point

    def _handle_new_container(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
        """Create, format and mount a new container sized for ``source_size``.

        The image is ``ceil(source_size * 1.2 / GiB) + 5`` GiB large. Returns
        the mount point, or ``None`` when no password was supplied or the
        privileged script failed.
        """
        self.logger.log("Handling new container creation.")
        size_gb = math.ceil((source_size * 1.2) / (1024**3)) + 5

        password = self.get_password(self._get_username(base_dest_path), confirm=True)
        if not password:
            return None

        container_path = self.get_container_path(base_dest_path)
        mount_point = self._get_mount_point(base_dest_path)
        mapper_name = self._get_mapper_name(base_dest_path)
        chown_cmd = self._get_chown_command(mount_point, is_system)

        container = shlex.quote(container_path)
        device = shlex.quote(f"/dev/mapper/{mapper_name}")
        target = shlex.quote(mount_point)

        lines = [
            # Abort on the first failure: continuing after a failed luksOpen
            # could run mkfs on a stale mapping or report a bogus success.
            "set -e",
            f"mkdir -p {shlex.quote(os.path.dirname(container_path))}",
            f"mkdir -p {target}",
            f"truncate -s {int(size_gb)}G {container}",
            f"echo -n \"$LUKSPASS\" | cryptsetup luksFormat {container} -",
            f"echo -n \"$LUKSPASS\" | cryptsetup luksOpen {container} {mapper_name} -",
            f"mkfs.ext4 {device}",
            f"mount {device} {target}",
        ]
        if chown_cmd:
            lines.append(chown_cmd)
        script = "\n".join(lines) + "\n"

        if not self._execute_as_root(script, password):
            return None

        self.mounted_destinations.add(base_dest_path)
        return mount_point

    def _open_and_mount(self, base_dest_path: str, is_system: bool) -> bool:
        """Open the container and mount it; return whether the mount succeeded.

        Any previous mount or mapping is torn down first on a best-effort
        basis. The script's exit status is that of ``mount`` (followed by the
        ownership change for user backups), so a failed unlock or mount is
        reported as ``False`` instead of being hidden by a later command.
        """
        password = self.get_password(self._get_username(base_dest_path), confirm=False)
        if not password:
            return False

        container_path = self.get_container_path(base_dest_path)
        mount_point = self._get_mount_point(base_dest_path)
        mapper_name = self._get_mapper_name(base_dest_path)
        chown_cmd = self._get_chown_command(mount_point, is_system)

        container = shlex.quote(container_path)
        device = shlex.quote(f"/dev/mapper/{mapper_name}")
        target = shlex.quote(mount_point)

        mount_cmd = f"mount {device} {target}"
        if chown_cmd:
            # Only hand the directory to the user when it really is the
            # mounted container, never the bare directory underneath.
            mount_cmd = f"{mount_cmd} && {chown_cmd}"

        script = "\n".join([
            f"umount -l {target} || true",
            f"cryptsetup luksClose {mapper_name} || true",
            f"mkdir -p {target}",
            # Not fatal on its own: if the old mapping could not be closed it
            # is still usable, and the mount below decides the outcome.
            f"echo -n \"$LUKSPASS\" | cryptsetup luksOpen {container} {mapper_name} -",
            mount_cmd,
            "",
        ])
        return self._execute_as_root(script, password)

    def _resize_container(self, base_dest_path: str, new_size_bytes: int) -> bool:
        """Grow the image file to ``new_size_bytes`` and enlarge its filesystem.

        The container must not be mounted. A request smaller than the current
        file size is rejected, because truncating the image would destroy the
        end of the filesystem.

        Returns:
            ``True`` when the image was grown, the filesystem resized and the
            mapping closed again; ``False`` otherwise.
        """
        container_path = self.get_container_path(base_dest_path)
        try:
            current_size = os.path.getsize(container_path)
        except OSError as e:
            self.logger.log(f"Could not determine container size: {e}")
            return False
        if new_size_bytes < current_size:
            self.logger.log(
                f"Refusing to shrink container from {current_size} to {new_size_bytes} bytes.")
            return False

        password = self.get_password(self._get_username(base_dest_path), confirm=False)
        if not password:
            return False

        mapper_name = self._get_mapper_name(base_dest_path)
        container = shlex.quote(container_path)
        device = shlex.quote(f"/dev/mapper/{mapper_name}")

        script = "\n".join([
            f"truncate -s {int(new_size_bytes)} {container} || exit 1",
            f"echo -n \"$LUKSPASS\" | cryptsetup luksOpen {container} {mapper_name} - || exit 1",
            "status=0",
            f"e2fsck -fy {device} || status=$?",
            # e2fsck exit codes 0 (clean) and 1 (errors corrected) both leave
            # a filesystem that may be resized.
            'if [ "$status" -le 1 ]; then',
            "    status=0",
            f"    resize2fs {device} || status=$?",
            "fi",
            # Always try to close the mapping, but keep the first error.
            f"if ! cryptsetup luksClose {mapper_name} && [ \"$status\" -eq 0 ]; then",
            "    status=1",
            "fi",
            'exit "$status"',
            "",
        ])
        return self._execute_as_root(script, password)

    def unmount_and_reset_owner(self, base_dest_path: str, force_unmap=False):
        """Unmount the container, close its mapping and forget the destination.

        Nothing is done when neither the mapping nor the mount exists, unless
        ``force_unmap`` is true. Every step of the privileged script is
        best-effort (``|| true``) and its result is not checked.
        """
        mapper_name = self._get_mapper_name(base_dest_path)
        nothing_active = (
            not os.path.exists(f"/dev/mapper/{mapper_name}")
            and not self.is_mounted(base_dest_path)
        )
        if nothing_active and not force_unmap:
            return

        self.logger.log(f"Unmounting and resetting owner for {base_dest_path}")
        target = shlex.quote(self._get_mount_point(base_dest_path))
        script = "\n".join([
            f"chown root:root {target} || true",
            f"umount -l {target} || true",
            f"cryptsetup luksClose {mapper_name} || true",
            "",
        ])
        self._execute_as_root(script)
        self.mounted_destinations.discard(base_dest_path)

    def unmount_all(self):
        """Unmount every destination recorded in ``mounted_destinations``."""
        self.logger.log(f"Unmounting all: {self.mounted_destinations}")
        # Iterate over a copy: unmounting removes entries from the set.
        for path in list(self.mounted_destinations):
            self.unmount_and_reset_owner(path, force_unmap=True)

    def _get_chown_command(self, mount_point: str, is_system: bool) -> str:
        """Return a shell command giving ``mount_point`` to the current user.

        Returns an empty string for system backups, or when the current
        UID/GID cannot be determined.
        """
        if not is_system:
            try:
                uid = os.getuid()
                gid = os.getgid()
                return f"chown {uid}:{gid} {shlex.quote(mount_point)}"
            except Exception as e:
                self.logger.log(f"Could not get current user UID/GID for chown: {e}")
        return ""

    # ------------------------------------------------------------------
    # Privileged execution
    # ------------------------------------------------------------------

    def _execute_as_root(self, script_content: str, password_for_stdin: Optional[str] = None) -> bool:
        """Run ``script_content`` as a bash script through ``pkexec``.

        Args:
            script_content: Body of the script (without shebang).
            password_for_stdin: Optional secret made available to the script
                as the shell variable ``LUKSPASS``. Despite the parameter
                name, it is written into the temporary script file rather
                than passed on stdin.
                NOTE(review): the secret therefore touches disk in ``/tmp``
                until the script is removed; consider feeding it via stdin.

        Returns:
            ``True`` when the script exited with status 0, ``False`` on any
            other status or when setting up / launching the script failed.
        """
        script_path = ''
        try:
            # Build the loggable text before the secret is added, so no part
            # of the password (even one spanning several lines) can leak.
            if password_for_stdin:
                escaped_password = password_for_stdin.replace("'", "'\\''")
                sanitized_script_content = f"LUKSPASS='[REDACTED]'\n{script_content}"
                script_content = f"LUKSPASS='{escaped_password}'\n{script_content}"
            else:
                sanitized_script_content = script_content

            with tempfile.NamedTemporaryFile(mode='w', delete=False, prefix="pybackup_script_", suffix=".sh", dir="/tmp") as tmp_script:
                tmp_script.write("#!/bin/bash\n" + script_content)
                script_path = tmp_script.name

            # Owner-only read/write/execute so pkexec can run the file.
            os.chmod(script_path, stat.S_IRWXU)
            command = ['pkexec', script_path]

            self.logger.log(f"Executing privileged command via script: {script_path}")
            self.logger.log(f"Script content:\n---\n{sanitized_script_content}\n---")

            result = subprocess.run(command, capture_output=True, text=True, check=False)
            if result.returncode == 0:
                self.logger.log(f"Privileged script executed successfully. Output:\n{result.stdout}")
                return True
            self.logger.log(f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}")
            return False
        except Exception as e:
            self.logger.log(f"Failed to set up or execute privileged command: {e}")
            return False
        finally:
            # Always remove the script; it may contain the password.
            if script_path and os.path.exists(script_path):
                try:
                    os.remove(script_path)
                except OSError as e:
                    self.logger.log(f"Error removing temp script {script_path}: {e}")