Files
lxtools_installer/network.py
2025-07-09 08:49:49 +02:00

453 lines
16 KiB
Python

import socket
import os
import urllib.request
import json
import hashlib
import subprocess
from typing import Union, List
import re
import tempfile
class GPGManager:
    """Static helpers around the `gpg` command line tool.

    Covers locating the binary, checking the local keyring, downloading and
    importing a public key after a fingerprint check, and setting owner trust.
    """

    # A full 40-hex-digit key fingerprint, delimited by non-word characters.
    _FINGERPRINT_RE = re.compile(r"\b[0-9A-Fa-f]{40}\b")

    @staticmethod
    def _normalize_fingerprint(fingerprint: str) -> str:
        """Return *fingerprint* upper-cased with all whitespace removed."""
        return "".join(fingerprint.split()).upper()

    @staticmethod
    def _extract_fingerprints(gpg_output: str) -> List[str]:
        """
        Collect every 40-hex-digit fingerprint found in gpg listing output.

        Args:
            gpg_output (str): Text printed by gpg, either colon-delimited
                (`--with-colons`) or human readable.

        Returns:
            List[str]: Upper-cased fingerprints in order of appearance.
        """
        fingerprints: List[str] = []
        for line in gpg_output.splitlines():
            # In colon listings only `fpr` records hold fingerprints; other
            # records (e.g. `grp` keygrips) can also be 40 hex digits long.
            if ":" in line and line.split(":", 1)[0] != "fpr":
                continue
            fingerprints.extend(
                match.upper()
                for match in GPGManager._FINGERPRINT_RE.findall(line)
            )
        return fingerprints

    @staticmethod
    def get_gpg() -> bool:
        """
        Check if gpg is installed.

        Returns:
            bool: True if a `gpg` executable is found on PATH, False otherwise.
        """
        import shutil

        return shutil.which("gpg") is not None

    @staticmethod
    def is_key_already_imported(key_id: str) -> bool:
        """
        Check whether a key is already present in the local keyring.

        Args:
            key_id (str): ID of the public key (e.g. '7D8A6E1F9B4C3A5D...').

        Returns:
            bool: True if `gpg --list-keys` succeeds and its output contains
                the key ID (compared case-insensitively), False otherwise.
        """
        from message import MessageDialog
        from manager import LocaleStrings

        # Check if `gpg` is installed
        if not GPGManager.get_gpg():
            MessageDialog("warning", LocaleStrings.MSGGPG["gpg_missing"]).show()
            return False

        try:
            result = subprocess.run(
                ["gpg", "--list-keys", key_id],
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            # Exit status 2 is treated as "key not found" and is not reported.
            if e.returncode != 2:
                MessageDialog(
                    "error", f"{LocaleStrings.MSGA['error_gpg_check']}{e}"
                ).show()
            return False
        except (OSError, ValueError) as e:
            # gpg could not be started or its output could not be decoded.
            MessageDialog(
                "error", f"{LocaleStrings.MSGA['error_gpg_check']}{e}"
            ).show()
            return False

        return key_id.upper() in result.stdout.upper()

    @staticmethod
    def is_url_reachable(url: str, timeout: int = 5) -> bool:
        """
        Checks if a given URL is reachable.

        Args:
            url (str): The URL to check.
            timeout (int): Timeout in seconds for the connection attempt.

        Returns:
            bool: True if the URL could be opened, False otherwise (an error
                dialog is shown in that case).
        """
        try:
            # The context manager closes the response instead of leaking it.
            with urllib.request.urlopen(url, timeout=timeout):
                return True
        except Exception as e:
            from message import MessageDialog
            from manager import LocaleStrings

            MessageDialog(
                "error",
                f"{LocaleStrings.MSGGPG['url_not_reachable']}{url} - {LocaleStrings.MSGA['error']}{e}",
            ).show()
            return False

    @staticmethod
    def import_key_from_url(
        key_url: str, expected_fingerprint: str, filename: str
    ) -> bool:
        """
        Downloads a GPG public key from the given URL, verifies its fingerprint matches
        the expected value, and imports it into the local GPG keyring.

        Args:
            key_url (str): URL to the `.asc` public key file.
            expected_fingerprint (str): Expected 40-character hexadecimal fingerprint
                of the key. Case and embedded whitespace are ignored.
            filename (str): Destination filename for saving the downloaded key in
                `PUBLIC_KEYS_DIR`.

        Returns:
            bool: True if the key was downloaded, verified, and successfully imported.
                False otherwise.
        """
        from message import MessageDialog
        from manager import LocaleStrings

        # Configuration: Define the directory for storing public key files
        PUBLIC_KEYS_DIR = "/tmp/public_keys"

        try:
            os.makedirs(PUBLIC_KEYS_DIR, exist_ok=True)

            # Construct full path to save the key file
            key_path = os.path.join(PUBLIC_KEYS_DIR, filename)
            print(f"Downloading public key from {key_url} to {key_path}")
            urllib.request.urlretrieve(key_url, key_path)

            # Inspect the downloaded file without touching the keyring.
            # check=False so a gpg failure reaches the dedicated message below.
            result = subprocess.run(
                ["gpg", "--show-keys", "--with-colons", key_path],
                capture_output=True,
                text=True,
                check=False,
            )
            if result.returncode != 0:
                MessageDialog(
                    "error",
                    f"{LocaleStrings.MSGGPG['fingerprint_extract']}{result.stderr}",
                ).show()
                return False

            fingerprints_from_key = GPGManager._extract_fingerprints(result.stdout)
            if not fingerprints_from_key:
                MessageDialog(
                    "warning", LocaleStrings.MSGGPG["corrupted_file"]
                ).show()
                return False

            # Accept the key only if one of its fingerprints is the expected one
            wanted = GPGManager._normalize_fingerprint(expected_fingerprint)
            if wanted not in fingerprints_from_key:
                MessageDialog(
                    "warning",
                    f"{LocaleStrings.MSGGPG['mismatch']}\n{expected_fingerprint}\n{LocaleStrings.MSGGPG['but_got']}\n{fingerprints_from_key}",
                    wraplength=450,
                ).show()
                return False

            # Import key into GPG keyring
            result_import = subprocess.run(
                ["gpg", "--import", key_path],
                capture_output=True,
                text=True,
                check=False,
            )
            if result_import.returncode != 0:
                MessageDialog(
                    "error",
                    f"{LocaleStrings.MSGGPG['failed_import']}{result_import.stderr}",
                ).show()
                return False

            print(f"Public key from {key_url} successfully imported.")
            return True
        except Exception as e:
            MessageDialog(
                "error", f"{LocaleStrings.MSGGPG['error_import_key']}{key_url}: {e}"
            ).show()
            return False

    @staticmethod
    def update_gpg_trust_level(key_id: str, trust_level: int = 5) -> bool:
        """
        Sets the trust level for a specified GPG public key.

        The edit commands are fed to `gpg --edit-key` on standard input, so
        `key_id` is passed as a plain argument and never parsed by a shell.

        Args:
            key_id (str): The hexadecimal ID of the key to update.
            trust_level (int): Trust level (1-5). Default is 5 (fully trusted).

        Returns:
            bool: True if the trust level was successfully updated, False otherwise
                (including when `trust_level` is outside 1-5).
        """
        try:
            level = int(trust_level)
        except (TypeError, ValueError):
            level = 0
        if not 1 <= level <= 5:
            print(f"Invalid trust level {trust_level!r}; expected a value from 1 to 5")
            return False

        from message import MessageDialog
        from manager import LocaleStrings

        commands = ["trust", str(level)]
        if level == 5:
            # gpg asks for a yes/no confirmation before granting level 5.
            commands.append("y")
        commands.append("quit")

        print(f"Setting trust level {trust_level} for key {key_id}")
        try:
            result = subprocess.run(
                [
                    "gpg",
                    "--batch",
                    "--no-tty",
                    "--command-fd",
                    "0",
                    "--edit-key",
                    key_id,
                ],
                input="\n".join(commands) + "\n",
                capture_output=True,
                text=True,
                check=False,
            )
        except OSError as e:
            # Raised when the gpg executable cannot be started.
            MessageDialog(
                "error", f"{LocaleStrings.MSGGPG['error_executing_script']}{e}"
            ).show()
            return False

        if result.returncode != 0:
            MessageDialog(
                "error",
                f"{LocaleStrings.MSGGPG['error_updating_trust_level']}{result.stderr}",
            ).show()
            return False
        return True
class GiteaUpdater:
    """Static helpers that query Gitea releases and download the AppImage."""

    @staticmethod
    def _parse_checksum(checksum_content: str) -> str:
        """
        Extract the hex digest from the text of a checksum file.

        The digest is taken as the first whitespace-separated field, so both
        `hash  name` (two spaces) and `hash name` layouts are accepted.

        Args:
            checksum_content (str): Full text of the checksum file.

        Returns:
            str: The digest in lower case.

        Raises:
            ValueError: If the content holds no fields at all.
        """
        fields = checksum_content.split()
        if not fields:
            raise ValueError("checksum file is empty")
        return fields[0].lower()

    @staticmethod
    def _sha256_of_file(path: str, chunk_size: int = 1024 * 1024) -> str:
        """
        Compute the SHA256 hex digest of a file, reading it in chunks.

        Args:
            path (str): File to hash.
            chunk_size (int): Number of bytes read per iteration.

        Returns:
            str: Lower-case hexadecimal digest.
        """
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(chunk_size), b""):
                digest.update(chunk)
        return digest.hexdigest()

    @staticmethod
    def get_latest_version_from_api(url: str, current_version: str = "") -> str:
        """
        Fetches the latest version of a project from the Gitea API.

        Args:
            url (str): The URL to query the Gitea API for releases.
            current_version (str, optional): Not used in this implementation.
                Defaults to "".

        Returns:
            str: Tag name of the first release in the response with leading
                'v' characters removed. "Unknown" if the request fails or the
                response is not a non-empty list of objects with a string tag.
        """
        try:
            with urllib.request.urlopen(url, timeout=10) as response:
                data = json.loads(response.read().decode())
        except Exception as e:
            print(f"API Error: {e}")
            return "Unknown"

        if isinstance(data, list) and data and isinstance(data[0], dict):
            tag = data[0].get("tag_name", "Unknown")
            if isinstance(tag, str):
                return tag.lstrip("v")  # Remove 'v' prefix
        return "Unknown"

    @staticmethod
    def download_appimage(
        version: str = "latest",
        verify_checksum: bool = False,
        verify_signature: bool = False,
    ) -> Union[str, None]:
        """
        Downloads the AppImage file to /tmp/portinstaller and performs verification checks.

        Args:
            version (str): 'latest' for the latest version or a specific version.
            verify_checksum (bool): Whether to perform SHA256 checksum verification.
            verify_signature (bool): Whether to validate GPG signature.

        Returns:
            str: Full path to the AppImage if all requested checks pass. If a
                check fails, one of the messages "Checksum file not found",
                "Checksum mismatch", "Signature file not found" or
                "Signature verification failed".
            None: If the latest version cannot be determined or the download
                raises an exception.
        """
        base_url = "https://git.ilunix.de/punix/lxtools_installer/releases/download/"
        from message import MessageDialog
        from manager import LocaleStrings

        # Get latest version from API if not provided
        if version == "latest":
            try:
                with urllib.request.urlopen(
                    "https://git.ilunix.de/api/v1/repos/punix/lxtools_installer/releases?limit=1",
                    timeout=10,
                ) as response:
                    data = json.loads(response.read().decode())
                latest_version = data[0].get("tag_name")
            except Exception as e:
                print(f"{LocaleStrings.MSGA['Error_retrieving']}{e}")
                return None
            if not latest_version:
                print(LocaleStrings.MSGA["Failed_retrieving"])
                return None
        else:
            latest_version = version

        # Create /tmp/portinstaller directory if it doesn't exist
        download_dir = "/tmp/portinstaller"
        os.makedirs(download_dir, exist_ok=True)

        appimage_name = f"lxtools_installer{latest_version}-x86_64.AppImage"
        filename = f"{download_dir}/{appimage_name}"
        appimage_url = f"{base_url}{latest_version}/{appimage_name}"
        checksum_url = f"{appimage_url}.sha256"
        signature_url = f"{appimage_url}.asc"

        try:
            print("Downloading AppImage Updater...")
            urllib.request.urlretrieve(appimage_url, filename)

            # SHA256 checksum verification
            if verify_checksum:
                expected_hash = None
                try:
                    with urllib.request.urlopen(checksum_url, timeout=10) as response:
                        expected_hash = GiteaUpdater._parse_checksum(
                            response.read().decode()
                        )
                except Exception as e:
                    # Let the user decide whether to go on without a checksum.
                    proceed = MessageDialog(
                        "ask",
                        f"{LocaleStrings.MSGA['SHA256_File_not_found']}{e} {LocaleStrings.MSGA['SHA256_File_not_found1']}",
                        buttons=["Yes", "No"],
                    ).show()
                    if not proceed:
                        return "Checksum file not found"

                # expected_hash stays None when the user chose to continue
                # without a checksum file; the comparison is skipped then.
                if expected_hash is not None:
                    if GiteaUpdater._sha256_of_file(filename) != expected_hash:
                        MessageDialog(
                            "warning", LocaleStrings.MSGA["SHA256 hash mismatch"]
                        ).show()
                        return "Checksum mismatch"

            # GPG signature verification
            if verify_signature:
                signature_path = f"{filename}.asc"
                try:
                    urllib.request.urlretrieve(signature_url, signature_path)
                except Exception as e:
                    MessageDialog(
                        "error", f"{LocaleStrings.MSGA['not_gpg_found']}{e}"
                    ).show()
                    return "Signature file not found"

                try:
                    subprocess.run(
                        ["gpg", "--verify", signature_path, filename],
                        capture_output=True,
                        text=True,
                        check=True,
                    )
                except subprocess.CalledProcessError as e:
                    # gpg ran but rejected the signature; show its diagnostics.
                    MessageDialog(
                        "error", f"{LocaleStrings.MSGA['error_gpg_check']}{e.stderr}"
                    ).show()
                    return "Signature verification failed"
                except OSError as e:
                    # gpg could not be started, so there is no stderr to show.
                    MessageDialog(
                        "error", f"{LocaleStrings.MSGA['error_gpg_check']}{e}"
                    ).show()
                    return "Signature verification failed"
                print(LocaleStrings.MSGA["gpg_verify_success"])

            # Return the full path to the AppImage
            return filename
        except Exception as e:
            MessageDialog(
                "error", f"{LocaleStrings.MSGA['error_gpg_download']}{e}"
            ).show()
            return None
class NetworkChecker:
    """Static connectivity probes for general internet and repository access."""

    @staticmethod
    def check_internet_connection(
        host: str = "8.8.8.8", port: int = 53, timeout: float = 3
    ) -> bool:
        """
        Checks if an internet connection is available.

        Opens a TCP connection to `host:port` and closes it again. The timeout
        applies to this connection only; the process-wide default socket
        timeout is left unchanged.

        Args:
            host (str): Host to connect to for testing. Defaults to "8.8.8.8" (Google DNS).
            port (int): Port number to use for the test. Defaults to 53.
            timeout (float): Timeout in seconds for the connection attempt. Defaults to 3.

        Returns:
            bool: True if the connection could be established, False otherwise.
        """
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return True
        except OSError:
            return False

    @staticmethod
    def check_repository_access(
        url: str = "https://git.ilunix.de", timeout: float = 5
    ) -> bool:
        """
        Checks if the Gitea repository is accessible.

        Args:
            url (str): The URL of the Gitea repository. Defaults to "https://git.ilunix.de".
            timeout (float): Timeout in seconds for the connection attempt. Defaults to 5.

        Returns:
            bool: True if the URL could be opened, False otherwise (the error
                is printed in that case).
        """
        try:
            # The context manager closes the response instead of leaking it.
            with urllib.request.urlopen(url, timeout=timeout):
                return True
        except Exception as e:
            print(f"Error accessing Gitea repository: {e}")
            return False