基于Spring Boot和Netty的实时聊天功能实现详解
本文发布于76 天前,其中的信息可能已经过时,如有错误请发送邮件到2394201457@qq.com

一、引言

在现代互联网应用中,实时聊天功能是一个重要的交互方式。本文将详细介绍如何基于Spring Boot和Netty框架实现一个高性能的实时聊天系统,支持单聊和群聊功能。

二、整体架构

该聊天系统采用Spring Boot作为后端框架,结合MyBatis-Plus进行数据库操作,使用Redis进行数据缓存,利用Netty框架实现WebSocket服务器,以支持实时通信。

三、核心功能实现

1. Netty服务器的启动与配置

@Component
public class NettyWebSocketStarter implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class);
    private static EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private static EventLoopGroup workGroup = new NioEventLoopGroup();

    @Resource
    private HandlerWebSocket handlerWebSocket;

    @Resource
    private AppConfig appConfig;

    @PreDestroy
    public void close() {
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }

    @Override
    public void run() {
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup);
            serverBootstrap.channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(Channel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            // 设置几个重要的处理器
                            // 对http协议的支持,使用http的编码器,解码器
                            pipeline.addLast(new HttpServerCodec());
                            // 聚合解码 httpRequest/httpContent/lastHttpContent到fullHttpRequest
                            // 保证接收的http请求的完整性
                            pipeline.addLast(new HttpObjectAggregator(64 * 1024));
                            // 心跳 long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit
                            // readerIdleTime 读超时事件 即测试段一定事件内未接收到被测试段消息
                            // writerIdleTime 为写超时时间 即测试端一定时间内想被测试端发送消息
                            // allIdleTime 所有类型的超时时间
                            pipeline.addLast(new IdleStateHandler(6, 0, 0, TimeUnit.SECONDS));
                            pipeline.addLast(new HandlerHeartBeat());
                            //将http协议升级为ws协议 对websocket的支持
                            pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L));
                            pipeline.addLast(handlerWebSocket);
                        }
                    });
            Integer wsPort = appConfig.getWsPort();
            String wsPortStr = System.getProperty("ws.port");
            if (!StringTools.isEmpty(wsPortStr)) {
                wsPort = Integer.parseInt(wsPortStr);
            }

            ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();
            logger.info("netty服务启动成功,端口: {}", appConfig.getWsPort());
            channelFuture.channel().closeFuture().sync();

        } catch (Exception e) {

            logger.error("启动netty失败", e);
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }


}

1. NettyWebSocketStarter 

作用:Netty服务端启动器,负责初始化Netty线程组、配置协议处理链并绑定端口。

  • 通过@Async异步启动独立的Netty线程,避免阻塞Spring Boot主线程。
  • 配置bossGroupworkGroup线程组,分别处理连接请求和I/O操作。
  • 构建协议处理链:支持HTTP升级为WebSocket、心跳检测(60秒读超时)、消息聚合、业务处理器等。
  • 通过@PreDestroy注解实现优雅关闭,释放Netty线程资源。

2. WebSocket连接处理

HandlerWebSocket类处理WebSocket连接的生命周期事件

@Component
@ChannelHandler.Sharable
public class HandlerWebSocket extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static final Logger logger = LoggerFactory.getLogger(HandlerWebSocket.class);

    @Resource
    private RedisComponent redisComponent;
    @Resource
    private ChannelContextUtils channelContextUtils;

    /**
     * 通道就绪后 调用,一般用来做初始化
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        logger.info("有新的连接加入.....");

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("有连接断开.....");
        //清除掉存入的内存信息
        channelContextUtils.removeContext(ctx.channel());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        Channel channel = ctx.channel();
        Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));
        String userId = attribute.get();
        //logger.info("接收UserId{}的消息:{}", userId, textWebSocketFrame.text());

        redisComponent.saveUserHeartBeat(userId); //保存心跳

    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            String url = complete.requestUri();
            String token = getToken(url);
            if (token == null) {
                ctx.channel().close();
            }

            TokenUserInfoDto tokenUserInfoDto = redisComponent.getTokenUserInfoDto(token);
            if (tokenUserInfoDto == null) {
                ctx.channel().close();
            }

            channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());
        }

    }

作用:WebSocket核心业务处理器,管理连接生命周期与消息路由。

  • 继承SimpleChannelInboundHandler,处理TextWebSocketFrame文本消息。
  • 连接建立:完成WebSocket握手后,从URL提取Token验证用户身份,调用ChannelContextUtils绑定用户与Channel。
  • 消息接收:转发消息到Redis保存心跳(redisComponent.saveUserHeartBeat)。
  • 连接断开:清理内存中的Channel信息,更新用户离线时间。

3. 心跳检测

HandlerHeartBeat类通过IdleStateHandler维护连接活性

public class HandlerHeartBeat extends ChannelDuplexHandler {
private static final Logger logger = LoggerFactory.getLogger(HandlerHeartBeat.class);

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
Channel channel = ctx.channel();
Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));
String userId = attribute.get();
logger.info("用户{}心跳超时:",userId);

ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush("heart");
}
}
}
}

作用:心跳检测处理器,维护长连接活性。

  • 继承ChannelDuplexHandler,监听IdleStateEvent事件。
  • 处理读空闲(READER_IDLE):若60秒内无读操作,关闭连接并记录日志。
  • 处理写空闲(WRITER_IDLE):发送"heart"消息保活(客户端持续发送心跳)。
  • 通过AttributeKey绑定用户ID到Channel,实现连接与用户的关联。

4. 消息处理与分发

MessageHandler类负责消息的分发

@Component("messageHandler")
public class MessageHandler {

private static final Logger logger = LoggerFactory.getLogger(MessageHandler.class);
private static final String MESSAGE_TOPIC = "message.topic";

@Resource
private RedissonClient redissonClient;

@Resource
private ChannelContextUtils channelContextUtils; //发消息的

@PostConstruct
public void lisMessage() {
RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC);
rTopic.addListener(MessageSendDto.class, (MessageSendDto, sendDto) -> {
logger.info("收到广播消息:{}", JsonUtils.convertObj2Json(sendDto));
channelContextUtils.sendMessage(sendDto);
});
}

public void sendMessage(MessageSendDto messageSendDto) {
//往广播Topic发消息
RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC);
rTopic.publish(messageSendDto);
}
}

作用:消息分发中心,基于Redis实现跨服务消息路由。

  • 使用Redis的发布订阅(Pub/Sub)模式,监听message.topic主题。
  • 消息发布:调用sendMessage()将消息广播到Redis Topic。
  • 消息订阅:通过RTopic.addListener()接收消息,调用ChannelContextUtils将消息推送到指定用户或群组。
  • 解耦业务逻辑与Netty底层通信,支持分布式扩展

ChannelContextUtils类管理客户端连接和消息发送

作用:全局Channel管理器,维护用户/群组与Netty Channel的映射关系。

  • 数据结构
    • USER_CONTEXT_MAP:用户ID到Channel的映射(单聊)。
    • GROUP_CONTEXT_MAP:群组ID到ChannelGroup的映射(群聊)。
  • 核心功能
    • 用户登录:绑定用户Channel,加载历史会话和消息。
    • 群组管理:用户加入/退出群组时更新ChannelGroup
    • 消息路由
      • 单聊:通过USER_CONTEXT_MAP找到目标用户Channel。
      • 群聊:通过GROUP_CONTEXT_MAP获取群组ChannelGroup广播消息。
    • 连接清理:用户离线时移除Channel并更新数据库状态。

4. InitRun

@Component("initRun")
public class InitRun implements ApplicationRunner {
    private static final Logger logger = LoggerFactory.getLogger(InitRun.class);

    @Resource
    private DataSource dataSource;

    @Resource
    private RedisUtils redisUtils;

    @Resource
    private NettyWebSocketStarter nettyWebSocketStarter;
    @Override
    public void run(ApplicationArguments args) {
        try {
            dataSource.getConnection();
            redisUtils.get("test");
          new Thread(nettyWebSocketStarter).start();
        logger.error("服务启动成功,可以开始愉快的开发了");
        } catch (SQLException e) {
            logger.error("数据库配置错误,请检查数据库配置");
        } catch (Exception e) {
            logger.error("服务启动失败", e);
        }
    }
}

作用:Spring Boot启动初始化类,集成Netty到Spring生态。

  • 实现ApplicationRunner接口,在Spring Boot启动后执行初始化逻辑。
  • 检查数据库和Redis连接可用性。
  • 启动Netty服务端线程(new Thread(nettyWebSocketStarter).start())。
  • 统一管理服务启动状态,输出关键日志。

协作关系示意图

Spring Boot启动 → InitRun
  │
  ├─ 启动NettyWebSocketStarter → 监听WebSocket连接
  │   ├─ HandlerWebSocket处理握手/消息 → 调用ChannelContextUtils
  │   └─ HandlerHeartBeat检测连接活性
  │
  └─ MessageHandler接收业务层消息 → 通过Redis广播 → ChannelContextUtils路由到用户/群组

关键设计总结

  1. 分层解耦
    • Netty处理底层通信(NettyWebSocketStarterHandlerWebSocket)。
    • Spring Boot管理生命周期(InitRun)。
    • Redis实现跨服务消息分发(MessageHandler)。
  2. 群聊实现
    • 通过ChannelGroup管理群组成员,消息广播效率高。
    • 用户登录时自动加入关联群组(ChannelContextUtils.addContext)。
  3. 扩展性
    • 使用ConcurrentHashMapChannelGroup支持高并发。
    • Redis Pub/Sub机制便于横向扩展多实例服务。

转载请注明文章地址及作者哦~
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇