本文发布于77 天前,其中的信息可能已经过时,如有错误请发送邮件到2394201457@qq.com
一、引言
在现代互联网应用中,实时聊天功能是一个重要的交互方式。本文将详细介绍如何基于Spring Boot和Netty框架实现一个高性能的实时聊天系统,支持单聊和群聊功能。
二、整体架构
该聊天系统采用Spring Boot作为后端框架,结合MyBatis-Plus进行数据库操作,使用Redis进行数据缓存,利用Netty框架实现WebSocket服务器,以支持实时通信。
三、核心功能实现
1. Netty服务器的启动与配置
@Component
public class NettyWebSocketStarter implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class);
private static EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private static EventLoopGroup workGroup = new NioEventLoopGroup();
@Resource
private HandlerWebSocket handlerWebSocket;
@Resource
private AppConfig appConfig;
@PreDestroy
public void close() {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
@Override
public void run() {
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup);
serverBootstrap.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// 设置几个重要的处理器
// 对http协议的支持,使用http的编码器,解码器
pipeline.addLast(new HttpServerCodec());
// 聚合解码 httpRequest/httpContent/lastHttpContent到fullHttpRequest
// 保证接收的http请求的完整性
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
// 心跳 long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit
// readerIdleTime 读超时事件 即测试段一定事件内未接收到被测试段消息
// writerIdleTime 为写超时时间 即测试端一定时间内想被测试端发送消息
// allIdleTime 所有类型的超时时间
pipeline.addLast(new IdleStateHandler(6, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new HandlerHeartBeat());
//将http协议升级为ws协议 对websocket的支持
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L));
pipeline.addLast(handlerWebSocket);
}
});
Integer wsPort = appConfig.getWsPort();
String wsPortStr = System.getProperty("ws.port");
if (!StringTools.isEmpty(wsPortStr)) {
wsPort = Integer.parseInt(wsPortStr);
}
ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();
logger.info("netty服务启动成功,端口: {}", appConfig.getWsPort());
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("启动netty失败", e);
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
1. NettyWebSocketStarter
作用:Netty服务端启动器,负责初始化Netty线程组、配置协议处理链并绑定端口。
- 通过
@Async
异步启动独立的Netty线程,避免阻塞Spring Boot主线程。 - 配置
bossGroup
和workGroup
线程组,分别处理连接请求和I/O操作。 - 构建协议处理链:支持HTTP升级为WebSocket、心跳检测(60秒读超时)、消息聚合、业务处理器等。
- 通过
@PreDestroy
注解实现优雅关闭,释放Netty线程资源。
2. WebSocket连接处理
HandlerWebSocket
类处理WebSocket连接的生命周期事件
@Component
@ChannelHandler.Sharable
public class HandlerWebSocket extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger logger = LoggerFactory.getLogger(HandlerWebSocket.class);
@Resource
private RedisComponent redisComponent;
@Resource
private ChannelContextUtils channelContextUtils;
/**
* 通道就绪后 调用,一般用来做初始化
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("有新的连接加入.....");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("有连接断开.....");
//清除掉存入的内存信息
channelContextUtils.removeContext(ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
Channel channel = ctx.channel();
Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));
String userId = attribute.get();
//logger.info("接收UserId{}的消息:{}", userId, textWebSocketFrame.text());
redisComponent.saveUserHeartBeat(userId); //保存心跳
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
String url = complete.requestUri();
String token = getToken(url);
if (token == null) {
ctx.channel().close();
}
TokenUserInfoDto tokenUserInfoDto = redisComponent.getTokenUserInfoDto(token);
if (tokenUserInfoDto == null) {
ctx.channel().close();
}
channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());
}
}
作用:WebSocket核心业务处理器,管理连接生命周期与消息路由。
- 继承
SimpleChannelInboundHandler
,处理TextWebSocketFrame
文本消息。 - 连接建立:完成WebSocket握手后,从URL提取Token验证用户身份,调用
ChannelContextUtils
绑定用户与Channel。 - 消息接收:转发消息到Redis保存心跳(
redisComponent.saveUserHeartBeat
)。 - 连接断开:清理内存中的Channel信息,更新用户离线时间。
3. 心跳检测
HandlerHeartBeat
类通过IdleStateHandler
维护连接活性
public class HandlerHeartBeat extends ChannelDuplexHandler {
private static final Logger logger = LoggerFactory.getLogger(HandlerHeartBeat.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
Channel channel = ctx.channel();
Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));
String userId = attribute.get();
logger.info("用户{}心跳超时:",userId);
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush("heart");
}
}
}
}
作用:心跳检测处理器,维护长连接活性。
- 继承
ChannelDuplexHandler
,监听IdleStateEvent
事件。 - 处理读空闲(
READER_IDLE
):若60秒内无读操作,关闭连接并记录日志。 - 处理写空闲(
WRITER_IDLE
):发送"heart"
消息保活(客户端持续发送心跳)。 - 通过
AttributeKey
绑定用户ID到Channel,实现连接与用户的关联。
4. 消息处理与分发
MessageHandler
类负责消息的分发
@Component("messageHandler")
public class MessageHandler {
private static final Logger logger = LoggerFactory.getLogger(MessageHandler.class);
private static final String MESSAGE_TOPIC = "message.topic";
@Resource
private RedissonClient redissonClient;
@Resource
private ChannelContextUtils channelContextUtils; //发消息的
@PostConstruct
public void lisMessage() {
RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC);
rTopic.addListener(MessageSendDto.class, (MessageSendDto, sendDto) -> {
logger.info("收到广播消息:{}", JsonUtils.convertObj2Json(sendDto));
channelContextUtils.sendMessage(sendDto);
});
}
public void sendMessage(MessageSendDto messageSendDto) {
//往广播Topic发消息
RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC);
rTopic.publish(messageSendDto);
}
}
作用:消息分发中心,基于Redis实现跨服务消息路由。
- 使用Redis的发布订阅(Pub/Sub)模式,监听
message.topic
主题。 - 消息发布:调用
sendMessage()
将消息广播到Redis Topic。 - 消息订阅:通过
RTopic.addListener()
接收消息,调用ChannelContextUtils
将消息推送到指定用户或群组。 - 解耦业务逻辑与Netty底层通信,支持分布式扩展
ChannelContextUtils
类管理客户端连接和消息发送
作用:全局Channel管理器,维护用户/群组与Netty Channel的映射关系。
- 数据结构:
USER_CONTEXT_MAP
:用户ID到Channel的映射(单聊)。GROUP_CONTEXT_MAP
:群组ID到ChannelGroup
的映射(群聊)。
- 核心功能:
- 用户登录:绑定用户Channel,加载历史会话和消息。
- 群组管理:用户加入/退出群组时更新
ChannelGroup
。 - 消息路由:
- 单聊:通过
USER_CONTEXT_MAP
找到目标用户Channel。 - 群聊:通过
GROUP_CONTEXT_MAP
获取群组ChannelGroup
广播消息。
- 单聊:通过
- 连接清理:用户离线时移除Channel并更新数据库状态。
4. InitRun
@Component("initRun")
public class InitRun implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(InitRun.class);
@Resource
private DataSource dataSource;
@Resource
private RedisUtils redisUtils;
@Resource
private NettyWebSocketStarter nettyWebSocketStarter;
@Override
public void run(ApplicationArguments args) {
try {
dataSource.getConnection();
redisUtils.get("test");
new Thread(nettyWebSocketStarter).start();
logger.error("服务启动成功,可以开始愉快的开发了");
} catch (SQLException e) {
logger.error("数据库配置错误,请检查数据库配置");
} catch (Exception e) {
logger.error("服务启动失败", e);
}
}
}
作用:Spring Boot启动初始化类,集成Netty到Spring生态。
- 实现
ApplicationRunner
接口,在Spring Boot启动后执行初始化逻辑。 - 检查数据库和Redis连接可用性。
- 启动Netty服务端线程(
new Thread(nettyWebSocketStarter).start()
)。 - 统一管理服务启动状态,输出关键日志。
协作关系示意图
Spring Boot启动 → InitRun
│
├─ 启动NettyWebSocketStarter → 监听WebSocket连接
│ ├─ HandlerWebSocket处理握手/消息 → 调用ChannelContextUtils
│ └─ HandlerHeartBeat检测连接活性
│
└─ MessageHandler接收业务层消息 → 通过Redis广播 → ChannelContextUtils路由到用户/群组
关键设计总结
- 分层解耦:
- Netty处理底层通信(
NettyWebSocketStarter
、HandlerWebSocket
)。 - Spring Boot管理生命周期(
InitRun
)。 - Redis实现跨服务消息分发(
MessageHandler
)。
- Netty处理底层通信(
- 群聊实现:
- 通过
ChannelGroup
管理群组成员,消息广播效率高。 - 用户登录时自动加入关联群组(
ChannelContextUtils.addContext
)。
- 通过
- 扩展性:
- 使用
ConcurrentHashMap
和ChannelGroup
支持高并发。 - Redis Pub/Sub机制便于横向扩展多实例服务。
- 使用