码农编程阁-全国最大的中文编程交流平台

 找回密码
 立即注册
查看: 80|回复: 0

[文字教程] Spring Boot + Netty + WebSocket 实现消息推送

[复制链接]
  • TA的每日心情
    擦汗
    2025-3-10 11:56
  • 签到天数: 95 天

    连续签到: 1 天

    [LV.6]常住居民II

    170

    主题

    902

    帖子

    2万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    26745
    发表于 2025-1-19 01:53:51 | 显示全部楼层 |阅读模式
    关于Netty
    Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。
    2
    Maven依赖

    1. <dependencies>
    2. <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    3. <dependency>
    4.   <groupId>io.netty</groupId>
    5.   <artifactId>netty-all</artifactId>
    6.   <version>4.1.36.Final</version>
    7. </dependency>
    8. </dependencies>
    复制代码



    3
    SpringBootApplication
    启动器中需要new一个NettyServer,并显式调用启动netty。

    1. @SpringBootApplication
    2. public class SpringCloudStudyDemoApplication {
    3. public static void main(String[] args) {
    4.   SpringApplication.run(SpringCloudStudyDemoApplication.class,args);
    5.   try {
    6.    new NettyServer(12345).start();
    7.    System.out.println("https://blog.csdn.net/moshowgame");
    8.    System.out.println("http://127.0.0.1:6688/netty-websocket/index");
    9.   }catch(Exception e) {
    10.    System.out.println("NettyServerError:"+e.getMessage());
    11.   }
    12. }
    13. }
    复制代码



    4
    NettyServer
    启动的NettyServer,这里进行配置

    1. /**
    2. * NettyServer Netty服务器配置
    3. */
    4. public class NettyServer {

    5.     private final int port;

    6.     public NettyServer(int port) {
    7.         this.port = port;
    8.     }

    9.     public void start() throws Exception {
    10.         EventLoopGroup bossGroup = new NioEventLoopGroup();
    11.         EventLoopGroup group = new NioEventLoopGroup();

    12.         try {
    13.             ServerBootstrap sb = new ServerBootstrap();
    14.             sb.option(ChannelOption.SO_BACKLOG, 1024);
    15.             sb.group(group, bossGroup) // 绑定线程池
    16.                     .channel(NioServerSocketChannel.class) // 指定使用的channel
    17.                     .localAddress(this.port)// 绑定监听端口
    18.                     .childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作
    19.                         @Override
    20.                         protected void initChannel(SocketChannel ch) throws Exception {
    21.                             System.out.println("收到新连接");

    22.                             //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
    23.                             ch.pipeline().addLast(new HttpServerCodec());

    24.                             //以块的方式来写的处理器
    25.                             ch.pipeline().addLast(new ChunkedWriteHandler());
    26.                             ch.pipeline().addLast(new HttpObjectAggregator(8192));
    27.                             ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));
    28.                             ch.pipeline().addLast(new MyWebSocketHandler());
    29.                         }
    30.                     });

    31.             ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
    32.             System.out.println(NettyServer.class + " 启动正在监听:" + cf.channel().localAddress());
    33.             cf.channel().closeFuture().sync(); // 关闭服务器通道
    34.         } finally {
    35.             group.shutdownGracefully().sync(); // 释放线程池资源
    36.             bossGroup.shutdownGracefully().sync();
    37.         }
    38.     }
    39. }
    复制代码

    5
    MyChannelHandlerPool
    通道组池,管理所有websocket连接

    1. /**
    2. * MyChannelHandlerPool
    3. * 通道组池,管理所有websocket连接
    4. */
    5. public class MyChannelHandlerPool {

    6.     public MyChannelHandlerPool(){}

    7.     public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    8. }
    复制代码

    6
    MyWebSocketHandler
    处理ws一下几种情况:
    • channelActive与客户端建立连接
    • channelInactive与客户端断开连接
    • channelRead0客户端发送消息处理
    1. /**
    2. * NettyServer Netty服务器配置
    3. */
    4. public class NettyServer {

    5.     private final int port;

    6.     public NettyServer(int port) {
    7.         this.port = port;
    8.     }

    9.     public void start() throws Exception {
    10.         EventLoopGroup bossGroup = new NioEventLoopGroup();
    11.         EventLoopGroup group = new NioEventLoopGroup();
    12.         try {
    13.             ServerBootstrap sb = new ServerBootstrap();
    14.             sb.option(ChannelOption.SO_BACKLOG, 1024);
    15.             sb.group(group, bossGroup) // 绑定线程池
    16.                     .channel(NioServerSocketChannel.class) // 指定使用的channel
    17.                     .localAddress(this.port)// 绑定监听端口
    18.                     .childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作

    19.                         @Override
    20.                         protected void initChannel(SocketChannel ch) throws Exception {
    21.                             System.out.println("收到新连接");
    22.                             //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
    23.                             ch.pipeline().addLast(new HttpServerCodec());
    24.                             //以块的方式来写的处理器
    25.                             ch.pipeline().addLast(new ChunkedWriteHandler());
    26.                             ch.pipeline().addLast(new HttpObjectAggregator(8192));
    27.                             ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
    28.                             ch.pipeline().addLast(new MyWebSocketHandler());
    29.                         }
    30.                     });

    31.             ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
    32.             System.out.println(NettyServer.class + " 启动正在监听:" + cf.channel().localAddress());
    33.             cf.channel().closeFuture().sync(); // 关闭服务器通道
    34.         } finally {
    35.             group.shutdownGracefully().sync(); // 释放线程池资源
    36.             bossGroup.shutdownGracefully().sync();
    37.         }
    38.     }
    39. }
    复制代码

    7
    socket.html
    主要是连接ws,发送消息,以及消息反馈

    1. <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
    2. <html xmlns="http://www.w3.org/1999/xhtml">
    3. <head>
    4.     <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    5.     <title>Netty-Websocket</title>
    6.     <script type="text/javascript">
    7.         // by zhengkai.blog.csdn.net
    8.         var socket;
    9.         if(!window.WebSocket){
    10.             window.WebSocket = window.MozWebSocket;
    11.         }

    12.         if(window.WebSocket){
    13.             socket = new WebSocket("ws://127.0.0.1:12345/ws");
    14.             socket.onmessage = function(event){
    15.                 var ta = document.getElementById('responseText');
    16.                 ta.value += event.data+"\r\n";
    17.             };
    18.             socket.onopen = function(event){
    19.                 var ta = document.getElementById('responseText');
    20.                 ta.value = "Netty-WebSocket服务器。。。。。。连接 \r\n";
    21.             };
    22.             socket.onclose = function(event){
    23.                 var ta = document.getElementById('responseText');
    24.                 ta.value = "Netty-WebSocket服务器。。。。。。关闭 \r\n";
    25.             };
    26.         }else{
    27.             alert("您的浏览器不支持WebSocket协议!");
    28.         }

    29.         function send(message){
    30.             if(!window.WebSocket){return;}
    31.             if(socket.readyState == WebSocket.OPEN){
    32.                 socket.send(message);
    33.             }else{
    34.                 alert("WebSocket 连接没有建立成功!");
    35.             }
    36.         }
    37.     </script>
    38. </head>
    39. <body>
    40. <form onSubmit="return false;">
    41.     <label>ID</label><input type="text" name="uid" value="${uid!!}" /> <br />
    42.     <label>TEXT</label><input type="text" name="message" value="这里输入消息" /> <br />
    43.     <br /> <input type="button" value="发送ws消息"
    44.                   onClick="send(this.form.uid.value+':'+this.form.message.value)" />
    45.     <hr color="black" />
    46.     <h3>服务端返回的应答消息</h3>
    47.     <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>
    48. </form>
    49. </body>
    50. </html>
    复制代码

    8
    Controller
    写好了html当然还需要一个controller来引导页面。

    1. @RestController
    2. public class IndexController {

    3. @GetMapping("/index")
    4. public ModelAndView index(){
    5.   ModelAndView mav=new ModelAndView("socket");
    6.   mav.addObject("uid", RandomUtil.randomNumbers(6));
    7.   return mav;
    8. }
    9. }
    复制代码

    640.png
    641.png
    642.png
    10
    改造netty支持url参数
    1.首先,调整一下加载handler的顺序,优先MyWebSocketHandler在WebSocketServerProtocolHandler之上。

    1. ch.pipeline().addLast(new MyWebSocketHandler());
    2. ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));
    复制代码

    2.其次,改造MyWebSocketHandler 的channelRead方法,首次连接会是一个FullHttpRequest类型,可以通过FullHttpRequest.uri()获取完整ws的URL地址,之后接受信息的话,会是一个TextWebSocketFrame类型。

    1. public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    2.     @Override
    3.     public void channelActive(ChannelHandlerContext ctx) throws Exception {
    4.         System.out.println("与客户端建立连接,通道开启!");

    5.         //添加到channelGroup通道组
    6.         MyChannelHandlerPool.channelGroup.add(ctx.channel());
    7.     }

    8.     @Override
    9.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    10.         System.out.println("与客户端断开连接,通道关闭!");

    11.         //添加到channelGroup 通道组
    12.         MyChannelHandlerPool.channelGroup.remove(ctx.channel());
    13.     }

    14.     @Override
    15.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    16.         //首次连接是FullHttpRequest,处理参数 by zhengkai.blog.csdn.net
    17.         if (null != msg && msg instanceof FullHttpRequest) {
    18.             FullHttpRequest request = (FullHttpRequest) msg;
    19.             String uri = request.uri();
    20.             Map paramMap=getUrlParams(uri);
    21.             System.out.println("接收到的参数是:"+JSON.toJSONString(paramMap));
    22.             //如果url包含参数,需要处理
    23.             if(uri.contains("?")){
    24.                 String newUri=uri.substring(0,uri.indexOf("?"));
    25.                 System.out.println(newUri);
    26.                 request.setUri(newUri);
    27.             }
    28.         }else if(msg instanceof TextWebSocketFrame){
    29.             //正常的TEXT消息类型
    30.             TextWebSocketFrame frame=(TextWebSocketFrame)msg;
    31.             System.out.println("客户端收到服务器数据:" +frame.text());
    32.             sendAllMessage(frame.text());
    33.         }
    34.         super.channelRead(ctx, msg);
    35.     }

    36.     @Override
    37.     protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {

    38.     }

    39.     private void sendAllMessage(String message){
    40.         //收到信息后,群发给所有channel
    41.         MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
    42.     }

    43.     private static Map getUrlParams(String url){
    44.         Map<String,String> map = new HashMap<>();
    45.         url = url.replace("?",";");
    46.         if (!url.contains(";")){
    47.             return map;
    48.         }

    49.         if (url.split(";").length > 0){
    50.             String[] arr = url.split(";")[1].split("&");
    51.             for (String s : arr){
    52.                 String key = s.split("=")[0];
    53.                 String value = s.split("=")[1];
    54.                 map.put(key,value);
    55.             }
    56.             return  map;
    57.         }else{
    58.             return map;
    59.         }
    60.     }
    61. }
    复制代码


    3.html中的ws地址也进行改造

    1. socket = new WebSocket("ws://127.0.0.1:12345/ws?uid=666&gid=777");
    复制代码

    4.改造后控制台输出情况

    1. 收到新连接
    2. 与客户端建立连接,通道开启!
    3. 接收到的参数是:{"uid":"666","gid":"777"}
    4. /ws
    5. 客户端收到服务器数据:142531:这里输入消息
    6. 客户端收到服务器数据:142531:这里输入消息
    7. 客户端收到服务器数据:142531:这里输入消息
    复制代码
    1. failed: WebSocket opening handshake timed out
    复制代码

    听说是ssl wss的情况下才会出现,来自 @around-gao 的解决方法:
    把MyWebSocketHandler和WebSocketServerProtocolHandler调下顺序就好了。









    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|码农编程阁-全国最大的中文编程交流平台

    GMT+8, 2025-4-29 21:25 , Processed in 0.164914 second(s), 26 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2020, Tencent Cloud.

    快速回复 返回顶部 返回列表