6 Commits

10 changed files with 281 additions and 201 deletions

1
.gitignore vendored
View File

@ -1,5 +1,6 @@
debug.log
.venv
.venv.bak
.idea
.vscode
__pycache__

View File

@ -10,7 +10,7 @@ from typing import Optional, Dict, Any, NoReturn, TextIO, Tuple, List
import zipfile
from datetime import datetime
from pathlib import Path
from subprocess import check_call, CompletedProcess
from subprocess import CompletedProcess
from tkinter import ttk, Toplevel
from wp_app_config import AppConfig, Msg
import requests
@ -27,100 +27,50 @@ class Create:
"""
@staticmethod
def dir_and_files() -> None:
"""
check and create folders and files if not present
"""
pth: Path = Path.home() / ".config/wire_py"
pth.mkdir(parents=True, exist_ok=True)
sett: Path = Path.home() / ".config/wire_py/settings"
AppConfig.KEYS_FILE
if sett.exists():
pass
else:
sett.touch()
sett.write_text(
"[UPDATES]\non\n[THEME]\nlight\n[TOOLTIP]\nTrue\n[AUTOSTART ON]\noff\n"
)
if AppConfig.KEYS_FILE.exists():
pass
else:
AppConfig.KEYS_FILE.touch()
@staticmethod
def files_for_autostart() -> None:
"""
check and create a file for auto start if not present and enable the service
"""
pth2: Path = Path.home() / ".config/systemd/user"
pth2.mkdir(parents=True, exist_ok=True)
wg_ser: Path = Path.home() / ".config/systemd/user/wg_start.service"
if wg_ser.exists():
pass
else:
wg_ser.touch()
wg_ser.write_text(
"[Unit]\nDescription=Automatic Tunnel Start\nAfter=network-online.target\n\n[Service]\n"
"Type=oneshot\nExecStartPre=/bin/sleep 5\nExecStart=/usr/local/bin/start_wg.py\n[Install]"
"\nWantedBy=default.target"
)
check_call(["systemctl", "--user", "enable", "wg_start.service"])
@staticmethod
def make_dir() -> None:
"""Folder Name "tlecdewg" = Tunnel Encrypt Decrypt Wireguard"""
if AppConfig.TEMP_DIR.exists():
pass
else:
AppConfig.TEMP_DIR.mkdir()
@staticmethod
def decrypt() -> None:
def decrypt() -> str:
"""
Starts SSL dencrypt
"""
process: CompletedProcess[str] = subprocess.run(
["pkexec", "/usr/local/bin/ssl_decrypt.py"],
stdout=subprocess.PIPE,
capture_output=True,
text=True,
check=True,
check=False,
)
path: Path = Path.home() / ".config/wire_py/"
file_in_path: list[Path] = list(path.rglob("*.dat"))
if file_in_path:
if process.returncode == 0:
print("File successfully decrypted...")
else:
print(f"Error with the following code... {process.returncode}")
# Output from Openssl
# if process.stdout:
# print(process.stdout)
# Output from Openssl Error
if process.stderr:
print(process.stderr)
if process.returncode == 0:
print("Files successfully decrypted...")
else:
print(_("Ready for import"))
print(f"Error process decrypt: Code {process.returncode}")
@staticmethod
def encrypt() -> None:
def encrypt() -> str:
"""
Starts SSL encryption
"""
process: CompletedProcess[str] = subprocess.run(
["pkexec", "/usr/local/bin/ssl_encrypt.py"],
stdout=subprocess.PIPE,
capture_output=True,
text=True,
check=True,
check=False,
)
print(process.stdout)
# Output from Openssl Error
if process.stderr:
print(process.stderr)
if process.returncode == 0:
print("All Files successfully encrypted...")
print("Files successfully encrypted...")
else:
print(f"Error with the following code... {process.returncode}")
print(f"Error process encrypt: Code {process.returncode}")
class LxTools(tk.Tk):
@ -131,6 +81,57 @@ class LxTools(tk.Tk):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
@staticmethod
def ckeck_key_is_exist(
directorys: list[str], search_string: str = None
) -> bool | None:
"""
Check if the key is exist in the file
Args:
directorys (list[str]): list of directories to search in
search_string (str): string to search for
Returns:
bool: True if the key is found, False otherwise
"""
if search_string:
result = False
for directory in directorys:
in_paths = Path(directory)
if not in_paths.exists() or not in_paths.is_dir():
continue
# Get a list of all files in the directorys
files = [file for file in in_paths.iterdir() if file.is_file()]
if not files:
continue
# Search for the string in the files
for file in files:
try:
with open(file, "r", errors="ignore") as f:
for line in f:
if search_string in line:
# Set the result to True if the string is found
result = True
break
# If the string is found, stop searching for the string in other files
if result:
break
except Exception:
# Ignore errors and continue to the next file
continue
else:
result = None
print(result)
return result
@staticmethod
def center_window_cross_platform(window, width, height):
"""
@ -258,10 +259,9 @@ class LxTools(tk.Tk):
check=True,
)
if result.returncode != 0:
exit(1)
else:
print(result.stdout.strip())
return result.stdout.strip()
pass
return result.stdout.strip()
except subprocess.CalledProcessError:
pass

View File

@ -25,6 +25,7 @@ License along with this library. If not, see
<action id="org.ssl_encrypt">
<defaults>
<allow_any>auth_admin_keep</allow_any>
<allow_inactive>auth_admin_keep</allow_inactive>
<allow_active>yes</allow_active>
</defaults>
<annotate key="org.freedesktop.policykit.exec.path">/usr/local/bin/ssl_encrypt.py</annotate>

View File

@ -1,8 +1,9 @@
[UPDATES]
# Configuration
on
[THEME]
light
[TOOLTIP]
# Theme
dark
# Tooltips
True
[AUTOSTART ON]
# Autostart
off

View File

@ -1,26 +1,19 @@
#!/usr/bin/python3
""" This Script decrypt Wireguard files for Wirepy users """
import os
import shutil
from pathlib import Path
from subprocess import check_call
import shutil
from subprocess import CompletedProcess
import subprocess
from wp_app_config import AppConfig
import getpass
log_name: str = getpass.getuser()
if log_name == "root":
from common_tools import LxTools
log_name: str = LxTools.get_username()
print("replacement method applied")
log_name = AppConfig.USER_FILE.read_text().strip()
keyfile: Path = Path(f"/home/{log_name}/.config/wire_py/pbwgk.pem")
path_of_crypted_tunnel: Path = Path(f"/home/{log_name}/.config/wire_py")
if not keyfile.is_file():
check_call(
process: CompletedProcess[str] = subprocess.run(
[
"openssl",
"rsa",
@ -31,21 +24,27 @@ if not keyfile.is_file():
"-outform",
"PEM",
"-pubout",
]
],
capture_output=True,
text=True,
check=False,
)
print(process.stdout)
if process.returncode == 0:
print("Public key generated successfully.")
else:
print(f"Error with the following code... {process.returncode}")
shutil.chown(keyfile, 1000, 1000)
AppConfig.TEMP_DIR2 = f"/home/{log_name}/.config/wire_py/"
detl: list[str] = os.listdir(AppConfig.TEMP_DIR2)
os.chdir(AppConfig.TEMP_DIR2)
detl.remove("keys")
detl.remove("settings")
if os.path.exists(f"{AppConfig.TEMP_DIR2}pbwgk.pem"):
detl.remove("pbwgk.pem")
for detunnels in detl:
tlname2 = f"{detunnels[:-4]}.conf"
extpath = f"{AppConfig.TEMP_DIR}/{tlname2}"
check_call(
if AppConfig.PUBLICKEY.exists:
crypted__tunnel = [str(file) for file in path_of_crypted_tunnel.glob("*.dat")]
for tunnel_path in crypted__tunnel:
base_name = Path(tunnel_path).stem
process: CompletedProcess[str] = subprocess.run(
[
"openssl",
"pkeyutl",
@ -53,9 +52,25 @@ if os.path.exists(f"{AppConfig.TEMP_DIR2}pbwgk.pem"):
"-inkey",
AppConfig.SYSTEM_PATHS["pkey_path"],
"-in",
detunnels,
tunnel_path, # full path to the file
"-out",
extpath,
]
f"{AppConfig.TEMP_DIR}/{base_name}.conf",
],
capture_output=True,
text=True,
check=False,
)
shutil.chown(extpath, 1000, 1000)
shutil.chown(f"{AppConfig.TEMP_DIR}/{base_name}.conf", 1000, 1000)
print(f"Processing of the file: {tunnel_path}")
if process.stdout:
print(process.stdout)
# Output from Openssl Error
if process.stderr:
print("(Error):", process.stderr)
if process.returncode == 0:
print(f"File {base_name}.dat successfully decrypted.")
else:
print(f"Error by {tunnel_path}: Code: {process.returncode}")

View File

@ -1,18 +1,20 @@
#!/usr/bin/python3
""" This Script encrypt Wireguardfiles for Wirepy users for more Security """
import os
import shutil
from pathlib import Path
from subprocess import check_call
import shutil
import subprocess
from subprocess import CompletedProcess
from wp_app_config import AppConfig
from common_tools import LxTools
keyfile: Path = AppConfig.PUBLICKEY
log_name = AppConfig.USER_FILE.read_text().strip()
keyfile: Path = Path(f"/home/{log_name}/.config/wire_py/pbwgk.pem")
target: Path = Path(f"/home/{log_name}/.config/wire_py/")
if not keyfile.is_file():
check_call(
process: CompletedProcess[str] = subprocess.run(
[
"openssl",
"rsa",
@ -23,56 +25,57 @@ if not keyfile.is_file():
"-outform",
"PEM",
"-pubout",
]
],
capture_output=True,
text=True,
check=False,
)
if process.stdout:
print(process.stdout)
# Output from Openssl Error
if process.stderr:
print("(Error):", process.stderr)
if process.returncode == 0:
print("Public key generated successfully.")
else:
print(f"Error generate Publickey: Code: {process.returncode}")
shutil.chown(keyfile, 1000, 1000)
if AppConfig.TEMP_DIR.exists():
tl = LxTools.get_file_name(AppConfig.TEMP_DIR)
CPTH: str = f"{keyfile}"
CRYPTFILES: str = CPTH[:-9]
# any() get True when directory is not empty
if AppConfig.TEMP_DIR.exists() and any(AppConfig.TEMP_DIR.iterdir()):
clear_files = [str(file) for file in AppConfig.TEMP_DIR.glob("*.conf")]
if keyfile.exists() and len(tl) != 0:
for tunnels in tl:
sourcetl: str = f"{AppConfig.TEMP_DIR}/{tunnels}"
tlname: str = f"{CRYPTFILES}{tunnels[:-5]}.dat"
check_call(
[
"openssl",
"pkeyutl",
"-encrypt",
"-inkey",
keyfile,
"-pubin",
"-in",
sourcetl,
"-out",
tlname,
]
)
for config_file in clear_files:
base_name = Path(config_file).stem
process: CompletedProcess[str] = subprocess.run(
[
"openssl",
"pkeyutl",
"-encrypt",
"-inkey",
keyfile,
"-pubin",
"-in",
config_file,
"-out",
f"{target}/{base_name}.dat",
],
capture_output=True,
text=True,
check=False,
)
else:
print(f"Processing of the file: {config_file}")
if AppConfig.TEMP_DIR.exists():
tl: list[str] = os.listdir(f"{AppConfig.TEMP_DIR}")
CPTH: str = f"{keyfile}"
CRYPTFILES: str = CPTH[:-9]
# Output from Openssl Error
if process.stderr:
print("(Error):", process.stderr)
if keyfile.exists() and len(tl) != 0:
for tunnels in tl:
sourcetl: str = f"{AppConfig.TEMP_DIR}/{tunnels}"
tlname: str = f"{CRYPTFILES}{tunnels[:-5]}.dat"
check_call(
[
"openssl",
"pkeyutl",
"-encrypt",
"-inkey",
keyfile,
"-pubin",
"-in",
sourcetl,
"-out",
tlname,
]
)
if process.returncode == 0:
print(f"File {base_name}.dat successfully encrypted.")
else:
print(f"Error by {config_file}: Code: {process.returncode}")

View File

@ -4,13 +4,23 @@
"""
from pathlib import Path
from subprocess import check_call
import subprocess
from subprocess import CompletedProcess
path_to_file = Path(Path.home() / ".config/wire_py/settings")
a_con = Path(path_to_file).read_text(encoding="utf-8").splitlines(keepends=True)
a_con = a_con[7].strip()
if a_con != "off":
check_call(["nmcli", "connection", "up", a_con])
process: CompletedProcess[str] = subprocess.run(
["nmcli", "connection", "up", a_con],
capture_output=True,
text=True,
check=False,
)
# Output from start_wg error
if process.stderr:
print(process.stderr) # this is for the error, later on logfile
else:
pass

View File

@ -6,5 +6,6 @@ After=network-online.target
Type=oneshot
ExecStartPre=/bin/sleep 5
ExecStart=/usr/local/bin/start_wg.py
[Install]
WantedBy=default.target

View File

@ -2,6 +2,8 @@
"""
this script is a simple GUI for managing Wireguard Tunnels
"""
import getpass
import os
import shutil
import subprocess
@ -9,7 +11,7 @@ import sys
import tkinter as tk
import webbrowser
from pathlib import Path
from subprocess import check_call
from subprocess import CompletedProcess
from tkinter import TclError, filedialog, ttk
from common_tools import (
@ -23,8 +25,9 @@ from common_tools import (
)
from wp_app_config import AppConfig, Msg
Create.dir_and_files()
Create.make_dir()
AppConfig.USER_FILE.write_text(getpass.getuser())
AppConfig.ensure_directories()
AppConfig.create_default_settings()
Create.decrypt()
@ -642,7 +645,7 @@ class FrameWidgets(ttk.Frame):
def import_sl(self) -> None:
"""validity check of wireguard config files"""
Create.dir_and_files()
AppConfig.ensure_directories()
try:
filepath = filedialog.askopenfilename(
initialdir=f"{Path.home()}",
@ -693,10 +696,12 @@ class FrameWidgets(ttk.Frame):
new_conf = f"{AppConfig.TEMP_DIR}/{path_split}"
if self.a != "":
check_call(["nmcli", "connection", "down", self.a])
process: CompletedProcess[str] = subprocess.run(
["nmcli", "connection", "down", self.a]
)
self.reset_fields()
subprocess.check_output(
process: CompletedProcess[str] = subprocess.run(
[
"nmcli",
"connection",
@ -708,15 +713,18 @@ class FrameWidgets(ttk.Frame):
],
text=True,
)
Create.encrypt()
else:
shutil.copy(filepath, f"{AppConfig.TEMP_DIR}/")
if self.a != "":
check_call(["nmcli", "connection", "down", self.a])
process: CompletedProcess[str] = subprocess.run(
["nmcli", "connection", "down", self.a]
)
self.reset_fields()
subprocess.check_output(
process: CompletedProcess[str] = subprocess.run(
[
"nmcli",
"connection",
@ -728,8 +736,8 @@ class FrameWidgets(ttk.Frame):
],
text=True,
)
Create.encrypt()
Create.encrypt()
self.str_var.set("")
self.a = Tunnel.active()
self.l_box.insert(0, self.a)
@ -757,7 +765,7 @@ class FrameWidgets(ttk.Frame):
self.color_label()
self.stop()
data = self.handle_tunnel_data(self.a)
check_call(
process: CompletedProcess[str] = subprocess.run(
[
"nmcli",
"con",
@ -767,6 +775,7 @@ class FrameWidgets(ttk.Frame):
"no",
]
)
elif ("PrivateKey = " in read) and ("Endpoint = " in read):
pass
else:
@ -798,7 +807,9 @@ class FrameWidgets(ttk.Frame):
) as file2:
key = Tunnel.con_to_dict(file2)
pre_key = key[3]
check_call(["nmcli", "connection", "delete", select_tl])
process: CompletedProcess[str] = subprocess.run(
["nmcli", "connection", "delete", select_tl]
)
self.l_box.delete(self.select_tunnel[0])
with open(AppConfig.SETTINGS_FILE, "r", encoding="utf-8") as set_f6:
lines6 = set_f6.readlines()
@ -871,7 +882,7 @@ class FrameWidgets(ttk.Frame):
"""
checkbox for enable autostart Tunnel
"""
Create.files_for_autostart()
AppConfig.get_autostart_content()
if self.l_box.size() != 0:
self.wg_autostart.configure(state="normal")
self.lb_rename.config(state="normal")
@ -1142,7 +1153,9 @@ class FrameWidgets(ttk.Frame):
"""
if action == "stop":
if self.a:
check_call(["nmcli", "connection", "down", self.a])
process: CompletedProcess[str] = subprocess.run(
["nmcli", "connection", "down", self.a]
)
self.update_connection_display()
self.reset_fields()
self.start()
@ -1150,7 +1163,9 @@ class FrameWidgets(ttk.Frame):
elif action == "start":
if tunnel_name or self.a:
target_tunnel = tunnel_name or self.a
check_call(["nmcli", "connection", "up", target_tunnel])
process: CompletedProcess[str] = subprocess.run(
["nmcli", "connection", "up", target_tunnel]
)
self.update_connection_display()
data = self.handle_tunnel_data(self.a)
self.init_and_report(data)

View File

@ -4,6 +4,7 @@
import gettext
import locale
from pathlib import Path
import subprocess
from typing import Dict, Any
@ -24,7 +25,14 @@ class AppConfig:
# Configuration files
SETTINGS_FILE: Path = CONFIG_DIR / "settings"
KEYS_FILE: Path = CONFIG_DIR / "keys"
SYSTEMD_USER_FOLDER: Path = Path.home() / ".config/systemd/user"
AUTOSTART_SERVICE: Path = Path.home() / ".config/systemd/user/wg_start.service"
DEFAULT_SETTINGS: Dict[str, str] = {
"# Configuration": "on",
"# Theme": "dark",
"# Tooltips": True,
"# Autostart": "off",
}
# Updates
# 1 = 1. Year, 09 = Month of the Year, 2924 = Day and Year of the Year
@ -49,6 +57,13 @@ class AppConfig:
"pkey_path": "/usr/local/etc/ssl/pwgk.pem",
}
# Lists of searches
DIRECTORYS: list[str] = [
"/etc/netplan/",
"/etc/NetworkManager/system-connections/",
"/var/lib/NetworkManager/user-connections/",
]
# Images and icons paths
IMAGE_PATHS: Dict[str, str] = {
"icon_vpn": "/usr/share/icons/lx-icons/48/wg_vpn.png",
@ -79,8 +94,12 @@ class AppConfig:
@classmethod
def ensure_directories(cls) -> None:
"""Ensures that all required directories exist"""
cls.CONFIG_DIR.mkdir(parents=True, exist_ok=True)
if not cls.CONFIG_DIR.exists():
cls.CONFIG_DIR.mkdir(parents=True, exist_ok=True)
cls.KEYS_FILE.touch()
cls.TEMP_DIR.mkdir(parents=True, exist_ok=True)
if not cls.KEYS_FILE.exists():
cls.KEYS_FILE.touch()
@classmethod
def create_default_settings(cls) -> None:
@ -91,29 +110,43 @@ class AppConfig:
)
cls.SETTINGS_FILE.write_text(content)
@classmethod
def get_image_paths(cls) -> Dict[str, Path]:
"""Returns paths to UI images"""
return {
"main_icon": cls.SYSTEM_PATHS["image_path"] / "48/wg_vpn.png",
"warning": cls.CONFIG_DIR / "images/warning.png",
"success": cls.CONFIG_DIR / "images/success.png",
"error": cls.CONFIG_DIR / "images/error.png",
}
@classmethod
def get_autostart_content(cls) -> str:
"""Returns the content for the autostart service file"""
SYSTEMD_FILE: list[str] = [
"[Unit]",
"Description=Automatic Tunnel Start",
"After=network-online.target",
"",
"[Service]",
"Type=oneshot",
"ExecStartPre=/bin/sleep 5",
"ExecStart=/usr/local/bin/start_wg.py",
"",
"[Install]",
"WantedBy=default.target",
]
if not cls.SYSTEMD_USER_FOLDER.exists():
cls.SYSTEMD_USER_FOLDER.mkdir(parents=True, exist_ok=True)
return """[Unit]Description=Automatic Tunnel Start
After=network-online.target
from subprocess import CompletedProcess
[Service]
Type=oneshot
ExecStartPre=/bin/sleep 5
ExecStart=/usr/local/bin/start_wg.py
[Install]
WantedBy=default.target"""
if not cls.AUTOSTART_SERVICE.is_file():
content = "\n".join([line for line in SYSTEMD_FILE])
cls.AUTOSTART_SERVICE.write_text(content)
process: CompletedProcess[str] = subprocess.run(
["systemctl", "--user", "enable", "wg_start.service"],
capture_output=True,
text=True,
check=False,
)
print(process.stdout)
if process.returncode == 0:
print(process.stdout)
else:
print(f"Error with the following code... {process.returncode}")
# here is inizialize the class for translate strrings