IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> airflow调度框架 -> 正文阅读

[大数据]airflow调度框架

1.认识大数据

1.1、什么是大数据

volume(数据量大):GB-TB-PB-ZB—差别1024倍,数据呈现指数级别增长。
velocity(速度快):数据处理速度快,数据增长速度快—第一代MR、第二代Hive、第三代lmpala和Spark、第四代Flink。
variety(数据种类多):结构化、半结构化、非结构化—非结构化的图像、文本,半结构化的Json及XML。
Value(价值密度低):对数据进行价值化提取—在大数据基础上对数据进行价值化提取。

1.2、大数据分析应用场景

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5yN7mJos-1625821974503)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210707155533805.png)]

2.任务调度相关概念

2.1、什么是任务调度

2.1.1、任务调度:实现执行程序的、规范化、自动化、可视化、集中化、统一调度和监控,让所有任务有序、高效运行,降低开发和运维成本。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fmCz5Ghr-1625821974505)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210707155223745.png)]

2.1.2、分布式任务调度:任务的分布式处理,多台服务器同时处理任务的调度和监控,体现分布式思想特点:主从节点、容错、负载均衡、高可用。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AGcXTGue-1625821974506)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210707155041846.png)]

2.2、任务调度应用场景

2.2.1、流行且开源的分布式任务调度框架。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OdxgGnv9-1625821974507)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210706143136562.png)]

2.2.2、国内开源的分布式任务调度框架。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mE92PWXn-1625821974508)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210706153025752.png)]

2.2.3、国内国外非开源的分布式任务调度框架产品。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5ti3CLsi-1625821974509)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210707151314468.png)]

2.2.4、任务调度关键词

WorkFlow:工作流
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0T6qF21H-1625821974510)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210707154756615.png)]
工作流:业务过程的部分或整体在计算机应用环境下的自动化。
目的:为了实现某个业务目标,利用计算机软件在多个参与者之间按某种预定规则自动传递文档、信息活着任务。
Coordinator:协调器
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nvFO8ZyI-1625821974511)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210707154627017.png)]
协调器:组织多个job。
Bundle:批处理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wG1bUINX-1625821974512)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210707162832504.png)]

批处理:管理多个Coordinator。

3.初始airflow

3.1、airflow任务调度相关概念

3.1.1、初始airflow

Airflow是一款用编程方式,编写、安排、监控工作流的任务调度工具
原生语言为python

3.1.2、Airflow任务动态图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jr4kgTef-1625821974512)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210707165131700.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uoTBfKjz-1625821974513)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210707165207329.png)]

3.1.3、Airflow基础架构

Airflow基础体系结构应用与开发程序,有多种方式执行程序。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GM3a9ty6-1625821974515)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210707171532077.png)]
元数据数据库:Airflow使用SQL数据库存储有关正在运行的数据管道的元数据。再上图中,它表示为Postgres,它在airflow中非常流行。airflow支持的备用数据库包括mysql。
Web服务器和调度程序:airflow web服务器和调度程序是在本地计算机上运行并与原数据库进行交互的独立进程。
执行器:单独显示,实际上executor不是一个单独的过程,而是scheduler中运行。
工作节点:单独流程,与airflow体系结构的其他组结和元数据存储库进行交互。
airflow.cfg:是airflow配置文件,可通过web server,scheduler和workers访问。
DAG:是指包含python代码的DAG文件,代表要由airflow运行的数据管道。这些文件的位置在airflow配置文件中指定,但是web服务器,调度程序和工作程序需要访问他们。

4.airflow初体验

4.1、如何进行airflow任务调度

4.1.1、airflow调度shell任务实现
from datetime import tiimedelta
from airflow import DAG
from airflow.operatros.bash import BashOperator
from airflow.utils.dates import days_ago

args = {
	'owner':'airflow',
	'email':['itcast@itcast.cn'],
	'email_on_failure':True,
	'email_on_retry':True,
	'retries':1,
	'retry_detay':timedelta(minutes=5)
}

dag = DAG(
	DAG_id='first_bash_operator_new',
	default_args=args,
	start_date=days_ago(2),
	dagrun_tiimeout=timedelta(minutes=5)
)

#[START howto-operator-bash]
run_this = BashOperator(
	task_id='echo_first_bash',
	bash_command='date +"%F %T" >>/root/first_bash/operator_new.log',
	dag=dag.
)
run_this

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sw0Vm7Mb-1625821974516)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210707193044252.png)]

4.2、airflow任务调度的类型

4.2.1、airflow任务的生命周期

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GFmFFAh8-1625821974517)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210709142536218.png)]

4.2.2、airflow调度其他任务实现

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BUQS2SUm-1625821974518)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210709144856600.png)]
shell任务,jinja模块任务,python的etl任务,oracle任务,sqoop任务,mysql任务,hive任务,spark任务(离线任务、实时任务)
注意:需要安装任务执行环境

5.任务调度与大数据关系

5.1、大数据任务为什么用airflow调度

5.1.1、调度框架对比
特性OozieAzkabanDolphinchedulerAirflow
所有者Apache(Cloudera)LinkedinApache(易观)Apache(Airbnb)
社区活跃有些活跃活跃非常活跃
工作流描述语言XMLKey/value textJavaPython
任务监控支持程度一般
是否支持HAYYYY
任务支持类型中等中等很丰富超级丰富
是否支持任务暂停YNYN
是否支持任务恢复YNYN
是否支持任务重跑YNYN
是否支持自定义任务类型YYYY
任务监控支持程度一般
集群可扩展性
任务实现复杂程度一般一般
框架自身扩展难以程度一般
5.1.2、airflow支持的任务调度类型

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IPgZC5mz-1625821974519)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210709151338765.png)]
扩展性极强:除官方提供的调度类型外,还支持自定义类型

6.大数据生态知识体系

6.1、框架功能与作用

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wGU6fitS-1625821974519)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210709155320693.png)]

6.2、框架应用

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DA8HuNlX-1625821974520)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/image-20210709163259219.png)]

安装部署airflow

安装python36版本:

[root@master ~]# yum install python36u-pip python36 python36-libs -y

升级pip版本:

[root@master ~]# pip3.6 install --upgrade pip

查询版本:

[root@master ~]# pip --version
pip 20.3.3 from /usr/local/lib/python3.6/site-packages/pip (python 3.6)
[root@master ~]# pip list
Package   Version
---------- -------
pip     20.3.3
setuptools 39.2.0

安装数据库:

[root@master ~]# yum -y install mariadb mariadb-server

修改数据库密码:

[root@master ~]# systemctl start mariadb
[root@master ~]# mysqladmin -uroot -p password '1213456';

安装依赖的软件包:

[root@master ~]# yum install gcc gcc-c++ cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib -y

登录数据库:

[root@master ~]# mysql -uroot -p1213456

安装完成之后创建库和用户密码:

mysql> create database airflow;
Query OK, 1 row affected (0.00 sec)

mysql> create user 'airflow'@'%' identified by 'airflow';
Query OK, 0 rows affected (0.00 sec)

mysql> create user 'airflow'@'localhost' identified by 'airflow';
Query OK, 0 rows affected (0.00 sec)

mysql> grant all on airflow.* to 'airflow'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> grant all privileges on *.* to 'airflow'@'%';
Query OK, 0 rows affected (0.00 sec)

查询airflow的文件:

[root@master ~]# pip show --files apache-airflow

airflow安装的路径:

[root@master ~]# find / -name airflow
/var/lib/mysql/airflow

创建airflow的配置文件:

[root@master ~]# mkdir -p /etc/airflow
[root@master ~]# vim /etc/airflow/airflow.cfg
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

设置airflow的文件路径:

[root@master ~]# vim /etc/profile
export AIRFLOW_HOME=/etc/airflow
export SLUGIFY_USES_TEXT_UNIDECODE=yes

安装airflow软件:

[root@master ~]# pip install apache-airflow
[root@master ~]# pip install apache-airflow[devel]
[root@master ~]# pip install apache-airflow[celery]
[root@master ~]# pip install apache-airflow[jdbc]
[root@master ~]# pip install apache-airflow[mysql]
[root@master ~]# pip install apache-airflow[password]
[root@master ~]# pip install apache-airflow[rabbitmq]
[root@master ~]# pip install apache-airflow[redis]

查询版本号:

[root@master ~]# pip list | grep -i airflow
apache-airflow          2.0.0

初始化报错:

[root@master ~]# airflow db init

DB: sqlite:root/airflow/airflow.db
[2020-12-04 12:45:16,925] {db.py:678} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
WARNI [unusual_prefix_619bc6aee900a08efa12755006b4aa599654a401_example_kubernetes_executor_config] Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes'
WARNI [unusual_prefix_619bc6aee900a08efa12755006b4aa599654a401_example_kubernetes_executor_config] Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']

解决问题:

[root@master ~]# pip install apache-airflow['cncf.kubernetes']

再次初始化:

[root@master ~]# airflow db init
DB: sqlite:root/airflow/airflow.db
[2020-12-04 12:46:43,211] {db.py:678} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
Initialization done

启动sirflow服务:

[root@master ~]# airflow webserver -p 8080 &

  ____________    _____________
 ____   |__( )_________  __/__  /________    __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /  _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/   /_/   /_/  \____/____/|__/

[2020-12-04 12:50:19,220] {dagbag.py:440} INFO - Filling up the DagBag from /dev/null
[2020-12-04 12:50:19,294] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
Access Logformat:
=================================================================
[2020-12-04 12:50:37 +0800] [23608] [INFO] Starting gunicorn 19.10.0
[2020-12-04 12:50:37 +0800] [23608] [INFO] Listening at: http://0.0.0.0:8080 (23608)
[2020-12-04 12:50:37 +0800] [23608] [INFO] Using worker: sync
[2020-12-04 12:50:37 +0800] [23618] [INFO] Booting worker with pid: 23618
[2020-12-04 12:50:37 +0800] [23619] [INFO] Booting worker with pid: 23619
[2020-12-04 12:50:37 +0800] [23620] [INFO] Booting worker with pid: 23620
[2020-12-04 12:50:37 +0800] [23621] [INFO] Booting worker with pid: 23621
[2020-12-04 12:50:44,587] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2020-12-04 12:50:44,604] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2020-12-04 12:50:44,659] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2020-12-04 12:50:44,713] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2020-12-04 12:51:09 +0800] [23608] [INFO] Handling signal: ttin
[2020-12-04 12:51:09 +0800] [23820] [INFO] Booting worker with pid: 23820
[2020-12-04 12:51:11,327] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.

在web页面上展示出来:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6XPmRJGS-1625821974521)(airflow%E8%B0%83%E5%BA%A6%E6%A1%86%E6%9E%B6.assets/wpsJMq4zy.jpg)]

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-10 14:37:33  更:2021-07-10 14:38:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/6 20:53:19-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码