Netty
是什么:一个网络编程框架(基于Java NIO 事件驱动 异步) 异步:Netty 的所有 I/O 操作均为异步 -> 当调用一个方法时,它立即返回一个
ChannelFuture对象,而操作本身在后台执行。可以通过这个 Future 对象来获取操作完成的通知或等待其完成。事件驱动:“当数据来了通知我” Netty基于 事件模型 来处理网络事件 -> 当某个事件发生时(如连接建立、数据到达、异常抛出),Netty自动触发相应的事件,调用预先注册的处理器进行处理。这种模型非常高效,可以处理成千上万的并发连接。NIO:Netty的底层完全依赖于 Java NIO -> Java NIO(New Input/Output)是一个I/O API,它提供了一种基于缓冲区、非阻塞的I/O操作方式,NIO高效地处理大量的并发连接和数据读写操作。
**非阻塞I/O操作:先返回结果,再工作 -> **当任务来了,它立马告诉它知道了,然后开始工作
**阻塞I/O操作:**传统的I/O模式,线程必须等待操作完成
责任链模式:
ChannelPipeline是责任链模式的完美体现。数据进出会像在一条流水线上一样,依次经过多个ChannelHandler的处理。每个 Handler 只关心自己负责的逻辑(如解码、日志、业务处理),实现了功能的解耦和高度可定制化。作用:让程序在高并发下依然又快又稳
怎么用:搭建Websokect连接
- 关键组件
-
EventLoopGroup和EventLoop:类似线程组和线程,
EventLoopGroup分配EventLoop来处理新的连接 和 已有的连接中的事件。一般会创建BossGroup和WorkerGroup两个EventLoopGroup。 **EventLoop:事件循环(I/O事件调度器) -> **一个不断循环的执行体,处理绑定的Channel。BossGroup&WorkerGroup:一个负责接收客户端的连接。另一个处理已被接受的连接的 I/O 操作。
-
ServerBootstrap:服务器端的组装和启动类,负责将所有核心组件组合在一起并启动服务器。
-
Channel:**通道,**表示一个网络连接,负责数据的读写和网络操作。所有 I/O 操作都是通过 Channel 进行的。底层是抽象类,只能被子类实现
-
ChannelPipeline:管道。处理数据的责任链,包含一系列处理器 -> 为Channel提供处理器。类似Stream,拦截流经Channel的进出事件
-
ChannelHandler:管道的事件处理器。类似Filter处理 Channel 上各种事件。 常见处理器:
HttpServerCodec,HttpObjectAggregator用法:pipeline.addLast(new HttpServerCodec());
-
ChannelFuture:异步操作结果容器,代表一个尚未完成的异步操作。提供查询操作状态的方法
(isDone(), isSuccess())。I/O 操作都会立即返回一个
ChannelFuture -
- 实际应用
现在要用异步处理WebSoket连接
3.1 添加依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.86.Final</version> <!-- 请使用最新稳定版 --></dependency>3.2 创建服务器启动类-通常在websoket-netty目录下
①设置属性
implements Runnable:异步,在单独线程中运行appConfig:加载配置文件,设置了WS的端口号handlerWebSocket:处理中心handlerTokenValidation:检验TOKEN事件调度器组bossGroup:处理连接 -> 老板,接收业务workerGroup:处理消息 -> 员工,处理业务@Component@Slf4jpublic class NettyWebSocketStarter implements Runnable {@Resourceprivate AppConfig appConfig;@Resourceprivate HandlerWebSocket handlerWebSocket;@Resourceprivate HandlerTokenValidation handlerTokenValidation;private EventLoopGroup bossGroup = new NioEventLoopGroup();private EventLoopGroup workerGroup = new NioEventLoopGroup();package com.easymeeting.entity.config;@Component("appConfig")public class AppConfig {private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);/*** websocket 端口*/@Value("${ws.port:}")private Integer wsPort;public Integer getWsPort() {return wsPort;}}③创建启动方法,先开店(ServerBootstrap),再接客(Channel)。channel创建后才会执行serverBootstrap.channel
ServerBootstrap serverBootstrap:创建服务器启动引导类 -> 公司创业指导手册,告诉你怎么开这家快递公司serverBootstrap.group(bossGroup, workerGroup):设置线程组.channel(NioServerSocketChannel.class):设置通道类型 -> 选择用最高效的NIO运输通道.handler(new LoggingHandler(LogLevel.DEBUG)):处理服务器本身事件.childHandler(new ChannelInitializer<Channel>() {}:处理客户端连接事件ChannelInitializer<Channel>():匿名内部类,需要实现初始化initChannelChannelPipeline:流水线传送带@Overridepublic void run() {try {// 1、ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) {// 3、ChannelPipeline pipeline = channel.pipeline();核心,这里写处理逻辑}});// 2、Channel channel=serverBootstrap.bind(appConfig.getWsPort()).sync().channel();log.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());// 阻塞主线程,防止服务器提前退出,等待关闭信号channel.closeFuture().sync();} catch (Exception e) {log.error("netty启动失败", e);// 优雅关闭,确保所有任务完成后再释放资源} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}④关闭方法
@PreDestroy:Spring容器销毁前执行shutdownGracefully():处理完业务再关闭@PreDestroypublic void close() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}⑤核心管道配置
HttpServerCodec:解码器HttpObjectAggregator:聚合器IdleStateHandler :心跳监听 -> 看看连接断没断,下面要结合自己的类(HandlerHeartBeat)使用HandlerHeartBeat :处理心跳结果 -> 要继承“ChannelDuplexHandler”类,这是个Netty的超级Handler类,所有Handler方法都要继承.childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) {ChannelPipeline pipeline = channel.pipeline();/*** 对http协议的支持,使用http的编码器,解码器* 通常作为第一个处理器添加* 必须在 HttpObjectAggregator 之前*/pipeline.addLast(new HttpServerCodec());/*** 这是一个 HTTP 消息聚合器,主要功能是将分片的 HTTP 消息* (如 chunked 传输编码的消息)聚合成完整的 FullHttpRequest 或 FullHttpResponse。*/pipeline.addLast(new HttpObjectAggregator(64 * 1024));/*** 检测连接空闲状态的处理器 会传递给下一个处理器* @param readerIdleTime 一段时间内未收到客户端数据。* @param writerIdleTime 一段时间内未向客户端发送数据。* @param allIdleTime -读和写均无活动*/pipeline.addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));/*** 处理空闲事件*/pipeline.addLast(new HandlerHeartBeat());/*** 拦截 channelRead 事件* TOKEN校验*/pipeline.addLast(handlerTokenValidation);/*** WebSocket协议处理器(关键配置)39* websocketPath 指定 WebSocket 的端点路径* subprotocols 指定支持的子协议* allowExtensions 是否允许 WebSocket 扩展* maxFrameSize 设置最大帧大小 65536* allowMaskMismatch 是否允许掩码不匹配* checkStartsWith 是否严格检查路径开头* handshakeTimeoutMillis 握手超时时间(毫秒)*/pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, true, true, 10000L));pipeline.addLast(handlerWebSocket);}});⑥自定的拦截
package com.easymeeting.websocket.netty;import io.netty.channel.ChannelDuplexHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.timeout.IdleState;import io.netty.handler.timeout.IdleStateEvent;import io.netty.util.Attribute;import io.netty.util.AttributeKey;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class HandlerHeartBeat extends ChannelDuplexHandler {private static final Logger logger = LoggerFactory.getLogger(HandlerHeartBeat.class);/*** 处理空闲事件,判断是否是 读取 空闲事件或 写入 空闲事件* @param ctx 上下文:ChannelHandlerContext* @param evt 事件:IdleStateEvent*/// IdleStateEvent 是 Netty 提供的事件类,用于表示通道的空闲状态事件。// 当通道在指定的时间间隔内没有进行读取或写入操作时,就会触发空闲事件。// 这里的参数由 Netty 自动传入的@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 判断是否是空闲事件if (evt instanceof IdleStateEvent) {IdleStateEvent e = (IdleStateEvent) evt;// 判断是否是 读取 空闲事件if (e.state() == IdleState.READER_IDLE) {// 从channel的属性中获取用户idAttribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));String userId = attribute.get();// 关闭连接logger.info("用户{}没有发送心跳断开连接", userId);ctx.close();// 判断是否是 写入 空闲事件} else if (e.state() == IdleState.WRITER_IDLE) {// 发送心跳包ctx.writeAndFlush("heart");}}}}package com.easymeeting.websocket.netty;@ChannelHandler.Sharable@Component@Slf4jpublic class HandlerTokenValidation extends SimpleChannelInboundHandler<FullHttpRequest> {@Resourceprivate ChannelContextUtils channelContextUtils;@Resourceprivate RedisComponet redisComponet;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {String uri = request.uri();QueryStringDecoder queryDecoder = new QueryStringDecoder(uri);List<String> tokens = queryDecoder.parameters().get("token");List<String> reconnect = queryDecoder.parameters().get("reconnect");if (tokens == null) {sendErrorResponse(ctx);return;}String token = tokens.get(0);//这里是优化后的重连机制,ws断开后会将用户的会议信息清除,所以重连后从临时信息中获取token信息TokenUserInfoDto tokenUserInfoDto = null;if (reconnect != null && Boolean.parseBoolean(reconnect.get(0))) {log.info("重连中。。。");//判断临时token信息tokenUserInfoDto = redisComponet.getTokenUserInfoDtoFromTemp(token);if (tokenUserInfoDto != null) {//如果临时缓存有,说明是重连,设置到正式缓存信息中redisComponet.saveTokenUserInfoDto(tokenUserInfoDto);}} else {tokenUserInfoDto = checkToken(token);}if (tokenUserInfoDto == null) {log.error("校验token失败:{}", token);sendErrorResponse(ctx);return;}// 如果需要转发消息 增加引用计数ctx.fireChannelRead(request.retain());//加入通道channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());}private TokenUserInfoDto checkToken(String token) {if (StringTools.isEmpty(token)) {return null;}TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token);return tokenUserInfoDto;}private void sendErrorResponse(ChannelHandlerContext ctx) {FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN, Unpooled.copiedBuffer("token无效", CharsetUtil.UTF_8));response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}}package com.easymeeting.websocket.netty;/*** 设置通道共享*/@ChannelHandler.Sharable@Component("handlerWebSocket")public class HandlerWebSocket extends SimpleChannelInboundHandler<TextWebSocketFrame> {private static final Logger logger = LoggerFactory.getLogger(HandlerWebSocket.class);@Resourceprivate MeetingInfoService meetingInfoService;@Resourceprivate RedisComponet redisComponet;@Resourceprivate MessageHandler messageHandler;/*** 当通道就绪后会调用此方法,通常我们会在这里做一些初始化操作** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// Channel channel = ctx.channel();logger.info("有新的连接加入。。。");}/*** 当通道不再活跃时(连接关闭)会调用此方法,我们可以在这里做一些清理工作** @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) {logger.info("有连接已经断开。。。");meetingInfoService.removeContext(ctx.channel());}/*** 读就绪事件 当有消息可读时会调用此方法,我们可以在这里读取消息并处理。** @param ctx* @param textWebSocketFrame* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) {//接收心跳//logger.info("收到消息:{}", textWebSocketFrame.text());String text = textWebSocketFrame.text();if (Constants.PING.equals(text)) {Channel channel = ctx.channel();Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));String userId = attribute.get();redisComponet.saveUserHeartBeat(userId);return;}PeerConnectionDataDto dataDto = JsonUtils.convertJson2Obj(text, PeerConnectionDataDto.class);TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(dataDto.getToken());if (tokenUserInfoDto == null) {return;}MessageSendDto messageSendDto = new MessageSendDto();messageSendDto.setMessageType(MessageTypeEnum.PEER.getType());PeerMessageDto peerMessageDto = new PeerMessageDto();peerMessageDto.setSignalType(dataDto.getSignalType());peerMessageDto.setSignalData(dataDto.getSignalData());messageSendDto.setMessageContent(peerMessageDto);messageSendDto.setMeetingId(tokenUserInfoDto.getCurrentMeetingId());messageSendDto.setSendUserId(tokenUserInfoDto.getUserId());messageSendDto.setReceiveUserId(dataDto.getReceiveUserId());messageSendDto.setMessageSend2Type(MessageSend2TypeEnum.USER.getType());messageHandler.sendMessage(messageSendDto);}}⑦完整实现
package com.easymeeting.websocket.netty;/*** @Description: ws初始化类*/@Component@Slf4jpublic class NettyWebSocketStarter implements Runnable {@Resourceprivate AppConfig appConfig;@Resourceprivate HandlerWebSocket handlerWebSocket;@Resourceprivate HandlerTokenValidation handlerTokenValidation;/*** boss线程组,用于处理连接*/private EventLoopGroup bossGroup = new NioEventLoopGroup();/*** work线程组,用于处理消息*/private EventLoopGroup workerGroup = new NioEventLoopGroup();/*** 资源关闭 -> 在容器销毁时关闭线程组*/@PreDestroypublic void close() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}@Overridepublic void run() {try {//创建服务端启动助手ServerBootstrap serverBootstrap = new ServerBootstrap();// 添加线程组serverBootstrap.group(bossGroup, workerGroup);// 添加通道工厂serverBootstrap.channel(NioServerSocketChannel.class)// 添加通道初 始化器.handler(new LoggingHandler(LogLevel.DEBUG))// 添加子通道初 始化器.childHandler(new ChannelInitializer<Channel>() {// 初始化子通道(处理器)@Overrideprotected void initChannel(Channel channel) {ChannelPipeline pipeline = channel.pipeline();/*** 对http协议的支持,使用http的编码器,解码器* 通常作为第一个处理器添加* 必须在 HttpObjectAggregator 之前*/pipeline.addLast(new HttpServerCodec());/*** 这是一个 HTTP 消息聚合器,主要功能是将分片的 HTTP 消息* (如 chunked 传输编码的消息)聚合成完整的 FullHttpRequest 或 FullHttpResponse。*/pipeline.addLast(new HttpObjectAggregator(64 * 1024));/*** 检测连接空闲状态的处理器 会传递给下一个处理器* @param readerIdleTime 一段时间内未收到客户端数据。* @param writerIdleTime 一段时间内未向客户端发送数据。* @param allIdleTime -读和写均无活动*/pipeline.addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));/** vcx* 处理空闲事件*/pipeline.addLast(new HandlerHeartBeat());/*** 拦截 channelRead 事件* TOKEN校验*/pipeline.addLast(handlerTokenValidation);/*** WebSocket协议处理器(关键配置)39* websocketPath 指定 WebSocket 的端点路径* subprotocols 指定支持的子协议* allowExtensions 是否允许 WebSocket 扩展* maxFrameSize 设置最大帧大小 65536* allowMaskMismatch 是否允许掩码不匹配* checkStartsWith 是否严格检查路径开头* handshakeTimeoutMillis 握手超时时间(毫秒)*/pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536, true, true, 10000L));pipeline.addLast(handlerWebSocket);}});//启动Channel channel = serverBootstrap.bind(appConfig.getWsPort()).sync().channel();log.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());channel.closeFuture().sync();} catch (Exception e) {log.error("netty启动失败", e);} finally {// 关闭线程组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}3.3 创建另一个启动类,用于异步启动其它服务 -> 与启动类在同一目录,异步启动创建netty
package com.easymeeting;/*** 异步初始化服务*/@Component("initRun")public class InitRun implements ApplicationRunner {@Resourceprivate NettyWebSocketStarter nettyWebSocketStarter;@Overridepublic void run(ApplicationArguments args) {//启动Neetynew Thread(nettyWebSocketStarter).start();}}
部分信息可能已经过时






