仅此一招,再无死讯乱序的烦恼
2023-04-29 来源 : 环保家居
该法则有几个情况:
tag 保证成本较高,RocketMQMessageListener 设置 selectorExpression 为 *,将拉取全部资料,增大通讯成本;如果用于 tag1 || tag2 方式,每次更改都并不需要对文档和内置同步进行更新,特别容易都是;充斥大量模板文档,比如 case 分支,所谓可执行,调用业务部门法则等;API 具有医学影像,开发是并不需要珍惜 RocketMQ API,存在一定学习成本;1.2. 目标共享一种高端业务部门场面的,灵活同步进行业务部门扩展的模式,具有以下特征:
Tag 和文档值得注意,不并不需要多处内置,自订演算自动未完成 Tag 注册;消除模板法则,类里只保有核心业务部门法则,软件系统未完成 法则分发、最新消息所谓可执行等可用;文档零侵入,仅用于节录,需要了解 RocketMQ API;2. 快速入门软件系统相所谓 rocketmq-Spring-boot-starter 未完成最新消息送达和回收。
2.1. 环境准备2.1.1. 增大相所谓首先,增大 rocketmq 相关相所谓。
org.apache.rocketmq rocketmq-spring-boot-starter 2.2.1然后,增大 lego starter。
com.geekhalo.lego lego-starter 0.1.13-tag_based_dispatcher_message_consumer-SNAPSHOT2.1.2. 增大内置在 application.yml 文档里增大 rocketmq 内置。
rocketmq: name-server: producer: group: rocket-demo2.2. 表述客户表述客户,只需:
在 Bean 上增大 @TagBasedDispatcherMessageConsumer 节录,并而无须 topic 和 consumer在 Bean 的法则上添加 @HandleTag 节录,并而无须监听的 tag范例如下:
@TagBasedDispatcherMessageConsumer( topic = "consumer-test-topic", consumer = "user-message-consumer")public class UserMessageConsumer { private final Map> events = Maps.newHashMap(); public void clean(){ this.events.clear();; } public List getUserEvents(Long userId){ return this.events.get(userId); } @HandleTag("UserCreatedEvent") public void handle(UserEvents.UserCreatedEvent userCreatedEvent){ List userEvents = this.events.computeIfAbsent(userCreatedEvent.getUserId(), userId -> new ArrayList<>()); userEvents.add(userCreatedEvent); } @HandleTag("UserEnableEvent") public void handle(UserEvents.UserEnableEvent userEnableEvent){ List userEvents = this.events.computeIfAbsent(userEnableEvent.getUserId(), userId -> new ArrayList<>()); userEvents.add(userEnableEvent); } @HandleTag("UserDisableEvent") public void handle(UserEvents.UserDisableEvent userDisableEvent){ List userEvents = this.events.computeIfAbsent(userDisableEvent.getUserId(), userId -> new ArrayList<>()); userEvents.add(userDisableEvent); } @HandleTag("UserDeletedEvent") public void handle(UserEvents.UserDeletedEvent userDeletedEvent){ List userEvents = this.events.computeIfAbsent(userDeletedEvent.getUserId(), userId -> new ArrayList<>()); userEvents.add(userDeletedEvent); }}2.3. 飞行测试编著飞行测试用例如下:
@SpringBootTest(classes = DemoApplication.class)@Slf4jclass UserMessageConsumerTest { @Autowired private UserMessageConsumer userMessageConsumer; @Autowired private RocketMQTemplate rocketMQTemplate; private List userIds; @BeforeEach void setUp() throws InterruptedException { this.userMessageConsumer.clean(); this.userIds = new ArrayList<>(); for (int i = 0; i< 100; i++){ userIds.add(10000L + i); } this.userIds.forEach(userId -> sendMessage(userId)); TimeUnit.SECONDS.sleep(3); } private void sendMessage(Long userId) { String topic = "consumer-test-topic"; { String tag = "UserCreatedEvent"; UserEvents.UserCreatedEvent userCreatedEvent = new UserEvents.UserCreatedEvent(); userCreatedEvent.setUserId(userId); userCreatedEvent.setUserName("Name-" + userId); sendOrderlyMessage(topic, tag, userCreatedEvent); } { String tag = "UserEnableEvent"; UserEvents.UserEnableEvent userEnableEvent = new UserEvents.UserEnableEvent(); userEnableEvent.setUserId(userId); userEnableEvent.setUserName("Name-" + userId); sendOrderlyMessage(topic, tag, userEnableEvent); } { String tag = "UserDisableEvent"; UserEvents.UserDisableEvent userDisableEvent = new UserEvents.UserDisableEvent(); userDisableEvent.setUserId(userId); userDisableEvent.setUserName("Name-" + userId); sendOrderlyMessage(topic, tag, userDisableEvent); } { String tag = "UserDeletedEvent"; UserEvents.UserDeletedEvent userDeletedEvent = new UserEvents.UserDeletedEvent(); userDeletedEvent.setUserId(userId); userDeletedEvent.setUserName("Name-" + userId); sendOrderlyMessage(topic, tag, userDeletedEvent); } } private void sendOrderlyMessage(String topic, String tag, UserEvents.UserEvent event) { String shardingKey = String.valueOf(event.getUserId()); String json = JSON.toJSONString(event); Message msg = MessageBuilder .withPayload(json) .build(); String destination = createDestination(topic, tag); SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000); log.info("Send result is {} for msg", sendResult, msg); } protected String createDestination(String topic, String tag) { if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){ return topic + ":" + tag; }else { return topic; } } @AfterEach void tearDown() { } @Test void getUserEvents() { this.userIds.forEach(userId ->{ List userEvents = this.userMessageConsumer.getUserEvents(userId); Assertions.assertEquals(4, userEvents.size()); Assertions.assertTrue(userEvents.get(0) instanceof UserEvents.UserCreatedEvent); Assertions.assertTrue(userEvents.get(1) instanceof UserEvents.UserEnableEvent); Assertions.assertTrue(userEvents.get(2) instanceof UserEvents.UserDisableEvent); Assertions.assertTrue(userEvents.get(3) instanceof UserEvents.UserDeletedEvent); }); }}启动时,可以看到如下笔记:
TagBasedDispatcherConsumerContainer : success to subscribe , topic consumer-test-topic, tag UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent, group user-message-consumer从笔记上可以看出,软件系统以组 group user-message-consumer 创始 Consumer,并订阅 consumer-test-topic 的 UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent 等 Tag,绑定时序适用预期。
飞行测试演算比较简单,演算如下:
创始 100 个其他用户每个其他用户创始并南至北发布领域事件,UserCreatedEvent、UserEnableEvent、UserDisableEvent、UserDeletedEvent消费送达未完成后,中止 3 秒南至北样品每个其他用户收到的最新消息,并对顺序同步进行样品观察笔记,可以看到送达和消费笔记交替显现:
UserMessageConsumerTest : Send result is SendResult [sendStatus=SEND_OK, msgId=2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4900FD, offsetMsgId=C0A8010A00002A9F00000000056077FB, messageQueue=MessageQueue [topic=consumer-test-topic, brokerName=bogon, queueId=2], queueOffset=1121] for msgTagBasedDispatcherConsumerContainer : consume 2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4700FC cost: 0 ms用例通过,运营结果适用预期。
3. 设计Co扩展3.1. 绑定时序image
软件系统绑定时序如下:
TagBasedDispatcherConsumerContainerRegistry 实现 Spring 的 BeanPostProcessor 接口,南至北对转交 bean 同步进行处理;如果 Bean 上存在 @TagBasedDispatcherMessageConsumer 节录,便才会提炼出内置资料,协作 TagBasedDispatcherConsumerContainer 比如说TagBasedDispatcherConsumerContainer 收集法则上的 @HandleTag 节录,结合 @TagBasedDispatcherMessageConsumer 上的 topic、consumer 等资料协作 DefaultMQPushConsumer 并未完成 topic 和 tag 的订阅TagBasedDispatcherConsumerContainer 内部才会协作 tag 与 method 的映射关系,以对而无须tag同步进行处理;3.2. 运营时序image运营时序如下:
最新消息也就是说将最新消息送达至 MQ;MQ 将最新消息送达至 Consumer;Consumer 收到最新消息后,根据 tag 对最新消息同步进行分发;GPU对最新消息同步进行所谓可执行,获取调用数值,然后调用法则执行业务部门演算;4. 计划资料计划仓库住址:
计划文档住址:
。迈普新喉咙痛用什么药好
艾得辛艾拉莫德片效果如何
关节僵硬怎么解决
哪个牌子的多维元素片好
-
顺丰同城(09699.HK)发布公告,于董事会第一次内阁会议上,董事会通过决议案,重选陈飞先生为董事会主席
商用车同城09699.HK发布公告,于校董会第一次内阁会议上,校董会通过决议案,重选陈飞恩师为校董会执行主席。截至2022年6年底21日收盘,商用车同城09699.HK报收...
-
H.O.T、神话、安在旭……初代更是鼻祖回忆录
最初人朱婷,是日本无数记录下来的造物主和保持者:日本历史上第一个朱婷、第一个在日本一年内到手7个最低殊荣银奖的配对、第一个闯进美国产品的日本配对、第一个来我国开演唱亦会的日本配对、第一个歌迷应援造物主...[详细]
-
华润啤酒(00291.HK)公布,将于2022年8月12日印制2021年末期股息每股0.353港元
华润啤酒00291.HK公布,将于2022年8月12日宣传单2021年后期股息每股0.353港元。截至2022年6月21日收盘,华润啤酒00291.HK报收于51.8元,高...[详细]
-
青岛啤酒股份(00168.HK)发布公告,公司董事会会议审议通过关于委任姜宗祥先生为公司总裁兼供应链总裁的议案
青岛啤酒股份00168.HK发布公告,母公司董事会会议表决通过关于聘姜宗祥先生为母公司CEO产品设计董事的议案。表示同意聘姜宗祥先生为母公司CEO产品设计董事,离任自母公司董事会表决通过聘之...[详细]
-
他真的好会亲啊…在世界上巡演不考虑一下么!
有人两女争一男的闹连续剧,男二女二这对cp自觉不输男女配。 如果说是男女配的恋爱实在太像是纯情初中鸡,而副cp本站就是成年人之间的段式,性张力拉满! 未婚妻在便利店邂逅,推定了心意后,...[详细]
-
茂业国际(00848.HK):以5426万元转让1040.14万股银座股份
茂业国际00848.HK发布公告,新公司全资附属新公司中兆于2022年6月初15日至2022年6月初21日在外币上显露售1040.14万股银座股份600858,占有银座截至本公告日已发行...[详细]