IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink-侧边流 -> 正文阅读

[大数据]Flink-侧边流

侧边流

废话少说,上代码

测试数据:
{"data":{"order_id":"100001","user_id":"A001","amount":"10.2","create_time":"2021-07-08 00:59:59"},"table_name":"order_info"}
{"data":{"order_id":"100001","sku_id":"11","sku_name":"可乐500ml","amount":"3.0","create_time":"2021-07-08 01:00:00"},"table_name":"order_detail_info"}
{"data":{"order_id":"100001","sku_id":"12","sku_name":"可乐1.5L","amount":"7.2","create_time":"2021-07-08 01:00:00"},"table_name":"order_detail_info"}
{"table_name":"xxx"}
使用nc 来虚拟数据流
nc -lk 9999
import com.alibaba.fastjson.JSONObject;
import com.xxx.demo.pojo.OrderDetailInfoDemo;
import com.xxx.demo.pojo.OrderInfoDemo;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Demo {

    public static void main(String[] args) throws Exception{
        // 创建执行环境 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


		
        final  OutputTag<OrderInfoDemo> orderInfoDemoOutputTag = new OutputTag<OrderInfoDemo>("订单数据"){};
        final  OutputTag<OrderDetailInfoDemo> orderDetailInfoDemoOutputTag = new OutputTag<OrderDetailInfoDemo>("订单详情数据"){};

        DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 9999).setParallelism(1);

        //判断数据是否为空
        SingleOutputStreamOperator<JSONObject> process = socketTextStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String data) throws Exception {
                boolean flag = false;

                //判断是否为空
                if (!data.isEmpty()) {
                    flag = true;
                }
                return flag;
            }
        }).map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String value) throws Exception {
            	// 转化为json数据
                return JSONObject.parseObject(value);
            }
        }).process(new ProcessFunction<JSONObject, JSONObject>() {
            @Override
            public void processElement(JSONObject jsonData, Context context, Collector<JSONObject> collector) throws Exception {
            	// 获取数据
                JSONObject data = jsonData.getJSONObject("data");
                if (jsonData.getString("table_name").equals("order_info")) {
                    OrderInfoDemo orderInfoDemo = new OrderInfoDemo();
                    orderInfoDemo.setUserId(data.getString("user_id"));
                    orderInfoDemo.setOrderId(data.getInteger("order_id"));
                    orderInfoDemo.setAmount(data.getDouble("amount"));
                    orderInfoDemo.setTableName(jsonData.getString("table_name"));
                    orderInfoDemo.setCreateTime(data.getString("create_time"));
                    context.output(orderInfoDemoOutputTag, orderInfoDemo);
                } else if (jsonData.getString("table_name").equals("order_detail_info")) {
                    OrderDetailInfoDemo orderDetailInfoDemo = new OrderDetailInfoDemo();
                    orderDetailInfoDemo.setSkuId(data.getInteger("sku_id"));
                    orderDetailInfoDemo.setSkuName(data.getString("sku_name"));
                    orderDetailInfoDemo.setOrderId(data.getInteger("order_id"));
                    orderDetailInfoDemo.setAmount(data.getDouble("amount"));
                    orderDetailInfoDemo.setTableName(jsonData.getString("table_name"));
                    orderDetailInfoDemo.setCreateTime(data.getString("create_time"));
                    context.output(orderDetailInfoDemoOutputTag, orderDetailInfoDemo);
                }else{
                    collector.collect(jsonData);
                }

            }
        });

        process.getSideOutput(orderInfoDemoOutputTag).print("orderInfo");
        process.getSideOutput(orderDetailInfoDemoOutputTag).print("orderDetailInfo");
        process.print("other");

        env.execute("demo");

    }
}
/**
 * @Classname OrderDetailInfoDemo
 * @Description TODO
 * @Date 2021/7/8 2:36 下午
 * @Created by guopengfei
 */
public class OrderDetailInfoDemo {

    // {"data":{"order_id":"100001","sku_id":"11","sku_name":"可乐500ml","amount":"3.0","create_time":"2021-07-08 01:00:00"},table_name:"order_detail_info"}
    // {"data":{"order_id":"100001","sku_id":"12","sku_name":"可乐1.5L","amount":"7.2","create_time":"2021-07-08 01:00:00"},table_name:"order_detail_info"}
    private int orderId;
    private double amount;
    private int skuId;
    private String skuName;
    private String createTime;
    private String tableName;

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public double getAmount() {
        return amount;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }

    public int getSkuId() {
        return skuId;
    }

    public void setSkuId(int skuId) {
        this.skuId = skuId;
    }

    public String getSkuName() {
        return skuName;
    }

    public void setSkuName(String skuName) {
        this.skuName = skuName;
    }

    public String getCreateTime() {
        return createTime;
    }

    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public String toString() {
        return "OrderDetailInfoDemo{" +
                "orderId=" + orderId +
                ", amount=" + amount +
                ", skuId=" + skuId +
                ", skuName='" + skuName + '\'' +
                ", createTime='" + createTime + '\'' +
                ", tableName='" + tableName + '\'' +
                '}';
    }
}
/**
 * @Classname OrderInfoDemo
 * @Description TODO
 * @Date 2021/7/8 2:37 下午
 * @Created by guopengfei
 */
public class OrderInfoDemo {

    // {"data":{"order_id":"100001","user_id":"A001","amount":"10.2","create_time":"2021-07-08 00:59:59"},table_name:"order_info"}
    private int orderId;
    private String userId;
    private double amount;
    private String tableName;
    private String createTime;

    public String getCreateTime() {

        return createTime;
    }

    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public double getAmount() {
        return amount;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    public OrderInfoDemo() {
    }

    @Override
    public String toString() {
        return "OrderInfoDemo{" +
                "orderId=" + orderId +
                ", userId='" + userId + '\'' +
                ", amount=" + amount +
                ", tableName='" + tableName + '\'' +
                ", createTime='" + createTime + '\'' +
                '}';
    }
}

注意:
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-09 17:33:49  更:2021-07-09 17:36:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 14:33:05-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码