# Py-Backup/backup_manager.py
import subprocess
import os
import threading
import re
import signal
import datetime
from typing import Optional, List, Dict, Any
from pathlib import Path
from crontab import CronTab
import tempfile
import stat
import shutil
from pbp_app_config import AppConfig
from pyimage_ui.password_dialog import PasswordDialog
from core.encryption_manager import EncryptionManager


class BackupManager:
    """
    Handles the logic for creating and managing backups using rsync.
    """

    def __init__(self, logger, app=None):
        self.logger = logger
        self.process = None
        self.app_tag = "# Py-Backup Job"  # Tag used to identify this app's cron jobs.
        self.is_system_process = False
        self.app = app
        self.encryption_manager = EncryptionManager(logger, app)
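
    # Usage sketch (illustrative, names are hypothetical): the logger is any
    # object exposing .log(str); 'app' is the optional UI application instance
    # that provides a .queue for progress messages.
    #   manager = BackupManager(logger=my_logger, app=my_app)
    #   manager.start_backup(my_app.queue, "/home/user/", "/mnt/backup/home", is_system=False)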

    def cancel_and_delete_privileged_backup(self, delete_path: str):
        """Cancels a running system backup and deletes the target directory in one atomic pkexec call."""
        if not self.process or self.process.poll() is not None:
            self.logger.log("No active backup process to cancel.")
            return
        self.logger.log(
            "Attempting to cancel backup and delete directory with root privileges...")
        try:
            pgid = os.getpgid(self.process.pid)
            script_parts = [
                f"echo 'Attempting to terminate process group {pgid}'",
                f"kill -SIGTERM -- -{pgid} || echo 'Process group {pgid} not found or already terminated.'",
                f"echo 'Attempting to delete directory {delete_path}'",
                f'if [ -n "{delete_path}" ] && [ "{delete_path}" != "/" ]; then',
                f'    rm -rf "{delete_path}"',
                'fi'
            ]
            script_content = "\n".join(script_parts)
            if self._execute_as_root(script_content):
                self.logger.log(
                    "Backup cancellation and deletion script succeeded.")
            else:
                self.logger.log(
                    "Backup cancellation and deletion script failed.")
        except ProcessLookupError:
            self.logger.log("Backup process already terminated before action.")
            # Still try to delete the directory.
            self.delete_privileged_path(delete_path)
        except Exception as e:
            self.logger.log(
                f"An error occurred during privileged cancel and delete: {e}")

    def delete_privileged_path(self, path: str):
        """Deletes a given path using root privileges."""
        self.logger.log(f"Requesting privileged deletion of: {path}")
        if not path or path == "/":
            self.logger.log("Invalid path for deletion provided.")
            return
        script_content = f'rm -rf "{path}"'
        if self._execute_as_root(script_content):
            self.logger.log(f"Successfully deleted path: {path}")
        else:
            self.logger.log(f"Failed to delete path: {path}")

    def start_delete_system_backup(self, path: str, queue):
        """Starts a threaded system backup deletion."""
        thread = threading.Thread(target=self._run_delete, args=(path, queue))
        thread.daemon = True
        thread.start()

    def _run_delete(self, path: str, queue):
        """Runs the deletion and puts a message on the queue when done."""
        try:
            info_file = f"{path}.txt"
            # Build a script to remove both the folder and the info file in one go.
            # Use -f to avoid errors if the info file doesn't exist.
            script_content = f"""
            rm -rf '{path}'
            rm -f '{info_file}'
            """
            if self._execute_as_root(script_content):
                self.logger.log(f"Successfully deleted {path} and {info_file}")
                queue.put(('deletion_complete', True))
            else:
                self.logger.log(f"Failed to delete {path}")
                queue.put(('deletion_complete', False))
        except Exception as e:
            self.logger.log(f"Error during threaded deletion: {e}")
            queue.put(('deletion_complete', False))

    def cancel_backup(self):
        """Terminates a running backup process by signalling its process group."""
        if self.process and self.process.poll() is None:  # Check if the process is still running.
            self.logger.log("Attempting to cancel backup...")
            try:
                pgid = os.getpgid(self.process.pid)
                if self.is_system_process:
                    self.logger.log(
                        f"Cancelling system process with pgid {pgid} via privileged script.")
                    script_content = f"kill -SIGTERM -- -{pgid}"
                    self._execute_as_root(script_content)
                else:
                    os.killpg(pgid, signal.SIGTERM)
                self.logger.log("Backup process terminated.")
            except ProcessLookupError:
                self.logger.log(
                    "Backup process already terminated or not found.")
            except Exception as e:
                self.logger.log(f"Failed to terminate backup process: {e}")
        else:
            self.logger.log("No active backup process to cancel.")

    def start_backup(self, queue, source_path: str, dest_path: str, is_system: bool,
                     is_dry_run: bool = False, exclude_files: Optional[List[Path]] = None,
                     source_size: int = 0, is_compressed: bool = False,
                     is_encrypted: bool = False, mode: str = "incremental",
                     password: Optional[str] = None):
        """Runs a backup in a background thread; progress is reported via the queue."""
        thread = threading.Thread(target=self._run_backup_path, args=(
            queue, source_path, dest_path, is_system, is_dry_run, exclude_files,
            source_size, is_compressed, is_encrypted, mode, password))
        thread.daemon = True
        thread.start()
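
    # Messages this class puts on the queue (consumed by the UI thread),
    # collected from the calls below:
    #   ('progress', <int percent>)            rsync --info=progress2 percentage
    #   ('file_update', <str>)                 file currently being transferred
    #   ('status_update', <str>)               phase description, e.g. compression
    #   ('progress_mode', 'indeterminate' | 'determinate')
    #   ('cancel_button_state', 'normal' | 'disabled')
    #   ('completion', {'status': ..., 'returncode': ...})
    #   ('deletion_complete', <bool>)
    #   ('error', None)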

    def _find_latest_backup(self, base_backup_path: str) -> Optional[str]:
        """Finds the most recent backup directory in a given path."""
        self.logger.log(f"Searching for latest backup in: {base_backup_path}")
        backup_names = self.list_backups(base_backup_path)
        if not backup_names:
            self.logger.log("No previous backups found to link against.")
            return None
        latest_backup_name = backup_names[0]
        latest_backup_path = os.path.join(base_backup_path, latest_backup_name)
        if os.path.isdir(latest_backup_path):
            self.logger.log(f"Found latest backup for --link-dest: {latest_backup_path}")
            return latest_backup_path
        self.logger.log(f"Latest backup entry '{latest_backup_name}' was not a directory. No link will be used.")
        return None

    def _compress_and_cleanup(self, dest_path: str, is_system: bool) -> bool:
        """Compresses the backup directory and cleans up the original."""
        self.logger.log(f"Starting compression for: {dest_path}")
        parent_dir = os.path.dirname(dest_path)
        archive_name = os.path.basename(dest_path) + ".tar.gz"
        archive_path = os.path.join(parent_dir, archive_name)
        # Using -C is important to avoid storing the full path in the tarball.
        # Ensure paths with spaces are quoted for the shell script.
        tar_command = f"tar -czf '{archive_path}' -C '{parent_dir}' '{os.path.basename(dest_path)}'"
        rm_command = f"rm -rf '{dest_path}'"
        script_content = f"""#!/bin/bash
set -e
{tar_command}
echo "tar command finished with exit code $?."
{rm_command}
echo "rm command finished with exit code $?."
"""
        if is_system:
            self.logger.log("Executing compression and cleanup as root.")
            if self._execute_as_root(script_content):
                self.logger.log("Compression and cleanup script executed successfully.")
                return True
            else:
                self.logger.log("Compression and cleanup script failed.")
                return False
        else:
            # For non-system backups, run the commands directly.
            try:
                self.logger.log(f"Executing local command: {tar_command}")
                tar_result = subprocess.run(tar_command, shell=True, capture_output=True, text=True, check=True)
                self.logger.log(f"tar command successful. Output: {tar_result.stdout}")
                self.logger.log(f"Executing local command: {rm_command}")
                rm_result = subprocess.run(rm_command, shell=True, capture_output=True, text=True, check=True)
                self.logger.log(f"rm command successful. Output: {rm_result.stdout}")
                return True
            except subprocess.CalledProcessError as e:
                self.logger.log(f"A command failed during local compression/cleanup. Return code: {e.returncode}")
                self.logger.log(f"Stdout: {e.stdout}")
                self.logger.log(f"Stderr: {e.stderr}")
                return False
            except Exception as e:
                self.logger.log(f"An unexpected error occurred during local compression/cleanup: {e}")
                return False

    def _run_backup_path(self, queue, source_path: str, dest_path: str, is_system: bool,
                         is_dry_run: bool, exclude_files: Optional[List[Path]],
                         source_size: int, is_compressed: bool, is_encrypted: bool,
                         mode: str, password: Optional[str]):
        """Worker thread: builds and runs the rsync command, then writes the info file."""
        try:
            if is_encrypted:
                # For encrypted backups, dest_path is the container file.
                container_path = dest_path + ".luks"
                # Estimate the container size as 110% of the source size.
                size_gb = int(source_size / (1024**3) * 1.1) + 1
                mount_point = self.encryption_manager.setup_encrypted_backup(queue, container_path, size_gb, password)
                if not mount_point:
                    return  # Error or cancellation already handled in the setup method.
                # The actual destination for rsync is the mount point.
                rsync_dest = mount_point
            else:
                rsync_dest = dest_path
            self.logger.log(
                f"Starting backup from '{source_path}' to '{dest_path}'...")
            if os.path.isdir(source_path) and not source_path.endswith('/'):
                source_path += '/'
            parent_dest = os.path.dirname(dest_path)
            # Ensure the parent directory exists. For system backups, rsync with pkexec
            # will create the final destination; for user backups, this creates it.
            if not os.path.exists(parent_dest):
                os.makedirs(parent_dest, exist_ok=True)
            latest_backup_path = self._find_latest_backup(parent_dest)
            command = []
            if is_system:
                command.extend(['pkexec', 'rsync', '-aAXHv'])
            else:
                command.extend(['rsync', '-av'])
            if mode == "incremental" and latest_backup_path and not is_dry_run and not is_encrypted:
                self.logger.log(f"Using --link-dest='{latest_backup_path}'")
                command.append(f"--link-dest={latest_backup_path}")
            command.extend(['--info=progress2'])
            if exclude_files:
                for exclude_file in exclude_files:
                    command.append(f"--exclude-from={exclude_file}")
            if AppConfig.MANUAL_EXCLUDE_LIST_PATH.exists():
                command.append(f"--exclude-from={AppConfig.MANUAL_EXCLUDE_LIST_PATH}")
            if is_dry_run:
                command.append('--dry-run')
            command.extend([source_path, rsync_dest])
            self.logger.log(f"Rsync command: {' '.join(command)}")
            transferred_size, total_size = self._execute_rsync(queue, command)
            self.logger.log(f"_execute_rsync returned: transferred_size={transferred_size}, total_size={total_size}")
            if self.process:
                return_code = self.process.returncode
                self.logger.log(
                    f"Rsync process finished with return code: {return_code}")
                status = 'error'
                if return_code == 0:
                    status = 'success'
                elif return_code in [23, 24]:  # rsync warnings (partial transfer / vanished files)
                    status = 'warning'
                elif return_code in [143, -15, 15, -9]:  # SIGTERM/SIGKILL
                    status = 'cancelled'
                if status in ['success', 'warning'] and not is_dry_run:
                    info_filename_base = os.path.basename(dest_path)
                    self.logger.log(f"latest_backup_path: {latest_backup_path}")
                    self.logger.log(f"source_size (from UI): {source_size}")
                    if mode == "full":  # Explicitly a full backup.
                        final_size = total_size if total_size > 0 else source_size
                        self.logger.log(f"Explicit full backup: final_size set to {final_size} (total_size if > 0, else source_size)")
                    elif latest_backup_path is None:  # First backup to this location (implicitly full).
                        final_size = total_size if total_size > 0 else source_size
                        self.logger.log(f"Implicit full backup (first to location): final_size set to {final_size} (total_size if > 0, else source_size)")
                    else:  # An incremental backup.
                        final_size = transferred_size
                        self.logger.log(f"Incremental backup: final_size set to {final_size} (transferred_size)")
                    if is_compressed:
                        self.logger.log(f"Compression requested for {dest_path}")
                        queue.put(('status_update', 'Phase 2/2: Komprimiere Backup...'))
                        queue.put(('progress_mode', 'indeterminate'))
                        queue.put(('cancel_button_state', 'disabled'))
                        if self._compress_and_cleanup(dest_path, is_system):
                            info_filename_base += ".tar.gz"
                        else:
                            self.logger.log("Compression failed, keeping uncompressed backup.")
                        queue.put(('progress_mode', 'determinate'))
                        queue.put(('cancel_button_state', 'normal'))
                    self._create_info_file(
                        dest_path, f"{info_filename_base}.txt", final_size)
                queue.put(('completion', {'status': status, 'returncode': return_code}))
            else:
                self.logger.log(
                    "Rsync process did not start or self.process is None.")
                queue.put(('completion', {'status': 'error', 'returncode': -1}))
            self.logger.log(
                f"Backup run for '{dest_path}' finished.")
        finally:
            if is_encrypted and 'mount_point' in locals() and mount_point:
                self.encryption_manager.cleanup_encrypted_backup(f"pybackup_{os.path.basename(dest_path + '.luks')}", mount_point)
            self.process = None

    def _create_info_file(self, dest_path: str, filename: str, source_size: int):
        """Writes a small metadata file (date and original size) next to the backup."""
        # The info file is stored in the parent directory of the backup folder.
        # Paths are computed before the try block so the error message below
        # can always reference them.
        parent_dir = os.path.dirname(dest_path)
        info_file_path = os.path.join(parent_dir, filename)
        try:
            original_bytes = source_size
            if source_size > 0:
                power = 1024
                n = 0
                power_labels = {0: 'B', 1: 'KB', 2: 'MB', 3: 'GB', 4: 'TB'}
                display_size = original_bytes
                while display_size >= power and n < len(power_labels) - 1:
                    display_size /= power
                    n += 1
                size_str = f"{display_size:.2f} {power_labels[n]}"
            else:
                size_str = "0 B"
            date_str = datetime.datetime.now().strftime("%d. %B %Y, %H:%M:%S")
            info_content = (
                f"Backup-Datum: {date_str}\n"
                f"Originalgröße: {size_str} ({original_bytes} Bytes)\n"
            )
            self.logger.log(
                f"Attempting to write info file to {info_file_path} as current user.")
            with open(info_file_path, 'w') as f:
                f.write(info_content)
            self.logger.log(
                f"Successfully created metadata file: {info_file_path}")
        except Exception as e:
            self.logger.log(
                f"Failed to create metadata file. Please check permissions for {parent_dir}. Error: {e}")

    def _execute_rsync(self, queue, command: List[str]):
        """Runs rsync, streams progress to the queue, and parses transfer sizes from its output."""
        transferred_size = 0
        total_size = 0
        try:
            try:
                # Force the C locale to ensure rsync output is in English for parsing.
                env = os.environ.copy()
                env['LC_ALL'] = 'C'
                self.process = subprocess.Popen(
                    command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, preexec_fn=os.setsid, env=env)
            except FileNotFoundError:
                self.logger.log(
                    "Error: 'pkexec' or 'rsync' command not found in PATH during Popen call.")
                queue.put(('error', None))
                return 0, 0
            except Exception as e:
                self.logger.log(
                    f"Error starting rsync process with Popen: {e}")
                queue.put(('error', None))
                return 0, 0
            if self.process is None:  # Redundant if exceptions are caught, but kept for safety.
                self.logger.log(
                    "Error: subprocess.Popen returned None for rsync process (after exception handling).")
                queue.put(('error', None))
                return 0, 0  # Exit early if the process didn't start.
            progress_regex = re.compile(r'\s*(\d+)%\s+')
            output_lines = []
            if self.process.stdout:
                full_stdout = []
                for line in iter(self.process.stdout.readline, ''):
                    stripped_line = line.strip()
                    self.logger.log(f"Rsync stdout line: {stripped_line}")  # Log every line.
                    full_stdout.append(stripped_line)
                    match = progress_regex.search(stripped_line)
                    if match:
                        percentage = int(match.group(1))
                        queue.put(('progress', percentage))
                    else:
                        if stripped_line and not stripped_line.startswith(('sending incremental file list', 'sent', 'total size')):
                            queue.put(('file_update', stripped_line))
                self.process.wait()
                if self.process.stderr:
                    # Note: stderr is only read after the process exits; a very large
                    # error stream could in principle fill the pipe buffer before then.
                    stderr_output = self.process.stderr.read()
                    if stderr_output:
                        self.logger.log(f"Rsync Stderr: {stderr_output.strip()}")
                        full_stdout.extend(stderr_output.strip().split('\n'))  # Include stderr in the parsed output.
                output_lines = full_stdout  # Use the collected stdout/stderr for parsing.
            # After process completion, parse the output for the transferred size.
            # This is tricky because the output format can vary. We look for the
            # C-locale summary lines, which typically look like:
            #   sent 1,234 bytes  received 56 bytes  860.00 bytes/sec
            #   total size is 1,199  speedup is 0.93
            transferred_size = 0
            total_size = 0
            summary_regex = re.compile(r"sent ([\d,.]+) bytes\s+received ([\d,.]+) bytes")
            # \s+ before 'speedup' tolerates the variable spacing rsync emits here.
            total_size_regex = re.compile(r"total size is ([\d,.]+)\s+speedup")
            for line in reversed(output_lines):  # Search from the end; the summary is usually last.
                match = summary_regex.search(line)
                if match and transferred_size == 0:  # Only set if not already found.
                    try:
                        sent_str = match.group(1).replace(',', '').replace('.', '')
                        received_str = match.group(2).replace(',', '').replace('.', '')
                        bytes_sent = int(sent_str)
                        bytes_received = int(received_str)
                        transferred_size = bytes_sent + bytes_received
                        self.logger.log(
                            f"Detected total bytes transferred from summary: {transferred_size} bytes")
                    except (ValueError, IndexError) as e:
                        self.logger.log(
                            f"Could not parse sent/received bytes from line: '{line}'. Error: {e}")
                total_match = total_size_regex.search(line)
                if total_match and total_size == 0:  # Only set if not already found.
                    try:
                        total_size_str = total_match.group(1).replace(',', '').replace('.', '')
                        total_size = int(total_size_str)
                        self.logger.log(f"Detected total size from summary: {total_size} bytes")
                    except (ValueError, IndexError) as e:
                        self.logger.log(f"Could not parse total size from line: '{line}'. Error: {e}")
            self.logger.log(f"_execute_rsync final parsed values: transferred_size={transferred_size}, total_size={total_size}")
            if transferred_size == 0:
                # Fall back to the --stats format if the summary regex found nothing.
                bytes_sent = 0
                bytes_received = 0
                for line in output_lines:
                    if line.strip().startswith('Total bytes sent:'):
                        try:
                            size_str = line.split(':')[1].strip()
                            bytes_sent = int(size_str.replace(',', '').replace('.', ''))
                        except (ValueError, IndexError):
                            self.logger.log(f"Could not parse bytes sent from line: {line}")
                    elif line.strip().startswith('Total bytes received:'):
                        try:
                            size_str = line.split(':')[1].strip()
                            bytes_received = int(size_str.replace(',', '').replace('.', ''))
                        except (ValueError, IndexError):
                            self.logger.log(f"Could not parse bytes received from line: {line}")
                if bytes_sent > 0 or bytes_received > 0:
                    transferred_size = bytes_sent + bytes_received
                    self.logger.log(
                        f"Detected total bytes transferred from --stats: {transferred_size} bytes")
                else:
                    self.logger.log(
                        "Could not determine transferred size from rsync output. Size will be 0.")
        except FileNotFoundError:
            self.logger.log(
                "Error: 'rsync' command not found. Please ensure it is installed and in your PATH.")
            queue.put(('error', None))
        except Exception as e:
            self.logger.log(f"An unexpected error occurred: {e}")
            queue.put(('error', None))
        return transferred_size, total_size

    def start_restore(self, source_path: str, dest_path: str, is_compressed: bool):
        """Starts a restore process in a separate thread."""
        # We need the queue from the app instance to report progress.
        # A bit of a hack, but it avoids passing the queue all the way down from the UI.
        try:
            queue = self.app.queue
        except AttributeError:
            self.logger.log("Could not get queue from app instance. Restore progress will not be reported.")
            # Create a dummy queue.
            from queue import Queue
            queue = Queue()
        thread = threading.Thread(target=self._run_restore, args=(
            queue, source_path, dest_path, is_compressed))
        thread.daemon = True
        thread.start()

    def _run_restore(self, queue, source_path: str, dest_path: str, is_compressed: bool):
        """Executes the restore logic for a system backup."""
        self.logger.log(f"Starting restore from {source_path} to {dest_path}")
        status = 'error'
        try:
            if is_compressed:
                # For compressed files, we extract to the destination.
                # The -C flag tells tar to change to that directory before extracting.
                script_content = f"tar -xzf '{source_path}' -C '{dest_path}'"
            else:
                # For regular directories, we rsync the content.
                # Ensure the source path has a trailing slash to copy contents.
                source = source_path.rstrip('/') + '/'
                script_content = f"rsync -aAXHv '{source}' '{dest_path}'"
            if self._execute_as_root(script_content):
                self.logger.log("Restore script executed successfully.")
                status = 'success'
            else:
                self.logger.log("Restore script failed.")
                status = 'error'
        except Exception as e:
            self.logger.log(f"An unexpected error occurred during restore: {e}")
            status = 'error'
        finally:
            # Use a generic completion message for now.
            # The queue processing logic in main_app might need a 'restore_completion' type.
            queue.put(('completion', {'status': status, 'returncode': 0 if status == 'success' else 1}))

    def get_scheduled_jobs(self) -> List[Dict[str, Any]]:
        """Returns all cron jobs tagged by this app, parsed into dictionaries."""
        jobs_list = []
        try:
            user_cron = CronTab(user=True)
            for job in user_cron:
                if self.app_tag in job.comment:
                    details = self._parse_job_comment(job.comment)
                    if details:
                        jobs_list.append({
                            "id": job.comment,
                            "active": job.is_enabled(),
                            "type": details.get("type", "N/A"),
                            "frequency": details.get("freq", "N/A"),
                            "destination": details.get("dest", "N/A"),
                            "sources": details.get("sources", []),
                            "command": job.command
                        })
        except Exception as e:
            self.logger.log(f"Error loading cron jobs: {e}")
        return jobs_list

    def add_scheduled_job(self, job_details: Dict[str, Any]):
        """Creates a new tagged cron job for the given frequency."""
        try:
            user_cron = CronTab(user=True)
            job = user_cron.new(
                command=job_details["command"], comment=job_details["comment"])
            # Pin the run time: without restricting minute/hour, a new cron entry
            # defaults to '* *' and would fire every minute. Midnight is assumed here.
            job.minute.on(0)
            job.hour.on(0)
            if job_details["frequency"] == "daily":
                job.day.every(1)
            elif job_details["frequency"] == "weekly":
                job.dow.on(0)  # Every Sunday; dow.every(1) would match every day.
            elif job_details["frequency"] == "monthly":
                job.dom.on(1)  # First day of the month; dom.every(1) would match every day.
            job.enable()
            user_cron.write()
            self.logger.log(
                f"Job successfully added: {job_details['comment']}")
        except Exception as e:
            self.logger.log(f"Error adding cron job: {e}")

    def remove_scheduled_job(self, job_id: str):
        """Removes all cron jobs whose comment matches the given id."""
        try:
            user_cron = CronTab(user=True)
            user_cron.remove_all(comment=job_id)
            user_cron.write()
            self.logger.log(f"Job successfully removed: {job_id}")
        except Exception as e:
            self.logger.log(f"Error removing cron job: {e}")

    def _parse_job_comment(self, comment: str) -> Dict[str, Any]:
        """Parses a 'key:value; key:value' job comment into a dictionary."""
        details = {}
        parts = comment.split("; ")
        for part in parts:
            if ":" in part:
                key, value = part.split(":", 1)
                if key.strip() == "sources":
                    details[key.strip()] = [s.strip()
                                            for s in value.split(",")]
                else:
                    details[key.strip()] = value.strip()
        return details
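
    # Hypothetical example of the comment format _parse_job_comment() expects;
    # the keys (type, freq, dest, sources) are those read back in get_scheduled_jobs():
    #   "# Py-Backup Job; type:system; freq:daily; dest:/mnt/backup; sources:/home,/etc"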

    def has_encrypted_backups(self, base_backup_path: str) -> bool:
        """Checks if any encrypted system backups exist in the destination."""
        system_backups = self.list_system_backups(base_backup_path)
        for backup in system_backups:
            if backup.get('is_encrypted'):
                return True
        return False

    def list_backups(self, base_backup_path: str) -> List[str]:
        """Returns the names of all backup directories, sorted newest first."""
        backups = []
        if os.path.isdir(base_backup_path):
            for item in os.listdir(base_backup_path):
                full_path = os.path.join(base_backup_path, item)
                if os.path.isdir(full_path):
                    backups.append(item)
        return sorted(backups, reverse=True)

    def list_system_backups(self, base_backup_path: str) -> List[Dict[str, str]]:
        """Lists all system backups found in the pybackup subdirectory."""
        system_backups = []
        pybackup_path = os.path.join(base_backup_path, "pybackup")
        if not os.path.isdir(pybackup_path):
            return system_backups
        # Regex to parse folder names like '6-März-2024_143000_system_full'
        # or '6-März-2024_143000_system_full.tar.gz'.
        name_regex = re.compile(
            r"^(\d{1,2}-\w+-\d{4})_(\d{6})_system_(full|incremental)(\.tar\.gz|\.luks)?$", re.IGNORECASE)
        for item in os.listdir(pybackup_path):
            # Skip info files.
            if item.endswith('.txt'):
                continue
            match = name_regex.match(item)
            if not match:
                continue
            full_path = os.path.join(pybackup_path, item)
            date_str = match.group(1)
            # time_str = match.group(2)  # Not currently used in the UI, but available.
            backup_type_base = match.group(3).capitalize()
            extension = match.group(4)
            is_compressed = (extension == ".tar.gz")
            is_encrypted = (extension == ".luks")
            backup_type = backup_type_base
            if is_compressed:
                backup_type += " (Compressed)"
            elif is_encrypted:
                backup_type += " (Encrypted)"
            backup_size = "N/A"
            comment = ""
            # The info file is named after the backup item
            # (e.g. 'backup_name.txt' or 'backup_name.tar.gz.txt').
            info_file_path = os.path.join(pybackup_path, f"{item}.txt")
            if os.path.exists(info_file_path):
                try:
                    with open(info_file_path, 'r') as f:
                        for line in f:
                            if line.strip().lower().startswith("originalgröße:"):
                                size_match = re.search(r":\s*(.*?)\s*\(", line)
                                if size_match:
                                    backup_size = size_match.group(1).strip()
                                else:
                                    backup_size = line.split(":")[1].strip()
                            elif line.strip().lower().startswith("kommentar:"):
                                comment = line.split(":", 1)[1].strip()
                except Exception as e:
                    self.logger.log(
                        f"Could not read info file {info_file_path}: {e}")
            system_backups.append({
                "date": date_str,
                "type": backup_type,
                "size": backup_size,
                "folder_name": item,
                "full_path": full_path,
                "comment": comment,
                "is_compressed": is_compressed,
                "is_encrypted": is_encrypted
            })
        # Sort by parsing the date from the folder name. Note that '%B' is
        # locale-dependent, so localized month names only parse under a matching locale.
        try:
            system_backups.sort(key=lambda x: datetime.datetime.strptime(
                x['date'], '%d-%B-%Y'), reverse=True)
        except ValueError:
            self.logger.log(
                "Could not sort backups by date due to format mismatch.")
            # Fall back to a simple string sort if date parsing fails.
            system_backups.sort(key=lambda x: x['folder_name'], reverse=True)
        return system_backups

    def list_user_backups(self, base_backup_path: str) -> List[Dict[str, str]]:
        """Lists all user backups found in the base backup path."""
        user_backups = []
        if not os.path.isdir(base_backup_path):
            return user_backups
        for item in os.listdir(base_backup_path):
            full_path = os.path.join(base_backup_path, item)
            if not os.path.isdir(full_path):
                continue
            # Look for the info file in the parent directory, named after the backup folder.
            info_file_path = os.path.join(base_backup_path, f"{item}.txt")
            # We identify a user backup by the presence of its corresponding info file.
            if os.path.exists(info_file_path):
                backup_size = "N/A"
                backup_date = "N/A"
                comment = ""
                try:
                    with open(info_file_path, 'r') as f:
                        for line in f:
                            if line.strip().lower().startswith("originalgröße:"):
                                size_match = re.search(r":\s*(.*?)\s*\(", line)
                                if size_match:
                                    backup_size = size_match.group(1).strip()
                                else:
                                    backup_size = line.split(":")[1].strip()
                            elif line.strip().lower().startswith("backup-datum:"):
                                backup_date = line.split(":", 1)[1].strip()
                            elif line.strip().lower().startswith("kommentar:"):
                                comment = line.split(":", 1)[1].strip()
                except Exception as e:
                    self.logger.log(
                        f"Could not read info file {info_file_path}: {e}")
                user_backups.append({
                    "date": backup_date,
                    "size": backup_size,
                    "folder_name": item,
                    "full_path": full_path,
                    "comment": comment
                })
        user_backups.sort(key=lambda x: x['folder_name'], reverse=True)
        return user_backups

    def get_comment(self, info_file_path: str) -> str:
        """Reads an info file and returns the comment, if it exists."""
        if not os.path.exists(info_file_path):
            return ""
        try:
            with open(info_file_path, 'r') as f:
                for line in f:
                    if line.strip().lower().startswith("kommentar:"):
                        return line.split(":", 1)[1].strip()
        except Exception as e:
            self.logger.log(f"Error reading comment from {info_file_path}: {e}")
        return ""

    def update_comment(self, info_file_path: str, new_comment: str):
        """Updates the comment in a given info file."""
        try:
            lines = []
            comment_found = False
            if os.path.exists(info_file_path):
                with open(info_file_path, 'r') as f:
                    lines = f.readlines()
            new_lines = []
            for line in lines:
                if line.strip().lower().startswith("kommentar:"):
                    if new_comment:  # Update the existing comment.
                        new_lines.append(f"Kommentar: {new_comment}\n")
                    comment_found = True
                    # If new_comment is empty, the old line is effectively deleted.
                else:
                    new_lines.append(line)
            if not comment_found and new_comment:
                new_lines.append(f"Kommentar: {new_comment}\n")
            with open(info_file_path, 'w') as f:
                f.writelines(new_lines)
            self.logger.log(f"Successfully updated comment in {info_file_path}")
        except Exception as e:
            self.logger.log(f"Error updating comment in {info_file_path}: {e}")

    def test_pkexec_rsync(self, source_path: str, dest_path: str):
        """Runs a one-off pkexec rsync to verify that privileged rsync works."""
        self.logger.log("Testing pkexec rsync command...")
        command = ['pkexec', 'rsync', '-aAXHv', source_path, dest_path]
        try:
            result = subprocess.run(
                command, capture_output=True, text=True, check=False)
            self.logger.log(f"pkexec rsync return code: {result.returncode}")
            self.logger.log(f"pkexec rsync stdout: {result.stdout.strip()}")
            self.logger.log(f"pkexec rsync stderr: {result.stderr.strip()}")
        except FileNotFoundError:
            self.logger.log("Error: 'pkexec' or 'rsync' command not found.")
        except Exception as e:
            self.logger.log(
                f"An unexpected error occurred during pkexec rsync test: {e}")