Files
Py-Backup/core/backup_manager.py
Désiré Werner Menrath 94afeb5d45 fix: Correct incremental size calculations and rsync handling
This commit refactors the backup size calculation logic and fixes several bugs related to rsync argument passing and UI actions.

- refactor(core): Centralized all incremental backup size calculation logic within the BackupManager class. This removes duplicated and buggy code from the DataProcessing class and ensures both post-backup reporting and pre-backup estimation use a single, robust implementation.

- fix(backup): The pre-backup "accurate size calculation" now works correctly for all backup types. It uses an `rsync --dry-run` with a proper temporary directory and correctly handles root permissions for system backups.

- fix(backup): The post-backup size reporting for incremental system backups is now accurate. It uses a root-privileged command to inspect file links and calculate the true disk usage, avoiding permission errors.

- fix(backup): Corrected multiple quoting issues with rsync arguments (`--link-dest`, `--exclude-from`) that caused backups to fail or misbehave.

- fix(ui): Fixed a TypeError that occurred when deleting a system backup from the "Backup Content" view.

- feat(backup): Added the `-v` (verbose) flag to rsync for user backups to provide better feedback in the UI.
2025-09-09 13:07:56 +02:00

750 lines
33 KiB
Python

import subprocess
import os
import threading
import re
import signal
import datetime
import math
import shutil
import json
from typing import Optional, List, Dict, Any
from pathlib import Path
from crontab import CronTab
import tempfile
import stat
from core.pbp_app_config import AppConfig
from pyimage_ui.password_dialog import PasswordDialog
from core.encryption_manager import EncryptionManager
class BackupManager:
"""
Handles the logic for creating and managing backups using rsync.
"""
def __init__(self, logger, app=None):
    """Store the collaborators and reset all per-run state.

    Args:
        logger: Object whose ``log(message)`` method receives status text.
        app: Optional application object. It is handed to the
            EncryptionManager and other methods look for a ``queue``
            attribute on it.
    """
    # Collaborators supplied by the caller.
    self.app = app
    self.logger = logger

    # Marker looked for in cron job comments by get_scheduled_jobs().
    self.app_tag = "# Py-Backup Job"

    # Per-run state: the running rsync Popen, whether the current job is a
    # system backup, and the cookie of an active screensaver inhibition.
    self.process = None
    self.is_system_process = False
    self.inhibit_cookie = None

    self.encryption_manager = EncryptionManager(logger, app)
def _inhibit_screensaver(self):
if not shutil.which("gdbus"):
return
try:
self.logger.log(
"Attempting to inhibit screensaver and power management.")
command = [
"gdbus", "call", "--session", "--dest", "org.freedesktop.ScreenSaver",
"--object-path", "/org/freedesktop/ScreenSaver",
"--method", "org.freedesktop.ScreenSaver.Inhibit",
"Py-Backup", "Backup in progress"
]
result = subprocess.run(
command, capture_output=True, text=True, check=True)
match = re.search(r'uint32\s+(\d+)', result.stdout)
if match:
self.inhibit_cookie = int(match.group(1))
self.logger.log(
f"Successfully inhibited screensaver with cookie {self.inhibit_cookie}")
except Exception as e:
self.logger.log(
f"An unexpected error occurred while inhibiting screensaver: {e}")
def _uninhibit_screensaver(self):
if self.inhibit_cookie is None:
return
if not shutil.which("gdbus"):
return
try:
self.logger.log(
f"Attempting to uninhibit screensaver with cookie {self.inhibit_cookie}")
command = [
"gdbus", "call", "--session", "--dest", "org.freedesktop.ScreenSaver",
"--object-path", "/org/freedesktop/ScreenSaver",
"--method", "org.freedesktop.ScreenSaver.UnInhibit",
str(self.inhibit_cookie)
]
subprocess.run(command, capture_output=True, text=True, check=True)
self.logger.log("Successfully uninhibited screensaver.")
except Exception as e:
self.logger.log(f"Failed to uninhibit screensaver: {e}")
finally:
self.inhibit_cookie = None
def _get_profile_path(self, base_dest_path: str, is_system: bool, source_name: str, is_encrypted: bool) -> str:
"""Helper function to construct the path to a specific backup profile directory."""
pybackup_dir = os.path.join(base_dest_path, "pybackup")
if is_encrypted:
base_data_dir = self.encryption_manager.get_mount_point(
base_dest_path)
else:
base_data_dir = os.path.join(pybackup_dir, "unencrypted")
if is_system:
return os.path.join(base_data_dir, "system")
else:
return os.path.join(base_data_dir, "user", source_name)
def check_for_full_backup(self, dest_path: str, source_name: str, is_encrypted: bool) -> bool:
    """Report whether a full backup directory exists for ``source_name``.

    The profile directory is resolved with ``_get_profile_path`` (the
    source named ``'system'`` is treated as the system profile). An entry
    counts only if it is a directory whose whole name is
    ``<YYYYMMDD>-<HHMMSS>_<source_name>_full_<enc|plain>`` with the
    suffix that matches ``is_encrypted``.

    Returns:
        True if such a directory exists, otherwise False.
    """
    self.logger.log(
        f"Checking for existing full backup for source '{source_name}' in '{dest_path}' (Encrypted: {is_encrypted})")
    is_system = source_name == 'system'
    profile_path = self._get_profile_path(
        dest_path, is_system, source_name, is_encrypted)
    if not os.path.isdir(profile_path):
        self.logger.log(
            f"Profile directory '{profile_path}' does not exist. No full backup found.")
        return False

    enc_suffix = "enc" if is_encrypted else "plain"
    pattern = re.compile(
        rf"\d{{8}}-\d{{6}}_{re.escape(source_name)}_full_{enc_suffix}")
    for dirname in os.listdir(profile_path):
        # fullmatch + isdir keeps this in step with _find_latest_backup():
        # a leftover file or a name with extra trailing text is not a
        # usable --link-dest base and must not count as a full backup.
        if pattern.fullmatch(dirname) and os.path.isdir(
                os.path.join(profile_path, dirname)):
            self.logger.log(
                f"Found existing full backup directory: {dirname}")
            return True

    self.logger.log(
        f"No existing full backup found for source '{source_name}'.")
    return False
def start_backup(self, queue, source_path: str, dest_path: str, is_system: bool, source_name: str, is_dry_run: bool = False, exclude_files: Optional[List[Path]] = None, source_size: int = 0, is_compressed: bool = False, is_encrypted: bool = False, mode: str = "incremental", use_trash_bin: bool = False, no_trash_bin: bool = False):
    """Kick off a backup on a daemon thread.

    The screensaver is inhibited first. For an encrypted backup the
    destination is prepared through the encryption manager; if that
    yields nothing, an error ``completion`` message is queued, the
    inhibition is released and ``None`` is returned.

    Returns:
        The started ``threading.Thread`` running ``_run_backup_path``,
        or ``None`` when the encrypted destination could not be prepared.
    """
    self.is_system_process = is_system
    self._inhibit_screensaver()

    if is_encrypted and not self.encryption_manager.prepare_encrypted_destination(
            dest_path, is_system, source_size, queue):
        self.logger.log(
            "Failed to prepare encrypted destination. Aborting backup.")
        queue.put(
            ('completion', {'status': 'error', 'returncode': -1}))
        self._uninhibit_screensaver()
        return None

    worker_args = (
        queue, source_path, dest_path, is_system, source_name, is_dry_run,
        exclude_files, source_size, is_compressed, is_encrypted, mode,
        use_trash_bin, no_trash_bin,
    )
    worker = threading.Thread(
        target=self._run_backup_path, args=worker_args, daemon=True)
    worker.start()
    return worker
def _run_backup_path(self, queue, source_path: str, dest_path: str, is_system: bool, source_name: str, is_dry_run: bool, exclude_files: Optional[List[Path]], source_size: int, is_compressed: bool, is_encrypted: bool, mode: str, use_trash_bin: bool, no_trash_bin: bool):
base_dest_path = dest_path
try:
profile_path = self._get_profile_path(
base_dest_path, is_system, source_name, is_encrypted)
os.makedirs(profile_path, exist_ok=True)
latest_full_backup_path = self._find_latest_backup(
profile_path, source_name)
if mode == "incremental" and not latest_full_backup_path:
self.logger.log(
f"Mode is incremental, but no full backup found for source '{source_name}'. Forcing full backup.")
mode = "full"
now = datetime.datetime.now()
timestamp = now.strftime("%Y%m%d-%H%M%S")
encryption_suffix = "enc" if is_encrypted else "plain"
backup_dir_name = f"{timestamp}_{source_name}_{mode}_{encryption_suffix}"
rsync_dest = os.path.join(profile_path, backup_dir_name)
queue.put(('current_path', rsync_dest))
rsync_command_parts = [
'rsync', '-aAXHv'] if is_system else ['rsync', '-aLv']
if mode == "incremental" and latest_full_backup_path and not is_dry_run:
rsync_command_parts.append(
f"--link-dest={latest_full_backup_path}")
rsync_command_parts.extend(['--info=progress2'])
if exclude_files:
rsync_command_parts.extend(
[f"--exclude-from={f}" for f in exclude_files])
if AppConfig.MANUAL_EXCLUDE_LIST_PATH.exists():
rsync_command_parts.append(
f"--exclude-from={AppConfig.MANUAL_EXCLUDE_LIST_PATH}")
if is_dry_run:
rsync_command_parts.append('--dry-run')
if not is_system:
user_base_dir = os.path.dirname(profile_path)
trash_bin_path = os.path.join(user_base_dir, ".Trash")
if use_trash_bin:
rsync_command_parts.extend(
['--backup', f'--backup-dir=\'{trash_bin_path}\'', '--delete'])
elif no_trash_bin:
rsync_command_parts.append('--delete')
if use_trash_bin or no_trash_bin:
rsync_command_parts.append(
f"--exclude='{os.path.basename(trash_bin_path)}/'")
if is_system:
rsync_command_parts.extend(
[f"'{source_path}'", f"'{rsync_dest}'"])
rsync_cmd_str = ' '.join(rsync_command_parts)
full_system_cmd = f"mkdir -p '{rsync_dest}' && {rsync_cmd_str}"
command = ['pkexec', 'bash', '-c', full_system_cmd]
else:
rsync_command_parts.extend([source_path, rsync_dest])
os.makedirs(rsync_dest, exist_ok=True)
command = rsync_command_parts
self.logger.log(f"Executing command: {' '.join(command)}")
transferred_size, total_size, stderr = self._execute_rsync(
queue, command)
return_code = self.process.returncode if self.process else -1
if self.process:
status = 'success' if return_code == 0 else 'warning' if return_code in [
23, 24] else 'cancelled' if return_code in [143, -15, 15, -9] else 'error'
if status in ['success', 'warning'] and not is_dry_run:
if mode == 'incremental' and latest_full_backup_path:
if is_system:
final_size = self._get_incremental_size_system(
rsync_dest)
else:
final_size = self._get_incremental_size_user(
rsync_dest, latest_full_backup_path)
else:
final_size = self._get_directory_size(rsync_dest)
self._create_info_json(
base_dest_path=base_dest_path,
backup_dir_name=backup_dir_name,
source_name=source_name,
backup_type="system" if is_system else "user",
mode=mode,
size_bytes=final_size,
is_encrypted=is_encrypted,
based_on=os.path.basename(
latest_full_backup_path) if latest_full_backup_path and mode == 'incremental' else None
)
queue.put(
('completion', {'status': status, 'returncode': return_code}))
except Exception as e:
self.logger.log(f"Exception in _run_backup_path: {e}")
queue.put(('completion', {'status': 'error', 'returncode': -1}))
finally:
self._uninhibit_screensaver()
self.process = None
def estimate_incremental_size(self, source_path: str, is_system: bool, source_name: str, base_dest_path: str, is_encrypted: bool, exclude_files: list) -> int:
    """Estimate how many bytes an incremental backup would transfer.

    Runs ``rsync --dry-run --stats`` (through ``pkexec`` for system
    backups) against a throw-away temporary directory, with
    ``--link-dest`` pointing at the latest full backup, and parses the
    ``Total transferred file size`` line of the statistics.

    Returns:
        The estimated size in bytes, or 0 when no destination was given,
        no full backup exists, rsync fails (exit code 24, "files
        vanished", is tolerated), or the statistics line cannot be found.
    """
    self.logger.log(
        f"Estimating incremental backup size for source: {source_path}")
    if not base_dest_path:
        self.logger.log(
            "No destination path provided, cannot estimate incremental size.")
        return 0

    profile_path = self._get_profile_path(
        base_dest_path, is_system, source_name, is_encrypted)
    latest_backup_path = self._find_latest_backup(
        profile_path, source_name)
    if not latest_backup_path:
        self.logger.log(
            "No previous full backup found. Accurate incremental size cannot be estimated, returning 0.")
        return 0

    if is_system:
        command = ['pkexec', 'rsync', '-aAXHvn', '--stats']
    else:
        command = ['rsync', '-avn', '--stats']
    command.append(f"--link-dest={latest_backup_path}")
    if exclude_files:
        command.extend(
            f"--exclude-from={exclude_file}" for exclude_file in exclude_files)

    try:
        # rsync needs a destination argument even for a dry run; an empty
        # temporary directory guarantees nothing already "exists" there.
        with tempfile.TemporaryDirectory() as dummy_dest:
            command.extend([source_path, dummy_dest])
            self.logger.log(
                f"Executing rsync dry-run command: {' '.join(command)}")
            result = subprocess.run(
                command, capture_output=True, text=True, check=False)
            # rsync exit code 24 means some files vanished during transfer,
            # which is okay for a dry-run estimate.
            if result.returncode not in (0, 24):
                self.logger.log(
                    f"Rsync dry-run failed with code {result.returncode}: {result.stderr}")
                return 0

            # A real newline keeps stdout and stderr on separate lines,
            # both for the regex search and for the debug log below.
            output = result.stdout + "\n" + result.stderr
            match = re.search(
                r"Total transferred file size: ([\d,.]+) bytes", output)
            if match is None:
                self.logger.log(
                    "Could not find 'Total transferred file size' in rsync output.")
                self.logger.log(
                    f"Full rsync output for debugging:\n{output}")
                return 0

            # Drop thousands separators, whichever character rsync used.
            size_str = match.group(1).replace(',', '').replace('.', '')
            size_bytes = int(size_str)
            self.logger.log(
                f"Estimated incremental backup size: {size_bytes} bytes")
            return size_bytes
    except FileNotFoundError:
        self.logger.log("Error: 'rsync' or 'pkexec' command not found.")
        return 0
    except Exception as e:
        self.logger.log(
            f"An unexpected error occurred during incremental size estimation: {e}")
        return 0
def _get_directory_size(self, path: str) -> int:
if not os.path.isdir(path):
return 0
try:
result = subprocess.run(
["du", "-sb", path], capture_output=True, text=True, check=True)
size_in_bytes = int(result.stdout.split()[0])
return size_in_bytes
except (subprocess.CalledProcessError, FileNotFoundError, ValueError, IndexError) as e:
self.logger.log(
f"Could not calculate directory size for {path} using du: {e}")
total_size = 0
try:
for dirpath, dirnames, filenames in os.walk(path):
for f in filenames:
fp = os.path.join(dirpath, f)
if not os.path.islink(fp):
total_size += os.path.getsize(fp)
return total_size
except Exception as fallback_e:
self.logger.log(
f"Fallback size calculation also failed for {path}: {fallback_e}")
return 0
def _get_incremental_size_user(self, inc_path: str, full_path: str) -> int:
total_size = 0
for dirpath, _, filenames in os.walk(inc_path):
for filename in filenames:
inc_file_path = os.path.join(dirpath, filename)
relative_path = os.path.relpath(inc_file_path, inc_path)
full_file_path = os.path.join(full_path, relative_path)
try:
inc_stat = os.stat(inc_file_path)
if os.path.exists(full_file_path):
full_stat = os.stat(full_file_path)
if inc_stat.st_ino == full_stat.st_ino:
continue
total_size += inc_stat.st_size
except FileNotFoundError:
continue
return total_size
def _get_incremental_size_system(self, inc_path: str) -> int:
self.logger.log(
f"Calculating incremental size for system backup: {inc_path}")
command_str = f"find '{inc_path}' -type f -links 1 -print0 | xargs -0 stat -c %s | awk '{{s+=$1}} END {{print s}}'"
try:
full_command = ['pkexec', 'bash', '-c', command_str]
result = subprocess.run(
full_command, capture_output=True, text=True, check=True)
output = result.stdout.strip()
if output:
return int(output)
else:
return 0
except (subprocess.CalledProcessError, FileNotFoundError, ValueError) as e:
self.logger.log(
f"Failed to calculate incremental system backup size: {e}")
return self._get_directory_size(inc_path)
def list_all_backups(self, base_dest_path: str, mount_if_needed: bool = True):
    """List the backups described by the metadata files of a destination.

    Every ``*.json`` file in ``<base>/pybackup/metadata`` describes one
    backup whose data directory carries the same name without the suffix.
    Entries whose data directory cannot be found are skipped. For
    encrypted entries the container is mounted first, but only when
    ``mount_if_needed`` is true. A metadata file that cannot be read or
    has malformed content is logged and skipped; it never aborts the
    listing.

    Args:
        base_dest_path: Root of the backup destination.
        mount_if_needed: Allow mounting an encrypted destination in order
            to verify encrypted backups. When false, encrypted backups on
            an unmounted destination are left out of the result.

    Returns:
        ``(system_backups, user_backups)``: two lists of dicts. User
        backups are sorted newest first. System backups are grouped: each
        full backup is followed by the later non-full backups up to the
        next full one (oldest to newest inside a group), and the groups
        are ordered newest first.
    """
    metadata_dir = os.path.join(base_dest_path, "pybackup", "metadata")
    if not os.path.isdir(metadata_dir):
        return [], []

    json_suffix = ".json"
    all_backups = []
    for info_file_name in os.listdir(metadata_dir):
        if not info_file_name.endswith(json_suffix):
            continue
        info_file_path = os.path.join(metadata_dir, info_file_name)
        try:
            with open(info_file_path, 'r') as f:
                info_data = json.load(f)
            is_encrypted = info_data.get("is_encrypted", False)
            is_system = info_data.get("backup_type") == "system"
            source_name = info_data.get("source_name", "N/A")
            # Strip only the trailing suffix; ".json" may legitimately
            # occur elsewhere in a source name.
            backup_dir_name = info_file_name[:-len(json_suffix)]
            profile_path = self._get_profile_path(
                base_dest_path, is_system, source_name, is_encrypted)
            full_path = os.path.join(profile_path, backup_dir_name)

            if not os.path.isdir(full_path):
                if not is_encrypted:
                    self.logger.log(
                        f"Metadata file found for {backup_dir_name} but data directory not found at {full_path}. Skipping.")
                    continue
                if not self.encryption_manager.is_mounted(base_dest_path):
                    if not mount_if_needed:
                        # The caller asked for a listing without mounting,
                        # so this entry cannot be verified.
                        self.logger.log(
                            f"Encrypted destination {base_dest_path} is not mounted and mounting was not requested. Skipping {backup_dir_name}.")
                        continue
                    self.logger.log(
                        f"Mounting {base_dest_path} to check for encrypted backup data...")
                    self.encryption_manager.prepare_encrypted_destination(
                        base_dest_path, is_system, 0, self.app.queue if self.app else None)
                if not os.path.isdir(full_path):
                    self.logger.log(
                        f"Data directory {full_path} still not found after mount attempt. Skipping.")
                    continue

            dt_obj = datetime.datetime.fromisoformat(
                info_data["creation_date"])
            mode_label = info_data["mode"].capitalize()
            backup_type_display = mode_label
            if is_encrypted:
                backup_type_display += " (Encrypted)"
            all_backups.append({
                "date": dt_obj.strftime('%d-%m-%Y'),
                "time": dt_obj.strftime('%H:%M:%S'),
                "type": backup_type_display,
                "size": info_data.get("size_readable", "N/A"),
                "folder_name": backup_dir_name,
                "full_path": full_path,
                "info_file_path": info_file_path,
                "comment": info_data.get("comment", ""),
                "is_encrypted": is_encrypted,
                "backup_type_base": mode_label,
                "datetime": dt_obj,
                "source": source_name,
                "is_system": is_system
            })
        except (IOError, json.JSONDecodeError, KeyError, ValueError, TypeError, AttributeError) as e:
            # ValueError/TypeError: unparsable creation_date.
            # AttributeError: JSON content that is not an object.
            self.logger.log(
                f"Could not read or parse info file {info_file_path}: {e}")

    system_backups = sorted(
        [b for b in all_backups if b["is_system"]], key=lambda x: x['datetime'], reverse=True)
    user_backups = sorted([b for b in all_backups if not b["is_system"]],
                          key=lambda x: x['datetime'], reverse=True)

    # Walk the system backups oldest-first so that each full backup opens
    # a group that collects the backups made after it.
    grouped_system_backups = []
    temp_group = []
    for backup in reversed(system_backups):
        if backup['backup_type_base'] == 'Full':
            if temp_group:
                grouped_system_backups.append(temp_group)
            temp_group = [backup]
        elif not temp_group:
            # A non-full backup with no preceding full one stands alone.
            grouped_system_backups.append([backup])
        else:
            temp_group.append(backup)
    if temp_group:
        grouped_system_backups.append(temp_group)

    grouped_system_backups.sort(
        key=lambda g: g[0]['datetime'], reverse=True)
    final_system_list = [
        item for group in grouped_system_backups for item in group]
    return final_system_list, user_backups
def _find_latest_backup(self, profile_path: str, source_name: str) -> Optional[str]:
self.logger.log(
f"Searching for latest full backup for source '{source_name}' in: {profile_path}")
full_backups = []
if os.path.isdir(profile_path):
pattern = re.compile(
rf"^(\d{{8}}-\d{{6}})_{re.escape(source_name)}_full_(plain|enc)$")
for item in os.listdir(profile_path):
item_path = os.path.join(profile_path, item)
if os.path.isdir(item_path) and pattern.match(item):
full_backups.append(item)
if not full_backups:
self.logger.log("No full backups found.")
return None
full_backups.sort(reverse=True)
latest_backup_dir = full_backups[0]
latest_backup_path = os.path.join(profile_path, latest_backup_dir)
self.logger.log(
f"Found latest full backup for --link-dest: {latest_backup_path}")
return latest_backup_path
def _create_info_json(self, base_dest_path: str, backup_dir_name: str, source_name: str, backup_type: str, mode: str, size_bytes: int, is_encrypted: bool, based_on: Optional[str] = None, comment: str = ""):
try:
metadata_path = os.path.join(
base_dest_path, "pybackup", "metadata")
os.makedirs(metadata_path, exist_ok=True)
info_file_path = os.path.join(
metadata_path, f"{backup_dir_name}.json")
if size_bytes > 0:
power = 1024
n = 0
power_labels = {0: 'B', 1: 'KB', 2: 'MB', 3: 'GB', 4: 'TB'}
display_size = size_bytes
while display_size >= power and n < len(power_labels) - 1:
display_size /= power
n += 1
size_str = f"{display_size:.2f} {power_labels[n]}"
else:
size_str = "0 B"
info_data = {
"creation_date": datetime.datetime.now().isoformat(),
"backup_type": backup_type,
"source_name": source_name,
"mode": mode,
"size_bytes": size_bytes,
"size_readable": size_str,
"is_encrypted": is_encrypted,
"based_on": based_on,
"comment": comment
}
with open(info_file_path, 'w') as f:
json.dump(info_data, f, indent=4)
self.logger.log(
f"Successfully created metadata file: {info_file_path}")
except Exception as e:
self.logger.log(f"Failed to create metadata file. Error: {e}")
def get_comment(self, info_file_path: str) -> str:
    """Read the ``comment`` entry of a backup metadata file.

    Returns:
        The stored comment, or an empty string when the entry is missing
        or the file cannot be opened or decoded as JSON.
    """
    try:
        with open(info_file_path, 'r') as handle:
            metadata = json.load(handle)
    except (IOError, json.JSONDecodeError):
        return ""
    return metadata.get("comment", "")
def update_comment(self, info_file_path: str, new_comment: str):
    """Replace the ``comment`` entry of a backup metadata file.

    The file is loaded, its comment overwritten, and the whole document
    written back with a four-space indent. If the file cannot be read,
    written or decoded, the failure is logged and nothing is raised.
    """
    try:
        with open(info_file_path, 'r') as reader:
            metadata = json.load(reader)

        metadata["comment"] = new_comment

        with open(info_file_path, 'w') as writer:
            json.dump(metadata, writer, indent=4)
    except (IOError, json.JSONDecodeError) as e:
        self.logger.log(
            f"Failed to update comment in {info_file_path}: {e}")
    else:
        self.logger.log(
            f"Successfully updated comment in {info_file_path}")
def _execute_rsync(self, queue, command: List[str]):
transferred_size, total_size, stderr_output = 0, 0, ""
try:
env = os.environ.copy()
env['LC_ALL'] = 'C'
self.process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, preexec_fn=os.setsid, env=env)
if self.process.stdout:
for line in iter(self.process.stdout.readline, ''):
stripped_line = line.strip()
self.logger.log(f"Rsync stdout: {stripped_line}")
if '%' in stripped_line:
match = re.search(r'\s*(\d+)%\s+', stripped_line)
if match:
queue.put(('progress', int(match.group(1))))
elif stripped_line and not stripped_line.startswith(('sending', 'sent', 'total')):
queue.put(('file_update', stripped_line))
self.process.wait()
if self.process.stderr:
stderr_output = self.process.stderr.read()
if stderr_output:
self.logger.log(f"Rsync Stderr: {stderr_output.strip()}")
except FileNotFoundError:
self.logger.log(f"Error: '{command[0]}' not found.")
queue.put(('error', None))
except Exception as e:
self.logger.log(f"Rsync execution error: {e}")
queue.put(('error', None))
return transferred_size, total_size, stderr_output
def start_restore(self, source_path: str, dest_path: str, is_compressed: bool):
    """Launch ``_run_restore`` on a daemon thread.

    The completion message goes to ``self.app.queue`` when the app object
    has such an attribute; otherwise a fresh private ``Queue`` is used.
    """
    from queue import Queue

    if hasattr(self.app, 'queue'):
        result_queue = self.app.queue
    else:
        result_queue = Queue()

    worker = threading.Thread(
        target=self._run_restore,
        args=(result_queue, source_path, dest_path, is_compressed),
        daemon=True,
    )
    worker.start()
def _run_restore(self, queue, source_path: str, dest_path: str, is_compressed: bool):
self.logger.log(f"Starting restore from {source_path} to {dest_path}")
status = 'error'
try:
source = source_path.rstrip('/') + '/'
script_content = f"tar -xzf '{source_path}' -C '{dest_path}'" if is_compressed else f"rsync -aAXHv '{source}' '{dest_path}'"
if self.encryption_manager._execute_as_root(script_content):
status = 'success'
else:
self.logger.log("Restore script failed.")
except Exception as e:
self.logger.log(
f"An unexpected error occurred during restore: {e}")
finally:
queue.put(
('completion', {'status': status, 'returncode': 0 if status == 'success' else 1}))
def get_scheduled_jobs(self) -> List[Dict[str, Any]]:
    """Collect the details of this application's jobs from the user crontab.

    A cron job belongs to the application when its comment contains
    ``self.app_tag``. Each such comment is passed to
    ``self._parse_job_comment`` and truthy results are collected. If
    anything fails, the error is logged and the jobs gathered up to that
    point are returned.
    """
    jobs_list = []
    try:
        for job in CronTab(user=True):
            if self.app_tag not in job.comment:
                continue
            details = self._parse_job_comment(job.comment)
            if details:
                jobs_list.append(details)
    except Exception as e:
        self.logger.log(f"Error loading cron jobs: {e}")
    return jobs_list
def start_delete_backup(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str] = None):
    """Delete a backup in the background.

    Starts a daemon thread that runs ``_run_delete`` with exactly these
    arguments; the outcome is reported by that method on ``queue``.
    """
    delete_args = (path_to_delete, is_encrypted, is_system,
                   base_dest_path, queue, password)
    worker = threading.Thread(
        target=self._run_delete, args=delete_args, daemon=True)
    worker.start()
def _run_delete(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str]):
try:
backup_dir_name = os.path.basename(path_to_delete.rstrip('/'))
metadata_file_path = os.path.join(
base_dest_path, "pybackup", "metadata", f"{backup_dir_name}.json")
if is_encrypted:
self.logger.log(
f"Starting encrypted deletion for {path_to_delete}")
mount_point = self.encryption_manager.get_mount_point(
base_dest_path)
if not mount_point or not self.encryption_manager.is_mounted(base_dest_path):
if password:
mount_point = self.encryption_manager.mount_for_deletion(
base_dest_path, is_system, password)
else:
self.logger.log(
"Password not provided for encrypted deletion.")
if not mount_point:
self.logger.log("Failed to unlock container for deletion.")
queue.put(('deletion_complete', False))
return
internal_path_to_delete = os.path.join(
mount_point, os.path.basename(os.path.dirname(path_to_delete)), backup_dir_name)
success = False
if is_system:
script_content = f"rm -rf '{internal_path_to_delete}'"
success = self.encryption_manager._execute_as_root(
script_content)
else:
try:
if os.path.isdir(internal_path_to_delete):
shutil.rmtree(internal_path_to_delete)
self.logger.log(
f"Successfully deleted {internal_path_to_delete}")
success = True
except Exception as e:
self.logger.log(
f"Failed to delete user backup {internal_path_to_delete}: {e}")
success = False
if not success:
self.logger.log(
"Failed to delete files within encrypted container.")
queue.put(('deletion_complete', False))
return
elif is_system:
script_content = f"rm -rf '{path_to_delete}'"
if not self.encryption_manager._execute_as_root(script_content):
self.logger.log(f"Failed to delete {path_to_delete}")
queue.put(('deletion_complete', False))
return
else:
try:
if os.path.isdir(path_to_delete):
shutil.rmtree(path_to_delete)
self.logger.log(f"Successfully deleted {path_to_delete}")
except Exception as e:
self.logger.log(
f"Failed to delete unencrypted user backup {path_to_delete}: {e}")
queue.put(('deletion_complete', False))
return
if os.path.exists(metadata_file_path):
try:
os.remove(metadata_file_path)
self.logger.log(
f"Successfully deleted metadata file {metadata_file_path}")
queue.put(('deletion_complete', True))
except Exception as e:
self.logger.log(
f"Failed to delete metadata file {metadata_file_path}: {e}")
queue.put(('deletion_complete', False))
else:
queue.put(('deletion_complete', True))
except Exception as e:
self.logger.log(f"Error during threaded deletion: {e}")
queue.put(('deletion_complete', False))
def cancel_and_delete_privileged_backup(self, delete_path: str):
    """Stop a running privileged backup and remove its target directory.

    Does nothing unless ``self.process`` is still running. Otherwise a
    small script is executed as root that sends SIGTERM to the backup's
    process group and then removes ``delete_path`` recursively, unless
    that path is empty or exactly ``/``.

    ``delete_path`` is inserted with ``shlex.quote``. Inside the former
    double quotes the shell would still expand ``$``, backticks and
    embedded ``"`` characters, which is not acceptable for a script that
    runs ``rm -rf`` as root. Errors are logged, never raised.
    """
    import shlex

    if not self.process or self.process.poll() is not None:
        return
    self.logger.log(
        "Attempting to cancel backup and delete directory with root privileges...")
    try:
        pgid = os.getpgid(self.process.pid)
        quoted_path = shlex.quote(delete_path)
        script_content = "\n".join([
            "",
            # A negative PID addresses the whole process group.
            f"kill -SIGTERM -- -{pgid} || echo 'Process group not found or already terminated.'",
            f"if [ -n {quoted_path} ] && [ {quoted_path} != \"/\" ]; then rm -rf -- {quoted_path}; fi",
            "",
        ])
        self.encryption_manager._execute_as_root(script_content)
    except Exception as e:
        self.logger.log(
            f"An error occurred during privileged cancel and delete: {e}")
def cancel_backup(self):
    """Send SIGTERM to the process group of the running backup.

    Nothing happens when no backup process exists or it has already
    exited. Failures, including the process vanishing in the meantime,
    are logged rather than raised.
    """
    running = self.process
    if not running or running.poll() is not None:
        return

    self.logger.log(
        f"Attempting to cancel backup process with PID: {running.pid}")
    try:
        process_group = os.getpgid(running.pid)
        os.killpg(process_group, signal.SIGTERM)
    except ProcessLookupError:
        self.logger.log("Process already finished.")
    except Exception as e:
        self.logger.log(f"Error cancelling process: {e}")
    else:
        self.logger.log("Successfully sent SIGTERM to process group.")