feat: Improve encrypted drive unmount robustness and UI feedback

This commit addresses several issues related to encrypted drive handling during application shutdown and provides better user feedback.

- **Fix(GUI):** Prevent `bgerror` by cancelling pending `_process_queue` calls when the application is closing.
- **Feat(Unmount):** Implement logic to prevent application closure if encrypted drives fail to unmount gracefully. A `MessageDialog` is now displayed to the user in such cases.
- **Feat(UI):** Enhance `HeaderFrame` to display the current mount status of encrypted drives, providing immediate visual feedback.
- **Fix(Syntax):** Correct `SyntaxError: unterminated f-string literal` in `encryption_manager.py` by using triple-quoted f-strings for multi-line content.
- **Fix(Translation):** Add missing translation strings (`unmount_failed_title`, `unmount_failed_message`) to `core/pbp_app_config.py`.
- **Fix(Unmount):** Remove `|| true` from shell commands in `unmount_and_reset_owner` to ensure actual unmount failures are correctly reported and handled by the application logic.
This commit is contained in:
2025-09-13 00:08:42 +02:00
parent 9682e41710
commit 9983a9769f
4 changed files with 89 additions and 68 deletions

View File

@@ -190,25 +190,25 @@ class EncryptionManager:
base_dest_path, profile_name)
mapper_name = f"pybackup_luks_{username}_{profile_name}"
chown_cmd = self._get_chown_command(mount_point, is_system)
luks_open_cmd = f'echo -n \"$LUKSPASS\" | cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}'
luks_open_cmd = f'echo -n "$LUKSPASS" | cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}'
resize_script = f"""set -e
# Unmount cleanly first
umount \"{mount_point}\"
umount "{mount_point}"
cryptsetup luksClose {mapper_name}
# Resize container file
truncate -s {int(new_total_size)} \"{container_path}\"
truncate -s {int(new_total_size)} "{container_path}"
sleep 1
# Re-open, check, and resize filesystem
{luks_open_cmd}
cryptsetup resize {mapper_name}
e2fsck -fy \"/dev/mapper/{mapper_name}\"
resize2fs \"/dev/mapper/{mapper_name}\"
e2fsck -fy "/dev/mapper/{mapper_name}"
resize2fs "/dev/mapper/{mapper_name}"
# Now mount it
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
mount "/dev/mapper/{mapper_name}" "{mount_point}"
{chown_cmd}
"""
@@ -236,14 +236,14 @@ mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
chown_cmd = self._get_chown_command(mount_point, is_system)
script = f"""set -e
mkdir -p \"{os.path.dirname(container_path)}\"
mkdir -p \"{mount_point}\"
truncate -s {int(size_gb)}G \"{container_path}\"
echo -n \"$LUKSPASS\" | cryptsetup luksFormat \"{container_path}\" -
echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} -
mkfs.ext4 \"/dev/mapper/{mapper_name}\"
mkdir -p "{os.path.dirname(container_path)}"
mkdir -p "{mount_point}"
truncate -s {int(size_gb)}G "{container_path}"
echo -n "$LUKSPASS" | cryptsetup luksFormat "{container_path}" -
echo -n "$LUKSPASS" | cryptsetup luksOpen "{container_path}" {mapper_name} -
mkfs.ext4 "/dev/mapper/{mapper_name}"
sleep 1
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
mount "/dev/mapper/{mapper_name}" "{mount_point}"
{chown_cmd}"""
if not self._execute_as_root(script, password):
return None
@@ -268,49 +268,63 @@ mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
mount_point = self.get_mount_point(base_dest_path, profile_name)
mapper_name = f"pybackup_luks_{username}_{profile_name}"
chown_cmd = self._get_chown_command(mount_point, is_system)
luks_open_cmd = f'echo -n \"$LUKSPASS\" | cryptsetup luksOpen \"{container_path}\" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen \"{container_path}\" {mapper_name} {key_or_pass_arg}'
luks_open_cmd = f'echo -n "$LUKSPASS" | cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}'
script = f"""set -e
umount \"{mount_point}\" || true
umount "{mount_point}" || true
cryptsetup luksClose {mapper_name} || true
mkdir -p \"{mount_point}\"
mkdir -p "{mount_point}"
{luks_open_cmd}
mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
mount "/dev/mapper/{mapper_name}" "{mount_point}"
{chown_cmd}"""
if self._execute_as_root(script, password):
self.add_to_lock_file(base_dest_path, profile_name, mapper_name)
if self.app and hasattr(self.app, 'header_frame'):
self.app.header_frame.refresh_status()
return True
return False
def unmount_and_reset_owner(self, base_dest_path: str, profile_name: str, force_unmap=False):
def unmount_and_reset_owner(self, base_dest_path: str, profile_name: str, force_unmap=False) -> bool:
username = os.path.basename(base_dest_path.rstrip('/'))
mapper_name = f"pybackup_luks_{username}_{profile_name}"
if not os.path.exists(f"/dev/mapper/{mapper_name}") and not self.is_mounted(base_dest_path, profile_name):
if not force_unmap:
return
return True # Already unmounted or not present, consider it successful
self.logger.log(
f"Unmounting and resetting owner for {base_dest_path}/{profile_name}")
mount_point = self.get_mount_point(base_dest_path, profile_name)
script = f"""chown root:root \"{mount_point}\" || true
umount \"{mount_point}\" || true
cryptsetup luksClose {mapper_name} || true
script = f"""chown root:root "{mount_point}"
umount "{mount_point}"
cryptsetup luksClose {mapper_name}
"""
password = self.password_cache.get(username)
self._execute_as_root(script, password)
self.remove_from_lock_file(mapper_name)
if (base_dest_path, profile_name) in self.mounted_destinations:
self.mounted_destinations.remove((base_dest_path, profile_name))
success = self._execute_as_root(script, password)
if success:
self.remove_from_lock_file(mapper_name)
if (base_dest_path, profile_name) in self.mounted_destinations:
self.mounted_destinations.remove(
(base_dest_path, profile_name))
self.logger.log(
f"Successfully unmounted {profile_name} at {base_dest_path}")
if self.app and hasattr(self.app, 'header_frame'):
self.app.header_frame.refresh_status()
else:
self.logger.log(
f"Failed to unmount {profile_name} at {base_dest_path}")
return success
# Do not clear password cache here, it might be needed by another profile
def unmount_all(self):
def unmount_all(self) -> bool:
self.logger.log(f"Unmounting all: {self.mounted_destinations}")
all_unmounted_successfully = True
# Create a copy of the set to avoid issues with modifying it while iterating
for base_path, profile_name in list(self.mounted_destinations):
self.unmount_and_reset_owner(
base_path, profile_name, force_unmap=True)
if not self.unmount_and_reset_owner(
base_path, profile_name, force_unmap=True):
all_unmounted_successfully = False
return all_unmounted_successfully
def _get_chown_command(self, mount_point: str, is_system: bool) -> str:
if not is_system:
@@ -343,7 +357,7 @@ mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
f"Executing privileged command via runner: {runner_script_path}")
# Simplified logging to avoid complexity
self.logger.log(
f"Script content passed as argument:\n---\n{script_content}\n---")
f"""Script content passed as argument:\n---\n{script_content}\n---""")
result = subprocess.run(
command, capture_output=True, text=True, check=False)
@@ -358,7 +372,7 @@ mount \"/dev/mapper/{mapper_name}\" \"{mount_point}\"
return True
else:
self.logger.log(
f"Privileged script failed. Return code: {result.returncode}\nStderr: {result.stderr}\nStdout: {result.stdout}")
f"Privileged script failed. Return code: {result.returncode} Stderr: {result.stderr} Stdout: {result.stdout}")
return False
except Exception as e:
self.logger.log(

View File

@@ -324,6 +324,8 @@ class Msg:
"err_no_backup_selected": _("Please select a backup from the list."),
"err_unlock_failed": _("Failed to unlock the container. Please check the password and try again."),
"err_encrypted_not_mounted": _("Encrypted container is not unlocked. Please unlock it first from the header bar."),
"unmount_failed_title": _("Unmount Failed"),
"unmount_failed_message": _("Failed to unmount all encrypted drives. Please unmount them manually or try again."),
"confirm_user_restore_title": _("Confirm User Data Restore"),
"confirm_user_restore_msg": _("Do you really want to restore the backup of '{backup_name}' to its original location? Any newer files may be overwritten."),
"confirm_delete_title": _("Confirm Deletion"),

View File

@@ -11,6 +11,7 @@ from shared_libs.log_window import LogWindow
from shared_libs.logger import app_logger
from shared_libs.animated_icon import AnimatedIcon
from shared_libs.common_tools import IconManager
from shared_libs.message import MessageDialog
from core.config_manager import ConfigManager
from core.backup_manager import BackupManager
from core.pbp_app_config import AppConfig, Msg
@@ -540,7 +541,17 @@ class MainApplication(tk.Tk):
def on_closing(self):
self.config_manager.set_setting(
"refresh_log", self.refresh_log_var.get())
self.backup_manager.encryption_manager.unmount_all()
# Attempt to unmount all encrypted drives
unmount_success = self.backup_manager.encryption_manager.unmount_all()
# Check if any drives are still mounted after the attempt
if not unmount_success and self.backup_manager.encryption_manager.mounted_destinations:
app_logger.log(
"WARNING: Not all encrypted drives could be unmounted. Preventing application closure.")
MessageDialog(
message_type="error", title=Msg.STR["unmount_failed_title"], text=Msg.STR["unmount_failed_message"]).show()
return # Prevent application from closing
self.config_manager.set_setting("last_mode", self.mode)

View File

@@ -69,7 +69,7 @@ class HeaderFrame(tk.Frame):
self.refresh_status()
def refresh_status(self):
"""Checks the keyring status based on the current destination and updates the label."""
"""Checks the keyring and mount status based on the current destination and updates the label."""
app_logger.log("HeaderFrame: Refreshing status...")
dest_path = self.app.destination_path
app_logger.log(f"HeaderFrame: Destination path is '{dest_path}'")
@@ -92,41 +92,35 @@ class HeaderFrame(tk.Frame):
username = os.path.basename(dest_path.rstrip('/'))
app_logger.log(f"HeaderFrame: Username is '{username}'")
is_mounted = self.encryption_manager.is_mounted(dest_path, profile_name)
is_mounted = self.encryption_manager.is_mounted(
dest_path, profile_name)
app_logger.log(f"HeaderFrame: Is mounted? {is_mounted}")
if is_mounted:
status_text = "Key: In Use"
auth_method = getattr(self.encryption_manager, 'auth_method', None)
if auth_method == 'keyring':
status_text += " (Keyring)"
elif auth_method == 'keyfile':
status_text += " (Keyfile)"
self.keyring_status_label.config(
text=status_text,
fg="#2E8B57" # SeaGreen
)
else:
key_in_keyring = self.encryption_manager.is_key_in_keyring(
username)
app_logger.log(f"HeaderFrame: Key in keyring? {key_in_keyring}")
key_file_exists = os.path.exists(
self.encryption_manager.get_key_file_path(dest_path, profile_name))
app_logger.log(f"HeaderFrame: Key file exists? {key_file_exists}")
status_text = ""
fg_color = ""
if key_in_keyring:
self.keyring_status_label.config(
text="Key: Available (Keyring)",
fg="#FFD700" # Gold
)
elif key_file_exists:
self.keyring_status_label.config(
text="Key: Available (Keyfile)",
fg="#FFD700" # Gold
)
else:
self.keyring_status_label.config(
text="Key: Not Available",
fg="#A9A9A9" # DarkGray
)
if is_mounted:
status_text = "Status: Mounted"
fg_color = "#6bbbff" # LightBlue
else:
status_text = "Status: Not Mounted"
fg_color = "#eb7f11" # Orange
key_in_keyring = self.encryption_manager.is_key_in_keyring(username)
key_file_exists = os.path.exists(
self.encryption_manager.get_key_file_path(dest_path, profile_name))
if key_in_keyring:
status_text += " (Keyring Available)"
elif key_file_exists:
status_text += " (Keyfile Available)"
else:
status_text += " (Key Not Available)"
if not is_mounted: # If not mounted and key not available, make it more prominent
fg_color = "#DC143C" # Crimson
self.keyring_status_label.config(
text=status_text,
fg=fg_color
)
app_logger.log("HeaderFrame: Status refresh complete.")