Files
Py-Backup/backup_manager.py

176 lines
7.0 KiB
Python

import subprocess
import os
import threading
import re
import datetime
import signal
from typing import Callable, Optional, List, Dict, Any
from pathlib import Path
from crontab import CronTab
class BackupManager:
"""
Handles the logic for creating and managing backups using rsync.
"""
def __init__(self, logger):
self.logger = logger
self.process = None
self.app_tag = "# Py-Backup Job"
def pause_backup(self):
if self.process and self.process.poll() is None:
os.killpg(os.getpgid(self.process.pid), signal.SIGSTOP)
self.logger.log("Backup paused.")
def resume_backup(self):
if self.process and self.process.poll() is None:
os.killpg(os.getpgid(self.process.pid), signal.SIGCONT)
self.logger.log("Backup resumed.")
def start_backup(self, source_path: str, dest_path: str, is_system: bool, is_dry_run: bool = False, exclude_files: Optional[List[Path]] = None, on_progress: Optional[Callable[[int], None]] = None, on_completion: Optional[Callable[[], None]] = None, on_error: Optional[Callable[[], None]] = None):
"""Starts a generic backup process for a specific path."""
thread = threading.Thread(target=self._run_backup_path, args=(
source_path, dest_path, is_system, is_dry_run, exclude_files, on_progress, on_completion, on_error))
thread.daemon = True
thread.start()
def _run_backup_path(self, source_path: str, dest_path: str, is_system: bool, is_dry_run: bool, exclude_files: Optional[List[Path]], on_progress: Optional[Callable[[int], None]], on_completion: Optional[Callable[[], None]], on_error: Optional[Callable[[], None]]):
try:
self.logger.log(
f"Starting backup from '{source_path}' to '{dest_path}'...")
if os.path.isdir(source_path) and not source_path.endswith('/'):
source_path += '/'
parent_dest = os.path.dirname(dest_path)
if not os.path.exists(parent_dest):
os.makedirs(parent_dest, exist_ok=True)
command = []
if is_system:
command.extend(['pkexec', 'rsync', '-aAXH'])
else:
command.extend(['rsync', '-a'])
command.extend(['--info=progress2'])
if is_dry_run:
command.append('--dry-run')
command.extend([source_path, dest_path])
self._execute_rsync(command, exclude_files, on_progress, on_error)
self.logger.log(
f"Backup to '{dest_path}' completed.")
finally:
if on_completion:
on_completion()
def _execute_rsync(self, command: List[str], exclude_files: Optional[List[Path]] = None, on_progress: Optional[Callable[[int], None]] = None, on_error: Optional[Callable[[], None]] = None):
if exclude_files:
for exclude_file in exclude_files:
command.insert(1, f"--exclude-from={exclude_file}")
try:
self.process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, preexec_fn=os.setsid)
progress_regex = re.compile(r'\s*(\d+)%')
if self.process.stdout:
for line in iter(self.process.stdout.readline, ''):
self.logger.log(line.strip())
match = progress_regex.search(line)
if match and on_progress:
percentage = int(match.group(1))
on_progress(percentage)
self.process.wait()
if self.process.stderr:
stderr_output = self.process.stderr.read()
if stderr_output:
self.logger.log(f"Rsync Error: {stderr_output.strip()}")
if on_error:
on_error()
except FileNotFoundError:
self.logger.log(
"Error: 'rsync' command not found. Please ensure it is installed and in your PATH.")
if on_error:
on_error()
except Exception as e:
self.logger.log(f"An unexpected error occurred: {e}")
if on_error:
on_error()
def get_scheduled_jobs(self) -> List[Dict[str, Any]]:
jobs_list = []
try:
user_cron = CronTab(user=True)
for job in user_cron:
if self.app_tag in job.comment:
details = self._parse_job_comment(job.comment)
if details:
jobs_list.append({
"id": job.comment,
"active": job.is_enabled(),
"type": details.get("type", "N/A"),
"frequency": details.get("freq", "N/A"),
"destination": details.get("dest", "N/A"),
"sources": details.get("sources", []),
"command": job.command
})
except Exception as e:
self.logger.log(f"Error loading cron jobs: {e}")
return jobs_list
def add_scheduled_job(self, job_details: Dict[str, Any]):
try:
user_cron = CronTab(user=True)
job = user_cron.new(
command=job_details["command"], comment=job_details["comment"])
if job_details["frequency"] == "daily":
job.day.every(1)
elif job_details["frequency"] == "weekly":
job.dow.every(1)
elif job_details["frequency"] == "monthly":
job.dom.every(1)
job.enable()
user_cron.write()
self.logger.log(
f"Job successfully added: {job_details['comment']}")
except Exception as e:
self.logger.log(f"Error adding cron job: {e}")
def remove_scheduled_job(self, job_id: str):
try:
user_cron = CronTab(user=True)
user_cron.remove_all(comment=job_id)
user_cron.write()
self.logger.log(f"Job successfully removed: {job_id}")
except Exception as e:
self.logger.log(f"Error removing cron job: {e}")
def _parse_job_comment(self, comment: str) -> Dict[str, Any]:
details = {}
parts = comment.split("; ")
for part in parts:
if ":" in part:
key, value = part.split(":", 1)
if key.strip() == "sources":
details[key.strip()] = [s.strip()
for s in value.split(",")]
else:
details[key.strip()] = value.strip()
return details
def list_backups(self, base_backup_path: str) -> List[str]:
backups = []
if os.path.isdir(base_backup_path):
for item in os.listdir(base_backup_path):
full_path = os.path.join(base_backup_path, item)
if os.path.isdir(full_path):
backups.append(item)
return sorted(backups, reverse=True)