Files
Py-Backup/core/backup_manager.py
Désiré Werner Menrath 94a44881e6 feat(ui): Refine "Backup Content" view display logic
This commit implements several UI/UX improvements for the "Backup Content" list view based on user feedback.

- feat(ui): User backups are now grouped by their full/incremental chains, similar to system backups, for a more logical and organized view.
- feat(ui): The color scheme for backup chains has been simplified. Each chain (a full backup and its incrementals) now shares a single color to improve visual grouping.
- feat(ui): Incremental backups are now denoted by a ▲ icon in the Type column instead of a different color or font style, providing a clear and clean indicator.
- fix(ui): Adjusted all column widths in the backup lists to ensure all data (especially Date and Time) is fully visible without truncation.
2025-09-09 14:22:37 +02:00

783 lines
34 KiB
Python

import subprocess
import os
import threading
import re
import signal
import datetime
import math
import shutil
import json
from typing import Optional, List, Dict, Any
from pathlib import Path
from crontab import CronTab
import tempfile
import stat
from core.pbp_app_config import AppConfig
from pyimage_ui.password_dialog import PasswordDialog
from core.encryption_manager import EncryptionManager
class BackupManager:
"""
Handles the logic for creating and managing backups using rsync.
"""
def __init__(self, logger, app=None):
    """Store the collaborators and reset all per-run state.

    Args:
        logger: Object exposing ``log(message)``; every method of this
            class reports through it.
        app: Optional application object. It is handed to the
            ``EncryptionManager`` and its ``queue`` attribute is read by
            other methods of this class when present.
    """
    # Collaborators supplied by the caller.
    self.app = app
    self.logger = logger
    self.encryption_manager = EncryptionManager(logger, app)

    # Handle of the subprocess started by _execute_rsync (None when idle).
    self.process = None
    self.is_system_process = False

    # Cookie returned by the ScreenSaver.Inhibit D-Bus call, if any.
    self.inhibit_cookie = None

    # Marker looked up in cron job comments by get_scheduled_jobs().
    self.app_tag = "# Py-Backup Job"
def _inhibit_screensaver(self):
if not shutil.which("gdbus"):
return
try:
self.logger.log(
"Attempting to inhibit screensaver and power management.")
command = [
"gdbus", "call", "--session", "--dest", "org.freedesktop.ScreenSaver",
"--object-path", "/org/freedesktop/ScreenSaver",
"--method", "org.freedesktop.ScreenSaver.Inhibit",
"Py-Backup", "Backup in progress"
]
result = subprocess.run(
command, capture_output=True, text=True, check=True)
match = re.search(r'uint32\s+(\d+)', result.stdout)
if match:
self.inhibit_cookie = int(match.group(1))
self.logger.log(
f"Successfully inhibited screensaver with cookie {self.inhibit_cookie}")
except Exception as e:
self.logger.log(
f"An unexpected error occurred while inhibiting screensaver: {e}")
def _uninhibit_screensaver(self):
if self.inhibit_cookie is None:
return
if not shutil.which("gdbus"):
return
try:
self.logger.log(
f"Attempting to uninhibit screensaver with cookie {self.inhibit_cookie}")
command = [
"gdbus", "call", "--session", "--dest", "org.freedesktop.ScreenSaver",
"--object-path", "/org/freedesktop/ScreenSaver",
"--method", "org.freedesktop.ScreenSaver.UnInhibit",
str(self.inhibit_cookie)
]
subprocess.run(command, capture_output=True, text=True, check=True)
self.logger.log("Successfully uninhibited screensaver.")
except Exception as e:
self.logger.log(f"Failed to uninhibit screensaver: {e}")
finally:
self.inhibit_cookie = None
def _get_profile_path(self, base_dest_path: str, is_system: bool, source_name: str, is_encrypted: bool) -> str:
"""Helper function to construct the path to a specific backup profile directory."""
pybackup_dir = os.path.join(base_dest_path, "pybackup")
if is_encrypted:
base_data_dir = self.encryption_manager.get_mount_point(
base_dest_path)
else:
base_data_dir = os.path.join(pybackup_dir, "unencrypted")
if is_system:
return os.path.join(base_data_dir, "system")
else:
return os.path.join(base_data_dir, "user", source_name)
def check_for_full_backup(self, dest_path: str, source_name: str, is_encrypted: bool) -> bool:
"""Checks if a full backup already exists for a given source."""
self.logger.log(
f"Checking for existing full backup for source '{source_name}' in '{dest_path}' (Encrypted: {is_encrypted})")
is_system = source_name == 'system'
profile_path = self._get_profile_path(
dest_path, is_system, source_name, is_encrypted)
if not os.path.isdir(profile_path):
self.logger.log(
f"Profile directory '{profile_path}' does not exist. No full backup found.")
return False
enc_suffix = "enc" if is_encrypted else "plain"
pattern = re.compile(
rf"\d{{8}}-\d{{6}}_{re.escape(source_name)}_full_{enc_suffix}")
for dirname in os.listdir(profile_path):
if pattern.match(dirname):
self.logger.log(
f"Found existing full backup directory: {dirname}")
return True
self.logger.log(
f"No existing full backup found for source '{source_name}'.")
return False
def start_backup(self, queue, source_path: str, dest_path: str, is_system: bool, source_name: str, is_dry_run: bool = False, exclude_files: Optional[List[Path]] = None, source_size: int = 0, is_compressed: bool = False, is_encrypted: bool = False, mode: str = "incremental", use_trash_bin: bool = False, no_trash_bin: bool = False):
self.is_system_process = is_system
self._inhibit_screensaver()
mount_point = None
if is_encrypted:
mount_point = self.encryption_manager.prepare_encrypted_destination(
dest_path, is_system, source_size, queue)
if not mount_point:
self.logger.log(
"Failed to prepare encrypted destination. Aborting backup.")
queue.put(
('completion', {'status': 'error', 'returncode': -1}))
self._uninhibit_screensaver()
return None
thread = threading.Thread(target=self._run_backup_path, args=(
queue, source_path, dest_path, is_system, source_name, is_dry_run, exclude_files, source_size, is_compressed, is_encrypted, mode, use_trash_bin, no_trash_bin))
thread.daemon = True
thread.start()
return thread
def _run_backup_path(self, queue, source_path: str, dest_path: str, is_system: bool, source_name: str, is_dry_run: bool, exclude_files: Optional[List[Path]], source_size: int, is_compressed: bool, is_encrypted: bool, mode: str, use_trash_bin: bool, no_trash_bin: bool):
    """Thread body: run one rsync backup and report the outcome on *queue*.

    Steps:
      1. Resolve (and create) the profile directory and look up the latest
         full backup; an incremental request without one becomes ``full``.
      2. Name the target ``<timestamp>_<source>_<mode>_<enc|plain>`` and
         queue ``('current_path', <target>)``.
      3. Build the rsync argument list. System backups are wrapped in
         ``pkexec bash -c`` with every argument shell-quoted; user backups
         are executed directly as an argument list.
      4. Run rsync through ``_execute_rsync`` and map the exit code:
         0 -> success, 23/24 -> warning, 143/-15/15/-9 -> cancelled,
         anything else -> error.
      5. On success or warning (not for dry runs) write the metadata JSON,
         then queue ``('completion', {'status', 'returncode'})``.

    ``source_size`` and ``is_compressed`` are accepted for interface
    compatibility and are not used here. Any exception is logged and
    reported as an error completion. The screensaver inhibition is always
    released and ``self.process`` reset at the end.
    """
    # Function-scope import keeps this block self-contained.
    import shlex

    base_dest_path = dest_path
    try:
        profile_path = self._get_profile_path(
            base_dest_path, is_system, source_name, is_encrypted)
        os.makedirs(profile_path, exist_ok=True)

        latest_full_backup_path = self._find_latest_backup(
            profile_path, source_name)
        if mode == "incremental" and not latest_full_backup_path:
            self.logger.log(
                f"Mode is incremental, but no full backup found for source '{source_name}'. Forcing full backup.")
            mode = "full"

        timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        encryption_suffix = "enc" if is_encrypted else "plain"
        backup_dir_name = f"{timestamp}_{source_name}_{mode}_{encryption_suffix}"
        rsync_dest = os.path.join(profile_path, backup_dir_name)
        queue.put(('current_path', rsync_dest))

        # All arguments are kept raw (unquoted) here; quoting is applied
        # once, below, and only where a shell is actually involved.
        rsync_command_parts = [
            'rsync', '-aAXHv'] if is_system else ['rsync', '-aLv']
        if mode == "incremental" and latest_full_backup_path and not is_dry_run:
            rsync_command_parts.append(
                f"--link-dest={latest_full_backup_path}")
        rsync_command_parts.append('--info=progress2')
        if exclude_files:
            rsync_command_parts.extend(
                f"--exclude-from={f}" for f in exclude_files)
        if AppConfig.MANUAL_EXCLUDE_LIST_PATH.exists():
            rsync_command_parts.append(
                f"--exclude-from={AppConfig.MANUAL_EXCLUDE_LIST_PATH}")
        if is_dry_run:
            rsync_command_parts.append('--dry-run')

        if not is_system:
            user_base_dir = os.path.dirname(profile_path)
            trash_bin_path = os.path.join(user_base_dir, ".Trash")
            # This command runs without a shell, so literal quote
            # characters would become part of the path/pattern rsync sees.
            if use_trash_bin:
                rsync_command_parts.extend(
                    ['--backup', f'--backup-dir={trash_bin_path}', '--delete'])
            elif no_trash_bin:
                rsync_command_parts.append('--delete')
            if use_trash_bin or no_trash_bin:
                rsync_command_parts.append(
                    f"--exclude={os.path.basename(trash_bin_path)}/")

        rsync_command_parts.extend([source_path, rsync_dest])

        if is_system:
            # Root execution goes through a shell string: quote every
            # argument so spaces/apostrophes in paths survive intact.
            rsync_cmd_str = ' '.join(
                shlex.quote(part) for part in rsync_command_parts)
            full_system_cmd = f"mkdir -p {shlex.quote(rsync_dest)} && {rsync_cmd_str}"
            command = ['pkexec', 'bash', '-c', full_system_cmd]
        else:
            os.makedirs(rsync_dest, exist_ok=True)
            command = rsync_command_parts

        self.logger.log(f"Executing command: {' '.join(command)}")

        self._execute_rsync(queue, command)
        return_code = self.process.returncode if self.process else -1

        # When the process never started, _execute_rsync has already put
        # an ('error', None) event on the queue; nothing more is sent.
        if self.process:
            if return_code == 0:
                status = 'success'
            elif return_code in (23, 24):
                status = 'warning'
            elif return_code in (143, -15, 15, -9):
                status = 'cancelled'
            else:
                status = 'error'

            if status in ('success', 'warning') and not is_dry_run:
                is_incremental = mode == 'incremental' and bool(
                    latest_full_backup_path)
                if not is_incremental:
                    final_size = self._get_directory_size(rsync_dest)
                elif is_system:
                    final_size = self._get_incremental_size_system(
                        rsync_dest)
                else:
                    final_size = self._get_incremental_size_user(
                        rsync_dest, latest_full_backup_path)

                self._create_info_json(
                    base_dest_path=base_dest_path,
                    backup_dir_name=backup_dir_name,
                    source_name=source_name,
                    backup_type="system" if is_system else "user",
                    mode=mode,
                    size_bytes=final_size,
                    is_encrypted=is_encrypted,
                    based_on=os.path.basename(
                        latest_full_backup_path) if is_incremental else None
                )

            queue.put(
                ('completion', {'status': status, 'returncode': return_code}))
    except Exception as e:
        self.logger.log(f"Exception in _run_backup_path: {e}")
        queue.put(('completion', {'status': 'error', 'returncode': -1}))
    finally:
        self._uninhibit_screensaver()
        self.process = None
def estimate_incremental_size(self, source_path: str, is_system: bool, source_name: str, base_dest_path: str, is_encrypted: bool, exclude_files: list) -> int:
"""
Calculates the approximate size of an incremental backup using rsync's
dry-run feature.
"""
self.logger.log(
f"Estimating incremental backup size for source: {source_path}")
if not base_dest_path:
self.logger.log(
"No destination path provided, cannot estimate incremental size.")
return 0
profile_path = self._get_profile_path(
base_dest_path, is_system, source_name, is_encrypted)
latest_backup_path = self._find_latest_backup(
profile_path, source_name)
if not latest_backup_path:
self.logger.log(
"No previous full backup found. Accurate incremental size cannot be estimated, returning 0.")
return 0
command = []
if is_system:
command.extend(['pkexec', 'rsync', '-aAXHvn', '--stats'])
else:
command.extend(['rsync', '-avn', '--stats'])
command.append(f"--link-dest={latest_backup_path}")
if exclude_files:
for exclude_file in exclude_files:
command.append(f"--exclude-from={exclude_file}")
try:
with tempfile.TemporaryDirectory() as dummy_dest:
command.extend([source_path, dummy_dest])
self.logger.log(
f"Executing rsync dry-run command: {' '.join(command)}")
result = subprocess.run(
command, capture_output=True, text=True, check=False)
# rsync exit code 24 means some files vanished during transfer, which is okay for a dry-run estimate.
if result.returncode != 0 and result.returncode != 24:
self.logger.log(
f"Rsync dry-run failed with code {result.returncode}: {result.stderr}")
return 0
output = result.stdout + "\\n" + result.stderr
match = re.search(
r"Total transferred file size: ([\d,.]+) bytes", output)
if match:
size_str = match.group(1).replace(',', '').replace('.', '')
size_bytes = int(size_str)
self.logger.log(
f"Estimated incremental backup size: {size_bytes} bytes")
return size_bytes
else:
self.logger.log(
"Could not find 'Total transferred file size' in rsync output.")
self.logger.log(
f"Full rsync output for debugging:\\n{output}")
return 0
except FileNotFoundError:
self.logger.log("Error: 'rsync' or 'pkexec' command not found.")
return 0
except Exception as e:
self.logger.log(
f"An unexpected error occurred during incremental size estimation: {e}")
return 0
def _get_directory_size(self, path: str) -> int:
if not os.path.isdir(path):
return 0
try:
result = subprocess.run(
["du", "-sb", path], capture_output=True, text=True, check=True)
size_in_bytes = int(result.stdout.split()[0])
return size_in_bytes
except (subprocess.CalledProcessError, FileNotFoundError, ValueError, IndexError) as e:
self.logger.log(
f"Could not calculate directory size for {path} using du: {e}")
total_size = 0
try:
for dirpath, dirnames, filenames in os.walk(path):
for f in filenames:
fp = os.path.join(dirpath, f)
if not os.path.islink(fp):
total_size += os.path.getsize(fp)
return total_size
except Exception as fallback_e:
self.logger.log(
f"Fallback size calculation also failed for {path}: {fallback_e}")
return 0
def _get_incremental_size_user(self, inc_path: str, full_path: str) -> int:
total_size = 0
for dirpath, _, filenames in os.walk(inc_path):
for filename in filenames:
inc_file_path = os.path.join(dirpath, filename)
relative_path = os.path.relpath(inc_file_path, inc_path)
full_file_path = os.path.join(full_path, relative_path)
try:
inc_stat = os.stat(inc_file_path)
if os.path.exists(full_file_path):
full_stat = os.stat(full_file_path)
if inc_stat.st_ino == full_stat.st_ino:
continue
total_size += inc_stat.st_size
except FileNotFoundError:
continue
return total_size
def _get_incremental_size_system(self, inc_path: str) -> int:
self.logger.log(
f"Calculating incremental size for system backup: {inc_path}")
command_str = f"find '{inc_path}' -type f -links 1 -print0 | xargs -0 stat -c %s | awk '{{s+=$1}} END {{print s}}'"
try:
full_command = ['pkexec', 'bash', '-c', command_str]
result = subprocess.run(
full_command, capture_output=True, text=True, check=True)
output = result.stdout.strip()
if output:
return int(output)
else:
return 0
except (subprocess.CalledProcessError, FileNotFoundError, ValueError) as e:
self.logger.log(
f"Failed to calculate incremental system backup size: {e}")
return self._get_directory_size(inc_path)
def list_all_backups(self, base_dest_path: str, mount_if_needed: bool = True):
pybackup_dir = os.path.join(base_dest_path, "pybackup")
metadata_dir = os.path.join(pybackup_dir, "metadata")
if not os.path.isdir(metadata_dir):
return [], []
all_backups = []
for info_file_name in os.listdir(metadata_dir):
if not info_file_name.endswith(".json"):
continue
info_file_path = os.path.join(metadata_dir, info_file_name)
try:
with open(info_file_path, 'r') as f:
info_data = json.load(f)
is_encrypted = info_data.get("is_encrypted", False)
is_system = info_data.get("backup_type") == "system"
source_name = info_data.get("source_name", "N/A")
backup_dir_name = info_file_name.replace(".json", "")
profile_path = self._get_profile_path(
base_dest_path, is_system, source_name, is_encrypted)
full_path = os.path.join(profile_path, backup_dir_name)
if not os.path.isdir(full_path):
if not is_encrypted:
self.logger.log(
f"Metadata file found for {backup_dir_name} but data directory not found at {full_path}. Skipping.")
continue
if not self.encryption_manager.is_mounted(base_dest_path):
self.logger.log(
f"Mounting {base_dest_path} to check for encrypted backup data...")
self.encryption_manager.prepare_encrypted_destination(
base_dest_path, is_system, 0, self.app.queue if self.app else None)
if not os.path.isdir(full_path):
self.logger.log(
f"Data directory {full_path} still not found after mount attempt. Skipping.")
continue
dt_obj = datetime.datetime.fromisoformat(
info_data["creation_date"])
backup_type_display = info_data["mode"].capitalize()
if is_encrypted:
backup_type_display += " (Encrypted)"
backup_info = {
"date": dt_obj.strftime('%d-%m-%Y'),
"time": dt_obj.strftime('%H:%M:%S'),
"type": backup_type_display,
"size": info_data.get("size_readable", "N/A"),
"folder_name": backup_dir_name,
"full_path": full_path,
"info_file_path": info_file_path,
"comment": info_data.get("comment", ""),
"is_encrypted": is_encrypted,
"backup_type_base": info_data["mode"].capitalize(),
"datetime": dt_obj,
"source": source_name,
"is_system": is_system
}
all_backups.append(backup_info)
except (IOError, json.JSONDecodeError, KeyError) as e:
self.logger.log(
f"Could not read or parse info file {info_file_path}: {e}")
system_backups = sorted(
[b for b in all_backups if b["is_system"]], key=lambda x: x['datetime'], reverse=True)
user_backups = sorted([b for b in all_backups if not b["is_system"]],
key=lambda x: x['datetime'], reverse=True)
# Group system backups
grouped_system_backups = []
temp_group = []
for backup in reversed(system_backups):
if backup['backup_type_base'] == 'Full':
if temp_group:
grouped_system_backups.append(temp_group)
temp_group = [backup]
else:
if not temp_group:
grouped_system_backups.append([backup])
else:
temp_group.append(backup)
if temp_group:
grouped_system_backups.append(temp_group)
grouped_system_backups.sort(
key=lambda g: g[0]['datetime'], reverse=True)
final_system_list = [
item for group in grouped_system_backups for item in group]
# Group user backups by source, then by chains
user_backups_by_source = {}
for backup in user_backups:
source = backup.get('source', 'Unknown')
if source not in user_backups_by_source:
user_backups_by_source[source] = []
user_backups_by_source[source].append(backup)
final_user_list = []
for source in sorted(user_backups_by_source.keys()):
source_backups = user_backups_by_source[source]
grouped_source_backups = []
temp_group = []
for backup in reversed(source_backups):
if backup['backup_type_base'] == 'Full':
if temp_group:
grouped_source_backups.append(temp_group)
temp_group = [backup]
else:
if not temp_group:
grouped_source_backups.append([backup])
else:
temp_group.append(backup)
if temp_group:
grouped_source_backups.append(temp_group)
grouped_source_backups.sort(key=lambda g: g[0]['datetime'], reverse=True)
for group in grouped_source_backups:
final_user_list.extend(group)
return final_system_list, final_user_list
def _find_latest_backup(self, profile_path: str, source_name: str) -> Optional[str]:
self.logger.log(
f"Searching for latest full backup for source '{source_name}' in: {profile_path}")
full_backups = []
if os.path.isdir(profile_path):
pattern = re.compile(
rf"^(\d{{8}}-\d{{6}})_{re.escape(source_name)}_full_(plain|enc)$")
for item in os.listdir(profile_path):
item_path = os.path.join(profile_path, item)
if os.path.isdir(item_path) and pattern.match(item):
full_backups.append(item)
if not full_backups:
self.logger.log("No full backups found.")
return None
full_backups.sort(reverse=True)
latest_backup_dir = full_backups[0]
latest_backup_path = os.path.join(profile_path, latest_backup_dir)
self.logger.log(
f"Found latest full backup for --link-dest: {latest_backup_path}")
return latest_backup_path
def _create_info_json(self, base_dest_path: str, backup_dir_name: str, source_name: str, backup_type: str, mode: str, size_bytes: int, is_encrypted: bool, based_on: Optional[str] = None, comment: str = ""):
try:
metadata_path = os.path.join(
base_dest_path, "pybackup", "metadata")
os.makedirs(metadata_path, exist_ok=True)
info_file_path = os.path.join(
metadata_path, f"{backup_dir_name}.json")
if size_bytes > 0:
power = 1024
n = 0
power_labels = {0: 'B', 1: 'KB', 2: 'MB', 3: 'GB', 4: 'TB'}
display_size = size_bytes
while display_size >= power and n < len(power_labels) - 1:
display_size /= power
n += 1
size_str = f"{display_size:.2f} {power_labels[n]}"
else:
size_str = "0 B"
info_data = {
"creation_date": datetime.datetime.now().isoformat(),
"backup_type": backup_type,
"source_name": source_name,
"mode": mode,
"size_bytes": size_bytes,
"size_readable": size_str,
"is_encrypted": is_encrypted,
"based_on": based_on,
"comment": comment
}
with open(info_file_path, 'w') as f:
json.dump(info_data, f, indent=4)
self.logger.log(
f"Successfully created metadata file: {info_file_path}")
except Exception as e:
self.logger.log(f"Failed to create metadata file. Error: {e}")
def get_comment(self, info_file_path: str) -> str:
try:
with open(info_file_path, 'r') as f:
data = json.load(f)
return data.get("comment", "")
except (IOError, json.JSONDecodeError):
return ""
def update_comment(self, info_file_path: str, new_comment: str):
try:
with open(info_file_path, 'r') as f:
data = json.load(f)
data["comment"] = new_comment
with open(info_file_path, 'w') as f:
json.dump(data, f, indent=4)
self.logger.log(
f"Successfully updated comment in {info_file_path}")
except (IOError, json.JSONDecodeError) as e:
self.logger.log(
f"Failed to update comment in {info_file_path}: {e}")
def _execute_rsync(self, queue, command: List[str]):
transferred_size, total_size, stderr_output = 0, 0, ""
try:
env = os.environ.copy()
env['LC_ALL'] = 'C'
self.process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, preexec_fn=os.setsid, env=env)
if self.process.stdout:
for line in iter(self.process.stdout.readline, ''):
stripped_line = line.strip()
self.logger.log(f"Rsync stdout: {stripped_line}")
if '%' in stripped_line:
match = re.search(r'\s*(\d+)%\s+', stripped_line)
if match:
queue.put(('progress', int(match.group(1))))
elif stripped_line and not stripped_line.startswith(('sending', 'sent', 'total')):
queue.put(('file_update', stripped_line))
self.process.wait()
if self.process.stderr:
stderr_output = self.process.stderr.read()
if stderr_output:
self.logger.log(f"Rsync Stderr: {stderr_output.strip()}")
except FileNotFoundError:
self.logger.log(f"Error: '{command[0]}' not found.")
queue.put(('error', None))
except Exception as e:
self.logger.log(f"Rsync execution error: {e}")
queue.put(('error', None))
return transferred_size, total_size, stderr_output
def start_restore(self, source_path: str, dest_path: str, is_compressed: bool):
from queue import Queue
queue = self.app.queue if hasattr(self.app, 'queue') else Queue()
thread = threading.Thread(target=self._run_restore, args=(
queue, source_path, dest_path, is_compressed))
thread.daemon = True
thread.start()
def _run_restore(self, queue, source_path: str, dest_path: str, is_compressed: bool):
self.logger.log(f"Starting restore from {source_path} to {dest_path}")
status = 'error'
try:
source = source_path.rstrip('/') + '/'
script_content = f"tar -xzf '{source_path}' -C '{dest_path}'" if is_compressed else f"rsync -aAXHv '{source}' '{dest_path}'"
if self.encryption_manager._execute_as_root(script_content):
status = 'success'
else:
self.logger.log("Restore script failed.")
except Exception as e:
self.logger.log(
f"An unexpected error occurred during restore: {e}")
finally:
queue.put(
('completion', {'status': status, 'returncode': 0 if status == 'success' else 1}))
def get_scheduled_jobs(self) -> List[Dict[str, Any]]:
    """Return the parsed details of this application's cron jobs.

    The current user's crontab is scanned for jobs whose comment contains
    ``self.app_tag``. Each such comment is passed to
    ``_parse_job_comment`` and non-empty results are collected. Errors are
    logged; the jobs collected up to that point are still returned.
    """
    jobs_list = []
    try:
        for job in CronTab(user=True):
            if self.app_tag not in job.comment:
                continue
            details = self._parse_job_comment(job.comment)
            if details:
                jobs_list.append(details)
    except Exception as e:
        self.logger.log(f"Error loading cron jobs: {e}")
    return jobs_list
def start_delete_backup(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str] = None):
thread = threading.Thread(target=self._run_delete, args=(
path_to_delete, is_encrypted, is_system, base_dest_path, queue, password))
thread.daemon = True
thread.start()
def _run_delete(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str]):
try:
backup_dir_name = os.path.basename(path_to_delete.rstrip('/'))
metadata_file_path = os.path.join(
base_dest_path, "pybackup", "metadata", f"{backup_dir_name}.json")
if is_encrypted:
self.logger.log(
f"Starting encrypted deletion for {path_to_delete}")
mount_point = self.encryption_manager.get_mount_point(
base_dest_path)
if not mount_point or not self.encryption_manager.is_mounted(base_dest_path):
if password:
mount_point = self.encryption_manager.mount_for_deletion(
base_dest_path, is_system, password)
else:
self.logger.log(
"Password not provided for encrypted deletion.")
if not mount_point:
self.logger.log("Failed to unlock container for deletion.")
queue.put(('deletion_complete', False))
return
internal_path_to_delete = os.path.join(
mount_point, os.path.basename(os.path.dirname(path_to_delete)), backup_dir_name)
success = False
if is_system:
script_content = f"rm -rf '{internal_path_to_delete}'"
success = self.encryption_manager._execute_as_root(
script_content)
else:
try:
if os.path.isdir(internal_path_to_delete):
shutil.rmtree(internal_path_to_delete)
self.logger.log(
f"Successfully deleted {internal_path_to_delete}")
success = True
except Exception as e:
self.logger.log(
f"Failed to delete user backup {internal_path_to_delete}: {e}")
success = False
if not success:
self.logger.log(
"Failed to delete files within encrypted container.")
queue.put(('deletion_complete', False))
return
elif is_system:
script_content = f"rm -rf '{path_to_delete}'"
if not self.encryption_manager._execute_as_root(script_content):
self.logger.log(f"Failed to delete {path_to_delete}")
queue.put(('deletion_complete', False))
return
else:
try:
if os.path.isdir(path_to_delete):
shutil.rmtree(path_to_delete)
self.logger.log(f"Successfully deleted {path_to_delete}")
except Exception as e:
self.logger.log(
f"Failed to delete unencrypted user backup {path_to_delete}: {e}")
queue.put(('deletion_complete', False))
return
if os.path.exists(metadata_file_path):
try:
os.remove(metadata_file_path)
self.logger.log(
f"Successfully deleted metadata file {metadata_file_path}")
queue.put(('deletion_complete', True))
except Exception as e:
self.logger.log(
f"Failed to delete metadata file {metadata_file_path}: {e}")
queue.put(('deletion_complete', False))
else:
queue.put(('deletion_complete', True))
except Exception as e:
self.logger.log(f"Error during threaded deletion: {e}")
queue.put(('deletion_complete', False))
def cancel_and_delete_privileged_backup(self, delete_path: str):
if not self.process or self.process.poll() is not None:
return
self.logger.log(
"Attempting to cancel backup and delete directory with root privileges...")
try:
pgid = os.getpgid(self.process.pid)
script_content = f"""
kill -SIGTERM -- -{pgid} || echo 'Process group not found or already terminated.'
if [ -n "{delete_path}" ] && [ "{delete_path}" != "/" ]; then rm -rf "{delete_path}"; fi
"""
self.encryption_manager._execute_as_root(script_content)
except Exception as e:
self.logger.log(
f"An error occurred during privileged cancel and delete: {e}")
def cancel_backup(self):
if self.process and self.process.poll() is None:
self.logger.log(
f"Attempting to cancel backup process with PID: {self.process.pid}")
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
self.logger.log("Successfully sent SIGTERM to process group.")
except ProcessLookupError:
self.logger.log("Process already finished.")
except Exception as e:
self.logger.log(f"Error cancelling process: {e}")