import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
public class FileSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
String path="C:\\Users\\ss\\Desktop\\files";
TextInputFormat textInputFormat = new TextInputFormat(null);
textInputFormat.setFilesFilter(new FilePathFilter() {
@Override
public boolean filterPath(Path filePath) {
return filePath.getName().endsWith(".txt") == false;
}
});
DataStream<String> dataStreamSource = env.readFile(textInputFormat, path, FileProcessingMode.PROCESS_CONTINUOUSLY,30000);
dataStreamSource.print();
env.execute();
}
}
|