What HivePartitionSensor does
It checks whether a given partition of a Hive table exists. In some scenarios it can replace ExternalTaskSensor, and it is more convenient to use.
Scenario
Suppose the warehouse has two tables:
- Table 1: daily_table, partitioned by day and written once a day.
- Table 2: hour_table, partitioned by hour and written every hour.
daily_table depends on the runs of hour_table. For tasks with different schedules like these, declaring the dependency through Airflow's ExternalTaskSensor is cumbersome, because the run dates of the two DAGs rarely line up. HivePartitionSensor solves this: the downstream task simply waits until the expected partition of hour_table exists.
The code is as follows.
Short version:
check_hour_table = HivePartitionSensor(
    task_id='check_task',
    metastore_conn_id='hive-conn',
    table='db_name.hour_table',
    mode='reschedule',
    poke_interval=300,
    partition='year=2021 and month=12 and day=31 and hour=23',
)
daily_table = DummyOperator(
    task_id='server_db_member_wallet_consumable_tx_merge',
)
check_hour_table >> daily_table
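The partition argument is a filter expression over the table's partition columns. For the short version, the literal string can be built with plain Python; partition_filter below is a hypothetical helper, not part of Airflow:

```python
from datetime import datetime

def partition_filter(dt, hour):
    # Build a Hive partition filter such as
    # 'year=2021 and month=12 and day=31 and hour=23'.
    # Zero-padding matches partitions written with two-digit month/day/hour.
    return 'year=%04d and month=%02d and day=%02d and hour=%02d' % (
        dt.year, dt.month, dt.day, hour)

print(partition_filter(datetime(2021, 12, 31), 23))
# year=2021 and month=12 and day=31 and hour=23
```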
Full version:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor


def get_hour_partition(logical_date, hour, interval=9):
    # Shift logical_date by `interval` hours (here a UTC+9 timezone offset;
    # adjust to your own timezone) and build the partition filter for that
    # date with a fixed hour.
    new_date = logical_date + timedelta(hours=interval)
    y = str(new_date)[0:4]
    m = str(new_date)[5:7]
    d = str(new_date)[8:10]
    h = str(hour)
    return 'year=%s and month=%s and day=%s and hour=%s' % (y, m, d, h)


with DAG(
    dag_id='spark_sql',
    start_date=datetime(2021, 12, 1),
    schedule_interval=None,
    user_defined_macros={
        "get_hour_partition": get_hour_partition,
    },
) as dag:
    check_hour_table = HivePartitionSensor(
        task_id='check_task',
        metastore_conn_id='hive-conn',
        table='db_name.hour_table',
        mode='reschedule',
        poke_interval=300,
        partition='{{ get_hour_partition(logical_date, 23) }}',
    )
    daily_table = DummyOperator(
        task_id='server_db_member_wallet_consumable_tx_merge',
    )
    check_hour_table >> daily_table
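Because the macro is plain Python, its date arithmetic can be exercised outside Airflow, including the day rollover caused by the +9-hour shift (assumed here to be a UTC+9 timezone offset):

```python
from datetime import datetime, timedelta

def get_hour_partition(logical_date, hour, interval=9):
    # Same helper as in the DAG: shift logical_date by `interval` hours
    # and fix the hour column.
    new_date = logical_date + timedelta(hours=interval)
    y = str(new_date)[0:4]
    m = str(new_date)[5:7]
    d = str(new_date)[8:10]
    return 'year=%s and month=%s and day=%s and hour=%s' % (y, m, d, str(hour))

print(get_hour_partition(datetime(2021, 12, 31, 14), 23))
# year=2021 and month=12 and day=31 and hour=23
print(get_hour_partition(datetime(2021, 12, 31, 20), 23))
# the +9h shift crosses midnight:
# year=2022 and month=01 and day=01 and hour=23
```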
Notes
- Required Python module: apache-airflow-providers-apache-hive (pip install apache-airflow-providers-apache-hive)
- Import: from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor
References
- HivePartitionSensor: https://airflow.apache.org/docs/apache-airflow-providers-apache-hive/stable/_api/airflow/providers/apache/hive/sensors/hive_partition/index.html
- BaseSensorOperator: https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/base/index.html#module-airflow.sensors.base
Note: the parameters of BaseSensorOperator are available on every sensor, including HivePartitionSensor.
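Two of those shared parameters, poke_interval and timeout, together bound how often and for how long a sensor waits. A rough sketch of the relationship, using the poke_interval from the examples above and an assumed one-day timeout:

```python
# With poke_interval=300 (as in the examples above) and a one-day timeout,
# the sensor checks the metastore at most timeout // poke_interval times
# before timing out.
poke_interval = 300      # seconds between checks
timeout = 24 * 60 * 60   # give up after one day (assumed value)

max_pokes = timeout // poke_interval
print(max_pokes)  # 288
```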