Implements a new feature for creating compressed full backups and restoring from them.

- Backups can now be created as compressed .tar.gz archives.
- This option is only available for full backups to maintain the efficiency of incremental backups.
- The UI now forces a full backup when compression is selected.
- The backup list correctly identifies and labels compressed backups.
- The restore process can handle both compressed and uncompressed backups.

fix: Improve backup process feedback and reliability

- Fixes a critical bug where backups could be overwritten if created on the same day. Backup names now include a timestamp to ensure uniqueness.
- Improves UI feedback during compressed backups by showing distinct stages (transfer, compress) and using an indeterminate progress bar during the compression phase.
- Disables the cancel button during the non-cancellable compression stage.
- Fixes a bug where the incremental backup size was written to the info file for full backups. The correct total size is now used.
699 lines
30 KiB
Python
699 lines
30 KiB
Python
import datetime
import os
import re
import shlex
import signal
import stat
import subprocess
import tempfile
import threading
from pathlib import Path
from typing import Optional, List, Dict, Any

from crontab import CronTab
|
|
|
|
|
|
class BackupManager:
    """Create, cancel, restore and list backups driven by rsync and tar.

    System backups and restores run with root privileges through ``pkexec``;
    user backups run as the current user. Long-running work happens in daemon
    threads and reports to the caller through a queue of ``(kind, payload)``
    tuples. Scheduled jobs live in the current user's crontab and are
    recognised by a comment containing ``app_tag``.
    """

    # Cron expressions used for each supported schedule frequency.
    _CRON_SCHEDULES = {
        "daily": "0 0 * * *",
        "weekly": "0 0 * * 0",
        "monthly": "0 0 1 * *",
    }

    # rsync output lines starting with one of these are summary/statistics
    # lines, not file names, and are therefore not forwarded as 'file_update'.
    _RSYNC_NON_FILE_PREFIXES = (
        'sending incremental file list', 'sent', 'total size',
        'Number of files:', 'Number of created files:',
        'Number of deleted files:', 'Number of regular files transferred:',
        'Total file size:', 'Total transferred file size:',
        'Literal data:', 'Matched data:', 'File list size:',
        'File list generation time:', 'File list transfer time:',
        'Total bytes sent:', 'Total bytes received:',
    )

    def __init__(self, logger):
        """Store the logger and reset the process-tracking state.

        Args:
            logger: Object exposing ``log(message)``; used for all diagnostics.
        """
        self.logger = logger
        # Popen handle of the rsync process currently running, if any.
        self.process = None
        # Marker searched for in crontab comments to find this app's jobs.
        self.app_tag = "# Py-Backup Job"
        # True while the running rsync was started through pkexec.
        self.is_system_process = False

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _shell_join(args: List[str]) -> str:
        """Return ``args`` as one shell-safe command line string."""
        return " ".join(shlex.quote(arg) for arg in args)

    @staticmethod
    def _is_deletable_path(path: str) -> bool:
        """Return True unless ``path`` is empty or resolves to the root dir.

        ``os.path.normpath`` collapses forms such as ``/.`` and ``/a/..`` so
        that every spelling of the filesystem root is rejected, not only the
        literal ``"/"``.
        """
        if not path:
            return False
        return bool(os.path.normpath(path).strip("/"))

    @staticmethod
    def _format_size(num_bytes: int) -> str:
        """Format a byte count as a string such as ``"2.00 KB"``.

        Values of zero or below are rendered as ``"0 B"``; the largest unit
        used is TB.
        """
        if num_bytes <= 0:
            return "0 B"
        units = ('B', 'KB', 'MB', 'GB', 'TB')
        value = float(num_bytes)
        index = 0
        while value >= 1024 and index < len(units) - 1:
            value /= 1024
            index += 1
        return f"{value:.2f} {units[index]}"

    @staticmethod
    def _classify_rsync_return_code(return_code: int) -> str:
        """Map an rsync exit code to 'success', 'warning', 'cancelled' or 'error'."""
        if return_code == 0:
            return 'success'
        if return_code in (23, 24):  # partial transfer / vanished files
            return 'warning'
        if return_code in (143, -15, 15, -9):  # SIGTERM / SIGKILL
            return 'cancelled'
        return 'error'

    # ------------------------------------------------------------------
    # Privileged execution
    # ------------------------------------------------------------------

    def _execute_as_root(self, script_content: str) -> bool:
        """Run ``script_content`` as a bash script with root rights via pkexec.

        The script is written to a temporary file under ``/tmp`` prefixed with
        a shebang and ``set -e``, executed through ``pkexec`` and removed
        afterwards.

        Args:
            script_content: Shell commands to execute. Callers are responsible
                for quoting any paths they embed.

        Returns:
            True if the script exited with code 0, False on any failure
            (including failure to create or launch the script).
        """
        script_path = ''
        try:
            with tempfile.NamedTemporaryFile(
                    mode='w', delete=False, prefix="pybackup_script_",
                    suffix=".sh", dir="/tmp") as tmp_script:
                # Remember the name first so the file is cleaned up even if
                # one of the writes below fails.
                script_path = tmp_script.name
                tmp_script.write("#!/bin/bash\n\n")
                tmp_script.write("set -e\n\n")  # Exit on error
                tmp_script.write(script_content)

            # Make the script executable (rwxr-xr-x).
            os.chmod(script_path, stat.S_IRWXU | stat.S_IRGRP |
                     stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)

            command = ['pkexec', script_path]
            self.logger.log(
                f"Executing privileged command via script: {script_path}")
            self.logger.log(
                f"Script content:\n---\n{script_content}\n---")

            result = subprocess.run(
                command, capture_output=True, text=True, check=False)

            if result.returncode == 0:
                self.logger.log(
                    f"Privileged script executed successfully. Output:\n{result.stdout}")
                return True

            self.logger.log(
                f"Privileged script failed. Return code: {result.returncode}\n"
                f"Stderr: {result.stderr}\nStdout: {result.stdout}")
            return False
        except Exception as e:
            self.logger.log(
                f"Failed to set up or execute privileged command: {e}")
            return False
        finally:
            if script_path and os.path.exists(script_path):
                try:
                    os.remove(script_path)
                except OSError as e:
                    # Cleanup failure must not mask the script's result.
                    self.logger.log(
                        f"Could not remove temporary script {script_path}: {e}")

    def cancel_and_delete_privileged_backup(self, delete_path: str):
        """Terminate the running system backup and delete its target directory.

        Both actions are placed in a single script so that only one pkexec
        call is needed. Nothing is done when no backup process is running.

        Args:
            delete_path: Directory to remove after the process group has been
                signalled. Empty paths and the filesystem root are skipped.
        """
        process = self.process
        if not process or process.poll() is not None:
            self.logger.log("No active backup process to cancel.")
            return

        self.logger.log(
            "Attempting to cancel backup and delete directory with root privileges...")
        try:
            pgid = os.getpgid(process.pid)

            script_parts = [
                f"echo 'Attempting to terminate process group {pgid}'",
                # '|| echo' keeps 'set -e' from aborting when the group is gone.
                f"kill -SIGTERM -- -{pgid} || echo 'Process group {pgid} not found or already terminated.'",
                "echo " + shlex.quote(
                    f"Attempting to delete directory {delete_path}"),
            ]
            if self._is_deletable_path(delete_path):
                script_parts.append(f"rm -rf -- {shlex.quote(delete_path)}")
            else:
                self.logger.log("Invalid path for deletion provided.")
            script_content = "\n".join(script_parts)

            if self._execute_as_root(script_content):
                self.logger.log(
                    "Backup cancellation and deletion script succeeded.")
            else:
                self.logger.log(
                    "Backup cancellation and deletion script failed.")
        except ProcessLookupError:
            self.logger.log("Backup process already terminated before action.")
            # Still try to delete the directory
            self.delete_privileged_path(delete_path)
        except Exception as e:
            self.logger.log(
                f"An error occurred during privileged cancel and delete: {e}")

    def delete_privileged_path(self, path: str):
        """Delete ``path`` recursively with root privileges.

        Empty paths and the filesystem root are refused and only logged.
        """
        self.logger.log(f"Requesting privileged deletion of: {path}")
        if not self._is_deletable_path(path):
            self.logger.log("Invalid path for deletion provided.")
            return

        # shlex.quote keeps quotes, spaces and '$' in the path from being
        # interpreted by the root shell; '--' protects names starting with '-'.
        script_content = f"rm -rf -- {shlex.quote(path)}"
        if self._execute_as_root(script_content):
            self.logger.log(f"Successfully deleted path: {path}")
        else:
            self.logger.log(f"Failed to delete path: {path}")

    def cancel_backup(self):
        """Send SIGTERM to the process group of the running backup, if any.

        System backups are signalled through a privileged script because the
        rsync process belongs to root; user backups are signalled directly.
        """
        process = self.process
        if not process or process.poll() is not None:
            self.logger.log("No active backup process to cancel.")
            return

        self.logger.log("Attempting to cancel backup...")
        try:
            pgid = os.getpgid(process.pid)
            if self.is_system_process:
                self.logger.log(
                    f"Cancelling system process with pgid {pgid} via privileged script.")
                if not self._execute_as_root(f"kill -SIGTERM -- -{pgid}"):
                    self.logger.log(
                        "Failed to terminate backup process: privileged kill script failed.")
                    return
            else:
                os.killpg(pgid, signal.SIGTERM)
            self.logger.log("Backup process terminated.")
        except ProcessLookupError:
            self.logger.log(
                "Backup process already terminated or not found.")
        except Exception as e:
            self.logger.log(f"Failed to terminate backup process: {e}")

    # ------------------------------------------------------------------
    # Backup
    # ------------------------------------------------------------------

    def start_backup(self, queue, source_path: str, dest_path: str, is_system: bool, is_dry_run: bool = False, exclude_files: Optional[List[Path]] = None, source_size: int = 0, is_compressed: bool = False):
        """Start a backup of ``source_path`` in a daemon thread.

        Args:
            queue: Queue receiving progress, status and completion tuples.
            source_path: File or directory to back up.
            dest_path: Target directory of this backup run.
            is_system: Run rsync (and compression) as root via pkexec.
            is_dry_run: Pass ``--dry-run`` to rsync; no info file is written.
            exclude_files: Files handed to rsync as ``--exclude-from``.
            source_size: Size in bytes recorded in the info file when no
                previous backup was used as link target.
            is_compressed: Pack the finished backup into a ``.tar.gz`` archive.
        """
        worker = threading.Thread(target=self._run_backup_path, args=(
            queue, source_path, dest_path, is_system, is_dry_run,
            exclude_files, source_size, is_compressed))
        worker.daemon = True
        worker.start()

    def _find_latest_backup(self, base_backup_path: str) -> Optional[str]:
        """Return the path of the first directory from ``list_backups``.

        ``list_backups`` sorts directory names in reverse order, so "latest"
        means the name that sorts last. Returns None when there is none.
        """
        self.logger.log(f"Searching for latest backup in: {base_backup_path}")

        backup_names = self.list_backups(base_backup_path)
        if not backup_names:
            self.logger.log("No previous backups found to link against.")
            return None

        latest_backup_name = backup_names[0]
        latest_backup_path = os.path.join(base_backup_path, latest_backup_name)

        if os.path.isdir(latest_backup_path):
            self.logger.log(
                f"Found latest backup for --link-dest: {latest_backup_path}")
            return latest_backup_path

        self.logger.log(
            f"Latest backup entry '{latest_backup_name}' was not a directory. No link will be used.")
        return None

    def _compress_and_cleanup(self, dest_path: str, is_system: bool) -> bool:
        """Pack ``dest_path`` into ``<dest_path>.tar.gz`` and delete the folder.

        The archive is created next to the backup folder and contains the
        folder as its single top-level entry. System backups are processed in
        one privileged script, user backups with direct subprocess calls.

        Returns:
            True if both tar and rm succeeded, False otherwise.
        """
        self.logger.log(f"Starting compression for: {dest_path}")
        parent_dir = os.path.dirname(dest_path)
        folder_name = os.path.basename(dest_path)
        archive_path = os.path.join(parent_dir, folder_name + ".tar.gz")

        # Using -C is important to avoid storing the full path in the tarball.
        tar_args = ['tar', '-czf', archive_path, '-C', parent_dir, folder_name]
        rm_args = ['rm', '-rf', '--', dest_path]
        tar_command = self._shell_join(tar_args)
        rm_command = self._shell_join(rm_args)

        if is_system:
            # _execute_as_root already adds the shebang and 'set -e', so a
            # failing tar aborts the script before the folder is removed.
            script_content = "\n".join([
                tar_command,
                'echo "tar command finished with exit code $?."',
                rm_command,
                'echo "rm command finished with exit code $?."',
            ]) + "\n"
            self.logger.log("Executing compression and cleanup as root.")
            if self._execute_as_root(script_content):
                self.logger.log(
                    "Compression and cleanup script executed successfully.")
                return True
            self.logger.log("Compression and cleanup script failed.")
            return False

        # For non-system backups, run the commands directly (no shell, so
        # the paths need no quoting at all).
        try:
            self.logger.log(f"Executing local command: {tar_command}")
            tar_result = subprocess.run(
                tar_args, capture_output=True, text=True, check=True)
            self.logger.log(
                f"tar command successful. Output: {tar_result.stdout}")

            self.logger.log(f"Executing local command: {rm_command}")
            rm_result = subprocess.run(
                rm_args, capture_output=True, text=True, check=True)
            self.logger.log(
                f"rm command successful. Output: {rm_result.stdout}")
            return True
        except subprocess.CalledProcessError as e:
            self.logger.log(
                f"A command failed during local compression/cleanup. Return code: {e.returncode}")
            self.logger.log(f"Stdout: {e.stdout}")
            self.logger.log(f"Stderr: {e.stderr}")
            return False
        except Exception as e:
            self.logger.log(
                f"An unexpected error occurred during local compression/cleanup: {e}")
            return False

    def _build_rsync_command(self, source_path: str, dest_path: str, is_system: bool, is_dry_run: bool, exclude_files: Optional[List[Path]], latest_backup_path: Optional[str]) -> List[str]:
        """Assemble the rsync argument list for one backup run."""
        if is_system:
            command = ['pkexec', 'rsync', '-aAXHv']
        else:
            command = ['rsync', '-av']

        if latest_backup_path and not is_dry_run:
            self.logger.log(f"Using --link-dest='{latest_backup_path}'")
            command.append(f"--link-dest={latest_backup_path}")

        # --stats makes rsync print "Total transferred file size:", which
        # _execute_rsync parses; without it the transferred size stays 0.
        command.extend(['--info=progress2', '--stats'])

        for exclude_file in exclude_files or []:
            command.append(f"--exclude-from={exclude_file}")

        if is_dry_run:
            command.append('--dry-run')

        command.extend([source_path, dest_path])
        return command

    def _run_backup_path(self, queue, source_path: str, dest_path: str, is_system: bool, is_dry_run: bool, exclude_files: Optional[List[Path]], source_size: int, is_compressed: bool):
        """Thread body of ``start_backup``: run rsync, compress, write info file.

        Always ends by putting a ``('completion', {...})`` tuple on ``queue``,
        including when an unexpected exception occurs, and resets
        ``self.process`` to None.
        """
        try:
            self.is_system_process = is_system
            self.logger.log(
                f"Starting backup from '{source_path}' to '{dest_path}'...")

            # A trailing slash makes rsync copy the directory's contents
            # instead of the directory itself.
            if os.path.isdir(source_path) and not source_path.endswith('/'):
                source_path += '/'

            # Ensure the parent directory exists. rsync creates the final
            # destination directory itself.
            parent_dest = os.path.dirname(dest_path)
            if parent_dest:
                os.makedirs(parent_dest, exist_ok=True)

            latest_backup_path = self._find_latest_backup(parent_dest)
            command = self._build_rsync_command(
                source_path, dest_path, is_system, is_dry_run,
                exclude_files, latest_backup_path)

            transferred_size = self._execute_rsync(queue, command)

            if self.process:
                return_code = self.process.returncode
                self.logger.log(
                    f"Rsync process finished with return code: {return_code}")

                status = self._classify_rsync_return_code(return_code)

                if status in ['success', 'warning'] and not is_dry_run:
                    info_filename_base = os.path.basename(dest_path)
                    # With a link target only the delta was transferred;
                    # otherwise the full source size applies.
                    final_size = transferred_size if latest_backup_path else source_size

                    if is_compressed:
                        self.logger.log(
                            f"Compression requested for {dest_path}")
                        queue.put(('status_update', 'Phase 2/2: Komprimiere Backup...'))
                        queue.put(('progress_mode', 'indeterminate'))
                        # Compression cannot be cancelled.
                        queue.put(('cancel_button_state', 'disabled'))

                        if self._compress_and_cleanup(dest_path, is_system):
                            info_filename_base += ".tar.gz"
                        else:
                            self.logger.log(
                                "Compression failed, keeping uncompressed backup.")

                        queue.put(('progress_mode', 'determinate'))
                        queue.put(('cancel_button_state', 'normal'))

                    self._create_info_file(
                        dest_path, f"{info_filename_base}.txt", final_size)

                queue.put(('completion', {'status': status, 'returncode': return_code}))
            else:
                self.logger.log(
                    "Rsync process did not start or self.process is None.")
                queue.put(('completion', {'status': 'error', 'returncode': -1}))

            self.logger.log(
                f"Backup to '{dest_path}' completed.")
        except Exception as e:
            # Without this the worker thread would die silently and the
            # consumer of the queue would wait for a completion forever.
            self.logger.log(f"Backup failed with an unexpected error: {e}")
            queue.put(('completion', {'status': 'error', 'returncode': -1}))
        finally:
            self.process = None

    def _create_info_file(self, dest_path: str, filename: str, source_size: int):
        """Write the metadata file for a backup next to the backup folder.

        Args:
            dest_path: Path of the backup; only its parent directory is used.
            filename: Name of the info file to create in that parent directory.
            source_size: Size in bytes to record.

        Failures are logged, never raised.
        """
        parent_dir = ""
        try:
            # Info file is stored in the parent directory of the backup folder.
            parent_dir = os.path.dirname(dest_path)
            info_file_path = os.path.join(parent_dir, filename)

            size_str = self._format_size(source_size)
            date_str = datetime.datetime.now().strftime("%d. %B %Y, %H:%M:%S")

            info_content = (
                f"Backup-Datum: {date_str}\n"
                f"Originalgröße: {size_str} ({source_size} Bytes)\n"
            )

            self.logger.log(
                f"Attempting to write info file to {info_file_path} as current user.")
            # Explicit UTF-8: the content contains non-ASCII characters.
            with open(info_file_path, 'w', encoding='utf-8') as f:
                f.write(info_content)
            self.logger.log(
                f"Successfully created metadata file: {info_file_path}")
        except Exception as e:
            self.logger.log(
                f"Failed to create metadata file. Please check permissions for {parent_dir}. Error: {e}")

    def _execute_rsync(self, queue, command: List[str]):
        """Run ``command`` and stream its output to the logger and ``queue``.

        Percentages found in the output are sent as ``('progress', int)``,
        other non-summary lines as ``('file_update', str)``. Launch failures
        and unexpected errors are reported as ``('error', None)``.

        Returns:
            The byte count parsed from rsync's "Total transferred file size:"
            line, or 0 if the line is missing or the process did not start.
        """
        transferred_size = 0
        try:
            try:
                # start_new_session puts rsync in its own process group so the
                # whole group can be signalled on cancel. It replaces
                # preexec_fn=os.setsid, which is unsafe in threaded programs.
                process = subprocess.Popen(
                    command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                    text=True, bufsize=1, start_new_session=True)
            except FileNotFoundError:
                self.logger.log(
                    "Error: 'pkexec' or 'rsync' command not found in PATH during Popen call.")
                queue.put(('error', None))
                return 0
            except Exception as e:
                self.logger.log(
                    f"Error starting rsync process with Popen: {e}")
                queue.put(('error', None))
                return 0

            self.process = process

            # Drain stderr concurrently. Reading it only after stdout hit EOF
            # can deadlock once rsync fills the stderr pipe buffer.
            stderr_chunks: List[str] = []

            def _drain_stderr():
                try:
                    for err_line in process.stderr:
                        stderr_chunks.append(err_line)
                except Exception as e:
                    self.logger.log(f"Could not read rsync stderr: {e}")

            stderr_thread = None
            if process.stderr:
                stderr_thread = threading.Thread(
                    target=_drain_stderr, daemon=True)
                stderr_thread.start()

            progress_regex = re.compile(r'\s*(\d+)%\s+')
            output_lines = []

            if process.stdout:
                for line in iter(process.stdout.readline, ''):
                    stripped_line = line.strip()
                    self.logger.log(stripped_line)
                    output_lines.append(stripped_line)

                    match = progress_regex.search(stripped_line)
                    if match:
                        queue.put(('progress', int(match.group(1))))
                    elif stripped_line and not stripped_line.startswith(
                            self._RSYNC_NON_FILE_PREFIXES):
                        queue.put(('file_update', stripped_line))

            process.wait()
            if stderr_thread is not None:
                stderr_thread.join()

            stderr_output = "".join(stderr_chunks)
            if stderr_output:
                self.logger.log(f"Rsync Error: {stderr_output.strip()}")
                output_lines.extend(stderr_output.strip().split('\n'))

            # After process completion, parse the output for transferred size
            for line in output_lines:
                if line.startswith('Total transferred file size:'):
                    try:
                        size_str = line.split(':')[1].strip().split(' ')[0]
                        # rsync groups digits with ',' or '.' depending on locale.
                        transferred_size = int(
                            size_str.replace(',', '').replace('.', ''))
                        self.logger.log(
                            f"Detected transferred size: {transferred_size} bytes")
                        break
                    except (ValueError, IndexError):
                        self.logger.log(
                            f"Could not parse transferred size from line: {line}")
        except FileNotFoundError:
            self.logger.log(
                "Error: 'rsync' command not found. Please ensure it is installed and in your PATH.")
            queue.put(('error', None))
        except Exception as e:
            self.logger.log(f"An unexpected error occurred: {e}")
            queue.put(('error', None))

        return transferred_size

    # ------------------------------------------------------------------
    # Restore
    # ------------------------------------------------------------------

    def start_restore(self, source_path: str, dest_path: str, is_compressed: bool):
        """Start a restore in a daemon thread.

        The result is reported on ``self.app.queue`` when such an attribute
        exists (it is not set by this class; presumably the application
        assigns it). Otherwise a throwaway queue is used and nothing is
        reported to the caller.
        """
        queue = getattr(getattr(self, 'app', None), 'queue', None)
        if queue is None:
            self.logger.log(
                "Could not get queue from app instance. Restore progress will not be reported.")
            # Create a dummy queue
            from queue import Queue
            queue = Queue()

        worker = threading.Thread(target=self._run_restore, args=(
            queue, source_path, dest_path, is_compressed))
        worker.daemon = True
        worker.start()

    def _run_restore(self, queue, source_path: str, dest_path: str, is_compressed: bool):
        """Restore a backup into ``dest_path`` with root privileges.

        Compressed backups are extracted with tar, plain ones are copied with
        rsync. In both cases the backup's *contents* end up directly in
        ``dest_path``. A ``('completion', {...})`` tuple is always queued.
        """
        self.logger.log(f"Starting restore from {source_path} to {dest_path}")
        status = 'error'
        try:
            if is_compressed:
                # _compress_and_cleanup stores the backup folder as the single
                # top-level archive member; --strip-components=1 drops it so
                # the layout matches the rsync branch below.
                # NOTE(review): assumes the archive was produced by
                # _compress_and_cleanup - confirm no other producer exists.
                script_content = (
                    f"tar -xzf {shlex.quote(source_path)} "
                    f"--strip-components=1 -C {shlex.quote(dest_path)}")
            else:
                # Ensure source path has a trailing slash to copy contents.
                source = source_path.rstrip('/') + '/'
                script_content = (
                    f"rsync -aAXHv {shlex.quote(source)} "
                    f"{shlex.quote(dest_path)}")

            if self._execute_as_root(script_content):
                self.logger.log("Restore script executed successfully.")
                status = 'success'
            else:
                self.logger.log("Restore script failed.")
        except Exception as e:
            self.logger.log(
                f"An unexpected error occurred during restore: {e}")
            status = 'error'
        finally:
            # Use a generic completion message for now.
            queue.put(('completion', {
                'status': status,
                'returncode': 0 if status == 'success' else 1}))

    # ------------------------------------------------------------------
    # Scheduling (crontab)
    # ------------------------------------------------------------------

    def get_scheduled_jobs(self) -> List[Dict[str, Any]]:
        """Return this application's jobs from the current user's crontab.

        Only jobs whose comment contains ``app_tag`` and yields at least one
        ``key:value`` detail are returned. Errors are logged and result in
        the jobs collected so far being returned.
        """
        jobs_list = []
        try:
            user_cron = CronTab(user=True)
            for job in user_cron:
                if self.app_tag not in job.comment:
                    continue
                details = self._parse_job_comment(job.comment)
                if not details:
                    continue
                jobs_list.append({
                    "id": job.comment,
                    "active": job.is_enabled(),
                    "type": details.get("type", "N/A"),
                    "frequency": details.get("freq", "N/A"),
                    "destination": details.get("dest", "N/A"),
                    "sources": details.get("sources", []),
                    "command": job.command
                })
        except Exception as e:
            self.logger.log(f"Error loading cron jobs: {e}")
        return jobs_list

    def add_scheduled_job(self, job_details: Dict[str, Any]):
        """Add an enabled job to the current user's crontab.

        Args:
            job_details: Needs the keys ``"command"``, ``"comment"`` and
                ``"frequency"`` (``"daily"``, ``"weekly"`` or ``"monthly"``).

        The schedule is set with a complete cron expression. Setting only one
        field (e.g. ``job.day.every(1)``) would leave minute and hour at
        ``*`` and run the job every minute. Errors, including an unknown
        frequency, are logged and no job is written.
        """
        try:
            frequency = job_details["frequency"]
            schedule = self._CRON_SCHEDULES.get(frequency)
            if schedule is None:
                raise ValueError(f"Unsupported frequency: {frequency!r}")

            user_cron = CronTab(user=True)
            job = user_cron.new(
                command=job_details["command"], comment=job_details["comment"])
            job.setall(schedule)
            job.enable()
            user_cron.write()
            self.logger.log(
                f"Job successfully added: {job_details['comment']}")
        except Exception as e:
            self.logger.log(f"Error adding cron job: {e}")

    def remove_scheduled_job(self, job_id: str):
        """Remove every crontab job of the current user whose comment is ``job_id``."""
        try:
            user_cron = CronTab(user=True)
            user_cron.remove_all(comment=job_id)
            user_cron.write()
            self.logger.log(f"Job successfully removed: {job_id}")
        except Exception as e:
            self.logger.log(f"Error removing cron job: {e}")

    def _parse_job_comment(self, comment: str) -> Dict[str, Any]:
        """Parse a ``"key:value; key:value"`` job comment into a dict.

        Parts without a colon are ignored. The value of ``sources`` is split
        on commas into a list; all keys and values are stripped.
        """
        details = {}
        for part in comment.split("; "):
            if ":" not in part:
                continue
            key, value = part.split(":", 1)
            key = key.strip()
            if key == "sources":
                details[key] = [s.strip() for s in value.split(",")]
            else:
                details[key] = value.strip()
        return details

    # ------------------------------------------------------------------
    # Listing and info files
    # ------------------------------------------------------------------

    def list_backups(self, base_backup_path: str) -> List[str]:
        """Return the names of all sub-directories, sorted in reverse order.

        Returns an empty list when ``base_backup_path`` is not a directory.
        """
        if not os.path.isdir(base_backup_path):
            return []
        names = [
            item for item in os.listdir(base_backup_path)
            if os.path.isdir(os.path.join(base_backup_path, item))
        ]
        return sorted(names, reverse=True)

    def _read_info_file(self, info_file_path: str) -> Dict[str, str]:
        """Read size, date and comment from a backup info file.

        Returns a dict with the keys ``"size"``, ``"date"`` and ``"comment"``.
        Missing entries keep their defaults (``"N/A"``, ``"N/A"``, ``""``).
        Read errors are logged; values parsed up to that point are kept.
        """
        details = {"size": "N/A", "date": "N/A", "comment": ""}
        if not os.path.exists(info_file_path):
            return details
        try:
            # errors='replace' keeps files written in another encoding
            # readable instead of failing on the first non-ASCII byte.
            with open(info_file_path, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    lowered = line.strip().lower()
                    if lowered.startswith("originalgröße:"):
                        # Prefer the human readable part before "(… Bytes)".
                        size_match = re.search(r":\s*(.*?)\s*\(", line)
                        if size_match:
                            details["size"] = size_match.group(1).strip()
                        else:
                            details["size"] = line.split(":")[1].strip()
                    elif lowered.startswith("backup-datum:"):
                        details["date"] = line.split(":", 1)[1].strip()
                    elif lowered.startswith("kommentar:"):
                        details["comment"] = line.split(":", 1)[1].strip()
        except Exception as e:
            self.logger.log(
                f"Could not read info file {info_file_path}: {e}")
        return details

    def list_system_backups(self, base_backup_path: str) -> List[Dict[str, str]]:
        """List the system backups found in ``<base_backup_path>/pybackup``.

        Entries are recognised by name (see the regex below) and may be
        folders or ``.tar.gz`` archives. Each result has the keys ``date``,
        ``type``, ``size``, ``folder_name``, ``full_path``, ``comment`` and
        ``is_compressed``. Results are sorted newest first by the date in the
        name, falling back to a reverse name sort if a date cannot be parsed.
        """
        system_backups = []
        pybackup_path = os.path.join(base_backup_path, "pybackup")

        if not os.path.isdir(pybackup_path):
            return system_backups

        # Matches names like '6-März-2024_system_full' or
        # '6-März-2024_system_full.tar.gz'.
        # NOTE(review): names carrying an additional timestamp would not
        # match this pattern - confirm against the code that builds the names.
        name_regex = re.compile(
            r"^(\d{1,2}-\w+-\d{4})_system_(full|incremental)(\.tar\.gz)?$", re.IGNORECASE)

        for item in os.listdir(pybackup_path):
            # Skip info files
            if item.endswith('.txt'):
                continue

            match = name_regex.match(item)
            if not match:
                continue

            is_compressed = match.group(3) is not None
            backup_type = match.group(2).capitalize()
            if is_compressed:
                backup_type += " (Compressed)"

            # Info file is named after the backup item
            # (e.g. 'backup_name.txt' or 'backup_name.tar.gz.txt').
            info = self._read_info_file(
                os.path.join(pybackup_path, f"{item}.txt"))

            system_backups.append({
                "date": match.group(1),
                "type": backup_type,
                "size": info["size"],
                "folder_name": item,
                "full_path": os.path.join(pybackup_path, item),
                "comment": info["comment"],
                "is_compressed": is_compressed
            })

        # Sort by parsing the date from the folder name. '%B' is locale
        # dependent, so month names of another locale raise ValueError.
        try:
            system_backups.sort(key=lambda x: datetime.datetime.strptime(
                x['date'], '%d-%B-%Y'), reverse=True)
        except ValueError:
            self.logger.log(
                "Could not sort backups by date due to format mismatch.")
            # Fallback to simple string sort if date parsing fails
            system_backups.sort(key=lambda x: x['folder_name'], reverse=True)

        return system_backups

    def list_user_backups(self, base_backup_path: str) -> List[Dict[str, str]]:
        """List the user backups found directly in ``base_backup_path``.

        A sub-directory counts as a user backup when an info file named
        ``<folder>.txt`` exists next to it. Each result has the keys ``date``,
        ``size``, ``folder_name``, ``full_path`` and ``comment``; results are
        sorted by folder name in reverse order.
        """
        user_backups = []
        if not os.path.isdir(base_backup_path):
            return user_backups

        for item in os.listdir(base_backup_path):
            full_path = os.path.join(base_backup_path, item)
            if not os.path.isdir(full_path):
                continue

            # The info file lives in the parent directory, named after the
            # backup folder; its presence identifies a user backup.
            info_file_path = os.path.join(base_backup_path, f"{item}.txt")
            if not os.path.exists(info_file_path):
                continue

            info = self._read_info_file(info_file_path)
            user_backups.append({
                "date": info["date"],
                "size": info["size"],
                "folder_name": item,
                "full_path": full_path,
                "comment": info["comment"]
            })

        user_backups.sort(key=lambda x: x['folder_name'], reverse=True)
        return user_backups

    def get_comment(self, info_file_path: str) -> str:
        """Return the first comment stored in an info file, or ``""``.

        An empty string is also returned when the file does not exist or
        cannot be read (the error is logged).
        """
        if not os.path.exists(info_file_path):
            return ""
        try:
            with open(info_file_path, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    if line.strip().lower().startswith("kommentar:"):
                        return line.split(":", 1)[1].strip()
        except Exception as e:
            self.logger.log(
                f"Error reading comment from {info_file_path}: {e}")
        return ""

    def update_comment(self, info_file_path: str, new_comment: str):
        """Set, replace or remove the comment line of an info file.

        Args:
            info_file_path: Info file to rewrite; created if missing.
            new_comment: New comment text. An empty value removes the comment.
                Line breaks are replaced by spaces because the file format
                stores one entry per line.
        """
        try:
            # Keep the comment on a single line so it cannot be mistaken
            # for other entries when the file is read back.
            new_comment = " ".join((new_comment or "").splitlines()).strip()

            lines = []
            if os.path.exists(info_file_path):
                with open(info_file_path, 'r', encoding='utf-8', errors='replace') as f:
                    lines = f.readlines()

            new_lines = []
            comment_written = False
            for line in lines:
                if line.strip().lower().startswith("kommentar:"):
                    # Replace the first comment line; drop any further ones
                    # (and all of them when the new comment is empty).
                    if new_comment and not comment_written:
                        new_lines.append(f"Kommentar: {new_comment}\n")
                        comment_written = True
                else:
                    new_lines.append(line)

            if new_comment and not comment_written:
                # Make sure the appended entry starts on its own line.
                if new_lines and not new_lines[-1].endswith("\n"):
                    new_lines[-1] += "\n"
                new_lines.append(f"Kommentar: {new_comment}\n")

            with open(info_file_path, 'w', encoding='utf-8') as f:
                f.writelines(new_lines)
            self.logger.log(
                f"Successfully updated comment in {info_file_path}")
        except Exception as e:
            self.logger.log(
                f"Error updating comment in {info_file_path}: {e}")

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------

    def test_pkexec_rsync(self, source_path: str, dest_path: str):
        """Run ``pkexec rsync -aAXHv source dest`` once and log the outcome.

        Logs return code, stdout and stderr; nothing is returned or raised.
        """
        self.logger.log("Testing pkexec rsync command...")
        command = ['pkexec', 'rsync', '-aAXHv', source_path, dest_path]
        try:
            result = subprocess.run(
                command, capture_output=True, text=True, check=False)
            self.logger.log(f"pkexec rsync return code: {result.returncode}")
            self.logger.log(f"pkexec rsync stdout: {result.stdout.strip()}")
            self.logger.log(f"pkexec rsync stderr: {result.stderr.strip()}")
        except FileNotFoundError:
            self.logger.log("Error: 'pkexec' or 'rsync' command not found.")
        except Exception as e:
            self.logger.log(
                f"An unexpected error occurred during pkexec rsync test: {e}")