# netty-socket **Repository Path**: ryub/netty-socket ## Basic Information - **Project Name**: netty-socket - **Description**: 开箱即用的socket服务端/客户端,使用netty作为底层框架 - **Primary Language**: Java - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2022-04-29 - **Last Updated**: 2022-04-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 基于netty的socket服务端/客户端 简化了服务端/客户端交互的方式 ## 快速入门 让服务端与客户端对消息+1 ### 服务端 继承NettyYmaServer,重写onOpen/onMessage方法 调用无参的构造方法在默认8080断开开启监听 ```dtd public class Server extends NettyYmaServer { @Override public Object onOpen(YmaSession session) { System.out.println("客户端连接"); send(session,"1".getBytes()); return null; } @Override public Object onMessage(YmaSession session, Object message) { String res = new String((byte[]) message); System.out.println(res); int req = Integer.parseInt(res) + 1; send(session,String.valueOf(req).getBytes()); return null; } public static void main(String[] args) { new Server().start(); } } ``` ### 客户端(可以多启几个客户端,试试性能) 继承NettyYmaClient,重写onMessage方法 使用无参的构造方法,默认连接8080端口 ```dtd public class Client extends NettyYmaClient { @Override public Object onMessage(Object message) { String res = new String((byte[]) message); System.out.println(res); int req = Integer.parseInt(res) + 1; send(String.valueOf(req).getBytes()); return null; } public static void main(String[] args) { Client client = new Client(); client.connect(); } } ``` ## 实现详情 ### 服务端 - 1.定义服务端接口 ```dtd public interface YmaServer { Object onOpen(YmaSession session); /** * 收到消息 * * 默认未byte[] * @param session * @param message * @return */ Object onMessage(YmaSession session,Object message); void onError(YmaSession session,Throwable e); Object onClose(YmaSession session); /** * 主动发信息 * @param session * @param message */ void send (YmaSession session,Object message); void onStart(); void onStop(); void start(); void stop(); } ``` - 2.定义配置文件接口 ```dtd public interface YmaConfig { EventLoopGroup getWorkerGroup(); SocketAddress getSocketAddress(); } ``` - 3.服务端配置文件接口扩展 ```dtd public interface YmlServerConfig extends YmaConfig { EventLoopGroup getBoosGroup(); } ``` - 4.实现服务端start/stop ```dtd public abstract class NettyYmaServer extends SessionAbleYmaServer { private static final Logger log = LoggerFactory.getLogger(NettyYmaServer.class); private final YmlServerConfig config; private Channel serverChannel; public NettyYmaServer(YmlServerConfig config){ this.config = config; } public NettyYmaServer(){ this(new DefaultServerConfig()); } @Override public void start(){ ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(config.getBoosGroup(),config.getWorkerGroup()) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new ByteArrayEncoder()) .addLast(new ByteArrayDecoder()) .addLast(NettyYmaServer.this); } }); ChannelFuture future = bootstrap.bind(config.getSocketAddress()); future.addListener(f -> { if(f.isDone() && f.isSuccess()){ this.serverChannel = future.channel(); log.info("Start ws server success"); } if(f.isDone() && f.cause() != null){ log.error("Start ws server fail throw={}", f.cause().getMessage()); future.channel().close(); } }); } @Override public void stop(){ if(serverChannel != null && serverChannel.isOpen()){ final int waitSec = 10; CountDownLatch latch = new CountDownLatch(1); serverChannel.close().addListener(f -> { config.getWorkerGroup().schedule(() -> { log.info("Shutdown dispatcher success..."); config.getWorkerGroup().shutdownGracefully(); latch.countDown(); }, waitSec - 2, TimeUnit.SECONDS); log.info("Close ws server socket success={}", f.isSuccess()); config.getBoosGroup().shutdownGracefully(); }); try{ latch.await(waitSec, TimeUnit.SECONDS); }catch (InterruptedException e){ log.warn("Shutdown ws server interrupted exception={}", e.getMessage()); } } } .... ``` - 5.父类实现session ```dtd @ChannelHandler.Sharable public abstract class SessionAbleYmaServer extends ChannelInboundHandlerAdapter implements YmaServer { private static final Logger log = LoggerFactory.getLogger(SessionAbleYmaServer.class); protected final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); /** * 接受client发送的消息 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { YmaSession session = sessions.get(ctx); Object res = onMessage(session, msg); send(session,res); } /** * 不懂 * * 通知处理器最后的channelRead()是当前批处理中的最后一条消息时调用 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } /** * 捕获到异常 * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { onError(sessions.get(ctx),cause); ctx.close(); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { YmaSession session = sessions.get(ctx); Object res = onClose(session); send(session,res); sessions.remove(ctx); } /** * 客户端连接 * * 创建session * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { YmaSession session = new YmaSession(); session.put("context",ctx); sessions.put(ctx,session); Object res = onOpen(session); send(session,res); } @Override public void send(YmaSession session, Object message) { if(null != message) { ((ChannelHandlerContext)session.get("context")).writeAndFlush(message); }else { if (log.isDebugEnabled()) { log.trace("message is null"); } } } } ``` - 6.默认的配置实现 ```dtd public class DefaultServerConfig implements YmlServerConfig { @Override public EventLoopGroup getBoosGroup() { return new NioEventLoopGroup(); } @Override public EventLoopGroup getWorkerGroup() { return new NioEventLoopGroup(); } @Override public SocketAddress getSocketAddress() { return new InetSocketAddress("127.0.0.1",8080); } } ``` - 7.数据编码与解码 编码 ```dtd public class ByteArrayEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { if (msg instanceof byte[]) { byte[] data = (byte[]) msg; out.writeInt(data.length); out.writeBytes(data); } } } ``` 解码 ```dtd public class ByteArrayDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { if (in.readableBytes() < 4) { return; } in.markReaderIndex(); int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; in.readBytes(data); out.add(data); } } ``` ### 客户端类似(略)