【MongoDB集群基于Oplog日志,实现数据监听】

使用背景

项目后台使用mongoDb,门户页使用elasticsearch,需求是将mongoDb的数据同步到elasticsearch中。于是联想到mysql有基础binlog开发的canal。mg是否有类似binlog这种机制实现的实时监控同步工具。

方案

  1. MongoShake(项目地址:https://github.com/alibaba/MongoShake)
  2. ChangeStream (monggodb的3.6版本之后推出的)

MongoShake

  MongoShake是一个以go语言编写的通用的平台型服务,通过读取MongoDB集群的Oplog日志, 对MongoDB的数据进行复制,后续通过操作日志实现特定需求。
  MongoShake从源库抓取oplog数据,然后发送到各个不同的tunnel通道。源库支持:ReplicaSet,Sharding,Mongod,目的库支持:Mongos,Mongod。

阿里官方文档: https://developer.aliyun.com/article/763827?spm=a2c6h.13813017.content3.1.3eb032a4yx0ZoM#slide-11
项目地址: https://github.com/alibaba/MongoShake

ChangeStream

changestream是monggodb的3.6版本之后出现的一种基于collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更
想必对mysql主从复制原理比较熟悉的同学应该知道,其根本就是从节点通过监听binlog日志,然后解析binlog日志数据达到数据同步的目的,于是,基于mysql主从复制原理,阿里开源了canal这样的数据同步中间件工具

关于changestream做如下说明,提供参考

  • 在该特性出现之前,开发者可通过拉取 oplog达到同样的目的;
  • 但 oplog 的处理及解析相对复杂,而且存在被回滚的风险,如果使用不当的话还会带来性能问题;
  • Change Stream 可以与aggregate framework结合使用,对变更集进行进一步的过滤或转换;
  • 由于Change Stream 利用了存储在 oplog 中的信息,因此对于单进程部署的MongoDB无法支持Change Stream功能,其只能用于启用了副本集的独立集群或分片集群

changestream可用于监听的mongodb目标类型

  • 单个集合,除系统库(admin/local/config)之外的集合,3.6版本支持
  • 单个数据库,除系统库(admin/local/config)之外的数据库集合,4.0版本支持
  • 整个集群,整个集群内除去系统库( (admin/local/config)之外的集合 ,4.0版本支持

一个Change Stream Event的基本结构如下所示:

{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection"
   },
   "documentKey" : { "_id" : <ObjectId> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   }
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}

关于上面的数据结构,做简单的解释说明,

  • _id,变更事件的Token对象
  • operationType,变更类型(见下面介绍)
  • fullDocument,文档内容
  • ns,监听的目标
  • ns.db,变更的数据库
  • ns.coll,变更的集合
  • documentKey,变更文档的键值,含_id字段
  • updateDescription,变更描述
  • updateDescription.updatedFields,变更中更新字段
  • updateDescription.removedFields,变更中删除字段
  • clusterTime,对应oplog的时间戳
  • txnNumber,事务编号,仅在多文档事务中出现,4.0版本支持
  • lsid,事务关联的会话编号,仅在多文档事务中出现,4.0版本支持

Change Steram支持的变更类型,对于上面的operationType 这个参数,主要包括有以下几个:

  • insert,插入文档
  • delete,删除文档
  • replace,替换文档,当执行replace操作指定upsert时,可能是insert事件
  • update,更新文档,当执行update操作指定upsert时,可能是insert事件
  • invalidate,失效事件,比如执行了collection.drop或collection.rename
    以上的几种类型,可以简单理解为,监听的mongo用户操作的事件类型,比如新增数据,删除数据,修改数据等

Java客户端操作changestream

1、引入maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

2、注册MessageListenerContainer

package com.iflytek.databus.conf;

import com.iflytek.databus.entity.OriginalOplogCkpt;
import com.iflytek.databus.init.DocumnetMessageListener;
import com.mongodb.client.model.changestream.FullDocument;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonDocument;
import org.bson.Document;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;

import java.util.concurrent.Executor;

import static org.springframework.data.mongodb.core.aggregation.Aggregation.match;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;
import static org.springframework.data.mongodb.core.query.Criteria.where;

/**
 * @author xyding6
 * @Classname MongoConfig
 * @Description TODO
 * @date 2023/2/21
 */
@Configuration
@Slf4j
public class MongoConfig {


    @Value("${spring.data.mongodb.database}")
    String mongoDatabase;


    @Bean
    MessageListenerContainer messageListenerContainer(MongoTemplate template, DocumnetMessageListener documnetMessageListener,Executor asyncExecutor) {
//        Executor executor = Executors.newSingleThreadExecutor();
        MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, asyncExecutor) {
            @Override
            public boolean isAutoStartup() {
                return true;
            }
        };

        //oplog_check_point
        OriginalOplogCkpt originalOplogCkpt = template.findById("oplog_check_point", OriginalOplogCkpt.class, "original_oplog_ckpt");


        ChangeStreamRequest.ChangeStreamRequestBuilder<Document> documentChangeStreamRequestBuilder = ChangeStreamRequest.builder(documnetMessageListener)
                .database(mongoDatabase)
//                .collection("original_aydoc_test")  //需要监听的集合名,不指定默认监听数据库的
                .filter(newAggregation(match(where("operationType").in("insert", "update", "replace", "delete"))))  //过滤需要监听的操作类型,可以根据需求指定过滤条件
                .fullDocumentLookup(FullDocument.UPDATE_LOOKUP);//不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息
        if(originalOplogCkpt != null && StringUtils.isNotBlank(originalOplogCkpt.getResumeToken())){
            log.info("==mongo监听从位点处恢复执行,位点resumeToken:{}==",originalOplogCkpt.getResumeToken());
            documentChangeStreamRequestBuilder.resumeToken(BsonDocument.parse(originalOplogCkpt.getResumeToken()));
        }
        ChangeStreamRequest request = documentChangeStreamRequestBuilder.build();

        //TODO 测试暂不开启
        messageListenerContainer.register(request, Document.class);

        return messageListenerContainer;
    }
}

这一步是注册自己编写的监听器,oplog_check_point这是本地mg的一个集合,用来记录每次位点的信息。

3. 编写MessageListener

package com.iflytek.databus.init;

import com.google.common.collect.Lists;
import com.iflytek.databus.entity.OriginalOplogCkpt;
import com.iflytek.databus.service.TransToEsService;
import com.iflytek.databus.utils.EntityUtils;
import com.iflytek.databus.utils.GsonUtils;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.OperationType;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @author xyding6
 * @Classname DocumnetMessageListener
 * @Description TODO
 * @date 2023/2/21
 */
@Component
@Slf4j
public class DocumnetMessageListener implements MessageListener<ChangeStreamDocument<Document>, Document> {

    @Autowired
    RestHighLevelClient client;
    @Autowired
    MongoTemplate mongoTemplate;
    @Autowired
    TransToEsService transToEsService;

    @Override
    public void onMessage(Message<ChangeStreamDocument<Document>, Document> message) {
        try {
            //数据库
            String databaseName = message.getProperties().getDatabaseName();
            //集合名称
            String collectionName = message.getProperties().getCollectionName();
            //操作类型
            String operationType = message.getRaw().getOperationType().getValue();
            //位点token
            String resumeToken = message.getRaw().getResumeToken().toJson();
            //消息
            Document fullDocument = message.getRaw().getFullDocument();

            if(Lists.newArrayList("original_oplog_ckpt","original_mq_log","original_sys_login_log").contains(collectionName)){
                return;
            }
            log.info("Received Message in collection: {},message raw: {}, message body:{}",
                    message.getProperties().getCollectionName(), message.getRaw(), message.getBody());
            //消息主键
            String messageId = message.getRaw().getDocumentKey().getObjectId("_id").getValue().toHexString();
            Class clazz = Class.forName("com.iflytek.databus.entity."+ EntityUtils.toHump(collectionName.replace("original_","")));
            //1. 处理消息  TODO 考虑放入消息中间件消费
            if(Lists.newArrayList(OperationType.INSERT.getValue(),OperationType.UPDATE.getValue(),OperationType.REPLACE.getValue()).contains(operationType)){
                Object claszzObj = GsonUtils.fromJsonToBean(fullDocument, clazz);
                transToEsService.mongoToEs(collectionName, Lists.newArrayList(claszzObj),false);
            }
            if(OperationType.DELETE.getValue().equals(operationType)){
                transToEsService.oplogDelTrigger(collectionName,messageId,clazz);
            }

            //2. 每次处理消息成功,更新resumeToken,以便于故障重启后能继续同步消息
            OriginalOplogCkpt originalOplogCkpt = new OriginalOplogCkpt();
            originalOplogCkpt.setId("oplog_check_point");
            originalOplogCkpt.setMessageId(messageId);
            originalOplogCkpt.setDatabaseName(databaseName);
            originalOplogCkpt.setCollectionName(collectionName);
            originalOplogCkpt.setOperationType(operationType);
            originalOplogCkpt.setResumeToken(resumeToken);
            originalOplogCkpt.setCreateTime(new Date());
            mongoTemplate.save(originalOplogCkpt);


        } catch (Exception e) {
            e.printStackTrace();
            log.error("Mongo消息实时监控处理失败,错误信息:{}",e.getStackTrace());
        }

    }
}

消息监听器中可以去实现自己对消息处理的逻辑,这里我的需求是将mg的变化同步到es中。

4. 动态的启停changestream

package com.iflytek.databus.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.stereotype.Service;

/**
 * @author xyding6
 * @Classname ChangeStreamService
 * @Description TODO
 * @date 2023/2/24
 */
@Service
@Slf4j
public class ChangeStreamService {

    @Autowired
    private MessageListenerContainer messageListenerContainer;

    public void start(){
        log.info("ChangeStreamService start");
        messageListenerContainer.start();
    }


    public void stop(){
        log.info("ChangeStreamService stop MessageListenerContainer");
        messageListenerContainer.stop();
    }
}

可以在容器运行过程中,去操作changestream的启停。自己可以编写Controller来进行调用。

到此这篇关于springboot整合mongodb changestream的示例代码的文章就介绍到这了,如果项目对性能和延迟性要求不是太高可以选中changestream进行实现,否则还是推荐使用阿里开源的MongoShake.

版权声明:本文为博主作者:Ayding艾丁原创文章,版权归属原作者,如果侵权,请联系我们删除!

原文链接:https://blog.csdn.net/weixin_43947714/article/details/129954016

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
乘风的头像乘风管理团队
上一篇 2024年4月22日
下一篇 2024年4月22日

相关推荐