IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 初识Flink -> 正文阅读

[大数据]初识Flink

Flink

批流一体
支持数据类型
编程模型 *
DataStream ****
Data source
内置
对接第三方
自定义
Transformation
sink
内置
第三方
自定义
时间 & 窗口 & WM *****
Connector
State 状态管理
Table API & SQL 1.11 和1.10 系列 完全不一样
CEP
项目: 搞一个 成数据接入到后面 全是实时的
在这里插入图片描述

1. Flink 初用

1.1 flink 批处理

        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.11.2</flink.version>
        
    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
        </dependency>
    </dependencies>
    
package com.hpznyf.flink.basic

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object BatchWCApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("data/wc.txt")

    text.flatMap(_.split(",")).map((_,1)).groupBy(0).sum(1).print()

  }
}

1.2 flink 流处理

package com.hpznyf.flink.basic

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object StreamingWCApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.socketTextStream("hadoop001", 9527)

    text.flatMap(_.split(",")).map((_,1)).keyBy(_._1).sum(1).print()

    env.execute(getClass.getCanonicalName)
  }
}

[root@hadoop001 ~]# nc -l -p 9527
ruoze,ruoze,ruoze
pk,pk,pk

在这里插入图片描述
思考,这里面有没有状态?
肯定有啊!!!
前面有固定的分区号~

如果设置并行度

setParallelism(1)

那么就没有并行度,大家都在一个上面

1.3 java版本批处理

package com.ruozedata.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWCApp {
    public static void main(String[] args) throws  Exception{
        final ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        executionEnvironment.readTextFile("data/wc.txt")
                .flatMap(new RuozedataFlatMapFunction())
                .map(new RuozedataMapFunction())
                .groupBy(0)
                .sum(1)
                .print();
    }
}
class RuozedataFlatMapFunction implements FlatMapFunction<String,String>{

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] splits = value.split(",");
        for(String word : splits){
            out.collect(word);
        }
    }
}

class RuozedataMapFunction implements MapFunction<String, Tuple2<String,Integer>>{

    @Override
    public Tuple2<String, Integer> map(String value) throws Exception {
        return new Tuple2(value, 1);
    }
}

1.4 java版本流处理

package com.ruozedata.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWCApp {
    public static void main(String[] args) throws Exception{
        final StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        executionEnvironment.socketTextStream("hadoop001", 9527)
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        String[] split = value.split(",");
                        for(String word : split){
                            out.collect(word);
                        }
                    }
                }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2(value, 1);
            }
        }).keyBy(0).sum(1).print();

        executionEnvironment.execute(StreamWCApp.class.getCanonicalName());

    }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-10 13:29:13  更:2021-08-10 13:31:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 21:15:53-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码