原文转:http://blog.csdn.net/zhanghj07409/article/details/51781413
本文将主要介绍Server端处理一次请求的流程,同时讲解一个比较巧妙的设计——Filter。
根据前面的分析我们可以推断出Server端处理网络通信的组件为NettyServer,对应处理具体事件的handler为NettyHandler,它的构造函数需要一个ChannelHandler的参数,这里传递的就是NettyServer实例的引用。这样一来,handler对messageReceived()的事件处理,又传递给了NettyServer的receive()方法
| @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { handler.received(channel, e.getMessage()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } |
NettyServer本身没有实现receive方法,这个调用由基类AbstractPeer处理,而它也是再调用自己维护的ChannelHandler,也就是构造NettyServer时传入的handler。这是一个ChannelHandlerDispatcher实例,它允许同时触发一组普通的handler。实际构建时的handler为new DecodeHandler(new HeaderExchangeHandler(handler)),HeaderExchangeHandler中有一部分处理的逻辑,同时还会调用外部传递handler。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | publicvoidreceived(Channelchannel,Objectmessage)throwsRemotingException{ ExchangeChannelexchangeChannel=HeaderExchangeChannel.getOrAddChannel(channel); try{ if(messageinstanceofRequest){ // handle request. Requestrequest=(Request)message; if(request.isEvent()){ handlerEvent(channel,request); }else{ if(request.isTwoWay()){ Responseresponse=handleRequest(exchangeChannel,request); channel.send(response); }else{ handler.received(exchangeChannel,request.getData()); } } }elseif(messageinstanceofResponse){ ... }else{ handler.received(exchangeChannel,message); } }finally{ HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } ResponsehandleRequest(ExchangeChannelchannel,Requestreq)throwsRemotingException{ Responseres=newResponse(req.getId(),req.getVersion()); // find handler by message class. Objectmsg=req.getData(); try{ // handle data. Objectresult=handler.reply(channel,msg); res.setStatus(Response.OK); res.setResult(result); }catch(Throwablee){ res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } returnres; } |
最后调用了handelr.reply()方法, 它的实现与具体的协议有关,比如默认配置下就是在DubboProtocol中构建的requestHandler,在createServer()方法中传递给Exchanger。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | private ExchangeServer createServer(URL url) { ... ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } ... return server; } private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } } }; Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException{ String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); return exporter.getInvoker(); } |
看到了invoker.invoke(),也就是真正执行调用的地方。这个Invoker实例来自于根据serviceKey查找的Exporter,它是通过ExtensionLoader创建的,是一个ProtocolFilterWrapper实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | public<T>Exporter<T>export(Invoker<T>invoker)throwsRpcException{ if(Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())){ returnprotocol.export(invoker); } returnprotocol.export(buildInvokerChain(invoker,Constants.SERVICE_FILTER_KEY,Constants.PROVIDER)); } privatestatic<T>Invoker<T>buildInvokerChain(finalInvoker<T>invoker,Stringkey,Stringgroup){ Invoker<T>last=invoker; List<Filter>filters=ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(),key,group); if(filters.size()>0){ for(inti=filters.size()-1;i>=0;i--){ finalFilterfilter=filters.get(i); finalInvoker<T>next=last; last=newInvoker<T>(){ ... publicResultinvoke(Invocationinvocation)throwsRpcException{ returnfilter.invoke(next,invocation); } ... }; } } returnlast; } |
这里会将原来的Invoker通过各种Filter包装成一个InvokerChain,一次调用会依次经过这些FilterChain到达最终的Filter。在这些Filter中可以进行超时校验、数据监控等工作,每个Filter相对独立,使得代码结构非常清晰,也便于为新功能进行扩展。
这个链的结构是:除了初始传入的Invoker外,对于每个Filter都新建一个Invoker,并返回最后一个创建的Invoker。当执行这些后来构建的Invoker.invoker()方法时,实际调用了filter.invoker(next, invocation),这样会去执行filter中的逻辑,然后再由filter调用下一个Invoker的invoke方法。直至最后一个原始的Invoker,它的invoke方法不会调用filter,而是正常的invoke逻辑。
| invokerN.invoke()开始 -> filterN.invoke()开始 -> ... -> invoker1.invoke()开始 -> filter1.invoke()开始 -> invoker0.invoke()开始 -> invoker0.invoke()结束 -> filter1.invoke()结束 -> invoker1.invoke()结束 -> ... -> filterN.invoke()结束 -> invokerN.invoke()结束 |
以TimeoutFilter为例具体来看一下
1 2 3 4 5 6 7 8 9 10 11 12 13 | @Activate(group=Constants.PROVIDER) publicclassTimeoutFilterimplementsFilter{ publicResultinvoke(Invoker<?>invoker,Invocationinvocation)throwsRpcException{ longstart=System.currentTimeMillis(); Resultresult=invoker.invoke(invocation); longelapsed=System.currentTimeMillis()-start; if(invoker.getUrl()!=null &&elapsed>invoker.getUrl().getMethodParameter(invocation.getMethodName(),"timeout",Integer.MAX_VALUE)){ // log timeout info } returnresult; } } |
在调用下一个Invoker的前后记录时间,并将超时的信息打印出来。其中原始的Invoker来自JavassistProxyFactory创建的实例
| public<T>Invoker<T>getInvoker(Tproxy,Class<T>type,URLurl){ finalWrapperwrapper=Wrapper.getWrapper(proxy.getClass().getName().indexOf('$')<0?proxy.getClass():type); returnnewAbstractProxyInvoker<T>(proxy,type,url){ @Override protectedObjectdoInvoke(Tproxy,StringmethodName, Class<?>[]parameterTypes, Object[]arguments)throwsThrowable{ returnwrapper.invokeMethod(proxy,methodName,parameterTypes,arguments); } }; } |
基类的invoke方法调用子类的doInvoke,去执行真正的反射调用。Server端处理一次请求的流程就介绍到这里。
文中提到的核心类包括
- NettyServer
- NettyHandler
- HeaderExchangeHandler
- DubboProtocol
- ProtocolFilterWrapper
- JavassistProxyFactory