# Py-Backup/core/backup_manager.py
import subprocess
import os
import threading
import re
import signal
import datetime
import math
import shutil
import json
from typing import Optional, List, Dict, Any
from pathlib import Path
from crontab import CronTab
import tempfile
import stat
import shlex
from core.pbp_app_config import AppConfig
from pyimage_ui.password_dialog import PasswordDialog
from core.encryption_manager import EncryptionManager
class BackupManager:
"""
Handles the logic for creating and managing backups using rsync.
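Minimal usage sketch (the logger, app, and queue objects are supplied by the
surrounding application and are placeholders here):

    manager = BackupManager(logger, app)
    thread = manager.start_backup(queue, "/home/user/", "/mnt/backup",
                                  is_system=False, source_name="home")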
"""
def __init__(self, logger, app=None):
self.logger = logger
self.process = None
self.app_tag = "# Py-Backup Job"
self.is_system_process = False
self.app = app
self.encryption_manager = EncryptionManager(logger, app)
self.inhibit_cookie = None
def _inhibit_screensaver(self):
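"""Asks the session's org.freedesktop.ScreenSaver service (via gdbus) to inhibit the screensaver and power management while a backup runs, storing the returned cookie."""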
if not shutil.which("gdbus"):
return
try:
self.logger.log(
"Attempting to inhibit screensaver and power management.")
command = [
"gdbus", "call", "--session", "--dest", "org.freedesktop.ScreenSaver",
"--object-path", "/org/freedesktop/ScreenSaver",
"--method", "org.freedesktop.ScreenSaver.Inhibit",
"Py-Backup", "Backup in progress"
]
result = subprocess.run(
command, capture_output=True, text=True, check=True)
match = re.search(r'uint32\s+(\d+)', result.stdout)
if match:
self.inhibit_cookie = int(match.group(1))
self.logger.log(
f"Successfully inhibited screensaver with cookie {self.inhibit_cookie}")
except Exception as e:
self.logger.log(
f"An unexpected error occurred while inhibiting screensaver: {e}")
def _uninhibit_screensaver(self):
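"""Releases a previously acquired screensaver inhibition using the stored cookie."""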
if self.inhibit_cookie is None:
return
if not shutil.which("gdbus"):
return
try:
self.logger.log(
f"Attempting to uninhibit screensaver with cookie {self.inhibit_cookie}")
command = [
"gdbus", "call", "--session", "--dest", "org.freedesktop.ScreenSaver",
"--object-path", "/org/freedesktop/ScreenSaver",
"--method", "org.freedesktop.ScreenSaver.UnInhibit",
str(self.inhibit_cookie)
]
subprocess.run(command, capture_output=True, text=True, check=True)
self.logger.log("Successfully uninhibited screensaver.")
except Exception as e:
self.logger.log(f"Failed to uninhibit screensaver: {e}")
finally:
self.inhibit_cookie = None
def check_for_full_backup(self, dest_path: str, source_name: str, is_encrypted: bool) -> bool:
"""Checks if a full backup already exists for a given source in the new flat structure."""
self.logger.log(
f"Checking for existing full backup for source '{source_name}' in '{dest_path}' (Encrypted: {is_encrypted})")
pybackup_dir = os.path.join(dest_path, "pybackup")
scan_dir = self.encryption_manager.get_mount_point(
dest_path) if is_encrypted else pybackup_dir
if not scan_dir or not os.path.isdir(scan_dir):
self.logger.log(
f"Scan directory '{scan_dir}' does not exist. No full backup found.")
return False
enc_suffix = "enc" if is_encrypted else "plain"
# Pattern matches: 20250908-133000_system_full_plain
pattern = re.compile(
rf"\d{{8}}-\d{{6}}_{re.escape(source_name)}_full_{enc_suffix}")
for dirname in os.listdir(scan_dir):
if pattern.match(dirname):
self.logger.log(
f"Found existing full backup directory: {dirname}")
return True
self.logger.log(
f"No existing full backup found for source '{source_name}'.")
return False
def _find_latest_backup(self, rsync_base_dir: str, is_system: bool, source_name: str) -> Optional[str]:
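"""Returns the path of the newest full backup for the given source (or None); used as the --link-dest target for incremental backups."""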
self.logger.log(
f"Searching for latest full backup for source '{source_name}' in: {rsync_base_dir}")
full_backups = []
if os.path.isdir(rsync_base_dir):
# Pattern matches any full backup for the given source name, encrypted or not
pattern = re.compile(
rf"^(\d{{8}}-\d{{6}})_{re.escape(source_name)}_full_(plain|enc)$")
for item in os.listdir(rsync_base_dir):
item_path = os.path.join(rsync_base_dir, item)
if os.path.isdir(item_path) and pattern.match(item):
full_backups.append(item)
if not full_backups:
self.logger.log("No full backups found.")
return None
full_backups.sort(reverse=True)
latest_backup_dir = full_backups[0]
latest_backup_path = os.path.join(rsync_base_dir, latest_backup_dir)
self.logger.log(
f"Found latest full backup for --link-dest: {latest_backup_path}")
return latest_backup_path
def start_backup(self, queue, source_path: str, dest_path: str, is_system: bool, source_name: str, is_dry_run: bool = False, exclude_files: Optional[List[Path]] = None, source_size: int = 0, is_compressed: bool = False, is_encrypted: bool = False, mode: str = "incremental", use_trash_bin: bool = False, no_trash_bin: bool = False):
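"""Prepares the destination (mounting the encrypted container if needed) and runs the backup in a background thread; returns the thread, or None if preparation of an encrypted destination failed."""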
self.is_system_process = is_system
self._inhibit_screensaver()
mount_point = None
if is_encrypted:
mount_point = self.encryption_manager.prepare_encrypted_destination(
dest_path, is_system, source_size, queue)
if not mount_point:
self.logger.log(
"Failed to prepare encrypted destination. Aborting backup.")
queue.put(
('completion', {'status': 'error', 'returncode': -1}))
self._uninhibit_screensaver()
return None
thread = threading.Thread(target=self._run_backup_path, args=(
queue, source_path, dest_path, is_system, source_name, is_dry_run, exclude_files, source_size, is_compressed, is_encrypted, mode, mount_point, use_trash_bin, no_trash_bin))
thread.daemon = True
thread.start()
return thread
def _run_backup_path(self, queue, source_path: str, dest_path: str, is_system: bool, source_name: str, is_dry_run: bool, exclude_files: Optional[List[Path]], source_size: int, is_compressed: bool, is_encrypted: bool, mode: str, mount_point: Optional[str], use_trash_bin: bool, no_trash_bin: bool):
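"""Worker that builds and executes the rsync command, then writes the metadata file and reports completion through the queue."""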
base_dest_path = dest_path # The user-selected destination path
rsync_dest = None # Initialize to None
try:
pybackup_dir = os.path.join(base_dest_path, "pybackup")
if not os.path.isdir(pybackup_dir):
os.makedirs(pybackup_dir, exist_ok=True)
rsync_base_dir = mount_point if is_encrypted else pybackup_dir
latest_full_backup_path = self._find_latest_backup(
rsync_base_dir, is_system, source_name)
if mode == "incremental" and not latest_full_backup_path:
self.logger.log(
f"Mode is incremental, but no full backup found for source '{source_name}'. Forcing full backup.")
mode = "full"
now = datetime.datetime.now()
timestamp = now.strftime("%Y%m%d-%H%M%S")
encryption_suffix = "enc" if is_encrypted else "plain"
backup_dir_name = f"{timestamp}_{source_name}_{mode}_{encryption_suffix}"
rsync_dest = os.path.join(rsync_base_dir, backup_dir_name)
# Send the determined path back to the main thread via the queue
queue.put(('current_path', rsync_dest))
# --- Rsync command construction ---
# Build plain (unquoted) arguments; shell quoting is applied only when the
# command is later joined into a single bash string for the pkexec case.
rsync_command_parts = ['rsync', '-aAXHv'] if is_system else ['rsync', '-aL']
if mode == "incremental" and latest_full_backup_path and not is_dry_run:
rsync_command_parts.append(f"--link-dest={latest_full_backup_path}")
rsync_command_parts.append('--info=progress2')
if exclude_files:
rsync_command_parts.extend([f"--exclude-from={f}" for f in exclude_files])
if AppConfig.MANUAL_EXCLUDE_LIST_PATH.exists():
rsync_command_parts.append(f"--exclude-from={AppConfig.MANUAL_EXCLUDE_LIST_PATH}")
if is_dry_run:
rsync_command_parts.append('--dry-run')
if not is_system:
trash_bin_path = os.path.join(rsync_base_dir, ".Trash")
if use_trash_bin:
rsync_command_parts.extend(['--backup', f'--backup-dir={trash_bin_path}', '--delete'])
elif no_trash_bin:
rsync_command_parts.append('--delete')
if use_trash_bin or no_trash_bin:
rsync_command_parts.append(f"--exclude={os.path.basename(trash_bin_path)}/")
rsync_command_parts.extend([source_path, rsync_dest])
if is_system:
# System backups run through pkexec and bash (the single-password solution),
# so quote every argument with shlex.quote to handle spaces, and create the
# destination with mkdir -p before rsync runs.
rsync_cmd_str = ' '.join(shlex.quote(part) for part in rsync_command_parts)
full_system_cmd = f"mkdir -p {shlex.quote(rsync_dest)} && {rsync_cmd_str}"
command = ['pkexec', 'bash', '-c', full_system_cmd]
else:
# User backups run rsync directly with an argument list (no shell), so the
# paths must stay unquoted.
os.makedirs(rsync_dest, exist_ok=True)
command = rsync_command_parts
self.logger.log(f"Executing command: {' '.join(command)}")
transferred_size, total_size, stderr = self._execute_rsync(
queue, command)
return_code = self.process.returncode if self.process else -1
if self.process:
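# rsync exit codes: 0 = success, 23/24 = partial transfer (treated as a
# warning), 143/-15/15/-9 = terminated (treated as cancelled); anything else is an error.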
status = 'success' if return_code == 0 else 'warning' if return_code in [
23, 24] else 'cancelled' if return_code in [143, -15, 15, -9] else 'error'
if status in ['success', 'warning'] and not is_dry_run:
# After a successful backup, get the true size of the destination directory
final_size = self._get_directory_size(rsync_dest)
self._create_info_json(
base_dest_path=base_dest_path,
backup_dir_name=backup_dir_name,
source_name=source_name,
backup_type="system" if is_system else "user",
mode=mode,
size_bytes=final_size,
is_encrypted=is_encrypted,
based_on=os.path.basename(
latest_full_backup_path) if latest_full_backup_path and mode == 'incremental' else None
)
queue.put(
('completion', {'status': status, 'returncode': return_code}))
except Exception as e:
self.logger.log(f"Exception in _run_backup_path: {e}")
queue.put(('completion', {'status': 'error', 'returncode': -1}))
finally:
self._uninhibit_screensaver()
self.process = None
def _get_directory_size(self, path: str) -> int:
"""Calculates the total disk space used by a directory using `du`. """
if not os.path.isdir(path):
return 0
try:
# Use `du -sb` to get the real disk usage in bytes, correctly handling hard links.
result = subprocess.run(["du", "-sb", path], capture_output=True, text=True, check=True)
# Output is like "12345\t/path/to/dir", so we split and take the first part.
size_in_bytes = int(result.stdout.split()[0])
return size_in_bytes
except (subprocess.CalledProcessError, FileNotFoundError, ValueError, IndexError) as e:
self.logger.log(f"Could not calculate directory size for {path} using du: {e}")
# Fallback to a simpler, less accurate method if du fails
total_size = 0
try:
for dirpath, dirnames, filenames in os.walk(path):
for f in filenames:
fp = os.path.join(dirpath, f)
if not os.path.islink(fp):
total_size += os.path.getsize(fp)
return total_size
except Exception as fallback_e:
self.logger.log(f"Fallback size calculation also failed for {path}: {fallback_e}")
return 0
def _prepare_and_get_mounted_path(self, base_dest_path: str, is_system: bool, mount_if_needed: bool) -> Optional[str]:
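"""Returns the internal rsync path for an encrypted destination, mounting it first if allowed; returns None for unencrypted destinations or if mounting fails."""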
if not self.encryption_manager.is_encrypted(base_dest_path):
return None
if not self.encryption_manager.is_mounted(base_dest_path):
if mount_if_needed:
if not self.encryption_manager.prepare_encrypted_destination(base_dest_path, is_system=is_system, source_size=0, queue=self.app.queue):
return None
else:
return None
if self.encryption_manager.is_mounted(base_dest_path):
pybackup_dir = os.path.join(base_dest_path, "pybackup")
return os.path.join(pybackup_dir, "encrypted")
return None
def list_all_backups(self, base_dest_path: str, mount_if_needed: bool = True):
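"""Scans the destination and its metadata directory and returns two lists of backup-info dicts: system backups (grouped into full/incremental chains) and user backups, both newest first."""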
is_encrypted_dest = self.encryption_manager.is_encrypted(
base_dest_path)
scan_dir = self.encryption_manager.get_mount_point(
base_dest_path) if is_encrypted_dest else os.path.join(base_dest_path, "pybackup")
if not scan_dir or not os.path.isdir(scan_dir):
# Try to mount if it wasn't already
if is_encrypted_dest and mount_if_needed:
if not self.encryption_manager.prepare_encrypted_destination(base_dest_path, is_system=False, source_size=0, queue=self.app.queue):
return [], [] # Mount failed
scan_dir = self.encryption_manager.get_mount_point(
base_dest_path)
if not scan_dir or not os.path.isdir(scan_dir):
return [], []
else:
return [], []
metadata_dir = os.path.join(base_dest_path, "pybackup", "metadata")
if not os.path.isdir(metadata_dir):
return [], []
all_backups = []
for backup_dir_name in os.listdir(scan_dir):
backup_dir_path = os.path.join(scan_dir, backup_dir_name)
if not os.path.isdir(backup_dir_path):
continue
info_file_path = os.path.join(
metadata_dir, f"{backup_dir_name}.json")
if os.path.isfile(info_file_path):
try:
with open(info_file_path, 'r') as f:
info_data = json.load(f)
dt_obj = datetime.datetime.fromisoformat(
info_data["creation_date"])
backup_type_display = info_data["mode"].capitalize()
if info_data.get("is_compressed", False):
backup_type_display += " (Compressed)"
if info_data.get("is_encrypted", False):
backup_type_display += " (Encrypted)"
backup_info = {
"date": dt_obj.strftime('%d-%m-%Y'),
"time": dt_obj.strftime('%H:%M:%S'),
"type": backup_type_display,
"size": info_data.get("size_readable", "N/A"),
"folder_name": backup_dir_name,
"full_path": backup_dir_path,
"info_file_path": info_file_path,
"comment": info_data.get("comment", ""),
"is_encrypted": info_data.get("is_encrypted", False),
"is_compressed": info_data.get("is_compressed", False),
"backup_type_base": info_data["mode"].capitalize(),
"datetime": dt_obj,
"source": info_data.get("source_name", "N/A"),
"is_system": info_data.get("backup_type") == "system"
}
all_backups.append(backup_info)
except (IOError, json.JSONDecodeError, KeyError) as e:
self.logger.log(
f"Could not read or parse info file {info_file_path}: {e}")
# Separate into system and user backups for correct sorting and display
system_backups = sorted(
[b for b in all_backups if b["is_system"]], key=lambda x: x['datetime'], reverse=True)
user_backups = sorted([b for b in all_backups if not b["is_system"]],
key=lambda x: x['datetime'], reverse=True)
# Further group system backups by full/inc chains
grouped_system_backups = []
temp_group = []
# Sort from oldest to newest for grouping
for backup in reversed(system_backups):
if backup['backup_type_base'] == 'Full':
if temp_group:
grouped_system_backups.append(temp_group)
temp_group = [backup]
else:
if not temp_group: # Orphaned incremental
grouped_system_backups.append([backup])
else:
temp_group.append(backup)
if temp_group:
grouped_system_backups.append(temp_group)
# Sort groups by the date of the first element (the full backup), descending
grouped_system_backups.sort(
key=lambda g: g[0]['datetime'], reverse=True)
final_system_list = [
item for group in grouped_system_backups for item in group]
return final_system_list, user_backups
def _create_info_json(self, base_dest_path: str, backup_dir_name: str, source_name: str, backup_type: str, mode: str, size_bytes: int, is_encrypted: bool, based_on: Optional[str] = None, comment: str = ""):
"""Creates a backup_info.json file inside the user-writable metadata directory."""
try:
# All metadata files go into a single, flat metadata directory for simplicity
metadata_path = os.path.join(base_dest_path, "pybackup", "metadata")
os.makedirs(metadata_path, exist_ok=True)
info_file_path = os.path.join(metadata_path, f"{backup_dir_name}.json")
# Format size for human-readable display
if size_bytes > 0:
power = 1024
n = 0
power_labels = {0: 'B', 1: 'KB', 2: 'MB', 3: 'GB', 4: 'TB'}
display_size = size_bytes
while display_size >= power and n < len(power_labels) - 1:
display_size /= power
n += 1
size_str = f"{display_size:.2f} {power_labels[n]}"
else:
size_str = "0 B"
info_data = {
"creation_date": datetime.datetime.now().isoformat(),
"backup_type": backup_type,
"source_name": source_name,
"mode": mode,
"size_bytes": size_bytes,
"size_readable": size_str,
"is_encrypted": is_encrypted,
"based_on": based_on,
"comment": comment
}
with open(info_file_path, 'w') as f:
json.dump(info_data, f, indent=4)
self.logger.log(f"Successfully created metadata file: {info_file_path}")
except Exception as e:
self.logger.log(f"Failed to create metadata file. Error: {e}")
def get_comment(self, info_file_path: str) -> str:
"""Reads the comment from a backup_info.json file."""
try:
with open(info_file_path, 'r') as f:
data = json.load(f)
return data.get("comment", "")
except (IOError, json.JSONDecodeError):
return ""
def update_comment(self, info_file_path: str, new_comment: str):
"""Updates the comment in a backup_info.json file."""
try:
with open(info_file_path, 'r') as f:
data = json.load(f)
data["comment"] = new_comment
with open(info_file_path, 'w') as f:
json.dump(data, f, indent=4)
self.logger.log(f"Successfully updated comment in {info_file_path}")
except (IOError, json.JSONDecodeError) as e:
self.logger.log(f"Failed to update comment in {info_file_path}: {e}")
def _execute_rsync(self, queue, command: List[str]):
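"""Runs rsync, streaming stdout to push progress percentages and file names onto the queue; stderr is collected after the process exits. The size return values are currently placeholders (always 0)."""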
transferred_size, total_size, stderr_output = 0, 0, ""
try:
env = os.environ.copy()
env['LC_ALL'] = 'C'
self.process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, preexec_fn=os.setsid, env=env)
if self.process.stdout:
for line in iter(self.process.stdout.readline, ''):
stripped_line = line.strip()
self.logger.log(f"Rsync stdout: {stripped_line}")
if '%' in stripped_line:
match = re.search(r'\s*(\d+)%\s+', stripped_line)
if match:
queue.put(('progress', int(match.group(1))))
elif stripped_line and not stripped_line.startswith(('sending', 'sent', 'total')):
queue.put(('file_update', stripped_line))
self.process.wait()
if self.process.stderr:
stderr_output = self.process.stderr.read()
if stderr_output:
self.logger.log(f"Rsync Stderr: {stderr_output.strip()}")
except FileNotFoundError:
self.logger.log(f"Error: '{command[0]}' not found.")
queue.put(('error', None))
except Exception as e:
self.logger.log(f"Rsync execution error: {e}")
queue.put(('error', None))
return transferred_size, total_size, stderr_output
def start_restore(self, source_path: str, dest_path: str, is_compressed: bool):
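"""Starts a restore in a background thread, reporting completion through the application queue."""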
from queue import Queue
queue = self.app.queue if hasattr(self.app, 'queue') else Queue()
thread = threading.Thread(target=self._run_restore, args=(
queue, source_path, dest_path, is_compressed))
thread.daemon = True
thread.start()
def _run_restore(self, queue, source_path: str, dest_path: str, is_compressed: bool):
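"""Restores a backup as root, using tar for compressed archives and rsync -aAXHv otherwise."""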
self.logger.log(f"Starting restore from {source_path} to {dest_path}")
status = 'error'
try:
source = source_path.rstrip('/') + '/'
script_content = f"tar -xzf '{source_path}' -C '{dest_path}'" if is_compressed else f"rsync -aAXHv '{source}' '{dest_path}'"
if self.encryption_manager._execute_as_root(script_content):
status = 'success'
else:
self.logger.log("Restore script failed.")
except Exception as e:
self.logger.log(
f"An unexpected error occurred during restore: {e}")
finally:
queue.put(
('completion', {'status': status, 'returncode': 0 if status == 'success' else 1}))
def get_scheduled_jobs(self) -> List[Dict[str, Any]]:
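"""Returns the details of all Py-Backup jobs found in the user's crontab."""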
jobs_list = []
try:
user_cron = CronTab(user=True)
for job in user_cron:
if self.app_tag in job.comment:
details = self._parse_job_comment(job.comment)
if details:
jobs_list.append(details)
except Exception as e:
self.logger.log(f"Error loading cron jobs: {e}")
return jobs_list
def start_delete_backup(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str] = None):
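"""Deletes a backup and its metadata file in a background thread."""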
thread = threading.Thread(target=self._run_delete, args=(
path_to_delete, is_encrypted, is_system, base_dest_path, queue, password))
thread.daemon = True
thread.start()
def _run_delete(self, path_to_delete: str, is_encrypted: bool, is_system: bool, base_dest_path: str, queue, password: Optional[str]):
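"""Worker that removes the backup directory (unlocking the encrypted container and/or elevating to root when required) and then its metadata file, reporting the result through the queue."""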
try:
# Determine metadata file path before any deletion
backup_dir_name = os.path.basename(path_to_delete.rstrip('/'))
metadata_file_path = os.path.join(
base_dest_path, "pybackup", "metadata", f"{backup_dir_name}.json")
# Delete the backup data directory
if is_encrypted:
self.logger.log(
f"Starting encrypted deletion for {path_to_delete}")
mount_point = self.encryption_manager.get_mount_point(
base_dest_path)
if not mount_point:
if password:
mount_point = self.encryption_manager.mount_for_deletion(
base_dest_path, is_system, password)
else:
self.logger.log(
"Password not provided for encrypted deletion.")
if not mount_point:
self.logger.log("Failed to unlock container for deletion.")
queue.put(('deletion_complete', False))
return
internal_path_to_delete = os.path.join(
mount_point, backup_dir_name)
success = False
if is_system:
script_content = f"rm -rf '{internal_path_to_delete}'"
success = self.encryption_manager._execute_as_root(
script_content)
else: # User backup, no root needed
try:
if os.path.isdir(internal_path_to_delete):
shutil.rmtree(internal_path_to_delete)
self.logger.log(
f"Successfully deleted {internal_path_to_delete}")
success = True
except Exception as e:
self.logger.log(
f"Failed to delete user backup {internal_path_to_delete}: {e}")
success = False
if not success:
self.logger.log(
"Failed to delete files within encrypted container.")
queue.put(('deletion_complete', False))
return # Stop if data deletion failed
elif is_system:
script_content = f"rm -rf '{path_to_delete}'"
if not self.encryption_manager._execute_as_root(script_content):
self.logger.log(f"Failed to delete {path_to_delete}")
queue.put(('deletion_complete', False))
return # Stop if data deletion failed
else: # Unencrypted user backup
try:
if os.path.isdir(path_to_delete):
shutil.rmtree(path_to_delete)
self.logger.log(f"Successfully deleted {path_to_delete}")
except Exception as e:
self.logger.log(
f"Failed to delete unencrypted user backup {path_to_delete}: {e}")
queue.put(('deletion_complete', False))
return # Stop if data deletion failed
# Finally, delete the metadata file (with user permissions)
try:
if os.path.exists(metadata_file_path):
os.remove(metadata_file_path)
self.logger.log(
f"Successfully deleted metadata file {metadata_file_path}")
queue.put(('deletion_complete', True))
except Exception as e:
self.logger.log(
f"Failed to delete metadata file {metadata_file_path}: {e}")
queue.put(('deletion_complete', False))
except Exception as e:
self.logger.log(f"Error during threaded deletion: {e}")
queue.put(('deletion_complete', False))
def cancel_and_delete_privileged_backup(self, delete_path: str):
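"""Terminates a running privileged backup's process group and removes the partially written destination directory, both as root."""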
if not self.process or self.process.poll() is not None:
return
self.logger.log(
"Attempting to cancel backup and delete directory with root privileges...")
try:
pgid = os.getpgid(self.process.pid)
script_content = f"""
kill -SIGTERM -- -{pgid} || echo 'Process group not found or already terminated.'
if [ -n "{delete_path}" ] && [ "{delete_path}" != "/" ]; then rm -rf "{delete_path}"; fi
"""
self.encryption_manager._execute_as_root(script_content)
except Exception as e:
self.logger.log(
f"An error occurred during privileged cancel and delete: {e}")
def cancel_backup(self):
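"""Sends SIGTERM to the running backup's process group to cancel it."""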
if self.process and self.process.poll() is None:
self.logger.log(
f"Attempting to cancel backup process with PID: {self.process.pid}")
try:
# Terminate the entire process group to stop rsync
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
self.logger.log("Successfully sent SIGTERM to process group.")
except ProcessLookupError:
self.logger.log("Process already finished.")
except Exception as e:
self.logger.log(f"Error cancelling process: {e}")