Refactor: Implement new backup structure, fix paths, and resolve runtime errors.

This commit is contained in:
2025-09-06 00:49:35 +02:00
parent 77ac4f5c4f
commit 0359b37ff8
18 changed files with 266 additions and 390 deletions

View File

@@ -11,7 +11,7 @@ import tempfile
import stat
import shutil
from pbp_app_config import AppConfig
from core.pbp_app_config import AppConfig
from pyimage_ui.password_dialog import PasswordDialog
from core.encryption_manager import EncryptionManager
@@ -50,7 +50,7 @@ class BackupManager:
]
script_content = "\n".join(script_parts)
if self._execute_as_root(script_content):
if self.encryption_manager._execute_as_root(script_content):
self.logger.log(
"Backup cancellation and deletion script succeeded.")
else:
@@ -72,7 +72,7 @@ class BackupManager:
return
script_content = f'rm -rf "{path}"'
if self._execute_as_root(script_content):
if self.encryption_manager._execute_as_root(script_content):
self.logger.log(f"Successfully deleted path: {path}")
else:
self.logger.log(f"Failed to delete path: {path}")
@@ -111,7 +111,7 @@ rm -f '{info_file}'
self.logger.log(
f"Cancelling system process with pgid {pgid} via privileged script.")
script_content = f"kill -SIGTERM -- -{pgid}"
self._execute_as_root(script_content)
self.encryption_manager._execute_as_root(script_content)
else:
os.killpg(pgid, signal.SIGTERM)
self.logger.log("Backup process terminated.")
@@ -172,9 +172,9 @@ set -e
echo \"rm command finished with exit code $?.\"
"""
if is_system:
if is_system or is_encrypted:
self.logger.log("Executing compression and cleanup as root.")
if self._execute_as_root(script_content):
if self.encryption_manager._execute_as_root(script_content):
self.logger.log("Compression and cleanup script executed successfully.")
return True
else:
@@ -202,19 +202,40 @@ set -e
def _run_backup_path(self, queue, source_path: str, dest_path: str, is_system: bool, is_dry_run: bool, exclude_files: Optional[List[Path]], source_size: int, is_compressed: bool, is_encrypted: bool, mode: str, password: Optional[str], key_file: Optional[str]):
try:
# 1. Determine all paths based on new structure
base_dest_path = os.path.dirname(dest_path) # e.g., /backup
pybackup_dir = os.path.join(base_dest_path, "pybackup")
backup_name = os.path.basename(dest_path) # e.g., 2025-09-05..._system_full
os.makedirs(pybackup_dir, exist_ok=True)
mount_point = None
if is_encrypted:
base_dest_path = os.path.dirname(dest_path)
size_gb = int(source_size / (1024**3) * 1.1) + 1
mount_point = self.encryption_manager.setup_encrypted_backup(queue, base_dest_path, size_gb, password=password, key_file=key_file)
size_gb = int(source_size / (1024**3) * 1.1) + 1
mount_point = self.encryption_manager.setup_encrypted_backup(
queue, base_dest_path, size_gb, password=password, key_file=key_file)
if not mount_point:
return
rsync_base_dest = mount_point
rsync_dest = os.path.join(rsync_base_dest, os.path.basename(dest_path))
else:
rsync_base_dest = os.path.dirname(dest_path)
rsync_dest = dest_path
if not is_system:
user_backup_dir = os.path.join(mount_point, "user_backups")
# Create the directory as root since the mount point is root-owned
if not self.encryption_manager._execute_as_root(f"mkdir -p {user_backup_dir}"):
self.logger.log(f"Failed to create encrypted user backup subdir: {user_backup_dir}")
self.encryption_manager.cleanup_encrypted_backup(base_dest_path)
return
rsync_base_dest = user_backup_dir
rsync_dest = os.path.join(rsync_base_dest, backup_name)
else: # Not encrypted
rsync_base_dest = pybackup_dir
if not is_system:
rsync_base_dest = os.path.join(pybackup_dir, "user_backups")
os.makedirs(rsync_base_dest, exist_ok=True)
rsync_dest = os.path.join(rsync_base_dest, backup_name)
self.logger.log(
f"Starting backup from '{source_path}' to '{rsync_dest}'...")
@@ -228,7 +249,8 @@ set -e
latest_backup_path = self._find_latest_backup(rsync_base_dest)
command = []
if is_system:
# Use pkexec if it's a system backup OR an encrypted backup (as mount is root-owned)
if is_system or is_encrypted:
command.extend(['pkexec', 'rsync', '-aAXHv'])
else:
command.extend(['rsync', '-av'])
@@ -269,23 +291,15 @@ set -e
status = 'cancelled'
if status in ['success', 'warning'] and not is_dry_run:
info_filename_base = os.path.basename(dest_path)
self.logger.log(f"latest_backup_path: {latest_backup_path}")
self.logger.log(f"source_size (from UI): {source_size}")
if mode == "full" or latest_backup_path is None:
final_size = total_size if total_size > 0 else source_size
else:
final_size = transferred_size
info_filename_base = backup_name
if is_compressed:
self.logger.log(f"Compression requested for {dest_path}")
self.logger.log(f"Compression requested for {rsync_dest}")
queue.put(('status_update', 'Phase 2/2: Komprimiere Backup...'))
queue.put(('progress_mode', 'indeterminate'))
queue.put(('cancel_button_state', 'disabled'))
if self._compress_and_cleanup(dest_path, is_system):
if self._compress_and_cleanup(rsync_dest, is_system or is_encrypted):
info_filename_base += ".tar.gz"
else:
self.logger.log("Compression failed, keeping uncompressed backup.")
@@ -293,8 +307,13 @@ set -e
queue.put(('progress_mode', 'determinate'))
queue.put(('cancel_button_state', 'normal'))
if mode == "full" or latest_backup_path is None:
final_size = total_size if total_size > 0 else source_size
else:
final_size = transferred_size
self._create_info_file(
dest_path, f"{info_filename_base}.txt", final_size)
pybackup_dir, info_filename_base, final_size, is_encrypted)
queue.put(('completion', {'status': status, 'returncode': return_code}))
else:
@@ -303,17 +322,16 @@ set -e
queue.put(('completion', {'status': 'error', 'returncode': -1}))
self.logger.log(
f"Backup to '{dest_path}' completed.")
f"Backup to '{rsync_dest}' completed.")
finally:
if is_encrypted and mount_point:
mapper_name = f"pybackup_{os.path.basename(os.path.dirname(dest_path))}"
self.encryption_manager.cleanup_encrypted_backup(mapper_name, mount_point)
self.encryption_manager.cleanup_encrypted_backup(base_dest_path)
self.process = None
def _create_info_file(self, dest_path: str, filename: str, source_size: int):
def _create_info_file(self, pybackup_dir: str, backup_name: str, source_size: int, is_encrypted: bool):
try:
parent_dir = os.path.dirname(dest_path)
info_file_path = os.path.join(parent_dir, filename)
info_filename = f"{backup_name}_encrypted.txt" if is_encrypted else f"{backup_name}.txt"
info_file_path = os.path.join(pybackup_dir, info_filename)
original_bytes = source_size
if source_size > 0:
@@ -343,7 +361,7 @@ set -e
except Exception as e:
self.logger.log(
f"Failed to create metadata file. Please check permissions for {os.path.dirname(info_file_path)}. Error: {e}")
f"Failed to create metadata file. Please check permissions for {pybackup_dir}. Error: {e}")
def _execute_rsync(self, queue, command: List[str]):
transferred_size = 0
@@ -490,7 +508,7 @@ set -e
source = source_path.rstrip('/') + '/'
script_content = f"rsync -aAXHv '{source}' '{dest_path}'"
if self._execute_as_root(script_content):
if self.encryption_manager._execute_as_root(script_content):
self.logger.log("Restore script executed successfully.")
status = 'success'
else:
@@ -567,11 +585,17 @@ set -e
return details
def has_encrypted_backups(self, base_backup_path: str) -> bool:
"""Checks if any encrypted system backups exist in the destination."""
"""Checks if any encrypted backups (system or user) exist in the destination."""
system_backups = self.list_system_backups(base_backup_path)
for backup in system_backups:
if backup.get('is_encrypted'):
return True
user_backups = self.list_user_backups(base_backup_path)
for backup in user_backups:
if backup.get('is_encrypted'):
return True
return False
def list_backups(self, base_backup_path: str) -> List[str]:
@@ -583,39 +607,44 @@ set -e
backups.append(item)
return sorted(backups, reverse=True)
def list_system_backups(self, scan_path: str, is_encrypted_and_mounted: bool = False) -> List[Dict[str, str]]:
"""Lists all system backups, handling encrypted containers and sorting into groups."""
if not os.path.isdir(scan_path):
return []
try:
self.logger.log(f"Scanning for backups in {scan_path}. Contents: {os.listdir(scan_path)}")
except Exception as e:
self.logger.log(f"Could not list directory {scan_path}: {e}")
def list_system_backups(self, base_dest_path: str) -> List[Dict[str, str]]:
"""Lists all system backups by scanning for info files in the central pybackup directory."""
pybackup_dir = os.path.join(base_dest_path, "pybackup")
if not os.path.isdir(pybackup_dir):
return []
all_backups = []
# Regex to capture details from the info file name
name_regex = re.compile(
r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_system_(full|incremental)(\.tar\.gz)?$", re.IGNORECASE)
for item in os.listdir(scan_path):
if item.endswith('.txt') or item.endswith('.luks'):
continue
r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_system_(full|incremental)(\.tar\.gz)?(_encrypted)?\.txt$", re.IGNORECASE)
for item in os.listdir(pybackup_dir):
match = name_regex.match(item)
if not match:
continue
full_path = os.path.join(scan_path, item)
date_str, time_str, backup_type_base, extension = match.groups()
is_compressed = (extension == ".tar.gz")
date_str, time_str, backup_type_base, comp_ext, enc_suffix = match.groups()
is_encrypted = (enc_suffix is not None)
is_compressed = (comp_ext is not None)
backup_name = item.replace(".txt", "").replace("_encrypted", "")
if is_encrypted:
encrypted_dir = os.path.join(pybackup_dir, "encrypted")
full_path = os.path.join(encrypted_dir, backup_name)
else:
full_path = os.path.join(pybackup_dir, backup_name)
backup_type = backup_type_base.capitalize()
if is_compressed:
backup_type += " (Compressed)"
if is_encrypted:
backup_type += " (Encrypted)"
backup_size = "N/A"
comment = ""
info_file_path = os.path.join(scan_path, f"{item}.txt")
info_file_path = os.path.join(pybackup_dir, item)
if os.path.exists(info_file_path):
try:
with open(info_file_path, 'r') as f:
@@ -632,11 +661,11 @@ set -e
"time": time_str,
"type": backup_type,
"size": backup_size,
"folder_name": item,
"full_path": full_path,
"folder_name": backup_name,
"full_path": full_path, # This path might not be accessible if encrypted container is not mounted
"comment": comment,
"is_compressed": is_compressed,
"is_encrypted": is_encrypted_and_mounted,
"is_encrypted": is_encrypted,
"backup_type_base": backup_type_base.capitalize(),
"datetime": datetime.datetime.strptime(f"{date_str} {time_str}", '%d-%m-%Y %H:%M:%S')
})
@@ -668,68 +697,60 @@ set -e
return final_sorted_list
def list_user_backups(self, base_backup_path: str) -> List[Dict[str, str]]:
"""Lists all user backups found in the base backup path."""
def list_user_backups(self, base_dest_path: str) -> List[Dict[str, str]]:
"""Lists all user backups by scanning for info files in the central pybackup directory."""
pybackup_dir = os.path.join(base_dest_path, "pybackup")
if not os.path.isdir(pybackup_dir):
return []
user_backups = []
if not os.path.isdir(base_backup_path):
return user_backups
try:
self.logger.log(f"Scanning for user backups in {base_backup_path}. Contents: {os.listdir(base_backup_path)}")
except Exception as e:
self.logger.log(f"Could not list directory {base_backup_path}: {e}")
return user_backups
# Regex to capture details from user backup info file names
name_regex = re.compile(
r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_user_(.+?)(_encrypted)?\.txt$", re.IGNORECASE)
system_backup_regex = re.compile(
r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_system_(full|incremental)(\.tar\.gz)?$", re.IGNORECASE)
for item in os.listdir(base_backup_path):
if system_backup_regex.match(item):
continue # Skip system backups
full_path = os.path.join(base_backup_path, item)
if not os.path.isdir(full_path):
for item in os.listdir(pybackup_dir):
match = name_regex.match(item)
if not match:
continue
info_file_path = os.path.join(base_backup_path, f"{item}.txt")
date_str, time_str, source_name, enc_suffix = match.groups()
is_encrypted = (enc_suffix is not None)
backup_name = item.replace(".txt", "").replace("_encrypted", "")
if is_encrypted:
encrypted_dir = os.path.join(pybackup_dir, "encrypted", "user_backups")
full_path = os.path.join(encrypted_dir, backup_name)
else:
user_backups_dir = os.path.join(pybackup_dir, "user_backups")
full_path = os.path.join(user_backups_dir, backup_name)
backup_size = "N/A"
comment = ""
info_file_path = os.path.join(pybackup_dir, item)
if os.path.exists(info_file_path):
backup_size = "N/A"
backup_date = "N/A"
backup_time = "N/A"
comment = ""
try:
with open(info_file_path, 'r') as f:
for line in f:
if line.strip().lower().startswith("originalgröße:"):
size_part = line.split(":", 1)[1].strip()
if '(' in size_part:
backup_size = size_part.split('(')[0].strip()
else:
backup_size = size_part
elif line.strip().lower().startswith("backup-datum:"):
full_date_str = line.split(":", 1)[1].strip()
date_parts = full_date_str.split()
if len(date_parts) >= 2:
backup_date = date_parts[0]
backup_time = date_parts[1]
else:
backup_date = full_date_str
backup_size = line.split(":", 1)[1].strip().split('(')[0].strip()
elif line.strip().lower().startswith("kommentar:"):
comment = line.split(":", 1)[1].strip()
except Exception as e:
self.logger.log(
f"Could not read info file {info_file_path}: {e}")
self.logger.log(f"Could not read info file {info_file_path}: {e}")
user_backups.append({
"date": backup_date,
"time": backup_time,
"size": backup_size,
"folder_name": item,
"full_path": full_path,
"comment": comment
})
user_backups.append({
"date": date_str,
"time": time_str,
"size": backup_size,
"folder_name": backup_name,
"full_path": full_path,
"comment": comment,
"is_encrypted": is_encrypted,
"source": source_name
})
# Sort chronologically, oldest first
user_backups.sort(key=lambda x: f"{x['date']} {x['time']}", reverse=False)
user_backups.sort(key=lambda x: f"{x['date']} {x['time']}", reverse=True)
return user_backups
def get_comment(self, info_file_path: str) -> str:

View File

@@ -5,7 +5,7 @@ import shutil
import re
import subprocess
from queue import Empty
from pbp_app_config import AppConfig, Msg
from core.pbp_app_config import AppConfig, Msg
from shared_libs.logger import app_logger

View File

@@ -9,7 +9,7 @@ import stat
import re
from typing import Optional
from pbp_app_config import AppConfig
from core.pbp_app_config import AppConfig
from pyimage_ui.password_dialog import PasswordDialog
@@ -34,8 +34,9 @@ class EncryptionManager:
self.logger.log(
f"Attempting to create and add key file for {base_dest_path}")
pybackup_dir = os.path.join(base_dest_path, "pybackup")
encrypted_dir = os.path.join(pybackup_dir, "encrypted")
container_path = os.path.join(
pybackup_dir, "encrypted/pybackup_encrypted.luks")
encrypted_dir, "pybackup_encrypted.luks")
key_file_path = self.get_key_file_path(base_dest_path)
if not os.path.exists(container_path):
@@ -137,8 +138,8 @@ class EncryptionManager:
return password
def is_container_mounted(self, base_dest_path: str) -> bool:
mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}"
mount_point = f"/mnt/{mapper_name}"
pybackup_dir = os.path.join(base_dest_path, "pybackup")
mount_point = os.path.join(pybackup_dir, "encrypted")
return os.path.ismount(mount_point)
def unlock_container(self, base_dest_path: str, password: Optional[str] = None, key_file: Optional[str] = None) -> Optional[str]:
@@ -151,15 +152,16 @@ class EncryptionManager:
return None
pybackup_dir = os.path.join(base_dest_path, "pybackup")
encrypted_dir = os.path.join(pybackup_dir, "encrypted")
container_path = os.path.join(
pybackup_dir, "encrypted/pybackup_encrypted.luks")
encrypted_dir, "pybackup_encrypted.luks")
if not os.path.exists(container_path):
self.logger.log(
f"Encrypted container not found at {container_path}")
return None
mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}"
mount_point = f"/mnt/{mapper_name}"
mount_point = encrypted_dir
if os.path.ismount(mount_point):
self.logger.log(f"Container already mounted at {mount_point}")
@@ -197,9 +199,10 @@ class EncryptionManager:
return None
pybackup_dir = os.path.join(base_dest_path, "pybackup")
container_path = os.path.join(pybackup_dir, "pybackup_encrypted.luks")
encrypted_dir = os.path.join(pybackup_dir, "encrypted")
container_path = os.path.join(encrypted_dir, "pybackup_encrypted.luks")
mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}"
mount_point = f"/mnt/{mapper_name}"
mount_point = encrypted_dir
if not password and not key_file:
self.logger.log("No password or key file provided for encryption.")
@@ -228,10 +231,9 @@ class EncryptionManager:
open_auth_part = f"cryptsetup luksOpen {container_path} {mapper_name} --key-file {key_file}"
script = f"""
mkdir -p {pybackup_dir}
mkdir -p {encrypted_dir}
fallocate -l {size_gb}G {container_path}
{format_auth_part}
mkdir -p {mount_point}
{open_auth_part}
mkfs.ext4 /dev/mapper/{mapper_name}
mount /dev/mapper/{mapper_name} {mount_point}
@@ -251,13 +253,13 @@ class EncryptionManager:
return mount_point
def cleanup_encrypted_backup(self, base_dest_path: str):
pybackup_dir = os.path.join(base_dest_path, "pybackup")
mount_point = os.path.join(pybackup_dir, "encrypted")
mapper_name = f"pybackup_{os.path.basename(base_dest_path.rstrip('/'))}"
mount_point = f"/mnt/{mapper_name}"
self.logger.log(f"Cleaning up encrypted backup: {mapper_name}")
script = f"""
umount {mount_point} || echo "Mount point {mount_point} not found or already unmounted."
cryptsetup luksClose {mapper_name} || echo "Mapper {mapper_name} not found or already closed."
rmdir {mount_point} || echo "Mount point directory {mount_point} not found or already removed."
"""
if not self._execute_as_root(script):
self.logger.log("Encrypted backup cleanup script failed.")

View File

@@ -10,9 +10,9 @@ from shared_libs.log_window import LogWindow
from shared_libs.logger import app_logger
from shared_libs.animated_icon import AnimatedIcon
from shared_libs.common_tools import IconManager
from shared_libs.config_manager import ConfigManager
from backup_manager import BackupManager
from pbp_app_config import AppConfig, Msg
from core.config_manager import ConfigManager
from core.backup_manager import BackupManager
from core.pbp_app_config import AppConfig, Msg
from pyimage_ui.scheduler_frame import SchedulerFrame
from pyimage_ui.backup_content_frame import BackupContentFrame
from pyimage_ui.header_frame import HeaderFrame

View File

@@ -5,9 +5,9 @@ import os
from queue import Queue
import threading
from backup_manager import BackupManager
from core.backup_manager import BackupManager
from shared_libs.logger import app_logger
from pbp_app_config import AppConfig
from core.pbp_app_config import AppConfig
# A simple logger for the CLI that just prints to the console
class CliLogger:

View File

@@ -9,7 +9,7 @@ from typing import Optional
from shared_libs.message import MessageDialog
from shared_libs.custom_file_dialog import CustomFileDialog
from pbp_app_config import AppConfig, Msg
from core.pbp_app_config import AppConfig, Msg
from shared_libs.logger import app_logger
from shared_libs.common_tools import message_box_animation
@@ -27,32 +27,37 @@ class Actions:
self.app.inkrementell_var.set(True)
def _update_backup_type_controls(self):
# Only applies to system backups in backup mode
if self.app.mode != 'backup' or self.app.left_canvas_data.get('folder') != "Computer":
self._set_backup_type("full") # Default for user backups
self.app.full_backup_cb.config(state='disabled')
self.app.incremental_cb.config(state='disabled')
return
else:
# Re-enable if we switch back to system backup
self.app.full_backup_cb.config(state='normal')
self.app.incremental_cb.config(state='normal')
if self.app.full_backup_cb.cget('state') == 'disabled':
# If controls are forced by advanced settings, do nothing
if self.app.full_backup_cb.cget('state') == 'disabled' and self.app.incremental_cb.cget('state') == 'disabled':
return
full_backup_exists = False
if self.app.destination_path and os.path.isdir(self.app.destination_path):
pybackup_dir = os.path.join(self.app.destination_path, "pybackup")
encrypted_container = os.path.join(pybackup_dir, "pybackup_encrypted.luks")
if not os.path.isdir(pybackup_dir):
self._set_backup_type("full")
return
if os.path.exists(encrypted_container):
# If an encrypted container exists, we check for any directory inside the pybackup folder.
# The presence of any item other than the .luks file and its .txt info file suggests a backup has been made.
if os.path.isdir(pybackup_dir):
for item in os.listdir(pybackup_dir):
if not item.endswith(('.luks', '.txt')) and os.path.isdir(os.path.join(pybackup_dir, item)):
full_backup_exists = True
break
else:
# For non-encrypted backups, check for a directory that represents a full backup.
system_backups = self.app.backup_manager.list_system_backups(pybackup_dir)
for backup in system_backups:
if backup.get('backup_type_base') == 'Full':
full_backup_exists = True
break
is_encrypted_backup = self.app.encrypted_var.get()
system_backups = self.app.backup_manager.list_system_backups(self.app.destination_path)
for backup in system_backups:
# Match the encryption status and check if it's a full backup
if backup.get('is_encrypted') == is_encrypted_backup and backup.get('backup_type_base') == 'Full':
full_backup_exists = True
break
if full_backup_exists:
self._set_backup_type("incremental")
@@ -592,12 +597,23 @@ class Actions:
self.app.animated_icon.start()
if self.app.mode == "backup":
source_folder = self.app.left_canvas_data.get('folder')
source_size_bytes = self.app.source_size_bytes
if self.app.vollbackup_var.get():
self._start_system_backup("full", source_size_bytes)
if not source_folder:
app_logger.log("No source folder selected, aborting backup.")
self.app.backup_is_running = False
self.app.start_pause_button["text"] = Msg.STR["start"]
self._set_ui_state(True)
return
if source_folder == "Computer":
mode = "full" if self.app.vollbackup_var.get() else "incremental"
self._start_system_backup(mode, source_size_bytes)
else:
self._start_system_backup("incremental", source_size_bytes)
else:
self._start_user_backup()
else: # restore mode
# Restore logic would go here
pass
def _start_system_backup(self, mode, source_size_bytes):
@@ -635,7 +651,7 @@ class Actions:
date_str = now.strftime("%d-%m-%Y")
time_str = now.strftime("%H:%M:%S")
folder_name = f"{date_str}_{time_str}_system_{mode}"
final_dest = os.path.join(base_dest, "pybackup", folder_name)
final_dest = os.path.join(base_dest, folder_name) # The backup_manager will add /pybackup/
self.app.current_backup_path = final_dest
source_size_bytes = self.app.left_canvas_data.get('total_bytes', 0)
@@ -664,18 +680,52 @@ class Actions:
mode=mode,
password=password)
def _start_user_backup(self, sources):
dest = self.app.destination_path
if not dest:
def _start_user_backup(self):
base_dest = self.app.destination_path
source_path = self.app.left_canvas_data.get('path_display')
source_name = self.app.left_canvas_data.get('folder')
source_size_bytes = self.app.left_canvas_data.get('total_bytes', 0)
if not base_dest or not source_path:
MessageDialog(master=self.app, message_type="error",
title=Msg.STR["error"], text=Msg.STR["err_no_dest_folder"])
self.app.backup_is_running = False
self.app.start_pause_button["text"] = Msg.STR["start"]
self._set_ui_state(True)
return
is_encrypted = self.app.encrypted_var.get()
password = None
if is_encrypted:
username = os.path.basename(base_dest.rstrip('/'))
password = self.app.backup_manager.encryption_manager.get_password(username, confirm=True)
if not password:
app_logger.log("Encryption enabled, but no password provided. Aborting backup.")
self.app.backup_is_running = False
self.app.start_pause_button["text"] = Msg.STR["start"]
self._set_ui_state(True)
return
now = datetime.datetime.now()
date_str = now.strftime("%d-%m-%Y")
time_str = now.strftime("%H:%M:%S")
folder_name = f"{date_str}_{time_str}_user_{source_name}"
final_dest = os.path.join(base_dest, folder_name)
self.app.current_backup_path = final_dest
is_dry_run = self.app.testlauf_var.get()
for source in sources:
self.app.backup_manager.start_backup(
queue=self.app.queue,
source_path=source,
dest_path=dest,
is_system=False,
is_dry_run=is_dry_run)
is_compressed = self.app.compressed_var.get()
self.app.backup_manager.start_backup(
queue=self.app.queue,
source_path=source_path,
dest_path=final_dest,
is_system=False,
is_dry_run=is_dry_run,
exclude_files=None,
source_size=source_size_bytes,
is_compressed=is_compressed,
is_encrypted=is_encrypted,
mode='full',
password=password)

View File

@@ -3,7 +3,7 @@ from tkinter import ttk
import os
from pathlib import Path
from pbp_app_config import AppConfig, Msg
from core.pbp_app_config import AppConfig, Msg
from shared_libs.animated_icon import AnimatedIcon
from pyimage_ui.shared_logic import enforce_backup_type_exclusivity
from shared_libs.message import MessageDialog

View File

@@ -2,7 +2,7 @@ import tkinter as tk
from tkinter import ttk
import os
from shared_libs.animated_icon import AnimatedIcon
from pbp_app_config import Msg
from core.pbp_app_config import Msg
from pyimage_ui.system_backup_content_frame import SystemBackupContentFrame
from pyimage_ui.user_backup_content_frame import UserBackupContentFrame
from shared_libs.logger import app_logger
@@ -185,21 +185,18 @@ class BackupContentFrame(ttk.Frame):
self.base_backup_path = backup_path
if self.viewing_encrypted:
actual_backup_path = backup_path
else:
actual_backup_path = os.path.join(backup_path, "pybackup")
pybackup_dir = os.path.join(backup_path, "pybackup")
if not os.path.isdir(actual_backup_path):
if not os.path.isdir(pybackup_dir):
app_logger.log(
f"Backup path {actual_backup_path} does not exist or is not a directory.")
f"Backup path {pybackup_dir} does not exist or is not a directory.")
# Clear views if path is invalid
self.system_backups_frame.show(None)
self.user_backups_frame.show(None)
return
self.system_backups_frame.show(actual_backup_path)
self.user_backups_frame.show(actual_backup_path)
self.system_backups_frame.show(backup_path)
self.user_backups_frame.show(backup_path)
config_key = "last_encrypted_backup_content_view" if self.viewing_encrypted else "last_backup_content_view"
last_view = self.app.config_manager.get_setting(config_key, 0)

View File

@@ -1,6 +1,6 @@
# pyimage/ui/drawing.py
import tkinter as tk
from pbp_app_config import AppConfig, Msg
from core.pbp_app_config import AppConfig, Msg
import os
import threading
from shared_libs.animated_icon import AnimatedIcon

View File

@@ -1,7 +1,7 @@
import tkinter as tk
import os
from pbp_app_config import Msg
from core.pbp_app_config import Msg
from shared_libs.common_tools import IconManager
class HeaderFrame(tk.Frame):

View File

@@ -2,7 +2,7 @@
import os
import shutil
from shared_libs.message import MessageDialog
from pbp_app_config import Msg
from core.pbp_app_config import Msg
class Navigation:

View File

@@ -4,7 +4,7 @@ import os
from shared_libs.custom_file_dialog import CustomFileDialog
from shared_libs.message import MessageDialog
from pbp_app_config import Msg
from core.pbp_app_config import Msg
class SchedulerFrame(ttk.Frame):

View File

@@ -3,7 +3,7 @@ from tkinter import ttk
import os
from pathlib import Path
from pbp_app_config import AppConfig, Msg
from core.pbp_app_config import AppConfig, Msg
from pyimage_ui.advanced_settings_frame import AdvancedSettingsFrame
from shared_libs.custom_file_dialog import CustomFileDialog
from shared_libs.message import MessageDialog

View File

@@ -2,7 +2,7 @@ import tkinter as tk
from tkinter import ttk
import os
from pbp_app_config import Msg
from core.pbp_app_config import Msg
from pyimage_ui.comment_editor_dialog import CommentEditorDialog
class SystemBackupContentFrame(ttk.Frame):
@@ -82,7 +82,7 @@ class SystemBackupContentFrame(ttk.Frame):
if not selected_item_id:
return
info_file_path = os.path.join(self.backup_path, f"{selected_item_id}.txt")
info_file_path = os.path.join(self.backup_path, "pybackup", f"{selected_item_id}.txt")
if not os.path.exists(info_file_path):
self.backup_manager.update_comment(info_file_path, "")
@@ -117,7 +117,21 @@ class SystemBackupContentFrame(ttk.Frame):
if not selected_item_id:
return
folder_to_delete = os.path.join(self.backup_path, selected_item_id)
selected_backup = next((b for b in self.system_backups_list if b.get("folder_name") == selected_item_id), None)
if not selected_backup:
return
folder_to_delete = selected_backup['full_path']
info_file_to_delete = os.path.join(self.backup_path, "pybackup", f"{selected_item_id}.txt")
dialog = MessageDialog(master=self, message_type="warning",
title=Msg.STR["confirm_delete_title"],
text=Msg.STR["confirm_delete_text"].format(
folder_name=selected_item_id),
buttons=["ok_cancel"])
if dialog.get_result() != "ok":
return
self.actions._set_ui_state(False)
# This needs to be adapted, as the deletion status is now in the parent view

View File

@@ -3,7 +3,7 @@ from tkinter import ttk
import os
import shutil
from pbp_app_config import Msg
from core.pbp_app_config import Msg
from pyimage_ui.comment_editor_dialog import CommentEditorDialog
from shared_libs.message import MessageDialog
@@ -65,7 +65,7 @@ class UserBackupContentFrame(ttk.Frame):
if not selected_item_id:
return
info_file_path = os.path.join(self.backup_path, f"{selected_item_id}.txt")
info_file_path = os.path.join(self.backup_path, "pybackup", f"{selected_item_id}.txt")
if not os.path.exists(info_file_path):
self.backup_manager.update_comment(info_file_path, "")
@@ -85,8 +85,14 @@ class UserBackupContentFrame(ttk.Frame):
if not selected_item_id:
return
folder_to_delete = os.path.join(self.backup_path, selected_item_id)
info_file_to_delete = os.path.join(self.backup_path, f"{selected_item_id}.txt")
selected_backup = next((b for b in self.user_backups_list if b.get("folder_name") == selected_item_id), None)
if not selected_backup:
return
selected_backup = next((b for b in self.user_backups_list if b.get("folder_name") == selected_item_id), None)
if not selected_backup:
return
folder_to_delete = selected_backup['full_path']
info_file_to_delete = os.path.join(self.backup_path, "pybackup", f"{selected_item_id}.txt")
dialog = MessageDialog(master=self, message_type="warning",
title=Msg.STR["confirm_delete_title"],

View File

@@ -1,214 +0,0 @@
import tkinter as tk
from tkinter import ttk
import os
from shared_libs.message import MessageDialog
from shared_libs.custom_file_dialog import CustomFileDialog
from pbp_app_config import Msg
class ScheduleJobDialog(tk.Toplevel):
def __init__(self, parent, backup_manager):
super().__init__(parent)
self.parent = parent
self.backup_manager = backup_manager
self.result = None
self.title(Msg.STR["add_job_title"])
self.geometry("550x550") # Increased size
self.transient(parent)
self.grab_set()
self.backup_type = tk.StringVar(value="system")
self.destination = tk.StringVar()
self.user_sources = {
Msg.STR["cat_images"]: tk.BooleanVar(value=False),
Msg.STR["cat_documents"]: tk.BooleanVar(value=False),
Msg.STR["cat_music"]: tk.BooleanVar(value=False),
Msg.STR["cat_videos"]: tk.BooleanVar(value=False)
}
self.frequency = tk.StringVar(value="daily")
self.use_key_file = tk.BooleanVar(value=False)
self.key_file_path = tk.StringVar()
self._create_widgets()
self.protocol("WM_DELETE_WINDOW", self._on_cancel)
self.wait_window()
def _create_widgets(self):
main_frame = ttk.Frame(self, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True)
# Backup Type
type_frame = ttk.LabelFrame(
main_frame, text=Msg.STR["backup_type"], padding=10)
type_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Radiobutton(type_frame, text=Msg.STR["system_backup_menu"], variable=self.backup_type,
value="system", command=self._toggle_user_sources).pack(anchor=tk.W)
ttk.Radiobutton(type_frame, text=Msg.STR["user_backup_menu"], variable=self.backup_type,
value="user", command=self._toggle_user_sources).pack(anchor=tk.W)
# Destination
dest_frame = ttk.LabelFrame(
main_frame, text=Msg.STR["dest_folder"], padding=10)
dest_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Entry(dest_frame, textvariable=self.destination, state="readonly",
width=50).pack(side=tk.LEFT, fill=tk.X, expand=True)
ttk.Button(dest_frame, text=Msg.STR["browse"], command=self._select_destination).pack(
side=tk.RIGHT)
# User Sources (initially hidden)
self.user_sources_frame = ttk.LabelFrame(
main_frame, text=Msg.STR["source_folders"], padding=10)
for name, var in self.user_sources.items():
ttk.Checkbutton(self.user_sources_frame, text=name,
variable=var).pack(anchor=tk.W)
self._toggle_user_sources() # Set initial visibility
# Key File Options
key_file_frame = ttk.LabelFrame(main_frame, text="Key File for Encrypted Backups", padding=10)
key_file_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Checkbutton(key_file_frame, text="Use Key File for automated access", variable=self.use_key_file, command=self._toggle_key_file_entry).pack(anchor=tk.W)
self.key_file_entry = ttk.Entry(key_file_frame, textvariable=self.key_file_path, state="disabled", width=50)
self.key_file_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
self.key_file_browse_button = ttk.Button(key_file_frame, text=Msg.STR["browse"], command=self._select_key_file, state="disabled")
self.key_file_browse_button.pack(side=tk.RIGHT)
# Frequency
freq_frame = ttk.LabelFrame(
main_frame, text=Msg.STR["frequency"], padding=10)
freq_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Radiobutton(freq_frame, text=Msg.STR["freq_daily"],
variable=self.frequency, value="daily").pack(anchor=tk.W)
ttk.Radiobutton(freq_frame, text=Msg.STR["freq_weekly"],
variable=self.frequency, value="weekly").pack(anchor=tk.W)
ttk.Radiobutton(freq_frame, text=Msg.STR["freq_monthly"],
variable=self.frequency, value="monthly").pack(anchor=tk.W)
# Buttons
button_frame = ttk.Frame(main_frame)
button_frame.pack(pady=10)
ttk.Button(button_frame, text=Msg.STR["save"], command=self._on_save).pack(
side=tk.LEFT, padx=5)
ttk.Button(button_frame, text=Msg.STR["cancel"], command=self._on_cancel).pack(
side=tk.LEFT, padx=5)
def _toggle_user_sources(self):
if self.backup_type.get() == "user":
self.user_sources_frame.pack(fill=tk.X, padx=5, pady=5)
else:
self.user_sources_frame.pack_forget()
def _toggle_key_file_entry(self):
state = "normal" if self.use_key_file.get() else "disabled"
self.key_file_entry.config(state=state)
self.key_file_browse_button.config(state=state)
def _select_destination(self):
dialog = CustomFileDialog(
self, mode="dir", title=Msg.STR["select_dest_folder_title"])
self.wait_window(dialog)
result = dialog.get_result()
if result:
self.destination.set(result)
def _select_key_file(self):
dialog = CustomFileDialog(
self, mode="file", title="Select Key File", filetypes=[("Key Files", "*.key"), ("All Files", "*.*")]
)
self.wait_window(dialog)
result = dialog.get_result()
if result:
self.key_file_path.set(result)
def _on_save(self):
dest = self.destination.get()
if not dest:
MessageDialog(master=self, message_type="error",
title=Msg.STR["error"], text=Msg.STR["err_no_dest_folder"])
return
job_type = self.backup_type.get()
job_frequency = self.frequency.get()
job_sources = []
if job_type == "user":
job_sources = [name for name,
var in self.user_sources.items() if var.get()]
if not job_sources:
MessageDialog(master=self, message_type="error",
title=Msg.STR["error"], text=Msg.STR["err_no_source_folder"])
return
# Check if destination is an encrypted container
is_encrypted_container = os.path.exists(os.path.join(dest, "pybackup", "pybackup_encrypted.luks"))
if is_encrypted_container and not self.use_key_file.get():
MessageDialog(master=self, message_type="error",
title=Msg.STR["error"], text="For encrypted destinations, 'Use Key File' must be selected for automated backups.")
return
if self.use_key_file.get() and not self.key_file_path.get():
MessageDialog(master=self, message_type="error",
title=Msg.STR["error"], text="Please select a key file path.")
return
# Construct the CLI command
script_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), "pybackup-cli.py")) # Call the CLI script
command = f"python3 {script_path} --backup-type {job_type} --destination \"{dest}\""
if job_type == "user":
# For CLI, we assume a single source path for simplicity
if len(job_sources) > 1:
MessageDialog(master=self, message_type="warning",
title="Warning", text="For user backups, only the first selected source will be used in the automated job.")
if job_sources:
# Use the actual path from AppConfig.FOLDER_PATHS
source_folder_name = job_sources[0]
source_path = str(AppConfig.FOLDER_PATHS.get(source_folder_name, ""))
if not source_path:
MessageDialog(master=self, message_type="error",
title=Msg.STR["error"], text=f"Unknown source folder: {source_folder_name}")
return
command += f" --source \"{source_path}\""
else:
MessageDialog(master=self, message_type="error",
title=Msg.STR["error"], text="Please select a source folder for user backup.")
return
if is_encrypted_container:
command += " --encrypted"
if self.use_key_file.get():
command += f" --key-file \"{self.key_file_path.get()}\""
# No --password option for cronjobs, as it's insecure
# Construct the cron job comment
comment = f"{self.backup_manager.app_tag}; type:{job_type}; freq:{job_frequency}; dest:{dest}"
if job_type == "user":
comment += f"; source:{source_path}"
if is_encrypted_container:
comment += "; encrypted"
if self.use_key_file.get():
comment += "; key_file"
self.result = {
"command": command,
"comment": comment,
"type": job_type,
"frequency": job_frequency,
"destination": dest,
"sources": job_sources # Keep original sources for display if needed
}
self.destroy()
def _on_cancel(self):
self.result = None
self.destroy()
def show(self):
self.parent.wait_window(self)
return self.result