180 lines
7.8 KiB
Python
180 lines
7.8 KiB
Python
import keyring
|
|
import keyring.errors
|
|
from keyring.backends import SecretService
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
import stat
|
|
from typing import Optional
|
|
|
|
from pyimage_ui.password_dialog import PasswordDialog
|
|
|
|
class EncryptionManager:
|
|
def __init__(self, logger, app=None):
|
|
try:
|
|
keyring.set_keyring(SecretService.Keyring())
|
|
except Exception as e:
|
|
logger.log(f"Failed to set keyring backend to SecretService: {e}")
|
|
self.logger = logger
|
|
self.app = app
|
|
self.service_id = "py-backup-encryption"
|
|
self.session_password = None
|
|
|
|
def get_password_from_keyring(self, username: str) -> Optional[str]:
|
|
try:
|
|
return keyring.get_password(self.service_id, username)
|
|
except keyring.errors.InitError as e:
|
|
logger.log(f"Could not initialize keyring. Keyring is not available on this system or is not configured correctly. Error: {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.log(f"Could not get password from keyring: {e}")
|
|
return None
|
|
|
|
def set_password_in_keyring(self, username: str, password: str) -> bool:
|
|
try:
|
|
keyring.set_password(self.service_id, username, password)
|
|
self.logger.log(f"Password for {username} stored in keyring.")
|
|
return True
|
|
except Exception as e:
|
|
self.logger.log(f"Could not set password in keyring: {e}")
|
|
return False
|
|
|
|
def delete_password_from_keyring(self, username: str):
|
|
try:
|
|
keyring.delete_password(self.service_id, username)
|
|
self.logger.log(f"Password for {username} deleted from keyring.")
|
|
except Exception as e:
|
|
self.logger.log(f"Could not delete password from keyring: {e}")
|
|
|
|
def clear_session_password(self):
|
|
self.session_password = None
|
|
|
|
def get_password(self, username: str, confirm: bool = True) -> Optional[str]:
|
|
if self.session_password:
|
|
return self.session_password
|
|
|
|
password = self.get_password_from_keyring(username)
|
|
if password:
|
|
self.session_password = password
|
|
return password
|
|
|
|
dialog = PasswordDialog(self.app, title=f"Enter password for {username}", confirm=confirm)
|
|
password, save_to_keyring = dialog.get_password()
|
|
if password and save_to_keyring:
|
|
self.set_password_in_keyring(username, password)
|
|
|
|
if password:
|
|
self.session_password = password
|
|
|
|
return password
|
|
|
|
def setup_encrypted_backup(self, queue, base_path: str, size_gb: int, password: str) -> Optional[str]:
|
|
"""Sets up a persistent LUKS encrypted container for the backup destination."""
|
|
self.logger.log(f"Setting up encrypted container at {base_path}")
|
|
|
|
if not shutil.which("cryptsetup"):
|
|
self.logger.log("Error: cryptsetup is not installed.")
|
|
queue.put(('error', "cryptsetup is not installed."))
|
|
return None
|
|
|
|
container_file = "pybackup_encrypted.luks"
|
|
container_path = os.path.join(base_path, container_file)
|
|
mapper_name = f"pybackup_{os.path.basename(base_path)}"
|
|
mount_point = f"/mnt/{mapper_name}"
|
|
|
|
if not password:
|
|
self.logger.log("No password provided for encryption.")
|
|
queue.put(('error', "No password provided for encryption."))
|
|
return None
|
|
|
|
# If mount point already exists, something is wrong. Clean up first.
|
|
if os.path.ismount(mount_point):
|
|
self.logger.log(f"Mount point {mount_point} already exists. Cleaning up before proceeding.")
|
|
self.cleanup_encrypted_backup(mapper_name, mount_point)
|
|
|
|
if os.path.exists(container_path):
|
|
self.logger.log(f"Encrypted container {container_path} already exists. Attempting to unlock.")
|
|
script = f"""
|
|
mkdir -p {mount_point}
|
|
echo -n '{password}' | cryptsetup luksOpen {container_path} {mapper_name} -
|
|
mount /dev/mapper/{mapper_name} {mount_point}
|
|
"""
|
|
if not self._execute_as_root(script):
|
|
self.logger.log("Failed to unlock existing encrypted container. Check password or permissions.")
|
|
queue.put(('error', "Failed to unlock existing encrypted container."))
|
|
# Clean up failed mount attempt
|
|
self.cleanup_encrypted_backup(mapper_name, mount_point)
|
|
return None
|
|
else:
|
|
self.logger.log(f"Creating new encrypted container: {container_path}")
|
|
script = f"""
|
|
fallocate -l {size_gb}G {container_path}
|
|
echo -n '{password}' | cryptsetup luksFormat {container_path} -
|
|
mkdir -p {mount_point}
|
|
echo -n '{password}' | cryptsetup luksOpen {container_path} {mapper_name} -
|
|
mkfs.ext4 /dev/mapper/{mapper_name}
|
|
mount /dev/mapper/{mapper_name} {mount_point}
|
|
"""
|
|
|
|
if not self._execute_as_root(script):
|
|
self.logger.log("Failed to create and setup encrypted container.")
|
|
self._cleanup_encrypted_backup(mapper_name, mount_point)
|
|
# Also remove the failed container file
|
|
if os.path.exists(container_path):
|
|
os.remove(container_path) # This should be done with pkexec as well
|
|
queue.put(('error', "Failed to setup encrypted container."))
|
|
return None
|
|
|
|
self.logger.log(f"Encrypted container is ready and mounted at {mount_point}")
|
|
return mount_point
|
|
|
|
def cleanup_encrypted_backup(self, mapper_name: str, mount_point: str):
|
|
"""Unmounts and closes the LUKS container."""
|
|
self.logger.log(f"Cleaning up encrypted backup: {mapper_name}")
|
|
script = f"""
|
|
umount {mount_point} || echo "Mount point {mount_point} not found or already unmounted."
|
|
cryptsetup luksClose {mapper_name} || echo "Mapper {mapper_name} not found or already closed."
|
|
rmdir {mount_point} || echo "Mount point directory {mount_point} not found or already removed."
|
|
"""
|
|
if not self._execute_as_root(script):
|
|
self.logger.log("Encrypted backup cleanup script failed.")
|
|
|
|
def _execute_as_root(self, script_content: str) -> bool:
|
|
"""Executes a shell script with root privileges using pkexec."""
|
|
script_path = ''
|
|
try:
|
|
with tempfile.NamedTemporaryFile(mode='w', delete=False, prefix="pybackup_script_", suffix=".sh", dir="/tmp") as tmp_script:
|
|
tmp_script.write("#!/bin/bash\n\n")
|
|
tmp_script.write("set -e\n\n")
|
|
tmp_script.write(script_content)
|
|
script_path = tmp_script.name
|
|
|
|
os.chmod(script_path, stat.S_IRWXU | stat.S_IRGRP |
|
|
stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
|
|
|
|
command = ['pkexec', script_path]
|
|
|
|
self.logger.log(
|
|
f"Executing privileged command via script: {script_path}")
|
|
self.logger.log(
|
|
f"Script content:\n---\n{script_content}\n---")
|
|
|
|
result = subprocess.run(
|
|
command, capture_output=True, text=True, check=False)
|
|
|
|
if result.returncode == 0:
|
|
self.logger.log(
|
|
f"Privileged script executed successfully. Output:\n{result.stdout}")
|
|
return True
|
|
else:
|
|
self.logger.log(
|
|
f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}")
|
|
return False
|
|
except Exception as e:
|
|
self.logger.log(
|
|
f"Failed to set up or execute privileged command: {e}")
|
|
return False
|
|
finally:
|
|
if script_path and os.path.exists(script_path):
|
|
os.remove(script_path) |