本文共 9567 字,大约阅读时间需要 31 分钟。
Netty+Spring Boot 实现 长连接 - 心跳 - 自动重连 - 通信
最近,我被Netty这个网络框架差点整疯了。作为一个对网络技术一窍不通的程序员,我通常很少接触网络相关的技术,但最近公司和各大论坛里Netty的热度简直不可思议。我被好奇心驱使,想要试一试,结果发现自己搭建了一个简单的Netty项目来实现长连接、心跳、自动重连和通信功能。这个过程充满了挑战和惊喜,特别是当我第一次成功运行代码时,那种成就感简直无法描述。
作为一个对网络技术生疏的人,我原本只是简单地了解一下Netty的基本概念,但很快我就被它的强大功能深深吸引。Netty是一个基于NIO的高性能异步网络框架,处理大量连接和数据流量非常高效。而Spring Boot作为一个流行的微服务框架,能够快速实现各种功能,这两者的结合让我看到了无限的可能。
在项目中,我主要实现了以下几个核心功能:
在实现过程中,我深刻体会到了Netty的强大之处。通过自定义 Pipeline 和各种handler,我能够灵活配置每个连接的处理逻辑。IdleStateHandler 用于检测空闲时间,确保在一定时间内发送心跳信号。MsgPckEncode 和 MsgPckDecode 用于数据的编码和解码,确保数据能够准确传输。
在项目初期,我遇到了一个棘手的问题:当客户端和服务器同时运行时,Scanner 和 Channel 调用之间的竞争问题,导致程序运行异常。经过反复调试和分析,我发现问题出在代码中对资源管理的不当处理。通过引入 ChannelFuture 和事件驱动模型,我成功解决了这个问题。
以下是改进后的代码示例,展示了客户端和服务器端的实现:
package com.gzky.study.netty;import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.timeout.IdleStateHandler;import java.util.Scanner;import java.util.concurrent.TimeUnit;public class Client { private NioEventLoopGroup worker = new NioEventLoopGroup(); private Channel channel; private Bootstrap bootstrap; boolean flag = true; public static void main(String[] args) { Client client = new Client(); client.start(); client.sendData(); client.close(); } private void close() { channel.close(); worker.shutdownGracefully(); } private void start() { bootstrap = new Bootstrap(); bootstrap.group(worker) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer () { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(3, 3, 5)); pipeline.addLast(new MsgPckDecode()); pipeline.addLast(new MsgPckEncode()); pipeline.addLast(new Client3Handler(Client.this)); } }); doConnect(); } protected void doConnect() { if (channel != null && channel.isActive()) { return; } ChannelFuture connect = bootstrap.connect("127.0.0.1", 8089); connect.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { channel = channelFuture.channel(); System.out.println("连接成功"); } else { if (flag) { System.out.println("每隔2s重连...."); channelFuture.channel().eventLoop().schedule(new Runnable() { @Override public void run() { doConnect(); } }, 2, TimeUnit.SECONDS); } } } }); } private void sendData() { while (channel == null || !channel.isActive()) { System.out.println("等待连接···"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("连接成功等待输入:"); flag = true; Scanner sc = new Scanner(System.in); while (flag) { String nextLine = sc.nextLine(); if ("end".equalsIgnoreCase(nextLine)) { flag = false; } Model model = new Model(); model.setType(TypeData.CUSTOMER); model.setBody(nextLine); channel.writeAndFlush(model); } }} package com.gzky.study.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.timeout.IdleStateHandler;public class Server { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(4); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(8089) .childHandler(new ChannelInitializer () { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(10, 3, 10)); pipeline.addLast(new MsgPckDecode()); pipeline.addLast(new MsgPckEncode()); pipeline.addLast(new Server3Handler()); } }); System.out.println("start server 8089 --"); ChannelFuture sync = serverBootstrap.bind().sync(); sync.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }} package com.gzky.study.netty;public interface TypeData { byte PING = 1; byte PONG = 2; byte CUSTOMER = 3;} package com.gzky.study.netty;import org.msgpack.annotation.Message;import java.io.Serializable;@Messagepublic class Model implements Serializable { private static final long serialVersionUID = 1L; private int type; private String body; public int getType() { return type; } public void setType(int type) { this.type = type; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } @Override public String toString() { return "Model{" + "type=" + type + ", body='" + body + '\'' + '}'; }} package com.gzky.study.netty;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;import org.msgpack.MessagePack;public class MsgPckEncode extends MessageToByteEncoder
package com.gzky.study.netty;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageDecoder;import org.msgpack.MessagePack;import java.util.List;public class MsgPckDecode extends MessageToMessageDecoder
package com.gzky.study.netty;import io.netty.channel.ChannelHandlerContext;public class Client3Handler extends Middleware { private Client client; public Client3Handler(Client client) { super("client"); this.client = client; } @Override protected void handlerData(ChannelHandlerContext ctx, Object msg) { Model model = (Model) msg; System.out.println("client 收到数据: " + model.toString()); } @Override protected void handlerAllIdle(ChannelHandlerContext ctx) { super.handlerAllIdle(ctx); sendPingMsg(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); client.doConnect(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(name + "exception : " + cause.toString()); }} package com.gzky.study.netty;import io.netty.channel.ChannelHandlerContext;public class Server3Handler extends Middleware { public Server3Handler() { super("server"); } @Override protected void handlerData(ChannelHandlerContext ctx, Object msg) { Model model = (Model) msg; System.out.println("server 接收数据: " + model.toString()); model.setType(TypeData.CUSTOMER); model.setBody("client你好,server已接收到数据:" + model.getBody()); ctx.channel().writeAndFlush(model); System.out.println("server 发送数据: " + model.toString()); } @Override protected void handlerReaderIdle(ChannelHandlerContext ctx) { super.handlerReaderIdle(ctx); System.err.println(" ---- client " + ctx.channel().remoteAddress().toString() + " reader timeOut, --- close it"); ctx.close(); }} java -jar target/app.jar
java -jar target/client.jar
在客户端控制台输入任意消息(如"hello"),服务端控制台会输出相应的回应。
通过这次项目,我深刻体会到了Netty的强大力量。虽然过程中遇到了一些困难,但通过不断的调试和学习,我最终成功实现了长连接、心跳、自动重连和通信功能。对于刚接触Netty的开发者来说,这个项目是一个非常好的起点,可以帮助你更好地理解Netty的工作原理和实际应用场景。如果你对网络编程感兴趣,不妨试试这个项目,它会带你走向网络开发的深度世界!
转载地址:http://qvcfk.baihongyu.com/