数据处理语义的概念
批处理:批处理任务失败,支持replay重跑就可 流处理:需要机制能够保证任务出错或数据出错时能够保证数据是正确有效的。
数据出现故障了(计算错误),还能进行恢复并重新计算,保证数据有效处理一次. 在计算时:比如1+2+3+4+,在+5的时候报错,恢复后重新从+5开始计算,所以我们需要把中间结果通过checkpoint存起来在写出端,出现写入错误,重写时会出现重复,需要幂等写入或事务机制
数据处理语义的分类
- At-least-Once 至少一次语义,允许重复
- At-Most-Once 至多一次语义,不允许重复,可以丢失
- Exactly-Once 精确一致性 ,数据被不多不少有效的处理一次
处理仅一次语义的方式(at least once)
-
at least once + 去重 -
at least once + 幂等
幂等:处理数据的次数和处理数据本身不影响。 比如 1 ^ N = 1, 幂等性实现如下2种 ①插入数据时,进行upsert操作,当前key存在就update更新 ,否则就插入 ② HBase、redis 根据 key插入时,不考虑版本,基于key,value只有一个
- 分布式快照 (checkpoint) + 事务机制(二段提交)
分布式快照: checkpoint将 读入source,处理 transformation,写出sink等中间结果都保存在checkpoint
事务机制: 预提交+正式提交(回滚)
事务机制(二段提交),先预提交到底层的预写文件中事务成功了,正式提交,提交失败了就回滚.目前,oracle0.11+,kafka,mysql都支持事务机制
端对端的仅一次实现
source
记录每次消费的位置,将 offset 保存到指定位置 checkpoint 中
transformation
算子的状态要保存到 checkpoint 中,定期保存,如果程序出现bug,从checkpoint中快速恢复,恢复到之前的最新一次快照。
sink
事务机制 Flink借鉴了数据库中的事务处理技术,同时结合自身的Checkpoint机制来保证Sink只对外部输出产生一次影响。
Flink 事务写提供两个方式(具体实现方式) 预写日志(Write-Ahead-Log,WAL)和 两阶段提交(Two-Phase-Commit,2PC) ; 两种方式区别: WAL方式通用性更强,适合几乎所有外部系统,但也不能提供百分百端到端的Exactly-Once,因为WAL预习日志会先写内存,而内存是易失介质。 如果外部系统自身就支持事务(比如MySQL、Kafka),可以使用2PC方式,可以提供百分百端到端的Exactly-Once。 事务写入缺点:事务写的方式能提供端到端的Exactly-Once一致性,它的代价也是非常明显的,就是牺牲了延迟。但输出数据不再是实时写入到外部系统,而 是分批次地提交。 幂等写入 幂等写操作是指:任意多次向一个系统写入数据,只对目标系统产生一次结果影响。 比如,重复向一个HashMap里插入同一个Key-Value二元对,第一次插入时这个HashMap发生变化,后续的插入操作不会改变HashMap的结果,这就是一个幂等写操作。 HBase、Redis和Cassandra这样的KV数据库一般经常用来作为Sink,用以实现端到端的Exactly-Once。 注意:并不是说一个KV数据库就百分百支持幂等写。幂等写对KV对有要求,那就是Key-Value必须是可确定性(Deterministic)计算的。假如我们设计的Key是:name+ curTimestamp,每次执行数据重发时,生成的Key都不相同,会产生多次结果,整个操作不是幂等的。因此,为了追求端到端的Exactly-Once,我们设计业务逻辑时要尽量使用确定性的计算逻辑和数据模型
Flink-kafka的端对端的仅一次语义
需求 从kafka中读取数据并写入到 kafka ,模拟数据出错,快速恢复并保证数据的端对端仅一次语义
需要外部系统支持事务机制 比如kafka 0.11+ ,普通数据库基本都支持事务机制
- Flink 处理数据到 kafka 或 数据库,需要 TwoPhaseCommitSinkFunction 二段提交,完成四个方法的重写
beginTransaction 开启事务,创建临时的文件,数据库的文件 precommit 将结果数据写入到这个临时文件 commit将临时文件移动到数据库中的文件中 abort 如果当前写入失败,直接删除临时文件 端对端的仅一次语义 Flink 实现方式 :checkpoint + 二段提交
代码实现步骤 1.checkpoint 2.支持事务机制,sink 保证 Semantic.Exactly-Once ,设置参数为 事务的超时时间
准备: linux 开启kafka的生产者和消费者
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic words_output
bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic words_input
详情代码
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.util.Properties;
import java.util.Random;
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
public class KafkaToKafkaExactlyOnce {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000);
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointStorage("file:///d:/chk-02");
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setCheckpointTimeout(1000 * 60);
config.setMaxConcurrentCheckpoints(1);
config.setTolerableCheckpointFailureNumber(10);
config.setMinPauseBetweenCheckpoints(500);
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2 * 1000));
Properties consumer_props = new Properties();
consumer_props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
consumer_props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "_consumer_word_input_");
consumer_props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumer_props.setProperty(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, 30 * 60 * 1000 + "");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(
"words_input",
new SimpleStringSchema(),
consumer_props
);
consumer.setStartFromLatest();
consumer.setCommitOffsetsOnCheckpoints(true);
DataStreamSource<String> source = env.addSource(consumer);
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapStream = source
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
Random rm = new Random();
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(",");
int idx = rm.nextInt(5);
System.out.println("当前随机值是:" + idx);
for (String word : words) {
if (idx > 3) {
throw new RuntimeException("除零错误,程序bug");
}
out.collect(Tuple2.of(word, 1));
}
}
});
SingleOutputStreamOperator<String> result = flatMapStream.keyBy(t -> t.f0)
.sum(1)
.map(new MapFunction<Tuple2<String, Integer>, String>() {
@Override
public String map(Tuple2<String, Integer> value) throws Exception {
return value.f0 + ":::" + value.f1;
}
});
Properties producer_props = new Properties();
producer_props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
producer_props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 1000 + "");
result.addSink(new FlinkKafkaProducer<String>(
"words_output",
new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
return new ProducerRecord(
"words_output",
element.getBytes()
);
}
},
producer_props,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
));
env.execute();
}
}
Flink-mysql实现端对端的仅一次语义(二段提交)
需求 从socket中读取数据将数据进行wordcount处理,使用二段提交的方式将数据保存到数据库中,如果出现错误,九江数据回滚到之前最新的latest的状态重新提交
代码步骤
准备
nc -lk 9999
create database mysql;
use mysql;
create table t_wordcount(
word char(10) not null,
counts tinyint not null
);
详情代码
public class SocketToMysqlExactlyOnce {
private static Logger Logger = LoggerFactory.getLogger(SocketToMysqlExactlyOnce.class.getSimpleName());
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000L);
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointStorage("file:///d:/chk-03");
config.setCheckpointTimeout(60000);
config.setMaxConcurrentCheckpoints(1);
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(500);
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStreamSource<String> source = env.socketTextStream("node1", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap(new FlatMapFunction<String,
Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
}).keyBy(t -> t.f0)
.sum(1);
result.addSink(new MyTwoPhaseCommitsink());
env.execute();
}
private static class MyTwoPhaseCommitsink extends TwoPhaseCommitSinkFunction<Tuple2<String, Integer>,
ConnectionState, Void> {
public MyTwoPhaseCommitsink() {
super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
}
public MyTwoPhaseCommitsink(TypeSerializer<ConnectionState> transactionSerializer,
TypeSerializer<Void> contextSerializer) {
super(transactionSerializer, contextSerializer);
}
@Override
protected ConnectionState beginTransaction() throws Exception {
Class.forName("com.mysql.jdbc.Driver");
Connection conn = DriverManager.getConnection(
"jdbc:mysql://node1:3306/mysql?useSSL=false&charactorEncoding=utf-8",
"root",
"123456"
);
conn.setAutoCommit(false);
ConnectionState connectionState = new ConnectionState(conn);
return connectionState;
}
@Override
protected void invoke(ConnectionState transaction, Tuple2<String, Integer> value,
Context context) throws Exception {
Connection conn = transaction.conn;
String sql = "insert into t_wordcount(word,counts) value(?,?) on duplicate key update counts=?";
PreparedStatement ps = conn.prepareStatement(sql);
ps.setString(1, value.f0);
ps.setInt(2, value.f1);
ps.setInt(3, value.f1);
ps.executeUpdate();
if (value.f0.equalsIgnoreCase("error")) {
throw new RuntimeException("程序出现 bug,请检查!");
}
}
@Override
protected void preCommit(ConnectionState connectionState) throws Exception {
Logger.info("当前程序被预提交");
}
@Override
protected void commit(ConnectionState transaction) {
Connection conn = transaction.conn;
try {
conn.commit();
if (!conn.isClosed()) {
conn.commit();
}
} catch (SQLException e) {
Logger.error("当前提交数据异常" + e.getSQLState() + e.getMessage());
}
}
@Override
protected void abort(ConnectionState transaction) {
Connection conn = transaction.conn;
try {
conn.rollback();
} catch (SQLException e) {
Logger.error("当前程序回滚!");
}finally {
try {
if (!conn.isClosed()){
conn.close();
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
}
*/
/*
static class ConnectionState {
private transient Connection conn;
public ConnectionState(Connection _conn) {
this.conn = _conn;
}
}
}
|