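Xshell Plugin Development Challenge: Extending Xshell with Python to Build a Custom Network Management Tool
Abstract
This article explains in detail how to extend Xshell with Python and build a custom network management tool. It is organized into six parts: the Xshell plugin architecture, setting up the Python plugin development environment, implementing the core features, building the network management tools in practice, advanced features and integration, and finally performance optimization and deployment. By the end of this roughly 20,000-character walkthrough, readers should understand the complete Xshell plugin development workflow and be able to build capable network management tools.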
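Part 1: A Deep Dive into the Xshell Plugin Architecture
1.1 Overview of the Xshell Plugin System
Xshell is a widely used SSH client and terminal emulator that can be extended by external code. This article models its extension architecture on COM (Component Object Model), where developers extend the software through standard interfaces.
Xshell plugin types: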
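- Menu plugins: add custom menu items to the Xshell interface
- Toolbar plugins: create custom toolbar buttons
- Session plugins: extend session management
- Event-handling plugins: respond to internal Xshell events
- Protocol plugins: extend the set of supported protocols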
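1.2 COM Interfaces and Python Integration
Xshell exposes its functionality through COM interfaces, and Python can talk to them through the win32com library (part of pywin32). The object names below are the ones this article assumes; confirm them against your installed Xshell version before relying on them. The main COM objects are: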
python
# Main Xshell COM objects
# Xshell.Application - top-level application object
# Xshell.Session     - session object
# Xshell.Tab         - tab object
# Xshell.Window      - window object
# Xshell.Terminal    - terminal object
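1.3 Plugin Lifecycle Management
An Xshell plugin goes through a fixed lifecycle:
- Load phase: Xshell loads the plugin DLL
- Initialization phase: the plugin registers its COM components
- Run phase: the plugin responds to events and handles requests
- Unload phase: the plugin releases its resources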
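The four phases above can be sketched as a small state machine. This is only an illustration: `PluginState` and `PluginLifecycle` are hypothetical names introduced here, not part of any Xshell API, and the allowed transitions are an assumption drawn from the list above.

```python
from enum import Enum, auto


class PluginState(Enum):
    UNLOADED = auto()
    LOADED = auto()
    INITIALIZED = auto()
    RUNNING = auto()


# Allowed transitions, mirroring load -> initialize -> run -> unload.
# A plugin may also be unloaded early if loading or initialization fails.
_TRANSITIONS = {
    PluginState.UNLOADED: {PluginState.LOADED},
    PluginState.LOADED: {PluginState.INITIALIZED, PluginState.UNLOADED},
    PluginState.INITIALIZED: {PluginState.RUNNING, PluginState.UNLOADED},
    PluginState.RUNNING: {PluginState.UNLOADED},
}


class PluginLifecycle:
    """Tracks which lifecycle phase a (hypothetical) plugin is in."""

    def __init__(self):
        self.state = PluginState.UNLOADED
        self.history = [self.state]

    def advance(self, new_state: PluginState) -> None:
        """Move to new_state, rejecting transitions that skip a phase."""
        if new_state not in _TRANSITIONS[self.state]:
            raise RuntimeError(
                f"Illegal transition: {self.state.name} -> {new_state.name}"
            )
        self.state = new_state
        self.history.append(new_state)


lifecycle = PluginLifecycle()
for phase in (PluginState.LOADED, PluginState.INITIALIZED,
              PluginState.RUNNING, PluginState.UNLOADED):
    lifecycle.advance(phase)
```

Making the transitions explicit means that a plugin which tries to run before it has been initialized fails loudly instead of misbehaving later.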
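Part 2: Setting Up the Python Plugin Development Environment
2.1 Building the Development Environment
Required tools:
- Xshell 7 or later
- Python 3.8+ (3.9+ recommended)
- PyCharm or VS Code (IDE)
- Python for Windows Extensions (pywin32)
- Microsoft Visual C++ Build Tools
Setup steps: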
bash
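# 1. Install the Python dependencies
pip install pywin32 comtypes psutil netifaces paramiko
pip install pyinstaller cx_Freeze  # packaging tools
pip install pyqt5 pyside6 matplotlib  # optional GUI frameworks, plus charting for the dashboard
# 2. Install the Xshell SDK (if one is available)
# Xshell does not ship a Python SDK; it is accessed through the COM interface
# 3. Set up the project directory layout
mkdir xshell_plugin_project
cd xshell_plugin_project
mkdir -p src/{core,ui,network,tools}  # brace expansion needs bash (e.g. Git Bash on Windows)
mkdir -p resources/{icons,configs}
mkdir tests docs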
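The `mkdir -p` lines above rely on bash brace expansion, which `cmd.exe` and PowerShell do not support. As a portable alternative, the same layout can be created from Python. This is a minimal sketch; `LAYOUT` and `create_layout` are names introduced here for illustration, and the demo writes into a temporary directory so it leaves nothing behind.

```python
import tempfile
from pathlib import Path

# Directory layout from the setup steps above: parent -> subdirectories.
LAYOUT = {
    "src": ["core", "ui", "network", "tools"],
    "resources": ["icons", "configs"],
    "tests": [],
    "docs": [],
}


def create_layout(root: Path) -> list:
    """Create the project directories under root and return the created paths."""
    created = []
    for parent, children in LAYOUT.items():
        # A parent without children is itself the directory to create.
        targets = [root / parent / child for child in children] or [root / parent]
        for target in targets:
            target.mkdir(parents=True, exist_ok=True)
            created.append(target)
    return created


# Demo in a throwaway directory; pass Path("xshell_plugin_project") for real use.
with tempfile.TemporaryDirectory() as tmp:
    created = create_layout(Path(tmp) / "xshell_plugin_project")
    relative = sorted(p.relative_to(tmp).as_posix() for p in created)
```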
2.2 A Python Wrapper for the COM Interface
Create a Python wrapper class for the Xshell COM interface:
python
# src/core/xshell_com.py
import time
from typing import List

import win32com.client


class XShellCOM:
    """Wrapper around the Xshell COM interface."""

    def __init__(self):
        self._app = None
        self._session = None
        self._connected = False

    def connect(self) -> bool:
        """Connect to the Xshell application."""
        try:
            self._app = win32com.client.Dispatch("Xshell.Application")
            self._connected = True
            return True
        except Exception as e:
            print(f"Failed to connect to Xshell: {e}")
            return False

    @property
    def application(self):
        """Return the application object, connecting on first use."""
        if not self._connected:
            self.connect()
        return self._app

    @property
    def active_session(self):
        """Return the active session, or None if Xshell is unavailable."""
        app = self.application
        if app:
            return app.ActiveSession
        return None

    @property
    def sessions(self) -> List:
        """Return all sessions."""
        app = self.application
        if app:
            return list(app.Sessions)
        return []

    def execute_command(self, command: str,
                        wait: bool = True,
                        timeout: int = 5000) -> str:
        """Run a command in the active session (timeout in milliseconds)."""
        session = self.active_session
        if not session:
            return ""
        # Send the command followed by a carriage return
        session.SendString(command + "\r")
        if wait:
            # Simplified: this only sleeps until the timeout elapses.
            # Capturing the real command output needs more elaborate logic,
            # such as watching the terminal for the shell prompt.
            deadline = time.monotonic() + timeout / 1000
            while time.monotonic() < deadline:
                time.sleep(0.1)
        return "Command executed"

    def create_session(self, name: str, host: str,
                       protocol: str = "SSH") -> bool:
        """Create a new session."""
        try:
            app = self.application
            sessions = app.Sessions
            # Create the session and fill in its properties
            new_session = sessions.Add()
            new_session.Name = name
            new_session.Host = host
            new_session.Protocol = protocol
            # Persist the session
            sessions.Save()
            return True
        except Exception as e:
            print(f"Failed to create session: {e}")
            return False
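The waiting step in `execute_command` always sleeps for the full timeout. A common refinement is to poll a completion condition and return as soon as it holds. The sketch below shows that pattern in isolation; `wait_until` and `FakeScreen` are hypothetical helpers introduced here, and how to detect "command finished" in a real terminal (for example, by spotting the shell prompt) is left as an assumption.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool],
               timeout_ms: int = 5000,
               poll_interval: float = 0.1) -> bool:
    """Poll predicate until it returns True or timeout_ms elapses."""
    deadline = time.monotonic() + timeout_ms / 1000
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(poll_interval)
    # One last check so a condition met right at the deadline still counts.
    return predicate()


class FakeScreen:
    """Stand-in for a terminal whose prompt appears after a few polls."""

    def __init__(self, ready_after: int):
        self._polls = 0
        self._ready_after = ready_after

    def prompt_visible(self) -> bool:
        self._polls += 1
        return self._polls >= self._ready_after


screen = FakeScreen(ready_after=3)
finished = wait_until(screen.prompt_visible, timeout_ms=2000, poll_interval=0.01)
timed_out = not wait_until(lambda: False, timeout_ms=50, poll_interval=0.01)
```

Using `time.monotonic()` rather than `time.time()` keeps the deadline correct even if the system clock is adjusted while waiting.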
2.3 Plugin Registration
The plugin is registered in the Windows registry, under the current user's hive:
python
# src/core/plugin_registry.py
import winreg
from typing import Dict, List


class PluginRegistry:
    """Manages plugin registration for Xshell."""

    XSHELL_REG_PATH = r"SOFTWARE\NetSarang\Xshell\Plugins"

    @classmethod
    def register_plugin(cls, plugin_name: str,
                        plugin_path: str,
                        description: str = "",
                        version: str = "1.0.0",
                        author: str = "") -> bool:
        """Register a plugin with Xshell."""
        try:
            key_path = f"{cls.XSHELL_REG_PATH}\\{plugin_name}"
            # The key handle is closed automatically when the block exits
            with winreg.CreateKey(winreg.HKEY_CURRENT_USER, key_path) as key:
                winreg.SetValueEx(key, "Path", 0, winreg.REG_SZ, plugin_path)
                winreg.SetValueEx(key, "Description", 0, winreg.REG_SZ, description)
                winreg.SetValueEx(key, "Version", 0, winreg.REG_SZ, version)
                winreg.SetValueEx(key, "Author", 0, winreg.REG_SZ, author)
                winreg.SetValueEx(key, "Enabled", 0, winreg.REG_DWORD, 1)
            return True
        except OSError as e:
            print(f"Plugin registration failed: {e}")
            return False

    @classmethod
    def unregister_plugin(cls, plugin_name: str) -> bool:
        """Remove a plugin's registration."""
        try:
            key_path = f"{cls.XSHELL_REG_PATH}\\{plugin_name}"
            winreg.DeleteKey(winreg.HKEY_CURRENT_USER, key_path)
            return True
        except OSError as e:
            print(f"Plugin unregistration failed: {e}")
            return False

    @classmethod
    def list_plugins(cls) -> List[Dict]:
        """List all registered plugins."""
        plugins = []
        try:
            with winreg.OpenKey(winreg.HKEY_CURRENT_USER, cls.XSHELL_REG_PATH) as key:
                i = 0
                while True:
                    try:
                        plugin_name = winreg.EnumKey(key, i)
                    except OSError:
                        break  # no more subkeys
                    plugin_info = {"name": plugin_name}
                    with winreg.OpenKey(key, plugin_name) as plugin_key:
                        try:
                            plugin_info["path"] = winreg.QueryValueEx(plugin_key, "Path")[0]
                            plugin_info["description"] = winreg.QueryValueEx(plugin_key, "Description")[0]
                            plugin_info["enabled"] = bool(winreg.QueryValueEx(plugin_key, "Enabled")[0])
                        except OSError:
                            pass  # a value is missing; keep what was read so far
                    plugins.append(plugin_info)
                    i += 1
        except OSError:
            pass  # the Plugins key does not exist yet
        return plugins
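Registry code is awkward to test because it touches real system state. One way to make it testable is to build the value table as plain data and keep the actual write in a thin, Windows-only function. This is a sketch under assumptions: `build_plugin_values` and `write_plugin_values` are names introduced here, and the registry path is the one used in this article, not a location Xshell is documented to read.

```python
import sys


def build_plugin_values(plugin_path: str, description: str = "",
                        version: str = "1.0.0", author: str = "",
                        enabled: bool = True) -> dict:
    """Return the registry values as {name: (type_name, value)}."""
    return {
        "Path": ("REG_SZ", plugin_path),
        "Description": ("REG_SZ", description),
        "Version": ("REG_SZ", version),
        "Author": ("REG_SZ", author),
        "Enabled": ("REG_DWORD", int(enabled)),
    }


def write_plugin_values(plugin_name: str, values: dict,
                        base_path: str = r"SOFTWARE\NetSarang\Xshell\Plugins") -> bool:
    """Write the values under HKEY_CURRENT_USER; returns False off Windows."""
    if sys.platform != "win32":
        return False
    import winreg  # only importable on Windows

    key_path = f"{base_path}\\{plugin_name}"
    with winreg.CreateKey(winreg.HKEY_CURRENT_USER, key_path) as key:
        for name, (type_name, value) in values.items():
            # Map the type name ("REG_SZ", "REG_DWORD") to the winreg constant.
            winreg.SetValueEx(key, name, 0, getattr(winreg, type_name), value)
    return True


# Only the pure part runs here; nothing is written to the registry.
values = build_plugin_values(r"C:\plugins\netmgr", description="Network manager")
```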
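Part 3: Implementing the Core Plugin Features in Python
3.1 Developing a Menu Plugin
Create a custom menu plugin that adds the network management features: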
python
# src/core/menu_plugin.py
from typing import Callable, Dict, Optional

from win32com.client import Dispatch


class XShellMenuPlugin:
    """Base class for Xshell menu plugins."""

    def __init__(self, plugin_name: str):
        self.plugin_name = plugin_name
        self.menu_items: Dict[str, Callable] = {}
        self.app = None
        self.menu_bar = None

    def initialize(self) -> bool:
        """Initialize the plugin."""
        try:
            # Connect to Xshell
            self.app = Dispatch("Xshell.Application")
            # Get the main menu bar
            self.menu_bar = self.app.MenuBar
            # Build the plugin menu
            self.create_menu()
            return True
        except Exception as e:
            print(f"Menu plugin initialization failed: {e}")
            return False

    def create_menu(self):
        """Build the menu structure."""
        # Insert the custom menu after the Help menu. If Help is not found,
        # find_menu_index returns -1 and the menu is inserted at position 0.
        help_menu_index = self.find_menu_index("Help")
        plugin_menu = self.menu_bar.InsertMenu(
            help_menu_index + 1,
            f"&{self.plugin_name}"
        )
        # Add the menu items ("-" marks a separator and has no callback)
        self.add_menu_item(plugin_menu, 0, "Network Scan", self.on_network_scan)
        self.add_menu_item(plugin_menu, 1, "Session Manager", self.on_session_manager)
        self.add_menu_item(plugin_menu, 2, "-")
        self.add_menu_item(plugin_menu, 3, "Port Monitor", self.on_port_monitor)
        self.add_menu_item(plugin_menu, 4, "Traffic Analysis", self.on_traffic_analysis)
        self.add_menu_item(plugin_menu, 5, "-")
        self.add_menu_item(plugin_menu, 6, "Plugin Settings", self.on_plugin_settings)

    def find_menu_index(self, menu_name: str) -> int:
        """Return the zero-based index of a menu, or -1 if it is not found."""
        for i in range(self.menu_bar.Count):
            try:
                # COM collections are one-based
                if self.menu_bar.Item(i + 1).Text == menu_name:
                    return i
            except Exception:
                pass
        return -1

    def add_menu_item(self, parent_menu, index: int,
                      text: str, callback: Optional[Callable] = None):
        """Add a menu item; separators are added without a callback."""
        try:
            menu_item = parent_menu.InsertMenuItem(index, text)
            # Remember the callback so it can be dispatched later
            if callback is not None:
                self.menu_items[text] = callback
            return menu_item
        except Exception as e:
            print(f"Failed to add menu item: {e}")
            return None

    # Menu event handlers
    def on_network_scan(self):
        """Handle the Network Scan menu item."""
        from src.network.scanner import NetworkScanner
        scanner = NetworkScanner()
        scanner.scan_local_network()

    def on_session_manager(self):
        """Handle the Session Manager menu item."""
        from src.tools.session_manager import SessionManager
        manager = SessionManager()
        manager.show()

    def on_port_monitor(self):
        """Handle the Port Monitor menu item."""
        from src.network.port_monitor import PortMonitor
        monitor = PortMonitor()
        monitor.start()

    def on_traffic_analysis(self):
        """Handle the Traffic Analysis menu item."""
        from src.network.traffic_analyzer import TrafficAnalyzer
        analyzer = TrafficAnalyzer()
        analyzer.analyze()

    def on_plugin_settings(self):
        """Handle the Plugin Settings menu item."""
        from src.ui.settings_dialog import SettingsDialog
        dialog = SettingsDialog()
        dialog.exec_()
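The menu plugin stores callbacks keyed by menu text but never shows how a click reaches them. The sketch below isolates that lookup-and-call step so it can be exercised without Xshell. `MenuDispatcher` is a hypothetical helper introduced here; how the host application reports which item was clicked is an assumption.

```python
from typing import Callable, Dict, Optional


class MenuDispatcher:
    """Maps menu item text to callbacks and invokes them on demand."""

    def __init__(self):
        self._callbacks: Dict[str, Callable[[], object]] = {}

    def register(self, text: str, callback: Optional[Callable] = None) -> None:
        """Register a callback; separators ("-") carry none and are skipped."""
        if text == "-" or callback is None:
            return
        self._callbacks[text] = callback

    def dispatch(self, text: str) -> bool:
        """Run the callback for text; return False if none is registered."""
        callback = self._callbacks.get(text)
        if callback is None:
            return False
        callback()
        return True


calls = []
dispatcher = MenuDispatcher()
dispatcher.register("Network Scan", lambda: calls.append("scan"))
dispatcher.register("-")
dispatcher.register("Port Monitor", lambda: calls.append("ports"))

handled = dispatcher.dispatch("Port Monitor")
unknown = dispatcher.dispatch("Missing Item")
separator = dispatcher.dispatch("-")
```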
3.2 Implementing a Toolbar Plugin
python
# src/core/toolbar_plugin.py
import win32com.client


class XShellToolbarPlugin:
    """Xshell toolbar plugin."""

    def __init__(self):
        self.app = None
        self.toolbar = None
        self.buttons = []

    def create_toolbar(self, name: str) -> bool:
        """Create a toolbar."""
        try:
            self.app = win32com.client.Dispatch("Xshell.Application")
            # Create the toolbar
            toolbars = self.app.ToolBars
            self.toolbar = toolbars.Add(name)
            # Set the toolbar properties
            self.toolbar.Visible = True
            self.toolbar.Dock = 1  # docking position
            return True
        except Exception as e:
            print(f"Failed to create toolbar: {e}")
            return False

    def add_button(self, caption: str,
                   tooltip: str,
                   icon_path: str,
                   callback) -> bool:
        """Add a toolbar button."""
        try:
            button = self.toolbar.Buttons.Add()
            button.Caption = caption
            button.ToolTipText = tooltip
            # Set the icon, if one was given
            if icon_path:
                button.SetPicture(icon_path)
            # Remember the callback together with its button
            self.buttons.append({
                'button': button,
                'callback': callback
            })
            return True
        except Exception as e:
            print(f"Failed to add toolbar button: {e}")
            return False

    def add_separator(self) -> bool:
        """Add a separator."""
        try:
            self.toolbar.Buttons.AddSeparator()
            return True
        except Exception:
            return False

    def create_network_toolbar(self) -> bool:
        """Create the network management toolbar."""
        if not self.create_toolbar("Network Management"):
            return False
        # Add buttons for the common network tools
        self.add_button(
            "Scan Network",
            "Scan devices on the local network",
            "resources/icons/scan.ico",
            self.on_scan_network
        )
        self.add_button(
            "Check Ports",
            "Detect open ports",
            "resources/icons/port.ico",
            self.on_check_ports
        )
        self.add_separator()
        self.add_button(
            "Bandwidth Test",
            "Measure network bandwidth",
            "resources/icons/speed.ico",
            self.on_bandwidth_test
        )
        self.add_button(
            "Trace Route",
            "Trace the network route",
            "resources/icons/trace.ico",
            self.on_trace_route
        )
        return True

    # Button callbacks (placeholders)
    def on_scan_network(self):
        """Network scan callback."""
        pass

    def on_check_ports(self):
        """Port check callback."""
        pass

    def on_bandwidth_test(self):
        """Bandwidth test callback."""
        pass

    def on_trace_route(self):
        """Trace route callback."""
        pass
3.3 An Event Listener Plugin
python
# src/core/event_plugin.py
import queue
import threading
import time
from dataclasses import dataclass
from typing import Callable, Dict

import pythoncom
import win32com.client


@dataclass
class XShellEvent:
    """Data class describing an Xshell event."""
    event_type: str
    session_name: str
    host: str
    timestamp: float
    data: Dict


class XShellEventListener:
    """Listens for Xshell events on a background thread."""

    def __init__(self):
        self.app = None
        self.event_handlers = {}
        self.event_queue = queue.Queue()
        self.listening = False
        self.thread = None

    def start_listening(self):
        """Start listening for events."""
        if self.listening:
            return
        self.listening = True
        self.thread = threading.Thread(target=self._event_loop)
        self.thread.daemon = True
        self.thread.start()

    def stop_listening(self):
        """Stop listening for events."""
        self.listening = False
        if self.thread:
            self.thread.join(timeout=2)

    def _event_loop(self):
        """Event loop; COM must be initialized on the thread that uses it."""
        pythoncom.CoInitialize()
        try:
            self.app = win32com.client.DispatchWithEvents(
                "Xshell.Application",
                XShellEvents
            )
            # Give the event sink a reference back to this listener
            self.app._event_listener = self
            while self.listening:
                # Dispatch pending COM messages without blocking, so that
                # stop_listening() can end the loop. (PumpMessages() would
                # block here until a WM_QUIT message arrives.)
                pythoncom.PumpWaitingMessages()
                time.sleep(0.05)
        except Exception as e:
            print(f"Event listener error: {e}")
        finally:
            pythoncom.CoUninitialize()

    def register_handler(self, event_type: str, handler: Callable):
        """Register a handler for an event type."""
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = []
        self.event_handlers[event_type].append(handler)

    def handle_event(self, event: XShellEvent):
        """Queue the event and call its handlers."""
        self.event_queue.put(event)
        handlers = self.event_handlers.get(event.event_type, [])
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Event handler error: {e}")


class XShellEvents:
    """Event sink; the method names are the events this article assumes."""

    def OnSessionCreated(self, session):
        """Session created."""
        event = XShellEvent(
            event_type="session_created",
            session_name=session.Name,
            host=session.Host,
            timestamp=time.time(),
            data={"session": session}
        )
        self._event_listener.handle_event(event)

    def OnSessionClosed(self, session):
        """Session closed."""
        event = XShellEvent(
            event_type="session_closed",
            session_name=session.Name,
            host=session.Host,
            timestamp=time.time(),
            data={"session": session}
        )
        self._event_listener.handle_event(event)

    def OnDataReceived(self, session, data):
        """Data received."""
        event = XShellEvent(
            event_type="data_received",
            session_name=session.Name,
            host=session.Host,
            timestamp=time.time(),
            data={"session": session, "data": data}
        )
        self._event_listener.handle_event(event)
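The listener places every event on a queue, which makes it possible to run slow handlers on a separate consumer thread so that event callbacks return quickly. The following is a minimal sketch of that consumer side using only the standard library. The event dictionaries, `run_consumer`, and the `STOP` sentinel are illustrative assumptions, not part of the listener above.

```python
import queue
import threading

STOP = object()  # sentinel that tells the consumer to exit


def run_consumer(events: queue.Queue, handlers: dict) -> None:
    """Take events off the queue and call the handlers for their type."""
    while True:
        event = events.get()
        if event is STOP:
            break
        for handler in handlers.get(event["type"], []):
            try:
                handler(event)
            except Exception as exc:
                # One failing handler must not stop the others.
                print(f"Handler error: {exc}")


events = queue.Queue()
seen = []
handlers = {"session_created": [lambda e: seen.append(e["session_name"])]}

worker = threading.Thread(target=run_consumer, args=(events, handlers))
worker.start()

events.put({"type": "session_created", "session_name": "web-01"})
events.put({"type": "data_received", "session_name": "web-01"})  # no handler
events.put(STOP)
worker.join(timeout=2)
```

A sentinel object is used instead of a flag so the consumer can block on `get()` without polling and still shut down promptly.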
Part 4: Building the Network Management Tools
4.1 Network Scanning Module
python
# src/network/scanner.py
import ipaddress
import platform
import queue
import re
import socket
import subprocess
import threading
import time
from typing import Dict, List, Optional

import psutil


class NetworkScanner:
    """Network scanner."""

    def __init__(self):
        self.scan_results = []
        self.scan_threads = []
        self.scan_queue = queue.Queue()
        self.is_scanning = False
        self.timeout = 2  # seconds
        self.max_threads = 50

    def get_local_networks(self) -> List[str]:
        """Return the local IPv4 networks in CIDR notation."""
        networks = set()
        for interface, addrs in psutil.net_if_addrs().items():
            for addr in addrs:
                if addr.family == socket.AF_INET and not addr.address.startswith('127.'):
                    try:
                        # Derive the network from the address and netmask
                        network = ipaddress.ip_network(
                            f"{addr.address}/{addr.netmask}", strict=False)
                        networks.add(str(network))
                    except ValueError:
                        pass  # missing or malformed netmask
        return list(networks)

    def scan_local_network(self, network: str = None) -> List[Dict]:
        """Scan a network; defaults to the first local network found."""
        if not network:
            networks = self.get_local_networks()
            if not networks:
                return []
            network = networks[0]
        self.is_scanning = True
        self.scan_results = []
        self.scan_threads = []
        # Parse the network range
        net = ipaddress.ip_network(network, strict=False)
        # Fill the scan queue
        for ip in net.hosts():
            self.scan_queue.put(str(ip))
        # Start the worker threads
        for _ in range(min(self.max_threads, self.scan_queue.qsize())):
            thread = threading.Thread(target=self._scan_worker)
            thread.daemon = True
            thread.start()
            self.scan_threads.append(thread)
        # Wait until every queued address has been processed
        self.scan_queue.join()
        # Wait for the threads to exit
        for thread in self.scan_threads:
            thread.join(timeout=1)
        self.is_scanning = False
        return self.scan_results

    def _scan_worker(self):
        """Worker thread: scan addresses until the queue is empty."""
        while self.is_scanning:
            try:
                ip = self.scan_queue.get_nowait()
            except queue.Empty:
                break
            try:
                result = self._scan_ip(ip)
                if result:
                    self.scan_results.append(result)
            except Exception as e:
                print(f"Scan error: {e}")
            finally:
                self.scan_queue.task_done()

    def _scan_ip(self, ip: str) -> Optional[Dict]:
        """Scan a single IP address; returns None if the host is down."""
        result = {
            'ip': ip,
            'hostname': None,
            'ports': [],
            'os_info': None,
            'mac_address': None,
            'response_time': None
        }
        # Ping first and skip hosts that do not answer
        ping_result = self._ping_host(ip)
        if not ping_result['alive']:
            return None
        result['response_time'] = ping_result['time']
        # Try a reverse DNS lookup
        try:
            result['hostname'] = socket.gethostbyaddr(ip)[0]
        except OSError:
            pass
        # Scan the common ports
        common_ports = [21, 22, 23, 25, 53, 80, 110, 135, 139, 143, 443, 445, 3389]
        result['ports'] = self._scan_ports(ip, common_ports)
        # Look up the MAC address (local network only)
        result['mac_address'] = self._get_mac_address(ip)
        return result

    def _ping_host(self, ip: str) -> Dict:
        """Ping a host; the reported time is in milliseconds."""
        if platform.system().lower() == 'windows':
            # -n: count, -w: timeout in milliseconds
            command = ['ping', '-n', '1', '-w', str(self.timeout * 1000), ip]
        else:
            # -c: count, -W: timeout in seconds (Linux iputils)
            command = ['ping', '-c', '1', '-W', str(self.timeout), ip]
        try:
            start_time = time.time()
            output = subprocess.run(command,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE,
                                    timeout=self.timeout + 1)
            elapsed_ms = (time.time() - start_time) * 1000
        except (subprocess.TimeoutExpired, OSError):
            return {'alive': False, 'time': None}
        if output.returncode != 0:
            return {'alive': False, 'time': None}
        # Matches both "time=12ms" and "time<1ms"; fall back to the measured
        # wall-clock time if the output is localized or has another format
        match = re.search(r'time[=<]\s*(\d+(?:\.\d+)?)\s*ms',
                          output.stdout.decode(errors='ignore'))
        response_time = float(match.group(1)) if match else elapsed_ms
        return {'alive': True, 'time': response_time}

    def _scan_ports(self, ip: str, ports: List[int]) -> List[Dict]:
        """Check which of the given TCP ports accept connections."""
        open_ports = []
        for port in ports:
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(1)
                    if sock.connect_ex((ip, port)) == 0:
                        open_ports.append({
                            'port': port,
                            'service': self._identify_service(port),
                            'state': 'open'
                        })
            except OSError:
                pass
        return open_ports

    def _identify_service(self, port: int) -> str:
        """Map a well-known port number to its usual service name."""
        services = {
            21: 'FTP',
            22: 'SSH',
            23: 'Telnet',
            25: 'SMTP',
            53: 'DNS',
            80: 'HTTP',
            110: 'POP3',
            135: 'RPC',
            139: 'NetBIOS',
            143: 'IMAP',
            443: 'HTTPS',
            445: 'SMB',
            3389: 'RDP'
        }
        return services.get(port, 'Unknown')

    def _get_mac_address(self, ip: str) -> Optional[str]:
        """Look up the MAC address in the ARP cache (Windows only)."""
        try:
            if platform.system().lower() == 'windows':
                output = subprocess.check_output(['arp', '-a', ip]).decode(errors='ignore')
                for line in output.split('\n'):
                    parts = line.split()
                    # Match the address column exactly, so that 192.168.1.1
                    # does not also match 192.168.1.10 or the Interface line
                    if len(parts) >= 3 and parts[0] == ip:
                        return parts[1]
            else:
                # Linux/macOS: not implemented
                pass
        except (subprocess.CalledProcessError, OSError):
            pass
        return None
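Two steps in the scanner are easy to check in isolation: deriving the network range from an address and netmask, and extracting the round-trip time from ping output. The sketch below exercises both with the standard library. The sample addresses and ping lines are made up for illustration, and `local_network` and `parse_ping_time` are helper names introduced here.

```python
import ipaddress
import re
from typing import Optional


def local_network(address: str, netmask: str) -> ipaddress.IPv4Network:
    """Return the network that contains address, given its netmask."""
    # strict=False lets us pass a host address instead of the network address.
    return ipaddress.ip_network(f"{address}/{netmask}", strict=False)


_TIME_RE = re.compile(r"time[=<]\s*(\d+(?:\.\d+)?)\s*ms")


def parse_ping_time(line: str) -> Optional[float]:
    """Extract the round-trip time in ms from a line of ping output."""
    match = _TIME_RE.search(line)
    return float(match.group(1)) if match else None


net = local_network("192.168.1.37", "255.255.255.0")
hosts = list(net.hosts())  # excludes the network and broadcast addresses

windows_ms = parse_ping_time("Reply from 192.168.1.1: bytes=32 time<1ms TTL=64")
linux_ms = parse_ping_time("64 bytes from 192.168.1.1: icmp_seq=1 ttl=64 time=0.345 ms")
no_reply = parse_ping_time("Request timed out.")
```

Note that a `time<1ms` reading is reported as 1.0, which is an upper bound rather than an exact measurement.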
4.2 Session Management Tool
python
# src/tools/session_manager.py
import csv
import json
import os
import shutil
import xml.etree.ElementTree as ET
from datetime import datetime
from html import escape
from typing import Dict, List, Optional


class SessionManager:
    """Xshell session manager."""

    def __init__(self):
        self.sessions_dir = self._get_sessions_dir()
        self.backup_dir = os.path.join(os.path.expanduser('~'),
                                       '.xshell_sessions_backup')

    def _get_sessions_dir(self) -> Optional[str]:
        """Locate the Xshell sessions directory."""
        # The location varies between Xshell versions; adjust as needed
        possible_paths = [
            os.path.join(os.environ.get('APPDATA', ''), 'NetSarang', 'Xshell', 'Sessions'),
            os.path.join(os.path.expanduser('~'), 'Documents', 'NetSarang', 'Xshell', 'Sessions')
        ]
        for path in possible_paths:
            if os.path.exists(path):
                return path
        return None

    def list_sessions(self) -> List[Dict]:
        """List all sessions."""
        sessions = []
        if not self.sessions_dir or not os.path.exists(self.sessions_dir):
            return sessions
        for filename in os.listdir(self.sessions_dir):
            if filename.endswith('.xsh'):
                session_path = os.path.join(self.sessions_dir, filename)
                session_info = self._parse_session_file(session_path)
                if session_info:
                    sessions.append(session_info)
        return sessions

    def _parse_session_file(self, filepath: str) -> Optional[Dict]:
        """Parse a session file.

        NOTE: this assumes an XML layout. Check a session file from your
        Xshell version first; if it is INI-style text, this parser will
        fail and a key=value parser is needed instead.
        """
        try:
            tree = ET.parse(filepath)
            root = tree.getroot()
            session_info = {
                'filename': os.path.basename(filepath),
                'path': filepath,
                'name': '',
                'host': '',
                'port': 22,
                'username': '',
                'protocol': 'SSH',
                'last_modified': datetime.fromtimestamp(os.path.getmtime(filepath))
            }
            # Read the session attributes
            for elem in root.iter():
                if elem.tag == 'Session':
                    session_info['name'] = elem.get('Name', '')
                elif elem.tag == 'Host':
                    session_info['host'] = elem.text or ''
                elif elem.tag == 'Port':
                    session_info['port'] = int(elem.text) if elem.text else 22
                elif elem.tag == 'UserName':
                    session_info['username'] = elem.text or ''
                elif elem.tag == 'Protocol':
                    session_info['protocol'] = elem.text or 'SSH'
            return session_info
        except Exception as e:
            print(f"Failed to parse session file: {e}")
            return None

    def backup_sessions(self, backup_name: str = None) -> bool:
        """Back up all sessions."""
        if not backup_name:
            backup_name = datetime.now().strftime('backup_%Y%m%d_%H%M%S')
        backup_path = os.path.join(self.backup_dir, backup_name)
        os.makedirs(backup_path, exist_ok=True)
        sessions = self.list_sessions()
        backup_info = {
            'backup_time': datetime.now().isoformat(),
            'session_count': len(sessions),
            'sessions': []
        }
        for session in sessions:
            src = session['path']
            dst = os.path.join(backup_path, session['filename'])
            try:
                shutil.copy2(src, dst)
                backup_info['sessions'].append(session['filename'])
            except OSError as e:
                print(f"Failed to back up session {session['filename']}: {e}")
        # Save the backup manifest
        info_file = os.path.join(backup_path, 'backup_info.json')
        with open(info_file, 'w', encoding='utf-8') as f:
            json.dump(backup_info, f, ensure_ascii=False, indent=2)
        return True

    def export_sessions(self, export_path: str,
                        format: str = 'json') -> bool:
        """Export session information as JSON, CSV, or HTML."""
        sessions = self.list_sessions()
        if format == 'json':
            with open(export_path, 'w', encoding='utf-8') as f:
                # default=str serializes the datetime values
                json.dump(sessions, f, ensure_ascii=False, indent=2,
                          default=str)
        elif format == 'csv':
            with open(export_path, 'w', newline='', encoding='utf-8') as f:
                if sessions:
                    writer = csv.DictWriter(f, fieldnames=sessions[0].keys())
                    writer.writeheader()
                    writer.writerows(sessions)
        elif format == 'html':
            self._export_html(sessions, export_path)
        else:
            return False  # unsupported format
        return True

    def _export_html(self, sessions: List[Dict], export_path: str):
        """Export as HTML."""
        html = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Xshell Sessions Export</title>
<style>
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #4CAF50; color: white; }
tr:nth-child(even) { background-color: #f2f2f2; }
.protocol { font-weight: bold; }
.ssh { color: #2196F3; }
.telnet { color: #4CAF50; }
.serial { color: #FF9800; }
</style>
</head>
<body>
<h1>Xshell Sessions Export</h1>
<p>Generated: ''' + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + '''</p>
<table>
<tr>
<th>Name</th>
<th>Host</th>
<th>Port</th>
<th>Username</th>
<th>Protocol</th>
<th>Last Modified</th>
</tr>
'''
        for session in sessions:
            # Escape every value so that session data cannot inject markup
            name = escape(str(session['name']))
            host = escape(str(session['host']))
            port = escape(str(session['port']))
            username = escape(str(session['username']))
            protocol = escape(str(session['protocol']))
            modified = escape(str(session['last_modified']))
            html += f'''
<tr>
<td>{name}</td>
<td>{host}</td>
<td>{port}</td>
<td>{username}</td>
<td class="protocol {protocol.lower()}">{protocol}</td>
<td>{modified}</td>
</tr>
'''
        html += '''
</table>
</body>
</html>
'''
        with open(export_path, 'w', encoding='utf-8') as f:
            f.write(html)

    def find_duplicate_sessions(self) -> List[List[Dict]]:
        """Group sessions that share the same host, port, and username."""
        groups = {}
        for session in self.list_sessions():
            key = (session['host'], session['port'], session['username'])
            groups.setdefault(key, []).append(session)
        return [group for group in groups.values() if len(group) > 1]

    def cleanup_sessions(self, older_than_days: int = 180) -> int:
        """Delete session files not modified within the given number of days."""
        sessions = self.list_sessions()
        cutoff_date = datetime.now().timestamp() - (older_than_days * 24 * 3600)
        removed_count = 0
        for session in sessions:
            if session['last_modified'].timestamp() < cutoff_date:
                try:
                    os.remove(session['path'])
                    removed_count += 1
                except OSError:
                    pass
        return removed_count
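If the session files on your system turn out to be INI-style text rather than XML, the standard `configparser` module can read them. The sketch below parses a made-up sample. The section and key names (`CONNECTION`, `Host`, `Port`, `Protocol`, `CONNECTION:AUTHENTICATION`, `UserName`) are assumptions to be checked against a real file, and real files may need a specific encoding when they are opened.

```python
import configparser

# Hypothetical sample content; compare with a real session file before use.
SAMPLE = """\
[CONNECTION]
Host=192.0.2.10
Port=2222
Protocol=SSH

[CONNECTION:AUTHENTICATION]
UserName=admin
"""


def parse_ini_session(text: str) -> dict:
    """Extract the connection fields from INI-style session text."""
    parser = configparser.ConfigParser(interpolation=None, strict=False)
    parser.optionxform = str  # keep the original key case
    parser.read_string(text)

    conn = parser["CONNECTION"] if parser.has_section("CONNECTION") else {}
    auth_name = "CONNECTION:AUTHENTICATION"
    auth = parser[auth_name] if parser.has_section(auth_name) else {}

    return {
        "host": conn.get("Host", ""),
        # An empty Port value falls back to the SSH default.
        "port": int(conn.get("Port", "22") or 22),
        "protocol": conn.get("Protocol", "SSH"),
        "username": auth.get("UserName", ""),
    }


parsed = parse_ini_session(SAMPLE)
empty = parse_ini_session("")
```

Setting `interpolation=None` matters here because values such as passwords or paths may contain `%` characters, which the default interpolation would try to expand.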
4.3 Batch Command Executor
python
# src/tools/batch_executor.py
import csv
import json
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from html import escape
from typing import Dict, List, Optional

import paramiko


class BatchCommandExecutor:
    """Runs commands on several sessions in parallel."""

    def __init__(self, max_workers: int = 10):
        self.max_workers = max_workers
        self.results = []
        self.is_running = False
        self.progress_callbacks = []

    def execute_on_sessions(self, sessions: List[Dict],
                            commands: List[str],
                            timeout: int = 30) -> List[Dict]:
        """Run the commands on every session."""
        self.is_running = True
        self.results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Keep each future paired with the session it belongs to
            futures = [
                (session, executor.submit(self._execute_on_session,
                                          session, commands, timeout))
                for session in sessions
            ]
            # Collect the results
            for session, future in futures:
                try:
                    self.results.append(future.result(timeout=timeout + 5))
                except Exception as e:
                    self.results.append({
                        'session': session.get('name', 'Unknown'),
                        'host': session.get('host', ''),
                        'success': False,
                        'error': str(e),
                        'outputs': [],
                        'execution_time': None
                    })
        self.is_running = False
        return self.results

    def _execute_on_session(self, session: Dict,
                            commands: List[str],
                            timeout: int) -> Dict:
        """Run the commands on a single session over SSH (paramiko)."""
        result = {
            'session': session.get('name', 'Unknown'),
            'host': session.get('host', ''),
            'success': False,
            'error': None,
            'outputs': [],
            'execution_time': None
        }
        start_time = time.time()
        ssh = paramiko.SSHClient()
        # AutoAddPolicy accepts unknown host keys without verification.
        # It is convenient for a demo but unsafe on untrusted networks.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            # Connection parameters; add a password or key here if needed
            connect_params = {
                'hostname': session['host'],
                'port': session.get('port', 22),
                'username': session.get('username', ''),
                'timeout': timeout
            }
            ssh.connect(**connect_params)
            # Run the commands one after another
            for cmd in commands:
                stdin, stdout, stderr = ssh.exec_command(cmd, timeout=timeout)
                output = {
                    'command': cmd,
                    'stdout': stdout.read().decode('utf-8', errors='ignore'),
                    'stderr': stderr.read().decode('utf-8', errors='ignore'),
                    'exit_code': stdout.channel.recv_exit_status()
                }
                result['outputs'].append(output)
            result['success'] = True
        except Exception as e:
            result['error'] = str(e)
            result['success'] = False
        finally:
            ssh.close()
        result['execution_time'] = time.time() - start_time
        return result

    def export_results(self, format: str = 'html',
                       output_file: str = None) -> Optional[str]:
        """Export the results; returns the file name, or None if unsupported."""
        if not output_file:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = f'batch_execution_results_{timestamp}.{format}'
        if format == 'html':
            return self._export_results_html(output_file)
        elif format == 'json':
            return self._export_results_json(output_file)
        elif format == 'csv':
            return self._export_results_csv(output_file)
        return None

    def _export_results_json(self, output_file: str) -> str:
        """Export the results as JSON."""
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)
        return output_file

    def _export_results_csv(self, output_file: str) -> str:
        """Export the results as CSV, one row per executed command."""
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['session', 'host', 'command', 'exit_code', 'stdout', 'stderr'])
            for result in self.results:
                for output in result['outputs']:
                    writer.writerow([result['session'], result['host'],
                                     output['command'], output['exit_code'],
                                     output['stdout'], output['stderr']])
        return output_file

    def _export_results_html(self, output_file: str) -> str:
        """Export the results as HTML."""
        html = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Batch Command Execution Results</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.session { margin-bottom: 30px; border: 1px solid #ddd; padding: 15px; }
.success { border-left: 5px solid #4CAF50; }
.failed { border-left: 5px solid #f44336; }
.command { background-color: #f9f9f9; padding: 10px; margin: 5px 0; }
.output { background-color: #fff; padding: 10px; margin: 5px 0;
border: 1px solid #ddd; font-family: monospace; }
.stats { background-color: #e3f2fd; padding: 10px; margin: 10px 0; }
</style>
</head>
<body>
<h1>Batch Command Execution Results</h1>
<p>Generated: ''' + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + '''</p>
'''
        # Summary statistics (guard against an empty result list)
        success_count = sum(1 for r in self.results if r['success'])
        total_count = len(self.results)
        success_rate = success_count / total_count * 100 if total_count else 0.0
        html += f'''
<div class="stats">
<h3>Execution Summary</h3>
<p>Total sessions: {total_count}</p>
<p>Succeeded: {success_count}</p>
<p>Failed: {total_count - success_count}</p>
<p>Success rate: {success_rate:.1f}%</p>
</div>
'''
        # Per-session details; all dynamic text is HTML-escaped
        for result in self.results:
            status_class = 'success' if result['success'] else 'failed'
            status_text = 'succeeded' if result['success'] else 'failed'
            elapsed = result.get('execution_time')
            elapsed_text = f'{elapsed:.2f} s' if elapsed is not None else 'n/a'
            title = escape(f"{result['session']} - {result.get('host', '')}")
            html += f'''
<div class="session {status_class}">
<h3>{title} ({status_text})</h3>
<p>Execution time: {elapsed_text}</p>
'''
            if result['error']:
                html += f'<p class="error">Error: {escape(str(result["error"]))}</p>'
            for output in result['outputs']:
                html += f'''
<div class="command">
<strong>Command:</strong> {escape(output['command'])}
</div>
'''
                for label, text in (('Output', output['stdout']),
                                    ('Error output', output['stderr'])):
                    if text:
                        # Show at most the first 1000 characters
                        shown = escape(text[:1000]) + ('...' if len(text) > 1000 else '')
                        html += f'''
<div class="output">
<strong>{label}:</strong><br>
<pre>{shown}</pre>
</div>
'''
            html += '</div>'
        html += '</body></html>'
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(html)
        return output_file
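When results are collected from a thread pool, each failure has to be attributed to the session that caused it. A common way to do this is to map every future back to its input and consume them with `as_completed`. The sketch below shows the pattern with a stand-in task instead of a real SSH connection; `run_on_all` and `fake_task` are names introduced here, and the host names are made up.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_on_all(sessions, task, max_workers=4):
    """Run task(session) for every session and report per-session outcomes."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future to the session it was created for.
        future_to_session = {pool.submit(task, s): s for s in sessions}
        for future in as_completed(future_to_session):
            session = future_to_session[future]
            try:
                results.append({"session": session["name"], "success": True,
                                "output": future.result()})
            except Exception as exc:
                results.append({"session": session["name"], "success": False,
                                "error": str(exc)})
    # as_completed yields in completion order, so sort for a stable report.
    return sorted(results, key=lambda r: r["session"])


def fake_task(session):
    """Stand-in for the SSH work; fails for hosts that start with 'bad'."""
    if session["host"].startswith("bad"):
        raise ConnectionError(f"cannot reach {session['host']}")
    return f"ok from {session['host']}"


sessions = [
    {"name": "alpha", "host": "good-1.example"},
    {"name": "beta", "host": "bad-1.example"},
    {"name": "gamma", "host": "good-2.example"},
]
report = run_on_all(sessions, fake_task)
```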
Part 5: Advanced Features and Integration
5.1 Network Monitoring Dashboard
python
# src/ui/network_dashboard.py
from PyQt5.QtWidgets import (QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QLabel, QTableWidget,
QTableWidgetItem, QPushButton, QTabWidget,
QProgressBar, QGroupBox, QSplitter)
from PyQt5.QtCore import QTimer, Qt, pyqtSignal
from PyQt5.QtGui import QFont, QColor
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg
import psutil
import threading
import time
class NetworkDashboard(QMainWindow):
"""网络监控仪表板"""
update_signal = pyqtSignal(dict)
def __init__(self):
super().__init__()
self.setWindowTitle("网络监控仪表板")
self.setGeometry(100, 100, 1200, 800)
self.monitoring = False
self.data_history = []
self.max_history = 100
self.init_ui()
self.setup_signals()
self.start_monitoring()
def init_ui(self):
"""初始化UI"""
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QVBoxLayout(central_widget)
# 控制面板
control_panel = self.create_control_panel()
main_layout.addWidget(control_panel)
# 标签页
self.tab_widget = QTabWidget()
# 网络状态标签页
network_tab = self.create_network_tab()
self.tab_widget.addTab(network_tab, "网络状态")
# 会话监控标签页
session_tab = self.create_session_tab()
self.tab_widget.addTab(session_tab, "会话监控")
# 流量分析标签页
traffic_tab = self.create_traffic_tab()
self.tab_widget.addTab(traffic_tab, "流量分析")
main_layout.addWidget(self.tab_widget)
# 状态栏
self.status_label = QLabel("就绪")
self.statusBar().addWidget(self.status_label)
def create_control_panel(self) -> QWidget:
"""创建控制面板"""
panel = QWidget()
layout = QHBoxLayout(panel)
# 开始/停止按钮
self.monitor_btn = QPushButton("开始监控")
self.monitor_btn.clicked.connect(self.toggle_monitoring)
layout.addWidget(self.monitor_btn)
# 刷新间隔
layout.addWidget(QLabel("刷新间隔:"))
self.interval_spin = QSpinBox()
self.interval_spin.setRange(1, 60)
self.interval_spin.setValue(5)
self.interval_spin.setSuffix(" 秒")
layout.addWidget(self.interval_spin)
# 导出按钮
export_btn = QPushButton("导出数据")
export_btn.clicked.connect(self.export_data)
layout.addWidget(export_btn)
layout.addStretch()
return panel
def create_network_tab(self) -> QWidget:
"""创建网络状态标签页"""
tab = QWidget()
layout = QVBoxLayout(tab)
# 网络接口列表
self.interface_table = QTableWidget()
self.interface_table.setColumnCount(6)
self.interface_table.setHorizontalHeaderLabels([
"接口", "IP地址", "子网掩码", "发送速度", "接收速度", "状态"
])
layout.addWidget(self.interface_table)
# 图表区域
splitter = QSplitter(Qt.Vertical)
# 流量图表
traffic_figure = plt.figure()
self.traffic_canvas = FigureCanvasQTAgg(traffic_figure)
self.traffic_ax = traffic_figure.add_subplot(111)
splitter.addWidget(self.traffic_canvas)
# CPU/内存图表
sys_figure = plt.figure()
self.sys_canvas = FigureCanvasQTAgg(sys_figure)
self.sys_ax = sys_figure.add_subplot(111)
splitter.addWidget(self.sys_canvas)
layout.addWidget(splitter)
return tab
def create_session_tab(self) -> QWidget:
"""创建会话监控标签页"""
tab = QWidget()
layout = QVBoxLayout(tab)
# 会话表格
self.session_table = QTableWidget()
self.session_table.setColumnCount(8)
self.session_table.setHorizontalHeaderLabels([
"会话名", "主机", "用户", "协议", "状态",
"持续时间", "输入字节", "输出字节"
])
layout.addWidget(self.session_table)
# 会话统计
stats_group = QGroupBox("会话统计")
stats_layout = QHBoxLayout()
self.total_sessions_label = QLabel("总会话: 0")
self.active_sessions_label = QLabel("活动会话: 0")
self.total_traffic_label = QLabel("总流量: 0 MB")
stats_layout.addWidget(self.total_sessions_label)
stats_layout.addWidget(self.active_sessions_label)
stats_layout.addWidget(self.total_traffic_label)
stats_layout.addStretch()
stats_group.setLayout(stats_layout)
layout.addWidget(stats_group)
return tab
def create_traffic_tab(self) -> QWidget:
"""创建流量分析标签页"""
tab = QWidget()
layout = QVBoxLayout(tab)
# 流量过滤器
filter_group = QGroupBox("过滤设置")
filter_layout = QHBoxLayout()
filter_layout.addWidget(QLabel("时间范围:"))
self.time_range_combo = QComboBox()
self.time_range_combo.addItems(["1小时", "6小时", "24小时", "7天"])
filter_layout.addWidget(self.time_range_combo)
filter_layout.addWidget(QLabel("接口:"))
self.interface_combo = QComboBox()
filter_layout.addWidget(self.interface_combo)
filter_layout.addStretch()
filter_group.setLayout(filter_layout)
layout.addWidget(filter_group)
# 流量详情表格
self.traffic_table = QTableWidget()
self.traffic_table.setColumnCount(7)
self.traffic_table.setHorizontalHeaderLabels([
"时间", "接口", "源IP", "目标IP", "协议",
"发送字节", "接收字节"
])
layout.addWidget(self.traffic_table)
return tab
def setup_signals(self):
"""设置信号连接"""
self.update_signal.connect(self.update_display)
# 定时器
self.update_timer = QTimer()
self.update_timer.timeout.connect(self.collect_data)
# 刷新间隔变化
self.interval_spin.valueChanged.connect(self.update_interval)
def start_monitoring(self):
"""开始监控"""
self.monitoring = True
self.monitor_btn.setText("停止监控")
self.update_timer.start(self.interval_spin.value() * 1000)
self.status_label.setText("监控中...")
def stop_monitoring(self):
"""停止监控"""
self.monitoring = False
self.monitor_btn.setText("开始监控")
self.update_timer.stop()
self.status_label.setText("监控已停止")
def toggle_monitoring(self):
"""切换监控状态"""
if self.monitoring:
self.stop_monitoring()
else:
self.start_monitoring()
def update_interval(self):
"""更新刷新间隔"""
if self.monitoring:
self.update_timer.setInterval(self.interval_spin.value() * 1000)
def collect_data(self):
"""收集监控数据"""
# 在网络性能监控中,这里会收集各种网络数据
data = {
'timestamp': time.time(),
'interfaces': self.get_network_interfaces(),
'sessions': self.get_active_sessions(),
'system': self.get_system_stats()
}
self.data_history.append(data)
if len(self.data_history) > self.max_history:
self.data_history.pop(0)
# 发送更新信号
self.update_signal.emit(data)
def get_network_interfaces(self):
"""获取网络接口信息"""
interfaces = []
for name, stats in psutil.net_io_counters(pernic=True).items():
addrs = psutil.net_if_addrs().get(name, [])
ip_address = ''
netmask = ''
for addr in addrs:
if addr.family == socket.AF_INET:
ip_address = addr.address
netmask = addr.netmask
break
interfaces.append({
'name': name,
'ip_address': ip_address,
'netmask': netmask,
'bytes_sent': stats.bytes_sent,
'bytes_recv': stats.bytes_recv,
'packets_sent': stats.packets_sent,
'packets_recv': stats.packets_recv
})
return interfaces
def get_active_sessions(self):
"""获取活动会话"""
# 这里需要从Xshell获取活动会话信息
# 简化实现
return []
def get_system_stats(self):
"""获取系统统计信息"""
return {
'cpu_percent': psutil.cpu_percent(),
'memory_percent': psutil.virtual_memory().percent,
'disk_usage': psutil.disk_usage('/').percent
}
def update_display(self, data):
"""更新显示"""
# 更新网络接口表格
self.update_interface_table(data['interfaces'])
# 更新图表
self.update_charts(data)
# 更新会话表格
self.update_session_table(data['sessions'])
# 更新状态
self.status_label.setText(
f"最后更新: {time.strftime('%H:%M:%S')} | "
f"CPU: {data['system']['cpu_percent']:.1f}% | "
f"内存: {data['system']['memory_percent']:.1f}%"
)
    def update_interface_table(self, interfaces):
        """Refresh the interface table, deriving rates from the last two samples."""
        self.interface_table.setRowCount(len(interfaces))
        # Index the previous sample once; a rate needs two samples to compare
        prev_interfaces = {}
        time_diff = 0
        if len(self.data_history) > 1:
            prev_data = self.data_history[-2]
            prev_interfaces = {iface['name']: iface
                               for iface in prev_data['interfaces']}
            time_diff = self.data_history[-1]['timestamp'] - prev_data['timestamp']
        for i, interface in enumerate(interfaces):
            speed_sent = "0 KB/s"
            speed_recv = "0 KB/s"
            prev = prev_interfaces.get(interface['name'])
            if prev and time_diff > 0:  # guard against division by zero
                bytes_sent_diff = interface['bytes_sent'] - prev['bytes_sent']
                bytes_recv_diff = interface['bytes_recv'] - prev['bytes_recv']
                speed_sent = f"{bytes_sent_diff / time_diff / 1024:.1f} KB/s"
                speed_recv = f"{bytes_recv_diff / time_diff / 1024:.1f} KB/s"
            items = [
                QTableWidgetItem(interface['name']),
                QTableWidgetItem(interface['ip_address']),
                QTableWidgetItem(interface['netmask']),
                QTableWidgetItem(speed_sent),
                QTableWidgetItem(speed_recv),
                QTableWidgetItem("Active" if interface['ip_address'] else "Disconnected")
            ]
            for j, item in enumerate(items):
                self.interface_table.setItem(i, j, item)
def update_charts(self, data):
"""更新图表"""
# 更新流量图表
self.traffic_ax.clear()
if len(self.data_history) > 1:
timestamps = [d['timestamp'] for d in self.data_history]
bytes_sent = [d['interfaces'][0]['bytes_sent'] if d['interfaces'] else 0
for d in self.data_history]
bytes_recv = [d['interfaces'][0]['bytes_recv'] if d['interfaces'] else 0
for d in self.data_history]
# 转换为相对时间
rel_times = [(t - timestamps[0]) / 60 for t in timestamps] # 分钟
self.traffic_ax.plot(rel_times, bytes_sent, label='发送', color='blue')
self.traffic_ax.plot(rel_times, bytes_recv, label='接收', color='green')
self.traffic_ax.set_xlabel('时间 (分钟)')
self.traffic_ax.set_ylabel('字节数')
self.traffic_ax.legend()
self.traffic_ax.grid(True, alpha=0.3)
self.traffic_canvas.draw()
# 更新系统资源图表
self.sys_ax.clear()
if self.data_history:
cpus = [d['system']['cpu_percent'] for d in self.data_history]
memories = [d['system']['memory_percent'] for d in self.data_history]
self.sys_ax.plot(cpus, label='CPU使用率', color='red')
self.sys_ax.plot(memories, label='内存使用率', color='orange')
self.sys_ax.set_ylabel('使用率 (%)')
self.sys_ax.set_ylim(0, 100)
self.sys_ax.legend()
self.sys_ax.grid(True, alpha=0.3)
self.sys_canvas.draw()
def export_data(self):
"""导出数据"""
from datetime import datetime
import json
filename = f"network_monitor_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
export_data = {
'export_time': datetime.now().isoformat(),
'monitoring_duration': len(self.data_history) * self.interval_spin.value(),
'data_points': len(self.data_history),
'data': self.data_history
}
with open(filename, 'w', encoding='utf-8') as f:
json.dump(export_data, f, default=str, indent=2)
self.status_label.setText(f"数据已导出到: {filename}")
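The per-interface rates in the monitoring table come from differencing two cumulative counter samples. A minimal sketch of that calculation in isolation follows. The helper name `compute_rate_kbps` and its handling of counter resets are assumptions for illustration, not part of the plugin code.

```python
def compute_rate_kbps(prev_bytes, curr_bytes, prev_ts, curr_ts):
    """Return the transfer rate in KB/s between two cumulative samples.

    Hypothetical helper for illustration; the name and the reset
    handling are assumptions, not part of the original plugin.
    """
    elapsed = curr_ts - prev_ts
    if elapsed <= 0:
        # Two samples with the same timestamp: no rate can be derived.
        return 0.0
    delta = curr_bytes - prev_bytes
    if delta < 0:
        # The counter went backwards (interface reset or counter wrap);
        # treat the current value as the traffic since the reset.
        delta = curr_bytes
    return delta / elapsed / 1024


if __name__ == "__main__":
    # 10240 bytes transferred over 5 seconds
    print(f"{compute_rate_kbps(0, 10240, 100.0, 105.0):.1f} KB/s")
```

Keeping the arithmetic in a pure function like this makes it testable without a running Qt event loop or real network counters.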
5.2 Automated Script Generator
python
# src/tools/script_generator.py
import json
import os
from datetime import datetime
from typing import List, Dict, Any
class XShellScriptGenerator:
"""Xshell脚本生成器"""
def __init__(self):
self.templates = self.load_templates()
self.variables = {}
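    def load_templates(self) -> Dict:
        """Load the script templates (illustrative Tcl-style syntax; Xshell's
        own script engine runs VBScript, JScript, or Python via `xsh`)."""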
templates = {
'batch_connection': '''
# Xshell脚本 - 批量连接
# 生成时间: {timestamp}
# 设置默认参数
set timeout {timeout}
set logfile "{logfile}"
# 定义服务器列表
{server_list}
# 批量连接函数
function connect_all {{
foreach server $servers {{
set server_info [split $server ":"]
set host [lindex $server_info 0]
set port [lindex $server_info 1]
set user [lindex $server_info 2]
log "正在连接到 $host:$port"
# 打开新标签页
newtab ssh://$user@$host:$port
# 等待连接
wait "login:"
send "{password}"
wait "$"
# 执行命令
{commands}
# 关闭标签页
close
}}
}}
# 执行批量连接
connect_all
''',
'backup_script': '''
# Xshell脚本 - 备份脚本
# 生成时间: {timestamp}
# 备份函数
function backup_server {{
set server $1
set backup_dir $2
log "开始备份服务器: $server"
# 连接到服务器
open ssh://root@$server
wait "$"
# 创建备份
send "mkdir -p /tmp/backup"
wait "$"
send "tar czf /tmp/backup/server_backup.tar.gz /etc /var/www 2>/dev/null"
wait "$"
# 下载备份
sz /tmp/backup/server_backup.tar.gz
# 清理
send "rm -rf /tmp/backup"
wait "$"
close
log "服务器 $server 备份完成"
}}
# 备份服务器列表
{server_list}
foreach server $servers {{
backup_server $server "{backup_dir}"
}}
''',
'monitoring_script': '''
# Xshell脚本 - 监控脚本
# 生成时间: {timestamp}
# 监控函数
function monitor_server {{
set server $1
set interval {interval}
log "开始监控服务器: $server"
open ssh://root@$server
while {{1}} {{
# 获取系统状态
wait "$"
send "date"
wait "$"
send "uptime"
wait "$"
send "free -m"
wait "$"
send "df -h"
wait "$"
# 等待指定间隔
sleep $interval
}}
}}
# 监控服务器列表
{server_list}
foreach server $servers {{
# 在新标签页中监控每个服务器
newtab
monitor_server $server
}}
'''
}
return templates
def generate_batch_script(self, servers: List[Dict],
commands: List[str],
output_file: str = None) -> str:
"""生成批量连接脚本"""
# 准备服务器列表
server_list = []
for server in servers:
server_str = f"{server['host']}:{server.get('port', 22)}:{server.get('username', 'root')}"
server_list.append(server_str)
server_list_code = 'set servers [list]\n'
for server in server_list:
server_list_code += f'lappend servers "{server}"\n'
# 准备命令
commands_code = ''
for cmd in commands:
commands_code += f'send "{cmd}"\nwait "$"\n'
# 填充模板
script = self.templates['batch_connection'].format(
timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
timeout=30,
logfile='batch_connection.log',
server_list=server_list_code,
password='[password]', # 实际使用时需要替换
commands=commands_code
)
# 保存文件
if not output_file:
output_file = f'batch_connection_{datetime.now().strftime("%Y%m%d_%H%M%S")}.xsh'
with open(output_file, 'w', encoding='utf-8') as f:
f.write(script)
return output_file
def generate_backup_script(self, servers: List[Dict],
backup_dir: str = None) -> str:
"""生成备份脚本"""
if not backup_dir:
backup_dir = os.path.join(os.path.expanduser('~'),
'xshell_backups',
datetime.now().strftime('%Y%m%d'))
# 准备服务器列表
server_list = []
for server in servers:
server_list.append(server['host'])
server_list_code = 'set servers [list]\n'
for server in server_list:
server_list_code += f'lappend servers "{server}"\n'
# 填充模板
script = self.templates['backup_script'].format(
timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
server_list=server_list_code,
backup_dir=backup_dir
)
# 保存文件
output_file = f'backup_script_{datetime.now().strftime("%Y%m%d_%H%M%S")}.xsh'
with open(output_file, 'w', encoding='utf-8') as f:
f.write(script)
return output_file
def generate_monitoring_dashboard(self, servers: List[Dict],
interval: int = 60) -> str:
"""生成监控仪表板脚本"""
# 准备服务器列表
server_list = []
for server in servers:
server_list.append(server['host'])
server_list_code = 'set servers [list]\n'
for server in server_list:
server_list_code += f'lappend servers "{server}"\n'
# 生成监控脚本
script = self.templates['monitoring_script'].format(
timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
interval=interval,
server_list=server_list_code
)
# 保存文件
output_file = f'monitoring_script_{datetime.now().strftime("%Y%m%d_%H%M%S")}.xsh'
with open(output_file, 'w', encoding='utf-8') as f:
f.write(script)
# 同时生成HTML报告模板
self.generate_html_dashboard(servers, output_file.replace('.xsh', '.html'))
return output_file
def generate_html_dashboard(self, servers: List[Dict],
output_file: str) -> str:
"""生成HTML监控仪表板"""
html = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>服务器监控仪表板</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.server { border: 1px solid #ddd; margin: 10px; padding: 15px; }
.status { display: inline-block; width: 15px; height: 15px;
border-radius: 50%; margin-right: 10px; }
.online { background-color: #4CAF50; }
.offline { background-color: #f44336; }
.metrics { display: flex; flex-wrap: wrap; }
.metric { flex: 1; min-width: 200px; margin: 10px;
padding: 10px; background-color: #f9f9f9; }
.chart { height: 200px; background-color: #f0f0f0;
margin: 10px 0; }
.auto-refresh { margin: 20px 0; }
</style>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head>
<body>
<h1>服务器监控仪表板</h1>
<p>生成时间: ''' + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + '''</p>
<div class="auto-refresh">
<label>
<input type="checkbox" id="autoRefresh" checked>
自动刷新 (每60秒)
</label>
<button onclick="refreshData()">立即刷新</button>
</div>
<div class="metrics">
'''
for server in servers:
html += f'''
<div class="server">
<h3>
<span class="status online" id="status_{server['host']}"></span>
{server['host']} - {server.get('name', '')}
</h3>
<div class="metric">
<h4>CPU使用率</h4>
<canvas class="chart" id="cpu_{server['host']}"></canvas>
</div>
<div class="metric">
<h4>内存使用率</h4>
<canvas class="chart" id="memory_{server['host']}"></canvas>
</div>
<div class="metric">
<h4>磁盘使用率</h4>
<canvas class="chart" id="disk_{server['host']}"></canvas>
</div>
<div class="metric">
<h4>连接信息</h4>
<p>用户: {server.get('username', 'N/A')}</p>
<p>端口: {server.get('port', 22)}</p>
<p>最后检查: <span id="lastcheck_{server['host']}">-</span></p>
</div>
</div>
'''
html += '''
</div>
<script>
// 监控数据
const servers = ''' + json.dumps(servers) + ''';
// 图表实例
const charts = {};
// 初始化图表
function initCharts() {
servers.forEach(server => {
const host = server.host;
// CPU图表
charts[`cpu_${host}`] = new Chart(
document.getElementById(`cpu_${host}`), {
type: 'line',
data: {
labels: [],
datasets: [{
label: 'CPU使用率',
data: [],
borderColor: '#2196F3',
fill: false
}]
},
options: {
responsive: true,
scales: {
y: {
min: 0,
max: 100
}
}
}
}
);
// 内存图表
charts[`memory_${host}`] = new Chart(
document.getElementById(`memory_${host}`), {
type: 'line',
data: {
labels: [],
datasets: [{
label: '内存使用率',
data: [],
borderColor: '#4CAF50',
fill: false
}]
},
options: {
responsive: true,
scales: {
y: {
min: 0,
max: 100
}
}
}
}
);
// 磁盘图表
charts[`disk_${host}`] = new Chart(
document.getElementById(`disk_${host}`), {
type: 'doughnut',
data: {
labels: ['已使用', '可用'],
datasets: [{
data: [30, 70],
backgroundColor: ['#FF9800', '#E0E0E0']
}]
}
}
);
});
}
// 刷新数据
function refreshData() {
servers.forEach(async server => {
try {
// 这里应该通过API获取实际数据
// 现在使用模拟数据
const now = new Date();
const timeStr = now.toLocaleTimeString();
// 更新最后检查时间
document.getElementById(`lastcheck_${server.host}`).textContent = timeStr;
// 模拟数据
const cpu = Math.random() * 100;
const memory = 30 + Math.random() * 50;
// 更新图表
updateChart(`cpu_${server.host}`, timeStr, cpu);
updateChart(`memory_${server.host}`, timeStr, memory);
// 随机设置状态
const statusElem = document.getElementById(`status_${server.host}`);
if (Math.random() > 0.1) {
statusElem.className = 'status online';
} else {
statusElem.className = 'status offline';
}
} catch (error) {
console.error(`获取服务器 ${server.host} 数据失败:`, error);
}
});
}
// 更新图表
function updateChart(chartId, label, value) {
const chart = charts[chartId];
if (!chart) return;
chart.data.labels.push(label);
chart.data.datasets[0].data.push(value);
// 保持最后20个数据点
if (chart.data.labels.length > 20) {
chart.data.labels.shift();
chart.data.datasets[0].data.shift();
}
chart.update();
}
// 页面加载完成
document.addEventListener('DOMContentLoaded', () => {
initCharts();
refreshData();
// 设置自动刷新
setInterval(() => {
if (document.getElementById('autoRefresh').checked) {
refreshData();
}
}, 60000);
});
</script>
</body>
</html>
'''
with open(output_file, 'w', encoding='utf-8') as f:
f.write(html)
return output_file
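The dashboard generator interpolates server fields directly into HTML and serializes the whole server list into the page. A hedged sketch of how those values could be sanitized first, using only the standard library, is shown below. The helper names (`public_fields`, `render_server_heading`, `servers_to_script_json`) and the key whitelist are assumptions for illustration.

```python
import html
import json

# Fields assumed safe to publish in the generated page (an assumption;
# adjust to your own server dictionaries). Passwords are never included.
PUBLIC_KEYS = ("host", "name", "port", "username")


def public_fields(server):
    """Return a copy of the server dict limited to publishable keys."""
    return {k: server[k] for k in PUBLIC_KEYS if k in server}


def render_server_heading(server):
    """Build the <h3> heading for one server with escaped text."""
    host = html.escape(str(server["host"]), quote=True)
    name = html.escape(str(server.get("name", "")), quote=True)
    return (
        f'<h3><span class="status online" id="status_{host}"></span>'
        f"{host} - {name}</h3>"
    )


def servers_to_script_json(servers):
    """Serialize servers for embedding inside a <script> element."""
    payload = json.dumps([public_fields(s) for s in servers])
    # "</" could terminate the surrounding <script> tag early.
    return payload.replace("</", "<\\/")


if __name__ == "__main__":
    demo = {"host": "10.0.0.5", "name": "<b>web</b>", "password": "secret"}
    print(render_server_heading(demo))
    print(servers_to_script_json([demo]))
```

Whitelisting fields before serialization keeps credentials out of a file that may be opened in a browser or shared with colleagues.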
Part 6: Performance Optimization and Deployment
6.1 Plugin Performance Optimization
python
# src/core/performance_optimizer.py
import time
import threading
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Any, Callable
import gc
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.metrics = {}
self.start_time = time.time()
self.sampling_interval = 5 # 秒
def start_monitoring(self):
"""开始性能监控"""
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
def _monitor_loop(self):
"""监控循环"""
while True:
self.collect_metrics()
time.sleep(self.sampling_interval)
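    def collect_metrics(self):
        """Collect one sample of process-level performance metrics."""
        import psutil
        import os
        # Reuse one Process object: cpu_percent() measures usage since the
        # previous call on the same object, so a fresh object always reports 0.0
        if getattr(self, '_process', None) is None:
            self._process = psutil.Process(os.getpid())
        process = self._process
        now = time.time()
        metrics = {
            'timestamp': now,
            'cpu_percent': process.cpu_percent(),
            'memory_mb': process.memory_info().rss / 1024 / 1024,
            'memory_percent': process.memory_percent(),
            'thread_count': process.num_threads(),
            'open_files': len(process.open_files()),
            'connections': len(process.connections()),
            'gc_stats': gc.get_stats()
        }
        self.metrics[now] = metrics
        self.cleanup_old_metrics()
        # Check for a memory leak once enough samples exist
        if len(self.metrics) > 10:
            self.check_memory_leak()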
def cleanup_old_metrics(self, max_age: int = 3600):
"""清理旧的性能指标"""
cutoff = time.time() - max_age
keys_to_remove = [k for k in self.metrics.keys() if k < cutoff]
for k in keys_to_remove:
del self.metrics[k]
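    def check_memory_leak(self):
        """Warn when memory usage shows a sustained upward trend."""
        timestamps = sorted(self.metrics.keys())
        if len(timestamps) < 10:
            return
        memory_values = [self.metrics[t]['memory_mb'] for t in timestamps]
        # Least-squares slope of memory (MB) against elapsed time (seconds),
        # so the threshold below really is expressed in MB per second
        import numpy as np
        x = np.array(timestamps) - timestamps[0]
        y = np.array(memory_values)
        denominator = len(x) * np.sum(x * x) - np.sum(x) ** 2
        if denominator > 0:
            slope = (len(x) * np.sum(x * y) - np.sum(x) * np.sum(y)) / denominator
            if slope > 0.1:  # growing by more than 0.1 MB per second
                print(f"Warning: possible memory leak detected, slope: {slope:.3f} MB/s")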
    def get_performance_report(self) -> Dict:
        """Return the latest sample plus summary statistics."""
        import numpy as np  # imported locally, as in check_memory_leak
        if not self.metrics:
            return {}
        timestamps = sorted(self.metrics.keys())
        latest = self.metrics[timestamps[-1]]
        report = {
            'current': latest,
            'summary': {
                'uptime': time.time() - self.start_time,
                'metric_count': len(self.metrics),
                'avg_cpu': np.mean([self.metrics[t]['cpu_percent'] for t in timestamps]),
                'avg_memory': np.mean([self.metrics[t]['memory_mb'] for t in timestamps]),
                'max_memory': max([self.metrics[t]['memory_mb'] for t in timestamps])
            }
        }
        return report
class AsyncTaskManager:
"""异步任务管理器"""
def __init__(self, max_workers: int = 10):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.futures = {}
self.task_counter = 0
self.callbacks = {}
def submit_task(self, task_func: Callable,
task_id: str = None,
callback: Callable = None,
**kwargs):
"""提交异步任务"""
if not task_id:
self.task_counter += 1
task_id = f"task_{self.task_counter}"
future = self.executor.submit(task_func, **kwargs)
self.futures[task_id] = future
if callback:
self.callbacks[task_id] = callback
# 设置完成回调
future.add_done_callback(lambda f: self._task_done(task_id, f))
return task_id
    def _task_done(self, task_id: str, future):
        """Run the task's callback, then release its bookkeeping entries."""
        try:
            result = future.result()
            # Run the user callback, if one was registered
            callback = self.callbacks.get(task_id)
            if callback:
                try:
                    callback(result)
                except Exception as e:
                    print(f"Task callback failed {task_id}: {e}")
        except Exception as e:
            print(f"Task failed {task_id}: {e}")
        finally:
            # Clean up even when the task raised, so failed tasks do not accumulate
            self.futures.pop(task_id, None)
            self.callbacks.pop(task_id, None)
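    def wait_for_tasks(self, timeout: float = None) -> bool:
        """Wait for all tasks; return True if none are still pending."""
        import concurrent.futures
        # wait() does not raise on timeout; it returns the (done, not_done) sets
        done, not_done = concurrent.futures.wait(
            list(self.futures.values()),
            timeout=timeout,
            return_when=concurrent.futures.ALL_COMPLETED
        )
        return not not_done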
def get_task_status(self) -> Dict:
"""获取任务状态"""
status = {
'total_tasks': len(self.futures),
'running_tasks': 0,
'completed_tasks': self.task_counter - len(self.futures),
'tasks': []
}
for task_id, future in self.futures.items():
task_status = {
'id': task_id,
'done': future.done(),
'cancelled': future.cancelled(),
'running': future.running()
}
if future.done():
try:
task_status['result'] = future.result()
except Exception as e:
task_status['error'] = str(e)
status['tasks'].append(task_status)
if future.running():
status['running_tasks'] += 1
return status
@lru_cache(maxsize=128)
def cached_network_scan(network: str) -> List[Dict]:
"""缓存网络扫描结果"""
from src.network.scanner import NetworkScanner
scanner = NetworkScanner()
return scanner.scan_local_network(network)
class ConnectionPool:
"""连接池"""
def __init__(self, max_size: int = 10):
self.max_size = max_size
self.pool = {}
self.lock = threading.Lock()
self.cleanup_interval = 300 # 5分钟
    def get_connection(self, host: str, port: int,
                       username: str, password: str):
        """Return a live pooled connection, creating one if needed."""
        key = f"{host}:{port}:{username}"
        with self.lock:
            if key in self.pool:
                conn = self.pool[key]
                if self._is_connection_alive(conn):
                    return conn
                else:
                    del self.pool[key]
            # Create a new connection, evicting one first if the pool is full
            if len(self.pool) >= self.max_size:
                self._cleanup_oldest()
            conn = self._create_connection(host, port, username, password)
            # _create_connection returns None on failure; never pool a dead entry
            if conn is not None:
                self.pool[key] = conn
            return conn
def _create_connection(self, host: str, port: int,
username: str, password: str):
"""创建新连接"""
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(host, port=port,
username=username,
password=password,
timeout=10)
return ssh
except Exception as e:
print(f"创建连接失败 {host}:{port}: {e}")
return None
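    def _is_connection_alive(self, conn) -> bool:
        """Check whether the SSH connection is still usable."""
        try:
            transport = conn.get_transport() if conn else None
            if transport and transport.is_active():
                # Send an SSH ignore packet as a lightweight probe; unlike
                # exec_command, this does not open a new channel on every check
                transport.send_ignore()
                return True
        except Exception:
            pass
        return False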
    def _cleanup_oldest(self):
        """Evict the connection that was added to the pool first."""
        if not self.pool:
            return
        # dicts preserve insertion order, so the first key is the oldest entry
        key_to_remove = next(iter(self.pool))
        try:
            self.pool[key_to_remove].close()
        except Exception:
            pass
        del self.pool[key_to_remove]
def cleanup_all(self):
"""清理所有连接"""
with self.lock:
for conn in self.pool.values():
try:
conn.close()
except:
pass
self.pool.clear()
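`functools.lru_cache`, as used for `cached_network_scan`, never expires an entry, so a cached scan can keep reporting hosts that have since gone offline. One possible alternative is a time-bounded cache. The sketch below assumes a hypothetical `ttl_cache` decorator (not part of the plugin) and an injectable `clock` parameter so that expiry can be tested without sleeping.

```python
import time
from functools import wraps


def ttl_cache(ttl_seconds, clock=time.monotonic):
    """Cache results per positional-argument tuple for ttl_seconds.

    Hypothetical decorator for illustration. `clock` is injectable so the
    expiry logic can be exercised without sleeping.
    """
    def decorator(func):
        store = {}  # args -> (expiry_time, value)

        @wraps(func)
        def wrapper(*args):
            now = clock()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]
            value = func(*args)
            store[args] = (now + ttl_seconds, value)
            return value

        wrapper.cache_clear = store.clear
        return wrapper
    return decorator


if __name__ == "__main__":
    calls = []

    @ttl_cache(ttl_seconds=30)
    def fake_scan(network):
        # Stand-in for a real scan; records each actual invocation.
        calls.append(network)
        return [{"ip": "192.168.1.1"}]

    fake_scan("192.168.1.0/24")
    fake_scan("192.168.1.0/24")  # served from the cache
    print(f"real scans performed: {len(calls)}")
```

This version is not thread-safe and only supports hashable positional arguments; a lock around the `store` accesses would be needed if scans are triggered from several worker threads.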
6.2 Plugin Packaging and Deployment
python
# setup.py
from setuptools import setup, find_packages
import sys
import os
# 插件信息
PLUGIN_NAME = "XshellNetworkManager"
PLUGIN_VERSION = "1.0.0"
PLUGIN_AUTHOR = "Your Name"
PLUGIN_DESCRIPTION = "Xshell网络管理插件"
PLUGIN_LICENSE = "MIT"
# Dependencies
INSTALL_REQUIRES = [
    'pywin32>=300',
    'psutil>=5.8.0',
    'netifaces>=0.11.0',
    'paramiko>=2.8.0',
    'PyQt5>=5.15.0',
    'matplotlib>=3.4.0',
    'numpy',  # used by the performance monitor
]
# Package definition
setup(
    name=PLUGIN_NAME,
    version=PLUGIN_VERSION,
    author=PLUGIN_AUTHOR,
    description=PLUGIN_DESCRIPTION,
    license=PLUGIN_LICENSE,
    packages=find_packages(where='src'),
    package_dir={'': 'src'},
    # src/main.py is a top-level module, so find_packages() does not pick it up
    py_modules=['main'],
    install_requires=INSTALL_REQUIRES,
    entry_points={
        'console_scripts': [
            'xshell-network-manager=main:main',
        ],
    },
    classifiers=[
        'Development Status :: 4 - Beta',
        'Intended Audience :: System Administrators',
        'License :: OSI Approved :: MIT License',
        'Operating System :: Microsoft :: Windows',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.8',
        'Programming Language :: Python :: 3.9',
    ],
)
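# Installation script
# install.bat
'''
@echo off
echo Installing the Xshell network management plugin...
REM Check for Python
python --version >nul 2>&1
if errorlevel 1 (
    echo Error: Python not found. Please install Python 3.8+ first.
    pause
    exit /b 1
)
REM Install dependencies
echo Installing Python dependencies...
pip install -r requirements.txt
REM Register the plugin
REM Build the exe path inside Python so that backslashes from the current
REM directory are not parsed as string escapes
echo Registering the Xshell plugin...
python -c "import os; from src.core.plugin_registry import PluginRegistry; PluginRegistry.register_plugin('NetworkManager', os.path.join(os.getcwd(), 'dist', 'xshell_network_manager.exe'), 'Xshell network management plugin', '1.0.0', 'Your Name')"
echo Installation complete!
echo Please restart Xshell for the plugin to take effect.
pause
'''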
# Uninstall script
# uninstall.bat
'''
@echo off
echo Uninstalling the Xshell network management plugin...
REM Unregister the plugin
echo Unregistering the plugin...
python -c "from src.core.plugin_registry import PluginRegistry; PluginRegistry.unregister_plugin('NetworkManager')"
REM Delete build artifacts
echo Deleting files...
if exist "dist" rmdir /s /q "dist"
if exist "build" rmdir /s /q "build"
if exist "__pycache__" rmdir /s /q "__pycache__"
echo Uninstall complete!
pause
'''
# 使用PyInstaller打包
# build.py
import PyInstaller.__main__
import os
import shutil
def build_plugin():
"""构建插件"""
# 清理之前的构建
if os.path.exists('dist'):
shutil.rmtree('dist')
if os.path.exists('build'):
shutil.rmtree('build')
# PyInstaller配置
pyinstaller_args = [
'src/main.py',
'--name=xshell_network_manager',
'--onefile',
'--windowed',
'--icon=resources/icons/plugin.ico',
'--add-data=resources;resources',
'--hidden-import=win32timezone',
'--hidden-import=pywintypes',
'--hidden-import=win32api',
'--hidden-import=win32com',
'--hidden-import=win32con',
'--hidden-import=win32gui',
'--hidden-import=matplotlib.backends.backend_qt5agg',
]
# 执行构建
PyInstaller.__main__.run(pyinstaller_args)
# 复制配置文件
if os.path.exists('config'):
shutil.copytree('config', 'dist/config', dirs_exist_ok=True)
# 复制文档
if os.path.exists('docs'):
shutil.copytree('docs', 'dist/docs', dirs_exist_ok=True)
print("构建完成!插件位于 dist/xshell_network_manager.exe")
# 创建配置文件
# config/settings.json
{
"plugin": {
"name": "NetworkManager",
"version": "1.0.0",
"auto_start": true,
"check_updates": true
},
"network": {
"scan_timeout": 2,
"max_scan_threads": 50,
"default_ports": [21, 22, 23, 25, 53, 80, 110, 135, 139, 143, 443, 445, 3389]
},
"monitoring": {
"update_interval": 5,
"max_history": 100,
"alert_thresholds": {
"cpu": 90,
"memory": 85,
"disk": 90
}
},
"ui": {
"theme": "dark",
"font_size": 10,
"show_toolbar": true,
"show_statusbar": true
}
}
# Plugin entry point
# src/main.py
import sys
import json
import time
from pathlib import Path

def main():
    """Plugin entry point."""
    # Add the plugin directory (src) to the Python path
    plugin_dir = Path(__file__).parent
    if str(plugin_dir) not in sys.path:
        sys.path.insert(0, str(plugin_dir))
    # Load configuration
    config_path = plugin_dir.parent / 'config' / 'settings.json'
    if config_path.exists():
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    else:
        config = {}
    # Import relative to src, the directory that was just added to sys.path
    from core.menu_plugin import XShellMenuPlugin
    from core.toolbar_plugin import XShellToolbarPlugin
    from core.event_plugin import XShellEventListener
    print("Xshell network management plugin starting...")
    # Create the menu plugin
    menu_plugin = XShellMenuPlugin("Network Management")
    if menu_plugin.initialize():
        print("Menu plugin initialized")
    else:
        print("Menu plugin failed to initialize")
    # Create the toolbar plugin
    toolbar_plugin = XShellToolbarPlugin()
    if toolbar_plugin.create_network_toolbar():
        print("Toolbar plugin initialized")
    else:
        print("Toolbar plugin failed to initialize")
    # Start the event listener
    event_listener = XShellEventListener()
    event_listener.start_listening()
    print("Event listener started")
    # Keep running until interrupted
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Plugin shutting down...")
        event_listener.stop_listening()
        print("Plugin stopped")

if __name__ == "__main__":
    main()
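The entry point falls back to an empty dictionary when `settings.json` is missing, which leaves every later lookup without a value. A minimal sketch of one way to layer the file over built-in defaults is shown below. The `DEFAULTS` subset and the helper names `merge_config` and `load_config` are assumptions for illustration, not part of the plugin.

```python
import json
from copy import deepcopy

# Illustrative defaults mirroring part of config/settings.json.
DEFAULTS = {
    "network": {"scan_timeout": 2, "max_scan_threads": 50},
    "monitoring": {"update_interval": 5, "max_history": 100},
}


def merge_config(defaults, overrides):
    """Recursively overlay overrides on defaults without mutating either."""
    merged = deepcopy(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


def load_config(text):
    """Parse JSON text and fill in missing keys; fall back on bad JSON."""
    try:
        user = json.loads(text) if text.strip() else {}
    except json.JSONDecodeError:
        user = {}
    if not isinstance(user, dict):
        user = {}
    return merge_config(DEFAULTS, user)


if __name__ == "__main__":
    cfg = load_config('{"network": {"scan_timeout": 5}}')
    print(cfg["network"])
```

With this approach a user can override a single key, such as `scan_timeout`, without copying the whole configuration file.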
6.3 Plugin Testing and Debugging
python
# tests/test_plugin.py
import unittest
import sys
import os
from unittest.mock import Mock, patch, MagicMock
# 添加插件目录到路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from core.xshell_com import XShellCOM
from network.scanner import NetworkScanner
from tools.session_manager import SessionManager
class TestXShellCOM(unittest.TestCase):
"""测试Xshell COM接口"""
def setUp(self):
self.xshell = XShellCOM()
@patch('win32com.client.Dispatch')
def test_connect_success(self, mock_dispatch):
"""测试成功连接"""
mock_app = Mock()
mock_dispatch.return_value = mock_app
result = self.xshell.connect()
self.assertTrue(result)
self.assertTrue(self.xshell._connected)
mock_dispatch.assert_called_once_with("Xshell.Application")
@patch('win32com.client.Dispatch')
def test_connect_failure(self, mock_dispatch):
"""测试连接失败"""
mock_dispatch.side_effect = Exception("COM错误")
result = self.xshell.connect()
self.assertFalse(result)
self.assertFalse(self.xshell._connected)
class TestNetworkScanner(unittest.TestCase):
"""测试网络扫描器"""
def setUp(self):
self.scanner = NetworkScanner()
@patch('socket.gethostbyaddr')
@patch('subprocess.run')
def test_scan_ip(self, mock_subprocess, mock_gethostbyaddr):
"""测试IP扫描"""
# 模拟Ping成功
mock_process = Mock()
mock_process.returncode = 0
mock_process.stdout = b'Reply from 192.168.1.1: bytes=32 time=1ms TTL=64'
mock_subprocess.return_value = mock_process
# 模拟主机名解析
mock_gethostbyaddr.return_value = ('testhost', [], ['192.168.1.1'])
result = self.scanner._scan_ip('192.168.1.1')
self.assertIsNotNone(result)
self.assertEqual(result['ip'], '192.168.1.1')
self.assertEqual(result['hostname'], 'testhost')
class TestSessionManager(unittest.TestCase):
"""测试会话管理器"""
def setUp(self):
self.manager = SessionManager()
@patch('os.path.exists')
@patch('os.listdir')
def test_list_sessions(self, mock_listdir, mock_exists):
"""测试列出会话"""
# 模拟会话目录存在
mock_exists.return_value = True
mock_listdir.return_value = ['test_session.xsh']
# 模拟会话文件内容
with patch('builtins.open', unittest.mock.mock_open(
read_data='<Session Name="Test"><Host>192.168.1.1</Host></Session>')):
sessions = self.manager.list_sessions()
self.assertEqual(len(sessions), 1)
self.assertEqual(sessions[0]['name'], 'Test')
self.assertEqual(sessions[0]['host'], '192.168.1.1')
class PluginIntegrationTest(unittest.TestCase):
"""插件集成测试"""
def test_plugin_initialization(self):
"""测试插件初始化"""
# 测试插件各个模块能否正常初始化
modules_to_test = [
'core.menu_plugin',
'core.toolbar_plugin',
'core.event_plugin',
'network.scanner',
'tools.session_manager',
'tools.batch_executor'
]
for module_name in modules_to_test:
try:
__import__(module_name)
print(f"{module_name} 导入成功")
except ImportError as e:
self.fail(f"无法导入模块 {module_name}: {e}")
def test_config_loading(self):
"""测试配置加载"""
import json
# 创建测试配置
test_config = {
"plugin": {
"name": "TestPlugin",
"version": "1.0.0"
}
}
# 测试JSON序列化/反序列化
config_str = json.dumps(test_config)
loaded_config = json.loads(config_str)
self.assertEqual(loaded_config['plugin']['name'], 'TestPlugin')
self.assertEqual(loaded_config['plugin']['version'], '1.0.0')
def run_all_tests():
    """Run all tests."""
    # unittest.makeSuite was removed in Python 3.13; use the loader instead
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite()
    # Add the test classes
    suite.addTests(loader.loadTestsFromTestCase(TestXShellCOM))
    suite.addTests(loader.loadTestsFromTestCase(TestNetworkScanner))
    suite.addTests(loader.loadTestsFromTestCase(TestSessionManager))
    suite.addTests(loader.loadTestsFromTestCase(PluginIntegrationTest))
    # Run the tests
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)
    return result.wasSuccessful()
if __name__ == '__main__':
success = run_all_tests()
sys.exit(0 if success else 1)
# Performance tests
# tests/performance_test.py
import gc  # needed by stress_test()
import time
import psutil
import threading
from concurrent.futures import ThreadPoolExecutor
def test_network_scanner_performance():
"""测试网络扫描器性能"""
from network.scanner import NetworkScanner
scanner = NetworkScanner()
# 测试小范围扫描
start_time = time.time()
results = scanner.scan_local_network('192.168.1.0/28') # 16个IP
scan_time = time.time() - start_time
print(f"扫描16个IP耗时: {scan_time:.2f}秒")
print(f"发现主机数: {len(results)}")
# 检查内存使用
process = psutil.Process()
memory_mb = process.memory_info().rss / 1024 / 1024
print(f"内存使用: {memory_mb:.1f} MB")
return scan_time < 5.0 # 应该在5秒内完成
def test_concurrent_operations():
"""测试并发操作"""
from tools.batch_executor import BatchCommandExecutor
executor = BatchCommandExecutor(max_workers=5)
# 模拟会话
test_sessions = [
{'name': f'TestSession{i}', 'host': '192.168.1.1',
'port': 22, 'username': 'test'}
for i in range(10)
]
# 测试命令
test_commands = ['echo test', 'whoami', 'date']
# 执行批量命令
start_time = time.time()
results = executor.execute_on_sessions(
test_sessions, test_commands, timeout=10
)
total_time = time.time() - start_time
print(f"10个会话并发执行耗时: {total_time:.2f}秒")
# 统计结果
success_count = sum(1 for r in results if r['success'])
print(f"成功数: {success_count}/{len(results)}")
return total_time < 15.0 # 应该在15秒内完成
def stress_test():
"""压力测试"""
print("开始压力测试...")
# 创建多个线程同时操作
def worker(worker_id):
from network.scanner import NetworkScanner
scanner = NetworkScanner()
for i in range(3):
scanner.scan_local_network('192.168.1.0/24')
time.sleep(0.1)
return f"Worker {worker_id} completed"
# 启动多个工作线程
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(worker, i) for i in range(10)]
# 等待所有线程完成
results = []
for future in futures:
results.append(future.result())
print(f"压力测试完成,共执行 {len(results)} 个任务")
# 检查资源泄漏
gc.collect()
import objgraph
objgraph.show_growth()
return True
if __name__ == '__main__':
print("运行性能测试...")
# 运行各个性能测试
tests = [
("网络扫描器性能", test_network_scanner_performance),
("并发操作测试", test_concurrent_operations),
("压力测试", stress_test)
]
results = []
for test_name, test_func in tests:
print(f"\n=== {test_name} ===")
try:
success = test_func()
results.append((test_name, success))
print(f"结果: {'通过' if success else '失败'}")
except Exception as e:
print(f"测试失败: {e}")
results.append((test_name, False))
# 汇总结果
print("\n=== 测试汇总 ===")
passed = sum(1 for _, success in results if success)
total = len(results)
print(f"通过: {passed}/{total}")
if passed == total:
print("所有性能测试通过!")
else:
print("部分性能测试失败!")
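The performance tests above time each operation once with `time.time()` and compare it against a fixed threshold, which can be noisy on a busy machine. A hedged sketch of a more stable measurement, taking the median of several runs, is given below. The helper `measure` is hypothetical and not part of the plugin's test suite.

```python
import statistics
import time


def measure(func, repeat=5):
    """Run func repeat times and return timing statistics in seconds.

    Hypothetical helper for illustration; it uses time.perf_counter,
    which is monotonic and better suited to timing than time.time.
    """
    if repeat < 1:
        raise ValueError("repeat must be at least 1")
    samples = []
    for _ in range(repeat):
        start = time.perf_counter()
        func()
        samples.append(time.perf_counter() - start)
    return {
        "runs": repeat,
        "median": statistics.median(samples),
        "worst": max(samples),
    }


if __name__ == "__main__":
    stats = measure(lambda: sum(range(100_000)), repeat=5)
    print(f"median {stats['median'] * 1000:.2f} ms, "
          f"worst {stats['worst'] * 1000:.2f} ms")
```

A threshold check could then compare `stats["median"]` against the limit, so that a single slow run does not fail the whole test.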
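Summary
This article has walked through using Python to extend Xshell and build a custom network management tool. It started with the Xshell plugin architecture, then covered the development environment, core feature implementation, the network management tools themselves, advanced features and integration, and finally performance optimization and deployment.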
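Key technical points:
- COM interface integration: interacting with the Xshell COM interface through the win32com library
- Plugin architecture: menu plugins, toolbar plugins, and event listener plugins
- Network management features: network scanning, session management, batch operations, and real-time monitoring
- User interface: a modern GUI built with PyQt5
- Performance optimization: asynchronous operations, connection pooling, caching, and performance monitoring
- Packaging and deployment: packaging with PyInstaller, plus install and uninstall scripts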
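Plugin highlights:
- Smart network scanning: automatically discovers devices and services on the local network
- Batch session management: batch connections, command execution, and configuration export
- Real-time monitoring dashboard: visualizes network status and system resources
- Automated script generation: generates Xshell scripts to simplify repetitive operations
- Performance optimization: runs operations concurrently through a thread pool and reuses connections to limit resource usage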
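Development advice:
- Version compatibility: test COM interface compatibility against different Xshell versions
- Error handling: implement thorough exception handling and logging
- User configuration: provide a flexible configuration file that supports custom settings
- Security: store sensitive information such as passwords securely
- Performance monitoring: build in performance monitoring to catch and resolve performance problems early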
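The advice on error handling and security can be combined in the logging layer. The code in this article reports errors with `print`, so one possible direction is a logger that masks password values before they are written anywhere. The sketch below uses only the standard `logging` module. The regular expression and the names `RedactSecrets` and `build_logger` are assumptions for illustration.

```python
import io
import logging
import re

# Matches "password=<value>" or "password: <value>"; the pattern is an
# assumption about how secrets might appear in this plugin's messages.
_SECRET = re.compile(r"(password\s*[=:]\s*)(\S+)", re.IGNORECASE)


class RedactSecrets(logging.Filter):
    """Mask password values before a record reaches any handler."""

    def filter(self, record):
        record.msg = _SECRET.sub(r"\1***", record.getMessage())
        record.args = ()  # the message is already fully formatted
        return True


def build_logger(name, stream):
    """Create a logger that writes redacted records to stream."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.handlers.clear()
    logger.filters.clear()
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    logger.addHandler(handler)
    logger.addFilter(RedactSecrets())
    return logger


if __name__ == "__main__":
    buffer = io.StringIO()
    log = build_logger("xshell_plugin_demo", buffer)
    log.info("connecting to %s with password=%s", "10.0.0.5", "hunter2")
    print(buffer.getvalue(), end="")
```

In a deployed plugin the `StreamHandler` would more likely be a `logging.handlers.RotatingFileHandler`, so that log files stay bounded in size.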
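This plugin framework gives network administrators a practical toolset for everyday work, and developers can continue to extend and customize its modules to fit their own needs.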