Files
Py-Backup/core/encryption_manager.py
Désiré Werner Menrath 4b6062981a fix: Resolve issues with encrypted backup view and logic
- Fixes a bug where the backup list would not display "orphan" incremental backups in the encrypted view.
- Fixes a bug where system backups were incorrectly shown in the user backup list.
- Prevents the app from asking for system permissions to lock an encrypted container on exit if it is not currently mounted.
- Fixes pathing inconsistencies for the encryption manager to ensure mapper names are created consistently.
- Adds debug logging to the backup list functions to help diagnose future issues.
2025-09-05 00:53:44 +02:00

216 lines
9.2 KiB
Python

import keyring
import keyring.errors
from keyring.backends import SecretService
import os
import shutil
import subprocess
import tempfile
import stat
import re
from typing import Optional
from pyimage_ui.password_dialog import PasswordDialog
class EncryptionManager:
def __init__(self, logger, app=None):
try:
keyring.set_keyring(SecretService.Keyring())
except Exception as e:
logger.log(f"Failed to set keyring backend to SecretService: {e}")
self.logger = logger
self.app = app
self.service_id = "py-backup-encryption"
self.session_password = None
def get_password_from_keyring(self, username: str) -> Optional[str]:
try:
return keyring.get_password(self.service_id, username)
except keyring.errors.InitError as e:
self.logger.log(f"Could not initialize keyring. Keyring is not available on this system or is not configured correctly. Error: {e}")
return None
except Exception as e:
self.logger.log(f"Could not get password from keyring: {e}")
return None
def is_key_in_keyring(self, username: str) -> bool:
try:
return self.get_password_from_keyring(username) is not None
except Exception as e:
self.logger.log(f"Could not check password in keyring: {e}")
return False
def set_password_in_keyring(self, username: str, password: str) -> bool:
try:
keyring.set_password(self.service_id, username, password)
self.logger.log(f"Password for {username} stored in keyring.")
return True
except Exception as e:
self.logger.log(f"Could not set password in keyring: {e}")
return False
def delete_password_from_keyring(self, username: str):
try:
keyring.delete_password(self.service_id, username)
self.logger.log(f"Password for {username} deleted from keyring.")
except Exception as e:
self.logger.log(f"Could not delete password from keyring: {e}")
def clear_session_password(self):
self.session_password = None
def get_password(self, username: str, confirm: bool = True) -> Optional[str]:
if self.session_password:
return self.session_password
password = self.get_password_from_keyring(username)
if password:
self.session_password = password
return password
dialog = PasswordDialog(self.app, title=f"Enter password for {username}", confirm=confirm)
password, save_to_keyring = dialog.get_password()
if password and save_to_keyring:
self.set_password_in_keyring(username, password)
if password:
self.session_password = password
return password
def is_container_mounted(self, base_dest_path: str) -> bool:
"""Checks if the LUKS container for a given base path is currently mounted."""
mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}"
mount_point = f"/mnt/{mapper_name}"
return os.path.ismount(mount_point)
def unlock_container(self, base_dest_path: str, password: str) -> Optional[str]:
self.logger.log(f"Attempting to unlock encrypted container for base path {base_dest_path}")
pybackup_dir = os.path.join(base_dest_path, "pybackup")
container_path = os.path.join(pybackup_dir, "pybackup_encrypted.luks")
if not os.path.exists(container_path):
self.logger.log(f"Encrypted container not found at {container_path}")
return None
mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}"
mount_point = f"/mnt/{mapper_name}"
if os.path.ismount(mount_point):
self.logger.log(f"Container already mounted at {mount_point}")
return mount_point
script = f"""
mkdir -p {mount_point}
echo -n '{password}' | cryptsetup luksOpen {container_path} {mapper_name} -
mount /dev/mapper/{mapper_name} {mount_point}
"""
if not self._execute_as_root(script):
self.logger.log("Failed to unlock existing encrypted container. Check password or permissions.")
self.cleanup_encrypted_backup(base_dest_path)
return None
self.logger.log(f"Encrypted container unlocked and mounted at {mount_point}")
return mount_point
def lock_container(self, base_dest_path: str):
self.cleanup_encrypted_backup(base_dest_path)
def setup_encrypted_backup(self, queue, base_dest_path: str, size_gb: int, password: str) -> Optional[str]:
self.logger.log(f"Setting up encrypted container at {base_dest_path}")
if not shutil.which("cryptsetup"):
self.logger.log("Error: cryptsetup is not installed.")
queue.put(('error', "cryptsetup is not installed."))
return None
pybackup_dir = os.path.join(base_dest_path, "pybackup")
container_path = os.path.join(pybackup_dir, "pybackup_encrypted.luks")
mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}"
mount_point = f"/mnt/{mapper_name}"
if not password:
self.logger.log("No password provided for encryption.")
queue.put(('error', "No password provided for encryption."))
return None
if os.path.ismount(mount_point):
self.logger.log(f"Mount point {mount_point} already exists. Cleaning up before proceeding.")
self.cleanup_encrypted_backup(base_dest_path)
if os.path.exists(container_path):
self.logger.log(f"Encrypted container {container_path} already exists. Attempting to unlock.")
return self.unlock_container(base_dest_path, password)
else:
self.logger.log(f"Creating new encrypted container: {container_path}")
script = f"""
mkdir -p {pybackup_dir}
fallocate -l {size_gb}G {container_path}
echo -n '{password}' | cryptsetup luksFormat {container_path} -
mkdir -p {mount_point}
echo -n '{password}' | cryptsetup luksOpen {container_path} {mapper_name} -
mkfs.ext4 /dev/mapper/{mapper_name}
mount /dev/mapper/{mapper_name} {mount_point}
"""
if not self._execute_as_root(script):
self.logger.log("Failed to create and setup encrypted container.")
self.cleanup_encrypted_backup(base_dest_path)
if os.path.exists(container_path):
self._execute_as_root(f"rm -f {container_path}")
queue.put(('error', "Failed to setup encrypted container."))
return None
self.logger.log(f"Encrypted container is ready and mounted at {mount_point}")
return mount_point
def cleanup_encrypted_backup(self, base_dest_path: str):
mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}"
mount_point = f"/mnt/{mapper_name}"
self.logger.log(f"Cleaning up encrypted backup: {mapper_name}")
script = f"""
umount {mount_point} || echo "Mount point {mount_point} not found or already unmounted."
cryptsetup luksClose {mapper_name} || echo "Mapper {mapper_name} not found or already closed."
rmdir {mount_point} || echo "Mount point directory {mount_point} not found or already removed."
"""
if not self._execute_as_root(script):
self.logger.log("Encrypted backup cleanup script failed.")
def _execute_as_root(self, script_content: str) -> bool:
script_path = ''
try:
with tempfile.NamedTemporaryFile(mode='w', delete=False, prefix="pybackup_script_", suffix=".sh", dir="/tmp") as tmp_script:
tmp_script.write("#!/bin/bash\n\n")
tmp_script.write("set -e\n\n")
tmp_script.write(script_content)
script_path = tmp_script.name
os.chmod(script_path, stat.S_IRWXU | stat.S_IRGRP |
stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
command = ['pkexec', script_path]
sanitized_script_content = re.sub(r"echo -n '.*?'", "echo -n '[REDACTED]'", script_content)
self.logger.log(
f"Executing privileged command via script: {script_path}")
self.logger.log(
f"Script content:\n---\n{sanitized_script_content}\n---")
result = subprocess.run(
command, capture_output=True, text=True, check=False)
if result.returncode == 0:
self.logger.log(
f"Privileged script executed successfully. Output:\n{result.stdout}")
return True
else:
self.logger.log(
f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}")
return False
except Exception as e:
self.logger.log(
f"Failed to set up or execute privileged command: {e}")
return False
finally:
if script_path and os.path.exists(script_path):
os.remove(script_path)