本篇内容介绍了“Netty NioEventLoop启动过程是怎样的”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
启动
分析NioEventLoop的execute()接口,主要逻辑如下:
添加任务队列
绑定当前线程到EventLoop上
调用EventLoop的run()方法
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // 通过eventLoop来执行channel绑定的Task
channel.eventLoop().execute(new Runnable() { @Override
public void run() { if (regFuture.isSuccess()) { // channel绑定
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
往下追踪到 SingleThreadEventExecutor 中 execute 接口,如下:
@Overridepublic void execute(Runnable task) { if (task == null) { throw new NullPointerException("task");
} // 判断当前运行时线程是否与EventLoop中绑定的线程一致
// 这里还未绑定Thread,所以先返回false
boolean inEventLoop = inEventLoop(); // 将任务添加任务队列,也就是我们前面讲EventLoop创建时候提到的 MpscQueue.
addTask(task); if (!inEventLoop) { // 启动线程
startThread(); if (isShutdown() && removeTask(task)) {
reject();
}
} if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
启动线程接口:
private void startThread() { // 状态比较,最开始时state = 1 ,为true
if (state == ST_NOT_STARTED) { // cs操作后,state状态设置为 2
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { try { // 启动接口
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}// 执行线程启动方法private void doStartThread() { // 断言判断 SingleThreadEventExecutor 还未绑定 Thread
assert thread == null; // executor 执行任务
executor.execute(new Runnable() { @Override
public void run() { // 将 SingleThreadEventExecutor(在我们的案例中就是NioEventLoop) 与 当前线程进行绑定
thread = Thread.currentThread(); if (interrupted) {
thread.interrupt();
} // 设置状态为 false
boolean success = false; // 更新最近一次任务的执行时间
updateLastExecutionTime(); try { // 往下调用 NioEventLoop 的 run 方法,执行
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
}
}
});
}
执行
往下调用到 NioEventLoop 中的 run 方法,通过无限for循环,主要做以下三件事情:
@Overrideprotected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: // 轮训检测I/O事件
// wakenUp为了标记selector是否是唤醒状态,每次select操作,都设置为false,也就是未唤醒状态。
select(wakenUp.getAndSet(false)); // 'wakenUp.compareAndSet(false, true)' 总是在调用 'selector.wakeup()' 之前进行评估,以减少唤醒的开销
// (Selector.wakeup() 是非常耗性能的操作.)
// 但是,这种方法存在竞争条件。当「wakeup」太早设置为true时触发竞争条件
// 在下面两种情况下,「wakenUp」会过早设置为true:
// 1)Selector 在 'wakenUp.set(false)' 与 'selector.select(...)' 之间被唤醒。(BAD)
// 2)Selector 在 'selector.select(...)' 与 'if (wakenUp.get()) { ... }' 之间被唤醒。(OK)
// 在第一种情况下,'wakenUp'设置为true,后面的'selector.select(...)'将立即唤醒。 直到'wakenUp'在下一轮中再次设置为false,'wakenUp.compareAndSet(false,true)'将失败,因此任何唤醒选择器的尝试也将失败,从而导致以下'selector.select(。 ..)'呼吁阻止不必要的。
// 要解决这个问题,如果在selector.select(...)操作之后wakenUp立即为true,我们会再次唤醒selector。 它是低效率的,因为它唤醒了第一种情况(BAD - 需要唤醒)和第二种情况(OK - 不需要唤醒)的选择器。
if (wakenUp.get()) {
selector.wakeup();
} // fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false; // ioRatio 表示处理I/O事件与执行具体任务事件之间所耗时间的比值。
// ioRatio 默认为50
final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { // 处理I/O事件
processSelectedKeys();
} finally { // 处理任务队列
runAllTasks();
}
} else { // 处理IO事件的开始时间
final long ioStartTime = System.nanoTime(); try { // 处理I/O事件
processSelectedKeys();
} finally { // 记录io所耗时间
final long ioTime = System.nanoTime() - ioStartTime; // 处理任务队列,设置最大的超时时间
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try { if (isShuttingDown()) {
closeAll(); if (confirmShutdown()) { return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
轮循检测I/O事件
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector; try { // select操作计数
int selectCnt = 0; // 记录当前系统时间
long currentTimeNanos = System.nanoTime(); // delayNanos方法用于计算定时任务队列,最近一个任务的截止时间
// selectDeadLineNanos 表示当前select操作所不能超过的最大截止时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { // 计算超时时间,判断是否超时
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; // 如果 timeoutMillis <= 0, 表示超时,进行一个非阻塞的 select 操作。设置 selectCnt 为 1. 并终止本次循环。
if (timeoutMillis <= 0) { if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
} break;
} // 当wakenUp为ture时,恰好有task被提交,这个task将无法获得调用的机会
// Selector#wakeup. 因此,在执行select操作之前,需要再次检查任务队列
// 如果不这么做,这个Task将一直挂起,直到select操作超时
// 如果 pipeline 中存在 IdleStateHandler ,那么Task将一直挂起直到 空闲超时。
if (hasTasks() && wakenUp.compareAndSet(false, true)) { // 调用非阻塞方法
selector.selectNow();
selectCnt = 1; break;
} // 如果当前任务队列为空,并且超时时间未到,则进行一个阻塞式的selector操作。timeoutMillis 为最大的select时间
int selectedKeys = selector.select(timeoutMillis); // 操作计数 +1
selectCnt ++;
// 存在以下情况,本次selector则终止
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - 轮训到了事件(Selected something,)
// - 被用户唤醒(waken up by user,)
// - 已有任务队列(the task queue has a pending task.)
// - 已有定时任务(a scheduled task is ready for processing)
break;
} if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1; break;
} // 记录当前时间
long time = System.nanoTime(); // 如果time > currentTimeNanos + timeoutMillis(超时时间),则表明已经执行过一次select操作
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected.
selectCnt = 1;
}
// 如果 time <= currentTimeNanos + timeoutMillis,表示触发了空轮训
// 如果空轮训的次数超过 SELECTOR_AUTO_REBUILD_THRESHOLD (512),则重建一个新的selctor,避免空轮训
else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector); // 重建创建一个新的selector
rebuildSelector();
selector = this.selector; // Select again to populate selectedKeys.
// 对重建后的selector进行一次非阻塞调用,用于获取最新的selectedKeys
selector.selectNow(); // 设置select计数
selectCnt = 1; break;
}
currentTimeNanos = time;
} if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) { if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
} // Harmless exception - log anyway
}
}
重新创建一个新的Selector
该方法的主要逻辑就是:
public void rebuildSelector() { if (!inEventLoop()) {
execute(new Runnable() { @Override
public void run() {
rebuildSelector0();
}
}); return;
}
rebuildSelector0();
}// 重新创建selectorprivate void rebuildSelector0() { // 暂存老的selector
final Selector oldSelector = selector; final SelectorTuple newSelectorTuple; if (oldSelector == null) { return;
} try { // 创建一个新的 SelectorTuple
// openSelector()在之前分析过了
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e); return;
} // Register all channels to the new Selector.
// 记录select上注册的channel数量
int nChannels = 0; // 遍历老的 selector 上的 SelectionKey
for (SelectionKey key: oldSelector.keys()) { // 获取 attachment,这里的attachment就是我们前面在讲 Netty Channel注册时,select会将channel赋值到 attachment 变量上。
// 获取老的selector上注册的channel
Object a = key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue;
} // 获取兴趣集
int interestOps = key.interestOps(); // 取消 SelectionKey
key.cancel(); // 将老的兴趣集重新注册到前面新创建的selector上
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) { // Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
} // nChannels计数 + 1
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else { @SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
// 设置新的 selector
selector = newSelectorTuple.selector; // 设置新的 unwrappedSelector
unwrappedSelector = newSelectorTuple.unwrappedSelector; try { // time to close the old selector as everything else is registered to the new one
// 关闭老的seleclor
oldSelector.close();
} catch (Throwable t) { if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
} if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
处理I/O事件
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
// 设置为null,有利于GC回收
selectedKeys.keys[i] = null; // 获取 SelectionKey 中的 attachment, 我们这里就是 NioChannel
final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { // 处理 SelectedKey
processSelectedKey(k, (AbstractNioChannel) a);
} else { @SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
} if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}// 处理 SelectedKeyprivate void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { // 获取Netty Channel中的 NioUnsafe 对象,用于后面的IO操作
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 判断 SelectedKey 的有效性,如果无效,则直接返回并关闭channel
if (!k.isValid()) { final EventLoop eventLoop; try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
} // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) { return;
} // close the channel if the key is not valid anymore
// 关闭channel
unsafe.close(unsafe.voidPromise()); return;
} try { // 获取 SelectionKey 中所有准备就绪的操作集
int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// 在调用处理READ与WRITE事件之间,先调用finishConnect()接口,避免异常 NotYetConnectedException 发生。
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
} // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
// 处理 WRITE 事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
} // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 处理 ACCEPT 与 READ 事件
// 如果当前的EventLoop是WorkGroup,则表示有 READ 事件
// 如果当前的EventLoop是BossGroup,则表示有 ACCEPT 事件,有新连接进来了
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 读取数据
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
关于 unsafe.read()
的分析,请看
后文
执行所有任务
接下来,我们了解一下执行具体Task任务的接口:runAllTasks。在EventLoop中,待执行的任务队列分为两种:一种是普通任务队列,一种是定时任务队列。
前面
我们讲 EventLoop 创建时提到过NioEventLoop中 taskQueue 的创建,是一个MpscQueue,关于高效率的MpscQueue 后面单独写文章进行介绍:
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
// 存放普通任务的队列
private final Queue<Runnable> taskQueue;
...
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); // 创建TaskQueue
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
...
}public final class NioEventLoop extends SingleThreadEventLoop {
...
// NioEventLoop 创建TaskQueue队列
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
...
}
存放定时任务的队列在 AbstractScheduledEventExecutor 中,成员变量为 scheduledTaskQueue,代码如下:
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
// 优先级队列的比较器
private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR = new Comparator<ScheduledFutureTask<?>>() { @Override
public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) { return o1.compareTo(o2);
}
};
// 存放定时任务的优先级队列
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue; // 创建定时任务队列
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() { if (scheduledTaskQueue == null) {
scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
SCHEDULED_FUTURE_TASK_COMPARATOR, // Use same initial capacity as java.util.PriorityQueue
11);
} return scheduledTaskQueue;
}
// 保存定时任务
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) {
delay = 0;
}
validateScheduled0(delay, unit); return schedule(new ScheduledFutureTask<Void>( this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
} // 保存定时任务
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(callable, "callable");
ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) {
delay = 0;
}
validateScheduled0(delay, unit); return schedule(new ScheduledFutureTask<V>( this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
} // 保存定时任务
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { // 判断是否为当前线程
if (inEventLoop()) { // 添加定时任务队列
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() { @Override
public void run() { // 添加定时任务队列
scheduledTaskQueue().add(task);
}
});
} return task;
}
}
Netty存放定时任务队列为
DefaultPriorityQueue
,定时任务的封装对象为 ScheduledFutureTask ,在队列中的优先按照它们的截止时间进行排序,其次在按照id进行排序。
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
...
// 比较 ScheduledFutureTask 之间的排序
@Override
public int compareTo(Delayed o) { if (this == o) { return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o; long d = deadlineNanos() - that.deadlineNanos(); if (d < 0) { return -1;
} else if (d > 0) { return 1;
} else if (id < that.id) { return -1;
} else if (id == that.id) { throw new Error();
} else { return 1;
}
}
...
}
再来看看任务的执行逻辑,首先将定时任务取出,聚合到普通任务队列中,再去for循环运行每个Task。
protected boolean runAllTasks(long timeoutNanos) { // 将定时任务从定时队列中取出,放入普通队列中
fetchFromScheduledTaskQueue(); // 从队列中取出任务
Runnable task = pollTask(); if (task == null) {
afterRunningAllTasks(); return false;
} // 计算任务执行的最大超时时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; // 任务计数
long runTasks = 0; // 最近一次任务执行的时间
long lastExecutionTime; for (;;) { // 执行任务
safeExecute(task); // 任务计数 +1
runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
// 由于nanoTime() 是非常好性能的操作,因此每64次就对比一下 定时任务的执行时间与 deadline,
// 如果 lastExecutionTime >= deadline,则表示任务超时了,需要中断退出
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break;
}
}
// 获取任务
task = pollTask(); if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime(); break;
}
}
afterRunningAllTasks(); // 记录最后一次的执行时间
this.lastExecutionTime = lastExecutionTime; return true;
}// 取出任务protected Runnable pollTask() { assert inEventLoop(); return pollTaskFrom(taskQueue);
}// 从队列中取出任务protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) { for (;;) {
Runnable task = taskQueue.poll(); if (task == WAKEUP_TASK) { continue;
} return task;
}
}// 将定时任务从定时队列中取出,聚合到普通队列中:private boolean fetchFromScheduledTaskQueue() { // 得到nanoTime = 当前时间 - ScheduledFutureTask的START_TIME(开始时间)
long nanoTime = AbstractScheduledEventExecutor.nanoTime(); // 获得截止时间小于nanoTime的定时任务
Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { // 将定时任务放入普通队列中,以备运行
if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
// 如果 taskQueue 没有足够的空间,导致添加失败,则将其返回定时任务队列中
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false;
}
scheduledTask = pollScheduledTask(nanoTime);
} return true;
}// 获得截止时间小于nanoTime的定时任务protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); // 获取定时任务队列
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; // 获取第一个定时任务
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null;
} // 如果该定时任务的截止时间 <= nanoTime ,则返回
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove(); return scheduledTask;
} return null;
}
“Netty NioEventLoop启动过程是怎样的”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注天达云网站,小编将为大家输出更多高质量的实用文章!