aistudy/app.py

508 lines
16 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
import os
import re
import queue
import threading
import tkinter as tk
import tempfile
import subprocess
from tkinter import ttk, messagebox, scrolledtext
from graphviz import Source
from openai import OpenAI
from dotenv import load_dotenv
from PIL import Image, ImageTk
# 兼容性处理
try:
resample_mode = Image.Resampling.LANCZOS
except AttributeError:
resample_mode = Image.ANTIALIAS
load_dotenv()
# 配置常量
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
PLANTUML_JAR = os.path.join(PROJECT_ROOT, "plantuml-1.2025.2.jar")
FONT_NAME = "Microsoft YaHei"
# 线程安全队列
analysis_queue = queue.Queue()
diagram_queue = queue.Queue()
# 临时文件目录
TEMP_DIR = tempfile.mkdtemp()
# 界面样式配置
STYLE_CONFIG = {
"font_family": FONT_NAME,
"font_size": 11,
"dark_bg": "#2d2d2d",
"light_bg": "#f0f0f0",
"primary": "#007acc",
"success": "#4CAF50",
"danger": "#f44336",
"text_primary": "#ffffff",
"text_secondary": "#333333",
"code_bg": "#f8f9fa"
}
class EnhancedCanvas(tk.Canvas):
"""支持缩放和平移的画布组件"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.scale_factor = 1.0
self.last_x = 0
self.last_y = 0
self.image_item = None
# 事件绑定
self.bind("<MouseWheel>", self.zoom)
self.bind("<ButtonPress-2>", self.start_pan)
self.bind("<B2-Motion>", self.pan)
self.bind("<Button-4>", self.zoom) # Linux支持
self.bind("<Button-5>", self.zoom)
def zoom(self, event):
"""鼠标滚轮缩放"""
scale = 1.1 if event.delta > 0 or event.num == 4 else 0.9
self.scale_factor *= scale
self.scale_factor = max(0.5, min(3.0, self.scale_factor))
x = self.canvasx(event.x)
y = self.canvasy(event.y)
self.scale(tk.ALL, x, y, scale, scale)
self.configure(scrollregion=self.bbox(tk.ALL))
def start_pan(self, event):
"""开始平移"""
self.last_x = event.x
self.last_y = event.y
self.scan_mark(event.x, event.y)
def pan(self, event):
"""平移视图"""
self.scan_dragto(event.x, event.y, gain=1)
self.configure(scrollregion=self.bbox(tk.ALL))
def update_image(self, img_path):
"""更新画布图片"""
if self.image_item:
self.delete(self.image_item)
try:
img = Image.open(img_path)
self.tk_img = ImageTk.PhotoImage(img)
self.image_item = self.create_image(0, 0, anchor=tk.NW, image=self.tk_img)
self.configure(scrollregion=self.bbox(tk.ALL))
except Exception as e:
messagebox.showerror("图片加载失败", str(e))
class MarkdownRenderer(scrolledtext.ScrolledText):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.configure(
state='disabled',
wrap=tk.WORD,
padx=15,
pady=15,
font=(STYLE_CONFIG['font_family'], STYLE_CONFIG['font_size'])
)
self._init_tags()
def _init_tags(self):
self.tag_configure('h1', font=(FONT_NAME, 16, 'bold'), spacing3=10)
self.tag_configure('h2', font=(FONT_NAME, 14, 'bold'), spacing2=8)
self.tag_configure('bold', font=(FONT_NAME, STYLE_CONFIG['font_size'], 'bold'))
self.tag_configure('italic', font=(FONT_NAME, STYLE_CONFIG['font_size'], 'italic'))
self.tag_configure('code',
background=STYLE_CONFIG['code_bg'],
relief='sunken',
borderwidth=1,
font='Consolas 10')
def render(self, markdown_text):
self.configure(state='normal')
self.delete(1.0, tk.END)
lines = markdown_text.split('\n')
in_code_block = False
for line in lines:
if line.strip().startswith('```'):
in_code_block = not in_code_block
continue
if in_code_block:
self.insert(tk.END, line + '\n', 'code')
continue
if line.startswith('# '):
self.insert(tk.END, line[2:] + '\n', 'h1')
elif line.startswith('## '):
self.insert(tk.END, line[3:] + '\n', 'h2')
else:
processed_line = self._process_inline_formatting(line)
self.insert(tk.END, processed_line + '\n')
self.configure(state='disabled')
def _process_inline_formatting(self, line):
line = re.sub(r'\*\*(.*?)\*\*', lambda m: self._apply_tag(m.group(1), 'bold'), line)
line = re.sub(r'\*(.*?)\*', lambda m: self._apply_tag(m.group(1), 'italic'), line)
return line
def _apply_tag(self, text, tag):
self.insert(tk.END, text, tag)
return ''
class AsyncAnalyzer(threading.Thread):
"""异步分析线程"""
def __init__(self, code, callback):
super().__init__()
self.code = code
self.callback = callback
self.daemon = True
def run(self):
try:
client = OpenAI(
api_key=DEEPSEEK_API_KEY,
base_url="https://api.deepseek.com"
)
response = client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": "请严格按以下Markdown格式分析代码\n"
"## 语法检查\n- 错误列表\n- 修正建议\n\n"
"## 逻辑分析\n1. 步骤说明\n2. 流程图必须使用标准DOT语法用```dot标记\n"
"3. 类图(用```plantuml标记\n\n"
"## 优化建议\n- 性能优化\n- 可读性建议\n\n"
"DOT语法要求\n"
"1. 使用有意义的节点名称\n"
"2. 所有边使用 -> 符号\n"
"3. 节点属性用方括号包裹"},
{"role": "user", "content": f"分析代码:\n```python\n{self.code}\n```"}
],
temperature=0.3
)
self.callback(response.choices[0].message.content, None)
except Exception as e:
self.callback(None, str(e))
class DiagramGenerator:
@staticmethod
def generate_flow(dot_code, filename):
"""生成中文流程图"""
try:
font_config = f'''
graph [fontname="{FONT_NAME}"];
node [fontname="{FONT_NAME}"];
edge [fontname="{FONT_NAME}"];
'''
dot_code = re.sub(r'(digraph\s*\w*\s*{)', f'\\1\n{font_config}', dot_code)
filepath = os.path.join(TEMP_DIR, filename)
src = Source(dot_code, format='png', engine='dot', encoding='utf-8')
output_path = src.render(filepath, cleanup=True)
return output_path
except Exception as e:
return f"流程图错误:{str(e)}"
@staticmethod
def generate_class(plantuml_code, filename):
"""生成类图(英文)"""
try:
plantuml_code = re.sub(r'^\s*```\s*plantuml\s*\n', '', plantuml_code, flags=re.IGNORECASE)
plantuml_code = re.sub(r'\n\s*```\s*$', '', plantuml_code)
uml_path = os.path.join(TEMP_DIR, f"{filename}.puml")
img_path = os.path.join(TEMP_DIR, f"{filename}.png")
with open(uml_path, 'w', encoding='utf-8') as f:
f.write(plantuml_code)
cmd = [
'java',
'-Djava.awt.headless=true',
'-jar', PLANTUML_JAR,
'-tpng',
uml_path,
'-o', TEMP_DIR
]
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
timeout=30
)
if result.returncode != 0:
error_msg = result.stdout.strip()
return f"类图生成失败:{error_msg.split('ERROR')[-1].strip()}"
return img_path
except subprocess.TimeoutExpired:
return "错误生成超时30秒"
except Exception as e:
return f"系统错误:{str(e)}"
finally:
if os.path.exists(uml_path):
os.remove(uml_path)
class CodeAnalyzerApp:
def __init__(self, root):
self.root = root
self._init_ui()
self._init_state()
self._start_queue_processor()
def _init_ui(self):
"""初始化界面"""
self.root.title("AI代码分析工具 v4.3")
self.root.geometry("1440x960")
# 主布局
main_paned = ttk.PanedWindow(self.root, orient=tk.VERTICAL)
main_paned.pack(fill=tk.BOTH, expand=True)
# 上部区域
top_pane = ttk.PanedWindow(main_paned, orient=tk.HORIZONTAL)
main_paned.add(top_pane)
# 代码输入区
input_frame = ttk.LabelFrame(top_pane, text=" 代码输入 ", padding=10)
top_pane.add(input_frame, weight=1)
self.code_input = scrolledtext.ScrolledText(
input_frame,
font=('Consolas', 11),
wrap=tk.WORD,
padx=10,
pady=10
)
self.code_input.pack(fill=tk.BOTH, expand=True)
# 分析结果区
result_frame = ttk.Frame(top_pane)
top_pane.add(result_frame, weight=1)
self.md_view = MarkdownRenderer(result_frame)
self.md_view.pack(fill=tk.BOTH, expand=True)
# 下部图表区
bottom_pane = ttk.PanedWindow(main_paned, orient=tk.HORIZONTAL)
main_paned.add(bottom_pane, weight=1)
# 流程图面板
flow_frame = ttk.LabelFrame(bottom_pane, text=" 流程图 ", padding=5)
bottom_pane.add(flow_frame, weight=1)
self.flow_canvas = EnhancedCanvas(
flow_frame,
bg="white",
bd=2,
relief=tk.GROOVE
)
self.flow_canvas.pack(fill=tk.BOTH, expand=True)
# 类图面板
class_frame = ttk.LabelFrame(bottom_pane, text=" 类图 ", padding=5)
bottom_pane.add(class_frame, weight=1)
self.class_canvas = EnhancedCanvas(
class_frame,
bg="white",
bd=2,
relief=tk.GROOVE
)
self.class_canvas.pack(fill=tk.BOTH, expand=True)
# 控制栏
self._init_controls()
def _init_controls(self):
"""初始化控制组件"""
control_frame = ttk.Frame(self.root)
control_frame.pack(fill=tk.X, pady=5)
self.analyze_btn = ttk.Button(
control_frame,
text="开始分析",
command=self._start_analysis
)
self.analyze_btn.pack(side=tk.LEFT, padx=10)
ttk.Button(
control_frame,
text="重置视图",
command=self._reset_view
).pack(side=tk.RIGHT, padx=10)
# 状态栏
self.status_var = tk.StringVar()
status_bar = ttk.Label(
self.root,
textvariable=self.status_var,
relief=tk.SUNKEN,
anchor=tk.W
)
status_bar.pack(side=tk.BOTTOM, fill=tk.X)
def _init_state(self):
"""初始化状态"""
self.is_processing = False
def _start_analysis(self):
"""启动分析任务"""
if self.is_processing:
return
code = self.code_input.get("1.0", tk.END).strip()
if len(code) < 20:
messagebox.showwarning("输入错误", "请输入有效的代码内容")
return
self.is_processing = True
self.analyze_btn.config(state=tk.DISABLED)
self.status_var.set("正在分析代码...")
AsyncAnalyzer(
code=code,
callback=self._handle_analysis_result
).start()
def _handle_analysis_result(self, result, error):
"""处理分析结果"""
self.is_processing = False
self.analyze_btn.config(state=tk.NORMAL)
if error:
self.status_var.set(f"分析失败:{error}")
messagebox.showerror("分析错误", error)
return
self.md_view.render(result)
self.status_var.set("正在生成图表...")
self._process_diagrams(result)
def _process_diagrams(self, analysis_result):
"""处理图表生成"""
dot_code = self._extract_code_block(analysis_result, 'dot')
plantuml_code = self._extract_code_block(analysis_result, 'plantuml')
if dot_code:
threading.Thread(
target=self._generate_diagram,
args=('flow', dot_code, 'flow_diagram'),
daemon=True
).start()
if plantuml_code:
threading.Thread(
target=self._generate_diagram,
args=('class', plantuml_code, 'class_diagram'),
daemon=True
).start()
def _generate_diagram(self, diagram_type, code, filename):
"""生成并更新图表"""
try:
if diagram_type == 'flow':
path = DiagramGenerator.generate_flow(code, filename)
else:
path = DiagramGenerator.generate_class(code, filename)
diagram_queue.put((diagram_type, path))
except Exception as e:
diagram_queue.put((diagram_type, f"生成错误:{str(e)}"))
def _start_queue_processor(self):
"""启动队列处理"""
def process():
if not diagram_queue.empty():
diagram_type, path = diagram_queue.get()
if diagram_type == 'flow':
self._update_flow_diagram(path)
else:
self._update_class_diagram(path)
self.root.after(100, process)
process()
def _update_flow_diagram(self, img_path):
"""更新流程图"""
if img_path.startswith(("错误:", "流程图错误:")):
self.status_var.set(img_path)
messagebox.showerror("流程图错误", img_path.split("", 1)[-1])
return
try:
self.flow_canvas.update_image(img_path)
self.status_var.set("流程图已更新")
except Exception as e:
messagebox.showerror("流程图加载失败", str(e))
def _update_class_diagram(self, img_path):
"""更新类图"""
if img_path.startswith(("错误:", "类图生成失败:")):
messagebox.showerror("类图错误", img_path.split("", 1)[-1])
return
try:
self.class_canvas.update_image(img_path)
self.status_var.set("类图已更新")
except Exception as e:
messagebox.showerror("类图加载失败", str(e))
def _reset_view(self):
"""重置视图"""
for canvas in [self.flow_canvas, self.class_canvas]:
canvas.scale_factor = 1.0
canvas.scale(tk.ALL, 0, 0, 1, 1)
canvas.configure(scrollregion=canvas.bbox(tk.ALL))
@staticmethod
def _extract_code_block(text, lang):
"""提取代码块"""
pattern = rf'```{lang}\n(.*?)\n```'
match = re.search(pattern, text, re.DOTALL)
return match.group(1).strip() if match else None
def check_environment():
"""环境检查"""
try:
subprocess.run(
['java', '-version'],
check=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
timeout=10,
creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
)
if not os.path.exists(PLANTUML_JAR):
messagebox.showerror("配置错误", f"未找到PlantUML JAR文件\n{PLANTUML_JAR}")
return False
return True
except Exception as e:
messagebox.showerror("环境错误", f"Java环境异常{str(e)}")
return False
if __name__ == "__main__":
if not check_environment():
exit(1)
root = tk.Tk()
app = CodeAnalyzerApp(root)
root.mainloop()