fix path to encryption container
This commit is contained in:
@@ -12,6 +12,7 @@ from typing import Optional
|
||||
from pbp_app_config import AppConfig
|
||||
from pyimage_ui.password_dialog import PasswordDialog
|
||||
|
||||
|
||||
class EncryptionManager:
|
||||
def __init__(self, logger, app=None):
|
||||
try:
|
||||
@@ -30,39 +31,46 @@ class EncryptionManager:
|
||||
|
||||
def create_and_add_key_file(self, base_dest_path: str, password: str) -> Optional[str]:
|
||||
"""Creates a new key file and adds it as a valid key to the LUKS container."""
|
||||
self.logger.log(f"Attempting to create and add key file for {base_dest_path}")
|
||||
self.logger.log(
|
||||
f"Attempting to create and add key file for {base_dest_path}")
|
||||
pybackup_dir = os.path.join(base_dest_path, "pybackup")
|
||||
container_path = os.path.join(pybackup_dir, "pybackup_encrypted.luks")
|
||||
container_path = os.path.join(
|
||||
pybackup_dir, "encrypted/pybackup_encrypted.luks")
|
||||
key_file_path = self.get_key_file_path(base_dest_path)
|
||||
|
||||
if not os.path.exists(container_path):
|
||||
self.logger.log(f"Container does not exist at {container_path}. Cannot add key file.")
|
||||
self.logger.log(
|
||||
f"Container does not exist at {container_path}. Cannot add key file.")
|
||||
return None
|
||||
|
||||
if os.path.exists(key_file_path):
|
||||
self.logger.log(f"Key file already exists at {key_file_path}. Aborting.")
|
||||
self.logger.log(
|
||||
f"Key file already exists at {key_file_path}. Aborting.")
|
||||
return key_file_path
|
||||
|
||||
# Create a temporary file for the new key
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile(mode='w', delete=False, prefix="temp_keyfile_") as tmp_keyfile:
|
||||
tmp_keyfile_path = tmp_keyfile.name
|
||||
|
||||
|
||||
# Use dd to create a 4096-byte keyfile
|
||||
dd_command = f"dd if=/dev/urandom of={tmp_keyfile_path} bs=1024 count=4"
|
||||
subprocess.run(dd_command, shell=True, check=True, capture_output=True)
|
||||
subprocess.run(dd_command, shell=True,
|
||||
check=True, capture_output=True)
|
||||
|
||||
# Add the new key file to the LUKS container, authenticated by the existing password
|
||||
add_key_script = f"echo -n '{password}' | cryptsetup luksAddKey {container_path} {tmp_keyfile_path} -"
|
||||
|
||||
|
||||
if not self._execute_as_root(add_key_script):
|
||||
self.logger.log("Failed to add new key file to LUKS container.")
|
||||
self.logger.log(
|
||||
"Failed to add new key file to LUKS container.")
|
||||
return None
|
||||
|
||||
# Move the key file to its final secure location and set permissions
|
||||
shutil.move(tmp_keyfile_path, key_file_path)
|
||||
os.chmod(key_file_path, stat.S_IRUSR) # Read-only for user
|
||||
self.logger.log(f"Successfully created and added key file: {key_file_path}")
|
||||
os.chmod(key_file_path, stat.S_IRUSR) # Read-only for user
|
||||
self.logger.log(
|
||||
f"Successfully created and added key file: {key_file_path}")
|
||||
return key_file_path
|
||||
|
||||
except Exception as e:
|
||||
@@ -116,15 +124,16 @@ class EncryptionManager:
|
||||
if password:
|
||||
self.session_password = password
|
||||
return password
|
||||
|
||||
dialog = PasswordDialog(self.app, title=f"Enter password for {username}", confirm=confirm)
|
||||
|
||||
dialog = PasswordDialog(
|
||||
self.app, title=f"Enter password for {username}", confirm=confirm)
|
||||
password, save_to_keyring = dialog.get_password()
|
||||
if password and save_to_keyring:
|
||||
self.set_password_in_keyring(username, password)
|
||||
|
||||
|
||||
if password:
|
||||
self.session_password = password
|
||||
|
||||
|
||||
return password
|
||||
|
||||
def is_container_mounted(self, base_dest_path: str) -> bool:
|
||||
@@ -133,16 +142,20 @@ class EncryptionManager:
|
||||
return os.path.ismount(mount_point)
|
||||
|
||||
def unlock_container(self, base_dest_path: str, password: Optional[str] = None, key_file: Optional[str] = None) -> Optional[str]:
|
||||
self.logger.log(f"Attempting to unlock encrypted container for base path {base_dest_path}")
|
||||
|
||||
self.logger.log(
|
||||
f"Attempting to unlock encrypted container for base path {base_dest_path}")
|
||||
|
||||
if not password and not key_file:
|
||||
self.logger.log("Unlock failed: Either password or key_file must be provided.")
|
||||
self.logger.log(
|
||||
"Unlock failed: Either password or key_file must be provided.")
|
||||
return None
|
||||
|
||||
pybackup_dir = os.path.join(base_dest_path, "pybackup")
|
||||
container_path = os.path.join(pybackup_dir, "pybackup_encrypted.luks")
|
||||
container_path = os.path.join(
|
||||
pybackup_dir, "encrypted/pybackup_encrypted.luks")
|
||||
if not os.path.exists(container_path):
|
||||
self.logger.log(f"Encrypted container not found at {container_path}")
|
||||
self.logger.log(
|
||||
f"Encrypted container not found at {container_path}")
|
||||
return None
|
||||
|
||||
mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}"
|
||||
@@ -154,7 +167,7 @@ class EncryptionManager:
|
||||
|
||||
if password:
|
||||
auth_part = f"echo -n '{password}' | cryptsetup luksOpen {container_path} {mapper_name} -"
|
||||
else: # key_file is provided
|
||||
else: # key_file is provided
|
||||
auth_part = f"cryptsetup luksOpen {container_path} {mapper_name} --key-file {key_file}"
|
||||
|
||||
script = f"""
|
||||
@@ -163,11 +176,13 @@ class EncryptionManager:
|
||||
mount /dev/mapper/{mapper_name} {mount_point}
|
||||
"""
|
||||
if not self._execute_as_root(script):
|
||||
self.logger.log("Failed to unlock existing encrypted container. Check password/key file or permissions.")
|
||||
self.logger.log(
|
||||
"Failed to unlock existing encrypted container. Check password/key file or permissions.")
|
||||
self.cleanup_encrypted_backup(base_dest_path)
|
||||
return None
|
||||
|
||||
self.logger.log(f"Encrypted container unlocked and mounted at {mount_point}")
|
||||
|
||||
self.logger.log(
|
||||
f"Encrypted container unlocked and mounted at {mount_point}")
|
||||
return mount_point
|
||||
|
||||
def lock_container(self, base_dest_path: str):
|
||||
@@ -175,7 +190,7 @@ class EncryptionManager:
|
||||
|
||||
def setup_encrypted_backup(self, queue, base_dest_path: str, size_gb: int, password: Optional[str] = None, key_file: Optional[str] = None) -> Optional[str]:
|
||||
self.logger.log(f"Setting up encrypted container at {base_dest_path}")
|
||||
|
||||
|
||||
if not shutil.which("cryptsetup"):
|
||||
self.logger.log("Error: cryptsetup is not installed.")
|
||||
queue.put(('error', "cryptsetup is not installed."))
|
||||
@@ -188,23 +203,27 @@ class EncryptionManager:
|
||||
|
||||
if not password and not key_file:
|
||||
self.logger.log("No password or key file provided for encryption.")
|
||||
queue.put(('error', "No password or key file provided for encryption."))
|
||||
queue.put(
|
||||
('error', "No password or key file provided for encryption."))
|
||||
return None
|
||||
|
||||
if os.path.ismount(mount_point):
|
||||
self.logger.log(f"Mount point {mount_point} already exists. Cleaning up before proceeding.")
|
||||
self.logger.log(
|
||||
f"Mount point {mount_point} already exists. Cleaning up before proceeding.")
|
||||
self.cleanup_encrypted_backup(base_dest_path)
|
||||
|
||||
if os.path.exists(container_path):
|
||||
self.logger.log(f"Encrypted container {container_path} already exists. Attempting to unlock.")
|
||||
self.logger.log(
|
||||
f"Encrypted container {container_path} already exists. Attempting to unlock.")
|
||||
return self.unlock_container(base_dest_path, password=password, key_file=key_file)
|
||||
else:
|
||||
self.logger.log(f"Creating new encrypted container: {container_path}")
|
||||
|
||||
self.logger.log(
|
||||
f"Creating new encrypted container: {container_path}")
|
||||
|
||||
if password:
|
||||
format_auth_part = f"echo -n '{password}' | cryptsetup luksFormat {container_path} -"
|
||||
open_auth_part = f"echo -n '{password}' | cryptsetup luksOpen {container_path} {mapper_name} -"
|
||||
else: # key_file is provided
|
||||
else: # key_file is provided
|
||||
format_auth_part = f"cryptsetup luksFormat {container_path} --key-file {key_file}"
|
||||
open_auth_part = f"cryptsetup luksOpen {container_path} {mapper_name} --key-file {key_file}"
|
||||
|
||||
@@ -217,16 +236,18 @@ class EncryptionManager:
|
||||
mkfs.ext4 /dev/mapper/{mapper_name}
|
||||
mount /dev/mapper/{mapper_name} {mount_point}
|
||||
"""
|
||||
|
||||
|
||||
if not self._execute_as_root(script):
|
||||
self.logger.log("Failed to create and setup encrypted container.")
|
||||
self.logger.log(
|
||||
"Failed to create and setup encrypted container.")
|
||||
self.cleanup_encrypted_backup(base_dest_path)
|
||||
if os.path.exists(container_path):
|
||||
self._execute_as_root(f"rm -f {container_path}")
|
||||
queue.put(('error', "Failed to setup encrypted container."))
|
||||
return None
|
||||
|
||||
self.logger.log(f"Encrypted container is ready and mounted at {mount_point}")
|
||||
self.logger.log(
|
||||
f"Encrypted container is ready and mounted at {mount_point}")
|
||||
return mount_point
|
||||
|
||||
def cleanup_encrypted_backup(self, base_dest_path: str):
|
||||
@@ -255,7 +276,8 @@ class EncryptionManager:
|
||||
|
||||
command = ['pkexec', script_path]
|
||||
|
||||
sanitized_script_content = re.sub(r"echo -n '.*?'", "echo -n '[REDACTED]'", script_content)
|
||||
sanitized_script_content = re.sub(
|
||||
r"echo -n '.*?'", "echo -n '[REDACTED]'", script_content)
|
||||
self.logger.log(
|
||||
f"Executing privileged command via script: {script_path}")
|
||||
self.logger.log(
|
||||
@@ -278,4 +300,4 @@ class EncryptionManager:
|
||||
return False
|
||||
finally:
|
||||
if script_path and os.path.exists(script_path):
|
||||
os.remove(script_path)
|
||||
os.remove(script_path)
|
||||
|
||||
23
core/user_utils.py
Normal file
23
core/user_utils.py
Normal file
@@ -0,0 +1,23 @@
|
||||
import os
|
||||
import pwd
|
||||
|
||||
|
||||
def is_admin_user() -> bool:
|
||||
"""Checks if the current user is a member of the 'sudo' or 'wheel' group."""
|
||||
try:
|
||||
# Get current user's groups
|
||||
groups = [g.gr_name for g in pwd.getpwall() if os.getuid() in g.gr_mem]
|
||||
# Also get groups current user is in by gid
|
||||
groups.extend([g.gr_name for g in pwd.getgrall()
|
||||
if os.getgid() == g.gr_gid])
|
||||
|
||||
# Common admin groups
|
||||
admin_groups = ["sudo", "wheel", "admin"]
|
||||
|
||||
for group in admin_groups:
|
||||
if group in groups:
|
||||
return True
|
||||
return False
|
||||
except Exception:
|
||||
# Fallback if group lookup fails for some reason
|
||||
return False
|
||||
@@ -8,6 +8,7 @@ from pyimage_ui.user_backup_content_frame import UserBackupContentFrame
|
||||
from shared_libs.logger import app_logger
|
||||
from shared_libs.message import MessageDialog
|
||||
|
||||
|
||||
class BackupContentFrame(ttk.Frame):
|
||||
def __init__(self, master, backup_manager, actions, app, **kwargs):
|
||||
super().__init__(master, **kwargs)
|
||||
@@ -58,32 +59,41 @@ class BackupContentFrame(ttk.Frame):
|
||||
content_container.grid_rowconfigure(0, weight=1)
|
||||
content_container.grid_columnconfigure(0, weight=1)
|
||||
|
||||
self.system_backups_frame = SystemBackupContentFrame(content_container, backup_manager, actions, parent_view=self)
|
||||
self.user_backups_frame = UserBackupContentFrame(content_container, backup_manager, actions, parent_view=self)
|
||||
self.system_backups_frame = SystemBackupContentFrame(
|
||||
content_container, backup_manager, actions, parent_view=self)
|
||||
self.user_backups_frame = UserBackupContentFrame(
|
||||
content_container, backup_manager, actions, parent_view=self)
|
||||
self.system_backups_frame.grid(row=0, column=0, sticky=tk.NSEW)
|
||||
self.user_backups_frame.grid(row=0, column=0, sticky=tk.NSEW)
|
||||
|
||||
action_button_frame = ttk.Frame(self, padding=10)
|
||||
action_button_frame.grid(row=2, column=0, sticky="ew")
|
||||
|
||||
self.toggle_encrypted_button = ttk.Button(action_button_frame, text=Msg.STR["show_encrypted_backups"], command=self._toggle_encrypted_view)
|
||||
self.toggle_encrypted_button = ttk.Button(
|
||||
action_button_frame, text=Msg.STR["show_encrypted_backups"], command=self._toggle_encrypted_view)
|
||||
self.toggle_encrypted_button.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
self.restore_button = ttk.Button(action_button_frame, text=Msg.STR["restore"], command=self._restore_selected, state="disabled")
|
||||
self.restore_button = ttk.Button(
|
||||
action_button_frame, text=Msg.STR["restore"], command=self._restore_selected, state="disabled")
|
||||
self.restore_button.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
self.delete_button = ttk.Button(action_button_frame, text=Msg.STR["delete"], command=self._delete_selected, state="disabled")
|
||||
self.delete_button = ttk.Button(
|
||||
action_button_frame, text=Msg.STR["delete"], command=self._delete_selected, state="disabled")
|
||||
self.delete_button.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
self.edit_comment_button = ttk.Button(action_button_frame, text=Msg.STR["comment"], command=self._edit_comment, state="disabled")
|
||||
self.edit_comment_button = ttk.Button(
|
||||
action_button_frame, text=Msg.STR["comment"], command=self._edit_comment, state="disabled")
|
||||
self.edit_comment_button.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
self._switch_view(0)
|
||||
|
||||
def update_button_state(self, is_selected):
|
||||
self.restore_button.config(state="normal" if is_selected else "disabled")
|
||||
self.delete_button.config(state="normal" if is_selected else "disabled")
|
||||
self.edit_comment_button.config(state="normal" if is_selected else "disabled")
|
||||
self.restore_button.config(
|
||||
state="normal" if is_selected else "disabled")
|
||||
self.delete_button.config(
|
||||
state="normal" if is_selected else "disabled")
|
||||
self.edit_comment_button.config(
|
||||
state="normal" if is_selected else "disabled")
|
||||
|
||||
def _get_active_subframe(self):
|
||||
return self.system_backups_frame if self.current_view_index == 0 else self.user_backups_frame
|
||||
@@ -99,27 +109,34 @@ class BackupContentFrame(ttk.Frame):
|
||||
|
||||
def _toggle_encrypted_view(self):
|
||||
if not self.app.destination_path:
|
||||
MessageDialog(master=self.app, message_type="info", title=Msg.STR["info_menu"], text=Msg.STR["err_no_dest_folder"])
|
||||
MessageDialog(master=self.app, message_type="info",
|
||||
title=Msg.STR["info_menu"], text=Msg.STR["err_no_dest_folder"])
|
||||
return
|
||||
|
||||
if not self.viewing_encrypted:
|
||||
username = os.path.basename(self.app.destination_path.rstrip('/'))
|
||||
password = self.app.backup_manager.encryption_manager.get_password(username, confirm=False)
|
||||
password = self.app.backup_manager.encryption_manager.get_password(
|
||||
username, confirm=False)
|
||||
if not password:
|
||||
return
|
||||
|
||||
mount_point = self.app.backup_manager.encryption_manager.unlock_container(self.app.destination_path, password)
|
||||
mount_point = self.app.backup_manager.encryption_manager.unlock_container(
|
||||
self.app.destination_path, password)
|
||||
if mount_point:
|
||||
self.viewing_encrypted = True
|
||||
self.toggle_encrypted_button.config(text=Msg.STR["show_normal_backups"])
|
||||
self.toggle_encrypted_button.config(
|
||||
text=Msg.STR["show_normal_backups"])
|
||||
self.show(mount_point)
|
||||
self.app.header_frame.refresh_status()
|
||||
else:
|
||||
MessageDialog(master=self.app, message_type="error", title=Msg.STR["error"], text=Msg.STR["err_unlock_failed"])
|
||||
MessageDialog(master=self.app, message_type="error",
|
||||
title=Msg.STR["error"], text=Msg.STR["err_unlock_failed"])
|
||||
else:
|
||||
self.app.backup_manager.encryption_manager.lock_container(self.app.destination_path)
|
||||
self.app.backup_manager.encryption_manager.lock_container(
|
||||
self.app.destination_path)
|
||||
self.viewing_encrypted = False
|
||||
self.toggle_encrypted_button.config(text=Msg.STR["show_encrypted_backups"])
|
||||
self.toggle_encrypted_button.config(
|
||||
text=Msg.STR["show_encrypted_backups"])
|
||||
self.show(self.app.destination_path)
|
||||
self.app.header_frame.refresh_status()
|
||||
|
||||
@@ -148,9 +165,10 @@ class BackupContentFrame(ttk.Frame):
|
||||
self.nav_progress_bars[i].pack_forget()
|
||||
|
||||
def show(self, backup_path):
|
||||
app_logger.log(f"BackupContentFrame: show called with path {backup_path}")
|
||||
app_logger.log(
|
||||
f"BackupContentFrame: show called with path {backup_path}")
|
||||
self.grid(row=2, column=0, sticky="nsew")
|
||||
|
||||
|
||||
self.base_backup_path = backup_path
|
||||
|
||||
if self.viewing_encrypted:
|
||||
@@ -159,15 +177,16 @@ class BackupContentFrame(ttk.Frame):
|
||||
actual_backup_path = os.path.join(backup_path, "pybackup")
|
||||
|
||||
if not os.path.isdir(actual_backup_path):
|
||||
app_logger.log(f"Backup path {actual_backup_path} does not exist or is not a directory.")
|
||||
app_logger.log(
|
||||
f"Backup path {actual_backup_path} does not exist or is not a directory.")
|
||||
# Clear views if path is invalid
|
||||
self.system_backups_frame.show(None)
|
||||
self.system_backups_frame.show(None)
|
||||
self.user_backups_frame.show(None)
|
||||
return
|
||||
|
||||
self.system_backups_frame.show(actual_backup_path)
|
||||
self.user_backups_frame.show(actual_backup_path)
|
||||
|
||||
|
||||
config_key = "last_encrypted_backup_content_view" if self.viewing_encrypted else "last_backup_content_view"
|
||||
last_view = self.app.config_manager.get_setting(config_key, 0)
|
||||
self._switch_view(last_view)
|
||||
self._switch_view(last_view)
|
||||
|
||||
Reference in New Issue
Block a user