diff --git a/aio-core/pom.xml b/aio-core/pom.xml
index ef7bec2ca64ca69c984bb696957b1e85cf57a274..8ab12a8018efdd486273d61e2ba149de5d937675 100644
--- a/aio-core/pom.xml
+++ b/aio-core/pom.xml
@@ -19,29 +19,13 @@
调用AioQuickServer的各setXX()方法,都是为了设置config的各配置项
@@ -124,7 +124,7 @@ public final class AioQuickServer { */ public void start() throws IOException { if (config.isBannerEnabled()) { - System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket ::\t(" + IoServerConfig.VERSION + ")"); + System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket " + (config.isAioEnhance() ? "[enhance]" : "") + "::\t(" + IoServerConfig.VERSION + ")"); } start0(channel -> new TcpAioSession(channel, config, aioReadCompletionHandler, aioWriteCompletionHandler, bufferPool.allocateBufferPage())); } @@ -144,14 +144,17 @@ public final class AioQuickServer { this.innerBufferPool = bufferPool; } this.aioSessionFunction = aioSessionFunction; - if (AIO_ENHANCE_PROVIDER.equals(System.getProperty(ASYNCHRONOUS_CHANNEL_PROVIDER))) { + AsynchronousChannelProvider provider; + if (config.isAioEnhance()) { aioReadCompletionHandler = new ReadCompletionHandler(); + provider = new EnhanceAsynchronousChannelProvider(); } else { concurrentReadCompletionHandlerExecutor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); aioReadCompletionHandler = new ConcurrentReadCompletionHandler(new Semaphore(config.getThreadNum() - 1), concurrentReadCompletionHandlerExecutor); + provider = AsynchronousChannelProvider.provider(); } - asynchronousChannelGroup = AsynchronousChannelGroup.withFixedThreadPool(config.getThreadNum(), new ThreadFactory() { + asynchronousChannelGroup = provider.openAsynchronousChannelGroup(config.getThreadNum(), new ThreadFactory() { private byte index = 0; @Override @@ -269,7 +272,7 @@ public final class AioQuickServer { /** * 停止服务端 */ - public final void shutdown() { + public void shutdown() { try { if (serverSocketChannel != null) { serverSocketChannel.close(); @@ -311,6 +314,16 @@ public final class AioQuickServer { return this; } + /** + * 是否启用 AIO 增强模式。默认:true + * + * @param enabled true:启用;false:禁用 + */ + public AioQuickServer setAioEnhance(boolean enabled) { + config.setAioEnhance(enabled); + return this; + } + /** * 是否启用控制台Banner打印 * diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java index c540571019037119d5f483c648f0eec6ed1c3dc1..d996af9cbf3fbeeb66452cc17dd2779ed96dfdd5 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java @@ -12,6 +12,7 @@ package org.smartboot.socket.transport; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; /** @@ -55,6 +56,11 @@ public abstract class AioSession { */ public abstract WriteBuffer writeBuffer(); + /** + * 获取读缓冲区对象 + */ + public abstract ByteBuffer readBuffer(); + /** * 强制关闭当前AIOSession。 *若此时还存留待输出的数据,则会导致该部分数据丢失
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IOUtil.java b/aio-core/src/main/java/org/smartboot/socket/transport/IOUtil.java index e6dab7bc8a43fb3e62f932656a5e5461ab8a1081..046dc4eb8fdcac15404116e0ac144a92ef1d3cba 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IOUtil.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IOUtil.java @@ -11,6 +11,7 @@ package org.smartboot.socket.transport; import java.io.IOException; import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.NotYetConnectedException; /** * @author 三刀 @@ -21,13 +22,18 @@ final class IOUtil { * @param channel 需要被关闭的通道 */ public static void close(AsynchronousSocketChannel channel) { + boolean connected = true; try { channel.shutdownInput(); } catch (IOException ignored) { + } catch (NotYetConnectedException e) { + connected = false; } try { - channel.shutdownOutput(); - } catch (IOException ignored) { + if (connected) { + channel.shutdownOutput(); + } + } catch (IOException | NotYetConnectedException ignored) { } try { channel.close(); diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index 361cd97517004f250ba25769915af9f7b67d396c..5b3b8de0c130c0fdeb55ff76560500297772272f 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -39,7 +39,7 @@ final class IoServerConfig { /** * 当前smart-socket版本号 */ - public static final String VERSION = "v1.5.10"; + public static final String VERSION = "v1.5.11"; /** * 消息体缓存大小,字节 @@ -99,6 +99,10 @@ final class IoServerConfig { */ private BufferFactory bufferFactory = BufferFactory.DISABLED_BUFFER_FACTORY; + /** + * 启用 aio 增强 + */ + private boolean aioEnhance = true; /** * 获取默认内存块大小 @@ -234,20 +238,31 @@ final class IoServerConfig { this.backlog = backlog; } + public boolean isAioEnhance() { + return aioEnhance; + } + + public void setAioEnhance(boolean aioEnhance) { + this.aioEnhance = aioEnhance; + } + @Override public String toString() { return "IoServerConfig{" + "readBufferSize=" + readBufferSize + - ", writeQueueCapacity=" + writeBufferCapacity + + ", writeBufferSize=" + writeBufferSize + + ", writeBufferCapacity=" + writeBufferCapacity + ", host='" + host + '\'' + ", monitor=" + monitor + ", port=" + port + + ", backlog=" + backlog + ", processor=" + processor + ", protocol=" + protocol + ", bannerEnabled=" + bannerEnabled + ", socketOptions=" + socketOptions + ", threadNum=" + threadNum + - ", writeBufferSize=" + writeBufferSize + + ", bufferFactory=" + bufferFactory + + ", aioEnhance=" + aioEnhance + '}'; } } diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/TcpAioSession.java b/aio-core/src/main/java/org/smartboot/socket/transport/TcpAioSession.java index b535d82b3088ad97751de5cb3c42922e85dfdd56..7eb64a0585f1cb4826719b7f107a080a1c6bcb54 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/TcpAioSession.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/TcpAioSession.java @@ -165,10 +165,15 @@ final class TcpAioSession extends AioSession { /** * @return 输入流 */ - public final WriteBuffer writeBuffer() { + public WriteBuffer writeBuffer() { return byteBuf; } + @Override + public ByteBuffer readBuffer() { + return readBuffer.buffer(); + } + @Override public void awaitRead() { modCount++; diff --git a/aio-enhance/pom.xml b/aio-enhance/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..0894834b615f19ad5ddc5f60b80ba922aadb9975 --- /dev/null +++ b/aio-enhance/pom.xml @@ -0,0 +1,24 @@ + + + +