Files
Py-Backup/core/encryption_manager.py
Désiré Werner Menrath 452a56b813 feat: Implement auto-scaling encrypted containers and fix UI workflow
Refactors the encryption mechanism to use a flexible LVM-on-a-loop-device backend instead of a fixed-size file. This resolves issues with containers running out of space.

- Implements auto-resizing of the container when a backup fails due to lack of space.
- Implements transparent inspection of encrypted containers, allowing the UI to display their contents (full/incremental backups) just like unencrypted ones.
- Fixes deletion of encrypted backups by ensuring the container is unlocked before deletion.
- Fixes a bug where deleting unencrypted user backups incorrectly required root privileges.
- Fixes a UI freeze caused by calling a password dialog from a non-UI thread during deletion.
- Simplifies the UI by removing the now-obsolete "Show Encrypted Backups" button.
- Changes the default directory for encrypted user backups to `user_encrypt`.
2025-09-06 12:46:36 +02:00

363 lines
15 KiB
Python

import keyring
import keyring.errors
from keyring.backends import SecretService
import os
import shutil
import subprocess
import tempfile
import stat
import re
from typing import Optional, List
from core.pbp_app_config import AppConfig
from pyimage_ui.password_dialog import PasswordDialog
class EncryptionManager:
def __init__(self, logger, app=None):
try:
keyring.set_keyring(SecretService.Keyring())
except Exception as e:
logger.log(f"Failed to set keyring backend to SecretService: {e}")
self.logger = logger
self.app = app
self.service_id = "py-backup-encryption"
self.session_password = None
def get_key_file_path(self, base_dest_path: str) -> str:
"""Generates the standard path for the key file for a given destination."""
key_filename = f"keyfile_{os.path.basename(base_dest_path.rstrip('/'))}.key"
return os.path.join(AppConfig.CONFIG_DIR, key_filename)
def create_and_add_key_file(self, base_dest_path: str, password: str) -> Optional[str]:
"""Creates a new key file and adds it as a valid key to the LUKS container."""
# TODO: This needs to be adapted for the new LVM-based structure.
self.logger.log("create_and_add_key_file is not yet implemented for LVM containers.")
return None
def get_password_from_keyring(self, username: str) -> Optional[str]:
try:
return keyring.get_password(self.service_id, username)
except keyring.errors.InitError as e:
self.logger.log(f"Could not initialize keyring: {e}")
return None
except Exception as e:
self.logger.log(f"Could not get password from keyring: {e}")
return None
def is_key_in_keyring(self, username: str) -> bool:
try:
return self.get_password_from_keyring(username) is not None
except Exception as e:
self.logger.log(f"Could not check password in keyring: {e}")
return False
def set_password_in_keyring(self, username: str, password: str) -> bool:
try:
keyring.set_password(self.service_id, username, password)
self.logger.log(f"Password for {username} stored in keyring.")
return True
except Exception as e:
self.logger.log(f"Could not set password in keyring: {e}")
return False
def delete_password_from_keyring(self, username: str):
try:
keyring.delete_password(self.service_id, username)
self.logger.log(f"Password for {username} deleted from keyring.")
except Exception as e:
self.logger.log(f"Could not delete password from keyring: {e}")
def clear_session_password(self):
self.session_password = None
def get_password(self, username: str, confirm: bool = True) -> Optional[str]:
if self.session_password:
return self.session_password
password = self.get_password_from_keyring(username)
if password:
self.session_password = password
return password
dialog = PasswordDialog(
self.app, title=f"Enter password for {username}", confirm=confirm)
password, save_to_keyring = dialog.get_password()
if password and save_to_keyring:
self.set_password_in_keyring(username, password)
if password:
self.session_password = password
return password
def is_container_mounted(self, base_dest_path: str) -> bool:
pybackup_dir = os.path.join(base_dest_path, "pybackup")
mount_point = os.path.join(pybackup_dir, "encrypted")
return os.path.ismount(mount_point)
def lock_container(self, base_dest_path: str):
self.cleanup_encrypted_backup(base_dest_path)
def setup_encrypted_backup(self, queue, base_dest_path: str, size_gb: int, password: Optional[str] = None, key_file: Optional[str] = None) -> Optional[str]:
self.logger.log(f"Setting up LVM-based encrypted container at {base_dest_path}")
for tool in ["cryptsetup", "losetup", "pvcreate", "vgcreate", "lvcreate", "lvextend", "resize2fs"]:
if not shutil.which(tool):
self.logger.log(f"Error: Required tool '{tool}' is not installed.")
queue.put(('error', f"Required tool '{tool}' is not installed."))
return None
pybackup_dir = os.path.join(base_dest_path, "pybackup")
encrypted_dir = os.path.join(pybackup_dir, "encrypted")
# New container is an image file for LVM
container_path = os.path.join(pybackup_dir, "pybackup_lvm.img")
mount_point = encrypted_dir
base_name = os.path.basename(base_dest_path.rstrip('/'))
vg_name = f"pybackup_vg_{base_name}"
lv_name = "backup_lv"
mapper_name = f"pybackup_luks_{base_name}"
lv_path = f"/dev/{vg_name}/{lv_name}"
if not password and not key_file:
self.logger.log("No password or key file provided for encryption.")
queue.put(('error', "No password or key file provided for encryption."))
return None
if os.path.ismount(mount_point):
self.logger.log(f"Mount point {mount_point} already in use. Cleaning up before proceeding.")
self.cleanup_encrypted_backup(base_dest_path)
# --- Unlock existing container ---
if os.path.exists(container_path):
self.logger.log(f"Encrypted LVM container {container_path} already exists. Attempting to unlock.")
if password:
auth_part = f"echo -n '{password}' | cryptsetup luksOpen {lv_path} {mapper_name} -"
else:
auth_part = f"cryptsetup luksOpen {lv_path} {mapper_name} --key-file {key_file}"
script = f"""
LOOP_DEVICE=$(losetup -f --show {container_path})
pvscan --cache
vgchange -ay {vg_name}
{auth_part}
mount /dev/mapper/{mapper_name} {mount_point}
echo "LOOP_DEVICE_PATH=$LOOP_DEVICE"
"""
if not self._execute_as_root(script):
self.logger.log("Failed to unlock existing LVM container. Check password/key or permissions.")
self.cleanup_encrypted_backup(base_dest_path)
return None
self.logger.log(f"Encrypted LVM container unlocked and mounted at {mount_point}")
return mount_point
# --- Create new container ---
else:
self.logger.log(f"Creating new LVM-based encrypted container: {container_path}")
if password:
format_auth_part = f"echo -n '{password}' | cryptsetup luksFormat {lv_path} -"
open_auth_part = f"echo -n '{password}' | cryptsetup luksOpen {lv_path} {mapper_name} -"
else:
format_auth_part = f"cryptsetup luksFormat {lv_path} --key-file {key_file}"
open_auth_part = f"cryptsetup luksOpen {lv_path} {mapper_name} --key-file {key_file}"
script = f"""
mkdir -p {encrypted_dir}
fallocate -l {size_gb}G {container_path}
LOOP_DEVICE=$(losetup -f --show {container_path})
pvcreate $LOOP_DEVICE
vgcreate {vg_name} $LOOP_DEVICE
lvcreate -n {lv_name} -l 100%FREE {vg_name}
{format_auth_part}
{open_auth_part}
mkfs.ext4 /dev/mapper/{mapper_name}
mount /dev/mapper/{mapper_name} {mount_point}
echo "LOOP_DEVICE_PATH=$LOOP_DEVICE"
"""
if not self._execute_as_root(script):
self.logger.log("Failed to create and setup LVM-based encrypted container.")
self.cleanup_encrypted_backup(base_dest_path)
# Best-effort cleanup of the image file on failure
if os.path.exists(container_path):
self._execute_as_root(f"rm -f {container_path}")
queue.put(('error', "Failed to setup LVM-based encrypted container."))
return None
self.logger.log(f"Encrypted LVM container is ready and mounted at {mount_point}")
return mount_point
def resize_encrypted_container(self, base_dest_path: str, new_size_gb: int, password: Optional[str] = None, key_file: Optional[str] = None) -> bool:
"""Resizes the LVM-based encrypted container."""
self.logger.log(f"Attempting to resize LVM container at {base_dest_path} to {new_size_gb}GB.")
if not password and not key_file:
self.logger.log("Cannot resize: Password or key file is required.")
return False
pybackup_dir = os.path.join(base_dest_path, "pybackup")
mount_point = os.path.join(pybackup_dir, "encrypted")
container_path = os.path.join(pybackup_dir, "pybackup_lvm.img")
base_name = os.path.basename(base_dest_path.rstrip('/'))
vg_name = f"pybackup_vg_{base_name}"
lv_name = "backup_lv"
mapper_name = f"pybackup_luks_{base_name}"
lv_path = f"/dev/{vg_name}/{lv_name}"
self.logger.log("Step 1: Cleaning up existing mount before resizing.")
self.cleanup_encrypted_backup(base_dest_path)
if password:
open_auth_part = f"echo -n '{password}' | cryptsetup luksOpen {lv_path} {mapper_name} -"
else:
open_auth_part = f"cryptsetup luksOpen {lv_path} {mapper_name} --key-file {key_file}"
self.logger.log("Step 2: Resizing container and underlying volumes.")
script = f"""
# Step 1: Resize the backing file
fallocate -l {new_size_gb}G {container_path}
# Step 2: Connect loop device and resize PV
LOOP_DEVICE=$(losetup -f --show {container_path})
pvresize $LOOP_DEVICE
# Step 3: Extend LV to fill the new space
lvextend -l +100%FREE {lv_path}
# Step 4: Re-open LUKS and resize it
{open_auth_part}
cryptsetup resize {mapper_name}
# Step 5: Check and resize the filesystem
e2fsck -f /dev/mapper/{mapper_name}
resize2fs /dev/mapper/{mapper_name}
# Step 6: Mount the resized filesystem
mount /dev/mapper/{mapper_name} {mount_point}
"""
if not self._execute_as_root(script):
self.logger.log("Failed to resize LVM-based encrypted container.")
# Attempt a basic cleanup after failed resize
self.cleanup_encrypted_backup(base_dest_path)
return False
self.logger.log("Successfully resized and remounted the encrypted container.")
return True
def cleanup_encrypted_backup(self, base_dest_path: str):
pybackup_dir = os.path.join(base_dest_path, "pybackup")
mount_point = os.path.join(pybackup_dir, "encrypted")
container_path = os.path.join(pybackup_dir, "pybackup_lvm.img")
base_name = os.path.basename(base_dest_path.rstrip('/'))
vg_name = f"pybackup_vg_{base_name}"
mapper_name = f"pybackup_luks_{base_name}"
self.logger.log(f"Cleaning up encrypted LVM backup for {base_dest_path}")
# Find the loop device associated with the container file
# This is a bit tricky as the script that creates it is ephemeral.
# We can parse `losetup -j <file>`
find_loop_device_cmd = f"losetup -j {container_path} | cut -d: -f1"
script = f"""
LOOP_DEVICE=$({find_loop_device_cmd} | head -n 1)
umount {mount_point} || echo "Mount point {mount_point} not found or already unmounted."
cryptsetup luksClose {mapper_name} || echo "Mapper {mapper_name} not found or already closed."
# Deactivate VG only if it exists
if vgdisplay {vg_name} >/dev/null 2>&1; then
vgchange -an {vg_name} || echo "Could not deactivate volume group {vg_name}."
fi
# Detach loop device only if it was found
if [ -n "$LOOP_DEVICE" ] && losetup $LOOP_DEVICE >/dev/null 2>&1; then
losetup -d $LOOP_DEVICE || echo "Could not detach loop device $LOOP_DEVICE."
fi
"""
if not self._execute_as_root(script):
self.logger.log("Encrypted LVM backup cleanup script failed.")
def inspect_container(self, base_dest_path: str, listing_callback) -> List:
"""Temporarily mounts an encrypted container to list its contents."""
pybackup_dir = os.path.join(base_dest_path, "pybackup")
container_path = os.path.join(pybackup_dir, "pybackup_lvm.img")
mount_point = os.path.join(pybackup_dir, "encrypted")
if not os.path.exists(container_path):
# Not an encrypted destination, just run the callback on the base path
return listing_callback(base_dest_path)
self.logger.log(f"Encrypted destination detected. Attempting to inspect {container_path}")
password = self.get_password_from_keyring("root")
if not password:
self.logger.log("No password in keyring. Cannot inspect encrypted container.")
return []
# Use a dummy queue as we don't want to send UI updates during inspection
from queue import Queue
dummy_queue = Queue()
# The setup function handles unlocking and mounting
mounted_path = self.setup_encrypted_backup(
dummy_queue, base_dest_path, size_gb=0, password=password)
if not mounted_path:
self.logger.log("Failed to mount container for inspection.")
return []
try:
# Run the actual listing logic on the mounted path
self.logger.log(f"Container mounted at {mounted_path}. Running listing callback.")
return listing_callback(base_dest_path, mounted_path=mounted_path)
finally:
self.logger.log("Inspection complete. Cleaning up mount.")
self.cleanup_encrypted_backup(base_dest_path)
def _execute_as_root(self, script_content: str) -> bool:
script_path = ''
try:
with tempfile.NamedTemporaryFile(mode='w', delete=False, prefix="pybackup_script_", suffix=".sh", dir="/tmp") as tmp_script:
tmp_script.write("#!/bin/bash\n\n")
tmp_script.write("set -e\n\n")
tmp_script.write(script_content)
script_path = tmp_script.name
os.chmod(script_path, stat.S_IRWXU | stat.S_IRGRP |
stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
command = ['pkexec', script_path]
sanitized_script_content = re.sub(
r"echo -n '.*?'", "echo -n '[REDACTED]'", script_content)
self.logger.log(
f"Executing privileged command via script: {script_path}")
self.logger.log(
f"Script content:\n---\n{sanitized_script_content}\n---")
result = subprocess.run(
command, capture_output=True, text=True, check=False)
if result.returncode == 0:
self.logger.log(
f"Privileged script executed successfully. Output:\n{result.stdout}")
return True
else:
self.logger.log(
f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}")
return False
except Exception as e:
self.logger.log(
f"Failed to set up or execute privileged command: {e}")
return False
finally:
if script_path and os.path.exists(script_path):
os.remove(script_path)