# -*- coding: utf-8 -*-
"""Tkinter desktop tool for AI-assisted code analysis.

The user pastes source code, which is sent to the DeepSeek chat API through
the OpenAI client. The Markdown answer is rendered in a text widget, and the
DOT / PlantUML code blocks found in the answer are rendered to PNG files
(Graphviz and the PlantUML JAR respectively) and shown on two canvases.

Worker threads never touch Tk widgets: they hand their results to the
module-level queues, which the Tk main loop polls.
"""
import atexit
import os
import queue
import re
import shutil
import subprocess
import tempfile
import threading
import tkinter as tk
from tkinter import messagebox, scrolledtext, ttk

from dotenv import load_dotenv
from graphviz import Source
from openai import OpenAI
from PIL import Image, ImageTk

# Compatibility shim: fall back to Image.ANTIALIAS when the installed Pillow
# does not expose Image.Resampling.
try:
    resample_mode = Image.Resampling.LANCZOS
except AttributeError:
    resample_mode = Image.ANTIALIAS

load_dotenv()

# Configuration constants
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
PLANTUML_JAR = os.path.join(PROJECT_ROOT, "plantuml-1.2025.2.jar")
FONT_NAME = "Microsoft YaHei"

# Thread-safe queues: workers put results here, the Tk main loop drains them.
analysis_queue = queue.Queue()
diagram_queue = queue.Queue()

# Scratch directory for generated diagram files; removed at interpreter exit.
TEMP_DIR = tempfile.mkdtemp()
atexit.register(shutil.rmtree, TEMP_DIR, ignore_errors=True)

# UI style configuration
STYLE_CONFIG = {
    "font_family": FONT_NAME,
    "font_size": 11,
    "dark_bg": "#2d2d2d",
    "light_bg": "#f0f0f0",
    "primary": "#007acc",
    "success": "#4CAF50",
    "danger": "#f44336",
    "text_primary": "#ffffff",
    "text_secondary": "#333333",
    "code_bg": "#f8f9fa"
}

# Every prefix used by the diagram generators to signal a failure instead of
# returning an image path.
DIAGRAM_ERROR_PREFIXES = (
    "错误:",
    "流程图错误:",
    "类图生成失败:",
    "系统错误:",
    "生成错误:",
)

# Header of a DOT graph: optional "strict", "graph"/"digraph", optional
# (possibly quoted) name, then the opening brace. DOT keywords are
# case-insensitive.
_DOT_HEADER = re.compile(
    r'(\b(?:strict\s+)?(?:di)?graph\b\s*(?:"[^"]*"|[\w.]+)?\s*\{)',
    re.IGNORECASE,
)

# Inline Markdown emphasis: **bold** (group 1) or *italic* (group 2).
_INLINE_MARKUP = re.compile(r'\*\*(.+?)\*\*|\*([^*]+)\*')


def _no_window_flags():
    """Return subprocess creation flags that hide the console on Windows."""
    return subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0


class EnhancedCanvas(tk.Canvas):
    """Canvas that shows one image and supports wheel zoom and drag panning."""

    MIN_SCALE = 0.5
    MAX_SCALE = 3.0

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.scale_factor = 1.0
        self.last_x = 0
        self.last_y = 0
        self.image_item = None
        self.tk_img = None        # keeps the displayed PhotoImage alive
        self._source_img = None   # unscaled PIL image, source for every zoom

        # Event bindings. The event sequences were empty strings in the
        # original source; they are restored here from the handlers they are
        # attached to (wheel zoom, press/drag pan, Button-4/5 wheel on Linux).
        self.bind("<MouseWheel>", self.zoom)
        self.bind("<ButtonPress-1>", self.start_pan)
        self.bind("<B1-Motion>", self.pan)
        self.bind("<Button-4>", self.zoom)  # Linux wheel up
        self.bind("<Button-5>", self.zoom)  # Linux wheel down

    def zoom(self, event):
        """Zoom around the pointer in response to a mouse-wheel event.

        The zoom level is clamped to [MIN_SCALE, MAX_SCALE]; once a limit is
        reached further wheel steps in that direction do nothing.
        """
        step = 1.1 if event.delta > 0 or event.num == 4 else 0.9
        target = max(self.MIN_SCALE,
                     min(self.MAX_SCALE, self.scale_factor * step))
        if target == self.scale_factor:
            return
        # Apply only the ratio that is actually allowed by the clamp.
        ratio = target / self.scale_factor
        self.scale_factor = target

        x = self.canvasx(event.x)
        y = self.canvasy(event.y)
        # Canvas.scale only moves an image item's anchor point, so the
        # bitmap itself is re-rendered at the new size right after.
        self.scale(tk.ALL, x, y, ratio, ratio)
        self._render_scaled_image()
        self._refresh_scrollregion()

    def start_pan(self, event):
        """Remember where a drag started."""
        self.last_x = event.x
        self.last_y = event.y
        self.scan_mark(event.x, event.y)

    def pan(self, event):
        """Scroll the view while the mouse is dragged."""
        self.scan_dragto(event.x, event.y, gain=1)
        self._refresh_scrollregion()

    def update_image(self, img_path):
        """Replace the displayed image with the file at *img_path*.

        The zoom level and the view are reset. On failure an error dialog is
        shown and the canvas is left without an image.

        Returns:
            True when the image was loaded and displayed, False otherwise.
        """
        if self.image_item:
            self.delete(self.image_item)
            self.image_item = None
        try:
            # Copy the pixels and close the file right away so the same path
            # can be overwritten by the next diagram generation.
            with Image.open(img_path) as handle:
                source = handle.copy()
            photo = ImageTk.PhotoImage(source)
        except Exception as e:
            self._source_img = None
            self.tk_img = None
            messagebox.showerror("图片加载失败", str(e))
            return False

        self._source_img = source
        self.tk_img = photo
        self.scale_factor = 1.0
        self.image_item = self.create_image(0, 0, anchor=tk.NW, image=photo)
        self._refresh_scrollregion()
        self.xview_moveto(0)
        self.yview_moveto(0)
        return True

    def reset_view(self):
        """Restore 100% zoom and scroll back to the top-left corner."""
        self.scale_factor = 1.0
        if self.image_item is not None:
            self.coords(self.image_item, 0, 0)
            self._render_scaled_image()
        self._refresh_scrollregion()
        self.xview_moveto(0)
        self.yview_moveto(0)

    def _render_scaled_image(self):
        """Redraw the image item from the source image at the current zoom."""
        if self._source_img is None or self.image_item is None:
            return
        if self.scale_factor == 1.0:
            scaled = self._source_img
        else:
            width = max(1, round(self._source_img.width * self.scale_factor))
            height = max(1, round(self._source_img.height * self.scale_factor))
            scaled = self._source_img.resize((width, height), resample_mode)
        photo = ImageTk.PhotoImage(scaled)
        self.itemconfigure(self.image_item, image=photo)
        # Rebind only after the item points at the new image, so the old
        # PhotoImage is not collected while it is still displayed.
        self.tk_img = photo

    def _refresh_scrollregion(self):
        """Make the scroll region match the bounding box of all items."""
        self.configure(scrollregion=self.bbox(tk.ALL))


class MarkdownRenderer(scrolledtext.ScrolledText):
    """Read-only text widget that renders a small subset of Markdown.

    Supported: ``# `` / ``## `` headings, fenced code blocks, ``**bold**``
    and ``*italic*`` inline emphasis.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.configure(
            state='disabled',
            wrap=tk.WORD,
            padx=15,
            pady=15,
            font=(STYLE_CONFIG['font_family'], STYLE_CONFIG['font_size'])
        )
        self._init_tags()

    def _init_tags(self):
        """Create the text tags used by :meth:`render`."""
        self.tag_configure('h1', font=(FONT_NAME, 16, 'bold'), spacing3=10)
        self.tag_configure('h2', font=(FONT_NAME, 14, 'bold'), spacing2=8)
        self.tag_configure(
            'bold', font=(FONT_NAME, STYLE_CONFIG['font_size'], 'bold'))
        self.tag_configure(
            'italic', font=(FONT_NAME, STYLE_CONFIG['font_size'], 'italic'))
        self.tag_configure(
            'code',
            background=STYLE_CONFIG['code_bg'],
            relief='sunken',
            borderwidth=1,
            font='Consolas 10'
        )

    def render(self, markdown_text):
        """Replace the widget content with the rendered *markdown_text*.

        ``None`` is treated as empty text. The widget is returned to the
        disabled (read-only) state even if rendering fails.
        """
        self.configure(state='normal')
        try:
            self.delete('1.0', tk.END)
            in_code_block = False
            for line in (markdown_text or '').split('\n'):
                # A fence line toggles code mode and is not displayed itself.
                if line.strip().startswith('```'):
                    in_code_block = not in_code_block
                    continue
                if in_code_block:
                    self.insert(tk.END, line + '\n', 'code')
                elif line.startswith('# '):
                    self.insert(tk.END, line[2:] + '\n', 'h1')
                elif line.startswith('## '):
                    self.insert(tk.END, line[3:] + '\n', 'h2')
                else:
                    # Insert the pieces in reading order so emphasized text
                    # stays where it appears in the line.
                    for text, tag in self._process_inline_formatting(line):
                        if tag is None:
                            self.insert(tk.END, text)
                        else:
                            self.insert(tk.END, text, tag)
                    self.insert(tk.END, '\n')
        finally:
            self.configure(state='disabled')

    def _process_inline_formatting(self, line):
        """Split *line* into ``(text, tag)`` segments in reading order.

        ``tag`` is ``'bold'``, ``'italic'`` or ``None`` for plain text; the
        emphasis markers themselves are dropped.
        """
        segments = []
        cursor = 0
        for match in _INLINE_MARKUP.finditer(line):
            if match.start() > cursor:
                segments.append((line[cursor:match.start()], None))
            if match.group(1) is not None:
                segments.append((match.group(1), 'bold'))
            else:
                segments.append((match.group(2), 'italic'))
            cursor = match.end()
        if cursor < len(line):
            segments.append((line[cursor:], None))
        return segments


class AsyncAnalyzer(threading.Thread):
    """Daemon thread that asks the DeepSeek chat API to analyze code.

    ``callback(result, error)`` is invoked ON THIS WORKER THREAD with either
    the answer text and ``None``, or ``None`` and an error message. The
    callback therefore must not touch Tk widgets directly.
    """

    def __init__(self, code, callback):
        super().__init__()
        self.code = code
        self.callback = callback
        self.daemon = True

    def run(self):
        try:
            client = OpenAI(
                api_key=DEEPSEEK_API_KEY,
                base_url="https://api.deepseek.com"
            )
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[
                    {"role": "system",
                     "content": "请严格按以下Markdown格式分析代码:\n"
                                "## 语法检查\n- 错误列表\n- 修正建议\n\n"
                                "## 逻辑分析\n1. 步骤说明\n2. 流程图(必须使用标准DOT语法,用```dot标记)\n"
                                "3. 类图(用```plantuml标记)\n\n"
                                "## 优化建议\n- 性能优化\n- 可读性建议\n\n"
                                "DOT语法要求:\n"
                                "1. 使用有意义的节点名称\n"
                                "2. 所有边使用 -> 符号\n"
                                "3. 节点属性用方括号包裹"},
                    {"role": "user",
                     "content": f"分析代码:\n```python\n{self.code}\n```"}
                ],
                temperature=0.3
            )
            self.callback(response.choices[0].message.content, None)
        except Exception as e:
            self.callback(None, str(e))


class DiagramGenerator:
    """Renders DOT and PlantUML sources to PNG files inside TEMP_DIR.

    Both generators return the path of the PNG on success, or a message
    starting with one of DIAGRAM_ERROR_PREFIXES on failure.
    """

    @staticmethod
    def generate_flow(dot_code, filename):
        """Render a flowchart from DOT source with a CJK-capable font."""
        try:
            font_config = (
                f'    graph [fontname="{FONT_NAME}"];\n'
                f'    node [fontname="{FONT_NAME}"];\n'
                f'    edge [fontname="{FONT_NAME}"];\n'
            )
            # Inject the font defaults right after the graph header. A
            # function replacement keeps the font name from being parsed as
            # a regex replacement template.
            dot_code = _DOT_HEADER.sub(
                lambda m: f"{m.group(1)}\n{font_config}", dot_code, count=1)

            filepath = os.path.join(TEMP_DIR, filename)
            src = Source(dot_code, format='png', engine='dot',
                         encoding='utf-8')
            output_path = src.render(filepath, cleanup=True)
            return output_path
        except Exception as e:
            return f"流程图错误:{str(e)}"

    @staticmethod
    def generate_class(plantuml_code, filename):
        """Render a class diagram by running the PlantUML JAR through java."""
        # Computed before the try block so the cleanup below can always
        # refer to uml_path.
        uml_path = os.path.join(TEMP_DIR, f"{filename}.puml")
        img_path = os.path.join(TEMP_DIR, f"{filename}.png")
        try:
            # Strip a surrounding ```plantuml fence if one is still present.
            plantuml_code = re.sub(r'^\s*```\s*plantuml\s*\n', '',
                                   plantuml_code, flags=re.IGNORECASE)
            plantuml_code = re.sub(r'\n\s*```\s*$', '', plantuml_code)

            with open(uml_path, 'w', encoding='utf-8') as f:
                f.write(plantuml_code)

            cmd = [
                'java',
                '-Djava.awt.headless=true',
                '-jar', PLANTUML_JAR,
                '-tpng',
                uml_path,
                '-o', TEMP_DIR
            ]
            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                timeout=30,
                creationflags=_no_window_flags()
            )
            if result.returncode != 0:
                error_msg = result.stdout.strip()
                return f"类图生成失败:{error_msg.split('ERROR')[-1].strip()}"
            if not os.path.exists(img_path):
                # A zero exit code without the expected file is still a
                # failure from the caller's point of view.
                return f"类图生成失败:未生成输出文件 {img_path}"
            return img_path
        except subprocess.TimeoutExpired:
            return "错误:生成超时(30秒)"
        except Exception as e:
            return f"系统错误:{str(e)}"
        finally:
            try:
                os.remove(uml_path)
            except OSError:
                pass  # nothing was written, or it is already gone


class CodeAnalyzerApp:
    """Main window: code input, analysis view, two diagram canvases."""

    def __init__(self, root):
        self.root = root
        self._init_ui()
        self._init_state()
        self._start_queue_processor()

    def _init_ui(self):
        """Build the widget tree."""
        self.root.title("AI代码分析工具 v4.3")
        self.root.geometry("1440x960")

        # Main layout: top (input + result) over bottom (diagrams)
        main_paned = ttk.PanedWindow(self.root, orient=tk.VERTICAL)
        main_paned.pack(fill=tk.BOTH, expand=True)

        # Top area
        top_pane = ttk.PanedWindow(main_paned, orient=tk.HORIZONTAL)
        main_paned.add(top_pane)

        # Code input
        input_frame = ttk.LabelFrame(top_pane, text=" 代码输入 ", padding=10)
        top_pane.add(input_frame, weight=1)
        self.code_input = scrolledtext.ScrolledText(
            input_frame,
            font=('Consolas', 11),
            wrap=tk.WORD,
            padx=10,
            pady=10
        )
        self.code_input.pack(fill=tk.BOTH, expand=True)

        # Analysis result
        result_frame = ttk.Frame(top_pane)
        top_pane.add(result_frame, weight=1)
        self.md_view = MarkdownRenderer(result_frame)
        self.md_view.pack(fill=tk.BOTH, expand=True)

        # Bottom diagram area
        bottom_pane = ttk.PanedWindow(main_paned, orient=tk.HORIZONTAL)
        main_paned.add(bottom_pane, weight=1)

        # Flowchart panel
        flow_frame = ttk.LabelFrame(bottom_pane, text=" 流程图 ", padding=5)
        bottom_pane.add(flow_frame, weight=1)
        self.flow_canvas = EnhancedCanvas(
            flow_frame,
            bg="white",
            bd=2,
            relief=tk.GROOVE
        )
        self.flow_canvas.pack(fill=tk.BOTH, expand=True)

        # Class diagram panel
        class_frame = ttk.LabelFrame(bottom_pane, text=" 类图 ", padding=5)
        bottom_pane.add(class_frame, weight=1)
        self.class_canvas = EnhancedCanvas(
            class_frame,
            bg="white",
            bd=2,
            relief=tk.GROOVE
        )
        self.class_canvas.pack(fill=tk.BOTH, expand=True)

        # Control bar
        self._init_controls()

    def _init_controls(self):
        """Build the button row and the status bar."""
        control_frame = ttk.Frame(self.root)
        control_frame.pack(fill=tk.X, pady=5)

        self.analyze_btn = ttk.Button(
            control_frame,
            text="开始分析",
            command=self._start_analysis
        )
        self.analyze_btn.pack(side=tk.LEFT, padx=10)

        ttk.Button(
            control_frame,
            text="重置视图",
            command=self._reset_view
        ).pack(side=tk.RIGHT, padx=10)

        # Status bar
        self.status_var = tk.StringVar()
        status_bar = ttk.Label(
            self.root,
            textvariable=self.status_var,
            relief=tk.SUNKEN,
            anchor=tk.W
        )
        status_bar.pack(side=tk.BOTTOM, fill=tk.X)

    def _init_state(self):
        """Initialize runtime state."""
        self.is_processing = False

    def _start_analysis(self):
        """Validate the input and start the analysis worker thread."""
        if self.is_processing:
            return

        code = self.code_input.get("1.0", tk.END).strip()
        if len(code) < 20:
            messagebox.showwarning("输入错误", "请输入有效的代码内容")
            return

        self.is_processing = True
        self.analyze_btn.config(state=tk.DISABLED)
        self.status_var.set("正在分析代码...")

        AsyncAnalyzer(
            code=code,
            callback=self._enqueue_analysis_result
        ).start()

    @staticmethod
    def _enqueue_analysis_result(result, error):
        """Worker-thread callback: hand the result over to the Tk thread."""
        analysis_queue.put((result, error))

    def _handle_analysis_result(self, result, error):
        """Show the analysis answer and start diagram generation.

        Runs on the Tk main thread (called from the queue processor).
        """
        self.is_processing = False
        self.analyze_btn.config(state=tk.NORMAL)

        if error:
            self.status_var.set(f"分析失败:{error}")
            messagebox.showerror("分析错误", error)
            return

        self.md_view.render(result)
        self.status_var.set("正在生成图表...")
        self._process_diagrams(result)

    def _process_diagrams(self, analysis_result):
        """Extract the DOT / PlantUML blocks and render each in a thread."""
        analysis_result = analysis_result or ''
        dot_code = self._extract_code_block(analysis_result, 'dot')
        plantuml_code = self._extract_code_block(analysis_result, 'plantuml')

        if not dot_code and not plantuml_code:
            # Nothing to render: do not leave the "generating" status stuck.
            self.status_var.set("分析完成")
            return

        if dot_code:
            threading.Thread(
                target=self._generate_diagram,
                args=('flow', dot_code, 'flow_diagram'),
                daemon=True
            ).start()

        if plantuml_code:
            threading.Thread(
                target=self._generate_diagram,
                args=('class', plantuml_code, 'class_diagram'),
                daemon=True
            ).start()

    def _generate_diagram(self, diagram_type, code, filename):
        """Worker-thread body: render one diagram and queue the outcome."""
        try:
            if diagram_type == 'flow':
                path = DiagramGenerator.generate_flow(code, filename)
            else:
                path = DiagramGenerator.generate_class(code, filename)
            diagram_queue.put((diagram_type, path))
        except Exception as e:
            diagram_queue.put((diagram_type, f"生成错误:{str(e)}"))

    def _start_queue_processor(self):
        """Poll the worker queues from the Tk main loop every 100 ms."""
        def process():
            try:
                self._drain_queues()
            finally:
                # Always reschedule, so one failing handler cannot stop
                # result delivery for the rest of the session.
                self.root.after(100, process)

        process()

    def _drain_queues(self):
        """Dispatch every pending analysis and diagram result."""
        while True:
            try:
                result, error = analysis_queue.get_nowait()
            except queue.Empty:
                break
            self._handle_analysis_result(result, error)

        while True:
            try:
                diagram_type, path = diagram_queue.get_nowait()
            except queue.Empty:
                break
            if diagram_type == 'flow':
                self._update_flow_diagram(path)
            else:
                self._update_class_diagram(path)

    def _update_flow_diagram(self, img_path):
        """Show the flowchart, or report the generator's error message."""
        self._display_diagram(
            self.flow_canvas, img_path,
            error_title="流程图错误",
            load_error_title="流程图加载失败",
            ok_status="流程图已更新"
        )

    def _update_class_diagram(self, img_path):
        """Show the class diagram, or report the generator's error message."""
        self._display_diagram(
            self.class_canvas, img_path,
            error_title="类图错误",
            load_error_title="类图加载失败",
            ok_status="类图已更新"
        )

    def _display_diagram(self, canvas, result, error_title,
                         load_error_title, ok_status):
        """Load *result* into *canvas* unless it is a generator error."""
        if not isinstance(result, str) \
                or result.startswith(DIAGRAM_ERROR_PREFIXES):
            message = str(result)
            self.status_var.set(message)
            messagebox.showerror(error_title, message.split(":", 1)[-1])
            return

        try:
            # update_image reports load failures itself and returns False.
            if canvas.update_image(result):
                self.status_var.set(ok_status)
        except Exception as e:
            messagebox.showerror(load_error_title, str(e))

    def _reset_view(self):
        """Reset zoom and scroll position of both diagram canvases."""
        for canvas in [self.flow_canvas, self.class_canvas]:
            canvas.reset_view()

    @staticmethod
    def _extract_code_block(text, lang):
        """Return the body of the first ```lang fenced block, or None."""
        pattern = rf'```{re.escape(lang)}[ \t]*\r?\n(.*?)\r?\n[ \t]*```'
        match = re.search(pattern, text, re.DOTALL)
        return match.group(1).strip() if match else None


def check_environment():
    """Check that java runs and the PlantUML JAR exists.

    Shows an error dialog and returns False when either check fails.
    """
    try:
        subprocess.run(
            ['java', '-version'],
            check=True,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
            timeout=10,
            creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
        )
        if not os.path.exists(PLANTUML_JAR):
            messagebox.showerror("配置错误", f"未找到PlantUML JAR文件:\n{PLANTUML_JAR}")
            return False
        return True
    except Exception as e:
        messagebox.showerror("环境错误", f"Java环境异常:{str(e)}")
        return False


def main():
    """Create the main window, verify the environment and run the UI."""
    # Create the real root first and keep it hidden during the checks;
    # otherwise the error dialogs would implicitly create a second, blank
    # root window.
    root = tk.Tk()
    root.withdraw()
    if not check_environment():
        root.destroy()
        raise SystemExit(1)

    CodeAnalyzerApp(root)
    root.deiconify()
    root.mainloop()


if __name__ == "__main__":
    main()