1. Writing to Kafka
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
package com.atguigu.apitest.sinttest

import java.util.Properties

import com.atguigu.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read data from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    val stream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // Convert to the case class type (simple transformation), then back to a String
    // so it can be written with SimpleStringSchema
    val dataStream = stream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
      })

    // Write to Kafka
    dataStream.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "sinktest", new SimpleStringSchema()))

    env.execute("kafka sink test")
  }
}
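The producer above is configured with only a broker list string. If you need to pass additional producer settings, FlinkKafkaProducer011 also has an overload that takes the topic, a serialization schema, and a Properties object. A sketch of that variant (assuming this overload; the "acks" value is just an illustrative tuning choice) that would replace the addSink call inside main:

// Sketch: configure the producer via Properties instead of a plain broker string.
val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092")
producerProps.setProperty("acks", "all") // illustrative producer setting

dataStream.addSink(
  new FlinkKafkaProducer011[String]("sinktest", new SimpleStringSchema(), producerProps)
)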
2. Writing to Redis
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
package com.atguigu.apitest.sinttest

import com.atguigu.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the data
    val inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Convert to the case class type first (simple transformation)
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })

    // Define a FlinkJedisConfigBase for the Redis connection
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()

    dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper))

    env.execute("redis sink test")
  }
}

// Define a RedisMapper
class MyRedisMapper extends RedisMapper[SensorReading] {
  // The Redis command used to save the data: HSET <hash name> <key> <value>
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")
  }

  // Use the temperature as the value
  override def getValueFromData(data: SensorReading): String = data.temperature.toString

  // Use the sensor id as the key
  override def getKeyFromData(data: SensorReading): String = data.id
}
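The mapper above writes every reading into a single hash named sensor_temp (you can inspect it with HGETALL sensor_temp in redis-cli). If you would rather store each sensor as its own top-level Redis key, the connector also supports plain SET; a minimal sketch using the same RedisMapper interface, with a key prefix chosen here purely for illustration:

// Sketch: write each reading as a plain key/value pair with SET instead of HSET.
class MyRedisSetMapper extends RedisMapper[SensorReading] {
  // SET takes no additional key (no hash name)
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  override def getKeyFromData(data: SensorReading): String = "sensor_temp_" + data.id
  override def getValueFromData(data: SensorReading): String = data.temperature.toString
}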
3. Writing to Elasticsearch
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
package com.atguigu.apitest.sinttest

import java.util

import com.atguigu.apitest.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object EsSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the data
    val inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Convert to the case class type first (simple transformation)
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })

    // Define the HttpHosts of the Elasticsearch cluster
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // Custom ElasticsearchSinkFunction that turns each record into an index request
    val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
      override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // Wrap the record fields in a Map as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", t.id)
        dataSource.put("temperature", t.temperature.toString)
        dataSource.put("ts", t.timestamp.toString)

        // Create an index request, which will be sent as an HTTP request
        val indexRequest = Requests.indexRequest()
          .index("sensor")
          .`type`("readingdata")
          .source(dataSource)

        // Hand the request to the indexer
        requestIndexer.add(indexRequest)
      }
    }

    dataStream.addSink(
      new ElasticsearchSink.Builder[SensorReading](httpHosts, myEsSinkFunc).build()
    )

    env.execute("es sink test")
  }
}
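The Elasticsearch sink buffers requests and flushes them in bulk by default, so a small test file can look as if nothing was written. For a demo it is common to flush after every record; a sketch, assuming the bulk-flush setter is available on the Builder in this connector version, that would replace the addSink call above:

// Sketch: flush every single record instead of waiting for a bulk batch.
val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, myEsSinkFunc)
esSinkBuilder.setBulkFlushMaxActions(1) // flush after each index request

dataStream.addSink(esSinkBuilder.build())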
4. Writing to a custom JDBC sink
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
package sinkTest

import java.sql.{Connection, DriverManager, PreparedStatement}

import apiTest.{MySensorSource, SensorReading}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new MySensorSource())

    stream.addSink(new MyJdbcSinkFunc())

    env.execute("jdbc sink test")
  }
}

class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading] {
  // Connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading): Unit = {
    // Try the update first; if the id already exists, its row is updated
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()

    // If the update did not touch any row, insert instead
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
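Two things are assumed but not shown by the example above: a sensor_temp(id, temp) table in the test MySQL database, and the MySensorSource custom source imported from apiTest. A minimal sketch of such a source, assuming it simply emits randomly perturbed readings until cancelled (sensor count, base temperature and interval are made-up values):

// Sketch of a self-generating test source for the JDBC example.
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

class MySensorSource extends SourceFunction[SensorReading] {
  // flag indicating whether the source is still running
  var running: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    while (running) {
      // emit one reading per sensor with a slightly perturbed temperature
      (1 to 10).foreach { i =>
        ctx.collect(SensorReading(s"sensor_$i", System.currentTimeMillis(), 60 + rand.nextGaussian() * 10))
      }
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}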