IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Waterdrop总结 -> 正文阅读

[大数据]Waterdrop总结

目录

1、简介

使用场景

Waterdrop 的特性

Waterdrop 的工作流程

核心概念

安装

Waterdrop 支持的插件(v1.x)

环境依赖

2、功能测试

任务提交方式

从Hive表导入数据

从PostgreSQL导入数据

Waterdrop 读写hive需要注意的地方

3、v1.x 与 v2.x 区别

4、参考链接


1、简介

Waterdrop 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。

大大简化分布式数据处理难度外,Waterdrop尽所能为您解决可能遇到的问题:

  • 数据丢失与重复

  • 任务堆积与延迟

  • 吞吐量低

  • 应用到生产环境周期长

  • 缺少应用运行状态监控

使用场景

  • 海量数据ETL

  • 海量数据聚合

  • 多源数据处理

Waterdrop 的特性

  • 简单易用,灵活配置,无需开发

  • 实时流式处理

  • 高性能

  • 海量数据处理能力

  • 模块化和插件化,易于扩展

  • 支持利用SQL做数据处理和聚合

  • Spark Structured Streaming

  • 支持Spark 2.x

Waterdrop 的工作流程

? Input[数据源输入] -> Filter[数据处理]...Filter[数据处理] -> Output[结果输出]
  • 多个Filter构建了数据处理的Pipeline,满足各种各样的数据处理需求。

  • 也可以直接通过SQL构建数据处理的Pipeline,简单高效。

  • 您也可以开发自己的数据处理插件,整个系统是易于扩展的。

核心概念

  • Row 是Waterdrop逻辑意义上一条数据,是数据处理的基本单位。在Filter处理数据时,所有的数据都会被映射为Row。

  • Field 是Row的一个字段。Row可以包含嵌套层级的字段。

  • raw_message 指的是从input输入的数据在Row中的raw_message字段。

  • root 指的是Row的最顶级的字段相同的字段层级,常用于指定数据处理过程中生成的新字段在Row中的存储位置(top level field)。

安装

spark >= 2.3 下载 waterdrop-1.5.1.zip, spark < 2.3 下载waterdrop-1.5.1-with-spark.zip

?> wget https://github.com/InterestingLab/waterdrop/releases/download/v1.5.1/waterdrop-1.5.1.zip
??
?> unzip waterdrop-1.5.1.zip
??
?> ln -s waterdrop-1.5.1.zip waterdrop

Waterdrop 支持的插件(v1.x)

  • Input/Source plugin

    • Alluxio

    • ElasticSearch

    • FakeStream

    • File

    • FileStream

    • Hdfs

    • HdfsStream

    • Jdbc

    • KafkaStream

    • Kudu

    • MongoDB

    • MySQL

    • Hive

    • S3Stream

    • SocketStream

    • RedisStream

    • Redis

    • Tidb

    • 自行开发的Input plugin

  • Filter/Transform plugin

    相当于Spark的转换算子操作

    • Add

    • Checksum

    • Convert

    • Date

    • Drop

    • Grok

    • Join

    • Json

    • Kv

    • Lowercase

    • Remove

    • Rename

    • Repartition

    • Replace

    • Sample

    • Script

    • Split

    • SQL

    • Table

    • Truncate

    • Uppercase

    • Urlencode

    • Urldecode

    • Uuid

    • Watermark

    • 自行开发的Filter plugin

  • Output/Sink plugin

    • Alluxio

    • Clickhouse

    • Elasticsearch

    • File

    • Hdfs

    • Jdbc

    • Kafka

    • Kudu

    • MongoDB

    • MySQL

    • Opentsdb

    • S3

    • Stdout

    • Tidb

    • 自行开发的Output plugin

Input插件通用参数

  • result_table_name

    不指定 result_table_name时 ,此插件处理后的数据,不会被注册为一个可供其他插件直接访问的数据集(dataset),或者被称为临时表(table);

    指定 result_table_name 时,此插件处理后的数据,会被注册为一个可供其他插件直接访问的数据集(dataset),或者被称为临时表(table)。此处注册的数据集(dataset),其他插件可通过指定 source_table_name 来直接访问。

    如果希望对Input插件数据集进行清洗过滤操作,并且会被Filter插件访问到,则需要指明此属性。相当于注册临时表。

环境依赖

  1. java运行环境,java >= 8

  2. 如果您要在集群环境中运行Waterdrop,那么需要以下Spark集群环境的任意一种:

  • Spark on Yarn

  • Spark Standalone

  • Spark on Mesos

功能验证,可以仅使用local模式启动,无需集群环境

2、功能测试

任务提交方式

  • local 方式

    local、local[K]、local[*]

    ?# local提交,一般用于本地测试
    ?./bin/start-waterdrop.sh --master local[*] --deploy-mode client --config ./config/streaming.conf
  • yarn-client 方式

    ?./bin/start-waterdrop.sh --master yarn --deploy-mode client --config ./config/streaming.conf
  • yarn-cluster 方式

    ?./bin/start-waterdrop.sh --master yarn --deploy-mode cluster --config ./config/batch_pg.conf

    区别:yarn-cluster执行效率比yarn-client高

从Hive表导入数据

?spark {
?  spark.app.name = "Waterdrop"
?  spark.executor.instances = 2
?  spark.executor.cores = 1
?  spark.executor.memory = "1g"
?  spark.sql.catalogImplementation = "hive"
?}
??
?input {
?  hive {
? ?  pre_sql = "select * from dwd.test_user_info"
? ?  result_table_name = "test_table_1"
?  }
?}
??
?filter {
??
?}
??
??
?output {
?  hdfs {
? ?  path = "hdfs://namenode:8020/warehouse/tablespace/managed/hive/dwd.db/test_user_info_1"
? ?  save_mode = "overwrite"
? ?  format = "orc"
?  }
?}

从PostgreSQL导入数据

?spark {
?  spark.app.name = "Waterdrop"
?  spark.executor.instances = 2
?  spark.executor.cores = 1
?  spark.executor.memory = "1g"
?  spark.sql.catalogImplementation = "hive"
?}
??
?input {
??
?  jdbc {
? ?  driver = "org.postgresql.Driver"
? ?  url = "jdbc:postgresql://pg_hostname:1921/yunpos"
? ?  table = "(select id,activation_code,groups,begin_date,end_date,is_used,period,cast(remark as varchar) remark,create_by,create_at,revise_by,revise_at from pos.activationcodes where revise_at >= '2020-12-01 00:00:00' and revise_at < '2021-01-01 00:00:00') AS tab"
? ?  result_table_name = "pos_activationcodes"
? ?  user = "username"
? ?  password = "password"
?  }
?}
??
?filter {
??
?}
??
?output {
?  hdfs {
? ?  path = "hdfs://namenode:8020/warehouse/tablespace/managed/hive/dwd.db/dwd_yunpos_pos_activationcodes/ymd=20201207"
? ?  save_mode = "append"
? ?  format = "orc"
?  }
?}

说明:

  • 需要将jdbc驱动jar包放置到指定的目录下

    ?> cd waterdrop
    ?> mkdir -p plugins/my_plugins/lib
    ?> cp xxx-jdbc.jar plugins/my_plugins/lib

Waterdrop 读写hive需要注意的地方

  1. 关闭hive事务功能,默认hive的事务功能是关闭的

  2. 配置Ambari Spark,将metastore.catalog.default=hive,默认是spark

  3. 配置Waterdrop spark config

    ?# Waterdrop 配置文件中的spark section中:
    ?spark {
    ?  ...
    ?  spark.sql.catalogImplementation = "hive"
    ?  ...
    ?}

3、v1.x 与 v2.x 区别

v1.x VS v2.x

-v1.xv2.x
支持SparkYesYes
开发Spark插件YesYes
支持FlinkNoYes
开发Flink插件NoYes
支持的Waterdrop运行模式local, Spark Standalone Cluster, on Yarn, on k8slocal, Spark/Flink Standalone Cluster, on Yarn, on k8s
支持SQL计算YesYes
配置文件动态变量替换YesYes
项目代码编译方式sbt(下载依赖很困难,我们正式放弃sbt)maven
主要编程语言scalajava

Waterdrop v1.x 与 v2.x 还有一个很大的区别,就是配置文件中,input改名为source, filter改名为transform, output改名为sink,如下:

# v1.x 的配置文件:

spark {}

input {}

filter {}

output {}

# v2.x 的配置文件:

env {} # spark -> env

source {} # input -> source

transform {} # filter -> transform

sink {} # output -> sink

4、参考链接

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-07 12:09:17  更:2021-08-07 12:10:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 15:32:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码