package com.imooc.netty;
import java.util.Date;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.CharsetUtil;
public class MyWebSocketHandler extends SimpleChannelInboundHandler<Object> { private WebSocketServerHandshaker handshaker; private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { NettyConfig.group.add(ctx.channel()); System.out.println("客户端与服务端连接开启..."); }
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { NettyConfig.group.remove(ctx.channel()); System.out.println("客户端与服务端连接关闭..."); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }
@Override protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { handHttpRequest(context, (FullHttpRequest)msg); }else if (msg instanceof WebSocketFrame) { handWebsocketFrame(context, (WebSocketFrame)msg); } }
private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){ if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain()); } if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } if( ! (frame instanceof TextWebSocketFrame) ){ System.out.println("目前我们不支持二进制消息"); throw new RuntimeException("【"+this.getClass().getName()+"】不支持消息"); } String request = ((TextWebSocketFrame) frame).text(); System.out.println("服务端收到客户端的消息====>>>" + request); TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + " ===>>> " + request); NettyConfig.group.writeAndFlush(tws); }
private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){ if (!req.getDecoderResult().isSuccess() || ! ("websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( WEB_SOCKET_URL, null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); }else{ handshaker.handshake(ctx.channel(), req); } }
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res){ if (res.getStatus().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } ChannelFuture f = ctx.channel().writeAndFlush(res); if (res.getStatus().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } }
|