2.2.2 EndPoint Acceptor Component

#startAcceptorThreads

protected void startAcceptorThreads() {
    int count = this.getAcceptorThreadCount();
    this.acceptors = new ArrayList<>(count);

    // Start one named thread per configured acceptor
    for (int i = 0; i < count; ++i) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = this.getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        this.acceptors.add(acceptor);
        // Acceptor is a Runnable, so it is handed straight to a Thread
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(this.getAcceptorThreadPriority());
        t.setDaemon(this.getDaemon());
        t.start();
    }
}

Default value of count

protected int acceptorThreadCount = 1;
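
The start-up pattern above (build a name, wrap a Runnable in a Thread, set priority and daemon flag, start it) can be tried outside Tomcat. The following is only a minimal sketch: the class `AcceptorStartupSketch`, the method `startWorkers`, and the endpoint name `http-nio-8080` are made up for illustration and are not part of Tomcat.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the endpoint; not Tomcat's actual class.
class AcceptorStartupSketch {

    // Starts `count` named daemon threads, mirroring the loop in startAcceptorThreads().
    static List<Thread> startWorkers(String endpointName, int count, Runnable task) {
        List<Thread> threads = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            Thread t = new Thread(task, endpointName + "-Acceptor-" + i);
            t.setPriority(Thread.NORM_PRIORITY);
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            t.start();
            threads.add(t);
        }
        return threads;
    }

    public static void main(String[] args) throws InterruptedException {
        // With the default count of 1, a single acceptor thread is created.
        List<Thread> threads = startWorkers("http-nio-8080", 1,
                () -> System.out.println("running in " + Thread.currentThread().getName()));
        for (Thread t : threads) {
            t.join();
        }
        // prints "running in http-nio-8080-Acceptor-0"
    }
}
```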

Acceptor

Judging by the name, this is the component that accepts incoming HTTP connections.
Looking at the source, it is a Runnable, just like Poller.

public class Acceptor<U> implements Runnable {}

Let's list its fields first, for later reference.

Fields

    private static final Log log = LogFactory.getLog(Acceptor.class);
    private static final StringManager sm = StringManager.getManager(Acceptor.class);

    private static final int INITIAL_ERROR_DELAY = 50;
    private static final int MAX_ERROR_DELAY = 1600;

    private final AbstractEndpoint<?,U> endpoint;
    private String threadName;
    protected volatile AcceptorState state = AcceptorState.NEW;
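
The two constants INITIAL_ERROR_DELAY and MAX_ERROR_DELAY feed the `handleExceptionWithDelay` call that appears in the run method, whose body is not listed in this section. The sketch below assumes a simple doubling backoff between those two bounds; `ErrorDelaySketch` and `nextDelay` are hypothetical names, and the sleep that real code would perform is left out so the example finishes instantly.

```java
// Hypothetical sketch of a capped, doubling error delay; not Tomcat's actual code.
class ErrorDelaySketch {
    static final int INITIAL_ERROR_DELAY = 50;  // milliseconds
    static final int MAX_ERROR_DELAY = 1600;    // milliseconds

    // Returns the delay to remember after one more failed accept.
    static int nextDelay(int currentErrorDelay) {
        if (currentErrorDelay == 0) {
            return INITIAL_ERROR_DELAY;       // first failure: start small
        }
        if (currentErrorDelay < MAX_ERROR_DELAY) {
            return currentErrorDelay * 2;     // later failures: double
        }
        return MAX_ERROR_DELAY;               // never exceed the cap
    }

    public static void main(String[] args) {
        int delay = 0;
        StringBuilder sb = new StringBuilder();
        for (int failure = 1; failure <= 8; failure++) {
            delay = nextDelay(delay);
            sb.append(delay).append(' ');
        }
        System.out.println(sb.toString().trim());
        // prints "50 100 200 400 800 1600 1600 1600"
    }
}
```

A successful accept resets the stored delay to 0, as the run method does with `errorDelay = 0`, so the backoff only grows during a run of consecutive failures.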

The run method

@Override
    public void run() {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (endpoint.isRunning()) {

            // Loop if endpoint is paused
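            // A curious-looking check: paused and running at the same time?
            // A paused endpoint is still started; it just stops taking new connections for now.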
            while (endpoint.isPaused() && endpoint.isRunning()) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!endpoint.isRunning()) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                endpoint.countUpOrAwaitConnection();

                // Endpoint might have been paused while waiting for latch
                // If that is the case, don't accept new connections
                if (endpoint.isPaused()) {
                    continue;
                }
                // Only here is the connection actually accepted, yielding a socket.
                // The type parameter U is interesting: where does it come from?
                // From the definition of NioEndpoint, U is SocketChannel:
                // public class NioEndpoint extends AbstractJsseEndpoint<NioChannel, SocketChannel> {}
                // whereas Nio2Endpoint is defined as:
                // public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel, AsynchronousSocketChannel> {}
                // so there U is AsynchronousSocketChannel.
                U socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    socket = endpoint.serverSocketAccept();
                }
                // Skip the exception handling for now; the same goes for the catch block further down
                catch (Exception ioe) {
                    // We didn't get a socket
                    endpoint.countDownConnection();
                    if (endpoint.isRunning()) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                // The key call here is endpoint.setSocketOptions(socket).
                // Its source is listed after this method so we can see what it does.
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else {
                    endpoint.destroySocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                String msg = sm.getString("endpoint.accept.fail");
                // APR specific.
                // Could push this down but not sure it is worth the trouble.
                // Note: Error here is org.apache.tomcat.jni.Error (imported by the class), not java.lang.Error.
                if (t instanceof Error) {
                    Error e = (Error) t;
                    if (e.getError() == 233) {
                        // Not an error on HP-UX so log as a warning
                        // so it can be filtered out on that platform
                        // See bug 50273
                        log.warn(msg, t);
                    } else {
                        log.error(msg, t);
                    }
                } else {
                        log.error(msg, t);
                }
            }
        }
        state = AcceptorState.ENDED;
    }
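
The control flow of run() and the role of the type parameter U can be exercised without a network. The sketch below is an assumption-laden miniature, not Tomcat's API: `SketchEndpoint`, `SketchAcceptor`, `FakeEndpoint` and `AcceptLoopDemo` are hypothetical names, U is bound to String instead of SocketChannel, and connection counting, error backoff and logging are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical, trimmed-down endpoint contract; U is the raw socket type.
interface SketchEndpoint<U> {
    boolean isRunning();
    boolean isPaused();
    U accept() throws Exception;   // stands in for serverSocketAccept()
    boolean handOff(U socket);     // stands in for setSocketOptions()
    void close(U socket);          // stands in for closeSocket()
}

class SketchAcceptor<U> implements Runnable {
    private final SketchEndpoint<U> endpoint;

    SketchAcceptor(SketchEndpoint<U> endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public void run() {
        while (endpoint.isRunning()) {
            // Paused: still started, but not taking new connections for now.
            while (endpoint.isPaused() && endpoint.isRunning()) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            if (!endpoint.isRunning()) {
                break;
            }
            try {
                U socket = endpoint.accept();
                // Hand the socket on; close it if the hand-off is refused.
                if (!endpoint.handOff(socket)) {
                    endpoint.close(socket);
                }
            } catch (Exception e) {
                // Real code would log and back off here.
            }
        }
    }
}

// The subclass fixes U, just as NioEndpoint fixes it to SocketChannel.
class FakeEndpoint implements SketchEndpoint<String> {
    private final Queue<String> pending =
            new ArrayDeque<>(Arrays.asList("conn-1", "conn-2", "conn-3"));
    final List<String> handedOff = new ArrayList<>();
    final List<String> closed = new ArrayList<>();

    public boolean isRunning() { return !pending.isEmpty(); }
    public boolean isPaused() { return false; }
    public String accept() { return pending.remove(); }
    public boolean handOff(String s) {
        if (s.equals("conn-2")) {
            return false;          // simulate a failed hand-off
        }
        handedOff.add(s);
        return true;
    }
    public void close(String s) { closed.add(s); }
}

class AcceptLoopDemo {
    public static void main(String[] args) {
        FakeEndpoint ep = new FakeEndpoint();
        new SketchAcceptor<String>(ep).run(); // run inline to keep the demo deterministic
        System.out.println("handed off: " + ep.handedOff + ", closed: " + ep.closed);
        // prints "handed off: [conn-1, conn-3], closed: [conn-2]"
    }
}
```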

setSocketOptions

    // The parameter is the socket obtained in run() above
    @Override
    protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            // Set a batch of socket properties
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            // Obtain a NioChannel, Tomcat's own channel wrapper, reusing a pooled one if available
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                channel.reset();
            }
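            // Here is the key step, tucked away where it is easy to miss:
            // the poller's register method is called with the channel from above.
            // The accepted socket is held as a field of that channel.
            // Next, look at what the poller's register method does.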
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error(sm.getString("endpoint.socketOptionsError"), t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }
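
The reuse logic in setSocketOptions (pop a channel from the pool, create one only when the pool is empty, otherwise rebind and reset it, then pass it to the poller) can be sketched in isolation. Everything below is hypothetical: `PooledChannel` and `ChannelPoolSketch` are made-up names, a String stands in for the SocketChannel, and a plain queue stands in for the poller, whose real register method is covered in section 2.2.3.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical wrapper, loosely analogous to NioChannel.
class PooledChannel {
    private String ioChannel; // stands in for the SocketChannel

    PooledChannel(String ioChannel) { this.ioChannel = ioChannel; }
    void setIOChannel(String ioChannel) { this.ioChannel = ioChannel; }
    void reset() { /* real code would clear per-connection state here */ }
    String ioChannel() { return ioChannel; }
}

class ChannelPoolSketch {
    private final Deque<PooledChannel> pool = new ArrayDeque<>();
    // Assumption: the poller is modelled as a queue of channels awaiting registration.
    final Queue<PooledChannel> pollerQueue = new ConcurrentLinkedQueue<>();
    int created = 0;

    boolean setSocketOptions(String socket) {
        PooledChannel channel = pool.pollFirst(); // null when the pool is empty
        if (channel == null) {
            channel = new PooledChannel(socket);  // allocate only when needed
            created++;
        } else {
            channel.setIOChannel(socket);         // rebind the recycled wrapper
            channel.reset();
        }
        return pollerQueue.offer(channel);        // hand over to the "poller"
    }

    // Called when a connection is finished and its wrapper can be reused.
    void recycle(PooledChannel channel) {
        pool.offerFirst(channel);
    }

    public static void main(String[] args) {
        ChannelPoolSketch sketch = new ChannelPoolSketch();
        sketch.setSocketOptions("conn-1");
        PooledChannel first = sketch.pollerQueue.poll();
        sketch.recycle(first);

        sketch.setSocketOptions("conn-2");
        PooledChannel second = sketch.pollerQueue.poll();

        System.out.println(first == second);      // prints "true": the wrapper was reused
        System.out.println(sketch.created);       // prints "1"
        System.out.println(second.ioChannel());   // prints "conn-2"
    }
}
```

Pooling the wrappers this way avoids allocating fresh buffers for every connection, which is presumably why the listing checks the pool before constructing a new channel.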

Next, let's see what the poller's register method does.
See [2.2.3 EndPoint Poller Component].
