import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Flink streaming demo that maintains running totals in keyed {@link MapState}.
 *
 * <p>Text lines are read from a socket and are expected to look like
 * {@code key,city,amount}. The stream is keyed by the first field; for each key
 * a {@code MapState<String, Double>} maps the second field (the "city") to the
 * sum of all amounts seen so far for that key and city. Every input record
 * produces one output record carrying the updated total in its third field.
 */
public class MapStateDemo {

    /** Host the socket source connects to. */
    private static final String SOURCE_HOST = "linux01";

    /** Port the socket source connects to. */
    private static final int SOURCE_PORT = 8888;

    /** Interval passed to {@code enableCheckpointing}, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 10000L;

    /** Number of comma-separated fields each input line must provide. */
    private static final int EXPECTED_FIELD_COUNT = 3;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);

        DataStreamSource<String> lines = env.socketTextStream(SOURCE_HOST, SOURCE_PORT);

        // Parse "key,city,amount" into a typed tuple.
        SingleOutputStreamOperator<Tuple3<String, String, Double>> parsed =
                lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
                    @Override
                    public Tuple3<String, String, Double> map(String line) throws Exception {
                        String[] fields = line.split(",");
                        if (fields.length < EXPECTED_FIELD_COUNT) {
                            // Fail with the offending input in the message instead of a bare
                            // ArrayIndexOutOfBoundsException.
                            throw new IllegalArgumentException(
                                    "Expected " + EXPECTED_FIELD_COUNT
                                            + " comma-separated fields but got " + fields.length
                                            + " in line: '" + line + "'");
                        }
                        return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));
                    }
                });

        // Partition by the first field so that state below is scoped per key.
        KeyedStream<Tuple3<String, String, Double>, String> keyed = parsed.keyBy(t -> t.f0);

        SingleOutputStreamOperator<Tuple3<String, String, Double>> totals =
                keyed.map(new RichMapFunction<Tuple3<String, String, Double>, Tuple3<String, String, Double>>() {
                    /** Per-key map from city to accumulated amount; obtained in {@link #open}. */
                    private transient MapState<String, Double> mapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Fully parameterized descriptor: no raw type, no unchecked conversion.
                        MapStateDescriptor<String, Double> descriptor =
                                new MapStateDescriptor<>("map_state", String.class, Double.class);
                        mapState = getRuntimeContext().getMapState(descriptor);
                    }

                    @Override
                    public Tuple3<String, String, Double> map(Tuple3<String, String, Double> value)
                            throws Exception {
                        String city = value.f1;
                        Double previous = mapState.get(city);
                        // A city not seen before for the current key starts from zero.
                        double total = (previous == null ? 0.0 : previous) + value.f2;
                        mapState.put(city, total);
                        // Emit a new tuple rather than mutating the incoming record.
                        return Tuple3.of(value.f0, city, total);
                    }
                });

        totals.print();
        env.execute("MapStateDemo");
    }
}

|