使用背景
项目后台使用mongoDb,门户页使用elasticsearch,需求是将mongoDb的数据同步到elasticsearch中。于是联想到mysql有基础binlog开发的canal。mg是否有类似binlog这种机制实现的实时监控同步工具。
方案
- MongoShake(项目地址:https://github.com/alibaba/MongoShake)
- ChangeStream (monggodb的3.6版本之后推出的)
MongoShake
MongoShake是一个以go语言编写的通用的平台型服务,通过读取MongoDB集群的Oplog日志, 对MongoDB的数据进行复制,后续通过操作日志实现特定需求。
MongoShake从源库抓取oplog数据,然后发送到各个不同的tunnel通道。源库支持:ReplicaSet,Sharding,Mongod,目的库支持:Mongos,Mongod。阿里官方文档: https://developer.aliyun.com/article/763827?spm=a2c6h.13813017.content3.1.3eb032a4yx0ZoM#slide-11
项目地址: https://github.com/alibaba/MongoShake
ChangeStream
changestream是monggodb的3.6版本之后出现的一种基于collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更
想必对mysql主从复制原理比较熟悉的同学应该知道,其根本就是从节点通过监听binlog日志,然后解析binlog日志数据达到数据同步的目的,于是,基于mysql主从复制原理,阿里开源了canal这样的数据同步中间件工具
关于changestream做如下说明,提供参考
- 在该特性出现之前,开发者可通过拉取 oplog达到同样的目的;
- 但 oplog 的处理及解析相对复杂,而且存在被回滚的风险,如果使用不当的话还会带来性能问题;
- Change Stream 可以与aggregate framework结合使用,对变更集进行进一步的过滤或转换;
- 由于Change Stream 利用了存储在 oplog 中的信息,因此对于单进程部署的MongoDB无法支持Change Stream功能,其只能用于启用了副本集的独立集群或分片集群
changestream可用于监听的mongodb目标类型
- 单个集合,除系统库(admin/local/config)之外的集合,3.6版本支持
- 单个数据库,除系统库(admin/local/config)之外的数据库集合,4.0版本支持
- 整个集群,整个集群内除去系统库( (admin/local/config)之外的集合 ,4.0版本支持
一个Change Stream Event的基本结构如下所示:
{
_id : { <BSON Object> },
"operationType" : "<operation>",
"fullDocument" : { <document> },
"ns" : {
"db" : "<database>",
"coll" : "<collection"
},
"documentKey" : { "_id" : <ObjectId> },
"updateDescription" : {
"updatedFields" : { <document> },
"removedFields" : [ "<field>", ... ]
}
"clusterTime" : <Timestamp>,
"txnNumber" : <NumberLong>,
"lsid" : {
"id" : <UUID>,
"uid" : <BinData>
}
}
关于上面的数据结构,做简单的解释说明,
- _id,变更事件的Token对象
- operationType,变更类型(见下面介绍)
- fullDocument,文档内容
- ns,监听的目标
- ns.db,变更的数据库
- ns.coll,变更的集合
- documentKey,变更文档的键值,含_id字段
- updateDescription,变更描述
- updateDescription.updatedFields,变更中更新字段
- updateDescription.removedFields,变更中删除字段
- clusterTime,对应oplog的时间戳
- txnNumber,事务编号,仅在多文档事务中出现,4.0版本支持
- lsid,事务关联的会话编号,仅在多文档事务中出现,4.0版本支持
Change Steram支持的变更类型,对于上面的operationType 这个参数,主要包括有以下几个:
- insert,插入文档
- delete,删除文档
- replace,替换文档,当执行replace操作指定upsert时,可能是insert事件
- update,更新文档,当执行update操作指定upsert时,可能是insert事件
- invalidate,失效事件,比如执行了collection.drop或collection.rename
以上的几种类型,可以简单理解为,监听的mongo用户操作的事件类型,比如新增数据,删除数据,修改数据等
Java客户端操作changestream
1、引入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
2、注册MessageListenerContainer
package com.iflytek.databus.conf;
import com.iflytek.databus.entity.OriginalOplogCkpt;
import com.iflytek.databus.init.DocumnetMessageListener;
import com.mongodb.client.model.changestream.FullDocument;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonDocument;
import org.bson.Document;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import java.util.concurrent.Executor;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.match;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;
import static org.springframework.data.mongodb.core.query.Criteria.where;
/**
* @author xyding6
* @Classname MongoConfig
* @Description TODO
* @date 2023/2/21
*/
@Configuration
@Slf4j
public class MongoConfig {
@Value("${spring.data.mongodb.database}")
String mongoDatabase;
@Bean
MessageListenerContainer messageListenerContainer(MongoTemplate template, DocumnetMessageListener documnetMessageListener,Executor asyncExecutor) {
// Executor executor = Executors.newSingleThreadExecutor();
MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, asyncExecutor) {
@Override
public boolean isAutoStartup() {
return true;
}
};
//oplog_check_point
OriginalOplogCkpt originalOplogCkpt = template.findById("oplog_check_point", OriginalOplogCkpt.class, "original_oplog_ckpt");
ChangeStreamRequest.ChangeStreamRequestBuilder<Document> documentChangeStreamRequestBuilder = ChangeStreamRequest.builder(documnetMessageListener)
.database(mongoDatabase)
// .collection("original_aydoc_test") //需要监听的集合名,不指定默认监听数据库的
.filter(newAggregation(match(where("operationType").in("insert", "update", "replace", "delete")))) //过滤需要监听的操作类型,可以根据需求指定过滤条件
.fullDocumentLookup(FullDocument.UPDATE_LOOKUP);//不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息
if(originalOplogCkpt != null && StringUtils.isNotBlank(originalOplogCkpt.getResumeToken())){
log.info("==mongo监听从位点处恢复执行,位点resumeToken:{}==",originalOplogCkpt.getResumeToken());
documentChangeStreamRequestBuilder.resumeToken(BsonDocument.parse(originalOplogCkpt.getResumeToken()));
}
ChangeStreamRequest request = documentChangeStreamRequestBuilder.build();
//TODO 测试暂不开启
messageListenerContainer.register(request, Document.class);
return messageListenerContainer;
}
}
这一步是注册自己编写的监听器,oplog_check_point这是本地mg的一个集合,用来记录每次位点的信息。
3. 编写MessageListener
package com.iflytek.databus.init;
import com.google.common.collect.Lists;
import com.iflytek.databus.entity.OriginalOplogCkpt;
import com.iflytek.databus.service.TransToEsService;
import com.iflytek.databus.utils.EntityUtils;
import com.iflytek.databus.utils.GsonUtils;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.OperationType;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @author xyding6
* @Classname DocumnetMessageListener
* @Description TODO
* @date 2023/2/21
*/
@Component
@Slf4j
public class DocumnetMessageListener implements MessageListener<ChangeStreamDocument<Document>, Document> {
@Autowired
RestHighLevelClient client;
@Autowired
MongoTemplate mongoTemplate;
@Autowired
TransToEsService transToEsService;
@Override
public void onMessage(Message<ChangeStreamDocument<Document>, Document> message) {
try {
//数据库
String databaseName = message.getProperties().getDatabaseName();
//集合名称
String collectionName = message.getProperties().getCollectionName();
//操作类型
String operationType = message.getRaw().getOperationType().getValue();
//位点token
String resumeToken = message.getRaw().getResumeToken().toJson();
//消息
Document fullDocument = message.getRaw().getFullDocument();
if(Lists.newArrayList("original_oplog_ckpt","original_mq_log","original_sys_login_log").contains(collectionName)){
return;
}
log.info("Received Message in collection: {},message raw: {}, message body:{}",
message.getProperties().getCollectionName(), message.getRaw(), message.getBody());
//消息主键
String messageId = message.getRaw().getDocumentKey().getObjectId("_id").getValue().toHexString();
Class clazz = Class.forName("com.iflytek.databus.entity."+ EntityUtils.toHump(collectionName.replace("original_","")));
//1. 处理消息 TODO 考虑放入消息中间件消费
if(Lists.newArrayList(OperationType.INSERT.getValue(),OperationType.UPDATE.getValue(),OperationType.REPLACE.getValue()).contains(operationType)){
Object claszzObj = GsonUtils.fromJsonToBean(fullDocument, clazz);
transToEsService.mongoToEs(collectionName, Lists.newArrayList(claszzObj),false);
}
if(OperationType.DELETE.getValue().equals(operationType)){
transToEsService.oplogDelTrigger(collectionName,messageId,clazz);
}
//2. 每次处理消息成功,更新resumeToken,以便于故障重启后能继续同步消息
OriginalOplogCkpt originalOplogCkpt = new OriginalOplogCkpt();
originalOplogCkpt.setId("oplog_check_point");
originalOplogCkpt.setMessageId(messageId);
originalOplogCkpt.setDatabaseName(databaseName);
originalOplogCkpt.setCollectionName(collectionName);
originalOplogCkpt.setOperationType(operationType);
originalOplogCkpt.setResumeToken(resumeToken);
originalOplogCkpt.setCreateTime(new Date());
mongoTemplate.save(originalOplogCkpt);
} catch (Exception e) {
e.printStackTrace();
log.error("Mongo消息实时监控处理失败,错误信息:{}",e.getStackTrace());
}
}
}
消息监听器中可以去实现自己对消息处理的逻辑,这里我的需求是将mg的变化同步到es中。
4. 动态的启停changestream
package com.iflytek.databus.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.stereotype.Service;
/**
* @author xyding6
* @Classname ChangeStreamService
* @Description TODO
* @date 2023/2/24
*/
@Service
@Slf4j
public class ChangeStreamService {
@Autowired
private MessageListenerContainer messageListenerContainer;
public void start(){
log.info("ChangeStreamService start");
messageListenerContainer.start();
}
public void stop(){
log.info("ChangeStreamService stop MessageListenerContainer");
messageListenerContainer.stop();
}
}
可以在容器运行过程中,去操作changestream的启停。自己可以编写Controller来进行调用。
到此这篇关于springboot整合mongodb changestream的示例代码的文章就介绍到这了,如果项目对性能和延迟性要求不是太高可以选中changestream进行实现,否则还是推荐使用阿里开源的MongoShake.
版权声明:本文为博主作者:Ayding艾丁原创文章,版权归属原作者,如果侵权,请联系我们删除!
原文链接:https://blog.csdn.net/weixin_43947714/article/details/129954016