Reference: https://blog.csdn.net/hahayikeshu/article/details/103552631
1. Airflow
Installation
pip install apache-airflow
airflow initdb   # Airflow 1.x; on Airflow 2.x use: airflow db init
airflow scheduler
Web server
----- start the web server, default port is 8080
airflow webserver -p 8080
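Running a DAG
1. Copy the DAG's .py file into airflow/dags/ (by default $AIRFLOW_HOME/dags, i.e. ~/airflow/dags)
2. Turn on scheduling for the DAG: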
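airflow unpause dag_id   # start (unpause) the DAG; Airflow 2.x: airflow dags unpause dag_id
airflow pause dag_id     # pause the DAG; Airflow 2.x: airflow dags pause dag_id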
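Step 1 above just places the file where the scheduler looks for DAGs. A minimal sketch of that copy step, assuming the default layout where the DAG folder is $AIRFLOW_HOME/dags and AIRFLOW_HOME falls back to ~/airflow. `install_dag` is a hypothetical helper, and a custom `dags_folder` set in airflow.cfg is not handled:

```python
import os
import shutil
from pathlib import Path


def install_dag(py_file):
    """Copy a DAG .py file into the default DAG folder and return the new path.

    Hypothetical helper. Assumes dags_folder in airflow.cfg is left at its
    default, i.e. $AIRFLOW_HOME/dags, with AIRFLOW_HOME defaulting to ~/airflow.
    """
    airflow_home = Path(os.environ.get("AIRFLOW_HOME", Path.home() / "airflow"))
    dags_dir = airflow_home / "dags"
    dags_dir.mkdir(parents=True, exist_ok=True)  # create the folder on first use
    return Path(shutil.copy(py_file, dags_dir))  # shutil.copy returns the destination path
```

For example, install_dag("my_dag.py") followed by airflow unpause with that file's dag_id.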
2. PySpark window functions
References: https://database.51cto.com/art/202101/639239.htm https://blog.csdn.net/liuyingying0418/article/details/108025262
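** A window function does not change the total number of rows in the result (unlike groupBy().agg(), which returns one row per group).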
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Window functions').getOrCreate()
employee_salary = [
("Ali", "Sales", 8000),
("Bob", "Sales", 7000),
("Cindy", "Sales", 7500),
("Davd", "Finance", 10000),
("Elena", "Sales", 8000),
("Fancy", "Finance", 12000),
("George", "Finance", 11000),
("Haffman", "Marketing", 7000),
("Ilaja", "Marketing", 8000),
("Joey", "Sales", 9000)]
columns = ["name", "department", "salary"]
df = spark.createDataFrame(data=employee_salary, schema=columns)
df.show(truncate=False)
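The row-count note can be checked without a cluster. The plain-Python sketch below (not Spark; `group_max` and `window_max` are hypothetical helpers) mimics a groupBy aggregation and the same aggregation used as a window function on the sample data: the first collapses the 10 rows to 3, the second keeps all 10 and attaches the department maximum to each row.

```python
# Same rows as employee_salary above: (name, department, salary)
employee_salary = [
    ("Ali", "Sales", 8000),
    ("Bob", "Sales", 7000),
    ("Cindy", "Sales", 7500),
    ("Davd", "Finance", 10000),
    ("Elena", "Sales", 8000),
    ("Fancy", "Finance", 12000),
    ("George", "Finance", 11000),
    ("Haffman", "Marketing", 7000),
    ("Ilaja", "Marketing", 8000),
    ("Joey", "Sales", 9000),
]


def group_max(rows):
    """Like df.groupBy("department").agg(F.max("salary")): one row per department."""
    best = {}
    for _, dept, salary in rows:
        best[dept] = max(salary, best.get(dept, salary))
    return sorted(best.items())


def window_max(rows):
    """Like F.max("salary").over(Window.partitionBy("department")): one row per input row."""
    best = dict(group_max(rows))
    return [(name, dept, salary, best[dept]) for name, dept, salary in rows]


grouped = group_max(employee_salary)
windowed = window_max(employee_salary)
print(len(employee_salary), len(grouped), len(windowed))  # prints: 10 3 10
```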
Window function usage (number the rows within each department, highest salary first):
from pyspark.sql.window import Window
import pyspark.sql.functions as F
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("row_number", F.row_number().over(windowSpec)).show(truncate=False)
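To see what the row_number() line computes, and how it differs from F.rank() and F.dense_rank() when salaries tie (Ali and Elena both earn 8000 in Sales), here is a plain-Python sketch of the same partition-and-order logic. `ranked` is a hypothetical helper, not part of PySpark. One caveat: Spark does not guarantee the order of tied rows, so row_number() may number Ali and Elena either way; the sketch relies on Python's stable sort and keeps their input order.

```python
from itertools import groupby

employee_salary = [
    ("Ali", "Sales", 8000), ("Bob", "Sales", 7000), ("Cindy", "Sales", 7500),
    ("Davd", "Finance", 10000), ("Elena", "Sales", 8000), ("Fancy", "Finance", 12000),
    ("George", "Finance", 11000), ("Haffman", "Marketing", 7000),
    ("Ilaja", "Marketing", 8000), ("Joey", "Sales", 9000),
]


def ranked(rows):
    """Emulate row_number/rank/dense_rank over
    Window.partitionBy("department").orderBy(F.desc("salary")).

    Returns (name, department, salary, row_number, rank, dense_rank) tuples.
    """
    # Sort by department, then salary descending, so each partition is contiguous and ordered.
    ordered = sorted(rows, key=lambda r: (r[1], -r[2]))
    out = []
    for dept, part in groupby(ordered, key=lambda r: r[1]):
        rank = dense = 0
        prev = None
        for i, (name, _, salary) in enumerate(part, start=1):
            if salary != prev:
                rank = i    # rank jumps to the row position, leaving gaps after ties
                dense += 1  # dense_rank only advances by one, so no gaps
                prev = salary
            out.append((name, dept, salary, i, rank, dense))
    return out


for row in ranked(employee_salary):
    print(row)
```

For the Sales partition this gives row_number 1 to 5, rank 1, 2, 2, 4, 5 and dense_rank 1, 2, 2, 3, 4, and the total is still 10 rows.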