Files
Py-Backup/core/encryption_manager.py
Désiré Werner Menrath 827f3a1e08 feat: Implement CLI script and key file support for automated backups
- Introduces `pybackup-cli.py` as a command-line interface for non-interactive backups.
- Adds support for LUKS key files in `EncryptionManager` for passwordless container operations.
- Updates `BackupManager` to pass key file arguments to encryption routines.
- Modifies `AdvancedSettingsFrame` to provide a GUI for creating and managing key files.
- Integrates `pybackup-cli.py` and key file options into `schedule_job_dialog.py` for cronjob generation.
2025-09-05 01:48:49 +02:00

281 lines
12 KiB
Python

import os
import re
import shlex
import shutil
import stat
import subprocess
import tempfile
from typing import Optional

import keyring
import keyring.errors
from keyring.backends import SecretService

from pbp_app_config import AppConfig
from pyimage_ui.password_dialog import PasswordDialog
class EncryptionManager:
    """Create, unlock and lock LUKS backup containers and manage their secrets.

    Privileged work (cryptsetup, mount, umount) is done by writing a short
    bash script to a temporary file and running it through ``pkexec``.
    Passwords are looked up in / stored to the system keyring under the
    service id ``py-backup-encryption`` and cached for the session.

    Every value interpolated into a generated script is shell-quoted with
    ``shlex.quote`` because the script runs as root: an unquoted password or
    path containing a quote, space or ``;`` would otherwise break the script
    or execute arbitrary commands.
    """

    # Matches the secret argument of ``printf '%s' <secret>`` (the form this
    # class generates) and of ``echo -n <secret>`` (older script form).
    # The argument is either a run of single-quoted segments joined by
    # ``"'"`` -- exactly what ``shlex.quote`` emits -- or a bare safe word.
    # ``[^']`` also matches newlines, so multi-line secrets are covered.
    _SECRET_ARG_RE = re.compile(
        r"""(printf '%s' |echo -n )(?:(?:'[^']*'|"'")+|[\w@%+=:,./-]+)"""
    )

    def __init__(self, logger, app=None):
        """Store collaborators and try to select the SecretService keyring.

        Args:
            logger: Object exposing ``log(message)``.
            app: Optional parent handed to ``PasswordDialog`` when a password
                has to be requested interactively.
        """
        self.logger = logger
        self.app = app
        self.service_id = "py-backup-encryption"
        self.session_password = None
        try:
            keyring.set_keyring(SecretService.Keyring())
        except Exception as e:
            # Best effort: keep whatever backend keyring selected by default.
            logger.log(f"Failed to set keyring backend to SecretService: {e}")

    # ------------------------------------------------------------------
    # Path / name helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _mapper_name(base_dest_path: str) -> str:
        """Return the device-mapper name derived from the destination's basename."""
        return f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}"

    @classmethod
    def _mount_point(cls, base_dest_path: str) -> str:
        """Return the mount point (``/mnt/<mapper name>``) for a destination."""
        return f"/mnt/{cls._mapper_name(base_dest_path)}"

    @staticmethod
    def _container_path(base_dest_path: str) -> str:
        """Return the path of the LUKS container file inside a destination."""
        return os.path.join(base_dest_path, "pybackup", "pybackup_encrypted.luks")

    @staticmethod
    def _pipe_password(password: str, command: str) -> str:
        """Return a shell line that feeds ``password`` to ``command`` on stdin.

        ``printf '%s'`` is used instead of ``echo -n`` so that passwords such
        as ``-e`` are not interpreted as options, and the password is
        shell-quoted so it cannot terminate the string early.
        """
        return f"printf '%s' {shlex.quote(password)} | {command}"

    @classmethod
    def _redact_secrets(cls, script_content: str) -> str:
        """Return ``script_content`` with piped-in secrets replaced for logging."""
        return cls._SECRET_ARG_RE.sub(r"\1'[REDACTED]'", script_content)

    # ------------------------------------------------------------------
    # Key files
    # ------------------------------------------------------------------
    def get_key_file_path(self, base_dest_path: str) -> str:
        """Generates the standard path for the key file for a given destination."""
        key_filename = f"keyfile_{os.path.basename(base_dest_path.rstrip('/'))}.key"
        return os.path.join(AppConfig.CONFIG_DIR, key_filename)

    def create_and_add_key_file(self, base_dest_path: str, password: str) -> Optional[str]:
        """Creates a new key file and adds it as a valid key to the LUKS container.

        Args:
            base_dest_path: Backup destination that holds the container.
            password: Existing passphrase used to authorise adding the key.

        Returns:
            The key file path on success, or when a key file already exists
            at that path; ``None`` if the container is missing or any step
            fails.
        """
        self.logger.log(f"Attempting to create and add key file for {base_dest_path}")

        container_path = self._container_path(base_dest_path)
        key_file_path = self.get_key_file_path(base_dest_path)

        if not os.path.exists(container_path):
            self.logger.log(f"Container does not exist at {container_path}. Cannot add key file.")
            return None

        if os.path.exists(key_file_path):
            self.logger.log(f"Key file already exists at {key_file_path}. Aborting.")
            return key_file_path

        tmp_keyfile_path = None
        try:
            # 4096 random bytes from the OS CSPRNG, written directly; the
            # temporary file is created readable by its owner only.
            with tempfile.NamedTemporaryFile(mode='wb', delete=False, prefix="temp_keyfile_") as tmp_keyfile:
                tmp_keyfile_path = tmp_keyfile.name
                tmp_keyfile.write(os.urandom(4096))

            # Add the new key file to the LUKS container, authenticated by
            # the existing password supplied on stdin.
            add_key_script = self._pipe_password(
                password,
                f"cryptsetup luksAddKey {shlex.quote(container_path)} "
                f"{shlex.quote(tmp_keyfile_path)} -",
            )
            if not self._execute_as_root(add_key_script):
                self.logger.log("Failed to add new key file to LUKS container.")
                return None

            # Move the key file to its final location; read-only for the user.
            shutil.move(tmp_keyfile_path, key_file_path)
            os.chmod(key_file_path, stat.S_IRUSR)
            self.logger.log(f"Successfully created and added key file: {key_file_path}")
            return key_file_path
        except Exception as e:
            self.logger.log(f"An error occurred during key file creation: {e}")
            return None
        finally:
            # Never leave key material behind in the temp directory.
            if tmp_keyfile_path and os.path.exists(tmp_keyfile_path):
                os.remove(tmp_keyfile_path)

    # ------------------------------------------------------------------
    # Keyring / password handling
    # ------------------------------------------------------------------
    def get_password_from_keyring(self, username: str) -> Optional[str]:
        """Return the stored password for ``username``, or ``None`` on miss/error."""
        try:
            return keyring.get_password(self.service_id, username)
        except keyring.errors.InitError as e:
            self.logger.log(f"Could not initialize keyring: {e}")
            return None
        except Exception as e:
            self.logger.log(f"Could not get password from keyring: {e}")
            return None

    def is_key_in_keyring(self, username: str) -> bool:
        """Return ``True`` if the keyring holds a password for ``username``."""
        try:
            return self.get_password_from_keyring(username) is not None
        except Exception as e:
            self.logger.log(f"Could not check password in keyring: {e}")
            return False

    def set_password_in_keyring(self, username: str, password: str) -> bool:
        """Store ``password`` for ``username``; return whether it succeeded."""
        try:
            keyring.set_password(self.service_id, username, password)
            self.logger.log(f"Password for {username} stored in keyring.")
            return True
        except Exception as e:
            self.logger.log(f"Could not set password in keyring: {e}")
            return False

    def delete_password_from_keyring(self, username: str):
        """Remove the stored password for ``username``; failures are only logged."""
        try:
            keyring.delete_password(self.service_id, username)
            self.logger.log(f"Password for {username} deleted from keyring.")
        except Exception as e:
            self.logger.log(f"Could not delete password from keyring: {e}")

    def clear_session_password(self):
        """Forget the password cached for the current session."""
        self.session_password = None

    def get_password(self, username: str, confirm: bool = True) -> Optional[str]:
        """Return a password from the session cache, the keyring, or a dialog.

        Lookup order: cached session password, then keyring, then an
        interactive ``PasswordDialog``. A password obtained from the keyring
        or the dialog is cached for the session; a dialog password is also
        written to the keyring when the dialog reports that it should be.

        Args:
            username: Keyring account name, also shown in the dialog title.
            confirm: Forwarded to ``PasswordDialog``.

        Returns:
            The password, or whatever falsy value the dialog returned.
        """
        if self.session_password:
            return self.session_password

        password = self.get_password_from_keyring(username)
        if password:
            self.session_password = password
            return password

        dialog = PasswordDialog(self.app, title=f"Enter password for {username}", confirm=confirm)
        password, save_to_keyring = dialog.get_password()
        if password:
            if save_to_keyring:
                self.set_password_in_keyring(username, password)
            self.session_password = password
        return password

    # ------------------------------------------------------------------
    # Container lifecycle
    # ------------------------------------------------------------------
    def is_container_mounted(self, base_dest_path: str) -> bool:
        """Return ``True`` if the destination's mount point is currently a mount."""
        return os.path.ismount(self._mount_point(base_dest_path))

    def unlock_container(self, base_dest_path: str, password: Optional[str] = None, key_file: Optional[str] = None) -> Optional[str]:
        """Open an existing container and mount it.

        Args:
            base_dest_path: Backup destination that holds the container.
            password: Passphrase; takes precedence over ``key_file``.
            key_file: Path to a LUKS key file, used when no password is given.

        Returns:
            The mount point, or ``None`` when no credential was supplied,
            the container is missing, or the privileged script failed.
        """
        self.logger.log(f"Attempting to unlock encrypted container for base path {base_dest_path}")

        if not password and not key_file:
            self.logger.log("Unlock failed: Either password or key_file must be provided.")
            return None

        container_path = self._container_path(base_dest_path)
        if not os.path.exists(container_path):
            self.logger.log(f"Encrypted container not found at {container_path}")
            return None

        mapper_name = self._mapper_name(base_dest_path)
        mount_point = self._mount_point(base_dest_path)

        if os.path.ismount(mount_point):
            self.logger.log(f"Container already mounted at {mount_point}")
            return mount_point

        q_container = shlex.quote(container_path)
        q_mapper = shlex.quote(mapper_name)
        q_mount = shlex.quote(mount_point)
        q_device = shlex.quote(f"/dev/mapper/{mapper_name}")

        if password:
            open_command = self._pipe_password(
                password, f"cryptsetup luksOpen {q_container} {q_mapper} -")
        else:  # key_file is provided
            open_command = (
                f"cryptsetup luksOpen {q_container} {q_mapper} "
                f"--key-file {shlex.quote(str(key_file))}"
            )

        script = "\n".join([
            f"mkdir -p {q_mount}",
            open_command,
            f"mount {q_device} {q_mount}",
            "",
        ])

        if not self._execute_as_root(script):
            self.logger.log("Failed to unlock existing encrypted container. Check password/key file or permissions.")
            # Undo whatever part of the script succeeded (mkdir / luksOpen).
            self.cleanup_encrypted_backup(base_dest_path)
            return None

        self.logger.log(f"Encrypted container unlocked and mounted at {mount_point}")
        return mount_point

    def lock_container(self, base_dest_path: str):
        """Unmount and close the container for ``base_dest_path``."""
        self.cleanup_encrypted_backup(base_dest_path)

    def setup_encrypted_backup(self, queue, base_dest_path: str, size_gb: int, password: Optional[str] = None, key_file: Optional[str] = None) -> Optional[str]:
        """Unlock the destination's container, creating it first if needed.

        Args:
            queue: Object with ``put``; receives ``('error', message)`` tuples
                for failures detected in this method.
            base_dest_path: Backup destination that holds the container.
            size_gb: Size of a newly created container in GiB units as
                understood by ``fallocate -l <n>G``.
            password: Passphrase; takes precedence over ``key_file``.
            key_file: Path to a LUKS key file, used when no password is given.

        Returns:
            The mount point on success, otherwise ``None``.
        """
        self.logger.log(f"Setting up encrypted container at {base_dest_path}")

        if not shutil.which("cryptsetup"):
            self.logger.log("Error: cryptsetup is not installed.")
            queue.put(('error', "cryptsetup is not installed."))
            return None

        pybackup_dir = os.path.join(base_dest_path, "pybackup")
        container_path = self._container_path(base_dest_path)
        mapper_name = self._mapper_name(base_dest_path)
        mount_point = self._mount_point(base_dest_path)

        if not password and not key_file:
            self.logger.log("No password or key file provided for encryption.")
            queue.put(('error', "No password or key file provided for encryption."))
            return None

        if os.path.ismount(mount_point):
            self.logger.log(f"Mount point {mount_point} already exists. Cleaning up before proceeding.")
            self.cleanup_encrypted_backup(base_dest_path)

        if os.path.exists(container_path):
            self.logger.log(f"Encrypted container {container_path} already exists. Attempting to unlock.")
            return self.unlock_container(base_dest_path, password=password, key_file=key_file)

        self.logger.log(f"Creating new encrypted container: {container_path}")

        q_container = shlex.quote(container_path)
        q_mapper = shlex.quote(mapper_name)
        q_mount = shlex.quote(mount_point)
        q_device = shlex.quote(f"/dev/mapper/{mapper_name}")

        if password:
            format_command = self._pipe_password(
                password, f"cryptsetup luksFormat {q_container} -")
            open_command = self._pipe_password(
                password, f"cryptsetup luksOpen {q_container} {q_mapper} -")
        else:  # key_file is provided
            q_key_file = shlex.quote(str(key_file))
            format_command = f"cryptsetup luksFormat {q_container} --key-file {q_key_file}"
            open_command = f"cryptsetup luksOpen {q_container} {q_mapper} --key-file {q_key_file}"

        script = "\n".join([
            f"mkdir -p {shlex.quote(pybackup_dir)}",
            f"fallocate -l {shlex.quote(f'{size_gb}G')} {q_container}",
            format_command,
            f"mkdir -p {q_mount}",
            open_command,
            f"mkfs.ext4 {q_device}",
            f"mount {q_device} {q_mount}",
            "",
        ])

        if not self._execute_as_root(script):
            self.logger.log("Failed to create and setup encrypted container.")
            self.cleanup_encrypted_backup(base_dest_path)
            # Remove a half-initialised container so the next attempt does
            # not try to unlock it.
            if os.path.exists(container_path):
                self._execute_as_root(f"rm -f {q_container}")
            queue.put(('error', "Failed to setup encrypted container."))
            return None

        self.logger.log(f"Encrypted container is ready and mounted at {mount_point}")
        return mount_point

    def cleanup_encrypted_backup(self, base_dest_path: str):
        """Unmount the container, close its mapper and remove the mount point.

        Each step falls back to an ``echo`` so that an already-clean state
        does not abort the script (it runs under ``set -e``).
        """
        mapper_name = self._mapper_name(base_dest_path)
        mount_point = self._mount_point(base_dest_path)
        self.logger.log(f"Cleaning up encrypted backup: {mapper_name}")

        q_mapper = shlex.quote(mapper_name)
        q_mount = shlex.quote(mount_point)
        not_mounted = shlex.quote(
            f"Mount point {mount_point} not found or already unmounted.")
        not_open = shlex.quote(
            f"Mapper {mapper_name} not found or already closed.")
        not_present = shlex.quote(
            f"Mount point directory {mount_point} not found or already removed.")

        script = "\n".join([
            f"umount {q_mount} || echo {not_mounted}",
            f"cryptsetup luksClose {q_mapper} || echo {not_open}",
            f"rmdir {q_mount} || echo {not_present}",
            "",
        ])

        if not self._execute_as_root(script):
            self.logger.log("Encrypted backup cleanup script failed.")

    # ------------------------------------------------------------------
    # Privileged execution
    # ------------------------------------------------------------------
    def _execute_as_root(self, script_content: str) -> bool:
        """Run ``script_content`` as a bash script through ``pkexec``.

        The script is prefixed with a bash shebang and ``set -e``, written to
        a temporary file in ``/tmp``, executed, and always removed afterwards.
        The logged copy of the script has piped-in secrets redacted.

        Args:
            script_content: Shell commands to execute with elevated rights.

        Returns:
            ``True`` if the script exited with status 0, ``False`` on a
            non-zero exit or if the script could not be prepared/started.
        """
        script_path = ''
        try:
            with tempfile.NamedTemporaryFile(mode='w', delete=False, prefix="pybackup_script_", suffix=".sh", dir="/tmp") as tmp_script:
                # Record the path first so ``finally`` cleans up even if a
                # write below fails.
                script_path = tmp_script.name
                tmp_script.write("#!/bin/bash\n\n")
                tmp_script.write("set -e\n\n")
                tmp_script.write(script_content)

            # Owner-only: the script may embed a passphrase, so it must not
            # be readable by other local users while it sits in /tmp.
            os.chmod(script_path, stat.S_IRWXU)

            command = ['pkexec', script_path]

            sanitized_script_content = self._redact_secrets(script_content)
            self.logger.log(
                f"Executing privileged command via script: {script_path}")
            self.logger.log(
                f"Script content:\n---\n{sanitized_script_content}\n---")

            result = subprocess.run(
                command, capture_output=True, text=True, check=False)

            if result.returncode == 0:
                self.logger.log(
                    f"Privileged script executed successfully. Output:\n{result.stdout}")
                return True

            self.logger.log(
                f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}")
            return False
        except Exception as e:
            self.logger.log(
                f"Failed to set up or execute privileged command: {e}")
            return False
        finally:
            if script_path and os.path.exists(script_path):
                os.remove(script_path)