Apache Tomcat怎么高并发处理请求(apache,tomcat,开发技术)

时间:2024-04-30 01:21:54 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

介绍

作为常用的http协议服务器,tomcat应用非常广泛。tomcat也是遵循Servelt协议的,Servelt协议可以让服务器与真实服务逻辑代码进行解耦。各自只需要关注Servlet协议即可。
对于tomcat是如何作为一个高性能的服务器的呢?你是不是也会有这样的疑问?

tomcat是如何接收网络请求?

如何做到高性能的http协议服务器?

tomcat从8.0往后开始使用了NIO非阻塞io模型,提高了吞吐量,本文的源码是tomcat 9.0.48版本

接收Socket请求

org.apache.tomcat.util.net.Acceptor实现了Runnable接口,在一个单独的线程中以死循环的方式一直进行socket的监听

线程的初始化及启动是在方法org.apache.tomcat.util.net.AbstractEndpoint#startAcceptorThread

有个很重要的属性org.apache.tomcat.util.net.AbstractEndpoint;同时实现了run方法,方法中主要有以下功能:

  • 请求最大连接数限制: 最大为 8*1024;请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收

  • 获取socketChannel

publicvoidrun(){interrorDelay=0;try{//Loopuntilwereceiveashutdowncommandwhile(!stopCalled){ ...if(stopCalled){break;}state=AcceptorState.RUNNING;try{//ifwehavereachedmaxconnections,wait//如果连接超过了8*1024,则线程阻塞等待;是使用org.apache.tomcat.util.threads.LimitLatch类实现了分享锁(内部实现了AbstractQueuedSynchronizer)//请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收。endpoint.countUpOrAwaitConnection();//Endpointmighthavebeenpausedwhilewaitingforlatch//Ifthatisthecase,don'tacceptnewconnectionsif(endpoint.isPaused()){continue;}Usocket=null;try{//Acceptthenextincomingconnectionfromtheserver//socket//抽象方法,不同的endPoint有不同的实现方法。NioEndPoint为例,实现方法为serverSock.accept(),这个方法主要看serverSock实例化时如果为阻塞,accept方法为阻塞;反之为立即返回,如果没有socket链接,则为nullsocket=endpoint.serverSocketAccept();}catch(Exceptionioe){//Wedidn'tgetasocketendpoint.countDownConnection();if(endpoint.isRunning()){//IntroducedelayifnecessaryerrorDelay=handleExceptionWithDelay(errorDelay);//re-throwthrowioe;}else{break;}}//Successfulaccept,resettheerrordelayerrorDelay=0;//Configurethesocketif(!stopCalled&&!endpoint.isPaused()){//setSocketOptions()willhandthesocketoffto//anappropriateprocessorifsuccessful//endPoint类的抽象方法,不同的endPoint有不同的实现。处理获取到的socketChannel链接,如果该socket链接能正常处理,那么该方法会返回true,否则为falseif(!endpoint.setSocketOptions(socket)){endpoint.closeSocket(socket);}}else{endpoint.destroySocket(socket);}}catch(Throwablet){...}}}finally{stopLatch.countDown();}state=AcceptorState.ENDED;}

再来看下org.apache.tomcat.util.net.NioEndpoint#setSocketOptions方法的具体实现(NioEndpoint为例)

这个方法中主要做的事:

  • 创建NioChannel

  • 设置socket为非阻塞

  • 将socket添加到Poller的队列中

protectedbooleansetSocketOptions(SocketChannelsocket){NioSocketWrappersocketWrapper=null;try{//Allocatechannelandwrapper//优先使用已有的缓存nioChannelNioChannelchannel=null;if(nioChannels!=null){channel=nioChannels.pop();}if(channel==null){SocketBufferHandlerbufhandler=newSocketBufferHandler(socketProperties.getAppReadBufSize(),socketProperties.getAppWriteBufSize(),socketProperties.getDirectBuffer());if(isSSLEnabled()){channel=newSecureNioChannel(bufhandler,this);}else{channel=newNioChannel(bufhandler);}}//将nioEndpoint与NioChannel进行包装NioSocketWrappernewWrapper=newNioSocketWrapper(channel,this);channel.reset(socket,newWrapper);connections.put(socket,newWrapper);socketWrapper=newWrapper;//Setsocketproperties//Disableblocking,pollingwillbeused//设置当前链接的socket为非阻塞socket.configureBlocking(false);if(getUnixDomainSocketPath()==null){socketProperties.setProperties(socket.socket());}socketWrapper.setReadTimeout(getConnectionTimeout());socketWrapper.setWriteTimeout(getConnectionTimeout());socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());//将包装后的nioChannel与nioEndpoint进行注册,注册到Poller,将对应的socket包装类添加到Poller的队列中,同时唤醒selectorpoller.register(socketWrapper);returntrue;}catch(Throwablet){ExceptionUtils.handleThrowable(t);try{log.error(sm.getString("endpoint.socketOptionsError"),t);}catch(Throwablett){ExceptionUtils.handleThrowable(tt);}if(socketWrapper==null){destroySocket(socket);}}//Telltoclosethesocketifneededreturnfalse;}

Socket请求轮询

上一小节是接收到了socket请求,进行包装之后,将socket添加到了Poller的队列上,并可能唤醒了Selector,本小节就来看看,Poller是如何进行socket的轮询的。

首先org.apache.tomcat.util.net.NioEndpoint.Poller也是实现了Runnable接口,是一个可以单独启动的线程

初始化及启动是在org.apache.tomcat.util.net.NioEndpoint#startInternal

重要的属性:

  • java.nio.channels.Selector:在Poller对象初始化的时候,就会启动轮询器

  • SynchronizedQueue<PollerEvent>:同步的事件队列

再来看下具体处理逻辑,run方法的源码

 publicvoidrun(){//Loopuntildestroy()iscalledwhile(true){booleanhasEvents=false;try{if(!close){//去SynchronizedQueue事件队列中拉去,看是否已经有了事件,如果有,则返回true//如果从队列中拉取到了event(即上一步将NioSocketWrapper封装为PollerEvent添加到次队列中),将socketChannel注册到Selector上,标记为SelectionKey.OP_READ,添加处理函数attachment(为Accetpor添加到Poller时的//NioSocketWrapper)hasEvents=events();if(wakeupCounter.getAndSet(-1)>0){//Ifwearehere,meanswehaveotherstufftodo//DoanonblockingselectkeyCount=selector.selectNow();}else{keyCount=selector.select(selectorTimeout);}wakeupCounter.set(0);}if(close){events();timeout(0,false);try{selector.close();}catch(IOExceptionioe){log.error(sm.getString("endpoint.nio.selectorCloseFail"),ioe);}break;}//Eitherwetimedoutorwewokeup,processeventsfirstif(keyCount==0){hasEvents=(hasEvents|events());}}catch(Throwablex){ExceptionUtils.handleThrowable(x);log.error(sm.getString("endpoint.nio.selectorLoopError"),x);continue;}Iterator<SelectionKey>iterator=keyCount>0?selector.selectedKeys().iterator():null;//Walkthroughthecollectionofreadykeysanddispatch//anyactiveevent.//selector轮询获取已经注册的事件,如果有事件准备好,此时通过selectKeys方法就能拿到对应的事件while(iterator!=null&&iterator.hasNext()){SelectionKeysk=iterator.next();//获取到事件后,从迭代器删除事件,防止事件重复轮询iterator.remove();//获取事件的处理器,这个attachment是在event()方法中注册的,后续这个事件的处理,就交给这个wrapper去处理NioSocketWrappersocketWrapper=(NioSocketWrapper)sk.attachment();//Attachmentmaybenullifanotherthreadhascalled//cancelledKey()if(socketWrapper!=null){processKey(sk,socketWrapper);}}//Processtimeoutstimeout(keyCount,hasEvents);}getStopLatch().countDown();}

在这里,有一个很重要的方法,org.apache.tomcat.util.net.NioEndpoint.Poller#events(),他是从Poller的事件队列中获取Acceptor接收到的可用socket,并将其注册到Selector

 /***ProcesseseventsintheeventqueueofthePoller.**@return<code>true</code>ifsomeeventswereprocessed,*<code>false</code>ifqueuewasempty*/publicbooleanevents(){booleanresult=false;PollerEventpe=null;//如果Acceptor将socket添加到队列中,那么events.poll()方法就能拿到对应的事件,否则拿不到就返回falsefor(inti=0,size=events.size();i<size&&(pe=events.poll())!=null;i++){result=true;NioSocketWrappersocketWrapper=pe.getSocketWrapper();SocketChannelsc=socketWrapper.getSocket().getIOChannel();intinterestOps=pe.getInterestOps();if(sc==null){log.warn(sm.getString("endpoint.nio.nullSocketChannel"));socketWrapper.close();}elseif(interestOps==OP_REGISTER){//如果是Acceptor刚添加到队列中的事件,那么此时的ops就是OP_REGISTERtry{,//将次socket注册到selector上,标记为OP_READ事件,添加事件触发时处理函数socketWrappersc.register(getSelector(),SelectionKey.OP_READ,socketWrapper);}catch(Exceptionx){log.error(sm.getString("endpoint.nio.registerFail"),x);}}else{//??这里的逻辑,不清楚什么情况下会进入到这个分支里面finalSelectionKeykey=sc.keyFor(getSelector());if(key==null){//Thekeywascancelled(e.g.duetosocketclosure)//andremovedfromtheselectorwhileitwasbeing//processed.Countdowntheconnectionsatthispoint//sinceitwon'thavebeencounteddownwhenthesocket//closed.socketWrapper.close();}else{finalNioSocketWrapperattachment=(NioSocketWrapper)key.attachment();if(attachment!=null){//Weareregisteringthekeytostartwith,resetthefairnesscounter.try{intops=key.interestOps()|interestOps;attachment.interestOps(ops);key.interestOps(ops);}catch(CancelledKeyExceptionckx){cancelledKey(key,socketWrapper);}}else{cancelledKey(key,socketWrapper);}}}if(running&&!paused&&eventCache!=null){pe.reset();eventCache.push(pe);}}returnresult;}

还有一个重要方法就是org.apache.tomcat.util.net.NioEndpoint.Poller#processKey,上一个方法是获取event,并注册到selector,那这个方法就是通过Selector获取到的数据准备好的event,并开始封装成对应的业务处理线程SocketProcessorBase,扔到线程池里开始处理

protectedvoidprocessKey(SelectionKeysk,NioSocketWrappersocketWrapper){try{if(close){cancelledKey(sk,socketWrapper);}elseif(sk.isValid()){if(sk.isReadable()||sk.isWritable()){if(socketWrapper.getSendfileData()!=null){processSendfile(sk,socketWrapper,false);}else{unreg(sk,socketWrapper,sk.readyOps());booleancloseSocket=false;//Readgoesbeforewriteif(sk.isReadable()){//这里如果是异步的操作,就会走这里if(socketWrapper.readOperation!=null){if(!socketWrapper.readOperation.process()){closeSocket=true;}}elseif(socketWrapper.readBlocking){//readBlocking默认为falsesynchronized(socketWrapper.readLock){socketWrapper.readBlocking=false;socketWrapper.readLock.notify();}}elseif(!processSocket(socketWrapper,SocketEvent.OPEN_READ,true)){//处理正常的事件,这里的processSocket就要正式开始处理请求了。//将对应的事件封装成对应的线程,然后交给线程池去处理正式的请求业务closeSocket=true;}}if(!closeSocket&&sk.isWritable()){if(socketWrapper.writeOperation!=null){if(!socketWrapper.writeOperation.process()){closeSocket=true;}}elseif(socketWrapper.writeBlocking){synchronized(socketWrapper.writeLock){socketWrapper.writeBlocking=false;socketWrapper.writeLock.notify();}}elseif(!processSocket(socketWrapper,SocketEvent.OPEN_WRITE,true)){closeSocket=true;}}if(closeSocket){cancelledKey(sk,socketWrapper);}}}}else{//InvalidkeycancelledKey(sk,socketWrapper);}}catch(CancelledKeyExceptionckx){cancelledKey(sk,socketWrapper);}catch(Throwablet){ExceptionUtils.handleThrowable(t);log.error(sm.getString("endpoint.nio.keyProcessingError"),t);}}

请求具体处理

上一步,Selector获取到了就绪的请求socket,然后根据socket注册的触发处理函数等,将这些数据进行封装,扔到了线程池里,开始具体的业务逻辑处理。本节就是从工作线程封装开始,org.apache.tomcat.util.net.SocketProcessorBase为工作线程类的抽象类,实现了Runnable接口,不同的Endpoint实现具体的处理逻辑,本节以NioEndpoint为例

以下为org.apache.tomcat.util.net.AbstractEndpoint#processSocket方法源码

/***ProcessthegivenSocketWrapperwiththegivenstatus.Usedtotrigger*processingasifthePoller(forthoseendpointsthathaveone)*selectedthesocket.**@paramsocketWrapperThesocketwrappertoprocess*@parameventThesocketeventtobeprocessed*@paramdispatchShouldtheprocessingbeperformedonanew*containerthread**@returnifprocessingwastriggeredsuccessfully*/publicbooleanprocessSocket(SocketWrapperBase<S>socketWrapper,SocketEventevent,booleandispatch){try{if(socketWrapper==null){returnfalse;}//优先使用已经存在的线程SocketProcessorBase<S>sc=null;if(processorCache!=null){sc=processorCache.pop();}if(sc==null){sc=createSocketProcessor(socketWrapper,event);}else{sc.reset(socketWrapper,event);}//获取线程池。线程池的初始化,是在Acceptor、Poller这两个单独线程启动之前创建//tomcat使用了自定义的org.apache.tomcat.util.threads.TaskQueue,这块tomcat也进行了小的适配开发//核心线程为10个,最大200线程Executorexecutor=getExecutor();if(dispatch&&executor!=null){executor.execute(sc);}else{sc.run();}}catch(RejectedExecutionExceptionree){getLog().warn(sm.getString("endpoint.executor.fail",socketWrapper),ree);returnfalse;}catch(Throwablet){ExceptionUtils.handleThrowable(t);//ThismeanswegotanOOMorsimilarcreatingathread,orthat//thepoolanditsqueuearefullgetLog().error(sm.getString("endpoint.process.fail"),t);returnfalse;}returntrue;}

上面的方法是得到了处理业务逻辑的线程SocketProcessorBase,NioEndpoint内部类org.apache.tomcat.util.net.NioEndpoint.SocketProcessor继承了这个抽象类,也就是具体的业务处理逻辑在org.apache.tomcat.util.net.NioEndpoint.SocketProcessor#doRun方法中,最终调用到我们的Servlet

protectedvoiddoRun(){/**Donotcacheandre-usethevalueofsocketWrapper.getSocket()in*thismethod.Ifthesocketclosesthevaluewillbeupdatedto*CLOSED_NIO_CHANNELandthepreviousvaluepotentiallyre-usedfor*anewconnection.Thatcanresultinastalecachedvaluewhich*inturncanresultinunintentionallyclosingcurrentlyactive*connections.*/Pollerpoller=NioEndpoint.this.poller;if(poller==null){socketWrapper.close();return;}try{inthandshake=-1;try{//握手相关判断逻辑...}catch(IOExceptionx){...}//三次握手成功了if(handshake==0){SocketStatestate=SocketState.OPEN;//Processtherequestfromthissocket//event为SocketEvent.OPEN_READ,这个变量是org.apache.tomcat.util.net.NioEndpoint.Poller#processKey方法赋值if(event==null){state=getHandler().process(socketWrapper,SocketEvent.OPEN_READ);}else{//这里就开始正式处理请求了state=getHandler().process(socketWrapper,event);}if(state==SocketState.CLOSED){poller.cancelledKey(getSelectionKey(),socketWrapper);}}elseif(handshake==-1){getHandler().process(socketWrapper,SocketEvent.CONNECT_FAIL);poller.cancelledKey(getSelectionKey(),socketWrapper);}elseif(handshake==SelectionKey.OP_READ){socketWrapper.registerReadInterest();}elseif(handshake==SelectionKey.OP_WRITE){socketWrapper.registerWriteInterest();}}catch(CancelledKeyExceptioncx){poller.cancelledKey(getSelectionKey(),socketWrapper);}catch(VirtualMachineErrorvme){ExceptionUtils.handleThrowable(vme);}catch(Throwablet){log.error(sm.getString("endpoint.processing.fail"),t);poller.cancelledKey(getSelectionKey(),socketWrapper);}finally{socketWrapper=null;event=null;//returntocacheif(running&&!paused&&processorCache!=null){processorCache.push(this);}}}

总结

  • Tomcat是如何接收网络请求?

    使用java nio的同步非阻塞去进行网络监听。

    org.apache.tomcat.util.net.AbstractEndpoint#bindWithCleanup中初始化网络监听、SSL

     { ....serverSock=ServerSocketChannel.open();socketProperties.setProperties(serverSock.socket());InetSocketAddressaddr=newInetSocketAddress(getAddress(),getPortWithOffset());//当应用层面的连接数到达最大值时,操作系统可以继续接收连接,那么操作系统能继续接收的最大连接数就是这个队列长度,可以通过acceptCount参数配置,默认是100serverSock.bind(addr,getAcceptCount());}serverSock.configureBlocking(true);//mimicAPRbehavior

    org.apache.tomcat.util.net.NioEndpoint#startInternal中初始化业务处理的线程池、连接限制器、Poller线程、Acceptor线程

  • 如何做到高性能的http协议服务器?

    Tomcat把接收连接、检测 I/O 事件以及处理请求进行了拆分,用不同规模的线程去做对应的事情,这也是tomcat能高并发处理请求的原因。不让线程阻塞,尽量让CPU忙起来

Apache Tomcat怎么高并发处理请求

  • 是怎么设计的呢?

    通过接口、抽象类等,将不同的处理逻辑拆分,各司其职

    • org.apache.tomcat.util.net.NioEndpoint.Poller:引用了java.nio.channels.Selector,内部有个事件队列,监听I/O事件具体就是在这里做的

    • org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper

    • org.apache.tomcat.util.net.NioEndpoint.SocketProcessor: 具体处理请求的线程类

    • org.apache.tomcat.util.net.AbstractEndpoint:I/O事件的检测、处理逻辑都在这个类的实现类里面。使用模板方法,不同的协议有不同的实现方法。NioEndpoint/Nio2Endpoint/AprEndpoint

 </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
本文:Apache Tomcat怎么高并发处理请求的详细内容,希望对您有所帮助,信息来源于网络。
上一篇:C语言线性表中顺序表的示例分析下一篇:

16 人围观 / 0 条评论 ↓快速评论↓

(必须)

(必须,保密)

阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18