/**
 * Starts a Kafka Streams application that reads string-encoded numeric readings from topic
 * {@code t1}, tracks the minimum reading per key within 5-second time windows, and prints every
 * updated minimum to standard output.
 *
 * <p>The broker address ({@code 127.0.0.1:9092}), application id and topic name are hard-coded.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    // Record cache size is set to 0 bytes; presumably so each aggregate update is emitted
    // downstream immediately rather than being coalesced -- confirm against the Streams docs.
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream = builder.stream("t1");
    stream
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)))
        .aggregate(
            // Seed with the largest long so the first reading in a window always becomes the
            // minimum. Using 0 as a "no value yet" marker is wrong: a genuine minimum of 0
            // would be overwritten by the next reading.
            () -> Long.MAX_VALUE,
            // Long.parseLong throws NumberFormatException for a non-numeric record value.
            (k, v, min) -> Math.min(min, Long.parseLong(v)),
            Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-temp-stream-store")
                .withValueSerde(Serdes.Long())
        )
        .toStream()
        .foreach((k, v) -> System.out.println(v));

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
    // Close the streams instance when the JVM shuts down so its resources are released.
    Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    kafkaStreams.start();
}
|