博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty websocket
阅读量:6907 次
发布时间:2019-06-27

本文共 7726 字,大约阅读时间需要 25 分钟。

1 全局保存websocket的通道  NettyConfig.java

  

public class NettyConfig {    public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);}

2  WebsocketHandler.java  接收处理响应  客户端发来的消息

/** * 接收处理响应客户端处理 * */public class WebsocketHandler extends SimpleChannelInboundHandler{    private WebSocketServerHandshaker handshaker;    private static final String WEB_SOCKET_URL = "ws://192.168.3.167:8888/websocket";    //客户端与服务端创建连接的时候调用    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        NettyConfig.group.add(ctx.channel());        System.out.println("客户端与服务端连接开启...");    }    //客户端与服务端断开连接的时候调用    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        NettyConfig.group.remove(ctx.channel());        System.out.println("客户端与服务端连接关闭...");    }    //服务端接收客户端发送过来的数据结束之后调用    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {            ctx.flush();    }    //工程出现异常的时候调用    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }    //服务端处理客户端websocket请求的核心方法    @Override    protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {        //处理客户端向服务端发起http握手请求的业务        if (msg instanceof FullHttpRequest) {            handHttpRequest(context,  (FullHttpRequest)msg);        }else if (msg instanceof WebSocketFrame) { //处理websocket连接业务            handWebsocketFrame(context, (WebSocketFrame)msg);        }    }        /**     * 处理客户端与服务端之前的websocket业务     * @param ctx     * @param frame     */    private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){        //判断是否是关闭websocket的指令        if (frame instanceof CloseWebSocketFrame) {            handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());        }        //判断是否是ping消息        if (frame instanceof PingWebSocketFrame) {            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));            return;        }                //判断是否是二进制消息,如果是二进制消息,抛出异常        if( ! (frame instanceof TextWebSocketFrame) ){            System.out.println("目前我们不支持二进制消息");            throw new RuntimeException("【"+this.getClass().getName()+"】不支持消息");        }        //返回应答消息        //获取客户端向服务端发送的消息        String request = ((TextWebSocketFrame) frame).text();        System.out.println("服务端收到客户端的消息====>>>" + request);        TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()                                                                                         + request);        //群发,服务端向每个连接上来的客户端群发消息        //NettyConfig.group.writeAndFlush(tws);                //单发  发给莫个人        NettyConfig.group.find(ctx.channel().id()).writeAndFlush(tws);                        }    /**     * 处理客户端向服务端发起http握手请求的业务     * @param ctx     * @param req     */    private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){        if (!req.getDecoderResult().isSuccess()                 || ! ("websocket".equals(req.headers().get("Upgrade")))) {            sendHttpResponse(ctx, req,                     new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));            return;        }        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(                WEB_SOCKET_URL, null, false);        handshaker = wsFactory.newHandshaker(req);        if (handshaker == null) {            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());        }else{            handshaker.handshake(ctx.channel(), req);        }    }        /**     * 服务端向客户端响应消息     * @param ctx     * @param req     * @param res     */    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,            DefaultFullHttpResponse res){        if (res.getStatus().code() != 200) {            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);            res.content().writeBytes(buf);            buf.release();        }        //服务端向客户端发送数据        ChannelFuture f = ctx.channel().writeAndFlush(res);        if (res.getStatus().code() != 200) {            f.addListener(ChannelFutureListener.CLOSE);        }    }    }

3  初始化连接时候的各个组件  

** * 初始化连接时候的各个组件 * */public class MyWebSocketChannelHandler extends ChannelInitializer
{ @Override protected void initChannel(SocketChannel e) throws Exception { e.pipeline().addLast("http-codec", new HttpServerCodec()); e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); e.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); e.pipeline().addLast("handler", new WebsocketHandler()); }}

4 启动服务

/** * 程序的入口,负责启动应用 * */public class Main {    public static void main(String[] args) {        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workGroup);            b.channel(NioServerSocketChannel.class);            b.childHandler(new MyWebSocketChannelHandler());            System.err.println("服务端开启等待客户端连接....");            Channel ch = b.bind(8888).sync().channel();            ch.closeFuture().sync();        } catch (Exception e) {            e.printStackTrace();        }finally{            //优雅的退出程序            bossGroup.shutdownGracefully();            workGroup.shutdownGracefully();        }    }}

 5 客户端连接服务

            
WebSocket客户端

客户端接收到服务端返回的应答消息

 

 

 6 也可以用java连接websocket服务

/** * java websocket客户端 * */public class WebSocketClientTest {          public static WebSocketClient client;      public static void main(String[] args) {          try {              client = new WebSocketClient(new URI("ws://192.168.3.167:8888/websocket"),new Draft_6455()) {                  @Override                  public void onOpen(ServerHandshake serverHandshake) {                       System.err.println("握手成功");                  }                  @Override                  public void onMessage(String msg) {                       System.err.println("收到消息=========="+msg);                       if(msg.equals("over")){                           client.close();                       }                                         }                    @Override                  public void onClose(int i, String s, boolean b) {                       System.err.println("链接已关闭");                  }                    @Override                  public void onError(Exception e){                      e.printStackTrace();                      System.err.println("发生错误已关闭");                  }              };          } catch (URISyntaxException e) {              e.printStackTrace();          }          client.connect();          System.err.println(client.getDraft());         while(!client.getReadyState().equals(WebSocket.READYSTATE.OPEN)){             System.err.println("正在连接...");          }         //连接成功,发送信息      client.send("哈喽,连接一下啊");               }  }

 

7 netty 和websocketclient依赖

io.netty
netty-all
5.0.0.Alpha1
org.java-websocket
Java-WebSocket
1.3.5

 

8 感谢慕课网相关资源 

转载于:https://www.cnblogs.com/syscn/p/9116601.html

你可能感兴趣的文章
MongoDB系列一(索引及C#如何操作MongoDB)
查看>>
解决Android SDK下载和更新失败的方法(Win系统) 和离线安装
查看>>
解决eclipse+MAVEN提示One or more constraints have not been satisfied.的问题
查看>>
nginx主配置文件 在那找怎么打开
查看>>
Android:Intent
查看>>
C++标准转换运算符const_cast
查看>>
【Cocos2d-x】Mac 在 Cocos2d-x 3.X 打包Android
查看>>
测试计划与测试方案的区别
查看>>
Hadoop 读取文件API报错
查看>>
JS实现密码加密
查看>>
HTML+CSS-如何定义让两个div横向排列
查看>>
Matlab画柱状和折线对照图
查看>>
javascript时间戳和日期字符串相互转换
查看>>
链接详解--静态库
查看>>
从0开始学java——JUnit4 复习,其实基本思想还是那些,不过采用了新的注释格式的语法...
查看>>
GNU M4 - GNU Project - 免费软件基金会(FSF)
查看>>
jsp中将后台传递过来的json格式的list数据绑定到下拉菜单select
查看>>
Project Euler 85 :Counting rectangles 数长方形
查看>>
MYSQL查询某字段中以逗号分隔的字符串的方法
查看>>
Excel设置下拉菜单并隐藏下拉菜单来源单元格内容
查看>>