add gpg public_key ipmort
This commit is contained in:
430
network.py
430
network.py
@ -1,28 +1,428 @@
|
||||
import socket
|
||||
import os
|
||||
import urllib.request
|
||||
import json
|
||||
import hashlib
|
||||
import subprocess
|
||||
from typing import Union, List
|
||||
import re
|
||||
import tempfile
|
||||
|
||||
|
||||
class GiteaUpdate:
|
||||
class GPGManager:
|
||||
|
||||
@staticmethod
|
||||
def api_down(url, current_version=""):
|
||||
"""Get latest version from Gitea API"""
|
||||
def get_gpg() -> bool:
|
||||
"""
|
||||
Check if gpg is installed.
|
||||
|
||||
Returns:
|
||||
bool: True if `gpg` is installed, False otherwise.
|
||||
"""
|
||||
result = subprocess.run(
|
||||
["which gpg || command -v gpg"],
|
||||
capture_output=True,
|
||||
shell=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def is_key_already_imported(key_id: str) -> bool:
|
||||
"""
|
||||
Prüft, ob der Schlüssel bereits im Keyring importiert ist.
|
||||
|
||||
Args:
|
||||
key_id (str): ID des öffentlichen Schlüssels (z. B. '7D8A6E1F9B4C3A5D...')
|
||||
|
||||
Returns:
|
||||
bool: True, wenn der Schlüssel vorhanden ist.
|
||||
"""
|
||||
from message import MessageDialog
|
||||
from manager import LocaleStrings
|
||||
|
||||
# Check if `gpg` is installed
|
||||
if not GPGManager.get_gpg():
|
||||
|
||||
MessageDialog("warning", LocaleStrings.MSGGPG["gpg_missing"]).show()
|
||||
return False
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["gpg", "--list-keys", key_id],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
if key_id in result.stdout:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except Exception as e:
|
||||
if e.returncode == 2:
|
||||
return False
|
||||
else:
|
||||
MessageDialog(
|
||||
"error", f"{LocaleStrings.MSGA['error_gpg_check']}{e}"
|
||||
).show()
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def is_url_reachable(url: str, timeout: int = 5) -> bool:
|
||||
"""
|
||||
Checks if a given URL is reachable.
|
||||
|
||||
Args:
|
||||
url (str): The URL to check.
|
||||
timeout (int): Timeout in seconds for the connection attempt.
|
||||
|
||||
Returns:
|
||||
bool: True if the URL is reachable, False otherwise.
|
||||
"""
|
||||
try:
|
||||
urllib.request.urlopen(url, timeout=timeout)
|
||||
return True
|
||||
except Exception as e:
|
||||
from message import MessageDialog
|
||||
from manager import LocaleStrings
|
||||
|
||||
MessageDialog(
|
||||
"error",
|
||||
f"{LocaleStrings.MSGGPG['url_not_reachable']}{url} - {LocaleStrings.MSGA['error']}{e}",
|
||||
)
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def import_key_from_url(
|
||||
key_url: str, expected_fingerprint: str, filename: str
|
||||
) -> bool:
|
||||
"""
|
||||
Downloads a GPG public key from the given URL, verifies its fingerprint matches
|
||||
the expected value, and imports it into the local GPG keyring.
|
||||
|
||||
Args:
|
||||
key_url (str): URL to the `.asc` public key file.
|
||||
expected_fingerprint (str): Expected 40-character hexadecimal fingerprint of the key.
|
||||
filename (str): Destination filename for saving the downloaded key in `PUBLIC_KEYS_DIR`.
|
||||
|
||||
Returns:
|
||||
bool: True if the key was downloaded, verified, and successfully imported. False otherwise.
|
||||
"""
|
||||
from message import MessageDialog
|
||||
from manager import LocaleStrings
|
||||
|
||||
# Configuration: Define the directory for storing public key files
|
||||
PUBLIC_KEYS_DIR = "/tmp/public_keys"
|
||||
if not os.path.exists(PUBLIC_KEYS_DIR):
|
||||
os.makedirs(PUBLIC_KEYS_DIR)
|
||||
|
||||
try:
|
||||
# Construct full path to save the key file
|
||||
key_path = os.path.join(PUBLIC_KEYS_DIR, filename)
|
||||
print(f"Downloading public key from {key_url} to {key_path}")
|
||||
urllib.request.urlretrieve(key_url, key_path)
|
||||
|
||||
result = subprocess.run(
|
||||
["gpg", "-fingerprint", key_path],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
# Extract all fingerprints from GPG output (40-character hexadecimal)
|
||||
fingerprints_from_key: List[str] = []
|
||||
for line in result.stdout.splitlines():
|
||||
matches = re.findall(r"\b[0-9A-Fa-f]{40}\b", line)
|
||||
fingerprints_from_key.extend(matches)
|
||||
|
||||
if not fingerprints_from_key:
|
||||
MessageDialog(
|
||||
"warning", LocaleStrings.MSGGPG["corrupted_file"]
|
||||
).show()
|
||||
return False
|
||||
|
||||
# Check if any of the extracted fingerprints match the expected one
|
||||
found = any(fp == expected_fingerprint for fp in fingerprints_from_key)
|
||||
if not found:
|
||||
MessageDialog(
|
||||
"warning",
|
||||
f"{LocaleStrings.MSGGPG['mismatch']}\n{expected_fingerprint}\n{LocaleStrings.MSGGPG['but_got']}\n{fingerprints_from_key}",
|
||||
wraplength=450,
|
||||
).show()
|
||||
|
||||
return False
|
||||
|
||||
else:
|
||||
# Import key into GPG keyring
|
||||
result_import = subprocess.run(
|
||||
["gpg", "--import", key_path],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
if result_import.returncode == 0:
|
||||
print(f"Public key from {key_url} successfully imported.")
|
||||
return True
|
||||
else:
|
||||
MessageDialog(
|
||||
"error",
|
||||
f"{LocaleStrings.MSGGPG['failed_import']}{result_import.stderr}",
|
||||
).show()
|
||||
return False
|
||||
|
||||
else:
|
||||
MessageDialog(
|
||||
"error",
|
||||
f"{LocaleStrings.MSGGPG['fingerprint_extract']}{result.stderr}",
|
||||
)
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
MessageDialog(
|
||||
"error", f"{LocaleStrings.MSGGPG['error_import_key']}{key_url}: {e}"
|
||||
)
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def update_gpg_trust_level(key_id: str, trust_level: int = 5) -> bool:
|
||||
"""
|
||||
Sets the trust level for a specified GPG public key.
|
||||
|
||||
Args:
|
||||
key_id (str): The hexadecimal ID of the key to update.
|
||||
trust_level (int): Trust level (1-5). Default is 5 (fully trusted).
|
||||
|
||||
Returns:
|
||||
bool: True if the trust level was successfully updated, False otherwise.
|
||||
"""
|
||||
script = f"""#!/bin/bash
|
||||
# Set required environment variables
|
||||
export GPG_TTY=$(tty)
|
||||
export GNUPG_STATUS="1"
|
||||
export GNUPGAGENT_INFO_FILE="/dev/null"
|
||||
gpg --batch \
|
||||
--no-tty \
|
||||
--command-fd 0 \
|
||||
--edit-key {key_id} << EOF
|
||||
trust
|
||||
{trust_level}
|
||||
quit
|
||||
EOF
|
||||
"""
|
||||
|
||||
try:
|
||||
|
||||
from message import MessageDialog
|
||||
from manager import LocaleStrings
|
||||
|
||||
# Create temporary script file
|
||||
temp_script = tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".sh", delete=False, encoding="utf-8"
|
||||
)
|
||||
temp_script.write(script)
|
||||
temp_script.close()
|
||||
|
||||
print(f"Setting trust level {trust_level} for key {key_id}")
|
||||
result = subprocess.run(
|
||||
["bash", temp_script.name], capture_output=True, text=True, check=True
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
|
||||
MessageDialog(
|
||||
"error",
|
||||
f"{LocaleStrings.MSGGPG['error_updating_trust_level']}{result.stderr}",
|
||||
).show()
|
||||
|
||||
except (subprocess.CalledProcessError, FileNotFoundError) as e:
|
||||
MessageDialog(
|
||||
"error", f"{LocaleStrings.MSGGPG['error_executing_script']}{e}"
|
||||
).show()
|
||||
return False
|
||||
|
||||
finally:
|
||||
try:
|
||||
os.unlink(temp_script.name)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class GiteaUpdater:
|
||||
@staticmethod
|
||||
def get_latest_version_from_api(url: str, current_version: str = "") -> str:
|
||||
"""
|
||||
Fetches the latest version of a project from the Gitea API.
|
||||
|
||||
Args:
|
||||
url (str): The URL to query the Gitea API for releases.
|
||||
current_version (str, optional): Not used in this implementation. Defaults to "".
|
||||
|
||||
Returns:
|
||||
str: Latest version tag name without the 'v' prefix.
|
||||
If an error occurs, returns "Unknown".
|
||||
"""
|
||||
try:
|
||||
with urllib.request.urlopen(url, timeout=10) as response:
|
||||
data = json.loads(response.read().decode())
|
||||
if data and len(data) > 0:
|
||||
latest_version = data[0].get("tag_name", "Unknown")
|
||||
return latest_version.lstrip("v") # Remove 'v' prefix if present
|
||||
return latest_version.lstrip("v") # Remove 'v' prefix
|
||||
return "Unknown"
|
||||
except Exception as e:
|
||||
print(f"API Error: {e}")
|
||||
return "Unknown"
|
||||
|
||||
@staticmethod
|
||||
def download_appimage(
|
||||
version: str = "latest",
|
||||
verify_checksum: bool = False,
|
||||
verify_signature: bool = False,
|
||||
) -> Union[str, None]:
|
||||
"""
|
||||
Downloads the AppImage file to /tmp/portinstaller and performs verification checks.
|
||||
|
||||
Args:
|
||||
version (str): 'latest' for the latest version or a specific version.
|
||||
verify_checksum (bool): Whether to perform SHA256 checksum verification.
|
||||
verify_signature (bool): Whether to validate GPG signature.
|
||||
|
||||
Returns:
|
||||
str: Full path to the AppImage if all checks pass. Otherwise, returns an error message.
|
||||
None: If an exception occurs during download or verification.
|
||||
"""
|
||||
base_url = "https://git.ilunix.de/punix/lxtools_installer/releases/download/"
|
||||
|
||||
from message import MessageDialog
|
||||
from manager import LocaleStrings
|
||||
|
||||
# Get latest version from API if not provided
|
||||
if version == "latest":
|
||||
try:
|
||||
|
||||
with urllib.request.urlopen(
|
||||
"https://git.ilunix.de/api/v1/repos/punix/lxtools_installer/releases?limit=1",
|
||||
timeout=10,
|
||||
) as response:
|
||||
data = json.loads(response.read().decode())
|
||||
latest_version = data[0].get("tag_name")
|
||||
if not latest_version:
|
||||
print(LocaleStrings.MSGA["Failed_retrieving"])
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"{LocaleStrings.MSGA['Error_retrieving']}{e}")
|
||||
return None
|
||||
|
||||
else:
|
||||
latest_version = version
|
||||
|
||||
# Create /tmp/portinstaller directory if it doesn't exist
|
||||
download_dir = "/tmp/portinstaller"
|
||||
os.makedirs(download_dir, exist_ok=True)
|
||||
|
||||
filename = f"{download_dir}/lxtools_installer{latest_version}-x86_64.AppImage"
|
||||
appimage_url = f"{base_url}{latest_version}/{filename.split('/')[-1]}"
|
||||
checksum_url = f"{base_url}{latest_version}/{filename.split('/')[-1]}.sha256"
|
||||
signature_url = f"{base_url}{latest_version}/{filename.split('/')[-1]}.asc"
|
||||
|
||||
try:
|
||||
print("Downloading AppImage Updater...")
|
||||
urllib.request.urlretrieve(appimage_url, filename)
|
||||
|
||||
# SHA256 checksum verification
|
||||
result = None
|
||||
if verify_checksum:
|
||||
try:
|
||||
with urllib.request.urlopen(checksum_url, timeout=10) as response:
|
||||
checksum_content = response.read().decode()
|
||||
expected_hash, _ = checksum_content.strip().split(" ")
|
||||
except Exception as e:
|
||||
|
||||
result = MessageDialog(
|
||||
"ask",
|
||||
f"{LocaleStrings.MSGA['SHA256_File_not_found']}{e} {LocaleStrings.MSGA['SHA256_File_not_found1']}",
|
||||
buttons=["Yes", "No"],
|
||||
).show()
|
||||
|
||||
if not result:
|
||||
return "Checksum file not found"
|
||||
|
||||
if result:
|
||||
pass
|
||||
else:
|
||||
with open(filename, "rb") as f:
|
||||
file_hash = hashlib.sha256(f.read()).hexdigest()
|
||||
|
||||
if expected_hash != file_hash:
|
||||
|
||||
MessageDialog(
|
||||
"warning", LocaleStrings.MSGA["SHA256 hash mismatch"]
|
||||
).show()
|
||||
return "Checksum mismatch"
|
||||
|
||||
# GPG signature verification
|
||||
if verify_signature:
|
||||
signature_path = f"{download_dir}/{filename.split('/')[-1]}.asc"
|
||||
try:
|
||||
urllib.request.urlretrieve(signature_url, signature_path)
|
||||
except Exception as e:
|
||||
|
||||
MessageDialog(
|
||||
"error", f"{LocaleStrings.MSGA['not_gpg_found']}{e}"
|
||||
).show()
|
||||
return "Signature file not found"
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["gpg", "--verify", signature_path, filename],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
print(LocaleStrings.MSGA["gpg_verify_success"])
|
||||
except Exception as e:
|
||||
from message import MessageDialog
|
||||
|
||||
MessageDialog(
|
||||
"error", f"{LocaleStrings.MSGA['error_gpg_check']}{e.stderr}"
|
||||
).show()
|
||||
return "Signature verification failed"
|
||||
|
||||
# Return the full path to the AppImage
|
||||
return filename
|
||||
|
||||
except Exception as e:
|
||||
from message import MessageDialog
|
||||
|
||||
MessageDialog(
|
||||
"error", f"{LocaleStrings.MSGA['error_gpg_download']}{e}"
|
||||
).show()
|
||||
return None
|
||||
|
||||
|
||||
class NetworkChecker:
|
||||
@staticmethod
|
||||
def check_internet_connection(host="8.8.8.8", port=53, timeout=3):
|
||||
"""Check if internet connection is available"""
|
||||
def check_internet_connection(
|
||||
host: str = "8.8.8.8", port: int = 53, timeout: float = 3
|
||||
) -> bool:
|
||||
"""
|
||||
Checks if an internet connection is available.
|
||||
|
||||
Args:
|
||||
host (str): Host to connect to for testing. Defaults to "8.8.8.8" (Google DNS).
|
||||
port (int): Port number to use for the test. Defaults to 53.
|
||||
timeout (float): Timeout in seconds for the connection attempt. Defaults to 3.
|
||||
|
||||
Returns:
|
||||
bool: True if internet is available, False otherwise.
|
||||
"""
|
||||
try:
|
||||
socket.setdefaulttimeout(timeout)
|
||||
socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
|
||||
@ -31,10 +431,22 @@ class NetworkChecker:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def check_repository_access(url="https://git.ilunix.de", timeout=5):
|
||||
"""Check if repository is accessible"""
|
||||
def check_repository_access(
|
||||
url: str = "https://git.ilunix.de", timeout: float = 5
|
||||
) -> bool:
|
||||
"""
|
||||
Checks if the Gitea repository is accessible.
|
||||
|
||||
Args:
|
||||
url (str): The URL of the Gitea repository. Defaults to "https://git.ilunix.de".
|
||||
timeout (float): Timeout in seconds for the connection attempt. Defaults to 5.
|
||||
|
||||
Returns:
|
||||
bool: True if the repository is accessible, False otherwise.
|
||||
"""
|
||||
try:
|
||||
urllib.request.urlopen(url, timeout=timeout)
|
||||
return True
|
||||
except:
|
||||
except Exception as e:
|
||||
print(f"Error accessing Gitea repository: {e}")
|
||||
return False
|
||||
|
Reference in New Issue
Block a user