package risen.source

import java.util
import java.util.Properties

import com.google.gson.JsonObject
import com.google.gson.JsonParser
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kudu.client.SessionConfiguration.FlushMode
import org.apache.kudu.client._
import risen.util.ParameterToolUtil

import scala.collection.JavaConverters._
object KafkaSource {

  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Register the global job parameters
    val parameterTool: ParameterTool = ParameterToolUtil.getParameter
    env.getConfig.setGlobalJobParameters(parameterTool)
    enableCheckPoint(env)

    // Kafka consumer
    val consumer: FlinkKafkaConsumer[String] = buildFlinkKafkaConsumer(parameterTool)
    consumer.setStartFromLatest()
    val kafkaStream: DataStreamSource[String] = env.addSource(consumer)

    kafkaStream.map(new MapFunction[String, JsonObject] {
      override def map(t: String): JsonObject = {
        println(t)
        JsonParser.parseString(t).getAsJsonObject
      }
    }).addSink(new RichSinkFunction[JsonObject] {
      var client: KuduClient = _
      var session: KuduSession = _
      var count: Int = 0

      override def open(parameters: Configuration): Unit = {
        client = new KuduClient.KuduClientBuilder(parameterTool.get("kudu.master")).build
        session = client.newSession
        // Batch writes client-side: manual flush mode with a bounded mutation buffer
        session.setFlushMode(FlushMode.MANUAL_FLUSH)
        session.setMutationBufferSpace(parameterTool.getInt("kudu.flush.buffer.bytes"))
        session.setFlushInterval(parameterTool.getInt("kudu.flush.ms"))
      }
      override def invoke(value: JsonObject, context: SinkFunction.Context): Unit = {
        println("writing record")
        val source: JsonObject = JsonParser.parseString(value.get("source").toString).getAsJsonObject
        val after: JsonObject = JsonParser.parseString(value.get("after").toString).getAsJsonObject
        val tableName = source.get("table").getAsString
        //val table: KuduTable = client.openTable("impala::ld_yq_bigdata.ods_" + tableName)
        val table: KuduTable = client.openTable(parameterTool.get("mysql.table.prefix") + tableName)
        println(parameterTool.get("mysql.table.prefix") + tableName)
        val upsert: Upsert = table.newUpsert()

        // Copy the column values from the "after" image into the upsert
        val afterValue: util.Iterator[String] = after.keySet().iterator()
        count += 1
        while (afterValue.hasNext) {
          val key = afterValue.next()
          val com = key.toLowerCase()
          // A regex on the value is unreliable; look up the column type in the Kudu schema instead
          //if (Pattern.compile("\\d*").matcher(after.get(key).toString).matches())
          if (after.get(key).isJsonNull)
            upsert.getRow.setNull(com)
          else {
            val colType = table.getSchema.getColumn(com).getType.toString.split(" ")(1)
            if (colType == "string")
              upsert.getRow.addString(com, after.get(key).getAsString)
            else if (colType == "int64")
              upsert.getRow.addLong(com, after.get(key).getAsLong)
            else
              upsert.getRow.addInt(com, after.get(key).getAsInt)
          }
        }
        session.apply(upsert)
        // Flush once enough rows have been buffered, then reset the counter
        if (count >= parameterTool.getInt("kudu.flush.buffer.bytes") / 2) {
          session.flush()
          count = 0
        }
      }
      override def close(): Unit = {
        session.close()
        client.close()
      }
    }).setParallelism(5)

    env.execute()
  }
  def buildFlinkKafkaConsumer(parameter: ParameterTool): FlinkKafkaConsumer[String] = {
    val properties = new Properties
    properties.setProperty("bootstrap.servers", parameter.get("kafka.bootstrap.servers"))
    properties.setProperty("group.id", parameter.get("kafka.group.id"))
    properties.setProperty("request.timeout.ms", parameter.get("kafka.request.timeout.ms"))
    properties.setProperty("fetch.max.bytes", parameter.get("kafka.fetch.max.bytes"))
    // Comma-separated topic list from configuration
    val topic: String = parameter.get("kafka.bootstrap.topic")
    //new FlinkKafkaConsumer[String](java.util.regex.Pattern.compile(topic + ".*"), new SimpleStringSchema, properties)
    new FlinkKafkaConsumer[String](topic.split(",").toList.asJava, new SimpleStringSchema, properties)
  }
  def enableCheckPoint(env: StreamExecutionEnvironment): Unit = {
    // Checkpoint every 3 minutes, exactly-once, retained on cancellation
    env.enableCheckpointing(1000 * 60 * 3)
    val chkConfig: CheckpointConfig = env.getCheckpointConfig
    chkConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    chkConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    chkConfig.setMinPauseBetweenCheckpoints(1000 * 60 * 2)
    chkConfig.setCheckpointTimeout(1000 * 60 * 15)
    chkConfig.setMaxConcurrentCheckpoints(5)
    chkConfig.setTolerableCheckpointFailureNumber(500)
  }
}
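The sink above assumes Debezium-style change records: each Kafka message is a JSON object whose "source" field carries the originating table name and whose "after" field carries the new column values. The following minimal sketch shows that assumed payload shape (the field names besides "source", "table", and "after" and all values are placeholders, not taken from a real topic) and repeats the same Gson extraction steps that invoke() performs:

import com.google.gson.{JsonObject, JsonParser}

object PayloadShapeDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical change record shaped like what the sink expects; values are placeholders
    val record =
      """{
        |  "source": { "db": "inventory", "table": "orders" },
        |  "after":  { "ID": 42, "STATUS": "PAID", "REMARK": null }
        |}""".stripMargin

    val value: JsonObject = JsonParser.parseString(record).getAsJsonObject
    // Same extraction as invoke(): re-parse the nested objects, then read the table name and columns
    val source: JsonObject = JsonParser.parseString(value.get("source").toString).getAsJsonObject
    val after: JsonObject = JsonParser.parseString(value.get("after").toString).getAsJsonObject
    println(source.get("table").getAsString) // orders
    println(after.keySet())                  // [ID, STATUS, REMARK]
    println(after.get("REMARK").isJsonNull)  // true
  }
}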
Utility class for reading the configuration file:
package risen.util;

import org.apache.flink.api.java.utils.ParameterTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class ParameterToolUtil {

    private static final Logger log = LoggerFactory.getLogger(ParameterToolUtil.class);

    private static final String PROPERTIES_FILE_PATH = "/application.properties";

    /**
     * Builds the global ParameterTool parameters from the classpath properties file,
     * merged with the JVM system properties.
     *
     * @return the merged parameters, or system properties only if the file cannot be read
     */
    public static ParameterTool getParameter() {
        try {
            return ParameterTool
                    .fromPropertiesFile(ParameterToolUtil.class.getResourceAsStream(PROPERTIES_FILE_PATH))
                    .mergeWith(ParameterTool.fromSystemProperties());
        } catch (IOException e) {
            log.error("Failed to load the global ParameterTool parameters", e);
        }
        return ParameterTool.fromSystemProperties();
    }
}
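ParameterToolUtil loads /application.properties from the classpath, so every key KafkaSource reads must be defined there. As a reference, here is a hedged sketch that builds an equivalent ParameterTool from a plain Map for local testing; the key names are exactly the ones the job reads, while the values are placeholders to replace with your own cluster settings:

import org.apache.flink.api.java.utils.ParameterTool

import scala.collection.JavaConverters._

object LocalParameters {
  // Placeholder values; only the key names are taken from the job above
  val parameterTool: ParameterTool = ParameterTool.fromMap(Map(
    "kafka.bootstrap.servers"  -> "localhost:9092",
    "kafka.group.id"           -> "flink-kudu-sink",
    "kafka.request.timeout.ms" -> "30000",
    "kafka.fetch.max.bytes"    -> "52428800",
    "kafka.bootstrap.topic"    -> "dbserver1.inventory.orders", // comma-separated list of topics
    "kudu.master"              -> "kudu-master:7051",
    "kudu.flush.buffer.bytes"  -> "1000", // mutation buffer size; half of it is also the manual-flush threshold
    "kudu.flush.ms"            -> "1000",
    "mysql.table.prefix"       -> "impala::ld_yq_bigdata.ods_" // Kudu table name = prefix + source table name
  ).asJava)
}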