akka - Cluster Sharding: shardRegionProxy fails with "Trying to register to coordinator"
1.1、Full error message
Error logged on the proxy node:
[WARN] [06/23/2022 11:59:55.449] [shardingSystem-akka.actor.internal-dispatcher-29] [akka://shardingSystem@127.0.0.1:2553/system/sharding111111/CounterProxy] Counter: Trying to register to coordinator at [ActorSelection[Anchor(akka://shardingSystem@127.0.0.1:2551/), Path(/system/sharding111111/CounterCoordinator/singleton/coordinator)]], but no acknowledgement. Total [1] buffered messages. [Coordinator [Member(akka://shardingSystem@127.0.0.1:2551, Up)] is reachable.]
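If you only want the fix, jump straight to the conclusion (1.4).
1.2、Reproducing the problem
Cluster and node layout: node1 (port 2551) and node2 (port 2552) run the ShardRegion, and a third node (port 2553) only runs a ShardRegion proxy.
1.2.1、Configuration of the two ShardRegion nodes
File name: testApplication.conf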
akka {
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
actor {
provider = "cluster"
}
remote.artery {
canonical {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka://shardingSystem@127.0.0.1:2551",
"akka://shardingSystem@127.0.0.1:2552"
]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
akka.actor.allow-java-serialization = on
akka.cluster.sharding {
# This setting determines the actor path, e.g. akka://shardingSystem/system/sharding111111/Counter/121/111#-247227349
guardian-name = sharding111111
# ......
}
1.2.2、Worker actor and startup code for the two ShardRegion nodes
Create node1 and node2, on ports 2551 and 2552 respectively:
package com.test
import akka.actor.AbstractActor
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion
import com.typesafe.config.ConfigFactory
// The entity actor hosted by the ShardRegion
class Counter : AbstractActor() {
    // Message sent to the entity; must be serializable to cross nodes
    class Get(val counterId: Long) : java.io.Serializable
    override fun createReceive(): Receive {
        return receiveBuilder()
            .match(Get::class.java) {
                println("Message reached the entity successfully ~~~~~~~")
            }
            .build()
    }
}
// Fixed IDs for this demo: every message goes to entity "2222" in shard "9999"
class MyMessageExtractor : ShardRegion.MessageExtractor {
    override fun entityId(message: Any?): String? {
        return "2222"
    }
    override fun entityMessage(message: Any?): Any? {
        return message
    }
    override fun shardId(message: Any?): String? {
        return "9999"
    }
}
fun main() {
    // Use port 2551 for node1 and 2552 for node2
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2551")
        .withFallback(ConfigFactory.load("testApplication.conf"))
    val system = ActorSystem.create("shardingSystem", config)
    val settings = ClusterShardingSettings.create(system)
    // Start the "Counter" ShardRegion on this node
    ClusterSharding.get(system).start(
        "Counter",
        Props.create(Counter::class.java),
        settings,
        MyMessageExtractor())
}
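MyMessageExtractor above returns fixed IDs ("2222" and "9999"). Every message therefore lands on the same entity in the same shard, which is enough to reproduce the problem. In a real application the IDs are normally derived from the message. The following is a minimal, stdlib-only sketch of that idea, loosely in the spirit of Akka's `ShardRegion.HashCodeMessageExtractor`. The `Get` class here only mirrors `Counter.Get`, and the names `entityIdOf` and `shardIdOf` are hypothetical. The sketch deliberately does not implement Akka's `MessageExtractor` interface, so it runs without Akka.

```kotlin
import kotlin.math.abs

// Hypothetical stand-in for Counter.Get from the example above.
data class Get(val counterId: Long)

// Entity ID: one entity per counter (hypothetical helper).
fun entityIdOf(message: Get): String = message.counterId.toString()

// Shard ID: hash the entity ID into [0, numberOfShards) (hypothetical helper).
// abs() of the remainder keeps negative hash codes in range.
fun shardIdOf(message: Get, numberOfShards: Int): String =
    abs(entityIdOf(message).hashCode() % numberOfShards).toString()
```

With this scheme, all messages for counter 123 go to the same entity, and different counters are spread over a fixed number of shards. The shard count is usually fixed up front, because changing it later maps entities to different shards.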
1.2.3、Proxy node code
Node port: 2553
package com.test
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterSharding
import com.typesafe.config.ConfigFactory
import java.util.Optional
fun main() {
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553")
        .withFallback(ConfigFactory.load("testApplication.conf"))
    val system = ActorSystem.create("shardingSystem", config)
    // Start a proxy for the "Counter" ShardRegion; no role is specified here
    ClusterSharding.get(system).startProxy(
        "Counter",
        Optional.empty(),
        MyMessageExtractor()
    ).let {
        println(" shard proxy $it started.")
    }
    // Wait briefly, then send a test message through the proxy
    Thread.sleep(3000)
    val shardRegion = ClusterSharding.get(system).shardRegion("Counter")
    shardRegion.tell(Counter.Get(123), ActorRef.noSender())
}
1.2.4、Running it and the error
- Start node1 and node2 first
- Then start the proxy node
The proxy node logs:
[WARN] [06/23/2022 12:15:33.901] [shardingSystem-akka.actor.internal-dispatcher-34] [akka://shardingSystem@127.0.0.1:2553/system/sharding111111/CounterProxy] Counter: Trying to register to coordinator at [ActorSelection[Anchor(akka://shardingSystem@127.0.0.1:2551/), Path(/system/sharding111111/CounterCoordinator/singleton/coordinator)]], but no acknowledgement. Total [1] buffered messages. [Coordinator [Member(akka://shardingSystem@127.0.0.1:2551, Up)] is reachable.]
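1.3、Analysis
- According to the log, the proxy node cannot register with the ShardRegion's coordinator (ShardCoordinator), so its messages cannot be routed through the coordinator to the shard that holds the target entity. The log also reports the coordinator's node as reachable, so plain connectivity does not seem to be the cause.
This puzzled me for a long time. After asking my team lead, I found the solution in this forum thread: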
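The "Total [1] buffered messages" part of the warning shows what the proxy does while it waits. It keeps trying to register and holds on to any messages it cannot route yet. The sketch below is a deliberately simplified, stdlib-only model of that behaviour, meant only to make the symptom concrete. `ProxyModel`, `onRegisterAck` and the `deliver` callback are hypothetical names. This is not Akka's ShardRegion implementation, which, among other things, retries registration on a timer and caps the buffer (the `akka.cluster.sharding.buffer-size` setting).

```kotlin
// Hypothetical, simplified model of a sharding proxy that must register
// with the coordinator before it can route messages.
class ProxyModel(private val deliver: (Any) -> Unit) {
    private val buffer = ArrayDeque<Any>()

    var registered = false
        private set

    val bufferedCount: Int
        get() = buffer.size

    // Until the coordinator acknowledges registration, messages are only buffered.
    fun tell(message: Any) {
        if (registered) deliver(message) else buffer.addLast(message)
    }

    // Called when the coordinator acknowledges registration:
    // flush the buffered messages in their original order.
    fun onRegisterAck() {
        registered = true
        while (buffer.isNotEmpty()) deliver(buffer.removeFirst())
    }
}
```

In the failing setup the acknowledgement never arrives, so in this model `onRegisterAck` is never called and `Counter.Get(123)` stays in the buffer.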
Proxy try to register to itselves, so ClusterSharding does not work - Akka / Akka Cluster - Discussion Forum for Akka Serverless and Akka Platform technologies (lightbend.com)
The key passage:
Note that if you want to only run actual shards on a subset of the nodes you will need to use roles for those nodes and tell sharding that it should be limited to those nodes. If not using roles sharding will have to be started on all nodes to work.
Look for “roles” in this docs page for details https://doc.akka.io/docs/akka/current/cluster-sharding.html
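In short, there are two options (the proxy can reach the shards only if one of them holds):
- If you only want to run the actual shards on a subset of the nodes, give those nodes a role and tell sharding to limit itself to nodes with that role.
- If you do not use roles, sharding must be started on all nodes.
The proxy node in the code above starts a proxy, but it neither starts sharding itself nor uses roles. Neither condition is met, hence the error.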
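The two rules from the forum answer can be written as a simple check. The sketch below is a simplified model, not Akka's code. A node counts as a potential shard host if sharding has no role configured (then every node counts) or if the node carries the configured role. The setup only works if every such node actually started sharding. `Node`, `eligibleNodes` and `shardingSetupWorks` are hypothetical names; the ports and roles match this article.

```kotlin
// Hypothetical representation of a cluster member for this model.
data class Node(val port: Int, val roles: Set<String>, val startedSharding: Boolean)

// Nodes sharding treats as potential hosts: all nodes when no role is set,
// otherwise only the nodes that carry that role.
fun eligibleNodes(nodes: List<Node>, shardingRole: String?): List<Node> =
    if (shardingRole == null) nodes else nodes.filter { shardingRole in it.roles }

// Per the forum answer: every eligible node must run sharding, not just a proxy.
fun shardingSetupWorks(nodes: List<Node>, shardingRole: String?): Boolean =
    eligibleNodes(nodes, shardingRole).all { it.startedSharding }
```

In the original setup (no roles, 2553 only runs a proxy) the check fails. After giving 2551/2552 the role counterRole and 2553 the role countProxy, it passes.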
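1.4、Conclusion
The proxy can reach the shards only if one of the following holds:
- If you only want to run the actual shards on a subset of the nodes, give those nodes a role and tell sharding to limit itself to nodes with that role.
- If you do not use roles, sharding must be started on all nodes.
Revised node layout: node1 and node2 get the role counterRole and host the ShardRegion; the proxy node gets the role countProxy and only runs the proxy.
1.4.1、Add the role configuration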
akka {
cluster {
roles = ["counterRole"]
sharding {
role = "counterRole"
}
}
}
The full testApplication.conf now looks like this:
akka {
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
actor {
provider = "cluster"
}
remote.artery {
canonical {
# Use port 2552 on the second node
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
roles = ["counterRole"]
sharding {
role = "counterRole"
}
seed-nodes = [
"akka://shardingSystem@127.0.0.1:2551",
"akka://shardingSystem@127.0.0.1:2552"
]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
akka.actor.allow-java-serialization = on
akka.cluster.sharding {
# This setting determines the actor path, e.g. akka://shardingSystem/system/sharding111111/Counter/121/111#-247227349
guardian-name = sharding111111
# ......
}
1.4.2、Add a configuration file for the proxy node
File name: proxyApplication.conf
akka {
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
actor {
provider = "cluster"
}
remote.artery {
canonical {
hostname = "127.0.0.1"
port = 2553
}
}
# Add a role
cluster {
roles = ["countProxy"]
sharding {
role = "counterRole"
}
seed-nodes = [
"akka://shardingSystem@127.0.0.1:2551",
"akka://shardingSystem@127.0.0.1:2552"
]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
akka.actor.allow-java-serialization = on
akka.cluster.sharding {
# This setting determines the actor path, e.g. akka://shardingSystem/system/sharding111111/Counter/121/111#-247227349
guardian-name = sharding111111
# ......
}
1.4.3、Specify the role when starting the ShardRegion
// Create the ClusterSharding settings
val settings = ClusterShardingSettings.create(system)
val shardingRegion = ClusterSharding.get(system).start(
"Counter",
Props.create(Counter::class.java),
settings.withRole("counterRole"),
MyMessageExtractor())
1.4.4、On the proxy node, switch the configuration file and specify the role of the proxied ShardRegion
// Switch the configuration file:
val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553").withFallback(ConfigFactory.load("proxyApplication.conf"))
.....
// Start the proxy and specify the role of the ShardRegion it proxies
ClusterSharding.get(system).startProxy(
"Counter",
Optional.of("counterRole"),
MyMessageExtractor()
)
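The role passed to startProxy also determines where the proxy sends its registration. As I understand Akka's ShardRegion, it targets the coordinator singleton on the oldest cluster member that has the role, or on any member when no role is given. That is consistent with the 2551 address in the warning above. The stdlib-only sketch below models only that selection step. `Member`, `upNumber` (lower means older, loosely mirroring Akka's Member.upNumber) and `coordinatorHost` are hypothetical names, and the real implementation also reacts to membership changes.

```kotlin
// Hypothetical, simplified cluster member: a lower upNumber means the member
// became Up earlier, i.e. it is older.
data class Member(val address: String, val upNumber: Int, val roles: Set<String>)

// Where a proxy would register in this model: the oldest member carrying the
// sharding role, or the oldest member overall when no role is set.
fun coordinatorHost(members: List<Member>, role: String?): Member? =
    members
        .filter { role == null || role in it.roles }
        .sortedBy { it.upNumber }
        .firstOrNull()
```

With the role set to counterRole, the proxy on 2553 can only ever pick 2551 or 2552, the nodes that actually host sharding.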
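Full code:
Note: this node does not host a ShardRegion. To use a ShardRegion proxy on a node that does not host a ShardRegion (for example from a child actor on that node), configure roles on both the ShardRegion nodes and the proxy node, in the configuration files as well as in the code.
Otherwise, the ShardRegion must be started on every node.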
package com.test
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterSharding
import com.typesafe.config.ConfigFactory
import java.util.Optional
fun main() {
    // Load the proxy's own configuration (role countProxy)
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553")
        .withFallback(ConfigFactory.load("proxyApplication.conf"))
    val system = ActorSystem.create("shardingSystem", config)
    // Proxy the "Counter" ShardRegion, which runs on nodes with role counterRole
    ClusterSharding.get(system).startProxy(
        "Counter",
        Optional.of("counterRole"),
        MyMessageExtractor()
    ).let {
        println(" shard proxy $it started.")
    }
    Thread.sleep(3000)
    val shardRegion = ClusterSharding.get(system).shardRegion("Counter")
    shardRegion.tell(Counter.Get(123), ActorRef.noSender())
}