博客
关于我
Netty的网络框架差点让我一夜秃头,哭了
阅读量:789 次
发布时间:2023-02-15

本文共 9567 字,大约阅读时间需要 31 分钟。

Netty+Spring Boot 实现 长连接 - 心跳 - 自动重连 - 通信

最近,我被Netty这个网络框架差点整疯了。作为一个对网络技术一窍不通的程序员,我通常很少接触网络相关的技术,但最近公司和各大论坛里Netty的热度简直不可思议。我被好奇心驱使,想要试一试,结果发现自己搭建了一个简单的Netty项目来实现长连接、心跳、自动重连和通信功能。这个过程充满了挑战和惊喜,特别是当我第一次成功运行代码时,那种成就感简直无法描述。

项目背景

作为一个对网络技术生疏的人,我原本只是简单地了解一下Netty的基本概念,但很快我就被它的强大功能深深吸引。Netty是一个基于NIO的高性能异步网络框架,处理大量连接和数据流量非常高效。而Spring Boot作为一个流行的微服务框架,能够快速实现各种功能,这两者的结合让我看到了无限的可能。

技术实现

在项目中,我主要实现了以下几个核心功能:

  • 长连接:保持客户端和服务器之间的长期连接
  • 心跳机制:定期发送心跳信号,确保连接的可行性
  • 自动重连:在连接中断时自动重建立立连接
  • 通信:实现消息的双向传输
  • 在实现过程中,我深刻体会到了Netty的强大之处。通过自定义 Pipeline 和各种handler,我能够灵活配置每个连接的处理逻辑。IdleStateHandler 用于检测空闲时间,确保在一定时间内发送心跳信号。MsgPckEncode 和 MsgPckDecode 用于数据的编码和解码,确保数据能够准确传输。

    问题与解决

    在项目初期,我遇到了一个棘手的问题:当客户端和服务器同时运行时,Scanner 和 Channel 调用之间的竞争问题,导致程序运行异常。经过反复调试和分析,我发现问题出在代码中对资源管理的不当处理。通过引入 ChannelFuture 和事件驱动模型,我成功解决了这个问题。

    代码示例

    以下是改进后的代码示例,展示了客户端和服务器端的实现:

    客户端

    package com.gzky.study.netty;import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.timeout.IdleStateHandler;import java.util.Scanner;import java.util.concurrent.TimeUnit;public class Client {    private NioEventLoopGroup worker = new NioEventLoopGroup();    private Channel channel;    private Bootstrap bootstrap;    boolean flag = true;    public static void main(String[] args) {        Client client = new Client();        client.start();        client.sendData();        client.close();    }    private void close() {        channel.close();        worker.shutdownGracefully();    }    private void start() {        bootstrap = new Bootstrap();        bootstrap.group(worker)                .channel(NioSocketChannel.class)                .option(ChannelOption.TCP_NODELAY, true)                .handler(new ChannelInitializer
    () { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(3, 3, 5)); pipeline.addLast(new MsgPckDecode()); pipeline.addLast(new MsgPckEncode()); pipeline.addLast(new Client3Handler(Client.this)); } }); doConnect(); } protected void doConnect() { if (channel != null && channel.isActive()) { return; } ChannelFuture connect = bootstrap.connect("127.0.0.1", 8089); connect.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { channel = channelFuture.channel(); System.out.println("连接成功"); } else { if (flag) { System.out.println("每隔2s重连...."); channelFuture.channel().eventLoop().schedule(new Runnable() { @Override public void run() { doConnect(); } }, 2, TimeUnit.SECONDS); } } } }); } private void sendData() { while (channel == null || !channel.isActive()) { System.out.println("等待连接···"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("连接成功等待输入:"); flag = true; Scanner sc = new Scanner(System.in); while (flag) { String nextLine = sc.nextLine(); if ("end".equalsIgnoreCase(nextLine)) { flag = false; } Model model = new Model(); model.setType(TypeData.CUSTOMER); model.setBody(nextLine); channel.writeAndFlush(model); } }}

    服务器端

    package com.gzky.study.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.timeout.IdleStateHandler;public class Server {    public static void main(String[] args) {        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup(4);        try {            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .localAddress(8089)                    .childHandler(new ChannelInitializer
    () { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(10, 3, 10)); pipeline.addLast(new MsgPckDecode()); pipeline.addLast(new MsgPckEncode()); pipeline.addLast(new Server3Handler()); } }); System.out.println("start server 8089 --"); ChannelFuture sync = serverBootstrap.bind().sync(); sync.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}

    配置项

    package com.gzky.study.netty;public interface TypeData {    byte PING = 1;    byte PONG = 2;    byte CUSTOMER = 3;}

    消息类型分离器

    package com.gzky.study.netty;import org.msgpack.annotation.Message;import java.io.Serializable;@Messagepublic class Model implements Serializable {    private static final long serialVersionUID = 1L;    private int type;    private String body;    public int getType() {        return type;    }    public void setType(int type) {        this.type = type;    }    public String getBody() {        return body;    }    public void setBody(String body) {        this.body = body;    }    @Override    public String toString() {        return "Model{" +                "type=" + type +                ", body='" + body + '\'' +                '}';    }}

    编码器

    package com.gzky.study.netty;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;import org.msgpack.MessagePack;public class MsgPckEncode extends MessageToByteEncoder {    @Override    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) throws Exception {        MessagePack pack = new MessagePack();        byte[] write = pack.write(msg);        buf.writeBytes(write);    }}

    解码器

    package com.gzky.study.netty;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageDecoder;import org.msgpack.MessagePack;import java.util.List;public class MsgPckDecode extends MessageToMessageDecoder {    @Override    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {        final byte[] array;        final int length = msg.readableBytes();        array = new byte[length];        msg.getBytes(msg.readerIndex(), array, 0, length);        MessagePack pack = new MessagePack();        out.add(pack.read(array, Model.class));    }}

    客户端控制器

    package com.gzky.study.netty;import io.netty.channel.ChannelHandlerContext;public class Client3Handler extends Middleware {    private Client client;    public Client3Handler(Client client) {        super("client");        this.client = client;    }    @Override    protected void handlerData(ChannelHandlerContext ctx, Object msg) {        Model model = (Model) msg;        System.out.println("client 收到数据: " + model.toString());    }    @Override    protected void handlerAllIdle(ChannelHandlerContext ctx) {        super.handlerAllIdle(ctx);        sendPingMsg(ctx);    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        super.channelInactive(ctx);        client.doConnect();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        System.out.println(name + "exception : " + cause.toString());    }}

    服务端控制器

    package com.gzky.study.netty;import io.netty.channel.ChannelHandlerContext;public class Server3Handler extends Middleware {    public Server3Handler() {        super("server");    }    @Override    protected void handlerData(ChannelHandlerContext ctx, Object msg) {        Model model = (Model) msg;        System.out.println("server 接收数据: " + model.toString());        model.setType(TypeData.CUSTOMER);        model.setBody("client你好,server已接收到数据:" + model.getBody());        ctx.channel().writeAndFlush(model);        System.out.println("server 发送数据: " + model.toString());    }    @Override    protected void handlerReaderIdle(ChannelHandlerContext ctx) {        super.handlerReaderIdle(ctx);        System.err.println(" ---- client " + ctx.channel().remoteAddress().toString() + " reader timeOut, --- close it");        ctx.close();    }}

    测试

  • 启动服务端
    java -jar target/app.jar
  • 启动客户端
    java -jar target/client.jar
  • 客户端发消息
    在客户端控制台输入任意消息(如"hello"),服务端控制台会输出相应的回应。
  • 总结

    通过这次项目,我深刻体会到了Netty的强大力量。虽然过程中遇到了一些困难,但通过不断的调试和学习,我最终成功实现了长连接、心跳、自动重连和通信功能。对于刚接触Netty的开发者来说,这个项目是一个非常好的起点,可以帮助你更好地理解Netty的工作原理和实际应用场景。如果你对网络编程感兴趣,不妨试试这个项目,它会带你走向网络开发的深度世界!

    转载地址:http://qvcfk.baihongyu.com/

    你可能感兴趣的文章
    Netty工作笔记0024---SelectionKey API
    查看>>
    Netty工作笔记0025---SocketChannel API
    查看>>
    Netty工作笔记0026---NIO 网络编程应用--群聊系统1---编写服务器1
    查看>>
    Netty工作笔记0027---NIO 网络编程应用--群聊系统2--服务器编写2
    查看>>
    Netty工作笔记0028---NIO 网络编程应用--群聊系统3--客户端编写1
    查看>>
    Netty工作笔记0029---NIO 网络编程应用--群聊系统4--客户端编写2
    查看>>
    Netty工作笔记0030---NIO与零拷贝原理剖析
    查看>>
    Netty工作笔记0031---NIO零拷贝应用案例
    查看>>
    Netty工作笔记0032---零拷贝AIO内容梳理
    查看>>
    Netty工作笔记0033---Netty概述
    查看>>
    Netty工作笔记0034---Netty架构设计--线程模型
    查看>>
    Netty工作笔记0035---Reactor模式图剖析
    查看>>
    Netty工作笔记0036---单Reactor单线程模式
    查看>>
    Netty工作笔记0037---主从Reactor多线程
    查看>>
    Netty工作笔记0038---Netty模型--通俗版
    查看>>
    Netty工作笔记0039---Netty模型--详细版
    查看>>
    Netty工作笔记0040---Netty入门--服务端1
    查看>>
    Netty工作笔记0041---Netty入门--服务端2
    查看>>
    Netty工作笔记0042---Netty入门--编写客户端
    查看>>
    Netty工作笔记0043---单Reactor多线程模式
    查看>>