创建一个实体类
public class SensorTemperature {
private String id;
private Long timesatmp;
private Double temperature;
public SensorTemperature() {
}
public SensorTemperature(String id, Long timesatmp, Double temperature) {
this.id = id;
this.timesatmp = timesatmp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimesatmp() {
return timesatmp;
}
public void setTimesatmp(Long timesatmp) {
this.timesatmp = timesatmp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SensorTemperature{" +
"id='" + id + '\'' +
", timesatmp=" + timesatmp +
", temperature=" + temperature +
'}';
}
}
从集合读取数据
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;
import java.util.Arrays;
public class StreamCount {
? ? public static void main(String[] args) throws Exception { ? ?? ?? ??? ? ? ? ? ?StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ?? ??? ? ? ? ? ?DataStream<SensorTemperature> dataStream = env.fromCollection(Arrays.asList(new SensorTemperature("sensor_1",123455665L,35.5), ?? ??? ? ? ? ? ? ? ? ? ?new SensorTemperature("sensor_2",143455665L,36.5), ?? ??? ? ? ? ? ? ? ? ? ?new SensorTemperature("sensor_3",163455665L,37.5), ?? ??? ? ? ? ? ? ? ? ? ?new SensorTemperature("sensor_4",173455665L,38.5)));
?? ??? ? ? ? ? ?DataStream<Integer> integerDataStream = env.fromElements(1,2,3,4);
?? ??? ? ? ? ? ?dataStream.print(); ?? ??? ? ? ? ? ?integerDataStream.print();
?? ??? ? ? ? ? ?env.execute();
? ? } }
|