IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 移动开发 -> akka - Cluster sharding使用shardRegionProxy报错Trying to register to coordinator -> 正文阅读

[移动开发]akka - Cluster sharding使用shardRegionProxy报错Trying to register to coordinator

akka - Cluster sharding使用shardRegionProxy报错Trying to register to coordinator

1.1、完整报错

代理节点处报错:

[WARN] [06/23/2022 11:59:55.449] [shardingSystem-akka.actor.internal-dispatcher-29] [akka://shardingSystem@127.0.0.1:2553/system/sharding111111/CounterProxy] Counter: Trying to register to coordinator at [ActorSelection[Anchor(akka://shardingSystem@127.0.0.1:2551/), Path(/system/sharding111111/CounterCoordinator/singleton/coordinator)]], but no acknowledgement. Total [1] buffered messages. [Coordinator [Member(akka://shardingSystem@127.0.0.1:2551, Up)] is reachable.]

想直接看结果,直接到本章结论处查看

1.2、场景再现

集群和节点关系:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RTkVor4Y-1655962807272)(Timage/%E5%B7%A5%E4%BD%9C%E4%B8%AD%E7%9A%84FAQ_image/image-20220623133935349.png)]

1.2.1、两个shardRegion的节点配置

文件名:testApplication.conf

akka {
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      hostname = "127.0.0.1"
      port = 2551
    }
  }

    cluster {
      seed-nodes = [
        "akka://shardingSystem@127.0.0.1:2551",
        "akka://shardingSystem@127.0.0.1:2552"
      ]

      downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
    }
}

akka.actor.allow-java-serialization = on

akka.cluster.sharding {
# 该配置会影响到具体的路径 akka://shardingSystem/system/sharding111111/Counter/121/111#-247227349
  guardian-name = sharding111111

  # ......

}

1.2.2、两个shardRegion节点的worderActor和启动代码

创建node1和node2节点,端口号分别为2551、2552

package com.test

import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.client.ClusterClientReceptionist
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion
import com.typesafe.config.ConfigFactory
import java.time.Duration

class Counter: AbstractActor(), java.io.Serializable {

    class Get(var counterId: Long): java.io.Serializable

    override fun createReceive(): Receive {
        return receiveBuilder()
            .match(Get::class.java) {
                println("能够正常走到这里~~~~~~~")
            }
            .build();
    }
}

// 分片规则,这里为了复现错误,就写的简单点
class MyMessageExtractor: ShardRegion.MessageExtractor {

    // 提取实体ID,实体对应的actor
    override fun entityId(message: Any?): String? {
        return "2222"
    }

    // 对消息可进行拆封操作
    override fun entityMessage(message: Any?): Any? {
        return message
    }

    // 根据实体ID,计算出对应分片ID(注意这里是根据实体Id)
    override fun shardId(message: Any?): String? {
        return "9999"
    }

}

fun main() {
    val config =
        // 另一个端口改一下端口号
        ConfigFactory.parseString("akka.remote.artery.canonical.port=2551").withFallback(ConfigFactory.load("testApplication.conf"))
    var system = ActorSystem.create("shardingSystem", config)

    // 创建ClusterShading配置
    val settings = ClusterShardingSettings.create(system)
        // 启动ShardRegion服务
    val shardingRegion = ClusterSharding.get(system).start(
        // 分区的名称
        "Counter",
        // 指定该分区下管理的实体类型
        Props.create(Counter::class.java),
        settings,
        // 分片规则
        MyMessageExtractor())
}

1.2.3、代理端代码

节点端口号:2553

package com.test

import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import com.typesafe.config.ConfigFactory
import scala.Option
import java.util.*

fun main() {
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553").withFallback(ConfigFactory.load("testApplication.conf"))
    var system = ActorSystem.create("shardingSystem", config)
    ClusterSharding.get(system).startProxy(
        "Counter",
        Optionalempty(),
        MyMessageExtractor()
    ).let {
        println("  shard proxy $it started.")
    }

    Thread.sleep(3000)
    val shardRegion = ClusterSharding.get(system).shardRegion("Counter")

    shardRegion.tell(Counter.Get(123), ActorRef.noSender())
}

1.2.4、运行、报错

  • 先启动node1、node2
  • 再启动代理节点proxy

代理节点报错:

[WARN] [06/23/2022 12:15:33.901] [shardingSystem-akka.actor.internal-dispatcher-34] [akka://shardingSystem@127.0.0.1:2553/system/sharding111111/CounterProxy] Counter: Trying to register to coordinator at [ActorSelection[Anchor(akka://shardingSystem@127.0.0.1:2551/), Path(/system/sharding111111/CounterCoordinator/singleton/coordinator)]], but no acknowledgement. Total [1] buffered messages. [Coordinator [Member(akka://shardingSystem@127.0.0.1:2551, Up)] is reachable.]

1.3、分析

  • 我们知道分片集群是这样的结构:

    蓝框:集群节点

    红点:每个集群节点的ShardRegion

    黄点:整个ShardRegion集群的单例coordinator协调器(通过它找到和分配实体)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yfF1gBel-1655962807272)(Timage/%E5%B7%A5%E4%BD%9C%E4%B8%AD%E7%9A%84FAQ_image/image-20220623123101639.png)]

  • 由报错来看,是代理节点无法注册到ShardRegion中的coordinator协调器,导致消息无法通过coordinator协调器分片到对应的实体

这里困扰我很久了,最后询问leader后,在这个网址找到解决方案:

Proxy try to register to itselves, so ClusterSharding does not work - Akka / Akka Cluster - Discussion Forum for Akka Serverless and Akka Platform technologies (lightbend.com)

其中有一句话:

Note that if you want to only run actual shards on a subset of the nodes you will need to use roles for those nodes and tell sharding that it should be limited to those nodes. If not using roles sharding will have to be started on all nodes to work.

Look for “roles” in this docs page for details https://doc.akka.io/docs/akka/current/cluster-sharding.html 12

翻译后得到两点(只有满足这两点的其中一点才可以通过代理访问到):

  • 如果只想在节点的子集上运行实际的分片,则需要为这些节点使用角色,并告诉分片应该限制在这些节点上。
  • 如果不使用角色,则必须在所有节点上启动分片才能工作

所以代码中的proxy节点虽然开启了代理服务,但是该节点既没有启动分片,也没有指定角色(不满足上述条件之一),所以报错

1.4、结论

(只有满足这两点的其中一点才可以通过代理访问到):

  • 如果只想在节点的子集上运行实际的分片,则需要为这些节点使用角色,并告诉分片应该限制在这些节点上。
  • 如果不使用角色,则必须在所有节点上启动分片才能工作

修改后的节点关系:
在这里插入图片描述

1.4.1、增加角色配置

akka {
	cluster {
	      roles = ["counterRole"]
          sharding {
            role = "counterRole"
          }
	}
}

下面是testApplication.conf的配置文件

akka {
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      # 第二个节点改一下端口
      hostname = "127.0.0.1"
      port = 2551
    }
  }

    cluster {
      roles = ["counterRole"]
      sharding {
        role = "counterRole"
      }
      seed-nodes = [
        "akka://shardingSystem@127.0.0.1:2551",
        "akka://shardingSystem@127.0.0.1:2552"
      ]

      downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
    }
}

akka.actor.allow-java-serialization = on

akka.cluster.sharding {
# 该配置会影响到具体的路径 akka://shardingSystem/system/sharding111111/Counter/121/111#-247227349
  guardian-name = sharding111111

  # ......

}

1.4.2、增加代理类的配置文件

文件名:proxyApplication.conf

akka {
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      hostname = "127.0.0.1"
      port = 2553
    }
  }

# 增加角色
    cluster {
      roles = ["countProxy"]
      sharding {
        role = "counterRole"
      }
      seed-nodes = [
        "akka://shardingSystem@127.0.0.1:2551",
        "akka://shardingSystem@127.0.0.1:2552"
      ]

      downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
    }
}

akka.actor.allow-java-serialization = on

akka.cluster.sharding {akka://shardingSystem/system/sharding111111/Counter/121/111#-247227349
  guardian-name = sharding111111

  # ......

}

1.4.3、在ShardRegion启动服务方法中指定角色

// 创建ClusterShading配置
val settings = ClusterShardingSettings.create(system)
    val shardingRegion = ClusterSharding.get(system).start(
        "Counter",
        Props.create(Counter::class.java),
        // 增加配置,并指定角色
        settings.withRole("counterRole"),
        MyMessageExtractor())

1.4.4、在代理节点切换配置文件并指定代理ShardRegion属于的角色

# 切换配置文件:
val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553").withFallback(ConfigFactory.load("proxyApplication.conf"))
.....
# 开启代理服务并指定代理ShardRegion属于的角色
ClusterSharding.get(system).startProxy(
"Counter",
Optional.of("counterRole"),
MyMessageExtractor()
)

完整代码:

注意:这里并不是一个ShardRegion,如果要实现在某个非ShardRegion节点的某个子Actor中使用ShardRegion代理,必须在ShardRegion节点和代理节点和配置文件上配置角色

否则必须在所有节点上使用ShardRegion服务

package com.test

import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import com.typesafe.config.ConfigFactory
import scala.Option
import java.util.*

fun main() {
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=2553").withFallback(ConfigFactory.load("testApplication.conf"))
    var system = ActorSystem.create("shardingSystem", config)
    ClusterSharding.get(system).startProxy(
        "Counter",
        // 指定角色,则可代理成功
        Optional.of("counterRole"),
        MyMessageExtractor()
    ).let {
        println("  shard proxy $it started.")
    }

    Thread.sleep(3000)
    val shardRegion = ClusterSharding.get(system).shardRegion("Counter")

    shardRegion.tell(Counter.Get(123), ActorRef.noSender())
}
  移动开发 最新文章
Vue3装载axios和element-ui
android adb cmd
【xcode】Xcode常用快捷键与技巧
Android开发中的线程池使用
Java 和 Android 的 Base64
Android 测试文字编码格式
微信小程序支付
安卓权限记录
知乎之自动养号
【Android Jetpack】DataStore
上一篇文章      下一篇文章      查看所有文章
加:2022-06-29 19:12:58  更:2022-06-29 19:15:57 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 5:49:48-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码