Files
Py-Backup/core/encryption_manager.py
Désiré Werner Menrath 2d685e1d97 refactor: Rework encrypted backup UI and logic
- Centralizes backup content view logic into a single BackupContentFrame.
- Removes the separate, now obsolete EncryptedBackupContentFrame.
- Adds a toggle button within the BackupContentFrame to switch between viewing normal and encrypted backups.
- Centralizes the Restore, Delete, and Edit Comment buttons into a single button bar in BackupContentFrame.
- Corrects the path resolution logic to find backups and encrypted containers within the /pybackup subdirectory.
- Fixes UI bugs where action buttons would disappear when switching tabs.
2025-09-04 23:22:12 +02:00

216 lines
9.3 KiB
Python

import keyring
import keyring.errors
from keyring.backends import SecretService
import os
import shutil
import subprocess
import tempfile
import stat
from typing import Optional
from pyimage_ui.password_dialog import PasswordDialog
class EncryptionManager:
def __init__(self, logger, app=None):
try:
keyring.set_keyring(SecretService.Keyring())
except Exception as e:
logger.log(f"Failed to set keyring backend to SecretService: {e}")
self.logger = logger
self.app = app
self.service_id = "py-backup-encryption"
self.session_password = None
def get_password_from_keyring(self, username: str) -> Optional[str]:
try:
return keyring.get_password(self.service_id, username)
except keyring.errors.InitError as e:
self.logger.log(f"Could not initialize keyring. Keyring is not available on this system or is not configured correctly. Error: {e}")
return None
except Exception as e:
self.logger.log(f"Could not get password from keyring: {e}")
return None
def is_key_in_keyring(self, username: str) -> bool:
"""Checks if a password for the given username exists in the keyring."""
try:
return self.get_password_from_keyring(username) is not None
except Exception as e:
self.logger.log(f"Could not check password in keyring: {e}")
return False
def set_password_in_keyring(self, username: str, password: str) -> bool:
try:
keyring.set_password(self.service_id, username, password)
self.logger.log(f"Password for {username} stored in keyring.")
return True
except Exception as e:
self.logger.log(f"Could not set password in keyring: {e}")
return False
def delete_password_from_keyring(self, username: str):
try:
keyring.delete_password(self.service_id, username)
self.logger.log(f"Password for {username} deleted from keyring.")
except Exception as e:
self.logger.log(f"Could not delete password from keyring: {e}")
def clear_session_password(self):
self.session_password = None
def get_password(self, username: str, confirm: bool = True) -> Optional[str]:
if self.session_password:
return self.session_password
password = self.get_password_from_keyring(username)
if password:
self.session_password = password
return password
dialog = PasswordDialog(self.app, title=f"Enter password for {username}", confirm=confirm)
password, save_to_keyring = dialog.get_password()
if password and save_to_keyring:
self.set_password_in_keyring(username, password)
if password:
self.session_password = password
return password
def unlock_container(self, base_path: str, password: str) -> Optional[str]:
"""Unlocks and mounts an existing LUKS container."""
self.logger.log(f"Attempting to unlock encrypted container in {base_path}")
container_file = "pybackup_encrypted.luks"
container_path = os.path.join(base_path, container_file)
if not os.path.exists(container_path):
self.logger.log(f"Encrypted container not found at {container_path}")
return None
mapper_name = f"pybackup_{os.path.basename(base_path.rstrip('/'))}"
mount_point = f"/mnt/{mapper_name}"
if os.path.ismount(mount_point):
self.logger.log(f"Container already mounted at {mount_point}")
return mount_point
script = f"""
mkdir -p {mount_point}
echo -n '{password}' | cryptsetup luksOpen {container_path} {mapper_name} -
mount /dev/mapper/{mapper_name} {mount_point}
"""
if not self._execute_as_root(script):
self.logger.log("Failed to unlock existing encrypted container. Check password or permissions.")
# Clean up failed mount attempt
self.cleanup_encrypted_backup(mapper_name, mount_point)
return None
self.logger.log(f"Encrypted container unlocked and mounted at {mount_point}")
return mount_point
def lock_container(self, base_path: str):
"""Unmounts and closes the LUKS container for a given base path."""
mapper_name = f"pybackup_{os.path.basename(base_path.rstrip('/'))}"
mount_point = f"/mnt/{mapper_name}"
self.cleanup_encrypted_backup(mapper_name, mount_point)
def setup_encrypted_backup(self, queue, base_path: str, size_gb: int, password: str) -> Optional[str]:
"""Sets up a persistent LUKS encrypted container for the backup destination."""
self.logger.log(f"Setting up encrypted container at {base_path}")
if not shutil.which("cryptsetup"):
self.logger.log("Error: cryptsetup is not installed.")
queue.put(('error', "cryptsetup is not installed."))
return None
container_file = "pybackup_encrypted.luks"
container_path = os.path.join(base_path, container_file)
mapper_name = f"pybackup_{os.path.basename(base_path.rstrip('/'))}"
mount_point = f"/mnt/{mapper_name}"
if not password:
self.logger.log("No password provided for encryption.")
queue.put(('error', "No password provided for encryption."))
return None
# If mount point already exists, something is wrong. Clean up first.
if os.path.ismount(mount_point):
self.logger.log(f"Mount point {mount_point} already exists. Cleaning up before proceeding.")
self.cleanup_encrypted_backup(mapper_name, mount_point)
if os.path.exists(container_path):
self.logger.log(f"Encrypted container {container_path} already exists. Attempting to unlock.")
return self.unlock_container(base_path, password)
else:
self.logger.log(f"Creating new encrypted container: {container_path}")
script = f"""
fallocate -l {size_gb}G {container_path}
echo -n '{password}' | cryptsetup luksFormat {container_path} -
mkdir -p {mount_point}
echo -n '{password}' | cryptsetup luksOpen {container_path} {mapper_name} -
mkfs.ext4 /dev/mapper/{mapper_name}
mount /dev/mapper/{mapper_name} {mount_point}
"""
if not self._execute_as_root(script):
self.logger.log("Failed to create and setup encrypted container.")
self.cleanup_encrypted_backup(mapper_name, mount_point)
# Also remove the failed container file
if os.path.exists(container_path):
# This should be done with pkexec as well for safety
self._execute_as_root(f"rm -f {container_path}")
queue.put(('error', "Failed to setup encrypted container."))
return None
self.logger.log(f"Encrypted container is ready and mounted at {mount_point}")
return mount_point
def cleanup_encrypted_backup(self, mapper_name: str, mount_point: str):
"""Unmounts and closes the LUKS container."""
self.logger.log(f"Cleaning up encrypted backup: {mapper_name}")
script = f"""
umount {mount_point} || echo "Mount point {mount_point} not found or already unmounted."
cryptsetup luksClose {mapper_name} || echo "Mapper {mapper_name} not found or already closed."
rmdir {mount_point} || echo "Mount point directory {mount_point} not found or already removed."
"""
if not self._execute_as_root(script):
self.logger.log("Encrypted backup cleanup script failed.")
def _execute_as_root(self, script_content: str) -> bool:
"""Executes a shell script with root privileges using pkexec."""
script_path = ''
try:
with tempfile.NamedTemporaryFile(mode='w', delete=False, prefix="pybackup_script_", suffix=".sh", dir="/tmp") as tmp_script:
tmp_script.write("#!/bin/bash\n\n")
tmp_script.write("set -e\n\n")
tmp_script.write(script_content)
script_path = tmp_script.name
os.chmod(script_path, stat.S_IRWXU | stat.S_IRGRP |
stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
command = ['pkexec', script_path]
self.logger.log(
f"Executing privileged command via script: {script_path}")
self.logger.log(
f"Script content:\n---\n{script_content}\n---")
result = subprocess.run(
command, capture_output=True, text=True, check=False)
if result.returncode == 0:
self.logger.log(
f"Privileged script executed successfully. Output:\n{result.stdout}")
return True
else:
self.logger.log(
f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}")
return False
except Exception as e:
self.logger.log(
f"Failed to set up or execute privileged command: {e}")
return False
finally:
if script_path and os.path.exists(script_path):
os.remove(script_path)