Skip to content

fsharechat/python-ai

Repository files navigation

Enterprise RAG — Python AI Layer

企业级 RAG(检索增强生成)系统的 Python AI 服务层,基于 FastAPI + LangGraph + Claude 构建。


项目 Python 核心语法总结(含 Java 对比)

本节梳理项目中大量使用的 Python 语言特性,每个特性附上项目内真实代码片段,并与 Java 等价写法对比,帮助有 Java 背景的开发者快速建立映射关系。


1. 类型注解(Type Hints)

Python 3.10+ 原生联合类型和泛型,无需从 typing 导入。

Python(本项目)

# 联合类型:int 或 None
old_version: int | None = None

# 原生泛型(Python 3.9+,无需 import)
allowed_roles: list[str] = []
chunk_map: dict[str, ChunkResult] = {}

# 函数返回值类型
def split_pages(...) -> tuple[list[ParentChunk], list[ChildChunk]]:
    ...

async def stream_query(...) -> AsyncIterator[str]:
    ...

Java 等价写法

// 可空类型:Java 用 @Nullable 注解或 Optional<T> 表达
@Nullable Integer oldVersion = null;
Optional<Integer> oldVersion = Optional.empty();

// 泛型集合:Java 必须写完整泛型参数
List<String> allowedRoles = new ArrayList<>();
Map<String, ChunkResult> chunkMap = new HashMap<>();

// 函数返回值(Java 没有元组,需自定义 record 或 Pair)
record SplitResult(List<ParentChunk> parents, List<ChildChunk> children) {}
SplitResult splitPages(...) { ... }

// 异步流:Java 用 Flux<String>(Project Reactor)或 Stream<String>
Flux<String> streamQuery(...) { ... }

关键差异:Python 类型注解是可选的(运行时不强制),主要供静态分析工具(mypy、IDE)使用;Java 类型在编译期严格检查,不可省略。

出处app/models/schemas.pyapp/parsers/chunker.pyapp/pipeline/query_pipeline.py


2. Pydantic 数据模型

BaseModel 做数据校验、序列化和文档生成,是 FastAPI 的核心依赖。

Python(本项目)

from pydantic import BaseModel, Field
from typing import Literal

class IngestMessage(BaseModel):
    tenant_id: str
    doc_version: int = 1                               # 默认值
    operation: Literal["create", "update"] = "create" # 枚举约束
    old_version: int | None = None                     # 可选字段
    allowed_roles: list[str] = []

    created_at: datetime = Field(default_factory=datetime.utcnow)

# 不可变更新:返回新对象,原对象不变
state = state.model_copy(update={"answer": "...", "sources": [...]})

Java 等价写法

// Java 需要 Jackson + Bean Validation 注解组合实现同等功能
public class IngestMessage {
    @NotNull
    private String tenantId;

    private int docVersion = 1;

    @Pattern(regexp = "create|update")   // 或用枚举类型
    private String operation = "create";

    @Nullable
    private Integer oldVersion;

    private List<String> allowedRoles = new ArrayList<>();

    // 默认值工厂:Java 需在构造器或字段初始化中处理
    private LocalDateTime createdAt = LocalDateTime.now();
}

// 不可变更新:Java 用 Builder 模式或 Lombok @With
IngestMessage updated = original.toBuilder()
    .answer("...")
    .sources(List.of(...))
    .build();

关键差异:Python 一个 BaseModel 类同时获得校验、序列化、OpenAPI Schema 生成;Java 需要 Jackson(序列化)+ Bean Validation(校验)+ SpringDoc(文档)三个库分别配置。

出处app/models/schemas.pyapp/pipeline/query_pipeline.py


3. pydantic-settings 配置管理

BaseSettings 自动从环境变量或 .env 文件加载配置,字段名自动转大写匹配。

Python(本项目)

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    anthropic_api_key: str          # 必填,缺失则启动报错
    claude_model: str = "claude-sonnet-4-6"
    milvus_top_k: int = 20          # 自动从 MILVUS_TOP_K 环境变量读取

    class Config:
        env_file = ".env"           # 优先级:环境变量 > .env > 默认值

settings = Settings()               # 模块级单例

Java 等价写法

// Spring Boot application.yml + @ConfigurationProperties
@ConfigurationProperties(prefix = "app")
@Configuration
public class Settings {

    @NotBlank
    private String anthropicApiKey;            // 对应 app.anthropic-api-key

    private String claudeModel = "claude-sonnet-4-6";
    private int milvusTopK = 20;

    // getters/setters ...
}

// application.yml 中配置:
// app:
//   anthropic-api-key: ${ANTHROPIC_API_KEY}   # 从环境变量读取
//   milvus-top-k: 20

关键差异:Python 一个类定义即完成配置绑定;Java 需要 @ConfigurationProperties + @Configuration + YAML 文件三层配合,且 Spring Boot 的环境变量映射规则(kebab-case vs UPPER_CASE)比 Python 复杂。

出处app/config.py


4. async / await 异步编程

FastAPI 基于 asyncio 单线程事件循环,I/O 等待时切换协程,而非为每个请求创建线程。

Python(本项目)

async def embed_query(query: str) -> list[float]:
    # await 挂起当前协程,事件循环去处理其他请求,I/O 完成后恢复
    cached = await redis_client.get(f"emb:{key}")
    if cached:
        return json.loads(cached)
    vector = get_model().encode(query).tolist()
    await redis_client.setex(key, ttl, json.dumps(vector))
    return vector

# 并发:两个检索同时进行,总耗时 ≈ max(Milvus耗时, ES耗时)
milvus_results, es_results = await asyncio.gather(
    milvus_client.search(...),
    es_client.search(...),
)

Java 等价写法

// Spring WebFlux(Project Reactor)的响应式写法
public Mono<List<Float>> embedQuery(String query) {
    return redisClient.get("emb:" + key)
        .flatMap(cached -> {
            if (cached != null) return Mono.just(parseJson(cached));
            List<Float> vector = model.encode(query);
            return redisClient.setex(key, ttl, toJson(vector))
                              .thenReturn(vector);
        });
}

// 并发:等价于 asyncio.gather
Mono.zip(
    milvusClient.search(...),
    esClient.search(...)
).subscribe(tuple -> {
    List<ChunkResult> milvusResults = tuple.getT1();
    List<ChunkResult> esResults     = tuple.getT2();
});

// 传统 Spring MVC:用 CompletableFuture
CompletableFuture<List<ChunkResult>> f1 = CompletableFuture.supplyAsync(() -> milvusSearch(...));
CompletableFuture<List<ChunkResult>> f2 = CompletableFuture.supplyAsync(() -> esSearch(...));
CompletableFuture.allOf(f1, f2).join();

关键差异:Python async/await 是语言级语法糖,代码逻辑与同步代码几乎一致,只需加 async/await 关键字;Java 响应式编程(Reactor/RxJava)需要链式 flatMap/map/zip,思维模型完全不同,学习成本高。

出处app/services/embedding.pyapp/pipeline/query_pipeline.py


5. 异步生成器(AsyncIterator / yield)

函数内含 yield 即成为生成器;async def + yield 是异步生成器,按需逐个产出值。

Python(本项目)

# yield 使函数变为生成器:每次 yield 暂停,调用方取值后继续
async def stream_query(...) -> AsyncIterator[str]:
    # ... 前序步骤 ...
    with _anthropic.messages.stream(...) as stream:
        for text in stream.text_stream:
            yield text              # 产出一个 token,暂停等待消费

# 调用方:async for 驱动生成器逐步执行
async def event_generator():
    async for token in stream_query(...):
        yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"

Java 等价写法

// Java 没有原生生成器语法,需用 Flux(响应式流)模拟
public Flux<String> streamQuery(...) {
    return Flux.create(sink -> {
        // ... 前序步骤 ...
        anthropicClient.streamMessages(...)
            .doOnNext(text -> sink.next(text))   // 等价于 yield text
            .doOnComplete(sink::complete)
            .subscribe();
    });
}

// 调用方消费
streamQuery(...).subscribe(token -> {
    sendSseEvent("{\"token\": \"" + token + "\"}");
});

// 或 Spring MVC SseEmitter
SseEmitter emitter = new SseEmitter();
streamQuery(...).subscribe(
    token -> emitter.send(token),
    emitter::completeWithError,
    emitter::complete
);

关键差异:Python yield 是协程级暂停恢复,代码写起来像同步逻辑;Java 需要将逻辑拆碎放进回调/lambda,执行顺序不直观。

出处app/pipeline/query_pipeline.pyapp/api/chat.py


6. asyncio 并发工具

Python(本项目)

# gather:并发执行,等所有完成,结果顺序与传入顺序一致
await asyncio.gather(
    asyncio.to_thread(milvus_client.delete_by_doc_version, ...),
    es_client.delete_by_doc_version(...),
    delete_parent_chunks_by_version(...),
)

# to_thread:将同步阻塞函数放到线程池,不阻塞事件循环
result = await asyncio.to_thread(sync_heavy_function, arg1, arg2)

# create_task:后台运行,不等待结果(fire-and-forget)
asyncio.create_task(_delete_old_version(tenant_id, doc_id, old_version))

Java 等价写法

// CompletableFuture.allOf:等价于 asyncio.gather
CompletableFuture<Void> all = CompletableFuture.allOf(
    CompletableFuture.runAsync(() -> milvusClient.deleteByDocVersion(...)),
    CompletableFuture.runAsync(() -> esClient.deleteByDocVersion(...)),
    CompletableFuture.runAsync(() -> redisClient.deleteParentChunks(...))
);
all.join();  // 等待全部完成

// 线程池提交同步任务:等价于 asyncio.to_thread
ExecutorService executor = Executors.newFixedThreadPool(4);
Future<Result> future = executor.submit(() -> syncHeavyFunction(arg1, arg2));

// fire-and-forget:等价于 create_task
CompletableFuture.runAsync(() -> deleteOldVersion(tenantId, docId, oldVersion));

关键差异:Java 的 CompletableFuture 基于多线程,每个任务占用一个线程;Python asyncio 是单线程事件循环,协程切换开销极小,更适合大量 I/O 并发。

出处app/pipeline/ingest_pipeline.py


7. 推导式(Comprehensions)

Python 独有的简洁集合构建语法,Java 需要 Stream API 实现等效逻辑。

Python(本项目)

# 列表推导式:一行完成 map 操作
child_texts = [c.content for c in children]

# 带条件过滤:相当于 filter + map
filtered = [line for line in lines if line.strip() not in repeated]

# 集合推导式(去重)
repeated = {line for line, cnt in line_count.items() if cnt >= 3}

# 嵌套 enumerate:同时取索引和值
milvus_rows = [
    {"chunk_id": c.chunk_id, "embedding": vectors[i]}
    for i, c in enumerate(children)
]

Java 等价写法

// Java Stream API 实现等效逻辑(更冗长)
List<String> childTexts = children.stream()
    .map(c -> c.getContent())
    .collect(Collectors.toList());

// 带条件过滤
List<String> filtered = lines.stream()
    .filter(line -> !repeated.contains(line.strip()))
    .collect(Collectors.toList());

// 集合(Set)去重
Set<String> repeated = lineCount.entrySet().stream()
    .filter(e -> e.getValue() >= 3)
    .map(Map.Entry::getKey)
    .collect(Collectors.toSet());

// 带索引:Java Stream 无原生 enumerate,需 IntStream
List<Map<String, Object>> milvusRows = IntStream.range(0, children.size())
    .mapToObj(i -> Map.of(
        "chunk_id", children.get(i).getChunkId(),
        "embedding", vectors.get(i)
    ))
    .collect(Collectors.toList());

关键差异:Python 推导式在一行内完成,可直接用于字典/集合/列表;Java Stream 需要 .stream() → 中间操作 → .collect() 三段式,且字典推导没有直接等价物(需 Collectors.toMap)。

出处app/parsers/pdf_parser.pyapp/pipeline/ingest_pipeline.pyapp/api/chat.py


8. 上下文管理器(with 语句)

with 确保资源在块退出时自动释放,等价于 Java 的 try-with-resources。

Python(本项目)

# 文件资源自动关闭
with fitz.open(pdf_path) as doc:
    for page in doc:
        text = page.get_text("text")
# 离开 with 块后,doc 自动关闭,无论是否抛出异常

# HTTP 流式连接自动关闭
with _anthropic.messages.stream(...) as stream:
    for text in stream.text_stream:
        yield text

Java 等价写法

// Java 7+ try-with-resources:与 Python with 语义完全一致
// 要求资源类实现 AutoCloseable 接口
try (PdfDocument doc = fitz.open(pdfPath)) {
    for (Page page : doc) {
        String text = page.getText();
    }
}  // 离开 try 块后自动调用 doc.close()

// 流式 HTTP 连接
try (MessageStream stream = anthropicClient.stream(...)) {
    for (String text : stream.textStream()) {
        yield(text);
    }
}

关键差异:语义完全一致,Python 用 with ... as,Java 用 try (...)。Python 任何实现了 __enter__/__exit__ 方法的对象都可用 with;Java 要求实现 AutoCloseable 接口。

出处app/parsers/pdf_parser.pyapp/pipeline/query_pipeline.pyapp/pipeline/ingest_pipeline.py


9. 懒加载单例(Lazy Singleton)

模型文件 GB 级,用模块级变量 + global 关键字实现首次访问时才初始化。

Python(本项目)

_model: SentenceTransformer | None = None  # 模块级变量

def get_model() -> SentenceTransformer:
    global _model                   # 声明要修改模块级变量
    if _model is None:
        _model = SentenceTransformer("BAAI/bge-m3")
    return _model

Java 等价写法

// 方式一:双重检查锁定(线程安全懒加载)
public class ModelHolder {
    private static volatile SentenceTransformer model = null;

    public static SentenceTransformer getModel() {
        if (model == null) {
            synchronized (ModelHolder.class) {
                if (model == null) {
                    model = new SentenceTransformer("BAAI/bge-m3");
                }
            }
        }
        return model;
    }
}

// 方式二:静态内部类(推荐,JVM 保证线程安全)
public class ModelHolder {
    private static class Holder {
        static final SentenceTransformer INSTANCE =
            new SentenceTransformer("BAAI/bge-m3");
    }
    public static SentenceTransformer getModel() {
        return Holder.INSTANCE;
    }
}

// 方式三:Spring @Bean + @Lazy(Spring 项目常用)
@Bean
@Lazy
public SentenceTransformer sentenceTransformer() {
    return new SentenceTransformer("BAAI/bge-m3");
}

关键差异:Python 是单线程事件循环,无需考虑线程安全,写法极简;Java 多线程环境需要 volatile + synchronized 或借助 JVM 类加载机制保证安全。

出处app/services/embedding.pyapp/services/reranker.pyapp/services/milvus_client.py


10. 延迟导入(Lazy Import)

import 放在函数体内,只在真正需要时触发,减少冷启动时间。

Python(本项目)

def parse_scanned_pdf(pdf_path: str) -> list[dict]:
    import pdf2image       # 只有扫描件才导入,普通 PDF 不加载此库
    import pytesseract
    from PIL import Image
    ...

def describe_image(image_bytes: bytes) -> str:
    from anthropic import Anthropic   # 只在图片描述时才导入
    ...

Java 等价写法

// Java 没有"延迟导入"的概念:import 是编译期静态声明,不影响运行时加载时机
// Java 实现类似效果需要反射或 ServiceLoader

// 反射方式(等价于"按需加载某个类")
public List<Map<String, Object>> parseScannedPdf(String path) {
    try {
        Class<?> ocrClass = Class.forName("com.example.TesseractOCR");
        Object ocr = ocrClass.getDeclaredConstructor().newInstance();
        // 通过反射调用方法...
    } catch (ClassNotFoundException e) {
        throw new RuntimeException("OCR library not available", e);
    }
}

// 更常见做法:用接口 + 多实现 + 工厂方法,按需选择实现
PdfParser parser = PdfParserFactory.create(fileType);

关键差异:Python 的 import 本身是运行时操作,放在函数内即实现延迟;Java 类加载由 JVM 在首次使用时自动完成,开发者无法通过 import 语句控制加载时机。

出处app/parsers/pdf_parser.py


11. dataclass

@dataclass 装饰器自动生成 __init____repr____eq__ 等样板方法。

Python(本项目)

from dataclasses import dataclass, field

@dataclass
class ChildChunk:
    chunk_id: str
    parent_id: str
    content: str
    page: int
    chunk_index: int
    # field(default_factory=list):每个实例独立列表,避免共享引用
    allowed_roles: list[str] = field(default_factory=list)

# 自动生成:__init__(self, chunk_id, parent_id, ..., allowed_roles=None)
# 自动生成:__repr__ 打印所有字段
chunk = ChildChunk(chunk_id="abc", parent_id="xyz", content="...", page=1, chunk_index=0)

Java 等价写法

// 方式一:Java 16+ Record(最接近 dataclass,不可变)
public record ChildChunk(
    String chunkId,
    String parentId,
    String content,
    int page,
    int chunkIndex,
    List<String> allowedRoles   // Record 没有 default_factory,需在构造器处理
) {
    public ChildChunk {
        if (allowedRoles == null) allowedRoles = new ArrayList<>();
    }
}

// 方式二:Lombok @Data(可变,功能最接近)
@Data
public class ChildChunk {
    private String chunkId;
    private String parentId;
    private String content;
    private int page;
    private int chunkIndex;
    private List<String> allowedRoles = new ArrayList<>();
}

关键差异:Python @dataclass 默认可变(字段可修改),加 frozen=True 变不可变;Java Record 天生不可变;Lombok @Data 生成全套 getter/setter,功能最丰富但需要注解处理器。

出处app/parsers/chunker.py


12. collections.defaultdict

访问不存在的键时自动插入默认值,省去 if key not in dict 的判断。

Python(本项目)

from collections import defaultdict

rrf_scores: dict[str, float] = defaultdict(float)  # 默认值工厂:float() == 0.0

for results in result_lists:
    for rank, chunk in enumerate(results, start=1):
        rrf_scores[chunk.chunk_id] += 1.0 / (60 + rank)  # 首次访问自动初始化为 0.0

Java 等价写法

// 方式一:Map.getOrDefault(每次取值时指定默认值)
Map<String, Double> rrfScores = new HashMap<>();

for (List<ChunkResult> results : resultLists) {
    for (int rank = 1; rank <= results.size(); rank++) {
        String chunkId = results.get(rank - 1).getChunkId();
        double current = rrfScores.getOrDefault(chunkId, 0.0);
        rrfScores.put(chunkId, current + 1.0 / (60 + rank));
    }
}

// 方式二:computeIfAbsent(更接近 defaultdict 语义)
rrfScores.computeIfAbsent(chunkId, k -> 0.0);
rrfScores.merge(chunkId, 1.0 / (60 + rank), Double::sum);

// 方式三:merge 一步完成(最简洁)
rrfScores.merge(chunkId, 1.0 / (60 + rank), Double::sum);

关键差异:Python defaultdict 在构造时绑定默认值工厂,此后所有访问都透明处理;Java 没有原生等价物,merge 方法是最接近的单行写法,但每次都要显式传入默认值和合并函数。

出处app/pipeline/query_pipeline.py


13. zip + sorted + lambda

将两个列表配对 → 按某字段排序 → 切片取 Top-K 的函数式惯用法。

Python(本项目)

scores: list[float] = model.predict(pairs).tolist()

# zip:将两个等长列表打包为 [(score, chunk), ...]
# sorted:按 lambda 提取的键降序排列
# x[0]:取元组第一个元素(分数)
scored = sorted(zip(scores, chunks), key=lambda x: x[0], reverse=True)

for score, chunk in scored[:top_k]:   # 切片取前 K 条,解包元组
    chunk.score = float(score)

Java 等价写法

List<Float> scores = model.predict(pairs);

// 将分数和 chunk 配对(Java 无内置 zip,需手动用索引)
List<Map.Entry<Float, ChunkResult>> scored = new ArrayList<>();
for (int i = 0; i < chunks.size(); i++) {
    scored.add(Map.entry(scores.get(i), chunks.get(i)));
}

// 按分数降序排列
scored.sort((a, b) -> Float.compare(b.getKey(), a.getKey()));

// 取前 topK 条并更新分数
scored.stream()
    .limit(topK)
    .forEach(entry -> entry.getValue().setScore(entry.getKey()));

// Java 16+ 可用 record 配对更优雅
record ScoredChunk(float score, ChunkResult chunk) {}
List<ScoredChunk> scored = IntStream.range(0, chunks.size())
    .mapToObj(i -> new ScoredChunk(scores.get(i), chunks.get(i)))
    .sorted(Comparator.comparingDouble(ScoredChunk::score).reversed())
    .limit(topK)
    .toList();

关键差异:Python zip 是内置函数,sorted + lambda 一行完成;Java 需要手动用索引配对(无内置 zip),排序用 Comparator 链式 API,代码量约为 Python 的 3 倍。

出处app/services/reranker.py


14. Redis Pipeline(批量命令)

将多条命令打包一次发送,减少网络往返,类似 JDBC 的 addBatch/executeBatch

Python(本项目)

pipe = redis_client.pipeline()          # 开始积累命令,不立即发送

pipe.setex(f"parent:{parent_id}", ttl, json.dumps(content))
pipe.sadd(f"doc_parents:{doc_id}:{doc_version}", parent_id)
pipe.expire(f"doc_parents:{doc_id}:{doc_version}", ttl)

await pipe.execute()                    # 一次性发送所有命令

Java 等价写法

// Lettuce(异步 Redis 客户端)Pipeline
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisAsyncCommands<String, String> async = connection.async();

async.setAutoFlushCommands(false);      // 禁止自动发送,开始批量积累

RedisFuture<String> f1 = async.setex("parent:" + parentId, ttl, toJson(content));
RedisFuture<Long>   f2 = async.sadd("doc_parents:" + docId + ":" + version, parentId);
RedisFuture<Boolean> f3 = async.expire("doc_parents:" + docId + ":" + version, ttl);

async.flushCommands();                  // 一次性发送
LettuceFutures.awaitAll(1, TimeUnit.SECONDS, f1, f2, f3);

// Jedis Pipeline
try (Pipeline pipeline = jedis.pipelined()) {
    pipeline.setex("parent:" + parentId, (int) ttl, toJson(content));
    pipeline.sadd("doc_parents:" + docId, parentId);
    pipeline.expire("doc_parents:" + docId, (int) ttl);
    pipeline.sync();                    // 发送并等待结果
}

关键差异:语义完全一致(批量减少 RTT),Python await pipe.execute() 天然异步;Java Jedis 是同步的,Lettuce 异步但 API 更复杂。

出处app/services/redis_client.py


15. 信号处理与优雅退出

捕获 SIGTERM/SIGINT 实现"处理完当前消息再退出"的优雅停机。

Python(本项目)

import signal

_running = True

def _handle_shutdown(signum, frame):
    global _running
    _running = False                    # 设置标志,不强制中断

signal.signal(signal.SIGTERM, _handle_shutdown)
signal.signal(signal.SIGINT, _handle_shutdown)

async def consume():
    async for msg in consumer:
        if not _running:
            break                       # 当前消息处理完才退出
        await process(msg)
        await consumer.commit()

Java 等价写法

// Java 方式一:Runtime.addShutdownHook(JVM 关闭钩子)
// SIGTERM/SIGINT 会触发 JVM 关闭流程,自动执行注册的 hook
volatile boolean running = true;

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    running = false;
    // 等待当前消息处理完成
    try { Thread.sleep(5000); } catch (InterruptedException ignored) {}
}));

// 消费循环
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
        consumer.commitSync();
    }
}

// Java 方式二:Spring Kafka @KafkaListener 的优雅停机
// Spring Boot 内置支持,application.yml 配置即可:
// spring.kafka.listener.ack-mode: manual
// spring.kafka.listener.phase: 2147483647  (最后关闭)

关键差异:Python signal.signal 直接注册回调,信号处理函数中可修改任意全局变量;Java 的 ShutdownHook 运行在新线程,需注意线程安全。Spring Boot 项目通常直接用框架的优雅停机机制,不需要手动处理信号。

出处app/kafka/consumer.py


16. FastAPI 核心特性

FastAPI 以 Python 类型注解驱动依赖注入、请求校验、响应序列化和 OpenAPI 文档,等价于 Spring MVC + Spring Boot 自动配置。

Python(本项目)

# 依赖注入:声明参数类型,框架自动注入(无需任何注解)
async def trigger_ingest(
    message: IngestMessage,              # 自动从请求体解析+校验
    background_tasks: BackgroundTasks,   # FastAPI 内置类型,自动注入
):
    background_tasks.add_task(run_ingest, message)

# 流式响应
return StreamingResponse(
    event_generator(),
    media_type="text/event-stream",
    headers={"Cache-Control": "no-cache"},
)

# HTTP 错误
raise HTTPException(status_code=500, detail=result.error)

Java 等价写法

// Spring MVC 等价实现
@RestController
@RequestMapping("/ingest")
public class IngestController {

    @PostMapping
    public ResponseEntity<IngestResult> triggerIngest(
        @RequestBody @Valid IngestMessage message,    // @RequestBody + @Valid 两个注解
        @Autowired TaskExecutor taskExecutor          // @Autowired 依赖注入
    ) {
        taskExecutor.execute(() -> ingestPipeline.run(message));
        return ResponseEntity.ok(new IngestResult(...));
    }

    // 流式响应(Spring MVC + SseEmitter)
    @GetMapping("/stream")
    public SseEmitter stream() {
        SseEmitter emitter = new SseEmitter(0L);
        executorService.submit(() -> {
            streamQuery().forEach(token -> {
                try { emitter.send(token); }
                catch (IOException e) { emitter.completeWithError(e); }
            });
            emitter.complete();
        });
        return emitter;
    }

    // HTTP 错误
    @ExceptionHandler
    public ResponseEntity<ErrorResponse> handleError(IngestException e) {
        return ResponseEntity.status(500)
            .body(new ErrorResponse(e.getMessage()));
    }
}

关键差异:FastAPI 通过类型注解自动完成参数解析、校验、注入,每个 endpoint 只需声明函数签名;Spring MVC 需要 @RequestBody@Valid@Autowired@ExceptionHandler 等多个注解,且流式响应需要 SseEmitter 配合线程池,写法更繁琐。

出处app/api/ingest.pyapp/api/chat.py


17. LangGraph 状态机

以 Pydantic 模型为共享状态,节点函数只负责自己的字段,框架自动 merge 更新。

Python(本项目)

from langgraph.graph import StateGraph, START, END

builder = StateGraph(QueryState)

# 节点:接收完整状态,只返回需要变更的字段
async def step3_rrf_merge(state: QueryState) -> dict:
    merged = _rrf_merge([state.milvus_results, state.es_results])
    return {"merged_results": merged}   # 框架自动 merge 回 QueryState

builder.add_node("rrf_merge", step3_rrf_merge)
builder.add_edge("parallel_retrieve", "rrf_merge")

# 条件边:运行时动态路由
builder.add_conditional_edges(
    "confidence_check",
    lambda s: "parent_recall" if s.is_confident else "fallback",
    {"parent_recall": "parent_recall", "fallback": "fallback"},
)

graph = builder.compile()
result = await graph.ainvoke(initial_state)

Java 等价写法

// Java 没有直接等价的框架,需用 Spring State Machine 或自行实现

// Spring State Machine 方式
@Configuration
@EnableStateMachine
public class QueryStateMachineConfig extends StateMachineConfigurerAdapter<States, Events> {

    @Override
    public void configure(StateMachineTransitionConfigurer<States, Events> transitions) throws Exception {
        transitions
            .withExternal().source(QUERY_REWRITE).target(PARALLEL_RETRIEVE).event(NEXT)
            .and()
            .withExternal().source(PARALLEL_RETRIEVE).target(RRF_MERGE).event(NEXT)
            // 条件转换
            .withChoice().source(CONFIDENCE_CHECK)
                .first(PARENT_RECALL, ctx -> ctx.getExtendedState()
                    .get("isConfident", Boolean.class))
                .last(FALLBACK);
    }
}

// 手动链式调用(更常见):
QueryState state = new QueryState(query, tenantId, userRole, sessionId);
state = step1QueryRewrite(state);
state = step2ParallelRetrieve(state);
state = step3RrfMerge(state);
// ...条件分支
if (state.isConfident()) {
    state = step7ParentRecall(state);
    state = step8Generate(state);
} else {
    state = stepFallback(state);
}

关键差异:LangGraph 提供声明式图结构,节点函数无需关心调用顺序和状态传递,框架负责路由和 merge;Java 通常需要手动编排调用链,或引入 Spring State Machine(配置复杂,远不如 LangGraph 简洁)。

出处app/pipeline/query_pipeline.py


18. Anthropic SDK — Prompt Caching 与流式输出

Python(本项目)

# 非流式调用 + Prompt Caching
response = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    system=SYSTEM_PROMPT,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": f"【参考资料】\n{context}",
                "cache_control": {"type": "ephemeral"},  # 标记此块可缓存(5分钟TTL)
            },
            {"type": "text", "text": f"【用户问题】\n{query}"},
        ],
    }],
    betas=["prompt-caching-2024-07-31"],
)
answer = response.content[0].text

# 流式调用
with client.messages.stream(model=..., messages=...) as stream:
    for text in stream.text_stream:
        yield text

Java 等价写法

// 使用 Anthropic Java SDK(或通过 OpenAI 兼容接口)

// 非流式调用
AnthropicClient client = AnthropicClient.builder()
    .apiKey(System.getenv("ANTHROPIC_API_KEY"))
    .build();

Message response = client.messages().create(
    CreateMessageParams.builder()
        .model("claude-sonnet-4-6")
        .maxTokens(1024)
        .system(SYSTEM_PROMPT)
        .addUserMessage(ContentBlock.builder()
            .type("text")
            .text("【参考资料】\n" + context)
            .cacheControl(CacheControl.builder().type("ephemeral").build())
            .build())
        .addUserMessage(ContentBlock.text("【用户问题】\n" + query))
        .build()
);
String answer = response.content().get(0).text();

// 流式调用(SSE)
client.messages().stream(params)
    .textStream()
    .forEach(text -> sseEmitter.send(text));

关键差异:Python SDK 的流式调用用同步 with + for 即可,代码直观;Java SDK 通常基于响应式流(Flux/Publisher),需要订阅者模型。Prompt Caching 的 cache_control 字段两个 SDK 格式相同,但 Python 直接传字典,Java 需要 Builder 构建对象。

出处app/pipeline/query_pipeline.py


特性对比速查表

Python 特性 Java 等价 代码量对比
int | None Optional<Integer> / @Nullable Python 更简洁
Pydantic BaseModel Jackson + Bean Validation + SpringDoc Python 1个类 vs Java 3个库
BaseSettings @ConfigurationProperties + YAML 相当
async/await Project Reactor Mono/Flux Python 更直观
async def + yield Flux.create() + sink.next() Python 更直观
asyncio.gather CompletableFuture.allOf 相当
列表推导式 stream().map().collect() Python 更简洁
with 语句 try-with-resources 完全等价
懒加载单例 双重检查锁 / 静态内部类 Java 需处理线程安全
延迟 import 反射 / 工厂模式 Python 原生支持
@dataclass record / Lombok @Data Python 更简洁
defaultdict Map.merge() / computeIfAbsent 相当
zip + sorted + lambda IntStream + Comparator Python 更简洁
Redis Pipeline Jedis/Lettuce Pipeline 完全等价
signal.signal Runtime.addShutdownHook 相当
FastAPI 依赖注入 Spring @Autowired + @RequestBody Python 注解更少
LangGraph 状态机 Spring State Machine / 手动编排 Python 声明式更简洁
Anthropic SDK 流式 Anthropic Java SDK + Flux 相当

About

企业级 RAG 系统 Python AI 层 — 基于 FastAPI + Claude + Milvus,支持多租户文档问答与 SSE 流式输出

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors