From d6c20b81f92e9a0643f82e275794c1ef0c8e9982 Mon Sep 17 00:00:00 2001 From: punix Date: Sun, 18 May 2025 12:47:52 +0200 Subject: [PATCH] part 1 load data from dictionary works --- common_tools.py | 243 +++++++++++++++++++++++--------------------- install | 7 +- match_found.py | 61 +++++++++++ org.sslcrypt.policy | 10 +- wirepy.py | 79 ++++++-------- wp_app_config.py | 7 -- 6 files changed, 231 insertions(+), 176 deletions(-) create mode 100755 match_found.py diff --git a/common_tools.py b/common_tools.py index ee823cb..e6583f5 100755 --- a/common_tools.py +++ b/common_tools.py @@ -3,14 +3,17 @@ import os import shutil import signal +import base64 +import secrets import subprocess +from subprocess import CompletedProcess, run +import re import sys import tkinter as tk from typing import Optional, Dict, Any, NoReturn, TextIO, Tuple, List import zipfile from datetime import datetime from pathlib import Path -from subprocess import CompletedProcess from tkinter import ttk, Toplevel from wp_app_config import AppConfig, Msg import requests @@ -19,7 +22,7 @@ import requests _ = AppConfig.setup_translations() -class Create: +class CryptoUtil: """ This class is for the creation of the folders and files required by Wire-Py, as well as for decryption @@ -72,6 +75,52 @@ class Create: else: print(f"Error process encrypt: Code {process.returncode}") + @staticmethod + def find_key(key: str = "") -> bool: + """ + Checks if the private key already exists in the system using an external script. + Returns True only if the full key is found exactly (no partial match). + """ + process: CompletedProcess[bool] = run( + ["pkexec", "/usr/local/bin/match_found.py", key], + capture_output=True, + text=True, + check=False, + ) + if "True" in process.stdout: + return True + elif "False" in process.stdout: + return False + print( + f"Unexpected output from the external script:\nSTDOUT: {process.stdout}\nSTDERR: {process.stderr}" + ) + return False + + @staticmethod + def is_valid_base64(key: str) -> bool: + """ + Validates if the input is a valid Base64 string (WireGuard private key format). + Returns True only for non-empty strings that match the expected length. + """ + # Check for empty string + if not key or key.strip() == "": + return False + + # Regex pattern to validate Base64: [A-Za-z0-9+/]+={0,2} + base64_pattern = r"^[A-Za-z0-9+/]+={0,2}$" + if not re.match(base64_pattern, key): + return False + + try: + # Decode and check length (WireGuard private keys are 32 bytes long) + decoded = base64.b64decode(key) + if len(decoded) != 32: # 32 bytes = 256 bits + return False + except Exception: + return False + + return True + class LxTools(tk.Tk): """ @@ -81,57 +130,6 @@ class LxTools(tk.Tk): def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) - @staticmethod - def ckeck_key_is_exist( - directorys: list[str], search_string: str = None - ) -> bool | None: - """ - Check if the key is exist in the file - Args: - directorys (list[str]): list of directories to search in - search_string (str): string to search for - Returns: - bool: True if the key is found, False otherwise - """ - - if search_string: - result = False - - for directory in directorys: - - in_paths = Path(directory) - - if not in_paths.exists() or not in_paths.is_dir(): - continue - - # Get a list of all files in the directorys - files = [file for file in in_paths.iterdir() if file.is_file()] - if not files: - continue - - # Search for the string in the files - for file in files: - try: - with open(file, "r", errors="ignore") as f: - for line in f: - if search_string in line: - # Set the result to True if the string is found - result = True - break - - # If the string is found, stop searching for the string in other files - if result: - break - - except Exception: - # Ignore errors and continue to the next file - continue - - else: - result = None - print(result) - return result - @staticmethod def center_window_cross_platform(window, width, height): """ @@ -405,23 +403,74 @@ class Tunnel: """ @staticmethod - def parse_files_to_dictionary() -> Dict[str, List[str]]: + def parse_files_to_dictionary( + directory: Path = None, filepath: str = None + ) -> dict | str | None: data = {} - if not AppConfig.TEMP_DIR.exists() or not AppConfig.TEMP_DIR.is_dir(): - pass - - # Get a list of all files in the directorys - files = [file for file in AppConfig.TEMP_DIR.iterdir() if file.is_file()] - if not files: - pass - - # Search for the string in the files - for file in files: + if filepath is not None: + filepath = Path(filepath) try: - with open(file, "r") as f: - content = f.read() - # Hier parsen wir die relevanten Zeilen aus dem Inhalt + content = filepath.read_text() + + # parse the content + address_line = next( + line for line in content.splitlines() if line.startswith("Address") + ) + dns_line = next( + line for line in content.splitlines() if line.startswith("DNS") + ) + endpoint_line = next( + line for line in content.splitlines() if line.startswith("Endpoint") + ) + private_key_line = next( + line + for line in content.splitlines() + if line.startswith("PrivateKey") + ) + + content = secrets.token_bytes(len(content)) + + # extract the values + address = address_line.split("=")[1].strip() + dns = dns_line.split("=")[1].strip() + endpoint = endpoint_line.split("=")[1].strip() + private_key = private_key_line.split("=")[1].strip() + + # Shorten the tunnel name to the maximum allowed length if it exceeds 12 characters. + original_stem = filepath.stem + truncated_stem = ( + original_stem[-12:] if len(original_stem) > 12 else original_stem + ) + + # save in the dictionary + data[truncated_stem] = { + "Address": address, + "DNS": dns, + "Endpoint": endpoint, + "PrivateKey": private_key, + } + + content = secrets.token_bytes(len(content)) + + except StopIteration as e: + print(f"Error: {e}") + pass + + elif directory is not None: + + if not directory.exists() or not directory.is_dir(): + print("Temp directory does not exist or is not a directory.") + return None + + # Get a list of all files in the directory + files = [file for file in AppConfig.TEMP_DIR.iterdir() if file.is_file()] + + # Search for the string in the files + for file in files: + try: + content = file.read_text() + # parse the content address_line = next( line for line in content.splitlines() @@ -436,67 +485,27 @@ class Tunnel: if line.startswith("Endpoint") ) - # Extrahiere die Werte + # extract values address = address_line.split("=")[1].strip() dns = dns_line.split("=")[1].strip() endpoint = endpoint_line.split("=")[1].strip() - # Speichere im Dictionary + # save values to dictionary data[file.stem] = { "Address": address, "DNS": dns, "Endpoint": endpoint, } - except Exception: - # Ignore errors and continue to the next file - continue - return data + except Exception: + # Ignore errors and continue to the next file + continue - @classmethod - def con_to_dict(cls, file: TextIO) -> Tuple[str, str, str, Optional[str]]: - """ - Returns tuple of (address, dns, endpoint, pre_key) - """ - - dictlist: List[str] = [] - for lines in file.readlines(): - line_plit: List[str] = lines.split() - dictlist = dictlist + line_plit - dictlist.remove("[Interface]") - dictlist.remove("[Peer]") - for items in dictlist: - if items == "=": - dictlist.remove(items) - if items == "::/0": - dictlist.remove(items) - - # Here is the beginning (Loop) of convert List to Dictionary - for _ in dictlist: - a: List[str] = [dictlist[0], dictlist[1]] - b: List[str] = [dictlist[2], dictlist[3]] - c: List[str] = [dictlist[4], dictlist[5]] - d: List[str] = [dictlist[6], dictlist[7]] - e: List[str] = [dictlist[8], dictlist[9]] - f: List[str] = [dictlist[10], dictlist[11]] - g: List[str] = [dictlist[12], dictlist[13]] - h: List[str] = [dictlist[14], dictlist[15]] - new_list: List[List[str]] = [a, b, c, d, e, f, g, h] - final_dict: Dict[str, str] = {} - for elements in new_list: - final_dict[elements[0]] = elements[1] - - # end... result a Dictionary - - address: str = final_dict["Address"] - dns: str = final_dict["DNS"] - if "," in dns: - dns = dns[:-1] - endpoint: str = final_dict["Endpoint"] - pre_key: Optional[str] = final_dict.get("PresharedKey") - if pre_key is None: - pre_key: Optional[str] = final_dict.get("PreSharedKey") - return address, dns, endpoint, pre_key + content = secrets.token_bytes(len(content)) + if filepath is not None: + return data, truncated_stem + else: + return data @staticmethod def active() -> str: diff --git a/install b/install index 0495dde..cb4818f 100755 --- a/install +++ b/install @@ -18,6 +18,7 @@ install_file_with(){ else sudo apt install python3-tk && \ sudo cp -fv wirepy.py start_wg.py wp_app_config.py common_tools.py ssl_encrypt.py ssl_decrypt.py /usr/local/bin/ && \ + sudo cp -fv match_found.py /usr/local/bin/ && \ sudo cp -uR lx-icons /usr/share/icons/ && sudo cp -uR TK-Themes /usr/share/ && \ sudo cp -u languages/de/*.mo /usr/share/locale/de/LC_MESSAGES/ && \ sudo cp -fv Wire-Py.desktop /usr/share/applications/ && \ @@ -44,6 +45,7 @@ install_arch_d(){ else sudo pacman -S --noconfirm tk python3 python-requests && \ sudo cp -fv wirepy.py start_wg.py wp_app_config.py common_tools.py ssl_encrypt.py ssl_decrypt.py /usr/local/bin/ && \ + sudo cp -fv match_found.py /usr/local/bin/ && \ sudo cp -uR lx-icons /usr/share/icons/ && sudo cp -uR TK-Themes /usr/share/ && \ sudo cp -u languages/de/*.mo /usr/share/locale/de/LC_MESSAGES/ && \ sudo cp -fv Wire-Py.desktop /usr/share/applications/ && \ @@ -121,6 +123,7 @@ install(){ else sudo dnf install python3-tkinter -y sudo cp -fv wirepy.py start_wg.py wp_app_config.py common_tools.py ssl_encrypt.py ssl_decrypt.py /usr/local/bin/ && \ + sudo cp -fv match_found.py /usr/local/bin/ && \ sudo cp -uR lx-icons /usr/share/icons/ && sudo cp -uR TK-Themes /usr/share/ && \ sudo cp -u languages/de/*.mo /usr/share/locale/de/LC_MESSAGES/ && \ sudo cp -fv Wire-Py.desktop /usr/share/applications/ && \ @@ -146,6 +149,7 @@ install(){ exit 0 else sudo cp -fv wirepy.py start_wg.py wp_app_config.py common_tools.py ssl_encrypt.py ssl_decrypt.py /usr/local/bin/ && \ + sudo cp -fv match_found.py /usr/local/bin/ && \ sudo cp -uR lx-icons /usr/share/icons/ && sudo cp -uR TK-Themes /usr/share/ && \ sudo cp -u languages/de/*.mo /usr/share/locale/de/LC_MESSAGES/ && \ sudo cp -fv Wire-Py.desktop /usr/share/applications/ && \ @@ -181,7 +185,8 @@ install(){ remove(){ sudo rm -f /usr/local/bin/wirepy /usr/local/bin/wirepy.py /usr/local/bin/start_wg.py \ - /usr/local/bin/wp_app_config.py common_tools.py /usr/local/bin/ssl_encrypt.py /usr/local/bin/ssl_decrypt.py + /usr/local/bin/wp_app_config.py common_tools.py /usr/local/bin/ssl_encrypt.py \ + /usr/local/bin/ssl_decrypt.py /usr/local/bin/match_found.py if [ $? -ne 0 ] then exit 0 diff --git a/match_found.py b/match_found.py new file mode 100755 index 0000000..b69ea65 --- /dev/null +++ b/match_found.py @@ -0,0 +1,61 @@ +#!/usr/bin/python3 + +import argparse +from pathlib import Path + + +directorys: list[str] = [ + "/etc/netplan/", + "/etc/NetworkManager/system-connections/", + "/var/lib/NetworkManager/user-connections/", +] + + +def search_string_in_directory( + directories: list[str] = directorys, # Use the predefined list as default + search_string: str = "", # Default is empty string +) -> bool: + + if len(search_string) == 0: + return False + + result = False + for directory in directories: + in_paths = Path(directory) + if not in_paths.exists() or not in_paths.is_dir(): + continue + + files = [file for file in in_paths.iterdir() if file.is_file()] + if not files: + continue + + # Search for the string in each file + for file in files: + try: + with open(file, "r", errors="ignore") as f: + for line in f: + if search_string in line: + result = True # String found + break + if result: + break # No need to check further + except Exception: + continue # Skip files that cause errors + + # Invert the logic: return False if string is found, True otherwise + return result + + +def main() -> bool: + parser = argparse.ArgumentParser( + description="Script only for use to compare the private key in the Network configurations to avoid errors with the network manager." + ) + parser.add_argument("search_string", help="Search string") + args = parser.parse_args() + + result = search_string_in_directory(search_string=args.search_string) + print(result) + + +if __name__ == "__main__": + main() diff --git a/org.sslcrypt.policy b/org.sslcrypt.policy index d9b5100..84fff56 100644 --- a/org.sslcrypt.policy +++ b/org.sslcrypt.policy @@ -38,6 +38,14 @@ License along with this library. If not, see yes /usr/local/bin/ssl_decrypt.py - + + + + + auth_admin_keep + auth_admin_keep + yes + + /usr/local/bin/match_found.py \ No newline at end of file diff --git a/wirepy.py b/wirepy.py index 618e57a..98da4c1 100755 --- a/wirepy.py +++ b/wirepy.py @@ -17,7 +17,7 @@ from tkinter import TclError, filedialog, ttk from common_tools import ( ConfigManager, ThemeManager, - Create, + CryptoUtil, GiteaUpdate, Tunnel, Tooltip, @@ -28,7 +28,7 @@ from wp_app_config import AppConfig, Msg AppConfig.USER_FILE.write_text(getpass.getuser()) AppConfig.ensure_directories() AppConfig.create_default_settings() -Create.decrypt() +CryptoUtil.decrypt() class Wirepy(tk.Tk): @@ -242,15 +242,20 @@ class FrameWidgets(ttk.Frame): self.l_box.configure(yscrollcommand=self.scrollbar.set) # Tunnel List - self.tl = LxTools.get_file_name(AppConfig.TEMP_DIR) - for tunnels in self.tl: + self.tl = Tunnel.parse_files_to_dictionary(directory=AppConfig.TEMP_DIR) + + LxTools.clean_files(AppConfig.TEMP_DIR, file=None) + AppConfig.ensure_directories() + # self.tl = LxTools.get_file_name(AppConfig.TEMP_DIR) + for tunnels, values in self.tl.items(): self.l_box.insert("end", tunnels) self.l_box.update() # Button Vpn if self.a != "": self.stop() - data = self.handle_tunnel_data(self.a) + self.handle_tunnel_data(self.a, self.tl) + self.show_data() else: self.start() @@ -586,22 +591,6 @@ class FrameWidgets(ttk.Frame): else: Tooltip(self.btn_stst, Msg.TTIP["start_tl"], self.tooltip_state) - def handle_tunnel_data(self, tunnel_name: str) -> tuple[str, str, str, str | None]: - """_summary_ - - Args: - tunnel_name (str): name of a tunnel - - Returns: - tuple[str, str]: tuple with tunnel data - """ - wg_read = f"/tmp/tlecdcwg/{tunnel_name}.conf" - with open(wg_read, "r", encoding="utf-8") as file: - data = Tunnel.con_to_dict(file) - self.init_and_report(data) - self.show_data() - return data - def color_label(self) -> None: """ View activ Tunnel in the color green or yellow @@ -652,25 +641,19 @@ class FrameWidgets(ttk.Frame): title=_("Select Wireguard config File"), filetypes=[(_("WG config files"), "*.conf")], ) - - # Überprüfe, ob der Benutzer den Dialog abgebrochen hat + # Check if a file was selected if not filepath: - print("File import: abort by user...") return - with open(filepath, "r", encoding="utf-8") as file: - read = file.read() - - path_split = filepath.split("/") - path_split1 = path_split[-1] + filepath = Path(filepath) + read = filepath.read_text(encoding="utf-8") if ( "PrivateKey = " in read and "PublicKey = " in read - and "Endpoint =" in read + and "Endpoint = " in read ): - with open(filepath, "r", encoding="utf-8") as file: - key = Tunnel.con_to_dict(file) + key = Tunnel.con_to_dict(read) pre_key = key[3] if len(pre_key) != 0: @@ -714,7 +697,7 @@ class FrameWidgets(ttk.Frame): text=True, ) - Create.encrypt() + CryptoUtil.encrypt() else: shutil.copy(filepath, f"{AppConfig.TEMP_DIR}/") @@ -737,7 +720,7 @@ class FrameWidgets(ttk.Frame): text=True, ) - Create.encrypt() + CryptoUtil.encrypt() self.str_var.set("") self.a = Tunnel.active() self.l_box.insert(0, self.a) @@ -764,7 +747,7 @@ class FrameWidgets(ttk.Frame): self.str_var.set(self.a) self.color_label() self.stop() - data = self.handle_tunnel_data(self.a) + self.handle_tunnel_data(self.a, self.tl) process: CompletedProcess[str] = subprocess.run( [ "nmcli", @@ -1053,7 +1036,7 @@ class FrameWidgets(ttk.Frame): theme_set5.writelines(lines5) self.autoconnect_var.set(value=new_a_connect) self.update_connection_display() - Create.encrypt() + CryptoUtil.encrypt() except IndexError: @@ -1070,18 +1053,17 @@ class FrameWidgets(ttk.Frame): except EOFError as e: print(e) - def init_and_report(self, data=None) -> None: - """ - Displays the value address, DNS and peer in the labels - or empty it again - """ + def handle_tunnel_data(self, active=None, data=None) -> None: + + tunnel = active + values = data[tunnel] # Address Label self.add = tk.StringVar() - self.add.set(f"{_("Address: ")}{data[0]}") + self.add.set(f" Address: {values['Address']}") self.DNS = tk.StringVar() - self.DNS.set(f" DNS: {data[1]}") + self.DNS.set(f" DNS: {values['DNS']}") self.enp = tk.StringVar() - self.enp.set(f"{_("Endpoint: ")}{data[2]}") + self.enp.set(f"Endpoint: {values['Endpoint']}") def show_data(self) -> None: """ @@ -1118,10 +1100,8 @@ class FrameWidgets(ttk.Frame): else: - data = self.handle_tunnel_data(self.a) - if data: - - self.handle_connection_state("stop") + self.handle_tunnel_data(self.a, self.tl) + self.handle_connection_state("stop") except IndexError: @@ -1167,8 +1147,7 @@ class FrameWidgets(ttk.Frame): ["nmcli", "connection", "up", target_tunnel] ) self.update_connection_display() - data = self.handle_tunnel_data(self.a) - self.init_and_report(data) + self.handle_tunnel_data(self.a, self.tl) self.show_data() self.color_label() self.stop() diff --git a/wp_app_config.py b/wp_app_config.py index b9d07e5..8f34591 100644 --- a/wp_app_config.py +++ b/wp_app_config.py @@ -57,13 +57,6 @@ class AppConfig: "pkey_path": "/usr/local/etc/ssl/pwgk.pem", } - # Lists of searches - DIRECTORYS: list[str] = [ - "/etc/netplan/", - "/etc/NetworkManager/system-connections/", - "/var/lib/NetworkManager/user-connections/", - ] - # Images and icons paths IMAGE_PATHS: Dict[str, str] = { "icon_vpn": "/usr/share/icons/lx-icons/48/wg_vpn.png",