8 Commits

12 changed files with 620 additions and 502 deletions

1
.gitignore vendored
View File

@ -1,5 +1,6 @@
debug.log
.venv
.venv.bak
.idea
.vscode
__pycache__

View File

@ -3,14 +3,17 @@
import os
import shutil
import signal
import base64
import secrets
import subprocess
from subprocess import CompletedProcess, run
import re
import sys
import tkinter as tk
from typing import Optional, Dict, Any, NoReturn, TextIO, Tuple, List
from typing import Optional, Dict, Any, NoReturn, List
import zipfile
from datetime import datetime
from pathlib import Path
from subprocess import check_call, CompletedProcess
from tkinter import ttk, Toplevel
from wp_app_config import AppConfig, Msg
import requests
@ -19,7 +22,7 @@ import requests
_ = AppConfig.setup_translations()
class Create:
class CryptoUtil:
"""
This class is for the creation of the folders and files
required by Wire-Py, as well as for decryption
@ -27,100 +30,95 @@ class Create:
"""
@staticmethod
def dir_and_files() -> None:
"""
check and create folders and files if not present
"""
pth: Path = Path.home() / ".config/wire_py"
pth.mkdir(parents=True, exist_ok=True)
sett: Path = Path.home() / ".config/wire_py/settings"
AppConfig.KEYS_FILE
if sett.exists():
pass
else:
sett.touch()
sett.write_text(
"[UPDATES]\non\n[THEME]\nlight\n[TOOLTIP]\nTrue\n[AUTOSTART ON]\noff\n"
)
if AppConfig.KEYS_FILE.exists():
pass
else:
AppConfig.KEYS_FILE.touch()
@staticmethod
def files_for_autostart() -> None:
"""
check and create a file for auto start if not present and enable the service
"""
pth2: Path = Path.home() / ".config/systemd/user"
pth2.mkdir(parents=True, exist_ok=True)
wg_ser: Path = Path.home() / ".config/systemd/user/wg_start.service"
if wg_ser.exists():
pass
else:
wg_ser.touch()
wg_ser.write_text(
"[Unit]\nDescription=Automatic Tunnel Start\nAfter=network-online.target\n\n[Service]\n"
"Type=oneshot\nExecStartPre=/bin/sleep 5\nExecStart=/usr/local/bin/start_wg.py\n[Install]"
"\nWantedBy=default.target"
)
check_call(["systemctl", "--user", "enable", "wg_start.service"])
@staticmethod
def make_dir() -> None:
"""Folder Name "tlecdewg" = Tunnel Encrypt Decrypt Wireguard"""
if AppConfig.TEMP_DIR.exists():
pass
else:
AppConfig.TEMP_DIR.mkdir()
@staticmethod
def decrypt() -> None:
def decrypt() -> str:
"""
Starts SSL dencrypt
"""
crypted_tunnel = [str(file) for file in AppConfig.CONFIG_DIR.glob("*.dat")]
if crypted_tunnel == []:
return
process: CompletedProcess[str] = subprocess.run(
["pkexec", "/usr/local/bin/ssl_decrypt.py"],
stdout=subprocess.PIPE,
capture_output=True,
text=True,
check=True,
check=False,
)
path: Path = Path.home() / ".config/wire_py/"
file_in_path: list[Path] = list(path.rglob("*.dat"))
if file_in_path:
if process.returncode == 0:
print("File successfully decrypted...")
else:
print(f"Error with the following code... {process.returncode}")
# Output from Openssl Error
if process.stderr:
print(process.stderr)
if process.returncode == 0:
print("Files successfully decrypted...")
else:
print(_("Ready for import"))
print(f"Error process decrypt: Code {process.returncode}")
@staticmethod
def encrypt() -> None:
def encrypt() -> str:
"""
Starts SSL encryption
"""
process: CompletedProcess[str] = subprocess.run(
["pkexec", "/usr/local/bin/ssl_encrypt.py"],
stdout=subprocess.PIPE,
capture_output=True,
text=True,
check=True,
check=False,
)
print(process.stdout)
# Output from Openssl Error
if process.stderr:
print(process.stderr)
if process.returncode == 0:
print("All Files successfully encrypted...")
print("Files successfully encrypted...")
else:
print(f"Error with the following code... {process.returncode}")
print(f"Error process encrypt: Code {process.returncode}")
@staticmethod
def find_key(key: str = "") -> bool:
"""
Checks if the private key already exists in the system using an external script.
Returns True only if the full key is found exactly (no partial match).
"""
process: CompletedProcess[bool] = run(
["pkexec", "/usr/local/bin/match_found.py", key],
capture_output=True,
text=True,
check=False,
)
if "True" in process.stdout:
return True
elif "False" in process.stdout:
return False
print(
f"Unexpected output from the external script:\nSTDOUT: {process.stdout}\nSTDERR: {process.stderr}"
)
return False
@staticmethod
def is_valid_base64(key: str) -> bool:
"""
Validates if the input is a valid Base64 string (WireGuard private key format).
Returns True only for non-empty strings that match the expected length.
"""
# Check for empty string
if not key or key.strip() == "":
return False
# Regex pattern to validate Base64: [A-Za-z0-9+/]+={0,2}
base64_pattern = r"^[A-Za-z0-9+/]+={0,2}$"
if not re.match(base64_pattern, key):
return False
try:
# Decode and check length (WireGuard private keys are 32 bytes long)
decoded = base64.b64decode(key)
if len(decoded) != 32: # 32 bytes = 256 bits
return False
except Exception:
return False
return True
class LxTools(tk.Tk):
@ -258,10 +256,9 @@ class LxTools(tk.Tk):
check=True,
)
if result.returncode != 0:
exit(1)
else:
print(result.stdout.strip())
return result.stdout.strip()
pass
return result.stdout.strip()
except subprocess.CalledProcessError:
pass
@ -405,23 +402,73 @@ class Tunnel:
"""
@staticmethod
def parse_files_to_dictionary() -> Dict[str, List[str]]:
def parse_files_to_dictionary(
directory: Path = None, filepath: str = None, content: str = None
) -> dict | str | None:
data = {}
if not AppConfig.TEMP_DIR.exists() or not AppConfig.TEMP_DIR.is_dir():
pass
# Get a list of all files in the directorys
files = [file for file in AppConfig.TEMP_DIR.iterdir() if file.is_file()]
if not files:
pass
# Search for the string in the files
for file in files:
if filepath is not None:
filepath = Path(filepath)
try:
with open(file, "r") as f:
content = f.read()
# Hier parsen wir die relevanten Zeilen aus dem Inhalt
content = filepath.read_text()
# parse the content
address_line = next(
line for line in content.splitlines() if line.startswith("Address")
)
dns_line = next(
line for line in content.splitlines() if line.startswith("DNS")
)
endpoint_line = next(
line for line in content.splitlines() if line.startswith("Endpoint")
)
private_key_line = next(
line
for line in content.splitlines()
if line.startswith("PrivateKey")
)
content = secrets.token_bytes(len(content))
# extract the values
address = address_line.split("=")[1].strip()
dns = dns_line.split("=")[1].strip()
endpoint = endpoint_line.split("=")[1].strip()
private_key = private_key_line.split("=")[1].strip()
# Shorten the tunnel name to the maximum allowed length if it exceeds 12 characters.
original_stem = filepath.stem
truncated_stem = (
original_stem[-12:] if len(original_stem) > 12 else original_stem
)
# save in the dictionary
data[truncated_stem] = {
"Address": address,
"DNS": dns,
"Endpoint": endpoint,
"PrivateKey": private_key,
}
content = secrets.token_bytes(len(content))
except StopIteration:
pass
elif directory is not None:
if not directory.exists() or not directory.is_dir():
print("Temp directory does not exist or is not a directory.")
return None
# Get a list of all files in the directory
files = [file for file in AppConfig.TEMP_DIR.iterdir() if file.is_file()]
# Search for the string in the files
for file in files:
try:
content = file.read_text()
# parse the content
address_line = next(
line
for line in content.splitlines()
@ -436,67 +483,27 @@ class Tunnel:
if line.startswith("Endpoint")
)
# Extrahiere die Werte
# extract values
address = address_line.split("=")[1].strip()
dns = dns_line.split("=")[1].strip()
endpoint = endpoint_line.split("=")[1].strip()
# Speichere im Dictionary
# save values to dictionary
data[file.stem] = {
"Address": address,
"DNS": dns,
"Endpoint": endpoint,
}
except Exception:
# Ignore errors and continue to the next file
continue
return data
@classmethod
def con_to_dict(cls, file: TextIO) -> Tuple[str, str, str, Optional[str]]:
"""
Returns tuple of (address, dns, endpoint, pre_key)
"""
dictlist: List[str] = []
for lines in file.readlines():
line_plit: List[str] = lines.split()
dictlist = dictlist + line_plit
dictlist.remove("[Interface]")
dictlist.remove("[Peer]")
for items in dictlist:
if items == "=":
dictlist.remove(items)
if items == "::/0":
dictlist.remove(items)
# Here is the beginning (Loop) of convert List to Dictionary
for _ in dictlist:
a: List[str] = [dictlist[0], dictlist[1]]
b: List[str] = [dictlist[2], dictlist[3]]
c: List[str] = [dictlist[4], dictlist[5]]
d: List[str] = [dictlist[6], dictlist[7]]
e: List[str] = [dictlist[8], dictlist[9]]
f: List[str] = [dictlist[10], dictlist[11]]
g: List[str] = [dictlist[12], dictlist[13]]
h: List[str] = [dictlist[14], dictlist[15]]
new_list: List[List[str]] = [a, b, c, d, e, f, g, h]
final_dict: Dict[str, str] = {}
for elements in new_list:
final_dict[elements[0]] = elements[1]
# end... result a Dictionary
address: str = final_dict["Address"]
dns: str = final_dict["DNS"]
if "," in dns:
dns = dns[:-1]
endpoint: str = final_dict["Endpoint"]
pre_key: Optional[str] = final_dict.get("PresharedKey")
if pre_key is None:
pre_key: Optional[str] = final_dict.get("PreSharedKey")
return address, dns, endpoint, pre_key
except Exception:
# Ignore errors and continue to the next file
continue
if content is not None:
content = secrets.token_bytes(len(content))
if filepath is not None:
return data, truncated_stem
else:
return data
@staticmethod
def active() -> str:

View File

@ -18,6 +18,7 @@ install_file_with(){
else
sudo apt install python3-tk && \
sudo cp -fv wirepy.py start_wg.py wp_app_config.py common_tools.py ssl_encrypt.py ssl_decrypt.py /usr/local/bin/ && \
sudo cp -fv match_found.py /usr/local/bin/ && \
sudo cp -uR lx-icons /usr/share/icons/ && sudo cp -uR TK-Themes /usr/share/ && \
sudo cp -u languages/de/*.mo /usr/share/locale/de/LC_MESSAGES/ && \
sudo cp -fv Wire-Py.desktop /usr/share/applications/ && \
@ -44,6 +45,7 @@ install_arch_d(){
else
sudo pacman -S --noconfirm tk python3 python-requests && \
sudo cp -fv wirepy.py start_wg.py wp_app_config.py common_tools.py ssl_encrypt.py ssl_decrypt.py /usr/local/bin/ && \
sudo cp -fv match_found.py /usr/local/bin/ && \
sudo cp -uR lx-icons /usr/share/icons/ && sudo cp -uR TK-Themes /usr/share/ && \
sudo cp -u languages/de/*.mo /usr/share/locale/de/LC_MESSAGES/ && \
sudo cp -fv Wire-Py.desktop /usr/share/applications/ && \
@ -121,6 +123,7 @@ install(){
else
sudo dnf install python3-tkinter -y
sudo cp -fv wirepy.py start_wg.py wp_app_config.py common_tools.py ssl_encrypt.py ssl_decrypt.py /usr/local/bin/ && \
sudo cp -fv match_found.py /usr/local/bin/ && \
sudo cp -uR lx-icons /usr/share/icons/ && sudo cp -uR TK-Themes /usr/share/ && \
sudo cp -u languages/de/*.mo /usr/share/locale/de/LC_MESSAGES/ && \
sudo cp -fv Wire-Py.desktop /usr/share/applications/ && \
@ -146,6 +149,7 @@ install(){
exit 0
else
sudo cp -fv wirepy.py start_wg.py wp_app_config.py common_tools.py ssl_encrypt.py ssl_decrypt.py /usr/local/bin/ && \
sudo cp -fv match_found.py /usr/local/bin/ && \
sudo cp -uR lx-icons /usr/share/icons/ && sudo cp -uR TK-Themes /usr/share/ && \
sudo cp -u languages/de/*.mo /usr/share/locale/de/LC_MESSAGES/ && \
sudo cp -fv Wire-Py.desktop /usr/share/applications/ && \
@ -181,7 +185,8 @@ install(){
remove(){
sudo rm -f /usr/local/bin/wirepy /usr/local/bin/wirepy.py /usr/local/bin/start_wg.py \
/usr/local/bin/wp_app_config.py common_tools.py /usr/local/bin/ssl_encrypt.py /usr/local/bin/ssl_decrypt.py
/usr/local/bin/wp_app_config.py common_tools.py /usr/local/bin/ssl_encrypt.py \
/usr/local/bin/ssl_decrypt.py /usr/local/bin/match_found.py
if [ $? -ne 0 ]
then
exit 0

61
match_found.py Executable file
View File

@ -0,0 +1,61 @@
#!/usr/bin/python3
import argparse
from pathlib import Path
directorys: list[str] = [
"/etc/netplan/",
"/etc/NetworkManager/system-connections/",
"/var/lib/NetworkManager/user-connections/",
]
def search_string_in_directory(
directories: list[str] = directorys, # Use the predefined list as default
search_string: str = "", # Default is empty string
) -> bool:
if len(search_string) == 0:
return False
result = False
for directory in directories:
in_paths = Path(directory)
if not in_paths.exists() or not in_paths.is_dir():
continue
files = [file for file in in_paths.iterdir() if file.is_file()]
if not files:
continue
# Search for the string in each file
for file in files:
try:
with open(file, "r", errors="ignore") as f:
for line in f:
if search_string in line:
result = True # String found
break
if result:
break # No need to check further
except Exception:
continue # Skip files that cause errors
# Invert the logic: return False if string is found, True otherwise
return result
def main() -> bool:
parser = argparse.ArgumentParser(
description="Script only for use to compare the private key in the Network configurations to avoid errors with the network manager."
)
parser.add_argument("search_string", help="Search string")
args = parser.parse_args()
result = search_string_in_directory(search_string=args.search_string)
print(result)
if __name__ == "__main__":
main()

View File

@ -25,6 +25,7 @@ License along with this library. If not, see
<action id="org.ssl_encrypt">
<defaults>
<allow_any>auth_admin_keep</allow_any>
<allow_inactive>auth_admin_keep</allow_inactive>
<allow_active>yes</allow_active>
</defaults>
<annotate key="org.freedesktop.policykit.exec.path">/usr/local/bin/ssl_encrypt.py</annotate>
@ -37,6 +38,14 @@ License along with this library. If not, see
<allow_active>yes</allow_active>
</defaults>
<annotate key="org.freedesktop.policykit.exec.path">/usr/local/bin/ssl_decrypt.py</annotate>
</action>
<action id="org.match_found">
<defaults>
<allow_any>auth_admin_keep</allow_any>
<allow_inactive>auth_admin_keep</allow_inactive>
<allow_active>yes</allow_active>
</defaults>
<annotate key="org.freedesktop.policykit.exec.path">/usr/local/bin/match_found.py</annotate>
</action>
</policyconfig>

View File

@ -1,8 +1,9 @@
[UPDATES]
# Configuration
on
[THEME]
light
[TOOLTIP]
# Theme
dark
# Tooltips
True
[AUTOSTART ON]
# Autostart
off

View File

@ -1,26 +1,19 @@
#!/usr/bin/python3
""" This Script decrypt Wireguard files for Wirepy users """
import os
import shutil
from pathlib import Path
from subprocess import check_call
import shutil
from subprocess import CompletedProcess
import subprocess
from wp_app_config import AppConfig
import getpass
log_name: str = getpass.getuser()
if log_name == "root":
from common_tools import LxTools
log_name: str = LxTools.get_username()
print("replacement method applied")
log_name = AppConfig.USER_FILE.read_text().strip()
keyfile: Path = Path(f"/home/{log_name}/.config/wire_py/pbwgk.pem")
path_of_crypted_tunnel: Path = Path(f"/home/{log_name}/.config/wire_py")
if not keyfile.is_file():
check_call(
process: CompletedProcess[str] = subprocess.run(
[
"openssl",
"rsa",
@ -31,21 +24,27 @@ if not keyfile.is_file():
"-outform",
"PEM",
"-pubout",
]
],
capture_output=True,
text=True,
check=False,
)
print(process.stdout)
if process.returncode == 0:
print("Public key generated successfully.")
else:
print(f"Error with the following code... {process.returncode}")
shutil.chown(keyfile, 1000, 1000)
AppConfig.TEMP_DIR2 = f"/home/{log_name}/.config/wire_py/"
detl: list[str] = os.listdir(AppConfig.TEMP_DIR2)
os.chdir(AppConfig.TEMP_DIR2)
detl.remove("keys")
detl.remove("settings")
if os.path.exists(f"{AppConfig.TEMP_DIR2}pbwgk.pem"):
detl.remove("pbwgk.pem")
for detunnels in detl:
tlname2 = f"{detunnels[:-4]}.conf"
extpath = f"{AppConfig.TEMP_DIR}/{tlname2}"
check_call(
if AppConfig.PUBLICKEY.exists:
crypted__tunnel = [str(file) for file in path_of_crypted_tunnel.glob("*.dat")]
for tunnel_path in crypted__tunnel:
base_name = Path(tunnel_path).stem
process: CompletedProcess[str] = subprocess.run(
[
"openssl",
"pkeyutl",
@ -53,9 +52,25 @@ if os.path.exists(f"{AppConfig.TEMP_DIR2}pbwgk.pem"):
"-inkey",
AppConfig.SYSTEM_PATHS["pkey_path"],
"-in",
detunnels,
tunnel_path, # full path to the file
"-out",
extpath,
]
f"{AppConfig.TEMP_DIR}/{base_name}.conf",
],
capture_output=True,
text=True,
check=False,
)
shutil.chown(extpath, 1000, 1000)
shutil.chown(f"{AppConfig.TEMP_DIR}/{base_name}.conf", 1000, 1000)
print(f"Processing of the file: {tunnel_path}")
if process.stdout:
print(process.stdout)
# Output from Openssl Error
if process.stderr:
print("(Error):", process.stderr)
if process.returncode == 0:
print(f"File {base_name}.dat successfully decrypted.")
else:
print(f"Error by {tunnel_path}: Code: {process.returncode}")

View File

@ -1,18 +1,20 @@
#!/usr/bin/python3
""" This Script encrypt Wireguardfiles for Wirepy users for more Security """
import os
import shutil
from pathlib import Path
from subprocess import check_call
import shutil
import subprocess
from subprocess import CompletedProcess
from wp_app_config import AppConfig
from common_tools import LxTools
keyfile: Path = AppConfig.PUBLICKEY
log_name = AppConfig.USER_FILE.read_text().strip()
keyfile: Path = Path(f"/home/{log_name}/.config/wire_py/pbwgk.pem")
target: Path = Path(f"/home/{log_name}/.config/wire_py/")
if not keyfile.is_file():
check_call(
process: CompletedProcess[str] = subprocess.run(
[
"openssl",
"rsa",
@ -23,56 +25,57 @@ if not keyfile.is_file():
"-outform",
"PEM",
"-pubout",
]
],
capture_output=True,
text=True,
check=False,
)
if process.stdout:
print(process.stdout)
# Output from Openssl Error
if process.stderr:
print("(Error):", process.stderr)
if process.returncode == 0:
print("Public key generated successfully.")
else:
print(f"Error generate Publickey: Code: {process.returncode}")
shutil.chown(keyfile, 1000, 1000)
if AppConfig.TEMP_DIR.exists():
tl = LxTools.get_file_name(AppConfig.TEMP_DIR)
CPTH: str = f"{keyfile}"
CRYPTFILES: str = CPTH[:-9]
# any() get True when directory is not empty
if AppConfig.TEMP_DIR.exists() and any(AppConfig.TEMP_DIR.iterdir()):
clear_files = [str(file) for file in AppConfig.TEMP_DIR.glob("*.conf")]
if keyfile.exists() and len(tl) != 0:
for tunnels in tl:
sourcetl: str = f"{AppConfig.TEMP_DIR}/{tunnels}"
tlname: str = f"{CRYPTFILES}{tunnels[:-5]}.dat"
check_call(
[
"openssl",
"pkeyutl",
"-encrypt",
"-inkey",
keyfile,
"-pubin",
"-in",
sourcetl,
"-out",
tlname,
]
)
for config_file in clear_files:
base_name = Path(config_file).stem
process: CompletedProcess[str] = subprocess.run(
[
"openssl",
"pkeyutl",
"-encrypt",
"-inkey",
keyfile,
"-pubin",
"-in",
config_file,
"-out",
f"{target}/{base_name}.dat",
],
capture_output=True,
text=True,
check=False,
)
else:
print(f"Processing of the file: {config_file}")
if AppConfig.TEMP_DIR.exists():
tl: list[str] = os.listdir(f"{AppConfig.TEMP_DIR}")
CPTH: str = f"{keyfile}"
CRYPTFILES: str = CPTH[:-9]
# Output from Openssl Error
if process.stderr:
print("(Error):", process.stderr)
if keyfile.exists() and len(tl) != 0:
for tunnels in tl:
sourcetl: str = f"{AppConfig.TEMP_DIR}/{tunnels}"
tlname: str = f"{CRYPTFILES}{tunnels[:-5]}.dat"
check_call(
[
"openssl",
"pkeyutl",
"-encrypt",
"-inkey",
keyfile,
"-pubin",
"-in",
sourcetl,
"-out",
tlname,
]
)
if process.returncode == 0:
print(f"File {base_name}.dat successfully encrypted.")
else:
print(f"Error by {config_file}: Code: {process.returncode}")

View File

@ -3,14 +3,23 @@
This script belongs to wirepy and is for the auto start of the tunnel
"""
from pathlib import Path
from subprocess import check_call
import subprocess
from subprocess import CompletedProcess
from wp_app_config import AppConfig
from common_tools import ConfigManager
path_to_file = Path(Path.home() / ".config/wire_py/settings")
ConfigManager.init(AppConfig.SETTINGS_FILE)
if ConfigManager.get("autostart") != "off":
process: CompletedProcess[str] = subprocess.run(
["nmcli", "connection", "up", ConfigManager.get("autostart")],
capture_output=True,
text=True,
check=False,
)
# Output from start_wg error
if process.stderr:
print(process.stderr) # this is for the error, later on logfile
a_con = Path(path_to_file).read_text(encoding="utf-8").splitlines(keepends=True)
a_con = a_con[7].strip()
if a_con != "off":
check_call(["nmcli", "connection", "up", a_con])
else:
pass

View File

@ -6,5 +6,6 @@ After=network-online.target
Type=oneshot
ExecStartPre=/bin/sleep 5
ExecStart=/usr/local/bin/start_wg.py
[Install]
WantedBy=default.target

459
wirepy.py
View File

@ -2,20 +2,21 @@
"""
this script is a simple GUI for managing Wireguard Tunnels
"""
import os
import getpass
import shutil
import subprocess
import sys
import tkinter as tk
import webbrowser
from pathlib import Path
from subprocess import check_call
from subprocess import CompletedProcess
from tkinter import TclError, filedialog, ttk
from common_tools import (
ConfigManager,
ThemeManager,
Create,
CryptoUtil,
GiteaUpdate,
Tunnel,
Tooltip,
@ -23,9 +24,10 @@ from common_tools import (
)
from wp_app_config import AppConfig, Msg
Create.dir_and_files()
Create.make_dir()
Create.decrypt()
AppConfig.USER_FILE.write_text(getpass.getuser())
AppConfig.ensure_directories()
AppConfig.create_default_settings()
CryptoUtil.decrypt()
class Wirepy(tk.Tk):
@ -239,15 +241,19 @@ class FrameWidgets(ttk.Frame):
self.l_box.configure(yscrollcommand=self.scrollbar.set)
# Tunnel List
self.tl = LxTools.get_file_name(AppConfig.TEMP_DIR)
for tunnels in self.tl:
self.tl = Tunnel.parse_files_to_dictionary(directory=AppConfig.TEMP_DIR)
LxTools.clean_files(AppConfig.TEMP_DIR, file=None)
AppConfig.ensure_directories()
# self.tl = LxTools.get_file_name(AppConfig.TEMP_DIR)
for tunnels, values in self.tl.items():
self.l_box.insert("end", tunnels)
self.l_box.update()
# Button Vpn
if self.a != "":
self.stop()
data = self.handle_tunnel_data(self.a)
self.handle_tunnel_data(self.a, self.tl)
self.show_data()
else:
self.start()
@ -583,22 +589,6 @@ class FrameWidgets(ttk.Frame):
else:
Tooltip(self.btn_stst, Msg.TTIP["start_tl"], self.tooltip_state)
def handle_tunnel_data(self, tunnel_name: str) -> tuple[str, str, str, str | None]:
"""_summary_
Args:
tunnel_name (str): name of a tunnel
Returns:
tuple[str, str]: tuple with tunnel data
"""
wg_read = f"/tmp/tlecdcwg/{tunnel_name}.conf"
with open(wg_read, "r", encoding="utf-8") as file:
data = Tunnel.con_to_dict(file)
self.init_and_report(data)
self.show_data()
return data
def color_label(self) -> None:
"""
View activ Tunnel in the color green or yellow
@ -642,141 +632,137 @@ class FrameWidgets(ttk.Frame):
def import_sl(self) -> None:
"""validity check of wireguard config files"""
Create.dir_and_files()
AppConfig.ensure_directories()
try:
filepath = filedialog.askopenfilename(
initialdir=f"{Path.home()}",
title=_("Select Wireguard config File"),
filetypes=[(_("WG config files"), "*.conf")],
title="Select Wireguard config File",
filetypes=[("WG config files", "*.conf")],
)
data_import, key_name = Tunnel.parse_files_to_dictionary(filepath=filepath)
# Überprüfe, ob der Benutzer den Dialog abgebrochen hat
if not filepath:
print("File import: abort by user...")
return
with open(filepath, "r", encoding="utf-8") as file:
read = file.read()
path_split = filepath.split("/")
path_split1 = path_split[-1]
if (
"PrivateKey = " in read
and "PublicKey = " in read
and "Endpoint =" in read
):
with open(filepath, "r", encoding="utf-8") as file:
key = Tunnel.con_to_dict(file)
pre_key = key[3]
if len(pre_key) != 0:
p_key = AppConfig.KEYS_FILE.read_text(encoding="utf-8")
if pre_key in p_key or f"{pre_key}\n" in p_key:
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_error"],
AppConfig.IMAGE_PATHS["icon_msg"],
Msg.STR["imp_err"],
Msg.STR["tl_exist"],
)
else:
with open(
AppConfig.KEYS_FILE, "a", encoding="utf-8"
) as keyfile:
keyfile.write(f"{pre_key}\r")
if len(path_split1) > 17:
p1 = shutil.copy(filepath, AppConfig.TEMP_DIR)
path_split = path_split1[len(path_split1) - 17 :]
os.rename(p1, f"{AppConfig.TEMP_DIR}/{path_split}")
new_conf = f"{AppConfig.TEMP_DIR}/{path_split}"
if self.a != "":
check_call(["nmcli", "connection", "down", self.a])
self.reset_fields()
subprocess.check_output(
[
"nmcli",
"connection",
"import",
"type",
"wireguard",
"file",
new_conf,
],
text=True,
)
Create.encrypt()
else:
shutil.copy(filepath, f"{AppConfig.TEMP_DIR}/")
if self.a != "":
check_call(["nmcli", "connection", "down", self.a])
self.reset_fields()
subprocess.check_output(
[
"nmcli",
"connection",
"import",
"type",
"wireguard",
"file",
filepath,
],
text=True,
)
Create.encrypt()
self.str_var.set("")
self.a = Tunnel.active()
self.l_box.insert(0, self.a)
self.wg_autostart.configure(state="normal")
self.l_box.selection_clear(0, tk.END)
self.l_box.update()
self.l_box.selection_set(0)
Tooltip(
self.wg_autostart,
Msg.TTIP["autostart"],
self.tooltip_state,
x_offset=-10,
y_offset=-40,
)
Tooltip(self.btn_tr, Msg.TTIP["trash_tl"], self.tooltip_state)
Tooltip(self.btn_exp, Msg.TTIP["export_tl"], self.tooltip_state)
Tooltip(
self.btn_rename, Msg.TTIP["rename_tl"], self.tooltip_state
)
self.lb_rename.insert(0, "Max. 12 characters!")
self.str_var = tk.StringVar()
self.str_var.set(self.a)
self.color_label()
self.stop()
data = self.handle_tunnel_data(self.a)
check_call(
[
"nmcli",
"con",
"mod",
self.a,
"connection.autoconnect",
"no",
]
)
elif ("PrivateKey = " in read) and ("Endpoint = " in read):
pass
else:
if CryptoUtil.find_key(f"{data_import[key_name]["PrivateKey"]}="):
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_error"],
AppConfig.IMAGE_PATHS["icon_msg"],
Msg.STR["imp_err"],
Msg.STR["no_valid_file"],
Msg.STR["tl_exist"],
)
elif not CryptoUtil.is_valid_base64(
f"{data_import[key_name]["PrivateKey"]}="
): # 2. Second check: Is it valid Base64?
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_error"],
AppConfig.IMAGE_PATHS["icon_msg"],
Msg.STR["imp_err"],
Msg.STR["invalid_base64"],
)
else:
print("Key is valid and does not exist import allowed!")
filepath = Path(filepath)
# Shorten the tunnel name to the maximum allowed length if it exceeds 12 characters.
original_name = filepath.name
truncated_name = (
original_name[-17:] if len(original_name) > 17 else original_name
)
import_file = shutil.copy2(
filepath, AppConfig.TEMP_DIR / truncated_name
)
import_file = Path(import_file)
del data_import[key_name]["PrivateKey"]
self.tl.update(data_import)
if self.a != "":
process: CompletedProcess[str] = subprocess.run(
["nmcli", "connection", "down", self.a],
capture_output=True,
text=True,
check=False,
)
if process.stderr:
print(process.stderr)
else:
print(f"Error process decrypt: Code {process.returncode}")
self.reset_fields()
process: CompletedProcess[str] = subprocess.run(
[
"nmcli",
"connection",
"import",
"type",
"wireguard",
"file",
import_file,
],
capture_output=True,
text=True,
check=False,
)
if process.stderr:
print(process.stderr)
if process.returncode == 0:
print(f"Tunnel >> {import_file.stem} << import successfull")
else:
print(f"Error process decrypt: Code {process.returncode}")
CryptoUtil.encrypt()
LxTools.clean_files(AppConfig.TEMP_DIR, file=None)
AppConfig.ensure_directories()
self.str_var.set("")
self.a = Tunnel.active()
self.l_box.insert(0, self.a)
self.wg_autostart.configure(state="normal")
self.l_box.selection_clear(0, tk.END)
self.l_box.update()
self.l_box.selection_set(0)
Tooltip(
self.wg_autostart,
Msg.TTIP["autostart"],
self.tooltip_state,
x_offset=-10,
y_offset=-40,
)
Tooltip(self.btn_tr, Msg.TTIP["trash_tl"], self.tooltip_state)
Tooltip(self.btn_exp, Msg.TTIP["export_tl"], self.tooltip_state)
Tooltip(self.btn_rename, Msg.TTIP["rename_tl"], self.tooltip_state)
self.lb_rename.insert(0, "Max. 12 characters!")
self.str_var = tk.StringVar()
self.str_var.set(self.a)
self.color_label()
self.stop()
self.handle_tunnel_data(self.a, self.tl)
self.show_data()
process: CompletedProcess[str] = subprocess.run(
["nmcli", "con", "mod", self.a, "connection.autoconnect", "no"],
capture_output=True,
text=True,
check=False,
)
if process.stderr:
print(process.stderr)
if process.returncode == 0:
print(f">> {import_file.stem} << autostart is disabled by default")
except UnboundLocalError:
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_error"],
AppConfig.IMAGE_PATHS["icon_msg"],
Msg.STR["imp_err"],
Msg.STR["no_valid_file"],
)
except IsADirectoryError:
print("File import: abort by user...")
except EOFError as e:
print(e)
except TypeError:
@ -793,35 +779,30 @@ class FrameWidgets(ttk.Frame):
try:
self.select_tunnel = self.l_box.curselection()
select_tl = self.l_box.get(self.select_tunnel[0])
with open(
f"/tmp/tlecdcwg/{select_tl}.conf", "r+", encoding="utf-8"
) as file2:
key = Tunnel.con_to_dict(file2)
pre_key = key[3]
check_call(["nmcli", "connection", "delete", select_tl])
process: CompletedProcess[str] = subprocess.run(
["nmcli", "connection", "delete", select_tl],
capture_output=True,
text=True,
check=False,
)
if process.stderr:
print(process.stderr)
if process.returncode == 0:
print(f"Tunnel >> {select_tl} << successfully deleted...")
else:
print(f"Error process: Code {process.returncode}")
self.l_box.delete(self.select_tunnel[0])
with open(AppConfig.SETTINGS_FILE, "r", encoding="utf-8") as set_f6:
lines6 = set_f6.readlines()
if select_tl == lines6[7].strip() and "off\n" not in lines6[7].strip():
lines6[7] = "off\n"
with open(AppConfig.SETTINGS_FILE, "w", encoding="utf-8") as set_f7:
set_f7.writelines(lines6)
self.selected_option.set(0)
self.autoconnect_var.set(_("no Autoconnect"))
is_encrypt = Path.home() / f".config/wire_py/{select_tl}.dat"
if is_encrypt.is_file():
Path.unlink(f"{Path.home()}/.config/wire_py/{select_tl}.dat")
Path.unlink(f"/tmp/tlecdcwg/{select_tl}.conf")
with open(AppConfig.KEYS_FILE, "r", encoding="utf-8") as readfile:
with open(
f"{Path.home()}/.config/wire_py/keys2", "w", encoding="utf-8"
) as writefile:
for line in readfile:
if pre_key not in line.strip("\n"):
writefile.write(line)
file_one = Path(f"{Path.home()}/.config/wire_py/keys2")
file_two = file_one.with_name("keys")
file_one.replace(file_two)
Path.unlink(f"{AppConfig.CONFIG_DIR}/{select_tl}.dat")
if select_tl == ConfigManager.get("autostart"):
ConfigManager.set("autostart", "off")
self.selected_option.set(0)
self.autoconnect_var.set(_("no Autoconnect"))
self.wg_autostart.configure(state="disabled")
# for disabling checkbox when Listbox empty
@ -871,7 +852,7 @@ class FrameWidgets(ttk.Frame):
"""
checkbox for enable autostart Tunnel
"""
Create.files_for_autostart()
AppConfig.get_autostart_content()
if self.l_box.size() != 0:
self.wg_autostart.configure(state="normal")
self.lb_rename.config(state="normal")
@ -884,22 +865,18 @@ class FrameWidgets(ttk.Frame):
Set (on), the selected tunnel is displayed in the label.
At (off) the label is first emptied then filled with No Autoconnect
"""
lines = (
Path(AppConfig.SETTINGS_FILE)
.read_text(encoding="utf-8")
.splitlines(keepends=True)
)
if lines[7] != "off\n":
print(f"{lines[7]} starts automatically when the system starts.")
if ConfigManager.get("autostart") != "off":
print(
f"{ConfigManager.get("autostart")} starts automatically when the system starts."
)
self.selected_option.set(1)
self.autoconnect_var.set("")
self.auto_con = lines[7]
self.auto_con = ConfigManager.get("autostart")
else:
self.selected_option.set(0)
self.auto_con = _("no Autoconnect")
print("Autostart disabled.")
self.autoconnect_var.set("")
self.autoconnect_var = tk.StringVar()
self.autoconnect_var.set(self.auto_con)
@ -929,31 +906,15 @@ class FrameWidgets(ttk.Frame):
select_tl = self.l_box.get(select_tunnel[0])
if self.selected_option.get() == 0:
lines = (
Path(AppConfig.SETTINGS_FILE)
.read_text(encoding="utf-8")
.splitlines(keepends=True)
)
lines[7] = "off\n"
Path(AppConfig.SETTINGS_FILE).write_text(
"".join(lines), encoding="utf-8"
)
ConfigManager.set("autostart", "off")
tl = LxTools.get_file_name(AppConfig.TEMP_DIR)
tl = [f"{file}" for file in AppConfig.CONFIG_DIR.glob("*.dat")]
if len(tl) == 0:
self.wg_autostart.configure(state="disabled")
if self.selected_option.get() >= 1:
lines = (
Path(AppConfig.SETTINGS_FILE)
.read_text(encoding="utf-8")
.splitlines(keepends=True)
)
lines[7] = select_tl
Path(AppConfig.SETTINGS_FILE).write_text(
"".join(lines), encoding="utf-8"
)
ConfigManager.set("autostart", select_tl)
except IndexError:
self.selected_option.set(1)
@ -1010,7 +971,7 @@ class FrameWidgets(ttk.Frame):
select_tl = self.l_box.get(self.select_tunnel[0])
# nmcli connection modify old connection.id iphone
subprocess.check_output(
process: CompletedProcess[str] = subprocess.run(
[
"nmcli",
"connection",
@ -1019,30 +980,28 @@ class FrameWidgets(ttk.Frame):
"connection.id",
self.lb_rename.get(),
],
capture_output=True,
text=True,
check=False,
)
source = Path(f"/tmp/tlecdcwg/{select_tl}.conf")
destination = source.with_name(f"{self.lb_rename.get()}.conf")
source.replace(destination)
Path.unlink(f"{Path.home()}/.config/wire_py/{select_tl}.dat")
if process.stderr:
print(process.stderr)
if process.returncode != 0:
print(f"Error process: Code {process.returncode}")
source = Path(f"{AppConfig.CONFIG_DIR}/{select_tl}.dat")
destination = AppConfig.CONFIG_DIR / f"{self.lb_rename.get()}.dat"
source.replace(destination)
self.tl[self.lb_rename.get()] = self.tl.pop(select_tl)
if select_tl == ConfigManager.get("autostart"):
ConfigManager.set("autostart", self.lb_rename.get())
self.autoconnect_var.set(value=self.lb_rename.get())
self.l_box.delete(self.select_tunnel[0])
self.l_box.insert("end", self.lb_rename.get())
self.l_box.update()
new_a_connect = self.lb_rename.get()
self.lb_rename.delete(0, tk.END)
with open(AppConfig.SETTINGS_FILE, "r", encoding="utf-8") as set_f5:
lines5 = set_f5.readlines()
if select_tl == lines5[7].strip() and "off\n" not in lines5[7].strip():
lines5[7] = new_a_connect
with open(
AppConfig.SETTINGS_FILE, "w", encoding="utf-8"
) as theme_set5:
theme_set5.writelines(lines5)
self.autoconnect_var.set(value=new_a_connect)
self.update_connection_display()
Create.encrypt()
except IndexError:
@ -1059,18 +1018,17 @@ class FrameWidgets(ttk.Frame):
except EOFError as e:
print(e)
def init_and_report(self, data=None) -> None:
"""
Displays the value address, DNS and peer in the labels
or empty it again
"""
def handle_tunnel_data(self, active=None, data=None) -> None:
tunnel = active
values = data[tunnel]
# Address Label
self.add = tk.StringVar()
self.add.set(f"{_("Address: ")}{data[0]}")
self.add.set(f"Address: {values['Address']}")
self.DNS = tk.StringVar()
self.DNS.set(f" DNS: {data[1]}")
self.DNS.set(f" DNS: {values['DNS']}")
self.enp = tk.StringVar()
self.enp.set(f"{_("Endpoint: ")}{data[2]}")
self.enp.set(f"Endpoint: {values['Endpoint']}")
def show_data(self) -> None:
"""
@ -1107,10 +1065,8 @@ class FrameWidgets(ttk.Frame):
else:
data = self.handle_tunnel_data(self.a)
if data:
self.handle_connection_state("stop")
self.handle_tunnel_data(self.a, self.tl)
self.handle_connection_state("stop")
except IndexError:
@ -1142,7 +1098,19 @@ class FrameWidgets(ttk.Frame):
"""
if action == "stop":
if self.a:
check_call(["nmcli", "connection", "down", self.a])
process: CompletedProcess[str] = subprocess.run(
["nmcli", "connection", "down", self.a],
capture_output=True,
text=True,
check=False,
)
if process.stderr:
print(process.stderr)
if process.returncode != 0:
print(f"Error process: Code {process.returncode}")
self.update_connection_display()
self.reset_fields()
self.start()
@ -1150,10 +1118,23 @@ class FrameWidgets(ttk.Frame):
elif action == "start":
if tunnel_name or self.a:
target_tunnel = tunnel_name or self.a
check_call(["nmcli", "connection", "up", target_tunnel])
process: CompletedProcess[str] = subprocess.run(
["nmcli", "connection", "up", target_tunnel],
capture_output=True,
text=True,
check=False,
)
if process.stderr:
print(process.stderr)
if process.returncode == 0:
print(f"Tunnel >> {target_tunnel} << started")
else:
print(f"Error process: Code {process.returncode}")
self.update_connection_display()
data = self.handle_tunnel_data(self.a)
self.init_and_report(data)
self.handle_tunnel_data(self.a, self.tl)
self.show_data()
self.color_label()
self.stop()

View File

@ -4,6 +4,7 @@
import gettext
import locale
from pathlib import Path
import subprocess
from typing import Dict, Any
@ -23,8 +24,14 @@ class AppConfig:
# Configuration files
SETTINGS_FILE: Path = CONFIG_DIR / "settings"
KEYS_FILE: Path = CONFIG_DIR / "keys"
SYSTEMD_USER_FOLDER: Path = Path.home() / ".config/systemd/user"
AUTOSTART_SERVICE: Path = Path.home() / ".config/systemd/user/wg_start.service"
DEFAULT_SETTINGS: Dict[str, str] = {
"# Configuration": "on",
"# Theme": "dark",
"# Tooltips": True,
"# Autostart": "off",
}
# Updates
# 1 = 1. Year, 09 = Month of the Year, 2924 = Day and Year of the Year
@ -79,7 +86,8 @@ class AppConfig:
@classmethod
def ensure_directories(cls) -> None:
"""Ensures that all required directories exist"""
cls.CONFIG_DIR.mkdir(parents=True, exist_ok=True)
if not cls.CONFIG_DIR.exists():
cls.CONFIG_DIR.mkdir(parents=True, exist_ok=True)
cls.TEMP_DIR.mkdir(parents=True, exist_ok=True)
@classmethod
@ -91,29 +99,43 @@ class AppConfig:
)
cls.SETTINGS_FILE.write_text(content)
@classmethod
def get_image_paths(cls) -> Dict[str, Path]:
"""Returns paths to UI images"""
return {
"main_icon": cls.SYSTEM_PATHS["image_path"] / "48/wg_vpn.png",
"warning": cls.CONFIG_DIR / "images/warning.png",
"success": cls.CONFIG_DIR / "images/success.png",
"error": cls.CONFIG_DIR / "images/error.png",
}
@classmethod
def get_autostart_content(cls) -> str:
"""Returns the content for the autostart service file"""
SYSTEMD_FILE: list[str] = [
"[Unit]",
"Description=Automatic Tunnel Start",
"After=network-online.target",
"",
"[Service]",
"Type=oneshot",
"ExecStartPre=/bin/sleep 5",
"ExecStart=/usr/local/bin/start_wg.py",
"",
"[Install]",
"WantedBy=default.target",
]
if not cls.SYSTEMD_USER_FOLDER.exists():
cls.SYSTEMD_USER_FOLDER.mkdir(parents=True, exist_ok=True)
return """[Unit]Description=Automatic Tunnel Start
After=network-online.target
from subprocess import CompletedProcess
[Service]
Type=oneshot
ExecStartPre=/bin/sleep 5
ExecStart=/usr/local/bin/start_wg.py
[Install]
WantedBy=default.target"""
if not cls.AUTOSTART_SERVICE.is_file():
content = "\n".join([line for line in SYSTEMD_FILE])
cls.AUTOSTART_SERVICE.write_text(content)
process: CompletedProcess[str] = subprocess.run(
["systemctl", "--user", "enable", "wg_start.service"],
capture_output=True,
text=True,
check=False,
)
print(process.stdout)
if process.returncode == 0:
print(process.stdout)
else:
print(f"Error with the following code... {process.returncode}")
# here is inizialize the class for translate strrings
@ -154,7 +176,7 @@ class Msg:
"sel_list": _("Please select a tunnel from the list"),
"sign_len": _("The new name may contain only 12 characters"),
"zero_signs": _("At least one character must be entered"),
"false signs": _(
"false_signs": _(
"No valid sign. These must not be used.\nBlank, Slash, Backslash and { }\n"
),
"is_in_use": _("The tunnel is already in use"),
@ -162,6 +184,9 @@ class Msg:
"Oh... no valid Wireguard File!\nPlease select a valid Wireguard File"
),
"tl_exist": _("Tunnel already available!\nPlease use another file for import"),
"invalid_base64": _(
"Invalid base64 format!\nPlease use a Config file with valid key."
),
}
TTIP: Dict[str, str] = {
# Strings for Tooltips