大家好,欢迎来到IT知识分享网。
文章目录
WebSocket快速上手
1. WebSocket是什么?
WebSocket建立在TCP协议之上,并且与HTTP协议有着良好的兼容性,最大的特点就是服务器可以主动向客户端推送消息,客户端也可以主动向服务器发送消息。
1.1. WebSocket连接过程
- 客户端通过HTTP协议向服务端发送握手,服务端向客户端返回ACK,此时握手成功,建立连接并维持该连接;
- 后面服务端和客户端就可以基于建立的连接进行双向通信,直到连接关闭。
1.2. WebSocket与HTTP对比
HTTP | WebSocket | |
---|---|---|
通信方式 | 单工 | 全双工 |
端口 | 80/443 | 80/443 |
协议 | HTTP/HTTPS | WS/WSS |
跨域问题 | 有 | 无 |
网络开销 | – | 较小 |
1.3. WebSocket协议
WebSocket的协议标识符是 ws
,加密为 wss
,例如:
ws://ip:port/some/path
2. 快速上手
本次快速上手基于Netty实现。
2.1 服务端
WebSocketServer
主服务用于启动服务端,监听请求,并对请求信息进行解码,调用处理程序 WebSocketHandler
对请求进行处理。
public class WebSocketServer {
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 5).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("http-codec", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65535));
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
pipeline.addLast("handler", new WebSocketHandler());
}
});
try {
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new WebSocketServer().run();
}
}
WebSocketHandler
作为请求处理器,主要接收并处理两种请求:
- 客户端握手请求
- 客户端消息请求,并给出响应。
public class WebSocketHandler extends ChannelInboundHandlerAdapter {
private WebSocketServerHandshaker handshaker;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
// websocket连接请求
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
// websocket业务处理
handleWebSocketRequest(ctx, (WebSocketFrame) msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
/** * 获取WebSocket服务信息 * * @param req * @return */
private static String getWebSocketLocation(FullHttpRequest req) {
String location = req.headers().get("Host") + "/ws";
return "ws://" + location;
}
/** * 接收握手请求,并响应 * * @param ctx * @param request */
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
if (!request.decoderResult().isSuccess() || !StringUtils.equals(request.headers().get("Upgrade"), "websocket")) {
sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory handshakerFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(request), null, false);
handshaker = handshakerFactory.newHandshaker(request);
if (handshaker == null) {
// 不支持websocket
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
// 获取请求参数
QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
Map<String, List<String>> parameters = decoder.parameters();
String userid = parameters.get("userid").get(0);
// 通过它构造握手响应消息返回给客户端
ChannelFuture future = handshaker.handshake(ctx.channel(), request);
if (future.isSuccess()) {
String msg = "客户端" + userid + "加入聊天室";
ctx.channel().writeAndFlush(new TextWebSocketFrame(msg));
}
}
}
/** * 接收WebSocket请求 * * @param ctx * @param req * @throws Exception */
private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame req) throws Exception {
if (req instanceof CloseWebSocketFrame) {
//关闭socket连接请求
handshaker.close(ctx.channel(), (CloseWebSocketFrame) req.retain());
return;
}
if (req instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(req.content().retain()));
return;
}
if (!(req instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
}
if (ctx == null || this.handshaker == null || ctx.isRemoved()) {
throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息");
}
ctx.channel().write(new TextWebSocketFrame(((TextWebSocketFrame) req).text()));
}
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
// BAD_REQUEST(400) 客户端请求错误返回的应答消息
if (res.status().code() != 200) {
// 将返回的状态码放入缓存中,Unpooled没有使用缓存池
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
HttpUtil.setContentLength(res, res.content().readableBytes());
}
// 发送应答消息
ChannelFuture cf = ctx.channel().writeAndFlush(res);
// 非法连接直接关闭连接
if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
cf.addListener(ChannelFutureListener.CLOSE);
}
}
}
2.2 客户端
2.2.1 浏览器客户端
需要浏览器支持。
export default class WebSocketUtils {
constructor(url) {
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket
}
if (window.WebSocket) {
this.socket = new WebSocket(url)
this.socket.onopen = function(event) {
var ta = document.getElementById('responseText')
ta.value = '连接开启!'
console.log(event)
}
this.socket.onclose = function(event) {
var ta = document.getElementById('responseText')
ta.value = ta.value + '连接被关闭'
}
this.socket.onmessage = function(event) {
var ta = document.getElementById('responseText')
console.log(event.data);
ta.value = ta.value + '\n' + event.data
}
} else {
alert('你的浏览器不支持 WebSocket!')
}
this.send = this.send.bind(this)
}
send(message) {
if (!window.WebSocket) {
return
}
if (this.socket.readyState == WebSocket.OPEN) {
this.socket.send(message)
} else {
alert('连接没有开启.')
}
}
}
2.2.2 Java客户端
WebSocketClient
用于与服务端建立通信连接,以及发送信息的主入口。
public class ChatRoomClient {
private URI uri;
private Channel channel;
private ChatRoomClientHandler handler;
public ChatRoomClient(String uri) {
this.uri = URI.create(uri);
}
public void run() throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
String protocal = uri.getScheme();
if (!StringUtils.equals(protocal, "ws")) {
throw new ProtocolException("Unsupported protocal:" + protocal);
}
handler = new ChatRoomClientHandler(uri);
bootstrap.channel(NioSocketChannel.class).group(workerGroup).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS,20000).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("http-codec", new HttpClientCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65535));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(handler);
}
});
this.channel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();
ChannelFuture future = handler.handshakerFuture();//handshakerFuture用于等待握手结果,标识握手是否成功
future.sync();//这里处理同步等待,一直等到握手成功,调用setSuccess()方法才会结束,终止等待状态
}
public void close() throws InterruptedException {
this.channel.writeAndFlush(new CloseWebSocketFrame());
this.channel.closeFuture().sync();//等待调用close()方法
}
public void send(final String text) {
if (this.handler.handshakerFuture().isSuccess()) {
this.channel.writeAndFlush(new TextWebSocketFrame(text));
} else {
System.out.println("没有握手成功!");
}
}
}
WebSocketClientHandler
1、在建立连接成功后发起握手请求;
2、在接收到消息时对消息进行处理。
public class ChatRoomClientHandler extends SimpleChannelInboundHandler<Object> {
private WebSocketClientHandshaker handshaker;
private ChannelPromise handshakerFuture;
public ChatRoomClientHandler(URI uri) {
this.handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, EmptyHttpHeaders.INSTANCE, 1280000);
}
public ChannelFuture handshakerFuture() {
return this.handshakerFuture;
}
@Override
/** * 处理器加入到处理pipeline后,新建握手等待标识Future */
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("准备好处理事件");
handshakerFuture = ctx.newPromise();
}
@Override
/** * 连接建立成功后,发起握手请求 */
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接成功!" + ctx.name());
this.handshaker.handshake(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接断开!");
System.err.println("WebSocket client disconnected!");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (!this.handshakerFuture.isDone()) {
this.handshakerFuture.cancel(true);
}
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (!this.handshakerFuture.isDone()) {
this.handshakerFuture.setFailure(cause);
}
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
final Channel channel = ctx.channel();
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
if (!response.decoderResult().isSuccess()) {
throw new ProtocolException("响应内容解析失败!");
} else if (!this.handshaker.isHandshakeComplete()) {
this.handshaker.finishHandshake(channel, (FullHttpResponse) msg);
handshakerFuture.setSuccess();//标识握手成功
System.out.println("握手成功");
return;
}
}
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
System.out.println(response.toString());
}
final WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
System.out.println(((TextWebSocketFrame) frame).text());
} else if (frame instanceof CloseWebSocketFrame) {
channel.close();
} else if (frame instanceof PongWebSocketFrame) {
System.out.println(((PongWebSocketFrame) frame).toString());
}
}
}
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/23273.html