Files
Py-Backup/core/encryption_manager.py

378 lines
16 KiB
Python

import keyring
import keyring.errors
from keyring.backends import SecretService
import os
import shutil
import subprocess
import tempfile
import stat
import re
import math
from typing import Optional, List, Tuple
from core.pbp_app_config import AppConfig
from pyimage_ui.password_dialog import PasswordDialog
import json
class EncryptionManager:
def __init__(self, logger, app=None):
try:
keyring.set_keyring(SecretService.Keyring())
except Exception as e:
logger.log(f"Failed to set keyring backend to SecretService: {e}")
self.logger = logger
self.app = app
self.service_id = "py-backup-encryption"
self.mounted_destinations = set()
self.password_cache = {}
self.lock_file = AppConfig.LOCK_FILE_PATH
def _write_lock_file(self, data):
with open(self.lock_file, 'w') as f:
json.dump(data, f)
def _read_lock_file(self):
if not self.lock_file.exists():
return []
with open(self.lock_file, 'r') as f:
try:
return json.load(f)
except json.JSONDecodeError:
return []
def add_to_lock_file(self, base_path, mapper_name):
locks = self._read_lock_file()
if not any(lock['base_path'] == base_path for lock in locks):
locks.append({"base_path": base_path, "mapper_name": mapper_name})
self._write_lock_file(locks)
def remove_from_lock_file(self, base_path):
locks = self._read_lock_file()
updated_locks = [lock for lock in locks if lock['base_path'] != base_path]
self._write_lock_file(updated_locks)
def get_password_from_keyring(self, username: str) -> Optional[str]:
try:
return keyring.get_password(self.service_id, username)
except Exception as e:
self.logger.log(f"Could not get password from keyring: {e}")
return None
def is_key_in_keyring(self, username: str) -> bool:
return self.get_password_from_keyring(username) is not None
def set_password_in_keyring(self, username: str, password: str) -> bool:
try:
keyring.set_password(self.service_id, username, password)
self.logger.log(f"Password for {username} stored in keyring.")
return True
except Exception as e:
self.logger.log(f"Could not set password in keyring: {e}")
return False
def get_password(self, username: str, confirm: bool = True) -> Optional[str]:
if username in self.password_cache:
return self.password_cache[username]
password = self.get_password_from_keyring(username)
if password:
self.password_cache[username] = password
return password
dialog = PasswordDialog(
self.app, title=f"Enter password for {username}", confirm=confirm)
password, save_to_keyring = dialog.get_password()
if password:
self.password_cache[username] = password
if save_to_keyring:
self.set_password_in_keyring(username, password)
return password
def get_container_path(self, base_dest_path: str) -> str:
pybackup_dir = os.path.join(base_dest_path, "pybackup")
user_encrypt_dir = os.path.join(pybackup_dir, "user_encrypt")
return os.path.join(user_encrypt_dir, "pybackup_luks.img")
def get_key_file_path(self, base_dest_path: str) -> str:
pybackup_dir = os.path.join(base_dest_path, "pybackup")
return os.path.join(pybackup_dir, "luks.keyfile")
def create_and_add_key_file(self, base_dest_path: str, password: str) -> Optional[str]:
key_file_path = self.get_key_file_path(base_dest_path)
container_path = self.get_container_path(base_dest_path)
try:
with open(key_file_path, 'wb') as f:
f.write(os.urandom(4096))
os.chmod(key_file_path, 0o400)
self.logger.log(f"Generated new key file at {key_file_path}")
script = f'cryptsetup luksAddKey "{container_path}" "{key_file_path}"'
if self._execute_as_root(script, password):
self.logger.log(
"Successfully added key file to LUKS container.")
return key_file_path
else:
self.logger.log("Failed to add key file to LUKS container.")
os.remove(key_file_path)
return None
except Exception as e:
self.logger.log(f"Error creating key file: {e}")
if os.path.exists(key_file_path):
os.remove(key_file_path)
return None
def _get_password_or_key_cmd(self, base_dest_path: str, username: str) -> Tuple[str, Optional[str]]:
key_file_path = self.get_key_file_path(base_dest_path)
if os.path.exists(key_file_path):
self.logger.log(
f"Using key file for LUKS operation: {key_file_path}")
return f'--key-file "{key_file_path}"', None
else:
password = self.get_password(username, confirm=False)
if not password:
return "", None
return "-", password
def is_encrypted(self, base_dest_path: str) -> bool:
return os.path.exists(self.get_container_path(base_dest_path))
def is_mounted(self, base_dest_path: str) -> bool:
pybackup_dir = os.path.join(base_dest_path, "pybackup")
mount_point = os.path.join(pybackup_dir, "encrypted")
return os.path.ismount(mount_point) or base_dest_path in self.mounted_destinations
def mount_for_deletion(self, base_dest_path: str, is_system: bool, password: str) -> Optional[str]:
self.logger.log("Mounting container for deletion operation.")
if self._open_and_mount(base_dest_path, is_system, password):
mount_point = os.path.join(os.path.dirname(
self.get_container_path(base_dest_path)), "..", "encrypted")
self.mounted_destinations.add(base_dest_path)
return mount_point
self.logger.log("Failed to mount container for deletion.")
return None
def prepare_encrypted_destination(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
container_path = self.get_container_path(base_dest_path)
if os.path.exists(container_path):
return self._handle_existing_container(base_dest_path, is_system, source_size, queue)
else:
return self._handle_new_container(base_dest_path, is_system, source_size, queue)
def _handle_existing_container(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
self.logger.log("Handling existing container.")
username = os.path.basename(base_dest_path.rstrip('/'))
mount_point = os.path.join(os.path.dirname(
self.get_container_path(base_dest_path)), "..", "encrypted")
if not self.is_mounted(base_dest_path):
if not self._open_and_mount(base_dest_path, is_system):
self.logger.log("Failed to mount container for size check.")
return None
free_space = shutil.disk_usage(mount_point).free
required_space = int(source_size * 1.15)
if required_space > free_space:
self.logger.log(
f"Resize needed. Free: {free_space}, Required: {required_space}")
queue.put(('status_update', "Container zu klein. Vergrößere..."))
current_total = shutil.disk_usage(mount_point).total
needed_additional = required_space - free_space
new_total_size = current_total + needed_additional
container_path = self.get_container_path(base_dest_path)
mapper_name = f"pybackup_luks_{username}"
chown_cmd = self._get_chown_command(mount_point, is_system)
key_or_pass_arg, password = self._get_password_or_key_cmd(
base_dest_path, username)
if not key_or_pass_arg:
return None
luks_open_cmd = f'echo -n "$LUKSPASS" | cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}'
resize_script = f"""
# Unmount cleanly first
umount -l \"{mount_point}\" || true
cryptsetup luksClose {mapper_name} || true
# Resize container file
truncate -s {int(new_total_size)} \"{container_path}\"
# Re-open, check, and resize filesystem
{luks_open_cmd}
e2fsck -fy \"/dev/mapper/{mapper_name}\"
resize2fs \"/dev/mapper/{mapper_name}\"
# Now mount it
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
{chown_cmd}
"""
if not self._execute_as_root(resize_script, password):
self.logger.log("Failed to execute resize and remount script.")
return None
if not self.is_mounted(base_dest_path):
self.logger.log(
"CRITICAL: Mount failed after resize script, but script reported success. Aborting.")
return None
self.mounted_destinations.add(base_dest_path)
return mount_point
def _handle_new_container(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
self.logger.log("Handling new container creation.")
size_gb = math.ceil((source_size * 1.2) / (1024**3)) + 5
username = os.path.basename(base_dest_path.rstrip('/'))
password = self.get_password(username, confirm=True)
if not password:
return None
container_path = self.get_container_path(base_dest_path)
mount_point = os.path.join(os.path.dirname(
container_path), "..", "encrypted")
mapper_name = f"pybackup_luks_{username}"
chown_cmd = self._get_chown_command(mount_point, is_system)
script = f"""
mkdir -p \"{os.path.dirname(container_path)}\"\n mkdir -p \"{mount_point}\"\n truncate -s {int(size_gb)}G \"{container_path}\"\n echo -n \"$LUKSPASS\" | cryptsetup luksFormat \"{container_path}\" -
echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} -
mkfs.ext4 \"/dev/mapper/{mapper_name}\"
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"\n {chown_cmd}\n """
if not self._execute_as_root(script, password):
return None
self.mounted_destinations.add(base_dest_path)
return mount_point
def _open_and_mount(self, base_dest_path: str, is_system: bool, password_override: Optional[str] = None) -> bool:
username = os.path.basename(base_dest_path.rstrip('/'))
key_or_pass_arg, password = "", None
if password_override:
password = password_override
key_or_pass_arg = "-"
else:
key_or_pass_arg, password = self._get_password_or_key_cmd(
base_dest_path, username)
if not key_or_pass_arg:
return False
container_path = self.get_container_path(base_dest_path)
mount_point = os.path.join(os.path.dirname(
container_path), "..", "encrypted")
mapper_name = f"pybackup_luks_{username}"
chown_cmd = self._get_chown_command(mount_point, is_system)
luks_open_cmd = f'echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen \"{container_path}\" {mapper_name} {key_or_pass_arg}'
script = f"""
umount -l \"{mount_point}\" || true
cryptsetup luksClose {mapper_name} || true
mkdir -p \"{mount_point}\"
{luks_open_cmd}
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
{chown_cmd}
"""
if self._execute_as_root(script, password):
self.add_to_lock_file(base_dest_path, mapper_name)
return True
return False
def unmount_and_reset_owner(self, base_dest_path: str, force_unmap=False):
username = os.path.basename(base_dest_path.rstrip('/'))
mapper_name = f"pybackup_luks_{username}"
if not os.path.exists(f"/dev/mapper/{mapper_name}") and not self.is_mounted(base_dest_path):
if not force_unmap:
return
self.logger.log(f"Unmounting and resetting owner for {base_dest_path}")
container_path = self.get_container_path(base_dest_path)
mount_point = os.path.join(os.path.dirname(
container_path), "..", "encrypted")
script = f"""
chown root:root \"{mount_point}\" || true
umount -l \"{mount_point}\"
cryptsetup luksClose {mapper_name}
"""
password = self.password_cache.get(username)
self._execute_as_root(script, password)
self.remove_from_lock_file(base_dest_path)
if base_dest_path in self.mounted_destinations:
self.mounted_destinations.remove(base_dest_path)
if username in self.password_cache:
del self.password_cache[username]
def unmount_all(self):
self.logger.log(f"Unmounting all: {self.mounted_destinations}")
for path in list(self.mounted_destinations):
self.unmount_and_reset_owner(path, force_unmap=True)
def _get_chown_command(self, mount_point: str, is_system: bool) -> str:
if not is_system:
try:
uid = os.getuid()
gid = os.getgid()
return f"chown {uid}:{gid} \"{mount_point}\""
except Exception as e:
self.logger.log(
f"Could not get current user UID/GID for chown: {e}")
return ""
def _execute_as_root(self, script_content: str, password_for_stdin: Optional[str] = None) -> bool:
try:
if password_for_stdin:
escaped_password = password_for_stdin.replace("'", "'\\\''")
script_content = f"LUKSPASS='{escaped_password}'\n{script_content}"
project_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..'))
runner_script_path = os.path.join(
project_root, 'core', 'privileged_script_runner.sh')
if not os.path.exists(runner_script_path):
self.logger.log(
f"CRITICAL: Privileged script runner not found at {runner_script_path}")
return False
command = ['pkexec', runner_script_path]
log_lines = []
for line in script_content.split('\n'):
if "LUKSPASS=" in line:
log_lines.append("LUKSPASS='[REDACTED]'")
else:
log_lines.append(line)
sanitized_script_content = "\n".join(log_lines)
self.logger.log(
f"Executing privileged command via runner: {runner_script_path}")
self.logger.log(
f"Script content to be piped:\n---\n{sanitized_script_content}\n---")
result = subprocess.run(
command, input=script_content, capture_output=True, text=True, check=False)
if result.returncode == 0:
log_output = f"Privileged script executed successfully."
if result.stdout:
log_output += f"\nStdout:\n{result.stdout}"
if result.stderr:
log_output += f"\nStderr:\n{result.stderr}"
self.logger.log(log_output)
return True
else:
self.logger.log(
f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}")
return False
except Exception as e:
self.logger.log(
f"Failed to set up or execute privileged command: {e}")
return False