From 55f2119bc37707771ac275a67ad05bd5f674f158 Mon Sep 17 00:00:00 2001 From: punix Date: Mon, 19 May 2025 21:35:14 +0200 Subject: [PATCH] conversion to app and configmanager part 2 export still missing --- common_tools.py | 18 +-- start_wg.py | 11 +- wirepy.py | 391 +++++++++++++++++++++++------------------------ wp_app_config.py | 9 +- 4 files changed, 206 insertions(+), 223 deletions(-) diff --git a/common_tools.py b/common_tools.py index e6583f5..0b14624 100755 --- a/common_tools.py +++ b/common_tools.py @@ -10,7 +10,7 @@ from subprocess import CompletedProcess, run import re import sys import tkinter as tk -from typing import Optional, Dict, Any, NoReturn, TextIO, Tuple, List +from typing import Optional, Dict, Any, NoReturn, List import zipfile from datetime import datetime from pathlib import Path @@ -34,6 +34,9 @@ class CryptoUtil: """ Starts SSL dencrypt """ + crypted_tunnel = [str(file) for file in AppConfig.CONFIG_DIR.glob("*.dat")] + if crypted_tunnel == []: + return process: CompletedProcess[str] = subprocess.run( ["pkexec", "/usr/local/bin/ssl_decrypt.py"], capture_output=True, @@ -41,10 +44,6 @@ class CryptoUtil: check=False, ) - # Output from Openssl - # if process.stdout: - # print(process.stdout) - # Output from Openssl Error if process.stderr: print(process.stderr) @@ -404,7 +403,7 @@ class Tunnel: @staticmethod def parse_files_to_dictionary( - directory: Path = None, filepath: str = None + directory: Path = None, filepath: str = None, content: str = None ) -> dict | str | None: data = {} @@ -453,8 +452,7 @@ class Tunnel: content = secrets.token_bytes(len(content)) - except StopIteration as e: - print(f"Error: {e}") + except StopIteration: pass elif directory is not None: @@ -500,8 +498,8 @@ class Tunnel: except Exception: # Ignore errors and continue to the next file continue - - content = secrets.token_bytes(len(content)) + if content is not None: + content = secrets.token_bytes(len(content)) if filepath is not None: return data, truncated_stem else: diff --git a/start_wg.py b/start_wg.py index 20ca206..4842df3 100755 --- a/start_wg.py +++ b/start_wg.py @@ -3,17 +3,16 @@ This script belongs to wirepy and is for the auto start of the tunnel """ -from pathlib import Path import subprocess from subprocess import CompletedProcess +from wp_app_config import AppConfig +from common_tools import ConfigManager -path_to_file = Path(Path.home() / ".config/wire_py/settings") +ConfigManager.init(AppConfig.SETTINGS_FILE) -a_con = Path(path_to_file).read_text(encoding="utf-8").splitlines(keepends=True) -a_con = a_con[7].strip() -if a_con != "off": +if ConfigManager.get("autostart") != "off": process: CompletedProcess[str] = subprocess.run( - ["nmcli", "connection", "up", a_con], + ["nmcli", "connection", "up", ConfigManager.get("autostart")], capture_output=True, text=True, check=False, diff --git a/wirepy.py b/wirepy.py index 98da4c1..2e75505 100755 --- a/wirepy.py +++ b/wirepy.py @@ -4,7 +4,6 @@ this script is a simple GUI for managing Wireguard Tunnels """ import getpass -import os import shutil import subprocess import sys @@ -243,7 +242,6 @@ class FrameWidgets(ttk.Frame): # Tunnel List self.tl = Tunnel.parse_files_to_dictionary(directory=AppConfig.TEMP_DIR) - LxTools.clean_files(AppConfig.TEMP_DIR, file=None) AppConfig.ensure_directories() # self.tl = LxTools.get_file_name(AppConfig.TEMP_DIR) @@ -638,137 +636,133 @@ class FrameWidgets(ttk.Frame): try: filepath = filedialog.askopenfilename( initialdir=f"{Path.home()}", - title=_("Select Wireguard config File"), - filetypes=[(_("WG config files"), "*.conf")], + title="Select Wireguard config File", + filetypes=[("WG config files", "*.conf")], ) - # Check if a file was selected - if not filepath: - return + data_import, key_name = Tunnel.parse_files_to_dictionary(filepath=filepath) - filepath = Path(filepath) - read = filepath.read_text(encoding="utf-8") - - if ( - "PrivateKey = " in read - and "PublicKey = " in read - and "Endpoint = " in read - ): - key = Tunnel.con_to_dict(read) - pre_key = key[3] - - if len(pre_key) != 0: - p_key = AppConfig.KEYS_FILE.read_text(encoding="utf-8") - - if pre_key in p_key or f"{pre_key}\n" in p_key: - LxTools.msg_window( - AppConfig.IMAGE_PATHS["icon_error"], - AppConfig.IMAGE_PATHS["icon_msg"], - Msg.STR["imp_err"], - Msg.STR["tl_exist"], - ) - else: - with open( - AppConfig.KEYS_FILE, "a", encoding="utf-8" - ) as keyfile: - keyfile.write(f"{pre_key}\r") - - if len(path_split1) > 17: - p1 = shutil.copy(filepath, AppConfig.TEMP_DIR) - path_split = path_split1[len(path_split1) - 17 :] - os.rename(p1, f"{AppConfig.TEMP_DIR}/{path_split}") - new_conf = f"{AppConfig.TEMP_DIR}/{path_split}" - - if self.a != "": - process: CompletedProcess[str] = subprocess.run( - ["nmcli", "connection", "down", self.a] - ) - self.reset_fields() - - process: CompletedProcess[str] = subprocess.run( - [ - "nmcli", - "connection", - "import", - "type", - "wireguard", - "file", - new_conf, - ], - text=True, - ) - - CryptoUtil.encrypt() - else: - shutil.copy(filepath, f"{AppConfig.TEMP_DIR}/") - - if self.a != "": - process: CompletedProcess[str] = subprocess.run( - ["nmcli", "connection", "down", self.a] - ) - self.reset_fields() - - process: CompletedProcess[str] = subprocess.run( - [ - "nmcli", - "connection", - "import", - "type", - "wireguard", - "file", - filepath, - ], - text=True, - ) - - CryptoUtil.encrypt() - self.str_var.set("") - self.a = Tunnel.active() - self.l_box.insert(0, self.a) - self.wg_autostart.configure(state="normal") - self.l_box.selection_clear(0, tk.END) - self.l_box.update() - self.l_box.selection_set(0) - - Tooltip( - self.wg_autostart, - Msg.TTIP["autostart"], - self.tooltip_state, - x_offset=-10, - y_offset=-40, - ) - Tooltip(self.btn_tr, Msg.TTIP["trash_tl"], self.tooltip_state) - Tooltip(self.btn_exp, Msg.TTIP["export_tl"], self.tooltip_state) - Tooltip( - self.btn_rename, Msg.TTIP["rename_tl"], self.tooltip_state - ) - - self.lb_rename.insert(0, "Max. 12 characters!") - self.str_var = tk.StringVar() - self.str_var.set(self.a) - self.color_label() - self.stop() - self.handle_tunnel_data(self.a, self.tl) - process: CompletedProcess[str] = subprocess.run( - [ - "nmcli", - "con", - "mod", - self.a, - "connection.autoconnect", - "no", - ] - ) - - elif ("PrivateKey = " in read) and ("Endpoint = " in read): - pass - else: + if CryptoUtil.find_key(f"{data_import[key_name]["PrivateKey"]}="): LxTools.msg_window( AppConfig.IMAGE_PATHS["icon_error"], AppConfig.IMAGE_PATHS["icon_msg"], Msg.STR["imp_err"], - Msg.STR["no_valid_file"], + Msg.STR["tl_exist"], ) + elif not CryptoUtil.is_valid_base64( + f"{data_import[key_name]["PrivateKey"]}=" + ): # 2. Second check: Is it valid Base64? + LxTools.msg_window( + AppConfig.IMAGE_PATHS["icon_error"], + AppConfig.IMAGE_PATHS["icon_msg"], + Msg.STR["imp_err"], + Msg.STR["invalid_base64"], + ) + else: + print("Key is valid and does not exist – import allowed!") + filepath = Path(filepath) + # Shorten the tunnel name to the maximum allowed length if it exceeds 12 characters. + original_name = filepath.name + truncated_name = ( + original_name[-17:] if len(original_name) > 17 else original_name + ) + import_file = shutil.copy2( + filepath, AppConfig.TEMP_DIR / truncated_name + ) + import_file = Path(import_file) + + del data_import[key_name]["PrivateKey"] + self.tl.update(data_import) + + if self.a != "": + process: CompletedProcess[str] = subprocess.run( + ["nmcli", "connection", "down", self.a], + capture_output=True, + text=True, + check=False, + ) + + if process.stderr: + print(process.stderr) + + else: + print(f"Error process decrypt: Code {process.returncode}") + self.reset_fields() + + process: CompletedProcess[str] = subprocess.run( + [ + "nmcli", + "connection", + "import", + "type", + "wireguard", + "file", + import_file, + ], + capture_output=True, + text=True, + check=False, + ) + + if process.stderr: + print(process.stderr) + + if process.returncode == 0: + print(f"Tunnel >> {import_file.stem} << import successfull") + else: + print(f"Error process decrypt: Code {process.returncode}") + + CryptoUtil.encrypt() + LxTools.clean_files(AppConfig.TEMP_DIR, file=None) + AppConfig.ensure_directories() + self.str_var.set("") + self.a = Tunnel.active() + self.l_box.insert(0, self.a) + self.wg_autostart.configure(state="normal") + self.l_box.selection_clear(0, tk.END) + self.l_box.update() + self.l_box.selection_set(0) + + Tooltip( + self.wg_autostart, + Msg.TTIP["autostart"], + self.tooltip_state, + x_offset=-10, + y_offset=-40, + ) + Tooltip(self.btn_tr, Msg.TTIP["trash_tl"], self.tooltip_state) + Tooltip(self.btn_exp, Msg.TTIP["export_tl"], self.tooltip_state) + Tooltip(self.btn_rename, Msg.TTIP["rename_tl"], self.tooltip_state) + + self.lb_rename.insert(0, "Max. 12 characters!") + self.str_var = tk.StringVar() + self.str_var.set(self.a) + self.color_label() + self.stop() + self.handle_tunnel_data(self.a, self.tl) + self.show_data() + process: CompletedProcess[str] = subprocess.run( + ["nmcli", "con", "mod", self.a, "connection.autoconnect", "no"], + capture_output=True, + text=True, + check=False, + ) + + if process.stderr: + print(process.stderr) + + if process.returncode == 0: + print(f">> {import_file.stem} << autostart is disabled by default") + + except UnboundLocalError: + LxTools.msg_window( + AppConfig.IMAGE_PATHS["icon_error"], + AppConfig.IMAGE_PATHS["icon_msg"], + Msg.STR["imp_err"], + Msg.STR["no_valid_file"], + ) + except IsADirectoryError: + print("File import: abort by user...") except EOFError as e: print(e) except TypeError: @@ -785,37 +779,30 @@ class FrameWidgets(ttk.Frame): try: self.select_tunnel = self.l_box.curselection() select_tl = self.l_box.get(self.select_tunnel[0]) - with open( - f"/tmp/tlecdcwg/{select_tl}.conf", "r+", encoding="utf-8" - ) as file2: - key = Tunnel.con_to_dict(file2) - pre_key = key[3] + process: CompletedProcess[str] = subprocess.run( - ["nmcli", "connection", "delete", select_tl] + ["nmcli", "connection", "delete", select_tl], + capture_output=True, + text=True, + check=False, ) + + if process.stderr: + print(process.stderr) + + if process.returncode == 0: + print(f"Tunnel >> {select_tl} << successfully deleted...") + else: + print(f"Error process: Code {process.returncode}") + self.l_box.delete(self.select_tunnel[0]) - with open(AppConfig.SETTINGS_FILE, "r", encoding="utf-8") as set_f6: - lines6 = set_f6.readlines() - if select_tl == lines6[7].strip() and "off\n" not in lines6[7].strip(): - lines6[7] = "off\n" - with open(AppConfig.SETTINGS_FILE, "w", encoding="utf-8") as set_f7: - set_f7.writelines(lines6) - self.selected_option.set(0) - self.autoconnect_var.set(_("no Autoconnect")) - is_encrypt = Path.home() / f".config/wire_py/{select_tl}.dat" - if is_encrypt.is_file(): - Path.unlink(f"{Path.home()}/.config/wire_py/{select_tl}.dat") - Path.unlink(f"/tmp/tlecdcwg/{select_tl}.conf") - with open(AppConfig.KEYS_FILE, "r", encoding="utf-8") as readfile: - with open( - f"{Path.home()}/.config/wire_py/keys2", "w", encoding="utf-8" - ) as writefile: - for line in readfile: - if pre_key not in line.strip("\n"): - writefile.write(line) - file_one = Path(f"{Path.home()}/.config/wire_py/keys2") - file_two = file_one.with_name("keys") - file_one.replace(file_two) + Path.unlink(f"{AppConfig.CONFIG_DIR}/{select_tl}.dat") + + if select_tl == ConfigManager.get("autostart"): + ConfigManager.set("autostart", "off") + self.selected_option.set(0) + self.autoconnect_var.set(_("no Autoconnect")) + self.wg_autostart.configure(state="disabled") # for disabling checkbox when Listbox empty @@ -878,22 +865,18 @@ class FrameWidgets(ttk.Frame): Set (on), the selected tunnel is displayed in the label. At (off) the label is first emptied then filled with No Autoconnect """ - lines = ( - Path(AppConfig.SETTINGS_FILE) - .read_text(encoding="utf-8") - .splitlines(keepends=True) - ) - if lines[7] != "off\n": - print(f"{lines[7]} starts automatically when the system starts.") + if ConfigManager.get("autostart") != "off": + print( + f"{ConfigManager.get("autostart")} starts automatically when the system starts." + ) self.selected_option.set(1) self.autoconnect_var.set("") - self.auto_con = lines[7] + self.auto_con = ConfigManager.get("autostart") else: self.selected_option.set(0) self.auto_con = _("no Autoconnect") - print("Autostart disabled.") self.autoconnect_var.set("") self.autoconnect_var = tk.StringVar() self.autoconnect_var.set(self.auto_con) @@ -923,31 +906,15 @@ class FrameWidgets(ttk.Frame): select_tl = self.l_box.get(select_tunnel[0]) if self.selected_option.get() == 0: - lines = ( - Path(AppConfig.SETTINGS_FILE) - .read_text(encoding="utf-8") - .splitlines(keepends=True) - ) - lines[7] = "off\n" - Path(AppConfig.SETTINGS_FILE).write_text( - "".join(lines), encoding="utf-8" - ) + ConfigManager.set("autostart", "off") - tl = LxTools.get_file_name(AppConfig.TEMP_DIR) + tl = [f"{file}" for file in AppConfig.CONFIG_DIR.glob("*.dat")] if len(tl) == 0: self.wg_autostart.configure(state="disabled") if self.selected_option.get() >= 1: - lines = ( - Path(AppConfig.SETTINGS_FILE) - .read_text(encoding="utf-8") - .splitlines(keepends=True) - ) - lines[7] = select_tl - Path(AppConfig.SETTINGS_FILE).write_text( - "".join(lines), encoding="utf-8" - ) + ConfigManager.set("autostart", select_tl) except IndexError: self.selected_option.set(1) @@ -1004,7 +971,7 @@ class FrameWidgets(ttk.Frame): select_tl = self.l_box.get(self.select_tunnel[0]) # nmcli connection modify old connection.id iphone - subprocess.check_output( + process: CompletedProcess[str] = subprocess.run( [ "nmcli", "connection", @@ -1013,30 +980,28 @@ class FrameWidgets(ttk.Frame): "connection.id", self.lb_rename.get(), ], + capture_output=True, text=True, + check=False, ) - source = Path(f"/tmp/tlecdcwg/{select_tl}.conf") - destination = source.with_name(f"{self.lb_rename.get()}.conf") - source.replace(destination) - Path.unlink(f"{Path.home()}/.config/wire_py/{select_tl}.dat") + if process.stderr: + print(process.stderr) + if process.returncode != 0: + print(f"Error process: Code {process.returncode}") + + source = Path(f"{AppConfig.CONFIG_DIR}/{select_tl}.dat") + destination = AppConfig.CONFIG_DIR / f"{self.lb_rename.get()}.dat" + source.replace(destination) + self.tl[self.lb_rename.get()] = self.tl.pop(select_tl) + if select_tl == ConfigManager.get("autostart"): + ConfigManager.set("autostart", self.lb_rename.get()) + self.autoconnect_var.set(value=self.lb_rename.get()) self.l_box.delete(self.select_tunnel[0]) self.l_box.insert("end", self.lb_rename.get()) self.l_box.update() - new_a_connect = self.lb_rename.get() self.lb_rename.delete(0, tk.END) - - with open(AppConfig.SETTINGS_FILE, "r", encoding="utf-8") as set_f5: - lines5 = set_f5.readlines() - if select_tl == lines5[7].strip() and "off\n" not in lines5[7].strip(): - lines5[7] = new_a_connect - with open( - AppConfig.SETTINGS_FILE, "w", encoding="utf-8" - ) as theme_set5: - theme_set5.writelines(lines5) - self.autoconnect_var.set(value=new_a_connect) self.update_connection_display() - CryptoUtil.encrypt() except IndexError: @@ -1059,9 +1024,9 @@ class FrameWidgets(ttk.Frame): values = data[tunnel] # Address Label self.add = tk.StringVar() - self.add.set(f" Address: {values['Address']}") + self.add.set(f"Address: {values['Address']}") self.DNS = tk.StringVar() - self.DNS.set(f" DNS: {values['DNS']}") + self.DNS.set(f" DNS: {values['DNS']}") self.enp = tk.StringVar() self.enp.set(f"Endpoint: {values['Endpoint']}") @@ -1134,8 +1099,18 @@ class FrameWidgets(ttk.Frame): if action == "stop": if self.a: process: CompletedProcess[str] = subprocess.run( - ["nmcli", "connection", "down", self.a] + ["nmcli", "connection", "down", self.a], + capture_output=True, + text=True, + check=False, ) + + if process.stderr: + print(process.stderr) + + if process.returncode != 0: + print(f"Error process: Code {process.returncode}") + self.update_connection_display() self.reset_fields() self.start() @@ -1144,8 +1119,20 @@ class FrameWidgets(ttk.Frame): if tunnel_name or self.a: target_tunnel = tunnel_name or self.a process: CompletedProcess[str] = subprocess.run( - ["nmcli", "connection", "up", target_tunnel] + ["nmcli", "connection", "up", target_tunnel], + capture_output=True, + text=True, + check=False, ) + + if process.stderr: + print(process.stderr) + + if process.returncode == 0: + print(f"Tunnel >> {target_tunnel} << started") + else: + print(f"Error process: Code {process.returncode}") + self.update_connection_display() self.handle_tunnel_data(self.a, self.tl) self.show_data() diff --git a/wp_app_config.py b/wp_app_config.py index 8f34591..39e827f 100644 --- a/wp_app_config.py +++ b/wp_app_config.py @@ -24,7 +24,6 @@ class AppConfig: # Configuration files SETTINGS_FILE: Path = CONFIG_DIR / "settings" - KEYS_FILE: Path = CONFIG_DIR / "keys" SYSTEMD_USER_FOLDER: Path = Path.home() / ".config/systemd/user" AUTOSTART_SERVICE: Path = Path.home() / ".config/systemd/user/wg_start.service" DEFAULT_SETTINGS: Dict[str, str] = { @@ -89,10 +88,7 @@ class AppConfig: """Ensures that all required directories exist""" if not cls.CONFIG_DIR.exists(): cls.CONFIG_DIR.mkdir(parents=True, exist_ok=True) - cls.KEYS_FILE.touch() cls.TEMP_DIR.mkdir(parents=True, exist_ok=True) - if not cls.KEYS_FILE.exists(): - cls.KEYS_FILE.touch() @classmethod def create_default_settings(cls) -> None: @@ -180,7 +176,7 @@ class Msg: "sel_list": _("Please select a tunnel from the list"), "sign_len": _("The new name may contain only 12 characters"), "zero_signs": _("At least one character must be entered"), - "false signs": _( + "false_signs": _( "No valid sign. These must not be used.\nBlank, Slash, Backslash and { }\n" ), "is_in_use": _("The tunnel is already in use"), @@ -188,6 +184,9 @@ class Msg: "Oh... no valid Wireguard File!\nPlease select a valid Wireguard File" ), "tl_exist": _("Tunnel already available!\nPlease use another file for import"), + "invalid_base64": _( + "Invalid base64 format!\nPlease use a Config file with valid key." + ), } TTIP: Dict[str, str] = { # Strings for Tooltips