⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 https://www.jianshu.com/p/1ad424c53e80 「占小狼」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

上一章节中,分析了Netty如何处理read事件,本节分析Netty如何把数据写会客户端。

把数据返回客户端,需要经历三个步骤: 1、申请一块缓存buf,写入数据。 2、将buf保存到ChannelOutboundBuffer中。 3、将ChannelOutboundBuffer中的buff输出到socketChannel中。

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReferenceCountUtil.release(msg);

ByteBuf buf1 = ctx.alloc().buffer(4);
buf1.writeInt(1);

ByteBuf buf2 = ctx.alloc().buffer(4);
buf2.writeInt(2);

ByteBuf buf3 = ctx.alloc().buffer(4);
buf3.writeInt(3);

ctx.write(buf1);
ctx.write(buf2);
ctx.write(buf3);
ctx.flush();
}

为什么需要把buf保存到ChannelOutboundBuffer?

**ctx.write()**实现:

//AbstractChannelHandlerContext.java
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(msg, promise);
if (flush) {
next.invokeFlush();
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, msg, promise);
} else {
task = WriteTask.newInstance(next, msg, promise);
}
safeExecute(executor, task, promise, msg);
}
}

默认情况下,findContextOutbound()会找到pipeline的head节点,触发write方法。

//HeadContext.java
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}

//AbstractUnsafe
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
ReferenceCountUtil.release(msg);
return;
}

int size;
try {
msg = filterOutboundMessage(msg);
size = estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}

outboundBuffer.addMessage(msg, size, promise);
}

outboundBuffer 随着Unsafe一起实例化,最终将msg通过outboundBuffer封装起来。

ChannelOutboundBuffer内部维护了一个Entry链表,并使用Entry封装msg。 1、unflushedEntry:指向链表头部 2、tailEntry:指向链表尾部 3、totalPendingSize:保存msg的字节数 4、unwritable:不可写标识

public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}

// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(size, false);
}

通过Entry.newInstance返回Entry实例,Netty对Entry采用了缓存策略,使用完的Entry实例需要清空并回收,难道是因为Entry实例化比较耗时?

新的entry默认插入链表尾部,并让tailEntry指向它。

img

Paste_Image.png

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}

方法incrementPendingOutboundBytes主要采用CAS更新totalPendingSize字段,并判断当前totalPendingSize是否超过阈值writeBufferHighWaterMark,默认是65536。如果totalPendingSize >= 65536,则采用CAS更新unwritable为1,并触发ChannelWritabilityChanged事件。

到此为止,全部的buf数据已经保存在outboundBuffer中。

**ctx.flush()**实现:

public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Runnable task = next.invokeFlushTask;
if (task == null) {
next.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
next.invokeFlush();
}
};
}
safeExecute(executor, task, channel().voidPromise(), null);
}

return this;
}

默认情况下,findContextOutbound()会找到pipeline的head节点,触发flush方法。

//HeadContext.java
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}

//AbstractUnsafe
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}

方法addFlush主要对write过程添加的msg进行flush标识,其实我不清楚,这个标识过程有什么意义。

直接看flush0方法:

protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (isFlushPending()) {
return;
}
super.flush0();
}

private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}

1、如果当前selectionKey 是写事件,说明有线程执行flush过程,则直接返回。 2、否则直接执行flush操作。

protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}

final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}

inFlush0 = true;

// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}

try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}

public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}

1、如果当前socketChannel已经关闭,或断开连接,则执行失败操作。 2、否则执行doWrite把数据写入到socketChannel。

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
int size = in.size();
if (size == 0) {
// All written so clear OP_WRITE
clearOpWrite();
break;
}
long writtenBytes = 0;
boolean done = false;
boolean setOpWrite = false;

// Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers();
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
SocketChannel ch = javaChannel();

// Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
super.doWrite(in);
return;
case 1:
// Only one ByteBuf so use non-gathering write
ByteBuffer nioBuffer = nioBuffers[0];
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final int localWrittenBytes = ch.write(nioBuffer);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
default:
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
}

// Release the fully written buffers, and update the indexes of the partially written buffer.
in.removeBytes(writtenBytes);

if (!done) {
// Did not write all buffers completely.
incompleteWrite(setOpWrite);
break;
}
}
}

1、size方法返回outboundBuffer有多少Entry实例。 2、in.nioBuffers()负责把Entry中保存的ByteBuf类型的msg,重新返回Nio的ByteBuffer实例,并返回ByteBuffer数组nioBuffers,其实msg和ByteBuffer实例指向的是同一块内存,因为在UnpooledDirectByteBuf实现类中,已经维护了ByteBuffer的实例。 3、socketChannel.write()方法把nioBuffers的数据写到socket中,这是Nio中的实现。

到此为止,nioBuffers的数据都flush到socket,客户端可以准备接收了。

666. 彩蛋

如果你对 Netty 并发感兴趣,欢迎加入我的知识星球一起交流。

知识星球

文章目录
  1. 1. 666. 彩蛋