1. 依赖&版本
Flink 与 Kafka 的一些方法调用与其版本有关，下面是我使用的版本：
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<maven.compiler.compilerVersion>8</maven.compiler.compilerVersion>
<flink.version>1.12.1</flink.version>
<scala.version>2.12.7</scala.version>
<kafka.version>2.4.1</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
2. Flink 读取 Kafka
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStreamSource
import java.util.Properties
Object Test {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaSubTopic = "topic-test"
val kafkaGroupId = "group-test"
val stream: DataStreamSource[String] = readKafka(env, kafkaSubTopic, kafkaGroupId, "latest")
stream.print()
env.execute()
}
def readKafka(
env: StreamExecutionEnvironment,
topic: String,
groupId: String,
offset: String = "latest"
)= {
props.put("bootstrap.servers", "{ip}:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", offset)
env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), props))
}
}
3. Flink 写入 Kafka
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import com.alibaba.fastjson.{JSON, JSONObject}
import java.util.Properties
Object Test {
def main(args: Array[String]): Unit = {
val producer = new MyKafkaProducer("Test")
val data = new JSONObject()
data.put("a", "1")
producer.send(data.toString())
}
class MyKafkaProducer(val topic: String) {
private val props = getProps()
private val producer = new KafkaProducer[String, String](props)
def send(data: String): Unit = {
val record = new ProducerRecord[String, String](topic, data)
producer.send(record, new ProducerCallBack)
}
class ProducerCallBack extends Callback {
def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception != null) exception.printStackTrace()
}
}
}
}