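Introduction

This article traces a complete Dubbo service call: the consumer invokes an interface method, the provider receives the request, and the result is returned to the consumer.

Consumer-side invocation

Invoking the invoker

Start with the proxy class that was generated for the interface during service reference: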
```java
public class proxy0 implements ClassGenerator.DC, Destroyable, EchoService, DemoService {
    public static Method[] methods;
    private InvocationHandler handler;

    public String sayHello(String string) {
        // Pack the arguments and hand the call to the InvocationHandler
        Object[] arrobject = new Object[]{string};
        Object object = this.handler.invoke(this, methods[0], arrobject);
        return (String) object;
    }

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }
}
```
As the code shows, the sayHello call is delegated to InvokerInvocationHandler, whose source is:
```java
public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;
    private ConsumerModel consumerModel;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
        String serviceKey = invoker.getUrl().getServiceKey();
        if (serviceKey != null) {
            this.consumerModel = ApplicationModel.getConsumerModel(serviceKey);
        }
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Methods declared on Object (toString, hashCode, ...) are not sent over RPC
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();

        // Wrap the method and arguments in an RpcInvocation
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        String serviceKey = invoker.getUrl().getServiceKey();
        rpcInvocation.setTargetServiceUniqueName(serviceKey);

        if (consumerModel != null) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        // Invoke, then convert the Result into a return value or a thrown exception
        return invoker.invoke(rpcInvocation).recreate();
    }
}
```
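The delegation from proxy to handler to invoker can be reproduced in miniature with the JDK's own `java.lang.reflect.Proxy`. This is only a sketch: Dubbo generates `proxy0` as bytecode rather than using the JDK proxy shown here, and `GreetingService`, `SimpleInvoker`, `ForwardingHandler`, and `ProxyDemo` are hypothetical names invented for the illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface, standing in for DemoService.
interface GreetingService {
    String sayHello(String name);
}

// Hypothetical stand-in for Dubbo's Invoker: receives the method name and arguments.
interface SimpleInvoker {
    Object invoke(String methodName, Object[] args);
}

// Plays the role of InvokerInvocationHandler: turns a reflective call into an invoker call.
class ForwardingHandler implements InvocationHandler {
    private final SimpleInvoker invoker;

    ForwardingHandler(SimpleInvoker invoker) {
        this.invoker = invoker;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Methods declared on Object are answered locally, as in the handler above.
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        return invoker.invoke(method.getName(), args);
    }
}

class ProxyDemo {
    static GreetingService newProxy(SimpleInvoker invoker) {
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[]{GreetingService.class},
                new ForwardingHandler(invoker));
    }

    public static void main(String[] args) {
        // The "remote" side just echoes the method name and first argument.
        GreetingService service = newProxy((methodName, a) -> methodName + ":" + a[0]);
        System.out.println(service.sayHello("dubbo"));
    }
}
```

The caller only ever sees `GreetingService`; everything behind the handler can change without touching the call site, which is what lets Dubbo stack cluster, filter, and protocol invokers behind one interface.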
The call is then passed down the chain to MockClusterInvoker:
```java
public class MockClusterInvoker<T> implements ClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(MockClusterInvoker.class);
    private final Directory<T> directory;
    private final Invoker<T> invoker;

    public MockClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
        this.directory = directory;
        this.invoker = invoker;
    }

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
            // No mock configured: invoke directly
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            // force mock: skip the remote call entirely
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
            }
            result = doMockInvoke(invocation, null);
        } else {
            // fail mock: fall back to the mock only when the remote call fails
            try {
                result = this.invoker.invoke(invocation);
                if (result.getException() != null && result.getException() instanceof RpcException) {
                    RpcException rpcException = (RpcException) result.getException();
                    if (rpcException.isBiz()) {
                        throw rpcException;
                    } else {
                        result = doMockInvoke(invocation, rpcException);
                    }
                }
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                }
                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
                }
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }
}
```

The request then moves on to InterceptorInvokerNode, an inner class of AbstractCluster, which runs the cluster interceptors before and after the call:

```java
protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
    private AbstractClusterInvoker<T> clusterInvoker;
    private ClusterInterceptor interceptor;
    private AbstractClusterInvoker<T> next;

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult;
        try {
            interceptor.before(next, invocation);
            asyncResult = interceptor.intercept(next, invocation);
        } catch (Exception e) {
            if (interceptor instanceof ClusterInterceptor.Listener) {
                ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                listener.onError(e, clusterInvoker, invocation);
            }
            throw e;
        } finally {
            interceptor.after(next, invocation);
        }
        return asyncResult.whenCompleteWithContext((r, t) -> {
            if (interceptor instanceof ClusterInterceptor.Listener) {
                ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                if (t == null) {
                    listener.onMessage(r, clusterInvoker, invocation);
                } else {
                    listener.onError(t, clusterInvoker, invocation);
                }
            }
        });
    }
}
```
Before FailoverClusterInvoker.doInvoke runs, the invoke method of its parent class, AbstractClusterInvoker, executes first:
```java
public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
    // Fields (directory, destroyed, availablecheck, stickyInvoker, logger) are omitted in this excerpt

    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();

        // Copy attachments from the RpcContext into the invocation
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
        }

        // List the candidate invokers from the directory, then pick a load balancer
        List<Invoker<T>> invokers = list(invocation);
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }

    protected void checkWhetherDestroyed() {
        if (destroyed.get()) {
            throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost()
                    + " use dubbo version " + Version.getVersion()
                    + " is now destroyed! Can not invoke any more.");
        }
    }

    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        return directory.list(invocation);
    }

    protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
        if (CollectionUtils.isNotEmpty(invokers)) {
            return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
        } else {
            return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
        }
    }

    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        }
        String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();

        boolean sticky = invokers.get(0).getUrl()
                .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);

        // Drop the sticky invoker if it is no longer in the candidate list
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
            stickyInvoker = null;
        }
        // Reuse the sticky invoker if it has not been tried yet and is available
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
            if (availablecheck && stickyInvoker.isAvailable()) {
                return stickyInvoker;
            }
        }

        Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

        if (sticky) {
            stickyInvoker = invoker;
        }
        return invoker;
    }

    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

        // Reselect if the invoker was already tried or is unavailable
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
                Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rInvoker != null) {
                    invoker = rInvoker;
                } else {
                    // Reselect found nothing: fall back to the next invoker in the list
                    int index = invokers.indexOf(invoker);
                    try {
                        invoker = invokers.get((index + 1) % invokers.size());
                    } catch (Exception e) {
                        logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                    }
                }
            } catch (Throwable t) {
                logger.error("cluster reselect fail reason is :" + t.getMessage()
                        + " if can not solve, you can set cluster.availablecheck=false in url", t);
            }
        }
        return invoker;
    }

    private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
            throws RpcException {
        List<Invoker<T>> reselectInvokers = new ArrayList<>(
                invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

        // First choice: invokers that have not been tried yet
        for (Invoker<T> invoker : invokers) {
            if (availablecheck && !invoker.isAvailable()) {
                continue;
            }
            if (selected == null || !selected.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }

        // Second choice: already-tried invokers that are available again
        if (selected != null) {
            for (Invoker<T> invoker : selected) {
                if ((invoker.isAvailable()) && !reselectInvokers.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
        }
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }
        return null;
    }

    protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                       LoadBalance loadbalance) throws RpcException;
}
```
The abstract class AbstractClusterInvoker has several implementations, one behind each of the following Cluster extensions:
mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster
failfast=org.apache.dubbo.rpc.cluster.support.FailfastCluster
failsafe=org.apache.dubbo.rpc.cluster.support.FailsafeCluster
failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster
forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster
available=org.apache.dubbo.rpc.cluster.support.AvailableCluster
mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster
broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
zone-aware=org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster

Each implementation will be covered in a later article. Here we only look briefly at FailoverClusterInvoker, the invoker behind FailoverCluster:

```java
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)
            throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        String methodName = RpcUtils.getMethodName(invocation);

        // Total attempts = configured retries + the first call
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }

        RpcException le = null; // last exception
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invokers already tried
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            // On retry, re-check the state and re-list the invokers, since the list may have changed
            if (i > 0) {
                super.checkWhetherDestroyed();
                copyInvokers = super.list(invocation);
            }
            Invoker<T> invoker = super.select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                return result;
            } catch (RpcException e) {
                // Business exceptions are not retried
                if (e.isBiz()) {
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        // All attempts failed; the exception details are abbreviated in this excerpt
        throw new RpcException();
    }
}
```
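The retry loop is easier to see without the Dubbo types around it. The following is a minimal sketch of the same failover idea, assuming providers are plain `Function` objects; `FailoverSketch` and its `select` helper are hypothetical and far simpler than Dubbo's load-balanced selection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class FailoverSketch {
    // Tries up to retries + 1 providers and returns the first successful result.
    static <R> R invoke(List<Function<String, R>> providers, String request, int retries) {
        int attempts = Math.max(retries + 1, 1);
        List<Function<String, R>> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            Function<String, R> provider = select(providers, invoked);
            invoked.add(provider);
            try {
                return provider.apply(request);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next provider
            }
        }
        throw new IllegalStateException("all " + attempts + " attempts failed", last);
    }

    // Prefers a provider that has not been tried yet; falls back to the first one.
    static <R> Function<String, R> select(List<Function<String, R>> providers,
                                          List<Function<String, R>> invoked) {
        for (Function<String, R> p : providers) {
            if (!invoked.contains(p)) {
                return p;
            }
        }
        return providers.get(0);
    }

    public static void main(String[] args) {
        Function<String, String> broken = req -> {
            throw new RuntimeException("connection refused");
        };
        Function<String, String> healthy = req -> "Hello " + req;
        System.out.println(invoke(Arrays.asList(broken, healthy), "dubbo", 2));
    }
}
```

As in FailoverClusterInvoker, the list of already-invoked providers is what steers a retry away from the node that just failed.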
It is worth a quick look at the code that turns the asynchronous call into a synchronous one, so that the way the result is returned later is not confusing:

```java
// AsyncToSyncInvoker.invoke
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);
    try {
        // In SYNC mode, block the calling thread until the response arrives
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (Throwable e) {
        // The more specific exception handlers are elided in this excerpt
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}

// AsyncRpcResult.get
@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (executor != null && executor instanceof ThreadlessExecutor) {
        // The calling thread waits for the response task and runs it itself
        ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
        threadlessExecutor.waitAndDrain();
    }
    return responseFuture.get(timeout, unit);
}

// ThreadlessExecutor.waitAndDrain
public void waitAndDrain() throws InterruptedException {
    if (finished) {
        return;
    }

    // Block until the first task (the response callback) is queued
    Runnable runnable = queue.take();

    synchronized (lock) {
        waiting = false;
        runnable.run();
    }

    // Run any tasks that were queued in the meantime
    runnable = queue.poll();
    while (runnable != null) {
        try {
            runnable.run();
        } catch (Throwable t) {
            logger.info(t);
        }
        runnable = queue.poll();
    }
    finished = true;
}
```
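The key idea in waitAndDrain is that the calling thread blocks on a queue and then runs the response-handling task itself instead of handing it to a pool. Here is a minimal sketch of that pattern using only `java.util.concurrent`; `CallerRunsQueue` and `AsyncToSyncSketch` are hypothetical names, and the "I/O thread" is simulated with a plain `Thread`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in for ThreadlessExecutor: tasks are queued, not run by a pool.
class CallerRunsQueue implements Executor {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    @Override
    public void execute(Runnable task) {
        queue.add(task);
    }

    // Blocks the calling thread until one task arrives, then runs it and any others queued.
    void waitAndDrain() throws InterruptedException {
        Runnable task = queue.take();
        while (task != null) {
            task.run();
            task = queue.poll();
        }
    }
}

class AsyncToSyncSketch {
    static String callSync() {
        CallerRunsQueue executor = new CallerRunsQueue();
        CompletableFuture<String> response = new CompletableFuture<>();

        // Simulated I/O thread: it only enqueues the task that completes the future.
        Thread io = new Thread(() -> executor.execute(() -> response.complete("Hello dubbo")));
        io.start();

        try {
            executor.waitAndDrain(); // the caller thread itself completes the future
            return response.get();   // already complete at this point
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callSync());
    }
}
```

Running the callback on the waiting thread avoids a thread hop for every synchronous call, at the cost of tying the callback to the caller's lifetime.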
Next we skip the filter chain and go straight to the source of AbstractInvoker and DubboInvoker:
```java
public abstract class AbstractInvoker<T> implements Invoker<T> {
    private final Class<T> type;
    private final URL url;
    private final Map<String, Object> attachment;
    private volatile boolean available = true;
    private AtomicBoolean destroyed = new AtomicBoolean(false);

    @Override
    public Result invoke(Invocation inv) throws RpcException {
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (CollectionUtils.isNotEmptyMap(attachment)) {
            invocation.addObjectAttachmentsIfAbsent(attachment);
        }

        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
            invocation.addObjectAttachments(contextAttachments);
        }

        // Record the invoke mode (SYNC, ASYNC, FUTURE) on the invocation
        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        AsyncRpcResult asyncResult;
        try {
            asyncResult = (AsyncRpcResult) doInvoke(invocation);
        } catch (InvocationTargetException e) {
            Throwable te = e.getTargetException();
            if (te == null) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        }
        RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
        return asyncResult;
    }

    protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
        ExecutorService sharedExecutor =
                ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
        // Synchronous calls use a ThreadlessExecutor so the caller thread handles the response
        if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
            return new ThreadlessExecutor(sharedExecutor);
        } else {
            return sharedExecutor;
        }
    }

    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
}
```
AbstractInvoker has a separate implementation for each protocol, including DubboInvoker, redisInvoker, thriftInvoker, and grpcInvoker. Here we look briefly at DubboInvoker:
```java
public class DubboInvoker<T> extends AbstractInvoker<T> {
    private final ExchangeClient[] clients;
    private final AtomicPositiveInteger index = new AtomicPositiveInteger();
    private final String version;
    private final ReentrantLock destroyLock = new ReentrantLock();
    private final Set<Invoker<?>> invokers;

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        // Pick a client, round-robin when there is more than one connection
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = calculateTimeout(invocation, methodName);
            if (isOneway) {
                // One-way call: send and return immediately without waiting for a response
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                // Two-way call: send the request and return a future-backed result
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<Object> request = currentClient.request(inv, timeout, executor);
                CompletableFuture<AppResponse> appResponseFuture = request.thenApply(obj -> (AppResponse) obj);
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
                    + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
                    + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    @Override
    public boolean isAvailable() {
        if (!super.isAvailable()) {
            return false;
        }
        for (ExchangeClient client : clients) {
            if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) {
                return true;
            }
        }
        return false;
    }
}
```
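The client selection in doInvoke is a plain round-robin over the connection array. A minimal sketch of the same idea follows, assuming string labels in place of ExchangeClient instances; `RoundRobinSketch` is a hypothetical name, and `AtomicInteger` with `Math.floorMod` is used here as a stand-in for Dubbo's AtomicPositiveInteger.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    private final String[] clients;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(String... clients) {
        this.clients = clients;
    }

    String next() {
        if (clients.length == 1) {
            return clients[0];
        }
        // floorMod keeps the slot non-negative even after the counter overflows
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch("c0", "c1", "c2");
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.next());
        }
    }
}
```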
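This completes the analysis of the invoker call chain. The overall flow is:
proxy0.sayHello: the proxy class runs
InvokerInvocationHandler.invoke: the proxy's invocation handler runs
MockClusterInvoker.invoke: the mock cluster invoker runs
AbstractCluster$InterceptorInvokerNode.invoke: the cluster interceptors run
FailoverClusterInvoker.doInvoke: the fault-tolerant cluster invoker runs
InvokerWrapper.invoke: the invoker wrapper
ProtocolFilterWrapper$1.invoke: the filter chain runs
ListenerInvokerWrapper.invoke: the listener wrapper runs
AsyncToSyncInvoker.invoke: converts the asynchronous result into a synchronous one
DubboInvoker.doInvoke: finally hands the call to the exchange layer to send the request
Sending the request
ReferenceCountExchangeClient.request: a reference-counting exchange client; it only tracks how many references the client for the invoked service currently has
HeaderExchangeClient.request: sets up the header-protocol channel; it is called the header protocol because the protocol parameters are carried in the request header
HeaderExchangeChannel.request: wraps the request data in a Request, then wraps the Request, the channel, and the thread-pool executor in a DefaultFuture (which maintains the mapping from request id to DefaultFuture)
NettyClient.send: obtains or creates Dubbo's NettyChannel for Netty's NioSocketChannel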
NettyChannel.send: the data is actually sent through Netty's NioSocketChannel

Here is how HeaderExchangeChannel builds the Request object:

```java
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null,
                "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // Build the request; the Request constructor assigns a unique id
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    // Register a future for this request id before sending
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
```
After several more layers, the call reaches the send method of NettyChannel:

```java
public void send(Object message, boolean sent) throws RemotingException {
    // Check whether the channel is closed
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            // Wait until the message has actually been written out, or time out
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        removeChannelIfDisconnected(channel);
        throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message)
                + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message)
                + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit");
    }
}
```
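The Request we built is passed as the message to NioSocketChannel.writeAndFlush, ready to be sent to the provider. Before it goes out, however, the message has to be encoded. Recall that the codec was already configured when the Netty client was created during service reference: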
```java
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(NIO_EVENT_LOOP_GROUP)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .channel(socketChannelClass());

    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());

            if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
            }

            // Register the decoder, the encoder, the idle handler, and the business handler
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    .addLast("handler", nettyClientHandler);

            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
            if (socksProxyHost != null) {
                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}
```
Because the client is sending data here, InternalEncoder runs before the bytes go out. Only the encoding for the dubbo protocol is covered here:
```java
private class InternalEncoder extends MessageToByteEncoder {

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        org.apache.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
        Channel ch = ctx.channel();
        NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
        // codec is a DubboCountCodec for the dubbo protocol
        codec.encode(channel, buffer, msg);
    }
}
```
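The codec here is DubboCountCodec, whose main job is to deal with packet boundaries when receiving data. When sending, it hands the message straight to DubboCodec, and the first method to run is encode in ExchangeCodec, the parent class of DubboCodec: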
```java
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    if (msg instanceof Request) {
        encodeRequest(channel, buffer, (Request) msg);
    } else if (msg instanceof Response) {
        encodeResponse(channel, buffer, (Response) msg);
    } else {
        super.encode(channel, buffer, msg);
    }
}
```
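We are sending a request, so the real encoding happens in encodeRequest. Before reading it, it helps to understand the dubbo protocol. A Dubbo packet consists of a header and a body. The header stores metadata such as the magic number (Magic), the packet type (Request/Response), and the body length (Data Length). The body stores the actual call information, such as the method name and the argument list. The header fields are listed below.

| Offset (bit) | Field | Value |
| --- | --- | --- |
| 0 ~ 7 | Magic number, high byte | 0xda |
| 8 ~ 15 | Magic number, low byte | 0xbb |
| 16 | Packet type | 0 - Response, 1 - Request |
| 17 | Call mode | Only meaningful when bit 16 is set to 1: 0 - one-way call, 1 - two-way call |
| 18 | Event flag | 0 - the packet is a request or response, 1 - the packet is a heartbeat |
| 19 ~ 23 | Serializer ID | 2 - Hessian2Serialization, 3 - JavaSerialization, 4 - CompactedJavaSerialization, 6 - FastJsonSerialization, 7 - NativeJavaSerialization, 8 - KryoSerialization, 9 - FstSerialization |
| 24 ~ 31 | Status | 20 - OK, 30 - CLIENT_TIMEOUT, 31 - SERVER_TIMEOUT, 40 - BAD_REQUEST, 50 - BAD_RESPONSE |
| 32 ~ 95 | Request ID | 8 bytes, generated at runtime |
| 96 ~ 127 | Body length | Computed at runtime |

With the header layout in mind, the encoding code below is easy to follow: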
```java
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel);
    // 16-byte header
    byte[] header = new byte[HEADER_LENGTH];
    // Bytes 0-1: magic number
    Bytes.short2bytes(MAGIC, header);

    // Byte 2: request flag and serializer ID, plus the two-way and event flags
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

    if (req.isTwoWay()) {
        header[2] |= FLAG_TWOWAY;
    }
    if (req.isEvent()) {
        header[2] |= FLAG_EVENT;
    }

    // Bytes 4-11: request id
    Bytes.long2bytes(req.getId(), header, 4);

    // Skip over the header space and serialize the body first
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {
        encodeEventData(channel, out, req.getData());
    } else {
        encodeRequestData(channel, out, req.getData(), req.getVersion());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();
    int len = bos.writtenBytes();
    checkPayload(channel, len);
    // Bytes 12-15: body length
    Bytes.int2bytes(len, header, 12);

    // Go back, write the header, then move the writer index past header and body
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header);
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
```
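To make the byte positions concrete, here is a small sketch that builds and reads back a 16-byte request header with `java.nio.ByteBuffer`. It is not Dubbo's code: `HeaderSketch` is a hypothetical class, and the flag values (0x80, 0x40, 0x20, mask 0x1f) are stated as assumptions chosen to match the bit positions in the header table above.

```java
import java.nio.ByteBuffer;

class HeaderSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    // Assumed flag values, derived from the bit positions in the header table
    static final byte FLAG_REQUEST = (byte) 0x80; // bit 16
    static final byte FLAG_TWOWAY = (byte) 0x40;  // bit 17
    static final byte FLAG_EVENT = (byte) 0x20;   // bit 18
    static final int SERIALIZATION_MASK = 0x1f;   // bits 19-23

    static byte[] encodeRequestHeader(int serializationId, boolean twoWay, long requestId, int bodyLength) {
        ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH); // big-endian by default
        header.putShort(MAGIC);                                  // bytes 0-1
        byte flags = (byte) (FLAG_REQUEST | serializationId);
        if (twoWay) {
            flags |= FLAG_TWOWAY;
        }
        header.put(flags);                                       // byte 2
        header.put((byte) 0);                                    // byte 3: status, unused in requests
        header.putLong(requestId);                               // bytes 4-11
        header.putInt(bodyLength);                               // bytes 12-15
        return header.array();
    }

    static boolean isRequest(byte[] h) {
        return (h[2] & FLAG_REQUEST) != 0;
    }

    static int serializationId(byte[] h) {
        return h[2] & SERIALIZATION_MASK;
    }

    static long requestId(byte[] h) {
        return ByteBuffer.wrap(h).getLong(4);
    }

    static int bodyLength(byte[] h) {
        return ByteBuffer.wrap(h).getInt(12);
    }

    public static void main(String[] args) {
        byte[] h = encodeRequestHeader(2, true, 42L, 100);
        System.out.printf("flags=0x%02x id=%d len=%d%n", h[2] & 0xff, requestId(h), bodyLength(h));
    }
}
```

A two-way Hessian2 request therefore carries 0xc2 in byte 2: 0x80 for request, 0x40 for two-way, and 2 for the serializer.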
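At this point the request has been sent to the provider. The provider side is analyzed separately later; here we continue with what the consumer does once the response arrives. When Netty receives the returned data, it first calls the decoder, NettyCodecAdapter$InternalDecoder, to decode the response: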
```java
private class InternalDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
        ChannelBuffer message = new NettyBackedChannelBuffer(input);
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

        // Decode as many messages as the buffer currently holds
        do {
            int saveReaderIndex = message.readerIndex();
            Object msg = codec.decode(channel, message);
            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                // Incomplete packet: restore the reader index and wait for more data
                message.readerIndex(saveReaderIndex);
                break;
            } else {
                if (saveReaderIndex == message.readerIndex()) {
                    throw new IOException("Decode without read data.");
                }
                if (msg != null) {
                    out.add(msg);
                }
            }
        } while (message.readable());
    }
}
```
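The source of DubboCountCodec.decode is shown below. It collects the messages it receives into a MultiMessage object, while the decoding of each message is still delegated to DubboCodec: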
```java
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();
    MultiMessage result = MultiMessage.create();
    do {
        Object obj = codec.decode(channel, buffer);
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            // Incomplete packet: restore the reader index and stop
            buffer.readerIndex(save);
            break;
        } else {
            result.addMessage(obj);
            logMessageLength(obj, buffer.readerIndex() - save);
            save = buffer.readerIndex();
        }
    } while (true);
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {
        return result.get(0);
    }
    return result;
}
```
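Both decoders above share one pattern: remember the reader index, try to decode one message, and roll back if the buffer does not yet hold a complete packet. The sketch below shows that pattern on a deliberately simplified wire format, a 4-byte length prefix followed by a UTF-8 body, which is an assumption made for the illustration and not the Dubbo format; `FrameSketch` and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class FrameSketch {
    // Returns one decoded message, or null as the "need more input" signal.
    static String decodeOne(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null;
        }
        int len = in.getInt();
        if (in.remaining() < len) {
            return null;
        }
        byte[] body = new byte[len];
        in.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    // Decodes every complete frame and leaves a trailing partial frame unread.
    static List<String> decodeAll(ByteBuffer in) {
        List<String> messages = new ArrayList<>();
        while (in.hasRemaining()) {
            int saved = in.position();
            String msg = decodeOne(in);
            if (msg == null) {
                in.position(saved); // roll back so the partial frame is retried later
                break;
            }
            messages.add(msg);
        }
        return messages;
    }

    static byte[] frame(String s) {
        byte[] body = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(frame("a")).put(frame("bc")).put(frame("hello"), 0, 5); // last frame is cut short
        buf.flip();
        System.out.println(decodeAll(buf) + ", bytes left unread: " + buf.remaining());
    }
}
```

The rollback is what makes the decoder safe against TCP delivering a packet in pieces: nothing is consumed until a whole frame is available.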
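The decoding methods of DubboCodec are shown below. The two decode methods are inherited from its parent ExchangeCodec, while decodeBody is DubboCodec's own: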
```java
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    // Read at most HEADER_LENGTH bytes as the header
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    buffer.readBytes(header);
    return decode(channel, buffer, readable, header);
}

protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // Check the magic number; if it does not match, this is not a dubbo packet (for example telnet)
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        // Look for the next magic number and reposition the reader index there
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // Not enough bytes for a full header yet
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // Bytes 12-15: body length
    int len = Bytes.bytes2int(header, 12);
    checkPayload(channel, len);

    // Not enough bytes for header plus body yet
    int tt = len + HEADER_LENGTH;
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        return decodeBody(channel, is, header);
    } finally {
        // Cleanup of unread bytes is elided in this excerpt
    }
}

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    // Byte 2: flags and serializer ID; bytes 4-11: request id
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    long id = Bytes.bytes2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {
        // Decode a response
        Response res = new Response(id);
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(true);
        }
        // Byte 3: status
        byte status = header[3];
        res.setStatus(status);
        try {
            if (status == Response.OK) {
                Object data;
                if (res.isEvent()) {
                    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                    data = decodeEventData(channel, in);
                } else {
                    DecodeableRpcResult result;
                    if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                        // Decode the body right here on the I/O thread
                        result = new DecodeableRpcResult(channel, res, is,
                                (Invocation) getRequestData(id), proto);
                        result.decode();
                    } else {
                        // Only copy the bytes; decoding is left to the biz thread
                        result = new DecodeableRpcResult(channel, res,
                                new UnsafeByteArrayInputStream(readMessageData(is)),
                                (Invocation) getRequestData(id), proto);
                    }
                    data = result;
                }
                res.setResult(data);
            } else {
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                res.setErrorMessage(in.readUTF());
            }
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode response failed: " + t.getMessage(), t);
            }
            res.setStatus(Response.CLIENT_ERROR);
            res.setErrorMessage(StringUtils.toString(t));
        }
        return res;
    } else {
        // Decode a request
        Request req = new Request(id);
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(true);
        }
        try {
            Object data;
            if (req.isEvent()) {
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                data = decodeEventData(channel, in);
            } else {
                DecodeableRpcInvocation inv;
                if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    inv.decode();
                } else {
                    inv = new DecodeableRpcInvocation(channel, req,
                            new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                }
                data = inv;
            }
            req.setData(data);
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request failed: " + t.getMessage(), t);
            }
            // Mark the request as broken and carry the error as its data
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
}
```
Once Netty has run the decoder, the message is passed to the handler that was registered earlier (nettyClientHandler), and its channelRead method is called:
```java
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    handler.received(channel, msg);
}
```
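AllChannelHandler is the first step into the consumer's business thread pool. It is not covered in detail here; the I/O threads and biz threads will be discussed separately in a later article.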
```java
public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService executor = getPreferredExecutorService(message);
    try {
        // Hand the message over to the business thread pool
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        if (message instanceof Request && t instanceof RejectedExecutionException) {
            sendFeedback(channel, (Request) message, t);
            return;
        }
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}
```
The run method of the submitted task is shown below. Here handler is a DecodeHandler, channel is a NettyChannel, and message is a Response. From this point on, everything runs on a biz thread.
```java
@Override
public void run() {
    // RECEIVED is by far the most frequent state, so it is checked first
    if (state == ChannelState.RECEIVED) {
        try {
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                    + ", message is " + message, e);
        }
    } else {
        switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }
}
```
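DecodeHandler performs further decoding. The message handed over by the IO thread may not be fully decoded: for example, deserialization of the body may have been skipped on the IO thread and left to the biz thread, so a further decoding step is needed.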
```java
// DecodeHandler
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }
    if (message instanceof Request) {
        decode(((Request) message).getData());
    }
    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }
    handler.received(channel, message);
}

private void decode(Object message) {
    if (message instanceof Decodeable) {
        try {
            ((Decodeable) message).decode();
        } catch (Throwable e) {
            // error handling elided in this excerpt
        }
    }
}

// Decoding of the response body (DecodeableRpcResult in Dubbo): the leading flag byte
// says whether a value, an exception, and/or attachments follow.
public Object decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);

    byte flag = in.readByte();
    switch (flag) {
        case DubboCodec.RESPONSE_NULL_VALUE:
            break;
        case DubboCodec.RESPONSE_VALUE:
            handleValue(in);
            break;
        case DubboCodec.RESPONSE_WITH_EXCEPTION:
            handleException(in);
            break;
        case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
            handleAttachment(in);
            break;
        case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
            handleValue(in);
            handleAttachment(in);
            break;
        case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
            handleException(in);
            handleAttachment(in);
            break;
        default:
            throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', but received: " + flag);
    }
    if (in instanceof Cleanable) {
        ((Cleanable) in).cleanup();
    }
    return this;
}
```
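The idea of deferring decoding until a biz thread asks for it, and doing it at most once, can be sketched as follows. `LazyDecodeSketch` and `LazyBody` are hypothetical names, and a UTF-8 string conversion stands in for real deserialization; like the excerpt, the sketch uses a flag rather than a lock.

```java
import java.nio.charset.StandardCharsets;

final class LazyDecodeSketch {

    /** Hypothetical stand-in for a Decodeable message body. */
    static final class LazyBody {
        private final byte[] raw;          // bytes as they arrived on the IO thread
        private volatile boolean decoded;  // plays the role of a "has decoded" flag
        private String value;
        private int decodeCount;

        LazyBody(byte[] raw) {
            this.raw = raw;
        }

        /** Decodes at most once; later calls are no-ops. */
        void decode() {
            if (decoded) {
                return;
            }
            value = new String(raw, StandardCharsets.UTF_8); // stand-in for real deserialization
            decodeCount++;
            decoded = true;
        }

        boolean isDecoded() { return decoded; }
        String value() { return value; }
        int decodeCount() { return decodeCount; }
    }

    public static void main(String[] args) {
        LazyBody body = new LazyBody("Hello dubbo".getBytes(StandardCharsets.UTF_8));
        body.decode(); // e.g. triggered on the biz thread
        body.decode(); // a second call does nothing
        System.out.println(body.value() + " (decode ran " + body.decodeCount() + " time)");
    }
}
```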
Once decoding is complete, the message is handed straight to HeaderExchangeHandler:
```java
public void received(Channel channel, Object message) throws RemotingException {
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    if (message instanceof Request) {
        Request request = (Request) message;
        if (request.isEvent()) {
            handlerEvent(channel, request);
        } else {
            if (request.isTwoWay()) {
                handleRequest(exchangeChannel, request);
            } else {
                handler.received(exchangeChannel, request.getData());
            }
        }
    } else if (message instanceof Response) {
        // On the consumer side the message is a Response, so this branch is taken.
        handleResponse(channel, (Response) message);
    } else if (message instanceof String) {
        // string (telnet) handling elided in this excerpt
    } else {
        handler.received(exchangeChannel, message);
    }
}

static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}
```
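DefaultFuture does a lot of work and is indispensable: it is the bridge between the thread that sends the request and the thread that receives the result.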
```java
public class DefaultFuture extends CompletableFuture<Object> {

    // Field declarations (FUTURES, CHANNELS, id, channel, request, timeout, executor) are elided in this excerpt.

    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // Register under the request id so that the response can later be matched to this future.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        if (executor instanceof ThreadlessExecutor) {
            ((ThreadlessExecutor) executor).setWaitingFuture(future);
        }
        timeoutCheck(future);
        return future;
    }

    public static void received(Channel channel, Response response) {
        received(channel, response, false);
    }

    public static void received(Channel channel, Response response, boolean timeout) {
        try {
            // Look up the pending future by the id carried in the response.
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                // no pending future for this id (body elided in this excerpt)
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(
                    new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }

        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                threadlessExecutor.notifyReturn(new IllegalStateException(
                        "The result has returned, but the biz thread is still waiting"
                                + " which is not an expected state, interrupt the thread manually by returning an exception."));
            }
        }
    }
}
```
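The core of this bridge is a map from request id to a pending future. The following is a minimal sketch of that correlation using only the JDK; `PendingCalls`, `register`, and `received` are hypothetical names, and timeouts and status codes are left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class PendingCalls {
    private static final AtomicLong IDS = new AtomicLong();
    private static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    /** Called on the sending thread: registers the future under a fresh request id. */
    static long register(CompletableFuture<Object> future) {
        long id = IDS.incrementAndGet();
        FUTURES.put(id, future);
        return id;
    }

    /** Called on the receiving thread: removes and completes the future that matches the id. */
    static boolean received(long id, Object result) {
        CompletableFuture<Object> future = FUTURES.remove(id);
        if (future == null) {
            return false; // unknown id, or the call was already completed or timed out
        }
        return future.complete(result);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Object> future = new CompletableFuture<>();
        long id = register(future);
        // Another thread plays the role of the thread that receives the response.
        new Thread(() -> received(id, "Hello dubbo")).start();
        System.out.println(future.get(1, TimeUnit.SECONDS));
    }
}
```

Removing the entry while completing it ensures that a duplicate or late response for the same id finds nothing to complete.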
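[Figure: consumer-side call flow]
At this point the response has been obtained. The result passes back through the filters to the calling layer, InvokerInvocationHandler, and then through the proxy object proxy0 to the business code. The client-side call path is fairly complex and takes some patience, but once you understand how the consumer issues a call and obtains the response, analysing how the provider handles the call becomes much easier. While this is still fresh, let us go straight on to what the provider does after it receives a request.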
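Provider-side request handling
Receiving and decoding the request
As noted when analysing the consumer call, data sent and received through netty first passes through the encoder and decoder. When the provider receives a request, it first enters the decoder: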
```java
// NettyCodecAdapter
private class InternalDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
        ChannelBuffer message = new NettyBackedChannelBuffer(input);
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

        // Decode as many complete messages as the buffer currently holds.
        do {
            int saveReaderIndex = message.readerIndex();
            Object msg = codec.decode(channel, message);
            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                // Incomplete frame: rewind and wait for more bytes.
                message.readerIndex(saveReaderIndex);
                break;
            } else {
                if (saveReaderIndex == message.readerIndex()) {
                    throw new IOException("Decode without read data.");
                }
                if (msg != null) {
                    out.add(msg);
                }
            }
        } while (message.readable());
    }
}

// DubboCountCodec: collects several decoded messages into one MultiMessage.
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();
    MultiMessage result = MultiMessage.create();
    do {
        Object obj = codec.decode(channel, buffer);
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            buffer.readerIndex(save);
            break;
        } else {
            result.addMessage(obj);
            logMessageLength(obj, buffer.readerIndex() - save);
            save = buffer.readerIndex();
        }
    } while (true);
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {
        return result.get(0);
    }
    return result;
}
```
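The "save the reader index, try to decode, rewind if the frame is incomplete" loop can be shown on a plain `java.nio.ByteBuffer`. This is a sketch under a simplifying assumption: frames are `[4-byte length][payload]` rather than Dubbo's real header, and `FrameLoopSketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class FrameLoopSketch {

    /** Decodes every complete [int length][payload] frame; a trailing partial frame is left unread. */
    static List<String> decodeAll(ByteBuffer in) {
        List<String> out = new ArrayList<>();
        while (in.hasRemaining()) {
            int saved = in.position();       // same role as saveReaderIndex
            if (in.remaining() < 4) {
                break;                       // not even a full length prefix yet
            }
            int len = in.getInt();
            if (len < 0) {
                throw new IllegalStateException("negative frame length: " + len);
            }
            if (in.remaining() < len) {
                in.position(saved);          // "need more input": rewind to the frame start
                break;
            }
            byte[] body = new byte[len];
            in.get(body);
            out.add(new String(body, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.allocate(32);
        in.putInt(2).put("ab".getBytes(StandardCharsets.UTF_8)); // one complete frame
        in.putInt(3).put((byte) 'c');                             // partial frame: 1 of 3 payload bytes
        in.flip();
        System.out.println(decodeAll(in) + ", unread bytes: " + in.remaining());
    }
}
```

Because the position is rewound, the unread bytes are still there when more data arrives and the decoder is called again.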
Finally the data is handed to DubboCodec for further decoding. We briefly analysed this decoding scheme in the consumer section:
```java
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    buffer.readBytes(header);
    return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // Check the magic number; if it does not match, this is not a dubbo frame.
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // Not enough bytes for a full header yet.
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // The body length is stored at offset 12 of the header.
    int len = Bytes.bytes2int(header, 12);
    checkPayload(channel, len);

    int tt = len + HEADER_LENGTH;
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
    try {
        return decodeBody(channel, is, header);
    } finally {
        // clean-up of unread bytes elided in this excerpt
    }
}

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    // The request id is stored at offset 4 of the header.
    long id = Bytes.bytes2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {
        // Response branch: the construction of `res` is elided in this excerpt.
        return res;
    } else {
        Request req = new Request(id);
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(true);
        }
        try {
            Object data;
            if (req.isEvent()) {
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                data = decodeEventData(channel, in);
            } else {
                DecodeableRpcInvocation inv;
                if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                    // Decode the invocation right here on the IO thread.
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    inv.decode();
                } else {
                    // Copy the raw bytes and leave decoding to the biz thread.
                    inv = new DecodeableRpcInvocation(channel, req,
                            new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                }
                data = inv;
            }
            req.setData(data);
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request failed: " + t.getMessage(), t);
            }
            // Mark the request as broken so that the handler can answer with BAD_REQUEST.
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
}
```
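To make the offsets used above concrete (flag byte at index 2, id at 4, body length at 12), here is a small header parser. It is a sketch, not Dubbo's codec: `DubboHeaderSketch` and `Header` are hypothetical names, and the constant values are assumptions based on the Dubbo protocol layout (magic `0xdabb`, request/two-way/event bits in the flag byte).

```java
import java.nio.ByteBuffer;

final class DubboHeaderSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;      // assumed magic number
    static final int FLAG_REQUEST = 0x80;           // assumed flag bits
    static final int FLAG_TWOWAY = 0x40;
    static final int FLAG_EVENT = 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    /** Parsed view of the fixed-size header. */
    static final class Header {
        final boolean request;
        final boolean twoWay;
        final boolean event;
        final int serializationId;
        final int status;
        final long id;
        final int bodyLength;

        Header(int flag, int status, long id, int bodyLength) {
            this.request = (flag & FLAG_REQUEST) != 0;
            this.twoWay = (flag & FLAG_TWOWAY) != 0;
            this.event = (flag & FLAG_EVENT) != 0;
            this.serializationId = flag & SERIALIZATION_MASK;
            this.status = status;
            this.id = id;
            this.bodyLength = bodyLength;
        }
    }

    static Header parse(byte[] header) {
        if (header.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("need " + HEADER_LENGTH + " bytes, got " + header.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(header);   // big-endian by default
        if (buf.getShort(0) != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        int flag = buf.get(2) & 0xff;
        int status = buf.get(3) & 0xff;
        return new Header(flag, status, buf.getLong(4), buf.getInt(12));
    }

    public static void main(String[] args) {
        byte[] raw = ByteBuffer.allocate(HEADER_LENGTH)
                .putShort(MAGIC)
                .put((byte) (FLAG_REQUEST | FLAG_TWOWAY | 2)) // two-way request, serialization id 2
                .put((byte) 0)
                .putLong(42L)
                .putInt(100)
                .array();
        Header h = parse(raw);
        System.out.println("request=" + h.request + " twoWay=" + h.twoWay
                + " id=" + h.id + " bodyLength=" + h.bodyLength);
    }
}
```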
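After decoding, netty runs its internal logic and eventually calls NettyServerHandler.channelRead, which dubbo registered. The message then passes in turn through NettyServer.received > MultiMessageHandler.received > AllChannelHandler.received. From here on, the IO thread hands the remaining work over to a biz thread: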
```java
public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService executor = getPreferredExecutorService(message);
    try {
        // Submit the message to the biz thread pool.
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        if (message instanceof Request && t instanceof RejectedExecutionException) {
            sendFeedback(channel, (Request) message, t);
            return;
        }
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}
```
Next, the biz thread pool executes the run method of the ChannelEventRunnable task:
```java
public void run() {
    if (state == ChannelState.RECEIVED) {
        try {
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                    + ", message is " + message, e);
        }
    } else {
        // other channel states elided in this excerpt
    }
}
```
DecodeHandler completes the final decoding of any decodable message, as already described for the consumer side:
```java
// DecodeHandler
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Request) {
        decode(((Request) message).getData());
    }
    handler.received(channel, message);
}

private void decode(Object message) {
    if (message instanceof Decodeable) {
        try {
            ((Decodeable) message).decode();
        } catch (Throwable e) {
            // error handling elided in this excerpt
        }
    }
}

// DecodeableRpcInvocation
public void decode() throws Exception {
    // Decode at most once, whichever thread gets here first.
    if (!hasDecoded && channel != null && inputStream != null) {
        try {
            decode(channel, inputStream);
        } catch (Throwable e) {
            // error handling elided in this excerpt
        } finally {
            hasDecoded = true;
        }
    }
}

public Object decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);

    // Read the fixed fields in the order in which the consumer wrote them.
    String dubboVersion = in.readUTF();
    request.setVersion(dubboVersion);
    setAttachment(DUBBO_VERSION_KEY, dubboVersion);

    String path = in.readUTF();
    setAttachment(PATH_KEY, path);
    setAttachment(VERSION_KEY, in.readUTF());

    setMethodName(in.readUTF());

    String desc = in.readUTF();
    setParameterTypesDesc(desc);

    try {
        Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;
        Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;
        if (desc.length() > 0) {
            // Resolve the parameter types from the registered service descriptor.
            ServiceRepository repository = ApplicationModel.getServiceRepository();
            ServiceDescriptor serviceDescriptor = repository.lookupService(path);
            if (serviceDescriptor != null) {
                MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(getMethodName(), desc);
                if (methodDescriptor != null) {
                    pts = methodDescriptor.getParameterClasses();
                    this.setReturnTypes(methodDescriptor.getReturnTypes());
                }
            }
            if (pts == DubboCodec.EMPTY_CLASS_ARRAY) {
                if (!RpcUtils.isGenericCall(desc, getMethodName())
                        && !RpcUtils.isEcho(desc, getMethodName())) {
                    throw new IllegalArgumentException("Service not found:" + path + ", " + getMethodName());
                }
                pts = ReflectUtils.desc2classArray(desc);
            }

            // Deserialize the arguments one by one.
            args = new Object[pts.length];
            for (int i = 0; i < args.length; i++) {
                try {
                    args[i] = in.readObject(pts[i]);
                } catch (Exception e) {
                    // error handling elided in this excerpt
                }
            }
        }
        setParameterTypes(pts);

        Map<String, Object> map = in.readAttachments();
        if (map != null && map.size() > 0) {
            Map<String, Object> attachment = getObjectAttachments();
            if (attachment == null) {
                attachment = new HashMap<>();
            }
            attachment.putAll(map);
            setObjectAttachments(attachment);
        }

        // Decode arguments that need special treatment, such as callbacks.
        for (int i = 0; i < args.length; i++) {
            args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
        }

        setArguments(args);
        String targetServiceName = buildKey((String) getAttachment(PATH_KEY),
                getAttachment(GROUP_KEY),
                getAttachment(VERSION_KEY));
        setTargetServiceUniqueName(targetServiceName);
    } catch (ClassNotFoundException e) {
        throw new IOException(StringUtils.toString("Read invocation data failed.", e));
    } finally {
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
    }
    return this;
}
```
Once decoding is complete, the next step is to use the decoded message to find the invoker that is about to be called.
Finding the provider-side invoker
Source of HeaderExchangeHandler.received:
```java
public void received(Channel channel, Object message) throws RemotingException {
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    if (message instanceof Request) {
        Request request = (Request) message;
        if (request.isEvent()) {
            handlerEvent(channel, request);
        } else {
            if (request.isTwoWay()) {
                // A two-way request expects a response, so it goes through handleRequest.
                handleRequest(exchangeChannel, request);
            } else {
                handler.received(exchangeChannel, request.getData());
            }
        }
    }
    // other message types elided in this excerpt
}

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
        // Decoding failed earlier; the request data holds the failure.
        Object data = req.getData();
        String msg;
        if (data == null) {
            msg = null;
        } else if (data instanceof Throwable) {
            msg = StringUtils.toString((Throwable) data);
        } else {
            msg = data.toString();
        }
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus(Response.BAD_REQUEST);

        channel.send(res);
        return;
    }
    Object msg = req.getData();
    try {
        // Hand the invocation to the protocol-level handler and answer when it completes.
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            try {
                if (t == null) {
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else {
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}
```
Let us look at DubboProtocol$ExchangeHandlerAdapter.reply, whose main purpose is to obtain the invoker to call. Its source is as follows:
```java
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    Invocation inv = (Invocation) message;
    Invoker<?> invoker = getInvoker(channel, inv);
    // For a callback invocation, check that the method is actually exposed.
    if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        if (methodsStr == null || !methodsStr.contains(",")) {
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            String[] methods = methodsStr.split(",");
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        if (!hasMethod) {
            return null;
        }
    }
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    Result result = invoker.invoke(inv);
    return result.thenApply(Function.identity());
}

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    boolean isCallBackServiceInvoke = false;
    boolean isStubServiceInvoke = false;
    int port = channel.getLocalAddress().getPort();
    String path = (String) inv.getObjectAttachments().get(PATH_KEY);

    // Build the key from port, path, version and group, then look up the exporter.
    String serviceKey = serviceKey(
            port,
            path,
            (String) inv.getObjectAttachments().get(VERSION_KEY),
            (String) inv.getObjectAttachments().get(GROUP_KEY)
    );
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

    if (exporter == null) {
        throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in "
                + exporterMap.keySet() + ", may be version or group mismatch "
                + ", channel: consumer: " + channel.getRemoteAddress()
                + " --> provider: " + channel.getLocalAddress()
                + ", message:" + getInvocationWithoutData(inv));
    }

    return exporter.getInvoker();
}
```
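The lookup boils down to building a string key and reading a map. The sketch below illustrates that idea only: `ServiceKeySketch` is a hypothetical class, a `Function` stands in for the invoker, and the key layout `group/path:version:port` is an assumption about how such a key could be composed, not a statement of Dubbo's exact format.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class ServiceKeySketch {
    // Hypothetical stand-in for exporterMap: service key -> something invocable.
    static final Map<String, Function<String, String>> EXPORTERS = new ConcurrentHashMap<>();

    /** Assumed key layout: [group/]path[:version]:port. */
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder sb = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            sb.append(group).append('/');
        }
        sb.append(path);
        if (version != null && !version.isEmpty()) {
            sb.append(':').append(version);
        }
        return sb.append(':').append(port).toString();
    }

    static Function<String, String> getInvoker(int port, String path, String version, String group) {
        String key = serviceKey(port, path, version, group);
        Function<String, String> invoker = EXPORTERS.get(key);
        if (invoker == null) {
            // A mismatch in version or group produces a different key, hence a miss.
            throw new IllegalStateException("Not found exported service: " + key + " in " + EXPORTERS.keySet());
        }
        return invoker;
    }

    public static void main(String[] args) {
        EXPORTERS.put(serviceKey(20880, "demo.DemoService", "1.0.0", null), name -> "Hello " + name);
        System.out.println(getInvoker(20880, "demo.DemoService", "1.0.0", null).apply("dubbo"));
    }
}
```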
Executing the invoker
The first stop in invoker execution is the filter chain built by ProtocolFilterWrapper: EchoFilter > ClassLoaderFilter > GenericFilter > ContextFilter > TraceFilter > TimeoutFilter > MonitorFilter > ExceptionFilter
```java
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        // Run this filter; it decides when to call the next node in the chain.
        asyncResult = filter.invoke(next, invocation);
    } catch (Exception e) {
        if (filter instanceof ListenableFilter) {
            ListenableFilter listenableFilter = ((ListenableFilter) filter);
            try {
                Filter.Listener listener = listenableFilter.listener(invocation);
                if (listener != null) {
                    listener.onError(e, invoker, invocation);
                }
            } finally {
                listenableFilter.removeListener(invocation);
            }
        } else if (filter instanceof Filter.Listener) {
            Filter.Listener listener = (Filter.Listener) filter;
            listener.onError(e, invoker, invocation);
        }
        throw e;
    } finally {

    }
    // Notify the filter's listener once the (possibly asynchronous) result is available.
    return asyncResult.whenCompleteWithContext((r, t) -> {
        if (filter instanceof ListenableFilter) {
            ListenableFilter listenableFilter = ((ListenableFilter) filter);
            Filter.Listener listener = listenableFilter.listener(invocation);
            try {
                if (listener != null) {
                    if (t == null) {
                        listener.onResponse(r, invoker, invocation);
                    } else {
                        listener.onError(t, invoker, invocation);
                    }
                }
            } finally {
                listenableFilter.removeListener(invocation);
            }
        } else if (filter instanceof Filter.Listener) {
            Filter.Listener listener = (Filter.Listener) filter;
            if (t == null) {
                listener.onResponse(r, invoker, invocation);
            } else {
                listener.onError(t, invoker, invocation);
            }
        }
    });
}
```
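How such a chain can be assembled is easier to see in a stripped-down form. The sketch below builds a chain of responsibility by wrapping the target from the last filter backwards, so that the first filter runs first. `FilterChainSketch`, its `Invoker` and `Filter` interfaces, and `buildChain` are simplified, hypothetical stand-ins that use strings instead of Invocation and Result and omit the listener callbacks.

```java
import java.util.Arrays;
import java.util.List;

final class FilterChainSketch {

    interface Invoker {
        String invoke(String invocation);
    }

    interface Filter {
        String invoke(Invoker next, String invocation);
    }

    /** Wraps the target so that filters.get(0) runs first and the target runs last. */
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        // Iterate backwards: each step wraps everything built so far.
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            Invoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    public static void main(String[] args) {
        Filter echo = (next, inv) -> {
            System.out.println("echo filter");
            return next.invoke(inv);
        };
        Filter context = (next, inv) -> {
            System.out.println("context filter");
            return next.invoke(inv);
        };
        Invoker chain = buildChain(inv -> "Hello " + inv, Arrays.asList(echo, context));
        System.out.println(chain.invoke("dubbo"));
    }
}
```

Each filter receives the next node explicitly, which is what lets it run logic both before and after the downstream call, or skip that call entirely.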
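After passing through the series of filters and wrappers, the invocation finally reaches the invoker created by JavassistProxyFactory, which calls the target object: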
```java
// AbstractProxyInvoker
public Result invoke(Invocation invocation) throws RpcException {
    try {
        // Call the target method through the generated wrapper.
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(),
                invocation.getArguments());
        CompletableFuture<Object> future = wrapWithFuture(value);
        CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
            AppResponse result = new AppResponse();
            if (t != null) {
                if (t instanceof CompletionException) {
                    result.setException(t.getCause());
                } else {
                    result.setException(t);
                }
            } else {
                result.setValue(obj);
            }
            return result;
        });
        return new AsyncRpcResult(appResponseFuture, invocation);
    } catch (InvocationTargetException e) {
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName()
                + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

// JavassistProxyFactory
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
```
Finally, the generated Wrapper class calls the target method sayHello on the target object, proxy.
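The generated wrapper source is not reproduced here. As a rough, hand-written approximation of the idea, a wrapper can dispatch on the method name and call the target directly instead of using reflection. Everything in this sketch (`WrapperSketch`, the `DemoService` interface shape, the exception type) is an illustrative assumption, not the class that Dubbo generates.

```java
final class WrapperSketch {

    /** Assumed shape of the demo service interface. */
    interface DemoService {
        String sayHello(String name);
    }

    /** Dispatches by method name and calls the target directly, without reflection. */
    static Object invokeMethod(Object instance, String methodName, Class<?>[] types, Object[] args) {
        DemoService target = (DemoService) instance;
        if ("sayHello".equals(methodName) && types.length == 1) {
            return target.sayHello((String) args[0]);
        }
        throw new IllegalArgumentException("Not found method \"" + methodName + "\" in " + DemoService.class.getName());
    }

    public static void main(String[] args) {
        DemoService impl = name -> "Hello " + name;
        Object result = invokeMethod(impl, "sayHello", new Class<?>[] {String.class}, new Object[] {"dubbo"});
        System.out.println(result);
    }
}
```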
At this point the provider's target method has finished executing. Next we analyse how the execution result is returned as a response.
The service response
The return value is "Hello dubbo, response from provider: 172.11.11.77:2808". This result is first wrapped in a CompletableFuture by the invoker that JavassistProxyFactory created:
```java
public Result invoke(Invocation invocation) throws RpcException {
    try {
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(),
                invocation.getArguments());
        // Synchronous and asynchronous return values are both represented as a future.
        CompletableFuture<Object> future = wrapWithFuture(value);
        CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
            AppResponse result = new AppResponse();
            if (t != null) {
                if (t instanceof CompletionException) {
                    result.setException(t.getCause());
                } else {
                    result.setException(t);
                }
            } else {
                result.setValue(obj);
            }
            return result;
        });
        return new AsyncRpcResult(appResponseFuture, invocation);
    } catch (InvocationTargetException e) {
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName()
                + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

private CompletableFuture<Object> wrapWithFuture(Object value) {
    if (RpcContext.getContext().isAsyncStarted()) {
        return ((AsyncContextImpl) (RpcContext.getContext().getAsyncContext())).getInternalFuture();
    } else if (value instanceof CompletableFuture) {
        return (CompletableFuture<Object>) value;
    }
    return CompletableFuture.completedFuture(value);
}
```
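The two steps above, wrapping any return value in a future and then folding value and exception into one result, can be tried out with the JDK alone. In this sketch `WrapWithFutureSketch`, `wrap`, and `describe` are hypothetical names, a plain string replaces AppResponse, and the async-context branch is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class WrapWithFutureSketch {

    /** Returns the value itself if it is already a future, otherwise an already-completed future. */
    @SuppressWarnings("unchecked")
    static CompletableFuture<Object> wrap(Object value) {
        if (value instanceof CompletableFuture) {
            return (CompletableFuture<Object>) value;
        }
        return CompletableFuture.completedFuture(value);
    }

    /** Folds both outcomes into one result, unwrapping CompletionException as the excerpt does. */
    static CompletableFuture<String> describe(CompletableFuture<Object> future) {
        return future.handle((obj, t) -> {
            if (t == null) {
                return "value=" + obj;
            }
            Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
            return "exception=" + cause.getMessage();
        });
    }

    public static void main(String[] args) {
        System.out.println(describe(wrap("Hello dubbo")).join());

        CompletableFuture<Object> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        // thenApply produces a dependent stage, which reports the failure wrapped in CompletionException.
        System.out.println(describe(failed.thenApply(x -> x)).join());
    }
}
```

Using `handle` rather than letting the exception propagate means the downstream code always receives a normal result object and can decide how to report the failure.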
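The returned CompletableFuture propagates back through the layers of callbacks and finally arrives in HeaderExchangeHandler.handleRequest, which starts responding to the request once the result is available: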
```java
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    Object msg = req.getData();
    try {
        CompletionStage<Object> future = handler.reply(channel, msg);
        // Send the response once the invocation result is available.
        future.whenComplete((appResult, t) -> {
            try {
                if (t == null) {
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else {
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}
```
Source of HeaderExchangeChannel.send:
```java
public void send(Object message, boolean sent) throws RemotingException {
    if (message instanceof Request
            || message instanceof Response
            || message instanceof String) {
        // Already an exchange-level message: pass it through unchanged.
        channel.send(message, sent);
    } else {
        // Wrap any other payload in a one-way Request.
        Request request = new Request();
        request.setVersion(Version.getProtocolVersion());
        request.setTwoWay(false);
        request.setData(message);
        channel.send(request, sent);
    }
}
```
NettyChannel sends the data:
```java
public void send(Object message, boolean sent) throws RemotingException {
    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            // Wait until the write completes or the timeout elapses.
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        removeChannelIfDisconnected(channel);
        throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message)
                + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    // handling of `success` is not shown in this excerpt
}
```
As we know by now, before netty sends data it first invokes the encoder, NettyCodecAdapter$InternalEncoder:
```java
private class InternalEncoder extends MessageToByteEncoder {

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        org.apache.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
        Channel ch = ctx.channel();
        NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
        // Delegate the actual encoding to the dubbo codec.
        codec.encode(channel, buffer, msg);
    }
}
```
Finally DubboCodec is called to do the encoding:
```java
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
    int savedWriteIndex = buffer.writerIndex();
    try {
        Serialization serialization = getSerialization(channel);
        // Build the header: magic number, flag byte, status, request id.
        byte[] header = new byte[HEADER_LENGTH];
        Bytes.short2bytes(MAGIC, header);
        header[2] = serialization.getContentTypeId();
        if (res.isHeartbeat()) {
            header[2] |= FLAG_EVENT;
        }
        byte status = res.getStatus();
        header[3] = status;
        Bytes.long2bytes(res.getId(), header, 4);

        // Skip the header for now and write the body first.
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (status == Response.OK) {
            if (res.isHeartbeat()) {
                encodeEventData(channel, out, res.getResult());
            } else {
                encodeResponseData(channel, out, res.getResult(), res.getVersion());
            }
        } else {
            out.writeUTF(res.getErrorMessage());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();

        // Now that the body length is known, go back and write the header.
        int len = bos.writtenBytes();
        checkPayload(channel, len);
        Bytes.int2bytes(len, header, 12);
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header);
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    } catch (Throwable t) {
        // error handling elided in this excerpt
    }
}
```
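The "reserve the header, write the body, then go back and fill in the length" technique is independent of Dubbo and can be sketched with `java.nio.ByteBuffer`. `BackfillLengthSketch` is a hypothetical name, the 16-byte layout mirrors the offsets used in the excerpt, the magic value `0xdabb` is an assumption, and a UTF-8 string replaces real serialization.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BackfillLengthSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb; // assumed magic number

    /** Writes [magic:2][flag:1][status:1][id:8][length:4][body] starting at the buffer's position. */
    static void encode(ByteBuffer out, long id, byte status, String body) {
        int start = out.position();
        out.position(start + HEADER_LENGTH);                    // reserve room for the header
        byte[] payload = body.getBytes(StandardCharsets.UTF_8); // stand-in for real serialization
        out.put(payload);
        int end = out.position();

        out.position(start);                                    // go back and fill in the header
        out.putShort(MAGIC).put((byte) 0).put(status).putLong(id).putInt(payload.length);
        out.position(end);                                      // leave the position after the body
    }

    public static void main(String[] args) {
        ByteBuffer out = ByteBuffer.allocate(64);
        encode(out, 7L, (byte) 20, "Hello dubbo");
        System.out.println("bytes written: " + out.position() + ", body length field: " + out.getInt(12));
    }
}
```

Writing the body first avoids serializing twice or buffering the body separately just to learn its length.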
This concludes the analysis of the provider-side response.
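Summary
This article walked through the whole path from the consumer's call to the provider's response, which is a fairly complex process. It touches on routing, cluster fault tolerance, load balancing, the filter chain of responsibility, listeners, turning asynchronous calls into synchronous ones, hand-offs between biz threads and IO threads, sending requests and matching responses to them, protocol encoding and decoding, and object serialization and deserialization.
The goal here was to become familiar with the overall call flow. Several important points were only mentioned briefly and will be expanded on in later articles.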
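While reading the source and writing this article, a few thoughts came to mind:
To reduce blocking of IO threads, dubbo hands as much work as possible to biz threads. However, serialization of the request object on the consumer side and of the response object on the provider side is still tied to the IO thread. For a framework known for high performance, this looks like room for further optimization.
Service degradation, circuit breaking, and rate limiting are still rudimentary.
Routing is not convenient enough to use.
The protocol layer cannot automatically select the most suitable protocol based on the size of the request.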
References