Files
Py-Backup/core/encryption_manager.py
Désiré Werner Menrath e1b12227d0 feat: Improve encrypted backup management and UI feedback
This commit introduces significant improvements to how encrypted backups are handled,
focusing on user experience and system integration.

- Persistent Mounts: Encrypted backup containers now remain mounted across UI view changes,
  eliminating repeated password prompts when navigating the application. The container is
  automatically unmounted when the destination changes or the application closes.
- Key Management Fallback: The mounting process now intelligently falls back from
  keyring to keyfile, and finally to a user password prompt if previous methods fail.
- Enhanced UI Status: The header now provides detailed feedback on the encryption key
  status, indicating whether a key is available (via keyring or keyfile) and if the
  container is currently in use.
- Reduced pkexec Prompts: By keeping containers mounted, the number of system-level
  pkexec authentication prompts is drastically reduced, improving workflow.
- Bug Fixes:
    - Corrected a SyntaxError in encryption_manager.py related to string escaping.
    - Fixed an AttributeError in header_frame.py by restoring the is_key_in_keyring method.
    - Addressed a TclError on application shutdown by safely destroying Tkinter widgets.
2025-09-06 15:39:59 +02:00

295 lines
13 KiB
Python

import os
import re
import shlex
import shutil
import stat
import subprocess
import tempfile
from typing import Optional, List

import keyring
import keyring.errors
from keyring.backends import SecretService

from core.pbp_app_config import AppConfig
from pyimage_ui.password_dialog import PasswordDialog
class EncryptionManager:
    """Manage credentials and mounting of LUKS-on-LVM encrypted backup containers.

    A container is the image file ``pybackup/user_encrypt/pybackup_lvm.img``
    below a backup destination; once unlocked it is mounted at
    ``pybackup/encrypted``.  Privileged work is done by generating a bash
    script and running it through ``pkexec``.

    Attributes set by ``__init__``:
        logger: object exposing ``log(message)``.
        app: parent passed to ``PasswordDialog``.
        service_id: keyring service name used for all stored passwords.
        session_password: last password obtained during this session.
        mounted_destinations: destinations this instance considers mounted.
        auth_method: ``"keyring"``, ``"keyfile"``, ``"password"`` or ``None``.
    """

    def __init__(self, logger, app=None):
        """Select the SecretService keyring backend and reset session state."""
        try:
            keyring.set_keyring(SecretService.Keyring())
        except Exception as e:
            # Best effort: keep whatever backend keyring selected by default.
            logger.log(f"Failed to set keyring backend to SecretService: {e}")
        self.logger = logger
        self.app = app
        self.service_id = "py-backup-encryption"
        self.session_password = None
        self.mounted_destinations = set()
        self.auth_method = None

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _get_pybackup_dir(base_dest_path: str) -> str:
        """Return the ``pybackup`` directory that status checks use for a destination.

        A destination whose last path component already is ``pybackup`` is
        used as-is; otherwise ``pybackup`` is appended.
        """
        if os.path.basename(base_dest_path) == "pybackup":
            return base_dest_path
        return os.path.join(base_dest_path, "pybackup")

    @staticmethod
    def _passphrase_pipe(password: str) -> str:
        """Return the shell prefix that feeds *password* to the next command's stdin.

        ``printf '%s'`` is used instead of ``echo -n`` so that values such as
        ``-e`` are not swallowed as options, and the value is shell-quoted so
        quotes or metacharacters in it cannot alter the root script.
        """
        return f"printf '%s' {shlex.quote(password)} | "

    @classmethod
    def _build_luks_command(cls, action: str, args: List[str],
                            password: Optional[str], key_file: Optional[str]) -> str:
        """Build one ``cryptsetup <action>`` shell line with safely quoted arguments.

        With a password the passphrase is piped on stdin and ``-`` is appended;
        otherwise ``--key-file <key_file>`` is appended.
        """
        base = " ".join(["cryptsetup", action] + [shlex.quote(a) for a in args])
        if password:
            return f"{cls._passphrase_pipe(password)}{base} -"
        return f"{base} --key-file {shlex.quote(key_file)}"

    @classmethod
    def _redact_script(cls, script_content: str, secrets: Optional[List[str]] = None) -> str:
        """Return *script_content* with passphrases removed, for logging.

        Each entry of *secrets* is removed by replacing the exact passphrase
        pipe generated for it.  The older ``echo -n '...'`` form is redacted
        as well so hand-written scripts keep their previous protection.
        """
        sanitized = script_content
        for secret in secrets or ():
            if secret:
                sanitized = sanitized.replace(
                    cls._passphrase_pipe(secret), "printf '%s' '[REDACTED]' | ")
        return re.sub(r"echo -n '.*?'", "echo -n '[REDACTED]'", sanitized)

    # ------------------------------------------------------------------
    # Key file / keyring access
    # ------------------------------------------------------------------
    def get_key_file_path(self, base_dest_path: str) -> str:
        """Generates the standard path for the key file for a given destination."""
        key_filename = f"keyfile_{os.path.basename(base_dest_path.rstrip('/'))}.key"
        return os.path.join(AppConfig.CONFIG_DIR, key_filename)

    def get_password_from_keyring(self, username: str) -> Optional[str]:
        """Return the keyring password stored for *username*, or ``None``.

        Keyring errors are logged and reported as ``None``.
        """
        try:
            return keyring.get_password(self.service_id, username)
        except keyring.errors.InitError as e:
            self.logger.log(f"Could not initialize keyring: {e}")
            return None
        except Exception as e:
            self.logger.log(f"Could not get password from keyring: {e}")
            return None

    def is_key_in_keyring(self, username: str) -> bool:
        """Return ``True`` when the keyring holds a password for *username*."""
        try:
            return self.get_password_from_keyring(username) is not None
        except Exception as e:
            self.logger.log(f"Could not check password in keyring: {e}")
            return False

    def set_password_in_keyring(self, username: str, password: str) -> bool:
        """Store *password* for *username* in the keyring; return success."""
        try:
            keyring.set_password(self.service_id, username, password)
            self.logger.log(f"Password for {username} stored in keyring.")
            return True
        except Exception as e:
            self.logger.log(f"Could not set password in keyring: {e}")
            return False

    def _prompt_for_password(self, username: str, confirm: bool = True) -> Optional[str]:
        """Ask the user for a password via ``PasswordDialog``.

        A non-empty answer becomes the session password and is saved to the
        keyring when the dialog reports that the user asked for it.
        """
        dialog = PasswordDialog(
            self.app, title=f"Enter password for {username}", confirm=confirm)
        password, save_to_keyring = dialog.get_password()
        if password and save_to_keyring:
            self.set_password_in_keyring(username, password)
        if password:
            self.session_password = password
        return password

    def get_password(self, username: str, confirm: bool = True) -> Optional[str]:
        """Return a password from the session cache, the keyring, or a prompt.

        Sources are tried in that order; whichever succeeds is cached as the
        session password.
        """
        if self.session_password:
            return self.session_password
        password = self.get_password_from_keyring(username)
        if password:
            self.session_password = password
            return password
        return self._prompt_for_password(username, confirm=confirm)

    # ------------------------------------------------------------------
    # Status checks
    # ------------------------------------------------------------------
    def is_encrypted(self, base_dest_path: str) -> bool:
        """Return ``True`` when the destination contains a container image."""
        pybackup_dir = self._get_pybackup_dir(base_dest_path)
        container_path = os.path.join(
            pybackup_dir, "user_encrypt", "pybackup_lvm.img")
        return os.path.exists(container_path)

    def is_mounted(self, base_dest_path: str) -> bool:
        """Return ``True`` when the destination's ``encrypted`` dir is a mount point."""
        mount_point = os.path.join(
            self._get_pybackup_dir(base_dest_path), "encrypted")
        return os.path.ismount(mount_point)

    # ------------------------------------------------------------------
    # Mount / unmount
    # ------------------------------------------------------------------
    def mount(self, base_dest_path: str, queue=None) -> Optional[str]:
        """Unlock and mount the destination's container.

        Credentials are tried in order: keyring password, key file, then a
        password obtained from the session cache or a user prompt.  On success
        ``auth_method`` records which one worked.

        Args:
            base_dest_path: backup destination directory.
            queue: optional queue that receives ``('error', message)`` tuples;
                a throw-away queue is used when omitted.

        Returns:
            The mount point, or ``None`` if the destination has no container
            or no credential unlocked it.
        """
        if not self.is_encrypted(base_dest_path):
            self.auth_method = None
            return None
        if self.is_mounted(base_dest_path):
            self.mounted_destinations.add(base_dest_path)
            # Return exactly the path is_mounted() verified.
            return os.path.join(self._get_pybackup_dir(base_dest_path), "encrypted")
        username = os.path.basename(base_dest_path.rstrip('/'))
        # Use a dummy queue if none is provided
        if queue is None:
            from queue import Queue
            queue = Queue()

        # 1. Try keyring
        keyring_password = self.get_password_from_keyring(username)
        if keyring_password:
            self.logger.log("Found password in keyring. Attempting to mount.")
            mount_point = self._setup_encrypted_backup(
                queue, base_dest_path, 0, password=keyring_password)
            if mount_point:
                self.auth_method = "keyring"
                self.mounted_destinations.add(base_dest_path)
                return mount_point

        # 2. Try key file
        key_file_path = self.get_key_file_path(base_dest_path)
        if os.path.exists(key_file_path):
            self.logger.log(f"Found key file at {key_file_path}. Attempting to mount.")
            mount_point = self._setup_encrypted_backup(
                queue, base_dest_path, 0, key_file=key_file_path)
            if mount_point:
                self.auth_method = "keyfile"
                self.mounted_destinations.add(base_dest_path)
                return mount_point

        # 3. Prompt for password
        self.logger.log("No password in keyring or key file found. Prompting user.")
        keyring_rejected = bool(keyring_password)
        if keyring_rejected and self.session_password in (None, keyring_password):
            # get_password() would hand back the keyring value that was just
            # rejected, so go straight to the dialog instead.
            self.session_password = None
            password = self._prompt_for_password(username, confirm=False)
        else:
            password = self.get_password(username, confirm=False)
        if not password:
            self.logger.log("No password provided, cannot mount container.")
            self.auth_method = None
            return None
        mount_point = self._setup_encrypted_backup(
            queue, base_dest_path, 0, password=password)
        if mount_point:
            self.auth_method = "password"
            self.mounted_destinations.add(base_dest_path)
        else:
            # Forget the password that did not work so the next attempt
            # prompts again instead of silently reusing it.
            self.session_password = None
            self.auth_method = None
        return mount_point

    def unmount(self, base_dest_path: str):
        """Unmount *base_dest_path* if this instance recorded it as mounted."""
        if base_dest_path in self.mounted_destinations:
            self._cleanup_encrypted_backup(base_dest_path)
            self.mounted_destinations.remove(base_dest_path)

    def unmount_all(self):
        """Unmount every destination recorded in ``mounted_destinations``."""
        self.logger.log(f"Unmounting all mounted backup destinations: {self.mounted_destinations}")
        # Create a copy for safe iteration
        for path in list(self.mounted_destinations):
            self.unmount(path)

    # ------------------------------------------------------------------
    # Privileged container handling
    # ------------------------------------------------------------------
    def _setup_encrypted_backup(self, queue, base_dest_path: str, size_gb: int, password: Optional[str] = None, key_file: Optional[str] = None) -> Optional[str]:
        """Unlock an existing container, or create a new one, and mount it.

        Args:
            queue: receives ``('error', message)`` tuples for reported failures.
            base_dest_path: backup destination directory.
            size_gb: size of a newly created container image in GiB (only
                used when no container exists yet).
            password: passphrase for the LUKS volume.
            key_file: key file used when no password is given.

        Returns:
            The mount point on success, otherwise ``None``.
        """
        self.logger.log(f"Setting up LVM-based encrypted container at {base_dest_path}")
        for tool in ["cryptsetup", "losetup", "pvcreate", "vgcreate", "lvcreate", "lvextend", "resize2fs"]:
            if not shutil.which(tool):
                self.logger.log(f"Error: Required tool '{tool}' is not installed.")
                queue.put(('error', f"Required tool '{tool}' is not installed."))
                return None

        pybackup_dir = os.path.join(base_dest_path, "pybackup")
        user_encrypt_dir = os.path.join(pybackup_dir, "user_encrypt")
        encrypted_dir = os.path.join(pybackup_dir, "encrypted")
        container_path = os.path.join(user_encrypt_dir, "pybackup_lvm.img")
        mount_point = encrypted_dir
        base_name = os.path.basename(base_dest_path.rstrip('/'))
        vg_name = f"pybackup_vg_{base_name}"
        lv_name = "backup_lv"
        mapper_name = f"pybackup_luks_{base_name}"
        lv_path = f"/dev/{vg_name}/{lv_name}"
        mapper_device = f"/dev/mapper/{mapper_name}"

        if not password and not key_file:
            self.logger.log("No password or key file provided for encryption.")
            queue.put(('error', "No password or key file provided for encryption."))
            return None
        if os.path.ismount(mount_point):
            self.logger.log(f"Mount point {mount_point} already in use. Assuming it's correctly mounted.")
            return mount_point

        # Everything interpolated into the root script is shell-quoted: the
        # destination path and the password are user-controlled.
        q = shlex.quote
        secrets = [password] if password else None
        open_cmd = self._build_luks_command(
            "luksOpen", [lv_path, mapper_name], password, key_file)

        if os.path.exists(container_path):
            script = "\n".join([
                # The mount point is created too so mounting cannot fail just
                # because the directory was removed.
                f"mkdir -p {q(user_encrypt_dir)} {q(mount_point)}",
                f"LOOP_DEVICE=$(losetup -f --show {q(container_path)})",
                "pvscan --cache",
                f"vgchange -ay {q(vg_name)}",
                open_cmd,
                f"mount {q(mapper_device)} {q(mount_point)}",
                'echo "LOOP_DEVICE_PATH=$LOOP_DEVICE"',
            ]) + "\n"
            if not self._execute_as_root(script, secrets=secrets):
                # No queue message here: mount() relies on a quiet failure to
                # fall through to the next credential.
                self.logger.log("Failed to unlock existing LVM container.")
                self._cleanup_encrypted_backup(base_dest_path)
                return None
            self.logger.log(f"Encrypted LVM container unlocked and mounted at {mount_point}")
            return mount_point

        format_cmd = self._build_luks_command(
            "luksFormat", [lv_path], password, key_file)
        script = "\n".join([
            f"mkdir -p {q(encrypted_dir)}",
            f"mkdir -p {q(user_encrypt_dir)}",
            f"fallocate -l {q(f'{size_gb}G')} {q(container_path)}",
            f"LOOP_DEVICE=$(losetup -f --show {q(container_path)})",
            'pvcreate "$LOOP_DEVICE"',
            f'vgcreate {q(vg_name)} "$LOOP_DEVICE"',
            f"lvcreate -n {q(lv_name)} -l 100%FREE {q(vg_name)}",
            format_cmd,
            open_cmd,
            f"mkfs.ext4 {q(mapper_device)}",
            f"mount {q(mapper_device)} {q(mount_point)}",
            'echo "LOOP_DEVICE_PATH=$LOOP_DEVICE"',
        ]) + "\n"
        if not self._execute_as_root(script, secrets=secrets):
            self.logger.log("Failed to create and setup LVM-based encrypted container.")
            self._cleanup_encrypted_backup(base_dest_path)
            # Remove the half-built image so it is not mistaken for a
            # valid container later.
            if os.path.exists(container_path):
                self._execute_as_root(f"rm -f {q(container_path)}")
            queue.put(('error', "Failed to setup LVM-based encrypted container."))
            return None
        self.logger.log(f"Encrypted LVM container is ready and mounted at {mount_point}")
        return mount_point

    def _cleanup_encrypted_backup(self, base_dest_path: str):
        """Unmount the container and release its mapper, volume group and loop device.

        Each step tolerates the resource already being gone; only a failure
        of the script as a whole is logged.
        """
        pybackup_dir = os.path.join(base_dest_path, "pybackup")
        user_encrypt_dir = os.path.join(pybackup_dir, "user_encrypt")
        mount_point = os.path.join(pybackup_dir, "encrypted")
        container_path = os.path.join(user_encrypt_dir, "pybackup_lvm.img")
        base_name = os.path.basename(base_dest_path.rstrip('/'))
        vg_name = f"pybackup_vg_{base_name}"
        mapper_name = f"pybackup_luks_{base_name}"
        self.logger.log(f"Cleaning up encrypted LVM backup for {base_dest_path}")

        q = shlex.quote
        # Messages are quoted as whole words so path characters cannot be
        # expanded by the shell.
        umount_msg = q(f"Mount point {mount_point} not found or already unmounted.")
        close_msg = q(f"Mapper {mapper_name} not found or already closed.")
        vg_msg = q(f"Could not deactivate volume group {vg_name}.")
        script = "\n".join([
            f"LOOP_DEVICE=$(losetup -j {q(container_path)} | cut -d: -f1 | head -n 1)",
            f"umount {q(mount_point)} || echo {umount_msg}",
            f"cryptsetup luksClose {q(mapper_name)} || echo {close_msg}",
            f"if vgdisplay {q(vg_name)} >/dev/null 2>&1; then",
            f"    vgchange -an {q(vg_name)} || echo {vg_msg}",
            "fi",
            'if [ -n "$LOOP_DEVICE" ] && losetup "$LOOP_DEVICE" >/dev/null 2>&1; then',
            '    losetup -d "$LOOP_DEVICE" || echo "Could not detach loop device $LOOP_DEVICE."',
            "fi",
        ]) + "\n"
        if not self._execute_as_root(script):
            self.logger.log("Encrypted LVM backup cleanup script failed.")

    def _execute_as_root(self, script_content: str, secrets: Optional[List[str]] = None) -> bool:
        """Run *script_content* as a bash script through ``pkexec``.

        The script is written to a temporary file in ``/tmp``, prefixed with
        ``set -e``, executed, and always deleted afterwards.

        Args:
            script_content: shell commands to execute.
            secrets: passphrases embedded in the script; they are removed
                from the logged copy of the script.

        Returns:
            ``True`` when the script exits with status 0, else ``False``
            (including when it could not be written or started).
        """
        script_path = ''
        try:
            with tempfile.NamedTemporaryFile(mode='w', delete=False, prefix="pybackup_script_", suffix=".sh", dir="/tmp") as tmp_script:
                # Record the path first so the file is removed even if a
                # write below fails.
                script_path = tmp_script.name
                tmp_script.write("#!/bin/bash\n\n")
                tmp_script.write("set -e\n\n")
                tmp_script.write(script_content)
            # Owner-only: the script may contain a passphrase and must not be
            # readable by other local users while it exists.
            os.chmod(script_path, stat.S_IRWXU)
            command = ['pkexec', script_path]
            sanitized_script_content = self._redact_script(script_content, secrets)
            self.logger.log(
                f"Executing privileged command via script: {script_path}")
            self.logger.log(
                f"Script content:\n---\n{sanitized_script_content}\n---")
            result = subprocess.run(
                command, capture_output=True, text=True, check=False)
            if result.returncode == 0:
                self.logger.log(
                    f"Privileged script executed successfully. Output:\n{result.stdout}")
                return True
            self.logger.log(
                f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}")
            return False
        except Exception as e:
            self.logger.log(
                f"Failed to set up or execute privileged command: {e}")
            return False
        finally:
            if script_path and os.path.exists(script_path):
                os.remove(script_path)