系列五、Java操作RocketMQ

一、环境搭建

1.1、pom.xml

<dependencies>

	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.83</version>
	</dependency>

	<dependency>
		<groupId>org.apache.rocketmq</groupId>
		<artifactId>rocketmq-client</artifactId>
		<version>4.9.2</version>
	</dependency>

	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.22</version>
	</dependency>
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-api</artifactId>
		<version>1.7.32</version>
	</dependency>
	<dependency>
		<groupId>ch.qos.logback</groupId>
		<artifactId>logback-classic</artifactId>
		<version>1.2.10</version>
	</dependency>

	<dependency>
		<groupId>junit</groupId>
		<artifactId>junit</artifactId>
		<version>4.13.2</version>
	</dependency>

	<dependency>
		<groupId>org.apache.commons</groupId>
		<artifactId>commons-lang3</artifactId>
		<version>3.11</version>
	</dependency>

	<dependency>
		<groupId>org.apache.commons</groupId>
		<artifactId>commons-collections4</artifactId>
		<version>4.4</version>
	</dependency>
</dependencies>

1.2、常量类

public class RocketMQConstant {

    public static final String NAME_SERVER_ADDR = "";

}

二、案例代码

2.1、发送&接收简单消息

2.1.1、应用场景

发送后会有一个返回值,这种方式非常安全,但是性能上没有那么高,而且在MQ集群中,也是要等到所有的从机都复制了消息之后才返回,所以针对重要的消息可以选择这种方式。

2.1.2、发送简单消息

/**
 * 发送同步消息
 *
 * @throws Exception
 */
@Test
public void sendSyncMessage() throws Exception {
	// 1、创建生产者,指定组名
	DefaultMQProducer producer = new MyDefaultMQProducer("GROUP_SYNC");
	producer.setSendMsgTimeout(1000 * 10);
	// 2、指定Namesrv地址
	producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	// 3、启动生产者者
	producer.start();
	for (int i = 1; i <= 10; i++) {
		// 4、创建消息对象,指定Topic、Tag、消息体
		Message message = new Message("TOPIC_SYNC", "TAGS_SYNC", (" 我是同步消息 " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
		// 5、发送消息
		SendResult sendResult = producer.send(message);
		log.info("i:{},发送同步消息响应结果:{}", i, JSON.toJSONString(sendResult));
	}
	// 6、关闭生产者
	producer.shutdown();
}

 

2.1.3、接收简单消息

/**
 * 接收同步消息
 * @throws Exception
 */
@Test
public void receiveSyncMessage() throws Exception {
	// 1、创建消费者
	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_SYNC");
	// 2、连接Namesrv
	consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	/**
	 * 3、订阅主题
	 *      参数1:主题名称
	 *      参数2:主题中消息类型,*表示订阅所有
	 */
	consumer.subscribe("TOPIC_SYNC","*");
	// 4、注册监听器
	consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			log.info("消费者消费消息");
			log.info("消息:{}", JSON.toJSONString(msgs.get(0)));
			log.info("消息内容:{}", new String(msgs.get(0).getBody()));
			log.info("消费上下文:{}",context);

			/**
			 * CONSUME_SUCCESS:成功,消息会从MQ出队
			 * RECONSUME_LATER:失败,消息会重新回到队列,过一会儿重新投递出来,给当前消费者或者其他消费者消费
			 */
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	});
	// 5、启动消费者
	consumer.start();
	// 6、挂起当前JVM
	System.in.read();
}

2.2、发送&接收异步消息

2.2.1、应用场景

异步消息通常应用在对响应时间比较敏感的业务场景中,即发送端不能容忍长时间地等待Broker的响应,发送完毕后会有一个异步消息通知。

2.2.2、发送异步消息

/**
 * 发送异步消息
 * 应用场景:异步消息通常应用在对响应时间比较敏感的业务场景中,即发送端不能容忍长时间地等待Broker的响应,发送完毕后会有一个异步消息通知
 */
@Test
public void sendAsyncMessage() throws Exception {
	// 1、创建生产者,指定组名
	DefaultMQProducer producer = new MyDefaultMQProducer("GROUP_ASYNC");
	producer.setSendMsgTimeout(1000 * 10);
	// 2、指定Namesrv地址
	producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	// 3、启动生产者者
	producer.start();
	// 4、创建消息对象
	Message message = new Message("TOPIC_ASYNC", "我是一个异步消息".getBytes());
	// 5、发送消息
	producer.send(message, new SendCallback() {
		@Override
		public void onSuccess(SendResult sendResult) {
			log.info("异步消息发送成功! sendResult:{}", JSON.toJSONString(sendResult));
		}

		@Override
		public void onException(Throwable throwable) {
			log.info("异步消息发送失败! errorMsg:{}", throwable.getMessage());
		}
	});
	log.info("==================我先执行==================");
	// 挂起jvm,因为回调是异步的,不然测试不出来
	System.in.read();
	// 6、关闭生产者
	producer.shutdown();
}

2.2.3、接收异步消息

/**
 * 接收异步消息
 * @throws Exception
 */
@Test
public void receiveAsyncMessage() throws Exception {
	// 1、创建消费者
	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_ASYNC");
	// 2、连接Namesrv
	consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	/**
	 * 3、订阅主题
	 *      参数1:主题名称
	 *      参数2:主题中消息类型,*表示订阅所有
	 */
	consumer.subscribe("TOPIC_ASYNC","*");
	// 4、注册监听器
	consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			log.info("接收异步消息,消息内容:{}", new String(msgs.get(0).getBody()));

			/**
			 * CONSUME_SUCCESS:成功,消息会从MQ出队
			 * RECONSUME_LATER:失败,消息会重新回到队列,过一会儿重新投递出来,给当前消费者或者其他消费者消费
			 */
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	});
	// 5、启动消费者
	consumer.start();
	// 6、挂起当前JVM
	System.in.read();
}

2.3、发送&接收单向消息

2.3.1、应用场景

发送单向消息主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送。

2.3.2、发送单向消息

/**
 * 发送单向消息
 * 应用场景:发送单向消息主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送
 *
 * @throws Exception
 */
@Test
public void sendOnewayMessage() throws Exception {
	// 1、创建生产者,指定组名
	DefaultMQProducer producer = new MyDefaultMQProducer("GROUP_ONE_WAY");
	producer.setSendMsgTimeout(1000 * 10);
	// 2、指定Namesrv地址
	producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	// 3、启动生产者者
	producer.start();
	// 4、创建消息对象
	Message message = new Message("TOPIC_ONE_WAY", "我是一个单向消息".getBytes());
	// 5、发送消息(发送单向消息无返回值)
	producer.sendOneway(message);
	// 6、关闭生产者
	producer.shutdown();
	log.info("发送单向消息 success!");
}

2.3.3、接收单向消息

/**
 * 接收单向消息
 * @throws Exception
 */
@Test
public void receiveOnewayMessage() throws Exception {
	// 1、创建消费者
	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_ONE_WAY");
	// 2、连接Namesrv
	consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	/**
	 * 3、订阅主题
	 *      参数1:主题名称
	 *      参数2:主题中消息类型,*表示订阅所有
	 */
	consumer.subscribe("TOPIC_ONE_WAY","*");
	// 4、注册监听器
	consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			log.info("接收单向消息,消息内容:{}", new String(msgs.get(0).getBody()));

			/**
			 * CONSUME_SUCCESS:成功,消息会从MQ出队
			 * RECONSUME_LATER:失败,消息会重新回到队列,过一会儿重新投递出来,给当前消费者或者其他消费者消费
			 */
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	});
	// 5、启动消费者
	consumer.start();
	// 6、挂起当前JVM
	System.in.read();
}

2.4、发送&接收延迟消息

2.4.1、应用场景

消息放入MQ后,过一段时间才会被监听到,然后消费。比如下订单业务,提交了一个订单就可以发送一个延迟消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

2.4.2、发送延迟消息

/**
 * 发送延迟消息
 * 应用场景:
 *     消息放入MQ后,过一段时间才会被监听到,然后消费。比如下订单业务,提交了一个订单就可以发送一个延迟消息,30min后去检查这个订单的状态,
 * 如果还是未付款就取消订单释放库存。
 *
 * 注意事项:
 *      RocketMQ不支持任意时间的延迟,只支持以下几个固定的延时等级,等级1对应延时1s、等级2对应延时5s、等级3对应延时10s,最高支持2h延时。
 *      (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
 *
 * @throws Exception
 */
@Test
public void sendDelayMessage() throws Exception {
	// 1、创建生产者,指定组名
	DefaultMQProducer producer = new DefaultMQProducer("GROUP_DELAY");
	// 2、指定Namesrv地址
	producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	// 3、启动生产者者
	producer.start();
	// 4、创建消息对象:设定消息的延迟等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
	Message message = new Message("TOPIC_DELAY", "我是一个延迟消息".getBytes());
	message.setDelayTimeLevel(3); // 1对应1s,2对应5s,3对应10s,4对应30s,依次类推
	// 5、发送消息
	SendResult result = producer.send(message);
	log.info("发送延迟消息成功!当前时间{},发送结果:{} " + LocalDateTimeUtil.getCurrentTime(),result.getSendStatus());
	// 6、关闭生产者
	producer.shutdown();

}       

2.4.3、接收延迟消息

/**
 * 接收延迟消息
 * @throws Exception
 */
@Test
public void receiveDelayMessage() throws Exception {
	// 1、创建消费者
	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_DELAY");
	// 2、连接Namesrv
	consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	/**
	 * 3、订阅主题
	 *      参数1:主题名称
	 *      参数2:主题中消息类型,*表示订阅所有
	 */
	consumer.subscribe("TOPIC_DELAY","*");
	// 4、注册监听器
	consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			log.info("接收延迟消息,当前时间:{},消息内容:{}", LocalDateTimeUtil.getCurrentTime(),new String(msgs.get(0).getBody()));

			/**
			 * CONSUME_SUCCESS:成功,消息会从MQ出队
			 * RECONSUME_LATER:失败,消息会重新回到队列,过一会儿重新投递出来,给当前消费者或者其他消费者消费
			 */
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	});
	// 5、启动消费者
	consumer.start();
	// 6、挂起当前JVM
	System.in.read();
}

2.5、发送&接收批量消息

2.5.1、应用场景

RocketMQ可以一次性发送一组消息,那么这一组消息会被当做一个消息消费。

2.5.2、发送批量消息

/**
 * 发送批量消息
 * 应用场景:RocketMQ可以一次性发送一组消息,那么这一组消息会被当做一个消息消费
 * @throws Exception
 */
@Test
public void sendBatchMessage() throws Exception {
	// 1、创建生产者,指定组名
	DefaultMQProducer producer = new DefaultMQProducer("GROUP_BATCH");
	// 2、指定Namesrv地址
	producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	// 3、启动生产者者
	producer.start();
	// 4、创建消息对象
	List<Message> messages = Arrays.asList(
			new Message("TOPIC_BATCH", "我是一组消息的A消息".getBytes()),
			new Message("TOPIC_BATCH", "我是一组消息的B消息".getBytes()),
			new Message("TOPIC_BATCH", "我是一组消息的C消息".getBytes())
	);

	// 5、发送消息
	SendResult sendResult = producer.send(messages);
	log.info("发送批量消息成功,sendResult:{}",JSON.toJSONString(sendResult));
	// 6、关闭生产者
	producer.shutdown();
}

// 控制台打印结果
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
14:24:18.634 [main] INFO org.star.RocketMQProducerTests - 发送批量消息成功,sendResult:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"TOPIC_BATCH"},"msgId":"7F00000125A418B4AAC22C4A2B4A0000,7F00000125A418B4AAC22C4A2B4A0001,7F00000125A418B4AAC22C4A2B4A0002","offsetMsgId":"C0A8B58A00002A9F000000000000192E,C0A8B58A00002A9F00000000000019E3,C0A8B58A00002A9F0000000000001A98","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

2.5.3、接收批量消息

/**
 * 接收批量消息
 * @throws Exception
 */
@Test
public void receiveBatchMessage() throws Exception {
	// 1、创建消费者
	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_BATCH");
	// 2、连接Namesrv
	consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	/**
	 * 3、订阅主题
	 *      参数1:主题名称
	 *      参数2:主题中消息类型,*表示订阅所有
	 */
	consumer.subscribe("TOPIC_BATCH","*");
	// 4、注册监听器
	consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			log.info("接收批量消息,消息内容:{}", new String(msgs.get(0).getBody()));

			/**
			 * CONSUME_SUCCESS:成功,消息会从MQ出队
			 * RECONSUME_LATER:失败,消息会重新回到队列,过一会儿重新投递出来,给当前消费者或者其他消费者消费
			 */
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	});
	// 5、启动消费者
	consumer.start();
	// 6、挂起当前JVM
	System.in.read();
}

2.6、发送&接收顺序消息

2.6.1、发送顺序消息

/**
 * 发送顺序消息:
 *      消息有序指的是可以按照消息的发送顺序来消费(FIFO),RocketMQ可以严格的保证消息有序,可以分为:分区有序或者全局有序。可能大家会有疑问,MQ不就是FIFO吗?RocketMQ中的Broker机制,
 *  导致了RocketMQ会有这个问题,因为一个Broker中对应了4个queue
 *
 *  顺序消费消息的原理:
 *  https://blog.csdn.net/weixin_43767015/article/details/121028059
 *
 *  场景分析:
 *      模拟一个订单的发送流程,创建两个订单,发送的消息分别是:
 *      订单号orderSn1消息流程 下订单->物流->签收
 *      订单号orderSn2消息流程 下订单->物流->拒收
 * @throws Exception
 */
@Test
public void sendOrderMessage() throws Exception {
	// 1、创建生产者,指定组名
	DefaultMQProducer producer = new DefaultMQProducer("GROUP_ORDER");
	// 2、指定Namesrv地址
	producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	// 3、启动生产者者
	producer.start();

	// 4、创建消息对象
	Integer orderSn1 = OrderSnUtil.generateOrderSn();
	Integer orderSn2 = OrderSnUtil.generateOrderSn();
	List<OrderDTO> orders = Arrays.asList(
			new OrderDTO(SnowflakeIdUtil.generateId(), orderSn1, 59D, LocalDateTime.now(), "下订单"),
			new OrderDTO(SnowflakeIdUtil.generateId(), orderSn1, 59D, LocalDateTime.now(), "物流"),
			new OrderDTO(SnowflakeIdUtil.generateId(), orderSn1, 59D, LocalDateTime.now(), "签收"),
			new OrderDTO(SnowflakeIdUtil.generateId(), orderSn2, 89D, LocalDateTime.now(), "下订单"),
			new OrderDTO(SnowflakeIdUtil.generateId(), orderSn2, 89D, LocalDateTime.now(), "物流"),
			new OrderDTO(SnowflakeIdUtil.generateId(), orderSn2, 89D, LocalDateTime.now(), "拒收")
	);
	// 5、发送消息
	orders.forEach(e -> {
		Message message = new Message("TOPIC_ORDER", e.toString().getBytes());
		try {
			// 发送的时候,相同的订单号选择同一个队列
			producer.send(message, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List<MessageQueue> messageQueues, Message message, Object o) {
					// 5.1、当前主题有多少个队列
					int size = messageQueues.size();
					// 5.2、这个o就是 e.getOrderSn()
					Integer orderSn = (Integer) o;
					// 5.3、用这个值去 % 队列的个数,得到一个队列
					int index = orderSn % size;
					log.info("发送顺序消息,当前主题队列个数:{},订单号:{},队列索引:{}",size,orderSn,index);
					// 5.4、返回选择的这个队列即可,那么相同的订单号就会被放在相同的队列里,进而实现FIFO
					return messageQueues.get(index);
				}
			}, e.getOrderSn());
		} catch (Exception ex) {
			log.info("发送顺序消息 error:{}",ex.getMessage());
		}
	});
	log.info("发送顺序消息 success! ");
	// 6、关闭生产者
	producer.shutdown();

}

2.6.2、接收顺序消息

/**
 * 接收顺序消息
 * @throws Exception
 */
@Test
public void receiveOrderMessage() throws Exception {
	// 1、创建消费者
	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_ORDER");
	// 2、连接Namesrv
	consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	/**
	 * 3、订阅主题
	 *      参数1:主题名称
	 *      参数2:主题中消息类型,*表示订阅所有
	 */
	consumer.subscribe("TOPIC_ORDER","*");
	// 4、注册监听器
	consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			log.info("接收顺序消息,消息内容:{}", new String(msgs.get(0).getBody()));

			/**
			 * CONSUME_SUCCESS:成功,消息会从MQ出队
			 * RECONSUME_LATER:失败,消息会重新回到队列,过一会儿重新投递出来,给当前消费者或者其他消费者消费
			 */
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	});
	// 5、启动消费者
	consumer.start();
	// 6、挂起当前JVM
	System.in.read();
}
// 控制台打印结果

14:51:35.388 [ConsumeMessageThread_7] INFO org.star.RocketMQConsumerTests - 接收顺序消息,消息内容:OrderDTO(orderId=849146426436935680, orderSn=129941428, price=59.0, createTime=2023-08-09T14:51:34.672, description=下订单)
14:51:35.391 [ConsumeMessageThread_8] INFO org.star.RocketMQConsumerTests - 接收顺序消息,消息内容:OrderDTO(orderId=849146426462101504, orderSn=129941428, price=59.0, createTime=2023-08-09T14:51:34.672, description=物流)
14:51:35.398 [ConsumeMessageThread_9] INFO org.star.RocketMQConsumerTests - 接收顺序消息,消息内容:OrderDTO(orderId=849146426462101505, orderSn=129941428, price=59.0, createTime=2023-08-09T14:51:34.672, description=签收)
14:51:35.400 [ConsumeMessageThread_10] INFO org.star.RocketMQConsumerTests - 接收顺序消息,消息内容:OrderDTO(orderId=849146426462101506, orderSn=14790438, price=89.0, createTime=2023-08-09T14:51:34.672, description=下订单)
14:51:35.402 [ConsumeMessageThread_11] INFO org.star.RocketMQConsumerTests - 接收顺序消息,消息内容:OrderDTO(orderId=849146426462101507, orderSn=14790438, price=89.0, createTime=2023-08-09T14:51:34.672, description=物流)
14:51:35.402 [ConsumeMessageThread_12] INFO org.star.RocketMQConsumerTests - 接收顺序消息,消息内容:OrderDTO(orderId=849146426462101508, orderSn=14790438, price=89.0, createTime=2023-08-09T14:51:34.672, description=拒收)

2.7、发送&接收带标签的消息

2.7.1、发送带标签的消息

/**
 * 发送标签消息
 * 场景:我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤进行区别对待
 *
 * 问:什么时候该使用Topic?什么时候该使用Tag?
 * 答:不同的业务应该使用不同的Topic,如果是相同的业务,里面有不同表的表现形式,那么我们要使用tag进行区分;
 *
 * 可以从以下几个方面进行判断
 *      (1)消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的Topic,无法通过 Tag 进行区分
 *      (2)业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的Topic进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分
 *      (3)消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分
 *      (4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic
 * 总结:
 *      总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,
 *   而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系
 * @throws Exception
 */
@Test
public void sendTagMessage() throws Exception {
	// 1、创建生产者,指定组名
	DefaultMQProducer producer = new DefaultMQProducer("GROUP_TAG");
	// 2、指定Namesrv地址
	producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	// 3、启动生产者者
	producer.start();
	// 4、创建消息对象
	Message message1 = new Message("TOPIC_TAG", "TAG_VIP1","我是VIP1的文章".getBytes());
	Message message2 = new Message("TOPIC_TAG", "TAG_VIP2","我是VIP2的文章".getBytes());
	// 5、发送消息
	SendResult sendResult1 = producer.send(message1);
	SendResult sendResult2 = producer.send(message2);
	log.info("发送标签消息成功!sendResult1:{},sendResult2:{}",JSON.toJSONString(sendResult1),JSON.toJSONString(sendResult2));
	// 6、关闭生产者
	producer.shutdown();
}

// 控制台打印结果
15:15:48.365 [main] INFO org.star.RocketMQProducerTests - 发送标签消息成功!sendResult1:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"TOPIC_TAG"},"msgId":"7F00000105CC18B4AAC22C7952020000","offsetMsgId":"C0A8B58A00002A9F0000000000002B25","queueOffset":2,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true},sendResult2:{"messageQueue":{"brokerName":"broker-a","queueId":0,"topic":"TOPIC_TAG"},"msgId":"7F00000105CC18B4AAC22C7952080001","offsetMsgId":"C0A8B58A00002A9F0000000000002BDD","queueOffset":1,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

2.7.2、接收带标签的消息

/**
 * 接收带有标签的消息
 * @throws Exception
 */
@Test
public void receiveTagMessage1() throws Exception {
	// 1、创建消费者
	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TAG");
	// 2、连接Namesrv
	consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	/**
	 * 3、订阅主题
	 *      参数1:主题名称
	 *      参数2:主题中消息类型
	 *              *:表示订阅所有
	 *              TAG_NAME_A || TAG_NAME_B || TAG_NAME_C:表示只要符合其中任何一个标签都可以消费
	 */
	consumer.subscribe("TOPIC_TAG","TAG_VIP1");
	// 4、注册监听器(MessageListenerConcurrently:并发消费,默认20个线程一起消费,可以参考consumer.setConsumeThreadMax())
	consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			log.info("我是VIP1的消费者,消息内容:{}", new String(msgs.get(0).getBody()));

			/**
			 * CONSUME_SUCCESS:成功,消息会从MQ出队
			 * RECONSUME_LATER:失败,消息会重新回到队列,过一会儿重新投递出来,给当前消费者或者其他消费者消费
			 */
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	});
	// 5、启动消费者
	consumer.start();
	// 6、挂起当前JVM
	System.in.read();
}

// 控制台打印结果
15:17:14.878 [ConsumeMessageThread_1] INFO org.star.RocketMQConsumerTests - 我是VIP1的消费者,消息内容:我是VIP1的文章
@Test
public void receiveTagMessage2() throws Exception {
	// 1、创建消费者
	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TAG");
	// 2、连接Namesrv
	consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	/**
	 * 3、订阅主题
	 *      参数1:主题名称
	 *      参数2:主题中消息类型
	 *              *:表示订阅所有
	 *              TAG_NAME_A || TAG_NAME_B || TAG_NAME_C:表示只要符合其中任何一个标签都可以消费
	 */
	consumer.subscribe("TOPIC_TAG","TAG_VIP1 || TAG_VIP2");
	// 4、注册监听器(MessageListenerConcurrently:并发消费,默认20个线程一起消费,可以参考consumer.setConsumeThreadMax())
	consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			log.info("我是VIP2的消费者,消息内容:{}", new String(msgs.get(0).getBody()));

			/**
			 * CONSUME_SUCCESS:成功,消息会从MQ出队
			 * RECONSUME_LATER:失败,消息会重新回到队列,过一会儿重新投递出来,给当前消费者或者其他消费者消费
			 */
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	});
	// 5、启动消费者
	consumer.start();
	// 6、挂起当前JVM
	System.in.read();
}

2.8、发送&接收带Key的消息

2.8.1、发送带Key的消息

/**
 * 发送带key的消息
 * @throws Exception
 */
@Test
public void sendKeyMessage() throws Exception {
	// 1、创建生产者,指定组名
	DefaultMQProducer producer = new DefaultMQProducer("GROUP_KEY");
	// 2、指定Namesrv地址
	producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	// 3、启动生产者者
	producer.start();
	// 4、创建消息对象
	String key = UUID.randomUUID().toString();
	Message message = new Message("TOPIC_KEY", "TAG_VIP1",key,("我是key为" + key + " ,tag为TAG_VIP1的消息").getBytes());
	// 5、发送消息
	SendResult sendResult = producer.send(message);
	log.info("发送带key的消息成功! 发送结果:{}",JSON.toJSONString(sendResult));
	// 6、关闭生产者
	producer.shutdown();
}

// 控制台打印结果
15:25:43.515 [main] INFO org.star.RocketMQProducerTests - 发送带key的消息成功! 发送结果:{"messageQueue":{"brokerName":"broker-a","queueId":2,"topic":"TOPIC_KEY"},"msgId":"7F000001697418B4AAC22C8266CF0000","offsetMsgId":"C0A8B58A00002A9F0000000000002E05","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

2.8.2、接收带Key的消息

/**
 * 接收带有key的消息
 * @throws Exception
 */
@Test
public void receiveKeyMessage() throws Exception {
	// 1、创建消费者
	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_KEY");
	// 2、连接Namesrv
	consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	/**
	 * 3、订阅主题
	 *      参数1:主题名称
	 *      参数2:主题中消息类型
	 *              *:表示订阅所有
	 *              TAG_NAME_A || TAG_NAME_B || TAG_NAME_C:表示只要符合其中任何一个标签都可以消费
	 */
	consumer.subscribe("TOPIC_KEY","*");
	// 4、注册监听器(MessageListenerConcurrently:并发消费,默认20个线程一起消费,可以参考consumer.setConsumeThreadMax())
	consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			MessageExt messageExt = msgs.get(0);
			log.info("接收带有key的消息,我是vip1的消费者,我正在消费消息:{}",new String(messageExt.getBody()));
			log.info("我的业务标识:{}",messageExt.getKeys());
			/**
			 * CONSUME_SUCCESS:成功,消息会从MQ出队
			 * RECONSUME_LATER:失败,消息会重新回到队列,过一会儿重新投递出来,给当前消费者或者其他消费者消费
			 */
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	});
	// 5、启动消费者
	consumer.start();
	// 6、挂起当前JVM
	System.in.read();
}

// 控制台打印结果
15:31:15.672 [ConsumeMessageThread_1] INFO org.star.RocketMQConsumerTests - 接收带有key的消息,我是vip1的消费者,我正在消费消息:我是key为83ae5aab-dfc9-4803-afb2-c035633497f6 ,tag为TAG_VIP1的消息
15:31:15.673 [ConsumeMessageThread_1] INFO org.star.RocketMQConsumerTests - 我的业务标识:83ae5aab-dfc9-4803-afb2-c035633497f6

2.9、发送&接收重试消息

2.9.1、发送重试消息

/**
 * 发送重试消息
 * @throws Exception
 */
@Test
public void sendRetryMessage() throws Exception {
	// 1、创建生产者,指定组名
	DefaultMQProducer producer = new DefaultMQProducer("GROUP_RETRY");
	// 2、指定Namesrv地址
	producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	// 3、启动生产者者
	producer.start();
	// 3.1、生产者发送消息,设置重试次数
	producer.setRetryTimesWhenSendFailed(2);
	producer.setRetryTimesWhenSendAsyncFailed(2);
	String key = UUID.randomUUID().toString();
	log.info("sendRetryMessage key:{}",key);
	// 4、创建消息对象
	Message message = new Message("TOPIC_RETRY", "TAG_RETRY", key, "我是一个重试消息".getBytes());
	// 5、发送消息
	SendResult sendResult = producer.send(message);
	log.info("发送重试消息成功! sendResult:{}",JSON.toJSONString(sendResult));
	// 6、关闭生产者
	producer.shutdown();
}
// 控制台打印结果
15:47:26.784 [main] INFO org.star.RocketMQProducerTests - sendRetryMessage key:90235fd5-5c20-4024-81c9-d7a2a650c75b
15:47:27.531 [main] INFO org.star.RocketMQProducerTests - 发送重试消息成功! sendResult:{"messageQueue":{"brokerName":"broker-a","queueId":2,"topic":"TOPIC_RETRY"},"msgId":"7F000001724418B4AAC22C964CA30000","offsetMsgId":"C0A8B58A00002A9F000000000000389E","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

2.9.2、接收重试消息

/**
 * 接收重试消息
 *    重试的时间间隔:
 *    10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
 *    默认重试16次
 *    1.能否自定义重试次数
 *    2.如果重试了16次(并发模式) 顺序模式下(int最大值次)都是失败的?  是一个死信消息 则会放在一个死信主题中去 主题的名称:%DLQ%retry-consumer-group
 *    3.当消息处理失败的时候 该如何正确的处理?
 *    --------------
 *    重试的次数一般 5次
 * @throws Exception
 */
@Test
public void receiveRetryMessage() throws Exception {
	// 1、创建消费者
	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_RETRY");
	// 2、连接Namesrv
	consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
	/**
	 * 3、订阅主题
	 *      参数1:主题名称
	 *      参数2:主题中消息类型
	 *              *:表示订阅所有
	 *              TAG_NAME_A || TAG_NAME_B || TAG_NAME_C:表示只要符合其中任何一个标签都可以消费
	 */
	consumer.subscribe("TOPIC_RETRY","*");
	// 3.1、设定重试次数
	consumer.setMaxReconsumeTimes(2);
	// 4、注册监听器(MessageListenerConcurrently:并发消费,默认20个线程一起消费,可以参考consumer.setConsumeThreadMax())
	consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
			MessageExt messageExt = msgs.get(0);
			log.info("接收重复消息时间:{}",messageExt.getReconsumeTimes());
			log.info("{}",new String(messageExt.getBody()));

			/**
			 * CONSUME_SUCCESS:成功,消息会从MQ出队
			 * RECONSUME_LATER:失败,消息会重新回到队列,过一会儿重新投递出来,给当前消费者或者其他消费者消费
			 */
			// 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
			return ConsumeConcurrentlyStatus.RECONSUME_LATER;
		}
	});
	// 5、启动消费者
	consumer.start();
	// 6、挂起当前JVM
	System.in.read();
}

 

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
扎眼的阳光的头像扎眼的阳光普通用户
上一篇 2023年12月21日
下一篇 2023年12月21日

相关推荐