背景
????在业务处理中,经常处理相同的逻辑,只不过每次处理时数据的指标范围不同而已;因此使用循环处理,每次传入不同的参数值,是提高代码质量的必要手段。
代码
val times = Map(
param_key1 -> param_value1,
param_ke2 -> param_value2,
param_ke3 -> param_value3,
param_ke4 -> param_value4
)
spark.sql(s"""
|SELECT row_common_1,
| row_common_2,
| row_common_3
|FROM table_name1
|WHERE event_day = '$day'
|""".stripMargin).cache().createOrReplaceTempView("common_tmp")
for( (param_key, param_value) -> times ) {
val param_key_str = operateFunction(param_key)
spark.sql(s"""
|SELECT row_operate_1,
| row_operate_2,
| row_operate_3
|FROM table_name2
|WHERE event_day='$day'
|""".stripMargin).createOrReplaceTempView("operate_tmp")
spark.sql(s"""
|SELECT row_common_1,
| row_common_2,
| row_common_3,
| row_operate_1,
| row_operate_2,
| row_operate_3
|FROM common_tmp T1
|JOIN operate_tmp T2
|ON T1.row_common_1 = T2.row_operate_1
|""".stripMargin).createOrReplaceTempView("result_tmp")
spark.sql(s"""
|INSERT OVERWRITE TABLE result_table PARTITION (event_day='$day')
|SELECT *
|FROM result_tmp
|""".stripMargin).createOrReplaceTempView("result_tmp"
}
/*
* 一个方法类
*/
def operateFunction( param: String ): String = {
Functions(......)
}
总结
????在循环中使用DataFrame时,可以在循环外定义一个DataFrame,然后在循环内进行调用;整个循环内逻辑结束后,将结果进行sink; ????不能将循环中得出的结果传出循环外进行使用/调用!!!
|