231 lines
8.0 KiB
Python
231 lines
8.0 KiB
Python
#!/usr/bin/python3
|
|
import logging
|
|
import getpass
|
|
import zipfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import shutil
|
|
from subprocess import run, CompletedProcess
|
|
import secrets
|
|
from shared_libs.wp_app_config import AppConfig, Msg
|
|
from shared_libs.common_tools import LxTools, CryptoUtil
|
|
|
|
# Translate
|
|
_ = AppConfig.setup_translations()
|
|
|
|
|
|
class Tunnel:
|
|
"""
|
|
Class of Methods for Wire-Py
|
|
"""
|
|
|
|
@staticmethod
|
|
def parse_files_to_dictionary(
|
|
directory: Path = None, filepath: str = None, content: str = None
|
|
) -> tuple[dict, str] | dict | None:
|
|
data = {}
|
|
|
|
if filepath is not None:
|
|
filepath = Path(filepath)
|
|
try:
|
|
content = filepath.read_text()
|
|
|
|
# parse the content
|
|
address_line = next(
|
|
line for line in content.splitlines() if line.startswith("Address")
|
|
)
|
|
dns_line = next(
|
|
line for line in content.splitlines() if line.startswith("DNS")
|
|
)
|
|
endpoint_line = next(
|
|
line for line in content.splitlines() if line.startswith("Endpoint")
|
|
)
|
|
private_key_line = next(
|
|
line
|
|
for line in content.splitlines()
|
|
if line.startswith("PrivateKey")
|
|
)
|
|
|
|
content = secrets.token_bytes(len(content))
|
|
|
|
# extract the values
|
|
address = address_line.split("=")[1].strip()
|
|
dns = dns_line.split("=")[1].strip()
|
|
endpoint = endpoint_line.split("=")[1].strip()
|
|
private_key = private_key_line.split("=")[1].strip()
|
|
|
|
# Shorten the tunnel name to the maximum allowed length if it exceeds 12 characters.
|
|
original_stem = filepath.stem
|
|
truncated_stem = (
|
|
original_stem[-12:] if len(original_stem) > 12 else original_stem
|
|
)
|
|
|
|
# save in the dictionary
|
|
data[truncated_stem] = {
|
|
"Address": address,
|
|
"DNS": dns,
|
|
"Endpoint": endpoint,
|
|
"PrivateKey": private_key,
|
|
}
|
|
|
|
content = secrets.token_bytes(len(content))
|
|
|
|
except StopIteration:
|
|
pass
|
|
|
|
elif directory is not None:
|
|
|
|
if not directory.exists() or not directory.is_dir():
|
|
logging.error(
|
|
"Temp directory does not exist or is not a directory.",
|
|
exc_info=True,
|
|
)
|
|
return None
|
|
|
|
# Get a list of all files in the directory
|
|
files = [file for file in AppConfig.TEMP_DIR.iterdir() if file.is_file()]
|
|
|
|
# Search for the string in the files
|
|
for file in files:
|
|
try:
|
|
content = file.read_text()
|
|
# parse the content
|
|
address_line = next(
|
|
line
|
|
for line in content.splitlines()
|
|
if line.startswith("Address")
|
|
)
|
|
dns_line = next(
|
|
line for line in content.splitlines() if line.startswith("DNS")
|
|
)
|
|
endpoint_line = next(
|
|
line
|
|
for line in content.splitlines()
|
|
if line.startswith("Endpoint")
|
|
)
|
|
|
|
# extract values
|
|
address = address_line.split("=")[1].strip()
|
|
dns = dns_line.split("=")[1].strip()
|
|
endpoint = endpoint_line.split("=")[1].strip()
|
|
|
|
# save values to dictionary
|
|
data[file.stem] = {
|
|
"Address": address,
|
|
"DNS": dns,
|
|
"Endpoint": endpoint,
|
|
}
|
|
|
|
except Exception:
|
|
# Ignore errors and continue to the next file
|
|
continue
|
|
if content is not None:
|
|
content = secrets.token_bytes(len(content))
|
|
if filepath is not None:
|
|
return data, truncated_stem
|
|
else:
|
|
return data
|
|
|
|
@staticmethod
|
|
def get_active() -> str:
|
|
"""
|
|
Shows the Active Tunnel
|
|
"""
|
|
active = None
|
|
try:
|
|
process: CompletedProcess[str] = run(
|
|
["nmcli", "-t", "-f", "NAME,TYPE", "connection", "show", "--active"],
|
|
capture_output=True,
|
|
text=True,
|
|
check=False,
|
|
)
|
|
|
|
active = next(
|
|
line.split(":")[0].strip()
|
|
for line in process.stdout.splitlines()
|
|
if line.endswith("wireguard")
|
|
)
|
|
|
|
if process.stderr and "error" in process.stderr.lower():
|
|
logging.error(f"Error output on nmcli: {process.stderr}")
|
|
|
|
except StopIteration:
|
|
active = None
|
|
except Exception as e:
|
|
logging.error(f"Error on nmcli: {e}")
|
|
active = None
|
|
|
|
return active if active is not None else ""
|
|
|
|
@staticmethod
|
|
def export() -> bool | None:
|
|
"""
|
|
This will export the tunnels.
|
|
A zipfile with the current date and time is created
|
|
in the user's home directory with the correct right
|
|
"""
|
|
now_time: datetime = datetime.now()
|
|
now_datetime: str = now_time.strftime("wg-exp-%m-%d-%Y-%H:%M")
|
|
|
|
try:
|
|
AppConfig.ensure_directories()
|
|
CryptoUtil.decrypt(getpass.getuser())
|
|
if len([file.name for file in AppConfig.TEMP_DIR.glob("*.conf")]) == 0:
|
|
|
|
LxTools.msg_window(
|
|
AppConfig.IMAGE_PATHS["icon_info"],
|
|
AppConfig.IMAGE_PATHS["icon_msg"],
|
|
Msg.STR["sel_tl"],
|
|
Msg.STR["tl_first"],
|
|
)
|
|
return False
|
|
else:
|
|
wg_tar: str = f"{AppConfig.BASE_DIR}/{now_datetime}"
|
|
try:
|
|
shutil.make_archive(wg_tar, "zip", AppConfig.TEMP_DIR)
|
|
with zipfile.ZipFile(f"{wg_tar}.zip", "r") as zf:
|
|
if zf.namelist():
|
|
|
|
LxTools.msg_window(
|
|
AppConfig.IMAGE_PATHS["icon_info"],
|
|
AppConfig.IMAGE_PATHS["icon_vpn"],
|
|
Msg.STR["exp_succ"],
|
|
Msg.STR["exp_in_home"],
|
|
)
|
|
else:
|
|
logging.error(
|
|
"There was a mistake at creating the Zip file. File is empty."
|
|
)
|
|
LxTools.msg_window(
|
|
AppConfig.IMAGE_PATHS["icon_error"],
|
|
AppConfig.IMAGE_PATHS["icon_msg"],
|
|
Msg.STR["exp_err"],
|
|
Msg.STR["exp_zip"],
|
|
)
|
|
return False
|
|
return True
|
|
except PermissionError:
|
|
logging.error(
|
|
f"Permission denied when creating archive in {wg_tar}"
|
|
)
|
|
return False
|
|
|
|
except zipfile.BadZipFile as e:
|
|
logging.error(f"Invalid ZIP file: {e}")
|
|
return False
|
|
except TypeError:
|
|
pass
|
|
except Exception as e:
|
|
logging.error(f"Export failed: {str(e)}")
|
|
LxTools.msg_window(
|
|
AppConfig.IMAGE_PATHS["icon_error"],
|
|
AppConfig.IMAGE_PATHS["icon_msg"],
|
|
Msg.STR["exp_err"],
|
|
Msg.STR["exp_try"],
|
|
)
|
|
return False
|
|
|
|
finally:
|
|
LxTools.clean_files(AppConfig.TEMP_DIR)
|
|
AppConfig.ensure_directories()
|