- Ensure backup content view updates after backup completion or deletion.
- Fix "Please wait" animation not stopping after backup deletion.
- Refactor backup type selection logic for more reliable UI updates.
- Correct backup detection after resetting settings.
- Ensure correct size is written to info file for all backup types.
732 lines
32 KiB
Python
732 lines
32 KiB
Python
import subprocess
|
|
import os
|
|
import threading
|
|
import re
|
|
import signal
|
|
import datetime
|
|
from typing import Optional, List, Dict, Any
|
|
from pathlib import Path
|
|
from crontab import CronTab
|
|
import tempfile
|
|
import stat
|
|
|
|
from pbp_app_config import AppConfig
|
|
|
|
|
|
class BackupManager:
|
|
"""
|
|
Handles the logic for creating and managing backups using rsync.
|
|
"""
|
|
|
|
def __init__(self, logger):
|
|
self.logger = logger
|
|
self.process = None
|
|
self.app_tag = "# Py-Backup Job"
|
|
self.is_system_process = False
|
|
|
|
def _execute_as_root(self, script_content: str) -> bool:
    """Run ``script_content`` as a bash script with root rights via pkexec.

    The content is written to a temporary ``/tmp/pybackup_script_*.sh`` file
    behind a ``#!/bin/bash`` / ``set -e`` preamble, marked executable and
    handed to ``pkexec``.  The temporary file is removed again in every case.

    Returns True only when the script exits with status 0.  A non-zero exit
    status or any exception while preparing/running the script is logged and
    reported as False.
    """
    script_path = ''
    try:
        # delete=False: the file has to outlive the ``with`` block so pkexec
        # can execute it; it is removed in the ``finally`` clause instead.
        with tempfile.NamedTemporaryFile(
                mode='w', delete=False, prefix="pybackup_script_",
                suffix=".sh", dir="/tmp") as script_file:
            for chunk in ("#!/bin/bash\n\n", "set -e\n\n", script_content):
                script_file.write(chunk)
            script_path = script_file.name

        # rwxr-xr-x
        permissions = (stat.S_IRWXU
                       | stat.S_IRGRP | stat.S_IXGRP
                       | stat.S_IROTH | stat.S_IXOTH)
        os.chmod(script_path, permissions)

        self.logger.log(
            f"Executing privileged command via script: {script_path}")
        self.logger.log(
            f"Script content:\n---\n{script_content}\n---")

        completed = subprocess.run(
            ['pkexec', script_path],
            capture_output=True, text=True, check=False)

        succeeded = completed.returncode == 0
        if succeeded:
            self.logger.log(
                f"Privileged script executed successfully. Output:\n{completed.stdout}")
        else:
            self.logger.log(
                f"Privileged script failed. Return code: {completed.returncode}\nStderr: {completed.stderr}\nStdout: {completed.stdout}")
        return succeeded
    except Exception as e:
        self.logger.log(
            f"Failed to set up or execute privileged command: {e}")
        return False
    finally:
        if script_path and os.path.exists(script_path):
            os.remove(script_path)
|
|
|
|
def cancel_and_delete_privileged_backup(self, delete_path: str):
    """Cancel the running backup and delete ``delete_path`` in one pkexec call.

    Does nothing (apart from logging) when no backup process is running.
    Otherwise a single root script sends SIGTERM to the process group of
    ``self.process`` and then removes ``delete_path`` recursively.  The
    deletion step is skipped for an empty path or ``/``.

    If the process has already vanished when its process group is looked
    up, only the directory deletion is attempted.
    """
    # Local import: shlex is not among the file's top-level imports.
    import shlex

    if not self.process or self.process.poll() is not None:
        self.logger.log("No active backup process to cancel.")
        return

    self.logger.log(
        "Attempting to cancel backup and delete directory with root privileges...")
    try:
        pgid = os.getpgid(self.process.pid)

        script_parts = [
            f"echo 'Attempting to terminate process group {pgid}'",
            # "|| echo" keeps "set -e" from aborting the script when the
            # group is already gone, so the deletion below still runs.
            f"kill -SIGTERM -- -{pgid} || echo 'Process group {pgid} not found or already terminated.'",
        ]
        if delete_path and delete_path != "/":
            # The script runs as root: the path must reach the shell as one
            # literal word, whatever quotes or metacharacters it contains.
            quoted_path = shlex.quote(delete_path)
            script_parts.append(
                f"echo Attempting to delete directory {quoted_path}")
            script_parts.append(f"rm -rf -- {quoted_path}")
        script_content = "\n".join(script_parts)

        if self._execute_as_root(script_content):
            self.logger.log(
                "Backup cancellation and deletion script succeeded.")
        else:
            self.logger.log(
                "Backup cancellation and deletion script failed.")

    except ProcessLookupError:
        self.logger.log("Backup process already terminated before action.")
        # Still try to delete the directory
        self.delete_privileged_path(delete_path)
    except Exception as e:
        self.logger.log(
            f"An error occurred during privileged cancel and delete: {e}")
|
|
|
|
def delete_privileged_path(self, path: str):
    """Recursively delete ``path`` with root privileges.

    An empty path or ``/`` is rejected and only logged.  The outcome of the
    privileged script is logged; nothing is returned.
    """
    # Local import: shlex is not among the file's top-level imports.
    import shlex

    self.logger.log(f"Requesting privileged deletion of: {path}")
    if not path or path == "/":
        self.logger.log("Invalid path for deletion provided.")
        return

    # Quote the path so it is passed to rm as a single literal argument even
    # if it contains quotes, spaces or shell metacharacters; "--" stops rm
    # from reading a leading "-" as an option.
    script_content = f"rm -rf -- {shlex.quote(path)}"
    if self._execute_as_root(script_content):
        self.logger.log(f"Successfully deleted path: {path}")
    else:
        self.logger.log(f"Failed to delete path: {path}")
|
|
|
|
def start_delete_system_backup(self, path: str, queue):
    """Delete a system backup in a background daemon thread.

    The work is done by ``_run_delete(path, queue)``; this method returns
    immediately after starting the thread.
    """
    worker = threading.Thread(
        target=self._run_delete, args=(path, queue), daemon=True)
    worker.start()
|
|
|
|
def _run_delete(self, path: str, queue):
|
|
"""Runs the deletion and puts a message on the queue when done."""
|
|
try:
|
|
info_file = f"{path}.txt"
|
|
# Build a script to remove both the folder and the info file in one go.
|
|
# Use -f to avoid errors if the info file doesn't exist.
|
|
script_content = f"""
|
|
rm -rf '{path}'
|
|
rm -f '{info_file}'
|
|
"""
|
|
if self._execute_as_root(script_content):
|
|
self.logger.log(f"Successfully deleted {path} and {info_file}")
|
|
queue.put(('deletion_complete', True))
|
|
else:
|
|
self.logger.log(f"Failed to delete {path}")
|
|
queue.put(('deletion_complete', False))
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"Error during threaded deletion: {e}")
|
|
queue.put(('deletion_complete', False))
|
|
|
|
def cancel_backup(self):
    """Send SIGTERM to the process group of the running backup, if any.

    For a system backup the signal is sent through a privileged script;
    otherwise ``os.killpg`` is used directly.  "Backup process terminated."
    is only logged when the signal was actually delivered; a failing
    privileged script is logged as a failure instead.
    """
    if not (self.process and self.process.poll() is None):
        self.logger.log("No active backup process to cancel.")
        return

    self.logger.log("Attempting to cancel backup...")
    try:
        pgid = os.getpgid(self.process.pid)
        if self.is_system_process:
            self.logger.log(
                f"Cancelling system process with pgid {pgid} via privileged script.")
            # The script's result tells us whether the kill really happened
            # (e.g. the user may have dismissed the pkexec prompt).
            if not self._execute_as_root(f"kill -SIGTERM -- -{pgid}"):
                self.logger.log(
                    "Failed to terminate backup process: privileged kill script failed.")
                return
        else:
            os.killpg(pgid, signal.SIGTERM)
        self.logger.log("Backup process terminated.")
    except ProcessLookupError:
        self.logger.log(
            "Backup process already terminated or not found.")
    except Exception as e:
        self.logger.log(f"Failed to terminate backup process: {e}")
|
|
|
|
def start_backup(self, queue, source_path: str, dest_path: str, is_system: bool, is_dry_run: bool = False, exclude_files: Optional[List[Path]] = None, source_size: int = 0, is_compressed: bool = False):
    """Run a backup of ``source_path`` to ``dest_path`` in a daemon thread.

    All arguments are forwarded unchanged to ``_run_backup_path``, which
    reports its progress and result on ``queue``.  Returns immediately.
    """
    worker_args = (queue, source_path, dest_path, is_system,
                   is_dry_run, exclude_files, source_size, is_compressed)
    worker = threading.Thread(
        target=self._run_backup_path, args=worker_args, daemon=True)
    worker.start()
|
|
|
|
def _find_latest_backup(self, base_backup_path: str) -> Optional[str]:
|
|
"""Finds the most recent backup directory in a given path."""
|
|
self.logger.log(f"Searching for latest backup in: {base_backup_path}")
|
|
|
|
backup_names = self.list_backups(base_backup_path)
|
|
|
|
if not backup_names:
|
|
self.logger.log("No previous backups found to link against.")
|
|
return None
|
|
|
|
latest_backup_name = backup_names[0]
|
|
latest_backup_path = os.path.join(base_backup_path, latest_backup_name)
|
|
|
|
if os.path.isdir(latest_backup_path):
|
|
self.logger.log(f"Found latest backup for --link-dest: {latest_backup_path}")
|
|
return latest_backup_path
|
|
|
|
self.logger.log(f"Latest backup entry '{latest_backup_name}' was not a directory. No link will be used.")
|
|
return None
|
|
|
|
def _compress_and_cleanup(self, dest_path: str, is_system: bool) -> bool:
|
|
"""Compresses the backup directory and cleans up the original."""
|
|
self.logger.log(f"Starting compression for: {dest_path}")
|
|
parent_dir = os.path.dirname(dest_path)
|
|
archive_name = os.path.basename(dest_path) + ".tar.gz"
|
|
archive_path = os.path.join(parent_dir, archive_name)
|
|
|
|
# Using -C is important to avoid storing the full path in the tarball
|
|
# Ensure paths with spaces are quoted for the shell script
|
|
tar_command = f"tar -czf '{archive_path}' -C '{parent_dir}' '{os.path.basename(dest_path)}'"
|
|
rm_command = f"rm -rf '{dest_path}'"
|
|
|
|
script_content = f"""
|
|
#!/bin/bash
|
|
set -e
|
|
|
|
{tar_command}
|
|
echo \"tar command finished with exit code $?.\"
|
|
|
|
{rm_command}
|
|
echo \"rm command finished with exit code $?.\"
|
|
"""
|
|
|
|
if is_system:
|
|
self.logger.log("Executing compression and cleanup as root.")
|
|
if self._execute_as_root(script_content):
|
|
self.logger.log("Compression and cleanup script executed successfully.")
|
|
return True
|
|
else:
|
|
self.logger.log("Compression and cleanup script failed.")
|
|
return False
|
|
else:
|
|
# For non-system backups, run commands directly
|
|
try:
|
|
self.logger.log(f"Executing local command: {tar_command}")
|
|
tar_result = subprocess.run(tar_command, shell=True, capture_output=True, text=True, check=True)
|
|
self.logger.log(f"tar command successful. Output: {tar_result.stdout}")
|
|
|
|
self.logger.log(f"Executing local command: {rm_command}")
|
|
rm_result = subprocess.run(rm_command, shell=True, capture_output=True, text=True, check=True)
|
|
self.logger.log(f"rm command successful. Output: {rm_result.stdout}")
|
|
|
|
return True
|
|
except subprocess.CalledProcessError as e:
|
|
self.logger.log(f"A command failed during local compression/cleanup. Return code: {e.returncode}")
|
|
self.logger.log(f"Stdout: {e.stdout}")
|
|
self.logger.log(f"Stderr: {e.stderr}")
|
|
return False
|
|
except Exception as e:
|
|
self.logger.log(f"An unexpected error occurred during local compression/cleanup: {e}")
|
|
return False
|
|
|
|
def _run_backup_path(self, queue, source_path: str, dest_path: str, is_system: bool, is_dry_run: bool, exclude_files: Optional[List[Path]], source_size: int, is_compressed: bool):
    """Run one rsync backup and report the result on ``queue``.

    Builds the rsync command line (through ``pkexec`` for system backups),
    hard-linking against the latest existing backup in the destination's
    parent directory unless this is a dry run.  After rsync finishes, its
    return code is mapped to 'success' (0), 'warning' (23/24), 'cancelled'
    (SIGTERM/SIGKILL codes) or 'error'.  For a successful real run the
    backup is optionally compressed and an info file is written.

    A ``('completion', {'status': ..., 'returncode': ...})`` message is put
    on ``queue`` in every case, including unexpected exceptions, so the
    consumer is never left waiting.  ``self.process`` is reset to None at
    the end.
    """
    completion_sent = False
    try:
        self.is_system_process = is_system
        self.logger.log(
            f"Starting backup from '{source_path}' to '{dest_path}'...")

        # Trailing slash: copy the directory's contents, not the directory.
        if os.path.isdir(source_path) and not source_path.endswith('/'):
            source_path += '/'

        parent_dest = os.path.dirname(dest_path)
        # Ensure the parent directory exists. rsync creates the final
        # destination directory itself.
        if not os.path.exists(parent_dest):
            os.makedirs(parent_dest, exist_ok=True)

        latest_backup_path = self._find_latest_backup(parent_dest)

        command = ['pkexec', 'rsync', '-aAXHv'] if is_system else ['rsync', '-av']

        if latest_backup_path and not is_dry_run:
            self.logger.log(f"Using --link-dest='{latest_backup_path}'")
            command.append(f"--link-dest={latest_backup_path}")

        # --stats makes rsync print the "Total transferred file size:" line
        # that _execute_rsync parses; with plain -v that line is never
        # emitted and the detected size would always be 0.
        command.extend(['--info=progress2', '--stats'])

        for exclude_file in exclude_files or []:
            command.append(f"--exclude-from={exclude_file}")

        if AppConfig.MANUAL_EXCLUDE_LIST_PATH.exists():
            command.append(f"--exclude-from={AppConfig.MANUAL_EXCLUDE_LIST_PATH}")

        if is_dry_run:
            command.append('--dry-run')

        command.extend([source_path, dest_path])

        transferred_size = self._execute_rsync(queue, command)

        if self.process:
            return_code = self.process.returncode
            self.logger.log(
                f"Rsync process finished with return code: {return_code}")

            if return_code == 0:
                status = 'success'
            elif return_code in (23, 24):  # rsync warnings
                status = 'warning'
            elif return_code in (143, -15, 15, -9):  # SIGTERM/SIGKILL
                status = 'cancelled'
            else:
                status = 'error'

            if status in ('success', 'warning') and not is_dry_run:
                info_filename_base = os.path.basename(dest_path)
                # Prefer the size rsync reported; fall back to the caller's
                # source size when nothing was detected, so the info file
                # does not claim "0 B".
                # NOTE(review): an incremental run that transferred nothing
                # therefore records the source size - confirm this is the
                # wanted meaning of "Originalgröße".
                final_size = transferred_size if transferred_size > 0 else source_size

                if is_compressed:
                    self.logger.log(f"Compression requested for {dest_path}")
                    queue.put(('status_update', 'Phase 2/2: Komprimiere Backup...'))
                    queue.put(('progress_mode', 'indeterminate'))
                    queue.put(('cancel_button_state', 'disabled'))

                    if self._compress_and_cleanup(dest_path, is_system):
                        info_filename_base += ".tar.gz"
                    else:
                        self.logger.log("Compression failed, keeping uncompressed backup.")

                    queue.put(('progress_mode', 'determinate'))
                    queue.put(('cancel_button_state', 'normal'))

                self._create_info_file(
                    dest_path, f"{info_filename_base}.txt", final_size)

            queue.put(('completion', {'status': status, 'returncode': return_code}))
            completion_sent = True
        else:
            self.logger.log(
                "Rsync process did not start or self.process is None.")
            queue.put(('completion', {'status': 'error', 'returncode': -1}))
            completion_sent = True

        self.logger.log(
            f"Backup to '{dest_path}' completed.")
    except Exception as e:
        # This runs in a worker thread: without a completion message the
        # consumer of the queue would wait forever.
        self.logger.log(
            f"Unexpected error during backup to '{dest_path}': {e}")
        if not completion_sent:
            queue.put(('completion', {'status': 'error', 'returncode': -1}))
    finally:
        self.process = None
|
|
|
|
def _create_info_file(self, dest_path: str, filename: str, source_size: int):
|
|
try:
|
|
# Info file is now stored in the parent directory of the backup folder.
|
|
parent_dir = os.path.dirname(dest_path)
|
|
info_file_path = os.path.join(parent_dir, filename)
|
|
|
|
original_bytes = source_size
|
|
if source_size > 0:
|
|
power = 1024
|
|
n = 0
|
|
power_labels = {0: 'B', 1: 'KB', 2: 'MB', 3: 'GB', 4: 'TB'}
|
|
display_size = original_bytes
|
|
while display_size >= power and n < len(power_labels) - 1:
|
|
display_size /= power
|
|
n += 1
|
|
size_str = f"{display_size:.2f} {power_labels[n]}"
|
|
else:
|
|
size_str = "0 B"
|
|
date_str = datetime.datetime.now().strftime("%d. %B %Y, %H:%M:%S")
|
|
|
|
info_content = (
|
|
f"Backup-Datum: {date_str}\n"
|
|
f"Originalgröße: {size_str} ({original_bytes} Bytes)\n"
|
|
)
|
|
|
|
self.logger.log(
|
|
f"Attempting to write info file to {info_file_path} as current user.")
|
|
with open(info_file_path, 'w') as f:
|
|
f.write(info_content)
|
|
self.logger.log(
|
|
f"Successfully created metadata file: {info_file_path}")
|
|
|
|
except Exception as e:
|
|
self.logger.log(
|
|
f"Failed to create metadata file. Please check permissions for {os.path.dirname(info_file_path)}. Error: {e}")
|
|
|
|
def _execute_rsync(self, queue, command: List[str]):
|
|
transferred_size = 0
|
|
try:
|
|
try:
|
|
self.process = subprocess.Popen(
|
|
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, preexec_fn=os.setsid)
|
|
except FileNotFoundError:
|
|
self.logger.log(
|
|
"Error: 'pkexec' or 'rsync' command not found in PATH during Popen call.")
|
|
queue.put(('error', None))
|
|
return 0
|
|
except Exception as e:
|
|
self.logger.log(
|
|
f"Error starting rsync process with Popen: {e}")
|
|
queue.put(('error', None))
|
|
return 0
|
|
|
|
if self.process is None: # This check might be redundant if exceptions are caught, but good for safety
|
|
self.logger.log(
|
|
"Error: subprocess.Popen returned None for rsync process (after exception handling).")
|
|
queue.put(('error', None))
|
|
return 0 # Exit early if process didn't start
|
|
|
|
progress_regex = re.compile(r'\s*(\d+)%\s+')
|
|
output_lines = []
|
|
|
|
if self.process.stdout:
|
|
for line in iter(self.process.stdout.readline, ''):
|
|
stripped_line = line.strip()
|
|
self.logger.log(stripped_line)
|
|
output_lines.append(stripped_line)
|
|
|
|
match = progress_regex.search(stripped_line)
|
|
if match:
|
|
percentage = int(match.group(1))
|
|
queue.put(('progress', percentage))
|
|
else:
|
|
if stripped_line and not stripped_line.startswith(('sending incremental file list', 'sent', 'total size')):
|
|
queue.put(('file_update', stripped_line))
|
|
|
|
self.process.wait()
|
|
if self.process.stderr:
|
|
stderr_output = self.process.stderr.read()
|
|
if stderr_output:
|
|
self.logger.log(f"Rsync Error: {stderr_output.strip()}")
|
|
output_lines.extend(stderr_output.strip().split('\n'))
|
|
|
|
# After process completion, parse the output for transferred size
|
|
for line in output_lines:
|
|
if line.startswith('Total transferred file size:'):
|
|
try:
|
|
size_str = line.split(':')[1].strip().split(' ')[0]
|
|
transferred_size = int(size_str.replace(',', '').replace('.', ''))
|
|
self.logger.log(f"Detected transferred size: {transferred_size} bytes")
|
|
break
|
|
except (ValueError, IndexError):
|
|
self.logger.log(f"Could not parse transferred size from line: {line}")
|
|
|
|
except FileNotFoundError:
|
|
self.logger.log(
|
|
"Error: 'rsync' command not found. Please ensure it is installed and in your PATH.")
|
|
queue.put(('error', None))
|
|
except Exception as e:
|
|
self.logger.log(f"An unexpected error occurred: {e}")
|
|
queue.put(('error', None))
|
|
|
|
return transferred_size
|
|
|
|
def start_restore(self, source_path: str, dest_path: str, is_compressed: bool):
    """Run ``_run_restore`` in a background daemon thread.

    The progress queue is taken from ``self.app.queue``.  If that attribute
    lookup fails, a private throw-away queue is used instead, which means
    nobody receives the restore result.
    """
    # A bit of a hack: fetching the queue from the app instance avoids
    # passing it all the way down from the UI.
    try:
        progress_queue = self.app.queue
    except AttributeError:
        self.logger.log("Could not get queue from app instance. Restore progress will not be reported.")
        # Stand-in queue so the worker can still call put().
        from queue import Queue
        progress_queue = Queue()

    worker = threading.Thread(
        target=self._run_restore,
        args=(progress_queue, source_path, dest_path, is_compressed),
        daemon=True)
    worker.start()
|
|
|
|
def _run_restore(self, queue, source_path: str, dest_path: str, is_compressed: bool):
|
|
"""Executes the restore logic for a system backup."""
|
|
self.logger.log(f"Starting restore from {source_path} to {dest_path}")
|
|
status = 'error'
|
|
try:
|
|
if is_compressed:
|
|
# For compressed files, we extract to the destination.
|
|
# The -C flag tells tar to change to that directory before extracting.
|
|
script_content = f"tar -xzf '{source_path}' -C '{dest_path}'"
|
|
else:
|
|
# For regular directories, we rsync the content.
|
|
# Ensure source path has a trailing slash to copy contents.
|
|
source = source_path.rstrip('/') + '/'
|
|
script_content = f"rsync -aAXHv '{source}' '{dest_path}'"
|
|
|
|
if self._execute_as_root(script_content):
|
|
self.logger.log("Restore script executed successfully.")
|
|
status = 'success'
|
|
else:
|
|
self.logger.log("Restore script failed.")
|
|
status = 'error'
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"An unexpected error occurred during restore: {e}")
|
|
status = 'error'
|
|
finally:
|
|
# Use a generic completion message for now.
|
|
# The queue processing logic in main_app might need a 'restore_completion' type.
|
|
queue.put(('completion', {'status': status, 'returncode': 0 if status == 'success' else 1}))
|
|
|
|
def get_scheduled_jobs(self) -> List[Dict[str, Any]]:
    """Return this application's jobs from the current user's crontab.

    Only cron entries whose comment contains ``self.app_tag`` and whose
    comment yields at least one parsed detail are returned.  Each job is a
    dict with the keys id, active, type, frequency, destination, sources
    and command.  Errors while reading the crontab are logged; whatever was
    collected up to that point is returned.
    """
    collected = []
    try:
        for cron_job in CronTab(user=True):
            if self.app_tag not in cron_job.comment:
                continue
            parsed = self._parse_job_comment(cron_job.comment)
            if not parsed:
                continue
            entry = {
                "id": cron_job.comment,
                "active": cron_job.is_enabled(),
                "type": parsed.get("type", "N/A"),
                "frequency": parsed.get("freq", "N/A"),
                "destination": parsed.get("dest", "N/A"),
                "sources": parsed.get("sources", []),
                "command": cron_job.command
            }
            collected.append(entry)
    except Exception as e:
        self.logger.log(f"Error loading cron jobs: {e}")
    return collected
|
|
|
|
def add_scheduled_job(self, job_details: Dict[str, Any]):
    """Add a cron job for the current user.

    ``job_details`` must provide "command", "comment" and "frequency"
    ("daily", "weekly" or "monthly").  The job is scheduled at 00:00 every
    day, every Sunday, or on the first day of every month respectively.
    An unsupported frequency is logged and no job is created.  Any error
    is logged, never raised.
    """
    # Full five-field schedules.  Setting only the day/weekday field (as
    # job.day.every(1) does) leaves minute and hour at "*", which makes
    # cron run the job every minute.
    schedules = {
        "daily": "0 0 * * *",
        "weekly": "0 0 * * 0",
        "monthly": "0 0 1 * *",
    }
    try:
        frequency = job_details["frequency"]
        schedule = schedules.get(frequency)
        if schedule is None:
            # Refuse to install a job with an undefined schedule instead of
            # falling back to the "* * * * *" default.
            self.logger.log(
                f"Error adding cron job: unsupported frequency '{frequency}'")
            return

        user_cron = CronTab(user=True)
        job = user_cron.new(
            command=job_details["command"], comment=job_details["comment"])
        job.setall(schedule)

        job.enable()
        user_cron.write()
        self.logger.log(
            f"Job successfully added: {job_details['comment']}")
    except Exception as e:
        self.logger.log(f"Error adding cron job: {e}")
|
|
|
|
def remove_scheduled_job(self, job_id: str):
    """Remove every job whose comment equals ``job_id`` from the user's crontab.

    The crontab is written back afterwards.  Errors are logged, not raised.
    """
    try:
        crontab = CronTab(user=True)
        crontab.remove_all(comment=job_id)
        crontab.write()
        self.logger.log(f"Job successfully removed: {job_id}")
    except Exception as e:
        self.logger.log(f"Error removing cron job: {e}")
|
|
|
|
def _parse_job_comment(self, comment: str) -> Dict[str, Any]:
|
|
details = {}
|
|
parts = comment.split("; ")
|
|
for part in parts:
|
|
if ":" in part:
|
|
key, value = part.split(":", 1)
|
|
if key.strip() == "sources":
|
|
details[key.strip()] = [s.strip()
|
|
for s in value.split(",")]
|
|
else:
|
|
details[key.strip()] = value.strip()
|
|
return details
|
|
|
|
def list_backups(self, base_backup_path: str) -> List[str]:
    """Return the names of all sub-directories of ``base_backup_path``.

    The names are sorted in descending order.  An empty list is returned
    when ``base_backup_path`` is not a directory.
    """
    if not os.path.isdir(base_backup_path):
        return []
    folder_names = [
        entry for entry in os.listdir(base_backup_path)
        if os.path.isdir(os.path.join(base_backup_path, entry))
    ]
    folder_names.sort(reverse=True)
    return folder_names
|
|
|
|
def list_system_backups(self, base_backup_path: str) -> List[Dict[str, str]]:
    """List all system backups found in the ``pybackup`` subdirectory.

    Entries are recognised by name, e.g. ``6-März-2024_143000_system_full``
    or ``..._system_incremental.tar.gz``.  For each one a dict with the keys
    date, type, size, folder_name, full_path, comment and is_compressed is
    returned; size and comment come from the ``<name>.txt`` info file when
    it exists (defaults: "N/A" and "").

    The result is sorted newest first by the date and time encoded in the
    name.  If a date cannot be parsed, the list is sorted by folder name in
    descending order instead.
    """
    system_backups = []
    pybackup_path = os.path.join(base_backup_path, "pybackup")

    if not os.path.isdir(pybackup_path):
        return system_backups

    # Groups: 1 = date, 2 = time (HHMMSS), 3 = type, 4 = optional ".tar.gz".
    name_regex = re.compile(
        r"^(\d{1,2}-\w+-\d{4})_(\d{6})_system_(full|incremental)(\.tar\.gz)?$", re.IGNORECASE)
    # Time part of each entry, kept aside for sorting so the returned dicts
    # keep exactly the documented keys.
    time_by_folder = {}

    for item in os.listdir(pybackup_path):
        # Skip info files
        if item.endswith('.txt'):
            continue

        match = name_regex.match(item)
        if not match:
            continue

        date_str = match.group(1)
        time_by_folder[item] = match.group(2)
        is_compressed = match.group(4) is not None

        backup_type = match.group(3).capitalize()
        if is_compressed:
            backup_type += " (Compressed)"

        backup_size = "N/A"
        comment = ""

        # Info file is named after the backup item
        # (e.g. 'backup_name.txt' or 'backup_name.tar.gz.txt').
        info_file_path = os.path.join(pybackup_path, f"{item}.txt")
        if os.path.exists(info_file_path):
            try:
                with open(info_file_path, 'r') as f:
                    for line in f:
                        label = line.strip().lower()
                        if label.startswith("originalgröße:"):
                            # Prefer the human-readable part before "(".
                            size_match = re.search(r":\s*(.*?)\s*\(", line)
                            if size_match:
                                backup_size = size_match.group(1).strip()
                            else:
                                backup_size = line.split(":")[1].strip()
                        elif label.startswith("kommentar:"):
                            comment = line.split(":", 1)[1].strip()
            except Exception as e:
                self.logger.log(
                    f"Could not read info file {info_file_path}: {e}")

        system_backups.append({
            "date": date_str,
            "type": backup_type,
            "size": backup_size,
            "folder_name": item,
            "full_path": os.path.join(pybackup_path, item),
            "comment": comment,
            "is_compressed": is_compressed
        })

    try:
        # Compute all keys first so a parse failure cannot leave the list
        # half-sorted.  The HHMMSS string orders backups of the same day.
        sort_keys = {
            entry['folder_name']: (
                datetime.datetime.strptime(entry['date'], '%d-%B-%Y'),
                time_by_folder[entry['folder_name']],
            )
            for entry in system_backups
        }
        system_backups.sort(
            key=lambda entry: sort_keys[entry['folder_name']], reverse=True)
    except ValueError:
        self.logger.log(
            "Could not sort backups by date due to format mismatch.")
        # Fallback to simple string sort if date parsing fails
        system_backups.sort(key=lambda entry: entry['folder_name'], reverse=True)

    return system_backups
|
|
|
|
def list_user_backups(self, base_backup_path: str) -> List[Dict[str, str]]:
    """List all user backups found directly in ``base_backup_path``.

    A sub-directory counts as a user backup only when a sibling info file
    ``<folder>.txt`` exists.  Each backup is a dict with the keys date,
    size, folder_name, full_path and comment; date, size and comment are
    read from the info file (defaults: "N/A", "N/A" and "").  The result is
    sorted by folder name in descending order.
    """
    if not os.path.isdir(base_backup_path):
        return []

    found = []
    for entry_name in os.listdir(base_backup_path):
        entry_path = os.path.join(base_backup_path, entry_name)
        if not os.path.isdir(entry_path):
            continue

        # The info file lives next to the backup folder and is named after it.
        info_file_path = os.path.join(base_backup_path, f"{entry_name}.txt")
        if not os.path.exists(info_file_path):
            continue

        details = {"size": "N/A", "date": "N/A", "comment": ""}
        try:
            with open(info_file_path, 'r') as info_file:
                for raw_line in info_file:
                    label = raw_line.strip().lower()
                    if label.startswith("originalgröße:"):
                        # Prefer the human-readable part before "(".
                        readable = re.search(r":\s*(.*?)\s*\(", raw_line)
                        details["size"] = (
                            readable.group(1).strip() if readable
                            else raw_line.split(":")[1].strip())
                    elif label.startswith("backup-datum:"):
                        details["date"] = raw_line.split(":", 1)[1].strip()
                    elif label.startswith("kommentar:"):
                        details["comment"] = raw_line.split(":", 1)[1].strip()
        except Exception as e:
            self.logger.log(
                f"Could not read info file {info_file_path}: {e}")

        found.append({
            "date": details["date"],
            "size": details["size"],
            "folder_name": entry_name,
            "full_path": entry_path,
            "comment": details["comment"]
        })

    return sorted(found, key=lambda backup: backup['folder_name'], reverse=True)
|
|
|
|
def get_comment(self, info_file_path: str) -> str:
    """Return the comment stored in an info file.

    The comment is the text after the colon of the first line that starts
    with "Kommentar:" (case-insensitive).  An empty string is returned when
    the file does not exist, has no such line, or cannot be read (the read
    error is logged).
    """
    if os.path.exists(info_file_path):
        try:
            with open(info_file_path, 'r') as info_file:
                for raw_line in info_file:
                    if raw_line.strip().lower().startswith("kommentar:"):
                        _, _, remainder = raw_line.partition(":")
                        return remainder.strip()
        except Exception as e:
            self.logger.log(f"Error reading comment from {info_file_path}: {e}")
    return ""
|
|
|
|
def update_comment(self, info_file_path: str, new_comment: str):
    """Set, replace or remove the "Kommentar:" line of an info file.

    A non-empty ``new_comment`` replaces the existing comment line or is
    appended when there is none; an empty one removes the comment line.
    All other lines are kept.  Line breaks inside ``new_comment`` are
    replaced by spaces because the info file is parsed line by line and
    holds exactly one comment line.  Errors are logged, not raised.
    """
    try:
        # Collapse a multi-line comment into one line: extra lines would be
        # invisible to get_comment and could be mistaken for other fields.
        comment_text = " ".join(new_comment.splitlines()).strip() if new_comment else ""

        lines = []
        if os.path.exists(info_file_path):
            with open(info_file_path, 'r') as f:
                lines = f.readlines()

        new_lines = []
        comment_written = False
        for line in lines:
            if not line.strip().lower().startswith("kommentar:"):
                new_lines.append(line)
            elif comment_text and not comment_written:
                # Replace the first comment line in place.
                new_lines.append(f"Kommentar: {comment_text}\n")
                comment_written = True
            # Any other comment line (old or duplicate) is dropped.

        if comment_text and not comment_written:
            new_lines.append(f"Kommentar: {comment_text}\n")

        with open(info_file_path, 'w') as f:
            f.writelines(new_lines)
        self.logger.log(f"Successfully updated comment in {info_file_path}")

    except Exception as e:
        self.logger.log(f"Error updating comment in {info_file_path}: {e}")
|
|
|
|
def test_pkexec_rsync(self, source_path: str, dest_path: str):
    """Run ``pkexec rsync -aAXHv source_path dest_path`` once for diagnosis.

    The return code, stdout and stderr of the command are written to the
    log.  A missing executable or any other error is logged as well;
    nothing is returned or raised.
    """
    log = self.logger.log
    log("Testing pkexec rsync command...")
    rsync_call = ['pkexec', 'rsync', '-aAXHv', source_path, dest_path]
    try:
        outcome = subprocess.run(
            rsync_call, capture_output=True, text=True, check=False)
        log(f"pkexec rsync return code: {outcome.returncode}")
        log(f"pkexec rsync stdout: {outcome.stdout.strip()}")
        log(f"pkexec rsync stderr: {outcome.stderr.strip()}")
    except FileNotFoundError:
        log("Error: 'pkexec' or 'rsync' command not found.")
    except Exception as e:
        log(
            f"An unexpected error occurred during pkexec rsync test: {e}")
|