Apache Tomcat怎么高并发处理请求
导读:本文共9587.5字符,通常情况下阅读需要32分钟。同时您也可以点击右侧朗读,来听本文内容。按键盘←(左) →(右) 方向键可以翻页。
摘要: 介绍作为常用的http协议服务器,tomcat应用非常广泛。tomcat也是遵循Servelt协议的,Servelt协议可以让服务器与真实服务逻辑代码进行解耦。各自只需要关注Servlet协议即可。对于tomcat是如何作为一个高性能的服务器的呢?你是不是也会有这样的疑问?tomcat是如何接收网络请求?如何做到高性能的http协议服务器?tomcat从8.0往... ...
目录
(为您整理了一些要点),点击可以直达。介绍
作为常用的http
协议服务器,tomcat
应用非常广泛。tomcat也是遵循Servelt协议的,Servelt
协议可以让服务器与真实服务逻辑代码进行解耦。各自只需要关注Servlet
协议即可。
对于tomcat是如何作为一个高性能的服务器的呢?你是不是也会有这样的疑问?
tomcat是如何接收网络请求?
如何做到高性能的http
协议服务器?
tomcat从8.0往后开始使用了NIO非阻塞io模型,提高了吞吐量,本文的源码是tomcat 9.0.48版本
接收Socket请求
org.apache.tomcat.util.net.Acceptor
实现了Runnable
接口,在一个单独的线程中以死循环的方式一直进行socket的监听
线程的初始化及启动是在方法org.apache.tomcat.util.net.AbstractEndpoint#startAcceptorThread
有个很重要的属性org.apache.tomcat.util.net.AbstractEndpoint
;同时实现了run
方法,方法中主要有以下功能:
请求最大连接数限制: 最大为 8*1024;请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收
获取
socketChannel
publicvoidrun(){interrorDelay=0;try{//Loopuntilwereceiveashutdowncommandwhile(!stopCalled){ ...if(stopCalled){break;}state=AcceptorState.RUNNING;try{//ifwehavereachedmaxconnections,wait//如果连接超过了8*1024,则线程阻塞等待;是使用org.apache.tomcat.util.threads.LimitLatch类实现了分享锁(内部实现了AbstractQueuedSynchronizer)//请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收。endpoint.countUpOrAwaitConnection();//Endpointmighthavebeenpausedwhilewaitingforlatch//Ifthatisthecase,don'tacceptnewconnectionsif(endpoint.isPaused()){continue;}Usocket=null;try{//Acceptthenextincomingconnectionfromtheserver//socket//抽象方法,不同的endPoint有不同的实现方法。NioEndPoint为例,实现方法为serverSock.accept(),这个方法主要看serverSock实例化时如果为阻塞,accept方法为阻塞;反之为立即返回,如果没有socket链接,则为nullsocket=endpoint.serverSocketAccept();}catch(Exceptionioe){//Wedidn'tgetasocketendpoint.countDownConnection();if(endpoint.isRunning()){//IntroducedelayifnecessaryerrorDelay=handleExceptionWithDelay(errorDelay);//re-throwthrowioe;}else{break;}}//Successfulaccept,resettheerrordelayerrorDelay=0;//Configurethesocketif(!stopCalled&&!endpoint.isPaused()){//setSocketOptions()willhandthesocketoffto//anappropriateprocessorifsuccessful//endPoint类的抽象方法,不同的endPoint有不同的实现。处理获取到的socketChannel链接,如果该socket链接能正常处理,那么该方法会返回true,否则为falseif(!endpoint.setSocketOptions(socket)){endpoint.closeSocket(socket);}}else{endpoint.destroySocket(socket);}}catch(Throwablet){...}}}finally{stopLatch.countDown();}state=AcceptorState.ENDED;}
再来看下org.apache.tomcat.util.net.NioEndpoint#setSocketOptions
方法的具体实现(NioEndpoint为例)
这个方法中主要做的事:
创建NioChannel
设置socket为非阻塞
将socket添加到Poller的队列中
protectedbooleansetSocketOptions(SocketChannelsocket){NioSocketWrappersocketWrapper=null;try{//Allocatechannelandwrapper//优先使用已有的缓存nioChannelNioChannelchannel=null;if(nioChannels!=null){channel=nioChannels.pop();}if(channel==null){SocketBufferHandlerbufhandler=newSocketBufferHandler(socketProperties.getAppReadBufSize(),socketProperties.getAppWriteBufSize(),socketProperties.getDirectBuffer());if(isSSLEnabled()){channel=newSecureNioChannel(bufhandler,this);}else{channel=newNioChannel(bufhandler);}}//将nioEndpoint与NioChannel进行包装NioSocketWrappernewWrapper=newNioSocketWrapper(channel,this);channel.reset(socket,newWrapper);connections.put(socket,newWrapper);socketWrapper=newWrapper;//Setsocketproperties//Disableblocking,pollingwillbeused//设置当前链接的socket为非阻塞socket.configureBlocking(false);if(getUnixDomainSocketPath()==null){socketProperties.setProperties(socket.socket());}socketWrapper.setReadTimeout(getConnectionTimeout());socketWrapper.setWriteTimeout(getConnectionTimeout());socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());//将包装后的nioChannel与nioEndpoint进行注册,注册到Poller,将对应的socket包装类添加到Poller的队列中,同时唤醒selectorpoller.register(socketWrapper);returntrue;}catch(Throwablet){ExceptionUtils.handleThrowable(t);try{log.error(sm.getString("endpoint.socketOptionsError"),t);}catch(Throwablett){ExceptionUtils.handleThrowable(tt);}if(socketWrapper==null){destroySocket(socket);}}//Telltoclosethesocketifneededreturnfalse;}
Socket请求轮询
上一小节是接收到了socket请求,进行包装之后,将socket添加到了Poller
的队列上,并可能唤醒了Selector
,本小节就来看看,Poller是如何进行socket的轮询的。
首先org.apache.tomcat.util.net.NioEndpoint.Poller
也是实现了Runnable接口,是一个可以单独启动的线程
初始化及启动是在org.apache.tomcat.util.net.NioEndpoint#startInternal
重要的属性:
java.nio.channels.Selector
:在Poller对象初始化的时候,就会启动轮询器SynchronizedQueue<PollerEvent>
:同步的事件队列
再来看下具体处理逻辑,run方法的源码
publicvoidrun(){//Loopuntildestroy()iscalledwhile(true){booleanhasEvents=false;try{if(!close){//去SynchronizedQueue事件队列中拉去,看是否已经有了事件,如果有,则返回true//如果从队列中拉取到了event(即上一步将NioSocketWrapper封装为PollerEvent添加到次队列中),将socketChannel注册到Selector上,标记为SelectionKey.OP_READ,添加处理函数attachment(为Accetpor添加到Poller时的//NioSocketWrapper)hasEvents=events();if(wakeupCounter.getAndSet(-1)>0){//Ifwearehere,meanswehaveotherstufftodo//DoanonblockingselectkeyCount=selector.selectNow();}else{keyCount=selector.select(selectorTimeout);}wakeupCounter.set(0);}if(close){events();timeout(0,false);try{selector.close();}catch(IOExceptionioe){log.error(sm.getString("endpoint.nio.selectorCloseFail"),ioe);}break;}//Eitherwetimedoutorwewokeup,processeventsfirstif(keyCount==0){hasEvents=(hasEvents|events());}}catch(Throwablex){ExceptionUtils.handleThrowable(x);log.error(sm.getString("endpoint.nio.selectorLoopError"),x);continue;}Iterator<SelectionKey>iterator=keyCount>0?selector.selectedKeys().iterator():null;//Walkthroughthecollectionofreadykeysanddispatch//anyactiveevent.//selector轮询获取已经注册的事件,如果有事件准备好,此时通过selectKeys方法就能拿到对应的事件while(iterator!=null&&iterator.hasNext()){SelectionKeysk=iterator.next();//获取到事件后,从迭代器删除事件,防止事件重复轮询iterator.remove();//获取事件的处理器,这个attachment是在event()方法中注册的,后续这个事件的处理,就交给这个wrapper去处理NioSocketWrappersocketWrapper=(NioSocketWrapper)sk.attachment();//Attachmentmaybenullifanotherthreadhascalled//cancelledKey()if(socketWrapper!=null){processKey(sk,socketWrapper);}}//Processtimeoutstimeout(keyCount,hasEvents);}getStopLatch().countDown();}
在这里,有一个很重要的方法,org.apache.tomcat.util.net.NioEndpoint.Poller#events()
,他是从Poller
的事件队列中获取Acceptor
接收到的可用socket,并将其注册到Selector
上
/***ProcesseseventsintheeventqueueofthePoller.**@return<code>true</code>ifsomeeventswereprocessed,*<code>false</code>ifqueuewasempty*/publicbooleanevents(){booleanresult=false;PollerEventpe=null;//如果Acceptor将socket添加到队列中,那么events.poll()方法就能拿到对应的事件,否则拿不到就返回falsefor(inti=0,size=events.size();i<size&&(pe=events.poll())!=null;i++){result=true;NioSocketWrappersocketWrapper=pe.getSocketWrapper();SocketChannelsc=socketWrapper.getSocket().getIOChannel();intinterestOps=pe.getInterestOps();if(sc==null){log.warn(sm.getString("endpoint.nio.nullSocketChannel"));socketWrapper.close();}elseif(interestOps==OP_REGISTER){//如果是Acceptor刚添加到队列中的事件,那么此时的ops就是OP_REGISTERtry{,//将次socket注册到selector上,标记为OP_READ事件,添加事件触发时处理函数socketWrappersc.register(getSelector(),SelectionKey.OP_READ,socketWrapper);}catch(Exceptionx){log.error(sm.getString("endpoint.nio.registerFail"),x);}}else{//??这里的逻辑,不清楚什么情况下会进入到这个分支里面finalSelectionKeykey=sc.keyFor(getSelector());if(key==null){//Thekeywascancelled(e.g.duetosocketclosure)//andremovedfromtheselectorwhileitwasbeing//processed.Countdowntheconnectionsatthispoint//sinceitwon'thavebeencounteddownwhenthesocket//closed.socketWrapper.close();}else{finalNioSocketWrapperattachment=(NioSocketWrapper)key.attachment();if(attachment!=null){//Weareregisteringthekeytostartwith,resetthefairnesscounter.try{intops=key.interestOps()|interestOps;attachment.interestOps(ops);key.interestOps(ops);}catch(CancelledKeyExceptionckx){cancelledKey(key,socketWrapper);}}else{cancelledKey(key,socketWrapper);}}}if(running&&!paused&&eventCache!=null){pe.reset();eventCache.push(pe);}}returnresult;}
还有一个重要方法就是org.apache.tomcat.util.net.NioEndpoint.Poller#processKey
,上一个方法是获取event,并注册到selector,那这个方法就是通过Selector
获取到的数据准备好的event,并开始封装成对应的业务处理线程SocketProcessorBase
,扔到线程池里开始处理
protectedvoidprocessKey(SelectionKeysk,NioSocketWrappersocketWrapper){try{if(close){cancelledKey(sk,socketWrapper);}elseif(sk.isValid()){if(sk.isReadable()||sk.isWritable()){if(socketWrapper.getSendfileData()!=null){processSendfile(sk,socketWrapper,false);}else{unreg(sk,socketWrapper,sk.readyOps());booleancloseSocket=false;//Readgoesbeforewriteif(sk.isReadable()){//这里如果是异步的操作,就会走这里if(socketWrapper.readOperation!=null){if(!socketWrapper.readOperation.process()){closeSocket=true;}}elseif(socketWrapper.readBlocking){//readBlocking默认为falsesynchronized(socketWrapper.readLock){socketWrapper.readBlocking=false;socketWrapper.readLock.notify();}}elseif(!processSocket(socketWrapper,SocketEvent.OPEN_READ,true)){//处理正常的事件,这里的processSocket就要正式开始处理请求了。//将对应的事件封装成对应的线程,然后交给线程池去处理正式的请求业务closeSocket=true;}}if(!closeSocket&&sk.isWritable()){if(socketWrapper.writeOperation!=null){if(!socketWrapper.writeOperation.process()){closeSocket=true;}}elseif(socketWrapper.writeBlocking){synchronized(socketWrapper.writeLock){socketWrapper.writeBlocking=false;socketWrapper.writeLock.notify();}}elseif(!processSocket(socketWrapper,SocketEvent.OPEN_WRITE,true)){closeSocket=true;}}if(closeSocket){cancelledKey(sk,socketWrapper);}}}}else{//InvalidkeycancelledKey(sk,socketWrapper);}}catch(CancelledKeyExceptionckx){cancelledKey(sk,socketWrapper);}catch(Throwablet){ExceptionUtils.handleThrowable(t);log.error(sm.getString("endpoint.nio.keyProcessingError"),t);}}
请求具体处理
上一步,Selector获取到了就绪的请求socket,然后根据socket注册的触发处理函数等,将这些数据进行封装,扔到了线程池里,开始具体的业务逻辑处理。本节就是从工作线程封装开始,org.apache.tomcat.util.net.SocketProcessorBase
为工作线程类的抽象类,实现了Runnable接口,不同的Endpoint实现具体的处理逻辑,本节以NioEndpoint为例
以下为org.apache.tomcat.util.net.AbstractEndpoint#processSocket
方法源码
/***ProcessthegivenSocketWrapperwiththegivenstatus.Usedtotrigger*processingasifthePoller(forthoseendpointsthathaveone)*selectedthesocket.**@paramsocketWrapperThesocketwrappertoprocess*@parameventThesocketeventtobeprocessed*@paramdispatchShouldtheprocessingbeperformedonanew*containerthread**@returnifprocessingwastriggeredsuccessfully*/publicbooleanprocessSocket(SocketWrapperBase<S>socketWrapper,SocketEventevent,booleandispatch){try{if(socketWrapper==null){returnfalse;}//优先使用已经存在的线程SocketProcessorBase<S>sc=null;if(processorCache!=null){sc=processorCache.pop();}if(sc==null){sc=createSocketProcessor(socketWrapper,event);}else{sc.reset(socketWrapper,event);}//获取线程池。线程池的初始化,是在Acceptor、Poller这两个单独线程启动之前创建//tomcat使用了自定义的org.apache.tomcat.util.threads.TaskQueue,这块tomcat也进行了小的适配开发//核心线程为10个,最大200线程Executorexecutor=getExecutor();if(dispatch&&executor!=null){executor.execute(sc);}else{sc.run();}}catch(RejectedExecutionExceptionree){getLog().warn(sm.getString("endpoint.executor.fail",socketWrapper),ree);returnfalse;}catch(Throwablet){ExceptionUtils.handleThrowable(t);//ThismeanswegotanOOMorsimilarcreatingathread,orthat//thepoolanditsqueuearefullgetLog().error(sm.getString("endpoint.process.fail"),t);returnfalse;}returntrue;}
上面的方法是得到了处理业务逻辑的线程SocketProcessorBase,NioEndpoint内部类org.apache.tomcat.util.net.NioEndpoint.SocketProcessor
继承了这个抽象类,也就是具体的业务处理逻辑在org.apache.tomcat.util.net.NioEndpoint.SocketProcessor#doRun
方法中,最终调用到我们的Servlet
protectedvoiddoRun(){/**Donotcacheandre-usethevalueofsocketWrapper.getSocket()in*thismethod.Ifthesocketclosesthevaluewillbeupdatedto*CLOSED_NIO_CHANNELandthepreviousvaluepotentiallyre-usedfor*anewconnection.Thatcanresultinastalecachedvaluewhich*inturncanresultinunintentionallyclosingcurrentlyactive*connections.*/Pollerpoller=NioEndpoint.this.poller;if(poller==null){socketWrapper.close();return;}try{inthandshake=-1;try{//握手相关判断逻辑...}catch(IOExceptionx){...}//三次握手成功了if(handshake==0){SocketStatestate=SocketState.OPEN;//Processtherequestfromthissocket//event为SocketEvent.OPEN_READ,这个变量是org.apache.tomcat.util.net.NioEndpoint.Poller#processKey方法赋值if(event==null){state=getHandler().process(socketWrapper,SocketEvent.OPEN_READ);}else{//这里就开始正式处理请求了state=getHandler().process(socketWrapper,event);}if(state==SocketState.CLOSED){poller.cancelledKey(getSelectionKey(),socketWrapper);}}elseif(handshake==-1){getHandler().process(socketWrapper,SocketEvent.CONNECT_FAIL);poller.cancelledKey(getSelectionKey(),socketWrapper);}elseif(handshake==SelectionKey.OP_READ){socketWrapper.registerReadInterest();}elseif(handshake==SelectionKey.OP_WRITE){socketWrapper.registerWriteInterest();}}catch(CancelledKeyExceptioncx){poller.cancelledKey(getSelectionKey(),socketWrapper);}catch(VirtualMachineErrorvme){ExceptionUtils.handleThrowable(vme);}catch(Throwablet){log.error(sm.getString("endpoint.processing.fail"),t);poller.cancelledKey(getSelectionKey(),socketWrapper);}finally{socketWrapper=null;event=null;//returntocacheif(running&&!paused&&processorCache!=null){processorCache.push(this);}}}
总结
Tomcat
是如何接收网络请求?使用java nio的同步非阻塞去进行网络监听。
org.apache.tomcat.util.net.AbstractEndpoint#bindWithCleanup
中初始化网络监听、SSL{ ....serverSock=ServerSocketChannel.open();socketProperties.setProperties(serverSock.socket());InetSocketAddressaddr=newInetSocketAddress(getAddress(),getPortWithOffset());//当应用层面的连接数到达最大值时,操作系统可以继续接收连接,那么操作系统能继续接收的最大连接数就是这个队列长度,可以通过acceptCount参数配置,默认是100serverSock.bind(addr,getAcceptCount());}serverSock.configureBlocking(true);//mimicAPRbehavior
org.apache.tomcat.util.net.NioEndpoint#startInternal
中初始化业务处理的线程池、连接限制器、Poller线程、Acceptor线程如何做到高性能的
http
协议服务器?Tomcat把接收连接、检测 I/O 事件以及处理请求进行了拆分,用不同规模的线程去做对应的事情,这也是tomcat能高并发处理请求的原因。不让线程阻塞,尽量让CPU忙起来
是怎么设计的呢?
通过接口、抽象类等,将不同的处理逻辑拆分,各司其职
org.apache.tomcat.util.net.NioEndpoint.Poller
:引用了java.nio.channels.Selector
,内部有个事件队列,监听I/O事件具体就是在这里做的org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper
org.apache.tomcat.util.net.NioEndpoint.SocketProcessor
: 具体处理请求的线程类org.apache.tomcat.util.net.AbstractEndpoint
:I/O事件的检测、处理逻辑都在这个类的实现类里面。使用模板方法,不同的协议有不同的实现方法。NioEndpoint/Nio2Endpoint/AprEndpoint
</div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
Apache Tomcat怎么高并发处理请求的详细内容,希望对您有所帮助,信息来源于网络。