企业级 RAG(检索增强生成)系统的 Python AI 服务层,基于 FastAPI + LangGraph + Claude 构建。
本节梳理项目中大量使用的 Python 语言特性,每个特性附上项目内真实代码片段,并与 Java 等价写法对比,帮助有 Java 背景的开发者快速建立映射关系。
Python 3.10+ 原生联合类型和泛型,无需从 typing 导入。
Python(本项目)
# 联合类型:int 或 None
old_version: int | None = None
# 原生泛型(Python 3.9+,无需 import)
allowed_roles: list[str] = []
chunk_map: dict[str, ChunkResult] = {}
# 函数返回值类型
def split_pages(...) -> tuple[list[ParentChunk], list[ChildChunk]]:
...
async def stream_query(...) -> AsyncIterator[str]:
...Java 等价写法
// 可空类型:Java 用 @Nullable 注解或 Optional<T> 表达
@Nullable Integer oldVersion = null;
Optional<Integer> oldVersion = Optional.empty();
// 泛型集合:Java 必须写完整泛型参数
List<String> allowedRoles = new ArrayList<>();
Map<String, ChunkResult> chunkMap = new HashMap<>();
// 函数返回值(Java 没有元组,需自定义 record 或 Pair)
record SplitResult(List<ParentChunk> parents, List<ChildChunk> children) {}
SplitResult splitPages(...) { ... }
// 异步流:Java 用 Flux<String>(Project Reactor)或 Stream<String>
Flux<String> streamQuery(...) { ... }关键差异:Python 类型注解是可选的(运行时不强制),主要供静态分析工具(mypy、IDE)使用;Java 类型在编译期严格检查,不可省略。
出处:app/models/schemas.py、app/parsers/chunker.py、app/pipeline/query_pipeline.py
用 BaseModel 做数据校验、序列化和文档生成,是 FastAPI 的核心依赖。
Python(本项目)
from pydantic import BaseModel, Field
from typing import Literal
class IngestMessage(BaseModel):
tenant_id: str
doc_version: int = 1 # 默认值
operation: Literal["create", "update"] = "create" # 枚举约束
old_version: int | None = None # 可选字段
allowed_roles: list[str] = []
created_at: datetime = Field(default_factory=datetime.utcnow)
# 不可变更新:返回新对象,原对象不变
state = state.model_copy(update={"answer": "...", "sources": [...]})Java 等价写法
// Java 需要 Jackson + Bean Validation 注解组合实现同等功能
public class IngestMessage {
@NotNull
private String tenantId;
private int docVersion = 1;
@Pattern(regexp = "create|update") // 或用枚举类型
private String operation = "create";
@Nullable
private Integer oldVersion;
private List<String> allowedRoles = new ArrayList<>();
// 默认值工厂:Java 需在构造器或字段初始化中处理
private LocalDateTime createdAt = LocalDateTime.now();
}
// 不可变更新:Java 用 Builder 模式或 Lombok @With
IngestMessage updated = original.toBuilder()
.answer("...")
.sources(List.of(...))
.build();关键差异:Python 一个 BaseModel 类同时获得校验、序列化、OpenAPI Schema 生成;Java 需要 Jackson(序列化)+ Bean Validation(校验)+ SpringDoc(文档)三个库分别配置。
出处:app/models/schemas.py、app/pipeline/query_pipeline.py
BaseSettings 自动从环境变量或 .env 文件加载配置,字段名自动转大写匹配。
Python(本项目)
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
anthropic_api_key: str # 必填,缺失则启动报错
claude_model: str = "claude-sonnet-4-6"
milvus_top_k: int = 20 # 自动从 MILVUS_TOP_K 环境变量读取
class Config:
env_file = ".env" # 优先级:环境变量 > .env > 默认值
settings = Settings() # 模块级单例Java 等价写法
// Spring Boot application.yml + @ConfigurationProperties
@ConfigurationProperties(prefix = "app")
@Configuration
public class Settings {
@NotBlank
private String anthropicApiKey; // 对应 app.anthropic-api-key
private String claudeModel = "claude-sonnet-4-6";
private int milvusTopK = 20;
// getters/setters ...
}
// application.yml 中配置:
// app:
// anthropic-api-key: ${ANTHROPIC_API_KEY} # 从环境变量读取
// milvus-top-k: 20关键差异:Python 一个类定义即完成配置绑定;Java 需要 @ConfigurationProperties + @Configuration + YAML 文件三层配合,且 Spring Boot 的环境变量映射规则(kebab-case vs UPPER_CASE)比 Python 复杂。
出处:app/config.py
FastAPI 基于 asyncio 单线程事件循环,I/O 等待时切换协程,而非为每个请求创建线程。
Python(本项目)
async def embed_query(query: str) -> list[float]:
# await 挂起当前协程,事件循环去处理其他请求,I/O 完成后恢复
cached = await redis_client.get(f"emb:{key}")
if cached:
return json.loads(cached)
vector = get_model().encode(query).tolist()
await redis_client.setex(key, ttl, json.dumps(vector))
return vector
# 并发:两个检索同时进行,总耗时 ≈ max(Milvus耗时, ES耗时)
milvus_results, es_results = await asyncio.gather(
milvus_client.search(...),
es_client.search(...),
)Java 等价写法
// Spring WebFlux(Project Reactor)的响应式写法
public Mono<List<Float>> embedQuery(String query) {
return redisClient.get("emb:" + key)
.flatMap(cached -> {
if (cached != null) return Mono.just(parseJson(cached));
List<Float> vector = model.encode(query);
return redisClient.setex(key, ttl, toJson(vector))
.thenReturn(vector);
});
}
// 并发:等价于 asyncio.gather
Mono.zip(
milvusClient.search(...),
esClient.search(...)
).subscribe(tuple -> {
List<ChunkResult> milvusResults = tuple.getT1();
List<ChunkResult> esResults = tuple.getT2();
});
// 传统 Spring MVC:用 CompletableFuture
CompletableFuture<List<ChunkResult>> f1 = CompletableFuture.supplyAsync(() -> milvusSearch(...));
CompletableFuture<List<ChunkResult>> f2 = CompletableFuture.supplyAsync(() -> esSearch(...));
CompletableFuture.allOf(f1, f2).join();关键差异:Python async/await 是语言级语法糖,代码逻辑与同步代码几乎一致,只需加 async/await 关键字;Java 响应式编程(Reactor/RxJava)需要链式 flatMap/map/zip,思维模型完全不同,学习成本高。
出处:app/services/embedding.py、app/pipeline/query_pipeline.py
函数内含 yield 即成为生成器;async def + yield 是异步生成器,按需逐个产出值。
Python(本项目)
# yield 使函数变为生成器:每次 yield 暂停,调用方取值后继续
async def stream_query(...) -> AsyncIterator[str]:
# ... 前序步骤 ...
with _anthropic.messages.stream(...) as stream:
for text in stream.text_stream:
yield text # 产出一个 token,暂停等待消费
# 调用方:async for 驱动生成器逐步执行
async def event_generator():
async for token in stream_query(...):
yield f"data: {json.dumps({'token': token})}\n\n"
yield "data: [DONE]\n\n"Java 等价写法
// Java 没有原生生成器语法,需用 Flux(响应式流)模拟
public Flux<String> streamQuery(...) {
return Flux.create(sink -> {
// ... 前序步骤 ...
anthropicClient.streamMessages(...)
.doOnNext(text -> sink.next(text)) // 等价于 yield text
.doOnComplete(sink::complete)
.subscribe();
});
}
// 调用方消费
streamQuery(...).subscribe(token -> {
sendSseEvent("{\"token\": \"" + token + "\"}");
});
// 或 Spring MVC SseEmitter
SseEmitter emitter = new SseEmitter();
streamQuery(...).subscribe(
token -> emitter.send(token),
emitter::completeWithError,
emitter::complete
);关键差异:Python yield 是协程级暂停恢复,代码写起来像同步逻辑;Java 需要将逻辑拆碎放进回调/lambda,执行顺序不直观。
出处:app/pipeline/query_pipeline.py、app/api/chat.py
Python(本项目)
# gather:并发执行,等所有完成,结果顺序与传入顺序一致
await asyncio.gather(
asyncio.to_thread(milvus_client.delete_by_doc_version, ...),
es_client.delete_by_doc_version(...),
delete_parent_chunks_by_version(...),
)
# to_thread:将同步阻塞函数放到线程池,不阻塞事件循环
result = await asyncio.to_thread(sync_heavy_function, arg1, arg2)
# create_task:后台运行,不等待结果(fire-and-forget)
asyncio.create_task(_delete_old_version(tenant_id, doc_id, old_version))Java 等价写法
// CompletableFuture.allOf:等价于 asyncio.gather
CompletableFuture<Void> all = CompletableFuture.allOf(
CompletableFuture.runAsync(() -> milvusClient.deleteByDocVersion(...)),
CompletableFuture.runAsync(() -> esClient.deleteByDocVersion(...)),
CompletableFuture.runAsync(() -> redisClient.deleteParentChunks(...))
);
all.join(); // 等待全部完成
// 线程池提交同步任务:等价于 asyncio.to_thread
ExecutorService executor = Executors.newFixedThreadPool(4);
Future<Result> future = executor.submit(() -> syncHeavyFunction(arg1, arg2));
// fire-and-forget:等价于 create_task
CompletableFuture.runAsync(() -> deleteOldVersion(tenantId, docId, oldVersion));关键差异:Java 的 CompletableFuture 基于多线程,每个任务占用一个线程;Python asyncio 是单线程事件循环,协程切换开销极小,更适合大量 I/O 并发。
出处:app/pipeline/ingest_pipeline.py
Python 独有的简洁集合构建语法,Java 需要 Stream API 实现等效逻辑。
Python(本项目)
# 列表推导式:一行完成 map 操作
child_texts = [c.content for c in children]
# 带条件过滤:相当于 filter + map
filtered = [line for line in lines if line.strip() not in repeated]
# 集合推导式(去重)
repeated = {line for line, cnt in line_count.items() if cnt >= 3}
# 嵌套 enumerate:同时取索引和值
milvus_rows = [
{"chunk_id": c.chunk_id, "embedding": vectors[i]}
for i, c in enumerate(children)
]Java 等价写法
// Java Stream API 实现等效逻辑(更冗长)
List<String> childTexts = children.stream()
.map(c -> c.getContent())
.collect(Collectors.toList());
// 带条件过滤
List<String> filtered = lines.stream()
.filter(line -> !repeated.contains(line.strip()))
.collect(Collectors.toList());
// 集合(Set)去重
Set<String> repeated = lineCount.entrySet().stream()
.filter(e -> e.getValue() >= 3)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
// 带索引:Java Stream 无原生 enumerate,需 IntStream
List<Map<String, Object>> milvusRows = IntStream.range(0, children.size())
.mapToObj(i -> Map.of(
"chunk_id", children.get(i).getChunkId(),
"embedding", vectors.get(i)
))
.collect(Collectors.toList());关键差异:Python 推导式在一行内完成,可直接用于字典/集合/列表;Java Stream 需要 .stream() → 中间操作 → .collect() 三段式,且字典推导没有直接等价物(需 Collectors.toMap)。
出处:app/parsers/pdf_parser.py、app/pipeline/ingest_pipeline.py、app/api/chat.py
with 确保资源在块退出时自动释放,等价于 Java 的 try-with-resources。
Python(本项目)
# 文件资源自动关闭
with fitz.open(pdf_path) as doc:
for page in doc:
text = page.get_text("text")
# 离开 with 块后,doc 自动关闭,无论是否抛出异常
# HTTP 流式连接自动关闭
with _anthropic.messages.stream(...) as stream:
for text in stream.text_stream:
yield textJava 等价写法
// Java 7+ try-with-resources:与 Python with 语义完全一致
// 要求资源类实现 AutoCloseable 接口
try (PdfDocument doc = fitz.open(pdfPath)) {
for (Page page : doc) {
String text = page.getText();
}
} // 离开 try 块后自动调用 doc.close()
// 流式 HTTP 连接
try (MessageStream stream = anthropicClient.stream(...)) {
for (String text : stream.textStream()) {
yield(text);
}
}关键差异:语义完全一致,Python 用 with ... as,Java 用 try (...)。Python 任何实现了 __enter__/__exit__ 方法的对象都可用 with;Java 要求实现 AutoCloseable 接口。
出处:app/parsers/pdf_parser.py、app/pipeline/query_pipeline.py、app/pipeline/ingest_pipeline.py
模型文件 GB 级,用模块级变量 + global 关键字实现首次访问时才初始化。
Python(本项目)
_model: SentenceTransformer | None = None # 模块级变量
def get_model() -> SentenceTransformer:
global _model # 声明要修改模块级变量
if _model is None:
_model = SentenceTransformer("BAAI/bge-m3")
return _modelJava 等价写法
// 方式一:双重检查锁定(线程安全懒加载)
public class ModelHolder {
private static volatile SentenceTransformer model = null;
public static SentenceTransformer getModel() {
if (model == null) {
synchronized (ModelHolder.class) {
if (model == null) {
model = new SentenceTransformer("BAAI/bge-m3");
}
}
}
return model;
}
}
// 方式二:静态内部类(推荐,JVM 保证线程安全)
public class ModelHolder {
private static class Holder {
static final SentenceTransformer INSTANCE =
new SentenceTransformer("BAAI/bge-m3");
}
public static SentenceTransformer getModel() {
return Holder.INSTANCE;
}
}
// 方式三:Spring @Bean + @Lazy(Spring 项目常用)
@Bean
@Lazy
public SentenceTransformer sentenceTransformer() {
return new SentenceTransformer("BAAI/bge-m3");
}关键差异:Python 是单线程事件循环,无需考虑线程安全,写法极简;Java 多线程环境需要 volatile + synchronized 或借助 JVM 类加载机制保证安全。
出处:app/services/embedding.py、app/services/reranker.py、app/services/milvus_client.py
将 import 放在函数体内,只在真正需要时触发,减少冷启动时间。
Python(本项目)
def parse_scanned_pdf(pdf_path: str) -> list[dict]:
import pdf2image # 只有扫描件才导入,普通 PDF 不加载此库
import pytesseract
from PIL import Image
...
def describe_image(image_bytes: bytes) -> str:
from anthropic import Anthropic # 只在图片描述时才导入
...Java 等价写法
// Java 没有"延迟导入"的概念:import 是编译期静态声明,不影响运行时加载时机
// Java 实现类似效果需要反射或 ServiceLoader
// 反射方式(等价于"按需加载某个类")
public List<Map<String, Object>> parseScannedPdf(String path) {
try {
Class<?> ocrClass = Class.forName("com.example.TesseractOCR");
Object ocr = ocrClass.getDeclaredConstructor().newInstance();
// 通过反射调用方法...
} catch (ClassNotFoundException e) {
throw new RuntimeException("OCR library not available", e);
}
}
// 更常见做法:用接口 + 多实现 + 工厂方法,按需选择实现
PdfParser parser = PdfParserFactory.create(fileType);关键差异:Python 的 import 本身是运行时操作,放在函数内即实现延迟;Java 类加载由 JVM 在首次使用时自动完成,开发者无法通过 import 语句控制加载时机。
出处:app/parsers/pdf_parser.py
@dataclass 装饰器自动生成 __init__、__repr__、__eq__ 等样板方法。
Python(本项目)
from dataclasses import dataclass, field
@dataclass
class ChildChunk:
chunk_id: str
parent_id: str
content: str
page: int
chunk_index: int
# field(default_factory=list):每个实例独立列表,避免共享引用
allowed_roles: list[str] = field(default_factory=list)
# 自动生成:__init__(self, chunk_id, parent_id, ..., allowed_roles=None)
# 自动生成:__repr__ 打印所有字段
chunk = ChildChunk(chunk_id="abc", parent_id="xyz", content="...", page=1, chunk_index=0)Java 等价写法
// 方式一:Java 16+ Record(最接近 dataclass,不可变)
public record ChildChunk(
String chunkId,
String parentId,
String content,
int page,
int chunkIndex,
List<String> allowedRoles // Record 没有 default_factory,需在构造器处理
) {
public ChildChunk {
if (allowedRoles == null) allowedRoles = new ArrayList<>();
}
}
// 方式二:Lombok @Data(可变,功能最接近)
@Data
public class ChildChunk {
private String chunkId;
private String parentId;
private String content;
private int page;
private int chunkIndex;
private List<String> allowedRoles = new ArrayList<>();
}关键差异:Python @dataclass 默认可变(字段可修改),加 frozen=True 变不可变;Java Record 天生不可变;Lombok @Data 生成全套 getter/setter,功能最丰富但需要注解处理器。
出处:app/parsers/chunker.py
访问不存在的键时自动插入默认值,省去 if key not in dict 的判断。
Python(本项目)
from collections import defaultdict
rrf_scores: dict[str, float] = defaultdict(float) # 默认值工厂:float() == 0.0
for results in result_lists:
for rank, chunk in enumerate(results, start=1):
rrf_scores[chunk.chunk_id] += 1.0 / (60 + rank) # 首次访问自动初始化为 0.0Java 等价写法
// 方式一:Map.getOrDefault(每次取值时指定默认值)
Map<String, Double> rrfScores = new HashMap<>();
for (List<ChunkResult> results : resultLists) {
for (int rank = 1; rank <= results.size(); rank++) {
String chunkId = results.get(rank - 1).getChunkId();
double current = rrfScores.getOrDefault(chunkId, 0.0);
rrfScores.put(chunkId, current + 1.0 / (60 + rank));
}
}
// 方式二:computeIfAbsent(更接近 defaultdict 语义)
rrfScores.computeIfAbsent(chunkId, k -> 0.0);
rrfScores.merge(chunkId, 1.0 / (60 + rank), Double::sum);
// 方式三:merge 一步完成(最简洁)
rrfScores.merge(chunkId, 1.0 / (60 + rank), Double::sum);关键差异:Python defaultdict 在构造时绑定默认值工厂,此后所有访问都透明处理;Java 没有原生等价物,merge 方法是最接近的单行写法,但每次都要显式传入默认值和合并函数。
出处:app/pipeline/query_pipeline.py
将两个列表配对 → 按某字段排序 → 切片取 Top-K 的函数式惯用法。
Python(本项目)
scores: list[float] = model.predict(pairs).tolist()
# zip:将两个等长列表打包为 [(score, chunk), ...]
# sorted:按 lambda 提取的键降序排列
# x[0]:取元组第一个元素(分数)
scored = sorted(zip(scores, chunks), key=lambda x: x[0], reverse=True)
for score, chunk in scored[:top_k]: # 切片取前 K 条,解包元组
chunk.score = float(score)Java 等价写法
List<Float> scores = model.predict(pairs);
// 将分数和 chunk 配对(Java 无内置 zip,需手动用索引)
List<Map.Entry<Float, ChunkResult>> scored = new ArrayList<>();
for (int i = 0; i < chunks.size(); i++) {
scored.add(Map.entry(scores.get(i), chunks.get(i)));
}
// 按分数降序排列
scored.sort((a, b) -> Float.compare(b.getKey(), a.getKey()));
// 取前 topK 条并更新分数
scored.stream()
.limit(topK)
.forEach(entry -> entry.getValue().setScore(entry.getKey()));
// Java 16+ 可用 record 配对更优雅
record ScoredChunk(float score, ChunkResult chunk) {}
List<ScoredChunk> scored = IntStream.range(0, chunks.size())
.mapToObj(i -> new ScoredChunk(scores.get(i), chunks.get(i)))
.sorted(Comparator.comparingDouble(ScoredChunk::score).reversed())
.limit(topK)
.toList();关键差异:Python zip 是内置函数,sorted + lambda 一行完成;Java 需要手动用索引配对(无内置 zip),排序用 Comparator 链式 API,代码量约为 Python 的 3 倍。
出处:app/services/reranker.py
将多条命令打包一次发送,减少网络往返,类似 JDBC 的 addBatch/executeBatch。
Python(本项目)
pipe = redis_client.pipeline() # 开始积累命令,不立即发送
pipe.setex(f"parent:{parent_id}", ttl, json.dumps(content))
pipe.sadd(f"doc_parents:{doc_id}:{doc_version}", parent_id)
pipe.expire(f"doc_parents:{doc_id}:{doc_version}", ttl)
await pipe.execute() # 一次性发送所有命令Java 等价写法
// Lettuce(异步 Redis 客户端)Pipeline
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisAsyncCommands<String, String> async = connection.async();
async.setAutoFlushCommands(false); // 禁止自动发送,开始批量积累
RedisFuture<String> f1 = async.setex("parent:" + parentId, ttl, toJson(content));
RedisFuture<Long> f2 = async.sadd("doc_parents:" + docId + ":" + version, parentId);
RedisFuture<Boolean> f3 = async.expire("doc_parents:" + docId + ":" + version, ttl);
async.flushCommands(); // 一次性发送
LettuceFutures.awaitAll(1, TimeUnit.SECONDS, f1, f2, f3);
// Jedis Pipeline
try (Pipeline pipeline = jedis.pipelined()) {
pipeline.setex("parent:" + parentId, (int) ttl, toJson(content));
pipeline.sadd("doc_parents:" + docId, parentId);
pipeline.expire("doc_parents:" + docId, (int) ttl);
pipeline.sync(); // 发送并等待结果
}关键差异:语义完全一致(批量减少 RTT),Python await pipe.execute() 天然异步;Java Jedis 是同步的,Lettuce 异步但 API 更复杂。
出处:app/services/redis_client.py
捕获 SIGTERM/SIGINT 实现"处理完当前消息再退出"的优雅停机。
Python(本项目)
import signal
_running = True
def _handle_shutdown(signum, frame):
global _running
_running = False # 设置标志,不强制中断
signal.signal(signal.SIGTERM, _handle_shutdown)
signal.signal(signal.SIGINT, _handle_shutdown)
async def consume():
async for msg in consumer:
if not _running:
break # 当前消息处理完才退出
await process(msg)
await consumer.commit()Java 等价写法
// Java 方式一:Runtime.addShutdownHook(JVM 关闭钩子)
// SIGTERM/SIGINT 会触发 JVM 关闭流程,自动执行注册的 hook
volatile boolean running = true;
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
running = false;
// 等待当前消息处理完成
try { Thread.sleep(5000); } catch (InterruptedException ignored) {}
}));
// 消费循环
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
consumer.commitSync();
}
}
// Java 方式二:Spring Kafka @KafkaListener 的优雅停机
// Spring Boot 内置支持,application.yml 配置即可:
// spring.kafka.listener.ack-mode: manual
// spring.kafka.listener.phase: 2147483647 (最后关闭)关键差异:Python signal.signal 直接注册回调,信号处理函数中可修改任意全局变量;Java 的 ShutdownHook 运行在新线程,需注意线程安全。Spring Boot 项目通常直接用框架的优雅停机机制,不需要手动处理信号。
出处:app/kafka/consumer.py
FastAPI 以 Python 类型注解驱动依赖注入、请求校验、响应序列化和 OpenAPI 文档,等价于 Spring MVC + Spring Boot 自动配置。
Python(本项目)
# 依赖注入:声明参数类型,框架自动注入(无需任何注解)
async def trigger_ingest(
message: IngestMessage, # 自动从请求体解析+校验
background_tasks: BackgroundTasks, # FastAPI 内置类型,自动注入
):
background_tasks.add_task(run_ingest, message)
# 流式响应
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache"},
)
# HTTP 错误
raise HTTPException(status_code=500, detail=result.error)Java 等价写法
// Spring MVC 等价实现
@RestController
@RequestMapping("/ingest")
public class IngestController {
@PostMapping
public ResponseEntity<IngestResult> triggerIngest(
@RequestBody @Valid IngestMessage message, // @RequestBody + @Valid 两个注解
@Autowired TaskExecutor taskExecutor // @Autowired 依赖注入
) {
taskExecutor.execute(() -> ingestPipeline.run(message));
return ResponseEntity.ok(new IngestResult(...));
}
// 流式响应(Spring MVC + SseEmitter)
@GetMapping("/stream")
public SseEmitter stream() {
SseEmitter emitter = new SseEmitter(0L);
executorService.submit(() -> {
streamQuery().forEach(token -> {
try { emitter.send(token); }
catch (IOException e) { emitter.completeWithError(e); }
});
emitter.complete();
});
return emitter;
}
// HTTP 错误
@ExceptionHandler
public ResponseEntity<ErrorResponse> handleError(IngestException e) {
return ResponseEntity.status(500)
.body(new ErrorResponse(e.getMessage()));
}
}关键差异:FastAPI 通过类型注解自动完成参数解析、校验、注入,每个 endpoint 只需声明函数签名;Spring MVC 需要 @RequestBody、@Valid、@Autowired、@ExceptionHandler 等多个注解,且流式响应需要 SseEmitter 配合线程池,写法更繁琐。
出处:app/api/ingest.py、app/api/chat.py
以 Pydantic 模型为共享状态,节点函数只负责自己的字段,框架自动 merge 更新。
Python(本项目)
from langgraph.graph import StateGraph, START, END
builder = StateGraph(QueryState)
# 节点:接收完整状态,只返回需要变更的字段
async def step3_rrf_merge(state: QueryState) -> dict:
merged = _rrf_merge([state.milvus_results, state.es_results])
return {"merged_results": merged} # 框架自动 merge 回 QueryState
builder.add_node("rrf_merge", step3_rrf_merge)
builder.add_edge("parallel_retrieve", "rrf_merge")
# 条件边:运行时动态路由
builder.add_conditional_edges(
"confidence_check",
lambda s: "parent_recall" if s.is_confident else "fallback",
{"parent_recall": "parent_recall", "fallback": "fallback"},
)
graph = builder.compile()
result = await graph.ainvoke(initial_state)Java 等价写法
// Java 没有直接等价的框架,需用 Spring State Machine 或自行实现
// Spring State Machine 方式
@Configuration
@EnableStateMachine
public class QueryStateMachineConfig extends StateMachineConfigurerAdapter<States, Events> {
@Override
public void configure(StateMachineTransitionConfigurer<States, Events> transitions) throws Exception {
transitions
.withExternal().source(QUERY_REWRITE).target(PARALLEL_RETRIEVE).event(NEXT)
.and()
.withExternal().source(PARALLEL_RETRIEVE).target(RRF_MERGE).event(NEXT)
// 条件转换
.withChoice().source(CONFIDENCE_CHECK)
.first(PARENT_RECALL, ctx -> ctx.getExtendedState()
.get("isConfident", Boolean.class))
.last(FALLBACK);
}
}
// 手动链式调用(更常见):
QueryState state = new QueryState(query, tenantId, userRole, sessionId);
state = step1QueryRewrite(state);
state = step2ParallelRetrieve(state);
state = step3RrfMerge(state);
// ...条件分支
if (state.isConfident()) {
state = step7ParentRecall(state);
state = step8Generate(state);
} else {
state = stepFallback(state);
}关键差异:LangGraph 提供声明式图结构,节点函数无需关心调用顺序和状态传递,框架负责路由和 merge;Java 通常需要手动编排调用链,或引入 Spring State Machine(配置复杂,远不如 LangGraph 简洁)。
出处:app/pipeline/query_pipeline.py
Python(本项目)
# 非流式调用 + Prompt Caching
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
system=SYSTEM_PROMPT,
messages=[{
"role": "user",
"content": [
{
"type": "text",
"text": f"【参考资料】\n{context}",
"cache_control": {"type": "ephemeral"}, # 标记此块可缓存(5分钟TTL)
},
{"type": "text", "text": f"【用户问题】\n{query}"},
],
}],
betas=["prompt-caching-2024-07-31"],
)
answer = response.content[0].text
# 流式调用
with client.messages.stream(model=..., messages=...) as stream:
for text in stream.text_stream:
yield textJava 等价写法
// 使用 Anthropic Java SDK(或通过 OpenAI 兼容接口)
// 非流式调用
AnthropicClient client = AnthropicClient.builder()
.apiKey(System.getenv("ANTHROPIC_API_KEY"))
.build();
Message response = client.messages().create(
CreateMessageParams.builder()
.model("claude-sonnet-4-6")
.maxTokens(1024)
.system(SYSTEM_PROMPT)
.addUserMessage(ContentBlock.builder()
.type("text")
.text("【参考资料】\n" + context)
.cacheControl(CacheControl.builder().type("ephemeral").build())
.build())
.addUserMessage(ContentBlock.text("【用户问题】\n" + query))
.build()
);
String answer = response.content().get(0).text();
// 流式调用(SSE)
client.messages().stream(params)
.textStream()
.forEach(text -> sseEmitter.send(text));关键差异:Python SDK 的流式调用用同步 with + for 即可,代码直观;Java SDK 通常基于响应式流(Flux/Publisher),需要订阅者模型。Prompt Caching 的 cache_control 字段两个 SDK 格式相同,但 Python 直接传字典,Java 需要 Builder 构建对象。
出处:app/pipeline/query_pipeline.py
| Python 特性 | Java 等价 | 代码量对比 |
|---|---|---|
int | None |
Optional<Integer> / @Nullable |
Python 更简洁 |
Pydantic BaseModel |
Jackson + Bean Validation + SpringDoc | Python 1个类 vs Java 3个库 |
BaseSettings |
@ConfigurationProperties + YAML |
相当 |
async/await |
Project Reactor Mono/Flux |
Python 更直观 |
async def + yield |
Flux.create() + sink.next() |
Python 更直观 |
asyncio.gather |
CompletableFuture.allOf |
相当 |
| 列表推导式 | stream().map().collect() |
Python 更简洁 |
with 语句 |
try-with-resources |
完全等价 |
| 懒加载单例 | 双重检查锁 / 静态内部类 | Java 需处理线程安全 |
延迟 import |
反射 / 工厂模式 | Python 原生支持 |
@dataclass |
record / Lombok @Data |
Python 更简洁 |
defaultdict |
Map.merge() / computeIfAbsent |
相当 |
zip + sorted + lambda |
IntStream + Comparator |
Python 更简洁 |
| Redis Pipeline | Jedis/Lettuce Pipeline | 完全等价 |
signal.signal |
Runtime.addShutdownHook |
相当 |
| FastAPI 依赖注入 | Spring @Autowired + @RequestBody |
Python 注解更少 |
| LangGraph 状态机 | Spring State Machine / 手动编排 | Python 声明式更简洁 |
| Anthropic SDK 流式 | Anthropic Java SDK + Flux | 相当 |