The scenario: add a new column of data to an existing DataFrame, where the new values are unrelated to the existing data. Functionally this is similar to pandas' A.join(B) on two tables.
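For reference, the pandas behaviour we want to reproduce in Spark looks roughly like this (the data below is purely illustrative, not from the example further down):

import pandas as pd

# Two frames whose columns have nothing to do with each other;
# pandas aligns them row by row on the index.
a = pd.DataFrame({"name": ["Alberto", "Dakota"]})
b = pd.DataFrame({"salary": [2, 444]})
print(a.join(b))  # one frame containing both columns, matched by position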
Reference: python - PySpark - Adding a Column from a list of values - Stack Overflow
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window


def create_sc():
    sc_conf = SparkConf()
    # sc_conf.setMaster('spark://master:7077')
    sc_conf.setAppName('my-app')
    # sc_conf.set("spark.sql.execution.arrow.enabled", "true")
    sc_conf.set("spark.driver.memory", "128g")
    sc_conf.set("spark.sql.crossJoin.enabled", "true")
    # spark.executor.memory: memory available to each executor on a node
    sc_conf.set('spark.executor.memory', '256g')
    # spark.executor.cores: number of CPU cores per executor; more cores let
    # one executor run more tasks concurrently
    sc_conf.set('spark.executor.cores', '8')
    # spark.cores.max: maximum number of CPU cores allocated to this application;
    # defaults to spark.deploy.defaultCores if unset
    sc_conf.set('spark.cores.max', 40)
    # Log the effective SparkConf at INFO level when the SparkContext starts
    sc_conf.set('spark.logConf', True)
    sc_conf.set('spark.driver.maxResultSize', '20g')
    print(sc_conf.getAll())
    return SparkContext(conf=sc_conf)
if __name__ == '__main__':
    sc = create_sc()
    spark_session = SparkSession(sc)
    y = spark_session.createDataFrame(
        [("Alberto", 2), ("Dakota", 444), ("haha", 77), ("hehe", 456)],
        ["name", "salary"])
    y.show()

    # As an experiment, split name and salary apart, then stitch them back together.
    # monotonically_increasing_id() is increasing but not consecutive across partitions,
    # so wrap it in row_number() to get a consecutive 1..N index to join on.
    name_df = y.select("name")
    name_df = name_df.withColumn(
        'index', F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
    name_df.show()

    salary_df = y.select("salary")
    salary_df = salary_df.withColumn(
        'index', F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
    salary_df.show()

    # Join the two DataFrames on the generated index, then drop the helper column
    res_df = name_df.join(salary_df, salary_df.index == name_df.index).drop("index")
    res_df.show()
The resulting res_df is identical to the original DataFrame y.
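The same kind of row index can also be built without a window function, via zipWithIndex on the underlying RDD. A minimal sketch, reusing spark_session and y from above; with_index is a hypothetical helper introduced here for illustration:

# Hypothetical helper: attach a consecutive 0..N-1 'index' column via the RDD API.
def with_index(df):
    # zipWithIndex pairs each Row with its position; rebuild rows that include it
    indexed = df.rdd.zipWithIndex().map(lambda pair: pair[0] + (pair[1],))
    return spark_session.createDataFrame(indexed, df.columns + ["index"])

name_df = with_index(y.select("name"))
salary_df = with_index(y.select("salary"))
res_df = name_df.join(salary_df, "index").drop("index")
res_df.show()

This avoids pulling all rows into a single partition, which Window.orderBy without a partitionBy will do, but it does pay the cost of a round trip through the RDD API.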