RAG 知识库如何实现动态与持续更新?
先说结论: 生产级 RAG 的知识库更新,不是“重新跑一遍脚本”,而是围绕 doc_id、content_hash、version_id 和 ACL 建立一套持续同步系统。普通修改最稳的方式是“按文档删除旧 chunk,再重新切分入库”;高
先说结论:生产级 RAG 的知识库更新,不是“重新跑一遍脚本”,而是围绕 doc_id、content_hash、version_id 和 ACL 建立一套持续同步系统。普通修改最稳的方式是“按文档删除旧 chunk,再重新切分入库”;高风险变更走双索引灰度;所有更新都要能幂等、能补偿、能回滚。 |
一、为什么这件事值得单独讲?
很多 RAG Demo 做完后看起来很顺:上传文档、切 chunk、生成向量、用户提问、检索 TopK、交给大模型回答。问题是,一到生产环境,文档开始变化,知识库就会暴露出真实难度。
产品手册会改版,合同模板会更新,客服 FAQ 会调整,政策文件会作废,权限也会变化。如果知识库没有同步,模型即使检索到了资料,也可能检索到的是旧资料、错资料、越权资料。
RAG 知识库更新要解决的不是“怎么写入新文档”,而是五个更底层的问题:
新鲜度:源文档变了,检索结果要跟着变。
准确性:召回内容必须对应当前版本,不能混入旧版本。
一致性:元数据库、向量库、全文索引、缓存要保持同一事实。
安全性:权限变化、删除敏感文档后,检索层必须立即不可见。
可回滚:新策略效果变差时,能快速切回上一个健康版本。
这就是为什么“动态更新”不是边角功能,而是 RAG 从玩具走向生产的必备能力。
二、RAG 更新为什么比普通数据库 UPDATE 更麻烦?
普通数据库更新一条记录,执行 UPDATE 就行。但 RAG 不是一条记录对应一个答案,而是一篇文档先被解析、清洗、切分成多个 chunk,再分别生成 embedding,最后写入向量库。
原始文档与 chunk 是一对多关系,修改后 chunk 边界可能整体变化
这里最容易犯的错,是以为“只改了一段文字,那就只更新对应的一个 chunk”。现实中只要文本前面多了一句话,后面所有 chunk 的边界都可能移动。旧的 chunk_003 可能变成新版本的 chunk_004 的一部分,已经没有稳定的一一对应关系。
所以生产环境更稳的原则是:修改文档时,不要在旧 chunk 上打补丁;按 doc_id 找到这篇文档的旧 chunk,全部下线,再基于新内容重新切分、Embedding、入库。
这看起来暴力,但它可靠。动态更新系统最怕的不是多算几次 embedding,而是旧 chunk 悄悄残留,最后被检索出来误导模型。
三、三种变更:新增、修改、删除
新增最简单:源系统出现新的 doc_id,跑完整入库流程即可。关键是幂等,因为消息队列可能重复投递,任务失败后也可能重跑。
修改最复杂:doc_id 没变,但内容 hash 变了。正确流程是先让旧版本 chunk 不可见,再写入新版本。高风险业务不要直接物理删除旧版本,而是用 version_id 或 latest 标记做版本过滤,确认新版本没问题后再清理旧数据。
删除最敏感:文档从源系统下线、权限被收回、或者被标记为敏感,都必须同步到检索层。删除不能只删元数据库,也不能只删向量库;全文索引、rerank 缓存、问答缓存、引用缓存也要一起处理。
四、变更检测:先粗筛,再精判
动态更新的第一步,是知道哪些文档真的变了。最常见的做法是 updated_at + content_hash。updated_at 用来粗筛,content_hash 用来精判。小林这章也强调,可以通过内容 hash 判断文档是否变化,低频场景用轮询,高频场景用 Webhook 或消息队列。
只看 updated_at 不够,因为有些系统会因为格式化、权限同步、自动保存而更新修改时间,但正文没变。只看 hash 也不够,因为大规模文档每次全量读正文会给源系统和解析服务带来压力。
更稳的处理方式是:
先读取源文档的 doc_id、updated_at、etag 等轻量字段。
只对 updated_at 变化的文档拉取正文。
对正文做归一化,去掉时间戳、页脚、动态广告、随机 ID。
计算 SHA-256 这类稳定 hash。
如果 hash 相同,跳过;如果不同,进入重建流程;如果源文档消失,进入删除流程。
注意:hash 应该基于“规范化后的正文”,不要直接基于原始 HTML 或 PDF 二进制。否则页脚时间变了、广告位变了、解析顺序变了,都会制造假变更。
代码示例 1:基于规范化正文计算 content_hash
import hashlib import re def normalize_text(text: str) -> str: """对正文做稳定化处理,避免页脚时间、连续空格造成假变更。""" text = re.sub(r"更新时间:\d{4}-\d{2}-\d{2}.*", "", text) text = re.sub(r"\s+", " ", text) return text.strip() def content_hash(text: str) -> str: normalized = normalize_text(text) return hashlib.sha256(normalized.encode("utf-8")).hexdigest() def has_changed(doc_id: str, new_text: str, registry) -> bool: new_hash = content_hash(new_text) old_hash = registry.get_content_hash(doc_id) return new_hash != old_hash |
五、文档 ID、chunk ID 和元数据设计
动态更新的底座是元数据。没有 doc_id,就不知道一批 chunk 属于哪篇文档;没有 version_id,就处理不了乱序事件;没有 content_hash,就无法跳过未变化文档;没有 acl,就会发生越权召回。
chunk_id 建议使用确定性 ID,而不是完全随机 UUID。比如 doc_id + version_id + chunk_index + chunk_hash。这样做的好处是:同一份内容重复处理,不会产生一堆重复 chunk;删除某篇文档时,也能通过 doc_id 前缀或 metadata 过滤快速找到所有相关 chunk。
代码示例 2:chunk 元数据示例
{ "doc_id": "refund_policy_001", "chunk_id": "refund_policy_001_v3_0007", "source_id": "confluence_page_9283", "version_id": 3, "content_hash": "sha256:8f12...", "chunk_strategy": "semantic_v2", "embedding_model": "text-embedding-3-large", "embedding_dimension": 3072, "tenant_id": "customer_service", "acl": ["role:cs_agent", "team:refund"], "is_deleted": false, "index_status": "ready", "source_updated_at": "2026-06-29T09:00:00Z" } |
这里有一个硬规则:embedding_model、embedding_dimension、chunk_strategy 必须记录。只要 embedding 模型或切分策略发生变化,旧向量和新向量就不应该混在同一个检索空间里直接排序。更稳的方式是新建索引,完成全量重建后再灰度切流。
六、生产级更新链路怎么搭?
生产链路一般会拆成五层。第一层是源系统,比如 Confluence、Notion、Git、数据库、对象存储。第二层是变更感知,可以是 Webhook、CDC、消息队列,也可以是定时轮询兜底。第三层是处理链路,负责解析、清洗、切分、hash、embedding。第四层是三类存储:元数据库、向量库、全文索引。第五层是质量闭环,包括离线评估、灰度、监控、死信队列、回滚。
为什么要把元数据库放在核心位置?因为向量库和全文索引通常不是同一个事务系统。一次更新可能出现“元数据库写成功、向量库失败”“向量库成功、全文索引失败”的部分成功状态。没有一个事实源记录状态,后续补偿任务就不知道该修哪里。
推荐把元数据库作为 source of truth,每个 chunk 记录 index_status:pending、embedding_done、vector_ready、fulltext_ready、ready、partial_failed。补偿任务定期扫描 partial_failed,重新写失败的那一端。
代码示例 3:元数据库中的 chunk 注册表
CREATE TABLE rag_chunk_registry ( doc_id VARCHAR(128) NOT NULL, chunk_id VARCHAR(256) PRIMARY KEY, version_id BIGINT NOT NULL, content_hash CHAR(64) NOT NULL, source_updated_at TIMESTAMP NOT NULL, embedding_model VARCHAR(128) NOT NULL, chunk_strategy VARCHAR(64) NOT NULL, tenant_id VARCHAR(64) NOT NULL, acl_json JSON NOT NULL, is_deleted BOOLEAN DEFAULT FALSE, index_status VARCHAR(32) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE (doc_id, version_id, chunk_id) ); CREATE INDEX idx_rag_doc_latest ON rag_chunk_registry(doc_id, version_id, is_deleted); |
七、核心流程:先删后增,不做危险的局部更新
在文档修改场景,最可靠的实现是按 doc_id 删除旧版本,再写入新版本。这里的“删除”可以是软删除,也可以是向量库里的物理删除,取决于业务风险。
如果你的业务需要审计和回滚,建议旧版本先软删除或标记 latest=false,不要立刻物理删除。查询时只检索 latest=true 且 is_deleted=false 的数据。确认新版本稳定后,再异步清理历史版本。
代码示例 4:事件驱动的先删后增更新流程
def process_change_event(event): """处理新增、修改、删除事件。要求 event 携带 doc_id、revision、type。""" doc_id = event.doc_id with distributed_lock(f"rag:update:{doc_id}"): current = registry.get_latest_doc_state(doc_id) # 乱序保护:旧事件不能覆盖新事件 if current and event.revision < current.revision: audit.log("stale_event_ignored", doc_id=doc_id, event_revision=event.revision) return if event.type == "delete": registry.mark_deleted(doc_id, revision=event.revision) vector_store.delete(filter={"doc_id": doc_id}) fulltext_index.delete_by_doc_id(doc_id) cache.invalidate_by_doc_id(doc_id) return text = source_loader.load_text(event.source_id) new_hash = content_hash(text) if current and current.content_hash == new_hash: registry.mark_seen(doc_id, revision=event.revision) return # 修改文档:旧 chunk 下线,新版本重新切分入库 registry.mark_old_chunks_not_latest(doc_id) vector_store.delete(filter={"doc_id": doc_id, "latest": True}) fulltext_index.delete_by_doc_id(doc_id) chunks = chunker.split(text) vectors = embedding_model.embed([chunk.text for chunk in chunks]) for i, (chunk, vector) in enumerate(zip(chunks, vectors)): chunk_id = make_chunk_id(doc_id, event.revision, i, chunk.text) metadata = build_metadata(event, chunk_id, new_hash, latest=True) registry.insert_chunk(metadata, status="embedding_done") vector_store.upsert(id=chunk_id, vector=vector, metadata=metadata) fulltext_index.upsert(id=chunk_id, text=chunk.text, metadata=metadata) registry.mark_ready(chunk_id) cache.invalidate_by_doc_id(doc_id) |
这段伪代码里有三个关键点:第一,用分布式锁或文档级顺序消费避免并发覆盖;第二,用 revision 做乱序保护;第三,写完新版本后清理缓存,防止模型继续拿旧上下文回答。
八、向量库层面的操作差异
不同向量库对 upsert、delete、filter、最终一致性的实现不一样。工程上不要假设所有向量库都像数据库一样强一致、可事务。
Milvus 的 upsert 可以按主键插入或更新;override 模式本质上是插入新实体并删除原主键实体,merge 模式支持部分字段更新,但仍要注意主键和 schema 限制。
Pinecone 官方文档建议:当文档 chunk 数量或顺序变化时,先用 metadata filter 删除整篇文档的所有 chunk,再 upsert 新 chunk;同时它也提醒写后立即读可能遇到最终一致性延迟。
Qdrant 的 point 是向量和 payload 的组合,point 修改是异步写 WAL;同 ID 重复上传是幂等覆盖,这对消息队列重复投递很有帮助。
OpenAI Vector Store 删除 vector store file 只是把文件从 vector store 中移除,并不等于删除原始 file;源文件生命周期需要单独管理。
因此,更新链路一定要把“向量库写入状态”和“源文档状态”分开记录,不能只看接口返回成功就默认全链路一致。
九、增量更新、全量重建、灰度切换怎么选?
日常文档变化,应该走增量更新。比如客服 FAQ 一天改几十条,没必要把全库重新切分和 embedding。
但是以下场景更适合全量重建:embedding 模型升级、chunk 策略变化、解析器升级、元数据 schema 调整、增量链路长期异常导致历史数据不可信。全量重建不要在原索引上直接覆盖,更稳的是新建 index_v2,验证通过后通过 alias 或版本过滤切流。
灰度切换的关键不是“能切”,而是“切之前有证据”。至少要准备一组离线评估问题,对比旧索引和新索引的命中率、答案相关性、引用准确性、权限过滤结果。线上小流量期间还要观察追问率、点踩率、无答案率、平均延迟、转人工率。
十、可靠性:动态更新最容易出问题的地方
动态更新链路的难点不在 happy path,而在异常路径。只要接入消息队列,就要接受重复投递;只要跨多个存储,就要接受部分成功;只要源系统和索引系统不同步,就要接受短时间不一致。
几个关键防线:
幂等:用 doc_id + version_id 或 doc_id + content_hash 做唯一约束,重复事件不会产生重复 chunk。
乱序:事件携带 revision 或 updated_at,低版本事件不能覆盖高版本。Kafka 这类队列可以按 doc_id 做 key,让同一文档进入同一分区。
补偿:向量库、全文索引、元数据库任一端失败,都要记录状态,后续 reconciliation 任务对账修复。
死信:永久性错误不要无限重试,进入 DLQ,附带失败原因和原始事件。
权限:ACL 变化也算变更,不能只处理正文变化。
缓存:删除或更新成功后,必须失效引用缓存、rerank 缓存、问答缓存。
代码示例 5:三端一致性补偿任务
def reconcile_index_state(batch_size=1000): """定期对账:以元数据库为事实源,修复向量库和全文索引差异。""" chunks = registry.scan(status_in=["ready", "partial_failed"], limit=batch_size) for chunk in chunks: if chunk.is_deleted: vector_store.delete(id=chunk.chunk_id) fulltext_index.delete(id=chunk.chunk_id) registry.mark_cleaned(chunk.chunk_id) continue vector_exists = vector_store.exists(chunk.chunk_id) text_exists = fulltext_index.exists(chunk.chunk_id) if not vector_exists: vector = embedding_model.embed_one(chunk.text) vector_store.upsert(chunk.chunk_id, vector, chunk.metadata) if not text_exists: fulltext_index.upsert(chunk.chunk_id, chunk.text, chunk.metadata) if vector_exists and text_exists: registry.mark_ready(chunk.chunk_id) else: registry.mark_partial_failed(chunk.chunk_id) |
十一、别只测“能不能跑”
对抗式审查的核心,是主动把系统往坏处想。不要只测试“上传一篇新文档能不能检索到”,还要测试删除是否彻底、旧版本是否残留、权限是否漂移、乱序是否会覆盖、写后读延迟是否会误判、缓存是否仍引用旧答案。
一个合格的动态更新系统,至少要能回答下面这些问题:
如果源系统删除事件丢了,兜底轮询多久能发现?
如果同一文档 v3 先到、v2 后到,系统会不会回退?
如果只写成功向量库,全文索引失败,用户会看到什么?
如果权限从公开改为私有,旧 ACL 会不会继续被检索?
如果 embedding 模型升级,新旧向量是否隔离?
如果新索引评估变差,是否能在分钟级回滚?
对抗式审查不是为了让文档看起来专业,而是为了避免真实用户成为测试环境。
十二、面试版总结
RAG 知识库动态更新的核心挑战,是文档变化后 chunk 和向量都会变化,不能把它当成普通数据库 UPDATE。生产上最稳的方案是:用 updated_at + content_hash 感知变更,用 doc_id 关联所有 chunk,修改时先删旧 chunk 再重新切分入库,删除时保证检索层不可见。
低频场景可以定时轮询,高频场景用 Webhook、CDC 或消息队列。大规模系统要做幂等、乱序保护、失败重试、死信队列、三端对账、权限同步和缓存失效。Embedding 模型或 chunk 策略变化时,不要原地更新,应该新建索引,全量重建,离线评估后灰度切流,并保留旧索引用于回滚。
一句话:RAG 的更新能力,本质上是“知识库发布系统”。它不是把文档塞进向量库,而是持续保证线上回答引用的是当前正确、可见、可追溯的知识。
要点速读
先说结论: 生产级 RAG 的知识库更新,不是“重新跑一遍脚本”,而是围绕 doc_id、content_hash、ve
- 先说结论: 生产级 RAG 的知识库更新,不是“重新跑一遍脚本”,而是围绕 doc_id、content_hash、ve
- 更多细节仍在持续更新中
- 更多细节仍在持续更新中