Files
Py-Backup/core/encryption_manager.py

180 lines
7.6 KiB
Python

import os
import shlex
import shutil
import stat
import subprocess
import tempfile
from typing import Optional

import keyring
import keyring.errors
from keyring.backends import SecretService

from pyimage_ui.password_dialog import PasswordDialog
class EncryptionManager:
    """Manage backup passwords and LUKS-encrypted backup containers.

    Passwords are resolved, in order, from the in-memory session password,
    the system keyring and finally an interactive ``PasswordDialog``.
    Container operations are carried out by generating a bash script and
    running it with root privileges through ``pkexec``.
    """

    # Placeholder written to the log in place of sensitive values.
    _REDACTED = "********"

    def __init__(self, logger, app=None):
        """Create a manager.

        Args:
            logger: Object exposing ``log(message)``; receives all diagnostics.
            app: Optional parent handed to ``PasswordDialog`` when prompting.
        """
        self.logger = logger
        self.app = app
        self.service_id = "py-backup-encryption"
        self.session_password = None
        self.save_to_keyring = False

    def get_password_from_keyring(self, username: str) -> Optional[str]:
        """Return the keyring password stored for *username*.

        Returns ``None`` when the keyring lookup raises; the failure is
        logged instead of propagated.
        """
        try:
            return keyring.get_password(self.service_id, username)
        except keyring.errors.InitError as e:
            self.logger.log(f"Could not initialize keyring. Keyring is not available on this system or is not configured correctly. Error: {e}")
            return None
        except Exception as e:
            self.logger.log(f"Could not get password from keyring: {e}")
            return None

    def set_password_in_keyring(self, username: str, password: str):
        """Store *password* for *username* in the keyring (best effort, logged)."""
        try:
            keyring.set_password(self.service_id, username, password)
            self.logger.log(f"Password for {username} stored in keyring.")
        except Exception as e:
            self.logger.log(f"Could not set password in keyring: {e}")

    def delete_password_from_keyring(self, username: str):
        """Remove the keyring entry for *username* (best effort, logged)."""
        try:
            keyring.delete_password(self.service_id, username)
            self.logger.log(f"Password for {username} deleted from keyring.")
        except Exception as e:
            self.logger.log(f"Could not delete password from keyring: {e}")

    def set_session_password(self, password: str, save_to_keyring: bool):
        """Remember *password* in memory and whether it should be persisted."""
        self.session_password = password
        self.save_to_keyring = save_to_keyring

    def clear_session_password(self):
        """Forget the in-memory password and reset the persist flag."""
        self.session_password = None
        self.save_to_keyring = False

    def get_password(self, username: str, confirm: bool = True) -> Optional[str]:
        """Resolve the password for *username*.

        Lookup order: session password, keyring, interactive dialog. When a
        session password is set and ``save_to_keyring`` is true, it is written
        to the keyring before being returned.

        Args:
            username: Keyring account name; also shown in the dialog title.
            confirm: Forwarded to ``PasswordDialog`` as its ``confirm`` option.

        Returns:
            The password, or whatever falsy value the dialog yields when no
            password was entered.
        """
        if self.session_password:
            if self.save_to_keyring:
                self.set_password_in_keyring(username, self.session_password)
            return self.session_password

        stored = self.get_password_from_keyring(username)
        if stored:
            return stored

        # Not in the keyring: prompt the user. The entered password is not
        # saved automatically; persisting it is meant to become a UI option.
        dialog = PasswordDialog(self.app, title=f"Enter password for {username}", confirm=confirm)
        return dialog.get_password()

    def setup_encrypted_backup(self, queue, container_path: str, size_gb: int) -> Optional[str]:
        """Sets up a LUKS encrypted container for the backup.

        An existing container file is unlocked and mounted; otherwise a new
        one of ``size_gb`` gigabytes is allocated, formatted, opened and
        mounted. Failures and cancellations are reported through *queue*.

        Args:
            queue: Object with ``put``; receives ``('error', message)`` or
                ``('completion', {...})`` tuples.
            container_path: Path of the container file.
            size_gb: Size used when a new container has to be created.

        Returns:
            The mount point on success, ``None`` otherwise.
        """
        self.logger.log(f"Setting up encrypted container at {container_path}")

        if not shutil.which("cryptsetup"):
            self.logger.log("Error: cryptsetup is not installed.")
            queue.put(('error', "cryptsetup is not installed."))
            return None

        container_name = os.path.basename(container_path)
        mapper_name = f"pybackup_{container_name}"
        mount_point = f"/mnt/{mapper_name}"
        container_exists = os.path.exists(container_path)

        if container_exists:
            self.logger.log(f"Encrypted container {container_path} already exists. Attempting to unlock.")

        # Only ask for confirmation when a brand-new container is created.
        password = self.get_password(container_name, confirm=not container_exists)
        if not password:
            self.logger.log("User cancelled password entry.")
            queue.put(('completion', {'status': 'cancelled', 'returncode': -1}))
            return None

        # Both spellings of the password must be kept out of the log.
        sensitive = (shlex.quote(password), password)

        if container_exists:
            script = self._build_open_script(
                container_path, mapper_name, mount_point, password)
            if not self._execute_as_root(script, sensitive=sensitive):
                self.logger.log("Failed to unlock existing encrypted container.")
                queue.put(('error', "Failed to unlock existing encrypted container."))
                return None
        else:
            script = self._build_create_script(
                container_path, mapper_name, mount_point, password, size_gb)
            if not self._execute_as_root(script, sensitive=sensitive):
                self.logger.log("Failed to setup encrypted container.")
                # Undo whatever part of the setup succeeded (mount / mapper).
                self.cleanup_encrypted_backup(mapper_name, mount_point)
                queue.put(('error', "Failed to setup encrypted container."))
                return None

        self.logger.log(f"Encrypted container is ready and mounted at {mount_point}")
        return mount_point

    def cleanup_encrypted_backup(self, mapper_name: str, mount_point: str):
        """Unmounts and closes the LUKS container.

        Every step tolerates failure (``|| echo``) so the remaining steps
        still run when something is already unmounted or closed.
        """
        self.logger.log(f"Cleaning up encrypted backup: {mapper_name}")
        mapper = shlex.quote(mapper_name)
        mount_dir = shlex.quote(mount_point)
        script = (
            f'umount {mount_dir} || echo "Mount point not found or already unmounted."\n'
            f'cryptsetup luksClose {mapper} || echo "Mapper not found or already closed."\n'
            f'rmdir {mount_dir} || echo "Mount point directory not found."\n'
        )
        if not self._execute_as_root(script):
            self.logger.log("Encrypted backup cleanup script failed.")

    @staticmethod
    def _build_open_script(container_path: str, mapper_name: str,
                           mount_point: str, password: str) -> str:
        """Return the shell-quoted script that unlocks and mounts a container."""
        container = shlex.quote(container_path)
        mapper = shlex.quote(mapper_name)
        mapper_device = shlex.quote(f"/dev/mapper/{mapper_name}")
        mount_dir = shlex.quote(mount_point)
        secret = shlex.quote(password)
        return (
            # printf instead of echo: never treats the password as an option.
            f"printf '%s' {secret} | cryptsetup luksOpen {container} {mapper} -\n"
            f"mkdir -p {mount_dir}\n"
            f"mount {mapper_device} {mount_dir}\n"
        )

    @staticmethod
    def _build_create_script(container_path: str, mapper_name: str,
                             mount_point: str, password: str, size_gb) -> str:
        """Return the shell-quoted script that creates, formats and mounts a container."""
        container = shlex.quote(container_path)
        mapper = shlex.quote(mapper_name)
        mapper_device = shlex.quote(f"/dev/mapper/{mapper_name}")
        mount_dir = shlex.quote(mount_point)
        secret = shlex.quote(password)
        size = shlex.quote(f"{size_gb}G")
        return (
            f"fallocate -l {size} {container}\n"
            f"printf '%s' {secret} | cryptsetup luksFormat {container} -\n"
            f"printf '%s' {secret} | cryptsetup luksOpen {container} {mapper} -\n"
            f"mkfs.ext4 {mapper_device}\n"
            f"mkdir -p {mount_dir}\n"
            f"mount {mapper_device} {mount_dir}\n"
        )

    @classmethod
    def _redact(cls, text: str, sensitive: tuple = ()) -> str:
        """Return *text* with every non-empty entry of *sensitive* masked."""
        # Longest first, so a quoted form is masked before the raw value
        # it contains.
        for value in sorted(set(sensitive), key=len, reverse=True):
            if value:
                text = text.replace(value, cls._REDACTED)
        return text

    def _execute_as_root(self, script_content: str, sensitive: tuple = ()) -> bool:
        """Executes a shell script with root privileges using pkexec.

        The script is written to a temporary file under ``/tmp`` with a bash
        shebang and ``set -e``, executed through ``pkexec`` and removed
        afterwards.

        Args:
            script_content: Body of the bash script.
            sensitive: Strings (e.g. passwords) that must be masked in every
                log message produced here.

        Returns:
            ``True`` when the script exits with status 0, ``False`` on a
            non-zero status or when it could not be started.
        """
        script_path = ''
        try:
            with tempfile.NamedTemporaryFile(mode='w', delete=False, prefix="pybackup_script_", suffix=".sh", dir="/tmp") as tmp_script:
                # Record the path first so the file is removed even if a
                # write below fails.
                script_path = tmp_script.name
                tmp_script.write("#!/bin/bash\n\n")
                tmp_script.write("set -e\n\n")  # Exit on error
                tmp_script.write(script_content)

            # Owner-only: the script may embed a password, so it must not be
            # readable by other users while it sits in /tmp.
            os.chmod(script_path, stat.S_IRWXU)

            self.logger.log(
                f"Executing privileged command via script: {script_path}")
            self.logger.log(
                f"Script content:\n---\n{self._redact(script_content, sensitive)}\n---")

            result = subprocess.run(
                ['pkexec', script_path], capture_output=True, text=True, check=False)
            stdout = self._redact(result.stdout, sensitive)

            if result.returncode == 0:
                self.logger.log(
                    f"Privileged script executed successfully. Output:\n{stdout}")
                return True

            stderr = self._redact(result.stderr, sensitive)
            self.logger.log(
                f"Privileged script failed. Return code: {result.returncode}\nStderr: {stderr}\nStdout: {stdout}")
            return False
        except Exception as e:
            self.logger.log(
                f"Failed to set up or execute privileged command: {e}")
            return False
        finally:
            if script_path and os.path.exists(script_path):
                os.remove(script_path)