{
"messageHeader" : {
"messageID" : 2,
"responseFlag" : -2
},
"messageBody" : {
"time" : "2021-09-09T12:45:48",
"vehicleBase" : {
"id" : null,
"vin" : null,
"vehicleStatus" : 1,
"chargeStatus" : 3
},
"alarmData" : {
"alarmFlag" : 0
},
"drivingMotorList" : [ {
"id" : null,
"vin" : null,
"sn" : 1,
"status" : 4,
"controllerTemp" : 62.0
} ],
"socStatusList" : {
"id" : null,
"vin" : null,
"sn" : 1,
"voltage" : 384.0,
"current" : 7.0
},
"extremumData" : {
"id" : null,
"vin" : null,
"highestVoltBatteryPkgSn" : 1,
"highestVoltCellSn" : 48
}
}
}
CREATE TABLE KafkaTable (
messageHeader row<messageID int,responseFlag int>,
messageBody row<vehicleBase row<vehicleStatus int>,alarmData row<alarmLevel int>>
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE TABLE sinkTable (
obj ROW<time1 TIME,str STRING,lg BIGINT>,
arr ARRAY<ROW<f1 STRING,f2 INT>>,
map1 MAP<STRING,STRING>
) WITH (
'connector' = 'filesystem',
'path' = 'file:///load/data/test',
'format' = 'json'
);
insert into sinkTable (obj,arr,map1)
select Row(CURRENT_TIME,'ss',123) as obj,Array[Row('f',1),Row('s',2)] as arr,Map['k1','v1','k2','v2'] as map1
from KafkaTable;
|