IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> alpakka-kafka(8)-kafka数据消费模式实现 -> 正文阅读

[大数据]alpakka-kafka(8)-kafka数据消费模式实现

? 上篇介绍了kafka at-least-once消费模式。kafka消费模式以commit-offset的时间节点代表不同的消费模式,分别是:at-least-once, at-most-once, exactly-once。上篇介绍的at-least-once消费模式是通过kafka自身的auto-commit实现的。事后想了想,这个应该算是at-most-once模式,因为消费过程不会影响auto-commit,kafka在每个设定的间隔都会自动进行offset-commit。如果这个间隔够短,比整个消费过程短,那么在完成消费过程前就已经保存了offset,所以是at-most-once模式。不过,如果确定这个间隔一定大于消费过程,那么又变成了at-least-once模式。具体能实现什么消费模式并不能明确,因为auto-commit是无法从外部进行控制的。看来实现正真意义上的at-least-once消费模式还必须取得offset-commit的控制权才行。

alpakka-kafka提供了一种CommittableSource:

  def committableSource[K, V](settings: ConsumerSettings[K, V],
                              subscription: Subscription): Source[CommittableMessage[K, V], Control] {...}

从这个CommittableSource输出的元素是CommittableMessage[K,V]:

  final case class CommittableMessage[K, V](
      record: ConsumerRecord[K, V],
      committableOffset: CommittableOffset
  )

这个CommittableMessage除原始消息之外还提供了CommittableOffset。通过Flow或Sink都可以进行offset-commit。alpakka-kafka提供了Committer,通过Committer.sink, Committer.Flow帮助实现offset-commit,Committer.flow如下:

    Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(1) { msg =>
        updateStock.map(_ => msg.committableOffset)
      }
      .via(Committer.flow(committerDefaults.withMaxBatch(1)))
      .to(Sink.seq)
      .run()

或Committer.sink:

   Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(1) { msg =>
        updateStock.map(_ => msg.committableOffset)
      }
      .toMat(Committer.sink(committerSettings))(Keep.left)
      .run()

下面是一个具体的at-least-once示范:

  val committerSettings = CommitterSettings(sys).withMaxBatch(commitMaxBatch)


  val stkTxns = new DocToStkTxns(trace)
  val curStk = new CurStk(trace)
  val pcmTxns = new PcmTxns(trace)


  val commitableSource = Consumer
    .committableSource(consumerSettings, subscription)

  def start =
    (1 to numReaders).toList.map { _ =>
      RestartSource
        .onFailuresWithBackoff(restartSource) { () => commitableSource }
        //      .viaMat(KillSwitches.single)(Keep.right)
        .async.mapAsync(1) { msg =>
        for {
          _ <- FastFuture.successful {
            log.step(s"AtLeastOnceReaderGroup-msg: ${msg.record}")(Messages.MachineId("", ""))
          }
          _ <- stkTxns.docToStkTxns(msg.record.value())
          pmsg <- FastFuture.successful {
            log.step(s"AtLeastOnceReaderGroup-docToStkTxns: ${msg.record}")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
        .async.mapAsync(1) { msg =>
        for {
          curstks <- curStk.updateStk(msg.record.value())
          pmsg <- FastFuture.successful {
            log.step(s"AtLeastOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
        .async.mapAsync(1) { msg =>
        for {
          pcm <- pcmTxns.writePcmTxn(msg.record.value())
          pmsg <- FastFuture.successful {
            log.step(s"AtLeastOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
        .async.mapAsync(1) { msg =>
        for {
          _ <- pcmTxns.updatePcm(msg.record.value())
        } yield "Completed"
        FastFuture.successful(msg.committableOffset)
      }
      .toMat(Committer.sink(committerSettings))(Keep.left)
      .run()
    }

消费过程其它部分的设计考虑和实现,如多线程、异常处理等可参考上篇讨论。

对于at-most-once消费模式的实现,alpakka-kafka提供了atMostOnceSource:

  def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
                             subscription: Subscription): Source[ConsumerRecord[K, V], Control] = {...}

下面是用这个Source实现at-most-once的示范:

  val atmostonceSource = Consumer
    .atMostOnceSource(consumerSettings, subscription)

  def start =
    (1 to numReaders).toList.map { _ =>
      RestartSource
        .onFailuresWithBackoff(restartSource) { () => atmostonceSource }
        //       .viaMat(KillSwitches.single)(Keep.right)
        .async.mapAsync(1) { msg =>
        for {
          _ <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-msg: $msg")(Messages.MachineId("", ""))
          }
          _ <- stkTxns.docToStkTxns(msg.value())
          pmsg <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-docToStkTxns: $msg")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
        .async.mapAsync(1) { msg =>
        for {
          _ <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-updateStk: msg: $msg")(Messages.MachineId("", ""))
          }
          curstks <- curStk.updateStk(msg.value())
          pmsg<- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
        .async.mapAsync(1) { msg =>
        for {
          _ <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-writePcmTxn: msg: $msg")(Messages.MachineId("", ""))
          }
          pcm <- pcmTxns.writePcmTxn(msg.value())
          pmsg <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
            msg
          }
        } yield pmsg
      }
        .async.mapAsync(1) { msg =>
        for {
          _ <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-updatePcm: msg: $msg")(Messages.MachineId("", ""))
          }
          _ <- pcmTxns.updatePcm(msg.value())
          _ <- FastFuture.successful {
            log.step(s"AtMostOnceReaderGroup-updateStk: updatePcm-$msg")(Messages.MachineId("", ""))
          }
        } yield "Completed"
      }
      .toMat(Sink.seq)(Keep.left)
      .run()
    }

由于offset-commit和消息消费是两个独立的过程,无论如何努力都无法保证只读一次,必须把这两个过程合并成一个才有可能实现。所以,exactly-once可以通过数据库系统的事务处理transaction-processing来实现,就是把offset-commit和数据更新两个动作放到同一个事务transaction里,通过事务处理的ACID原子特性保证两个动作同进同退的一致性。这也意味着这个exactly-once消费模式必须在一个提供事务处理功能的数据库系统里实现,也代表kafka-offset必须和其它交易数据一起存放在同一种数据库里。mongodb4.0以上支持事务处理,可以用来作示范。

首先,先研究一下exactly-once模式的框架:

  val mergedSource = Consumer
    .plainPartitionedManualOffsetSource(consumerSettings,subscription,
      loadOffsets)
    .flatMapMerge(maxReaders, _._2)
    .async.mapAsync(1) { msg =>
          for {
            cmt <- stkTxns.stkTxnsWithRetry(msg.value(), msg.partition(), msg.offset()).toFuture().map(_ => "Completed")
            pmsg <- FastFuture.successful {
              log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .mapAsync(1) { msg =>
          for {
            curstks <- curStk.updateStk(msg.value())
            pmsg <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .toMat(Sink.seq)(Keep.left)
        .run()
    }
  }

在上面的例子里使用了plainPartitionedManualOffsetSource:

def plainPartitionedManualOffsetSource[K, V](
      settings: ConsumerSettings[K, V],
      subscription: AutoSubscription,
      getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]],
      onRevoke: Set[TopicPartition] => Unit = _ => ()
  ): Source[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed]), Control] = {...}

getOffsetsOnAssign提供指定partition的offset(从数据库里读出指定partition的offset值),如下:

  private def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition,Long]] = {
    offsetStore.getOffsets(partitions)
  }

 def getOffsets(partitions: Set[TopicPartition])(
    implicit ec: ExecutionContext) = {
    log.step(s"OffsetStore-getOffsets: ($partitions)")(Messages.MachineId("", ""))

      def getOffset(tp: TopicPartition) = {
      val query =  and(equal(KfkModels.SCHEMA.TOPIC, tp.topic()),
                         equal(KfkModels.SCHEMA.PARTITION,tp.partition()))
      def offset: Future[Seq[Document]] = colOffset.find(query).toFuture()
      for {
          docs <- offset
          ofs <- FastFuture.successful(if(docs.isEmpty) None
          else Some(Offsets.fromDocument(docs.head)))
        } yield ofs
      }
      val listFut = partitions.toList.map(getOffset)
      val futList: Future[List[Option[KfkModels.Offsets]]] = FastFuture.sequence(listFut)
      futList.map { oofs =>
        oofs.foldRight(Map[TopicPartition,Long]()){(oof,m) =>
          oof match {
            case None => m
            case ofs => m + (new TopicPartition(ofs.get.topic,ofs.get.partition) -> ofs.get.offset)
          }
        }
      }
  }

注意loadOffset的函数类型: ?Set[TopicPartition] => Future[Map[TopicPartition, Long]],返回的是个Map[partition,offset]。

另外,plainPartitionedManualSource返回Source[...Source[ConsumerRecord[K, V]],要用flatMapMerge打平:

  /**
   * Transform each input element into a `Source` of output elements that is
   * then flattened into the output stream by merging, where at most `breadth`
   * substreams are being consumed at any given time.
   *
   * '''Emits when''' a currently consumed substream has an element available
   *
   * '''Backpressures when''' downstream backpressures
   *
   * '''Completes when''' upstream completes and all consumed substreams complete
   *
   * '''Cancels when''' downstream cancels
   */
  def flatMapMerge[T, M](breadth: Int, f: Out => Graph[SourceShape[T], M]): Repr[T] =
    map(f).via(new FlattenMerge[T, M](breadth))

参数breadth代表需合并的source数量。

还有,saveOffset和writeStkTxns在同一个事务处理里:

  def docToStkTxns(jsonDoc: String, partition: Int, offset: Long, observable: SingleObservable[ClientSession]) = {
    val bizDoc = fromJson[BizDoc](jsonDoc)
    log.step(s"TxnalDocToStkTxns-docToStkTxns: $bizDoc")(Messages.MachineId("", ""))

    observable.map(clientSession => {
      val transactionOptions = TransactionOptions.builder()
        .readPreference(ReadPreference.primary())
        .readConcern(ReadConcern.SNAPSHOT)
        .writeConcern(WriteConcern.MAJORITY)
        .build()
      clientSession.startTransaction(transactionOptions)
      val txns = StkTxns.docToTxns(dbStkTxn,dbVtx,dbVendor,bizDoc,trace)
      StkTxns.writeStkTxns(clientSession,colStkTxn,colPcm,txns,trace)
      offsetStore.saveOffset(clientSession,partition,offset)
      clientSession.commitTransaction()
      clientSession
    })

  }

注意:mongodb的事务处理必须在复制集replica-set上进行。这也很容易理解,在复制集上才方便交易回滚rollback。

完整的exactly-once实现代码如下:

  private def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition,Long]] = {
    offsetStore.getOffsets(partitions)
  }

  val mergedSource = Consumer
    .plainPartitionedManualOffsetSource(consumerSettings,subscription,
      loadOffsets)
    .flatMapMerge(maxReaders, _._2)

  def start = {
    (1 to numReaders).toList.map {_ =>
      RestartSource
        .onFailuresWithBackoff(restartSource) { () => mergedSource }
//        .viaMat(KillSwitches.single)(Keep.right)
        .async.mapAsync(1) { msg =>
          for {
            cmt <- stkTxns.stkTxnsWithRetry(msg.value(), msg.partition(), msg.offset()).toFuture().map(_ => "Completed")
            pmsg <- FastFuture.successful {
              log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .async.mapAsync(1) { msg =>
          for {
            curstks <- curStk.updateStk(msg.value())
            pmsg <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .async.mapAsync(1) { msg =>
          for {
            pcm <- pcmTxns.writePcmTxn(msg.value())
            pmsg <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .async.mapAsync(1) { msg =>
          for {
            _ <- pcmTxns.updatePcm(msg.value())
          } yield "Completed"
        }
        .toMat(Sink.seq)(Keep.left)
        .run()
    }
  }

只有第一个异步阶段使用了事务处理。也就是说保证了writeStkTxns只执行一次。这个函数的功能主要是把前端产生的交易全部固化。为了避免消费过程中出现异常中断造成了前端交易的遗失或者重复入账,必须保证前端交易只固化一次。其它阶段的数据处理都是基于已正确固化的交易记录的。如果出现问题,可以通过重算交易记录获取正确的状态。为了保证平台运行效率,选择了不使用事务处理的方式更新数据。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-17 01:28:56  更:2021-08-17 01:29:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:13:57-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码