文章系列
【一、dubbo源码解析之框架粗谈】
【二、dubbo源码解析之dubbo配置解析】
【三、dubbo源码解析之服务发布与注册】
【四、dubbo源码解析之服务发现】
【五、dubbo源码解析之服务调用(通信)流程】
【六、dubbo获取服务提供者IP列表】
Spring 启动过程中,会扫描所有包目录 resources/META-INF/ 的 spring.handlers 文件,将其对应的 DubboNamespaceHandler 装载到 Spring IoC 容器中,并调用调用其中的 init() 方法,通过注册一个BeanDefinitionParser 解析器,完成Bean对象的注册,如下:
spring.handlers
http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
DubboNamespaceHandler
public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {
@Override
public void init() {
// 注册 Bean 定义解析器
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, true));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
}
dubbo 会向 Spring IoC 容器中,注入以上 Bean 对象,关于上面 Bean 对象的各种作用,请参照【Dubbo配置及属性详解】,这里不作过多阐述。
本系列文章参照 dubbo-2.7.17 进行源码分析。

其中 registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)) 代码,会向Spring IoC 容器注入一个 ServiceBean 对象,该对象用于服务提供者发布服务。
ServiceBean 类结构图如下:

看到这里,熟悉 Spring 的小伙伴都知道,这里应用了 Spring 很多的扩展接口,并且继承了一个 ServiceConfig 对象(注意:重点,后续服务发布会用到)。
void afterPropertiesSet() 方法,完成一些初始化动作void destroy() 方法释放资源进过对 ServiceBean 的代码分析,你会发型,并没有什么实质性东西,没有找到服务发布相关的内容,那 dubbo 是如何进行服务发布的呢?
上面分析到,dubbo 在启动中,会将向 Spring IoC 容器中注入很多 BeanDefinition 对象,而 Spring IoC在完成所有 Bean 初始化之后,会调用 finishRefresh() 方法,该方法会发布一个 ContextRefreshedEvent 事件,进而所有监听了 ContextRefreshedEvent 事件的 Listener 都会执行其 onApplicationEvent() 方法。
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
// 省略其他代码......
// 初始化容器的生命周期事件处理器,并发布容器的生命周期事件
finishRefresh();
// 省略其他代码......
}
}
// 初始化容器的生命周期事件处理器,并发布容器的生命周期事件
protected void finishRefresh() {
// Clear context-level resource caches (such as ASM metadata from scanning).
clearResourceCaches();
// Initialize lifecycle processor for this context.
initLifecycleProcessor();
// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();
// Publish the final event.
// 发布 ContextRefreshedEvent 事件
publishEvent(new ContextRefreshedEvent(this));
// Participate in LiveBeansView MBean, if active.
LiveBeansView.registerApplicationContext(this);
}
在回到 dubbo 中,辣么,dubbo 是否也利用了 Spring 这一拓展呢?
不妨我们在 dubbo 源码中全局搜索一下 ContextRefreshedEvent,果不其然,在很多地方,都监听了 ContextRefreshedEvent 事件,通过对其分析,其中 DubboBootstrapApplicationListener 中会启动一个 dubbo 的 Bootstrap。
public class DubboBootstrapApplicationListener extends OnceApplicationContextEventListener implements Ordered {
@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
if (DubboBootstrapStartStopListenerSpringAdapter.applicationContext == null) {
DubboBootstrapStartStopListenerSpringAdapter.applicationContext = event.getApplicationContext();
}
if (event instanceof ContextRefreshedEvent) {
// 刷新上下文
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
// 启动dubbo Bootstrap
dubboBootstrap.start();
}
}
public class DubboBootstrap {
/**
* Start the bootstrap
*/
public DubboBootstrap start() {
if (started.compareAndSet(false, true)) {
// 省略其他代码......
// 1. export Dubbo Services
// 发布 dubbo 服务
exportServices();
// 省略其他代码......
}
return this;
}
// 发布 dubbo 服务
private void exportServices() {
// configManager = (ConfigManager)ExtensionLoader.getExtensionLoader(FrameworkExt.class).getExtension("config");
// dubbo SPI 机制
// configManager.getServices(),获取所有需要发布的 ServiceConfig
configManager.getServices().forEach(sc -> {
// TODO, compatible with ServiceConfig.export()
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
if (exportAsync) { // 异步
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
try {
// 发布服务
exportService(serviceConfig);
} catch (Throwable t) {
logger.error("export async catch error : " + t.getMessage(), t);
}
});
asyncExportingFutures.add(future);
} else { // 同步
// 发布服务
exportService(serviceConfig);
}
});
}
// 发布服务
private void exportService(ServiceConfig sc) {
if (exportedServices.containsKey(sc.getServiceName())) {
throw new IllegalStateException("There are multiple ServiceBean instances with the same service name: [" +
sc.getServiceName() + "], instances: [" +
exportedServices.get(sc.getServiceName()).toString() + ", " +
sc.toString() + "]. Only one service can be exported for the same triple (group, interface, version), " +
"please modify the group or version if you really need to export multiple services of the same interface.");
}
// 发布服务
sc.export();
// 缓存
exportedServices.put(sc.getServiceName(), sc);
}
}
dubbo 通过监听 Spring IoC 容器 ContextRefreshedEvent 事件,启动一个 DubboBootstrap,进而获取并遍历所有需要发布的 ServiceConfig 对象,调用其 org.apache.dubbo.config.ServiceConfig#export() 方法完成服务发布的整体流程。
下面我们看一下 dubbo 官网提供的发布服务时序图。

dubbo 通过 ServiceConfig 中的 export() 方法,完成服务的发布。
public class ServiceConfig<T> extends ServiceConfigBase<T> {
@Override
public synchronized void export() {
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
// compatible with api call.
if (null != this.getRegistry()) {
bootstrap.registries(this.getRegistries());
}
bootstrap.initialize();
}
checkAndUpdateSubConfigs();
// 初始化元数据
initServiceMetadata(provider);
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setTarget(getRef());
serviceMetadata.generateServiceKey();
if (!shouldExport()) {
return;
}
// 是否延迟发布
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(() -> {
try {
// 服务发布
this.doExport();
} catch (Exception e) {
logger.error("delay export server occur exception, please check it.", e);
}
}, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 服务发布
doExport();
}
exported();
}
// 服务发布
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
// 保证只发布一次
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
// 执行服务发布逻辑
doExportUrls();
bootstrap.setReady(true);
}
}
在 serviceConfig.doExportUrls() 中主要做了两件事:
public class ServiceConfig<T> extends ServiceConfigBase<T> {
protected List<ProtocolConfig> protocols;
// 服务发布
@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
// 获取一个服务管理产库
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
// 获取所有注册中心地址:dubbo支持多注册中心
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
int protocolConfigNum = protocols.size();
// 遍历所有协议,一一进行注册
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
// 使用协议进行服务发布
doExportUrlsFor1Protocol(protocolConfig, registryURLs, protocolConfigNum);
}
}
}
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo&dubbo=2.0.2&id=org.apache.dubbo.config.RegistryConfig#0&pid=17620&qos.enable=false®istry=zookeeper&release=2.7.14&timeout=300000×tamp=1661440104032
使用对应协议,进行服务发布。
以常用的 dubbo 协议为例,示例如下:
dubbo://127.0.0.1:20880/com.example.demo.provider.DemoProvider
?anyhost=true
&application=dubbo-demo // 应用名称
&bind.ip=127.0.0.1 // 服务提供者id
&bind.port=20880 // 服务提供者port
&deprecated=false
&dubbo=2.0.2
&dynamic=true
&generic=false
&interface=com.example.demo.provider.DemoProvider // 接口
&metadata-type=remote // 远程
&methods=method1,method2 // 接口方法
&pid=17620
&qos.enable=false
&release=2.7.14
&service.name=ServiceBean:/com.example.demo.provider.DemoProvider
&side=provider
×tamp=1661440543854
核心逻辑如下:
public class ServiceConfig<T> extends ServiceConfigBase<T> {
// 使用对应协议,进行服务发布
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs, int protocolConfigNum) {
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
// 默认使用dubbo协议
name = DUBBO;
}
/****************step1:URL参数组装 start******************/
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
// 添加一些运行时参数
ServiceConfig.appendRuntimeParameters(map);
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ProviderConfig
// appendParameters(map, provider, Constants.DEFAULT_KEY);
AbstractConfig.appendParameters(map, provider);
AbstractConfig.appendParameters(map, protocolConfig);
AbstractConfig.appendParameters(map, this);
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
if (CollectionUtils.isNotEmpty(getMethods())) {
for (MethodConfig method : getMethods()) {
AbstractConfig.appendParameters(map, method, method.getName());
String retryKey = method.getName() + RETRY_SUFFIX;
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if (FALSE_VALUE.equals(retryValue)) {
map.put(method.getName() + RETRIES_SUFFIX, ZERO_VALUE);
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
AbstractConfig
.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException(
"Argument config error : the index attribute and type attribute not match :index :" +
argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException(
"Argument config error : the index attribute and type attribute not match :index :" +
argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException(
"Argument config must set index or type attribute.eg: or ");
}
}
}
} // end of methods for
}
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
/**
* Here the token value configured by the provider is used to assign the value to ServiceConfig#token
*/
if (ConfigUtils.isEmpty(token) && provider != null) {
token = provider.getToken();
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
//init serviceMetadata attachments
serviceMetadata.getAttachments().putAll(map);
/****************step1:URL参数组装 end******************/
// export service
// 获取服务提供者host
String host = findConfigedHosts(protocolConfig, registryURLs, map);
// 获取服务提供者协议port
Integer port = findConfigedPorts(protocolConfig, name, map, protocolConfigNum);
// step2:组装URL:结果类似上面提供的示例
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// You can customize Configurator to append extra parameters
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
// 获取url中scope参数
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) { // 如果scope=none,不进行服务发布
// export to local if the config is not remote (export to remote only when config is remote)
// 如果配置不是远程的,则发布到本地(仅当配置为远程时才导出到远程)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
// 发布到本地
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
// 如果配置不是本地的,则发布到远程(仅当配置为本地时才导出到本地)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
// 发布到远程
if (CollectionUtils.isNotEmpty(registryURLs)) {
// step3:遍历所有注册中心地址,进行服务发布
for (URL registryURL : registryURLs) {
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
}
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " +
registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// SPI机制:通过代理工厂(默认 javassist)生成一个代理类
// 注意,这个传输的 url 为 registryURL
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// 包装器模式,本质还是Invoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// SPI机制:协议
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
MetadataUtils.publishServiceDefinition(url);
}
}
this.urls.add(url);
}
// SPI机制:代理工厂JavassistProxyFactory、JdkProxyFactory、StubProxyFactoryWrapper,默认 javassist
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
// SPI机制:协议
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
}
在 serviceConfig.doExportUrlsFor1Protocol() 方法中,涉及了两个 dubbo SPI 扩展:
对应SPI文件为 org.apache.dubbo.rpc.ProxyFactory,内容如下:
stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory
接口:
@SPI("javassist")
public interface ProxyFactory {
@Adaptive("proxy")
<T> T getProxy(Invoker<T> invoker) throws RpcException;
@Adaptive("proxy")
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
@Adaptive("proxy")
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
默认为 javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory 代理工厂。
ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
基于 dubbo SPI 的理解,在上面代码,会形成一个调用链路 StubProxyFactoryWrapper -> JavassistProxyFactory。
通过调用 proxyFactory.getInvoker(T proxy, Class 方法,生成了一个代理对象,其中对应参数为:
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService
?application=dubbo-demo // 引用名称
&dubbo=2.0.2
&id=org.apache.dubbo.config.RegistryConfig#0
&pid=10040
&qos.enable=false
®istry=zookeeper // 注册类型
&release=2.7.14
&timeout=300000
×tamp=1661590126866
对应SPI文件为 org.apache.dubbo.rpc.Protocol,内容如下:
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
rmi=org.apache.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=org.apache.dubbo.rpc.protocol.hessian.HessianProtocol
webservice=org.apache.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=org.apache.dubbo.rpc.protocol.thrift.ThriftProtocol
native-thrift=org.apache.dubbo.rpc.protocol.nativethrift.ThriftProtocol
memcached=org.apache.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=org.apache.dubbo.rpc.protocol.redis.RedisProtocol
rest=org.apache.dubbo.rpc.protocol.rest.RestProtocol
xmlrpc=org.apache.dubbo.xml.rpc.protocol.xmlrpc.XmlRpcProtocol
grpc=org.apache.dubbo.rpc.protocol.grpc.GrpcProtocol
registry=org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol
service-discovery-registry=org.apache.dubbo.registry.integration.RegistryProtocol
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper
接口:
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
default List<ProtocolServer> getServers() {
return Collections.emptyList();
}
}
默认为 dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol 协议。
Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
基于 dubbo SPI 的理解,在上面代码,会形成一个调用链路 QosProtocolWrapper -> ProtocolFilterWrapper -> ProtocolListenerWrapper -> XXXXXXXProtocol。
通过调用 protocol.export(Invoker 方法,进行服务的发布。
注意:在 Exporter> exporter = PROTOCOL.export(wrapperInvoker); 这个段代码中,invoke 中 URL getUrl(); 方法,返回的是 registryURL,所以,后续代码应该进入 RegistryProtocol 的 export() 方法中。
通过 RegistryProtocol 进行服务发布与注册。
public class RegistryProtocol implements Protocol {
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// step1:获取注册中心地址:如zk地址为 zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService
URL registryUrl = getRegistryUrl(originInvoker);
// step2:获取需要发布的本地服务URL,如dubbo://127.0.0.1:20880/com.example.demo.provider.DemoProvider
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// export invoker
// step3:服务发布
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
// step4:获取注册中心
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// 判断是否延迟发布
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// step5:服务注册
registry.register(registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
}
通过上面的分析,在 RegistryProtocol.export() 方法中,一共做了几件事
各个步骤作用如下:
// URL registryUrl = getRegistryUrl(originInvoker);
protected URL getRegistryUrl(Invoker<?> originInvoker) {
return originInvoker.getUrl();
}
其中 originInvoker 为 ServiceConfig.export() 中传输过来的值,originInvoker.getUrl() 获取的值为注册中心地址,例如以Zookeeper为例,获取地址为 :
zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?省略其他参数…
// URL providerUrl = getProviderUrl(originInvoker);
private URL getProviderUrl(final Invoker<?> originInvoker) {
// 获取注册中心URL key=export 的参数
String export = originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY);
if (export == null || export.length() == 0) {
throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl());
}
return URL.valueOf(export);
}
其中 originInvoker 为 ServiceConfig.export() 中传输过来的值,originInvoker.getUrl() 获取的值为注册中心地址,回归一下,在 ServiceConfig.export() 中,进行 invoker 获取时,有这样一段代码:
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
所以 originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY); 获取的参数为 dubbo 需要发布的本地服务URL,例如:
dubbo://127.0.0.1:20880/com.example.demo.provider.DemoProvider?省略其他参数…
进行本地服务发布。
public class RegistryProtocol implements Protocol {
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 省略其他代码......
// export invoker
// 进行本地服务发布
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 省略其他代码......
}
// 进行本地服务发布
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
// key 为 getProviderUrl(originInvoker) 去除参数dynamic、enabled后的字符串
String key = getCacheKey(originInvoker);
// map.computeIfAbsent(key, Function):解决RMI重复暴露端口冲突的问题,已经暴露的服务不再暴露。
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// protocol.export(invokerDelegate) 进行服务发布
// 其中 invokerDelegate.getUrl() 返回的是 providerUrl
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
// map.computeIfAbsent(key, Function):解决RMI重复暴露端口冲突的问题,已经暴露的服务不再暴露。
private final ConcurrentMap<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<>();
}
在 RegistryProtocol.doLocalExport() 方法中,通过 protocol.export(invokerDelegate) 进行服务发布,其中,invokerDelegate.getUrl() 返回的是 providerUrl(providerUrl=dubbo://127.0.0.1:20880/com.example.demo.provider.DemoProvider?省略其他参数......,所以,基于 dubbo SPI 的理解,在上面代码,会形成一个调用链路 QosProtocolWrapper -> ProtocolFilterWrapper -> ProtocolListenerWrapper -> DubboProtocol。
进行 dubbo 服务的发布,暴露一个本地端口,用于监听并处理客户端连接请求。
public class DubboProtocol extends AbstractProtocol {
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
// key=serviceClassName + : + port,如com.example.demo.provider.DemoProvider:20880
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.addExportMap(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
// 打开服务器:暴露一个本地端口,用于监听并处理客户端连接请求。
openServer(url);
// 优先采用的序列化算法
optimizeSerialization(url);
return exporter;
}
// 打开服务器:暴露一个本地端口,用于监听并处理客户端连接请求。
private void openServer(URL url) {
// find server.
// key为 ip:port
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) { // 加锁,双重校验
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// createServer(url) 创建一个服务端
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
// createServer(url) 创建一个服务端
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
// url.getParameter("server", "netty"); 默认采用netty
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
// 判断str是否有效
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// 绑定监听
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return new DubboProtocolServer(server);
}
}
Exchangers.bind(url, requestHandler) 方法存在两段逻辑:
1. 创建一个服务端,暴露端口,监听客户端连接请求
public class Exchangers {
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 绑定端口
return getExchanger(url).bind(url, handler);
}
}
public class HeaderExchanger implements Exchanger {
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// Transporters.bind:绑定端口
// DecodeHandler:指定解码器
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
最终会通过 SPI 才判断采用哪种方式进行端口bind,代码如下:
ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension().bind(url, handler)
对应SPI文件为 org.apache.dubbo.remoting.Transporter,内容如下:
netty3=org.apache.dubbo.remoting.transport.netty.NettyTransporter
netty4=org.apache.dubbo.remoting.transport.netty4.NettyTransporter
netty=org.apache.dubbo.remoting.transport.netty4.NettyTransporter
mina=org.apache.dubbo.remoting.transport.mina.MinaTransporter
grizzly=org.apache.dubbo.remoting.transport.grizzly.GrizzlyTransporter
接口:
@SPI("netty")
public interface Transporter {
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
默认为 netty=org.apache.dubbo.remoting.transport.netty4.NettyTransporter 协议。
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
}
主要是就是创建了一个 NettyServer 对象,通过该对象进行服务端口的bind,在 NettyServer.deOpen() 方法中,我们会看到 Netty 启动服务端的代码。
2. 通过 requestHandler 处理请求
在 NettyServer 的构造函数中,存在这一一段代码 ChannelHandlers.wrap(handler, url) ,通过该方法,会形成一个 handler 处理链
ChannelHandlers.wrap(handler, url) 代码逻辑如下:
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// 链路构建
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
最终会构建成一条链路,如下:
MultiMessageHandler -> HeartbeatHandler -> ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url))
其中 handler 参数为 DubboProtocol 中的成员变量 ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
…
}
通过 Registry registry = getRegistry(originInvoker); 获取一个注册中心实例。
public class RegistryProtocol implements Protocol {
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 省略其他代码...
// url to registry
final Registry registry = getRegistry(originInvoker);
// 省略其他代码...
}
protected Registry getRegistry(final Invoker<?> originInvoker) {
// zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?省略其他参数......
URL registryUrl = getRegistryUrl(originInvoker);
return getRegistry(registryUrl);
}
protected Registry getRegistry(URL url) {
try {
// 创建一个Registry实例
return registryFactory.getRegistry(url);
} catch (Throwable t) {
LOGGER.error(t.getMessage(), t);
throw t;
}
}
最后,通过 AbstractRegistryFactory 类中的 getRegistry(URL url) 方法获取一个注册中心实例。
public abstract class AbstractRegistryFactory implements RegistryFactory {
@Override
public Registry getRegistry(URL url) {
Registry defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
if (null != defaultNopRegistry) {
return defaultNopRegistry;
}
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
// key -> zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService
String key = createRegistryCacheKey(url);
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
// double check
// fix https://github.com/apache/dubbo/issues/7265.
defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
if (null != defaultNopRegistry) {
return defaultNopRegistry;
}
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
// 创建一个Registry实例
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
// 缓存
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
}
通过一个模版方法设计模式,根据 URL 创建一个注册中心实例对象,

通过上面的 Registry(注册中心实例) 进行服务注册。
public class RegistryProtocol implements Protocol {
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 省略其他代码...
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 服务注册
registry.register(registeredProviderUrl);
}
// 省略其他代码...
}
}
如果采用 Zookeeper 作为服务注册中心,则调用链路如下:
ListenerRegistryWrapper#register(URL url)
↓
FailbackRegistry#register(URL url)
↓
ZookeeperRegistry#doRegister(URL url)
public class ZookeeperRegistry extends FailbackRegistry {
@Override
public void doRegister(URL url) {
try {
// 默认创建一个临时节点
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
至此,dubbo 就完成了发布与注册。