new script vor encryption manager and add combobox fo encrypt profiles part one

This commit is contained in:
2025-09-13 18:29:44 +02:00
parent eb970733bc
commit 41d63743c1
7 changed files with 534 additions and 189 deletions

View File

@@ -215,13 +215,13 @@ class BackupManager:
status = 'success' if return_code == 0 else 'warning' if return_code in [
23, 24] else 'cancelled' if return_code in [143, -15, 15, -9] else 'error'
if status in ['success', 'warning'] and not is_dry_run:
if mode == 'incremental' and latest_full_backup_path:
if mode == 'incremental' and latest_backup_path:
if is_system:
final_size = self._get_incremental_size_system(
rsync_dest)
else:
final_size = self._get_incremental_size_user(
rsync_dest, latest_full_backup_path)
rsync_dest, latest_backup_path)
else:
final_size = self._get_directory_size(rsync_dest)
self._create_info_json(
@@ -233,7 +233,7 @@ class BackupManager:
size_bytes=final_size,
is_encrypted=is_encrypted,
based_on=os.path.basename(
latest_full_backup_path) if latest_full_backup_path and mode == 'incremental' else None
latest_backup_path) if latest_backup_path and mode == 'incremental' else None
)
queue.put(
('completion', {'status': status, 'returncode': return_code}))
@@ -383,13 +383,15 @@ class BackupManager:
f"Failed to calculate incremental system backup size: {e}")
return self._get_directory_size(inc_path)
def list_all_backups(self, base_dest_path: str, mount_if_needed: bool = True):
def list_all_backups(self, base_dest_path: str):
pybackup_dir = os.path.join(base_dest_path, "pybackup")
metadata_dir = os.path.join(pybackup_dir, "metadata")
if not os.path.isdir(metadata_dir):
return [], []
return {"system_backups": [], "user_backups": [], "encrypted_profiles": {}}
all_backups = []
encrypted_profiles = {}
for info_file_name in os.listdir(metadata_dir):
if not info_file_name.endswith(".json"):
continue
@@ -402,29 +404,33 @@ class BackupManager:
is_encrypted = info_data.get("is_encrypted", False)
is_system = info_data.get("backup_type") == "system"
source_name = info_data.get("source_name", "N/A")
backup_dir_name = info_file_name.replace(".json", "")
profile_name = "system" if is_system else source_name
if is_encrypted:
is_mounted = self.encryption_manager.is_mounted(
base_dest_path, profile_name)
if profile_name not in encrypted_profiles:
encrypted_profiles[profile_name] = {
"is_mounted": is_mounted,
"base_dest_path": base_dest_path,
"is_system": is_system
}
else:
is_mounted = False # Not relevant for unencrypted
profile_path = self._get_profile_path(
base_dest_path, is_system, source_name, is_encrypted)
backup_dir_name = info_file_name.replace(".json", "")
full_path = os.path.join(profile_path, backup_dir_name)
# For encrypted backups, only add them to the list if their container is mounted.
if is_encrypted and not is_mounted:
continue
if not os.path.isdir(full_path):
if not is_encrypted:
self.logger.log(
f"Metadata file found for {backup_dir_name} but data directory not found at {full_path}. Skipping.")
continue
profile_name = "system" if is_system else source_name
if not self.encryption_manager.is_mounted(base_dest_path, profile_name):
self.logger.log(
f"Mounting container for profile {profile_name} at {base_dest_path} to check for backup data...")
self.encryption_manager.prepare_encrypted_destination(
base_dest_path, profile_name, is_system, 0, self.app.queue if self.app else None)
if not os.path.isdir(full_path):
self.logger.log(
f"Data directory {full_path} still not found after mount attempt. Skipping.")
continue
self.logger.log(
f"Metadata file found for {backup_dir_name} but data directory not found at {full_path}. Skipping.")
continue
dt_obj = datetime.datetime.fromisoformat(
info_data["creation_date"])
@@ -511,7 +517,11 @@ class BackupManager:
for group in grouped_source_backups:
final_user_list.extend(group)
return final_system_list, final_user_list
return {
"system_backups": final_system_list,
"user_backups": final_user_list,
"encrypted_profiles": encrypted_profiles
}
def _find_latest_backup(self, profile_path: str, source_name: str) -> Optional[str]:
self.logger.log(

182
core/encryption_helper.sh Executable file
View File

@@ -0,0 +1,182 @@
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
set -e
# Print each command to stderr before executing.
set -x
# --- Logger ---
log() {
echo "[HELPER] $1" >&2
}
log "Helper script started. All arguments: $@"
# --- Argument Parsing ---
COMMAND="$1"
log "Attempting to read password from stdin..."
# Temporarily disable exit-on-error for the read command, as it can fail in non-interactive shells
set +e
# Read the LUKS password from stdin. The -s flag prevents it from being echoed.
read -s LUKSPASS
# Re-enable exit-on-error
set -e
log "Password read from stdin."
shift # Remove command from arguments
log "Executing command: $COMMAND"
# --- Command Functions ---
do_mount() {
local container_path="$1"
local mapper_name="$2"
local mount_point="$3"
local uid="$4"
local gid="$5"
local key_file_arg="$6"
log "Mounting $container_path"
umount "$mount_point" 2>/dev/null || true
cryptsetup luksClose "$mapper_name" 2>/dev/null || true
mkdir -p "$mount_point"
if [ -n "$LUKSPASS" ]; then
log "Opening LUKS container with password."
echo -n "$LUKSPASS" | cryptsetup luksOpen "$container_path" "$mapper_name" -
elif [ -n "$key_file_arg" ]; then
log "Opening LUKS container with keyfile: $key_file_arg"
cryptsetup luksOpen "$container_path" "$mapper_name" --key-file "$key_file_arg"
else
log "Mount command needs either a password or a keyfile."
exit 1
fi
mount "/dev/mapper/$mapper_name" "$mount_point"
log "Mounted on $mount_point"
if [ -n "$uid" ] && [ -n "$gid" ]; then
log "Changing owner of $mount_point to $uid:$gid"
chown "$uid:$gid" "$mount_point"
fi
}
do_unmount() {
local mount_point="$1"
local mapper_name="$2"
log "Unmounting $mount_point"
if mountpoint -q "$mount_point"; then
chown root:root "$mount_point"
umount "$mount_point"
fi
if [ -e "/dev/mapper/$mapper_name" ]; then
cryptsetup luksClose "$mapper_name"
fi
log "Unmount complete."
}
do_create() {
local container_path="$1"
local mount_point="$2"
local size_gb="$3"
local mapper_name="$4"
local uid="$5"
local gid="$6"
log "Creating new container $container_path with size ${size_gb}G"
mkdir -p "$(dirname "$container_path")"
mkdir -p "$mount_point"
truncate -s "${size_gb}G" "$container_path"
log "Formatting LUKS container."
echo -n "$LUKSPASS" | cryptsetup luksFormat "$container_path" -
log "Opening new LUKS container."
echo -n "$LUKSPASS" | cryptsetup luksOpen "$container_path" "$mapper_name" -
log "Creating filesystem."
mkfs.ext4 "/dev/mapper/$mapper_name"
sleep 1
mount "/dev/mapper/$mapper_name" "$mount_point"
log "Mounted on $mount_point"
if [ -n "$uid" ] && [ -n "$gid" ]; then
log "Changing owner of $mount_point to $uid:$gid"
chown "$uid:$gid" "$mount_point"
fi
}
do_resize() {
local container_path="$1"
local mapper_name="$2"
local mount_point="$3"
local new_total_size="$4"
local uid="$5"
local gid="$6"
local key_file_arg="$7"
log "Resizing container $container_path to $new_total_size bytes"
# 1. Unmount
do_unmount "$mount_point" "$mapper_name"
# 2. Resize container file
log "Truncating container file."
truncate -s "$new_total_size" "$container_path"
sleep 1
# 3. Re-open, check, and resize filesystem
log "Re-opening container to resize filesystem."
if [ -n "$LUKSPASS" ]; then
echo -n "$LUKSPASS" | cryptsetup luksOpen "$container_path" "$mapper_name" -
elif [ -n "$key_file_arg" ]; then
cryptsetup luksOpen "$container_path" "$mapper_name" --key-file "$key_file_arg"
else
log "Resize command needs either a password or a keyfile."
exit 1
fi
log "Resizing LUKS volume."
cryptsetup resize "$mapper_name"
log "Checking and resizing filesystem."
e2fsck -fy "/dev/mapper/$mapper_name"
resize2fs "/dev/mapper/$mapper_name"
# 4. Mount it again
log "Remounting container."
mount "/dev/mapper/$mapper_name" "$mount_point"
if [ -n "$uid" ] && [ -n "$gid" ]; then
log "Changing owner of $mount_point to $uid:$gid"
chown "$uid:$gid" "$mount_point"
fi
log "Resize and remount complete."
}
# --- Main Command Dispatcher ---
case "$COMMAND" in
mount)
do_mount "$@"
;;
unmount)
do_unmount "$@"
;;
create)
do_create "$@"
;;
resize)
do_resize "$@"
;;
*)
log "Unknown command: $COMMAND"
exit 1
;;
esac
exit 0

View File

@@ -5,6 +5,7 @@ import os
import shutil
import subprocess
import math
import shlex
from typing import Optional, Tuple
from core.pbp_app_config import AppConfig, Msg
@@ -120,24 +121,30 @@ class EncryptionManager:
os.remove(key_file_path)
return None
def _get_password_or_key_cmd(self, base_dest_path: str, profile_name: str, username: str) -> Tuple[str, Optional[str]]:
def _get_password_and_keyfile(self, base_dest_path: str, profile_name: str, username: str) -> Tuple[Optional[str], Optional[str]]:
"""Gets the password (from cache, keyring, or user) or the keyfile path."""
# 1. Try password from cache
password = self.password_cache.get(username)
if password:
return "-", password
return password, None
# 2. Try password from keyring
password = self.get_password_from_keyring(username)
if password:
self.password_cache[username] = password
return "-", password
return password, None
# 3. Check for key file
key_file_path = self.get_key_file_path(base_dest_path, profile_name)
if os.path.exists(key_file_path):
return f'--key-file "{key_file_path}"'
return None, key_file_path
# 4. Ask user for password
password = self.get_password(username, confirm=False)
if not password:
return "", None
return "-", password
if password:
return password, None
return None, None
def is_encrypted(self, base_dest_path: str, profile_name: str) -> bool:
return os.path.exists(self.get_container_path(base_dest_path, profile_name))
@@ -168,7 +175,7 @@ class EncryptionManager:
return None
free_space = shutil.disk_usage(mount_point).free
required_space = int(source_size * 1.15)
required_space = int(source_size * 1.10)
if required_space > free_space:
self.logger.log(
@@ -176,44 +183,33 @@ class EncryptionManager:
queue.put(
('status_update', f"Container für {profile_name} zu klein. Vergrößere..."))
key_or_pass_arg, password = self._get_password_or_key_cmd(
password, key_file_path = self._get_password_and_keyfile(
base_dest_path, profile_name, username)
if not key_or_pass_arg:
if not password and not key_file_path:
self.logger.log(
"Could not get password or keyfile to resize container.")
return None
current_total = shutil.disk_usage(mount_point).total
needed_additional = required_space - free_space
needed_additional = required_space - free_space - \
(4 * 1024 * 1024 * 1024) # (4 * 1024 * 1024 * 1024) = 4 GB
new_total_size = current_total + needed_additional
new_total_size = math.ceil(new_total_size / 4096) * 4096
container_path = self.get_container_path(
base_dest_path, profile_name)
mapper_name = f"pybackup_luks_{username}_{profile_name}"
chown_cmd = self._get_chown_command(mount_point, is_system)
luks_open_cmd = f'echo -n "$LUKSPASS" | cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}'
uid, gid = self._get_chown_ids(is_system)
resize_script = f"""set -e
# Unmount cleanly first
umount "{mount_point}"
cryptsetup luksClose {mapper_name}
helper_path = os.path.join(os.path.dirname(
__file__), 'encryption_helper.sh')
# Resize container file
truncate -s {int(new_total_size)} "{container_path}"
sleep 1
script = f'"{helper_path}" resize "{container_path}" "{mapper_name}" "{mount_point}" "{int(new_total_size)}" "{uid or ''}" "{gid or ''}" "{key_file_path or ''}"'
# Re-open, check, and resize filesystem
{luks_open_cmd}
cryptsetup resize {mapper_name}
e2fsck -fy "/dev/mapper/{mapper_name}"
resize2fs "/dev/mapper/{mapper_name}"
# Now mount it
mount "/dev/mapper/{mapper_name}" "{mount_point}"
{chown_cmd}
"""
if not self._execute_as_root(resize_script, password):
if not self._execute_as_root(script, password):
self.logger.log("Failed to execute resize and remount script.")
# Attempt to remount in the old state if resize fails
self._open_and_mount(
base_dest_path, profile_name, is_system, password_override=password)
return mount_point
@@ -224,7 +220,7 @@ mount "/dev/mapper/{mapper_name}" "{mount_point}"
def _handle_new_container(self, base_dest_path: str, profile_name: str, is_system: bool, source_size: int, queue) -> Optional[str]:
self.logger.log(
f"Handling new container creation for profile: {profile_name}")
size_gb = math.ceil((source_size * 1.2) / (1024**3)) + 5
size_gb = math.ceil((source_size * 1.2) / (1024**3)) + 2
username = os.path.basename(base_dest_path.rstrip('/'))
password = self.get_password(username, confirm=True)
if not password:
@@ -233,18 +229,12 @@ mount "/dev/mapper/{mapper_name}" "{mount_point}"
container_path = self.get_container_path(base_dest_path, profile_name)
mount_point = self.get_mount_point(base_dest_path, profile_name)
mapper_name = f"pybackup_luks_{username}_{profile_name}"
chown_cmd = self._get_chown_command(mount_point, is_system)
uid, gid = self._get_chown_ids(is_system)
helper_path = os.path.join(os.path.dirname(__file__), 'encryption_helper.sh')
script = f'"{helper_path}" create "{container_path}" "{mount_point}" "{int(size_gb)}" "{mapper_name}" "{uid or ''}" "{gid or ''}"'
script = f"""set -e
mkdir -p "{os.path.dirname(container_path)}"
mkdir -p "{mount_point}"
truncate -s {int(size_gb)}G "{container_path}"
echo -n "$LUKSPASS" | cryptsetup luksFormat "{container_path}" -
echo -n "$LUKSPASS" | cryptsetup luksOpen "{container_path}" {mapper_name} -
mkfs.ext4 "/dev/mapper/{mapper_name}"
sleep 1
mount "/dev/mapper/{mapper_name}" "{mount_point}"
{chown_cmd}"""
if not self._execute_as_root(script, password):
return None
@@ -253,32 +243,31 @@ mount "/dev/mapper/{mapper_name}" "{mount_point}"
def _open_and_mount(self, base_dest_path: str, profile_name: str, is_system: bool, password_override: Optional[str] = None) -> bool:
username = os.path.basename(base_dest_path.rstrip('/'))
key_or_pass_arg, password = "", None
password, key_file_path = None, None
if password_override:
password = password_override
key_or_pass_arg = "-"
else:
key_or_pass_arg, password = self._get_password_or_key_cmd(
password, key_file_path = self._get_password_and_keyfile(
base_dest_path, profile_name, username)
if not key_or_pass_arg:
if not password and not key_file_path:
self.logger.log(
"Could not get password or keyfile to mount container.")
return False
container_path = self.get_container_path(base_dest_path, profile_name)
mount_point = self.get_mount_point(base_dest_path, profile_name)
mapper_name = f"pybackup_luks_{username}_{profile_name}"
chown_cmd = self._get_chown_command(mount_point, is_system)
luks_open_cmd = f'echo -n "$LUKSPASS" | cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}' if password else f'cryptsetup luksOpen "{container_path}" {mapper_name} {key_or_pass_arg}'
uid, gid = self._get_chown_ids(is_system)
helper_path = os.path.join(os.path.dirname(__file__), 'encryption_helper.sh')
script = f'"{helper_path}" mount "{container_path}" "{mapper_name}" "{mount_point}" "{uid or ''}" "{gid or ''}" "{key_file_path or ''}"'
script = f"""set -e
umount "{mount_point}" || true
cryptsetup luksClose {mapper_name} || true
mkdir -p "{mount_point}"
{luks_open_cmd}
mount "/dev/mapper/{mapper_name}" "{mount_point}"
{chown_cmd}"""
if self._execute_as_root(script, password):
self.add_to_lock_file(base_dest_path, profile_name, mapper_name)
self.mounted_destinations.add((base_dest_path, profile_name))
if self.app and hasattr(self.app, 'header_frame'):
self.app.header_frame.refresh_status()
return True
@@ -287,19 +276,24 @@ mount "/dev/mapper/{mapper_name}" "{mount_point}"
def unmount_and_reset_owner(self, base_dest_path: str, profile_name: str, force_unmap=False) -> bool:
username = os.path.basename(base_dest_path.rstrip('/'))
mapper_name = f"pybackup_luks_{username}_{profile_name}"
if not os.path.exists(f"/dev/mapper/{mapper_name}") and not self.is_mounted(base_dest_path, profile_name):
mount_point = self.get_mount_point(base_dest_path, profile_name)
if not os.path.exists(f"/dev/mapper/{mapper_name}") and not os.path.ismount(mount_point):
if not force_unmap:
return True # Already unmounted or not present, consider it successful
self.logger.log(
f"Container {profile_name} at {base_dest_path} is already unmounted.")
return True
self.logger.log(
f"Unmounting and resetting owner for {base_dest_path}/{profile_name}")
mount_point = self.get_mount_point(base_dest_path, profile_name)
script = f"""chown root:root "{mount_point}"
umount "{mount_point}"
cryptsetup luksClose {mapper_name}
"""
helper_path = os.path.join(os.path.dirname(__file__), 'encryption_helper.sh')
# The unmount command doesn't need the LUKS password, but pkexec might need the user's password.
password = self.password_cache.get(username)
script = f'"{helper_path}" unmount "{mount_point}" "{mapper_name}"'
success = self._execute_as_root(script, password)
if success:
self.remove_from_lock_file(mapper_name)
@@ -314,7 +308,6 @@ mount "/dev/mapper/{mapper_name}" "{mount_point}"
self.logger.log(
f"Failed to unmount {profile_name} at {base_dest_path}")
return success
# Do not clear password cache here, it might be needed by another profile
def unmount_all(self) -> bool:
self.logger.log(f"Unmounting all: {self.mounted_destinations}")
@@ -326,53 +319,46 @@ mount "/dev/mapper/{mapper_name}" "{mount_point}"
all_unmounted_successfully = False
return all_unmounted_successfully
def _get_chown_command(self, mount_point: str, is_system: bool) -> str:
def _get_chown_ids(self, is_system: bool) -> Tuple[Optional[int], Optional[int]]:
if not is_system:
try:
uid = os.getuid()
gid = os.getgid()
return f'chown {uid}:{gid} "{mount_point}"'
return os.getuid(), os.getgid()
except Exception as e:
self.logger.log(
f"Could not get current user UID/GID for chown: {e}")
return ""
return None, None
def _execute_as_root(self, script_content: str, password_for_stdin: Optional[str] = None) -> bool:
def _execute_as_root(self, script_content: str, password_for_luks: Optional[str] = None) -> bool:
try:
password = password_for_stdin if password_for_stdin is not None else ""
command = ['pkexec', '/bin/bash', '-c', script_content]
project_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..'))
runner_script_path = os.path.join(
project_root, 'core', 'privileged_script_runner.sh')
self.logger.log(f"Executing privileged command: {' '.join(command)}")
self.logger.log(f"Script content passed to bash -c: {script_content}")
if not os.path.exists(runner_script_path):
self.logger.log(
f"CRITICAL: Privileged script runner not found at {runner_script_path}")
return False
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding='utf-8'
)
command = ['pkexec', runner_script_path, password, script_content]
# Write the LUKS password to the script's stdin, which the helper script now reads.
luks_password_input = password_for_luks if password_for_luks is not None else ""
stdout, stderr = process.communicate(input=luks_password_input)
self.logger.log(
f"Executing privileged command via runner: {runner_script_path}")
# Simplified logging to avoid complexity
self.logger.log(
f"""Script content passed as argument:\n---\n{script_content}\n---""")
result = subprocess.run(
command, capture_output=True, text=True, check=False)
if result.returncode == 0:
log_output = ("Privileged script executed successfully.")
if result.stdout:
log_output += f"\nStdout:\n{result.stdout}"
if result.stderr:
log_output += f"\nStderr:\n{result.stderr}"
if process.returncode == 0:
log_output = "Privileged script executed successfully."
if stdout:
log_output += f"\nStdout:\n{stdout}"
if stderr:
log_output += f"\nStderr:\n{stderr}"
self.logger.log(log_output)
return True
else:
self.logger.log(
f"Privileged script failed. Return code: {result.returncode} Stderr: {result.stderr} Stdout: {result.stdout}")
f"Privileged script failed. Return code: {process.returncode}\nStderr: {stderr}\nStdout: {stdout}")
return False
except Exception as e:
self.logger.log(

View File

@@ -1,10 +0,0 @@
#!/bin/bash
# This script executes commands passed as arguments.
# The 'set -e' command ensures that the script will exit immediately if any command fails.
set -e
# The password is the first argument, script content is the second.
export LUKSPASS="$1"
SCRIPT_TO_RUN="$2"
/bin/bash -c "$SCRIPT_TO_RUN"

View File

@@ -6,6 +6,7 @@ from core.pbp_app_config import Msg
from pyimage_ui.system_backup_content_frame import SystemBackupContentFrame
from pyimage_ui.user_backup_content_frame import UserBackupContentFrame
from shared_libs.logger import app_logger
from shared_libs.message import MessageDialog
class BackupContentFrame(ttk.Frame):
@@ -18,7 +19,7 @@ class BackupContentFrame(ttk.Frame):
self.base_backup_path = None
self.current_view_index = 0
self.viewing_encrypted = False
self.encrypted_profiles = {}
self.grid_rowconfigure(1, weight=1)
self.grid_columnconfigure(0, weight=1)
@@ -94,8 +95,47 @@ class BackupContentFrame(ttk.Frame):
action_button_frame, text=Msg.STR["comment"], command=self._edit_comment, state="disabled")
self.edit_comment_button.pack(side=tk.LEFT, padx=5)
# Mount controls for encrypted profiles, now at the bottom
self.mount_frame = ttk.Frame(action_button_frame)
self.mount_frame.pack(side=tk.RIGHT, padx=10)
mount_label = ttk.Label(self.mount_frame, text="Encrypted Profile:")
mount_label.pack(side=tk.LEFT, padx=(0, 5))
self.profile_combobox = ttk.Combobox(
self.mount_frame, state="readonly", width=20)
self.profile_combobox.pack(side=tk.LEFT)
self.mount_button = ttk.Button(
self.mount_frame, text="Mount", command=self._mount_selected_profile)
self.mount_button.pack(side=tk.LEFT, padx=5)
self._switch_view(0)
def _mount_selected_profile(self):
profile_name = self.profile_combobox.get()
if not profile_name:
return
profile_data = self.encrypted_profiles.get(profile_name)
if not profile_data:
return
self.app.config(cursor="watch")
self.update()
success = self.backup_manager.encryption_manager._open_and_mount(
base_dest_path=profile_data['base_dest_path'],
profile_name=profile_name,
is_system=profile_data['is_system']
)
self.app.config(cursor="")
if success:
# Refresh the view
self.show(self.base_backup_path, self.current_view_index)
else:
MessageDialog(message_type="error", title="Error",
text=f"Failed to mount profile '{profile_name}'.\nPlease check the password and try again.").show()
def update_button_state(self, is_selected):
self.restore_button.config(
state="normal" if is_selected else "disabled")
@@ -142,39 +182,35 @@ class BackupContentFrame(ttk.Frame):
app_logger.log(
f"BackupContentFrame: show called with path {backup_path}")
self.grid(row=2, column=0, sticky="nsew")
self.base_backup_path = backup_path
source_name = self.app.left_canvas_data.get('folder')
profile_name = "system" if source_name == "Computer" else source_name
# Check if the destination is encrypted and trigger mount if necessary
is_encrypted = self.backup_manager.encryption_manager.is_encrypted(
backup_path, profile_name)
self.viewing_encrypted = is_encrypted # Set this flag for remembering the view
pybackup_dir = os.path.join(backup_path, "pybackup")
if not os.path.isdir(pybackup_dir):
app_logger.log(
f"Backup path {pybackup_dir} does not exist or is not a directory.")
# Clear views if path is invalid
self.system_backups_frame.show(backup_path, [])
self.user_backups_frame.show(backup_path, [])
self.mount_frame.pack_forget()
return
all_backups = self.backup_manager.list_all_backups(
backup_path, mount_if_needed=True)
if all_backups:
system_backups, user_backups = all_backups
self.system_backups_frame.show(backup_path, system_backups)
self.user_backups_frame.show(backup_path, user_backups)
else:
# Handle case where inspection returns None (e.g. encrypted and mount_if_needed=False)
self.system_backups_frame.show(backup_path, [])
self.user_backups_frame.show(backup_path, [])
backup_data = self.backup_manager.list_all_backups(backup_path)
self.system_backups_frame.show(
backup_path, backup_data["system_backups"])
self.user_backups_frame.show(backup_path, backup_data["user_backups"])
self.encrypted_profiles = backup_data["encrypted_profiles"]
unmounted_profiles = [
name for name, data in self.encrypted_profiles.items() if not data['is_mounted']
]
if unmounted_profiles:
self.profile_combobox['values'] = unmounted_profiles
self.profile_combobox.set(unmounted_profiles[0])
self.mount_frame.pack(side=tk.RIGHT, padx=10)
else:
self.mount_frame.pack_forget()
# Use the passed index to switch to the correct view
self.after(10, lambda: self._switch_view(initial_tab_index))
def hide(self):

View File

@@ -69,55 +69,34 @@ class HeaderFrame(tk.Frame):
self.refresh_status()
def refresh_status(self):
"""Checks the keyring and mount status based on the current destination and updates the label."""
"""Checks the mount status of all encrypted profiles and updates the label."""
app_logger.log("HeaderFrame: Refreshing status...")
dest_path = self.app.destination_path
app_logger.log(f"HeaderFrame: Destination path is '{dest_path}'")
source_name = self.app.left_canvas_data.get('folder')
if not source_name:
if not dest_path or not os.path.isdir(dest_path):
self.keyring_status_label.config(text="")
return
profile_name = "system" if source_name == "Computer" else source_name
# Get all profile info without triggering a mount
backup_data = self.app.backup_manager.list_all_backups(dest_path)
encrypted_profiles = backup_data.get("encrypted_profiles", {})
if not dest_path or not self.encryption_manager.is_encrypted(dest_path, profile_name):
app_logger.log(
"HeaderFrame: No destination path or not encrypted. Clearing status.")
# Clear status if not encrypted
if not encrypted_profiles:
self.keyring_status_label.config(text="")
return
app_logger.log("HeaderFrame: Destination is encrypted.")
username = os.path.basename(dest_path.rstrip('/'))
app_logger.log(f"HeaderFrame: Username is '{username}'")
total_count = len(encrypted_profiles)
mounted_count = sum(
1 for profile in encrypted_profiles.values() if profile['is_mounted'])
is_mounted = self.encryption_manager.is_mounted(
dest_path, profile_name)
app_logger.log(f"HeaderFrame: Is mounted? {is_mounted}")
status_text = f"Encrypted: {mounted_count}/{total_count} Mounted"
status_text = ""
fg_color = ""
if is_mounted:
status_text = "Status: Mounted"
fg_color = "#6bbbff" # LightBlue
if mounted_count == total_count:
fg_color = "#6bbbff" # LightBlue (All Mounted)
elif mounted_count > 0:
fg_color = "#eb7f11" # Orange (Partially Mounted)
else:
status_text = "Status: Not Mounted"
fg_color = "#eb7f11" # Orange
key_in_keyring = self.encryption_manager.is_key_in_keyring(username)
key_file_exists = os.path.exists(
self.encryption_manager.get_key_file_path(dest_path, profile_name))
if key_in_keyring:
status_text += " (Keyring Available)"
elif key_file_exists:
status_text += " (Keyfile Available)"
else:
status_text += " (Key Not Available)"
if not is_mounted: # If not mounted and key not available, make it more prominent
fg_color = "#DC143C" # Crimson
fg_color = "#E8740C" # A different Orange/Red for None Mounted
self.keyring_status_label.config(
text=status_text,

162
python.py Normal file
View File

@@ -0,0 +1,162 @@
#!/usr/bin/python3
def _create_install_script(self, project_key):
"""Create installation script based on project"""
if project_key == "wirepy":
return self._create_wirepy_install_script()
elif project_key == "logviewer":
return self._create_logviewer_install_script()
else:
raise Exception(f"{LocaleStrings.MSGI['unknow_project']}{project_key}")
def _create_wirepy_install_script(self):
gpg_checker = self.check_gpg()
result_appimage = self.check_appimage()
if not gpg_checker:
print(gpg_checker)
return
if result_appimage is True or result_appimage is None:
return
detected_os = Detector.get_os()
if detected_os == "Arch Linux":
result_unzip = Detector.get_unzip()
result_wget = Detector.get_wget()
result_requests = Detector.get_requests()
else:
result_unzip = None
result_wget = None
result_requests = None
"""Create Wire-Py installation script"""
script = f"""#!/bin/bash
set -e
if [ "{result_appimage}" = "False" ]; then
cp -f /tmp/portinstaller/lxtools_installer /usr/local/bin/lxtools_installer
fi
if [ "{detected_os}" = "Arch Linux" ]; then
if [ "{result_unzip}" = "False" ]; then
pacman -S --noconfirm unzip
fi
if [ "{result_wget}" = "False" ]; then
pacman -S --noconfirm wget
fi
if [ "{result_requests}" = "False" ]; then
pacman -S --noconfirm python-requests
fi
fi
{LXToolsAppConfig.TKINTER_INSTALL_COMMANDS[detected_os]} 2>&1 | grep -v "apt does not have a stable CLI interface"
# Create necessary directories
mkdir -p {LXToolsAppConfig.SHARED_LIBS_DESTINATION[detected_os]}
mkdir -p /usr/share/icons/lx-icons
mkdir -p /usr/share/locale/de/LC_MESSAGES
mkdir -p /usr/share/applications
mkdir -p /usr/local/etc/ssl
mkdir -p /usr/share/polkit-1/actions
mkdir -p /usr/share/TK-Themes
# Download and extract Wire-Py
cd /tmp
rm -rf wirepy_install
mkdir wirepy_install
cd wirepy_install
echo "Downloading Wire-Py..."
wget -q "{LXToolsAppConfig.WIREPY_URL}" -O wirepy.zip
unzip -q wirepy.zip
WIREPY_DIR=$(find . -name "wire-py" -type d | head -1)
echo "Downloading shared libraries..."
wget -q "{LXToolsAppConfig.SHARED_LIBS_URL}" -O shared.zip
unzip -q shared.zip
SHARED_DIR=$(find . -name "shared_libs" -type d | head -1)
# Install Wire-Py files
echo "Installing Wire-Py executables..."
for file in wirepy.py start_wg.py ssl_encrypt.py ssl_decrypt.py match_found.py tunnel.py; do
if [ -f "$WIREPY_DIR/$file" ]; then
cp -f "$WIREPY_DIR/$file" /usr/local/bin/
chmod 755 /usr/local/bin/$file
echo "Installed $file"
fi
done
# Install config
if [ -f "$WIREPY_DIR/wp_app_config.py" ]; then
cp -f "$WIREPY_DIR/wp_app_config.py" {LXToolsAppConfig.SHARED_LIBS_DESTINATION[detected_os]}/
echo "Installed wp_app_config.py"
fi
# Install shared libraries
echo "Installing shared libraries..."
for file in common_tools.py message.py file_and_dir_ensure.py gitea.py __init__.py logview_app_config.py; do
if [ -f "$SHARED_DIR/$file" ]; then
cp -f "$SHARED_DIR/$file" {LXToolsAppConfig.SHARED_LIBS_DESTINATION[detected_os]}/
echo "Installed shared lib: $file"
fi
done
# Install LogViewer executable
if [ -f "$SHARED_DIR/logviewer.py" ]; then
cp -f "$SHARED_DIR/logviewer.py" {LXToolsAppConfig.SHARED_LIBS_DESTINATION[detected_os]}/
chmod 755 {LXToolsAppConfig.SHARED_LIBS_DESTINATION[detected_os]}/logviewer.py
echo "Installed logviewer.py (executable)"
fi
# Install icons
if [ -d "$WIREPY_DIR/lx-icons" ]; then
echo "Installing icons..."
cp -rf "$WIREPY_DIR/lx-icons"/* /usr/share/icons/lx-icons/
fi
# Install TK-Themes
if [ -d "$WIREPY_DIR/TK-Themes" ]; then
echo "Installing TK-Themes..."
cp -rf "$WIREPY_DIR/TK-Themes"/* /usr/share/TK-Themes/
fi
# Install desktop file
if [ -f "$WIREPY_DIR/Wire-Py.desktop" ]; then
cp -f "$WIREPY_DIR/Wire-Py.desktop" /usr/share/applications/
echo "Installed desktop file"
fi
# Install language files
if [ -d "$WIREPY_DIR/languages/de" ]; then
echo "Installing language files..."
cp -f "$WIREPY_DIR/languages/de"/*.mo /usr/share/locale/de/LC_MESSAGES/ 2>/dev/null || true
fi
# Install policy file
if [ -f "$WIREPY_DIR/org.sslcrypt.policy" ]; then
cp -f "$WIREPY_DIR/org.sslcrypt.policy" /usr/share/polkit-1/actions/
echo "Installed policy file"
fi
# Create symlink for Wirepy
ln -sf /usr/local/bin/wirepy.py /usr/local/bin/wirepy
# Create symlink for LogViewer
ln -sf {LXToolsAppConfig.SHARED_LIBS_DESTINATION[detected_os]}/logviewer.py /usr/local/bin/logviewer
echo "Created Wirepy and LogViewer symlink"
# Install language files if available
if [ -d "$SHARED_DIR/languages/de" ]; then
echo "Installing language files..."
cp "$SHARED_DIR/languages/de"/*.mo /usr/share/locale/de/LC_MESSAGES/ 2>/dev/null || true
fi
echo "Created symlink"
# Create SSL key if not exists
if [ ! -f /usr/local/etc/ssl/pwgk.pem ]; then
echo "Creating SSL key..."
openssl genrsa -out /usr/local/etc/ssl/pwgk.pem 4096
chmod 600 /usr/local/etc/ssl/pwgk.pem
fi
# Cleanup
cd /tmp
rm -rf wirepy_install portinstaller
echo "Wire-Py installation completed!"
"""
return script