173 lines
4.9 KiB
Python
173 lines
4.9 KiB
Python
# cfd_sftp_manager.py
|
|
|
|
try:
|
|
import paramiko
|
|
PARAMIKO_AVAILABLE = True
|
|
except ImportError:
|
|
paramiko = None
|
|
PARAMIKO_AVAILABLE = False
|
|
|
|
try:
|
|
import keyring
|
|
KEYRING_AVAILABLE = True
|
|
except ImportError:
|
|
keyring = None
|
|
KEYRING_AVAILABLE = False
|
|
|
|
class SFTPManager:
    """Small convenience wrapper around one Paramiko SSH/SFTP session.

    Most operations report failure through their return value instead of
    raising: mutating calls return ``(success, message)`` and
    ``list_directory`` returns ``(items, error)``. The exception is
    ``connect``, which raises ``ImportError`` when Paramiko is missing.
    """

    def __init__(self):
        self.client = None    # SSH client; set by connect()
        self.sftp = None      # SFTP channel opened on top of the client
        self.home_dir = None  # directory reported by get_home_directory()

    def _reset(self):
        """Close whatever is open, ignoring close errors, and clear state."""
        sftp, client = self.sftp, self.client
        self.sftp = None
        self.client = None
        self.home_dir = None
        for resource in (sftp, client):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                # Deliberate best-effort: the session is being thrown away,
                # so a failure while closing it changes nothing for callers.
                pass

    def connect(self, host, port, username, password=None, key_file=None,
                passphrase=None):
        """Open an SSH connection and an SFTP channel on top of it.

        Any session this manager already holds is closed first. The SSH
        agent and key discovery are disabled, so only the given password
        and/or key file are used; the connect timeout is 10 seconds.

        Returns:
            ``(True, "Connection successful.")`` on success, otherwise
            ``(False, <error text>)`` with the manager left disconnected.

        Raises:
            ImportError: if Paramiko is not installed.
        """
        if not PARAMIKO_AVAILABLE:
            raise ImportError("Paramiko library is not installed. SFTP functionality is disabled.")

        # Reconnecting must not leak the previous transport.
        self._reset()

        try:
            self.client = paramiko.SSHClient()
            # NOTE(security): AutoAddPolicy accepts unknown host keys without
            # verification, which leaves the connection open to
            # man-in-the-middle attacks. Kept for backward compatibility;
            # consider loading known hosts and using RejectPolicy instead.
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            self.client.connect(
                hostname=host,
                port=port,
                username=username,
                password=password,
                key_filename=key_file,
                passphrase=passphrase,
                timeout=10,
                allow_agent=False,
                look_for_keys=False,
            )
            self.sftp = self.client.open_sftp()
            self.home_dir = self.get_home_directory()
            return True, "Connection successful."
        except Exception as e:
            # Close the half-open client instead of just dropping the
            # reference, and clear home_dir so no stale state survives.
            self._reset()
            return False, str(e)

    def get_home_directory(self):
        """Return the server's default directory, or ``None`` if disconnected.

        Falls back to ``"/"`` when the server cannot resolve ``'.'``.
        """
        if not self.sftp:
            return None
        try:
            # normalize('.') is a common way to get the default directory,
            # usually home.
            return self.sftp.normalize('.')
        except Exception:
            # Fallback to root if normalize fails
            return "/"

    def disconnect(self):
        """Close the SFTP channel and the SSH client and clear all state.

        State is cleared before anything is closed and the client is closed
        even when closing the SFTP channel raises; such an error still
        propagates to the caller.
        """
        sftp, client = self.sftp, self.client
        self.sftp = None
        self.client = None
        self.home_dir = None
        try:
            if sftp:
                sftp.close()
        finally:
            if client:
                client.close()

    def list_directory(self, path):
        """List ``path`` with directories first, then case-insensitive by name.

        Returns:
            ``(items, None)`` on success, where ``items`` is whatever
            ``listdir_attr`` returned, sorted; ``([], <error text>)`` on
            failure or when not connected.
        """
        if not self.sftp:
            return [], "Not connected."

        try:
            items = self.sftp.listdir_attr(path)
            # Sort directories first, then files
            items.sort(key=lambda entry: (not self.item_is_dir(entry), entry.filename.lower()))
            return items, None
        except Exception as e:
            return [], str(e)

    def item_is_dir(self, item):
        """Return True if the attribute object ``item`` describes a directory.

        An entry whose ``st_mode`` is ``None`` is treated as "not a
        directory" instead of raising.
        """
        import stat

        mode = item.st_mode
        return mode is not None and stat.S_ISDIR(mode)

    def path_is_dir(self, path):
        """Return True if ``path`` is a directory on the server.

        Any failure (not connected, missing path, stat error) yields False.
        """
        if not self.sftp:
            return False
        try:
            return self.item_is_dir(self.sftp.stat(path))
        except Exception:
            return False

    def exists(self, path):
        """Return True if ``path`` can be stat'ed on the server.

        Any stat failure, not only "no such file", is reported as False.
        """
        if not self.sftp:
            return False
        try:
            self.sftp.stat(path)
            return True
        except Exception:
            return False

    def mkdir(self, path):
        """Create the directory ``path``; return ``(success, message)``."""
        if not self.sftp:
            return False, "Not connected."
        try:
            self.sftp.mkdir(path)
            return True, ""
        except Exception as e:
            return False, str(e)

    def rmdir(self, path):
        """Remove the directory ``path``; return ``(success, message)``."""
        if not self.sftp:
            return False, "Not connected."
        try:
            self.sftp.rmdir(path)
            return True, ""
        except Exception as e:
            return False, str(e)

    def rm(self, path):
        """Remove the file ``path``; return ``(success, message)``."""
        if not self.sftp:
            return False, "Not connected."
        try:
            self.sftp.remove(path)
            return True, ""
        except Exception as e:
            return False, str(e)

    def rename(self, old_path, new_path):
        """Rename ``old_path`` to ``new_path``; return ``(success, message)``."""
        if not self.sftp:
            return False, "Not connected."
        try:
            self.sftp.rename(old_path, new_path)
            return True, ""
        except Exception as e:
            return False, str(e)

    def touch(self, path):
        """Create ``path`` if it does not exist; return ``(success, message)``.

        The file is opened in append mode so that an existing file keeps
        its contents; nothing is written to it.
        """
        if not self.sftp:
            return False, "Not connected."
        try:
            # 'a' creates the file when missing but, unlike 'w', never
            # truncates one that is already there.
            with self.sftp.open(path, 'a'):
                pass
            return True, ""
        except Exception as e:
            return False, str(e)

    def rm_recursive(self, path):
        """Delete the directory ``path`` and everything beneath it.

        Stops at the first failure and returns ``(False, <error text>)``;
        entries removed before that point stay removed.
        """
        import posixpath

        if not self.sftp:
            return False, "Not connected."

        try:
            for entry in self.sftp.listdir_attr(path):
                # posixpath.join avoids "//name" for "/" and keeps an empty
                # (relative) path relative instead of redirecting to "/".
                child = posixpath.join(path, entry.filename)
                if self.item_is_dir(entry):
                    success, msg = self.rm_recursive(child)
                    if not success:
                        return False, msg
                else:
                    self.sftp.remove(child)
            self.sftp.rmdir(path)
            return True, ""
        except Exception as e:
            return False, str(e)

    @property
    def is_connected(self):
        """True while an SFTP channel object is held by this manager."""
        return self.sftp is not None