import org.apache.flink.streaming.api.scala._ // brings in DataStream, StreamExecutionEnvironment and the implicit TypeInformation
import nn.MyKafkaUtil

object StartupApp {

  def main(args: Array[String]): Unit = {
    // Set up the streaming environment.
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Build a Kafka consumer for the "gl" topic and use it as the source.
    val kafkaConsumer = MyKafkaUtil.getConsumer("gl")
    val dstream: DataStream[String] = environment.addSource(kafkaConsumer)

    dstream.print()
    environment.execute()
  }
}

############################

package nn
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object MyKafkaUtil {

  private val prop = new Properties()
  prop.setProperty("bootstrap.servers", "localhost:9092")
  prop.setProperty("group.id", "gmall")

  // Builds a string-deserializing Kafka consumer for the given topic.
  def getConsumer(topic: String): FlinkKafkaConsumer[String] = {
    val myKafkaConsumer: FlinkKafkaConsumer[String] =
      new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
    myKafkaConsumer
  }
}
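To write results back to Kafka, the utility can be complemented with a matching producer. The following is a minimal sketch and not part of the original listing: the MyKafkaSink object, its getProducer helper, and the "gl_out" topic name are assumptions, and it relies on the same flink-connector-kafka dependency that provides FlinkKafkaConsumer.

package nn

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object MyKafkaSink {

  private val prop = new Properties()
  prop.setProperty("bootstrap.servers", "localhost:9092")

  // Hypothetical helper mirroring getConsumer: builds a string-serializing producer for the given topic.
  def getProducer(topic: String): FlinkKafkaProducer[String] =
    new FlinkKafkaProducer[String](topic, new SimpleStringSchema(), prop)
}

With such a helper, StartupApp could forward the consumed stream with dstream.addSink(MyKafkaSink.getProducer("gl_out")).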