453 lines
16 KiB
Python
453 lines
16 KiB
Python
import socket
|
|
import os
|
|
import urllib.request
|
|
import json
|
|
import hashlib
|
|
import subprocess
|
|
from typing import Union, List
|
|
import re
|
|
import tempfile
|
|
|
|
|
|
class GPGManager:
    """Static helpers around the local ``gpg`` command-line tool.

    User-facing problems are reported through ``message.MessageDialog`` using
    texts from ``manager.LocaleStrings``. Both modules are imported inside the
    methods that need them, exactly as the original code did.
    """

    @staticmethod
    def get_gpg() -> bool:
        """
        Check if gpg is installed.

        Returns:
            bool: True if a `gpg` executable is found on PATH, False otherwise.
        """
        # Function-scope import keeps this block independent of the file's
        # top-level import list.
        import shutil

        # A PATH lookup needs no subshell (the original ran
        # "which gpg || command -v gpg" through the shell).
        return shutil.which("gpg") is not None

    @staticmethod
    def is_key_already_imported(key_id: str) -> bool:
        """
        Check whether a public key is already present in the local keyring.

        Args:
            key_id (str): ID of the public key (e.g. '7D8A6E1F9B4C3A5D...').

        Returns:
            bool: True if `gpg --list-keys` succeeds and its output mentions
            `key_id` (compared case-insensitively). False if gpg is missing,
            the key is unknown, or the check failed.
        """
        from message import MessageDialog
        from manager import LocaleStrings

        # Check if `gpg` is installed
        if not GPGManager.get_gpg():
            MessageDialog("warning", LocaleStrings.MSGGPG["gpg_missing"]).show()
            return False

        try:
            result = subprocess.run(
                ["gpg", "--list-keys", key_id],
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            # Exit status 2 is treated as "key not in keyring" and is not an
            # error worth a dialog; any other status is reported.
            if e.returncode != 2:
                MessageDialog(
                    "error", f"{LocaleStrings.MSGA['error_gpg_check']}{e}"
                ).show()
            return False
        except Exception as e:
            # e.g. OSError when gpg cannot be executed. These exceptions have
            # no ``returncode`` attribute, so they are handled separately.
            MessageDialog(
                "error", f"{LocaleStrings.MSGA['error_gpg_check']}{e}"
            ).show()
            return False

        # gpg prints key IDs in upper case; accept lower-case input as well.
        return key_id.upper() in result.stdout.upper()

    @staticmethod
    def is_url_reachable(url: str, timeout: int = 5) -> bool:
        """
        Checks if a given URL is reachable.

        Args:
            url (str): The URL to check.
            timeout (int): Timeout in seconds for the connection attempt.

        Returns:
            bool: True if the URL could be opened, False otherwise (an error
            dialog is shown in that case).
        """
        try:
            # The context manager closes the response instead of leaking it.
            with urllib.request.urlopen(url, timeout=timeout):
                return True
        except Exception as e:
            from message import MessageDialog
            from manager import LocaleStrings

            # The original built this dialog but never displayed it.
            MessageDialog(
                "error",
                f"{LocaleStrings.MSGGPG['url_not_reachable']}{url} - {LocaleStrings.MSGA['error']}{e}",
            ).show()
            return False

    @staticmethod
    def _normalize_fingerprint(fingerprint: str) -> str:
        """Return `fingerprint` without whitespace and in upper case."""
        return "".join(fingerprint.split()).upper()

    @staticmethod
    def _primary_fingerprints(colon_listing: str) -> List[str]:
        """Extract primary-key fingerprints from ``gpg --with-colons`` output.

        Only the ``fpr`` record that directly follows a ``pub`` record is
        taken, so subkey fingerprints are ignored. Values are returned in
        upper case; anything that is not exactly 40 hex digits is skipped.
        """
        fingerprints: List[str] = []
        awaiting_primary_fpr = False
        for line in colon_listing.splitlines():
            fields = line.split(":")
            record_type = fields[0]
            if record_type == "pub":
                awaiting_primary_fpr = True
            elif record_type == "fpr":
                # In the colon format the fingerprint is field 10 (index 9).
                if (
                    awaiting_primary_fpr
                    and len(fields) > 9
                    and re.fullmatch(r"[0-9A-Fa-f]{40}", fields[9])
                ):
                    fingerprints.append(fields[9].upper())
                awaiting_primary_fpr = False
            elif record_type in ("sub", "sec", "ssb"):
                awaiting_primary_fpr = False
        return fingerprints

    @staticmethod
    def import_key_from_url(
        key_url: str, expected_fingerprint: str, filename: str
    ) -> bool:
        """
        Downloads a GPG public key from the given URL, verifies its fingerprint matches
        the expected value, and imports it into the local GPG keyring.

        Args:
            key_url (str): URL to the `.asc` public key file.
            expected_fingerprint (str): Expected 40-character hexadecimal fingerprint
                of the key. Whitespace and letter case are ignored when comparing.
            filename (str): Destination filename for saving the downloaded key in `PUBLIC_KEYS_DIR`.

        Returns:
            bool: True if the key was downloaded, verified, and successfully imported. False otherwise.
        """
        from message import MessageDialog
        from manager import LocaleStrings

        # Configuration: Define the directory for storing public key files
        # NOTE(review): fixed, predictable path under /tmp; kept as-is because
        # other code may rely on this location.
        PUBLIC_KEYS_DIR = "/tmp/public_keys"
        os.makedirs(PUBLIC_KEYS_DIR, exist_ok=True)

        try:
            # Construct full path to save the key file
            key_path = os.path.join(PUBLIC_KEYS_DIR, filename)
            print(f"Downloading public key from {key_url} to {key_path}")
            urllib.request.urlretrieve(key_url, key_path)

            # Inspect the key file without importing it. The original passed
            # "-fingerprint" (single dash), which gpg parses as "-f ingerprint"
            # and only worked through gpg's "no command supplied" guessing.
            # check=False so the specific stderr text below can be shown.
            listing = subprocess.run(
                ["gpg", "--show-keys", "--with-colons", key_path],
                capture_output=True,
                text=True,
                check=False,
            )
            if listing.returncode != 0:
                MessageDialog(
                    "error",
                    f"{LocaleStrings.MSGGPG['fingerprint_extract']}{listing.stderr}",
                ).show()
                return False

            fingerprints_from_key = GPGManager._primary_fingerprints(listing.stdout)
            if not fingerprints_from_key:
                MessageDialog(
                    "warning", LocaleStrings.MSGGPG["corrupted_file"]
                ).show()
                return False

            # Check if any of the extracted fingerprints match the expected one
            wanted = GPGManager._normalize_fingerprint(expected_fingerprint)
            if wanted not in fingerprints_from_key:
                MessageDialog(
                    "warning",
                    f"{LocaleStrings.MSGGPG['mismatch']}\n{expected_fingerprint}\n{LocaleStrings.MSGGPG['but_got']}\n{fingerprints_from_key}",
                    wraplength=450,
                ).show()
                return False

            # Import key into GPG keyring
            result_import = subprocess.run(
                ["gpg", "--import", key_path],
                capture_output=True,
                text=True,
                check=False,
            )
            if result_import.returncode != 0:
                MessageDialog(
                    "error",
                    f"{LocaleStrings.MSGGPG['failed_import']}{result_import.stderr}",
                ).show()
                return False

            print(f"Public key from {key_url} successfully imported.")
            return True

        except Exception as e:
            # Download failures, a missing gpg binary, etc.
            MessageDialog(
                "error", f"{LocaleStrings.MSGGPG['error_import_key']}{key_url}: {e}"
            ).show()
            return False

    @staticmethod
    def update_gpg_trust_level(key_id: str, trust_level: int = 5) -> bool:
        """
        Sets the owner-trust level for a specified GPG public key.

        Args:
            key_id (str): The hexadecimal ID of the key to update.
            trust_level (int): Trust level (1-5) as offered by gpg's `trust`
                prompt. Default is 5 (ultimate trust; 4 is "full").

        Returns:
            bool: True if gpg exited successfully, False otherwise (an error
            dialog is shown).
        """
        from message import MessageDialog
        from manager import LocaleStrings

        if trust_level not in range(1, 6):
            MessageDialog(
                "error",
                f"{LocaleStrings.MSGGPG['error_updating_trust_level']}invalid trust level: {trust_level!r}",
            ).show()
            return False

        # Answers fed to gpg's interactive key editor through --command-fd 0.
        answers = ["trust", str(trust_level)]
        if trust_level == 5:
            # gpg asks for an explicit confirmation before granting ultimate
            # trust; without it the following "quit" would be consumed as
            # the (negative) answer.
            answers.append("y")
        answers.append("quit")

        print(f"Setting trust level {trust_level} for key {key_id}")
        try:
            # gpg is invoked directly with an argument list: no temporary
            # shell script, and `key_id` is never interpreted by a shell.
            result = subprocess.run(
                [
                    "gpg",
                    "--batch",
                    "--no-tty",
                    "--command-fd",
                    "0",
                    "--edit-key",
                    key_id,
                ],
                input="\n".join(answers) + "\n",
                capture_output=True,
                text=True,
                check=False,
            )
        except OSError as e:
            # gpg missing or not executable.
            MessageDialog(
                "error", f"{LocaleStrings.MSGGPG['error_executing_script']}{e}"
            ).show()
            return False

        if result.returncode != 0:
            MessageDialog(
                "error",
                f"{LocaleStrings.MSGGPG['error_updating_trust_level']}{result.stderr}",
            ).show()
            return False

        return True
|
|
|
|
|
|
class GiteaUpdater:
    """Static helpers that query Gitea releases and download the AppImage."""

    @staticmethod
    def get_latest_version_from_api(url: str, current_version: str = "") -> str:
        """
        Fetches the latest version of a project from the Gitea API.

        Args:
            url (str): The URL to query the Gitea API for releases.
            current_version (str, optional): Not used in this implementation. Defaults to "".

        Returns:
            str: Latest version tag name without a single leading 'v'.
                 If an error occurs or no tag is available, returns "Unknown".
        """
        try:
            with urllib.request.urlopen(url, timeout=10) as response:
                data = json.loads(response.read().decode())
            if data:
                tag = data[0].get("tag_name") or "Unknown"
                # Drop exactly one 'v' prefix; lstrip("v") would remove
                # every leading 'v' character.
                return tag[1:] if tag.startswith("v") else tag
            return "Unknown"
        except Exception as e:
            print(f"API Error: {e}")
            return "Unknown"

    @staticmethod
    def _expected_sha256(checksum_content: str) -> str:
        """Return the lower-cased hash token from the text of a ``.sha256`` file.

        Accepts "<hash>", "<hash> <name>" and the ``sha256sum`` layout
        "<hash>  <name>" (two spaces).

        Raises:
            ValueError: If the content holds no token at all.
        """
        tokens = checksum_content.split()
        if not tokens:
            raise ValueError("checksum file is empty")
        return tokens[0].lower()

    @staticmethod
    def _sha256_of_file(path: str, chunk_size: int = 1024 * 1024) -> str:
        """Return the hex SHA256 digest of the file at `path`, read in chunks."""
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(chunk_size), b""):
                digest.update(chunk)
        return digest.hexdigest()

    @staticmethod
    def download_appimage(
        version: str = "latest",
        verify_checksum: bool = False,
        verify_signature: bool = False,
    ) -> Union[str, None]:
        """
        Downloads the AppImage file to /tmp/portinstaller and performs verification checks.

        Args:
            version (str): 'latest' for the latest version or a specific version.
            verify_checksum (bool): Whether to perform SHA256 checksum verification.
            verify_signature (bool): Whether to validate GPG signature.

        Returns:
            str: Full path to the AppImage if all requested checks pass. If a
                check fails, one of the messages "Checksum file not found",
                "Checksum mismatch", "Signature file not found" or
                "Signature verification failed" is returned instead.
            None: If the release lookup or the download itself fails.
        """
        base_url = "https://git.ilunix.de/punix/lxtools_installer/releases/download/"

        from message import MessageDialog
        from manager import LocaleStrings

        # Get latest version from API if not provided
        if version == "latest":
            try:
                with urllib.request.urlopen(
                    "https://git.ilunix.de/api/v1/repos/punix/lxtools_installer/releases?limit=1",
                    timeout=10,
                ) as response:
                    data = json.loads(response.read().decode())
                latest_version = data[0].get("tag_name")
                if not latest_version:
                    print(LocaleStrings.MSGA["Failed_retrieving"])
                    return None
            except Exception as e:
                print(f"{LocaleStrings.MSGA['Error_retrieving']}{e}")
                return None
        else:
            latest_version = version

        # Create /tmp/portinstaller directory if it doesn't exist
        download_dir = "/tmp/portinstaller"
        os.makedirs(download_dir, exist_ok=True)

        asset_name = f"lxtools_installer{latest_version}-x86_64.AppImage"
        filename = f"{download_dir}/{asset_name}"
        appimage_url = f"{base_url}{latest_version}/{asset_name}"
        checksum_url = f"{appimage_url}.sha256"
        signature_url = f"{appimage_url}.asc"

        try:
            print("Downloading AppImage Updater...")
            urllib.request.urlretrieve(appimage_url, filename)

            # SHA256 checksum verification
            if verify_checksum:
                expected_hash = None
                try:
                    with urllib.request.urlopen(checksum_url, timeout=10) as response:
                        expected_hash = GiteaUpdater._expected_sha256(
                            response.read().decode()
                        )
                except Exception as e:
                    # Checksum file missing or unreadable: let the user
                    # decide whether to continue without this check.
                    proceed = MessageDialog(
                        "ask",
                        f"{LocaleStrings.MSGA['SHA256_File_not_found']}{e} {LocaleStrings.MSGA['SHA256_File_not_found1']}",
                        buttons=["Yes", "No"],
                    ).show()
                    if not proceed:
                        return "Checksum file not found"

                # expected_hash stays None when the user chose to continue
                # without a checksum file.
                if expected_hash is not None:
                    if GiteaUpdater._sha256_of_file(filename) != expected_hash:
                        MessageDialog(
                            "warning", LocaleStrings.MSGA["SHA256 hash mismatch"]
                        ).show()
                        return "Checksum mismatch"

            # GPG signature verification
            if verify_signature:
                signature_path = f"{download_dir}/{asset_name}.asc"
                try:
                    urllib.request.urlretrieve(signature_url, signature_path)
                except Exception as e:
                    MessageDialog(
                        "error", f"{LocaleStrings.MSGA['not_gpg_found']}{e}"
                    ).show()
                    return "Signature file not found"

                try:
                    # check=True: a failed verification raises
                    # CalledProcessError, so reaching the print means success.
                    subprocess.run(
                        ["gpg", "--verify", signature_path, filename],
                        capture_output=True,
                        text=True,
                        check=True,
                    )
                    print(LocaleStrings.MSGA["gpg_verify_success"])
                except Exception as e:
                    # Only CalledProcessError carries ``stderr``; fall back to
                    # the exception text (e.g. when gpg is not installed).
                    detail = getattr(e, "stderr", None) or e
                    MessageDialog(
                        "error", f"{LocaleStrings.MSGA['error_gpg_check']}{detail}"
                    ).show()
                    return "Signature verification failed"

            # Return the full path to the AppImage
            return filename

        except Exception as e:
            MessageDialog(
                "error", f"{LocaleStrings.MSGA['error_gpg_download']}{e}"
            ).show()
            return None
|
|
|
|
|
|
class NetworkChecker:
    """Static connectivity probes for general internet and repository access."""

    @staticmethod
    def check_internet_connection(
        host: str = "8.8.8.8", port: int = 53, timeout: float = 3
    ) -> bool:
        """
        Checks if an internet connection is available.

        Opens a TCP connection to `host`:`port` and closes it again.

        Args:
            host (str): Host to connect to for testing. Defaults to "8.8.8.8" (Google DNS).
            port (int): Port number to use for the test. Defaults to 53.
            timeout (float): Timeout in seconds for the connection attempt. Defaults to 3.

        Returns:
            bool: True if the connection succeeded, False otherwise.
        """
        try:
            # The timeout applies to this connection only. The original
            # called socket.setdefaulttimeout(), which changes the default
            # for every socket created afterwards in the process, and it
            # never closed the probe socket.
            with socket.create_connection((host, port), timeout=timeout):
                return True
        except OSError:  # socket.error is an alias of OSError
            return False

    @staticmethod
    def check_repository_access(
        url: str = "https://git.ilunix.de", timeout: float = 5
    ) -> bool:
        """
        Checks if the Gitea repository is accessible.

        Args:
            url (str): The URL of the Gitea repository. Defaults to "https://git.ilunix.de".
            timeout (float): Timeout in seconds for the connection attempt. Defaults to 5.

        Returns:
            bool: True if the URL could be opened, False otherwise (the error
            is printed).
        """
        try:
            # Close the response once reachability is established.
            with urllib.request.urlopen(url, timeout=timeout):
                return True
        except Exception as e:
            print(f"Error accessing Gitea repository: {e}")
            return False
|