diff --git a/common_tools.py b/common_tools.py
index ee823cb..e6583f5 100755
--- a/common_tools.py
+++ b/common_tools.py
@@ -3,14 +3,17 @@
import os
import shutil
import signal
+import base64
+import secrets
import subprocess
+from subprocess import CompletedProcess, run
+import re
import sys
import tkinter as tk
from typing import Optional, Dict, Any, NoReturn, TextIO, Tuple, List
import zipfile
from datetime import datetime
from pathlib import Path
-from subprocess import CompletedProcess
from tkinter import ttk, Toplevel
from wp_app_config import AppConfig, Msg
import requests
@@ -19,7 +22,7 @@ import requests
_ = AppConfig.setup_translations()
-class Create:
+class CryptoUtil:
"""
This class is for the creation of the folders and files
required by Wire-Py, as well as for decryption
@@ -72,6 +75,52 @@ class Create:
else:
print(f"Error process encrypt: Code {process.returncode}")
+ @staticmethod
+ def find_key(key: str = "") -> bool:
+ """
+ Checks if the private key already exists in the system using an external script.
+ Returns True only if the full key is found exactly (no partial match).
+ """
+ process: CompletedProcess[bool] = run(
+ ["pkexec", "/usr/local/bin/match_found.py", key],
+ capture_output=True,
+ text=True,
+ check=False,
+ )
+ if "True" in process.stdout:
+ return True
+ elif "False" in process.stdout:
+ return False
+ print(
+ f"Unexpected output from the external script:\nSTDOUT: {process.stdout}\nSTDERR: {process.stderr}"
+ )
+ return False
+
+ @staticmethod
+ def is_valid_base64(key: str) -> bool:
+ """
+ Validates if the input is a valid Base64 string (WireGuard private key format).
+ Returns True only for non-empty strings that match the expected length.
+ """
+ # Check for empty string
+ if not key or key.strip() == "":
+ return False
+
+ # Regex pattern to validate Base64: [A-Za-z0-9+/]+={0,2}
+ base64_pattern = r"^[A-Za-z0-9+/]+={0,2}$"
+ if not re.match(base64_pattern, key):
+ return False
+
+ try:
+ # Decode and check length (WireGuard private keys are 32 bytes long)
+ decoded = base64.b64decode(key)
+ if len(decoded) != 32: # 32 bytes = 256 bits
+ return False
+ except Exception:
+ return False
+
+ return True
+
class LxTools(tk.Tk):
"""
@@ -81,57 +130,6 @@ class LxTools(tk.Tk):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
- @staticmethod
- def ckeck_key_is_exist(
- directorys: list[str], search_string: str = None
- ) -> bool | None:
- """
- Check if the key is exist in the file
- Args:
- directorys (list[str]): list of directories to search in
- search_string (str): string to search for
- Returns:
- bool: True if the key is found, False otherwise
- """
-
- if search_string:
- result = False
-
- for directory in directorys:
-
- in_paths = Path(directory)
-
- if not in_paths.exists() or not in_paths.is_dir():
- continue
-
- # Get a list of all files in the directorys
- files = [file for file in in_paths.iterdir() if file.is_file()]
- if not files:
- continue
-
- # Search for the string in the files
- for file in files:
- try:
- with open(file, "r", errors="ignore") as f:
- for line in f:
- if search_string in line:
- # Set the result to True if the string is found
- result = True
- break
-
- # If the string is found, stop searching for the string in other files
- if result:
- break
-
- except Exception:
- # Ignore errors and continue to the next file
- continue
-
- else:
- result = None
- print(result)
- return result
-
@staticmethod
def center_window_cross_platform(window, width, height):
"""
@@ -405,23 +403,74 @@ class Tunnel:
"""
@staticmethod
- def parse_files_to_dictionary() -> Dict[str, List[str]]:
+ def parse_files_to_dictionary(
+ directory: Path = None, filepath: str = None
+ ) -> dict | str | None:
data = {}
- if not AppConfig.TEMP_DIR.exists() or not AppConfig.TEMP_DIR.is_dir():
- pass
-
- # Get a list of all files in the directorys
- files = [file for file in AppConfig.TEMP_DIR.iterdir() if file.is_file()]
- if not files:
- pass
-
- # Search for the string in the files
- for file in files:
+ if filepath is not None:
+ filepath = Path(filepath)
try:
- with open(file, "r") as f:
- content = f.read()
- # Hier parsen wir die relevanten Zeilen aus dem Inhalt
+ content = filepath.read_text()
+
+ # parse the content
+ address_line = next(
+ line for line in content.splitlines() if line.startswith("Address")
+ )
+ dns_line = next(
+ line for line in content.splitlines() if line.startswith("DNS")
+ )
+ endpoint_line = next(
+ line for line in content.splitlines() if line.startswith("Endpoint")
+ )
+ private_key_line = next(
+ line
+ for line in content.splitlines()
+ if line.startswith("PrivateKey")
+ )
+
+ content = secrets.token_bytes(len(content))
+
+ # extract the values
+ address = address_line.split("=")[1].strip()
+ dns = dns_line.split("=")[1].strip()
+ endpoint = endpoint_line.split("=")[1].strip()
+ private_key = private_key_line.split("=")[1].strip()
+
+ # Shorten the tunnel name to the maximum allowed length if it exceeds 12 characters.
+ original_stem = filepath.stem
+ truncated_stem = (
+ original_stem[-12:] if len(original_stem) > 12 else original_stem
+ )
+
+ # save in the dictionary
+ data[truncated_stem] = {
+ "Address": address,
+ "DNS": dns,
+ "Endpoint": endpoint,
+ "PrivateKey": private_key,
+ }
+
+ content = secrets.token_bytes(len(content))
+
+ except StopIteration as e:
+ print(f"Error: {e}")
+ pass
+
+ elif directory is not None:
+
+ if not directory.exists() or not directory.is_dir():
+ print("Temp directory does not exist or is not a directory.")
+ return None
+
+ # Get a list of all files in the directory
+ files = [file for file in AppConfig.TEMP_DIR.iterdir() if file.is_file()]
+
+ # Search for the string in the files
+ for file in files:
+ try:
+ content = file.read_text()
+ # parse the content
address_line = next(
line
for line in content.splitlines()
@@ -436,67 +485,27 @@ class Tunnel:
if line.startswith("Endpoint")
)
- # Extrahiere die Werte
+ # extract values
address = address_line.split("=")[1].strip()
dns = dns_line.split("=")[1].strip()
endpoint = endpoint_line.split("=")[1].strip()
- # Speichere im Dictionary
+ # save values to dictionary
data[file.stem] = {
"Address": address,
"DNS": dns,
"Endpoint": endpoint,
}
- except Exception:
- # Ignore errors and continue to the next file
- continue
- return data
+ except Exception:
+ # Ignore errors and continue to the next file
+ continue
- @classmethod
- def con_to_dict(cls, file: TextIO) -> Tuple[str, str, str, Optional[str]]:
- """
- Returns tuple of (address, dns, endpoint, pre_key)
- """
-
- dictlist: List[str] = []
- for lines in file.readlines():
- line_plit: List[str] = lines.split()
- dictlist = dictlist + line_plit
- dictlist.remove("[Interface]")
- dictlist.remove("[Peer]")
- for items in dictlist:
- if items == "=":
- dictlist.remove(items)
- if items == "::/0":
- dictlist.remove(items)
-
- # Here is the beginning (Loop) of convert List to Dictionary
- for _ in dictlist:
- a: List[str] = [dictlist[0], dictlist[1]]
- b: List[str] = [dictlist[2], dictlist[3]]
- c: List[str] = [dictlist[4], dictlist[5]]
- d: List[str] = [dictlist[6], dictlist[7]]
- e: List[str] = [dictlist[8], dictlist[9]]
- f: List[str] = [dictlist[10], dictlist[11]]
- g: List[str] = [dictlist[12], dictlist[13]]
- h: List[str] = [dictlist[14], dictlist[15]]
- new_list: List[List[str]] = [a, b, c, d, e, f, g, h]
- final_dict: Dict[str, str] = {}
- for elements in new_list:
- final_dict[elements[0]] = elements[1]
-
- # end... result a Dictionary
-
- address: str = final_dict["Address"]
- dns: str = final_dict["DNS"]
- if "," in dns:
- dns = dns[:-1]
- endpoint: str = final_dict["Endpoint"]
- pre_key: Optional[str] = final_dict.get("PresharedKey")
- if pre_key is None:
- pre_key: Optional[str] = final_dict.get("PreSharedKey")
- return address, dns, endpoint, pre_key
+ content = secrets.token_bytes(len(content))
+ if filepath is not None:
+ return data, truncated_stem
+ else:
+ return data
@staticmethod
def active() -> str:
diff --git a/install b/install
index 0495dde..cb4818f 100755
--- a/install
+++ b/install
@@ -18,6 +18,7 @@ install_file_with(){
else
sudo apt install python3-tk && \
sudo cp -fv wirepy.py start_wg.py wp_app_config.py common_tools.py ssl_encrypt.py ssl_decrypt.py /usr/local/bin/ && \
+ sudo cp -fv match_found.py /usr/local/bin/ && \
sudo cp -uR lx-icons /usr/share/icons/ && sudo cp -uR TK-Themes /usr/share/ && \
sudo cp -u languages/de/*.mo /usr/share/locale/de/LC_MESSAGES/ && \
sudo cp -fv Wire-Py.desktop /usr/share/applications/ && \
@@ -44,6 +45,7 @@ install_arch_d(){
else
sudo pacman -S --noconfirm tk python3 python-requests && \
sudo cp -fv wirepy.py start_wg.py wp_app_config.py common_tools.py ssl_encrypt.py ssl_decrypt.py /usr/local/bin/ && \
+ sudo cp -fv match_found.py /usr/local/bin/ && \
sudo cp -uR lx-icons /usr/share/icons/ && sudo cp -uR TK-Themes /usr/share/ && \
sudo cp -u languages/de/*.mo /usr/share/locale/de/LC_MESSAGES/ && \
sudo cp -fv Wire-Py.desktop /usr/share/applications/ && \
@@ -121,6 +123,7 @@ install(){
else
sudo dnf install python3-tkinter -y
sudo cp -fv wirepy.py start_wg.py wp_app_config.py common_tools.py ssl_encrypt.py ssl_decrypt.py /usr/local/bin/ && \
+ sudo cp -fv match_found.py /usr/local/bin/ && \
sudo cp -uR lx-icons /usr/share/icons/ && sudo cp -uR TK-Themes /usr/share/ && \
sudo cp -u languages/de/*.mo /usr/share/locale/de/LC_MESSAGES/ && \
sudo cp -fv Wire-Py.desktop /usr/share/applications/ && \
@@ -146,6 +149,7 @@ install(){
exit 0
else
sudo cp -fv wirepy.py start_wg.py wp_app_config.py common_tools.py ssl_encrypt.py ssl_decrypt.py /usr/local/bin/ && \
+ sudo cp -fv match_found.py /usr/local/bin/ && \
sudo cp -uR lx-icons /usr/share/icons/ && sudo cp -uR TK-Themes /usr/share/ && \
sudo cp -u languages/de/*.mo /usr/share/locale/de/LC_MESSAGES/ && \
sudo cp -fv Wire-Py.desktop /usr/share/applications/ && \
@@ -181,7 +185,8 @@ install(){
remove(){
sudo rm -f /usr/local/bin/wirepy /usr/local/bin/wirepy.py /usr/local/bin/start_wg.py \
- /usr/local/bin/wp_app_config.py common_tools.py /usr/local/bin/ssl_encrypt.py /usr/local/bin/ssl_decrypt.py
+ /usr/local/bin/wp_app_config.py common_tools.py /usr/local/bin/ssl_encrypt.py \
+ /usr/local/bin/ssl_decrypt.py /usr/local/bin/match_found.py
if [ $? -ne 0 ]
then
exit 0
diff --git a/match_found.py b/match_found.py
new file mode 100755
index 0000000..b69ea65
--- /dev/null
+++ b/match_found.py
@@ -0,0 +1,61 @@
+#!/usr/bin/python3
+
+import argparse
+from pathlib import Path
+
+
+directorys: list[str] = [
+ "/etc/netplan/",
+ "/etc/NetworkManager/system-connections/",
+ "/var/lib/NetworkManager/user-connections/",
+]
+
+
+def search_string_in_directory(
+ directories: list[str] = directorys, # Use the predefined list as default
+ search_string: str = "", # Default is empty string
+) -> bool:
+
+ if len(search_string) == 0:
+ return False
+
+ result = False
+ for directory in directories:
+ in_paths = Path(directory)
+ if not in_paths.exists() or not in_paths.is_dir():
+ continue
+
+ files = [file for file in in_paths.iterdir() if file.is_file()]
+ if not files:
+ continue
+
+ # Search for the string in each file
+ for file in files:
+ try:
+ with open(file, "r", errors="ignore") as f:
+ for line in f:
+ if search_string in line:
+ result = True # String found
+ break
+ if result:
+ break # No need to check further
+ except Exception:
+ continue # Skip files that cause errors
+
+ # Invert the logic: return False if string is found, True otherwise
+ return result
+
+
+def main() -> bool:
+ parser = argparse.ArgumentParser(
+ description="Script only for use to compare the private key in the Network configurations to avoid errors with the network manager."
+ )
+ parser.add_argument("search_string", help="Search string")
+ args = parser.parse_args()
+
+ result = search_string_in_directory(search_string=args.search_string)
+ print(result)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/org.sslcrypt.policy b/org.sslcrypt.policy
index d9b5100..84fff56 100644
--- a/org.sslcrypt.policy
+++ b/org.sslcrypt.policy
@@ -38,6 +38,14 @@ License along with this library. If not, see
yes
/usr/local/bin/ssl_decrypt.py
-
+
+
+
+
+ auth_admin_keep
+ auth_admin_keep
+ yes
+
+ /usr/local/bin/match_found.py
\ No newline at end of file
diff --git a/wirepy.py b/wirepy.py
index 618e57a..98da4c1 100755
--- a/wirepy.py
+++ b/wirepy.py
@@ -17,7 +17,7 @@ from tkinter import TclError, filedialog, ttk
from common_tools import (
ConfigManager,
ThemeManager,
- Create,
+ CryptoUtil,
GiteaUpdate,
Tunnel,
Tooltip,
@@ -28,7 +28,7 @@ from wp_app_config import AppConfig, Msg
AppConfig.USER_FILE.write_text(getpass.getuser())
AppConfig.ensure_directories()
AppConfig.create_default_settings()
-Create.decrypt()
+CryptoUtil.decrypt()
class Wirepy(tk.Tk):
@@ -242,15 +242,20 @@ class FrameWidgets(ttk.Frame):
self.l_box.configure(yscrollcommand=self.scrollbar.set)
# Tunnel List
- self.tl = LxTools.get_file_name(AppConfig.TEMP_DIR)
- for tunnels in self.tl:
+ self.tl = Tunnel.parse_files_to_dictionary(directory=AppConfig.TEMP_DIR)
+
+ LxTools.clean_files(AppConfig.TEMP_DIR, file=None)
+ AppConfig.ensure_directories()
+ # self.tl = LxTools.get_file_name(AppConfig.TEMP_DIR)
+ for tunnels, values in self.tl.items():
self.l_box.insert("end", tunnels)
self.l_box.update()
# Button Vpn
if self.a != "":
self.stop()
- data = self.handle_tunnel_data(self.a)
+ self.handle_tunnel_data(self.a, self.tl)
+ self.show_data()
else:
self.start()
@@ -586,22 +591,6 @@ class FrameWidgets(ttk.Frame):
else:
Tooltip(self.btn_stst, Msg.TTIP["start_tl"], self.tooltip_state)
- def handle_tunnel_data(self, tunnel_name: str) -> tuple[str, str, str, str | None]:
- """_summary_
-
- Args:
- tunnel_name (str): name of a tunnel
-
- Returns:
- tuple[str, str]: tuple with tunnel data
- """
- wg_read = f"/tmp/tlecdcwg/{tunnel_name}.conf"
- with open(wg_read, "r", encoding="utf-8") as file:
- data = Tunnel.con_to_dict(file)
- self.init_and_report(data)
- self.show_data()
- return data
-
def color_label(self) -> None:
"""
View activ Tunnel in the color green or yellow
@@ -652,25 +641,19 @@ class FrameWidgets(ttk.Frame):
title=_("Select Wireguard config File"),
filetypes=[(_("WG config files"), "*.conf")],
)
-
- # Überprüfe, ob der Benutzer den Dialog abgebrochen hat
+ # Check if a file was selected
if not filepath:
- print("File import: abort by user...")
return
- with open(filepath, "r", encoding="utf-8") as file:
- read = file.read()
-
- path_split = filepath.split("/")
- path_split1 = path_split[-1]
+ filepath = Path(filepath)
+ read = filepath.read_text(encoding="utf-8")
if (
"PrivateKey = " in read
and "PublicKey = " in read
- and "Endpoint =" in read
+ and "Endpoint = " in read
):
- with open(filepath, "r", encoding="utf-8") as file:
- key = Tunnel.con_to_dict(file)
+ key = Tunnel.con_to_dict(read)
pre_key = key[3]
if len(pre_key) != 0:
@@ -714,7 +697,7 @@ class FrameWidgets(ttk.Frame):
text=True,
)
- Create.encrypt()
+ CryptoUtil.encrypt()
else:
shutil.copy(filepath, f"{AppConfig.TEMP_DIR}/")
@@ -737,7 +720,7 @@ class FrameWidgets(ttk.Frame):
text=True,
)
- Create.encrypt()
+ CryptoUtil.encrypt()
self.str_var.set("")
self.a = Tunnel.active()
self.l_box.insert(0, self.a)
@@ -764,7 +747,7 @@ class FrameWidgets(ttk.Frame):
self.str_var.set(self.a)
self.color_label()
self.stop()
- data = self.handle_tunnel_data(self.a)
+ self.handle_tunnel_data(self.a, self.tl)
process: CompletedProcess[str] = subprocess.run(
[
"nmcli",
@@ -1053,7 +1036,7 @@ class FrameWidgets(ttk.Frame):
theme_set5.writelines(lines5)
self.autoconnect_var.set(value=new_a_connect)
self.update_connection_display()
- Create.encrypt()
+ CryptoUtil.encrypt()
except IndexError:
@@ -1070,18 +1053,17 @@ class FrameWidgets(ttk.Frame):
except EOFError as e:
print(e)
- def init_and_report(self, data=None) -> None:
- """
- Displays the value address, DNS and peer in the labels
- or empty it again
- """
+ def handle_tunnel_data(self, active=None, data=None) -> None:
+
+ tunnel = active
+ values = data[tunnel]
# Address Label
self.add = tk.StringVar()
- self.add.set(f"{_("Address: ")}{data[0]}")
+ self.add.set(f" Address: {values['Address']}")
self.DNS = tk.StringVar()
- self.DNS.set(f" DNS: {data[1]}")
+ self.DNS.set(f" DNS: {values['DNS']}")
self.enp = tk.StringVar()
- self.enp.set(f"{_("Endpoint: ")}{data[2]}")
+ self.enp.set(f"Endpoint: {values['Endpoint']}")
def show_data(self) -> None:
"""
@@ -1118,10 +1100,8 @@ class FrameWidgets(ttk.Frame):
else:
- data = self.handle_tunnel_data(self.a)
- if data:
-
- self.handle_connection_state("stop")
+ self.handle_tunnel_data(self.a, self.tl)
+ self.handle_connection_state("stop")
except IndexError:
@@ -1167,8 +1147,7 @@ class FrameWidgets(ttk.Frame):
["nmcli", "connection", "up", target_tunnel]
)
self.update_connection_display()
- data = self.handle_tunnel_data(self.a)
- self.init_and_report(data)
+ self.handle_tunnel_data(self.a, self.tl)
self.show_data()
self.color_label()
self.stop()
diff --git a/wp_app_config.py b/wp_app_config.py
index b9d07e5..8f34591 100644
--- a/wp_app_config.py
+++ b/wp_app_config.py
@@ -57,13 +57,6 @@ class AppConfig:
"pkey_path": "/usr/local/etc/ssl/pwgk.pem",
}
- # Lists of searches
- DIRECTORYS: list[str] = [
- "/etc/netplan/",
- "/etc/NetworkManager/system-connections/",
- "/var/lib/NetworkManager/user-connections/",
- ]
-
# Images and icons paths
IMAGE_PATHS: Dict[str, str] = {
"icon_vpn": "/usr/share/icons/lx-icons/48/wg_vpn.png",