IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink与外部存储交互优化方案 -> 正文阅读

[大数据]Flink与外部存储交互优化方案

在Flink流式程序设计中,经常需要与外部系统进行交互,很多时候外部系统的性能会成为任务整体吞吐的瓶颈,通常的解决方案会通过提高任务并发度增加对外部系统并发访问,如此会带来Flink额外的资源管理负载以及整体cpu利用率不高的问题。

对于Flink与外部存储交互的场景,可以通过Flink 异步IO和单并发度多线程的机制提高任务吞吐能力,而不需要提高任务并发度从而提升整体资源利用率。

一 Flink异步IO

对于Flink程序,通常的交互实现为同步请求,即发送一个请求,直到收到响应,继续处理,很多情况下这种等待占据了函数的绝大多数时间,当外部系统出现性能瓶颈会大幅降低任务的吞吐能力。Flink提供了异步IO机制,可以实现发送请求以后,不用等待结果返回继续发送下一个请求,对于查询结果是异步返回的,返回结果之后会自动进入下一个算子的计算,从而避免外部系统性能对整个计算任务的影响,可以提高整体吞吐和资源利用率。

1604913170230-792.png
示例代码:

public class AsyncHbase  extends RichAsyncFunction<String, String> {

    private transient HbaseClient client;
    private transient ExecutorService executorService;
    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
        try {
            executorService.submit(() -> {
                // submit query
                ObjectMapper mapper=new ObjectMapper();
                String imei = null;
                try {
                    imei = (String)mapper.readValue(input, HashMap.class).get("user_id");
                } catch (IOException e) {
                    e.printStackTrace();
                }

                String user = client.query(imei);
                //System.out.println("----------"+user);
                resultFuture.complete(Collections.singletonList(user));
            });
        } catch (Exception e) {
            //log.error("get from redis fail", e);
            throw new RuntimeException("get from mysql fail", e);
        }

    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        client = new HbaseClient();
        client.init("hdp_teu_dpd:hdp_teu_dpd_flink_iotest");
        //线程池大小
        executorService = Executors.newFixedThreadPool(30);
    }
    //异步客户端

    @Override
    public void close() throws Exception {
        super.close();
        executorService.shutdown();
    }
}

public class HbaseClient {

    private static Configuration conf =null;
    private static final String ZKconnect="10.162.12.102:2181";

    public void init (String tableName) throws IOException {
        conf= HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZKconnect);
        htable=new HTable(conf, Bytes.toBytes(tableName));
    }

    private HTable htable;
    public String query(String id) {
        String user = "";
        try {
            Get get=new Get(Bytes.toBytes(id));
            get.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("user"));

            Result result=htable.get(get);
            byte[] resByte = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("user"));
            user =  Bytes.toString(resByte);
            //flink record不能为null
            if(user == null){
                user = "";
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return user;

    }

}

Flink 异步IO的实现需要外部系统支持异步client,对于不支持异步client的系统,可以采用多线程机制替代实现。

二 单并发度多线程

Flink 异步IO主要针对从外部系统读取数据,对于写数据的场景,可以在Sink端实现多线程的方式

1604923449196-800.png

示例代码:

public class SinkToMySQLMultiThread extends RichSinkFunction<Data> implements CheckpointedFunction {

    private Logger LOG = LoggerFactory.getLogger(SinkToMySQLMultiThread.class);

    // Client 线程的默认数量
    private final int DEFAULT_CLIENT_THREAD_NUM = 5;
    // 数据缓冲队列的默认容量
    private final int DEFAULT_QUEUE_CAPACITY = 5000;

    private LinkedBlockingQueue<Data> bufferQueue;
    private CyclicBarrier clientBarrier;


    /**
     * open()
     */
    @Override
    public void open(Configuration parameters) throws Exception {

        super.open(parameters);

        // new 一个容量为 DEFAULT_CLIENT_THREAD_NUM 的线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        // new 一个容量为 DEFAULT_QUEUE_CAPACITY 的数据缓冲队列
        this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);

        // barrier 需要拦截 (DEFAULT_CLIENT_THREAD_NUM + 1) 个线程
        this.clientBarrier = new CyclicBarrier(DEFAULT_CLIENT_THREAD_NUM + 1);

        // 创建并开启消费者线程
        MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue, clientBarrier);
        for (int i = 0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {
            threadPoolExecutor.execute(consumerClient);
        }

    }


    @Override
    public void invoke(Data value, Context context) throws Exception {
        // 往 bufferQueue 的队尾添加数据
        bufferQueue.put(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        LOG.info("snapshotState : 所有的 client 准备 flush !!!");
        // barrier 开始等待
        clientBarrier.await();
    }

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

}
public class MultiThreadConsumerClient implements Runnable {

    private Logger LOG = LoggerFactory.getLogger(MultiThreadConsumerClient.class);
    private static String jdbcUrl = "jdbc:mysql://nightfury-test.db.58dns.org:23832/dbwww58com_nightfury?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false";
    private static String username = "nightfury_user";
    private static String password = "6d436b6156ebe38b";
    private static String driver = "com.mysql.jdbc.Driver";

    private LinkedBlockingQueue<Data> bufferQueue;
    private CyclicBarrier barrier;

    private Connection connection = null;
    private PreparedStatement ps = null;

    public MultiThreadConsumerClient(
            LinkedBlockingQueue<Data> bufferQueue, CyclicBarrier barrier) {
        this.bufferQueue = bufferQueue;
        this.barrier = barrier;
    }

    @Override
    public void run() {

        try {
            //1.加载驱动
            Class.forName(driver);
            //2.创建连接
            connection = DriverManager.getConnection(jdbcUrl, username, password);
            String sql = "insert into mutiflinktest(id,behavior,category_id,item_id)values(?,?,?,?);";
            //3.获得执行语句
            ps = connection.prepareStatement(sql);

            int batchSize = 0;

            Data entity;
            while (true) {

                // 从 bufferQueue 的队首消费数据,并设置 timeout
                entity = bufferQueue.poll(50, TimeUnit.MILLISECONDS);
                // entity != null 表示 bufferQueue 有数据
                if (entity != null) {

                    System.out.println(Thread.currentThread().getName());
                    System.out.println(batchSize);

                    // 执行 client 消费数据的逻辑
                    dobatch(entity);

                    batchSize++;

                    if (batchSize > 5) {
                        ps.executeBatch();
                        batchSize = 0;
                    }

                } else {
                    // entity == null 表示 bufferQueue 中已经没有数据了,
                    // 且 barrier wait 大于 0 表示当前正在执行 Checkpoint,
                    // client 需要执行 flush,保证 Checkpoint 之前的数据都消费完成
                    //System.out.println(barrier.getNumberWaiting());
                    if (barrier.getNumberWaiting() > 0) {
                        LOG.info("MultiThreadConsumerClient 执行 flush, " + "当前 wait 的线程数:" + barrier.getNumberWaiting());

                        // client 执行 flush 操作,防止丢数据
                        ps.executeBatch();
                        batchSize = 0;

                        barrier.await();
                    }
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // client 消费数据的逻辑
    private void dobatch(Data entity) throws Exception {

        //4.批量插入
        ps.setInt(1, entity.getUser_id());
        ps.setString(2, entity.getBehavior());
        ps.setString(3, entity.getCategory_id());
        ps.setString(4, entity.getItem_id());
        ps.addBatch();

    }


public class MultiThreadMysqlFlink {

    public static void main(String args[]) throws Exception{
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "xxx:9092");
        props.setProperty("zookeeper.connect", "xxxx:2181/58_kafka_cluster");
        props.setProperty("group.id", "flink-kafka_sync_v1");
        env.setParallelism(1);
        env.enableCheckpointing(1000*60*3);
        FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011("hdp_teu_dpd_flink", new SimpleStringSchema(), props);
        DataStream<Data> stream = env.addSource(consumer).map(new RichMapFunction<String, Data>() {
            @Override
            public Data map(String input) throws Exception {
                Data data = new Data();
         /*       int user_id = JSON.parseObject(input).getInteger("user_id");
                String item_id = JSON.parseObject(input).getString("behavior");;
                String category_id = JSON.parseObject(input).getString("category_id");;
                String behavior = JSON.parseObject(input).getString("item_id"); */
                int user_id = 1;
                String item_id ="";
                String category_id = "";
                String behavior = "" ;

                data.setUser_id(user_id);
                data.setBehavior(behavior);
                data.setCategory_id(category_id);
                data.setItem_id(item_id);
                return data;
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);

            }
        });

        stream.addSink(new SinkToMySQLMultiThread());
        // resultStream.print();
        env.execute("MultiThreadMysqlFlink");
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-07 13:47:24  更:2022-02-07 13:48:19 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 0:01:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码