智能工作流:Spring AI高效批量化提示访问方案

spring,ai · 浏览次数 : 0

小编点评

## Summary of the content This document describes an approach for efficiently handling multiple prompts for GPT API calls in a Spring framework application. **Main points:** * **Multi-threading:** Instead of processing prompts sequentially, use a thread pool to handle requests concurrently. * **Dynamic key allocation:** Dynamically allocate API keys based on the task index to avoid rate limiting. * **Result storage and management:** Save prompts and answers in a database for later analysis and retrieval. * **API key load balancing:** Use a thread pool to distribute API keys across multiple tasks, ensuring even distribution and preventing limitations. * **Workflow in ChatController:** * Receive and validate the uploaded file and username. * Create a task object with information like file name, start time, user ID, and status. * Submit the task to the `chatService` for processing. * Return a success message upon successful processing. **Additional details:** * The code utilizes the `ExecutorService` for thread pool management. * It reads API keys from a local database and assigns them dynamically based on the task index. * The application demonstrates a database ER diagram with relationships between question IDs and user IDs. **Overall, this approach improves efficiency by processing prompts in parallel, managing API keys, and employing a robust database management system.**

正文

基于SpringAI搭建系统,依靠线程池\负载均衡等技术进行请求优化,用于解决科研&开发过程中对GPT接口进行批量化接口请求中出现的问题。

github地址:https://github.com/linkcao/springai-wave

大语言模型接口以OpenAI的GPT 3.5为例,JDK版本为17,其他依赖版本可见仓库pom.xml

拟解决的问题

在处理大量提示文本时,存在以下挑战:

  1. API密钥请求限制: 大部分AI服务提供商对API密钥的请求次数有限制,单个密钥每分钟只能发送有限数量的请求。
  2. 处理速度慢: 大量的提示文本需要逐条发送请求,处理速度较慢,影响效率。
  3. 结果保存和分析困难: 处理完成的结果需要保存到本地数据库中,并进行后续的数据分析,但这一过程相对复杂。

解决方案

为了解决上述问题,本文提出了一种基于Spring框架的批量化提示访问方案,如下图所示:

image-20240511160521257

其中具体包括以下步骤:

  1. 多线程处理提示文本: 将每个提示文本看作一个独立的任务,采用线程池的方式进行多线程处理,提高处理效率。
  2. 动态分配API密钥: 在线程池初始化时,通过读取本地数据库中存储的API密钥信息,动态分配每个线程单元所携带的密钥,实现负载均衡。
  3. 结果保存和管理: 在请求完成后,将每个请求的问题和回答保存到本地数据库中,以便后续的数据分析和管理。
  4. 状态实时更新: 将整个批量请求任务区分为进行中、失败和完成状态,并通过数据库保存状态码实时更新任务状态,方便监控和管理。

关键代码示例

  1. 多线程异步请求提示信息(所在包: ChatService)
    // 线程池初始化
	private static final ExecutorService executor = Executors.newFixedThreadPool(10);
    /**
     * 多线程请求提示
     * @param prompts
     * @param user
     * @param task
     * @return
     */
    @Async
    public CompletableFuture<Void> processPrompts(List<String> prompts, Users user, Task task) {
        for (int i = 0; i < prompts.size();i++) {
            int finalI = i;
            // 提交任务
            executor.submit(() -> processPrompt(prompts.get(finalI), user, finalI));
        }
        // 设置批量任务状态
        task.setStatus(TaskStatus.COMPLETED);
        taskService.setTask(task);
        return CompletableFuture.completedFuture(null);
    }
  • 如上所示,利用了Spring框架的@Async注解和线程池的功能,实现了多线程异步处理提示信息。

  • 首先,使用了ExecutorService创建了一个固定大小的线程池,以便同时处理多个提示文本。

  • 然后,通过CompletableFuture来实现异步任务的管理。

  • 在处理每个提示文本时,通过executor.submit()方法提交一个任务给线程池,让线程池来处理。

  • 处理完成后,将批量任务的状态设置为已完成,并更新任务状态。

  • 一个线程任务需要绑定请求的用户以及所在的批量任务,当前任务所分配的key由任务所在队列的下标决定。

  1. 处理单条提示信息(所在包: ChatService)
    /**
     * 处理单条提示文本
     * @param prompt 提示文本
     * @param user 用户
     * @param index 所在队列下标
     */
    public void processPrompt(String prompt, Users user, int index) {
        // 获取Api Key
        OpenAiApi openAiApi = getApiByIndex(user, index);
        assert openAiApi != null;
        ChatClient client = new OpenAiChatClient(openAiApi);
        // 提示文本请求
        String response = client.call(prompt);
        // 日志记录
        log.info("提示信息" + prompt );
        log.info("输出" + response );
        // 回答保存数据库
        saveQuestionAndAnswer(user, prompt, response);
    }
  • 首先根据任务队列的下标获取对应的API密钥
  • 然后利用该密钥创建一个与AI服务进行通信的客户端。
  • 接着,使用客户端发送提示文本请求,并获取AI模型的回答。
  • 最后,将问题和回答保存到本地数据库和日志中,以便后续的数据分析和管理。
  1. Api Key 负载均衡(所在包: ChatService)
    /**
     * 采用任务下标分配key的方式进行负载均衡
     * @param index 任务下标
     * @return OpenAiApi
     */
    private OpenAiApi getApiByIndex(int index){
        List<KeyInfo> keyInfoList = keyRepository.findAll();
        if (keyInfoList.isEmpty()) {
            return null;
        }
        // 根据任务队列下标分配 Key
        KeyInfo keyInfo = keyInfoList.get(index % keyInfoList.size());
        return new OpenAiApi(keyInfo.getApi(),keyInfo.getKeyValue());
    }
  • 首先从本地数据库中获取所有可用的API密钥信息
  • 然后根据任务队列的下标来动态分配API密钥。
  • 确保每个线程单元都携带了不同的API密钥,避免了因为某个密钥请求次数达到限制而导致的请求失败问题。
  1. 依靠线程池批量请求GPT整体方法(所在包: ChatController)
/**
     * 依靠线程池批量请求GPT
     * @param promptFile 传入的批量提示文件,每一行为一个提示语句
     * @param username 调用的用户
     * @return 处理状态
     */
    @PostMapping("/batch")
    public String batchPrompt(MultipartFile promptFile, String username){
        if (promptFile.isEmpty()) {
            return "上传的文件为空";
        }
        // 批量请求任务
        Task task = new Task();
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(promptFile.getInputStream()));
            List<String> prompts = new ArrayList<>();
            String line;
            while ((line = reader.readLine()) != null) {
                prompts.add(line);
            }
            // 用户信息请求
            Users user = userService.findByUsername(username);
            // 任务状态设置
            task.setFileName(promptFile.getName());
            task.setStartTime(LocalDateTime.now());
            task.setUserId(user.getUserId());
            task.setStatus(TaskStatus.PROCESSING);
            // 线程池处理
            chatService.processPrompts(prompts, user, task);
            return "文件上传成功,已开始批量处理提示";
        } catch ( IOException e) {
            // 处理失败
            e.printStackTrace();
            task.setStatus(TaskStatus.FAILED);
            return "上传文件时出错:" + e.getMessage();
        } finally {
            // 任务状态保存
            taskService.setTask(task);
        }
    }
  • 首先,接收用户上传的批量提示文件和用户名信息。
  • 然后,读取文件中的每一行提示文本,并将它们存储在一个列表中。
  • 接着,根据用户名信息找到对应的用户,并创建一个任务对象来跟踪批量处理的状态。
  • 最后,调用ChatService中的processPrompts()方法来处理提示文本,并返回处理状态给用户。

数据库ER图

所有信息都与用户ID强绑定,便于管理和查询,ER图如下所示:

image-20240511165330676

演示示例

  1. 通过postman携带批量请求文件username信息进行Post请求访问localhost:8080/batch接口:

image-20240511165636797

  1. 在实际应用中,可以根据具体需求对提示文本进行定制和扩展,以满足不同场景下的需求,演示所携带的请求文件内容如下:
请回答1+2=?
请回答8*12=?
请回答12*9=?
请回答321-12=?
请回答12/4=?
请回答32%2=?
  1. 最终返回的数据库结果,左为问题库,右为回答库:

image-20240511165910247

  • 问题库和答案库通过question_iduser_id进行绑定,由于一个问题可以让GPT回答多次,因此两者的关系为多对一,将问题和答案分在两个独立的表中也便于后续的垂域定制和扩展。

与智能工作流:Spring AI高效批量化提示访问方案相似的内容:

智能工作流:Spring AI高效批量化提示访问方案

集用SpringAI搭建系统,依靠线程池\负载均衡等技术进行请求优化,用于解决科研&开发过程中对GPT接口进行批量化接口请求中出现的问题。大语言模型接口以OpenAI的GPT 3.5为例,JDK版本为17。

重塑未来的1课:组装式交付新引擎——智能化低代码平台

摘要:智能化低代码必修课。 紧跟低代码技术飞速发展——华为云Astro智能工作流惊艳HDC.Cloud 2023!企业对未来智能化组装式交付的期待已不是空想。智能化低代码即将重新定义传统交付模式,密切连接AI科技与创造力。 在HDC.Cloud 2023华为云Astro分论坛,云计算大咖、行业翘楚科

PhiData 一款开发AI搜索、agents智能体和工作流应用的AI框架

PhiData以其强大的功能集成和灵活的部署选项,为AI产品开发提供了极大的便利和高效性。它为构建智能AI助手提供了一个全新的视角,让开发者能够探索AI的无限可能。如果你对构建AI产品感兴趣,不妨试试PhiData。

5分钟搭建图片压缩应用

摘要:用华为云函数工作流FunctionGraph搭建图片压缩应用。 本文分享自华为云社区《真正的按需计费丨函数工作流 FunctionGraph实战,5分钟搭建图片压缩应用》,作者:华为云PaaS服务小智。 1.背景介绍 互联网时代,各类app,小程序为人们的生活,办公,学习,休闲,娱乐提供着便利

Serverless: AI everywhere的下一块拼图

摘要:本文介绍华为云函数工作流(FunctionGraph)的灵活、速度,如何让开发人员提升工程效率,缩短TTM等 本文分享自华为云社区《华为云FunctionGraph函数工作流—— Serverless“遇见”AI,释放AI生产力》,作者: 华为云PaaS服务小智 。 华为云Serverless

华夏天信携手华为云开天aPaaS,打造安全、高效、节能的主煤流运输系统

摘要:基于开天aPaaS集成工作台,主煤流运输系统如何实现多源异构数据融合、皮带物料和人员违章的智能感知,以及皮带的智能控制。灵活架构、高效集成、快速开发! 本文分享自华为云社区《华夏天信携手华为云开天aPaaS,打造安全、高效、节能的主煤流运输系统》,作者:开天aPaaS小助手。 据权威数据显示,

基于云原生的物联大数据智能服务

摘要:物联大数据已成为当前物联网系统建设的核心,基于物联大数据的涌现智能和应用以及借此对物理世界的反馈和控制是未来物联网系统的建设目标。 本文分享自华为云社区《基于云原生的物联大数据智能服务》,作者:赵卓峰 、丁维龙 、于淇 / 北方工业大学数据工程研究院、大规模流数据集成与分析北京市重点实验室。

探索Semantic Plugins:开启大模型的技能之门

前言 在之前的章节中我们或多或少的已经接触到了 Semantic Kernel 的 Plugins,本章我们讲详细介绍如何使用插件。 Semantic Kernel 的一大特点是拥有强大的插件,通过结合自定义/预定义的插件解决智能业务的问题。让传统的代码和智能插件一起工作灵活地接入到应用场景简化传统

#Powerbi 利用时间智能函数,进行周度分析

在实际工作中,我们往往需要同比分析,月度和年度的分析都有对应的时间智能函数,分别是MTD和YTD,但是缺少了周度的时间智能函数,而 恰恰日常工作中,我们又需要以周度来进行对应的分析,今天我们来学习一下,如何使用Powerbi来进行周度分析。 我们这里假设要进行流量的周度分析,流量表里包含了日期、曝光

不只是负载均衡,活字格智能集群的架构与搭建方案

还在单机服务器,时刻面临宕机风险吗? 优化程度不够,响应速度缓慢,系统工作响应像老汉拉车吗? 为了帮助大家具备企业级应用的部署能力,轻松应对核心业务系统的部署要求,我们准备了《活字格智能集群的架构与搭建方案》高级教程。 作为一款优秀的企业级低代码开发平台,活字格除了本身开发集成的强大功能之外,负载均