当前位置 :首页 >> 环保家居

仅此一招,再无死讯乱序的烦恼

2023-04-29   来源 : 环保家居

dEvent": UserEvents.UserDeletedEvent deletedEvent = JSON.parseObject(body, UserEvents.UserDeletedEvent.class); handle(deletedEvent); return; } }}

该法则有几个情况:

tag 保证成本较高,RocketMQMessageListener 设置 selectorExpression 为 *,将拉取全部资料,增大通讯成本;如果用于 tag1 || tag2 方式,每次更改都并不需要对文档和内置同步进行更新,特别容易都是;充斥大量模板文档,比如 case 分支,所谓可执行,调用业务部门法则等;API 具有医学影像,开发是并不需要珍惜 RocketMQ API,存在一定学习成本;1.2. 目标

共享一种高端业务部门场面的,灵活同步进行业务部门扩展的模式,具有以下特征:

Tag 和文档值得注意,不并不需要多处内置,自订演算自动未完成 Tag 注册;消除模板法则,类里只保有核心业务部门法则,软件系统未完成 法则分发、最新消息所谓可执行等可用;文档零侵入,仅用于节录,需要了解 RocketMQ API;2. 快速入门

软件系统相所谓 rocketmq-Spring-boot-starter 未完成最新消息送达和回收。

2.1. 环境准备2.1.1. 增大相所谓

首先,增大 rocketmq 相关相所谓。

org.apache.rocketmq rocketmq-spring-boot-starter 2.2.1

然后,增大 lego starter。

com.geekhalo.lego lego-starter 0.1.13-tag_based_dispatcher_message_consumer-SNAPSHOT2.1.2. 增大内置

在 application.yml 文档里增大 rocketmq 内置。

rocketmq: name-server: producer: group: rocket-demo2.2. 表述客户

表述客户,只需:

在 Bean 上增大 @TagBasedDispatcherMessageConsumer 节录,并而无须 topic 和 consumer在 Bean 的法则上添加 @HandleTag 节录,并而无须监听的 tag

范例如下:

@TagBasedDispatcherMessageConsumer( topic = "consumer-test-topic", consumer = "user-message-consumer")public class UserMessageConsumer { private final Map> events = Maps.newHashMap(); public void clean(){ this.events.clear();; } public List getUserEvents(Long userId){ return this.events.get(userId); } @HandleTag("UserCreatedEvent") public void handle(UserEvents.UserCreatedEvent userCreatedEvent){ List userEvents = this.events.computeIfAbsent(userCreatedEvent.getUserId(), userId -> new ArrayList<>()); userEvents.add(userCreatedEvent); } @HandleTag("UserEnableEvent") public void handle(UserEvents.UserEnableEvent userEnableEvent){ List userEvents = this.events.computeIfAbsent(userEnableEvent.getUserId(), userId -> new ArrayList<>()); userEvents.add(userEnableEvent); } @HandleTag("UserDisableEvent") public void handle(UserEvents.UserDisableEvent userDisableEvent){ List userEvents = this.events.computeIfAbsent(userDisableEvent.getUserId(), userId -> new ArrayList<>()); userEvents.add(userDisableEvent); } @HandleTag("UserDeletedEvent") public void handle(UserEvents.UserDeletedEvent userDeletedEvent){ List userEvents = this.events.computeIfAbsent(userDeletedEvent.getUserId(), userId -> new ArrayList<>()); userEvents.add(userDeletedEvent); }}2.3. 飞行测试

编著飞行测试用例如下:

@SpringBootTest(classes = DemoApplication.class)@Slf4jclass UserMessageConsumerTest { @Autowired private UserMessageConsumer userMessageConsumer; @Autowired private RocketMQTemplate rocketMQTemplate; private List userIds; @BeforeEach void setUp() throws InterruptedException { this.userMessageConsumer.clean(); this.userIds = new ArrayList<>(); for (int i = 0; i< 100; i++){ userIds.add(10000L + i); } this.userIds.forEach(userId -> sendMessage(userId)); TimeUnit.SECONDS.sleep(3); } private void sendMessage(Long userId) { String topic = "consumer-test-topic"; { String tag = "UserCreatedEvent"; UserEvents.UserCreatedEvent userCreatedEvent = new UserEvents.UserCreatedEvent(); userCreatedEvent.setUserId(userId); userCreatedEvent.setUserName("Name-" + userId); sendOrderlyMessage(topic, tag, userCreatedEvent); } { String tag = "UserEnableEvent"; UserEvents.UserEnableEvent userEnableEvent = new UserEvents.UserEnableEvent(); userEnableEvent.setUserId(userId); userEnableEvent.setUserName("Name-" + userId); sendOrderlyMessage(topic, tag, userEnableEvent); } { String tag = "UserDisableEvent"; UserEvents.UserDisableEvent userDisableEvent = new UserEvents.UserDisableEvent(); userDisableEvent.setUserId(userId); userDisableEvent.setUserName("Name-" + userId); sendOrderlyMessage(topic, tag, userDisableEvent); } { String tag = "UserDeletedEvent"; UserEvents.UserDeletedEvent userDeletedEvent = new UserEvents.UserDeletedEvent(); userDeletedEvent.setUserId(userId); userDeletedEvent.setUserName("Name-" + userId); sendOrderlyMessage(topic, tag, userDeletedEvent); } } private void sendOrderlyMessage(String topic, String tag, UserEvents.UserEvent event) { String shardingKey = String.valueOf(event.getUserId()); String json = JSON.toJSONString(event); Message msg = MessageBuilder .withPayload(json) .build(); String destination = createDestination(topic, tag); SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000); log.info("Send result is {} for msg", sendResult, msg); } protected String createDestination(String topic, String tag) { if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){ return topic + ":" + tag; }else { return topic; } } @AfterEach void tearDown() { } @Test void getUserEvents() { this.userIds.forEach(userId ->{ List userEvents = this.userMessageConsumer.getUserEvents(userId); Assertions.assertEquals(4, userEvents.size()); Assertions.assertTrue(userEvents.get(0) instanceof UserEvents.UserCreatedEvent); Assertions.assertTrue(userEvents.get(1) instanceof UserEvents.UserEnableEvent); Assertions.assertTrue(userEvents.get(2) instanceof UserEvents.UserDisableEvent); Assertions.assertTrue(userEvents.get(3) instanceof UserEvents.UserDeletedEvent); }); }}

启动时,可以看到如下笔记:

TagBasedDispatcherConsumerContainer : success to subscribe , topic consumer-test-topic, tag UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent, group user-message-consumer

从笔记上可以看出,软件系统以组 group user-message-consumer 创始 Consumer,并订阅 consumer-test-topic 的 UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent 等 Tag,绑定时序适用预期。

飞行测试演算比较简单,演算如下:

创始 100 个其他用户每个其他用户创始并南至北发布领域事件,UserCreatedEvent、UserEnableEvent、UserDisableEvent、UserDeletedEvent消费送达未完成后,中止 3 秒南至北样品每个其他用户收到的最新消息,并对顺序同步进行样品

观察笔记,可以看到送达和消费笔记交替显现:

UserMessageConsumerTest : Send result is SendResult [sendStatus=SEND_OK, msgId=2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4900FD, offsetMsgId=C0A8010A00002A9F00000000056077FB, messageQueue=MessageQueue [topic=consumer-test-topic, brokerName=bogon, queueId=2], queueOffset=1121] for msgTagBasedDispatcherConsumerContainer : consume 2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4700FC cost: 0 ms

用例通过,运营结果适用预期。

3. 设计Co扩展3.1. 绑定时序

image

软件系统绑定时序如下:

TagBasedDispatcherConsumerContainerRegistry 实现 Spring 的 BeanPostProcessor 接口,南至北对转交 bean 同步进行处理;如果 Bean 上存在 @TagBasedDispatcherMessageConsumer 节录,便才会提炼出内置资料,协作 TagBasedDispatcherConsumerContainer 比如说TagBasedDispatcherConsumerContainer 收集法则上的 @HandleTag 节录,结合 @TagBasedDispatcherMessageConsumer 上的 topic、consumer 等资料协作 DefaultMQPushConsumer 并未完成 topic 和 tag 的订阅TagBasedDispatcherConsumerContainer 内部才会协作 tag 与 method 的映射关系,以对而无须tag同步进行处理;3.2. 运营时序

image运营时序如下:

最新消息也就是说将最新消息送达至 MQ;MQ 将最新消息送达至 Consumer;Consumer 收到最新消息后,根据 tag 对最新消息同步进行分发;GPU对最新消息同步进行所谓可执行,获取调用数值,然后调用法则执行业务部门演算;4. 计划资料

计划仓库住址:

计划文档住址:

迈普新
喉咙痛用什么药好
艾得辛艾拉莫德片效果如何
关节僵硬怎么解决
哪个牌子的多维元素片好
顺丰同城(09699.HK)发布公告,于董事会第一次内阁会议上,董事会通过决议案,重选陈飞先生为董事会主席

商用车同城09699.HK发布公告,于校董会第一次内阁会议上,校董会通过决议案,重选陈飞恩师为校董会执行主席。截至2022年6年底21日收盘,商用车同城09699.HK报收...

友情链接