Mobile wallpaper 1Mobile wallpaper 2Mobile wallpaper 3Mobile wallpaper 4
2820 字
14 分钟
Netty在Java中的实际应用

Netty#

  1. 是什么:一个网络编程框架(基于Java NIO 事件驱动 异步) 异步:Netty 的所有 I/O 操作均为异步 -> 当调用一个方法时,它立即返回一个 ChannelFuture 对象,而操作本身在后台执行。可以通过这个 Future 对象来获取操作完成的通知或等待其完成。

    事件驱动“当数据来了通知我” Netty基于 事件模型 来处理网络事件 -> 当某个事件发生时(如连接建立、数据到达、异常抛出),Netty自动触发相应的事件,调用预先注册的处理器进行处理。这种模型非常高效,可以处理成千上万的并发连接。NIO:Netty的底层完全依赖于 Java NIO -> Java NIO(New Input/Output)是一个I/O API,它提供了一种基于缓冲区、非阻塞的I/O操作方式,NIO高效地处理大量并发连接和数据读写操作。

    **非阻塞I/O操作先返回结果,再工作 -> **当任务来了,它立马告诉它知道了,然后开始工作

    **阻塞I/O操作:**传统的I/O模式,线程必须等待操作完成

    责任链模式:ChannelPipeline 是责任链模式的完美体现。数据进出会像在一条流水线上一样,依次经过多个 ChannelHandler 的处理。每个 Handler 只关心自己负责的逻辑(如解码、日志、业务处理),实现了功能的解耦和高度可定制化。

  2. 作用:让程序在高并发下依然又快又稳

  3. 怎么用:搭建Websokect连接

  • 关键组件
    1. EventLoopGroup和EventLoop:类似线程组和线程,EventLoopGroup分配 EventLoop 来处理新的连接 和 已有的连接中的事件。一般会创建BossGroup和WorkerGroup两个EventLoopGroup。 **EventLoop:事件循环(I/O事件调度器) -> **一个不断循环的执行体,处理绑定的Channel

      BossGroup&WorkerGroup:一个负责接收客户端的连接。另一个处理已被接受的连接的 I/O 操作。

    2. ServerBootstrap:服务器端的组装和启动类,负责将所有核心组件组合在一起并启动服务器。

    3. Channel:**通道,**表示一个网络连接,负责数据的读写和网络操作。所有 I/O 操作都是通过 Channel 进行的。底层是抽象类,只能被子类实现

    4. ChannelPipeline:管道。处理数据的责任链,包含一系列处理器 -> 为Channel提供处理器。类似Stream,拦截流经Channel的进出事件

    5. ChannelHandler:管道的事件处理器。类似Filter处理 Channel 上各种事件。 常见处理器:HttpServerCodecHttpObjectAggregator

      用法:pipeline.addLast(new HttpServerCodec());

    6. ChannelFuture:异步操作结果容器,代表一个尚未完成的异步操作。提供查询操作状态的方法

    (isDone(), isSuccess())。I/O 操作都会立即返回一个 ChannelFuture

  • 实际应用

    现在要用异步处理WebSoket连接

    3.1 添加依赖

    <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.86.Final</version> <!-- 请使用最新稳定版 -->
    </dependency>

    3.2 创建服务器启动类-通常在websoket-netty目录下

    ①设置属性

    implements Runnable:异步,在单独线程中运行
    appConfig:加载配置文件,设置了WS的端口号
    handlerWebSocket:处理中心
    handlerTokenValidation:检验TOKEN
    事件调度器组
    bossGroup:处理连接 -> 老板,接收业务
    workerGroup:处理消息 -> 员工,处理业务
    @Component
    @Slf4j
    public class NettyWebSocketStarter implements Runnable {
    @Resource
    private AppConfig appConfig;
    @Resource
    private HandlerWebSocket handlerWebSocket;
    @Resource
    private HandlerTokenValidation handlerTokenValidation;
    private EventLoopGroup bossGroup = new NioEventLoopGroup();
    private EventLoopGroup workerGroup = new NioEventLoopGroup();
    package com.easymeeting.entity.config;
    @Component("appConfig")
    public class AppConfig {
    private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
    /**
    * websocket 端口
    */
    @Value("${ws.port:}")
    private Integer wsPort;
    public Integer getWsPort() {
    return wsPort;
    }
    }

    ③创建启动方法,先开店(ServerBootstrap),再接客(Channel)。channel创建后才会执行serverBootstrap.channel

    ServerBootstrap serverBootstrap:创建服务器启动引导类 -> 公司创业指导手册,告诉你怎么开这家快递公司
    serverBootstrap.group(bossGroup, workerGroup):设置线程组
    .channel(NioServerSocketChannel.class):设置通道类型 -> 选择用最高效的NIO运输通道
    .handler(new LoggingHandler(LogLevel.DEBUG)):处理服务器本身事件
    .childHandler(new ChannelInitializer<Channel>() {}:处理客户端连接事件
    ChannelInitializer<Channel>():匿名内部类,需要实现初始化initChannel
    ChannelPipeline:流水线传送带
    @Override
    public void run() {
    try {
    // 1、
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup);
    serverBootstrap
    .channel(NioServerSocketChannel.class)
    .handler(new LoggingHandler(LogLevel.DEBUG))
    .childHandler(new ChannelInitializer<Channel>() {
    @Override
    protected void initChannel(Channel channel) {
    // 3、
    ChannelPipeline pipeline = channel.pipeline();
    核心,这里写处理逻辑
    }
    });
    // 2、
    Channel channel=serverBootstrap.bind(appConfig.getWsPort()).sync().channel();
    log.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());
    // 阻塞主线程,防止服务器提前退出,等待关闭信号
    channel.closeFuture().sync();
    } catch (Exception e) {
    log.error("netty启动失败", e);
    // 优雅关闭,确保所有任务完成后再释放资源
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }

    ④关闭方法

    @PreDestroy:Spring容器销毁前执行
    shutdownGracefully():处理完业务再关闭
    @PreDestroy
    public void close() {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }

    ⑤核心管道配置

    HttpServerCodec:解码器
    HttpObjectAggregator:聚合器
    IdleStateHandler :心跳监听 -> 看看连接断没断,下面要结合自己的类(HandlerHeartBeat)使用
    HandlerHeartBeat :处理心跳结果 -> 要继承“ChannelDuplexHandler”类,这是个Netty的超级Handler类,所有Handler方法都要继承
    .childHandler(new ChannelInitializer<Channel>() {
    @Override
    protected void initChannel(Channel channel) {
    ChannelPipeline pipeline = channel.pipeline();
    /**
    * 对http协议的支持,使用http的编码器,解码器
    * 通常作为第一个处理器添加
    * 必须在 HttpObjectAggregator 之前
    */
    pipeline.addLast(new HttpServerCodec());
    /**
    * 这是一个 HTTP 消息聚合器,主要功能是将分片的 HTTP 消息
    * (如 chunked 传输编码的消息)聚合成完整的 FullHttpRequest 或 FullHttpResponse。
    */
    pipeline.addLast(new HttpObjectAggregator(64 * 1024));
    /**
    * 检测连接空闲状态的处理器 会传递给下一个处理器
    * @param readerIdleTime 一段时间内未收到客户端数据。
    * @param writerIdleTime 一段时间内未向客户端发送数据。
    * @param allIdleTime -读和写均无活动
    */
    pipeline.addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
    /**
    * 处理空闲事件
    */
    pipeline.addLast(new HandlerHeartBeat());
    /**
    * 拦截 channelRead 事件
    * TOKEN校验
    */
    pipeline.addLast(handlerTokenValidation);
    /**
    * WebSocket协议处理器(关键配置)39
    * websocketPath 指定 WebSocket 的端点路径
    * subprotocols 指定支持的子协议
    * allowExtensions 是否允许 WebSocket 扩展
    * maxFrameSize 设置最大帧大小 65536
    * allowMaskMismatch 是否允许掩码不匹配
    * checkStartsWith 是否严格检查路径开头
    * handshakeTimeoutMillis 握手超时时间(毫秒)
    */
    pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, true, true, 10000L));
    pipeline.addLast(handlerWebSocket);
    }
    });

    ⑥自定的拦截

    package com.easymeeting.websocket.netty;
    import io.netty.channel.ChannelDuplexHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.util.Attribute;
    import io.netty.util.AttributeKey;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    public class HandlerHeartBeat extends ChannelDuplexHandler {
    private static final Logger logger = LoggerFactory.getLogger(HandlerHeartBeat.class);
    /**
    * 处理空闲事件,判断是否是 读取 空闲事件或 写入 空闲事件
    * @param ctx 上下文:ChannelHandlerContext
    * @param evt 事件:IdleStateEvent
    */
    // IdleStateEvent 是 Netty 提供的事件类,用于表示通道的空闲状态事件。
    // 当通道在指定的时间间隔内没有进行读取或写入操作时,就会触发空闲事件。
    // 这里的参数由 Netty 自动传入的
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // 判断是否是空闲事件
    if (evt instanceof IdleStateEvent) {
    IdleStateEvent e = (IdleStateEvent) evt;
    // 判断是否是 读取 空闲事件
    if (e.state() == IdleState.READER_IDLE) {
    // 从channel的属性中获取用户id
    Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));
    String userId = attribute.get();
    // 关闭连接
    logger.info("用户{}没有发送心跳断开连接", userId);
    ctx.close();
    // 判断是否是 写入 空闲事件
    } else if (e.state() == IdleState.WRITER_IDLE) {
    // 发送心跳包
    ctx.writeAndFlush("heart");
    }
    }
    }
    }
    package com.easymeeting.websocket.netty;
    @ChannelHandler.Sharable
    @Component
    @Slf4j
    public class HandlerTokenValidation extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Resource
    private ChannelContextUtils channelContextUtils;
    @Resource
    private RedisComponet redisComponet;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
    String uri = request.uri();
    QueryStringDecoder queryDecoder = new QueryStringDecoder(uri);
    List<String> tokens = queryDecoder.parameters().get("token");
    List<String> reconnect = queryDecoder.parameters().get("reconnect");
    if (tokens == null) {
    sendErrorResponse(ctx);
    return;
    }
    String token = tokens.get(0);
    //这里是优化后的重连机制,ws断开后会将用户的会议信息清除,所以重连后从临时信息中获取token信息
    TokenUserInfoDto tokenUserInfoDto = null;
    if (reconnect != null && Boolean.parseBoolean(reconnect.get(0))) {
    log.info("重连中。。。");
    //判断临时token信息
    tokenUserInfoDto = redisComponet.getTokenUserInfoDtoFromTemp(token);
    if (tokenUserInfoDto != null) {
    //如果临时缓存有,说明是重连,设置到正式缓存信息中
    redisComponet.saveTokenUserInfoDto(tokenUserInfoDto);
    }
    } else {
    tokenUserInfoDto = checkToken(token);
    }
    if (tokenUserInfoDto == null) {
    log.error("校验token失败:{}", token);
    sendErrorResponse(ctx);
    return;
    }
    // 如果需要转发消息 增加引用计数
    ctx.fireChannelRead(request.retain());
    //加入通道
    channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());
    }
    private TokenUserInfoDto checkToken(String token) {
    if (StringTools.isEmpty(token)) {
    return null;
    }
    TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token);
    return tokenUserInfoDto;
    }
    private void sendErrorResponse(ChannelHandlerContext ctx) {
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN, Unpooled.copiedBuffer("token无效", CharsetUtil.UTF_8));
    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
    response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
    }
    package com.easymeeting.websocket.netty;
    /**
    * 设置通道共享
    */
    @ChannelHandler.Sharable
    @Component("handlerWebSocket")
    public class HandlerWebSocket extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger logger = LoggerFactory.getLogger(HandlerWebSocket.class);
    @Resource
    private MeetingInfoService meetingInfoService;
    @Resource
    private RedisComponet redisComponet;
    @Resource
    private MessageHandler messageHandler;
    /**
    * 当通道就绪后会调用此方法,通常我们会在这里做一些初始化操作
    *
    * @param ctx
    * @throws Exception
    */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // Channel channel = ctx.channel();
    logger.info("有新的连接加入。。。");
    }
    /**
    * 当通道不再活跃时(连接关闭)会调用此方法,我们可以在这里做一些清理工作
    *
    * @param ctx
    * @throws Exception
    */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
    logger.info("有连接已经断开。。。");
    meetingInfoService.removeContext(ctx.channel());
    }
    /**
    * 读就绪事件 当有消息可读时会调用此方法,我们可以在这里读取消息并处理。
    *
    * @param ctx
    * @param textWebSocketFrame
    * @throws Exception
    */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) {
    //接收心跳
    //logger.info("收到消息:{}", textWebSocketFrame.text());
    String text = textWebSocketFrame.text();
    if (Constants.PING.equals(text)) {
    Channel channel = ctx.channel();
    Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));
    String userId = attribute.get();
    redisComponet.saveUserHeartBeat(userId);
    return;
    }
    PeerConnectionDataDto dataDto = JsonUtils.convertJson2Obj(text, PeerConnectionDataDto.class);
    TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(dataDto.getToken());
    if (tokenUserInfoDto == null) {
    return;
    }
    MessageSendDto messageSendDto = new MessageSendDto();
    messageSendDto.setMessageType(MessageTypeEnum.PEER.getType());
    PeerMessageDto peerMessageDto = new PeerMessageDto();
    peerMessageDto.setSignalType(dataDto.getSignalType());
    peerMessageDto.setSignalData(dataDto.getSignalData());
    messageSendDto.setMessageContent(peerMessageDto);
    messageSendDto.setMeetingId(tokenUserInfoDto.getCurrentMeetingId());
    messageSendDto.setSendUserId(tokenUserInfoDto.getUserId());
    messageSendDto.setReceiveUserId(dataDto.getReceiveUserId());
    messageSendDto.setMessageSend2Type(MessageSend2TypeEnum.USER.getType());
    messageHandler.sendMessage(messageSendDto);
    }
    }

    ⑦完整实现

    package com.easymeeting.websocket.netty;
    /**
    * @Description: ws初始化类
    */
    @Component
    @Slf4j
    public class NettyWebSocketStarter implements Runnable {
    @Resource
    private AppConfig appConfig;
    @Resource
    private HandlerWebSocket handlerWebSocket;
    @Resource
    private HandlerTokenValidation handlerTokenValidation;
    /**
    * boss线程组,用于处理连接
    */
    private EventLoopGroup bossGroup = new NioEventLoopGroup();
    /**
    * work线程组,用于处理消息
    */
    private EventLoopGroup workerGroup = new NioEventLoopGroup();
    /**
    * 资源关闭 -> 在容器销毁时关闭线程组
    */
    @PreDestroy
    public void close() {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    @Override
    public void run() {
    try {
    //创建服务端启动助手
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    // 添加线程组
    serverBootstrap.group(bossGroup, workerGroup);
    // 添加通道工厂
    serverBootstrap.channel(NioServerSocketChannel.class)
    // 添加通道初 始化器
    .handler(new LoggingHandler(LogLevel.DEBUG))
    // 添加子通道初 始化器
    .childHandler(new ChannelInitializer<Channel>() {
    // 初始化子通道(处理器)
    @Override
    protected void initChannel(Channel channel) {
    ChannelPipeline pipeline = channel.pipeline();
    /**
    * 对http协议的支持,使用http的编码器,解码器
    * 通常作为第一个处理器添加
    * 必须在 HttpObjectAggregator 之前
    */
    pipeline.addLast(new HttpServerCodec());
    /**
    * 这是一个 HTTP 消息聚合器,主要功能是将分片的 HTTP 消息
    * (如 chunked 传输编码的消息)聚合成完整的 FullHttpRequest 或 FullHttpResponse。
    */
    pipeline.addLast(new HttpObjectAggregator(64 * 1024));
    /**
    * 检测连接空闲状态的处理器 会传递给下一个处理器
    * @param readerIdleTime 一段时间内未收到客户端数据。
    * @param writerIdleTime 一段时间内未向客户端发送数据。
    * @param allIdleTime -读和写均无活动
    */
    pipeline.addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
    /** vcx
    * 处理空闲事件
    */
    pipeline.addLast(new HandlerHeartBeat());
    /**
    * 拦截 channelRead 事件
    * TOKEN校验
    */
    pipeline.addLast(handlerTokenValidation);
    /**
    * WebSocket协议处理器(关键配置)39
    * websocketPath 指定 WebSocket 的端点路径
    * subprotocols 指定支持的子协议
    * allowExtensions 是否允许 WebSocket 扩展
    * maxFrameSize 设置最大帧大小 65536
    * allowMaskMismatch 是否允许掩码不匹配
    * checkStartsWith 是否严格检查路径开头
    * handshakeTimeoutMillis 握手超时时间(毫秒)
    */
    pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, true, true, 10000L));
    pipeline.addLast(handlerWebSocket);
    }
    });
    //启动
    Channel channel = serverBootstrap.bind(appConfig.getWsPort()).sync().channel();
    log.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());
    channel.closeFuture().sync();
    } catch (Exception e) {
    log.error("netty启动失败", e);
    } finally {
    // 关闭线程组
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }
    }

    3.3 创建另一个启动类,用于异步启动其它服务 -> 与启动类在同一目录,异步启动创建netty

    package com.easymeeting;
    /**
    * 异步初始化服务
    */
    @Component("initRun")
    public class InitRun implements ApplicationRunner {
    @Resource
    private NettyWebSocketStarter nettyWebSocketStarter;
    @Override
    public void run(ApplicationArguments args) {
    //启动Neety
    new Thread(nettyWebSocketStarter).start();
    }
    }
Netty在Java中的实际应用
https://zcnpjntqxpaw.feishu.cn/wiki/LB4owugcCiAMmekS9Q2c4hyknQd
作者
KafuSnow•卡夫雪
发布于
2025-09-01
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时