Package org.springframework.web.reactive.socket




public interface WebSocketHandler {

* 返回此处理程序支持的子协议列表。
* 默认情况下返回一个空列表。
default List<String> getSubProtocols() {
return Collections.emptyList();

* 处理WebSocket会话。
* @param session the session to handle
* @return completion {@code Mono<Void>} to indicate the outcome of the
* WebSocket session handling.
Mono<Void> handle(WebSocketSession session);





public interface WebSocketSession {

* Return the id for the session.
* 返回会话的ID。
String getId();

* Return information from the handshake request.
HandshakeInfo getHandshakeInfo();

* 返回一个DataBuffer Factory来创建消息有效载荷。
* Return a {@code DataBuffer} Factory to create message payloads.
* @return the buffer factory for the session
DataBufferFactory bufferFactory();

* 获取传入消息的流。
* Get the flux of incoming messages.
Flux<WebSocketMessage> receive();

* 将给定的消息写入WebSocket连接。
* Write the given messages to the WebSocket connection.
* @param messages the messages to write
Mono<Void> send(Publisher<WebSocketMessage> messages);

* 用CloseStatus.NORMAL关闭WebSocket会话。
* Close the WebSocket session with {@link CloseStatus#NORMAL}.
default Mono<Void> close() {
return close(CloseStatus.NORMAL);

* 关闭具有给定状态的WebSocket会话。
* Close the WebSocket session with the given status.
* @param status the close status
Mono<Void> close(CloseStatus status);

// WebSocketMessage factory methods

/**Factory方法使用会话的bufferFactory()创建文本WebSocketMessag * e。
WebSocketMessage textMessage(String payload);

* Factory方法使用会话的bufferFactory()创建二进制WebSocketMes* sage。
WebSocketMessage binaryMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);

* 工厂方法使用会话的bufferFactory()创建一个ping *WebSocketMessage。
WebSocketMessage pingMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);

* Factory方法使用会话的bufferFactory()创建pong *WebSocketMessage。
WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);





public class WebSocketMessage {

private final Type type;

private final DataBuffer payload;

* WebSocketMessage的构造函数.
* 请参阅WebSocketSession中的静态工厂方法,或使用 WebSocketSession.bufferFactory()创建有效内容,然后调用此构造函 数。
public WebSocketMessage(Type type, DataBuffer payload) {
Assert.notNull(type, "'type' must not be null");
Assert.notNull(payload, "'payload' must not be null");
this.type = type;
this.payload = payload;

* Return the message type (text, binary, etc).
public Type getType() {
return this.type;

* Return the message payload.
public DataBuffer getPayload() {
return this.payload;

* Return the message payload as UTF-8 text. This is a useful for text
* WebSocket messages.
public String getPayloadAsText() {
byte[] bytes = new byte[this.payload.readableByteCount()];
return new String(bytes, StandardCharsets.UTF_8);

* 保留消息有效载荷的数据缓冲区,这在运行时(例如Netty)和池缓冲区中很有用。一个快捷方式:
public WebSocketMessage retain() {
return this;

public void release() {

public boolean equals(Object other) {
if (this == other) {
return true;
if (!(other instanceof WebSocketMessage)) {
return false;
WebSocketMessage otherMessage = (WebSocketMessage) other;
return (this.type.equals(otherMessage.type) &&
ObjectUtils.nullSafeEquals(this.payload, otherMessage.payload));

public int hashCode() {
return this.type.hashCode() * 29 + this.payload.hashCode();

* WebSocket 消息类型.
public enum Type { TEXT, BINARY, PING, PONG }



表示WebSocket“关闭”的状态码和原因。 1xxx范围内的状态码由协议预先定义。

public final class CloseStatus {

* "1000 indicates a normal closure, meaning that the purpose for which the connection
* was established has been fulfilled."
public static final CloseStatus NORMAL = new CloseStatus(1000);

* "1001 indicates that an endpoint is "going away", such as a server going down or a
* browser having navigated away from a page."
public static final CloseStatus GOING_AWAY = new CloseStatus(1001);

* "1002 indicates that an endpoint is terminating the connection due to a protocol
* error."
public static final CloseStatus PROTOCOL_ERROR = new CloseStatus(1002);

* "1003 indicates that an endpoint is terminating the connection because it has
* received a type of data it cannot accept (e.g., an endpoint that understands only
* text data MAY send this if it receives a binary message)."
public static final CloseStatus NOT_ACCEPTABLE = new CloseStatus(1003);

// 10004: Reserved.
// The specific meaning might be defined in the future.

* "1005 is a reserved value and MUST NOT be set as a status code in a Close control
* frame by an endpoint. It is designated for use in applications expecting a status
* code to indicate that no status code was actually present."
public static final CloseStatus NO_STATUS_CODE = new CloseStatus(1005);

* "1006 is a reserved value and MUST NOT be set as a status code in a Close control
* frame by an endpoint. It is designated for use in applications expecting a status
* code to indicate that the connection was closed abnormally, e.g., without sending
* or receiving a Close control frame."
public static final CloseStatus NO_CLOSE_FRAME = new CloseStatus(1006);

* "1007 indicates that an endpoint is terminating the connection because it has
* received data within a message that was not consistent with the type of the message
* (e.g., non-UTF-8 [RFC3629] data within a text message)."
public static final CloseStatus BAD_DATA = new CloseStatus(1007);

* "1008 indicates that an endpoint is terminating the connection because it has
* received a message that violates its policy. This is a generic status code that can
* be returned when there is no other more suitable status code (e.g., 1003 or 1009)
* or if there is a need to hide specific details about the policy."
public static final CloseStatus POLICY_VIOLATION = new CloseStatus(1008);

* "1009 indicates that an endpoint is terminating the connection because it has
* received a message that is too big for it to process."
public static final CloseStatus TOO_BIG_TO_PROCESS = new CloseStatus(1009);

* "1010 indicates that an endpoint (client) is terminating the connection because it
* has expected the server to negotiate one or more extension, but the server didn't
* return them in the response message of the WebSocket handshake. The list of
* extensions that are needed SHOULD appear in the /reason/ part of the Close frame.
* Note that this status code is not used by the server, because it can fail the
* WebSocket handshake instead."
public static final CloseStatus REQUIRED_EXTENSION = new CloseStatus(1010);

* "1011 indicates that a server is terminating the connection because it encountered
* an unexpected condition that prevented it from fulfilling the request."
public static final CloseStatus SERVER_ERROR = new CloseStatus(1011);

* "1012 indicates that the service is restarted. A client may reconnect, and if it
* chooses to do, should reconnect using a randomized delay of 5 - 30s."
public static final CloseStatus SERVICE_RESTARTED = new CloseStatus(1012);

* "1013 indicates that the service is experiencing overload. A client should only
* connect to a different IP (when there are multiple for the target) or reconnect to
* the same IP upon user action."
public static final CloseStatus SERVICE_OVERLOAD = new CloseStatus(1013);

* "1015 is a reserved value and MUST NOT be set as a status code in a Close control
* frame by an endpoint. It is designated for use in applications expecting a status
* code to indicate that the connection was closed due to a failure to perform a TLS
* handshake (e.g., the server certificate can't be verified)."
public static final CloseStatus TLS_HANDSHAKE_FAILURE = new CloseStatus(1015);

private final int code;

private final String reason;

* Create a new {@link CloseStatus} instance.
* @param code the status code
public CloseStatus(int code) {
this(code, null);

* Create a new {@link CloseStatus} instance.
* @param code the status code
* @param reason the reason
public CloseStatus(int code, @Nullable String reason) {
Assert.isTrue((code >= 1000 && code < 5000), "Invalid status code");
this.code = code;
this.reason = reason;

* Return the status code.
public int getCode() {
return this.code;

* Return the reason, or {@code null} if none.
public String getReason() {
return this.reason;

* Create a new {@link CloseStatus} from this one with the specified reason.
* @param reason the reason
* @return a new {@link CloseStatus} instance
public CloseStatus withReason(String reason) {
Assert.hasText(reason, "Reason must not be empty");
return new CloseStatus(this.code, reason);

public boolean equalsCode(CloseStatus other) {
return (this.code == other.code);

public boolean equals(Object other) {
if (this == other) {
return true;
if (!(other instanceof CloseStatus)) {
return false;
CloseStatus otherStatus = (CloseStatus) other;
return (this.code == otherStatus.code &&
ObjectUtils.nullSafeEquals(this.reason, otherStatus.reason));

public int hashCode() {
return this.code * 29 + ObjectUtils.nullSafeHashCode(this.reason);

public String toString() {
return "CloseStatus[code=" + this.code + ", reason=" + this.reason + "]";


状态码 常量 表示
1000 NORMAL 1000表示一个正常的闭包,这意味着建立连接的目的已经完成
1001 GOING_AWAY 1001表示端点正在“消失”,比如服务器关闭或浏览器从页面上离开。
1002 PROTOCOL_ERROR 1002表示由于协议错误,端点正在终止连接
1003 NOT_ACCEPTABLE 1003表示端点正在终止连接,因为它接收了一种无法接受的数据类型(例如,一个只理解文本数据的端点,如果接收到二进制消息,则可以发送此数据
1004 Reserved 保留,未来可能在定义
1005 NO_STATUS_CODE 1005是一个保留值,不能在端点的闭合控制帧中设置为状态码。
1006 NO_CLOSE_FRAME 1006是一个保留值,不能在端点的闭合控制帧中设置为状态码
1007 BAD_DATA 1007表示端点正在终止连接,因为它在一条消息中接收到与消息类型不一致的数据(例如,文本消息中的非utf - 8[RFC3629]数据)。
1008 POLICY_VIOLATION 1008表示端点正在终止连接,因为它收到了违反其策略的消息。
1009 TOO_BIG_TO_PROCESS 1009表示端点正在终止连接,因为它收到了一个太大的消息,无法处理。
10010 REQUIRED_EXTENSION 1010表示端点(客户端)终止了连接,因为它期望服务器可以协商一个或多个扩展,但是服务器并没有在WebSocket握手的响应消息中返回它们
10011 SERVER_ERROR 1011表明服务器正在终止连接,因为它遇到了一个意想不到的情况,阻止它完成请求
10012 SERVICE_RESTARTED 1012表示该服务重新启动
10013 SERVICE_OVERLOAD 1013显示服务正在经历过载。
10015 TLS_HANDSHAKE_FAILURE 1015是一个保留的值,不能在端点的闭环控制帧中设置为状态码



public class HandshakeInfo {

private final URI uri;

private final Mono<Principal> principalMono;

private final HttpHeaders headers;

private final String protocol;

* Constructor with information about the handshake.
* @param uri the endpoint URL
* @param headers request headers for server or response headers or client
* @param principal the principal for the session
* @param protocol the negotiated sub-protocol (may be {@code null})
public HandshakeInfo(URI uri, HttpHeaders headers, Mono<Principal> principal, @Nullable String protocol) {
Assert.notNull(uri, "URI is required");
Assert.notNull(headers, "HttpHeaders are required");
Assert.notNull(principal, "Principal is required");
this.uri = uri;
this.headers = headers;
this.principalMono = principal;
this.protocol = protocol;

* 返回WebSocket端点的URL
* Return the URL for the WebSocket endpoint.
public URI getUri() {
return this.uri;

* Return the handshake HTTP headers. Those are the request headers for a
* server session and the response headers for a client session.
public HttpHeaders getHeaders() {
return this.headers;

* 返回与握手HTTP请求相关的主体。
* Return the principal associated with the handshake HTTP request.
public Mono<Principal> getPrincipal() {
return this.principalMono;

* 在握手时协商的子协议,如果没有,则为null。
* The sub-protocol negotiated at handshake time, or {@code null} if none.
* @see <a href="https://tools.ietf.org/html/rfc6455#section-1.9">
* https://tools.ietf.org/html/rfc6455#section-1.9</a>
public String getSubProtocol() {
return this.protocol;

public String toString() {
return "HandshakeInfo[uri=" + this.uri + ", headers=" + this.headers + "]";


Package org.springframework.web.reactive.socket.adapter

将Spring的Reactive WebSocket API与WebSocket运行时相适配的类。




在事件侦听器WebSocket API之间架设的WebSocketSession实现的基类

public abstract class AbstractWebSocketSession<T> implements WebSocketSession {

private final T delegate;

private final String id;

private final HandshakeInfo handshakeInfo;

private final DataBufferFactory bufferFactory;

* Create a new instance and associate the given attributes with it.
protected AbstractWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo,
DataBufferFactory bufferFactory) {

Assert.notNull(delegate, "Native session is required.");
Assert.notNull(id, "Session id is required.");
Assert.notNull(handshakeInfo, "HandshakeInfo is required.");
Assert.notNull(bufferFactory, "DataBuffer factory is required.");

this.delegate = delegate;
this.id = id;
this.handshakeInfo = handshakeInfo;
this.bufferFactory = bufferFactory;

protected T getDelegate() {
return this.delegate;

public String getId() {
return this.id;

public HandshakeInfo getHandshakeInfo() {
return this.handshakeInfo;

// 返回一个DataBuffer Factory来创建消息有效载荷。
public DataBufferFactory bufferFactory() {
return this.bufferFactory;

public abstract Flux<WebSocketMessage> receive();

public abstract Mono<Void> send(Publisher<WebSocketMessage> messages);

// WebSocketMessage factory methods

public WebSocketMessage textMessage(String payload) {
byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = bufferFactory().wrap(bytes);
return new WebSocketMessage(WebSocketMessage.Type.TEXT, buffer);

// Factory方法使用WebSocketSession.bufferFactory()为会话创建二进制WebSocketMessage。
public WebSocketMessage binaryMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
DataBuffer payload = payloadFactory.apply(bufferFactory());
return new WebSocketMessage(WebSocketMessage.Type.BINARY, payload);

//工厂方法使用WebSocketSession.bufferFactory()为会话创建一个ping WebSocketMessage。

public WebSocketMessage pingMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
DataBuffer payload = payloadFactory.apply(bufferFactory());
return new WebSocketMessage(WebSocketMessage.Type.PING, payload);

//工厂方法创建一个使用WebSocketSession.bufferFactory pong WebSocketMessage会话()。
public WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
DataBuffer payload = payloadFactory.apply(bufferFactory());
return new WebSocketMessage(WebSocketMessage.Type.PONG, payload);

public String toString() {
return getClass().getSimpleName() + "[id=" + getId() + ", uri=" + getHandshakeInfo().getUri() + "]";



在事件侦听器WebSocket API(例如Java WebSocket API JSR-356,Jetty,Undertow)和Reactive Streams之间进行桥接的WebSocketSession实现的基类。


public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSocketSession<T>
implements Subscriber<Void> {

* The "back-pressure" buffer size to use if the underlying WebSocket API
* does not have flow control for receiving messages.
private static final int RECEIVE_BUFFER_SIZE = 8192;

private final MonoProcessor<Void> completionMono;

private final WebSocketReceivePublisher receivePublisher = new WebSocketReceivePublisher();

private volatile WebSocketSendProcessor sendProcessor;

private final AtomicBoolean sendCalled = new AtomicBoolean();

* Base constructor.
* @param delegate the native WebSocket session, channel, or connection
* @param id the session id
* @param handshakeInfo the handshake info
* @param bufferFactory the DataBuffer factor for the current connection
public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo,
DataBufferFactory bufferFactory) {

this(delegate, id, handshakeInfo, bufferFactory, null);

* Alternative constructor with completion {@code Mono&lt;Void&gt;} to propagate
* the session completion (success or error) (for client-side use).
public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo,
DataBufferFactory bufferFactory, @Nullable MonoProcessor<Void> completionMono) {

super(delegate, id, handshakeInfo, bufferFactory);
this.completionMono = completionMono;

protected WebSocketSendProcessor getSendProcessor() {
WebSocketSendProcessor sendProcessor = this.sendProcessor;
Assert.state(sendProcessor != null, "No WebSocketSendProcessor available");
return sendProcessor;

public Flux<WebSocketMessage> receive() {
return canSuspendReceiving() ?
Flux.from(this.receivePublisher) :

public Mono<Void> send(Publisher<WebSocketMessage> messages) {
if (this.sendCalled.compareAndSet(false, true)) {
WebSocketSendProcessor sendProcessor = new WebSocketSendProcessor();
this.sendProcessor = sendProcessor;
return Mono.from(subscriber -> {
else {
return Mono.error(new IllegalStateException("send() has already been called"));

* 底层的WebSocket API是否具有流量控制功能,可以暂停和恢复接收消息。
protected abstract boolean canSuspendReceiving();

* Suspend receiving until received message(s) are processed and more demand
* is generated by the downstream Subscriber.
* <p><strong>Note:</strong> if the underlying WebSocket API does not provide
* flow control for receiving messages, and this method should be a no-op
* and {@link #canSuspendReceiving()} should return {@code false}.
protected abstract void suspendReceiving();

* Resume receiving new message(s) after demand is generated by the
* downstream Subscriber.
* <p><strong>Note:</strong> if the underlying WebSocket API does not provide
* flow control for receiving messages, and this method should be a no-op
* and {@link #canSuspendReceiving()} should return {@code false}.
protected abstract void resumeReceiving();

* Send the given WebSocket message.
protected abstract boolean sendMessage(WebSocketMessage message) throws IOException;

// WebSocketHandler adapter delegate methods

/** Handle a message callback from the WebSocketHandler adapter */
void handleMessage(Type type, WebSocketMessage message) {

/** Handle an error callback from the WebSocketHandler adapter */
void handleError(Throwable ex) {
WebSocketSendProcessor sendProcessor = this.sendProcessor;
if (sendProcessor != null) {

/** Handle a close callback from the WebSocketHandler adapter */
void handleClose(CloseStatus reason) {
WebSocketSendProcessor sendProcessor = this.sendProcessor;
if (sendProcessor != null) {

// Subscriber<Void> implementation

public void onSubscribe(Subscription subscription) {

public void onNext(Void aVoid) {
// no op

public void onError(Throwable ex) {
if (this.completionMono != null) {
int code = CloseStatus.SERVER_ERROR.getCode();
close(new CloseStatus(code, ex.getMessage()));

public void onComplete() {
if (this.completionMono != null) {

private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> {

private volatile WebSocketMessage webSocketMessage;

protected void checkOnDataAvailable() {
if (this.webSocketMessage != null) {

protected WebSocketMessage read() throws IOException {
if (this.webSocketMessage != null) {
WebSocketMessage result = this.webSocketMessage;
this.webSocketMessage = null;
return result;

return null;

void handleMessage(WebSocketMessage webSocketMessage) {
this.webSocketMessage = webSocketMessage;

protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor<WebSocketMessage> {

private volatile boolean isReady = true;

protected boolean write(WebSocketMessage message) throws IOException {
return sendMessage(message);

protected void releaseData() {
this.currentData = null;

protected boolean isDataEmpty(WebSocketMessage message) {
return (message.getPayload().readableByteCount() == 0);

protected boolean isWritePossible() {
return (this.isReady && this.currentData != null);

* Sub-classes can invoke this before sending a message (false) and
* after receiving the async send callback (true) effective translating
* async completion callback into simple flow control.
public void setReadyToSend(boolean ready) {
this.isReady = ready;


我们可以看到这两个类是由子类来实现的。不同的服务器有不同的实现。 @Override public abstract Flux<WebSocketMessage> receive();

public abstract Mono<Void> send(Publisher<WebSocketMessage> messages);



基于Netty的WebSocketSession适配器的基类,它提供了将Netty WebSocketFrames转换为WebSocketMessages和从WebSocketMessages转换的便利方法。

public abstract class NettyWebSocketSessionSupport<T> extends AbstractWebSocketSession<T> {

* 默认的最大大小用于聚集入站WebSocket帧。
* The default max size for aggregating inbound WebSocket frames.
protected static final int DEFAULT_FRAME_MAX_SIZE = 64 * 1024;

private static final Map<Class<?>, WebSocketMessage.Type> MESSAGE_TYPES;

static {
MESSAGE_TYPES = new HashMap<>(4);
MESSAGE_TYPES.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT);
MESSAGE_TYPES.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY);
MESSAGE_TYPES.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING);
MESSAGE_TYPES.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG);

protected NettyWebSocketSessionSupport(T delegate, HandshakeInfo info, NettyDataBufferFactory factory) {
super(delegate, ObjectUtils.getIdentityHexString(delegate), info, factory);

//返回一个DataBuffer Factory来创建消息有效载荷。
public NettyDataBufferFactory bufferFactory() {
return (NettyDataBufferFactory) super.bufferFactory();

protected WebSocketMessage toMessage(WebSocketFrame frame) {
DataBuffer payload = bufferFactory().wrap(frame.content());
return new WebSocketMessage(MESSAGE_TYPES.get(frame.getClass()), payload);

protected WebSocketFrame toFrame(WebSocketMessage message) {
ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(message.getPayload());
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
return new TextWebSocketFrame(byteBuf);
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
return new BinaryWebSocketFrame(byteBuf);
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
return new PingWebSocketFrame(byteBuf);
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
return new PongWebSocketFrame(byteBuf);
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());



Spring WebSocketSession实现,可以适应反应堆Netty的WebSocket NettyInbound和NettyOutbound。

public class ReactorNettyWebSocketSession
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {

public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
HandshakeInfo info, NettyDataBufferFactory bufferFactory) {

super(new WebSocketConnection(inbound, outbound), info, bufferFactory);

public Flux<WebSocketMessage> receive() {
return getDelegate().getInbound()

public Mono<Void> send(Publisher<WebSocketMessage> messages) {
Flux<WebSocketFrame> frames = Flux.from(messages).map(this::toFrame);
return getDelegate().getOutbound()

public Mono<Void> close(CloseStatus status) {
return Mono.error(new UnsupportedOperationException(
"Currently in Reactor Netty applications are expected to use the " +
"Cancellation returned from subscribing to the \"receive\"-side Flux " +
"in order to close the WebSocket session."));

* Simple container for {@link NettyInbound} and {@link NettyOutbound}.
public static class WebSocketConnection {

private final WebsocketInbound inbound;

private final WebsocketOutbound outbound;

public WebSocketConnection(WebsocketInbound inbound, WebsocketOutbound outbound) {
this.inbound = inbound;
this.outbound = outbound;

public WebsocketInbound getInbound() {
return this.inbound;

public WebsocketOutbound getOutbound() {
return this.outbound;




为标准Java(JSR 356)会话跳转WebSocketSession适配器。


Java WebSocket API(JSR-356)的适配器,它将事件委托给一个被动的WebSocketHandler及其会话。




package org.springframework.web.reactive.socket.client;



public interface WebSocketClient {

// 具有自定义标头的执行
Mono<Void> execute(URI url, WebSocketHandler handler);

Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler);




public class WebSocketClientSupport {

private static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

protected final Log logger = LogFactory.getLog(getClass());

protected List<String> beforeHandshake(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
if (logger.isDebugEnabled()) {
logger.debug("Executing handshake to " + url);
return handler.getSubProtocols();

protected HandshakeInfo afterHandshake(URI url, HttpHeaders responseHeaders) {
if (logger.isDebugEnabled()) {
logger.debug("Handshake response: " + url + ", " + responseHeaders);
String protocol = responseHeaders.getFirst(SEC_WEBSOCKET_PROTOCOL);
return new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);



用于Reactor Netty的WebSocketClient实现。

public class ReactorNettyWebSocketClient extends WebSocketClientSupport implements WebSocketClient {

private final HttpClient httpClient;

* Default constructor.
public ReactorNettyWebSocketClient() {
this(options -> {});

* Constructor that accepts an {@link HttpClientOptions.Builder} consumer
* to supply to {@link HttpClient#create(Consumer)}.
public ReactorNettyWebSocketClient(Consumer<? super HttpClientOptions.Builder> clientOptions) {
this.httpClient = HttpClient.create(clientOptions);

* Return the configured {@link HttpClient}.
public HttpClient getHttpClient() {
return this.httpClient;

public Mono<Void> execute(URI url, WebSocketHandler handler) {
return execute(url, new HttpHeaders(), handler);

public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) {
List<String> protocols = beforeHandshake(url, headers, handler);

return getHttpClient()
nettyHeaders -> setNettyHeaders(headers, nettyHeaders),
.flatMap(response -> {
HandshakeInfo info = afterHandshake(url, toHttpHeaders(response));
ByteBufAllocator allocator = response.channel().alloc();
NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator);
return response.receiveWebsocket((in, out) -> {
WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory);
return handler.handle(session);

private void setNettyHeaders(HttpHeaders headers, io.netty.handler.codec.http.HttpHeaders nettyHeaders) {

private HttpHeaders toHttpHeaders(HttpClientResponse response) {
HttpHeaders headers = new HttpHeaders();
response.responseHeaders().forEach(entry -> {
String name = entry.getKey();
headers.put(name, response.responseHeaders().getAll(name));
return headers;








public interface RequestUpgradeStrategy {

* 升级到WebSocket会话并使用给定的处理程序处理它。
* @param exchange the current exchange
* @param webSocketHandler handler for the WebSocket session
* @param subProtocol the selected sub-protocol got the handler
* @return completion {@code Mono<Void>} to indicate the outcome of the
* WebSocket session handling.
Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler, @Nullable String subProtocol);




public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {

public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler, @Nullable String subProtocol) {
ReactorServerHttpResponse response = (ReactorServerHttpResponse) exchange.getResponse();
HandshakeInfo info = getHandshakeInfo(exchange, subProtocol);
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();

return response.getReactorResponse().sendWebsocket(subProtocol,
(in, out) -> handler.handle(new ReactorNettyWebSocketSession(in, out, info, bufferFactory)));

private HandshakeInfo getHandshakeInfo(ServerWebExchange exchange, @Nullable String protocol) {
ServerHttpRequest request = exchange.getRequest();
Mono<Principal> principal = exchange.getPrincipal();
return new HandshakeInfo(request.getURI(), request.getHeaders(), principal, protocol);








对于WebSocket端点,这意味着要处理初始的WebSocket HTTP握手请求。对于SockJS端点,这可能意味着处理所有在SockJS协议中定义的HTTP请求。

public interface WebSocketService {

* 处理HTTP请求并使用给定的WebSocketHandler。
* @param exchange the current exchange
* @param webSocketHandler handler for WebSocket session
* @return a completion Mono for the WebSocket session handling
Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler webSocketHandler);




public class HandshakeWebSocketService implements WebSocketService, Lifecycle {

private static final String SEC_WEBSOCKET_KEY = "Sec-WebSocket-Key";

private static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

private static final boolean tomcatPresent = ClassUtils.isPresent(

private static final boolean jettyPresent = ClassUtils.isPresent(

private static final boolean undertowPresent = ClassUtils.isPresent(

private static final boolean reactorNettyPresent = ClassUtils.isPresent(

protected static final Log logger = LogFactory.getLog(HandshakeWebSocketService.class);

private final RequestUpgradeStrategy upgradeStrategy;

private volatile boolean running = false;

* 默认构造函数自动,基于类路径检测的RequestUpgradeStrategy的发现使用。
* Default constructor automatic, classpath detection based discovery of the
* {@link RequestUpgradeStrategy} to use.
public HandshakeWebSocketService() {

* 使用RequestUpgradeStrategy的替代构造函数。
* @param upgradeStrategy the strategy to use
public HandshakeWebSocketService(RequestUpgradeStrategy upgradeStrategy) {
Assert.notNull(upgradeStrategy, "RequestUpgradeStrategy is required");
this.upgradeStrategy = upgradeStrategy;

private static RequestUpgradeStrategy initUpgradeStrategy() {
String className;
if (tomcatPresent) {
className = "TomcatRequestUpgradeStrategy";
else if (jettyPresent) {
className = "JettyRequestUpgradeStrategy";
else if (undertowPresent) {
className = "UndertowRequestUpgradeStrategy";
else if (reactorNettyPresent) {
// As late as possible (Reactor Netty commonly used for WebClient)
className = "ReactorNettyRequestUpgradeStrategy";
else {
throw new IllegalStateException("No suitable default RequestUpgradeStrategy found");

try {
className = "org.springframework.web.reactive.socket.server.upgrade." + className;
Class<?> clazz = ClassUtils.forName(className, HandshakeWebSocketService.class.getClassLoader());
return (RequestUpgradeStrategy) ReflectionUtils.accessibleConstructor(clazz).newInstance();
catch (Throwable ex) {
throw new IllegalStateException(
"Failed to instantiate RequestUpgradeStrategy: " + className, ex);

* Return the {@link RequestUpgradeStrategy} for WebSocket requests.
public RequestUpgradeStrategy getUpgradeStrategy() {
return this.upgradeStrategy;

public boolean isRunning() {
return this.running;

public void start() {
if (!isRunning()) {
this.running = true;

protected void doStart() {
if (getUpgradeStrategy() instanceof Lifecycle) {
((Lifecycle) getUpgradeStrategy()).start();

public void stop() {
if (isRunning()) {
this.running = false;

protected void doStop() {
if (getUpgradeStrategy() instanceof Lifecycle) {
((Lifecycle) getUpgradeStrategy()).stop();

public Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler) {
ServerHttpRequest request = exchange.getRequest();
HttpMethod method = request.getMethod();
HttpHeaders headers = request.getHeaders();

if (logger.isDebugEnabled()) {
logger.debug("Handling " + request.getURI() + " with headers: " + headers);

if (HttpMethod.GET != method) {
return Mono.error(new MethodNotAllowedException(
request.getMethodValue(), Collections.singleton(HttpMethod.GET)));

if (!"WebSocket".equalsIgnoreCase(headers.getUpgrade())) {
return handleBadRequest("Invalid 'Upgrade' header: " + headers);

List<String> connectionValue = headers.getConnection();
if (!connectionValue.contains("Upgrade") && !connectionValue.contains("upgrade")) {
return handleBadRequest("Invalid 'Connection' header: " + headers);

String key = headers.getFirst(SEC_WEBSOCKET_KEY);
if (key == null) {
return handleBadRequest("Missing \"Sec-WebSocket-Key\" header");

String protocol = selectProtocol(headers, handler);
return this.upgradeStrategy.upgrade(exchange, handler, protocol);

private Mono<Void> handleBadRequest(String reason) {
if (logger.isDebugEnabled()) {
return Mono.error(new ServerWebInputException(reason));

private String selectProtocol(HttpHeaders headers, WebSocketHandler handler) {
String protocolHeader = headers.getFirst(SEC_WEBSOCKET_PROTOCOL);
if (protocolHeader != null) {
List<String> supportedProtocols = handler.getSubProtocols();
for (String protocol : StringUtils.commaDelimitedListToStringArray(protocolHeader)) {
if (supportedProtocols.contains(protocol)) {
return protocol;
return null;


WebSocketService实现通过委托给RequestUpgradeStrategy处理WebSocket HTTP握手请求,该请求可以从类路径自动检测(无参数构造函数),但也可以显式配置。

