diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..bba7b53950cf488f0b0cc507eeb4889dc4c04ac9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target/ +/.idea/ diff --git a/src/main/java/tech/smartboot/redisun/RedisMessageProcessor.java b/src/main/java/tech/smartboot/redisun/RedisMessageProcessor.java index 8ec373be1707c983388777348e07660b2ca0cea5..661f8c6015cc13c9dc6b34d0fc01e6b3ffb33c9d 100644 --- a/src/main/java/tech/smartboot/redisun/RedisMessageProcessor.java +++ b/src/main/java/tech/smartboot/redisun/RedisMessageProcessor.java @@ -85,6 +85,10 @@ class RedisMessageProcessor extends AbstractMessageProcessor implements Pr public void process0(AioSession session, RESP msg) { // 获取当前会话关联的Redis会话对象 RedisSession redisSession = session.getAttachment(); + if (redisSession.isPubSub()){ + redisSession.getPubSub().handleMessage(msg); + return; + } CompletableFuture future = redisSession.poll(); if (future == null) { // 如果没有等待的CompletableFuture,则将消息记录为错误并返回 @@ -128,6 +132,11 @@ class RedisMessageProcessor extends AbstractMessageProcessor implements Pr } case SESSION_CLOSED: { RedisSession redisSession = session.getAttachment(); + if (redisSession.isPubSub()){ + // 通知订阅会话 + redisSession.getPubSub().onSessionClosed(throwable); + break; + } CompletableFuture future; while ((future = redisSession.poll()) != null) { future.completeExceptionally(new RedisunException("session closed")); @@ -141,4 +150,4 @@ class RedisMessageProcessor extends AbstractMessageProcessor implements Pr // throwable.printStackTrace(); } } -} \ No newline at end of file +} diff --git a/src/main/java/tech/smartboot/redisun/RedisSession.java b/src/main/java/tech/smartboot/redisun/RedisSession.java index 5613ba76b4fbc407d7e80aabfb6882fe7a708cea..cdfa04891490c2b8f814f105065929a75102716d 100644 --- a/src/main/java/tech/smartboot/redisun/RedisSession.java +++ b/src/main/java/tech/smartboot/redisun/RedisSession.java @@ -37,6 +37,11 @@ final class RedisSession { private int offerCount = 0; private int pollCount = 0; + /** + * 订阅会话 + */ + private volatile RedisunPubSub pubSub; + public int incrOfferCount() { return ++offerCount; } @@ -82,4 +87,19 @@ final class RedisSession { return size >= 0 ? size : -size; } -} \ No newline at end of file + /** + * 设置订阅会话 + * @param pubSub 订阅会话 + */ + void setPubSub(RedisunPubSub pubSub) { + this.pubSub = pubSub; + } + + RedisunPubSub getPubSub() { + return pubSub; + } + + boolean isPubSub() { + return this.pubSub != null; + } +} diff --git a/src/main/java/tech/smartboot/redisun/Redisun.java b/src/main/java/tech/smartboot/redisun/Redisun.java index 2fa03011e6affcd1fc8ce4a9132a63222d216fb3..2c8e48b5d1f84660e6526cc34b83127d74d98b7b 100644 --- a/src/main/java/tech/smartboot/redisun/Redisun.java +++ b/src/main/java/tech/smartboot/redisun/Redisun.java @@ -4,37 +4,7 @@ import org.smartboot.socket.buffer.BufferPagePool; import org.smartboot.socket.extension.multiplex.MultiplexClient; import org.smartboot.socket.transport.AioQuickClient; import org.smartboot.socket.transport.AioSession; -import tech.smartboot.redisun.cmd.AppendCommand; -import tech.smartboot.redisun.cmd.DBSizeCommand; -import tech.smartboot.redisun.cmd.DecrByCommand; -import tech.smartboot.redisun.cmd.DecrCommand; -import tech.smartboot.redisun.cmd.DelCommand; -import tech.smartboot.redisun.cmd.ExistsCommand; -import tech.smartboot.redisun.cmd.ExpireCommand; -import tech.smartboot.redisun.cmd.FlushAllCommand; -import tech.smartboot.redisun.cmd.FlushDbCommand; -import tech.smartboot.redisun.cmd.GetCommand; -import tech.smartboot.redisun.cmd.HGetCommand; -import tech.smartboot.redisun.cmd.HSetCommand; -import tech.smartboot.redisun.cmd.HelloCommand; -import tech.smartboot.redisun.cmd.IncrByCommand; -import tech.smartboot.redisun.cmd.IncrCommand; -import tech.smartboot.redisun.cmd.LPopCommand; -import tech.smartboot.redisun.cmd.LPushCommand; -import tech.smartboot.redisun.cmd.MGetCommand; -import tech.smartboot.redisun.cmd.MSetCommand; -import tech.smartboot.redisun.cmd.RPopCommand; -import tech.smartboot.redisun.cmd.RPushCommand; -import tech.smartboot.redisun.cmd.SAddCommand; -import tech.smartboot.redisun.cmd.SelectCommand; -import tech.smartboot.redisun.cmd.SetCommand; -import tech.smartboot.redisun.cmd.StrlenCommand; -import tech.smartboot.redisun.cmd.TtlCommand; -import tech.smartboot.redisun.cmd.TypeCommand; -import tech.smartboot.redisun.cmd.ZAddCommand; -import tech.smartboot.redisun.cmd.ZRangeCommand; -import tech.smartboot.redisun.cmd.ZRemCommand; -import tech.smartboot.redisun.cmd.ZScoreCommand; +import tech.smartboot.redisun.cmd.*; import tech.smartboot.redisun.resp.Arrays; import tech.smartboot.redisun.resp.BulkStrings; import tech.smartboot.redisun.resp.Doubles; @@ -253,8 +223,8 @@ public final class Redisun { options.accept(cmd); } return execute(cmd).thenApply(resp -> { - if (resp instanceof tech.smartboot.redisun.resp.Arrays) { - List resps = ((tech.smartboot.redisun.resp.Arrays) resp).getValue(); + if (resp instanceof Arrays) { + List resps = ((Arrays) resp).getValue(); List result = new ArrayList<>(resps.size()); for (RESP r : resps) { ZRangeCommand.Tuple tuple = new ZRangeCommand.Tuple(); @@ -425,8 +395,8 @@ public final class Redisun { */ public CompletableFuture> asyncMget(List keys) { return execute(new MGetCommand(keys)).thenApply(resp -> { - if (resp instanceof tech.smartboot.redisun.resp.Arrays) { - List resps = ((tech.smartboot.redisun.resp.Arrays) resp).getValue(); + if (resp instanceof Arrays) { + List resps = ((Arrays) resp).getValue(); List result = new ArrayList<>(resps.size()); for (RESP r : resps) { if (r instanceof Nulls) { @@ -1173,4 +1143,108 @@ public final class Redisun { throw new RedisunException("invalid response:" + resp); }); } -} \ No newline at end of file + + /** + * 发布消息到指定频道 + * + * @param channel 频道名称 + * @param message 要发布的消息 + * @return 接收到此消息的客户端数量 + */ + public int publish(String channel, String message) { + try { + return asyncPublish(channel, message).get(); + } catch (Exception e) { + throw new RedisunException(e); + } + } + + /** + * 异步发布消息到指定频道 + * + * @param channel 频道名称 + * @param message 要发布的消息 + * @return 接收到此消息的客户端数量 + */ + public CompletableFuture asyncPublish(String channel, String message) { + return execute(new PublishCommand(channel, message)).thenApply(resp -> { + if (resp instanceof Integers) { + return ((Integers) resp).getValue(); + } + throw new RedisunException("invalid response:" + resp); + }); + } + + /** + * 订阅给定的一个或多个频道 + * 注意:一旦进入订阅状态,连接就不能用于执行其他命令,直到取消订阅 + * + * @param pubsub 消息回调处理类 + * @param channels 要订阅的频道列表 + * @return 订阅对象 + */ + public RedisunPubSub subscribe(RedisunPubSub pubsub, String... channels) { + // 获取零负载的连接,用于独占连接 + AioQuickClient client = findFirstZeroLoadClient(); + client.connectTimeout(0); + AioSession session = client.getSession(); + RedisSession redisSession = session.getAttachment(); + redisSession.setPubSub(pubsub); + // 设置取消订阅回调 + pubsub.setUnsubscribe(uChannels -> { + try { + new UnsubscribeCommand(uChannels).writeTo(session.writeBuffer()); + session.writeBuffer().flush(); + } catch (IOException e) { + throw new RedisunException(e); + } + }); + // 设置异常关闭时继续订阅 + pubsub.setSubscribe(this::subscribe); + // 设置释放连接回调 + pubsub.setReleaseClient(() -> { + multiplexClient.release(client); + redisSession.setPubSub(null); + }); + // 执行订阅命令 + try { + new SubscribeCommand(channels).writeTo(session.writeBuffer()); + session.writeBuffer().flush(); + } catch (IOException e) { + multiplexClient.release(client); + redisSession.setPubSub(null); + throw new RuntimeException(e); + } + return pubsub; + } + + /** + * 寻找一个零负载的连接 + * @return 零负载的连接 + */ + private AioQuickClient findFirstZeroLoadClient() { + List nonZeroClients = new ArrayList<>(); + try { + while (true) { + AioQuickClient client = multiplexClient.acquire(); + AioSession session = client.getSession(); + RedisSession redisSession = session.getAttachment(); + if (redisSession.load() == 0) { + // 找到零负载连接,先把其他连接归还 + for (int i = nonZeroClients.size() - 1; i >= 0; i--) { + multiplexClient.reuse(nonZeroClients.get(i)); + } + if (client != currentClient) { + return client; + } + } + nonZeroClients.add(client); + } + } catch (Throwable e) { + for (AioQuickClient c : nonZeroClients) { + multiplexClient.reuse(c); + } + throw new RedisunException(e); + } + } +} diff --git a/src/main/java/tech/smartboot/redisun/RedisunPubSub.java b/src/main/java/tech/smartboot/redisun/RedisunPubSub.java new file mode 100644 index 0000000000000000000000000000000000000000..3b4cf75b3a4565c85a8dd951431d3443649c1f1b --- /dev/null +++ b/src/main/java/tech/smartboot/redisun/RedisunPubSub.java @@ -0,0 +1,201 @@ +package tech.smartboot.redisun; + +import tech.smartboot.redisun.resp.Arrays; +import tech.smartboot.redisun.resp.BulkStrings; +import tech.smartboot.redisun.resp.RESP; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +/** + * Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。 + * Available since: Redis Open Source 2.0.0 + * + * @author dufuzhong + * @version v1.0 2025-12-07 + */ +@SuppressWarnings({"unused", "rawtypes"}) +public abstract class RedisunPubSub { + // 订阅频道 + private final Set channels = ConcurrentHashMap.newKeySet(); + // 取消订阅 + private Consumer unsubscribe; + // 关闭Redis连接 + private Runnable releaseClient; + // 网络异常重新订阅 + private BiConsumer subscribe; + + /** + * 接收到订阅消息的回调方法 + * @param channel 频道名称 + * @param message 消息内容 + */ + public abstract void onMessage(String channel, String message); + + /** + * 接收到订阅确认消息的回调方法 + * @param channel 频道名称 + */ + public void onSubscribe(String channel){} + + /** + * 接收到取消订阅确认消息的回调方法 + * @param channel 频道名称 + */ + public void onUnsubscribe(String channel){} + + /** + * 异常关闭处理方法 + * @param channels 订阅的频道列表 + * @param ex 异常信息 + * @return 是否需要重新订阅 (true:重新订阅) + */ + public boolean onSessionClosed(String[] channels, Throwable ex) { + return true; + } + + /** + * 设置取消订阅的回调方法 + * (Redisun 类设置的) + */ + void setUnsubscribe(Consumer unsubscribe) { + this.unsubscribe = unsubscribe; + } + + /** + * 取消订阅当前订阅的所有频道 + * 注意:此方法会关闭订阅连接 + */ + public void unsubscribeAll() { + unsubscribe.accept(new String[0]); + } + + /** + * 取消订阅指定的频道 + * @param channels 要取消订阅的频道列表 + */ + public void unsubscribe(String... channels) { + unsubscribe.accept(channels); + } + + void setReleaseClient(Runnable releaseClient) { + this.releaseClient = releaseClient; + } + + void setSubscribe(BiConsumer subscribe) { + this.subscribe = subscribe; + } + + /** + * 处理来自服务端的订阅/取消订阅消息 + */ + void handleMessage(RESP msg) { + if (!(msg instanceof Arrays)) { + System.err.println("Invalid message type"); + return; + } + Arrays arrays = (Arrays) msg; + if (arrays.getValue().size() < 3) { + System.err.println("Invalid message format"); + return; + } + RESP messageTypeResp = arrays.getValue().get(0); + if (!(messageTypeResp instanceof BulkStrings)) { + System.err.println("Invalid message type"); + return; + } + String messageType = ((BulkStrings) messageTypeResp).getValue(); + + // 接收来自服务器的发布消息 + if ("message".equals(messageType)) { + handleMessagePush(arrays); + return; + } + // 订阅: 订阅成功 或者 取消订阅成功 + if ("subscribe".equals(messageType) || "unsubscribe".equals(messageType)) { + handleSubscriptionConfirmation(messageType, arrays); + } + } + + void onSessionClosed(Throwable ex){ + String[] channelArray = getStrings(); + channels.clear(); + releaseClient.run(); + // 主动关闭连接的不需要重新订阅和通知 + if (ex == null || ex.getCause() == null || channelArray.length == 0) { + return; + } + try { + if (! this.onSessionClosed(channelArray, ex)){ + return; + } + }catch (Exception e) { + System.err.println("Error handling closed exception: " + e.getMessage()); + } + try { + subscribe.accept(this, channelArray); + }catch (Exception e) { + System.err.println("Error handling subscription confirmation: " + e.getMessage()); + } + } + + public String[] getStrings() { + return channels.toArray(new String[0]); + } + + /** + * 处理消息推送 + */ + private void handleMessagePush(Arrays arrays) { + RESP channelResp = arrays.getValue().get(1); + RESP messageResp = arrays.getValue().get(2); + if (!(channelResp instanceof BulkStrings) || !(messageResp instanceof BulkStrings)) { + return; + } + String channel = ((BulkStrings) channelResp).getValue(); + String message = ((BulkStrings) messageResp).getValue(); + // 调用订阅回调 + try { + this.onMessage(channel, message); + } catch (Exception e) { + System.err.println("Error handling message push: " + e.getMessage()); + } + } + + /** + * 处理订阅/取消订阅确认消息 + */ + private void handleSubscriptionConfirmation(String messageType, Arrays arrays) { + // 需要从订阅列表中移除相应的频道 + RESP channelResp = arrays.getValue().get(1); + if (!(channelResp instanceof BulkStrings)) { + return; + } + String channel = ((BulkStrings) channelResp).getValue(); + if ("subscribe".equals(messageType)){ + channels.add(channel); + try{ + this.onSubscribe(channel); + } catch (Exception e) { + System.err.println("Error handling subscription confirmation: " + e.getMessage()); + } + return; + } + if ("unsubscribe".equals(messageType)) { + channels.remove(channel); + // 如果所有订阅都已取消,释放订阅连接 + if (channels.isEmpty()) { + releaseClient.run(); + } + // 调用取消订阅回调 + try { + this.onUnsubscribe(channel); + }catch (Exception e) { + System.err.println("Error handling subscription confirmation: " + e.getMessage()); + } + } + } + +} diff --git a/src/main/java/tech/smartboot/redisun/cmd/PublishCommand.java b/src/main/java/tech/smartboot/redisun/cmd/PublishCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..66b978071e0d933299e502ce02f2519fbf703d08 --- /dev/null +++ b/src/main/java/tech/smartboot/redisun/cmd/PublishCommand.java @@ -0,0 +1,34 @@ +package tech.smartboot.redisun.cmd; + +import tech.smartboot.redisun.Command; +import tech.smartboot.redisun.resp.BulkStrings; +import tech.smartboot.redisun.resp.RESP; + +import java.util.ArrayList; +import java.util.List; + +/** + * 将信息发送到指定的频道。 Available since: Redis Open Source 2.0.0 + * @author dufuzhong + * @version v1.0 2025-12-07 + * @see Redis PUBLISH Command + */ +public class PublishCommand extends Command { + private static final BulkStrings CMD_PUBLISH = BulkStrings.of("PUBLISH"); + private final String channel; + private final String message; + + public PublishCommand(String channel, String message) { + this.channel = channel; + this.message = message; + } + + @Override + protected List buildParams() { + List param = new ArrayList<>(3); + param.add(CMD_PUBLISH); + param.add(RESP.ofString(channel)); + param.add(RESP.ofString(message)); + return param; + } +} diff --git a/src/main/java/tech/smartboot/redisun/cmd/SubscribeCommand.java b/src/main/java/tech/smartboot/redisun/cmd/SubscribeCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..f543d17c241599e97cdb964227afc0e93a42de89 --- /dev/null +++ b/src/main/java/tech/smartboot/redisun/cmd/SubscribeCommand.java @@ -0,0 +1,34 @@ +package tech.smartboot.redisun.cmd; + +import tech.smartboot.redisun.Command; +import tech.smartboot.redisun.resp.BulkStrings; +import tech.smartboot.redisun.resp.RESP; + +import java.util.ArrayList; +import java.util.List; + +/** + * 订阅给定的一个或多个频道的信息。 Available since: Redis Open Source 2.0.0 + * @author dufuzhong + * @version v1.0 2025-12-07 + * @see Redis SUBSCRIBE Command + */ +public class SubscribeCommand extends Command { + private static final BulkStrings CMD_SUBSCRIBE = BulkStrings.of("SUBSCRIBE"); + private final String[] channel; + + public SubscribeCommand(String... channel) { + this.channel = channel; + } + + @Override + protected List buildParams() { + List param = new ArrayList<>(channel.length + 1); + param.add(CMD_SUBSCRIBE); + for (String s : channel) { + param.add(RESP.ofString(s)); + } + return param; + } + +} diff --git a/src/main/java/tech/smartboot/redisun/cmd/UnsubscribeCommand.java b/src/main/java/tech/smartboot/redisun/cmd/UnsubscribeCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..bbca099576df17f420766ab6b0a4b6bb2c9a3470 --- /dev/null +++ b/src/main/java/tech/smartboot/redisun/cmd/UnsubscribeCommand.java @@ -0,0 +1,41 @@ +package tech.smartboot.redisun.cmd; + +import tech.smartboot.redisun.Command; +import tech.smartboot.redisun.resp.BulkStrings; +import tech.smartboot.redisun.resp.RESP; + +import java.util.ArrayList; +import java.util.List; + +/** + * 取消订阅给定的频道 + * Available since: Redis Open Source 2.0.0 + * + * @author dufuzhong + * @version v1.0 2025-12-07 + * @see Redis UNSUBSCRIBE Command + */ +public class UnsubscribeCommand extends Command { + private static final BulkStrings CMD_UNSUBSCRIBE = BulkStrings.of("UNSUBSCRIBE"); + private final String[] channels; + + /** + * 创建 UNSUBSCRIBE 命令 + * 如果不指定频道,则取消订阅所有频道 + * + * @param channels 要取消订阅的频道列表,如果为空则取消所有订阅 + */ + public UnsubscribeCommand(String... channels) { + this.channels = channels != null ? channels : new String[0]; + } + + @Override + protected List buildParams() { + List param = new ArrayList<>(channels.length + 1); + param.add(CMD_UNSUBSCRIBE); + for (String channel : channels) { + param.add(RESP.ofString(channel)); + } + return param; + } +} diff --git a/src/main/java/tech/smartboot/redisun/resp/RESP.java b/src/main/java/tech/smartboot/redisun/resp/RESP.java index 6f87fcbd68e9b3b86b2b1072a2c6fab3bdfaecf8..9a464c6cb912dabd394bb654e7bfd1cb4fa430f6 100644 --- a/src/main/java/tech/smartboot/redisun/resp/RESP.java +++ b/src/main/java/tech/smartboot/redisun/resp/RESP.java @@ -171,6 +171,7 @@ public abstract class RESP implements Serialization { case RESP_DATA_TYPE_STRING: return SimpleStrings.of(buffer); case RESP_DATA_TYPE_ARRAY: + case RESP_DATA_TYPE_PUSH: return new Arrays(); case RESP_DATA_TYPE_MAP: return new Maps(); @@ -230,4 +231,4 @@ public abstract class RESP implements Serialization { arrays.setValue(list); return arrays; } -} \ No newline at end of file +} diff --git a/src/test/java/tech/smartboot/redisun/test/RedisunTest.java b/src/test/java/tech/smartboot/redisun/test/RedisunTest.java index e23a775f543b8cb7022a2af9c3e392c598490e84..afe15f24efa5263f82abae51ffc9866b9c8da3c1 100644 --- a/src/test/java/tech/smartboot/redisun/test/RedisunTest.java +++ b/src/test/java/tech/smartboot/redisun/test/RedisunTest.java @@ -6,6 +6,7 @@ import org.junit.Before; import org.junit.Test; import tech.smartboot.redisun.Redisun; import tech.smartboot.redisun.RedisunException; +import tech.smartboot.redisun.RedisunPubSub; import tech.smartboot.redisun.cmd.ZRangeCommand; import java.util.Arrays; @@ -934,4 +935,413 @@ public class RedisunTest { type = redisun.type(key); Assert.assertEquals("none", type); } -} \ No newline at end of file + + @Test + public void testPubSubCommands() throws InterruptedException { + String channel = topic + ":pubsub"; + String message = "Hello, Redisun!"; + + // 用于接收消息的变量 + final StringBuilder receivedMessage = new StringBuilder(); + final StringBuilder receivedChannel = new StringBuilder(); + + // 创建订阅者 + RedisunPubSub pubsub = new RedisunPubSub() { + @Override + public void onMessage(String ch, String msg) { + receivedChannel.append(ch); + receivedMessage.append(msg); + } + }; + + // 启动一个线程进行订阅 + Thread subscribeThread = new Thread(() -> { + // 创建新的Redisun实例用于订阅(因为订阅会独占连接) + Redisun subscriber = Redisun.create(opt -> opt.debug(true).setAddress("127.0.0.1:6379")); + try { + subscriber.subscribe(pubsub, channel); + } catch (Exception e) { + e.printStackTrace(); + } finally { + // 注意:实际应用中不要立即关闭,这里为了测试方便 + // subscriber.close(); + } + }); + + // 启动订阅线程 + subscribeThread.start(); + + // 等待订阅建立 + Thread.sleep(1000); + + // 发布消息 + int receivers = redisun.publish(channel, message); + + // 等待消息接收 + Thread.sleep(1000); + + // 验证结果 + Assert.assertEquals(1, receivers); // 应该有一个接收者 + Assert.assertEquals(channel, receivedChannel.toString()); + Assert.assertEquals(message, receivedMessage.toString()); + + // 清理资源 + subscribeThread.interrupt(); + } + + @Test + public void testUnsubscribe() throws InterruptedException { + String channel = topic + ":unsubscribe-test"; + String message1 = "Message 1"; + String message2 = "Message 2"; + + // 用于记录接收到的消息数量 + final java.util.concurrent.atomic.AtomicInteger messageCount = new java.util.concurrent.atomic.AtomicInteger(0); + final StringBuilder receivedMessages = new StringBuilder(); + + // 创建订阅者 + RedisunPubSub pubsub = new RedisunPubSub() { + @Override + public void onMessage(String ch, String msg) { + messageCount.incrementAndGet(); + if (receivedMessages.length() > 0) { + receivedMessages.append(","); + } + receivedMessages.append(msg); + } + }; + + // 启动一个线程进行订阅 + Thread subscribeThread = new Thread(() -> { + Redisun subscriber = Redisun.create(opt -> opt.debug(true).setAddress("127.0.0.1:6379")); + try { + subscriber.subscribe(pubsub, channel); + // 保持订阅状态,等待中断 + Thread.sleep(10000); + } catch (InterruptedException e) { + // 线程被中断,正常退出 + } catch (Exception e) { + e.printStackTrace(); + } + }); + + subscribeThread.start(); + + // 等待订阅建立 + Thread.sleep(1000); + + // 发布第一条消息 + int receivers1 = redisun.publish(channel, message1); + Thread.sleep(500); + Assert.assertEquals(1, receivers1); + Assert.assertEquals(1, messageCount.get()); + + // 取消订阅 + pubsub.unsubscribe(channel); + Thread.sleep(1000); + + // 发布第二条消息,应该没有接收者 + int receivers2 = redisun.publish(channel, message2); + Thread.sleep(500); + Assert.assertEquals(0, receivers2); // 已经取消订阅,没有接收者 + Assert.assertEquals(1, messageCount.get()); // 仍然只有一条消息 + + // 验证只接收到了第一条消息 + Assert.assertEquals(message1, receivedMessages.toString()); + + // 清理资源 + subscribeThread.interrupt(); + } + + @Test + public void testMultipleSubscriptions() throws InterruptedException { + String channel1 = topic + ":multi-sub-1"; + String channel2 = topic + ":multi-sub-2"; + String channel3 = topic + ":multi-sub-3"; + + // 用于记录每个频道接收到的消息 + final java.util.Map> receivedMessages = new java.util.concurrent.ConcurrentHashMap<>(); + receivedMessages.put(channel1, new java.util.concurrent.CopyOnWriteArrayList<>()); + receivedMessages.put(channel2, new java.util.concurrent.CopyOnWriteArrayList<>()); + receivedMessages.put(channel3, new java.util.concurrent.CopyOnWriteArrayList<>()); + + // 创建第一个订阅者 - 订阅 channel1 和 channel2 + RedisunPubSub pubsub1 = new RedisunPubSub() { + @Override + public void onMessage(String ch, String msg) { + System.out.println("[PubSub1] Received: " + msg + " on " + ch); + receivedMessages.get(ch).add(msg); + } + }; + + // 创建第二个订阅者 - 订阅 channel2 和 channel3 + RedisunPubSub pubsub2 = new RedisunPubSub() { + @Override + public void onMessage(String ch, String msg) { + System.out.println("[PubSub2] Received: " + msg + " on " + ch); + receivedMessages.get(ch).add(msg); + } + }; + + // 创建第三个订阅者 - 只订阅 channel1 + RedisunPubSub pubsub3 = new RedisunPubSub() { + @Override + public void onMessage(String ch, String msg) { + System.out.println("[PubSub3] Received: " + msg + " on " + ch); + receivedMessages.get(ch).add(msg); + } + }; + + // 启动订阅线程1 - 使用同一个Redisun实例 + Thread thread1 = new Thread(() -> { + Redisun subscriber = Redisun.create(opt -> opt.debug(false).setAddress("127.0.0.1:6379")); + try { + subscriber.subscribe(pubsub1, channel1, channel2); + Thread.sleep(10000); + } catch (InterruptedException e) { + // 正常退出 + } catch (Exception e) { + e.printStackTrace(); + } + }); + + // 启动订阅线程2 - 使用同一个Redisun实例 + Thread thread2 = new Thread(() -> { + Redisun subscriber = Redisun.create(opt -> opt.debug(false).setAddress("127.0.0.1:6379")); + try { + subscriber.subscribe(pubsub2, channel2, channel3); + Thread.sleep(10000); + } catch (InterruptedException e) { + // 正常退出 + } catch (Exception e) { + e.printStackTrace(); + } + }); + + // 启动订阅线程3 + Thread thread3 = new Thread(() -> { + Redisun subscriber = Redisun.create(opt -> opt.debug(false).setAddress("127.0.0.1:6379")); + try { + subscriber.subscribe(pubsub3, channel1); + Thread.sleep(10000); + } catch (InterruptedException e) { + // 正常退出 + } catch (Exception e) { + e.printStackTrace(); + } + }); + + thread1.start(); + thread2.start(); + thread3.start(); + + // 等待订阅建立 + Thread.sleep(1500); + + // 发布消息到 channel1 - 应该被 pubsub1 和 pubsub3 接收 + String msg1 = "Message to channel1"; + int receivers1 = redisun.publish(channel1, msg1); + Thread.sleep(500); + System.out.println("Channel1 receivers: " + receivers1); + Assert.assertEquals(2, receivers1); // pubsub1 和 pubsub3 + Assert.assertEquals(2, receivedMessages.get(channel1).size()); + Assert.assertTrue(receivedMessages.get(channel1).contains(msg1)); + + // 发布消息到 channel2 - 应该被 pubsub1 和 pubsub2 接收 + String msg2 = "Message to channel2"; + int receivers2 = redisun.publish(channel2, msg2); + Thread.sleep(500); + System.out.println("Channel2 receivers: " + receivers2); + Assert.assertEquals(2, receivers2); // pubsub1 和 pubsub2 + Assert.assertEquals(2, receivedMessages.get(channel2).size()); + Assert.assertTrue(receivedMessages.get(channel2).contains(msg2)); + + // 发布消息到 channel3 - 应该只被 pubsub2 接收 + String msg3 = "Message to channel3"; + int receivers3 = redisun.publish(channel3, msg3); + Thread.sleep(500); + System.out.println("Channel3 receivers: " + receivers3); + Assert.assertEquals(1, receivers3); // 只有 pubsub2 + Assert.assertEquals(1, receivedMessages.get(channel3).size()); + Assert.assertTrue(receivedMessages.get(channel3).contains(msg3)); + + // 测试取消订阅后的行为 + pubsub1.unsubscribe(channel1); // pubsub1 取消订阅 channel1,但保留 channel2 + Thread.sleep(1000); + + // 再次发布到 channel1 - 现在应该只有 pubsub3 接收 + String msg4 = "Another message to channel1"; + int receivers4 = redisun.publish(channel1, msg4); + Thread.sleep(500); + System.out.println("Channel1 receivers after unsubscribe: " + receivers4); + Assert.assertEquals(1, receivers4); // 只有 pubsub3 + Assert.assertEquals(3, receivedMessages.get(channel1).size()); // 之前2条 + 现在1条 + + // 发布到 channel2 - pubsub1 应该还能接收(只取消了 channel1) + String msg5 = "Another message to channel2"; + int receivers5 = redisun.publish(channel2, msg5); + Thread.sleep(500); + System.out.println("Channel2 receivers: " + receivers5); + Assert.assertEquals(2, receivers5); // pubsub1 和 pubsub2 都还在 + Assert.assertEquals(4, receivedMessages.get(channel2).size()); // 之前2条 + 现在2条 + + // 清理资源 + thread1.interrupt(); + thread2.interrupt(); + thread3.interrupt(); + + System.out.println("\n=== 测试总结 ==="); + System.out.println("Channel1 收到消息数: " + receivedMessages.get(channel1).size()); + System.out.println("Channel2 收到消息数: " + receivedMessages.get(channel2).size()); + System.out.println("Channel3 收到消息数: " + receivedMessages.get(channel3).size()); + } + + @Test + public void testSingleConnectionMultipleChannels() throws InterruptedException { + String channel1 = topic + ":single-conn-1"; + String channel2 = topic + ":single-conn-2"; + String channel3 = topic + ":single-conn-3"; + + // 用于记录每个频道接收到的消息 + final java.util.Map> receivedMessages = new java.util.concurrent.ConcurrentHashMap<>(); + receivedMessages.put(channel1, new java.util.concurrent.CopyOnWriteArrayList<>()); + receivedMessages.put(channel2, new java.util.concurrent.CopyOnWriteArrayList<>()); + receivedMessages.put(channel3, new java.util.concurrent.CopyOnWriteArrayList<>()); + + // 创建单个订阅者 - 同时订阅三个频道 + RedisunPubSub pubsub = new RedisunPubSub() { + @Override + public void onMessage(String ch, String msg) { + System.out.println("[SinglePubSub] Received: " + msg + " on " + ch); + receivedMessages.get(ch).add(msg); + } + }; + + // 启动订阅线程 - 使用单个Redisun实例同时订阅三个频道 + Thread subscribeThread = new Thread(() -> { + Redisun subscriber = Redisun.create(opt -> opt.debug(true).setAddress("127.0.0.1:6379")); + try { + // 一次性订阅三个频道,使用同一个连接 + subscriber.subscribe(pubsub, channel1, channel2, channel3); + Thread.sleep(15000); + } catch (InterruptedException e) { + // 正常退出 + } catch (Exception e) { + e.printStackTrace(); + } + }); + + subscribeThread.start(); + + // 等待订阅建立 + Thread.sleep(1500); + + System.out.println("\n=== 开始发布消息到三个频道 ==="); + + // 发布消息到 channel1 + String msg1 = "Message 1 to channel1"; + int receivers1 = redisun.publish(channel1, msg1); + Thread.sleep(500); + System.out.println("Channel1 receivers: " + receivers1); + Assert.assertEquals(1, receivers1); + Assert.assertEquals(1, receivedMessages.get(channel1).size()); + Assert.assertTrue(receivedMessages.get(channel1).contains(msg1)); + + // 发布消息到 channel2 + String msg2 = "Message 1 to channel2"; + int receivers2 = redisun.publish(channel2, msg2); + Thread.sleep(500); + System.out.println("Channel2 receivers: " + receivers2); + Assert.assertEquals(1, receivers2); + Assert.assertEquals(1, receivedMessages.get(channel2).size()); + Assert.assertTrue(receivedMessages.get(channel2).contains(msg2)); + + // 发布消息到 channel3 + String msg3 = "Message 1 to channel3"; + int receivers3 = redisun.publish(channel3, msg3); + Thread.sleep(500); + System.out.println("Channel3 receivers: " + receivers3); + Assert.assertEquals(1, receivers3); + Assert.assertEquals(1, receivedMessages.get(channel3).size()); + Assert.assertTrue(receivedMessages.get(channel3).contains(msg3)); + + // 快速连续发布多条消息到不同频道,测试消息处理的正确性 + System.out.println("\n=== 快速连续发布消息 ==="); + String msg4 = "Message 2 to channel1"; + String msg5 = "Message 2 to channel2"; + String msg6 = "Message 2 to channel3"; + + redisun.publish(channel1, msg4); + redisun.publish(channel2, msg5); + redisun.publish(channel3, msg6); + Thread.sleep(1000); + + // 验证所有消息都正确接收 + Assert.assertEquals(2, receivedMessages.get(channel1).size()); + Assert.assertEquals(2, receivedMessages.get(channel2).size()); + Assert.assertEquals(2, receivedMessages.get(channel3).size()); + Assert.assertTrue(receivedMessages.get(channel1).contains(msg4)); + Assert.assertTrue(receivedMessages.get(channel2).contains(msg5)); + Assert.assertTrue(receivedMessages.get(channel3).contains(msg6)); + + // 测试部分取消订阅 + System.out.println("\n=== 测试部分取消订阅 ==="); + pubsub.unsubscribe(channel2); // 只取消 channel2 + Thread.sleep(1000); + + // 再次发布消息 + String msg7 = "Message 3 to channel1"; + String msg8 = "Message 3 to channel2"; + String msg9 = "Message 3 to channel3"; + + int receivers7 = redisun.publish(channel1, msg7); + int receivers8 = redisun.publish(channel2, msg8); + int receivers9 = redisun.publish(channel3, msg9); + Thread.sleep(1000); + + // 验证:channel1 和 channel3 仍然接收,channel2 不再接收 + System.out.println("After unsubscribe channel2:"); + System.out.println(" Channel1 receivers: " + receivers7); + System.out.println(" Channel2 receivers: " + receivers8); + System.out.println(" Channel3 receivers: " + receivers9); + + Assert.assertEquals(1, receivers7); // channel1 还在 + Assert.assertEquals(0, receivers8); // channel2 已取消 + Assert.assertEquals(1, receivers9); // channel3 还在 + + Assert.assertEquals(3, receivedMessages.get(channel1).size()); // 3条消息 + Assert.assertEquals(2, receivedMessages.get(channel2).size()); // 还是2条,没有新消息 + Assert.assertEquals(3, receivedMessages.get(channel3).size()); // 3条消息 + + // 测试取消所有订阅 + System.out.println("\n=== 测试取消所有剩余订阅 ==="); + pubsub.unsubscribeAll(); // 取消所有订阅(channel1 和 channel3) + Thread.sleep(1000); + + // 再次发布消息,应该没有接收者 + int receivers10 = redisun.publish(channel1, "No one receives this"); + int receivers11 = redisun.publish(channel3, "No one receives this"); + Thread.sleep(500); + + System.out.println("After unsubscribe all:"); + System.out.println(" Channel1 receivers: " + receivers10); + System.out.println(" Channel3 receivers: " + receivers11); + + Assert.assertEquals(0, receivers10); + Assert.assertEquals(0, receivers11); + + // 消息数量不变 + Assert.assertEquals(3, receivedMessages.get(channel1).size()); + Assert.assertEquals(3, receivedMessages.get(channel3).size()); + + // 清理资源 + subscribeThread.interrupt(); + + System.out.println("\n=== 测试总结 ==="); + System.out.println("单连接多频道订阅测试通过!"); + System.out.println("Channel1 总共收到消息数: " + receivedMessages.get(channel1).size()); + System.out.println("Channel2 总共收到消息数: " + receivedMessages.get(channel2).size()); + System.out.println("Channel3 总共收到消息数: " + receivedMessages.get(channel3).size()); + } + +}