1、使用累加器,计算输入单词的个数,在job结束后,输出结果
2、代码实现
public class Demo10 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> line = env.socketTextStream("localhost", 8888);
line.keyBy(e->e.split(",")[0]).process(
new KeyedProcessFunction<String, String, Tuple2<String,Integer>>() {
private ValueState<Integer> valueState;
private IntCounter wordCount = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("count_state", Integer.class);
stateDescriptor.setQueryable("query_count");
valueState = getRuntimeContext().getState(stateDescriptor);
System.out.println("job_id=>"+getRuntimeContext().getJobId());
getRuntimeContext().addAccumulator("word_count",wordCount);
}
@Override
public void processElement(String line, KeyedProcessFunction<String, String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] fields = line.split(",");
String name = fields[0];
Integer count = Integer.valueOf(fields[1]);
wordCount.add(1);
Integer oldResult = valueState.value();
if(oldResult == null)
{
oldResult = 0;
}
count = oldResult + count;
valueState.update(count);
out.collect(new Tuple2<>(name,count));
}
}
).print();
JobExecutionResult result = env.execute();
Integer num = result.getAccumulatorResult("word_count");
System.out.println("计算单词个数是:"+num);
}
}
|