"""Helpers for GPG key handling, Gitea release downloads and connectivity checks."""

import hashlib
import json
import os
import re
import shlex
import shutil
import socket
import subprocess
import tempfile
import urllib.request
from typing import List, Union


class GPGManager:
    """Static helpers that drive the `gpg` command-line tool."""

    @staticmethod
    def get_gpg() -> bool:
        """
        Check if gpg is installed.

        Returns:
            bool: True if a `gpg` executable is found on PATH, False otherwise.
        """
        # shutil.which performs the PATH lookup without spawning a shell.
        return shutil.which("gpg") is not None

    @staticmethod
    def is_key_already_imported(key_id: str) -> bool:
        """
        Check whether the key is already imported into the keyring.

        Args:
            key_id (str): ID of the public key (e.g. '7D8A6E1F9B4C3A5D...').

        Returns:
            bool: True if `gpg --list-keys` succeeds and its output contains
                `key_id`; False otherwise (including when gpg is missing or
                the check itself fails).
        """
        from message import MessageDialog
        from manager import LocaleStrings

        # Check if `gpg` is installed
        if not GPGManager.get_gpg():
            MessageDialog("warning", LocaleStrings.MSGGPG["gpg_missing"]).show()
            return False

        try:
            result = subprocess.run(
                ["gpg", "--list-keys", key_id],
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            # Exit status 2 is treated as "key not in keyring" and stays
            # silent; any other non-zero status is reported to the user.
            if e.returncode != 2:
                MessageDialog(
                    "error", f"{LocaleStrings.MSGA['error_gpg_check']}{e}"
                ).show()
            return False
        except (OSError, ValueError) as e:
            # gpg could not be started at all (these exceptions carry no
            # `returncode`, so they need their own handler).
            MessageDialog(
                "error", f"{LocaleStrings.MSGA['error_gpg_check']}{e}"
            ).show()
            return False

        return key_id in result.stdout

    @staticmethod
    def is_url_reachable(url: str, timeout: int = 5) -> bool:
        """
        Checks if a given URL is reachable.

        Args:
            url (str): The URL to check.
            timeout (int): Timeout in seconds for the connection attempt.

        Returns:
            bool: True if the URL could be opened, False otherwise.
        """
        try:
            # The context manager closes the response instead of leaking it.
            with urllib.request.urlopen(url, timeout=timeout):
                return True
        except Exception as e:
            from message import MessageDialog
            from manager import LocaleStrings

            # NOTE(review): this dialog is constructed without `.show()`,
            # unlike most dialogs in this file - confirm whether intended.
            MessageDialog(
                "error",
                f"{LocaleStrings.MSGGPG['url_not_reachable']}{url} - {LocaleStrings.MSGA['error']}{e}",
            )
            return False

    @staticmethod
    def import_key_from_url(
        key_url: str, expected_fingerprint: str, filename: str
    ) -> bool:
        """
        Downloads a GPG public key from the given URL, verifies its fingerprint
        matches the expected value, and imports it into the local GPG keyring.

        Args:
            key_url (str): URL to the `.asc` public key file.
            expected_fingerprint (str): Expected 40-character hexadecimal
                fingerprint of the key. Letter case and embedded whitespace
                are ignored when comparing.
            filename (str): Destination filename for saving the downloaded key
                in `PUBLIC_KEYS_DIR`.

        Returns:
            bool: True if the key was downloaded, verified, and successfully
                imported. False otherwise.
        """
        from message import MessageDialog
        from manager import LocaleStrings

        # Configuration: Define the directory for storing public key files
        PUBLIC_KEYS_DIR = "/tmp/public_keys"

        try:
            os.makedirs(PUBLIC_KEYS_DIR, exist_ok=True)

            # Construct full path to save the key file
            key_path = os.path.join(PUBLIC_KEYS_DIR, filename)

            print(f"Downloading public key from {key_url} to {key_path}")
            urllib.request.urlretrieve(key_url, key_path)

            # NOTE(review): "-fingerprint" (single dash) is not the documented
            # spelling of a gpg long option. It is kept unchanged because the
            # parsing below expects an unspaced 40-hex fingerprint in the
            # output - verify against the GnuPG manual before changing it.
            # check=False so the exit status is inspected explicitly below.
            result = subprocess.run(
                ["gpg", "-fingerprint", key_path],
                capture_output=True,
                text=True,
                check=False,
            )

            if result.returncode != 0:
                # NOTE(review): constructed without `.show()` - confirm.
                MessageDialog(
                    "error",
                    f"{LocaleStrings.MSGGPG['fingerprint_extract']}{result.stderr}",
                )
                return False

            # Extract all fingerprints from GPG output (40-character hexadecimal)
            fingerprints_from_key: List[str] = re.findall(
                r"\b[0-9A-Fa-f]{40}\b", result.stdout
            )

            if not fingerprints_from_key:
                MessageDialog(
                    "warning", LocaleStrings.MSGGPG["corrupted_file"]
                ).show()
                return False

            # Compare case-insensitively and ignore whitespace so that
            # "abcd ef01 ..." and "ABCDEF01..." name the same key.
            wanted = re.sub(r"\s+", "", expected_fingerprint).upper()
            if not any(fp.upper() == wanted for fp in fingerprints_from_key):
                MessageDialog(
                    "warning",
                    f"{LocaleStrings.MSGGPG['mismatch']}\n{expected_fingerprint}\n{LocaleStrings.MSGGPG['but_got']}\n{fingerprints_from_key}",
                    wraplength=450,
                ).show()
                return False

            # Import key into GPG keyring
            result_import = subprocess.run(
                ["gpg", "--import", key_path],
                capture_output=True,
                text=True,
                check=False,
            )

            if result_import.returncode != 0:
                MessageDialog(
                    "error",
                    f"{LocaleStrings.MSGGPG['failed_import']}{result_import.stderr}",
                ).show()
                return False

            print(f"Public key from {key_url} successfully imported.")
            return True

        except Exception as e:
            # NOTE(review): constructed without `.show()` - confirm.
            MessageDialog(
                "error", f"{LocaleStrings.MSGGPG['error_import_key']}{key_url}: {e}"
            )
            return False

    @staticmethod
    def update_gpg_trust_level(key_id: str, trust_level: int = 5) -> bool:
        """
        Sets the trust level for a specified GPG public key.

        The change is made by writing a small bash script to a temporary file
        and running it; the script feeds `trust`, the level and `quit` to
        `gpg --edit-key` on standard input.

        Args:
            key_id (str): The hexadecimal ID of the key to update.
            trust_level (int): Trust level (1-5). Default is 5 (fully trusted).

        Returns:
            bool: True if the script ran and exited with status 0,
                False otherwise.
        """
        from message import MessageDialog
        from manager import LocaleStrings

        # Quote the key id so it cannot be interpreted as shell syntax, and
        # quote the heredoc delimiter so its body is passed to gpg literally.
        quoted_key = shlex.quote(str(key_id))
        script = f"""#!/bin/bash
# Set required environment variables
export GPG_TTY=$(tty)
export GNUPG_STATUS="1"
export GNUPGAGENT_INFO_FILE="/dev/null"
gpg --batch --no-tty --command-fd 0 --edit-key {quoted_key} << 'EOF'
trust
{trust_level}
quit
EOF
"""

        script_path = None
        try:
            # Create temporary script file
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".sh", delete=False, encoding="utf-8"
            ) as temp_script:
                # Remember the path first so cleanup works even if the
                # write below fails.
                script_path = temp_script.name
                temp_script.write(script)

            print(f"Setting trust level {trust_level} for key {key_id}")
            result = subprocess.run(
                ["bash", script_path], capture_output=True, text=True, check=False
            )

            if result.returncode != 0:
                MessageDialog(
                    "error",
                    f"{LocaleStrings.MSGGPG['error_updating_trust_level']}{result.stderr}",
                ).show()
                return False

        except OSError as e:
            # Covers a missing `bash` as well as temp-file creation failures.
            MessageDialog(
                "error", f"{LocaleStrings.MSGGPG['error_executing_script']}{e}"
            ).show()
            return False

        finally:
            if script_path is not None:
                try:
                    os.unlink(script_path)
                except OSError:
                    # Best-effort cleanup; a leftover temp file is harmless.
                    pass

        return True


class GiteaUpdater:
    """Static helpers for querying Gitea releases and downloading the AppImage."""

    @staticmethod
    def get_latest_version_from_api(url: str, current_version: str = "") -> str:
        """
        Fetches the latest version of a project from the Gitea API.

        Args:
            url (str): The URL to query the Gitea API for releases.
            current_version (str, optional): Not used in this implementation.
                Defaults to "".

        Returns:
            str: `tag_name` of the first release in the response with leading
                'v' characters removed. "Unknown" if the response is empty
                or any error occurs.
        """
        try:
            with urllib.request.urlopen(url, timeout=10) as response:
                data = json.loads(response.read().decode())

            if data:
                latest_version = data[0].get("tag_name", "Unknown")
                return latest_version.lstrip("v")  # Remove 'v' prefix

            return "Unknown"
        except Exception as e:
            print(f"API Error: {e}")
            return "Unknown"

    @staticmethod
    def _parse_checksum(checksum_content: str) -> str:
        """Return the lower-cased hash from the first field of a checksum file."""
        fields = checksum_content.split()
        if not fields:
            raise ValueError("checksum file is empty")
        return fields[0].lower()

    @staticmethod
    def _sha256_of_file(path: str, chunk_size: int = 1024 * 1024) -> str:
        """Return the SHA256 hex digest of the file at `path`, read in chunks."""
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(chunk_size), b""):
                digest.update(chunk)
        return digest.hexdigest()

    @staticmethod
    def download_appimage(
        version: str = "latest",
        verify_checksum: bool = False,
        verify_signature: bool = False,
    ) -> Union[str, None]:
        """
        Downloads the AppImage file to /tmp/portinstaller and performs
        verification checks.

        Args:
            version (str): 'latest' for the latest version or a specific version.
            verify_checksum (bool): Whether to perform SHA256 checksum verification.
            verify_signature (bool): Whether to validate GPG signature.

        Returns:
            str: Full path to the AppImage if all checks pass. If a check
                fails, one of the strings "Checksum file not found",
                "Checksum mismatch", "Signature file not found" or
                "Signature verification failed".
            None: If the latest version cannot be determined or an exception
                occurs during download or verification.
        """
        base_url = "https://git.ilunix.de/punix/lxtools_installer/releases/download/"
        from message import MessageDialog
        from manager import LocaleStrings

        # Get latest version from API if not provided
        if version == "latest":
            try:
                with urllib.request.urlopen(
                    "https://git.ilunix.de/api/v1/repos/punix/lxtools_installer/releases?limit=1",
                    timeout=10,
                ) as response:
                    data = json.loads(response.read().decode())
                latest_version = data[0].get("tag_name")
                if not latest_version:
                    print(LocaleStrings.MSGA["Failed_retrieving"])
                    return None
            except Exception as e:
                print(f"{LocaleStrings.MSGA['Error_retrieving']}{e}")
                return None
        else:
            latest_version = version

        # Create /tmp/portinstaller directory if it doesn't exist
        download_dir = "/tmp/portinstaller"
        os.makedirs(download_dir, exist_ok=True)

        filename = f"{download_dir}/lxtools_installer{latest_version}-x86_64.AppImage"
        appimage_name = filename.split("/")[-1]
        appimage_url = f"{base_url}{latest_version}/{appimage_name}"
        checksum_url = f"{base_url}{latest_version}/{appimage_name}.sha256"
        signature_url = f"{base_url}{latest_version}/{appimage_name}.asc"

        try:
            print("Downloading AppImage Updater...")
            urllib.request.urlretrieve(appimage_url, filename)

            # SHA256 checksum verification
            if verify_checksum:
                try:
                    with urllib.request.urlopen(checksum_url, timeout=10) as response:
                        checksum_content = response.read().decode()
                    # Split on any whitespace: `sha256sum` separates hash and
                    # file name with two characters, not a single space.
                    expected_hash = GiteaUpdater._parse_checksum(checksum_content)
                except Exception as e:
                    # Checksum unavailable: let the user decide whether to
                    # continue without it.
                    proceed = MessageDialog(
                        "ask",
                        f"{LocaleStrings.MSGA['SHA256_File_not_found']}{e} {LocaleStrings.MSGA['SHA256_File_not_found1']}",
                        buttons=["Yes", "No"],
                    ).show()
                    if not proceed:
                        return "Checksum file not found"
                else:
                    file_hash = GiteaUpdater._sha256_of_file(filename)
                    if expected_hash != file_hash:
                        MessageDialog(
                            "warning", LocaleStrings.MSGA["SHA256 hash mismatch"]
                        ).show()
                        return "Checksum mismatch"

            # GPG signature verification
            if verify_signature:
                signature_path = f"{download_dir}/{appimage_name}.asc"
                try:
                    urllib.request.urlretrieve(signature_url, signature_path)
                except Exception as e:
                    MessageDialog(
                        "error", f"{LocaleStrings.MSGA['not_gpg_found']}{e}"
                    ).show()
                    return "Signature file not found"

                try:
                    # check=True: a non-zero exit raises CalledProcessError.
                    subprocess.run(
                        ["gpg", "--verify", signature_path, filename],
                        capture_output=True,
                        text=True,
                        check=True,
                    )
                    print(LocaleStrings.MSGA["gpg_verify_success"])
                except Exception as e:
                    # Only CalledProcessError carries gpg's stderr; for other
                    # failures (e.g. gpg missing) report the exception itself.
                    if isinstance(e, subprocess.CalledProcessError):
                        detail = e.stderr
                    else:
                        detail = e
                    MessageDialog(
                        "error", f"{LocaleStrings.MSGA['error_gpg_check']}{detail}"
                    ).show()
                    return "Signature verification failed"

            # Return the full path to the AppImage
            return filename

        except Exception as e:
            MessageDialog(
                "error", f"{LocaleStrings.MSGA['error_gpg_download']}{e}"
            ).show()
            return None


class NetworkChecker:
    """Static helpers for basic connectivity checks."""

    @staticmethod
    def check_internet_connection(
        host: str = "8.8.8.8", port: int = 53, timeout: float = 3
    ) -> bool:
        """
        Checks if an internet connection is available.

        Args:
            host (str): Host to connect to for testing. Defaults to "8.8.8.8"
                (Google DNS).
            port (int): Port number to use for the test. Defaults to 53.
            timeout (float): Timeout in seconds for the connection attempt.
                Defaults to 3.

        Returns:
            bool: True if a TCP connection to (host, port) succeeds,
                False otherwise.
        """
        try:
            # NOTE: this sets the process-wide default for sockets created
            # afterwards, not just this probe. Kept on purpose because
            # `urllib.request.urlretrieve` calls in this file take no timeout
            # argument and therefore rely on the default.
            socket.setdefaulttimeout(timeout)
            # The context manager closes the probe socket when done.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                probe.connect((host, port))
            return True
        except socket.error:
            return False

    @staticmethod
    def check_repository_access(
        url: str = "https://git.ilunix.de", timeout: float = 5
    ) -> bool:
        """
        Checks if the Gitea repository is accessible.

        Args:
            url (str): The URL of the Gitea repository.
                Defaults to "https://git.ilunix.de".
            timeout (float): Timeout in seconds for the connection attempt.
                Defaults to 5.

        Returns:
            bool: True if the URL could be opened, False otherwise.
        """
        try:
            # The context manager closes the response instead of leaking it.
            with urllib.request.urlopen(url, timeout=timeout):
                return True
        except Exception as e:
            print(f"Error accessing Gitea repository: {e}")
            return False