Wire-Py/common_tools.py

825 lines
27 KiB
Python
Executable File

""" Classes Method and Functions for lx Apps """
import os
import shutil
import signal
import subprocess
import sys
import tkinter as tk
from typing import Optional, Dict, Any, NoReturn, TextIO, Tuple, List
import zipfile
from datetime import datetime
from pathlib import Path
from subprocess import check_call, CompletedProcess
from tkinter import ttk, Toplevel
from wp_app_config import AppConfig, Msg
import requests
# Translate
_ = AppConfig.setup_translations()
class Create:
"""
This class is for the creation of the folders and files
required by Wire-Py, as well as for decryption
the tunnel from the user's home directory
"""
@staticmethod
def dir_and_files() -> None:
"""
check and create folders and files if not present
"""
pth: Path = Path.home() / ".config/wire_py"
pth.mkdir(parents=True, exist_ok=True)
sett: Path = Path.home() / ".config/wire_py/settings"
AppConfig.KEYS_FILE
if sett.exists():
pass
else:
sett.touch()
sett.write_text(
"[UPDATES]\non\n[THEME]\nlight\n[TOOLTIP]\nTrue\n[AUTOSTART ON]\noff\n"
)
if AppConfig.KEYS_FILE.exists():
pass
else:
AppConfig.KEYS_FILE.touch()
@staticmethod
def files_for_autostart() -> None:
"""
check and create a file for auto start if not present and enable the service
"""
pth2: Path = Path.home() / ".config/systemd/user"
pth2.mkdir(parents=True, exist_ok=True)
wg_ser: Path = Path.home() / ".config/systemd/user/wg_start.service"
if wg_ser.exists():
pass
else:
wg_ser.touch()
wg_ser.write_text(
"[Unit]\nDescription=Automatic Tunnel Start\nAfter=network-online.target\n\n[Service]\n"
"Type=oneshot\nExecStartPre=/bin/sleep 5\nExecStart=/usr/local/bin/start_wg.py\n[Install]"
"\nWantedBy=default.target"
)
check_call(["systemctl", "--user", "enable", "wg_start.service"])
@staticmethod
def make_dir() -> None:
"""Folder Name "tlecdewg" = Tunnel Encrypt Decrypt Wireguard"""
if AppConfig.TEMP_DIR.exists():
pass
else:
AppConfig.TEMP_DIR.mkdir()
@staticmethod
def decrypt() -> None:
"""
Starts SSL dencrypt
"""
process: CompletedProcess[str] = subprocess.run(
["pkexec", "/usr/local/bin/ssl_decrypt.py"],
stdout=subprocess.PIPE,
text=True,
check=True,
)
path: Path = Path.home() / ".config/wire_py/"
file_in_path: list[Path] = list(path.rglob("*.dat"))
if file_in_path:
if process.returncode == 0:
print("File successfully decrypted...")
else:
print(f"Error with the following code... {process.returncode}")
else:
print(_("Ready for import"))
@staticmethod
def encrypt() -> None:
"""
Starts SSL encryption
"""
process: CompletedProcess[str] = subprocess.run(
["pkexec", "/usr/local/bin/ssl_encrypt.py"],
stdout=subprocess.PIPE,
text=True,
check=True,
)
print(process.stdout)
if process.returncode == 0:
print("All Files successfully encrypted...")
else:
print(f"Error with the following code... {process.returncode}")
class LxTools(tk.Tk):
"""
Class LinuxTools methods that can also be used for other apps
"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
@staticmethod
def center_window_cross_platform(window, width, height):
"""
Centers a window on the primary monitor in a way that works on both X11 and Wayland
Args:
window: The tkinter window to center
width: Window width
height: Window height
"""
# Calculate the position before showing the window
# First attempt: Try to use GDK if available (works on both X11 and Wayland)
try:
import gi
gi.require_version("Gdk", "3.0")
from gi.repository import Gdk
display = Gdk.Display.get_default()
monitor = display.get_primary_monitor() or display.get_monitor(0)
geometry = monitor.get_geometry()
scale_factor = monitor.get_scale_factor()
# Calculate center position on primary monitor
x = geometry.x + (geometry.width - width // scale_factor) // 2
y = geometry.y + (geometry.height - height // scale_factor) // 2
# Set window geometry
window.geometry(f"{width}x{height}+{x}+{y}")
return
except (ImportError, AttributeError):
pass
# Second attempt: Try xrandr for X11
try:
import subprocess
output = subprocess.check_output(
["xrandr", "--query"], universal_newlines=True
)
# Parse the output to find the primary monitor
primary_info = None
for line in output.splitlines():
if "primary" in line:
parts = line.split()
for part in parts:
if "x" in part and "+" in part:
primary_info = part
break
break
if primary_info:
# Parse the geometry: WIDTHxHEIGHT+X+Y
geometry = primary_info.split("+")
dimensions = geometry[0].split("x")
primary_width = int(dimensions[0])
primary_height = int(dimensions[1])
primary_x = int(geometry[1])
primary_y = int(geometry[2])
# Calculate center position on primary monitor
x = primary_x + (primary_width - width) // 2
y = primary_y + (primary_height - height) // 2
# Set window geometry
window.geometry(f"{width}x{height}+{x}+{y}")
return
except (subprocess.SubprocessError, ImportError, IndexError, ValueError):
pass
# Final fallback: Use standard Tkinter method
screen_width = window.winfo_screenwidth()
screen_height = window.winfo_screenheight()
# Try to make an educated guess for multi-monitor setups
# If screen width is much larger than height, assume multiple monitors side by side
if (
screen_width > screen_height * 1.8
): # Heuristic for detecting multiple monitors
# Assume primary monitor is on the left half
screen_width = screen_width // 2
x = (screen_width - width) // 2
y = (screen_height - height) // 2
window.geometry(f"{width}x{height}+{x}+{y}")
@staticmethod
def get_file_name(path: Path, i: int = 5) -> List[str]:
"""
Recursively searches the specified path for files and returns a list of filenames,
with the last 'i' characters of each filename removed.
This method is useful for obtaining filenames without specific file extensions,
e.g., to remove '.conf' from Wireguard configuration files.
Args:
path (Path): The directory path to search
i (int, optional): Number of characters to remove from the end of each filename.
Default is 5, which typically corresponds to the length of '.conf'.
Returns:
List[str]: A list of filenames without the last 'i' characters
Example:
If path contains files like 'tunnel1.conf', 'tunnel2.conf' and i=5,
the method returns ['tunnel1', 'tunnel2'].
"""
lists_file = list(path.rglob("*"))
lists_file = [conf_file.name[:-i] for conf_file in lists_file]
return lists_file
@staticmethod
def uos() -> None:
"""
uos = LOGIN USERNAME
This method displays the username of the logged-in user,
even if you are rooted in a shell
"""
log_name: str = f"{Path.home()}"[6:]
file: Path = Path.home() / "/tmp/.log_user"
Path(file).write_text(log_name, encoding="utf-8")
@staticmethod
def clean_files(TEMP_DIR: Path = None, file: Path = None) -> None:
"""
method that can be added after need to delete a folder and a file when quitting.
Args:
:param file: default None
:param AppConfig.TEMP_DIR: default None
"""
if AppConfig.TEMP_DIR is not None:
shutil.rmtree(AppConfig.TEMP_DIR)
if file is not None:
Path.unlink(file)
@staticmethod
def msg_window(
image_path: Path,
image_path2: Path,
w_title: str,
w_txt: str,
txt2: Optional[str] = None,
com: Optional[str] = None,
) -> None:
"""
Creates message windows
:argument AppConfig.IMAGE_PATHS["icon_info"] = Image for TK window which is displayed to the left of the text
:argument AppConfig.IMAGE_PATHS["icon_vpn"] = Image for Task Icon
:argument w_title = Windows Title
:argument w_txt = Text for Tk Window
:argument txt2 = Text for Button two
:argument com = function for Button command
"""
msg: tk.Toplevel = tk.Toplevel()
msg.resizable(width=False, height=False)
msg.title(w_title)
msg.configure(pady=15, padx=15)
# Lade das erste Bild für das Fenster
try:
msg.img = tk.PhotoImage(file=image_path)
msg.i_window = tk.Label(msg, image=msg.img)
except Exception as e:
print(f"Fehler beim Laden des Fensterbildes: {e}")
msg.i_window = tk.Label(msg, text="Bild nicht gefunden")
label: tk.Label = tk.Label(msg, text=w_txt)
label.grid(column=1, row=0)
if txt2 is not None and com is not None:
label.config(font=("Ubuntu", 11), padx=15, justify="left")
msg.i_window.grid(column=0, row=0, sticky="nw")
button2: ttk.Button = ttk.Button(
msg, text=f"{txt2}", command=com, padding=4
)
button2.grid(column=0, row=1, sticky="e", columnspan=2)
button: ttk.Button = ttk.Button(
msg, text="OK", command=msg.destroy, padding=4
)
button.grid(column=0, row=1, sticky="w", columnspan=2)
else:
label.config(font=("Ubuntu", 11), padx=15)
msg.i_window.grid(column=0, row=0)
button: ttk.Button = ttk.Button(
msg, text="OK", command=msg.destroy, padding=4
)
button.grid(column=0, columnspan=2, row=1)
# Lade das Icon für das Fenster
try:
icon = tk.PhotoImage(file=image_path2)
msg.iconphoto(True, icon)
except Exception as e:
print(f"Fehler beim Laden des Fenstericons: {e}")
msg.columnconfigure(0, weight=1)
msg.rowconfigure(0, weight=1)
msg.winfo_toplevel()
@staticmethod
def sigi(file_path: Optional[Path] = None, file: Optional[Path] = None) -> None:
"""
Function for cleanup after a program interruption
:param file: Optional - File to be deleted
:param file_path: Optional - Directory to be deleted
"""
def signal_handler(signum: int, frame: Any) -> NoReturn:
"""
Determines clear text names for signal numbers and handles signals
Args:
signum: The signal number
frame: The current stack frame
Returns:
NoReturn since the function either exits the program or continues execution
"""
signals_to_names_dict: Dict[int, str] = dict(
(getattr(signal, n), n)
for n in dir(signal)
if n.startswith("SIG") and "_" not in n
)
signal_name: str = signals_to_names_dict.get(
signum, f"Unnamed signal: {signum}"
)
# End program for certain signals, report to others only reception
if signum in (signal.SIGINT, signal.SIGTERM):
exit_code: int = 1
print(
f"\nSignal {signal_name} {signum} received. => Aborting with exit code {exit_code}."
)
LxTools.clean_files(file_path, file)
print("Breakdown by user...")
sys.exit(exit_code)
else:
print(f"Signal {signum} received and ignored.")
LxTools.clean_files(file_path, file)
print("Process unexpectedly ended...")
# Register signal handlers for various signals
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)
class Tunnel:
"""
Class of Methods for Wire-Py
"""
@classmethod
def con_to_dict(cls, file: TextIO) -> Tuple[str, str, str, Optional[str]]:
"""
Returns tuple of (address, dns, endpoint, pre_key)
"""
dictlist: List[str] = []
for lines in file.readlines():
line_plit: List[str] = lines.split()
dictlist = dictlist + line_plit
dictlist.remove("[Interface]")
dictlist.remove("[Peer]")
for items in dictlist:
if items == "=":
dictlist.remove(items)
if items == "::/0":
dictlist.remove(items)
# Here is the beginning (Loop) of convert List to Dictionary
for _ in dictlist:
a: List[str] = [dictlist[0], dictlist[1]]
b: List[str] = [dictlist[2], dictlist[3]]
c: List[str] = [dictlist[4], dictlist[5]]
d: List[str] = [dictlist[6], dictlist[7]]
e: List[str] = [dictlist[8], dictlist[9]]
f: List[str] = [dictlist[10], dictlist[11]]
g: List[str] = [dictlist[12], dictlist[13]]
h: List[str] = [dictlist[14], dictlist[15]]
new_list: List[List[str]] = [a, b, c, d, e, f, g, h]
final_dict: Dict[str, str] = {}
for elements in new_list:
final_dict[elements[0]] = elements[1]
# end... result a Dictionary
address: str = final_dict["Address"]
dns: str = final_dict["DNS"]
if "," in dns:
dns = dns[:-1]
endpoint: str = final_dict["Endpoint"]
pre_key: Optional[str] = final_dict.get("PresharedKey")
if pre_key is None:
pre_key: Optional[str] = final_dict.get("PreSharedKey")
return address, dns, endpoint, pre_key
@staticmethod
def active() -> str:
"""
Shows the Active Tunnel
"""
active = (
os.popen('nmcli con show --active | grep -iPo "(.*)(wireguard)"')
.read()
.split()
)
if not active:
active = ""
else:
active = active[0]
return active
@staticmethod
def list() -> List[str]:
"""
Returns a list of Wireguard tunnel names
"""
AppConfig.TEMP_DIR: Path = Path("/tmp/tlecdcwg/")
wg_s: List[str] = os.listdir(AppConfig.TEMP_DIR)
return wg_s
@staticmethod
def export(
image_path: Path = None,
image_path2: Path = None,
image_path3: Path = None,
image_path4: Path = None,
title: Dict = None,
window_msg: Dict = None,
) -> None:
"""
This will export the tunnels.
A zipfile with the current date and time is created
in the user's home directory with the correct right
Args:
AppConfig.IMAGE_PATHS["icon_info"]: Image for TK window which is displayed to the left of the text
AppConfig.IMAGE_PATHS["icon_vpn"]: Image for Task Icon
AppConfig.IMAGE_PATHS["icon_error"]: Image for TK window which is displayed to the left of the text
AppConfig.IMAGE_PATHS["icon_msg"]: Image for Task Icon
"""
now_time: datetime = datetime.now()
now_datetime: str = now_time.strftime("wg-exp-%m-%d-%Y-%H:%M")
tl: List[str] = Tunnel.list()
try:
if len(tl) != 0:
wg_tar: str = f"{Path.home()}/{now_datetime}"
shutil.copytree("/tmp/tlecdcwg/", "/tmp/wire_py", dirs_exist_ok=True)
source: Path = Path("/tmp/wire_py")
shutil.make_archive(wg_tar, "zip", source)
shutil.rmtree(source)
with zipfile.ZipFile(f"{wg_tar}.zip", "r") as zf:
if len(zf.namelist()) != 0:
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_info"],
AppConfig.IMAGE_PATHS["icon_vpn"],
Msg.STR["exp_succ"],
Msg.STR["exp_in_home"],
)
else:
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_error"],
AppConfig.IMAGE_PATHS["icon_msg"],
Msg.STR["exp_err"],
Msg.STR["exp_try"],
)
else:
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_info"],
AppConfig.IMAGE_PATHS["icon_msg"],
Msg.STR["sel_tl"],
Msg.STR["tl_first"],
)
except TypeError:
pass
# ConfigManager with caching
class ConfigManager:
"""
Universal class for managing configuration files with caching.
Can be reused in different projects.
"""
_config = None
_config_file = None
@classmethod
def init(cls, config_file):
"""Initial the Configmanager with the given config file"""
cls._config_file = config_file
cls._config = None # Reset the cache
@classmethod
def load(cls):
"""Load the config file and return the config as dict"""
if not cls._config:
try:
lines = Path(cls._config_file).read_text(encoding="utf-8").splitlines()
cls._config = {
"updates": lines[1].strip(),
"theme": lines[3].strip(),
"tooltips": lines[5].strip()
== "True", # is converted here to boolean!!!
"autostart": lines[7].strip() if len(lines) > 7 else "off",
}
except (IndexError, FileNotFoundError):
# DeDefault values in case of error
cls._config = {
"updates": "on",
"theme": "light",
"tooltips": "True", # Default Value as string !
"autostart": "off",
}
return cls._config
@classmethod
def save(cls):
"""Save the config to the config file"""
if cls._config:
lines = [
"# Configuration\n",
f"{cls._config['updates']}\n",
"# Theme\n",
f"{cls._config['theme']}\n",
"# Tooltips\n",
f"{str(cls._config['tooltips'])}\n",
"# Autostart\n",
f"{cls._config['autostart']}\n",
]
Path(cls._config_file).write_text("".join(lines), encoding="utf-8")
@classmethod
def set(cls, key, value):
"""Sets a configuration value and saves the change"""
cls.load()
cls._config[key] = value
cls.save()
@classmethod
def get(cls, key, default=None):
"""Returns a configuration value"""
config = cls.load()
return config.get(key, default)
@property
def username(self) -> str:
"""
Returns the username of the logged-in user,
even if the script is running with root privileges.
Returns:
str: The username of the logged-in user
"""
import os
from pathlib import Path
# Versuche zuerst SUDO_USER zu bekommen
sudo_user = os.environ.get("SUDO_USER")
if sudo_user:
return sudo_user
# Extrahiere aus Path.home()
home_path = str(Path.home())
if "/home/" in home_path:
return home_path.split("/home/")[1].split("/")[0]
# Fallbacks
try:
return os.getlogin()
except:
import getpass
return getpass.getuser()
class ThemeManager:
@staticmethod
def change_theme(root, theme_in_use, theme_name=None):
"""Change application theme centrally"""
root.tk.call("set_theme", theme_in_use)
if theme_in_use == theme_name:
ConfigManager.set("theme", theme_in_use)
class GiteaUpdate:
"""
Calling download requests the download URL of the running script,
the taskbar image for the “Download OK” window, the taskbar image for the
“Download error” window and the variable res
"""
@staticmethod
def api_down(update_api_url: str, version: str, update_setting: str = None) -> str:
"""
Checks for updates via API
Args:
update_api_url: Update API URL
version: Current version
update_setting: Update setting from ConfigManager (on/off)
Returns:
New version or status message
"""
# If updates are disabled, return immediately
if update_setting != "on":
return "False"
try:
response: requests.Response = requests.get(update_api_url, timeout=10)
response.raise_for_status() # Raise exception for HTTP errors
response_data = response.json()
if not response_data:
return "No Updates"
latest_version = response_data[0].get("tag_name")
if not latest_version:
return "Invalid API Response"
# Compare versions (strip 'v. ' prefix if present)
current_version = version[3:] if version.startswith("v. ") else version
if current_version != latest_version:
return latest_version
else:
return "No Updates"
except requests.exceptions.RequestException:
return "No Internet Connection!"
except (ValueError, KeyError, IndexError):
return "Invalid API Response"
@staticmethod
def download(
urld: str,
res: str,
image_path: Path = None,
image_path2: Path = None,
image_path3: Path = None,
image_path4: Path = None,
) -> None:
"""
Downloads new version of wirepy
Args:
urld: Download URL
res: Result filename
AppConfig.IMAGE_PATHS["icon_info"]: Image for TK window which is displayed to the left of the text
AppConfig.IMAGE_PATHS["icon_vpn"]: Image for Task Icon
AppConfig.IMAGE_PATHS["icon_error"]: Image for TK window which is displayed to the left of the text
AppConfig.IMAGE_PATHS["icon_msg"]: Image for Task Icon
"""
try:
to_down: str = f"wget -qP {Path.home()} {" "} {urld}"
result: int = subprocess.call(to_down, shell=True)
if result == 0:
shutil.chown(f"{Path.home()}/{res}.zip", 1000, 1000)
wt: str = _("Download Successful")
msg_t: str = _("Your zip file is in home directory")
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_info"],
AppConfig.IMAGE_PATHS["icon_vpn"],
wt,
msg_t,
)
else:
wt: str = _("Download error")
msg_t: str = _("Download failed! Please try again")
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_error"],
AppConfig.IMAGE_PATHS["icon_msg"],
wt,
msg_t,
)
except subprocess.CalledProcessError:
wt: str = _("Download error")
msg_t: str = _("Download failed! No internet connection!")
LxTools.msg_window(
AppConfig.IMAGE_PATHS["icon_error"],
AppConfig.IMAGE_PATHS["icon_msg"],
wt,
msg_t,
)
class Tooltip:
"""Class for Tooltip
from common_tools.py import Tooltip
example: Tooltip(label, "Show tooltip on label")
example: Tooltip(button, "Show tooltip on button")
example: Tooltip(widget, "Text", state_var=tk.BooleanVar())
example: Tooltip(widget, "Text", state_var=tk.BooleanVar(), x_offset=20, y_offset=10)
info: label and button are parent widgets.
NOTE: When using with state_var, pass the tk.BooleanVar object directly,
NOT its value. For example: use state_var=my_bool_var, NOT state_var=my_bool_var.get()
"""
def __init__(
self,
widget: Any,
text: str,
state_var: Optional[tk.BooleanVar] = None,
x_offset: int = 65,
y_offset: int = 40,
) -> None:
"""Tooltip Class"""
self.widget: Any = widget
self.text: str = text
self.tooltip_window: Optional[Toplevel] = None
self.state_var = state_var
self.x_offset = x_offset
self.y_offset = y_offset
# Initial binding based on current state
self.update_bindings()
# Add trace to the state_var if provided
if self.state_var is not None:
self.state_var.trace_add("write", self.update_bindings)
def update_bindings(self, *args) -> None:
"""Updates the bindings based on the current state"""
# Remove existing bindings first
self.widget.unbind("<Enter>")
self.widget.unbind("<Leave>")
# Add new bindings if tooltips are enabled
if self.state_var is None or self.state_var.get():
self.widget.bind("<Enter>", self.show_tooltip)
self.widget.bind("<Leave>", self.hide_tooltip)
def show_tooltip(self, event: Optional[Any] = None) -> None:
"""Shows the tooltip"""
if self.tooltip_window or not self.text:
return
x: int
y: int
cx: int
cy: int
x, y, cx, cy = self.widget.bbox("insert")
x += self.widget.winfo_rootx() + self.x_offset
y += self.widget.winfo_rooty() + self.y_offset
self.tooltip_window = tw = tk.Toplevel(self.widget)
tw.wm_overrideredirect(True)
tw.wm_geometry(f"+{x}+{y}")
label: tk.Label = tk.Label(
tw,
text=self.text,
background="lightgreen",
foreground="black",
relief="solid",
borderwidth=1,
padx=5,
pady=5,
)
label.grid()
self.tooltip_window.after(2200, lambda: tw.destroy())
def hide_tooltip(self, event: Optional[Any] = None) -> None:
"""Hides the tooltip"""
if self.tooltip_window:
self.tooltip_window.destroy()
self.tooltip_window = None