remove USER_FILE usage in ssl_decrypt.py and ssl_encrypt.py; switch to argparse for command-line arguments
This commit is contained in:
parent
4cdcfadbac
commit
5ac37ad9ad
@ -6,7 +6,7 @@ import signal
|
||||
import base64
|
||||
import secrets
|
||||
import subprocess
|
||||
from subprocess import CompletedProcess
|
||||
from subprocess import CompletedProcess, run
|
||||
import re
|
||||
import sys
|
||||
import tkinter as tk
|
||||
@ -30,36 +30,36 @@ class CryptoUtil:
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def decrypt() -> None:
|
||||
def decrypt(user) -> None:
|
||||
"""
|
||||
Starts SSL dencrypt
|
||||
"""
|
||||
crypted_tunnel = [str(file) for file in AppConfig.CONFIG_DIR.glob("*.dat")]
|
||||
if crypted_tunnel == []:
|
||||
return
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
["pkexec", "/usr/local/bin/ssl_decrypt.py"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
|
||||
# Output from Openssl Error
|
||||
if process.stderr:
|
||||
print(process.stderr)
|
||||
|
||||
if process.returncode == 0:
|
||||
print("Files successfully decrypted...")
|
||||
if len([file.stem for file in AppConfig.CONFIG_DIR.glob("*.dat")]) == 0:
|
||||
pass
|
||||
else:
|
||||
print(f"Error process decrypt: Code {process.returncode}")
|
||||
process: CompletedProcess[str] = run(
|
||||
["pkexec", "/usr/local/bin/ssl_decrypt.py", "--user", user],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
|
||||
# Output from Openssl Error
|
||||
if process.stderr:
|
||||
print(process.stderr)
|
||||
|
||||
if process.returncode == 0:
|
||||
print("Files successfully decrypted...")
|
||||
else:
|
||||
print(f"Error process decrypt: Code {process.returncode}")
|
||||
|
||||
@staticmethod
|
||||
def encrypt() -> None:
|
||||
def encrypt(user) -> None:
|
||||
"""
|
||||
Starts SSL encryption
|
||||
"""
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
["pkexec", "/usr/local/bin/ssl_encrypt.py"],
|
||||
process: CompletedProcess[str] = run(
|
||||
["pkexec", "/usr/local/bin/ssl_encrypt.py", "--user", user],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
@ -80,7 +80,7 @@ class CryptoUtil:
|
||||
Checks if the private key already exists in the system using an external script.
|
||||
Returns True only if the full key is found exactly (no partial match).
|
||||
"""
|
||||
process: CompletedProcess[bool] = subprocess.run(
|
||||
process: CompletedProcess[bool] = run(
|
||||
["pkexec", "/usr/local/bin/match_found.py", key],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
@ -224,9 +224,9 @@ class LxTools:
|
||||
even if the script is running with root privileges.
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
result = run(
|
||||
["logname"],
|
||||
stdout=subprocess.PIPE,
|
||||
stdout=PIPE,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
|
@ -42,11 +42,10 @@ def search_string_in_directory(
|
||||
except Exception:
|
||||
continue # Skip files that cause errors
|
||||
|
||||
# Invert the logic: return False if string is found, True otherwise
|
||||
return result
|
||||
|
||||
|
||||
def main() -> bool:
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Script only for use to compare the private key in the Network configurations to avoid errors with the network manager."
|
||||
)
|
||||
|
@ -1,19 +1,30 @@
|
||||
#!/usr/bin/python3
|
||||
""" This Script decrypt Wireguard files for Wirepy users """
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
import pwd
|
||||
import shutil
|
||||
from subprocess import CompletedProcess
|
||||
import subprocess
|
||||
from subprocess import CompletedProcess, run
|
||||
from wp_app_config import AppConfig
|
||||
|
||||
log_name = AppConfig.USER_FILE.read_text().strip()
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--user", required=True, help="Username of the target file system")
|
||||
args = parser.parse_args()
|
||||
|
||||
keyfile: Path = Path(f"/home/{log_name}/.config/wire_py/pbwgk.pem")
|
||||
path_of_crypted_tunnel: Path = Path(f"/home/{log_name}/.config/wire_py")
|
||||
try:
|
||||
# Retrieve UID and GID
|
||||
user_info = pwd.getpwnam(args.user)
|
||||
uid = user_info.pw_uid # User ID (e.g., 1000)
|
||||
gid = user_info.pw_gid # Group ID (e.g., 1000)
|
||||
except KeyError:
|
||||
print(f"User '{args.user}' not found.")
|
||||
exit(1)
|
||||
|
||||
keyfile: Path = Path(f"/home/{args.user}/.config/wire_py/pbwgk.pem")
|
||||
path_of_crypted_tunnel: Path = Path(f"/home/{args.user}/.config/wire_py")
|
||||
|
||||
if not keyfile.is_file():
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
[
|
||||
"openssl",
|
||||
"rsa",
|
||||
@ -34,9 +45,9 @@ if not keyfile.is_file():
|
||||
print("Public key generated successfully.")
|
||||
else:
|
||||
print(f"Error with the following code... {process.returncode}")
|
||||
shutil.chown(keyfile, 1000, 1000)
|
||||
shutil.chown(keyfile, uid, gid)
|
||||
|
||||
if AppConfig.PUBLICKEY.exists:
|
||||
if AppConfig.PUBLICKEY.exists():
|
||||
|
||||
crypted__tunnel = [str(file) for file in path_of_crypted_tunnel.glob("*.dat")]
|
||||
|
||||
@ -44,7 +55,7 @@ if AppConfig.PUBLICKEY.exists:
|
||||
|
||||
base_name = Path(tunnel_path).stem
|
||||
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
[
|
||||
"openssl",
|
||||
"pkeyutl",
|
||||
@ -60,7 +71,7 @@ if AppConfig.PUBLICKEY.exists:
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
shutil.chown(f"{AppConfig.TEMP_DIR}/{base_name}.conf", 1000, 1000)
|
||||
shutil.chown(f"{AppConfig.TEMP_DIR}/{base_name}.conf", uid, gid)
|
||||
print(f"Processing of the file: {tunnel_path}")
|
||||
|
||||
if process.stdout:
|
||||
|
@ -1,20 +1,33 @@
|
||||
#!/usr/bin/python3
|
||||
""" This Script encrypt Wireguardfiles for Wirepy users for more Security """
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
import pwd
|
||||
import shutil
|
||||
import subprocess
|
||||
from subprocess import CompletedProcess
|
||||
from subprocess import CompletedProcess, run
|
||||
from wp_app_config import AppConfig
|
||||
|
||||
log_name = AppConfig.USER_FILE.read_text().strip()
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--user", required=True, help="Username of the target file system")
|
||||
args = parser.parse_args()
|
||||
|
||||
keyfile: Path = Path(f"/home/{log_name}/.config/wire_py/pbwgk.pem")
|
||||
try:
|
||||
# Retrieve UID and GID
|
||||
user_info = pwd.getpwnam(args.user)
|
||||
uid = user_info.pw_uid # User ID (e.g., 1000)
|
||||
gid = user_info.pw_gid # Group ID (e.g., 1000)
|
||||
except KeyError:
|
||||
print(f"User '{args.user}' not found.")
|
||||
exit(1)
|
||||
|
||||
target: Path = Path(f"/home/{log_name}/.config/wire_py/")
|
||||
keyfile: Path = Path(f"/home/{args.user}/.config/wire_py/pbwgk.pem")
|
||||
|
||||
target: Path = Path(f"/home/{args.user}/.config/wire_py/")
|
||||
|
||||
if not keyfile.is_file():
|
||||
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
[
|
||||
"openssl",
|
||||
"rsa",
|
||||
@ -43,7 +56,7 @@ if not keyfile.is_file():
|
||||
else:
|
||||
print(f"Error generate Publickey: Code: {process.returncode}")
|
||||
|
||||
shutil.chown(keyfile, 1000, 1000)
|
||||
shutil.chown(keyfile, uid, gid)
|
||||
|
||||
# any() get True when directory is not empty
|
||||
if AppConfig.TEMP_DIR.exists() and any(AppConfig.TEMP_DIR.iterdir()):
|
||||
@ -51,7 +64,7 @@ if AppConfig.TEMP_DIR.exists() and any(AppConfig.TEMP_DIR.iterdir()):
|
||||
|
||||
for config_file in clear_files:
|
||||
base_name = Path(config_file).stem
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
[
|
||||
"openssl",
|
||||
"pkeyutl",
|
||||
|
@ -3,15 +3,14 @@
|
||||
This script belongs to wirepy and is for the auto start of the tunnel
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
from subprocess import CompletedProcess
|
||||
from subprocess import CompletedProcess, run
|
||||
from wp_app_config import AppConfig
|
||||
from common_tools import ConfigManager
|
||||
|
||||
ConfigManager.init(AppConfig.SETTINGS_FILE)
|
||||
|
||||
if ConfigManager.get("autostart") != "off":
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
["nmcli", "connection", "up", ConfigManager.get("autostart")],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
|
49
wirepy.py
49
wirepy.py
@ -10,7 +10,7 @@ import sys
|
||||
import tkinter as tk
|
||||
import webbrowser
|
||||
from pathlib import Path
|
||||
from subprocess import CompletedProcess
|
||||
from subprocess import CompletedProcess, run
|
||||
from tkinter import TclError, filedialog, ttk
|
||||
|
||||
from common_tools import (
|
||||
@ -24,10 +24,9 @@ from common_tools import (
|
||||
)
|
||||
from wp_app_config import AppConfig, Msg
|
||||
|
||||
AppConfig.USER_FILE.write_text(getpass.getuser())
|
||||
AppConfig.ensure_directories()
|
||||
AppConfig.create_default_settings()
|
||||
CryptoUtil.decrypt()
|
||||
CryptoUtil.decrypt(getpass.getuser())
|
||||
|
||||
|
||||
class Wirepy(tk.Tk):
|
||||
@ -538,7 +537,13 @@ class FrameWidgets(ttk.Frame):
|
||||
self.tooltip_label.set(_("Enable Tooltips"))
|
||||
|
||||
def tooltips_toggle(self):
|
||||
"""Toggles tooltips on/off and updates the menu label"""
|
||||
"""
|
||||
Toggles the visibility of tooltips (on/off) and updates
|
||||
the corresponding menu label. Inverts the current tooltip state
|
||||
(`self.tooltip_state`), saves the new value in the configuration,
|
||||
and applies the change immediately. Updates the menu entry's label to
|
||||
reflect the new tooltip status (e.g., "Tooltips: On" or "Tooltips: Off").
|
||||
"""
|
||||
# Toggle the boolean state
|
||||
new_bool_state = not self.tooltip_state.get()
|
||||
# Save the converted value in the configuration
|
||||
@ -674,7 +679,7 @@ class FrameWidgets(ttk.Frame):
|
||||
self.tl.update(data_import)
|
||||
|
||||
if self.a != "":
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
["nmcli", "connection", "down", self.a],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
@ -688,7 +693,7 @@ class FrameWidgets(ttk.Frame):
|
||||
print(f"Error process decrypt: Code {process.returncode}")
|
||||
self.reset_fields()
|
||||
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
[
|
||||
"nmcli",
|
||||
"connection",
|
||||
@ -711,7 +716,7 @@ class FrameWidgets(ttk.Frame):
|
||||
else:
|
||||
print(f"Error process decrypt: Code {process.returncode}")
|
||||
|
||||
CryptoUtil.encrypt()
|
||||
CryptoUtil.encrypt(getpass.getuser())
|
||||
LxTools.clean_files(AppConfig.TEMP_DIR, file=None)
|
||||
AppConfig.ensure_directories()
|
||||
self.str_var.set("")
|
||||
@ -740,7 +745,7 @@ class FrameWidgets(ttk.Frame):
|
||||
self.stop()
|
||||
self.handle_tunnel_data(self.a, self.tl)
|
||||
self.show_data()
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
["nmcli", "con", "mod", self.a, "connection.autoconnect", "no"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
@ -779,7 +784,7 @@ class FrameWidgets(ttk.Frame):
|
||||
self.select_tunnel = self.l_box.curselection()
|
||||
select_tl = self.l_box.get(self.select_tunnel[0])
|
||||
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
["nmcli", "connection", "delete", select_tl],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
@ -973,7 +978,7 @@ class FrameWidgets(ttk.Frame):
|
||||
select_tl = self.l_box.get(self.select_tunnel[0])
|
||||
|
||||
# nmcli connection modify old connection.id iphone
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
[
|
||||
"nmcli",
|
||||
"connection",
|
||||
@ -1036,7 +1041,13 @@ class FrameWidgets(ttk.Frame):
|
||||
|
||||
def show_data(self) -> None:
|
||||
"""
|
||||
shows data in the label
|
||||
Displays network-related data (address, DNS, endpoint)
|
||||
in the UI using ttk.Label widgets.
|
||||
Creates three labels for address, DNS, and endpoint with
|
||||
specific styling (color, font), positioning them in a
|
||||
grid layout (`lb_frame` and `lb_frame2`).
|
||||
Each label is linked to a corresponding text variable
|
||||
(`self.add`, `self.DNS`, `self.enp`) for dynamic data updates.
|
||||
"""
|
||||
# Address Label
|
||||
self.address = ttk.Label(
|
||||
@ -1059,7 +1070,13 @@ class FrameWidgets(ttk.Frame):
|
||||
|
||||
def wg_switch(self, event=None) -> None:
|
||||
"""
|
||||
Deals with switching the VPN connection
|
||||
Manages switching between active and inactiveVPN connections.
|
||||
If no tunnel is selected (`self.a == ""`), it starts a new connection
|
||||
with the selected tunnel from the listbox (`l_box`).
|
||||
Otherwise, it stops the current connection and updates
|
||||
tunnel data using `handle_tunnel_data`.
|
||||
Handles errors like `IndexError` by displaying appropriate
|
||||
messages if no items are selected or the listbox is empty.
|
||||
"""
|
||||
try:
|
||||
if self.a == "":
|
||||
@ -1102,7 +1119,7 @@ class FrameWidgets(ttk.Frame):
|
||||
"""
|
||||
if action == "stop":
|
||||
if self.a:
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
["nmcli", "connection", "down", self.a],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
@ -1122,7 +1139,7 @@ class FrameWidgets(ttk.Frame):
|
||||
elif action == "start":
|
||||
if tunnel_name or self.a:
|
||||
target_tunnel = tunnel_name or self.a
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
["nmcli", "connection", "up", target_tunnel],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
@ -1164,7 +1181,7 @@ class FrameWidgets(ttk.Frame):
|
||||
if __name__ == "__main__":
|
||||
|
||||
_ = AppConfig.setup_translations()
|
||||
LxTools.sigi(AppConfig.TEMP_DIR, AppConfig.USER_FILE)
|
||||
LxTools.sigi(AppConfig.TEMP_DIR)
|
||||
window = Wirepy()
|
||||
"""
|
||||
the hidden files are hidden in Filedialog
|
||||
@ -1177,5 +1194,5 @@ if __name__ == "__main__":
|
||||
window.tk.call("set", "::tk::dialog::file::showHiddenVar", "0")
|
||||
window.mainloop()
|
||||
|
||||
LxTools.clean_files(AppConfig.TEMP_DIR, AppConfig.USER_FILE)
|
||||
LxTools.clean_files(AppConfig.TEMP_DIR)
|
||||
sys.exit(0)
|
||||
|
@ -4,7 +4,7 @@
|
||||
import gettext
|
||||
import locale
|
||||
from pathlib import Path
|
||||
import subprocess
|
||||
from subprocess import CompletedProcess, run
|
||||
from typing import Dict, Any
|
||||
|
||||
|
||||
@ -36,7 +36,6 @@ class AppConfig:
|
||||
BASE_DIR: Path = Path.home()
|
||||
CONFIG_DIR: Path = BASE_DIR / ".config/wire_py"
|
||||
TEMP_DIR: Path = Path("/tmp/tlecdcwg")
|
||||
USER_FILE: Path = Path("/tmp/.log_user")
|
||||
PUBLICKEY: Path = CONFIG_DIR / "pbwgk.pem"
|
||||
|
||||
# Configuration files
|
||||
@ -135,14 +134,12 @@ class AppConfig:
|
||||
if not cls.SYSTEMD_USER_FOLDER.exists():
|
||||
cls.SYSTEMD_USER_FOLDER.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
from subprocess import CompletedProcess
|
||||
|
||||
if not cls.AUTOSTART_SERVICE.is_file():
|
||||
|
||||
content = "\n".join([line for line in SYSTEMD_FILE])
|
||||
cls.AUTOSTART_SERVICE.write_text(content)
|
||||
|
||||
process: CompletedProcess[str] = subprocess.run(
|
||||
process: CompletedProcess[str] = run(
|
||||
["systemctl", "--user", "enable", "wg_start.service"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
|
Loading…
x
Reference in New Issue
Block a user