⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Spring-Cloud-Gateway/filter-websocket-routing/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Spring-Cloud-Gateway 2.0.X M4


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

阅读源码最好的方式,是使用 IDEA 进行调试 Spring Cloud Gateway 源码,不然会一脸懵逼。

胖友可以点击「芋道源码」扫码关注,回复 git019 关键字
获得艿艿添加了中文注释的 Spring Cloud Gateway 源码地址。

阅读源码很孤单,加入源码交流群,一起坚持!

1. 概述

本文主要分享 WebsocketRoutingFilter 的代码实现

WebsocketRoutingFilter ,Websocket 路由网关过滤器。其根据 ws:// / wss:// 前缀( Scheme )过滤处理,代理后端 Websocket 服务,提供给客户端连接。如下图 :

  • 目前一个 RouteDefinition 只能指定一个后端 WebSocket 服务。官方正在计划在 LoadBalancerClientFilter 上实现 Websocket 的负载均衡功能。也就说,未来一个 RouteDefinition 能够指定多个后端 WebSocket 服务。

Websocket 的 RouteDefinition 配置如下 :

cloud:
gateway:
routes:
- id: websocket_test
uri: ws://localhost:9000
order: 8000
predicates:
- Path=/echo

  • uri 使用 ws:// 或者 wss:// 为前缀。

推荐 Spring Cloud 书籍

2. 环境搭建

在解析源码之前,我们先以 wscat 搭建一个 WebSocket 服务。

第一步,安装 wscat 。

npm install -g wscat

第二步,启动 wscat 。

wscat --listen 9000

第三步,连接 wscat 。

wscat --listen 9000

第四步,配置 RouteDefinition ,并启动 Spring Cloud Gateway 。

cloud:
gateway:
routes:
- id: websocket_test
uri: ws://localhost:9000
order: 8000
predicates:
- Path=/echo

第五步,通过 Gateway 连接 wscat 。

wscat --connect ws://localhost:8080/echo

大功告成。

注意,wscat 同一时间仅允许一个客户端连接。

3. WebsocketRoutingFilter

org.springframework.cloud.gateway.filter.WebsocketRoutingFilter ,Websocket 路由网关过滤器。

构造方法,代码如下 :

public class WebsocketRoutingFilter implements GlobalFilter, Ordered {
public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

private final WebSocketClient webSocketClient;
private final WebSocketService webSocketService;

public WebsocketRoutingFilter(WebSocketClient webSocketClient) {
this(webSocketClient, new HandshakeWebSocketService());
}

public WebsocketRoutingFilter(WebSocketClient webSocketClient,
WebSocketService webSocketService) {
this.webSocketClient = webSocketClient;
this.webSocketService = webSocketService;
}

}


#getOrder() 方法,代码如下 :

@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}


#filter(ServerWebExchange, GatewayFilterChain) 方法,代码如下 :

 1: @Override
2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
3: // 获得 requestUrl
4: URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
5:
6: // 判断是否能够处理
7: String scheme = requestUrl.getScheme();
8: if (isAlreadyRouted(exchange) || (!scheme.equals("ws") && !scheme.equals("wss"))) {
9: return chain.filter(exchange);
10: }
11:
12: // 设置已经路由
13: setAlreadyRouted(exchange);
14:
15: // 处理连接请求
16: return this.webSocketService.handleRequest(exchange,
17: new ProxyWebSocketHandler(requestUrl, this.webSocketClient, exchange.getRequest().getHeaders()));
18: }

  • 第 4 行 :获得 requestUrl

  • 第 7 至 10 行 :判断 ForwardRoutingFilter 是否能够处理该请求,需要满足两个条件 :

    • ws:// 或者 wss:// 前缀( Scheme ) 。

    • 调用 ServerWebExchangeUtils#isAlreadyRouted(ServerWebExchange) 方法,判断该请求暂未被其他 Routing 网关处理。代码如下 :

      public static boolean isAlreadyRouted(ServerWebExchange exchange) {
      return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false);
      }

      • x
  • 第 13 行 :设置该请求已经被处理。代码如下 :

    public static void setAlreadyRouted(ServerWebExchange exchange) {
    exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true);
    }

  • 第 15 至 16 行 :调用 WebSocketService#hanldeRequest(ServerWebExchange, WebSocketHandler) 方法,处理客户端发起的连接请求( Handshake Request ) 。这个方法的实现不在本文范围内,但是良心如笔者,大概讲下涉及到的类 :

3.1 ProxyWebSocketHandler

org.springframework.cloud.gateway.filter.WebsocketRoutingFilter.ProxyWebSocketHandler代理后端 WebSocket 服务处理器。

构造方法,代码如下 :

 1: private static class ProxyWebSocketHandler implements WebSocketHandler {
2:
3: private final WebSocketClient client;
4: private final URI url;
5: private final HttpHeaders headers;
6: private final List<String> subProtocols;
7:
8: public ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers) {
9: this.client = client;
10: this.url = url;
11: this.headers = new HttpHeaders();//headers;
12: //TODO: better strategy to filter these headers?
13: headers.entrySet().forEach(header -> {
14: if (!header.getKey().toLowerCase().startsWith("sec-websocket")
15: && !header.getKey().equalsIgnoreCase("upgrade")
16: && !header.getKey().equalsIgnoreCase("connection")) {
17: this.headers.addAll(header.getKey(), header.getValue());
18: }
19: });
20: List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
21: if (protocols != null) {
22: this.subProtocols = protocols;
23: } else {
24: this.subProtocols = Collections.emptyList();
25: }
26: }
27: }


#handle(WebSocketSession) 方法,代码如下 :

 1: @Override
2: public Mono<Void> handle(WebSocketSession session) {
3: // pass headers along so custom headers can be sent through
4: return client.execute(url, this.headers, new WebSocketHandler() {
5: @Override
6: public Mono<Void> handle(WebSocketSession proxySession) {
7: // Use retain() for Reactor Netty
8: // 转发消息 客户端 =》后端服务
9: Mono<Void> proxySessionSend = proxySession
10: .send(session.receive().doOnNext(WebSocketMessage::retain));
11: // 转发消息 后端服务=》客户端
12: // .log("proxySessionSend", Level.FINE);
13: Mono<Void> serverSessionSend = session
14: .send(proxySession.receive().doOnNext(WebSocketMessage::retain));
15: // .log("sessionSend", Level.FINE);
16:
17: //
18: return Mono.when(proxySessionSend, serverSessionSend).then();
19: }
20:
21: /**
22: * Copy subProtocols so they are available downstream.
23: * @return
24: */
25: @Override
26: public List<String> getSubProtocols() {
27: return ProxyWebSocketHandler.this.subProtocols;
28: }
29: });
30: }

  • 第 6 行 :调用 WebSocketClient#execute(URI, HttpHeaders, WebSocketHandler) 方法,连接后端【被代理】的 WebSocket 服务。连接成功后,回调 WebSocketHandler 实现的内部类的 #handle(WebSocketSession) 方法。
  • WebSocketHandler 实现的内部类
    • 第 9 至 10 行 :转发消息,客户端 => 后端服务。
    • 第 13 至 14 行 :转发消息,后端服务 => 客户端。
    • 第 18 行 :调用 Mono#when() 方法,合并 proxySessionSend / serverSessionSend 两个 Mono 。调用 Mono#then() 方法,参数为空,合并的 Mono 不发射数据出来。RxJava 和 Reactor 类似,可以参考 《ReactiveX文档中文翻译 —— And/Then/When》 学习下 when / and / then 操作符。
    • 下图可以帮助理解下这个类的用途 :

666. 彩蛋

知识星球

😈 限于对 Reactor 和 Netty 了解不够深入,写的不够透彻。回头深入理解下它们。

胖友,分享一波朋友圈可好!

文章目录
  1. 1. 1. 概述
  2. 2. 2. 环境搭建
  3. 3. 3. WebsocketRoutingFilter
    1. 3.1. 3.1 ProxyWebSocketHandler
  4. 4. 666. 彩蛋