简介 本篇文件我们研究dubbo服务的调用过程,即从消费端发起接口调用到服务端接收请求,然后返回到消费端结果的整个一个调用过程。
消费端发起调用 invoker的调用 直接看在服务引用过程中被代理后的接口源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class proxy0 implements ClassGenerator .DC , Destroyable , EchoService , DemoService { public static Method[] methods; private InvocationHandler handler; public String sayHello (String string) { Object[] arrobject = new Object[]{string}; Object object = this .handler.invoke(this , methods[0 ], arrobject); return (String)object; } public proxy0 (InvocationHandler invocationHandler) { this .handler = invocationHandler; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class InvokerInvocationHandler implements InvocationHandler { private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class); private final Invoker<?> invoker; private ConsumerModel consumerModel; public InvokerInvocationHandler (Invoker<?> handler) { this .invoker = handler; String serviceKey = invoker.getUrl().getServiceKey(); if (serviceKey != null ) { this .consumerModel = ApplicationModel.getConsumerModel(serviceKey); } } @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args); String serviceKey = invoker.getUrl().getServiceKey(); rpcInvocation.setTargetServiceUniqueName(serviceKey); if (consumerModel != null ) { rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel); rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method)); } return invoker.invoke(rpcInvocation).recreate(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 public class MockClusterInvoker <T > implements ClusterInvoker <T > { private static final Logger logger = LoggerFactory.getLogger(MockClusterInvoker.class); private final Directory<T> directory; private final Invoker<T> invoker; public MockClusterInvoker (Directory<T> directory, Invoker<T> invoker) { this .directory = directory; this .invoker = invoker; } @Override public Result invoke (Invocation invocation) throws RpcException { Result result = null ; String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || "false" .equalsIgnoreCase(value)) { result = this .invoker.invoke(invocation); } else if (value.startsWith("force" )) { if (logger.isWarnEnabled()) { logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl()); } result = doMockInvoke(invocation, null ); } else { try { result = this .invoker.invoke(invocation); if (result.getException() != null && result.getException() instanceof RpcException){ RpcException rpcException= (RpcException)result.getException(); if (rpcException.isBiz()){ throw rpcException; }else { result = doMockInvoke(invocation, rpcException); } } } catch (RpcException e) { if (e.isBiz()) { throw e; } if (logger.isWarnEnabled()) { logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e); } result = doMockInvoke(invocation, e); } } return result; } } 继续下一个请求,把请求传递给AbstractCluster的内部类:InterceptorInvokerNode,进行cluster拦截器的调用前后调用: ```java protected class InterceptorInvokerNode <T > extends AbstractClusterInvoker <T > { private AbstractClusterInvoker<T> clusterInvoker; private ClusterInterceptor interceptor; private AbstractClusterInvoker<T> next; @Override public Result invoke (Invocation invocation) throws RpcException { Result asyncResult; try { interceptor.before(next, invocation); asyncResult = interceptor.intercept(next, invocation); } catch (Exception e) { if (interceptor instanceof ClusterInterceptor.Listener) { ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; listener.onError(e, clusterInvoker, invocation); } throw e; } finally { interceptor.after(next, invocation); } return asyncResult.whenCompleteWithContext((r, t) -> { if (interceptor instanceof ClusterInterceptor.Listener) { ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; if (t == null ) { listener.onMessage(r, clusterInvoker, invocation); } else { listener.onError(t, clusterInvoker, invocation); } } }); } }
调用FailoverClusterInvoker.doInvoke之前,会先执行其父类的invoke方法: 父类:AbstractClusterInvoker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 public abstract class AbstractClusterInvoker <T > implements ClusterInvoker <T > { @Override public Result invoke (final Invocation invocation) throws RpcException { checkWhetherDestroyed(); Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments(); if (contextAttachments != null && contextAttachments.size() != 0 ) { ((RpcInvocation) invocation).addObjectAttachments(contextAttachments); } List<Invoker<T>> invokers = list(invocation); LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } protected void checkWhetherDestroyed () { if (destroyed.get()) { throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + " is now destroyed! Can not invoke any more." ); } } protected List<Invoker<T>> list(Invocation invocation) throws RpcException { return directory.list(invocation); } protected LoadBalance initLoadBalance (List<Invoker<T>> invokers, Invocation invocation) { if (CollectionUtils.isNotEmpty(invokers)) { return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0 ).getUrl() .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE)); } else { return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE); } } protected Invoker<T> select (LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (CollectionUtils.isEmpty(invokers)) { return null ; } String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName(); boolean sticky = invokers.get(0 ).getUrl() .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY); if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null ; } if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected); if (sticky) { stickyInvoker = invoker; } return invoker; } private Invoker<T> doSelect (LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers.size() == 1 ) { return invokers.get(0 ); } Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if (rInvoker != null ) { invoker = rInvoker; } else { int index = invokers.indexOf(invoker); try { invoker = invokers.get((index + 1 ) % invokers.size()); } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore." , e); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url" , t); } } return invoker; } private Invoker<T> reselect (LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException { List<Invoker<T>> reselectInvokers = new ArrayList<>( invokers.size() > 1 ? (invokers.size() - 1 ) : invokers.size()); for (Invoker<T> invoker : invokers) { if (availablecheck && !invoker.isAvailable()) { continue ; } if (selected == null || !selected.contains(invoker)) { reselectInvokers.add(invoker); } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } if (selected != null ) { for (Invoker<T> invoker : selected) { if ((invoker.isAvailable()) && !reselectInvokers.contains(invoker)) { reselectInvokers.add(invoker); } } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } return null ; } protected abstract Result doInvoke (Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException ;}
zone-aware=org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster 每个具体实现会在后续的文章进行介绍,本次只简单介绍下FailoverCluster的源码:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public class FailoverClusterInvoker <T > extends AbstractClusterInvoker <T > { private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); public FailoverClusterInvoker (Directory<T> directory) { super (directory); } @Override public Result doInvoke (Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyInvokers = invokers; String methodName = RpcUtils.getMethodName(invocation); int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1 ; if (len <= 0 ) { len = 1 ; } RpcException le = null ; List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); Set<String> providers = new HashSet<String>(len); for (int i = 0 ; i < len; i++) { if (i > 0 ) { super .checkWhetherDestroyed(); copyInvokers = super .list(invocation); } Invoker<T> invoker = super .select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); return result; } catch (RpcException e) { if (e.isBiz()) { throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(); } }
稍微看下异步转同步的相关源码,这样在最后返回结果的时候不会迷茫:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public Result invoke (Invocation invocation) throws RpcException { Result asyncResult = invoker.invoke(invocation); try { if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } } return asyncResult; } @Override public Result get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; threadlessExecutor.waitAndDrain(); } return responseFuture.get(timeout, unit); } public void waitAndDrain () throws InterruptedException { if (finished) { return ; } Runnable runnable = queue.take(); synchronized (lock) { waiting = false ; runnable.run(); } runnable = queue.poll(); while (runnable != null ) { try { runnable.run(); } catch (Throwable t) { logger.info(t); } runnable = queue.poll(); } finished = true ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public abstract class AbstractInvoker <T > implements Invoker <T > { private final Class<T> type; private final URL url; private final Map<String, Object> attachment; private volatile boolean available = true ; private AtomicBoolean destroyed = new AtomicBoolean(false ); @Override public Result invoke (Invocation inv) throws RpcException { RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this ); if (CollectionUtils.isNotEmptyMap(attachment)) { invocation.addObjectAttachmentsIfAbsent(attachment); } Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments(); if (CollectionUtils.isNotEmptyMap(contextAttachments)) { invocation.addObjectAttachments(contextAttachments); } invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation)); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); AsyncRpcResult asyncResult; try { asyncResult = (AsyncRpcResult) doInvoke(invocation); } catch (InvocationTargetException e) { Throwable te = e.getTargetException(); if (te == null ) { asyncResult = AsyncRpcResult.newDefaultAsyncResult(null , e, invocation); } else { if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } asyncResult = AsyncRpcResult.newDefaultAsyncResult(null , te, invocation); } } catch (RpcException e) { if (e.isBiz()) { asyncResult = AsyncRpcResult.newDefaultAsyncResult(null , e, invocation); } else { throw e; } } catch (Throwable e) { asyncResult = AsyncRpcResult.newDefaultAsyncResult(null , e, invocation); } RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture())); return asyncResult; } protected ExecutorService getCallbackExecutor (URL url, Invocation inv) { ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url); if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) { return new ThreadlessExecutor(sharedExecutor); } else { return sharedExecutor; } } protected abstract Result doInvoke (Invocation invocation) throws Throwable ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public class DubboInvoker <T > extends AbstractInvoker <T > { private final ExchangeClient[] clients; private final AtomicPositiveInteger index = new AtomicPositiveInteger(); private final String version; private final ReentrantLock destroyLock = new ReentrantLock(); private final Set<Invoker<?>> invokers; @Override protected Result doInvoke (final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1 ) { currentClient = clients[0 ]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = calculateTimeout(invocation, methodName); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false ); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { ExecutorService executor = getCallbackExecutor(getUrl(), inv); CompletableFuture<Object> request = currentClient.request(inv, timeout, executor); CompletableFuture<AppResponse> appResponseFuture = request.thenApply(obj -> (AppResponse) obj); FutureContext.getContext().setCompatibleFuture(appResponseFuture); AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } @Override public boolean isAvailable () { if (!super .isAvailable()) { return false ; } for (ExchangeClient client : clients) { if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) { return true ; } } return false ; } }
proxy0.sayHello 代理类执行
InvokerInvocationHandler.invoke 代理类增强器执行
MockClusterInvoker.invoke mock 集群执行
AbstractCluster$InterceptorInvokerNode.invoke 执行cluster拦截器
FailoverClusterInvoker.doInvoke 容错集群执行
InvokerWrapper.invoke invoker包装类
ProtocolFilterWrapper$1.invoke 进行过滤器调用链
ListenerInvokerWrapper.invoke 执行监听wrapper
AsyncToSyncInvoker.invoke 结果获取异步转同步
DubboInvoker.doInvoke 最终交给交换器发起request请求
ReferenceCountExchangeClient.request 引用计数器交换器,只做当前被调用服务引用个数的统计
HeaderExchangeClient.request 初始化头协议通道所谓头协议,是因为协议的参数放在请求头中
HeaderExchangeChannel.request 将请求数据封装Request,然后将Request,channel,线程池执行器excutor封装到DefaultFuture(里面封装了id和DefaultFuture映射)
NettyClient.send 根据netty的NioSocketChannel获取或创建dubbo定义的NettyChannel
NettyChannel.send 内部发送数据是通过netty的NioSocketChannel进行发送 看下HeaderExchangeChannel封装的Request对象:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public CompletableFuture<Object> request (Object request, int timeout, ExecutorService executor) throws RemotingException { if (closed) { throw new RemotingException(this .getLocalAddress(), null , "Failed to send request " + request + ", cause: The channel " + this + " is closed!" ); } Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true ); req.setData(request); DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
最终经过多次调用,会经过NettyChannel的send方法:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public void send (Object message, boolean sent) throws RemotingException { super .send(message, sent); boolean success = true ; int timeout = 0 ; try { ChannelFuture future = channel.writeAndFlush(message); if (sent) { timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.cause(); if (cause != null ) { throw cause; } } catch (Throwable e) { removeChannelIfDisconnected(channel); throw new RemotingException(this , "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this , "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 protected void doOpen () throws Throwable { final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this ); bootstrap = new Bootstrap(); bootstrap.group(NIO_EVENT_LOOP_GROUP) .option(ChannelOption.SO_KEEPALIVE, true ) .option(ChannelOption.TCP_NODELAY, true ) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .channel(socketChannelClass()); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000 , getConnectTimeout())); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { int heartbeatInterval = UrlUtils.getHeartbeat(getUrl()); if (getUrl().getParameter(SSL_ENABLED_KEY, false )) { ch.pipeline().addLast("negotiation" , SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler)); } NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this ); ch.pipeline() .addLast("decoder" , adapter.getDecoder()) .addLast("encoder" , adapter.getEncoder()) .addLast("client-idle-handler" , new IdleStateHandler(heartbeatInterval, 0 , 0 , MILLISECONDS)) .addLast("handler" , nettyClientHandler); String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST); if (socksProxyHost != null ) { int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT)); Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort)); ch.pipeline().addFirst(socks5ProxyHandler); } } }); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private class InternalEncoder extends MessageToByteEncoder { @Override protected void encode (ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { org.apache.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out); Channel ch = ctx.channel(); NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); codec.encode(channel, buffer, msg); } }
1 2 3 4 5 6 7 8 9 10 public void encode (Channel channel, ChannelBuffer buffer, Object msg) throws IOException { if (msg instanceof Request) { encodeRequest(channel, buffer, (Request) msg); } else if (msg instanceof Response) { encodeResponse(channel, buffer, (Response) msg); } else { super .encode(channel, buffer, msg); } }
我们这次是发送数据,因此会进入encodeRequest方法进行真正的编码,我们分析编码先了解下dubbo的协议: Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如魔数(Magic),数据包类型(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如方法名称,参数列表等。下面简单列举一下消息头的内容。 |偏移量(Bit) |字段 |取值 | | ————– | ————– | ————– | |0 ~ 7 |魔数高位| 0xda00 | |8 ~ 15 |魔数低位| 0xbb | |16 |数据包类型| 0 - Response, 1 - Request | |17 |调用方式| 仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用 | |18 |事件标识| 0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包 | |19 ~ 23 |序列化器编号| 2 - Hessian2Serialization 3 - JavaSerialization 4 - CompactedJavaSerialization 6 - FastJsonSerialization 7 - NativeJavaSerialization 8 - KryoSerialization 9 - FstSerialization | |24 ~ 31| 状态 |20 - OK | |30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE | |32 ~ 95| 请求编号| 共8字节,运行时生成 | |96 ~ 127| 消息体长度| 运行时计算 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 protected void encodeRequest (Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); byte [] header = new byte [HEADER_LENGTH]; Bytes.short2bytes(MAGIC, header); header[2 ] = (byte ) (FLAG_REQUEST | serialization.getContentTypeId()); if (req.isTwoWay()) { header[2 ] |= FLAG_TWOWAY; } if (req.isEvent()) { header[2 ] |= FLAG_EVENT; } Bytes.long2bytes(req.getId(), header, 4 ); int savedWriteIndex = buffer.writerIndex(); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (req.isEvent()) { encodeEventData(channel, out, req.getData()); } else { encodeRequestData(channel, out, req.getData(), req.getVersion()); } out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); int len = bos.writtenBytes(); checkPayload(channel, len); Bytes.int2bytes(len, header, 12 ); buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private class InternalDecoder extends ByteToMessageDecoder { @Override protected void decode (ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception { ChannelBuffer message = new NettyBackedChannelBuffer(input); NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); do { int saveReaderIndex = message.readerIndex(); Object msg = codec.decode(channel, message); if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { message.readerIndex(saveReaderIndex); break ; } else { if (saveReaderIndex == message.readerIndex()) { throw new IOException("Decode without read data." ); } if (msg != null ) { out.add(msg); } } } while (message.readable()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public Object decode (Channel channel, ChannelBuffer buffer) throws IOException { int save = buffer.readerIndex(); MultiMessage result = MultiMessage.create(); do { Object obj = codec.decode(channel, buffer); if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) { buffer.readerIndex(save); break ; } else { result.addMessage(obj); logMessageLength(obj, buffer.readerIndex() - save); save = buffer.readerIndex(); } } while (true ); if (result.isEmpty()) { return Codec2.DecodeResult.NEED_MORE_INPUT; } if (result.size() == 1 ) { return result.get(0 ); } return result; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 public Object decode (Channel channel, ChannelBuffer buffer) throws IOException { int readable = buffer.readableBytes(); byte [] header = new byte [Math.min(readable, HEADER_LENGTH)]; buffer.readBytes(header); return decode(channel, buffer, readable, header); } protected Object decode (Channel channel, ChannelBuffer buffer, int readable, byte [] header) throws IOException { if (readable > 0 && header[0 ] != MAGIC_HIGH || readable > 1 && header[1 ] != MAGIC_LOW) { int length = header.length; if (header.length < readable) { header = Bytes.copyOf(header, readable); buffer.readBytes(header, length, readable - length); } for (int i = 1 ; i < header.length - 1 ; i++) { if (header[i] == MAGIC_HIGH && header[i + 1 ] == MAGIC_LOW) { buffer.readerIndex(buffer.readerIndex() - header.length + i); header = Bytes.copyOf(header, i); break ; } } return super .decode(channel, buffer, readable, header); } if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } int len = Bytes.bytes2int(header, 12 ); checkPayload(channel, len); int tt = len + HEADER_LENGTH; if (readable < tt) { return DecodeResult.NEED_MORE_INPUT; } ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { return decodeBody(channel, is, header); } finally { } } protected Object decodeBody (Channel channel, InputStream is, byte [] header) throws IOException { byte flag = header[2 ], proto = (byte ) (flag & SERIALIZATION_MASK); long id = Bytes.bytes2long(header, 4 ); if ((flag & FLAG_REQUEST) == 0 ) { Response res = new Response(id); if ((flag & FLAG_EVENT) != 0 ) { res.setEvent(true ); } byte status = header[3 ]; res.setStatus(status); try { if (status == Response.OK) { Object data; if (res.isEvent()) { ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); data = decodeEventData(channel, in); } else { DecodeableRpcResult result; if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) { result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto); result.decode(); } else { result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); } data = result; } res.setResult(data); } else { ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); res.setErrorMessage(in.readUTF()); } } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode response failed: " + t.getMessage(), t); } res.setStatus(Response.CLIENT_ERROR); res.setErrorMessage(StringUtils.toString(t)); } return res; } else { Request req = new Request(id); req.setVersion(Version.getProtocolVersion()); req.setTwoWay((flag & FLAG_TWOWAY) != 0 ); if ((flag & FLAG_EVENT) != 0 ) { req.setEvent(true ); } try { Object data; if (req.isEvent()) { ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); data = decodeEventData(channel, in); } else { DecodeableRpcInvocation inv; if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) { inv = new DecodeableRpcInvocation(channel, req, is, proto); inv.decode(); } else { inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv; } req.setData(data); } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode request failed: " + t.getMessage(), t); } req.setBroken(true ); req.setData(t); } return req; } }
1 2 3 4 5 6 7 8 public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); handler.received(channel, msg); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void received (Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if (message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return ; } throw new ExecutionException(message, channel, getClass() + " error when process received event ." , t); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Override public void run () { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break ; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break ; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } break ; case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break ; default : logger.warn("unknown state: " + state + ", message is " + message); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public void received (Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { decode(message); if (message instanceof Request) { decode(((Request) message).getData()); } if (message instanceof Response) { decode(((Response) message).getResult()); } handler.received(channel, message); } private void decode (Object message) { if (message instanceof Decodeable) { try { ((Decodeable) message).decode(); } catch (Throwable e) { } } } public Object decode (Channel channel, InputStream input) throws IOException { ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); byte flag = in.readByte(); switch (flag) { case DubboCodec.RESPONSE_NULL_VALUE: break ; case DubboCodec.RESPONSE_VALUE: handleValue(in); break ; case DubboCodec.RESPONSE_WITH_EXCEPTION: handleException(in); break ; case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS: handleAttachment(in); break ; case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS: handleValue(in); handleAttachment(in); break ; case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS: handleException(in); handleAttachment(in); break ; default : throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', but received: " + flag); } if (in instanceof Cleanable) { ((Cleanable) in).cleanup(); } return this ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void received (Channel channel, Object message) throws RemotingException { final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); if (message instanceof Request) { Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { handleRequest(exchangeChannel, request); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { } else { handler.received(exchangeChannel, message); } } static void handleResponse (Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public class DefaultFuture extends CompletableFuture <Object > { private DefaultFuture (Channel channel, Request request, int timeout) { this .channel = channel; this .request = request; this .id = request.getId(); this .timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); FUTURES.put(id, this ); CHANNELS.put(id, channel); } public static DefaultFuture newFuture (Channel channel, Request request, int timeout, ExecutorService executor) { final DefaultFuture future = new DefaultFuture(channel, request, timeout); future.setExecutor(executor); if (executor instanceof ThreadlessExecutor) { ((ThreadlessExecutor) executor).setWaitingFuture(future); } timeoutCheck(future); return future; } public static void received (Channel channel, Response response) { received(channel, response, false ); } public static void received (Channel channel, Response response, boolean timeout) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null ) { future.doReceived(response); } else { } } finally { CHANNELS.remove(response.getId()); } } private void doReceived (Response res) { if (res == null ) { throw new IllegalStateException("response cannot be null" ); } if (res.getStatus() == Response.OK) { this .complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { this .completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this .completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; if (threadlessExecutor.isWaiting()) { threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" + " which is not an expected state, interrupt the thread manually by returning an exception." )); } } } }
消费端调用流程如下: 至此响应结果获取完毕,经过过滤器逐步回到调用层InvokerInvocationHandler,然后返回到代理对象proxy0到业务层,客户端整个调用过程比较复杂,要有耐心,了解消费端调用和获取响应结果的过程后,我们分析服务端对调用的处理就会轻松很多。 趁着还有感觉,下面我们就直接分析服务端收到请求后的一系列操作过程。
服务端处理请求 接收和解码请求 我们在分析消费者调用过程时候就说过,netty的发送和接收数据后首先会进入编码器和解码器,而服务端接收请求后首先会进入解码器进行解码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 private class InternalDecoder extends ByteToMessageDecoder { @Override protected void decode (ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception { ChannelBuffer message = new NettyBackedChannelBuffer(input); NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); do { int saveReaderIndex = message.readerIndex(); Object msg = codec.decode(channel, message); if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { message.readerIndex(saveReaderIndex); break ; } else { if (saveReaderIndex == message.readerIndex()) { throw new IOException("Decode without read data." ); } if (msg != null ) { out.add(msg); } } } while (message.readable()); } } public Object decode (Channel channel, ChannelBuffer buffer) throws IOException { int save = buffer.readerIndex(); MultiMessage result = MultiMessage.create(); do { Object obj = codec.decode(channel, buffer); if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) { buffer.readerIndex(save); break ; } else { result.addMessage(obj); logMessageLength(obj, buffer.readerIndex() - save); save = buffer.readerIndex(); } } while (true ); if (result.isEmpty()) { return Codec2.DecodeResult.NEED_MORE_INPUT; } if (result.size() == 1 ) { return result.get(0 ); } return result; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 public Object decode (Channel channel, ChannelBuffer buffer) throws IOException { int readable = buffer.readableBytes(); byte [] header = new byte [Math.min(readable, HEADER_LENGTH)]; buffer.readBytes(header); return decode(channel, buffer, readable, header); } @Override protected Object decode (Channel channel, ChannelBuffer buffer, int readable, byte [] header) throws IOException { if (readable > 0 && header[0 ] != MAGIC_HIGH || readable > 1 && header[1 ] != MAGIC_LOW) { int length = header.length; if (header.length < readable) { header = Bytes.copyOf(header, readable); buffer.readBytes(header, length, readable - length); } for (int i = 1 ; i < header.length - 1 ; i++) { if (header[i] == MAGIC_HIGH && header[i + 1 ] == MAGIC_LOW) { buffer.readerIndex(buffer.readerIndex() - header.length + i); header = Bytes.copyOf(header, i); break ; } } return super .decode(channel, buffer, readable, header); } if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } int len = Bytes.bytes2int(header, 12 ); checkPayload(channel, len); int tt = len + HEADER_LENGTH; if (readable < tt) { return DecodeResult.NEED_MORE_INPUT; } ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { return decodeBody(channel, is, header); } finally { } } protected Object decodeBody (Channel channel, InputStream is, byte [] header) throws IOException { byte flag = header[2 ], proto = (byte ) (flag & SERIALIZATION_MASK); long id = Bytes.bytes2long(header, 4 ); if ((flag & FLAG_REQUEST) == 0 ) { return res; } else { Request req = new Request(id); req.setVersion(Version.getProtocolVersion()); req.setTwoWay((flag & FLAG_TWOWAY) != 0 ); if ((flag & FLAG_EVENT) != 0 ) { req.setEvent(true ); } try { Object data; if (req.isEvent()) { ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); data = decodeEventData(channel, in); } else { DecodeableRpcInvocation inv; if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) { inv = new DecodeableRpcInvocation(channel, req, is, proto); inv.decode(); } else { inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv; } req.setData(data); } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode request failed: " + t.getMessage(), t); } req.setBroken(true ); req.setData(t); } return req; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void received (Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if (message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return ; } throw new ExecutionException(message, channel, getClass() + " error when process received event ." , t); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 public void run () { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e); } } else { } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 public void received (Channel channel, Object message) throws RemotingException { if (message instanceof Request) { decode(((Request) message).getData()); } handler.received(channel, message); } private void decode (Object message) { if (message instanceof Decodeable) { try { ((Decodeable) message).decode(); } catch (Throwable e) {} } } public void decode () throws Exception { if (!hasDecoded && channel != null && inputStream != null ) { try { decode(channel, inputStream); } catch (Throwable e) { } finally { hasDecoded = true ; } } } public Object decode (Channel channel, InputStream input) throws IOException { ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); String dubboVersion = in.readUTF(); request.setVersion(dubboVersion); setAttachment(DUBBO_VERSION_KEY, dubboVersion); String path = in.readUTF(); setAttachment(PATH_KEY, path); setAttachment(VERSION_KEY, in.readUTF()); setMethodName(in.readUTF()); String desc = in.readUTF(); setParameterTypesDesc(desc); try { Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY; Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY; if (desc.length() > 0 ) { ServiceRepository repository = ApplicationModel.getServiceRepository(); ServiceDescriptor serviceDescriptor = repository.lookupService(path); if (serviceDescriptor != null ) { MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(getMethodName(), desc); if (methodDescriptor != null ) { pts = methodDescriptor.getParameterClasses(); this .setReturnTypes(methodDescriptor.getReturnTypes()); } } if (pts == DubboCodec.EMPTY_CLASS_ARRAY) { if (!RpcUtils.isGenericCall(desc, getMethodName()) && !RpcUtils.isEcho(desc, getMethodName())) { throw new IllegalArgumentException("Service not found:" + path + ", " + getMethodName()); } pts = ReflectUtils.desc2classArray(desc); } args = new Object[pts.length]; for (int i = 0 ; i < args.length; i++) { try { args[i] = in.readObject(pts[i]); } catch (Exception e) { } } } setParameterTypes(pts); Map<String, Object> map = in.readAttachments(); if (map != null && map.size() > 0 ) { Map<String, Object> attachment = getObjectAttachments(); if (attachment == null ) { attachment = new HashMap<>(); } attachment.putAll(map); setObjectAttachments(attachment); } for (int i = 0 ; i < args.length; i++) { args[i] = decodeInvocationArgument(channel, this , pts, i, args[i]); } setArguments(args); String targetServiceName = buildKey((String) getAttachment(PATH_KEY), getAttachment(GROUP_KEY), getAttachment(VERSION_KEY)); setTargetServiceUniqueName(targetServiceName); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read invocation data failed." , e)); } finally { if (in instanceof Cleanable) { ((Cleanable) in).cleanup(); } } return this ; }
找到服务端的invoker HeaderExchangeHandler.received源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public void received (Channel channel, Object message) throws RemotingException { final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); if (message instanceof Request) { Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { handleRequest(exchangeChannel, request); } else { handler.received(exchangeChannel, request.getData()); } } } } void handleRequest (final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); channel.send(res); return ; } Object msg = req.getData(); try { CompletionStage<Object> future = handler.reply(channel, msg); future.whenComplete((appResult, t) -> { try { if (t == null ) { res.setStatus(Response.OK); res.setResult(appResult); } else { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } channel.send(res); } catch (RemotingException e) { logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); } }); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public CompletableFuture<Object> reply (ExchangeChannel channel, Object message) throws RemotingException { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods" ); boolean hasMethod = false ; if (methodsStr == null || !methodsStr.contains("," )) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split("," ); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true ; break ; } } } if (!hasMethod) { return null ; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv); return result.thenApply(Function.identity()); } Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { boolean isCallBackServiceInvoke = false ; boolean isStubServiceInvoke = false ; int port = channel.getLocalAddress().getPort(); String path = (String) inv.getObjectAttachments().get(PATH_KEY); String serviceKey = serviceKey( port, path, (String) inv.getObjectAttachments().get(VERSION_KEY), (String) inv.getObjectAttachments().get(GROUP_KEY) ); DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null ) { throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv)); } return exporter.getInvoker(); }
invoker的执行 invoker执行逻辑第一站就是执行ProtocolFilterWrapper下构建的过滤器链: EchoFilter>ClassLoaderFilter>GenericFilter>ContextFilter>TraceFilter>TimeoutFilter>MonitorFilter>ExceptionFilter>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public Result invoke (Invocation invocation) throws RpcException { Result asyncResult; try { asyncResult = filter.invoke(next, invocation); } catch (Exception e) { if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = ((ListenableFilter) filter); try { Filter.Listener listener = listenableFilter.listener(invocation); if (listener != null ) { listener.onError(e, invoker, invocation); } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof Filter.Listener) { Filter.Listener listener = (Filter.Listener) filter; listener.onError(e, invoker, invocation); } throw e; } finally { } return asyncResult.whenCompleteWithContext((r, t) -> { if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = ((ListenableFilter) filter); Filter.Listener listener = listenableFilter.listener(invocation); try { if (listener != null ) { if (t == null ) { listener.onResponse(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof Filter.Listener) { Filter.Listener listener = (Filter.Listener) filter; if (t == null ) { listener.onResponse(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } }); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public Result invoke (Invocation invocation) throws RpcException { try { Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); CompletableFuture<Object> future = wrapWithFuture(value); CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> { AppResponse result = new AppResponse(); if (t != null ) { if (t instanceof CompletionException) { result.setException(t.getCause()); } else { result.setException(t); } } else { result.setValue(obj); } return result; }); return new AsyncRpcResult(appResponseFuture, invocation); } catch (InvocationTargetException e) { return AsyncRpcResult.newDefaultAsyncResult(null , e.getTargetException(), invocation); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } } public <T> Invoker<T> getInvoker (T proxy, Class<T> type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$' ) < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke (T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
服务的响应 返回结果:”Hello dubbo, response from provider:”,此结果首先在JavasisstProxyFactory中被包装为CompletableFuture对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public Result invoke (Invocation invocation) throws RpcException { try { Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); CompletableFuture<Object> future = wrapWithFuture(value); CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> { AppResponse result = new AppResponse(); if (t != null ) { if (t instanceof CompletionException) { result.setException(t.getCause()); } else { result.setException(t); } } else { result.setValue(obj); } return result; }); return new AsyncRpcResult(appResponseFuture, invocation); } catch (InvocationTargetException e) { return AsyncRpcResult.newDefaultAsyncResult(null , e.getTargetException(), invocation); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } } private CompletableFuture<Object> wrapWithFuture (Object value) { if (RpcContext.getContext().isAsyncStarted()) { return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture(); } else if (value instanceof CompletableFuture) { return (CompletableFuture<Object>) value; } return CompletableFuture.completedFuture(value); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void handleRequest (final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); Object msg = req.getData(); try { CompletionStage<Object> future = handler.reply(channel, msg); future.whenComplete((appResult, t) -> { try { if (t == null ) { res.setStatus(Response.OK); res.setResult(appResult); } else { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } channel.send(res); } catch (RemotingException e) { logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); } }); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void send (Object message, boolean sent) throws RemotingException { if (message instanceof Request || message instanceof Response || message instanceof String) { channel.send(message, sent); } else { Request request = new Request(); request.setVersion(Version.getProtocolVersion()); request.setTwoWay(false ); request.setData(message); channel.send(request, sent); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void send (Object message, boolean sent) throws RemotingException { boolean success = true ; int timeout = 0 ; try { ChannelFuture future = channel.writeAndFlush(message); if (sent) { timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.cause(); if (cause != null ) { throw cause; } } catch (Throwable e) { removeChannelIfDisconnected(channel); throw new RemotingException(this , "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } }
1 2 3 4 5 6 7 8 9 private class InternalEncoder extends MessageToByteEncoder { @Override protected void encode (ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { org.apache.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out); Channel ch = ctx.channel(); NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); codec.encode(channel, buffer, msg); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 protected void encodeResponse (Channel channel, ChannelBuffer buffer, Response res) throws IOException { int savedWriteIndex = buffer.writerIndex(); try { Serialization serialization = getSerialization(channel); byte [] header = new byte [HEADER_LENGTH]; Bytes.short2bytes(MAGIC, header); header[2 ] = serialization.getContentTypeId(); if (res.isHeartbeat()) { header[2 ] |= FLAG_EVENT; } byte status = res.getStatus(); header[3 ] = status; Bytes.long2bytes(res.getId(), header, 4 ); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (status == Response.OK) { if (res.isHeartbeat()) { encodeEventData(channel, out, res.getResult()); } else { encodeResponseData(channel, out, res.getResult(), res.getVersion()); } } else { out.writeUTF(res.getErrorMessage()); } out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); int len = bos.writtenBytes(); checkPayload(channel, len); Bytes.int2bytes(len, header, 12 ); buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); } catch (Throwable t) { } }
总结 本篇文章从消费者调用到服务端响应整个过程走了一遍,整个过程相对比较复杂。其中涉及到了路由,集群容错,负载均衡,过滤器责任链,监听器,异步转同步,biz线程和io线程的互转,请求的发送和响应结果匹配,协议的编码解码,对象的序列化和反序列化等等。 本文目的是对整个调用过程的熟悉,涉及多个重要点只是简单提下,在今后的文章中会进行补充。