container separate user and system part one

This commit is contained in:
2025-09-12 22:26:06 +02:00
parent 4a96fd1547
commit 64374c221e
7 changed files with 166 additions and 170 deletions

View File

@@ -80,8 +80,9 @@ class BackupManager:
"""Helper function to construct the path to a specific backup profile directory.""" """Helper function to construct the path to a specific backup profile directory."""
pybackup_dir = os.path.join(base_dest_path, "pybackup") pybackup_dir = os.path.join(base_dest_path, "pybackup")
if is_encrypted: if is_encrypted:
profile_name = "system" if is_system else source_name
base_data_dir = self.encryption_manager.get_mount_point( base_data_dir = self.encryption_manager.get_mount_point(
base_dest_path) base_dest_path, profile_name)
else: else:
base_data_dir = os.path.join(pybackup_dir, "unencrypted") base_data_dir = os.path.join(pybackup_dir, "unencrypted")
@@ -124,8 +125,9 @@ class BackupManager:
mount_point = None mount_point = None
if is_encrypted: if is_encrypted:
profile_name = "system" if is_system else source_name
mount_point = self.encryption_manager.prepare_encrypted_destination( mount_point = self.encryption_manager.prepare_encrypted_destination(
dest_path, is_system, source_size, queue) dest_path, profile_name, is_system, source_size, queue)
if not mount_point: if not mount_point:
self.logger.log( self.logger.log(
@@ -411,11 +413,14 @@ class BackupManager:
self.logger.log( self.logger.log(
f"Metadata file found for {backup_dir_name} but data directory not found at {full_path}. Skipping.") f"Metadata file found for {backup_dir_name} but data directory not found at {full_path}. Skipping.")
continue continue
if not self.encryption_manager.is_mounted(base_dest_path):
profile_name = "system" if is_system else source_name
if not self.encryption_manager.is_mounted(base_dest_path, profile_name):
self.logger.log( self.logger.log(
f"Mounting {base_dest_path} to check for encrypted backup data...") f"Mounting container for profile {profile_name} at {base_dest_path} to check for backup data...")
self.encryption_manager.prepare_encrypted_destination( self.encryption_manager.prepare_encrypted_destination(
base_dest_path, is_system, 0, self.app.queue if self.app else None) base_dest_path, profile_name, is_system, 0, self.app.queue if self.app else None)
if not os.path.isdir(full_path): if not os.path.isdir(full_path):
self.logger.log( self.logger.log(
f"Data directory {full_path} still not found after mount attempt. Skipping.") f"Data directory {full_path} still not found after mount attempt. Skipping.")
@@ -673,7 +678,7 @@ class BackupManager:
thread.daemon = True thread.daemon = True
thread.start() thread.start()
def _run_delete(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str]): def _run_delete(self, path_to_delete: str, is_encrypted: bool, is_system: bool, source_name: str, base_dest_path: str, queue, password: Optional[str]):
try: try:
backup_dir_name = os.path.basename(path_to_delete.rstrip('/')) backup_dir_name = os.path.basename(path_to_delete.rstrip('/'))
metadata_file_path = os.path.join( metadata_file_path = os.path.join(
@@ -682,20 +687,17 @@ class BackupManager:
if is_encrypted: if is_encrypted:
self.logger.log( self.logger.log(
f"Starting encrypted deletion for {path_to_delete}") f"Starting encrypted deletion for {path_to_delete}")
profile_name = "system" if is_system else source_name
mount_point = self.encryption_manager.get_mount_point( mount_point = self.encryption_manager.get_mount_point(
base_dest_path) base_dest_path, profile_name)
if not mount_point or not self.encryption_manager.is_mounted(base_dest_path):
if password: if not self.encryption_manager.is_mounted(base_dest_path, profile_name):
mount_point = self.encryption_manager.mount_for_deletion( self.logger.log(f"Container for profile {profile_name} not mounted. Mounting for deletion.")
base_dest_path, is_system, password) # Note: mount_for_deletion is not profile-aware yet, we are calling _open_and_mount directly
else: if not self.encryption_manager._open_and_mount(base_dest_path, profile_name, is_system, password_override=password):
self.logger.log( self.logger.log("Failed to unlock container for deletion.")
"Password not provided for encrypted deletion.") queue.put(('deletion_complete', False))
return
if not mount_point:
self.logger.log("Failed to unlock container for deletion.")
queue.put(('deletion_complete', False))
return
internal_path_to_delete = os.path.join( internal_path_to_delete = os.path.join(
mount_point, os.path.basename(os.path.dirname(path_to_delete)), backup_dir_name) mount_point, os.path.basename(os.path.dirname(path_to_delete)), backup_dir_name)

View File

@@ -1,4 +1,3 @@
import keyring import keyring
import keyring.errors import keyring.errors
from keyring.backends import SecretService from keyring.backends import SecretService
@@ -39,16 +38,17 @@ class EncryptionManager:
except json.JSONDecodeError: except json.JSONDecodeError:
return [] return []
def add_to_lock_file(self, base_path, mapper_name): def add_to_lock_file(self, base_path, profile_name, mapper_name):
locks = self._read_lock_file() locks = self._read_lock_file()
if not any(lock['base_path'] == base_path for lock in locks): if not any(l['mapper_name'] == mapper_name for l in locks):
locks.append({"base_path": base_path, "mapper_name": mapper_name}) locks.append(
{"base_path": base_path, "profile_name": profile_name, "mapper_name": mapper_name})
self._write_lock_file(locks) self._write_lock_file(locks)
def remove_from_lock_file(self, base_path): def remove_from_lock_file(self, mapper_name_to_remove: str):
locks = self._read_lock_file() locks = self._read_lock_file()
updated_locks = [ updated_locks = [l for l in locks if l['mapper_name']
lock for lock in locks if lock['base_path'] != base_path] != mapper_name_to_remove]
self._write_lock_file(updated_locks) self._write_lock_file(updated_locks)
def get_password_from_keyring(self, username: str) -> Optional[str]: def get_password_from_keyring(self, username: str) -> Optional[str]:
@@ -86,18 +86,19 @@ class EncryptionManager:
self.set_password_in_keyring(username, password) self.set_password_in_keyring(username, password)
return password return password
def get_container_path(self, base_dest_path: str) -> str: def get_container_path(self, base_dest_path: str, profile_name: str) -> str:
"""Returns the path for the LUKS container file itself."""
pybackup_dir = os.path.join(base_dest_path, "pybackup") pybackup_dir = os.path.join(base_dest_path, "pybackup")
return os.path.join(pybackup_dir, "pybackup_encrypted.luks") container_filename = f"pybackup_encrypted_{profile_name}.luks"
return os.path.join(pybackup_dir, container_filename)
def get_key_file_path(self, base_dest_path: str) -> str: def get_key_file_path(self, base_dest_path: str, profile_name: str) -> str:
pybackup_dir = os.path.join(base_dest_path, "pybackup") pybackup_dir = os.path.join(base_dest_path, "pybackup")
return os.path.join(pybackup_dir, "luks.keyfile") key_filename = f"luks_{profile_name}.keyfile"
return os.path.join(pybackup_dir, key_filename)
def create_and_add_key_file(self, base_dest_path: str, password: str) -> Optional[str]: def create_and_add_key_file(self, base_dest_path: str, profile_name: str, password: str) -> Optional[str]:
key_file_path = self.get_key_file_path(base_dest_path) key_file_path = self.get_key_file_path(base_dest_path, profile_name)
container_path = self.get_container_path(base_dest_path) container_path = self.get_container_path(base_dest_path, profile_name)
try: try:
with open(key_file_path, 'wb') as f: with open(key_file_path, 'wb') as f:
f.write(os.urandom(4096)) f.write(os.urandom(4096))
@@ -119,67 +120,50 @@ class EncryptionManager:
os.remove(key_file_path) os.remove(key_file_path)
return None return None
def _get_password_or_key_cmd(self, base_dest_path: str, username: str) -> Tuple[str, Optional[str]]: def _get_password_or_key_cmd(self, base_dest_path: str, profile_name: str, username: str) -> Tuple[str, Optional[str]]:
# 1. Check cache and keyring (without triggering dialog) password = self.password_cache.get(username)
password = self.password_cache.get(
username) or self.get_password_from_keyring(username)
if password: if password:
self.logger.log(
"Using password from cache or keyring for LUKS operation.")
self.password_cache[username] = password # ensure it's cached
return "-", password return "-", password
# 2. Check for key file password = self.get_password_from_keyring(username)
key_file_path = self.get_key_file_path(base_dest_path) if password:
self.password_cache[username] = password
return "-", password
key_file_path = self.get_key_file_path(base_dest_path, profile_name)
if os.path.exists(key_file_path): if os.path.exists(key_file_path):
self.logger.log(
f"Using key file for LUKS operation: {key_file_path}")
return f'--key-file "{key_file_path}"' return f'--key-file "{key_file_path}"'
# 3. If nothing found, prompt for password
self.logger.log(
"No password in keyring and no keyfile found. Prompting user.")
# This will now definitely open the dialog
password = self.get_password(username, confirm=False) password = self.get_password(username, confirm=False)
if not password: if not password:
return "", None return "", None
return "-", password return "-", password
def is_encrypted(self, base_dest_path: str) -> bool: def is_encrypted(self, base_dest_path: str, profile_name: str) -> bool:
return os.path.exists(self.get_container_path(base_dest_path)) return os.path.exists(self.get_container_path(base_dest_path, profile_name))
def get_mount_point(self, base_dest_path: str) -> str: def get_mount_point(self, base_dest_path: str, profile_name: str) -> str:
"""Constructs the unique, static mount point path for a given destination.""" return os.path.join(base_dest_path, "pybackup", f"encrypted_{profile_name.lower()}")
return os.path.join(base_dest_path, "pybackup", "encrypted")
def is_mounted(self, base_dest_path: str) -> bool: def is_mounted(self, base_dest_path: str, profile_name: str) -> bool:
mount_point = self.get_mount_point(base_dest_path) mount_point = self.get_mount_point(base_dest_path, profile_name)
return os.path.ismount(mount_point) or base_dest_path in self.mounted_destinations return os.path.ismount(mount_point) or (base_dest_path, profile_name) in self.mounted_destinations
def mount_for_deletion(self, base_dest_path: str, is_system: bool, password: str) -> Optional[str]: def prepare_encrypted_destination(self, base_dest_path: str, profile_name: str, is_system: bool, source_size: int, queue) -> Optional[str]:
self.logger.log("Mounting container for deletion operation.") container_path = self.get_container_path(base_dest_path, profile_name)
if self._open_and_mount(base_dest_path, is_system, password):
mount_point = self.get_mount_point(base_dest_path)
self.mounted_destinations.add(base_dest_path)
return mount_point
self.logger.log("Failed to mount container for deletion.")
return None
def prepare_encrypted_destination(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
container_path = self.get_container_path(base_dest_path)
if os.path.exists(container_path): if os.path.exists(container_path):
return self._handle_existing_container(base_dest_path, is_system, source_size, queue) return self._handle_existing_container(base_dest_path, profile_name, is_system, source_size, queue)
else: else:
return self._handle_new_container(base_dest_path, is_system, source_size, queue) return self._handle_new_container(base_dest_path, profile_name, is_system, source_size, queue)
def _handle_existing_container(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]:
self.logger.log("Handling existing container.")
def _handle_existing_container(self, base_dest_path: str, profile_name: str, is_system: bool, source_size: int, queue) -> Optional[str]:
self.logger.log(
f"Handling existing container for profile: {profile_name}")
username = os.path.basename(base_dest_path.rstrip('/')) username = os.path.basename(base_dest_path.rstrip('/'))
mount_point = self.get_mount_point(base_dest_path) mount_point = self.get_mount_point(base_dest_path, profile_name)
if not self.is_mounted(base_dest_path): if not self.is_mounted(base_dest_path, profile_name):
if not self._open_and_mount(base_dest_path, is_system): if not self._open_and_mount(base_dest_path, profile_name, is_system):
self.logger.log("Failed to mount container for size check.") self.logger.log("Failed to mount container for size check.")
return None return None
@@ -188,33 +172,38 @@ class EncryptionManager:
if required_space > free_space: if required_space > free_space:
self.logger.log( self.logger.log(
f"Resize needed. Free: {free_space}, Required: {required_space}") f"Resize needed for {profile_name}. Free: {free_space}, Required: {required_space}")
queue.put(('status_update', "Container zu klein. Vergrößere...")) queue.put(
('status_update', f"Container für {profile_name} zu klein. Vergrößere..."))
key_or_pass_arg, password = self._get_password_or_key_cmd(
base_dest_path, profile_name, username)
if not key_or_pass_arg:
return None
current_total = shutil.disk_usage(mount_point).total current_total = shutil.disk_usage(mount_point).total
needed_additional = required_space - free_space needed_additional = required_space - free_space
new_total_size = current_total + needed_additional new_total_size = current_total + needed_additional
new_total_size = math.ceil(new_total_size / 4096) * 4096
container_path = self.get_container_path(base_dest_path) container_path = self.get_container_path(
mapper_name = f"pybackup_luks_{username}" base_dest_path, profile_name)
mapper_name = f"pybackup_luks_{username}_{profile_name}"
chown_cmd = self._get_chown_command(mount_point, is_system) chown_cmd = self._get_chown_command(mount_point, is_system)
key_or_pass_arg, password = self._get_password_or_key_cmd(
base_dest_path, username)
if not key_or_pass_arg:
return None
luks_open_cmd = f'echo -n "$LUKSPASS" | cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}' luks_open_cmd = f'echo -n "$LUKSPASS" | cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}'
resize_script = f""" resize_script = f"""set -e
# Unmount cleanly first # Unmount cleanly first
umount -l \"{mount_point}\" || true umount \"{mount_point}\"
cryptsetup luksClose {mapper_name} || true cryptsetup luksClose {mapper_name}
# Resize container file # Resize container file
truncate -s {int(new_total_size)} \"{container_path}\" truncate -s {int(new_total_size)} \"{container_path}\"
sleep 1
# Re-open, check, and resize filesystem # Re-open, check, and resize filesystem
{luks_open_cmd} {luks_open_cmd}
cryptsetup resize {mapper_name}
e2fsck -fy \"/dev/mapper/{mapper_name}\" e2fsck -fy \"/dev/mapper/{mapper_name}\"
resize2fs \"/dev/mapper/{mapper_name}\" resize2fs \"/dev/mapper/{mapper_name}\"
@@ -225,115 +214,111 @@ mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
if not self._execute_as_root(resize_script, password): if not self._execute_as_root(resize_script, password):
self.logger.log("Failed to execute resize and remount script.") self.logger.log("Failed to execute resize and remount script.")
return None self._open_and_mount(
base_dest_path, profile_name, is_system, password_override=password)
return mount_point
if not self.is_mounted(base_dest_path): self.mounted_destinations.add((base_dest_path, profile_name))
self.logger.log(
"CRITICAL: Mount failed after resize script, but script reported success. Aborting.")
return None
self.mounted_destinations.add(base_dest_path)
return mount_point return mount_point
def _handle_new_container(self, base_dest_path: str, is_system: bool, source_size: int, queue) -> Optional[str]: def _handle_new_container(self, base_dest_path: str, profile_name: str, is_system: bool, source_size: int, queue) -> Optional[str]:
self.logger.log("Handling new container creation.") self.logger.log(
f"Handling new container creation for profile: {profile_name}")
size_gb = math.ceil((source_size * 1.2) / (1024**3)) + 5 size_gb = math.ceil((source_size * 1.2) / (1024**3)) + 5
username = os.path.basename(base_dest_path.rstrip('/')) username = os.path.basename(base_dest_path.rstrip('/'))
password = self.get_password(username, confirm=True) password = self.get_password(username, confirm=True)
if not password: if not password:
return None return None
container_path = self.get_container_path(base_dest_path) container_path = self.get_container_path(base_dest_path, profile_name)
mount_point = self.get_mount_point(base_dest_path) mount_point = self.get_mount_point(base_dest_path, profile_name)
mapper_name = f"pybackup_luks_{username}" mapper_name = f"pybackup_luks_{username}_{profile_name}"
chown_cmd = self._get_chown_command(mount_point, is_system) chown_cmd = self._get_chown_command(mount_point, is_system)
script = f""" script = f"""set -e
mkdir -p \"{os.path.dirname(container_path)}\"\n mkdir -p \"{mount_point}\"\n truncate -s {int(size_gb)}G \"{container_path}\"\n echo -n \"$LUKSPASS\" | cryptsetup luksFormat \"{container_path}\" - mkdir -p \"{os.path.dirname(container_path)}\"
mkdir -p \"{mount_point}\"
truncate -s {int(size_gb)}G \"{container_path}\"
echo -n \"$LUKSPASS\" | cryptsetup luksFormat \"{container_path}\" -
echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} - echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} -
mkfs.ext4 \"/dev/mapper/{mapper_name}\" mkfs.ext4 \"/dev/mapper/{mapper_name}\"
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"\n {chown_cmd}\n """ sleep 1
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
{chown_cmd}"""
if not self._execute_as_root(script, password): if not self._execute_as_root(script, password):
return None return None
self.mounted_destinations.add(base_dest_path) self.mounted_destinations.add((base_dest_path, profile_name))
return mount_point return mount_point
def _open_and_mount(self, base_dest_path: str, is_system: bool, password_override: Optional[str] = None) -> bool: def _open_and_mount(self, base_dest_path: str, profile_name: str, is_system: bool, password_override: Optional[str] = None) -> bool:
username = os.path.basename(base_dest_path.rstrip('/')) username = os.path.basename(base_dest_path.rstrip('/'))
key_or_pass_arg, password = "", None key_or_pass_arg, password = "", None
if password_override: if password_override:
password = password_override password = password_override
key_or_pass_arg = "-" key_or_pass_arg = "-"
else: else:
key_or_pass_arg, password = self._get_password_or_key_cmd( key_or_pass_arg, password = self._get_password_or_key_cmd(
base_dest_path, username) base_dest_path, profile_name, username)
if not key_or_pass_arg: if not key_or_pass_arg:
return False return False
container_path = self.get_container_path(base_dest_path) container_path = self.get_container_path(base_dest_path, profile_name)
mount_point = self.get_mount_point(base_dest_path) mount_point = self.get_mount_point(base_dest_path, profile_name)
mapper_name = f"pybackup_luks_{username}" mapper_name = f"pybackup_luks_{username}_{profile_name}"
chown_cmd = self._get_chown_command(mount_point, is_system) chown_cmd = self._get_chown_command(mount_point, is_system)
luks_open_cmd = f'echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen \"{container_path}\" {mapper_name} {key_or_pass_arg}' luks_open_cmd = f'echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen \"{container_path}\" {mapper_name} {key_or_pass_arg}'
script = f""" script = f"""set -e
umount -l \"{mount_point}\" || true umount \"{mount_point}\" || true\
cryptsetup luksClose {mapper_name} || true cryptsetup luksClose {mapper_name} || true\
mkdir -p \"{mount_point}\" mkdir -p \"{mount_point}\"\
{luks_open_cmd} {luks_open_cmd}
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\" mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"\
{chown_cmd} {chown_cmd}
""" """
if self._execute_as_root(script, password): if self._execute_as_root(script, password):
self.add_to_lock_file(base_dest_path, mapper_name) self.add_to_lock_file(base_dest_path, profile_name, mapper_name)
return True return True
return False return False
def unmount_and_reset_owner(self, base_dest_path: str, force_unmap=False): def unmount_and_reset_owner(self, base_dest_path: str, profile_name: str, force_unmap=False):
username = os.path.basename(base_dest_path.rstrip('/')) username = os.path.basename(base_dest_path.rstrip('/'))
mapper_name = f"pybackup_luks_{username}" mapper_name = f"pybackup_luks_{username}_{profile_name}"
if not os.path.exists(f"/dev/mapper/{mapper_name}") and not self.is_mounted(base_dest_path): if not os.path.exists(f"/dev/mapper/{mapper_name}") and not self.is_mounted(base_dest_path, profile_name):
if not force_unmap: if not force_unmap:
return return
self.logger.log(f"Unmounting and resetting owner for {base_dest_path}") self.logger.log(
mount_point = self.get_mount_point(base_dest_path) f"Unmounting and resetting owner for {base_dest_path}/{profile_name}")
mount_point = self.get_mount_point(base_dest_path, profile_name)
script = f""" script = f""" chown root:root \"{mount_point}\" || true
chown root:root \"{mount_point}\" || true umount \"{mount_point}\" || true
umount -l \"{mount_point}\" cryptsetup luksClose {mapper_name} || true
cryptsetup luksClose {mapper_name}
""" """
password = self.password_cache.get(username) password = self.password_cache.get(username)
self._execute_as_root(script, password) self._execute_as_root(script, password)
self.remove_from_lock_file(base_dest_path) self.remove_from_lock_file(mapper_name)
if base_dest_path in self.mounted_destinations: if (base_dest_path, profile_name) in self.mounted_destinations:
self.mounted_destinations.remove(base_dest_path) self.mounted_destinations.remove((base_dest_path, profile_name))
if username in self.password_cache: # Do not clear password cache here, it might be needed by another profile
del self.password_cache[username]
def unmount_all(self): def unmount_all(self):
self.logger.log(f"Unmounting all: {self.mounted_destinations}") self.logger.log(f"Unmounting all: {self.mounted_destinations}")
for path in list(self.mounted_destinations): # Create a copy of the set to avoid issues with modifying it while iterating
self.unmount_and_reset_owner(path, force_unmap=True) for base_path, profile_name in list(self.mounted_destinations):
self.unmount_and_reset_owner(
def unmount_all_encrypted_drives(self, password: str) -> Tuple[bool, str]: base_path, profile_name, force_unmap=True)
for path in list(self.mounted_destinations):
self.unmount_and_reset_owner(path, force_unmap=True)
return True, "Successfully unmounted all drives."
def _get_chown_command(self, mount_point: str, is_system: bool) -> str: def _get_chown_command(self, mount_point: str, is_system: bool) -> str:
if not is_system: if not is_system:
try: try:
uid = os.getuid() uid = os.getuid()
gid = os.getgid() gid = os.getgid()
return f"chown {uid}:{gid} \"{mount_point}\"" return f'chown {uid}:{gid} "{mount_point}"'
except Exception as e: except Exception as e:
self.logger.log( self.logger.log(
f"Could not get current user UID/GID for chown: {e}") f"Could not get current user UID/GID for chown: {e}")
@@ -341,9 +326,7 @@ mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
def _execute_as_root(self, script_content: str, password_for_stdin: Optional[str] = None) -> bool: def _execute_as_root(self, script_content: str, password_for_stdin: Optional[str] = None) -> bool:
try: try:
if password_for_stdin: password = password_for_stdin if password_for_stdin is not None else ""
escaped_password = password_for_stdin.replace("'", "'\\\''")
script_content = f"LUKSPASS='{escaped_password}'\n{script_content}"
project_root = os.path.abspath( project_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..')) os.path.join(os.path.dirname(__file__), '..'))
@@ -355,26 +338,19 @@ mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
f"CRITICAL: Privileged script runner not found at {runner_script_path}") f"CRITICAL: Privileged script runner not found at {runner_script_path}")
return False return False
command = ['pkexec', runner_script_path] command = ['pkexec', runner_script_path, password, script_content]
log_lines = []
for line in script_content.split('\n'):
if "LUKSPASS=" in line:
log_lines.append("LUKSPASS='[REDACTED]'")
else:
log_lines.append(line)
sanitized_script_content = "\n".join(log_lines)
self.logger.log( self.logger.log(
f"Executing privileged command via runner: {runner_script_path}") f"Executing privileged command via runner: {runner_script_path}")
# Simplified logging to avoid complexity
self.logger.log( self.logger.log(
f"Script content to be piped:\n---\n{sanitized_script_content}\n---") f"Script content passed as argument:\n---\n{script_content}\n---")
result = subprocess.run( result = subprocess.run(
command, input=script_content, capture_output=True, text=True, check=False) command, capture_output=True, text=True, check=False)
if result.returncode == 0: if result.returncode == 0:
log_output = f"Privileged script executed successfully." log_output = ("Privileged script executed successfully.")
if result.stdout: if result.stdout:
log_output += f"\nStdout:\n{result.stdout}" log_output += f"\nStdout:\n{result.stdout}"
if result.stderr: if result.stderr:

View File

@@ -1,5 +1,10 @@
#!/bin/bash #!/bin/bash
# This script executes commands passed to its standard input. # This script executes commands passed as arguments.
# The 'set -e' command ensures that the script will exit immediately if any command fails. # The 'set -e' command ensures that the script will exit immediately if any command fails.
set -e set -e
/bin/bash
# The password is the first argument, script content is the second.
export LUKSPASS="$1"
SCRIPT_TO_RUN="$2"
/bin/bash -c "$SCRIPT_TO_RUN"

View File

@@ -371,13 +371,14 @@ class MainApplication(tk.Tk):
self.destination_total_bytes = total self.destination_total_bytes = total
self.destination_used_bytes = used self.destination_used_bytes = used
# If the destination is already mounted from a previous session, # Check for any pre-existing mounts for all known profiles
# adopt it into the current session's state so it can be cleaned up properly. known_profiles = list(AppConfig.FOLDER_PATHS.keys()) + ["system"]
if self.backup_manager.encryption_manager.is_mounted(backup_dest_path): for profile_name in known_profiles:
app_logger.log( if self.backup_manager.encryption_manager.is_mounted(backup_dest_path, profile_name):
f"Adopting pre-existing mount for {backup_dest_path} into session.") app_logger.log(
self.backup_manager.encryption_manager.mounted_destinations.add( f"Adopting pre-existing mount for profile {profile_name} at {backup_dest_path} into session.")
backup_dest_path) self.backup_manager.encryption_manager.mounted_destinations.add(
(backup_dest_path, profile_name))
if hasattr(self, 'header_frame'): if hasattr(self, 'header_frame'):
self.header_frame.refresh_status() self.header_frame.refresh_status()
@@ -775,13 +776,14 @@ class MainApplication(tk.Tk):
if os.path.exists(mapper_path): if os.path.exists(mapper_path):
stale_mounts_found = True stale_mounts_found = True
app_logger.log( app_logger.log(
f"Found stale mount: {lock['mapper_name']} for path {lock['base_path']}. Attempting to close.") f"Found stale mount: {lock['mapper_name']} for path {lock['base_path']} and profile {lock['profile_name']}. Attempting to close.")
self.backup_manager.encryption_manager.unmount_and_reset_owner( self.backup_manager.encryption_manager.unmount_and_reset_owner(
lock['base_path'], force_unmap=True) lock['base_path'], lock['profile_name'], force_unmap=True)
if not stale_mounts_found: if not stale_mounts_found:
app_logger.log("No stale mounts detected.") app_logger.log("No stale mounts detected.")
if locks: if locks:
# If no stale mounts were found, the lock file should be empty.
self.backup_manager.encryption_manager._write_lock_file([]) self.backup_manager.encryption_manager._write_lock_file([])
except Exception as e: except Exception as e:
@@ -880,3 +882,5 @@ if __name__ == "__main__":
else: else:
app = MainApplication() app = MainApplication()
app.mainloop() app.mainloop()
app = MainApplication()
app.mainloop()

View File

@@ -74,7 +74,14 @@ class HeaderFrame(tk.Frame):
dest_path = self.app.destination_path dest_path = self.app.destination_path
app_logger.log(f"HeaderFrame: Destination path is '{dest_path}'") app_logger.log(f"HeaderFrame: Destination path is '{dest_path}'")
if not dest_path or not self.encryption_manager.is_encrypted(dest_path): source_name = self.app.left_canvas_data.get('folder')
if not source_name:
self.keyring_status_label.config(text="")
return
profile_name = "system" if source_name == "Computer" else source_name
if not dest_path or not self.encryption_manager.is_encrypted(dest_path, profile_name):
app_logger.log( app_logger.log(
"HeaderFrame: No destination path or not encrypted. Clearing status.") "HeaderFrame: No destination path or not encrypted. Clearing status.")
# Clear status if not encrypted # Clear status if not encrypted
@@ -85,7 +92,7 @@ class HeaderFrame(tk.Frame):
username = os.path.basename(dest_path.rstrip('/')) username = os.path.basename(dest_path.rstrip('/'))
app_logger.log(f"HeaderFrame: Username is '{username}'") app_logger.log(f"HeaderFrame: Username is '{username}'")
is_mounted = self.encryption_manager.is_mounted(dest_path) is_mounted = self.encryption_manager.is_mounted(dest_path, profile_name)
app_logger.log(f"HeaderFrame: Is mounted? {is_mounted}") app_logger.log(f"HeaderFrame: Is mounted? {is_mounted}")
if is_mounted: if is_mounted:
@@ -104,7 +111,7 @@ class HeaderFrame(tk.Frame):
username) username)
app_logger.log(f"HeaderFrame: Key in keyring? {key_in_keyring}") app_logger.log(f"HeaderFrame: Key in keyring? {key_in_keyring}")
key_file_exists = os.path.exists( key_file_exists = os.path.exists(
self.encryption_manager.get_key_file_path(dest_path)) self.encryption_manager.get_key_file_path(dest_path, profile_name))
app_logger.log(f"HeaderFrame: Key file exists? {key_file_exists}") app_logger.log(f"HeaderFrame: Key file exists? {key_file_exists}")
if key_in_keyring: if key_in_keyring:

View File

@@ -161,6 +161,7 @@ class SystemBackupContentFrame(ttk.Frame):
path_to_delete=folder_to_delete, path_to_delete=folder_to_delete,
is_encrypted=is_encrypted, is_encrypted=is_encrypted,
is_system=True, is_system=True,
source_name="system",
base_dest_path=self.backup_path, base_dest_path=self.backup_path,
password=password, password=password,
queue=self.winfo_toplevel().queue queue=self.winfo_toplevel().queue

View File

@@ -148,6 +148,7 @@ class UserBackupContentFrame(ttk.Frame):
path_to_delete=folder_to_delete, path_to_delete=folder_to_delete,
is_encrypted=is_encrypted, is_encrypted=is_encrypted,
is_system=False, is_system=False,
source_name=selected_backup.get('source'),
base_dest_path=self.backup_path, base_dest_path=self.backup_path,
password=password, password=password,
queue=self.winfo_toplevel().queue queue=self.winfo_toplevel().queue