流式查询1. mybatis的游标Cursor,分页大数据查询

mybatis,cursor · 浏览次数 : 34

小编点评

流式查询是一种数据库查询方式,它允许应用程序从数据库中一次获取一条记录,而不是一次性获取所有记录。这种方式可以降低内存使用,特别是在处理大量数据时。流式查询的优点在于它可以减少内存占用,提高查询效率。 在MyBatis Plus中,游标分页是一种常用的流式查询实现方式。使用游标分页,可以在mybatis-config.xml或application.yml中添加相应的配置。例如: ```xml mybatis-plus: configuration: settings: useCursorFetch: true ``` 在Mapper接口中,定义一个返回Cursor类型的方法,例如: ```java public interface CustomerLoyCardMapper { Cursor getCardCursorByBatchIdOrCardNum(String batchId, String startingNumber, String endingNumber, String cardRange); } ``` 实现类需要实现该方法,例如: ```java @Override public Cursor getCardCursorByBatchIdOrCardNum(String batchId, String startingNumber, String endingNumber, String cardRange) { // 实现方法逻辑 } ``` SQL语句应该使用`resultType`和`resultSetType`指定结果类型和游标类型,例如: ```xml ``` 使用游标分页时,需要注意以下几点: 1. 确保数据库支持游标分页功能。 2. 根据实际情况设置合适的分页大小。 3. 及时关闭游标,释放资源。 4. 进行性能监控和测试,确保游标分页带来的性能提升符合预期。 5. 合理使用缓存,提高查询效率。 6. 关注MyBatis Plus的最新版本特性和改进。 总之,流式查询和游标分页是一种提高查询效率和降低内存占用的有效方式,但在使用时需要注意数据库支持、分页大小、连接管理等方面的问题。

正文

流式查询
流式查询 指的是查询成功后不是返回一个集合而是返回一个迭代器,应用可以通过迭代器每次取一条查询结果。流式查询的好处是能够降低内存使用。例如我们想要从数据库取 1000 万条记录而又没有足够的内存时,就不得不分页查询。

而分页查询就需要我们按照顺序查询并设置一个参数来记录当前进度并在下次查询时将进度作为参数传入。

(比如按id升序查询,记录每次查询结果的最大id, 下次查询将这个最大id传入只查询大于这个id的),否则就会出现深度分页的情况。而查询效率取决于表设计,如果设计的不好,那么每次查询都会是一次单独的低效查询。
   而流式查询不需要自己记录进度(数据库来记录),且即使表设计的较差或在sql比较复杂,也仅仅只需要一次低效查询。
    流式查询的过程当中,要保证数据库连接是保持打开状态的,否则就会流关闭。
   MyBaits通过游标Cursor实现了流式查询。 MyBaits Plus基于Mybais, 自然也是支持的。

如何使用
写一个获取流的Mapper
不需要其他配置, 像平常我们写查询一样在Mapper定义查询,并将返回结果设为Cursor即可实现一个流式查询。
sql也正常按照时间查询条件写,不需要加limit之类的参数。

1. 配置游标分页

      在MyBatis Plus中,使用游标分页需要在mybatis-config.xml或application.yml 中添加配置:

mybatis-plus:
  configuration:
    settings:
      useCursorFetch: true

    @Transactional
    @Async("asyncServiceExecutor")
    public void asyncSaveAndUpdateCardPool(String buId, CardStatusReqVO cardReqVO) {
        try (ShardingCtx s = ShardingCtx.setShardingValue(buId)) {
            log.info("asyncSaveAndUpdateCardPool: {}, {}", buId, cardReqVO);

            try (Cursor<CardUpdateVO> cursor = memberCardRepository.getCardCursorByBatchIdOrCardNum(
                    cardReqVO.getBatchId(),
                    cardReqVO.getStartingNumber(),
                    cardReqVO.getEndingNumber(),
                    cardReqVO.getCardRange())) {

                final int BATCH_SIZE = 1000;
                AtomicBoolean isFirstIteration = new AtomicBoolean(true);
                CopyOnWriteArrayList<CardUpdateVO> updateVOList = new CopyOnWriteArrayList<>();

                for (CardUpdateVO cardUpdateVO : cursor) {
                    updateVOList.add(cardUpdateVO);
                    if (updateVOList.size() == BATCH_SIZE) {
                        batchUpdateCardStatusAsync(buId, updateVOList, cardReqVO, isFirstIteration);
                        updateVOList.clear();
                    }
                }

                if (!updateVOList.isEmpty()) {
                    batchUpdateCardStatusAsync(buId, updateVOList, cardReqVO, isFirstIteration);
                }
            } catch (Exception e) {
                log.error("Cursor process error: ", e);
            }
        }
        log.info("Finished asyncSaveAndUpdateCardPool for cardReqVO: {}, {}", buId, cardReqVO);
    }
接口:
Cursor<CardUpdateVO> getCardCursorByBatchIdOrCardNum(String batchId, String startingNumber, String endingNumber, String cardRange);

实现类:
 @Override
 public Cursor<CardUpdateVO> getCardCursorByBatchIdOrCardNum(String batchId, String startingNumber, String endingNumber, String cardRange) {
     return customerLoyCardMapper.getCardCursorByBatchIdOrCardNum(batchId, startingNumber, endingNumber, cardRange);
 }
    
mapper:
Cursor<CardUpdateVO> getCardCursorByBatchIdOrCardNum(@Param("batchId")String batchId,
                                                         @Param("startingNumber")String startingNumber,
                                                         @Param("endingNumber")String endingNumber,
                                                         @Param("cardRange")String cardRange);
sql:
<select id="getCardCursorByBatchIdOrCardNum" resultType="com.aswatson.csc.member.req.CardUpdateVO" resultSetType="FORWARD_ONLY" fetchSize = "500">
        SELECT M.BU_ID as buId,
        M.MEMBER_ID as memberId,
        M.PROGRAM_ID as programId,
        C.CARD_ID as cardId,
        C.CARD_NUM as cardNum,
        C.VISIBLE_CARD as visibleCard,
        C.CARD_TYPE_CD as cardTypeCd,
        C.STATUS_CD as statusCd,
        C.ACTIVE_FLAG as activeFlag
        FROM CUSTOMER_LOY_CARD C, CUSTOMER_LOY_MEMBER M
        WHERE C.MEMBER_ID = M.MEMBER_ID
        <if test="startingNumber != null and endingNumber != null and cardRange == 1 ">
            AND C.VISIBLE_CARD BETWEEN #{startingNumber} AND #{endingNumber}
        </if>
        <if test="batchId != null and batchId != '' and cardRange == 2 ">
            AND C.BATCH_ID = #{batchId}
        </if>
    </select>

需要注意的点:

获取一个打开状态的连接。

上面提过,使用流式查询需要保证数据库连接是保持打开状态。而正常情况下我们使用mybaits执行一次查询,连接都会被关闭或在重置。因此我们需要一些方法来保持连接。

使用事务:事务执行完毕之前连接会一直保持因此,我们可以来使用事务来保持连接。

 这是最简单的方法,但是需要注意的是,由于cursor在遍历结束的方法末尾后会主动关闭连接。因此:

1. 方法内事务是正常的,在cursor查询前和遍历后的的数据操作依然是一个事务。从这里可以猜测,连接并不在在遍历完立即关闭,而是采用了类似AOP的手段在方法末尾关闭。
2. 如果你使用了数据库连接池,那么池中的这个连接会被关掉,这可能并不会导致你的连接池数量减少,因为连接池的连接数据依赖于各自的规则,
    当有连接断开,连接池会根据自己的策略对做相应的处理,比如重新建立,因此不需要过分关注这个,需要注意的是这个行为可能带来的影响,
    比如druid连接池,druid在事务执行完毕后会进行连接的清理。但是这个连接在myDbTableMapper遍历完成时已经关闭,
    就会导致日志打印java.sql.SQLException: No operations allowed after statement closed。这个问题不会带来数据影响,
    但是依然属于一个错误,见github-druid-issues,在1.2.10, druid将这个日志改为debug级别进行了临时的屏蔽。其他连接池尚未进行测试。

循环获取数据
Cursor继承了迭代器,可以通过Cursor获取他的Iterator, 或者直接使用for循环来获取数据。
需要注意的是,Cursor在查询一瞬间数据就固化了,如果你先查询,再更新,再遍历,即使在同一个事务里面,获得的结果也是未更新的数据。

关闭流
cursor在遍历结束后会主动关闭连接。如果未遍历结束中途退出,可以调用cursor的close方法关闭连接。推荐finaly内总是调用close或者try(resoure)来保证连接总是被正常关闭。

 

在使用MyBatis Plus的游标分页时,有一些注意事项和建议:

数据库支持: 游标分页依赖数据库的游标支持,因此确保数据库支持游标分页功能。

分页大小: 需要根据实际情况设置合适的分页大小,过小可能导致频繁查询,过大可能失去游标分页的优势。

及时关闭游标: 使用Cursor时,确保在处理完数据后及时关闭游标,释放资源。

性能监控: 对于大数据量的场景,建议进行性能监控和测试,确保游标分页带来的性能提升符合预期。

合理使用缓存: 需要根据实际情况考虑是否使用缓存,以及如何合理使用缓存,以提高查询效率。

版本更新: MyBatis Plus的版本可能会更新,建议关注最新版本的特性和改进,以获取更好的支持和性能。
————————————————

与流式查询1. mybatis的游标Cursor,分页大数据查询相似的内容:

流式查询1. mybatis的游标Cursor,分页大数据查询

流式查询流式查询 指的是查询成功后不是返回一个集合而是返回一个迭代器,应用可以通过迭代器每次取一条查询结果。流式查询的好处是能够降低内存使用。例如我们想要从数据库取 1000 万条记录而又没有足够的内存时,就不得不分页查询。 而分页查询就需要我们按照顺序查询并设置一个参数来记录当前进度并在下次查询时

[转帖]QPS、TPS、RT、并发量、 吞吐量

QPS 每秒查询率(QPS,Queries-per-second)是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。 每秒查询率(QPS,Queries-per-second)是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准,在因特网上,作为域名系统服务器的机器的性能经常用每

数据库系列:MySQL慢查询分析和性能优化

1 背景 我们的业务服务随着功能规模扩大,用户量扩增,流量的不断的增长,经常会遇到一个问题,就是数据存储服务响应变慢。 导致数据库服务变慢的诱因很多,而RD最重要的工作之一就是找到问题并解决问题。 下面以MySQL为例子,我们从几个角度分析可能产生原因,并讨论解决的方案。 2 定位慢查询的原因并优化

《流畅的Python》 读书笔记 第二章数据结构(1) 231007

第2章 数据结构 ABC语言是Python的爸爸~ 很多点子在现在看来都很有 Python 风格:序列的泛型操作、内置的元组和映射类型、用缩进来架构的源码、无需变量声明的强类型 不管是哪种数据结构,字符串、列表、字节序列、数组、XML 元素,抑或是数据库查询结果,它们都共用一套丰富的操作:迭代、切片

【源码解读(一)】EFCORE源码解读之创建DBContext查询拦截

引言 在网上很少看到有关于系统讲解EFCore源码的,可能大概也许是因为EFCore的源码总体是没有asp.net web的源码流程清晰,正如群友所说,EFCore的源码大致看起来有点凌乱,与其说凌乱,不如说是没有一个好的方向;然后昨天在群里有一个朋友再说,EfCore的拦截器如何注入Web的服务,

实时数仓构建:Flink+OLAP查询的一些实践与思考

以Flink为主的计算引擎配合OLAP查询分析引擎组合进而构建实时数仓**,其技术方案的选择是我们在技术选型过程中最常见的问题之一。也是很多公司和业务支持过程中会实实在在遇到的问题。 很多人一提起实时数仓,就直接大谈特谈Hudi,Flink的流批一体等,但实际上,**实时数仓包括任何架构体系的构建如...

通过 Wireshark 解密 Kerberos 票据

前言 在使用 Wireshark 分析 Active Directory 的 Kerberos 的流量时,会遇到加密票据的情况,这对进一步探究 AD 下的漏洞篡改事件的详细过程造成了影响。在查询资料时也了解到也有一些攻击流量的 payload 也可能存在被解析为加密存根导致分析中断。 此处记录一下如

Redis缓存的主要异常及解决方案

作者:京东物流 陈昌浩 1 导读 Redis 是当前最流行的 NoSQL数据库。Redis主要用来做缓存使用,在提高数据查询效率、保护数据库等方面起到了关键性的作用,很大程度上提高系统的性能。当然在使用过程中,也会出现一些异常情景,导致Redis失去缓存作用。 2 异常类型 异常主要有 缓存雪崩 缓

函数式编程(Lambda、Stream流、Optional等)

# 声明 文档来源:Github@shuhongfan 源文档:B站UP主:三更草堂 # 函数式编程-Stream流 # 概述 # 为什么学? 基操,否则看不懂别人写的优雅代码 简化代码,不想看到有些恶心代码 大数据下处理集合效率高 // 【恶心级代码】查询未成年作家的评分在70以上的书籍 由于洋流

OData WebAPI实践-与ABP vNext集成

本文属于 OData 系列文章 ABP 是一个流行的 ASP. NET 开发框架,旧版的的 ABP 已经能够非常好的支持了 OData ,并提供了对应的 OData 包。 ABP vNext 是一个重新设计的,面向微服务的框架,提供了一些非常有用的特性,包括分页查询等但是它并不能原生支持 OData