Sink literally means "to sink" or "drain". In Flink, a Sink is the operation that stores data, or, more broadly, the output operation that sends processed data to a designated external storage system. The print method we have been using all along is itself a kind of Sink.
Flink ships with a number of built-in Sinks; anything beyond those has to be implemented by the user.
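For orientation, a Sink is simply a SinkFunction handed to addSink. A minimal, illustrative sketch (the class name MyPrintSink is not part of the examples that follow) that does the same thing as print, namely write every element to stdout:

public class MyPrintSink<T> implements SinkFunction<T> {
    // invoke is called once for every element that flows into the sink
    @Override
    public void invoke(T value, Context context) throws Exception {
        System.out.println(value);
    }
}
// usage: stream.addSink(new MyPrintSink<>());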
1. KafkaSink
public class Flink01_KafkaSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.socketTextStream("localhost", 9999);

        // TODO Sink - kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
                "flink0923",
                new SimpleStringSchema(),
                properties
        );
        inputDS.addSink(kafkaSink);

        env.execute();
    }
}
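To check that records actually land in the topic, a small read-back snippet can be added to the same main method. This is a sketch under the assumption that the same brokers are reachable; the consumer group id below is illustrative:

// Read the topic back and print it, to verify the producer above works
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
consumerProps.setProperty("group.id", "flink0923-check");   // illustrative group id
FlinkKafkaConsumer<String> kafkaSource =
        new FlinkKafkaConsumer<>("flink0923", new SimpleStringSchema(), consumerProps);
env.addSource(kafkaSource).print();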
2. RedisSink
public class Flink02_RedisSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.socketTextStream("localhost", 9999);

        // TODO Sink - Redis
        FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop102")
                .setPort(6379)
                .build();
        RedisSink<String> redisSink = new RedisSink<>(
                flinkJedisPoolConfig,
                new RedisMapper<String>() {
                    @Override
                    public RedisCommandDescription getCommandDescription() {
                        // First argument: which Redis command to use
                        // Second argument: the outermost Redis key (here, the name of the hash)
                        return new RedisCommandDescription(RedisCommand.HSET, "flink0923");
                    }

                    // Extract the key from the data; for a hash structure this is the hash field
                    @Override
                    public String getKeyFromData(String data) {
                        return data.split(",")[1];
                    }

                    // Extract the value from the data; for a hash structure this is the field's value
                    @Override
                    public String getValueFromData(String data) {
                        return data.split(",")[2];
                    }
                }
        );
        inputDS.addSink(redisSink);

        env.execute();
    }
}
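For non-hash commands the additional (outer) key is not needed. A sketch of a mapper that writes each record as a plain top-level key with SET, assuming the same comma-separated input format:

// Alternative mapper using SET: every record becomes its own top-level key
RedisMapper<String> setMapper = new RedisMapper<String>() {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // SET needs no additional (outer) key
        return new RedisCommandDescription(RedisCommand.SET);
    }

    @Override
    public String getKeyFromData(String data) {
        return data.split(",")[1];   // the Redis key
    }

    @Override
    public String getValueFromData(String data) {
        return data.split(",")[2];   // the stored value
    }
};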
3. ElasticsearchSink
public class Flink03_EsSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.socketTextStream("hadoop102", 9999);

        // TODO Sink - Elasticsearch
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop102", 9200));
        httpHosts.add(new HttpHost("hadoop103", 9200));
        httpHosts.add(new HttpHost("hadoop104", 9200));
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        Map<String, String> dataMap = new HashMap<>();
                        dataMap.put("data", element);
                        // Build the IndexRequest with the ES client API
                        IndexRequest indexRequest = Requests.indexRequest("flink0923").type("dasfgdasf").source(dataMap);
                        indexer.add(indexRequest);
                    }
                }
        );

        // TODO For demo purposes only: flush after every record; do not set the bulk size to 1 in production
        esSinkBuilder.setBulkFlushMaxActions(1);

        inputDS.addSink(esSinkBuilder.build());

        env.execute();
    }
}
/*
ES 5.x: index -> database, type -> table
ES 6.x: each index may contain only one type, so an index can be treated as a table
ES 7.x: types are removed entirely
Inspecting indices over HTTP:
list all indices:   http://hadoop102:9200/_cat/indices?v
view index content: http://hadoop102:9200/flink0923/_search
*/
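For production use, the builder above exposes further bulk-flush settings besides setBulkFlushMaxActions. A sketch with more realistic, purely illustrative values (the exact setters available depend on the connector version):

// Flush in larger batches instead of per record (illustrative values)
esSinkBuilder.setBulkFlushMaxActions(1000);   // at most 1000 actions per bulk request
esSinkBuilder.setBulkFlushInterval(5000);     // or flush at least every 5 seconds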
4. Custom Sink: MyHbaseSink
This custom sink consists of two classes: 1) MyHbaseSink itself and 2) the HBaseUtil utility class. MyHbaseSink is built in four steps:
- Step 1: extend the RichSinkFunction<input data type> class
- Step 2: implement open to create the connection
- Step 3: implement invoke to write data to HBase in batches
- Step 4: implement close to close the connection
Step 1: extend the RichSinkFunction<input data type> class
public class MyHbaseSink extends RichSinkFunction<Tuple2<String, Double>> {
    // Configuration values must survive serialization of the function, so they are NOT transient
    private Integer maxSize = 1000;
    private Long delayTime = 5000L;

    public MyHbaseSink() {
    }

    public MyHbaseSink(Integer maxSize, Long delayTime) {
        this.maxSize = maxSize;
        this.delayTime = delayTime;
    }

    // Runtime state, created in open(), hence transient
    private transient Connection connection;
    private transient Long lastInvokeTime;
    private transient List<Put> puts;
Step 2: implement open to create the connection
    // Create the HBase connection
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Fetch the global job parameters and cast them to ParameterTool
        ParameterTool params =
                (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        // Create an HBase connection
        connection = HBaseUtil.getConnection(
                params.getRequired("hbase.zookeeper.quorum"),
                params.getInt("hbase.zookeeper.property.clientPort", 2181)
        );
        // Create the write buffer here: a transient field initialized at its declaration
        // would be null after the function is deserialized on the TaskManager
        puts = new ArrayList<>(maxSize);
        // Record the current system time
        lastInvokeTime = System.currentTimeMillis();
    }
Step 3: implement invoke to write data to HBase in batches
    @Override
    public void invoke(Tuple2<String, Double> value, Context context) throws Exception {
        String rk = value.f0;
        // Create a Put keyed by the row key
        Put put = new Put(rk.getBytes());
        // Add the cell: "f1" -> column family, "order" -> column qualifier, third argument -> cell value
        put.addColumn("f1".getBytes(), "order".getBytes(), value.f1.toString().getBytes());
        // Buffer the Put
        puts.add(put);
        // Use processing time to decide when to flush
        long currentTime = System.currentTimeMillis();
        // Flush the batch once it is full or the delay has elapsed
        if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {
            // Get the HBase table
            Table table = connection.getTable(TableName.valueOf("database:table"));
            // Batch write
            table.put(puts);
            puts.clear();
            lastInvokeTime = currentTime;
            table.close();
        }
    }
Step 4: implement close to close the connection
    @Override
    public void close() throws Exception {
        connection.close();
    }
}   // end of MyHbaseSink
2) HBaseUtil utility class
public class HBaseUtil {
    /**
     * @param zkQuorum ZooKeeper quorum; separate multiple hosts with commas
     * @param port     ZooKeeper client port
     * @return connection
     */
    public static Connection getConnection(String zkQuorum, int port) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkQuorum);
        conf.set("hbase.zookeeper.property.clientPort", port + "");
        Connection connection = ConnectionFactory.createConnection(conf);
        return connection;
    }
}
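Because open() reads the ZooKeeper address from the global job parameters, the job that uses this sink has to register them first. A minimal driver-side sketch; the class name, input format, batch size and delay are illustrative, not part of the sink itself:

public class MyHbaseSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // open() reads these via getGlobalJobParameters(), so register them here, e.g.
        // pass --hbase.zookeeper.quorum hadoop102,hadoop103,hadoop104 on the command line
        ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);

        env.socketTextStream("localhost", 9999)
                .map(line -> Tuple2.of(line.split(",")[0], Double.valueOf(line.split(",")[1])))
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE))
                .addSink(new MyHbaseSink(500, 3000L));  // flush every 500 Puts or at least every 3 s

        env.execute();
    }
}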
5. Custom Sink: MySQL
We now implement a custom Sink that writes to MySQL.
public class Flink04_MySink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.socketTextStream("hadoop102", 9999);
        SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                // Split the line
                String[] line = value.split(",");
                return new WaterSensor(line[0], Long.valueOf(line[1]), Integer.valueOf(line[2]));
            }
        });

        // TODO Sink - custom: MySQL
        sensorDS.addSink(new MySinkFunction());

        env.execute();
    }

    public static class MySinkFunction extends RichSinkFunction<WaterSensor> {
        Connection conn;
        PreparedStatement pstmt;

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test", "root", "000000");
            pstmt = conn.prepareStatement("insert into sensor values (?,?,?)");
        }

        @Override
        public void close() throws Exception {
            if (pstmt != null) {
                pstmt.close();
            }
            if (conn != null) {
                conn.close();
            }
        }

        @Override
        public void invoke(WaterSensor value, Context context) throws Exception {
            pstmt.setString(1, value.getId());
            pstmt.setLong(2, value.getTs());
            pstmt.setInt(3, value.getVc());
            pstmt.execute();
        }
    }
}
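Since Flink 1.11 the official flink-connector-jdbc dependency also ships a ready-made JdbcSink, so a hand-written RichSinkFunction is not strictly necessary. A sketch, assuming that connector and a MySQL driver are on the classpath (batch size and driver class below are illustrative):

// Equivalent sink using the built-in JDBC connector instead of a custom RichSinkFunction
sensorDS.addSink(JdbcSink.sink(
        "insert into sensor values (?, ?, ?)",
        (PreparedStatement ps, WaterSensor ws) -> {
            ps.setString(1, ws.getId());
            ps.setLong(2, ws.getTs());
            ps.setInt(3, ws.getVc());
        },
        JdbcExecutionOptions.builder().withBatchSize(100).build(),   // illustrative batch size
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://hadoop102:3306/test")
                .withDriverName("com.mysql.cj.jdbc.Driver")           // use com.mysql.jdbc.Driver for MySQL 5.x
                .withUsername("root")
                .withPassword("000000")
                .build()
));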