侧边流
废话少说,上代码
测试数据:
{"data":{"order_id":"100001","user_id":"A001","amount":"10.2","create_time":"2021-07-08 00:59:59"},"table_name":"order_info"}
{"data":{"order_id":"100001","sku_id":"11","sku_name":"可乐500ml","amount":"3.0","create_time":"2021-07-08 01:00:00"},"table_name":"order_detail_info"}
{"data":{"order_id":"100001","sku_id":"12","sku_name":"可乐1.5L","amount":"7.2","create_time":"2021-07-08 01:00:00"},"table_name":"order_detail_info"}
{"table_name":"xxx"}
使用nc 来虚拟数据流
nc -lk 9999
import com.alibaba.fastjson.JSONObject;
import com.xxx.demo.pojo.OrderDetailInfoDemo;
import com.xxx.demo.pojo.OrderInfoDemo;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class Demo {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final OutputTag<OrderInfoDemo> orderInfoDemoOutputTag = new OutputTag<OrderInfoDemo>("订单数据"){};
final OutputTag<OrderDetailInfoDemo> orderDetailInfoDemoOutputTag = new OutputTag<OrderDetailInfoDemo>("订单详情数据"){};
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 9999).setParallelism(1);
SingleOutputStreamOperator<JSONObject> process = socketTextStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String data) throws Exception {
boolean flag = false;
if (!data.isEmpty()) {
flag = true;
}
return flag;
}
}).map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String value) throws Exception {
return JSONObject.parseObject(value);
}
}).process(new ProcessFunction<JSONObject, JSONObject>() {
@Override
public void processElement(JSONObject jsonData, Context context, Collector<JSONObject> collector) throws Exception {
JSONObject data = jsonData.getJSONObject("data");
if (jsonData.getString("table_name").equals("order_info")) {
OrderInfoDemo orderInfoDemo = new OrderInfoDemo();
orderInfoDemo.setUserId(data.getString("user_id"));
orderInfoDemo.setOrderId(data.getInteger("order_id"));
orderInfoDemo.setAmount(data.getDouble("amount"));
orderInfoDemo.setTableName(jsonData.getString("table_name"));
orderInfoDemo.setCreateTime(data.getString("create_time"));
context.output(orderInfoDemoOutputTag, orderInfoDemo);
} else if (jsonData.getString("table_name").equals("order_detail_info")) {
OrderDetailInfoDemo orderDetailInfoDemo = new OrderDetailInfoDemo();
orderDetailInfoDemo.setSkuId(data.getInteger("sku_id"));
orderDetailInfoDemo.setSkuName(data.getString("sku_name"));
orderDetailInfoDemo.setOrderId(data.getInteger("order_id"));
orderDetailInfoDemo.setAmount(data.getDouble("amount"));
orderDetailInfoDemo.setTableName(jsonData.getString("table_name"));
orderDetailInfoDemo.setCreateTime(data.getString("create_time"));
context.output(orderDetailInfoDemoOutputTag, orderDetailInfoDemo);
}else{
collector.collect(jsonData);
}
}
});
process.getSideOutput(orderInfoDemoOutputTag).print("orderInfo");
process.getSideOutput(orderDetailInfoDemoOutputTag).print("orderDetailInfo");
process.print("other");
env.execute("demo");
}
}
public class OrderDetailInfoDemo {
private int orderId;
private double amount;
private int skuId;
private String skuName;
private String createTime;
private String tableName;
public int getOrderId() {
return orderId;
}
public void setOrderId(int orderId) {
this.orderId = orderId;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount) {
this.amount = amount;
}
public int getSkuId() {
return skuId;
}
public void setSkuId(int skuId) {
this.skuId = skuId;
}
public String getSkuName() {
return skuName;
}
public void setSkuName(String skuName) {
this.skuName = skuName;
}
public String getCreateTime() {
return createTime;
}
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
@Override
public String toString() {
return "OrderDetailInfoDemo{" +
"orderId=" + orderId +
", amount=" + amount +
", skuId=" + skuId +
", skuName='" + skuName + '\'' +
", createTime='" + createTime + '\'' +
", tableName='" + tableName + '\'' +
'}';
}
}
public class OrderInfoDemo {
private int orderId;
private String userId;
private double amount;
private String tableName;
private String createTime;
public String getCreateTime() {
return createTime;
}
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
public int getOrderId() {
return orderId;
}
public void setOrderId(int orderId) {
this.orderId = orderId;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount) {
this.amount = amount;
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public OrderInfoDemo() {
}
@Override
public String toString() {
return "OrderInfoDemo{" +
"orderId=" + orderId +
", userId='" + userId + '\'' +
", amount=" + amount +
", tableName='" + tableName + '\'' +
", createTime='" + createTime + '\'' +
'}';
}
}
注意: 
|