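Flink streaming programs often need to interact with external systems, and the external system's performance frequently becomes the bottleneck for the job's overall throughput. The usual fix is to raise the job's parallelism so that more requests reach the external system concurrently, but this adds resource-management overhead for Flink and leaves overall CPU utilization low.
For jobs that interact with external storage, throughput can instead be raised with Flink async I/O or with multiple threads inside a single-parallelism operator. Neither approach requires higher parallelism, so overall resource utilization improves.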
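1. Flink Async I/O
A Flink function normally talks to an external system synchronously: it sends a request, waits for the response, and only then continues. In many cases this waiting takes up most of the function's time, so when the external system becomes slow, the job's throughput drops sharply. Flink's async I/O mechanism lets the function send the next request without waiting for the previous result. Results come back asynchronously and are passed to the next operator as they arrive, which limits the impact of external-system latency on the job and improves both throughput and resource utilization.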
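The difference between the two request styles can be illustrated without Flink. The following is a minimal JDK-only sketch, not Flink code: `AsyncLookupSketch`, `lookup`, and the 50 ms latency are hypothetical stand-ins for a blocking call to an external system. The synchronous loop pays the latency once per request, while the asynchronous version issues all requests first and lets the waits overlap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncLookupSketch {
    static final long LATENCY_MS = 50; // assumed latency of the external system

    // Hypothetical blocking call to an external system.
    static String lookup(String key) {
        try {
            Thread.sleep(LATENCY_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "user-" + key;
    }

    // Synchronous style: each request waits for the previous response.
    static List<String> runSync(List<String> keys) {
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            out.add(lookup(key));
        }
        return out;
    }

    // Asynchronous style: issue every request first, then collect the responses in input order.
    static List<String> runAsync(List<String> keys, ExecutorService pool) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String key : keys) {
            pending.add(CompletableFuture.supplyAsync(() -> lookup(key), pool));
        }
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            out.add(future.join());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");
        ExecutorService pool = Executors.newFixedThreadPool(keys.size());
        try {
            long t0 = System.nanoTime();
            List<String> sync = runSync(keys);
            long t1 = System.nanoTime();
            List<String> async = runAsync(keys, pool);
            long t2 = System.nanoTime();
            System.out.println("sync  ms: " + TimeUnit.NANOSECONDS.toMillis(t1 - t0));
            System.out.println("async ms: " + TimeUnit.NANOSECONDS.toMillis(t2 - t1));
            System.out.println("same results: " + sync.equals(async));
        } finally {
            pool.shutdown();
        }
    }
}
```

In a Flink job the same overlap is provided by the async operator itself, which also bounds the number of in-flight requests and applies a timeout to each one.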
Example code, a Flink async function that queries HBase:
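import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Looks up the user for each input record in HBase without blocking the task thread.
// It can be wired into a job with, for example:
//   AsyncDataStream.unorderedWait(input, new AsyncHbase(), 10, TimeUnit.SECONDS, 100);
public class AsyncHbase extends RichAsyncFunction<String, String> {
    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private transient HbaseClient client;
    private transient ExecutorService executorService;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        client = new HbaseClient();
        client.init("hdp_teu_dpd:hdp_teu_dpd_flink_iotest");
        executorService = Executors.newFixedThreadPool(30);
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        // The blocking HBase call runs on the pool, so asyncInvoke returns immediately.
        executorService.submit(() -> {
            try {
                String imei = (String) MAPPER.readValue(input, HashMap.class).get("user_id");
                String user = imei == null ? "" : client.query(imei);
                resultFuture.complete(Collections.singletonList(user));
            } catch (Exception e) {
                // Always complete the future; otherwise the record only fails on timeout.
                resultFuture.completeExceptionally(new RuntimeException("get from hbase fail", e));
            }
        });
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (executorService != null) executorService.shutdown();
        if (client != null) client.close();
    }
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

// HTable is not thread-safe, but query() is called from 30 pool threads. This version
// shares one Connection and opens a lightweight Table per call (assumes HBase client 1.0+).
public class HbaseClient {
    private static final String ZK_CONNECT = "10.162.12.102:2181";
    private static final byte[] CF = Bytes.toBytes("cf");
    private static final byte[] USER = Bytes.toBytes("user");
    private Connection connection;
    private TableName tableName;

    public void init(String tableName) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
        this.connection = ConnectionFactory.createConnection(conf);
        this.tableName = TableName.valueOf(tableName);
    }

    // Returns the cf:user value of the row, or "" when the row or column is missing.
    // I/O errors are propagated so that the caller can fail the record explicitly.
    public String query(String id) throws IOException {
        Get get = new Get(Bytes.toBytes(id));
        get.addColumn(CF, USER);
        try (Table table = connection.getTable(tableName)) {
            String user = Bytes.toString(table.get(get).getValue(CF, USER));
            return user == null ? "" : user;
        }
    }

    public void close() throws IOException {
        if (connection != null) connection.close();
    }
}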
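Flink async I/O works best when the external system offers an asynchronous client. For systems without one, a thread pool can stand in for it, as in the example above, where the blocking HBase query runs on an executor.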
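2. Single Parallelism, Multiple Threads
Flink async I/O mainly targets reads from external systems. For writes, a similar effect can be achieved by running multiple writer threads inside the sink.
Example code: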
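import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SinkToMySQLMultiThread extends RichSinkFunction<Data> implements CheckpointedFunction {
    // static: an instance-level Logger would be serialized together with the function.
    private static final Logger LOG = LoggerFactory.getLogger(SinkToMySQLMultiThread.class);
    private static final int DEFAULT_CLIENT_THREAD_NUM = 5;
    private static final int DEFAULT_QUEUE_CAPACITY = 5000;
    private transient LinkedBlockingQueue<Data> bufferQueue;
    private transient CyclicBarrier clientBarrier;
    private transient ExecutorService clientPool;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.clientPool = Executors.newFixedThreadPool(DEFAULT_CLIENT_THREAD_NUM);
        this.bufferQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
        // Parties: the writer threads plus the task thread that calls snapshotState().
        this.clientBarrier = new CyclicBarrier(DEFAULT_CLIENT_THREAD_NUM + 1);
        for (int i = 0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {
            // One client per thread, so no JDBC connection or statement is shared.
            clientPool.execute(new MultiThreadConsumerClient(bufferQueue, clientBarrier));
        }
    }

    @Override
    public void invoke(Data value, Context context) throws Exception {
        // Blocks when the queue is full, which back-pressures the upstream operators.
        bufferQueue.put(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        LOG.info("snapshotState: waiting for all clients to flush");
        // Bounded wait (60 s is an arbitrary choice) so that a dead writer thread
        // fails the checkpoint instead of blocking it forever.
        clientBarrier.await(60, TimeUnit.SECONDS);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // No Flink-managed state: buffered records are flushed during each checkpoint.
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (clientPool != null) clientPool.shutdownNow();
    }
}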
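import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Writer thread: drains the shared queue into MySQL in small batches and flushes on checkpoint.
public class MultiThreadConsumerClient implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MultiThreadConsumerClient.class);
    // Connection settings are hard-coded for the demo; a real job would load them from configuration.
    private static final String JDBC_URL = "jdbc:mysql://nightfury-test.db.58dns.org:23832/dbwww58com_nightfury?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false";
    private static final String USERNAME = "nightfury_user";
    private static final String PASSWORD = "6d436b6156ebe38b";
    private static final String DRIVER = "com.mysql.jdbc.Driver";
    private static final String SQL = "insert into mutiflinktest(id,behavior,category_id,item_id) values(?,?,?,?)";
    private static final int BATCH_SIZE = 6; // same threshold as the original "batchSize > 5"

    private final LinkedBlockingQueue<Data> bufferQueue;
    private final CyclicBarrier barrier;

    public MultiThreadConsumerClient(LinkedBlockingQueue<Data> bufferQueue, CyclicBarrier barrier) {
        this.bufferQueue = bufferQueue;
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            Class.forName(DRIVER);
            // Connection and statement are locals, so nothing is shared between writer threads.
            try (Connection connection = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD);
                 PreparedStatement ps = connection.prepareStatement(SQL)) {
                int batchSize = 0;
                Data entity;
                while (true) {
                    entity = bufferQueue.poll(50, TimeUnit.MILLISECONDS);
                    if (entity != null) {
                        addToBatch(ps, entity);
                        if (++batchSize >= BATCH_SIZE) {
                            ps.executeBatch();
                            batchSize = 0;
                        }
                    } else if (barrier.getNumberWaiting() > 0) {
                        // A checkpoint is waiting, so invoke() is paused. Pick up records enqueued
                        // between the timed-out poll and this check, then flush and meet at the barrier.
                        while ((entity = bufferQueue.poll()) != null) {
                            addToBatch(ps, entity);
                        }
                        LOG.info("MultiThreadConsumerClient flushing, threads waiting: {}", barrier.getNumberWaiting());
                        ps.executeBatch();
                        batchSize = 0;
                        barrier.await();
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // the pool is shutting down
        } catch (Exception e) {
            LOG.error("MultiThreadConsumerClient failed, this writer thread stops", e);
        }
    }

    private static void addToBatch(PreparedStatement ps, Data entity) throws SQLException {
        ps.setInt(1, entity.getUser_id());
        ps.setString(2, entity.getBehavior());
        ps.setString(3, entity.getCategory_id());
        ps.setString(4, entity.getItem_id());
        ps.addBatch();
    }
}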
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class MultiThreadMysqlFlink {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism stays at 1; the sink's internal threads provide the write concurrency.
        env.setParallelism(1);
        // Checkpoint every 3 minutes; each checkpoint triggers a flush in the sink.
        env.enableCheckpointing(1000 * 60 * 3);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "xxx:9092");
        props.setProperty("zookeeper.connect", "xxxx:2181/58_kafka_cluster");
        props.setProperty("group.id", "flink-kafka_sync_v1");
        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("hdp_teu_dpd_flink", new SimpleStringSchema(), props);

        DataStream<Data> stream = env.addSource(consumer).map(new MapFunction<String, Data>() {
            @Override
            public Data map(String input) throws Exception {
                // Placeholder values: this demo does not parse the input record.
                Data data = new Data();
                data.setUser_id(1);
                data.setBehavior("");
                data.setCategory_id("");
                data.setItem_id("");
                return data;
            }
        });
        stream.addSink(new SinkToMySQLMultiThread());
        env.execute("MultiThreadMysqlFlink");
    }
}
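The checkpoint handshake in the sink above, where the task thread and the writer threads meet at a `CyclicBarrier`, is easier to follow in isolation. The following is a minimal JDK-only sketch under stated assumptions: `FlushBarrierSketch` and its methods are hypothetical names, the "database" is an in-memory list, and `checkpoint()` plays the role of `snapshotState()`. It is meant to illustrate the pattern, not to replace the sink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class FlushBarrierSketch {
    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(100);
    // Stand-in for the external store (hypothetical; the sink above writes to MySQL).
    private final List<Integer> written = Collections.synchronizedList(new ArrayList<>());
    private final CyclicBarrier barrier;
    private final ExecutorService workers;

    FlushBarrierSketch(int workerCount) {
        // Parties: every worker plus the thread that calls checkpoint().
        this.barrier = new CyclicBarrier(workerCount + 1);
        this.workers = Executors.newFixedThreadPool(workerCount);
        for (int i = 0; i < workerCount; i++) {
            workers.execute(this::drainLoop);
        }
    }

    private void drainLoop() {
        List<Integer> batch = new ArrayList<>(); // per-thread pending batch
        try {
            while (true) {
                Integer value = queue.poll(10, TimeUnit.MILLISECONDS);
                if (value != null) {
                    batch.add(value);
                } else if (barrier.getNumberWaiting() > 0) {
                    // The producer is parked at the barrier, so the queue can only shrink.
                    queue.drainTo(batch);
                    written.addAll(batch); // "flush"
                    batch.clear();
                    barrier.await();
                }
            }
        } catch (InterruptedException | BrokenBarrierException e) {
            // Interrupted by shutdown() or the barrier was broken: stop this worker.
        }
    }

    void put(int value) {
        try {
            queue.put(value); // blocks when the queue is full (back-pressure)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns only after every worker has flushed its pending batch.
    void checkpoint() {
        try {
            barrier.await(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("flush did not complete", e);
        }
    }

    List<Integer> written() {
        List<Integer> copy = new ArrayList<>(written);
        Collections.sort(copy);
        return copy;
    }

    void shutdown() {
        workers.shutdownNow();
    }

    public static void main(String[] args) {
        FlushBarrierSketch sketch = new FlushBarrierSketch(3);
        try {
            for (int i = 0; i < 50; i++) {
                sketch.put(i);
            }
            sketch.checkpoint();
            System.out.println("records flushed at checkpoint: " + sketch.written().size());
        } finally {
            sketch.shutdown();
        }
    }
}
```

Because each worker arrives at the barrier only after it has flushed, the barrier cannot trip while any batch is still pending. This is what lets the checkpoint cover everything handed to the sink so far.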