diff --git a/common_tools.py b/common_tools.py index 9eec161..bbd1b2d 100755 --- a/common_tools.py +++ b/common_tools.py @@ -6,7 +6,7 @@ import signal import base64 import secrets import subprocess -from subprocess import CompletedProcess +from subprocess import CompletedProcess, run import re import sys import tkinter as tk @@ -30,36 +30,36 @@ class CryptoUtil: """ @staticmethod - def decrypt() -> None: + def decrypt(user) -> None: """ Starts SSL dencrypt """ - crypted_tunnel = [str(file) for file in AppConfig.CONFIG_DIR.glob("*.dat")] - if crypted_tunnel == []: - return - process: CompletedProcess[str] = subprocess.run( - ["pkexec", "/usr/local/bin/ssl_decrypt.py"], - capture_output=True, - text=True, - check=False, - ) - - # Output from Openssl Error - if process.stderr: - print(process.stderr) - - if process.returncode == 0: - print("Files successfully decrypted...") + if len([file.stem for file in AppConfig.CONFIG_DIR.glob("*.dat")]) == 0: + pass else: - print(f"Error process decrypt: Code {process.returncode}") + process: CompletedProcess[str] = run( + ["pkexec", "/usr/local/bin/ssl_decrypt.py", "--user", user], + capture_output=True, + text=True, + check=False, + ) + + # Output from Openssl Error + if process.stderr: + print(process.stderr) + + if process.returncode == 0: + print("Files successfully decrypted...") + else: + print(f"Error process decrypt: Code {process.returncode}") @staticmethod - def encrypt() -> None: + def encrypt(user) -> None: """ Starts SSL encryption """ - process: CompletedProcess[str] = subprocess.run( - ["pkexec", "/usr/local/bin/ssl_encrypt.py"], + process: CompletedProcess[str] = run( + ["pkexec", "/usr/local/bin/ssl_encrypt.py", "--user", user], capture_output=True, text=True, check=False, @@ -80,7 +80,7 @@ class CryptoUtil: Checks if the private key already exists in the system using an external script. Returns True only if the full key is found exactly (no partial match). """ - process: CompletedProcess[bool] = subprocess.run( + process: CompletedProcess[bool] = run( ["pkexec", "/usr/local/bin/match_found.py", key], capture_output=True, text=True, @@ -224,9 +224,9 @@ class LxTools: even if the script is running with root privileges. """ try: - result = subprocess.run( + result = run( ["logname"], - stdout=subprocess.PIPE, + stdout=PIPE, text=True, check=True, ) diff --git a/match_found.py b/match_found.py index b69ea65..1a73de3 100755 --- a/match_found.py +++ b/match_found.py @@ -42,11 +42,10 @@ def search_string_in_directory( except Exception: continue # Skip files that cause errors - # Invert the logic: return False if string is found, True otherwise return result -def main() -> bool: +def main() -> None: parser = argparse.ArgumentParser( description="Script only for use to compare the private key in the Network configurations to avoid errors with the network manager." ) diff --git a/ssl_decrypt.py b/ssl_decrypt.py index 03e4525..0439c79 100755 --- a/ssl_decrypt.py +++ b/ssl_decrypt.py @@ -1,19 +1,30 @@ #!/usr/bin/python3 """ This Script decrypt Wireguard files for Wirepy users """ - +import argparse from pathlib import Path +import pwd import shutil -from subprocess import CompletedProcess -import subprocess +from subprocess import CompletedProcess, run from wp_app_config import AppConfig -log_name = AppConfig.USER_FILE.read_text().strip() +parser = argparse.ArgumentParser() +parser.add_argument("--user", required=True, help="Username of the target file system") +args = parser.parse_args() -keyfile: Path = Path(f"/home/{log_name}/.config/wire_py/pbwgk.pem") -path_of_crypted_tunnel: Path = Path(f"/home/{log_name}/.config/wire_py") +try: + # Retrieve UID and GID + user_info = pwd.getpwnam(args.user) + uid = user_info.pw_uid # User ID (e.g., 1000) + gid = user_info.pw_gid # Group ID (e.g., 1000) +except KeyError: + print(f"User '{args.user}' not found.") + exit(1) + +keyfile: Path = Path(f"/home/{args.user}/.config/wire_py/pbwgk.pem") +path_of_crypted_tunnel: Path = Path(f"/home/{args.user}/.config/wire_py") if not keyfile.is_file(): - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( [ "openssl", "rsa", @@ -34,9 +45,9 @@ if not keyfile.is_file(): print("Public key generated successfully.") else: print(f"Error with the following code... {process.returncode}") - shutil.chown(keyfile, 1000, 1000) + shutil.chown(keyfile, uid, gid) -if AppConfig.PUBLICKEY.exists: +if AppConfig.PUBLICKEY.exists(): crypted__tunnel = [str(file) for file in path_of_crypted_tunnel.glob("*.dat")] @@ -44,7 +55,7 @@ if AppConfig.PUBLICKEY.exists: base_name = Path(tunnel_path).stem - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( [ "openssl", "pkeyutl", @@ -60,7 +71,7 @@ if AppConfig.PUBLICKEY.exists: text=True, check=False, ) - shutil.chown(f"{AppConfig.TEMP_DIR}/{base_name}.conf", 1000, 1000) + shutil.chown(f"{AppConfig.TEMP_DIR}/{base_name}.conf", uid, gid) print(f"Processing of the file: {tunnel_path}") if process.stdout: diff --git a/ssl_encrypt.py b/ssl_encrypt.py index 7147140..537572b 100755 --- a/ssl_encrypt.py +++ b/ssl_encrypt.py @@ -1,20 +1,33 @@ #!/usr/bin/python3 """ This Script encrypt Wireguardfiles for Wirepy users for more Security """ + +import argparse from pathlib import Path +import pwd import shutil -import subprocess -from subprocess import CompletedProcess +from subprocess import CompletedProcess, run from wp_app_config import AppConfig -log_name = AppConfig.USER_FILE.read_text().strip() +parser = argparse.ArgumentParser() +parser.add_argument("--user", required=True, help="Username of the target file system") +args = parser.parse_args() -keyfile: Path = Path(f"/home/{log_name}/.config/wire_py/pbwgk.pem") +try: + # Retrieve UID and GID + user_info = pwd.getpwnam(args.user) + uid = user_info.pw_uid # User ID (e.g., 1000) + gid = user_info.pw_gid # Group ID (e.g., 1000) +except KeyError: + print(f"User '{args.user}' not found.") + exit(1) -target: Path = Path(f"/home/{log_name}/.config/wire_py/") +keyfile: Path = Path(f"/home/{args.user}/.config/wire_py/pbwgk.pem") + +target: Path = Path(f"/home/{args.user}/.config/wire_py/") if not keyfile.is_file(): - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( [ "openssl", "rsa", @@ -43,7 +56,7 @@ if not keyfile.is_file(): else: print(f"Error generate Publickey: Code: {process.returncode}") - shutil.chown(keyfile, 1000, 1000) + shutil.chown(keyfile, uid, gid) # any() get True when directory is not empty if AppConfig.TEMP_DIR.exists() and any(AppConfig.TEMP_DIR.iterdir()): @@ -51,7 +64,7 @@ if AppConfig.TEMP_DIR.exists() and any(AppConfig.TEMP_DIR.iterdir()): for config_file in clear_files: base_name = Path(config_file).stem - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( [ "openssl", "pkeyutl", diff --git a/start_wg.py b/start_wg.py index 4842df3..e4a755d 100755 --- a/start_wg.py +++ b/start_wg.py @@ -3,15 +3,14 @@ This script belongs to wirepy and is for the auto start of the tunnel """ -import subprocess -from subprocess import CompletedProcess +from subprocess import CompletedProcess, run from wp_app_config import AppConfig from common_tools import ConfigManager ConfigManager.init(AppConfig.SETTINGS_FILE) if ConfigManager.get("autostart") != "off": - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( ["nmcli", "connection", "up", ConfigManager.get("autostart")], capture_output=True, text=True, diff --git a/wirepy.py b/wirepy.py index 930c394..4071ad2 100755 --- a/wirepy.py +++ b/wirepy.py @@ -10,7 +10,7 @@ import sys import tkinter as tk import webbrowser from pathlib import Path -from subprocess import CompletedProcess +from subprocess import CompletedProcess, run from tkinter import TclError, filedialog, ttk from common_tools import ( @@ -24,10 +24,9 @@ from common_tools import ( ) from wp_app_config import AppConfig, Msg -AppConfig.USER_FILE.write_text(getpass.getuser()) AppConfig.ensure_directories() AppConfig.create_default_settings() -CryptoUtil.decrypt() +CryptoUtil.decrypt(getpass.getuser()) class Wirepy(tk.Tk): @@ -538,7 +537,13 @@ class FrameWidgets(ttk.Frame): self.tooltip_label.set(_("Enable Tooltips")) def tooltips_toggle(self): - """Toggles tooltips on/off and updates the menu label""" + """ + Toggles the visibility of tooltips (on/off) and updates + the corresponding menu label. Inverts the current tooltip state + (`self.tooltip_state`), saves the new value in the configuration, + and applies the change immediately. Updates the menu entry's label to + reflect the new tooltip status (e.g., "Tooltips: On" or "Tooltips: Off"). + """ # Toggle the boolean state new_bool_state = not self.tooltip_state.get() # Save the converted value in the configuration @@ -674,7 +679,7 @@ class FrameWidgets(ttk.Frame): self.tl.update(data_import) if self.a != "": - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( ["nmcli", "connection", "down", self.a], capture_output=True, text=True, @@ -688,7 +693,7 @@ class FrameWidgets(ttk.Frame): print(f"Error process decrypt: Code {process.returncode}") self.reset_fields() - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( [ "nmcli", "connection", @@ -711,7 +716,7 @@ class FrameWidgets(ttk.Frame): else: print(f"Error process decrypt: Code {process.returncode}") - CryptoUtil.encrypt() + CryptoUtil.encrypt(getpass.getuser()) LxTools.clean_files(AppConfig.TEMP_DIR, file=None) AppConfig.ensure_directories() self.str_var.set("") @@ -740,7 +745,7 @@ class FrameWidgets(ttk.Frame): self.stop() self.handle_tunnel_data(self.a, self.tl) self.show_data() - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( ["nmcli", "con", "mod", self.a, "connection.autoconnect", "no"], capture_output=True, text=True, @@ -779,7 +784,7 @@ class FrameWidgets(ttk.Frame): self.select_tunnel = self.l_box.curselection() select_tl = self.l_box.get(self.select_tunnel[0]) - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( ["nmcli", "connection", "delete", select_tl], capture_output=True, text=True, @@ -973,7 +978,7 @@ class FrameWidgets(ttk.Frame): select_tl = self.l_box.get(self.select_tunnel[0]) # nmcli connection modify old connection.id iphone - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( [ "nmcli", "connection", @@ -1036,7 +1041,13 @@ class FrameWidgets(ttk.Frame): def show_data(self) -> None: """ - shows data in the label + Displays network-related data (address, DNS, endpoint) + in the UI using ttk.Label widgets. + Creates three labels for address, DNS, and endpoint with + specific styling (color, font), positioning them in a + grid layout (`lb_frame` and `lb_frame2`). + Each label is linked to a corresponding text variable + (`self.add`, `self.DNS`, `self.enp`) for dynamic data updates. """ # Address Label self.address = ttk.Label( @@ -1059,7 +1070,13 @@ class FrameWidgets(ttk.Frame): def wg_switch(self, event=None) -> None: """ - Deals with switching the VPN connection + Manages switching between active and inactiveVPN connections. + If no tunnel is selected (`self.a == ""`), it starts a new connection + with the selected tunnel from the listbox (`l_box`). + Otherwise, it stops the current connection and updates + tunnel data using `handle_tunnel_data`. + Handles errors like `IndexError` by displaying appropriate + messages if no items are selected or the listbox is empty. """ try: if self.a == "": @@ -1102,7 +1119,7 @@ class FrameWidgets(ttk.Frame): """ if action == "stop": if self.a: - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( ["nmcli", "connection", "down", self.a], capture_output=True, text=True, @@ -1122,7 +1139,7 @@ class FrameWidgets(ttk.Frame): elif action == "start": if tunnel_name or self.a: target_tunnel = tunnel_name or self.a - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( ["nmcli", "connection", "up", target_tunnel], capture_output=True, text=True, @@ -1164,7 +1181,7 @@ class FrameWidgets(ttk.Frame): if __name__ == "__main__": _ = AppConfig.setup_translations() - LxTools.sigi(AppConfig.TEMP_DIR, AppConfig.USER_FILE) + LxTools.sigi(AppConfig.TEMP_DIR) window = Wirepy() """ the hidden files are hidden in Filedialog @@ -1177,5 +1194,5 @@ if __name__ == "__main__": window.tk.call("set", "::tk::dialog::file::showHiddenVar", "0") window.mainloop() -LxTools.clean_files(AppConfig.TEMP_DIR, AppConfig.USER_FILE) +LxTools.clean_files(AppConfig.TEMP_DIR) sys.exit(0) diff --git a/wp_app_config.py b/wp_app_config.py index 18fb3df..7719a11 100644 --- a/wp_app_config.py +++ b/wp_app_config.py @@ -4,7 +4,7 @@ import gettext import locale from pathlib import Path -import subprocess +from subprocess import CompletedProcess, run from typing import Dict, Any @@ -36,7 +36,6 @@ class AppConfig: BASE_DIR: Path = Path.home() CONFIG_DIR: Path = BASE_DIR / ".config/wire_py" TEMP_DIR: Path = Path("/tmp/tlecdcwg") - USER_FILE: Path = Path("/tmp/.log_user") PUBLICKEY: Path = CONFIG_DIR / "pbwgk.pem" # Configuration files @@ -135,14 +134,12 @@ class AppConfig: if not cls.SYSTEMD_USER_FOLDER.exists(): cls.SYSTEMD_USER_FOLDER.mkdir(parents=True, exist_ok=True) - from subprocess import CompletedProcess - if not cls.AUTOSTART_SERVICE.is_file(): content = "\n".join([line for line in SYSTEMD_FILE]) cls.AUTOSTART_SERVICE.write_text(content) - process: CompletedProcess[str] = subprocess.run( + process: CompletedProcess[str] = run( ["systemctl", "--user", "enable", "wg_start.service"], capture_output=True, text=True,