目录
1、简介
使用场景
Waterdrop 的特性
Waterdrop 的工作流程
核心概念
安装
Waterdrop 支持的插件(v1.x)
环境依赖
2、功能测试
任务提交方式
从Hive表导入数据
从PostgreSQL导入数据
Waterdrop 读写hive需要注意的地方
3、v1.x 与 v2.x 区别
4、参考链接
1、简介
Waterdrop 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。
大大简化分布式数据处理难度外,Waterdrop尽所能为您解决可能遇到的问题:
-
数据丢失与重复 -
任务堆积与延迟 -
吞吐量低 -
应用到生产环境周期长 -
缺少应用运行状态监控
使用场景
Waterdrop 的特性
Waterdrop 的工作流程

? Input[数据源输入] -> Filter[数据处理]...Filter[数据处理] -> Output[结果输出]
-
多个Filter构建了数据处理的Pipeline,满足各种各样的数据处理需求。 -
也可以直接通过SQL构建数据处理的Pipeline,简单高效。 -
您也可以开发自己的数据处理插件,整个系统是易于扩展的。
核心概念
-
Row 是Waterdrop逻辑意义上一条数据,是数据处理的基本单位。在Filter处理数据时,所有的数据都会被映射为Row。 -
Field 是Row的一个字段。Row可以包含嵌套层级的字段。 -
raw_message 指的是从input输入的数据在Row中的raw_message 字段。 -
root 指的是Row的最顶级的字段相同的字段层级,常用于指定数据处理过程中生成的新字段在Row中的存储位置(top level field)。
安装
spark >= 2.3 下载 waterdrop-1.5.1.zip, spark < 2.3 下载waterdrop-1.5.1-with-spark.zip

?> wget https://github.com/InterestingLab/waterdrop/releases/download/v1.5.1/waterdrop-1.5.1.zip
??
?> unzip waterdrop-1.5.1.zip
??
?> ln -s waterdrop-1.5.1.zip waterdrop
Waterdrop 支持的插件(v1.x)
-
Input/Source plugin
-
Alluxio -
ElasticSearch -
FakeStream -
File -
FileStream -
Hdfs -
HdfsStream -
Jdbc -
KafkaStream -
Kudu -
MongoDB -
MySQL -
Hive -
S3Stream -
SocketStream -
RedisStream -
Redis -
Tidb -
自行开发的Input plugin
-
Filter/Transform plugin
相当于Spark的转换算子操作
-
Add -
Checksum -
Convert -
Date -
Drop -
Grok -
Join -
Json -
Kv -
Lowercase -
Remove -
Rename -
Repartition -
Replace -
Sample -
Script -
Split -
SQL -
Table -
Truncate -
Uppercase -
Urlencode -
Urldecode -
Uuid -
Watermark -
自行开发的Filter plugin
-
Output/Sink plugin
-
Alluxio -
Clickhouse -
Elasticsearch -
File -
Hdfs -
Jdbc -
Kafka -
Kudu -
MongoDB -
MySQL -
Opentsdb -
S3 -
Stdout -
Tidb -
自行开发的Output plugin
Input插件通用参数
-
result_table_name 不指定 result_table_name时 ,此插件处理后的数据,不会被注册为一个可供其他插件直接访问的数据集(dataset),或者被称为临时表(table); 指定 result_table_name 时,此插件处理后的数据,会被注册为一个可供其他插件直接访问的数据集(dataset),或者被称为临时表(table)。此处注册的数据集(dataset),其他插件可通过指定 source_table_name 来直接访问。 如果希望对Input插件数据集进行清洗过滤操作,并且会被Filter插件访问到,则需要指明此属性。相当于注册临时表。
环境依赖
-
java运行环境,java >= 8 -
如果您要在集群环境中运行Waterdrop,那么需要以下Spark集群环境的任意一种:
-
Spark on Yarn -
Spark Standalone -
Spark on Mesos
功能验证,可以仅使用local模式启动,无需集群环境
2、功能测试
任务提交方式
-
local 方式 local、local[K]、local[*] ?# local提交,一般用于本地测试
?./bin/start-waterdrop.sh --master local[*] --deploy-mode client --config ./config/streaming.conf -
yarn-client 方式 ?./bin/start-waterdrop.sh --master yarn --deploy-mode client --config ./config/streaming.conf -
yarn-cluster 方式 ?./bin/start-waterdrop.sh --master yarn --deploy-mode cluster --config ./config/batch_pg.conf
区别:yarn-cluster执行效率比yarn-client高
从Hive表导入数据
?spark {
? spark.app.name = "Waterdrop"
? spark.executor.instances = 2
? spark.executor.cores = 1
? spark.executor.memory = "1g"
? spark.sql.catalogImplementation = "hive"
?}
??
?input {
? hive {
? ? pre_sql = "select * from dwd.test_user_info"
? ? result_table_name = "test_table_1"
? }
?}
??
?filter {
??
?}
??
??
?output {
? hdfs {
? ? path = "hdfs://namenode:8020/warehouse/tablespace/managed/hive/dwd.db/test_user_info_1"
? ? save_mode = "overwrite"
? ? format = "orc"
? }
?}
从PostgreSQL导入数据
?spark {
? spark.app.name = "Waterdrop"
? spark.executor.instances = 2
? spark.executor.cores = 1
? spark.executor.memory = "1g"
? spark.sql.catalogImplementation = "hive"
?}
??
?input {
??
? jdbc {
? ? driver = "org.postgresql.Driver"
? ? url = "jdbc:postgresql://pg_hostname:1921/yunpos"
? ? table = "(select id,activation_code,groups,begin_date,end_date,is_used,period,cast(remark as varchar) remark,create_by,create_at,revise_by,revise_at from pos.activationcodes where revise_at >= '2020-12-01 00:00:00' and revise_at < '2021-01-01 00:00:00') AS tab"
? ? result_table_name = "pos_activationcodes"
? ? user = "username"
? ? password = "password"
? }
?}
??
?filter {
??
?}
??
?output {
? hdfs {
? ? path = "hdfs://namenode:8020/warehouse/tablespace/managed/hive/dwd.db/dwd_yunpos_pos_activationcodes/ymd=20201207"
? ? save_mode = "append"
? ? format = "orc"
? }
?}
说明:
Waterdrop 读写hive需要注意的地方
-
关闭hive事务功能,默认hive的事务功能是关闭的 -
配置Ambari Spark,将metastore.catalog.default=hive,默认是spark  -
配置Waterdrop spark config ?# Waterdrop 配置文件中的spark section中:
?spark {
? ...
? spark.sql.catalogImplementation = "hive"
? ...
?}
3、v1.x 与 v2.x 区别
v1.x VS v2.x
- | v1.x | v2.x |
---|
支持Spark | Yes | Yes | 开发Spark插件 | Yes | Yes | 支持Flink | No | Yes | 开发Flink插件 | No | Yes | 支持的Waterdrop运行模式 | local, Spark Standalone Cluster, on Yarn, on k8s | local, Spark/Flink Standalone Cluster, on Yarn, on k8s | 支持SQL计算 | Yes | Yes | 配置文件动态变量替换 | Yes | Yes | 项目代码编译方式 | sbt(下载依赖很困难,我们正式放弃sbt) | maven | 主要编程语言 | scala | java |
Waterdrop v1.x 与 v2.x 还有一个很大的区别,就是配置文件中,input改名为source, filter改名为transform, output改名为sink,如下:
# v1.x 的配置文件:
spark {}
input {}
filter {}
output {}
# v2.x 的配置文件:
env {} # spark -> env
source {} # input -> source
transform {} # filter -> transform
sink {} # output -> sink
4、参考链接
|