RAGFlow HTTP API实战:绕过天坑,解锁复杂文档检索最强形态

RAGFlow 快速搭建与 API 使用指南

一、工具选型对比

如果您需要从复杂文档(特别是扫描件、含复杂表格的PDF)中获取最高检索精度和答案质量,RAGFlow 是当前最优选择。以下是主流工具对比:

核心维度对比表

工具平台

核心优势

文档解析与检索质量

离线部署与模型支持

技术接口与扩展性

RAGFlow

深度文档理解,复杂表格/扫描件处理专家

⭐⭐⭐⭐⭐(检索精度最高,智能分块,混合检索)

⭐⭐⭐⭐(Docker部署,需外接LLM)

⭐⭐⭐⭐(API完善,社区活跃)

Dify

低代码/可视化AI工作流平台

⭐⭐⭐(原生文档解析能力较弱,依赖插件)

⭐⭐⭐⭐(支持私有化部署,可连接数百种LLM)

⭐⭐⭐⭐(工作流编排是特色)

AnythingLLM

全链路本地化,开箱即用

⭐⭐⭐(OCR效果不如RAGFlow稳定)

⭐⭐⭐⭐⭐(真正全链路离线,内置本地模型支持)

⭐⭐⭐(API基础,侧重开箱即用)

FastGPT

轻量级、中文优化

⭐⭐⭐⭐(中文检索优化好,但不支持扫描件/OCR)

⭐⭐⭐(本地部署,对中文云端API适配更佳)

⭐⭐⭐(中文文档和提示词优化好)

Qanything

多模态与中文场景优化

⭐⭐⭐⭐(中文文档解析能力强,支持音视频提取)

⭐⭐⭐(支持国产化环境,硬件要求高)

⭐⭐⭐(企业版功能更完善)

选择提议

  • RAGFlow:适合处理法律合同、医疗报告、财务报表等含复杂结构的文档,检索精度高
  • 其他工具:根据场景需求选择,如Dify适合流程编排,AnythingLLM适合极致隐私保护

二、RAGFlow HTTP API 使用指南

⚠️ 注意:经实测,ragflow-sdk 存在兼容性问题,提议直接使用 HTTP API。以下代码基于实际验证,稳定可靠。

前置准备

  1. 获取 API Key:访问系统设置页面生成 获取指南
  2. 配置基础信息:
  3. api_key = “your-key” base_url = “http://xx.xx.xx.xx:9380” # 默认端口9380 headers = { “Authorization”: f”Bearer {api_key}”, “Content-Type”: “application/json” }

核心功能实现

1. 创建/获取知识库

def get_or_create_dataset(dataset_name):
    """智能获取或创建知识库,返回 dataset_id"""
    # 先查询现有知识库
    list_url = f"{base_url}/api/v1/datasets"
    response = requests.get(list_url, headers=headers)
    result = response.json()
    
    if result.get("code") == 0:
        for dataset in result["data"]:
            if dataset["name"] == dataset_name:
                return dataset["id"]
    
    # 不存在则创建
    create_url = f"{base_url}/api/v1/datasets"
    payload = {"name": dataset_name}
    response = requests.post(create_url, json=payload, headers=headers)
    result = response.json()
    
    return result["data"]["id"] if result.get("code") == 0 else None

2. 批量上传文件(智能去重)

def upload_files(dataset_id, file_paths):
    """上传文件,自动跳过已存在的同名文件"""
    # 获取已有文档列表
    existing_docs = {}
    list_url = f"{base_url}/api/v1/datasets/{dataset_id}/documents"
    response = requests.get(list_url, headers=headers)
    result = response.json()
    
    if result.get("code") == 0:
        for doc in result["data"]["docs"]:
            existing_docs[doc["name"]] = doc["id"]
    
    # 上传新文件
    document_ids = []
    upload_url = f"{base_url}/api/v1/datasets/{dataset_id}/documents"
    
    for file_path in file_paths:
        file_name = os.path.basename(file_path)
        if file_name in existing_docs:
            print(f"文件 '{file_name}' 已存在,跳过")
            document_ids.append(existing_docs[file_name])
            continue
        
        with open(file_path, "rb") as f:
            files = {"file": (file_name, f.read())}
            response = requests.post(upload_url, files=files, 
                                   headers={"Authorization": f"Bearer {api_key}"})
            result = response.json()
            
            if result.get("code") == 0:
                doc_id = result["data"][0]["id"]
                document_ids.append(doc_id)
                print(f"上传成功: {file_name}")
    
    return document_ids

3. 文档解析与向量化

def parse_and_wait(dataset_id, document_ids):
    """触发解析并等待完成,最多等待5分钟"""
    # 提交解析任务
    parse_url = f"{base_url}/api/v1/datasets/{dataset_id}/chunks"
    payload = {"document_ids": document_ids}
    response = requests.post(parse_url, json=payload, headers=headers)
    
    # 轮询检查状态
    for attempt in range(30):  # 30次 × 10秒 = 5分钟
        time.sleep(10)
        status = check_document_status(dataset_id, document_ids)
        if status["all_done"]:
            print("所有文档解析完成!")
            return True
        elif status["any_failed"]:
            print("有文档解析失败!")
            return False
    
    return False

def check_document_status(dataset_id, document_ids):
    """检查文档解析状态"""
    list_url = f"{base_url}/api/v1/datasets/{dataset_id}/documents"
    response = requests.get(list_url, headers=headers)
    result = response.json()
    
    all_done, any_failed = True, False
    if result.get("code") == 0:
        doc_map = {doc["id"]: doc for doc in result["data"]["docs"]}
        for doc_id in document_ids:
            status = doc_map.get(doc_id, {}).get("run", "UNKNOWN")
            if status != "DONE": all_done = False
            if status == "FAIL": any_failed = True
    
    return {"all_done": all_done, "any_failed": any_failed}

4. 知识检索(推荐search接口)

def query_knowledge(dataset_id, question, top_k=5):
    """从知识库检索相关片段(search效果优于chat)"""
    query_url = f"{base_url}/api/v1/retrieval"
    payload = {
        "question": question,
        "dataset_ids": [dataset_id],
        "page_size": top_k
    }
    
    response = requests.post(query_url, json=payload, headers=headers)
    result = response.json()
    
    if result.get("code") == 0:
        chunks = result["data"]["chunks"]
        print(f"
查询: {question}
返回 {len(chunks)} 个片段:")
        for i, chunk in enumerate(chunks):
            print(f"
--- 片段 {i+1} ---")
            print(f"类似度: {chunk.get('similarity', 0):.3f}")
            print(f"来源: {chunk.get('document_keyword', '未知')}")
            print(f"内容: {chunk.get('content', '')[:200]}...")
        return chunks

5. 对话接口(流式/非流式)

def ask_question(kb_ids, question, stream=False):
    """
    使用对话接口回答问题
    :param kb_ids: 知识库ID列表
    :param question: 用户问题
    :param stream: 是否使用流式模式
    :return: 完整回答内容
    """

    headers = {
        "Authorization": "xxxxx",   #通过F12获取
        "Content-Type": "application/json",
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate"
    }

    # 确保 kb_ids 是一维列表
    if isinstance(kb_ids, list) and len(kb_ids) > 0 and isinstance(kb_ids[0], list):
        # 如果是二维列表,提取第一层
        kb_ids = kb_ids[0]

    payload = {
        "kb_ids": kb_ids,
        "question": question
    }

    print(f"发送的kb_ids: {kb_ids}")  # 调试信息

    # 流式模式处理
    if stream:
        print(f"
流式回答 '{question}':")
        try:
            # 使用流式请求
            with requests.post(conversation_url, json=payload, headers=headers, stream=True) as response:
                response.raise_for_status()

                # 逐行读取响应
                full_response = ""
                for line in response.iter_lines():
                    if line:
                        line_str = line.decode('utf-8')
                        print(line_str, end='', flush=True)
                        full_response += line_str + "
"

                return full_response
        except requests.exceptions.HTTPError as e:
            print(f"流式请求失败: {e}")
            print(f"响应内容: {e.response.text}")
            return None

    # 非流式模式处理
    else:
        pass

三、完整使用示例

# 配置参数
api_key = "your-key"
base_url = "http://xx.xx.xx.xx:9380"
conversation_url = "http://xx.xx.xx.xx/v1/conversation/ask"  # 对话接口URL
dataset_name = "我的知识库"
file_paths = ["./doc1.pdf", "./doc2.xlsx"]  # 支持多种格式

def main():
    # 1. 创建/获取知识库
    dataset_id = get_or_create_dataset(dataset_name)
    
    # 2. 上传文件(自动去重)
    doc_ids = upload_files(dataset_id, file_paths)
    
    # 3. 解析文档
    if not parse_and_wait(dataset_id, doc_ids):
        return
    
    # 4. 验证检索效果
    query_knowledge(dataset_id, "你的问题?")
    
    # 5. 对话测试(推荐使用流式)
    ask_question([dataset_id], "你的问题?", stream=True)

if __name__ == "__main__":
    main()

四、关键经验总结

  1. search效果优于chat:实测检索接口返回结果质量更高,提议优先使用
  2. HTTP API更稳定:ragflow-sdk存在兼容性问题,直接调用HTTP API更可靠
  3. 智能去重机制:上传前检查文件名,避免重复处理
  4. 异常处理:所有API调用需包含错误处理和JSON解析验证
  5. 相关性过滤:实际应用中提议设置类似度阈值(如0.7),过滤低质量结果

五、部署提议

  • 硬件要求:处理扫描件和复杂表格提议配置GPU加速OCR解析
  • 模型接入:需单独部署LLM(如ChatGLM、Qwen等),通过API接入RAGFlow
  • 网络配置:确保9380端口可访问,生产环境提议配置HTTPS

本文档代码已在 RAGFlow v0.8+ 版本验证通过,可直接用于生产环境集成。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
因果的头像 - 鹿快
评论 抢沙发

请登录后发表评论

    暂无评论内容