Dubbo的服务导出
简介
Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑。也可以通过api直接执行export进行导出。整个逻辑大致可分为三个部分,第一部分是前置工作,主要用于检查参数,组装 URL。第二部分是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程。第三部分是向注册中心注册服务,用于服务发现。
为了方便研究源码,我们通过api直接导出服务。
源码分析
示例代码
api方式导出源码示例:
1 | public static void main(String[] args) throws Exception { |
dubbo源码
发布服务前准备
导出服务分为延迟导出和立即导出,最后发布导出事件
1 | public synchronized void export() { |
1 | protected synchronized void doExport() { |
收集配置信息组装URL
下面的方法比较长,主要是收集配置信息,组装URL
1 | private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { |
服务的导出
服务的导出核心就是Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
,其中PROTOCOL是通过扩展加载器获取的private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
,要分析导出逻辑就要先分析PROTOCOL的真实身份。
下面是获取到PROTOCOL对象的class源码,从源码可以看出,最终会通过wrapperInvoker的Ulr中的protocol参数来决定加载哪个协议。
1 | public class Protocol$Adaptive |
dubbo支持多个协议,详细见Protocol的SPI配置:
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
hessian=org.apache.dubbo.rpc.protocol.hessian.HessianProtocol
http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
registry=org.apache.dubbo.registry.integration.RegistryProtocol
service-discovery-registry=org.apache.dubbo.registry.client.ServiceDiscoveryRegistryProtocol
…
由于url = registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&export=dubbo://xxxxxx,因此会获取名字为registry的扩展点RegistryProtocol,
其中Protocol有两个Wrapper(ProtocolFilterWrapper,ProtocolListenerWrapper),那么通过自适应加载器获取到的PROTOCOL是排序最靠前的ProtocolFilterWrapper对象,执行export顺序如下:
ProtocolFilterWrapper.export() 》ProtocolListenerWrapper.export() 》 RegistryProtocol.export()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40// ProtocolFilterWrapper
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {// 如果是registry继续往下传递
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
// ProtocolListenerWrapper
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {// 如果是registry继续往下传递
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}
// RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
URL providerUrl = getProviderUrl(originInvoker);
//export invoker 真正开始导出服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
// 创建和链接注册中心
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// 通知协议导出后监听器
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}导出服务
1
2
3
4
5
6
7
8private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// 此时的invoker的url协议为dubbo,那么protocol的实例对象就是对应者DubboProtocol
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}导出服务时候和上面导出协议类似,第一个protocol实例就是ProtocolFilterWrapper,执行顺序为:
ProtocolFilterWrapper.export() 》ProtocolListenerWrapper.export() 》 DubboProtocol.export()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72// ProtocolFilterWrapper
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// 非注册协议,建立过滤器调用链
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
// ProtocolListenerWrapper
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// 非注册协议,调用导出监听器exported方法(此处感觉有点早)
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}
// DubboProtocol 执行真正的导出
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);// org.apache.dubbo.demo.DemoService:20880
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);// 缓存
// 开启服务并导出
openServer(url);
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
// find server.
String key = url.getAddress();// 192.168.32.216:20880
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 创建服务
serverMap.put(key, createServer(url));
}
}
}
}
}
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);// 默认netty
// 判断协议是否存在
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// 调用交换机发起绑定服务
// requestHandler 就是当服务被调用时候的处理逻辑
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 返回服务
return new DubboProtocolServer(server);
}过滤器调用链的创建细节:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 根据url和key及分组获取所有过滤器
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
// 将过滤器封装为Invoker,并形成一个调用链
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
try {
Filter.Listener listener = listenableFilter.listener(invocation);
if (listener != null) {
listener.onError(e, invoker, invocation);
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, invoker, invocation);
}
throw e;
} finally {
}
}
};
}
}
return last;
}每个调用链的大致源码如下,通过next链接下一个,调用者调用invoker传入下一个调用者next:
1
2
3
4
5
6
7
8
9
10
11
12
13
14// Invoker在FilterChainMaker中的匿名类
public class FilterChainMaker_1 implements Invoker {
final InvokerFilter filter;
final Invoker next;
FilterChainMaker_1(InvokerFilter invokerFilter, Invoker invoker) {
this.filter = invokerFilter;
this.next = invoker;
}
public String invoke(String doing) {
return this.filter.invoke(this.next, doing);
}
}绑定服务
打开和创建一个服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35private void openServer(URL url) {
String key = url.getAddress();// org.apache.dubbo.demo.DemoService:20880
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
// 防止重复绑定
server = serverMap.get(key);
if (server == null) {
// 创建一个服务
serverMap.put(key, createServer(url));
}
}
} else {
server.reset(url);
}
}
}
private ProtocolServer createServer(URL url) {
// 完善url,设置必要参数
url = URLBuilder.from(url)
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);// 默认netty
ExchangeServer server;
try {
// 绑定服务
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {}
return new DubboProtocolServer(server);
}Exchangers.bind(url, requestHandler)
,将服务绑定到某个网络服务上。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29// Exchangers
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
// 交换器是header(头部交换器,协议在头部),返回HeaderExchanger
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
//HeaderExchanger.bind();
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 创建头部交换处理器
HeaderExchangeHandler headerExchangeHandler = new HeaderExchangeHandler(handler);
// 解码处理器
DecodeHandler decodeHandler = new DecodeHandler(headerExchangeHandler);
// 解码器绑定到网络传输器上
RemotingServer remotingServer = Transporters.bind(url, decodeHandler);
// 创建交换服务,并将远程服务注入
HeaderExchangeServer headerExchangeServer = new HeaderExchangeServer(remotingServer);
return headerExchangeServer;
}Transporters.bind(url, decodeHandler)
,绑定地址和解码器到某个网络服务上。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取网络传输器,绑定地址和解码器
return getTransporter().bind(url, handler);
}
// 获取网络传输器自动适应扩展,根据url上的Server_key=netty可知为:NettyServer
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
// NettyServer
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
// NettyServer.super
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
// 开启网络服务
doOpen();
} catch (Throwable t) {
}
executor = executorRepository.createExecutorIfAbsent(url);
}此处的逻辑就是把服务的处理器绑定到某个网络端口上:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("backlog", getUrl().getPositiveParameter(BACKLOG_KEY, Constants.DEFAULT_BACKLOG));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
// 设置通道的rpc解码,编码,和处理器
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// netty服务绑定到某个端口上
channel = bootstrap.bind(getBindAddress());
}到此,服务已经绑定到某个网络传输服务的端口上。下面将介绍服务是如何注册到某个注册中心的。
服务的注册
在上面服务导出后,需要注册到注册中心,注册方式如下,我们以zookeeper为例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//...
//export invoker 真正开始导出服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
// 根据spi创建和链接注册中心获取的是ListenerRegistryWrapper
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// 通知协议导出后监听器
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
private void register(URL registryUrl, URL registeredProviderUrl) {
// 此处根据spi获取的是ListenerRegistryWrapper
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}ListenerRegistryWrapper的register为的是注册后通知注册监听器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void register(URL url) {
try {
// 调用ZookeeperRegistry.的父类的register方法
registry.register(url);
} finally {
// 通知注册监听器
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
for (RegistryServiceListener listener : listeners) {
if (listener != null) {
try {
listener.onRegister(url);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}调用ZookeeperRegistry的父类FailbackRegistry的register方法,最终调用到自身的doRegister方法,通过zk客户端向zookeeper中创建持久节点(dubbo/xx.service.xxx/provider/)和临时节点(rpc请求协议地址dubbo://xxx.xxx)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35// ZookeeperRegistry
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
// ZookeeperClient
// 创建zk节点,注册服务
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
if(persistentExistNodePath.contains(path)){
return;
}
if (checkExists(path)) {
persistentExistNodePath.add(path);
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
// 递归创建上级持久路径
create(path.substring(0, i), false);
}
if (ephemeral) {
// 创建临时协议
createEphemeral(path);
} else {
createPersistent(path);
persistentExistNodePath.add(path);
}
}服务注册后逻辑
通知注册协议监听器:
1
2
3
4
5
6
7
8
9
10// RegistryProtocol
private <T> void notifyExport(ExporterChangeableWrapper<T> exporter) {
List<RegistryProtocolListener> listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
.getActivateExtension(exporter.getOriginInvoker().getUrl(), "registry.protocol.listener");
if (CollectionUtils.isNotEmpty(listeners)) {
for (RegistryProtocolListener listener : listeners) {
listener.onExport(this, exporter);
}
}
}发布服务已导出事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// ServiceConfig
public void exported() {
// dispatch a ServiceConfigExportedEvent since 2.7.4
dispatch(new ServiceConfigExportedEvent(this));
}
private void dispatch(Event event) {
EventDispatcher.getDefaultExtension().dispatch(event);
}
// AbstractEventDispatcher
public void dispatch(Event event) {
Executor executor = getExecutor();
// execute in sequential or parallel execution model
executor.execute(() -> {
sortedListeners(entry -> entry.getKey().isAssignableFrom(event.getClass()))
.forEach(listener -> {
if (listener instanceof ConditionalEventListener) {
ConditionalEventListener predicateEventListener = (ConditionalEventListener) listener;
if (!predicateEventListener.accept(event)) { // No accept
return;
}
}
// Handle the event
listener.onEvent(event);
});
});
}到这里整个服务的导出和注册就结束了。
大致的流程总结
- 获取和组装配置参数,并将参数组装为要导出的服务URL和协议URL
- 根据注册中心的Url的协议register://,获取RegistryProtocol扩展点,连接注册中心
- 再次根据服务的Url的协议dubbo://,获取DubboProtocol扩展点,然后将服务绑定到netty服务上完成服务导出
- 根据注册中心的客户端在zookeeper中创建服务节点,完成服务注册
参考文献
http://dubbo.apache.org/zh/docs/v2.7/dev/source/export-service/