博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty-服务器端启动流程 源码分析.md
阅读量:5913 次
发布时间:2019-06-19

本文共 6424 字,大约阅读时间需要 21 分钟。

添加EventExecutorGroup

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {protected MultithreadEventExecutorGroup(int nThreads, Executor executor,                                            EventExecutorChooserFactory chooserFactory, Object... args) {        if (nThreads <= 0) {            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));        }        if (executor == null) {            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());        }        children = new EventExecutor[nThreads];        for (int i = 0; i < nThreads; i ++) {            boolean success = false;            try {                children[i] = newChild(executor, args);                success = true;            } catch (Exception e) {                // TODO: Think about if this is a good exception type                throw new IllegalStateException("failed to create a child event loop", e);            } finally {                if (!success) {                    for (int j = 0; j < i; j ++) {                        children[j].shutdownGracefully();                    }                    for (int j = 0; j < i; j ++) {                        EventExecutor e = children[j];                        try {                            while (!e.isTerminated()) {                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);                            }                        } catch (InterruptedException interrupted) {                            // Let the caller handle the interruption.                            Thread.currentThread().interrupt();                            break;                        }                    }                }            }        }        chooser = chooserFactory.newChooser(children);        final FutureListener terminationListener = new FutureListener() {            @Override            public void operationComplete(Future future) throws Exception {                if (terminatedChildren.incrementAndGet() == children.length) {                    terminationFuture.setSuccess(null);                }            }        };        for (EventExecutor e: children) {            e.terminationFuture().addListener(terminationListener);        }        Set
childrenSet = new LinkedHashSet
(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }复制代码

bind

private static void doBind0(            final ChannelFuture regFuture, final Channel channel,            final SocketAddress localAddress, final ChannelPromise promise) {        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up        // the pipeline in its channelRegistered() implementation.        channel.eventLoop().execute(new Runnable() { //提交/执行任务            @Override            public void run() { //业务方法                if (regFuture.isSuccess()) {                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); //                } else {                    promise.setFailure(regFuture.cause());                }            }        });    }复制代码
@Override    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {        if (localAddress == null) {            throw new NullPointerException("localAddress");        }        if (!validatePromise(promise, false)) {            // cancelled            return promise;        }        final AbstractChannelHandlerContext next = findContextOutbound();        EventExecutor executor = next.executor();        if (executor.inEventLoop()) {            next.invokeBind(localAddress, promise);        } else {            safeExecute(executor, new Runnable() {                @Override                public void run() {                    next.invokeBind(localAddress, promise);                }            }, promise, null);        }        return promise;    }复制代码
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap        implements ChannelHandlerContext, ResourceLeakHint {@Override    public ChannelFuture connect(            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {        if (remoteAddress == null) {            throw new NullPointerException("remoteAddress");        }        if (!validatePromise(promise, false)) {            // cancelled            return promise;        }        final AbstractChannelHandlerContext next = findContextOutbound();        EventExecutor executor = next.executor();        if (executor.inEventLoop()) {            next.invokeConnect(remoteAddress, localAddress, promise);        } else {            safeExecute(executor, new Runnable() {                @Override                public void run() {                    next.invokeConnect(remoteAddress, localAddress, promise);                }            }, promise, null);        }        return promise;    }复制代码
class ServerSocketChannelImpl    extends ServerSocketChannel    implements SelChImpl{@Override    public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {        synchronized (lock) {            if (!isOpen())                throw new ClosedChannelException();            if (isBound())                throw new AlreadyBoundException();            InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :                Net.checkAddress(local);            SecurityManager sm = System.getSecurityManager();            if (sm != null)                sm.checkListen(isa.getPort());            NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());            Net.bind(fd, isa.getAddress(), isa.getPort()); //最终还是调用ServerSocketChannelImpl.bind            Net.listen(fd, backlog < 1 ? 50 : backlog);             synchronized (stateLock) {                localAddress = Net.localAddress(fd);            }        }        return this;    }复制代码

addListener()

public class DefaultChannelPromise extends DefaultPromise
implements ChannelPromise, FlushCheckpoint {@Override public ChannelPromise addListener(GenericFutureListener
> listener) { super.addListener(listener); return this; }复制代码

转载于:https://juejin.im/post/5c511490e51d453f5e6b6ea7

你可能感兴趣的文章
读书笔记-深入理解JVM虚拟机-1.OOM初探
查看>>
机器学习入门之二:一个故事说明什么是机器学习(转载)
查看>>
Yii CDbCriteria 常用方法
查看>>
libgc 加 .make 在 vc6 vs2008 中的编译方法
查看>>
用条件变量实现事件等待器的正确与错误做法
查看>>
软件度量都该度个啥?(5)——被吹得最多的六西格玛
查看>>
Maven教程初级篇02:pom.xml配置初步
查看>>
pip更改下载源设置
查看>>
初识 zookeeper
查看>>
65536问题理解v4
查看>>
java并发之CountDownLatch使用指南
查看>>
redux和react-redux的吐槽
查看>>
在微信小程序中保存网络图片
查看>>
微信小程序--------语音识别(前端自己也能玩)
查看>>
来,膜拜下android roadmap,强大的执行力
查看>>
机器人定位导航技术 激光SLAM与视觉SLAM谁更胜一筹?
查看>>
JS中的call、apply、bind方法详解
查看>>
Etcd超全解:原理阐释及部署设置的最佳实践
查看>>
PAT A1038
查看>>
dva中组件的懒加载
查看>>