2.2.2 EndPoint Acceptor组件
#startAcceptorThreads
protected void startAcceptorThreads() {
int count = this.getAcceptorThreadCount();
this.acceptors = new ArrayList(count);
for(int i = 0; i < count; ++i) {
Acceptor<U> acceptor = new Acceptor(this);
String threadName = this.getName() + "-Acceptor-" + i;
acceptor.setThreadName(threadName);
this.acceptors.add(acceptor);
Thread t = new Thread(acceptor, threadName);
t.setPriority(this.getAcceptorThreadPriority());
t.setDaemon(this.getDaemon());
t.start();
}
}
默认count
protected int acceptorThreadCount = 1;
Acceptor
看名字是接收器的意思 应该是HTTP请求的接受处理组件
看源码 和 Poller一样 是个runable
public class Acceptor<U> implements Runnable {}
看看有什么属性,以备查看
属性
private static final Log log = LogFactory.getLog(Acceptor.class);
private static final StringManager sm = StringManager.getManager(Acceptor.class);
private static final int INITIAL_ERROR_DELAY = 50;
private static final int MAX_ERROR_DELAY = 1600;
private final AbstractEndpoint<?,U> endpoint;
private String threadName;
protected volatile AcceptorState state = AcceptorState.NEW;
run方法
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (endpoint.isRunning()) {
// Loop if endpoint is paused
//这是一个神奇的判断 是暂停并且是运行?先不管 往下看
while (endpoint.isPaused() && endpoint.isRunning()) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!endpoint.isRunning()) {
break;
}
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch
// If that is the case, don"t accept new connections
if (endpoint.isPaused()) {
continue;
}
//到这里才是真正的获取请求,得到了一个socket
//这里的U 挺有意思
//从哪里来的呢
//从NioEndPoint的定义 U就是SocketChannel
//public class NioEndpoint extends AbstractJsseEndpoint<NioChannel, SocketChannel> {}
//而Nio2EndPoint是这么定义的
//public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel, AsynchronousSocketChannel> {}
//U 就是AsynchronousSocketChannel
U socket = null;
try {
// Accept the next incoming connection from the server
// socket
socket = endpoint.serverSocketAccept();
}
//异常处理部分先不管,下面的那个也是一样
catch (Exception ioe) {
// We didn"t get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
//这里有一个关键的方法 endpoint.setSocketOptions(socket)
//下面列出他的源码,看看这个方法干了些什么呢
if (endpoint.isRunning() && !endpoint.isPaused()) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
String msg = sm.getString("endpoint.accept.fail");
// APR specific.
// Could push this down but not sure it is worth the trouble.
if (t instanceof Error) {
Error e = (Error) t;
if (e.getError() == 233) {
// Not an error on HP-UX so log as a warning
// so it can be filtered out on that platform
// See bug 50273
log.warn(msg, t);
} else {
log.error(msg, t);
}
} else {
log.error(msg, t);
}
}
}
state = AcceptorState.ENDED;
}
setSocketOptions
//参数就是上面获取的socket
@Override
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//又设置了一堆属性
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
//获取NioChannel 是一个Tomcat自定义的Channel
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
//重点来了 重点老是在不经意的地方
//调用的poller的注册方法 把刚才那个channel注册进去了
//获取的socket是channel的一个属性
//接着去看poller的register方法干了什么
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
接着去看poller的register方法干了什么
请看【2.2.3 EndPoint Poller组件】