feat: Implement incremental user backup logic and display

This commit introduces the core logic for handling incremental user backups.

Changes include:
- Updated `core/backup_manager.py`:
    - Modified `_list_user_backups_from_path` to parse new naming convention for user backups (including `_full` and `_incremental`).
    - Enhanced `_find_latest_backup` to filter by source name for user backups.
    - Adjusted `_run_backup_path` to dynamically determine backup mode (full/incremental) for user backups and apply `--link-dest` accordingly.
- Updated `pyimage_ui/user_backup_content_frame.py`:
    - Added `tag_colors` attribute for visual differentiation.
    - Included 'type' column in the Treeview.
    - Modified `_load_backup_content` to apply coloring based on backup type.
This commit is contained in:
2025-09-06 16:47:49 +02:00
parent e1b12227d0
commit a843a875c6
2 changed files with 77 additions and 43 deletions

View File

@@ -105,38 +105,24 @@ class BackupManager:
base_dest_path = os.path.dirname(dest_path)
pybackup_dir = os.path.join(base_dest_path, "pybackup")
backup_name = os.path.basename(dest_path)
user_source_name = None
if not is_system:
# Extract source name from backup_name (e.g., 2025-09-06_10-00-00_user_MyDocs_full.txt -> MyDocs)
match = re.match(r"^\d{2}-\d{2}-\d{4}_\d{2}:\d{2}:\d{2}_user_(.+?)_(full|incremental)(_encrypted)?$", backup_name)
if match:
user_source_name = match.group(1)
else:
self.logger.log(f"Could not parse user source name from backup_name: {backup_name}")
if is_encrypted:
if not mount_point:
self.logger.log("Encrypted backup running without a mount point. Aborting.")
queue.put(('completion', {'status': 'error', 'returncode': -1}))
return
rsync_base_dest = mount_point
if not is_system:
user_backup_dir = os.path.join(mount_point, "user_encrypt")
if not self.encryption_manager._execute_as_root(f"mkdir -p {user_backup_dir}"):
self.logger.log(f"Failed to create encrypted user backup subdir: {user_backup_dir}")
queue.put(('completion', {'status': 'error', 'returncode': -1}))
return
rsync_base_dest = user_backup_dir
rsync_dest = os.path.join(rsync_base_dest, backup_name)
else: # Not encrypted
rsync_base_dest = pybackup_dir
if not is_system:
rsync_base_dest = os.path.join(pybackup_dir, "user_backups")
os.makedirs(rsync_base_dest, exist_ok=True)
rsync_dest = os.path.join(rsync_base_dest, backup_name)
# ... (rsync_base_dest and rsync_dest calculation) ...
self.logger.log(f"Starting backup from '{source_path}' to '{rsync_dest}'...")
latest_backup_path = self._find_latest_backup(rsync_base_dest, user_source_name)
if os.path.isdir(source_path) and not source_path.endswith('/'):
source_path += '/'
if not os.path.exists(rsync_base_dest):
if not is_encrypted:
os.makedirs(rsync_base_dest, exist_ok=True)
latest_backup_path = self._find_latest_backup(rsync_base_dest)
# Determine actual mode for user backups
if not is_system and not latest_backup_path:
mode = "full" # If no previous backup, force full
elif not is_system and latest_backup_path:
mode = "incremental" # If previous backup exists, default to incremental
command = []
if is_system or is_encrypted:
@@ -301,20 +287,27 @@ class BackupManager:
return []
user_backups = []
name_regex = re.compile(
r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_user_(.+?)(_encrypted)?\.txt$", re.IGNORECASE)
r"^(\d{2}-\d{2}-\d{4})_(\d{2}:\d{2}:\d{2})_user_(.+?)_(full|incremental)(_encrypted)?\.txt$", re.IGNORECASE)
for item in os.listdir(pybackup_dir):
match = name_regex.match(item)
if not match:
continue
date_str, time_str, source_name, enc_suffix = match.groups()
date_str, time_str, source_name, backup_type_base, enc_suffix = match.groups()
is_encrypted = (enc_suffix is not None)
is_compressed = False # User backups are not compressed in this context
backup_name = item.replace(".txt", "").replace("_encrypted", "")
if mounted_path:
user_backup_dir = os.path.join(mounted_path, "user_encrypt")
full_path = os.path.join(user_backup_dir, backup_name)
else:
user_backups_dir = os.path.join(pybackup_dir, "user_backups")
full_path = os.path.join(user_backups_dir, backup_name)
backup_type = backup_type_base.capitalize()
if is_compressed: backup_type += " (Compressed)"
if is_encrypted: backup_type += " (Encrypted)"
backup_size = "N/A"
comment = ""
info_file_path = os.path.join(pybackup_dir, item)
@@ -329,22 +322,40 @@ class BackupManager:
except Exception as e:
self.logger.log(f"Could not read info file {info_file_path}: {e}")
user_backups.append({
"date": date_str, "time": time_str, "size": backup_size,
"folder_name": backup_name, "full_path": full_path, "comment": comment,
"is_encrypted": is_encrypted, "source": source_name
"date": date_str, "time": time_str, "type": backup_type,
"size": backup_size, "folder_name": backup_name, "full_path": full_path,
"comment": comment, "is_encrypted": is_encrypted, "source": source_name,
"is_compressed": is_compressed, "backup_type_base": backup_type_base.capitalize(),
"datetime": datetime.datetime.strptime(f"{date_str} {time_str}", '%d-%m-%Y %H:%M:%S')
})
user_backups.sort(key=lambda x: f"{x['date']} {x['time']}", reverse=True)
user_backups.sort(key=lambda x: x['datetime'], reverse=True)
return user_backups
def has_encrypted_backups(self, base_backup_path: str) -> bool:
    """Report whether the backup rooted at *base_backup_path* is encrypted.

    Thin delegation: the encryption manager owns the detection logic;
    this method only forwards the path and returns its verdict.
    """
    verdict = self.encryption_manager.is_encrypted(base_backup_path)
    return verdict
def _find_latest_backup(self, base_backup_path: str) -> Optional[str]:
"""Finds the most recent backup directory in a given path."""
self.logger.log(f"Searching for latest backup in: {base_backup_path}")
backup_names = self.list_backups(base_backup_path)
def _find_latest_backup(self, base_backup_path: str, source_name: Optional[str] = None) -> Optional[str]:
"""Finds the most recent backup directory in a given path, optionally filtered by source name."""
self.logger.log(f"Searching for latest backup in: {base_backup_path} for source: {source_name or 'All'}")
backup_names = []
if os.path.isdir(base_backup_path):
for item in os.listdir(base_backup_path):
# Only consider directories that match the expected backup name pattern
# and optionally filter by source_name
if os.path.isdir(os.path.join(base_backup_path, item)):
if source_name:
if f"_user_{source_name}_" in item:
backup_names.append(item)
else:
# For system backups or if no source_name is provided, include all
if "_system_" in item or "_user_" not in item: # Simple check to exclude other user backups if source_name is None
backup_names.append(item)
# Sort by date and time (assuming format YYYY-MM-DD_HH-MM-SS or similar at the beginning)
# This is a simplified sort, a more robust one would parse datetime objects
backup_names.sort(reverse=True)
if not backup_names:
self.logger.log("No previous backups found to link against.")
return None

View File

@@ -16,16 +16,25 @@ class UserBackupContentFrame(ttk.Frame):
self.user_backups_list = []
self.backup_path = None
columns = ("date", "time", "size", "comment", "folder_name")
self.tag_colors = [
("full_blue", "#0078D7", "inc_blue", "#50E6FF"),
("full_orange", "#E8740C", "inc_orange", "#FFB366"),
("full_green", "#107C10", "inc_green", "#50E680"),
("full_purple", "#8B107C", "inc_purple", "#D46EE5"),
]
columns = ("date", "time", "type", "size", "comment", "folder_name")
self.content_tree = ttk.Treeview(self, columns=columns, show="headings")
self.content_tree.heading("date", text=Msg.STR["date"])
self.content_tree.heading("time", text=Msg.STR["time"])
self.content_tree.heading("type", text=Msg.STR["type"])
self.content_tree.heading("size", text=Msg.STR["size"])
self.content_tree.heading("comment", text=Msg.STR["comment"])
self.content_tree.heading("folder_name", text=Msg.STR["folder"])
self.content_tree.column("date", width=100, anchor="w")
self.content_tree.column("time", width=80, anchor="center")
self.content_tree.column("type", width=120, anchor="center")
self.content_tree.column("size", width=100, anchor="e")
self.content_tree.column("comment", width=250, anchor="w")
self.content_tree.column("folder_name", width=200, anchor="w")
@@ -45,14 +54,28 @@ class UserBackupContentFrame(ttk.Frame):
if not self.user_backups_list:
return
for backup_info in self.user_backups_list:
color_index = -1
for i, backup_info in enumerate(self.user_backups_list):
if backup_info.get("backup_type_base") == "Full":
color_index = (color_index + 1) % len(self.tag_colors)
full_tag, full_color, inc_tag, inc_color = self.tag_colors[color_index]
self.content_tree.tag_configure(
full_tag, foreground=full_color)
self.content_tree.tag_configure(
inc_tag, foreground=inc_color, font=("Helvetica", 10, "bold"))
current_tag = full_tag
else:
_, _, inc_tag, _ = self.tag_colors[color_index]
current_tag = inc_tag
self.content_tree.insert("", "end", values=(
backup_info.get("date", "N/A"),
backup_info.get("time", "N/A"),
backup_info.get("type", "N/A"),
backup_info.get("size", "N/A"),
backup_info.get("comment", ""),
backup_info.get("folder_name", "N/A")
), iid=backup_info.get("folder_name"))
), tags=(current_tag,), iid=backup_info.get("folder_name"))
self._on_item_select(None)
def _on_item_select(self, event):