/**
 * @author baowf1
 * @date 2021/8/3 09:33:23
 * @description: Filter the water/power meter stream, dropping records reported while the device is shut down or paused
 * @project:
 **/
public class WaterPowerFilter {
private static final Logger logger = LoggerFactory.getLogger(WaterPowerFilter.class);
public static void main(String[] args) throws Exception {
        //Kafka cluster address
//String kafkaAddress = PropertiesUtil.getPropertyValue(PropertyKey.KAFKA_ADDRESS);
String kafkaAddress = "121.40.72.22:9092";
        //Topic list; all three topics are consumed together
ArrayList<String> topics = new ArrayList<>();
topics.add("d9");
topics.add("da");
topics.add("db");
        //Consumer group id
//String kafkaConsumerGroupId = PropertiesUtil.getPropertyValue(PropertyKey.TOPIC_GROUP_ID);
String kafkaConsumerGroupId = "water-power-new2";
        //Checkpoint directory
//String checkpointDir = PropertiesUtil.getPropertyValue(PropertyKey.CHECKPOINT_DIR);
String checkpointDir = "file:\\D:\\MyData\\checkpoint\\water_power2";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Checkpoint configuration: checkpoint every 30 minutes
        env.enableCheckpointing(1800 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend(checkpointDir));
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
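        //Note: with event time enabled, timestamps/watermarks must be assigned downstream,
        //and the stream returned by assignTimestampsAndWatermarks is the one that must be consumed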
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",kafkaAddress);
properties.setProperty("group.id",kafkaConsumerGroupId);
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//source
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), properties);
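        //Start from the committed group offsets once local checkpoint state exists; otherwise replay the topics from the earliest offset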
        //checkpointDir is a file URI, so File(URI) is needed to resolve it to a local path; File(String) would never match and always fall through to setStartFromEarliest
        File file = new File(URI.create(checkpointDir));
if (file.exists()) {
kafkaSource.setStartFromGroupOffsets();
} else {
kafkaSource.setStartFromEarliest();
}
DataStreamSource<String> kafkaStream = env.addSource(kafkaSource);
//transformation
        //Parse each input record into a JSONObject
SingleOutputStreamOperator<JSONObject> inputObjectStream = kafkaStream.map(JSONObject::parseObject);
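        //Note: JSONObject.parseObject throws on malformed input; a flatMap with a try/catch would skip bad records instead of failing the job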
        //Filter out records whose status marks the device as shut down, paused, idle, faulted, or unreported
        SingleOutputStreamOperator<JSONObject> filtered = inputObjectStream.filter((FilterFunction<JSONObject>) jsonObject -> {
            String status = jsonObject.getString("status");
            //Null-safe conjunction: the original ||-chain of negated equals was always true and threw an NPE when status was missing
            return status != null && !status.equals("关机") && !status.equals("暂停")
                    && !status.equals("待机") && !status.equals("NULL") && !status.equals("故障");
        });
        //Extract event time; the returned stream must be kept, otherwise the watermark assignment is silently discarded
        SingleOutputStreamOperator<JSONObject> withTimestamps = filtered.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
@Override
public long extractTimestamp(JSONObject jsonObject) {
String timeValue = jsonObject.getString("time");
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
if (timeValue.length() > 14) {
simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS");
}
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
try {
Date date = simpleDateFormat.parse(timeValue);
return date.getTime();
                } catch (ParseException e) {
                    logger.error("parse time value error, timeValue=[{}]", timeValue, e);
                }
                //Fall back to the current processing time when the event time cannot be parsed
                return System.currentTimeMillis();
}
});
        SingleOutputStreamOperator<String> mapped = withTimestamps.map(JSON::toString);
//sink
logger.info("开始将过滤出来的数据写入water-power中");
mapped.addSink(new FlinkKafkaProducer<>(kafkaAddress, "water-power", new SimpleStringSchema()));
env.execute("FilteredData1");
}
}
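As a side note, the extractor above accepts two timestamp layouts (second and millisecond precision). A minimal standalone check of that parsing logic, assuming the same formats and time zone as the job (class and method names are illustrative):

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class TimeParseSketch {
    //Mirrors the extractor: yyyyMMddHHmmss for 14-digit values, yyyyMMddHHmmssSSS for longer ones, in Asia/Shanghai
    static long toEpochMillis(String timeValue) throws ParseException {
        String pattern = timeValue.length() > 14 ? "yyyyMMddHHmmssSSS" : "yyyyMMddHHmmss";
        SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        sdf.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
        return sdf.parse(timeValue).getTime();
    }

    public static void main(String[] args) throws ParseException {
        System.out.println(toEpochMillis("20210803093323"));    //second precision
        System.out.println(toEpochMillis("20210803093323123")); //millisecond precision
    }
}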
Error log
11:23:20.946 [Source: Custom Source -> Map -> Filter -> (Timestamps/Watermarks, Map -> Sink: Unnamed) (3/4)] INFO o.a.k.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
11:23:20.946 [Source: Custom Source -> Map -> Filter -> (Timestamps/Watermarks, Map -> Sink: Unnamed) (3/4)] INFO o.a.k.clients.producer.KafkaProducer - [Producer clientId=producer-1] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
11:23:20.957 [Source: Custom Source -> Map -> Filter -> (Timestamps/Watermarks, Map -> Sink: Unnamed) (3/4)] WARN o.a.flink.runtime.taskmanager.Task - Source: Custom Source -> Map -> Filter -> (Timestamps/Watermarks, Map -> Sink: Unnamed) (3/4) (3d387284cb2715b565917dc73f4accfa) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at com.littleswan.bigdata.online.waterandelectricity.filter.WaterPowerFilter.lambda$main$992cca04$1(WaterPowerFilter.java:94)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
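The NullPointerException originates in the filter lambda (WaterPowerFilter.java:94, reached via StreamFilter.processElement): jsonObject.getString("status") returns null for records without a status field, and calling status.equals(...) on it then throws. The original ||-chain of negated equals also defeats the filter: any non-null status fails at most one of the equals checks, so the whole expression is always true. A null guard plus a conjunction fixes both; a minimal standalone sketch of that check (the class name, EXCLUDED constant, and the "运行" sample value are illustrative, not from the job):

import java.util.Arrays;
import java.util.List;

public class StatusFilterSketch {
    //Statuses that should be dropped, taken from the filter above
    private static final List<String> EXCLUDED = Arrays.asList("关机", "暂停", "待机", "NULL", "故障");

    //True only for records that should pass through; null-safe for a missing status field
    static boolean keep(String status) {
        return status != null && !EXCLUDED.contains(status);
    }

    public static void main(String[] args) {
        System.out.println(keep("运行")); //true: any status outside the excluded set passes
        System.out.println(keep("关机")); //false: excluded status is dropped
        System.out.println(keep(null));   //false instead of an NPE
    }
}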