IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 你该不会也觉得Dubbo参数回调中callbacks属性是用来限制回调次数的吧? -> 正文阅读

[网络协议]你该不会也觉得Dubbo参数回调中callbacks属性是用来限制回调次数的吧?

点击关注强哥,还有100多G的面试资料等你来拿?

哈喽,大家好,我是强哥。

前些天,一个同事在使用Dubbo的参数回调时,骂骂咧咧的说,Dubbo的这个回调真是奇葩,居然会限制回调次数,自己不得不把callbacks属性值设置的非常大,但是还是会怕服务运行太久后超过回调次数限制,后续的回调就无法正常执行。

突然被他这么一说,我倒是有点奇怪,正常来说Dubbo一个这么牛逼的框架不应该会有这样的限制才对啊。于是,强哥便开始对这个callbacks产生了兴趣,官网和百度找了一番也没有找到具体这个属性的详细解释,哈哈,这不就是又到我亲自研究的时候了吗。

Dubbo参数回调

首先讲一下什么时候Dubbo的参数回调吧,简单的说就是:通过参数回调从服务器端调用客户端逻辑。

参数回调方式与调用本地 callback 或 listener 相同,只需要在 Spring 的配置文件中声明哪个参数是 callback 类型即可。Dubbo 将基于长连接生成反向代理,这样就可以从服务器端调用客户端逻辑。

其实就是说,我们的provider在提供接口的时候,可以设置某个参数是让consumer放回调对象的。当consumer调用provider接口时,provider就可以拿到这个回调对象并进行反向调用(provider回调consumer接口)。

具体配置的代码如下:

<dubbo:service?interface="org.apache.dubbo.samples.callback.api.CallbackService"?ref="callbackService"
???????????????connections="1"?callbacks="1000">
????<dubbo:method?name="addListener">
????????<dubbo:argument?index="1"?callback="true"/>
????</dubbo:method>
</dubbo:service>

其中<dubbo:argument index="1" callback="true"/>的index就表示接口CallbackService的第一个参数是用来让consumer传递回调对象的。

callbacks属性

我们从上面的代码中可以看到,在dubbo:service的配置中,有一个属性是callbacks="1000",这个就是上面我同事所谓的“Dubbo对于回调次数的限制”,按他的说法,如果这么配,就相当于回调1000次之后,就会导致Dubbo之后的回调都失败。

这显然不能让强哥相信他理解的正确性,真要是这样,谁还会用Dubbo的回调啊。当然,事实胜于雄辩。直接搞几个测试代码试试就知道了。

测试准备

要说测试,Dubbo这点做得就挺好。在其GitHub仓库的源码目录dubbo-samples下,有参数回调对应的dubbo-samples-callback项目,我们直接就拿这个项目来做测试。

项目结构相对比较简单,具体如下:

这里强哥也要说说项目中使用了内嵌式的ZookeeperEmbeddedZookeeper,不用单独部署ZK,这点确实方便很多,尤其是写这种小示例的时候,太方便了。

不过,我们需要对Dubbo的QoS功能进行关闭,否则在同一台机子上启动多个Dubbo服务会出现端口已被占用的情况。具体需要在callback-provider.xmlcallback-consumer两个文件中添加配置:

<dubbo:application?name="callback-provider">
????<dubbo:parameter?key="qos.enable"?value="false"/>
</dubbo:application>
<dubbo:application?name="callback-consumer">
????<dubbo:parameter?key="qos.enable"?value="false"/>
</dubbo:application>

Dubbo的QoS功能是干什么的?

QoS,全称为Quality of Service, 是常见于网络设备中的一个术语 ,例如在路由器中,可以通过Qos动态的调整和控制某些端口的权重,从而优先的保障运行在这些端口上的服务质量。 在Dubbo中,QoS这个概念被用于动态的对服务进行查询和控制。例如对获取当前提供和消费的所有服务,以及对服务进行动态的上下线,即从注册中心上进行注册和反注册操作。

开启该功能需要占用端口,这里为了简化就不配置多个端口了,直接关了就行。

项目中的provider参数回调配置如下:

<dubbo:service?interface="org.apache.dubbo.samples.callback.api.CallbackService"?ref="callbackService"
???????????????connections="1"?callbacks="1">
????<dubbo:method?name="addListener">
????????<dubbo:argument?index="1"?callback="true"/>
????</dubbo:method>
</dubbo:service>

注意这里callbacks参数被我修改成了1,也就是说,如果callbacks真的是对回调次数的限制,那么,consumer只要回调一次后,下一次就会失败。

测试

改好配置后,直接开测,首先启动provider,provider在启动的时候会创建CallbackServiceImpl,其代码内容如下:

public?class?CallbackServiceImpl?implements?CallbackService?{

????private?final?Map<String,?CallbackListener>?listeners?=?new?ConcurrentHashMap<String,?CallbackListener>();

????public?CallbackServiceImpl()?{
????????Thread?t?=?new?Thread(new?Runnable()?{
????????????public?void?run()?{
????????????????while?(true)?{
????????????????????try?{
????????????????????????for?(Map.Entry<String,?CallbackListener>?entry?:?listeners.entrySet())?{
????????????????????????????try?{
????????????????????????????????entry.getValue().changed(getChanged(entry.getKey()));
????????????????????????????}?catch?(Throwable?t)?{
????????????????????????????????listeners.remove(entry.getKey());
????????????????????????????}
????????????????????????}
????????????????????????Thread.sleep(5000);?//?timely?trigger?change?event
????????????????????}?catch?(Throwable?t)?{
????????????????????????t.printStackTrace();
????????????????????}
????????????????}
????????????}
????????});
????????t.setDaemon(true);
????????t.start();
????}

????public?void?addListener(String?key,?CallbackListener?listener)?{
????????listeners.put(key,?listener);
????????listener.changed(getChanged(key));?//?send?notification?for?change
????}

????private?String?getChanged(String?key)?{
????????return?"Changed:?"?+?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss").format(new?Date());
????}

}

整体就是在创建时,启动一个线程死循环每隔5秒调用一下从consumer加入进来的回调对象的changed方法。我们不会对provider的代码进行修改,所以启动好放着就行。

重点集中在consumer,强哥这里通过几种情况来对consumer进行测试。

情况1

consumer的代码如下:

public?class?CallbackConsumerBootstrap?{

????public?static?void?main(String[]?args)?throws?Exception?{
????????ClassPathXmlApplicationContext?context?=?new?ClassPathXmlApplicationContext(new?String[]{"spring/callback-consumer.xml"});
????????context.start();
????????CallbackService?callbackService?=?(CallbackService)?context.getBean("callbackService");
????????callbackService.addListener("foo.bar1",?new?CallbackListener()?{
????????????public?void?changed(String?msg)?{
????????????????System.out.println("callback1:"?+?msg);
????????????}
????????});
????????System.in.read();
????}

}

内容就是获取到provider提供的CallbackService接口实现,然后调用它的addListener方法,addListener方法的第二个参数就是consumer设置的回调对象,示例中使用了匿名内部类new CallbackListener()

我们直接启动,控制台输出内容如下:

callback1:Changed:?2022-04-06?20:40:07
callback1:Changed:?2022-04-06?20:40:08
callback1:Changed:?2022-04-06?20:40:13
callback1:Changed:?2022-04-06?20:40:18
callback1:Changed:?2022-04-06?20:40:23
callback1:Changed:?2022-04-06?20:40:28
callback1:Changed:?2022-04-06?20:40:33
callback1:Changed:?2022-04-06?20:40:38
……

可见,在设置了callbacks="1"后,回调并没有在第一次:"2022-04-07 14:40:07"执行结束后就失败,直接就可以证明callbacks并不是用来限制回调次数的

情况2

callbacks并不是用来限制回调次数的这点证明之后,那么callbacks到底表示的是什么呢?

会不会是限制回调连接的个数呢?我们继续操作:情况1的步骤保持不变,我们简单改下CallbackConsumerBootstrapcallbackService.addListener的第一个参数keyfoo.banir2,然后再起一个连接。结果如下,同时第一个consumer也还是正常输出日志。

callback1:Changed:?2022-04-06?20:25:15
callback1:Changed:?2022-04-06?20:25:19
callback1:Changed:?2022-04-06?20:25:24
callback1:Changed:?2022-04-06?20:25:29
callback1:Changed:?2022-04-06?20:25:34
callback1:Changed:?2022-04-06?20:25:39
callback1:Changed:?2022-04-06?20:25:44
……

连接数2超过了callbacks的个数限制。可见,**callbacks也不是限制回调连接的个数**。

情况3

我们对情况1的代码进行修改,在CallbackConsumerBootstrap中多添加一个回调:

public?class?CallbackConsumerBootstrap2?{

????public?static?void?main(String[]?args)?throws?Exception?{
????????ClassPathXmlApplicationContext?context?=?new?ClassPathXmlApplicationContext(new?String[]{"spring/callback-consumer.xml"});
????????context.start();
????????CallbackService?callbackService?=?(CallbackService)?context.getBean("callbackService");
????????//第一个
????????callbackService.addListener("foo.bar1",?new?CallbackListener()?{
????????????public?void?changed(String?msg)?{
????????????????System.out.println("callback1:"?+?msg);
????????????}
????????});
????????//第二个
????????callbackService.addListener("foo.bar2",?new?CallbackListener()?{
????????????public?void?changed(String?msg)?{
????????????????System.out.println("callback2:"?+?msg);
????????????}
????????});
????????System.in.read();
????}

}

关闭旧的consumer,启动当前的CallbackConsumerBootstrap2,启动后代码就报错了:

Caused?by:?java.lang.IllegalStateException:?interface?org.apache.dubbo.samples.callback.api.CallbackListener?`s?callback?instances?num?exceed?providers?limit?:1?,current?num:?2.?The?new?callback?service?will?not?work?!!!?you?can?cancle?the?callback?service?which?exported?before.?channel?:NettyChannel?[channel=[id:?0xde91033b,?L:/10.0.227.75:63364?-?R:/10.0.227.75:20880]]
?at?com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.isInstancesOverLimit(CallbackServiceCodec.java:210)
?at?com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.exportOrunexportCallbackService(CallbackServiceCodec.java:107)
?at?com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.encodeInvocationArgument(CallbackServiceCodec.java:255)
?at?com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeRequestData(DubboCodec.java:180)
?at?com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeRequest(ExchangeCodec.java:235)
?at?com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encode(ExchangeCodec.java:72)
?at?com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec.encode(DubboCountCodec.java:38)
?at?com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter$InternalEncoder.encode(NettyCodecAdapter.java:70)
?at?io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:107)
?...?18?more

?at?com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:109)
?at?com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:244)
?at?com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75)
?at?com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
?at?com.alibaba.dubbo.common.bytecode.proxy0.addListener(proxy0.java)
?at?org.apache.dubbo.samples.callback.CallbackConsumerBootstrap2.main(CallbackConsumerBootstrap2.java:41)

原因是:callback instances num exceed providers limit :1 ,current num: 2.。也就是说,回调的实例个数2个超过了阈值1个的限制。

由此可见,**callbacks就是用来限制同一个客户端(连接)连接的回调实例个数限制。**

这里需要重点说一下是:

  • 同一个连接

  • 回调实例个数而不是回调个数 怎么证明是实例个数呢?我们接着看。

情况4

还是修改CallbackConsumerBootstrap代码,我们调用两次addListener,但是使用同一个回调实例对象:

public?class?CallbackConsumerBootstrap3?{

????public?static?void?main(String[]?args)?throws?Exception?{
????????ClassPathXmlApplicationContext?context?=?new?ClassPathXmlApplicationContext(new?String[]{"spring/callback-consumer.xml"});
????????context.start();
????????CallbackService?callbackService?=?(CallbackService)?context.getBean("callbackService");
????????CallbackListener?callbackListener?=?new?CallbackListener()?{
????????????public?void?changed(String?msg)?{
????????????????System.out.println("callback2:"?+?msg);
????????????}
????????};

????????callbackService.addListener("foo.bar1",?callbackListener);

????????callbackService.addListener("foo.bar2",?callbackListener);
????????System.in.read();
????}

}

运行后,输出如下:

callback2:Changed:?2022-04-06?20:39:47
callback2:Changed:?2022-04-06?20:39:47
callback2:Changed:?2022-04-06?20:39:48
callback2:Changed:?2022-04-06?20:39:48
callback2:Changed:?2022-04-06?20:39:53
callback2:Changed:?2022-04-06?20:39:53
callback2:Changed:?2022-04-06?20:39:58
callback2:Changed:?2022-04-06?20:39:58
callback2:Changed:?2022-04-06?20:40:03
callback2:Changed:?2022-04-06?20:40:03
……

代码没有报错,而且会正常回调两次。由此我们就证明了**callbacks是针对实例的。**

总结+源码分析

综上所述,我们便从4个代码示例中,层层证明,得到: callbacks属性是限制同一个连接的回调实例个数的。而并不是限制consumer的次数。

其实,强哥这次没有从源码的角度来进行分析。主要是为了通过不同的情况,来让大家更好的理解callbacks属性的含义,直接证明,比较一目了然。当然,我们在情况3报错的时候,可以通过错误栈来定位到具体的判断callbacks属性个数的源码位置:CallbackServiceCodecexportOrunexportCallbackService方法:

?if?(export)?{
??????//?one?channel?can?have?multiple?callback?instances,?no?need?to?re-export?for?different?instance.
??????if?(!channel.hasAttribute(cacheKey))?{
??????????if?(!isInstancesOverLimit(channel,?url,?clazz.getName(),?instid,?false))?{
??????????????Invoker<?>?invoker?=?proxyFactory.getInvoker(inst,?clazz,?exporturl);
??????????????//?should?destroy?resource?
??????????????Exporter<?>?exporter?=?protocol.export(invoker);
??????????????//?this?is?used?for?tracing?if?instid?has?published?service?or?not.
??????????????channel.setAttribute(cacheKey,?exporter);
??????????????logger.info("export?a?callback?service?:"?+?exporturl?+?",?on?"?+?channel?+?",?url?is:?"?+?url);
??????????????increaseInstanceCount(channel,?countkey);
??????????}
??????}
??}

其中的isInstancesOverLimit方法内容就是判断的地方:

private?static?boolean?isInstancesOverLimit(Channel?channel,?URL?url,?String?interfaceClass,?int?instid,?boolean?isServer)?{
????Integer?count?=?(Integer)?channel.getAttribute(isServer???getServerSideCountKey(channel,?interfaceClass)?:?getClientSideCountKey(interfaceClass));
????int?limit?=?url.getParameter(Constants.CALLBACK_INSTANCES_LIMIT_KEY,?Constants.DEFAULT_CALLBACK_INSTANCES);
????if?(count?!=?null?&&?count?>=?limit)?{
????????//client?side?error
????????throw?new?IllegalStateException("interface?"?+?interfaceClass?+?"?`s?callback?instances?num?exceed?providers?limit?:"?+?limit
????????????????+?"?,current?num:?"?+?(count?+?1)?+?".?The?new?callback?service?will?not?work?!!!?you?can?cancle?the?callback?service?which?exported?before.?channel?:"?+?channel);
????}?else?{
????????return?false;
????}
}

count为服务端模式的时候,是从getServerSideCountKey中获取的,也就是:

private?static?String?getServerSideCountKey(Channel?channel,?String?interfaceClass)?{
????return?Constants.CALLBACK_SERVICE_PROXY_KEY?+?"."?+?System.identityHashCode(channel)?+?"."?+?interfaceClass?+?".COUNT";
}

其中:

  • System.identityHashCode(channel)代表着一个连接

  • interfaceClass就表一个实例 这个的COUNT就是我们说的:每个客户端的一个接口的回调服务实例的个数。

再与callbacks个数比较,来判断是否超过了限制,超过则抛出情况3的异常:

if?(count?!=?null?&&?count?>=?limit)?{
????//client?side?error
????throw?new?IllegalStateException("interface?"?+?interfaceClass?+?"?`s?callback?instances?num?exceed?providers?limit?:"?+?limit
????????????+?"?,current?num:?"?+?(count?+?1)?+?".?The?new?callback?service?will?not?work?!!!?you?can?cancle?the?callback?service?which?exported?before.?channel?:"?+?channel);
}?else?{
????return?false;
}

由此,源码也证明结束。

一个小小的属性,也真是要费这么大篇幅来说明。不过强哥也是想让大家能够更好地理解。至少走这么一遍,可以让大家对这个Dubbo的回调机制有个更深入的认识。

小伙伴们如果也想自己试试的话,可以关注「强哥叨逼叨」回复「dubbo回调」获取强哥配置好的测试代码哦。

?

点击关注强哥,还有100多G的面试资料等你来拿

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-04-09 18:53:50  更:2022-04-09 18:54:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/31 4:12:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码