IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 从topic中过滤数据存入另一个topic -> 正文阅读

[大数据]从topic中过滤数据存入另一个topic

/**
 * @author baowf1
 * @date 2021/8/3 09:33:23
 * @description: TODO:水电数据过滤掉关机和暂停上报的数据数据
 * @project:
 **/
public class WaterPowerFilter {

    private static final Logger logger = LoggerFactory.getLogger(WaterPowerFilter.class);

    public static void main(String[] args) throws Exception {
        //指定kafka集群
        //String kafkaAddress = PropertiesUtil.getPropertyValue(PropertyKey.KAFKA_ADDRESS);
        String kafkaAddress = "121.40.72.22:9092";
        //指定topic列表,同时消费
        ArrayList<String> topics = new ArrayList<>();
        topics.add("d9");
        topics.add("da");
        topics.add("db");
        //指定消费组id
        //String kafkaConsumerGroupId = PropertiesUtil.getPropertyValue(PropertyKey.TOPIC_GROUP_ID);
        String kafkaConsumerGroupId = "water-power-new2";
                //指定checkpoint文件位置
        //String checkpointDir = PropertiesUtil.getPropertyValue(PropertyKey.CHECKPOINT_DIR);
        String checkpointDir = "file:\\D:\\MyData\\checkpoint\\water_power2";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //检查点配置
        env.enableCheckpointing(1800 * 1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend(checkpointDir));

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers",kafkaAddress);
        properties.setProperty("group.id",kafkaConsumerGroupId);
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


        //source
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), properties);
        File file = new File(checkpointDir);
        if (file.exists()) {
            kafkaSource.setStartFromGroupOffsets();
        } else {
            kafkaSource.setStartFromEarliest();
        }

        DataStreamSource<String> kafkaStream = env.addSource(kafkaSource);

        //transformation
        //输入流转换成json
        SingleOutputStreamOperator<JSONObject> inputObjectStream = kafkaStream.map(JSONObject::parseObject);

        //过滤掉不符合条件的数据
        SingleOutputStreamOperator<JSONObject>  filtered= inputObjectStream.filter((FilterFunction<JSONObject>) jsonObject -> {
            String status = jsonObject.getString("status");
            return !status.equals("关机") || !status.equals("暂停") || !status.equals("待机") || !status.equals("NULL") || !status.equals("故障");
        });


        //抽取事件时间
        filtered.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(JSONObject jsonObject) {
                String timeValue = jsonObject.getString("time");
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
                if (timeValue.length() > 14) {
                     simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS");
                }
                simpleDateFormat.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
                try {
                    Date date = simpleDateFormat.parse(timeValue);
                    return date.getTime();
                } catch (ParseException e) {
                    logger.error("parse time value error,timeValue=[{}]",timeValue);
                }
                return new Date().getTime();
            }
        });
        SingleOutputStreamOperator<String> mapped = filtered.map(JSON::toString);

        //sink
        logger.info("开始将过滤出来的数据写入water-power中");
        mapped.addSink(new FlinkKafkaProducer<>(kafkaAddress, "water-power", new SimpleStringSchema()));

        env.execute("FilteredData1");

    }
}

错误日志

11:23:20.946 [Source: Custom Source -> Map -> Filter -> (Timestamps/Watermarks, Map -> Sink: Unnamed) (3/4)] INFO  o.a.k.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
11:23:20.946 [Source: Custom Source -> Map -> Filter -> (Timestamps/Watermarks, Map -> Sink: Unnamed) (3/4)] INFO  o.a.k.clients.producer.KafkaProducer - [Producer clientId=producer-1] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
11:23:20.957 [Source: Custom Source -> Map -> Filter -> (Timestamps/Watermarks, Map -> Sink: Unnamed) (3/4)] WARN  o.a.flink.runtime.taskmanager.Task - Source: Custom Source -> Map -> Filter -> (Timestamps/Watermarks, Map -> Sink: Unnamed) (3/4) (3d387284cb2715b565917dc73f4accfa) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
	at com.littleswan.bigdata.online.waterandelectricity.filter.WaterPowerFilter.lambda$main$992cca04$1(WaterPowerFilter.java:94)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-27 11:55:47  更:2021-08-27 11:56:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:42:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码