Files
Py-Backup/core/data_processing.py
Désiré Werner Menrath 058dc1e951 feat: Add manual exclude list functionality
- Create a separate file for manual excludes (`rsync-manual-excludes.conf`) that is not cleared on reset.
- Add a button to the settings frame to add files/folders to the manual exclude list.
- Update the backup and calculation logic to use the manual exclude list.
- Ensure the UI reflects the combined exclude lists.
2025-09-01 16:16:55 +02:00

289 lines
13 KiB
Python

# pyimage/core/data_processing.py
import os
import fnmatch
import shutil
import re
import subprocess
from queue import Empty
from pbp_app_config import AppConfig, Msg
from shared_libs.logger import app_logger
class DataProcessing:
def __init__(self, app):
    """Keep a reference to the application object.

    Args:
        app: Object stored as ``self.app``; the other methods of this class
            reach the result queue, widgets and state flags through it.
    """
    self.app = app
def load_exclude_patterns(self):
    """Return the sorted union of all configured exclusion patterns.

    Reads the generated, the user-defined and the manual exclude list named
    by ``AppConfig`` (each file is optional), drops blank lines and comment
    lines, de-duplicates what is left and logs the combined result.

    Returns:
        list[str]: Unique patterns in ascending order. Empty when no list
        exists or none of them could be read.
    """

    def read_patterns(list_path, label):
        """Read one exclude list, log the outcome and return its patterns."""
        try:
            if not list_path.exists():
                # A missing list is normal and is skipped without logging.
                return []
            with open(list_path, 'r') as f:
                stripped_lines = (line.strip() for line in f)
                # The comment test runs on the *stripped* line so an indented
                # "# ..." is not returned as a pattern. ';' is skipped as
                # well because rsync treats it as a comment marker too.
                patterns = [
                    entry for entry in stripped_lines
                    if entry and not entry.startswith(('#', ';'))]
        except FileNotFoundError:
            # The file vanished between the exists() check and open().
            app_logger.log(
                f"{label.capitalize()} exclusion list not found: {list_path}")
            return []
        except IOError as e:
            app_logger.log(f"Error loading {label} exclusion list: {e}")
            return []
        app_logger.log(f"Loaded {label} exclusion patterns: {patterns}")
        return patterns

    sources = (
        (AppConfig.GENERATED_EXCLUDE_LIST_PATH, "generated"),
        (AppConfig.USER_EXCLUDE_LIST_PATH, "user-defined"),
        (AppConfig.MANUAL_EXCLUDE_LIST_PATH, "manual"),
    )
    all_patterns = set()
    for list_path, label in sources:
        all_patterns.update(read_patterns(list_path, label))

    final_patterns = sorted(all_patterns)
    app_logger.log(f"Combined exclusion patterns: {final_patterns}")
    return final_patterns
def get_folder_size_threaded(self, path, button_text, stop_event, exclude_patterns=None, mode='backup'):
    """Add up the file sizes below *path*, skipping excluded entries.

    Every exclusion pattern is translated with ``fnmatch.translate`` and the
    results are merged into one regular expression that is matched against
    the joined path of each directory and file. Symbolic links and files
    whose size cannot be read are left out of the total.

    Args:
        path: Root directory to walk.
        button_text: Identifier passed through unchanged as the first
            element of the queued result.
        stop_event: Object with ``is_set()``; once it is set the walk is
            abandoned and nothing is queued.
        exclude_patterns: Optional iterable of fnmatch-style patterns.
        mode: Passed through unchanged as the third element of the result.

    Returns:
        None. On completion ``(button_text, total_size, mode)`` is put on
        ``self.app.queue``.
    """
    matcher = None
    if exclude_patterns:
        # One combined regex is cheaper than testing each pattern per path.
        alternatives = [fnmatch.translate(pattern) for pattern in exclude_patterns]
        matcher = re.compile('|'.join(alternatives))

    byte_count = 0
    for root, subdirs, files in os.walk(path, topdown=True):
        if stop_event.is_set():
            return  # Stop the calculation

        if matcher is not None:
            # Pruning in place keeps os.walk from descending into excluded
            # directories.
            subdirs[:] = [
                name for name in subdirs
                if not matcher.match(os.path.join(root, name))]

        for name in files:
            if stop_event.is_set():
                return  # Stop the calculation

            full_path = os.path.join(root, name)
            if matcher is not None and matcher.match(full_path):
                continue
            if os.path.islink(full_path):
                continue
            try:
                byte_count += os.path.getsize(full_path)
            except OSError:
                # Size could not be read; the file is left out of the total.
                continue

    if stop_event.is_set():
        return
    self.app.queue.put((button_text, byte_count, mode))
def get_user_folder_size_threaded(self, path, button_text, stop_event, mode='backup'):
    """Add up the file sizes below *path* without any exclusion list.

    Symbolic links and files whose size cannot be read are left out.

    Args:
        path: Root directory to walk.
        button_text: Identifier passed through unchanged as the first
            element of the queued result.
        stop_event: Object with ``is_set()``; once it is set the walk is
            abandoned and nothing is queued.
        mode: Passed through unchanged as the third element of the result.

    Returns:
        None. On completion ``(button_text, total_size, mode)`` is put on
        ``self.app.queue``.
    """
    cancelled = stop_event.is_set
    size_sum = 0

    for current_dir, _subdirs, entries in os.walk(path, topdown=True):
        if cancelled():
            return  # Stop the calculation

        for entry in entries:
            if cancelled():
                return  # Stop the calculation

            candidate = os.path.join(current_dir, entry)
            if os.path.islink(candidate):
                continue
            try:
                size_sum += os.path.getsize(candidate)
            except OSError:
                # Size could not be read; the file is left out of the total.
                continue

    if cancelled():
        return
    self.app.queue.put((button_text, size_sum, mode))
def get_incremental_backup_size(self, source_path: str, dest_path: str, is_system: bool, exclude_files: list = None) -> int:
    """
    Estimate the size of an incremental backup with an rsync dry run.

    The directory next to ``dest_path`` whose name sorts last is passed to
    rsync as ``--link-dest`` and the "Total transferred file size" line of
    the ``--stats`` output is parsed. This avoids walking the source tree in
    Python.

    Args:
        source_path: Directory that would be backed up.
        dest_path: Planned backup destination; only its parent directory is
            inspected for earlier backups.
        is_system: When true, rsync is started through ``pkexec`` with the
            ``-aAXH`` options; otherwise plain ``rsync -a`` is used.
        exclude_files: Optional paths handed to rsync as ``--exclude-from``.

    Returns:
        The estimated number of bytes to transfer, or 0 when no estimate
        could be produced (missing parent directory, no earlier backup,
        rsync failure or unparsable output).
    """
    # Scratch directory used as the dry-run destination. It lives next to
    # the real backups, so it must never be mistaken for one of them.
    scratch_name = "dry_run_dest"

    app_logger.log(f"Calculating incremental backup size for source: {source_path}")

    parent_dest = os.path.dirname(dest_path)
    if not os.path.exists(parent_dest):
        # Without the parent directory there is no earlier backup to link
        # against, so no incremental estimate is possible.
        app_logger.log("Destination parent does not exist, cannot calculate incremental size. Returning 0.")
        return 0

    # Find the latest backup to link against. A scratch directory left over
    # from an interrupted run is skipped; it would otherwise sort after
    # date-like names and be picked as the "latest backup".
    try:
        backups = sorted(
            (entry for entry in os.listdir(parent_dest)
             if entry != scratch_name
             and os.path.isdir(os.path.join(parent_dest, entry))),
            reverse=True)
    except OSError:
        # Covers a vanished, unreadable or non-directory parent.
        app_logger.log("Could not list backups, assuming no prior backups exist.")
        return 0
    if not backups:
        # NOTE: despite the log text, 0 is returned here; callers that need
        # the full size have to calculate it separately.
        app_logger.log("No previous backups found. Incremental size is full size.")
        return 0
    latest_backup_path = os.path.join(parent_dest, backups[0])

    if is_system:
        command = ['pkexec', 'rsync', '-aAXHvn', '--stats']
    else:
        command = ['rsync', '-avn', '--stats']
    command.append(f"--link-dest={latest_backup_path}")
    command.extend(
        f"--exclude-from={exclude_file}" for exclude_file in exclude_files or ())
    # NOTE(review): the manual list is applied regardless of `exclude_files`;
    # the original nesting was lost in this copy of the file - confirm.
    if AppConfig.MANUAL_EXCLUDE_LIST_PATH.exists():
        command.append(f"--exclude-from={AppConfig.MANUAL_EXCLUDE_LIST_PATH}")

    # The destination for a dry run can be a dummy path, but it must exist.
    dummy_dest = os.path.join(parent_dest, scratch_name)
    try:
        os.makedirs(dummy_dest, exist_ok=True)
    except OSError as e:
        app_logger.log(f"An unexpected error occurred during incremental size calculation: {e}")
        return 0

    command.extend([source_path, dummy_dest])
    app_logger.log(f"Executing rsync dry-run command: {' '.join(command)}")

    try:
        try:
            result = subprocess.run(command, capture_output=True, text=True, check=False)
        finally:
            # Always remove the scratch directory, even when rsync could not
            # be started. A cleanup failure must not discard a valid result.
            shutil.rmtree(dummy_dest, ignore_errors=True)

        if result.returncode != 0:
            app_logger.log(f"Rsync dry-run failed with code {result.returncode}: {result.stderr}")
            return 0

        output = result.stdout + "\n" + result.stderr
        # Accept both ',' and '.' as thousands separators (e.g. 1.234.567).
        match = re.search(r"Total transferred file size: ([\d,.]+) bytes", output)
        if not match:
            app_logger.log("Could not find 'Total transferred file size' in rsync output.")
            # Log the output just in case something changes in the future
            app_logger.log(f"Full rsync output for debugging:\n{output}")
            return 0

        # Remove both dots and commas before converting to an integer.
        size_bytes = int(match.group(1).replace(',', '').replace('.', ''))
        app_logger.log(f"Estimated incremental backup size: {size_bytes} bytes")
        return size_bytes
    except FileNotFoundError:
        app_logger.log("Error: 'rsync' or 'pkexec' command not found.")
        return 0
    except Exception as e:
        app_logger.log(f"An unexpected error occurred during incremental size calculation: {e}")
        return 0
def process_queue(self):
"""
Take at most one pending result from ``self.app.queue`` and refresh the UI.

Accepted messages are 3-tuples ``(button_text, folder_size, mode)`` and
5-tuples that additionally carry ``calc_type`` and ``status``; a message
of any other length is ignored. A result whose mode differs from the
current ``self.app.mode`` is discarded.

The method re-schedules itself via ``self.app.after(100, ...)`` inside
``finally``, so polling continues after an early ``return`` and also when
an exception escapes the body.
"""
# NOTE(review): the indentation of this method is missing in this copy of
# the file, so the nesting of the sections below cannot be read from the
# text. Restore the layout from version control before changing the logic.
try:
# Non-blocking read; raises Empty when nothing is queued.
message = self.app.queue.get_nowait()
# Check for the new message format with status
calc_type, status = None, None
if len(message) == 5:
button_text, folder_size, mode_when_started, calc_type, status = message
elif len(message) == 3:
button_text, folder_size, mode_when_started = message
else:
return # Ignore malformed messages
# The mode changed while the calculation was running: the result is stale.
if mode_when_started != self.app.mode:
if calc_type == 'accurate_incremental':
self.app.actions._set_ui_state(True) # Unlock UI
self.app.genaue_berechnung_var.set(False) # Uncheck the box
self.app.accurate_calculation_running = False
self.app.animated_icon.stop("DISABLE")
return # Discard stale result
# --- Update Main Canvas ---
current_folder_name = self.app.left_canvas_data.get('folder')
if current_folder_name == button_text:
# Stop and drop the running animation object, if there is one.
if self.app.left_canvas_animation:
self.app.left_canvas_animation.stop()
self.app.left_canvas_animation.destroy()
self.app.left_canvas_animation = None
# Shown as "GB" from 1024**3 bytes upwards, otherwise as "MB" (1024*1024 bytes).
size_in_gb = folder_size / (1024**3)
size_str = f"{size_in_gb:.2f} GB" if size_in_gb >= 1 else f"{folder_size / (1024*1024):.2f} MB"
self.app.left_canvas_data['size'] = size_str
self.app.left_canvas_data['total_bytes'] = folder_size
self.app.left_canvas_data['calculating'] = False
self.app.drawing.redraw_left_canvas()
self.app.source_size_bytes = folder_size
# --- Update Bottom Canvases ---
if self.app.mode == 'backup':
# Ensure button_text is a valid key in FOLDER_PATHS
if button_text in AppConfig.FOLDER_PATHS:
# First element of shutil.disk_usage() is the total size of the filesystem holding the path.
total_disk_size, _, _ = shutil.disk_usage(AppConfig.FOLDER_PATHS[button_text])
if folder_size > total_disk_size:
self.app.source_larger_than_partition = True
else:
self.app.source_larger_than_partition = False
# Guard against division by zero when the reported total is 0.
percentage = (folder_size / total_disk_size) * 100 if total_disk_size > 0 else 0
self.app.source_size_canvas.delete("all")
# Bar width is the canvas width scaled by the percentage.
fill_width = (self.app.source_size_canvas.winfo_width() / 100) * percentage
self.app.source_size_canvas.create_rectangle(0, 0, fill_width, self.app.source_size_canvas.winfo_height(), fill="#0078d7", outline="")
self.app.source_size_label.config(text=f"{folder_size / (1024**3):.2f} GB / {total_disk_size / (1024**3):.2f} GB")
self.app.drawing.update_target_projection()
# --- Enable Start Button Logic ---
if self.app.mode == 'backup' and self.app.destination_path:
self.app.start_pause_button.config(state="normal")
# --- Handle Accurate Calculation Completion ---
if calc_type == 'accurate_incremental':
self.app.source_size_bytes = folder_size # Update the source size
self.app.drawing.update_target_projection() # Redraw the projection
self.app.animated_icon.stop("DISABLE")
self.app.task_progress.stop()
self.app.task_progress.config(mode="determinate", value=0)
self.app.actions._set_ui_state(True)
self.app.genaue_berechnung_var.set(False)
self.app.accurate_calculation_running = False
self.app.start_pause_button.config(text=Msg.STR["start"])
if status == 'success':
self.app.info_label.config(text=Msg.STR["accurate_size_success"], foreground="#0078d7")
self.app.current_file_label.config(text="")
else:
self.app.info_label.config(text=Msg.STR["accurate_size_failed"], foreground="#D32F2F") # Red for failed
self.app.current_file_label.config(text="")
except Empty:
# Nothing queued right now.
pass
finally:
# Poll again; presumably Tk's `after`, i.e. a delay of 100 ms - confirm.
self.app.after(100, self.process_queue)