项目开发日记:静态分析Agent的实现
基于tree-sitter构建代码智能分析工具
在代码审查流程中,需要自动分析代码质量、发现潜在问题。我开发了这个静态分析Agent,它能解析代码AST,提取函数签名、调用图、复杂度指标,并输出结构化的issues列表供下游消费。
为什么是tree-sitter?
在对比了正则表达式和Python AST后,我选择了tree-sitter,原因是:
- 多语言支持:不只限于Python,还能支持JavaScript、Go等多种语言
- 错误容忍:即使代码有语法错误,也能生成部分AST
- 精确定位:能精确获取每个节点的行列位置
- 增量解析:支持增量更新,适合大型项目
我甚至考虑过LLM,这个能理解上下文,提出建议,但考虑成本,延迟,结果不可靠,以及我这个项目的目的
架构设计
核心模块
StaticAnalyzerAgent (主控制器) ↓ TreeSitterParser (解析封装) ↓ 分析引擎 (AST分析 + 正则降级) ↓ 结构化输出数据模型
我定义了以下几个核心数据结构:
Issue:发现的问题
- severity: error/warning/info
- line: 行号
- description: 问题描述
- suggestion: 修复建议
FunctionInfo:函数信息
- name: 函数名
- start_line/end_line: 起止行号
- parameters: 参数列表
- complexity: 圈复杂度
- is_recursive: 是否递归
- calls: 调用的函数列表
AnalysisResult:分析结果
- success: 是否成功
- failure_category: 失败类型
- issues: 问题列表
- functions: 函数列表
- call_graph: 调用图
- metrics: 代码指标
- degraded: 是否降级
核心功能实现
1. 代码解析
我封装了TreeSitterParser类来处理解析逻辑:
classTreeSitterParser:def__init__(self):self.python_language=Language(tspython.language())self.parser=Parser(self.python_language)asyncdefparse(self,code:str,language:str):iflanguage!="python":raiseValueError(f"Unsupported language:{language}")tree=awaitasyncio.to_thread(self.parser.parse,code.encode("utf-8"),)returntree这里使用异步执行避免阻塞事件循环,同时支持超时控制。
2. 函数提取
遍历AST提取所有顶层函数定义:
def_extract_functions(self,root_node,code):functions=[]deftraverse(node):ifnode.type=="function_definition":func_info=self._parse_function_node(node,code)iffunc_info:functions.append(func_info)return# 不进入函数体内部,只提取顶层ifnode.type=="class_definition":return# 类方法由_extract_classes处理forchildinnode.children:traverse(child)traverse(root_node)returnfunctions函数信息解析包括:
- 从
name字段获取函数名 - 从
parameters子节点提取参数列表 - 计算圈复杂度
- 检测是否递归调用
- 提取函数内部的调用关系
3. 类信息提取
类似地提取类定义及其方法:
def_extract_classes(self,root_node):classes=[]deftraverse(node):ifnode.type=="class_definition":class_info=self._parse_class_node(node)ifclass_info:classes.append(class_info)forchildinnode.children:traverse(child)traverse(root_node)returnclasses4. 圈复杂度计算
遍历函数AST,统计控制流语句数量:
def_calculate_complexity(self,node):complexity=1defcount_branches(n):nonlocalcomplexityifn.typein("if_statement","for_statement","while_statement","except_clause"):complexity+=1forchildinn.children:count_branches(child)count_branches(node)returncomplexity5. 调用图构建
从函数信息中提取调用关系:
def_build_call_graph(self,functions):return{func.name:func.callsforfuncinfunctionsiffunc.calls}6. 代码指标计算
统计代码行数、注释行数和空行:
def_calculate_metrics(self,code):lines=code.split("\n")total_lines=len(lines)code_lines=comment_lines=blank_lines=0forlineinlines:stripped=line.strip()ifnotstripped:blank_lines+=1elifstripped.startswith("#"):comment_lines+=1else:code_lines+=1returnCodeMetrics(total_lines=total_lines,code_lines=code_lines,comment_lines=comment_lines,blank_lines=blank_lines)7. 问题生成规则
基于AST分析结果生成issues:
def_generate_issues(self,functions,code):issues=[]forfuncinfunctions:# 高复杂度检查iffunc.complexity>10:issues.append(Issue(severity="warning",line=func.start_line,description=f"函数'{func.name}'圈复杂度为{func.complexity},建议拆分",suggestion="将复杂逻辑拆分为多个职责单一的子函数"))# 参数过多检查iflen(func.parameters)>5:issues.append(Issue(severity="warning",line=func.start_line,description=f"函数有{len(func.parameters)}个参数,建议不超过5个",suggestion="考虑使用dataclass或dict封装参数"))# 函数过长检查func_length=func.end_line-func.start_lineiffunc_length>50:issues.append(Issue(severity="info",line=func.start_line,description=f"函数有{func_length}行,建议不超过50行",suggestion="提取子函数以降低函数长度"))returnissues8. 正则降级策略
当tree-sitter解析失败时,自动降级为正则模式匹配:
_REGEX_PATTERNS=[(re.compile(r"eval\s*\(",re.MULTILINE),"warning","使用eval()存在代码注入风险"),(re.compile(r"except\s*:",re.MULTILINE),"warning","裸except会捕获所有异常"),# ... 更多模式]def_regex_fallback(self,code):issues=[]forpattern,severity,descriptionin_REGEX_PATTERNS:formatchinpattern.finditer(code):line_num=code[:match.start()].count("\n")+1issues.append(Issue(severity=severity,line=line_num,description=description,suggestion=description))returnAnalysisResult(success=True,failure_category=FailureCategory.PARSE_ERROR,error_message="tree-sitter解析失败,降级为正则匹配",issues=issues,metrics=self._calculate_metrics(code),degraded=True)输出格式
Agent输出JSON格式的结构化结果:
{"issues":[{"severity":"warning","line":42,"description":"函数'process_data'圈复杂度为15,建议拆分","suggestion":"将复杂逻辑拆分为多个职责单一的子函数"}],"static_findings":"found","degraded":false,"metrics":{"total_lines":200,"code_lines":150,"comment_lines":30,"blank_lines":20}}当没有发现问题时,标记static_findings: none:
{"issues":[],"static_findings":"none","degraded":false,"metrics":{...}}故障处理
1. 解析失败降级
当tree-sitter解析失败时,自动切换为正则模式匹配,标记degraded为true。
2. 超时处理
设置超时控制,超时后返回部分结果并标记:
try:asyncwithasyncio.timeout(self.config.agent_timeout):tree=awaitself.parser.parse(code,language)# ... 分析逻辑exceptTimeoutError:returnAnalysisResult(success=True,failure_category=FailureCategory.UPSTREAM_TIMEOUT,degraded=True,error_message="分析超时,返回部分结果")3. 不支持的语言
遇到不支持的语言时,自动降级为正则匹配。
使用示例
基本用法
agent=StaticAnalyzerAgent(config)# 分析代码result=awaitagent.analyze_code(""" def complex_function(x): if x > 0: for i in range(x): if i % 2 == 0: print(i) return x ""","python")# 输出结果print(f"发现{len(result.issues)}个问题")forissueinresult.issues:print(f"[{issue.severity}] Line{issue.line}:{issue.description}")作为Agent运行
# 通过run方法执行task=json.dumps({"diff":code_content,"language":"python","files":["example.py"]})result=awaitagent.run(task)output=json.loads(result.output)性能优化
异步执行
所有耗时操作使用asyncio.to_thread避免阻塞事件循环。
解析器复用
解析器实例在Agent生命周期内复用,避免重复初始化开销。
正则预编译
所有正则模式在模块加载时预编译,提高匹配效率。