? 上篇介绍了kafka at-least-once消费模式。kafka消费模式以commit-offset的时间节点代表不同的消费模式,分别是:at-least-once, at-most-once, exactly-once。上篇介绍的at-least-once消费模式是通过kafka自身的auto-commit实现的。事后想了想,这个应该算是at-most-once模式,因为消费过程不会影响auto-commit,kafka在每个设定的间隔都会自动进行offset-commit。如果这个间隔够短,比整个消费过程短,那么在完成消费过程前就已经保存了offset,所以是at-most-once模式。不过,如果确定这个间隔一定大于消费过程,那么又变成了at-least-once模式。具体能实现什么消费模式并不能明确,因为auto-commit是无法从外部进行控制的。看来实现正真意义上的at-least-once消费模式还必须取得offset-commit的控制权才行。
alpakka-kafka提供了一种CommittableSource:
def committableSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[CommittableMessage[K, V], Control] {...}
从这个CommittableSource输出的元素是CommittableMessage[K,V]:
final case class CommittableMessage[K, V](
record: ConsumerRecord[K, V],
committableOffset: CommittableOffset
)
这个CommittableMessage除原始消息之外还提供了CommittableOffset。通过Flow或Sink都可以进行offset-commit。alpakka-kafka提供了Committer,通过Committer.sink, Committer.Flow帮助实现offset-commit,Committer.flow如下:
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1) { msg =>
updateStock.map(_ => msg.committableOffset)
}
.via(Committer.flow(committerDefaults.withMaxBatch(1)))
.to(Sink.seq)
.run()
或Committer.sink:
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1) { msg =>
updateStock.map(_ => msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(Keep.left)
.run()
下面是一个具体的at-least-once示范:
val committerSettings = CommitterSettings(sys).withMaxBatch(commitMaxBatch)
val stkTxns = new DocToStkTxns(trace)
val curStk = new CurStk(trace)
val pcmTxns = new PcmTxns(trace)
val commitableSource = Consumer
.committableSource(consumerSettings, subscription)
def start =
(1 to numReaders).toList.map { _ =>
RestartSource
.onFailuresWithBackoff(restartSource) { () => commitableSource }
// .viaMat(KillSwitches.single)(Keep.right)
.async.mapAsync(1) { msg =>
for {
_ <- FastFuture.successful {
log.step(s"AtLeastOnceReaderGroup-msg: ${msg.record}")(Messages.MachineId("", ""))
}
_ <- stkTxns.docToStkTxns(msg.record.value())
pmsg <- FastFuture.successful {
log.step(s"AtLeastOnceReaderGroup-docToStkTxns: ${msg.record}")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
curstks <- curStk.updateStk(msg.record.value())
pmsg <- FastFuture.successful {
log.step(s"AtLeastOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
pcm <- pcmTxns.writePcmTxn(msg.record.value())
pmsg <- FastFuture.successful {
log.step(s"AtLeastOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
_ <- pcmTxns.updatePcm(msg.record.value())
} yield "Completed"
FastFuture.successful(msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(Keep.left)
.run()
}
消费过程其它部分的设计考虑和实现,如多线程、异常处理等可参考上篇讨论。
对于at-most-once消费模式的实现,alpakka-kafka提供了atMostOnceSource:
def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[ConsumerRecord[K, V], Control] = {...}
下面是用这个Source实现at-most-once的示范:
val atmostonceSource = Consumer
.atMostOnceSource(consumerSettings, subscription)
def start =
(1 to numReaders).toList.map { _ =>
RestartSource
.onFailuresWithBackoff(restartSource) { () => atmostonceSource }
// .viaMat(KillSwitches.single)(Keep.right)
.async.mapAsync(1) { msg =>
for {
_ <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-msg: $msg")(Messages.MachineId("", ""))
}
_ <- stkTxns.docToStkTxns(msg.value())
pmsg <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-docToStkTxns: $msg")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
_ <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: msg: $msg")(Messages.MachineId("", ""))
}
curstks <- curStk.updateStk(msg.value())
pmsg<- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
_ <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-writePcmTxn: msg: $msg")(Messages.MachineId("", ""))
}
pcm <- pcmTxns.writePcmTxn(msg.value())
pmsg <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
_ <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updatePcm: msg: $msg")(Messages.MachineId("", ""))
}
_ <- pcmTxns.updatePcm(msg.value())
_ <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: updatePcm-$msg")(Messages.MachineId("", ""))
}
} yield "Completed"
}
.toMat(Sink.seq)(Keep.left)
.run()
}
由于offset-commit和消息消费是两个独立的过程,无论如何努力都无法保证只读一次,必须把这两个过程合并成一个才有可能实现。所以,exactly-once可以通过数据库系统的事务处理transaction-processing来实现,就是把offset-commit和数据更新两个动作放到同一个事务transaction里,通过事务处理的ACID原子特性保证两个动作同进同退的一致性。这也意味着这个exactly-once消费模式必须在一个提供事务处理功能的数据库系统里实现,也代表kafka-offset必须和其它交易数据一起存放在同一种数据库里。mongodb4.0以上支持事务处理,可以用来作示范。
首先,先研究一下exactly-once模式的框架:
val mergedSource = Consumer
.plainPartitionedManualOffsetSource(consumerSettings,subscription,
loadOffsets)
.flatMapMerge(maxReaders, _._2)
.async.mapAsync(1) { msg =>
for {
cmt <- stkTxns.stkTxnsWithRetry(msg.value(), msg.partition(), msg.offset()).toFuture().map(_ => "Completed")
pmsg <- FastFuture.successful {
log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.mapAsync(1) { msg =>
for {
curstks <- curStk.updateStk(msg.value())
pmsg <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.toMat(Sink.seq)(Keep.left)
.run()
}
}
在上面的例子里使用了plainPartitionedManualOffsetSource:
def plainPartitionedManualOffsetSource[K, V](
settings: ConsumerSettings[K, V],
subscription: AutoSubscription,
getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]],
onRevoke: Set[TopicPartition] => Unit = _ => ()
): Source[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed]), Control] = {...}
getOffsetsOnAssign提供指定partition的offset(从数据库里读出指定partition的offset值),如下:
private def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition,Long]] = {
offsetStore.getOffsets(partitions)
}
def getOffsets(partitions: Set[TopicPartition])(
implicit ec: ExecutionContext) = {
log.step(s"OffsetStore-getOffsets: ($partitions)")(Messages.MachineId("", ""))
def getOffset(tp: TopicPartition) = {
val query = and(equal(KfkModels.SCHEMA.TOPIC, tp.topic()),
equal(KfkModels.SCHEMA.PARTITION,tp.partition()))
def offset: Future[Seq[Document]] = colOffset.find(query).toFuture()
for {
docs <- offset
ofs <- FastFuture.successful(if(docs.isEmpty) None
else Some(Offsets.fromDocument(docs.head)))
} yield ofs
}
val listFut = partitions.toList.map(getOffset)
val futList: Future[List[Option[KfkModels.Offsets]]] = FastFuture.sequence(listFut)
futList.map { oofs =>
oofs.foldRight(Map[TopicPartition,Long]()){(oof,m) =>
oof match {
case None => m
case ofs => m + (new TopicPartition(ofs.get.topic,ofs.get.partition) -> ofs.get.offset)
}
}
}
}
注意loadOffset的函数类型: ?Set[TopicPartition] => Future[Map[TopicPartition, Long]],返回的是个Map[partition,offset]。
另外,plainPartitionedManualSource返回Source[...Source[ConsumerRecord[K, V]],要用flatMapMerge打平:
/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by merging, where at most `breadth`
* substreams are being consumed at any given time.
*
* '''Emits when''' a currently consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*/
def flatMapMerge[T, M](breadth: Int, f: Out => Graph[SourceShape[T], M]): Repr[T] =
map(f).via(new FlattenMerge[T, M](breadth))
参数breadth代表需合并的source数量。
还有,saveOffset和writeStkTxns在同一个事务处理里:
def docToStkTxns(jsonDoc: String, partition: Int, offset: Long, observable: SingleObservable[ClientSession]) = {
val bizDoc = fromJson[BizDoc](jsonDoc)
log.step(s"TxnalDocToStkTxns-docToStkTxns: $bizDoc")(Messages.MachineId("", ""))
observable.map(clientSession => {
val transactionOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build()
clientSession.startTransaction(transactionOptions)
val txns = StkTxns.docToTxns(dbStkTxn,dbVtx,dbVendor,bizDoc,trace)
StkTxns.writeStkTxns(clientSession,colStkTxn,colPcm,txns,trace)
offsetStore.saveOffset(clientSession,partition,offset)
clientSession.commitTransaction()
clientSession
})
}
注意:mongodb的事务处理必须在复制集replica-set上进行。这也很容易理解,在复制集上才方便交易回滚rollback。
完整的exactly-once实现代码如下:
private def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition,Long]] = {
offsetStore.getOffsets(partitions)
}
val mergedSource = Consumer
.plainPartitionedManualOffsetSource(consumerSettings,subscription,
loadOffsets)
.flatMapMerge(maxReaders, _._2)
def start = {
(1 to numReaders).toList.map {_ =>
RestartSource
.onFailuresWithBackoff(restartSource) { () => mergedSource }
// .viaMat(KillSwitches.single)(Keep.right)
.async.mapAsync(1) { msg =>
for {
cmt <- stkTxns.stkTxnsWithRetry(msg.value(), msg.partition(), msg.offset()).toFuture().map(_ => "Completed")
pmsg <- FastFuture.successful {
log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
curstks <- curStk.updateStk(msg.value())
pmsg <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
pcm <- pcmTxns.writePcmTxn(msg.value())
pmsg <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
_ <- pcmTxns.updatePcm(msg.value())
} yield "Completed"
}
.toMat(Sink.seq)(Keep.left)
.run()
}
}
只有第一个异步阶段使用了事务处理。也就是说保证了writeStkTxns只执行一次。这个函数的功能主要是把前端产生的交易全部固化。为了避免消费过程中出现异常中断造成了前端交易的遗失或者重复入账,必须保证前端交易只固化一次。其它阶段的数据处理都是基于已正确固化的交易记录的。如果出现问题,可以通过重算交易记录获取正确的状态。为了保证平台运行效率,选择了不使用事务处理的方式更新数据。
|