IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> airflow 2.21:HivePartitionSensor、自定义宏变量 -> 正文阅读

[大数据]airflow 2.21:HivePartitionSensor、自定义宏变量

HivePartitionSensor功能

用于检查hive表的分区是否存在,在某些场景下可以使用该sensor来替代ExternalTaskSensor,且使用起来更加便捷。

场景描述

在数仓中这么两种表:
表1 :daily_table,该表是按天分区的表,一天跑一次。
表2 :hour_table,该表是按照小时分区,每个小时跑一次。
其中daily_table 依赖于 hour_table的执行,对于这种执行周期不同的任务,如果通过airflow的ExternalTaskSensor来声明它们之间的依赖会非常麻烦,为此可以使用HivePartitionSensor来解决

代码如下

简版:

	# 用于检查小时级任务,每天23点的分区
	check_hour_table= HivePartitionSensor(
        task_id='check_task',
        metastore_conn_id='hive-conn',		# hive的hive_metastore连接,可点击ariflow web界面的Connection进行配置
        table='库名.hour_table',		# 需要检查的hive表名。注意:需要加上数据库名。
        mode='reschedule',					# reschedule: 该模式在休眠期间不会占用slot,只有在执行时才会占用
        poke_interval=300,					# 两次检查的间隔时间,单位秒。使用reschedule模式时,建议该值不小于60。
        partition='year=2021 and month=12 and day=31 and hour=23',  # 需要检测的分区,分区格式需要实际情况
        # timeout=600,						# 超时时间,单位秒。可根据情况选择是否使用。
		# soft_fail=false,					# 如果设置为true,则失败时将任务标记为跳过。默认false
    )
    
    # 天级任务
    daily_table = DummyOperator(
        task_id='server_db_member_wallet_consumable_tx_merge',
    )
    check_hour_table >> daily_table

详版

from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor
# 该方法用于将时间格式化为hive表的分区格式。其中interval=9,是用来转换时区的。logical_date默认是utc时区
def get_hour_partition(logical_date, hour, interval=9):
    new_date = (logical_date + timedelta(hours=interval))
    y = str(new_date)[0:4]
    m = str(new_date)[5:7]
    d = str(new_date)[8:10]
    h = str(hour)
    partition = 'year=%s and month=%s and day=%s and hour=%s' % (y, m, d, h)
    return partition


# [START instantiate_dag]
with DAG(
        dag_id='spark_sql',
        schedule_interval=None,
        user_defined_macros={
            "get_hour_partition": get_hour_partition,
        }, # 定义get_hour_partition方法无法直接被airflow的sensor或operator使用,需要将其注册为'宏变量',(自定义宏变量)
) as dag:
	# 用于检查小时级任务,每天23点的分区
	check_hour_table= HivePartitionSensor(
        task_id='check_task',
        metastore_conn_id='hive-conn',
        table='库名.hour_table',
        mode='reschedule',
        poke_interval=300,
        partition='{{ get_hour_partition(logical_date,23) }}',  # 调用自定义的方法格式化分区
    )
    
    # 天级任务
    daily_table = DummyOperator(
        task_id='server_db_member_wallet_consumable_tx_merge',
    )
    check_hour_table >> daily_table

注意

  1. 需要装的python module:apache-airflow-providers-apache-hive
  2. 导包:from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor

参考文档

HivePartitionSensor
https://airflow.apache.org/docs/apache-airflow-providers-apache-hive/stable/_api/airflow/providers/apache/hive/sensors/hive_partition/index.html
BaseSensorOperator
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/base/index.html#module-airflow.sensors.base

说明: BaseSensorOperator中的参数在所有Sensor中都通用,包括HivePartitionSensor

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-01-01 13:58:47  更:2022-01-01 14:01:03 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 14:02:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码