前言
在开发中很多场景下,需要通过读取外部的文件作为输入的数据源进行数据提取,分析和转换,为后续进一步的数据处理做准备,比如读取日志数据等
核心代码
使用flink读取外部数据其实很简单,只需要指定外部数据源的文件路径即可,下面直接贴出代码
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SoureTest2 {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//从环境的集合中获取数据
String path = "E:\\code-self\\flink_study\\src\\main\\resources\\sensor.txt";
DataStreamSource<String> dataStreamSource = env.readTextFile(path);
dataStreamSource.print();
env.execute();
}
}
在工程目录的resources目录下有一个sensor.txt文件,如下图所示
?
运行上面的代码,观察控制台输出结果
?默认情况下,启动之后,flink会开启多个并行的任务来处理,如果希望输出的数据有序,只需要设置一下并行度为1即可
dataStreamSource.print().setParallelism(1);
再次运行,观察结果,
?
?
|