From 8e9d11a23a5fc9ca65cde22b745da9a351673ad9 Mon Sep 17 00:00:00 2001 From: zhengjw22 Date: Wed, 5 Nov 2025 10:08:17 +0800 Subject: [PATCH 1/2] =?UTF-8?q?chore(version):=20=E5=8D=87=E7=BA=A7?= =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E7=89=88=E6=9C=AC=E8=87=B3=20v1.7.5-SNAPSHOT?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 更新 IoServerConfig 中的 VERSION 常量值为 v1.7.5-SNAPSHOT - 修改 aio-core、aio-pro、benchmark、example 和 smart-socket-parent 模块的 POM 版本号 - 更新根目录 pom.xml 中 parent 标签的版本引用- 修改 Makefile 中的 version 变量值并同步更新 sed 命令中的替换目标版本 --- Makefile | 2 +- aio-core/pom.xml | 2 +- .../EnhanceAsynchronousChannelProvider.java | 30 +++- ...nhanceAsynchronousServerSocketChannel.java | 61 +++----- .../EnhanceAsynchronousSocketChannel.java | 61 ++++---- .../enhance/FutureCompletionHandler.java | 141 ------------------ .../socket/transport/IoServerConfig.java | 2 +- aio-pro/pom.xml | 2 +- .../ssl/SslAsynchronousSocketChannel.java | 6 +- benchmark/pom.xml | 2 +- example/pom.xml | 2 +- pom.xml | 2 +- smart-socket-parent/pom.xml | 2 +- 13 files changed, 88 insertions(+), 227 deletions(-) delete mode 100644 aio-core/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java diff --git a/Makefile b/Makefile index b52a6e8c..90d5724a 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ # 当需要升级版本时,执行该命令 -version=1.7.4 +version=1.7.5-SNAPSHOT update_version: sed -i '' 's/public static final String VERSION = ".*";/public static final String VERSION = "v${version}";/' aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java mvn -f smart-socket-parent/pom.xml versions:set -DnewVersion=${version} versions:commit diff --git a/aio-core/pom.xml b/aio-core/pom.xml index 10db4746..d6ce53ea 100644 --- a/aio-core/pom.xml +++ b/aio-core/pom.xml @@ -19,7 +19,7 @@ io.github.smartboot.socket smart-socket-parent - 1.7.4 + 1.7.5-SNAPSHOT ../smart-socket-parent diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java index 83709d68..74e80249 100644 --- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java +++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java @@ -13,9 +13,11 @@ import java.io.IOException; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; import java.nio.channels.SocketChannel; import java.nio.channels.spi.AsynchronousChannelProvider; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -28,14 +30,26 @@ import java.util.concurrent.TimeUnit; * 1. 创建和管理异步通道组,支持多线程处理 * 2. 提供服务器端和客户端Socket通道的创建 * 3. 支持低内存模式运行,优化资源使用 - * + *

* 该类是smart-socket框架中异步IO实现的核心组件之一,通过NIO实现了类似JDK7 AIO的编程模型, * 但性能更优,资源占用更少。在低内存模式下,会采用特殊的内存管理策略以减少内存占用。 - * + * * @author 三刀 * @version V1.0 , 2020/5/25 */ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChannelProvider { + + public static final CompletionHandler> SYNC_READ_HANDLER = new CompletionHandler>() { + @Override + public void completed(Integer result, CompletableFuture attachment) { + attachment.complete(result); + } + + @Override + public void failed(Throwable exc, CompletableFuture attachment) { + attachment.completeExceptionally(exc); + } + }; /** * 读监听信号 * 用于标识通道处于读监听状态,值为-2 @@ -61,7 +75,7 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne * 创建一个新的异步通道组 * 使用指定的线程数和线程工厂创建一个新的异步通道组,用于管理异步通道 * - * @param nThreads 线程池中的线程数量 + * @param nThreads 线程池中的线程数量 * @param threadFactory 创建线程的工厂类 * @return 返回新创建的异步通道组实例 * @throws IOException 如果创建过程中发生IO错误 @@ -73,8 +87,8 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne /** * 使用现有的线程池创建异步通道组 - * - * @param executor 用于执行异步IO操作的线程池 + * + * @param executor 用于执行异步IO操作的线程池 * @param initialSize 初始大小,用于确定内部数据结构的初始容量 * @return 返回新创建的异步通道组实例 * @throws IOException 如果创建过程中发生IO错误 @@ -87,7 +101,7 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne /** * 创建一个新的异步服务器Socket通道 * 用于服务器端接受客户端连接请求 - * + * * @param group 关联的异步通道组,用于管理该通道的IO操作 * @return 返回新创建的服务器Socket通道 * @throws IOException 如果创建过程中发生IO错误 @@ -100,7 +114,7 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne /** * 创建一个新的异步客户端Socket通道 * 用于客户端发起连接请求和数据传输 - * + * * @param group 关联的异步通道组,用于管理该通道的IO操作 * @return 返回新创建的客户端Socket通道 * @throws IOException 如果创建过程中发生IO错误 @@ -113,7 +127,7 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne /** * 检查并获取增强型异步通道组实例 * 验证传入的通道组是否为EnhanceAsynchronousChannelGroup类型 - * + * * @param group 待检查的异步通道组 * @return 返回转换后的增强型异步通道组 * @throws RuntimeException 如果传入的通道组类型不正确 diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java index 20b4ac54..e2ecd23a 100644 --- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java +++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java @@ -30,13 +30,13 @@ import java.util.concurrent.Future; * 2. 支持异步接受连接操作 * 3. 管理服务器Socket的生命周期 * 4. 提供回调机制处理连接事件 - * + *

* 该类是服务器端网络编程的核心组件,通过非阻塞IO和事件通知机制,实现了高效的连接处理: * - 支持Future和CompletionHandler两种异步编程模式 * - 实现了连接请求的排队和限流处理,避免服务器资源耗尽 * - 提供了优雅的异常处理和资源管理机制 * - 在低内存模式下采用特殊的资源管理策略 - * + * * @author 三刀 * @version V1.0 , 2020/5/25 */ @@ -45,37 +45,32 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc * 底层的服务器Socket通道,用于实际的网络IO操作 */ private final ServerSocketChannel serverSocketChannel; - + /** * 异步通道组,用于管理通道的线程资源和事件分发 */ private final EnhanceAsynchronousChannelGroup enhanceAsynchronousChannelGroup; - + /** * 接受连接的回调处理器,用于处理新连接建立后的回调逻辑 */ private CompletionHandler acceptCompletionHandler; - - /** - * 用于Future方式调用时的回调处理器 - */ - private FutureCompletionHandler acceptFuture; - + /** * 接受连接操作的附加对象,可在回调时传递额外的上下文信息 */ private Object attachment; - + /** * 用于接受连接操作的选择键,管理通道的接受事件注册 */ private SelectionKey selectionKey; - + /** * 标识是否有待处理的接受连接操作 */ private boolean acceptPending; - + /** * 是否启用低内存模式 * 在低内存模式下,会采用特殊的内存管理策略以减少内存占用 @@ -91,9 +86,9 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 构造函数 * 创建一个新的增强型异步服务器Socket通道实例 - * + * * @param enhanceAsynchronousChannelGroup 关联的异步通道组,用于管理该通道的资源 - * @param lowMemory 是否启用低内存模式 + * @param lowMemory 是否启用低内存模式 * @throws IOException 如果创建底层通道时发生IO错误 */ EnhanceAsynchronousServerSocketChannel(EnhanceAsynchronousChannelGroup enhanceAsynchronousChannelGroup, boolean lowMemory) throws IOException { @@ -106,8 +101,8 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 将服务器Socket绑定到指定的本地地址 - * - * @param local 要绑定的本地地址 + * + * @param local 要绑定的本地地址 * @param backlog 连接请求队列的最大长度 * @return 返回当前服务器Socket通道实例 * @throws IOException 如果绑定操作失败 @@ -120,8 +115,8 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 设置服务器Socket的选项 - * - * @param name 选项名称 + * + * @param name 选项名称 * @param value 选项值 * @return 返回当前服务器Socket通道实例 * @throws IOException 如果设置选项时发生错误 @@ -134,7 +129,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 获取服务器Socket的选项值 - * + * * @param name 选项名称 * @return 返回指定选项的当前值 * @throws IOException 如果获取选项值时发生错误 @@ -146,7 +141,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 获取服务器Socket支持的所有选项 - * + * * @return 返回支持的选项集合 */ @Override @@ -157,9 +152,9 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 异步接受客户端连接请求 * 该方法实现了异步接受连接的功能,通过回调机制通知连接建立的结果 - * + * * @param attachment 附加对象,可在回调时获取 - * @param handler 连接完成的回调处理器 + * @param handler 连接完成的回调处理器 * @throws AcceptPendingException 如果已有一个待处理的接受连接操作 */ @Override @@ -183,12 +178,6 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc */ public void doAccept() { try { - //此前通过Future调用,且触发了cancel - if (acceptFuture != null && acceptFuture.isDone()) { - resetAccept(); - EnhanceAsynchronousChannelGroup.removeOps(selectionKey, SelectionKey.OP_ACCEPT); - return; - } SocketChannel socketChannel = null; if (acceptInvoker++ < EnhanceAsynchronousChannelGroup.MAX_INVOKER) { socketChannel = serverSocketChannel.accept(); @@ -232,7 +221,6 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc */ private void resetAccept() { acceptPending = false; - acceptFuture = null; acceptCompletionHandler = null; attachment = null; } @@ -240,20 +228,17 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 以Future方式接受连接 * 提供基于Future的异步接受连接方式,允许调用者通过Future对象获取连接结果 - * + * * @return 返回Future对象,可用于获取连接结果 */ @Override public Future accept() { - FutureCompletionHandler acceptFuture = new FutureCompletionHandler<>(); - accept(null, acceptFuture); - this.acceptFuture = acceptFuture; - return acceptFuture; + throw new UnsupportedOperationException(); } /** * 获取服务器Socket的本地地址 - * + * * @return 返回服务器Socket绑定的本地地址 * @throws IOException 如果获取地址时发生IO错误 */ @@ -264,7 +249,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 检查服务器Socket通道是否打开 - * + * * @return 如果通道处于打开状态返回true,否则返回false */ @Override @@ -275,7 +260,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 关闭服务器Socket通道 * 关闭底层的服务器Socket通道,释放相关资源 - * + * * @throws IOException 如果关闭时发生IO错误 */ @Override diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java index be2ddcec..e2b8d697 100644 --- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java +++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -24,6 +24,7 @@ import java.nio.channels.ShutdownChannelGroupException; import java.nio.channels.SocketChannel; import java.nio.channels.WritePendingException; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -237,9 +238,9 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { private void doConnect(SocketAddress remote, A attachment, CompletionHandler completionHandler) { try { //此前通过Future调用,且触发了cancel - if (completionHandler instanceof FutureCompletionHandler && ((FutureCompletionHandler) completionHandler).isDone()) { - return; - } +// if (completionHandler instanceof FutureCompletionHandler && ((FutureCompletionHandler) completionHandler).isDone()) { +// return; +// } boolean connected = channel.isConnectionPending(); if (connected || channel.connect(remote)) { connected = channel.finishConnect(); @@ -264,9 +265,7 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { @Override public Future connect(SocketAddress remote) { - FutureCompletionHandler connectFuture = new FutureCompletionHandler<>(); - connect(remote, null, connectFuture); - return connectFuture; + throw new UnsupportedOperationException(); } @Override @@ -284,13 +283,14 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { this.readBuffer = readBuffer; this.readAttachment = attachment; this.readCompletionHandler = (CompletionHandler) handler; - doRead(handler instanceof FutureCompletionHandler); + doRead(EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER.equals(readCompletionHandler)); } + @Override public final Future read(ByteBuffer readBuffer) { - FutureCompletionHandler readFuture = new FutureCompletionHandler<>(); - read(readBuffer, 0, TimeUnit.MILLISECONDS, null, readFuture); + CompletableFuture readFuture = new CompletableFuture<>(); + read(readBuffer, 0, TimeUnit.MILLISECONDS, readFuture, EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER); return readFuture; } @@ -348,11 +348,11 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { return; } // 处理Future调用被取消的情况 - if (readCompletionHandler instanceof FutureCompletionHandler && ((FutureCompletionHandler) readCompletionHandler).isDone()) { - EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ); - resetRead(); - return; - } +// if (readCompletionHandler instanceof FutureCompletionHandler && ((FutureCompletionHandler) readCompletionHandler).isDone()) { +// EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ); +// resetRead(); +// return; +// } // 低内存模式下的特殊处理:当没有缓冲区时,直接返回可读信号 if (lowMemory && direct && readBuffer == null) { CompletionHandler completionHandler = readCompletionHandler; @@ -376,23 +376,26 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { } //注册至异步线程 - if (readSize == 0 && readCompletionHandler instanceof FutureCompletionHandler) { - EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ); - group().commonWorker.addRegister(selector -> { - try { - channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this); - } catch (ClosedChannelException e) { - doRead(true); - } - }); - return; - } - //释放内存 - if (lowMemory && readSize == 0 && readBuffer.position() == 0) { - readBuffer = null; - readCompletionHandler.completed(EnhanceAsynchronousChannelProvider.READ_MONITOR_SIGNAL, readAttachment); + if (readSize == 0) { + if (EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER.equals(readCompletionHandler)) { + EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ); + group().commonWorker.addRegister(selector -> { + try { + channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this); + } catch (ClosedChannelException e) { + doRead(true); + } + }); + return; + } + //释放内存 + if (lowMemory && readBuffer.position() == 0) { + readBuffer = null; + readCompletionHandler.completed(EnhanceAsynchronousChannelProvider.READ_MONITOR_SIGNAL, readAttachment); + } } + if (readSize != 0 || !hasRemain) { CompletionHandler completionHandler = readCompletionHandler; Object attach = readAttachment; diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java b/aio-core/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java deleted file mode 100644 index 4f72b44f..00000000 --- a/aio-core/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java +++ /dev/null @@ -1,141 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2017-2021, org.smartboot. All rights reserved. - * project name: smart-socket - * file name: FutureCompletionHandler.java - * Date: 2021-07-29 - * Author: sandao (zhengjunweimail@163.com) - * - ******************************************************************************/ - -package org.smartboot.socket.enhance; - -import java.nio.channels.CompletionHandler; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * 一个同时实现了CompletionHandler和Future接口的工具类,用于异步操作的完成处理和Future模式支持。 - *

- * 该类提供了以下功能: - * 1. 作为CompletionHandler处理异步操作的完成和失败回调 - * 2. 作为Future提供异步操作的结果获取、取消和状态查询 - * 3. 支持带超时的异步操作结果获取 - *

- * @param 异步操作的结果类型 - * @param 异步操作的附加参数类型 - */ -public final class FutureCompletionHandler implements CompletionHandler, Future { - /** 异步操作的执行结果 */ - private V result; - /** 标记异步操作是否已完成(成功完成、失败或被取消) */ - private boolean done = false; - /** 标记异步操作是否被取消 */ - private boolean cancel = false; - /** 异步操作执行过程中发生的异常 */ - private Throwable exception; - - /** - * 异步操作成功完成时的回调方法 - * @param result 异步操作的执行结果 - * @param selectionKey 异步操作的附加参数 - */ - @Override - public void completed(V result, A selectionKey) { - this.result = result; - done = true; - synchronized (this) { - this.notify(); - } - } - - /** - * 异步操作失败时的回调方法 - * @param exc 异步操作执行过程中发生的异常 - * @param attachment 异步操作的附加参数 - */ - @Override - public void failed(Throwable exc, A attachment) { - exception = exc; - done = true; - } - - /** - * 尝试取消异步操作的执行 - * @param mayInterruptIfRunning 是否允许中断正在执行的操作(在此实现中该参数不起作用) - * @return 如果操作成功取消返回true,如果操作已完成或已被取消返回false - */ - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (done || cancel) { - return false; - } - cancel = true; - done = true; - synchronized (this) { - notify(); - } - return true; - } - - /** - * 检查异步操作是否被取消 - * @return 如果操作已被取消返回true,否则返回false - */ - @Override - public boolean isCancelled() { - return cancel; - } - - /** - * 检查异步操作是否已完成 - * @return 如果操作已完成(包括成功完成、失败或被取消)返回true,否则返回false - */ - @Override - public boolean isDone() { - return done; - } - - /** - * 获取异步操作的执行结果,如果操作未完成则阻塞等待 - * @return 异步操作的执行结果 - * @throws InterruptedException 等待过程中线程被中断 - * @throws ExecutionException 异步操作执行过程中发生异常 - */ - @Override - public synchronized V get() throws InterruptedException, ExecutionException { - if (done) { - if (exception != null) { - throw new ExecutionException(exception); - } - return result; - } else { - wait(); - } - return get(); - } - - /** - * 在指定的超时时间内获取异步操作的执行结果,如果操作未完成则阻塞等待 - * @param timeout 超时时间 - * @param unit 时间单位 - * @return 异步操作的执行结果 - * @throws InterruptedException 等待过程中线程被中断 - * @throws ExecutionException 异步操作执行过程中发生异常 - * @throws TimeoutException 超过指定时间仍未获得结果 - */ - @Override - public synchronized V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - if (done) { - return get(); - } else { - wait(unit.toMillis(timeout)); - } - if (done) { - return get(); - } - throw new TimeoutException(); - } - -} diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index 3cede2c4..818957f9 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -38,7 +38,7 @@ final class IoServerConfig { /** * 当前smart-socket版本号 */ - public static final String VERSION = "v1.7.4"; + public static final String VERSION = "v1.7.5-SNAPSHOT"; /** * 消息体缓存大小,字节 diff --git a/aio-pro/pom.xml b/aio-pro/pom.xml index 0c9c3680..cd994b48 100644 --- a/aio-pro/pom.xml +++ b/aio-pro/pom.xml @@ -19,7 +19,7 @@ io.github.smartboot.socket smart-socket-parent - 1.7.4 + 1.7.5-SNAPSHOT ../smart-socket-parent diff --git a/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java b/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java index c3ef601f..27f80ea0 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java +++ b/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java @@ -13,7 +13,6 @@ import org.smartboot.socket.buffer.BufferPagePool; import org.smartboot.socket.buffer.VirtualBuffer; import org.smartboot.socket.channels.AsynchronousSocketChannelProxy; import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider; -import org.smartboot.socket.enhance.FutureCompletionHandler; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; @@ -22,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -227,8 +227,8 @@ public class SslAsynchronousSocketChannel extends AsynchronousSocketChannelProxy @Override public Future read(ByteBuffer dst) { - FutureCompletionHandler readFuture = new FutureCompletionHandler<>(); - read(dst, 0, TimeUnit.MILLISECONDS, null, readFuture); + CompletableFuture readFuture = new CompletableFuture<>(); + read(dst, 0, TimeUnit.MILLISECONDS, readFuture, EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER); return readFuture; } diff --git a/benchmark/pom.xml b/benchmark/pom.xml index eeba86f7..31489210 100644 --- a/benchmark/pom.xml +++ b/benchmark/pom.xml @@ -18,7 +18,7 @@ io.github.smartboot.socket aio-pro - 1.7.4 + 1.7.5-SNAPSHOT org.slf4j diff --git a/example/pom.xml b/example/pom.xml index d39a0985..11d6778f 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -23,7 +23,7 @@ io.github.smartboot.socket aio-pro - 1.7.4 + 1.7.5-SNAPSHOT org.apache.commons diff --git a/pom.xml b/pom.xml index 11eb2a7a..486a2dab 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ io.github.smartboot.socket smart-socket-parent - 1.7.4 + 1.7.5-SNAPSHOT 2.6 diff --git a/smart-socket-parent/pom.xml b/smart-socket-parent/pom.xml index b71375a8..f93a84fd 100644 --- a/smart-socket-parent/pom.xml +++ b/smart-socket-parent/pom.xml @@ -15,7 +15,7 @@ 4.0.0 io.github.smartboot.socket smart-socket-parent - 1.7.4 + 1.7.5-SNAPSHOT pom -- Gitee From 4c8ef3038a74cf4ac805ce91dac09e8edb219c36 Mon Sep 17 00:00:00 2001 From: zhengjw22 Date: Wed, 5 Nov 2025 13:26:39 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat(socket):=20=E5=A2=9E=E5=BC=BA=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E9=80=9A=E9=81=93=E8=AF=BB=E5=8F=96=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E5=B9=B6=E4=BC=98=E5=8C=96=E5=A4=9A=E8=B7=AF=E5=A4=8D=E7=94=A8?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=8F=92=E4=BB=B6=E5=8A=A0=E8=BD=BD?= =?UTF-8?q?-=20=E4=BF=AE=E6=94=B9=20doRead=20=E6=96=B9=E6=B3=95=E7=AD=BE?= =?UTF-8?q?=E5=90=8D=E4=BB=A5=E6=94=AF=E6=8C=81=E7=BA=BF=E7=A8=8B=E5=88=87?= =?UTF-8?q?=E6=8D=A2=E6=8E=A7=E5=88=B6-=20=E5=9C=A8=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E8=AF=BB=E5=8F=96=E5=9C=BA=E6=99=AF=E4=B8=AD=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=20ThreadLocal=20=E6=A0=87=E5=BF=97=E6=9B=BF=E4=BB=A3=E5=8E=9F?= =?UTF-8?q?=E6=9C=89=E7=9A=84=20CompletionHandler=20=E5=88=A4=E6=96=AD=20-?= =?UTF-8?q?=20=E6=9B=B4=E6=96=B0=20EnhanceAsynchronousChannelGroup=20?= =?UTF-8?q?=E4=B8=AD=E7=9A=84=E8=AF=BB=E5=8F=96=E6=B6=88=E8=B4=B9=E9=80=BB?= =?UTF-8?q?=E8=BE=91-=20=E4=BC=98=E5=8C=96=20MultiplexClient=20=E7=9A=84?= =?UTF-8?q?=E6=8F=92=E4=BB=B6=E5=8A=A0=E8=BD=BD=E9=A1=BA=E5=BA=8F=EF=BC=8C?= =?UTF-8?q?=E9=81=BF=E5=85=8D=E9=87=8D=E5=A4=8D=E6=B7=BB=E5=8A=A0=20SSL=20?= =?UTF-8?q?=E6=8F=92=E4=BB=B6=20-=20=E7=A1=AE=E4=BF=9D=20SSL=20=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E5=A5=97=E6=8E=A5=E5=AD=97=E9=80=9A=E9=81=93=E5=9C=A8?= =?UTF-8?q?=E8=AF=BB=E5=8F=96=E6=97=B6=E6=AD=A3=E7=A1=AE=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=A0=87=E5=BF=97-=20=E5=AE=8C=E5=96=84?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E5=A4=84=E7=90=86=E6=B5=81=E7=A8=8B=E4=B8=AD?= =?UTF-8?q?=E7=9A=84=20doRead=20=E8=B0=83=E7=94=A8=E5=8F=82=E6=95=B0?= =?UTF-8?q?=E4=BC=A0=E9=80=92=20-=20=E7=BB=9F=E4=B8=80=20Future=E8=AF=BB?= =?UTF-8?q?=E5=8F=96=E6=96=B9=E6=B3=95=E4=B8=AD=E5=AF=B9=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E8=AF=BB=E5=8F=96=E7=8A=B6=E6=80=81=E7=9A=84=E7=AE=A1=E7=90=86?= =?UTF-8?q?=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../EnhanceAsynchronousChannelGroup.java | 6 +++--- .../EnhanceAsynchronousChannelProvider.java | 2 +- .../EnhanceAsynchronousSocketChannel.java | 18 ++++++++++++------ .../extension/multiplex/MultiplexClient.java | 7 +++++-- .../ssl/SslAsynchronousSocketChannel.java | 7 ++++++- 5 files changed, 27 insertions(+), 13 deletions(-) diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java index e32d16eb..e5e6f5ce 100644 --- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java +++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java @@ -88,7 +88,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { //仅同步read会用到此线程资源 EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); removeOps(selectionKey, SelectionKey.OP_READ); - asynchronousSocketChannel.doRead(true); + asynchronousSocketChannel.doRead(true, false); } if ((interestOps & SelectionKey.OP_WRITE) > 0) { EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); @@ -97,7 +97,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { } }; - private static final Consumer readConsumer = selectionKey -> ((EnhanceAsynchronousSocketChannel) selectionKey.attachment()).doRead(true); + private static final Consumer readConsumer = selectionKey -> ((EnhanceAsynchronousSocketChannel) selectionKey.attachment()).doRead(true, false); /** * 初始化异步通道组实例。 @@ -147,7 +147,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { //仅同步read会用到此线程资源 EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); removeOps(selectionKey, SelectionKey.OP_READ); - asynchronousSocketChannel.doRead(true); + asynchronousSocketChannel.doRead(true, false); } else { throw new IllegalStateException("unexpect callback,key valid:" + selectionKey.isValid() + " ,interestOps:" + selectionKey.interestOps()); } diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java index 74e80249..bdfb8815 100644 --- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java +++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java @@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit; * @version V1.0 , 2020/5/25 */ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChannelProvider { - + public static final ThreadLocal SYNC_READ_FLAG = ThreadLocal.withInitial(() -> false); public static final CompletionHandler> SYNC_READ_HANDLER = new CompletionHandler>() { @Override public void completed(Integer result, CompletableFuture attachment) { diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java index e2b8d697..cac03cba 100644 --- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java +++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -152,7 +152,7 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { exception = e; } if (readCompletionHandler != null) { - doRead(true); + doRead(true, false); } if (readSelectionKey != null) { readSelectionKey.cancel(); @@ -283,14 +283,20 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { this.readBuffer = readBuffer; this.readAttachment = attachment; this.readCompletionHandler = (CompletionHandler) handler; - doRead(EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER.equals(readCompletionHandler)); + boolean syncRead = EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.get(); + doRead(syncRead, syncRead); } @Override public final Future read(ByteBuffer readBuffer) { CompletableFuture readFuture = new CompletableFuture<>(); - read(readBuffer, 0, TimeUnit.MILLISECONDS, readFuture, EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER); + EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.set(true); + try { + read(readBuffer, 0, TimeUnit.MILLISECONDS, readFuture, EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER); + } finally { + EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.set(false); + } return readFuture; } @@ -342,7 +348,7 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { * * @param direct 是否直接读取,true表示立即读取,false表示通过事件触发读取 */ - public final void doRead(boolean direct) { + public final void doRead(boolean direct, boolean switchThread) { try { if (readCompletionHandler == null) { return; @@ -377,13 +383,13 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { //注册至异步线程 if (readSize == 0) { - if (EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER.equals(readCompletionHandler)) { + if (switchThread) { EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ); group().commonWorker.addRegister(selector -> { try { channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this); } catch (ClosedChannelException e) { - doRead(true); + doRead(true, false); } }); return; diff --git a/aio-pro/src/main/java/org/smartboot/socket/extension/multiplex/MultiplexClient.java b/aio-pro/src/main/java/org/smartboot/socket/extension/multiplex/MultiplexClient.java index b126fc67..b55b96a0 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/extension/multiplex/MultiplexClient.java +++ b/aio-pro/src/main/java/org/smartboot/socket/extension/multiplex/MultiplexClient.java @@ -201,9 +201,9 @@ public class MultiplexClient { boolean noneSslPlugin = true; // 添加配置的插件 for (Plugin responsePlugin : multiplexOptions.getPlugins()) { - processor.addPlugin(responsePlugin); if (responsePlugin instanceof SslPlugin) { noneSslPlugin = false; + break; } } @@ -216,7 +216,10 @@ public class MultiplexClient { if (multiplexOptions.idleTimeout() > 0) { processor.addPlugin(new IdleStatePlugin<>(multiplexOptions.idleTimeout())); } - + // 添加配置的插件 + for (Plugin responsePlugin : multiplexOptions.getPlugins()) { + processor.addPlugin(responsePlugin); + } firstConnected = false; } } diff --git a/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java b/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java index 27f80ea0..092273a6 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java +++ b/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java @@ -228,7 +228,12 @@ public class SslAsynchronousSocketChannel extends AsynchronousSocketChannelProxy @Override public Future read(ByteBuffer dst) { CompletableFuture readFuture = new CompletableFuture<>(); - read(dst, 0, TimeUnit.MILLISECONDS, readFuture, EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER); + EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.set(true); + try { + read(dst, 0, TimeUnit.MILLISECONDS, readFuture, EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER); + } finally { + EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.set(false); + } return readFuture; } -- Gitee