Compare commits
3 Commits
a903666a26
...
3da54642a0
Author | SHA1 | Date | |
---|---|---|---|
3da54642a0 | |||
fb0158d1cd | |||
6604650adf |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,5 +1,6 @@
|
||||
debug.log
|
||||
.venv
|
||||
.venv.bak
|
||||
.idea
|
||||
.vscode
|
||||
__pycache__
|
||||
|
@ -10,7 +10,7 @@ from typing import Optional, Dict, Any, NoReturn, TextIO, Tuple, List
|
||||
import zipfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from subprocess import check_call, CompletedProcess
|
||||
from subprocess import CompletedProcess
|
||||
from tkinter import ttk, Toplevel
|
||||
from wp_app_config import AppConfig, Msg
|
||||
import requests
|
||||
@ -26,63 +26,6 @@ class Create:
|
||||
the tunnel from the user's home directory
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def dir_and_files() -> None:
|
||||
"""
|
||||
check and create folders and files if not present
|
||||
"""
|
||||
|
||||
pth: Path = Path.home() / ".config/wire_py"
|
||||
pth.mkdir(parents=True, exist_ok=True)
|
||||
sett: Path = Path.home() / ".config/wire_py/settings"
|
||||
AppConfig.KEYS_FILE
|
||||
|
||||
if sett.exists():
|
||||
pass
|
||||
|
||||
else:
|
||||
sett.touch()
|
||||
sett.write_text(
|
||||
"[UPDATES]\non\n[THEME]\nlight\n[TOOLTIP]\nTrue\n[AUTOSTART ON]\noff\n"
|
||||
)
|
||||
|
||||
if AppConfig.KEYS_FILE.exists():
|
||||
pass
|
||||
|
||||
else:
|
||||
AppConfig.KEYS_FILE.touch()
|
||||
|
||||
@staticmethod
|
||||
def files_for_autostart() -> None:
|
||||
"""
|
||||
check and create a file for auto start if not present and enable the service
|
||||
"""
|
||||
|
||||
pth2: Path = Path.home() / ".config/systemd/user"
|
||||
pth2.mkdir(parents=True, exist_ok=True)
|
||||
wg_ser: Path = Path.home() / ".config/systemd/user/wg_start.service"
|
||||
|
||||
if wg_ser.exists():
|
||||
pass
|
||||
|
||||
else:
|
||||
wg_ser.touch()
|
||||
wg_ser.write_text(
|
||||
"[Unit]\nDescription=Automatic Tunnel Start\nAfter=network-online.target\n\n[Service]\n"
|
||||
"Type=oneshot\nExecStartPre=/bin/sleep 5\nExecStart=/usr/local/bin/start_wg.py\n[Install]"
|
||||
"\nWantedBy=default.target"
|
||||
)
|
||||
check_call(["systemctl", "--user", "enable", "wg_start.service"])
|
||||
|
||||
@staticmethod
|
||||
def make_dir() -> None:
|
||||
"""Folder Name "tlecdewg" = Tunnel Encrypt Decrypt Wireguard"""
|
||||
|
||||
if AppConfig.TEMP_DIR.exists():
|
||||
pass
|
||||
else:
|
||||
AppConfig.TEMP_DIR.mkdir()
|
||||
|
||||
@staticmethod
|
||||
def decrypt() -> str:
|
||||
"""
|
||||
@ -90,20 +33,23 @@ class Create:
|
||||
"""
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
["pkexec", "/usr/local/bin/ssl_decrypt.py"],
|
||||
stdout=subprocess.PIPE,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
check=False,
|
||||
)
|
||||
path: Path = Path.home() / ".config/wire_py/"
|
||||
file_in_path: list[Path] = list(path.rglob("*.dat"))
|
||||
if file_in_path:
|
||||
if process.returncode == 0:
|
||||
print("File successfully decrypted...")
|
||||
|
||||
else:
|
||||
print(f"Error with the following code... {process.returncode}")
|
||||
# Output from Openssl
|
||||
# if process.stdout:
|
||||
# print(process.stdout)
|
||||
|
||||
# Output from Openssl Error
|
||||
if process.stderr:
|
||||
print(process.stderr)
|
||||
|
||||
if process.returncode == 0:
|
||||
print("Files successfully decrypted...")
|
||||
else:
|
||||
print(_("Ready for import"))
|
||||
print(f"Error process decrypt: Code {process.returncode}")
|
||||
|
||||
@staticmethod
|
||||
def encrypt() -> str:
|
||||
@ -112,15 +58,19 @@ class Create:
|
||||
"""
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
["pkexec", "/usr/local/bin/ssl_encrypt.py"],
|
||||
stdout=subprocess.PIPE,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
check=False,
|
||||
)
|
||||
print(process.stdout)
|
||||
|
||||
# Output from Openssl Error
|
||||
if process.stderr:
|
||||
print(process.stderr)
|
||||
|
||||
if process.returncode == 0:
|
||||
print("All Files successfully encrypted...")
|
||||
print("Files successfully encrypted...")
|
||||
else:
|
||||
print(f"Error with the following code... {process.returncode}")
|
||||
print(f"Error process encrypt: Code {process.returncode}")
|
||||
|
||||
|
||||
class LxTools(tk.Tk):
|
||||
|
@ -25,6 +25,7 @@ License along with this library. If not, see
|
||||
<action id="org.ssl_encrypt">
|
||||
<defaults>
|
||||
<allow_any>auth_admin_keep</allow_any>
|
||||
<allow_inactive>auth_admin_keep</allow_inactive>
|
||||
<allow_active>yes</allow_active>
|
||||
</defaults>
|
||||
<annotate key="org.freedesktop.policykit.exec.path">/usr/local/bin/ssl_encrypt.py</annotate>
|
||||
|
@ -1,19 +1,19 @@
|
||||
#!/usr/bin/python3
|
||||
""" This Script decrypt Wireguard files for Wirepy users """
|
||||
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from subprocess import check_call
|
||||
import shutil
|
||||
from subprocess import CompletedProcess
|
||||
import subprocess
|
||||
from wp_app_config import AppConfig
|
||||
|
||||
log_name = AppConfig.USER_FILE.read_text()
|
||||
log_name = AppConfig.USER_FILE.read_text().strip()
|
||||
|
||||
keyfile: Path = Path(f"/home/{log_name}/.config/wire_py/pbwgk.pem")
|
||||
path_of_crypted_tunnel: Path = Path(f"/home/{log_name}/.config/wire_py")
|
||||
|
||||
if not keyfile.is_file():
|
||||
|
||||
check_call(
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
[
|
||||
"openssl",
|
||||
"rsa",
|
||||
@ -24,21 +24,27 @@ if not keyfile.is_file():
|
||||
"-outform",
|
||||
"PEM",
|
||||
"-pubout",
|
||||
]
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
print(process.stdout)
|
||||
if process.returncode == 0:
|
||||
print("Public key generated successfully.")
|
||||
else:
|
||||
print(f"Error with the following code... {process.returncode}")
|
||||
shutil.chown(keyfile, 1000, 1000)
|
||||
|
||||
AppConfig.TEMP_DIR2 = f"/home/{log_name}/.config/wire_py/"
|
||||
detl: list[str] = os.listdir(AppConfig.TEMP_DIR2)
|
||||
os.chdir(AppConfig.TEMP_DIR2)
|
||||
detl.remove("keys")
|
||||
detl.remove("settings")
|
||||
if os.path.exists(f"{AppConfig.TEMP_DIR2}pbwgk.pem"):
|
||||
detl.remove("pbwgk.pem")
|
||||
for detunnels in detl:
|
||||
tlname2 = f"{detunnels[:-4]}.conf"
|
||||
extpath = f"{AppConfig.TEMP_DIR}/{tlname2}"
|
||||
check_call(
|
||||
if AppConfig.PUBLICKEY.exists:
|
||||
|
||||
crypted__tunnel = [str(file) for file in path_of_crypted_tunnel.glob("*.dat")]
|
||||
|
||||
for tunnel_path in crypted__tunnel:
|
||||
|
||||
base_name = Path(tunnel_path).stem
|
||||
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
[
|
||||
"openssl",
|
||||
"pkeyutl",
|
||||
@ -46,9 +52,25 @@ if os.path.exists(f"{AppConfig.TEMP_DIR2}pbwgk.pem"):
|
||||
"-inkey",
|
||||
AppConfig.SYSTEM_PATHS["pkey_path"],
|
||||
"-in",
|
||||
detunnels,
|
||||
tunnel_path, # full path to the file
|
||||
"-out",
|
||||
extpath,
|
||||
]
|
||||
f"{AppConfig.TEMP_DIR}/{base_name}.conf",
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
shutil.chown(extpath, 1000, 1000)
|
||||
shutil.chown(f"{AppConfig.TEMP_DIR}/{base_name}.conf", 1000, 1000)
|
||||
print(f"Processing of the file: {tunnel_path}")
|
||||
|
||||
if process.stdout:
|
||||
print(process.stdout)
|
||||
|
||||
# Output from Openssl Error
|
||||
if process.stderr:
|
||||
print("(Error):", process.stderr)
|
||||
|
||||
if process.returncode == 0:
|
||||
print(f"File {base_name}.dat successfully decrypted.")
|
||||
else:
|
||||
print(f"Error by {tunnel_path}: Code: {process.returncode}")
|
||||
|
107
ssl_encrypt.py
107
ssl_encrypt.py
@ -1,20 +1,20 @@
|
||||
#!/usr/bin/python3
|
||||
""" This Script encrypt Wireguardfiles for Wirepy users for more Security """
|
||||
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from subprocess import check_call
|
||||
import shutil
|
||||
import subprocess
|
||||
from subprocess import CompletedProcess
|
||||
from wp_app_config import AppConfig
|
||||
from common_tools import LxTools
|
||||
|
||||
log_name = AppConfig.USER_FILE.read_text()
|
||||
log_name = AppConfig.USER_FILE.read_text().strip()
|
||||
|
||||
keyfile: Path = Path(f"/home/{log_name}/.config/wire_py/pbwgk.pem")
|
||||
|
||||
target: Path = Path(f"/home/{log_name}/.config/wire_py/")
|
||||
|
||||
if not keyfile.is_file():
|
||||
|
||||
check_call(
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
[
|
||||
"openssl",
|
||||
"rsa",
|
||||
@ -25,56 +25,57 @@ if not keyfile.is_file():
|
||||
"-outform",
|
||||
"PEM",
|
||||
"-pubout",
|
||||
]
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
|
||||
if process.stdout:
|
||||
print(process.stdout)
|
||||
|
||||
# Output from Openssl Error
|
||||
if process.stderr:
|
||||
print("(Error):", process.stderr)
|
||||
|
||||
if process.returncode == 0:
|
||||
print("Public key generated successfully.")
|
||||
else:
|
||||
print(f"Error generate Publickey: Code: {process.returncode}")
|
||||
|
||||
shutil.chown(keyfile, 1000, 1000)
|
||||
|
||||
if AppConfig.TEMP_DIR.exists():
|
||||
tl = LxTools.get_file_name(AppConfig.TEMP_DIR)
|
||||
CPTH: str = f"{keyfile}"
|
||||
CRYPTFILES: str = CPTH[:-9]
|
||||
# any() get True when directory is not empty
|
||||
if AppConfig.TEMP_DIR.exists() and any(AppConfig.TEMP_DIR.iterdir()):
|
||||
clear_files = [str(file) for file in AppConfig.TEMP_DIR.glob("*.conf")]
|
||||
|
||||
if keyfile.exists() and len(tl) != 0:
|
||||
for tunnels in tl:
|
||||
sourcetl: str = f"{AppConfig.TEMP_DIR}/{tunnels}"
|
||||
tlname: str = f"{CRYPTFILES}{tunnels[:-5]}.dat"
|
||||
check_call(
|
||||
[
|
||||
"openssl",
|
||||
"pkeyutl",
|
||||
"-encrypt",
|
||||
"-inkey",
|
||||
keyfile,
|
||||
"-pubin",
|
||||
"-in",
|
||||
sourcetl,
|
||||
"-out",
|
||||
tlname,
|
||||
]
|
||||
)
|
||||
for config_file in clear_files:
|
||||
base_name = Path(config_file).stem
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
[
|
||||
"openssl",
|
||||
"pkeyutl",
|
||||
"-encrypt",
|
||||
"-inkey",
|
||||
keyfile,
|
||||
"-pubin",
|
||||
"-in",
|
||||
config_file,
|
||||
"-out",
|
||||
f"{target}/{base_name}.dat",
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
|
||||
else:
|
||||
print(f"Processing of the file: {config_file}")
|
||||
|
||||
if AppConfig.TEMP_DIR.exists():
|
||||
tl: list[str] = os.listdir(f"{AppConfig.TEMP_DIR}")
|
||||
CPTH: str = f"{keyfile}"
|
||||
CRYPTFILES: str = CPTH[:-9]
|
||||
# Output from Openssl Error
|
||||
if process.stderr:
|
||||
print("(Error):", process.stderr)
|
||||
|
||||
if keyfile.exists() and len(tl) != 0:
|
||||
for tunnels in tl:
|
||||
sourcetl: str = f"{AppConfig.TEMP_DIR}/{tunnels}"
|
||||
tlname: str = f"{CRYPTFILES}{tunnels[:-5]}.dat"
|
||||
check_call(
|
||||
[
|
||||
"openssl",
|
||||
"pkeyutl",
|
||||
"-encrypt",
|
||||
"-inkey",
|
||||
keyfile,
|
||||
"-pubin",
|
||||
"-in",
|
||||
sourcetl,
|
||||
"-out",
|
||||
tlname,
|
||||
]
|
||||
)
|
||||
if process.returncode == 0:
|
||||
print(f"File {base_name}.dat successfully encrypted.")
|
||||
else:
|
||||
print(f"Error by {config_file}: Code: {process.returncode}")
|
||||
|
14
start_wg.py
14
start_wg.py
@ -4,13 +4,23 @@
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from subprocess import check_call
|
||||
import subprocess
|
||||
from subprocess import CompletedProcess
|
||||
|
||||
path_to_file = Path(Path.home() / ".config/wire_py/settings")
|
||||
|
||||
a_con = Path(path_to_file).read_text(encoding="utf-8").splitlines(keepends=True)
|
||||
a_con = a_con[7].strip()
|
||||
if a_con != "off":
|
||||
check_call(["nmcli", "connection", "up", a_con])
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
["nmcli", "connection", "up", a_con],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
# Output from start_wg error
|
||||
if process.stderr:
|
||||
print(process.stderr) # this is for the error, later on logfile
|
||||
|
||||
else:
|
||||
pass
|
||||
|
@ -6,5 +6,6 @@ After=network-online.target
|
||||
Type=oneshot
|
||||
ExecStartPre=/bin/sleep 5
|
||||
ExecStart=/usr/local/bin/start_wg.py
|
||||
|
||||
[Install]
|
||||
WantedBy=default.target
|
||||
|
36
wirepy.py
36
wirepy.py
@ -11,7 +11,7 @@ import sys
|
||||
import tkinter as tk
|
||||
import webbrowser
|
||||
from pathlib import Path
|
||||
from subprocess import check_call
|
||||
from subprocess import CompletedProcess
|
||||
from tkinter import TclError, filedialog, ttk
|
||||
|
||||
from common_tools import (
|
||||
@ -645,7 +645,7 @@ class FrameWidgets(ttk.Frame):
|
||||
def import_sl(self) -> None:
|
||||
"""validity check of wireguard config files"""
|
||||
|
||||
Create.dir_and_files()
|
||||
AppConfig.ensure_directories()
|
||||
try:
|
||||
filepath = filedialog.askopenfilename(
|
||||
initialdir=f"{Path.home()}",
|
||||
@ -696,10 +696,12 @@ class FrameWidgets(ttk.Frame):
|
||||
new_conf = f"{AppConfig.TEMP_DIR}/{path_split}"
|
||||
|
||||
if self.a != "":
|
||||
check_call(["nmcli", "connection", "down", self.a])
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
["nmcli", "connection", "down", self.a]
|
||||
)
|
||||
self.reset_fields()
|
||||
|
||||
subprocess.check_output(
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
[
|
||||
"nmcli",
|
||||
"connection",
|
||||
@ -711,15 +713,18 @@ class FrameWidgets(ttk.Frame):
|
||||
],
|
||||
text=True,
|
||||
)
|
||||
|
||||
Create.encrypt()
|
||||
else:
|
||||
shutil.copy(filepath, f"{AppConfig.TEMP_DIR}/")
|
||||
|
||||
if self.a != "":
|
||||
check_call(["nmcli", "connection", "down", self.a])
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
["nmcli", "connection", "down", self.a]
|
||||
)
|
||||
self.reset_fields()
|
||||
|
||||
subprocess.check_output(
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
[
|
||||
"nmcli",
|
||||
"connection",
|
||||
@ -731,8 +736,8 @@ class FrameWidgets(ttk.Frame):
|
||||
],
|
||||
text=True,
|
||||
)
|
||||
Create.encrypt()
|
||||
|
||||
Create.encrypt()
|
||||
self.str_var.set("")
|
||||
self.a = Tunnel.active()
|
||||
self.l_box.insert(0, self.a)
|
||||
@ -760,7 +765,7 @@ class FrameWidgets(ttk.Frame):
|
||||
self.color_label()
|
||||
self.stop()
|
||||
data = self.handle_tunnel_data(self.a)
|
||||
check_call(
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
[
|
||||
"nmcli",
|
||||
"con",
|
||||
@ -770,6 +775,7 @@ class FrameWidgets(ttk.Frame):
|
||||
"no",
|
||||
]
|
||||
)
|
||||
|
||||
elif ("PrivateKey = " in read) and ("Endpoint = " in read):
|
||||
pass
|
||||
else:
|
||||
@ -801,7 +807,9 @@ class FrameWidgets(ttk.Frame):
|
||||
) as file2:
|
||||
key = Tunnel.con_to_dict(file2)
|
||||
pre_key = key[3]
|
||||
check_call(["nmcli", "connection", "delete", select_tl])
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
["nmcli", "connection", "delete", select_tl]
|
||||
)
|
||||
self.l_box.delete(self.select_tunnel[0])
|
||||
with open(AppConfig.SETTINGS_FILE, "r", encoding="utf-8") as set_f6:
|
||||
lines6 = set_f6.readlines()
|
||||
@ -874,7 +882,7 @@ class FrameWidgets(ttk.Frame):
|
||||
"""
|
||||
checkbox for enable autostart Tunnel
|
||||
"""
|
||||
Create.files_for_autostart()
|
||||
AppConfig.get_autostart_content()
|
||||
if self.l_box.size() != 0:
|
||||
self.wg_autostart.configure(state="normal")
|
||||
self.lb_rename.config(state="normal")
|
||||
@ -1145,7 +1153,9 @@ class FrameWidgets(ttk.Frame):
|
||||
"""
|
||||
if action == "stop":
|
||||
if self.a:
|
||||
check_call(["nmcli", "connection", "down", self.a])
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
["nmcli", "connection", "down", self.a]
|
||||
)
|
||||
self.update_connection_display()
|
||||
self.reset_fields()
|
||||
self.start()
|
||||
@ -1153,7 +1163,9 @@ class FrameWidgets(ttk.Frame):
|
||||
elif action == "start":
|
||||
if tunnel_name or self.a:
|
||||
target_tunnel = tunnel_name or self.a
|
||||
check_call(["nmcli", "connection", "up", target_tunnel])
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
["nmcli", "connection", "up", target_tunnel]
|
||||
)
|
||||
self.update_connection_display()
|
||||
data = self.handle_tunnel_data(self.a)
|
||||
self.init_and_report(data)
|
||||
|
@ -122,18 +122,21 @@ class AppConfig:
|
||||
if not cls.SYSTEMD_USER_FOLDER.exists():
|
||||
cls.SYSTEMD_USER_FOLDER.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for line in SYSTEMD_FILE:
|
||||
cls.AUTOSTART_SERVICE.write_text(line)
|
||||
from subprocess import CompletedProcess
|
||||
|
||||
process = subprocess.run(
|
||||
if not cls.AUTOSTART_SERVICE.is_file():
|
||||
|
||||
content = "\n".join([line for line in SYSTEMD_FILE])
|
||||
cls.AUTOSTART_SERVICE.write_text(content)
|
||||
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
["systemctl", "--user", "enable", "wg_start.service"],
|
||||
stdout=subprocess.PIPE,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
check=False,
|
||||
)
|
||||
print(process.stdout)
|
||||
if process.returncode == 0:
|
||||
print("File for autostart created successfully")
|
||||
print(process.stdout)
|
||||
else:
|
||||
print(f"Error with the following code... {process.returncode}")
|
||||
|
Reference in New Issue
Block a user