Part 0: Preface
This article walks through connecting to MySQL and PostgreSQL (PG) / Greenplum (GP) with pyspark in Zeppelin. It is a short tutorial; comments and additions are welcome. PS: in Zeppelin you must select the pyspark interpreter before writing any Python code, which is why every code block below starts with %pyspark.
Part 1: Reading MySQL tables with pyspark
0. This part uses Zeppelin's built-in Spark connection, so the matching JDBC driver must be installed on the Zeppelin server (see the driver sketch after the connection settings below); the data is read as a Spark DataFrame and converted to pandas for display;
1. MySQL connection settings (host, port, database, user, password)
%pyspark
my_host = 'XXXXXXX'
my_port = 'XXXX'
my_database = 'XXXXXX'
my_user = 'XXXXX'
my_password = 'XXXXXXXXXX'
my_url = "jdbc:mysql://{host}:{post}/{database}?user={user}&password={password}".format(host=my_host, post=my_post, database=my_database, user=my_user, password=my_password)
2. Reading a full table
%pyspark
dim_city = spark.read.format("jdbc").options(url=my_url, dbtable="dim_city").load().toPandas()
print(dim_city.info())
dim_city
Output: (screenshot omitted)
3. Reading a subset of a table
%pyspark
# dbtable also accepts a parenthesized subquery; note that the subquery must carry an alias (here: tb)
dim_city_zj = spark.read.format("jdbc").options(url=my_url, dbtable="(select * from dim_city where province='浙江') tb").load().toPandas()
print(dim_city_zj.info())
dim_city_zj
Output: (screenshot omitted)
The dbtable subquery can also join several tables:
%pyspark
sql_city = '''
(select tb1.city,
tb1.district,
tb2.city_level
from dim_city tb1
left join dim_city_info tb2
on tb1.city = tb2.city
where tb1.city = '杭州'
) tb
'''
dim_city_hz = spark.read.format("jdbc").options(url=my_url, dbtable=sql_city).load().toPandas()
print(dim_city_hz.info())
dim_city_hz
Output: (screenshot omitted)
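Note that toPandas() collects the whole result into the driver, so it suits small lookup tables like these. For a larger table, Spark's JDBC source can read in parallel before any aggregation; a minimal sketch, assuming dim_city has a numeric id column (hypothetical) to partition on:
%pyspark
# Hedged sketch: parallel JDBC read; the "id" column and its bounds are assumptions
dim_city_par = (spark.read.format("jdbc")
                .option("url", my_url)
                .option("dbtable", "dim_city")
                .option("partitionColumn", "id")  # must be a numeric, date, or timestamp column
                .option("lowerBound", "1")
                .option("upperBound", "100000")
                .option("numPartitions", "4")     # number of concurrent JDBC connections
                .load())
print(dim_city_par.count())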
Part 2: Reading PostgreSQL/Greenplum tables with pyspark
0. This part relies on the psycopg2 and pandas packages and reads tables directly with pd.read_sql(); the result comes back as a pandas DataFrame;
1. Imports and PG/GP connection settings
%pyspark
import psycopg2
import pandas as pd
host = "XXXXXXX"
port = "XXXX"
user = "XXXXX"
password = "XXXXXXXXXX"
database = "XXXXXX"
conn = psycopg2.connect(database=database, host=host, port=port, user=user, password=password)
2. Fetching data with SQL
%pyspark
sql_city = '''
select tb1.city,
tb1.district,
tb2.city_level
from public.dim_city tb1
left join public.dim_city_info tb2
on tb1.city = tb2.city
where tb1.city = '杭州'
'''
dim_city_hz = pd.read_sql(sql_city, conn)
print(dim_city_hz.info())
dim_city_hz
Output: (screenshot omitted)
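As a small follow-up: literal values are safer passed through params than interpolated into the SQL string, and the connection should be closed once the work is done. A minimal sketch using psycopg2's %(name)s placeholder style:
%pyspark
# Hedged sketch: parameterized query, then release the PG/GP connection
sql_param = "select city, district from public.dim_city where city = %(city)s"
dim_city_param = pd.read_sql(sql_param, conn, params={"city": "杭州"})
conn.close()  # close the connection when finished
dim_city_param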
Part 3 (extension): Writing (exporting) pyspark data to MySQL
This example writes the dim_city_hz result table from above back to MySQL.
%pyspark
from pyspark.sql.types import StructType, StructField, StringType
my_host = 'XXXXXXX'
my_port = 'XXXX'
my_database = 'XXXXXX'
my_user = 'XXXXX'
my_password = 'XXXXXXXXXX'
my_url = "jdbc:mysql://{host}:{post}/{database}?user={user}&password={password}".format(host=my_host, post=my_post, database=my_database, user=my_user, password=my_password)
auth_mysql = {'user': my_user, 'password': my_password}
table_name = 'dim_city_hz'
schema = StructType([StructField("city", StringType(), True),
StructField("district", StringType(), True),
StructField("city_level", StringType(), True)
])
''' PS:
- Each StructField() takes 3 arguments: the field name, the data type, and whether the field is nullable;
- Other common data types include:
  data_type = {'integer': 'IntegerType()',
               'double': 'DoubleType()',
               'date': 'DateType()'}
'''
tmp_dim_city_hz = spark.createDataFrame(dim_city_hz, schema)
print('Writing data to MySQL...')
tmp_dim_city_hz.write.jdbc(my_url, table_name, mode='overwrite', properties=auth_mysql)
print('Finished writing to MySQL!')
Output: the resulting table in MySQL: (screenshot omitted)
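One caveat: mode='overwrite' drops and recreates the target table on every run. To add rows to an existing table instead, the same call works with mode='append' (the other save modes are 'ignore' and 'error'); a minimal sketch:
%pyspark
# Hedged sketch: append rows instead of replacing the table
tmp_dim_city_hz.write.jdbc(my_url, table_name, mode='append', properties=auth_mysql)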
Part 4: Conclusion
There are many ways to connect pyspark to a database; this article only demonstrates the methods I use most often. I hope it serves as a helpful reference~