IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka Streams -> 正文阅读

[大数据]Kafka Streams

流计算定义

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。批量处理模型中,一般先有全量数据集,然后定义计算逻辑,并将计算应用于全量数据。特点是全量计算,并且计算结果一次性全量输出。
在这里插入图片描述

Kafka Stream

Kafka Streams是一个客户端库,用于处理和分析存储在Kafka中的数据。它建立在重要的流处理概念之上,正确区分EventTime和ProcessTime,Widows计算,可以实现对应用状态高效管理和实时查询。Kafka Streams进入门槛低。可以在单机上验证流处理的概念。同时可以利用Kafka的并行加载模型,实现流处理并行扩展,也就意味着用户只需要将自己流处理程序运行多份即可达到并行计算的目的。

Kafka Streams优点:简单、轻巧易部署、无缝对接Kafka、基于分区实现计算并行、基于幂等和事务特性实现精确计算、单个记录毫秒级延迟计算-实时性高、提供了两套不同风格的流处理API-(High level-Domain Specific Language|DSL开箱即用;low-level Processor API.)

名词解析

Topology:表示一个流计算任务,等价于MapReduce中的job。不同的是MapReduce的job作业最终会停止,但是Topology会一直运行在内存中,除非人工关闭该Topology。

stream:它代表了一个无限的,不断更新的Record数据集。流是有序,可重放和容错的不可变数据记录序列,其中数据记录被定义为键值对。

所谓的流处理是通过Topology编织程序对stream中Record元素的处理的逻辑/流程。这种计算和早期MapReduce计算的最大差异是该计算的实时性比较高,可以满足绝大多数的实时计算场景。Kafka Stream以它的轻量级容易部署低延迟等特点在微服务领域相比较 专业的 Storm、spark streaming和Flink 而言有着不可替代的优势。有关Storm、SparkStreaming和Flink的内容随着课程的深入会在后续章节再展开讨论。

架构

Kafka Streams通过构建Kafka生产者和消费者库并利用Kafka的本机功能来提供数据并行性,分布式协调,容错和操作简便性,从而简化了应用程序开发。
在这里插入图片描述
Kafka的消息分区用于存储和传递消息, Kafka Streams对数据进行分区以进行处理。 Kafka Streams使用partition和 Task的概念作为基于Kafka Topic分区的并行模型的逻辑单元。在并行化的背景下,Kafka Streams和Kafka之间有着密切
的联系:

  • 每个stream分区都是完全有序的数据记录序列,并映射到Kafka Topic分区。
  • stream中的数据记录映射到该Topic的Kafka消息。
  • 数据记录的key决定了Kafka和Kafka Streams中数据的分区,即数据如何路由到Topic内的特定分区。

任务并行度

应用程序的处理器Topology通过将其分解为多个Task来扩展。更具体地说,Kafka Streams基于应用程序的输入流分区
创建固定数量的任务,每个任务分配来自输入流的分区列表。分区到任务的分配永远不会改变,因此每个任务都是应
用程序的固定平行单元。然后,任务可以根据分配的分区实例化自己的Topology;它们还为每个分配的分区维护一个缓
冲区,并从这些记录缓冲区一次一个地处理消息。因此,流任务可以独立并行地处理,无需人工干预。

用户可以启动多个KafkaStream实例,这样等价启动了多个Stream Tread,每个Thread处理1~n个Task。一个Task对应一
个分区,因此Kafka Stream流处理的并行度不会超越Topic的分区数。需要值得注意的是Kafka的每个Task都维护这自身
的一些状态,线程之间不存在状态共享和通信。因此Kafka在实现流处理的过程中扩展是非常高效的。

容错

Kafka Streams构建于Kafka本地集成的容错功能之上。 Kafka分区具有高可用性和复制性;因此当流数据持久保存到
Kafka时,即使应用程序失败并需要重新处理它也可用。 Kafka Streams中的任务利用Kafka消费者客户端提供的容错功
能来处理故障。如果任务运行的计算机故障了,Kafka Streams会自动在其余一个正在运行的应用程序实例中重新启动
该任务。

此外,Kafka Streams还确保local state store也很有力处理故障容错。对于每个state store,Kafka Stream维护一个带有
副本changelog的Topic,在该Topic中跟踪任何状态更新。这些changelog Topic也是分区的,该分区和Task是一一对应
的。如果Task在运行失败并Kafka Stream会在另一台计算机上重新启动该任务,Kafka Streams会保证在重新启动对新
启动的任务的处理之前,通过重播相应的更改日志主题,将其关联的状态存储恢复到故障之前的内容。

编程

所有资料均参考:https://kafka.apache.org/22/documentation/streams/developer-guide/

Low-Level API(低级API,非重点)

依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency> 
	<groupId>org.apache.kafka</groupId> 
	<artifactId>kafka-streams</artifactId> 
	<version>2.2.0</version> 
</dependency>

快速入门

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

//WordCountProcessor:数据计算逻辑
public class WordCountProcessor implements Processor<String, String> {
	private ProcessorContext context;
    private HashMap<String,Integer> wordPair=null;
    
	@Override
    public void init(ProcessorContext context) {
        System.out.println("-----init----");
        this.context=context;
        wordPair=new HashMap<>();
        //定时输出结果
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,(ts)->{
            for (Map.Entry<String, Integer> entry : wordPair.entrySet()) {
                context.forward(entry.getKey(),entry.getValue());
            }
        });
    }

	@Override
	public void process(String key, String value) {
		String[] words = value.split("\\W+");
        for (int i = 0; i < words.length; i++) {
            int count=0;
            if(wordPair.containsKey(words[i])){
                count=wordPair.get(words[i]);
            }
            count+=1;
            wordPair.put(words[i],count);
        }
	}

	@Override
	public void close() {

	}
}
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;

//WordCountTopologyDemo:拓扑类,启动拓扑,处理数据
public class WordCountTopologyDemo {
    public static void main(String[] args) {
        //0.配置KafkaStreams的连接信息
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-lowlevel");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        //配置默认的key序列化和反序列化
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        //1.定义计算拓扑
        Topology topology=new Topology();

        topology.addSource("s1","topic01");
        topology.addProcessor("p1",() -> new WordCountProcessor(),"s1");
        topology.addSink("sk1","topic02",
                new StringSerializer(),
                new IntegerSerializer(),"p1");

        //3.创建KafkaStreams
        KafkaStreams kafkaStreams=new KafkaStreams(topology,props);
        //4.启动计算
        kafkaStreams.start();
    }
}


Processor API

Processor API允许开发人员定义和连接自定义Processor并与state store进行交互。使用Processor API,可以定义一次处理一个接收record的任意流处理器,并将这些处理器与其关联的状态存储连接起来,以组成代表自定义处理逻辑的处理器拓扑。

public interface Processor<K, V> {

    void init(ProcessorContext context);
    
    void process(K key, V value);

    void close();
}

思考状态可靠性?

就上面案例而言如果WordCountTopologyDemo存在以下问题

  • 1.宕机则计算的状态丢失
  • 2.并没有考虑状态中keys的数目,一旦数目过大,会导致流计算服务内存溢出。

以上问题的解决之道是通过配置Kafka stateStore存储。

配置StateStore

String storeName="wdcount";
Map<String, String> changelogConfig = new HashMap();
changelogConfig.put("min.insync.replicas", "1");
changelogConfig.put("cleanup.policy","compact");

StoreBuilder<KeyValueStore<String, Integer>> countStore = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore(storeName),
    Serdes.String(),
    Serdes.Integer())
    .withLoggingEnabled(changelogConfig);

事实StateStore本质是一个Topic,但是该topic的清除策略不再是delete,而是compact.

关联StateStore和Processor

public class WordCountTopologyDemo {
    public static void main(String[] args) {
        //0.配置KafkaStreams的连接信息
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-lowlevel");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        //配置默认的key序列化和反序列化
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        //1.定义计算拓扑
        Topology topology=new Topology();

        
        //创建state,存储状态信息
        String storeName="wdcount";
        Map<String, String> changelogConfig = new HashMap();
        changelogConfig.put("min.insync.replicas", "1");
        changelogConfig.put("cleanup.policy","compact");
        changelogConfig.put("log.cleaner.min.compaction.lag.ms","1000");

        StoreBuilder<KeyValueStore<String, Integer>> countStore = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(storeName),
                Serdes.String(),
                Serdes.Integer())
                .withLoggingEnabled(changelogConfig);


        topology.addSource("s1","topic01")
        .addProcessor("p1",() -> new WordCountProcessor(storeName),"s1")
        .addStateStore(countStore,"p1")
                .addSink("sk1","topic02",
                new StringSerializer(),
                new IntegerSerializer(),"p1");

        //3.创建KafkaStreams
        KafkaStreams kafkaStreams=new KafkaStreams(topology,props);
        //4.启动计算
        kafkaStreams.start();
    }
}

//数据处理类中使用state
public class WordCountProcessor implements Processor<String,String> {
    private ProcessorContext context;
    private String storeName;
    private KeyValueStore<String,Integer> stateStore;
    public WordCountProcessor(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context=context;
        stateStore = (KeyValueStore<String, Integer>) context.getStateStore(storeName);
        //定时输出结果
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,(ts)->{
            KeyValueIterator<String, Integer> keyValueIterator = stateStore.all();
            while (keyValueIterator.hasNext()){
                KeyValue<String, Integer> keyValue = keyValueIterator.next();
                context.forward(keyValue.key,keyValue.value);
            }
            keyValueIterator.close();
            context.commit();
        });
    }
    @Override
    public void process(String key, String value) {
        String[] words = value.split("\\W+");
        for (int i = 0; i < words.length; i++) {
            int count=0;
            Integer historyCount = stateStore.get(words[i]);
            if(historyCount!=null){
                count=historyCount;
            }
            count+=1;
            stateStore.put(words[i],count);
        }

    }
    @Override
    public void close() {

    }
}

注意在运行的时候可能会抛出以下异常:
Exception in thread “word-count-lowlevel-2f2bfa85-1cf7-4734-9630-c3e23353f119-StreamThread-1” java.lang.UnsatisfiedLinkError: C:\Users\Administrator\AppData\Local\Temp\librocksdbjni2211349152259948554.dll: Can’t find dependent libraries
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
at org.rocksdb.RocksDB.(RocksDB.java:35)
at org.rocksdb.DBOptions.(DBOptions.java:21)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB

需要安装vc_redist.x64.exe安装包https://download.microsoft.com/download/9/3/F/93FCF1E7-E6A4-478B-96E7-D4B285925B00/vc_redist.x64.exe

此时查询topic列表,会发现生成一个名为word-count-lowlevel-wdcount-changelog的topic,其中word-count-lowlevel为代码中设置的StreamsConfig.APPLICATION_ID_CONFIG,
wdcount为代码中设置的storeName,changelog为固定值

Streams DSL(重点)

Kafka Streams DSL(Domain Specific Language)构建于Streams Processor API之上。它是大多数用户推荐的,特别是初学者。大多数数据处理操作只能用几行DSL代码表示。在 Kafka Streams DSL 中有这么几个概念

KStream:表示数据流,所有的在topic中的记录被认定为是一个INSERT操作。

KTable:表示changelog数据流,每一则记录被解释称为一个update,如果你要将KTable存储到Kafka topic中,你可能想要启用Kafka的日志压缩功能,例如:节省存储空间。但是,在KStream的情况下启用日志压缩是不安全的,因为只要日志压缩开始清除相同key的旧数据记录,就会破坏数据的语义。KTable还提供了按key查找数据记录的当前value的功能。此表查找功能可通过join操作以及“交互式查询”获得。

KStream是一个数据流,可以认为所有记录都通过Insert only的方式插入进这个数据流里。而KTable代表一个完整的数据集,可以理解为数据库中的表。由于每条记录都是Key-Value对,这里可以将Key理解为数据库中的Primary Key,而Value可以理解为一行记录。可以认为KTable中的数据都是通过Update only的方式进入的。如果KTable对应的Topic中新进入的数据的Key已经存在,那么从KTable只会取出同一Key对应的最后一条数据,相当于新的数据更新了旧的数据。

在这里插入图片描述
以上图为例,假设有一个KStream和KTable,基于同一个Topic创建,并且该Topic中包含如下图所示5条数据。此时遍历KStream将得到与Topic内数据完全一样的所有5条数据,且顺序不变。而此时遍历KTable时,因为这5条记录中有3个不同的Key,所以将得到3条记录,每个Key对应最新的值,并且这三条数据之间的顺序与原来在Topic中的顺序保持一致。

GlobalKTable:和KTable类似,不同点在于KTable只能表示一个分区的信息,但是GlobalKTable表示的是全局 的状态信息。

快速入门(wordCount)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountDSLTopology {
    public static void main(String[] args) {
        //0.配置KafkaStreams的连接信息
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-dsl");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        //1.创建StreamsBuilder
        StreamsBuilder builder = new StreamsBuilder();

        //2.编织topology ,学习使用 Processor API
        builder.stream("topic01", Consumed.with(Serdes.String(),Serdes.String()))
                .flatMapValues((value)-> Arrays.asList(value.split("\\W+")))
                .selectKey((key,value) -> value)
                .mapValues((v)->1)
                .groupBy((String k,Integer v)->k,Grouped.with(Serdes.String(),Serdes.Integer()))
                .reduce((v1,v2)->v1+v2,
                        Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("wdsl_count")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Integer()))
                .toStream()
                .peek((k,v)->{
                    System.out.println(k+"\t"+v);
                });
        //3.提交计算
        KafkaStreams kafkaStreams=new KafkaStreams(builder.build(),props);
        kafkaStreams.start();

    }
}

KStream创建- 流处理入口

StreamsBuilder builder = new StreamsBuilder();
//指定topic中k,v序列化和反序列化
KStream<String, String> stream = builder.stream("topic01", 
                                                Consumed.with(Serdes.String(), Serdes.String()));

Transformations(转换算子)

算子分为有状态和无状态两种算子

无状态算子(返回值为KStream)

Branch(分支)

可以将一个Stream拆分成多个Stream

KStream<String, String>[] branches = builder.stream(
    "topic01", //输入topic
    Consumed.with(
      Serdes.String(), /* key serde */
      Serdes.String()   /* value serde */
  )
)
  .branch(
      (k, v) -> v.contains("login"), //带有login的分为一个stream
      (k, v) -> v.contains("cart"),  //带有cart的分为一个stream
      (k, v) -> true	//其他的为一组
	);
KStream<String, String> loginStream = branches[0];
KStream<String, String> cartStream = branches[1];
KStream<String, String> otherStream = branches[2];

//在控制台打印输出cartStream日志流 
cartStream.print(Printed.toSysOut());

Filter|filterNot

过滤满足条件的数据,将满足条件的结果向后传递

//只过滤含有ERROR的日志流
StreamsBuilder builder = new StreamsBuilder();
builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
		.filter((k,v)-> v.contains("ERROR")) 
		.print(Printed.toSysOut());

map|mapValues

map算子主要针对k,v做转换要求返回 KeyValue<?,?> ;mapValues针对v做转换,要求返回Object

//使用map算子将value的值封装为User对象返回
builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
	.map((k,v)->{
		String[] tokens = v.split("\\W+");
		User u=new User(Integer.parseInt(tokens[0]),tokens[1],
						Boolean.valueOf(tokens[2]),Integer.parseInt(tokens[3]));
		return new KeyValue<String,User>(k,u);
	})
	.print(Printed.toSysOut());


//使用mapValues算子将value的值封装为User对象返回
builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
	.mapValues((v)->{
		String[] tokens = v.split("\\W+");
		User u=new User(Integer.parseInt(tokens[0]),tokens[1],
					Boolean.valueOf(tokens[2]),Integer.parseInt(tokens[3]));
		return u;
	})
	.print(Printed.toSysOut());

flatMap|flatMapValues

flatMap作用是将会一条记录变成多条记录并且将多条记录展开。
????????????????map??????????????????????????????? flat
k-> v --> k->[v1,v2,v3,…] —> k->v1,k-v2,k->v3,…

builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
		.flatMap((k,v)->{
			List<KeyValue<String,String>> list=new ArrayList<>();
			String[] tokens = v.split("\\W+");
			for (String token : tokens) {
				list.add(new KeyValue<>(k,token));
			}
			return list;
		})
		.print(Printed.toSysOut());


builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
		.flatMapValues((v)->{
			List<String> list=new ArrayList<>();
			String[] tokens = v.split("\\W+");
			for (String token : tokens) {
				list.add(token);
			}
			return list;
		})
		.print(Printed.toSysOut());

selectKey

修改记录中的key

builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
		.mapValues((v)->{
			String[] tokens = v.split("\\W+");
			User u=new User(Integer.parseInt(tokens[0]),tokens[1],
							Boolean.valueOf(tokens[2]),Integer.parseInt(tokens[3]));
			return u;
		})
		.selectKey((k,v)->v.getId()) 
		.print(Printed.toSysOut());		

foreach

遍历kafkaStream计算的数据,一般用于写入第三方系统中.

builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
		.mapValues((v)->{
			String[] tokens = v.split("\\W+");
			User u=new User(Integer.parseInt(tokens[0]),tokens[1],
							Boolean.valueOf(tokens[2]),Integer.parseInt(tokens[3]));
			return u;
		})
		.selectKey((k,v)->v.getId())
		.foreach((k,v)->{
			//这里只简单的打印一下
			System.out.println(k+"=>"+v);
		});

merger

可以将多个流中数据合并在一起输出

//user1结果:1 zhangsan false 18
KStream<String, User> user1 = builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
							.mapValues((v) -> {
								String[] tokens = v.split("\\W+");
								User u=new User(Integer.parseInt(tokens[0]),tokens[1],
												Boolean.valueOf(tokens[2]),Integer.parseInt(tokens[3]));
								return u;
							});
//user2结果:2 lisi 18 false
KStream<String, User> user2 = builder.stream("topic02", Consumed.with(Serdes.String(), Serdes.String()))
							.mapValues((v) -> {
								String[] tokens = v.split("\\W+");
								User u=new User(Integer.parseInt(tokens[0]),tokens[1],
												Boolean.valueOf(tokens[2]),Integer.parseInt(tokens[3]));
								return u;
							});
//user1和user2合并一起输出
user1.merge(user2)
	.selectKey((k,v)->v.getId())
	.foreach((k,v)->{
		System.out.println(k+"=>"+v);
	});													

through

类似与shuffle功能,可以将key相同的record存储到同一个分区,上游的数据会经过through的topic实现shuffle的功能。

//wordshuffle:是创建的另一个topic
builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
		.flatMapValues((line)-> Arrays.asList(line.split("\\W+")))
		.selectKey((k,v)->v)
		.through("wordshuffle",Produced.with(Serdes.String(),Serdes.String()))
		.print(Printed.toSysOut());

Peek

作为程序执行的探针,一般用于debug调试,因为peek并不会对后续的流数据带来任何影响。

builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
		.peek((k,v)-> System.out.println(k+"\t"+v))
		.filter((k,v)->v.contains("login"))
		.peek((k,v)-> System.out.println(k+" ->" +v));

Print

打印输出

groupByKey:根据key进行分组
groupBy:根据自定义的信息进行分组
都是无状态算子,但一般和有状态算子一起使用。

有状态算子(返回值为KTable)

有状态转换值得是每一次的处理都需要操作关联StateStore实现有状态更新。例如,在aggregating 操作中,window state store用于收集每个window的最新聚合结果。在join操作中,窗口状态存储用于收集到目前为止在定义的window边界内接收的所有记录。状态存储是容错的。如果发生故障,Kafka Streams保证在恢复处理之前完全恢复所有状态存储。

DSL中可用的有状态转换包括

kafka官网上展示了它们的关系:
在这里插入图片描述

aggregate

聚合 有状态的转换算子

//0.配置KafkaStreams的连接信息 
Properties props = new Properties(); 
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-dsl"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); 
//设置本地状态存储 
props.put(StreamsConfig.STATE_DIR_CONFIG,"E:\\kafkastates"); 
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG,"30000");

//1.构建StreamsBuilder
StreamsBuilder builder = new StreamsBuilder();
builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
		.flatMapValues((v)->Arrays.asList(v.split("\\W+")))
		.map((k,v)-> new KeyValue<String,Integer>(v,1))
		.groupByKey(Grouped.with(Serdes.String(),Serdes.Integer()).withName("word_group"))
		// 第一参数:聚合的初始值  第二参数:聚合逻辑  第三个参数:【必须】指定状态存储的KV数据类型
		.aggregate(()->0,(k,v,agg)-> agg+v,Materialized.<String,Integer, KeyValueStore<Bytes, byte[]>>as("wordcount")
										.withKeySerde(Serdes.String()) .withValueSerde(Serdes.Integer()))
		.toStream()
		.print(Printed.toSysOut());

//3.启动拓扑 
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(),props); 
kafkaStreams.start();		

Count

统计key相同的record出现的次数

//1.构建StreamsBuilder
StreamsBuilder builder = new StreamsBuilder();
builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
		.flatMapValues((v)->Arrays.asList(v.split("\\W+")))
		.map((k,v)-> new KeyValue<String,Integer>(v,1))
		.groupByKey(Grouped.with(Serdes.String(),Serdes.Integer()).withName("word_group"))
		.count(Materialized. <String,Long,KeyValueStore<Bytes,byte[]>>as("wordcout1")
				.withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()))
		.toStream()
		.print(Printed.toSysOut());

Reduce

规约 计算 有状态的转换算子

StreamsBuilder builder = new StreamsBuilder();
builder.stream(
		"topic01", //输入topic
		Consumed.with(
				Serdes.String(), /* key serde */
				Serdes.String()   /* value serde */
		))
		.flatMapValues((String key, String value) -> {
					List<String> result = new ArrayList<>();
					String[] tokens = value.split("\\W+");
					for (String token : tokens) {
						result.add(token);
					}
					return result;
				}
		)
		.selectKey((key,value)->value)
		.mapValues((v)->1)
		.groupByKey(Grouped.with(
				Serdes.String(),
				Serdes.Integer()
		))
		.reduce((v1,v2)->v1+v2,
				Materialized.<String,Integer,KeyValueStore<Bytes,byte[]>>as("reduce-word-count")
						.withKeySerde(Serdes.String())
						.withValueSerde(Serdes.Integer()
						)
		)
		.toStream()
		.print(Printed.toSysOut());

Window :(micro batch(微批),时间维度数据范围的计算)
Window使用户可以控制如何将具有相同键的记录分组,以进行有状态操作,例如aggregate或join等。
DSL支持以下类型的窗口(下面是kafka官网表格):

Window nameBehaviorShort description
Tumbling time windowTime-basedFixed-size, overlapping windows
Hopping time windowTime-basedFixed-size, overlapping windows
Sliding time windowTime-basedFixed-size, overlapping windows that work on differences between record timestamps
Session windowSession-basedDynamically-sized, non-overlapping, data-driven windows

Tumbling time windows:翻滚窗口,固定大小 无重叠

翻滚窗口将流元素按照固定的时间间隔,拆分成指定的窗口,窗口和窗口间元素之间没有重叠。在下图不同颜色的record表示不同的key。可以看是在时间窗口内,每个key对应一个窗口。前闭后开

在这里插入图片描述
下面这个动图理解翻滚窗口会更直观些
在这里插入图片描述

StreamsBuilder builder = new StreamsBuilder();
builder.stream(
		"topic01", //输入topic
		Consumed.with(
				Serdes.String(), /* key serde */
				Serdes.String()   /* value serde */
		))
		.flatMapValues((String key, String value) -> {
					List<String> result = new ArrayList<>();
					String[] tokens = value.split("\\W+");
					for (String token : tokens) {
						result.add(token);
					}
					return result;
				}
		)
		.selectKey((key,value)->value)
		.mapValues((v)->1)
		.groupByKey(Grouped.with(
				Serdes.String(),
				Serdes.Integer()
		))
		.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
		.reduce((v1,v2)->v1+v2,
				Materialized.<String,Integer, WindowStore<Bytes,byte[]>>as("reduce-w-window-count")
						.withKeySerde(Serdes.String())
						.withValueSerde(Serdes.Integer()
						)
		)
		.toStream()
		.print(Printed.toSysOut());

Hopping time windows:跳跃窗口,固定大小 有重叠

Hopping time windows是基于时间间隔的窗口。他们模拟固定大小的(可能)重叠窗口。跳跃窗口由两个属性定义:窗口大小和其提前间隔(又名“hop”)。

在这里插入图片描述
下面这个动图理解跳跃窗口会更直观些
在这里插入图片描述

StreamsBuilder builder = new StreamsBuilder();
builder.stream(
		"topic01", //输入topic
		Consumed.with(
				Serdes.String(), /* key serde */
				Serdes.String()   /* value serde */
		))
		.flatMapValues((String key, String value) -> {
					List<String> result = new ArrayList<>();
					String[] tokens = value.split("\\W+");
					for (String token : tokens) {
						result.add(token);
					}
					return result;
				}
		)
		.selectKey((key,value)->value)
		.mapValues((v)->1)
		.groupByKey(Grouped.with(
				Serdes.String(),
				Serdes.Integer()
		))
		.windowedBy(TimeWindows.of(Duration.ofSeconds(5))
				.advanceBy(Duration.ofSeconds(1)))
		.reduce((v1,v2)->v1+v2,
				Materialized.<String,Integer, WindowStore<Bytes,byte[]>>as("reducewindow-w-count")
						.withKeySerde(Serdes.String())
						.withValueSerde(Serdes.Integer()
						)
		)
		.toStream()
		.print(Printed.toSysOut());

Sliding Window

此窗口只用于2个KStream进行Join计算时。该窗口的大小定义了Join两侧KStream的数据记录被认为在同一个窗口的最大时间差。假设该窗口的大小为5秒,则参与Join的2个KStream中,记录时间差小于5的记录被认为在同一个窗口中,可以进行Join计算。

Session Windows

Session Windows用于将基于key的事件聚合到所谓的会话中,其过程称为session化。会话表示由定义的不活动间隔(或“空闲”)分隔的活动时段。处理的任何事件都处于任何现有会话的不活动间隙内,并合并到现有会话中。如果事件超出会话间隙,则将创建新会话。会话窗口的主要应用领域是用户行为分析。基于会话的分析可以包括简单的指标.

在这里插入图片描述

如果我们接收到另外三条记录(包括两条迟到的记录),那么绿色记录key的两个现有会话将合并为一个会话,从时间0开始到结束时间6,包括共有三条记录。蓝色记录key的现有会话将延长到时间5结束,共包含两个记录。最后,将在11时开始和结束蓝键的新会话。

在这里插入图片描述

StreamsBuilder builder = new StreamsBuilder();
builder.stream(
		"topic01", //输入topic
		Consumed.with(
				Serdes.String(), /* key serde */
				Serdes.String()   /* value serde */
		))
		.flatMapValues((String key, String value) -> {
					List<String> result = new ArrayList<>();
					String[] tokens = value.split("\\W+");
					for (String token : tokens) {
						result.add(token);
					}
					return result;
				}
		)
		.selectKey((key,value)->value)
		.mapValues((v)->1)
		.groupByKey(Grouped.with(
				Serdes.String(),
				Serdes.Integer()
		))
		.windowedBy(SessionWindows.with(Duration.ofSeconds(5)))
		.reduce((v1,v2)->v1+v2,
				Materialized.<String,Integer, SessionStore<Bytes,byte[]>>as("session-word-count")
						.withKeySerde(Serdes.String())
						.withValueSerde(Serdes.Integer()
						)
		)
		.toStream()
		.print(Printed.toSysOut());

Window Final Results

在Kafka Streams中,窗口计算会不断更新其结果。当新数据到达窗口时,向下游发出新计算的结果。但是有时候希望在窗口结束的时候才开始发送最终结果出去,这个时候可以采用suppress方法,该方法会在窗口结束的时候才会将结果发送出去.

场景:计算一个小时内活跃度小于3的用户,并且给活跃度小于该阈值的用户进行发送报警。在这个场景中如果不适宜钳制手段,可能在窗口初期所有的用户都可能接收到该报警。

StreamsBuilder builder = new StreamsBuilder();
builder.stream(
		"topic01", //输入topic
		Consumed.with(
				Serdes.String(), /* key serde */
				Serdes.String()   /* value serde */
		))
		.flatMapValues((String key, String value) -> {
					List<String> result = new ArrayList<>();
					String[] tokens = value.split("\\W+");
					for (String token : tokens) {
						result.add(token);
					}
					return result;
				}
		)
		.selectKey((key,value)->value)
		.mapValues((v)->1)
		.groupByKey(Grouped.with(
				Serdes.String(),
				Serdes.Integer()
		))
		.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(20)))
		.reduce((v1,v2)->v1+v2,
				Materialized.<String,Integer, WindowStore<Bytes,byte[]>>as("session-word-count")
						.withKeySerde(Serdes.String())
						.withValueSerde(Serdes.Integer()
						)
		)
		.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
		.toStream()
    .peek((k,v)->{
          SimpleDateFormat sdf = new SimpleDateFormat("hh:mm:ss");
          Window window = k.window();
          String start=sdf.format(window.start());
          String end=sdf.format(window.end());
          System.out.println(start+" - "+end+"\t"+k.key()+":"+v);
		});

其中:grace表示延迟,例如本案记录触发的窗口的时间如果是12:00:00~12:01:00触发的窗口,系统会在12:01:20秒的时候触发窗口,期间如果有迟到的元素,还可以加进去计算。因为系统会在12:01:20将窗口关闭。

superess表示窗口钳制,也就是在什么时机可以触发窗口向后续的流数据输出窗口统计结果。其中Suppressed.untilWindowCloses表示直到窗口关闭的时候才会触发窗口。如果配置成untilTimeLimit可以指定钳制多久时间将窗口发送出去,这样可以减少更新KTable的时间,提升程序性能。

suppress(Suppressed.untilTimeLimit(Duration.ofMillis(100),
		Suppressed.BufferConfig.maxBytes(1024).emitEarlyWhenFull()))

SpringBoot 集成 KafkaStream

pom依赖

<properties>
  <kafka.version>2.2.0</kafka.version>
</properties>

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.1.0.RELEASE</version>
</parent>

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.5.RELEASE</version>
  </dependency>
  <!-- kafka client处理 -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
  </dependency>
  <!-- kafka 流处理 -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.version}</version>
  </dependency>
</dependencies>
# 生产者
spring.kafka.producer.bootstrap-servers=CentOS:9092,CentOS:9093,CentOS:9094
spring.kafka.producer.acks=all
spring.kafka.producer.retries=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# 消费者
spring.kafka.consumer.bootstrap-servers=CentOS:9092,CentOS:9093,CentOS:9094
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 流处理
spring.kafka.streams.application-id= springboot-kafka-01
spring.kafka.streams.bootstrap-servers=CentOS:9092,CentOS:9093,CentOS:9094
spring.kafka.streams.properties.commit.interval.ms=100	
@SpringBootApplication
@EnableKafkaStreams
@EnableScheduling
public class SpringApplicationTests {
  @Autowired
  private KafkaTemplate kafkaTemplate;

  public static void main(String[] args) {
    SpringApplication.run(SpringApplicationTests.class,args);
  }
  @Scheduled(cron = "00/1 * * * * ?")
  public void send(){
    System.out.println("--------------------------");
    String[] message=new String[]{"this is a demo","hello world","hello boy"};
    ListenableFuture future = kafkaTemplate.send("topic01", message[new Random().nextInt(message.length)]);
    future.addCallback(o -> System.out.println("send-消息发送成功:" + message), throwable -> System.out.println("消息发送失败:" + message));
  }

  @KafkaListener(topics = "topic02",id="g1")
  public void processMessage(ConsumerRecord<?, ?> record) {
    System.out.println("record:"+record);
  }

  @Bean
  public KStream<String,String> kStream(StreamsBuilder builder){
        return builder.stream("topic01", Consumed.with(Serdes.String(), Serdes.String()))
                .flatMapValues(v->Arrays.asList(v.split("\\W+")))
                .map((k,v)-> new KeyValue<String,Integer>(v,1))
                .groupByKey(Grouped.with(Serdes.String(),Serdes.Integer()).withName("word_group"))
                //使用跳跃窗口函数
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofSeconds(1)))
                .reduce((v1,v2)->v1+v2,
                        Materialized.<String,Integer, WindowStore<Bytes,byte[]>>as("reduce_store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Integer()
                                )
                )
                .toStream()
                .map((Windowed<String>key,Integer value)->new KeyValue<>(key.key(),value+""))
                //穿过topic02
                .through("topic02",Produced.with(Serdes.String(),Serdes.String()));
    }

}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-31 16:42:40  更:2021-07-31 16:44:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 19:58:15-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码