From 01c7fbe3076efc221aea0bab3842b72a4f710a2e Mon Sep 17 00:00:00 2001 From: liu_yx Date: Thu, 30 Jun 2022 14:19:44 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E2=9C=A8=E6=89=A9=E5=B1=95Rocketmq?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ballcat-common-websocket/pom.xml | 7 +- .../RocketmqMessageDistributor.java | 77 +++++++++++++++++++ .../MessageDistributorTypeConstants.java | 5 ++ .../websocket/WebSocketAutoConfiguration.java | 3 +- .../RocketMqMessageDistributorConfig.java | 36 +++++++++ 5 files changed, 126 insertions(+), 2 deletions(-) create mode 100644 ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java create mode 100644 ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/config/RocketMqMessageDistributorConfig.java diff --git a/ballcat-common/ballcat-common-websocket/pom.xml b/ballcat-common/ballcat-common-websocket/pom.xml index 7a4ef783..20e22f70 100644 --- a/ballcat-common/ballcat-common-websocket/pom.xml +++ b/ballcat-common/ballcat-common-websocket/pom.xml @@ -35,6 +35,11 @@ jakarta.annotation jakarta.annotation-api + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.2.0 + - \ No newline at end of file + diff --git a/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java new file mode 100644 index 00000000..d065c45a --- /dev/null +++ b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java @@ -0,0 +1,77 @@ +package com.hccake.ballcat.common.websocket.distribute; + +import com.alibaba.fastjson.JSON; +import com.hccake.ballcat.common.util.JsonUtils; +import com.hccake.ballcat.common.websocket.exception.ErrorJsonMessageException; +import com.hccake.ballcat.common.websocket.session.WebSocketSessionStore; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Value; + +import java.nio.charset.StandardCharsets; + +/** + * @ClassName RocketmqMessageDistributor.java + * @Author liu_yx + * @Version 1.0.0 + * @Description MQ发送消息,接收到消息时进行推送, 广播模式 + * @CreateTime 2022年06月30日 14:10:10 + */ +@Slf4j +@RocketMQMessageListener( + consumerGroup = "${spring.application.name:default-ballcat-application}-${spring.profiles.active:dev}", + topic = "${spring.application.name:default-ballcat-application}-${spring.profiles.active:dev}", + selectorExpression = "${ballcat.websocket.mq.tag}", messageModel = MessageModel.BROADCASTING) +public class RocketmqMessageDistributor extends AbstractMessageDistributor implements RocketMQListener { + + @Value("${spring.application.name}") + private String appName; + + @Value("${ballcat.websocket.mq.tag}") + private String tag; + + private final RocketMQTemplate template; + + public RocketmqMessageDistributor(WebSocketSessionStore webSocketSessionStore, RocketMQTemplate template) { + super(webSocketSessionStore); + this.template = template; + } + + /** + * 消息分发 + * @param messageDO 发送的消息 + */ + @Override + public void distribute(MessageDO messageDO) { + log.info("the send message body is [{}]", messageDO); + String destination = this.appName + ":" + this.tag; + SendResult sendResult = this.template.sendAndReceive(destination, JsonUtils.toJson(messageDO), + SendResult.class); + if (log.isDebugEnabled()) { + log.debug("send message to `{}` finished. result:{}", destination, sendResult); + } + } + + /** + * 消息消费 + * @param message 接收的消息 + */ + @Override + public void onMessage(MessageExt message) { + String body = new String(message.getBody(), StandardCharsets.UTF_8); + MessageDO event = JSON.parseObject(body, MessageDO.class); + log.info("the content is [{}]", event); + try { + this.doSend(event); + } catch (Exception e) { + log.error("MQ消费信息处理异常: {}", e.getMessage(), e); + throw new ErrorJsonMessageException("MQ消费信息处理异常, " + e.getMessage()); + } + } + +} diff --git a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/MessageDistributorTypeConstants.java b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/MessageDistributorTypeConstants.java index 7236f80c..0614af3d 100644 --- a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/MessageDistributorTypeConstants.java +++ b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/MessageDistributorTypeConstants.java @@ -20,6 +20,11 @@ public final class MessageDistributorTypeConstants { */ public static final String REDIS = "redis"; + /** + * 基于 rocketmq 广播 + */ + public static final String ROCKETMQ = "rocketmq"; + /** * 自定义 */ diff --git a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/WebSocketAutoConfiguration.java b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/WebSocketAutoConfiguration.java index 8cfc33ee..a8fca9f2 100644 --- a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/WebSocketAutoConfiguration.java +++ b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/WebSocketAutoConfiguration.java @@ -1,6 +1,7 @@ package com.hccake.ballcat.autoconfigure.websocket; import com.hccake.ballcat.autoconfigure.websocket.config.LocalMessageDistributorConfig; +import com.hccake.ballcat.autoconfigure.websocket.config.RocketMqMessageDistributorConfig; import com.hccake.ballcat.autoconfigure.websocket.config.RedisMessageDistributorConfig; import com.hccake.ballcat.autoconfigure.websocket.config.WebSocketHandlerConfig; import com.hccake.ballcat.common.websocket.handler.JsonMessageHandler; @@ -30,7 +31,7 @@ import java.util.List; * @author Yakir Hccake */ @AutoConfiguration -@Import({ WebSocketHandlerConfig.class, LocalMessageDistributorConfig.class, RedisMessageDistributorConfig.class }) +@Import({ WebSocketHandlerConfig.class, LocalMessageDistributorConfig.class, RedisMessageDistributorConfig.class, RocketMqMessageDistributorConfig.class}) @EnableWebSocket @RequiredArgsConstructor @EnableConfigurationProperties(WebSocketProperties.class) diff --git a/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/config/RocketMqMessageDistributorConfig.java b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/config/RocketMqMessageDistributorConfig.java new file mode 100644 index 00000000..5042d2a2 --- /dev/null +++ b/ballcat-starters/ballcat-spring-boot-starter-websocket/src/main/java/com/hccake/ballcat/autoconfigure/websocket/config/RocketMqMessageDistributorConfig.java @@ -0,0 +1,36 @@ +package com.hccake.ballcat.autoconfigure.websocket.config; + +import com.hccake.ballcat.autoconfigure.websocket.MessageDistributorTypeConstants; +import com.hccake.ballcat.autoconfigure.websocket.WebSocketProperties; +import com.hccake.ballcat.common.websocket.distribute.MessageDistributor; +import com.hccake.ballcat.common.websocket.distribute.RocketmqMessageDistributor; +import com.hccake.ballcat.common.websocket.session.WebSocketSessionStore; +import lombok.RequiredArgsConstructor; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @ClassName RocketMqMessageDistributorConfig.java + * @Author liu_yx + * @Version 1.0.0 + * @Description MQ的消息分发器配置 + * @CreateTime 2022年06月30日 14:11:34 + */ +@ConditionalOnProperty(prefix = WebSocketProperties.PREFIX, name = "message-distributor", + havingValue = MessageDistributorTypeConstants.ROCKETMQ, matchIfMissing = true) +@Configuration(proxyBeanMethods = false) +@RequiredArgsConstructor +public class RocketMqMessageDistributorConfig { + + private final WebSocketSessionStore webSocketSessionStore; + + @Bean + @ConditionalOnMissingBean(MessageDistributor.class) + public RocketmqMessageDistributor messageDistributor(RocketMQTemplate template) { + return new RocketmqMessageDistributor(webSocketSessionStore, template); + } + +} -- Gitee From 31c50df5ee093bc0b6800e827fe0d5b3ff230bdd Mon Sep 17 00:00:00 2001 From: liu_yx Date: Thu, 30 Jun 2022 14:23:50 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E2=99=BB=EF=B8=8F=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E8=8E=B7=E5=8F=96sessionKey=E6=8A=A5NPE=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../websocket/session/DefaultWebSocketSessionStore.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/session/DefaultWebSocketSessionStore.java b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/session/DefaultWebSocketSessionStore.java index 5d2b20af..8f80a951 100644 --- a/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/session/DefaultWebSocketSessionStore.java +++ b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/session/DefaultWebSocketSessionStore.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.WebSocketSession; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -81,7 +82,12 @@ public class DefaultWebSocketSessionStore implements WebSocketSessionStore { */ @Override public Collection getSessions(Object sessionKey) { - return sessionKeyToWsSessions.get(sessionKey).values(); + Map sessions = this.sessionKeyToWsSessions.get(sessionKey); + if (sessions != null) { + return this.sessionKeyToWsSessions.get(sessionKey).values(); + } + log.warn("根据指定的sessionKey: {} 获取对应的wsSessions为空!", sessionKey); + return Collections.emptyList(); } /** -- Gitee From 3ebc2200b38cdeb39dd7900a18c4f03e6e96e38d Mon Sep 17 00:00:00 2001 From: liu_yx Date: Wed, 24 Aug 2022 18:13:24 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E2=99=BB=EF=B8=8F=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E7=89=88=E6=9C=AC=E5=AE=9A=E4=B9=89=E5=92=8Cjson=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E5=B7=A5=E5=85=B7=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ballcat-common/ballcat-common-websocket/pom.xml | 2 +- .../websocket/distribute/RocketmqMessageDistributor.java | 3 +-- ballcat-dependencies/pom.xml | 6 ++++++ 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/ballcat-common/ballcat-common-websocket/pom.xml b/ballcat-common/ballcat-common-websocket/pom.xml index 20e22f70..ae4b7613 100644 --- a/ballcat-common/ballcat-common-websocket/pom.xml +++ b/ballcat-common/ballcat-common-websocket/pom.xml @@ -38,7 +38,7 @@ org.apache.rocketmq rocketmq-spring-boot-starter - 2.2.0 + true diff --git a/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java index d065c45a..b30fa475 100644 --- a/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java +++ b/ballcat-common/ballcat-common-websocket/src/main/java/com/hccake/ballcat/common/websocket/distribute/RocketmqMessageDistributor.java @@ -1,6 +1,5 @@ package com.hccake.ballcat.common.websocket.distribute; -import com.alibaba.fastjson.JSON; import com.hccake.ballcat.common.util.JsonUtils; import com.hccake.ballcat.common.websocket.exception.ErrorJsonMessageException; import com.hccake.ballcat.common.websocket.session.WebSocketSessionStore; @@ -64,7 +63,7 @@ public class RocketmqMessageDistributor extends AbstractMessageDistributor imple @Override public void onMessage(MessageExt message) { String body = new String(message.getBody(), StandardCharsets.UTF_8); - MessageDO event = JSON.parseObject(body, MessageDO.class); + MessageDO event = JsonUtils.toObj(body, MessageDO.class); log.info("the content is [{}]", event); try { this.doSend(event); diff --git a/ballcat-dependencies/pom.xml b/ballcat-dependencies/pom.xml index 2c8f6268..611726e6 100644 --- a/ballcat-dependencies/pom.xml +++ b/ballcat-dependencies/pom.xml @@ -63,6 +63,7 @@ 0.4.2 4.11.28.ALL 2.17.154 + 2.2.0 @@ -588,6 +589,11 @@ nimbus-jose-jwt 9.15.2 + + org.apache.rocketmq + rocketmq-spring-boot-starter + ${rocketmq.version} + -- Gitee