@Component @RequiredArgsConstructor public class WebSocketServer implements ApplicationRunner, DisposableBean { @Value("${netty.port}") private int port; private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private final ServerBootstrap b =new ServerBootstrap(); private final Map<String, Channel> map = new HashMap<>(); @Override public void destroy() { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } @Override public void run(ApplicationArguments args) throws Exception { b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new WebSocketServerProtocolHandler("/ws", true, 3000L)); pipeline.addLast(new WebSocketFrameHandler(map)); } }); ChannelFuture future = b.bind(port).sync(); System.out.println("WebSocket server started on port " + port); future.channel().closeFuture().sync(); } } @Slf4j public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> { private final Map<String, Channel> map; public static final AttributeKey<String> URI_ATTRIBUTE_KEY = AttributeKey.valueOf("URI"); public WebSocketFrameHandler(Map<String, Channel> map) { this.map = map; } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt; Channel currentChannel = ctx.channel(); String uri = handshakeComplete.requestUri(); currentChannel.attr(URI_ATTRIBUTE_KEY).set(uri); Channel 
exist = map.get(uri); if (Objects.isNull(exist)) { map.put(uri, currentChannel); log.info(new Message(uri, currentChannel.id().toString(), 1).toString()); } else { //无效 exist.close().sync();//明确在 close 执行完成后在重新上线,但是好像没生效一样,直接就输出上线! 1= 上线,0= 下线 map.put(uri, currentChannel); log.info(new Message(uri, currentChannel.id().toString(), 1).toString()); //无效 //exist.close().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString())); //无效 //exist.close().sync().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString())); } } else { super.userEventTriggered(ctx, evt); } } @Override public void channelInactive(ChannelHandlerContext ctx) { Channel channel = ctx.channel(); String uri = channel.attr(URI_ATTRIBUTE_KEY).get(); log.info(new Message(uri, channel.id().toString(), 0).toString()); } }
日志输出:
2025-09-21 03:16:54.669 INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler : Message{uri='/ws/abc_123', sessiOnId='86aeadad', status=上线}
2025-09-21 03:16:58.837 INFO 30008 --- [ntLoopGroup-5-2] org.example.WebSocketFrameHandler : Message{uri='/ws/abc_123', sessiOnId='fa3a5fbb', status=上线}
2025-09-21 03:16:58.838 INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler : Message{uri='/ws/abc_123', sessiOnId='86aeadad', status=下线}
我用 netty 提供的 websocket 服务,想达到新设备踢出老设备的在线状态, 但是控制不好状态
预期的效果:
日志输出:先输出老设备(86aeadad)下线,再输出新设备(fa3a5fbb)上线。
但是现在看结果是错误的顺序。
想在这个服务里保证状态的正确,我尝试使用自定义 eventGroup 并且设置线程数 1 来执行代码也不行,也尝试跟 ai 沟通了半宿也没研究明白
netty-all 的版本是 4.2.6.Final
1 hsiafan 21 天前 ![]() Channel 的 close 应该只是会 fire 一个 ChannelInactive 的 event ,不会直接调用 channelInactive 这个方法。channelInactive 要由 eventloop 再去找对应 ChannelPipeline 来调用,这时候已经晚了。 你这个代码只是日志打印的顺序看起来错乱而已,该 close 的 Channel 实际已经 close 了。 |
![]() | 2 zizon 21 天前 你把事情放到 exist.close()的回调里去做呗. |
3 moverinfo 20 天前 via iPhone 应该是生效的,只是日志没有按预期打印。 |
![]() | 4 mightofcode 20 天前 netty 的代码真的就是一坨 |
![]() | 5 changlin1024 19 天前 exist.close().addListener(future -> { map.put(uri, currentChannel); log.info(new Message(uri, currentChannel.id().toString(), 1).toString()); }); |
![]() | 6 DreamLu 18 天前 我一个业务要求高性能 http api ,用 netty ,调半天上不去,换 undertow-core 直逼 10W/s ,都不用怎么调。 |