Wire-Py/tunnel.py
2025-06-04 18:49:17 +02:00

231 lines
8.0 KiB
Python

#!/usr/bin/python3
import logging
import getpass
import zipfile
from datetime import datetime
from pathlib import Path
import shutil
from subprocess import run, CompletedProcess
import secrets
from shared_libs.wp_app_config import AppConfig, Msg
from shared_libs.common_tools import LxTools, CryptoUtil
# Translate
_ = AppConfig.setup_translations()
class Tunnel:
"""
Class of Methods for Wire-Py
"""
@staticmethod
def parse_files_to_dictionary(
directory: Path = None, filepath: str = None, content: str = None
) -> tuple[dict, str] | dict | None:
data = {}
if filepath is not None:
filepath = Path(filepath)
try:
content = filepath.read_text()
# parse the content
address_line = next(
line for line in content.splitlines() if line.startswith("Address")
)
dns_line = next(
line for line in content.splitlines() if line.startswith("DNS")
)
endpoint_line = next(
line for line in content.splitlines() if line.startswith("Endpoint")
)
private_key_line = next(
line
for line in content.splitlines()
if line.startswith("PrivateKey")
)
content = secrets.token_bytes(len(content))
# extract the values
address = address_line.split("=")[1].strip()
dns = dns_line.split("=")[1].strip()
endpoint = endpoint_line.split("=")[1].strip()
private_key = private_key_line.split("=")[1].strip()
# Shorten the tunnel name to the maximum allowed length if it exceeds 12 characters.
original_stem = filepath.stem
truncated_stem = (
original_stem[-12:] if len(original_stem) > 12 else original_stem
)
# save in the dictionary
data[truncated_stem] = {
"Address": address,
"DNS": dns,
"Endpoint": endpoint,
"PrivateKey": private_key,
}
content = secrets.token_bytes(len(content))
except StopIteration:
pass
elif directory is not None:
if not directory.exists() or not directory.is_dir():
logging.error(
"Temp directory does not exist or is not a directory.",
exc_info=True,
)
return None
# Get a list of all files in the directory
files = [file for file in AppConfig.TEMP_DIR.iterdir() if file.is_file()]
# Search for the string in the files
for file in files:
try:
content = file.read_text()
# parse the content
address_line = next(
line
for line in content.splitlines()
if line.startswith("Address")
)
dns_line = next(
line for line in content.splitlines() if line.startswith("DNS")
)
endpoint_line = next(
line
for line in content.splitlines()
if line.startswith("Endpoint")
)
# extract values
address = address_line.split("=")[1].strip()
dns = dns_line.split("=")[1].strip()
endpoint = endpoint_line.split("=")[1].strip()
# save values to dictionary
data[file.stem] = {
"Address": address,
"DNS": dns,
"Endpoint": endpoint,
}
except Exception:
# Ignore errors and continue to the next file
continue
if content is not None:
content = secrets.token_bytes(len(content))
if filepath is not None:
return data, truncated_stem
else:
return data
@staticmethod
def get_active() -> str:
"""
Shows the Active Tunnel
"""
active = None
try:
process: CompletedProcess[str] = run(
["nmcli", "-t", "-f", "NAME,TYPE", "connection", "show", "--active"],
capture_output=True,
text=True,
check=False,
)
active = next(
line.split(":")[0].strip()
for line in process.stdout.splitlines()
if line.endswith("wireguard")
)
if process.stderr and "error" in process.stderr.lower():
logging.error(f"Error output on nmcli: {process.stderr}")
except StopIteration:
active = None
except Exception as e:
logging.error(f"Error on nmcli: {e}")
active = None
return active if active is not None else ""
@staticmethod
def export() -> bool | None:
"""
This will export the tunnels.
A zipfile with the current date and time is created
in the user's home directory with the correct right
"""
now_time: datetime = datetime.now()
now_datetime: str = now_time.strftime("wg-exp-%m-%d-%Y-%H:%M")
try:
AppConfig.ensure_directories()
CryptoUtil.decrypt(getpass.getuser())
if len([file.name for file in AppConfig.TEMP_DIR.glob("*.conf")]) == 0:
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_info"],
AppConfig.IMAGE_PATHS["icon_msg"],
Msg.STR["sel_tl"],
Msg.STR["tl_first"],
)
return False
else:
wg_tar: str = f"{AppConfig.BASE_DIR}/{now_datetime}"
try:
shutil.make_archive(wg_tar, "zip", AppConfig.TEMP_DIR)
with zipfile.ZipFile(f"{wg_tar}.zip", "r") as zf:
if zf.namelist():
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_info"],
AppConfig.IMAGE_PATHS["icon_vpn"],
Msg.STR["exp_succ"],
Msg.STR["exp_in_home"],
)
else:
logging.error(
"There was a mistake at creating the Zip file. File is empty."
)
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_error"],
AppConfig.IMAGE_PATHS["icon_msg"],
Msg.STR["exp_err"],
Msg.STR["exp_zip"],
)
return False
return True
except PermissionError:
logging.error(
f"Permission denied when creating archive in {wg_tar}"
)
return False
except zipfile.BadZipFile as e:
logging.error(f"Invalid ZIP file: {e}")
return False
except TypeError:
pass
except Exception as e:
logging.error(f"Export failed: {str(e)}")
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_error"],
AppConfig.IMAGE_PATHS["icon_msg"],
Msg.STR["exp_err"],
Msg.STR["exp_try"],
)
return False
finally:
LxTools.clean_files(AppConfig.TEMP_DIR)
AppConfig.ensure_directories()