基于Kafka和Elasticsearch构建实时站内搜索功能的实践

基于,kafka,elasticsearch,构建,实时,站内搜索,功能,实践 · 浏览次数 : 370

小编点评

**标题:Binlog 采集端:实现数据库表更新** **内容:** * 概述 binlog 采集的流程 * 组件 binlogEventHandler 处理 binlog 事件 * binlogEventHandler 的方法 handle 处理 binlog 事件 * 如何判断是否为删除操作 * 如何在 Elasticsearch 中将数据删除 * 使用手写的 Java 代码实现处理器

正文

作者:京东物流 纪卓志

目前我们在构建一个多租户多产品类网站,为了让用户更好的找到他们所需要的产品,我们需要构建站内搜索功能,并且它应该是实时更新的。本文将会讨论构建这一功能的核心基础设施,以及支持此搜索能力的技术栈。

问题的定义与决策

为了构建一个快速、实时的搜索引擎,我们必须做出某些设计决策。我们使用 MySQL 作为主数据库存储,因此有以下选择:

  1. 直接在 MySQL 数据库中查询用户在搜索框中输入的每个关键词,就像%#{word1}%#{word2}%...这样。 😐
  2. 使用一个高效的搜索数据库,如 Elasticsearch。😮

考虑到我们是一个多租户应用程序,同时被搜索的实体可能需要大量的关联操作(如果我们使用的是 MySQL 一类的关系型数据库),因为不同类型的产品有不同的数据结构,所以我们还可以能需要同时遍历多个数据表来查询用户输入的关键词。所以我们决定不使用直接在 MySQL 中查询关键词的方案。🤯

因此,我们必须决定一种高效、可靠的方式,将数据实时地从 MySQL 迁移到 Elasticsearch 中。接下来需要做出如下的决定:

  1. 使用 Worker 定期查询 MySQL 数据库,并将所有变化的数据发送到 Elasticsearch。😶
  2. 在应用程序中使用 Elasticsearch 客户端,将数据同时写入到 MySQL 和 Elasticsearch 中。🤔
  3. 使用基于事件的流引擎,将 MySQL 数据库中的数据更改作为事件,发送到流处理服务器上,经过处理后将其转发到 Elasticsearch。🥳

选项 1 并不是实时的,所以可以直接排除,而且即使我们缩短轮询间隔,也会造成全表扫描给数据库造成查询压力。除了不是实时的之外,选项 1 无法支持对数据的删除操作,如果对数据进行了删除,那么我们需要额外的表记录之前存在过的数据,这样才能保证用户不会搜索到已经删除了的脏数据。对于其他两种选择,不同的应用场景做出的决定可能会有所不同。在我们的场景中,如果选择选项 2,那么我们可以预见一些问题:如过 Elasticsearch 建立网络连接并确认更新时速度很慢,那么这可能会降低我们应用程序的速度;或者在写入 Elasticsearch 时发生了未知异常,我们该如何对这一操作进行重试来保证数据完整性;不可否认开发团队中不是所有开发人员都能了解所有的功能,如果有开发人员在开发新的与产品有关的业务逻辑时没有引入 Elasticsearch 客户端,那么我们将在 Elasticsearch 中更新这次数据的更改,无法保证 MySQL 与 Elasticsearch 间的数据一致性。

接下来我们该考虑如何将 MySQL 数据库中的数据更改作为事件,发送到流处理服务器上。我们可以在数据库变更后,在应用程序中使用消息管道的客户端同步地将事件发送到消息管道,但是这并没有解决上面提到的使用 Elasticsearch 客户端带来的问题,只不过是将风险从 Elasticsearch 转移到了消息管道。最终我们决定通过采集 MySQL Binlog,将 MySQL Binlog 作为事件发送到消息管道中的方式来实现基于事件的流引擎。关于 binlog 的内容可以点击链接,在这里不再赘述。

服务简介

为了对外提供统一的搜索接口,我们首先需要定义用于搜索的数据结构。对于大部分的搜索系统而言,对用户展示的搜索结果通常包括为标题内容,这部分内容我们称之可搜索内容(Searchable Content)。在多租户系统中我们还需要在搜索结果中标示出该搜索结果属于哪个租户,或用来过滤当前租户下可搜索的内容,我们还需要额外的信息来帮助用户筛选自己想要搜索的产品类别,我们将这部分通用的但不用来进行搜索的内容称为元数据(Metadata)。最后,在我们展示搜索结果时可能希望根据不同类型的产品提供不同的展示效果,我们需要在搜索结果中返回这些个性化展示所需要的原始内容(Raw Content)。到此为止我们可以定义出了存储到 Elasticsearch 中的通用数据结构:

{
	"searchable": {
		"title": "string",
		"content": "string"
	},
	"metadata": {
		"tenant_id": "long",
		"type": "long",
		"created_at": "date",
		"created_by": "string",
		"updated_at": "date",
		"updated_by": "string"
	},
	"raw": {}
}

基础设施

Apache Kafka:Apache Kafka 是开源的分布式事件流平台。我们使用 Apache kafka 作为数据库事件(插入、修改和删除)的持久化存储。

mysql-binlog-connector-java:我们使用mysql-binlog-connector-java从 MySQL Binlog 中获取数据库事件,并将它发送到 Apache Kafka 中。我们将单独启动一个服务来完成这个过程。

在接收端我们也将单独启动一个服务来消费 Kafka 中的事件,并对数据进行处理然后发送到 Elasticsearch 中。

Q:为什么不使用Elasticsearch connector之类的连接器对数据进行处理并发送到Elasticsearch中?
A:在我们的系统中是不允许将大文本存入到MySQL中的,所以我们使用了额外的对象存储服务来存放我们的产品文档,所以我们无法直接使用连接器将数据发送到Elasticsearch中。
Q:为什么不在发送到Kafka前就将数据进行处理?
A:这样会有大量的数据被持久化到Kafka中,占用Kafka的磁盘空间,而这部分数据实际上也被存储到了Elasticsearch。
Q:为什么要用单独的服务来采集binlog,而不是使用Filebeat之类的agent?
A:当然可以直接在MySQL数据库中安装agent来直接采集binlog并发送到Kafka中。但是在部分情况下开发者使用的是云服务商或其他基础设施部门提供的MySQL服务器,这种情况下我们无法直接进入服务器安装agent,所以使用更加通用的、无侵入性的C/S结构来消费MySQL的binlog。

配置技术栈

我们使用 docker 和 docker-compose 来配置和部署服务。为了简单起见,MySQL 直接使用了 root 作为用户名和密码,Kafka 和 Elasticsearch 使用的是单节点集群,且没有设置任何鉴权方式,仅供开发环境使用,请勿直接用于生产环境。

version: "3"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: app
    ports:
      - 3306:3306
    volumes:
      - mysql:/var/lib/mysql
  zookeeper:
    image: bitnami/zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - zookeeper:/bitnami
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    volumes:
      - kafka:/bitnami
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    volumes:
      - elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
volumes:
  mysql:
    driver: local
  zookeeper:
    driver: local
  kafka:
    driver: local
  elasticsearch:
    driver: local

在服务启动成功后我们需要为 Elasticsearch 创建索引,在这里我们直接使用 curl 调用 Elasticsearch 的 RESTful API,也可以使用 busybox 基础镜像创建服务来完成这个步骤。

# Elasticsearch
curl "http://localhost:9200/search" -XPUT -d '
{
  "mappings": {
    "properties": {
      "searchable": {
        "type": "nested",
        "properties": {
          "title": {
            "type": "text"
          },
          "content": {
            "type": "text"
          }
        }
      },
      "metadata": {
        "type": "nested",
        "properties": {
          "tenant_id": {
            "type": "long"
          },
          "type": {
            "type": "integer"
          },
          "created_at": {
            "type": "date"
          },
          "created_by": {
            "type": "keyword"
          },
          "updated_at": {
            "type": "date"
          },
          "updated_by": {
            "type": "keyword"
          }
        }
      },
      "raw": {
        "type": "nested"
      }
    }
  }
}'

核心代码实现(SpringBoot + Kotlin)

Binlog 采集端:

    override fun run() {
        client.serverId = properties.serverId
        val eventDeserializer = EventDeserializer()
        eventDeserializer.setCompatibilityMode(
            EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
        )
        client.setEventDeserializer(eventDeserializer)
        client.registerEventListener {
            val header = it.getHeader<EventHeader>()
            val data = it.getData<EventData>()
            if (header.eventType == EventType.TABLE_MAP) {
                tableRepository.updateTable(Table.of(data as TableMapEventData))
            } else if (EventType.isRowMutation(header.eventType)) {
                val events = when {
                    EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
                    EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
                    EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
                    else -> emptyList()
                }
                logger.info("Mutation events: {}", events)
                for (event in events) {
                    kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
                }
            }
        }
        client.connect()
    }

在这段代码里面,我们首先是对 binlog 客户端进行了初始化,随后开始监听 binlog 事件。binlog 事件类型有很多,大部分都是我们不需要关心的事件,我们只需要关注 TABLE_MAP 和 WRITE/UPDATE/DELETE 就可以。当我们接收到 TABLE_MAP 事件,我们会对内存中的数据库表结构进行更新,在后续的 WRITE/UPDATE/DELETE 事件中,我们会使用内存缓存的数据库结构进行映射。整个过程大概如下所示:

Table: ["id", "title", "content",...]
Row: [1, "Foo", "Bar",...]
=>
{
	"id": 1,
	"title": "Foo",
	"content": "Bar"
}

随后我们将收集到的事件发送到 Kafka 中,并由 Event Processor 进行消费处理。

事件处理器

@Component
class KafkaBinlogTopicListener(
    val binlogEventHandler: BinlogEventHandler
) {

    companion object {
        private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
    }

    private val objectMapper = jacksonObjectMapper()

    @KafkaListener(topics = ["binlog"])
    fun process(message: String) {
        val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
        logger.info("Consume binlog event: {}", binlogEvent)
        binlogEventHandler.handle(binlogEvent)
    }
}

首先使用SpringBoot Message Kafka提供的注解对事件进行消费,接下来将事件委托到binlogEventHandler去进行处理。实际上BinlogEventHandler是个自定义的函数式接口,我们自定义事件处理器实现该接口后通过 Spring Bean 的方式注入到KafkaBinlogTopicListener中。

@Component
class ElasticsearchIndexerBinlogEventHandler(
    val restHighLevelClient: RestHighLevelClient
) : BinlogEventHandler {
    override fun handle(binlogEvent: BinlogEvent) {
        val payload = binlogEvent.payload as Map<*, *>
        val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
        // Should delete from Elasticsearch
        if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
            val deleteRequest = DeleteRequest()
            deleteRequest
                .index("search")
                .id(documentId)
            restHighLevelClient.delete(deleteRequest, DEFAULT)
        } else {
            // Not ever WRITE or UPDATE, just reindex
            val indexRequest = IndexRequest()
            indexRequest
                .index("search")
                .id(documentId)
                .source(
                    mapOf<String, Any>(
                        "searchable" to mapOf(
                            "title" to payload["title"],
                            "content" to payload["content"]
                        ),
                        "metadata" to mapOf(
                            "tenantId" to payload["tenantId"],
                            "type" to payload["type"],
                            "createdAt" to payload["createdAt"],
                            "createdBy" to payload["createdBy"],
                            "updatedAt" to payload["updatedAt"],
                            "updatedBy" to payload["updatedBy"]
                        )
                    )
                )
            restHighLevelClient.index(indexRequest, DEFAULT)
        }
    }
}

在这里我们只需要简单地判断是否为删除操作就可以,如果是删除操作需要在 Elasticsearch 中将数据删除,而如果是非删除操作只需要在 Elasticsearch 重新按照为文档建立索引即可。这段代码简单地使用了 Kotlin 中提供的 mapOf 方法对数据进行映射,如果需要其他复杂的处理只需要按照 Java 代码的方式编写处理器即可。

总结

其实 Binlog 的处理部分有很多开源的处理引擎,包括 Alibaba Canal,本文使用手动处理的方式也是为其他使用非 MySQL 数据源的同学类似的解决方案。大家可以按需所取,因地制宜,为自己的网站设计属于自己的实时站内搜索引擎!

与基于Kafka和Elasticsearch构建实时站内搜索功能的实践相似的内容:

基于Kafka和Elasticsearch构建实时站内搜索功能的实践

目前我们在构建一个多租户多产品类网站,为了让用户更好的找到他们所需要的产品,我们需要构建站内搜索功能,并且它应该是实时更新的。本文将会讨论构建这一功能的核心基础设施,以及支持此搜索能力的技术栈。

[转帖]kafka漏洞升级记录,基于SASL JAAS 配置和 SASL 协议,涉及版本3.4以下

攻击者可以使用基于 SASL JAAS 配置和 SASL 协议的任意 Kafka 客户端,在对 Kafka Connect worker 创建或修改连接器时,通过构造特殊的配置,进行 JNDI 注入。 影响范围:2.3.0 <= Apache Kafka <= 3.3.2 解决办法:升级到3.4版本

Kafka为什么这么快?

Kafka 是一个基于发布-订阅模式的消息系统,它可以在多个生产者和消费者之间传递大量的数据。Kafka 的一个显著特点是它的高吞吐率,即每秒可以处理百万级别的消息。那么 Kafka 是如何实现这样高得性能呢?本文将从七个方面来分析 Kafka 的速度优势。 - 零拷贝技术 - 仅可追加日志结构 -

Strimzi Kafka Bridge(桥接)实战之三:自制sdk(golang版本)

欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 本篇概览 本文是《Strimzi Kafka Bridge(桥接)实战》的第三篇,前文咱们掌握了Strimzi Kafka Bridge的基本功能:基于h

[转帖]Kafka-LEO和HW概念及更新流程

https://www.cnblogs.com/youngchaolin/p/12641463.html 目录 LEO&HW基本概念 LEO&HW更新流程 LEO HW 更新流程示例分析 引言 记录下和kafka相关的LEO和HW的内容,文中很多理解参考文末书籍还有某前辈。 回到顶部 LEO&HW基

从kafka与Flink的事务原理来看二阶段提交与事务日志的结合使用

两阶段提交的成立要基于以下假设: - 该分布式系统中,存在一个节点作为协调者,其他节点作为参与者,且节点之间可以进行网络通信。 - 所有节点都采用预写式日志,且日志被写入后即被保存在可靠的存储设备上,即使节点损坏也不会导致日志数据的丢失。 - 所有节点不会永久性损坏,即使损坏后也可以恢复。 ###

[转帖]Kafka可靠性之HW与Leader Epoch

《深入理解Kafka:核心设计与实现原理》是基于2.0.0版本的书 在这本书中,终于看懂了笔者之前提过的几个问题 准备知识 1、leader里存着4个数据:leader_LEO、leader_HW、remote_LEO集合、remote_HW集合 2、follower里只保存自身的:follower

聊聊我认为的分布式、集群实现关键点

基于常见的中间件(Mysql、ElasticSearch、Zookeeper、Kafka、Redis)等分布式集群设计的机制,自己总结了在在集群设计过程中需要考虑的通用问题。 ### 节点通信机制 主节点的增加、删除、通信机制。 ### 路由算法 即数据路由到哪个节点的策略机制。在集群内有多个节点,

Kafka 线上性能调优

Kafka 线上性能调优是一项综合工程,不仅仅是 Kafka 本身,还应该从硬件(存储、网络、CPU)以及操作系统方面来整体考量,首先我们要有一套生产部署方案,基于这套方案再进行调优,这样就有了可靠的底层保证,才能保证 Kafka 集群整体的稳定性。 1. 线上部署方案 1.1 操作系统 我们知道

[转帖]Day63_Kafka(一)

第一讲 Kafka基础操作 课程大纲 课程内容 学习效果 掌握目标 Kafka简介 消息队列 掌握 Kafka简介 Kafka分布式环境 Kafka操作 Kafka shell 掌握 Kafka api Flume整合kafka 一、Kafka简介 (一)消息队列 1、为甚要有消息队列 2、消息队列